You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/03/24 16:50:50 UTC

[13/13] git commit: dropping streams-pojo-extensions, BC activity can be extended without it move streams cassandra under contrib move streams-eip-routes under runtimes, renaming streams-runtime-webapp

Drop streams-pojo-extensions, because Activity can be extended without it.
Move streams-cassandra under streams-contrib.
Move streams-eip-routes under streams-runtimes, renaming it to streams-runtime-webapp.


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/adb43b29
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/adb43b29
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/adb43b29

Branch: refs/heads/springcleaning
Commit: adb43b29567351e8849385be95806d4be2fcb7e5
Parents: 381d758
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Mon Mar 24 03:07:06 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Mon Mar 24 03:07:06 2014 -0500

----------------------------------------------------------------------
 pom.xml                                         |   8 +-
 streams-cassandra/pom.xml                       | 142 -------------
 .../configuration/CassandraConfiguration.java   |  63 ------
 .../model/CassandraActivityStreamsEntry.java    |  45 ----
 .../CassandraActivityStreamsRepository.java     | 176 ----------------
 .../repository/impl/CassandraKeyspace.java      |  43 ----
 .../impl/CassandraSubscriptionRepository.java   |  69 ------
 .../spring/streams-cassandra-context.xml        |  25 ---
 .../CassandraActivityStreamsRepositoryTest.java |  99 ---------
 .../impl/CassandraActivitySubscriptionTest.java |  54 -----
 streams-config-graph/pom.xml                    |  71 -------
 .../config/graph/PipelineGraphConfigurator.java |  88 --------
 .../streams-persist-cassandra/pom.xml           | 142 +++++++++++++
 .../configuration/CassandraConfiguration.java   |  63 ++++++
 .../model/CassandraActivityStreamsEntry.java    |  43 ++++
 .../CassandraActivityStreamsRepository.java     | 176 ++++++++++++++++
 .../repository/impl/CassandraKeyspace.java      |  43 ++++
 .../impl/CassandraSubscriptionRepository.java   |  69 ++++++
 .../spring/streams-cassandra-context.xml        |  25 +++
 .../CassandraActivityStreamsRepositoryTest.java |  99 +++++++++
 .../impl/CassandraActivitySubscriptionTest.java |  54 +++++
 streams-eip-routes/ReadMe.txt                   |  32 ---
 streams-eip-routes/pom.xml                      | 209 -------------------
 .../aggregation/ActivityAggregator.java         |  59 ------
 .../configuration/EipConfigurator.java          | 201 ------------------
 .../ActivityPublisherRegistrationProcessor.java |  73 -------
 ...yStreamsSubscriberRegistrationProcessor.java |  94 ---------
 .../routers/ActivityConsumerRouteBuilder.java   |  32 ---
 .../ActivityStreamsSubscriberRouteBuilder.java  |  32 ---
 .../routers/impl/ActivityConsumerRouter.java    | 144 -------------
 .../impl/ActivityStreamsSubscriberRouter.java   | 142 -------------
 .../messaging/service/ActivityService.java      |  31 ---
 .../messaging/service/SubscriptionService.java  |  29 ---
 .../service/impl/CassandraActivityService.java  |  96 ---------
 .../impl/CassandraSubscriptionService.java      |  43 ----
 .../META-INF/spring/propertiesLoader.xml        |  35 ----
 .../spring/streams-eip-applicationContext.xml   | 113 ----------
 .../streams-eip-osgi-component-import.xml       |  38 ----
 .../META-INF/spring/streamsCamelContext.xml     |  96 ---------
 .../main/resources/META-INF/streams.properties  |  41 ----
 .../impl/CassandraActivityServiceTest.java      |  87 --------
 streams-pojo-extensions/pom.xml                 | 149 -------------
 .../org/apache/streams/ActivityExtended.json    |  17 --
 .../pojo/test/ActivityExtendedSerDeTest.java    | 108 ----------
 .../test/resources/gnip_twitter_extended.json   | 146 -------------
 streams-runtimes/pom.xml                        |   1 +
 .../streams-runtime-webapp/ReadMe.txt           |  32 +++
 streams-runtimes/streams-runtime-webapp/pom.xml | 209 +++++++++++++++++++
 .../aggregation/ActivityAggregator.java         |  59 ++++++
 .../configuration/EipConfigurator.java          | 201 ++++++++++++++++++
 .../ActivityPublisherRegistrationProcessor.java |  72 +++++++
 ...yStreamsSubscriberRegistrationProcessor.java |  93 +++++++++
 .../routers/ActivityConsumerRouteBuilder.java   |  32 +++
 .../ActivityStreamsSubscriberRouteBuilder.java  |  32 +++
 .../routers/impl/ActivityConsumerRouter.java    | 144 +++++++++++++
 .../impl/ActivityStreamsSubscriberRouter.java   | 141 +++++++++++++
 .../messaging/service/ActivityService.java      |  31 +++
 .../messaging/service/SubscriptionService.java  |  29 +++
 .../service/impl/CassandraActivityService.java  |  96 +++++++++
 .../impl/CassandraSubscriptionService.java      |  43 ++++
 .../META-INF/spring/propertiesLoader.xml        |  35 ++++
 .../spring/streams-eip-applicationContext.xml   | 113 ++++++++++
 .../streams-eip-osgi-component-import.xml       |  38 ++++
 .../META-INF/spring/streamsCamelContext.xml     |  96 +++++++++
 .../main/resources/META-INF/streams.properties  |  41 ++++
 .../impl/CassandraActivityServiceTest.java      |  86 ++++++++
 66 files changed, 2340 insertions(+), 2928 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/adb43b29/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index c552cf1..a476c07 100644
--- a/pom.xml
+++ b/pom.xml
@@ -88,17 +88,13 @@
         <module>poms</module>
         <module>provision</module>
         <module>streams-osgi-components</module>
-        <module>streams-eip-routes</module>
-        <module>streams-cassandra</module>
-        <module>streams-web</module>
+        <module>streams-core</module>
         <module>streams-config</module>
-        <module>streams-config-graph</module>
         <module>streams-pojo</module>
         <module>streams-util</module>
-        <module>streams-pojo-extensions</module>
         <module>streams-contrib</module>
-        <module>streams-core</module>
         <module>streams-runtimes</module>
+        <module>streams-web</module>
     </modules>
 
     <packaging>pom</packaging>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/adb43b29/streams-cassandra/pom.xml
----------------------------------------------------------------------
diff --git a/streams-cassandra/pom.xml b/streams-cassandra/pom.xml
deleted file mode 100644
index 4837649..0000000
--- a/streams-cassandra/pom.xml
+++ /dev/null
@@ -1,142 +0,0 @@
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one
-  ~ or more contributor license agreements.  See the NOTICE file
-  ~ distributed with this work for additional information
-  ~ regarding copyright ownership.  The ASF licenses this file
-  ~ to you under the Apache License, Version 2.0 (the
-  ~ "License"); you may not use this file except in compliance
-  ~
-  ~   http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing,
-  ~ software distributed under the License is distributed on an
-  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-  ~ KIND, either express or implied.  See the License for the
-  ~ specific language governing permissions and limitations
-  ~ under the License.
-  -->
-
-<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xmlns="http://maven.apache.org/POM/4.0.0"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-    <modelVersion>4.0.0</modelVersion>
-
-    <parent>
-        <groupId>org.apache.streams</groupId>
-        <artifactId>streams-project</artifactId>
-        <version>0.1-SNAPSHOT</version>
-    </parent>
-
-    <artifactId>streams-cassandra</artifactId>
-
-    <name>${bundle.symbolicName} [${bundle.namespace}]</name>
-
-    <properties>
-        <bundle.symbolicName>streams-cassandra</bundle.symbolicName>
-        <bundle.namespace>org.apache.streams</bundle.namespace>
-        <easymock.version>3.2</easymock.version>
-    </properties>
-
-    <packaging>bundle</packaging>
-    <build>
-        <resources>
-        <resource>
-            <directory>src/main/resources</directory>
-        </resource>
-
-            <resource>
-                <directory>.</directory>
-                <includes>
-                    <include>plugin.xml</include>
-                    <include>plugin.properties</include>
-                    <include>icons/**</include>
-                </includes>
-            </resource>
-        </resources>
-    <plugins>
-        <plugin>
-            <groupId>org.ops4j</groupId>
-            <artifactId>maven-pax-plugin</artifactId>
-            <!--
-             | enable improved OSGi compilation support for the bundle life-cycle.
-             | to switch back to the standard bundle life-cycle, move this setting
-             | down to the maven-bundle-plugin section
-            -->
-            <extensions>true</extensions>
-        </plugin>
-        <plugin>
-            <groupId>org.apache.felix</groupId>
-            <artifactId>maven-bundle-plugin</artifactId>
-            <version>1.4.3</version>
-            <!--
-             | the following instructions build a simple set of public/private classes into an OSGi bundle
-            -->
-            <configuration>
-                <instructions>
-                    <Bundle-SymbolicName>${bundle.symbolicName}</Bundle-SymbolicName>
-                    <Bundle-Version>${project.version}</Bundle-Version>
-                    <Export-Package>
-                        ${bundle.namespace};version="${project.version}",org.apache.streams.cassandra.repository.impl, org.apache.streams.cassandra.model, org.apache.streams.cassandra.configuration
-                    </Export-Package>
-                    <Private-Package>${bundle.namespace}.cassandra.repository.impl.*,${bundle.namespace}.cassandra.model, ${bundle.namespace}.cassandra.configuration </Private-Package>
-                    <Import-Package>
-                        org.apache.rave.model,org.apache.rave.portal.model.impl,
-                        com.datastax.driver.core, com.datastax.driver.core.exceptions, org.codehaus.jackson.map.annotate,
-                        javax.persistence, org.apache.commons.logging, com.google.common.collect, org.codehaus.jackson.map,
-                        org.apache.commons.lang,
-                        org.apache.streams.osgi.components.activitysubscriber,
-                        org.springframework.beans.factory.annotation, org.springframework.stereotype
-                    </Import-Package>
-                </instructions>
-            </configuration>
-        </plugin>
-    </plugins>
-    </build>
-
-    <dependencies>
-        <dependency>
-            <groupId>org.apache.rave</groupId>
-            <artifactId>rave-core-api</artifactId>
-            <version>${rave.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.rave</groupId>
-            <artifactId>rave-core</artifactId>
-            <version>${rave.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-            <version>4.8.2</version>
-        </dependency>
-
-        <dependency>
-            <groupId>com.datastax.cassandra</groupId>
-            <artifactId>cassandra-driver-core</artifactId>
-            <version>${datastax.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.jboss.netty</groupId>
-            <artifactId>netty</artifactId>
-            <version>3.2.9.Final</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.streams.osgi.components</groupId>
-            <artifactId>activity-subscriber</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.easymock</groupId>
-            <artifactId>easymock</artifactId>
-            <version>${easymock.version}</version>
-            <scope>test</scope>
-        </dependency>
-
-    </dependencies>
-
-</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/adb43b29/streams-cassandra/src/main/java/org/apache/streams/cassandra/configuration/CassandraConfiguration.java
----------------------------------------------------------------------
diff --git a/streams-cassandra/src/main/java/org/apache/streams/cassandra/configuration/CassandraConfiguration.java b/streams-cassandra/src/main/java/org/apache/streams/cassandra/configuration/CassandraConfiguration.java
deleted file mode 100644
index 39e74a7..0000000
--- a/streams-cassandra/src/main/java/org/apache/streams/cassandra/configuration/CassandraConfiguration.java
+++ /dev/null
@@ -1,63 +0,0 @@
-package org.apache.streams.cassandra.configuration;
-
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.stereotype.Component;
-
-@Component
-public class CassandraConfiguration {
-    @Value("${keyspaceName}")
-    private String keyspaceName;
-
-    @Value("${activitystreamsColumnFamilyName}")
-    private String activitystreamsColumnFamilyName;
-
-    @Value("${subscriptionColumnFamilyName}")
-    private String subscriptionColumnFamilyName;
-
-    @Value("${publisherColumnFamilyName}")
-    private String publisherColumnFamilyName;
-
-    @Value("${cassandraPort}")
-    private String cassandraPort;
-
-    public String getKeyspaceName() {
-        return keyspaceName;
-    }
-
-    public void setKeyspaceName(String keyspaceName) {
-        this.keyspaceName = keyspaceName;
-    }
-
-    public String getActivitystreamsColumnFamilyName() {
-        return activitystreamsColumnFamilyName;
-    }
-
-    public void setActivitystreamsColumnFamilyName(String activitystreamsColumnFamilyName) {
-        this.activitystreamsColumnFamilyName = activitystreamsColumnFamilyName;
-    }
-
-    public String getSubscriptionColumnFamilyName() {
-        return subscriptionColumnFamilyName;
-    }
-
-    public void setSubscriptionColumnFamilyName(String subscriptionColumnFamilyName) {
-        this.subscriptionColumnFamilyName = subscriptionColumnFamilyName;
-    }
-
-    public String getPublisherColumnFamilyName() {
-        return publisherColumnFamilyName;
-    }
-
-    public void setPublisherColumnFamilyName(String publisherColumnFamilyName) {
-        this.publisherColumnFamilyName = publisherColumnFamilyName;
-    }
-
-    public String getCassandraPort() {
-        return cassandraPort;
-    }
-
-    public void setCassandraPort(String cassandraPort) {
-        this.cassandraPort = cassandraPort;
-    }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/adb43b29/streams-cassandra/src/main/java/org/apache/streams/cassandra/model/CassandraActivityStreamsEntry.java
----------------------------------------------------------------------
diff --git a/streams-cassandra/src/main/java/org/apache/streams/cassandra/model/CassandraActivityStreamsEntry.java b/streams-cassandra/src/main/java/org/apache/streams/cassandra/model/CassandraActivityStreamsEntry.java
deleted file mode 100644
index 2a2321a..0000000
--- a/streams-cassandra/src/main/java/org/apache/streams/cassandra/model/CassandraActivityStreamsEntry.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.cassandra.model;
-
-import org.apache.rave.model.ActivityStreamsObject;
-import org.apache.rave.portal.model.impl.ActivityStreamsEntryImpl;
-import org.apache.rave.portal.model.impl.ActivityStreamsObjectImpl;
-import org.codehaus.jackson.map.annotate.JsonDeserialize;
-
-import java.util.Date;
-
-public class CassandraActivityStreamsEntry extends ActivityStreamsEntryImpl implements Comparable<CassandraActivityStreamsEntry>{
-
-    @JsonDeserialize(as=ActivityStreamsObjectImpl.class)
-    private ActivityStreamsObject object;
-
-    @JsonDeserialize(as=ActivityStreamsObjectImpl.class)
-    private ActivityStreamsObject target;
-
-    @JsonDeserialize(as=ActivityStreamsObjectImpl.class)
-    private ActivityStreamsObject actor;
-
-    @JsonDeserialize(as=ActivityStreamsObjectImpl.class)
-    private ActivityStreamsObject provider;
-
-    public int compareTo(CassandraActivityStreamsEntry entry){
-        return (this.getPublished()).compareTo(entry.getPublished());
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/adb43b29/streams-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepository.java
----------------------------------------------------------------------
diff --git a/streams-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepository.java b/streams-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepository.java
deleted file mode 100644
index 56e5416..0000000
--- a/streams-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepository.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.cassandra.repository.impl;
-
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.Row;
-import com.datastax.driver.core.Session;
-import com.datastax.driver.core.exceptions.AlreadyExistsException;
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.rave.model.ActivityStreamsEntry;
-import org.apache.rave.model.ActivityStreamsObject;
-import org.apache.rave.portal.model.impl.ActivityStreamsEntryImpl;
-import org.apache.rave.portal.model.impl.ActivityStreamsObjectImpl;
-import org.apache.streams.cassandra.configuration.CassandraConfiguration;
-import org.apache.streams.cassandra.model.CassandraActivityStreamsEntry;
-import org.springframework.beans.factory.annotation.Autowired;
-
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-
-
-public class CassandraActivityStreamsRepository {
-
-    private static final Log LOG = LogFactory.getLog(CassandraActivityStreamsRepository.class);
-
-    private CassandraKeyspace keyspace;
-    private CassandraConfiguration configuration;
-
-    @Autowired
-    public CassandraActivityStreamsRepository(CassandraKeyspace keyspace, CassandraConfiguration configuration) {
-        this.configuration = configuration;
-        this.keyspace = keyspace;
-
-        try {
-            keyspace.getSession().execute("CREATE TABLE " + configuration.getActivitystreamsColumnFamilyName() + " (" +
-                    "id text, " +
-                    "published timestamp, " +
-                    "verb text, " +
-                    "tags text, " +
-
-                    "actor_displayname text, " +
-                    "actor_id text, " +
-                    "actor_url text, " +
-                    "actor_objecttype text, " +
-
-                    "target_displayname text, " +
-                    "target_id text, " +
-                    "target_url text, " +
-
-                    "provider_url text, " +
-
-                    "object_url text, " +
-                    "object_displayname text, " +
-                    "object_id text, " +
-                    "object_objecttype text, " +
-
-                    "PRIMARY KEY (id, tags, published));");
-        } catch (AlreadyExistsException ignored) {
-        }
-    }
-
-    public void save(ActivityStreamsEntry entry) {
-        String sql = "INSERT INTO " + configuration.getActivitystreamsColumnFamilyName() + " (" +
-                "id, published, verb, tags, " +
-                "actor_displayname, actor_objecttype, actor_id, actor_url, " +
-                "target_displayname, target_id, target_url, " +
-                "provider_url, " +
-                "object_displayname, object_objecttype, object_id, object_url) " +
-                "VALUES ('" +
-                entry.getId() + "','" +
-                entry.getPublished().getTime() + "','" +
-                entry.getVerb() + "','" +
-                entry.getTags() + "','" +
-
-                entry.getActor().getDisplayName() + "','" +
-                entry.getActor().getObjectType() + "','" +
-                entry.getActor().getId() + "','" +
-                entry.getActor().getUrl() + "','" +
-
-                entry.getTarget().getDisplayName() + "','" +
-                entry.getTarget().getId() + "','" +
-                entry.getTarget().getUrl() + "','" +
-
-                entry.getProvider().getUrl() + "','" +
-
-                entry.getObject().getDisplayName() + "','" +
-                entry.getObject().getObjectType() + "','" +
-                entry.getObject().getId() + "','" +
-                entry.getObject().getUrl() +
-
-                "')";
-        keyspace.getSession().execute(sql);
-    }
-
-    public List<CassandraActivityStreamsEntry> getActivitiesForFilters(List<String> filters, Date lastUpdated) {
-        List<CassandraActivityStreamsEntry> results = new ArrayList<CassandraActivityStreamsEntry>();
-
-        for (String tag : filters) {
-            String cql = "SELECT * FROM " + configuration.getActivitystreamsColumnFamilyName() + " WHERE ";
-
-            //add filters
-            cql = cql + " tags = '" + tag + "' AND ";
-
-            //specify last modified
-            cql = cql + "published > " + lastUpdated.getTime() + " ALLOW FILTERING";
-
-            //execute the cql query and store the results
-            ResultSet set = keyspace.getSession().execute(cql);
-
-            //iterate through the results and create a new ActivityStreamsEntry for every result returned
-
-            for (Row row : set) {
-                CassandraActivityStreamsEntry entry = new CassandraActivityStreamsEntry();
-                ActivityStreamsObject actor = new ActivityStreamsObjectImpl();
-                ActivityStreamsObject target = new ActivityStreamsObjectImpl();
-                ActivityStreamsObject object = new ActivityStreamsObjectImpl();
-                ActivityStreamsObject provider = new ActivityStreamsObjectImpl();
-
-                actor.setDisplayName(row.getString("actor_displayname"));
-                actor.setId(row.getString("actor_id"));
-                actor.setObjectType(row.getString("actor_objecttype"));
-                actor.setUrl(row.getString("actor_url"));
-
-                target.setDisplayName(row.getString("target_displayname"));
-                target.setId(row.getString("target_id"));
-                target.setUrl(row.getString("target_url"));
-
-                object.setDisplayName(row.getString("object_displayname"));
-                object.setObjectType(row.getString("object_objecttype"));
-                object.setUrl(row.getString("object_url"));
-                object.setId(row.getString("object_id"));
-
-                provider.setUrl(row.getString("provider_url"));
-
-                entry.setPublished(row.getDate("published"));
-                entry.setVerb(row.getString("verb"));
-                entry.setId(row.getString("id"));
-                entry.setTags(row.getString("tags"));
-                entry.setActor(actor);
-                entry.setTarget(target);
-                entry.setObject(object);
-                entry.setProvider(provider);
-
-                results.add(entry);
-            }
-        }
-
-        return results;
-    }
-
-    public void dropTable(String table) {
-        String cql = "DROP TABLE " + table;
-        keyspace.getSession().execute(cql);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/adb43b29/streams-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraKeyspace.java
----------------------------------------------------------------------
diff --git a/streams-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraKeyspace.java b/streams-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraKeyspace.java
deleted file mode 100644
index 0551bf2..0000000
--- a/streams-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraKeyspace.java
+++ /dev/null
@@ -1,43 +0,0 @@
-package org.apache.streams.cassandra.repository.impl;
-
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.Session;
-import com.datastax.driver.core.exceptions.AlreadyExistsException;
-import org.apache.streams.cassandra.configuration.CassandraConfiguration;
-import org.springframework.beans.factory.annotation.Autowired;
-
-public class CassandraKeyspace {
-    private CassandraConfiguration configuration;
-    private Cluster cluster;
-    private Session session;
-
-    @Autowired
-    public CassandraKeyspace(CassandraConfiguration configuration){
-        this.configuration = configuration;
-
-        cluster = Cluster.builder().addContactPoint(configuration.getCassandraPort()).build();
-        session = cluster.connect();
-
-        //TODO: cassandra 2 will have support for CREATE KEYSPACE IF NOT EXISTS
-        try {
-            session.execute("CREATE KEYSPACE " + configuration.getKeyspaceName() + " WITH replication = { 'class': 'SimpleStrategy','replication_factor' : 1 };");
-        } catch (AlreadyExistsException ignored) {
-        }
-
-        //connect to the keyspace
-        session = cluster.connect(configuration.getKeyspaceName());
-    }
-
-    public Session getSession(){
-        return session;
-    }
-
-    @Override
-    protected void finalize() throws Throwable {
-        try {
-            cluster.shutdown();
-        } finally {
-            super.finalize();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/adb43b29/streams-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraSubscriptionRepository.java
----------------------------------------------------------------------
diff --git a/streams-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraSubscriptionRepository.java b/streams-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraSubscriptionRepository.java
deleted file mode 100644
index f5fe471..0000000
--- a/streams-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraSubscriptionRepository.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.cassandra.repository.impl;
-
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.exceptions.AlreadyExistsException;
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.streams.cassandra.configuration.CassandraConfiguration;
-import org.apache.streams.osgi.components.activitysubscriber.ActivityStreamsSubscription;
-import org.springframework.beans.factory.annotation.Autowired;
-
-public class CassandraSubscriptionRepository {
-    private static final Log LOG = LogFactory.getLog(CassandraSubscriptionRepository.class);
-
-    private CassandraKeyspace keyspace;
-    private CassandraConfiguration configuration;
-
-    @Autowired
-    public CassandraSubscriptionRepository(CassandraKeyspace keyspace, CassandraConfiguration configuration) {
-        this.keyspace = keyspace;
-        this.configuration = configuration;
-
-        try {
-            keyspace.getSession().execute("CREATE TABLE " + configuration.getSubscriptionColumnFamilyName() + " (" +
-                    "id text, " +
-                    "filters text, " +
-
-                    "PRIMARY KEY (id));");
-        } catch (AlreadyExistsException ignored) {
-        }
-    }
-
-    public String getFilters(String id){
-        String cql = "SELECT * FROM " + configuration.getSubscriptionColumnFamilyName()  + " WHERE id = '" + id+"';";
-
-        ResultSet set = keyspace.getSession().execute(cql);
-
-        return set.one().getString("filters");
-    }
-
-    public void save(ActivityStreamsSubscription subscription){
-        String cql = "INSERT INTO " + configuration.getSubscriptionColumnFamilyName()  + " (" +
-                "id, filters) " +
-                "VALUES ('" +
-                subscription.getAuthToken() + "','" +
-                StringUtils.join(subscription.getFilters(), " ") +
-
-                "')";
-        keyspace.getSession().execute(cql);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/adb43b29/streams-cassandra/src/main/resources/META_INF/spring/streams-cassandra-context.xml
----------------------------------------------------------------------
diff --git a/streams-cassandra/src/main/resources/META_INF/spring/streams-cassandra-context.xml b/streams-cassandra/src/main/resources/META_INF/spring/streams-cassandra-context.xml
deleted file mode 100644
index 842c918..0000000
--- a/streams-cassandra/src/main/resources/META_INF/spring/streams-cassandra-context.xml
+++ /dev/null
@@ -1,25 +0,0 @@
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one
-  ~ or more contributor license agreements.  See the NOTICE file
-  ~ distributed with this work for additional information
-  ~ regarding copyright ownership.  The ASF licenses this file
-  ~ to you under the Apache License, Version 2.0 (the
-  ~ "License"); you may not use this file except in compliance
-  ~
-  ~   http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing,
-  ~ software distributed under the License is distributed on an
-  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-  ~ KIND, either express or implied.  See the License for the
-  ~ specific language governing permissions and limitations
-  ~ under the License.
-  -->
-<beans
-        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-        xmlns="http://www.springframework.org/schema/beans"
-        xmlns:context="http://www.springframework.org/schema/context"
-        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
-        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
-
-</beans>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/adb43b29/streams-cassandra/src/test/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepositoryTest.java
----------------------------------------------------------------------
diff --git a/streams-cassandra/src/test/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepositoryTest.java b/streams-cassandra/src/test/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepositoryTest.java
deleted file mode 100644
index 978af10..0000000
--- a/streams-cassandra/src/test/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepositoryTest.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.cassandra.repository.impl;
-
-import com.datastax.driver.core.ResultSet;
-import org.apache.rave.model.ActivityStreamsEntry;
-import org.apache.rave.model.ActivityStreamsObject;
-import org.apache.rave.portal.model.impl.ActivityStreamsEntryImpl;
-import org.apache.rave.portal.model.impl.ActivityStreamsObjectImpl;
-import org.apache.streams.cassandra.configuration.CassandraConfiguration;
-import org.apache.streams.cassandra.model.CassandraActivityStreamsEntry;
-import static org.easymock.EasyMock.*;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.Date;
-import java.util.List;
-
-public class CassandraActivityStreamsRepositoryTest {
-
-    private CassandraActivityStreamsRepository repository;
-
-
-    @Before
-    public void setup() {
-        CassandraKeyspace keyspace = createMock(CassandraKeyspace.class);
-        CassandraConfiguration configuration = createMock(CassandraConfiguration.class);
-        repository = new CassandraActivityStreamsRepository(keyspace, configuration);
-    }
-
-    @Ignore
-    @Test
-    public void saveActivity() {
-        ActivityStreamsEntry entry = new ActivityStreamsEntryImpl();
-        ActivityStreamsObject actor = new ActivityStreamsObjectImpl();
-        ActivityStreamsObject target = new ActivityStreamsObjectImpl();
-        ActivityStreamsObject object = new ActivityStreamsObjectImpl();
-        ActivityStreamsObject provider = new ActivityStreamsObjectImpl();
-
-        actor.setId("actorid1");
-        actor.setUrl("actorurl1");
-        actor.setDisplayName("actorname1");
-
-        target.setId("targetid1");
-        target.setUrl("targeturl1");
-        target.setDisplayName("r501");
-
-        provider.setUrl("providerurl");
-
-        object.setId("objectid1");
-        object.setDisplayName("objectname1");
-
-        entry.setId("dink");
-        entry.setVerb("verb1");
-        entry.setTags("r501");
-        entry.setProvider(provider);
-        Date d = new Date();
-        entry.setPublished(d);
-        entry.setActor(actor);
-        entry.setObject(object);
-        entry.setTarget(target);
-
-        repository.save(entry);
-    }
-
-    @Ignore
-    @Test
-    public void getActivity() {
-        String cql = "tags";
-        String other = "r501";
-        List<String> f = Arrays.asList(cql, other);
-        Date d = new Date(0);
-        List<CassandraActivityStreamsEntry> results = repository.getActivitiesForFilters(f,d);
-    }
-
-    @Ignore
-    @Test
-    public void dropTableTest(){
-        repository.dropTable("coltest");
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/adb43b29/streams-cassandra/src/test/java/org/apache/streams/cassandra/repository/impl/CassandraActivitySubscriptionTest.java
----------------------------------------------------------------------
diff --git a/streams-cassandra/src/test/java/org/apache/streams/cassandra/repository/impl/CassandraActivitySubscriptionTest.java b/streams-cassandra/src/test/java/org/apache/streams/cassandra/repository/impl/CassandraActivitySubscriptionTest.java
deleted file mode 100644
index 2a90462..0000000
--- a/streams-cassandra/src/test/java/org/apache/streams/cassandra/repository/impl/CassandraActivitySubscriptionTest.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.cassandra.repository.impl;
-
-import org.apache.streams.osgi.components.activitysubscriber.ActivityStreamsSubscription;
-import org.apache.streams.osgi.components.activitysubscriber.impl.ActivityStreamsSubscriptionImpl;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import java.util.Arrays;
-
-public class CassandraActivitySubscriptionTest {
-
-    public CassandraSubscriptionRepository repository;
-
-
-    @Before
-    public void setup() {
-//        repository = new CassandraSubscriptionRepository();
-    }
-
-    @Ignore
-    @Test
-    public void saveTest(){
-        ActivityStreamsSubscription subscription = new ActivityStreamsSubscriptionImpl();
-        subscription.setFilters(Arrays.asList("thisis", "atest"));
-        subscription.setAuthToken("subid");
-
-        repository.save(subscription);
-    }
-
-    @Ignore
-    @Test
-    public void getTest(){
-        String filters = repository.getFilters("subid");
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/adb43b29/streams-config-graph/pom.xml
----------------------------------------------------------------------
diff --git a/streams-config-graph/pom.xml b/streams-config-graph/pom.xml
deleted file mode 100644
index d4a3dbb..0000000
--- a/streams-config-graph/pom.xml
+++ /dev/null
@@ -1,71 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one
-  ~ or more contributor license agreements.  See the NOTICE file
-  ~ distributed with this work for additional information
-  ~ regarding copyright ownership.  The ASF licenses this file
-  ~ to you under the Apache License, Version 2.0 (the
-  ~ "License"); you may not use this file except in compliance
-  ~
-  ~   http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing,
-  ~ software distributed under the License is distributed on an
-  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-  ~ KIND, either express or implied.  See the License for the
-  ~ specific language governing permissions and limitations
-  ~ under the License.
-  -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <modelVersion>4.0.0</modelVersion>
-
-    <parent>
-        <groupId>org.apache.streams</groupId>
-        <artifactId>streams-project</artifactId>
-        <version>0.1-SNAPSHOT</version>
-    </parent>
-
-    <artifactId>streams-config-graph</artifactId>
-
-    <dependencies>
-        <dependency>
-            <artifactId>gs-core</artifactId>
-            <groupId>org.graphstream</groupId>
-            <version>1.2</version>
-        </dependency>
-        <dependency>
-            <groupId>com.google.guava</groupId>
-            <artifactId>guava</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>commons-io</groupId>
-            <artifactId>commons-io</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-api</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-        </dependency>
-    </dependencies>
-
-    <build>
-        <sourceDirectory>src/main/java</sourceDirectory>
-        <testSourceDirectory>src/test/java</testSourceDirectory>
-        <resources>
-            <resource>
-                <directory>src/main/resources</directory>
-            </resource>
-        </resources>
-        <testResources>
-            <testResource>
-                <directory>src/test/resources</directory>
-            </testResource>
-        </testResources>
-    </build>
-</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/adb43b29/streams-config-graph/src/main/java/org/apache/streams/config/graph/PipelineGraphConfigurator.java
----------------------------------------------------------------------
diff --git a/streams-config-graph/src/main/java/org/apache/streams/config/graph/PipelineGraphConfigurator.java b/streams-config-graph/src/main/java/org/apache/streams/config/graph/PipelineGraphConfigurator.java
deleted file mode 100644
index 3d7b6dd..0000000
--- a/streams-config-graph/src/main/java/org/apache/streams/config/graph/PipelineGraphConfigurator.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.config.graph;
-
-import org.graphstream.graph.Graph;
-import org.graphstream.graph.implementations.SingleGraph;
-import org.graphstream.stream.file.FileSource;
-import org.graphstream.stream.file.FileSourceFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.util.Collections;
-import java.util.Enumeration;
-
-/**
- * Created with IntelliJ IDEA.
- * User: sblackmon
- * Date: 9/23/13
- * Time: 10:44 AM
- * To change this template use File | Settings | File Templates.
- */
-public class PipelineGraphConfigurator {
-
-    public static Graph pipeline = loadPipeline();
-
-    private static Graph loadPipeline() {
-
-        Graph pipeline = new SingleGraph("pipelines");
-
-        // this class looks for any pipelines specified with a graph definition
-        // each is loaded into the execution graph
-        // the application is responsible for launching each
-        Enumeration<URL> pipelineFiles;
-        try {
-            pipelineFiles = PipelineGraphConfigurator.class.getClassLoader().getResources("*.dot");
-
-            for( URL pipelineFile : Collections.list(pipelineFiles) ) {
-                File file = new File(pipelineFile.toURI());
-                String filePath = file.getAbsolutePath();
-                FileSource fileSource = FileSourceFactory.sourceFor(filePath);
-
-                fileSource.addSink(pipeline);
-
-                try {
-                    fileSource.begin(filePath);
-
-                    while (fileSource.nextEvents()) {
-                        // Optionally some code here ...
-                    }
-                } catch( IOException e) {
-                    e.printStackTrace();
-                }
-
-                try {
-                    fileSource.end();
-                } catch( IOException e) {
-                    e.printStackTrace();
-                } finally {
-                    fileSource.removeSink(pipeline);
-                }
-            }
-        } catch (IOException e) {
-            e.printStackTrace();
-        } catch (URISyntaxException e) {
-            e.printStackTrace();
-        }
-
-        return pipeline;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/adb43b29/streams-contrib/streams-persist-cassandra/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-cassandra/pom.xml b/streams-contrib/streams-persist-cassandra/pom.xml
new file mode 100644
index 0000000..fd6711f
--- /dev/null
+++ b/streams-contrib/streams-persist-cassandra/pom.xml
@@ -0,0 +1,142 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xmlns="http://maven.apache.org/POM/4.0.0"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.streams</groupId>
+        <artifactId>streams-contrib</artifactId>
+        <version>0.1-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>streams-persist-cassandra</artifactId>
+
+    <name>${bundle.symbolicName} [${bundle.namespace}]</name>
+
+    <properties>
+        <bundle.symbolicName>streams-persist-cassandra</bundle.symbolicName>
+        <bundle.namespace>org.apache.streams</bundle.namespace>
+        <easymock.version>3.2</easymock.version>
+    </properties>
+
+    <packaging>bundle</packaging>
+    <build>
+        <resources>
+        <resource>
+            <directory>src/main/resources</directory>
+        </resource>
+
+            <resource>
+                <directory>.</directory>
+                <includes>
+                    <include>plugin.xml</include>
+                    <include>plugin.properties</include>
+                    <include>icons/**</include>
+                </includes>
+            </resource>
+        </resources>
+    <plugins>
+        <plugin>
+            <groupId>org.ops4j</groupId>
+            <artifactId>maven-pax-plugin</artifactId>
+            <!--
+             | enable improved OSGi compilation support for the bundle life-cycle.
+             | to switch back to the standard bundle life-cycle, move this setting
+             | down to the maven-bundle-plugin section
+            -->
+            <extensions>true</extensions>
+        </plugin>
+        <plugin>
+            <groupId>org.apache.felix</groupId>
+            <artifactId>maven-bundle-plugin</artifactId>
+            <version>1.4.3</version>
+            <!--
+             | the following instructions build a simple set of public/private classes into an OSGi bundle
+            -->
+            <configuration>
+                <instructions>
+                    <Bundle-SymbolicName>${bundle.symbolicName}</Bundle-SymbolicName>
+                    <Bundle-Version>${project.version}</Bundle-Version>
+                    <Export-Package>
+                        ${bundle.namespace};version="${project.version}",org.apache.streams.cassandra.repository.impl, org.apache.streams.cassandra.model, org.apache.streams.cassandra.configuration
+                    </Export-Package>
+                    <Private-Package>${bundle.namespace}.cassandra.repository.impl.*,${bundle.namespace}.cassandra.model, ${bundle.namespace}.cassandra.configuration </Private-Package>
+                    <Import-Package>
+                        org.apache.rave.model,org.apache.rave.portal.model.impl,
+                        com.datastax.driver.core, com.datastax.driver.core.exceptions, org.codehaus.jackson.map.annotate,
+                        javax.persistence, org.apache.commons.logging, com.google.common.collect, org.codehaus.jackson.map,
+                        org.apache.commons.lang,
+                        org.apache.streams.osgi.components.activitysubscriber,
+                        org.springframework.beans.factory.annotation, org.springframework.stereotype
+                    </Import-Package>
+                </instructions>
+            </configuration>
+        </plugin>
+    </plugins>
+    </build>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.rave</groupId>
+            <artifactId>rave-core-api</artifactId>
+            <version>${rave.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.rave</groupId>
+            <artifactId>rave-core</artifactId>
+            <version>${rave.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.8.2</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.datastax.cassandra</groupId>
+            <artifactId>cassandra-driver-core</artifactId>
+            <version>${datastax.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.jboss.netty</groupId>
+            <artifactId>netty</artifactId>
+            <version>3.2.9.Final</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams.osgi.components</groupId>
+            <artifactId>activity-subscriber</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.easymock</groupId>
+            <artifactId>easymock</artifactId>
+            <version>${easymock.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+    </dependencies>
+
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/adb43b29/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/configuration/CassandraConfiguration.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/configuration/CassandraConfiguration.java b/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/configuration/CassandraConfiguration.java
new file mode 100644
index 0000000..39e74a7
--- /dev/null
+++ b/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/configuration/CassandraConfiguration.java
@@ -0,0 +1,63 @@
+package org.apache.streams.cassandra.configuration;
+
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+@Component
+public class CassandraConfiguration {
+    @Value("${keyspaceName}")
+    private String keyspaceName;
+
+    @Value("${activitystreamsColumnFamilyName}")
+    private String activitystreamsColumnFamilyName;
+
+    @Value("${subscriptionColumnFamilyName}")
+    private String subscriptionColumnFamilyName;
+
+    @Value("${publisherColumnFamilyName}")
+    private String publisherColumnFamilyName;
+
+    @Value("${cassandraPort}")
+    private String cassandraPort;
+
+    public String getKeyspaceName() {
+        return keyspaceName;
+    }
+
+    public void setKeyspaceName(String keyspaceName) {
+        this.keyspaceName = keyspaceName;
+    }
+
+    public String getActivitystreamsColumnFamilyName() {
+        return activitystreamsColumnFamilyName;
+    }
+
+    public void setActivitystreamsColumnFamilyName(String activitystreamsColumnFamilyName) {
+        this.activitystreamsColumnFamilyName = activitystreamsColumnFamilyName;
+    }
+
+    public String getSubscriptionColumnFamilyName() {
+        return subscriptionColumnFamilyName;
+    }
+
+    public void setSubscriptionColumnFamilyName(String subscriptionColumnFamilyName) {
+        this.subscriptionColumnFamilyName = subscriptionColumnFamilyName;
+    }
+
+    public String getPublisherColumnFamilyName() {
+        return publisherColumnFamilyName;
+    }
+
+    public void setPublisherColumnFamilyName(String publisherColumnFamilyName) {
+        this.publisherColumnFamilyName = publisherColumnFamilyName;
+    }
+
+    public String getCassandraPort() {
+        return cassandraPort;
+    }
+
+    public void setCassandraPort(String cassandraPort) {
+        this.cassandraPort = cassandraPort;
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/adb43b29/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/model/CassandraActivityStreamsEntry.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/model/CassandraActivityStreamsEntry.java b/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/model/CassandraActivityStreamsEntry.java
new file mode 100644
index 0000000..c97fb82
--- /dev/null
+++ b/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/model/CassandraActivityStreamsEntry.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.cassandra.model;
+
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.ActivityObject;
+import org.apache.streams.pojo.json.Actor;
+import org.codehaus.jackson.map.annotate.JsonDeserialize;
+
+public class CassandraActivityStreamsEntry extends Activity implements Comparable<CassandraActivityStreamsEntry>{
+
+    @JsonDeserialize(as=ActivityObject.class)
+    private ActivityObject object;
+
+    @JsonDeserialize(as=ActivityObject.class)
+    private ActivityObject target;
+
+    @JsonDeserialize(as=Actor.class)
+    private Actor actor;
+
+    @JsonDeserialize(as=String.class)
+    private String provider;
+
+    public int compareTo(CassandraActivityStreamsEntry entry){
+        return (this.getPublished()).compareTo(entry.getPublished());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/adb43b29/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepository.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepository.java b/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepository.java
new file mode 100644
index 0000000..56e5416
--- /dev/null
+++ b/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepository.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.cassandra.repository.impl;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.AlreadyExistsException;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.rave.model.ActivityStreamsEntry;
+import org.apache.rave.model.ActivityStreamsObject;
+import org.apache.rave.portal.model.impl.ActivityStreamsEntryImpl;
+import org.apache.rave.portal.model.impl.ActivityStreamsObjectImpl;
+import org.apache.streams.cassandra.configuration.CassandraConfiguration;
+import org.apache.streams.cassandra.model.CassandraActivityStreamsEntry;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+
+public class CassandraActivityStreamsRepository {
+
+    private static final Log LOG = LogFactory.getLog(CassandraActivityStreamsRepository.class);
+
+    private CassandraKeyspace keyspace;
+    private CassandraConfiguration configuration;
+
+    @Autowired
+    public CassandraActivityStreamsRepository(CassandraKeyspace keyspace, CassandraConfiguration configuration) {
+        this.configuration = configuration;
+        this.keyspace = keyspace;
+
+        try {
+            keyspace.getSession().execute("CREATE TABLE " + configuration.getActivitystreamsColumnFamilyName() + " (" +
+                    "id text, " +
+                    "published timestamp, " +
+                    "verb text, " +
+                    "tags text, " +
+
+                    "actor_displayname text, " +
+                    "actor_id text, " +
+                    "actor_url text, " +
+                    "actor_objecttype text, " +
+
+                    "target_displayname text, " +
+                    "target_id text, " +
+                    "target_url text, " +
+
+                    "provider_url text, " +
+
+                    "object_url text, " +
+                    "object_displayname text, " +
+                    "object_id text, " +
+                    "object_objecttype text, " +
+
+                    "PRIMARY KEY (id, tags, published));");
+        } catch (AlreadyExistsException ignored) {
+        }
+    }
+
+    public void save(ActivityStreamsEntry entry) {
+        String sql = "INSERT INTO " + configuration.getActivitystreamsColumnFamilyName() + " (" +
+                "id, published, verb, tags, " +
+                "actor_displayname, actor_objecttype, actor_id, actor_url, " +
+                "target_displayname, target_id, target_url, " +
+                "provider_url, " +
+                "object_displayname, object_objecttype, object_id, object_url) " +
+                "VALUES ('" +
+                entry.getId() + "','" +
+                entry.getPublished().getTime() + "','" +
+                entry.getVerb() + "','" +
+                entry.getTags() + "','" +
+
+                entry.getActor().getDisplayName() + "','" +
+                entry.getActor().getObjectType() + "','" +
+                entry.getActor().getId() + "','" +
+                entry.getActor().getUrl() + "','" +
+
+                entry.getTarget().getDisplayName() + "','" +
+                entry.getTarget().getId() + "','" +
+                entry.getTarget().getUrl() + "','" +
+
+                entry.getProvider().getUrl() + "','" +
+
+                entry.getObject().getDisplayName() + "','" +
+                entry.getObject().getObjectType() + "','" +
+                entry.getObject().getId() + "','" +
+                entry.getObject().getUrl() +
+
+                "')";
+        keyspace.getSession().execute(sql);
+    }
+
+    public List<CassandraActivityStreamsEntry> getActivitiesForFilters(List<String> filters, Date lastUpdated) {
+        List<CassandraActivityStreamsEntry> results = new ArrayList<CassandraActivityStreamsEntry>();
+
+        for (String tag : filters) {
+            String cql = "SELECT * FROM " + configuration.getActivitystreamsColumnFamilyName() + " WHERE ";
+
+            //add filters
+            cql = cql + " tags = '" + tag + "' AND ";
+
+            //specify last modified
+            cql = cql + "published > " + lastUpdated.getTime() + " ALLOW FILTERING";
+
+            //execute the cql query and store the results
+            ResultSet set = keyspace.getSession().execute(cql);
+
+            //iterate through the results and create a new ActivityStreamsEntry for every result returned
+
+            for (Row row : set) {
+                CassandraActivityStreamsEntry entry = new CassandraActivityStreamsEntry();
+                ActivityStreamsObject actor = new ActivityStreamsObjectImpl();
+                ActivityStreamsObject target = new ActivityStreamsObjectImpl();
+                ActivityStreamsObject object = new ActivityStreamsObjectImpl();
+                ActivityStreamsObject provider = new ActivityStreamsObjectImpl();
+
+                actor.setDisplayName(row.getString("actor_displayname"));
+                actor.setId(row.getString("actor_id"));
+                actor.setObjectType(row.getString("actor_objecttype"));
+                actor.setUrl(row.getString("actor_url"));
+
+                target.setDisplayName(row.getString("target_displayname"));
+                target.setId(row.getString("target_id"));
+                target.setUrl(row.getString("target_url"));
+
+                object.setDisplayName(row.getString("object_displayname"));
+                object.setObjectType(row.getString("object_objecttype"));
+                object.setUrl(row.getString("object_url"));
+                object.setId(row.getString("object_id"));
+
+                provider.setUrl(row.getString("provider_url"));
+
+                entry.setPublished(row.getDate("published"));
+                entry.setVerb(row.getString("verb"));
+                entry.setId(row.getString("id"));
+                entry.setTags(row.getString("tags"));
+                entry.setActor(actor);
+                entry.setTarget(target);
+                entry.setObject(object);
+                entry.setProvider(provider);
+
+                results.add(entry);
+            }
+        }
+
+        return results;
+    }
+
+    public void dropTable(String table) {
+        String cql = "DROP TABLE " + table;
+        keyspace.getSession().execute(cql);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/adb43b29/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraKeyspace.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraKeyspace.java b/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraKeyspace.java
new file mode 100644
index 0000000..0551bf2
--- /dev/null
+++ b/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraKeyspace.java
@@ -0,0 +1,43 @@
+package org.apache.streams.cassandra.repository.impl;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.AlreadyExistsException;
+import org.apache.streams.cassandra.configuration.CassandraConfiguration;
+import org.springframework.beans.factory.annotation.Autowired;
+
+public class CassandraKeyspace {
+    private CassandraConfiguration configuration;
+    private Cluster cluster;
+    private Session session;
+
+    @Autowired
+    public CassandraKeyspace(CassandraConfiguration configuration){
+        this.configuration = configuration;
+
+        cluster = Cluster.builder().addContactPoint(configuration.getCassandraPort()).build();
+        session = cluster.connect();
+
+        //TODO: cassandra 2 will have support for CREATE KEYSPACE IF NOT EXISTS
+        try {
+            session.execute("CREATE KEYSPACE " + configuration.getKeyspaceName() + " WITH replication = { 'class': 'SimpleStrategy','replication_factor' : 1 };");
+        } catch (AlreadyExistsException ignored) {
+        }
+
+        //connect to the keyspace
+        session = cluster.connect(configuration.getKeyspaceName());
+    }
+
+    public Session getSession(){
+        return session;
+    }
+
+    @Override
+    protected void finalize() throws Throwable {
+        try {
+            cluster.shutdown();
+        } finally {
+            super.finalize();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/adb43b29/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraSubscriptionRepository.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraSubscriptionRepository.java b/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraSubscriptionRepository.java
new file mode 100644
index 0000000..f5fe471
--- /dev/null
+++ b/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraSubscriptionRepository.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.cassandra.repository.impl;
+
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.exceptions.AlreadyExistsException;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.streams.cassandra.configuration.CassandraConfiguration;
+import org.apache.streams.osgi.components.activitysubscriber.ActivityStreamsSubscription;
+import org.springframework.beans.factory.annotation.Autowired;
+
+/**
+ * Stores subscriptions in the configured subscription column family: one row
+ * per subscription, keyed by its auth token, with the filters joined into a
+ * single space-separated text column.
+ */
+public class CassandraSubscriptionRepository {
+    private static final Log LOG = LogFactory.getLog(CassandraSubscriptionRepository.class);
+
+    private final CassandraKeyspace keyspace;
+    private final CassandraConfiguration configuration;
+
+    // Prepared once so caller-supplied values are always sent as bound
+    // parameters and never spliced into the CQL text.
+    private final PreparedStatement selectFilters;
+    private final PreparedStatement insertSubscription;
+
+    /**
+     * Creates the subscription table when it does not exist yet and prepares
+     * the statements used by {@link #getFilters(String)} and
+     * {@link #save(ActivityStreamsSubscription)}.
+     *
+     * @param keyspace      provides the session all statements run on
+     * @param configuration provides the subscription column family name
+     */
+    @Autowired
+    public CassandraSubscriptionRepository(CassandraKeyspace keyspace, CassandraConfiguration configuration) {
+        this.keyspace = keyspace;
+        this.configuration = configuration;
+
+        // Table names cannot be bound, so the configured name is concatenated.
+        String table = configuration.getSubscriptionColumnFamilyName();
+
+        try {
+            keyspace.getSession().execute("CREATE TABLE " + table + " (" +
+                    "id text, " +
+                    "filters text, " +
+
+                    "PRIMARY KEY (id));");
+        } catch (AlreadyExistsException ignored) {
+            // the table is already there, nothing to create
+        }
+
+        selectFilters = keyspace.getSession().prepare(
+                "SELECT * FROM " + table + " WHERE id = ?");
+        insertSubscription = keyspace.getSession().prepare(
+                "INSERT INTO " + table + " (id, filters) VALUES (?, ?)");
+    }
+
+    /**
+     * Looks up the stored filter string of a subscription.
+     *
+     * @param id the subscription id (the auth token it was saved under)
+     * @return the space-separated filters, or {@code null} when no row with
+     *         that id exists
+     */
+    public String getFilters(String id){
+        ResultSet set = keyspace.getSession().execute(selectFilters.bind(id));
+
+        // an empty result used to end in a NullPointerException on set.one()
+        return set.isExhausted() ? null : set.one().getString("filters");
+    }
+
+    /**
+     * Inserts the subscription, using its auth token as the row id and its
+     * filters joined with single spaces as the filters column.
+     *
+     * @param subscription the subscription to persist
+     */
+    public void save(ActivityStreamsSubscription subscription){
+        keyspace.getSession().execute(insertSubscription.bind(
+                subscription.getAuthToken(),
+                StringUtils.join(subscription.getFilters(), " ")));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/adb43b29/streams-contrib/streams-persist-cassandra/src/main/resources/META_INF/spring/streams-cassandra-context.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-cassandra/src/main/resources/META_INF/spring/streams-cassandra-context.xml b/streams-contrib/streams-persist-cassandra/src/main/resources/META_INF/spring/streams-cassandra-context.xml
new file mode 100644
index 0000000..842c918
--- /dev/null
+++ b/streams-contrib/streams-persist-cassandra/src/main/resources/META_INF/spring/streams-cassandra-context.xml
@@ -0,0 +1,25 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+<beans
+        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+        xmlns="http://www.springframework.org/schema/beans"
+        xmlns:context="http://www.springframework.org/schema/context"
+        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
+
+    <!-- No beans are declared in this context file; the "context" namespace
+         is declared above but not used by any element here. -->
+</beans>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/adb43b29/streams-contrib/streams-persist-cassandra/src/test/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepositoryTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-cassandra/src/test/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepositoryTest.java b/streams-contrib/streams-persist-cassandra/src/test/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepositoryTest.java
new file mode 100644
index 0000000..978af10
--- /dev/null
+++ b/streams-contrib/streams-persist-cassandra/src/test/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepositoryTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.cassandra.repository.impl;
+
+import com.datastax.driver.core.ResultSet;
+import org.apache.rave.model.ActivityStreamsEntry;
+import org.apache.rave.model.ActivityStreamsObject;
+import org.apache.rave.portal.model.impl.ActivityStreamsEntryImpl;
+import org.apache.rave.portal.model.impl.ActivityStreamsObjectImpl;
+import org.apache.streams.cassandra.configuration.CassandraConfiguration;
+import org.apache.streams.cassandra.model.CassandraActivityStreamsEntry;
+import static org.easymock.EasyMock.*;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+
+/**
+ * Drives {@link CassandraActivityStreamsRepository} with EasyMock mocks of its
+ * keyspace and configuration. Every test method is {@code @Ignore}d.
+ */
+public class CassandraActivityStreamsRepositoryTest {
+
+    private CassandraActivityStreamsRepository repository;
+
+    /** Builds the repository under test from freshly created mocks. */
+    @Before
+    public void setup() {
+        repository = new CassandraActivityStreamsRepository(
+                createMock(CassandraKeyspace.class),
+                createMock(CassandraConfiguration.class));
+    }
+
+    /** Assembles a fully populated activity entry and saves it. */
+    @Ignore
+    @Test
+    public void saveActivity() {
+        ActivityStreamsEntry activity = new ActivityStreamsEntryImpl();
+        ActivityStreamsObject who = new ActivityStreamsObjectImpl();
+        ActivityStreamsObject where = new ActivityStreamsObjectImpl();
+        ActivityStreamsObject what = new ActivityStreamsObjectImpl();
+        ActivityStreamsObject source = new ActivityStreamsObjectImpl();
+
+        // actor
+        who.setId("actorid1");
+        who.setUrl("actorurl1");
+        who.setDisplayName("actorname1");
+
+        // target
+        where.setId("targetid1");
+        where.setUrl("targeturl1");
+        where.setDisplayName("r501");
+
+        // provider
+        source.setUrl("providerurl");
+
+        // object
+        what.setId("objectid1");
+        what.setDisplayName("objectname1");
+
+        activity.setId("dink");
+        activity.setVerb("verb1");
+        activity.setTags("r501");
+        activity.setProvider(source);
+        activity.setPublished(new Date());
+        activity.setActor(who);
+        activity.setObject(what);
+        activity.setTarget(where);
+
+        repository.save(activity);
+    }
+
+    /** Queries activities for two filters published since the epoch. */
+    @Ignore
+    @Test
+    public void getActivity() {
+        List<String> filters = Arrays.asList("tags", "r501");
+        Date since = new Date(0);
+        List<CassandraActivityStreamsEntry> found = repository.getActivitiesForFilters(filters, since);
+    }
+
+    /** Drops the "coltest" table. */
+    @Ignore
+    @Test
+    public void dropTableTest(){
+        repository.dropTable("coltest");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/adb43b29/streams-contrib/streams-persist-cassandra/src/test/java/org/apache/streams/cassandra/repository/impl/CassandraActivitySubscriptionTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-cassandra/src/test/java/org/apache/streams/cassandra/repository/impl/CassandraActivitySubscriptionTest.java b/streams-contrib/streams-persist-cassandra/src/test/java/org/apache/streams/cassandra/repository/impl/CassandraActivitySubscriptionTest.java
new file mode 100644
index 0000000..2a90462
--- /dev/null
+++ b/streams-contrib/streams-persist-cassandra/src/test/java/org/apache/streams/cassandra/repository/impl/CassandraActivitySubscriptionTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.cassandra.repository.impl;
+
+import org.apache.streams.osgi.components.activitysubscriber.ActivityStreamsSubscription;
+import org.apache.streams.osgi.components.activitysubscriber.impl.ActivityStreamsSubscriptionImpl;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.Arrays;
+
+/**
+ * Save/load tests for {@link CassandraSubscriptionRepository}. Every test
+ * method is {@code @Ignore}d.
+ */
+public class CassandraActivitySubscriptionTest {
+
+    public CassandraSubscriptionRepository repository;
+
+    /**
+     * Currently leaves {@link #repository} unset because its only
+     * initialisation is commented out; the tests below would hit a
+     * NullPointerException if they were enabled as they stand.
+     */
+    @Before
+    public void setup() {
+//        repository = new CassandraSubscriptionRepository();
+    }
+
+    /** Saves a subscription with two filters under the auth token "subid". */
+    @Ignore
+    @Test
+    public void saveTest(){
+        ActivityStreamsSubscription toStore = new ActivityStreamsSubscriptionImpl();
+        toStore.setFilters(Arrays.asList("thisis", "atest"));
+        toStore.setAuthToken("subid");
+
+        repository.save(toStore);
+    }
+
+    /** Reads back the filters stored under "subid". */
+    @Ignore
+    @Test
+    public void getTest(){
+        String stored = repository.getFilters("subid");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/adb43b29/streams-eip-routes/ReadMe.txt
----------------------------------------------------------------------
diff --git a/streams-eip-routes/ReadMe.txt b/streams-eip-routes/ReadMe.txt
deleted file mode 100644
index 19a1d19..0000000
--- a/streams-eip-routes/ReadMe.txt
+++ /dev/null
@@ -1,32 +0,0 @@
-Camel Router WAR Project with Web Console and REST Support
-==========================================================
-
-This project bundles the Camel Web Console, REST API, and some
-sample routes as a WAR. You can build the WAR by running
-
-mvn install
-
-You can then run the project by dropping the WAR into your 
-favorite web container or just run
-
-mvn jetty:run
-
-to start up and deploy to Jetty.
-
-
-Web Console
-===========
-
-You can view the Web Console by pointing your browser to http://localhost:8080/
-
-You should be able to do things like
-
-    * browse the available endpoints
-    * browse the messages on an endpoint if it is a BrowsableEndpoint
-    * send a message to an endpoint
-    * create new endpoints
-
-For more help see the Apache Camel documentation
-
-    http://camel.apache.org/
-