You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/03/24 16:50:50 UTC
[13/13] git commit: dropping streams-pojo-extensions,
because activity can be extended without it; move streams-cassandra under
contrib; move streams-eip-routes under runtimes,
renaming it to streams-runtime-webapp
dropping streams-pojo-extensions, because activity can be extended without it
move streams-cassandra under contrib
move streams-eip-routes under runtimes, renaming it to streams-runtime-webapp
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/adb43b29
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/adb43b29
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/adb43b29
Branch: refs/heads/springcleaning
Commit: adb43b29567351e8849385be95806d4be2fcb7e5
Parents: 381d758
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Mon Mar 24 03:07:06 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Mon Mar 24 03:07:06 2014 -0500
----------------------------------------------------------------------
pom.xml | 8 +-
streams-cassandra/pom.xml | 142 -------------
.../configuration/CassandraConfiguration.java | 63 ------
.../model/CassandraActivityStreamsEntry.java | 45 ----
.../CassandraActivityStreamsRepository.java | 176 ----------------
.../repository/impl/CassandraKeyspace.java | 43 ----
.../impl/CassandraSubscriptionRepository.java | 69 ------
.../spring/streams-cassandra-context.xml | 25 ---
.../CassandraActivityStreamsRepositoryTest.java | 99 ---------
.../impl/CassandraActivitySubscriptionTest.java | 54 -----
streams-config-graph/pom.xml | 71 -------
.../config/graph/PipelineGraphConfigurator.java | 88 --------
.../streams-persist-cassandra/pom.xml | 142 +++++++++++++
.../configuration/CassandraConfiguration.java | 63 ++++++
.../model/CassandraActivityStreamsEntry.java | 43 ++++
.../CassandraActivityStreamsRepository.java | 176 ++++++++++++++++
.../repository/impl/CassandraKeyspace.java | 43 ++++
.../impl/CassandraSubscriptionRepository.java | 69 ++++++
.../spring/streams-cassandra-context.xml | 25 +++
.../CassandraActivityStreamsRepositoryTest.java | 99 +++++++++
.../impl/CassandraActivitySubscriptionTest.java | 54 +++++
streams-eip-routes/ReadMe.txt | 32 ---
streams-eip-routes/pom.xml | 209 -------------------
.../aggregation/ActivityAggregator.java | 59 ------
.../configuration/EipConfigurator.java | 201 ------------------
.../ActivityPublisherRegistrationProcessor.java | 73 -------
...yStreamsSubscriberRegistrationProcessor.java | 94 ---------
.../routers/ActivityConsumerRouteBuilder.java | 32 ---
.../ActivityStreamsSubscriberRouteBuilder.java | 32 ---
.../routers/impl/ActivityConsumerRouter.java | 144 -------------
.../impl/ActivityStreamsSubscriberRouter.java | 142 -------------
.../messaging/service/ActivityService.java | 31 ---
.../messaging/service/SubscriptionService.java | 29 ---
.../service/impl/CassandraActivityService.java | 96 ---------
.../impl/CassandraSubscriptionService.java | 43 ----
.../META-INF/spring/propertiesLoader.xml | 35 ----
.../spring/streams-eip-applicationContext.xml | 113 ----------
.../streams-eip-osgi-component-import.xml | 38 ----
.../META-INF/spring/streamsCamelContext.xml | 96 ---------
.../main/resources/META-INF/streams.properties | 41 ----
.../impl/CassandraActivityServiceTest.java | 87 --------
streams-pojo-extensions/pom.xml | 149 -------------
.../org/apache/streams/ActivityExtended.json | 17 --
.../pojo/test/ActivityExtendedSerDeTest.java | 108 ----------
.../test/resources/gnip_twitter_extended.json | 146 -------------
streams-runtimes/pom.xml | 1 +
.../streams-runtime-webapp/ReadMe.txt | 32 +++
streams-runtimes/streams-runtime-webapp/pom.xml | 209 +++++++++++++++++++
.../aggregation/ActivityAggregator.java | 59 ++++++
.../configuration/EipConfigurator.java | 201 ++++++++++++++++++
.../ActivityPublisherRegistrationProcessor.java | 72 +++++++
...yStreamsSubscriberRegistrationProcessor.java | 93 +++++++++
.../routers/ActivityConsumerRouteBuilder.java | 32 +++
.../ActivityStreamsSubscriberRouteBuilder.java | 32 +++
.../routers/impl/ActivityConsumerRouter.java | 144 +++++++++++++
.../impl/ActivityStreamsSubscriberRouter.java | 141 +++++++++++++
.../messaging/service/ActivityService.java | 31 +++
.../messaging/service/SubscriptionService.java | 29 +++
.../service/impl/CassandraActivityService.java | 96 +++++++++
.../impl/CassandraSubscriptionService.java | 43 ++++
.../META-INF/spring/propertiesLoader.xml | 35 ++++
.../spring/streams-eip-applicationContext.xml | 113 ++++++++++
.../streams-eip-osgi-component-import.xml | 38 ++++
.../META-INF/spring/streamsCamelContext.xml | 96 +++++++++
.../main/resources/META-INF/streams.properties | 41 ++++
.../impl/CassandraActivityServiceTest.java | 86 ++++++++
66 files changed, 2340 insertions(+), 2928 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/adb43b29/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index c552cf1..a476c07 100644
--- a/pom.xml
+++ b/pom.xml
@@ -88,17 +88,13 @@
<module>poms</module>
<module>provision</module>
<module>streams-osgi-components</module>
- <module>streams-eip-routes</module>
- <module>streams-cassandra</module>
- <module>streams-web</module>
+ <module>streams-core</module>
<module>streams-config</module>
- <module>streams-config-graph</module>
<module>streams-pojo</module>
<module>streams-util</module>
- <module>streams-pojo-extensions</module>
<module>streams-contrib</module>
- <module>streams-core</module>
<module>streams-runtimes</module>
+ <module>streams-web</module>
</modules>
<packaging>pom</packaging>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/adb43b29/streams-cassandra/pom.xml
----------------------------------------------------------------------
diff --git a/streams-cassandra/pom.xml b/streams-cassandra/pom.xml
deleted file mode 100644
index 4837649..0000000
--- a/streams-cassandra/pom.xml
+++ /dev/null
@@ -1,142 +0,0 @@
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one
- ~ or more contributor license agreements. See the NOTICE file
- ~ distributed with this work for additional information
- ~ regarding copyright ownership. The ASF licenses this file
- ~ to you under the Apache License, Version 2.0 (the
- ~ "License"); you may not use this file except in compliance
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing,
- ~ software distributed under the License is distributed on an
- ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- ~ KIND, either express or implied. See the License for the
- ~ specific language governing permissions and limitations
- ~ under the License.
- -->
-
-<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xmlns="http://maven.apache.org/POM/4.0.0"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.streams</groupId>
- <artifactId>streams-project</artifactId>
- <version>0.1-SNAPSHOT</version>
- </parent>
-
- <artifactId>streams-cassandra</artifactId>
-
- <name>${bundle.symbolicName} [${bundle.namespace}]</name>
-
- <properties>
- <bundle.symbolicName>streams-cassandra</bundle.symbolicName>
- <bundle.namespace>org.apache.streams</bundle.namespace>
- <easymock.version>3.2</easymock.version>
- </properties>
-
- <packaging>bundle</packaging>
- <build>
- <resources>
- <resource>
- <directory>src/main/resources</directory>
- </resource>
-
- <resource>
- <directory>.</directory>
- <includes>
- <include>plugin.xml</include>
- <include>plugin.properties</include>
- <include>icons/**</include>
- </includes>
- </resource>
- </resources>
- <plugins>
- <plugin>
- <groupId>org.ops4j</groupId>
- <artifactId>maven-pax-plugin</artifactId>
- <!--
- | enable improved OSGi compilation support for the bundle life-cycle.
- | to switch back to the standard bundle life-cycle, move this setting
- | down to the maven-bundle-plugin section
- -->
- <extensions>true</extensions>
- </plugin>
- <plugin>
- <groupId>org.apache.felix</groupId>
- <artifactId>maven-bundle-plugin</artifactId>
- <version>1.4.3</version>
- <!--
- | the following instructions build a simple set of public/private classes into an OSGi bundle
- -->
- <configuration>
- <instructions>
- <Bundle-SymbolicName>${bundle.symbolicName}</Bundle-SymbolicName>
- <Bundle-Version>${project.version}</Bundle-Version>
- <Export-Package>
- ${bundle.namespace};version="${project.version}",org.apache.streams.cassandra.repository.impl, org.apache.streams.cassandra.model, org.apache.streams.cassandra.configuration
- </Export-Package>
- <Private-Package>${bundle.namespace}.cassandra.repository.impl.*,${bundle.namespace}.cassandra.model, ${bundle.namespace}.cassandra.configuration </Private-Package>
- <Import-Package>
- org.apache.rave.model,org.apache.rave.portal.model.impl,
- com.datastax.driver.core, com.datastax.driver.core.exceptions, org.codehaus.jackson.map.annotate,
- javax.persistence, org.apache.commons.logging, com.google.common.collect, org.codehaus.jackson.map,
- org.apache.commons.lang,
- org.apache.streams.osgi.components.activitysubscriber,
- org.springframework.beans.factory.annotation, org.springframework.stereotype
- </Import-Package>
- </instructions>
- </configuration>
- </plugin>
- </plugins>
- </build>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.rave</groupId>
- <artifactId>rave-core-api</artifactId>
- <version>${rave.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.rave</groupId>
- <artifactId>rave-core</artifactId>
- <version>${rave.version}</version>
- </dependency>
-
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <version>4.8.2</version>
- </dependency>
-
- <dependency>
- <groupId>com.datastax.cassandra</groupId>
- <artifactId>cassandra-driver-core</artifactId>
- <version>${datastax.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.jboss.netty</groupId>
- <artifactId>netty</artifactId>
- <version>3.2.9.Final</version>
- </dependency>
- <dependency>
- <groupId>org.apache.streams.osgi.components</groupId>
- <artifactId>activity-subscriber</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.easymock</groupId>
- <artifactId>easymock</artifactId>
- <version>${easymock.version}</version>
- <scope>test</scope>
- </dependency>
-
- </dependencies>
-
-</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/adb43b29/streams-cassandra/src/main/java/org/apache/streams/cassandra/configuration/CassandraConfiguration.java
----------------------------------------------------------------------
diff --git a/streams-cassandra/src/main/java/org/apache/streams/cassandra/configuration/CassandraConfiguration.java b/streams-cassandra/src/main/java/org/apache/streams/cassandra/configuration/CassandraConfiguration.java
deleted file mode 100644
index 39e74a7..0000000
--- a/streams-cassandra/src/main/java/org/apache/streams/cassandra/configuration/CassandraConfiguration.java
+++ /dev/null
@@ -1,63 +0,0 @@
-package org.apache.streams.cassandra.configuration;
-
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.stereotype.Component;
-
-@Component
-public class CassandraConfiguration {
- @Value("${keyspaceName}")
- private String keyspaceName;
-
- @Value("${activitystreamsColumnFamilyName}")
- private String activitystreamsColumnFamilyName;
-
- @Value("${subscriptionColumnFamilyName}")
- private String subscriptionColumnFamilyName;
-
- @Value("${publisherColumnFamilyName}")
- private String publisherColumnFamilyName;
-
- @Value("${cassandraPort}")
- private String cassandraPort;
-
- public String getKeyspaceName() {
- return keyspaceName;
- }
-
- public void setKeyspaceName(String keyspaceName) {
- this.keyspaceName = keyspaceName;
- }
-
- public String getActivitystreamsColumnFamilyName() {
- return activitystreamsColumnFamilyName;
- }
-
- public void setActivitystreamsColumnFamilyName(String activitystreamsColumnFamilyName) {
- this.activitystreamsColumnFamilyName = activitystreamsColumnFamilyName;
- }
-
- public String getSubscriptionColumnFamilyName() {
- return subscriptionColumnFamilyName;
- }
-
- public void setSubscriptionColumnFamilyName(String subscriptionColumnFamilyName) {
- this.subscriptionColumnFamilyName = subscriptionColumnFamilyName;
- }
-
- public String getPublisherColumnFamilyName() {
- return publisherColumnFamilyName;
- }
-
- public void setPublisherColumnFamilyName(String publisherColumnFamilyName) {
- this.publisherColumnFamilyName = publisherColumnFamilyName;
- }
-
- public String getCassandraPort() {
- return cassandraPort;
- }
-
- public void setCassandraPort(String cassandraPort) {
- this.cassandraPort = cassandraPort;
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/adb43b29/streams-cassandra/src/main/java/org/apache/streams/cassandra/model/CassandraActivityStreamsEntry.java
----------------------------------------------------------------------
diff --git a/streams-cassandra/src/main/java/org/apache/streams/cassandra/model/CassandraActivityStreamsEntry.java b/streams-cassandra/src/main/java/org/apache/streams/cassandra/model/CassandraActivityStreamsEntry.java
deleted file mode 100644
index 2a2321a..0000000
--- a/streams-cassandra/src/main/java/org/apache/streams/cassandra/model/CassandraActivityStreamsEntry.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.cassandra.model;
-
-import org.apache.rave.model.ActivityStreamsObject;
-import org.apache.rave.portal.model.impl.ActivityStreamsEntryImpl;
-import org.apache.rave.portal.model.impl.ActivityStreamsObjectImpl;
-import org.codehaus.jackson.map.annotate.JsonDeserialize;
-
-import java.util.Date;
-
-public class CassandraActivityStreamsEntry extends ActivityStreamsEntryImpl implements Comparable<CassandraActivityStreamsEntry>{
-
- @JsonDeserialize(as=ActivityStreamsObjectImpl.class)
- private ActivityStreamsObject object;
-
- @JsonDeserialize(as=ActivityStreamsObjectImpl.class)
- private ActivityStreamsObject target;
-
- @JsonDeserialize(as=ActivityStreamsObjectImpl.class)
- private ActivityStreamsObject actor;
-
- @JsonDeserialize(as=ActivityStreamsObjectImpl.class)
- private ActivityStreamsObject provider;
-
- public int compareTo(CassandraActivityStreamsEntry entry){
- return (this.getPublished()).compareTo(entry.getPublished());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/adb43b29/streams-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepository.java
----------------------------------------------------------------------
diff --git a/streams-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepository.java b/streams-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepository.java
deleted file mode 100644
index 56e5416..0000000
--- a/streams-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepository.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.cassandra.repository.impl;
-
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.Row;
-import com.datastax.driver.core.Session;
-import com.datastax.driver.core.exceptions.AlreadyExistsException;
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.rave.model.ActivityStreamsEntry;
-import org.apache.rave.model.ActivityStreamsObject;
-import org.apache.rave.portal.model.impl.ActivityStreamsEntryImpl;
-import org.apache.rave.portal.model.impl.ActivityStreamsObjectImpl;
-import org.apache.streams.cassandra.configuration.CassandraConfiguration;
-import org.apache.streams.cassandra.model.CassandraActivityStreamsEntry;
-import org.springframework.beans.factory.annotation.Autowired;
-
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-
-
-public class CassandraActivityStreamsRepository {
-
- private static final Log LOG = LogFactory.getLog(CassandraActivityStreamsRepository.class);
-
- private CassandraKeyspace keyspace;
- private CassandraConfiguration configuration;
-
- @Autowired
- public CassandraActivityStreamsRepository(CassandraKeyspace keyspace, CassandraConfiguration configuration) {
- this.configuration = configuration;
- this.keyspace = keyspace;
-
- try {
- keyspace.getSession().execute("CREATE TABLE " + configuration.getActivitystreamsColumnFamilyName() + " (" +
- "id text, " +
- "published timestamp, " +
- "verb text, " +
- "tags text, " +
-
- "actor_displayname text, " +
- "actor_id text, " +
- "actor_url text, " +
- "actor_objecttype text, " +
-
- "target_displayname text, " +
- "target_id text, " +
- "target_url text, " +
-
- "provider_url text, " +
-
- "object_url text, " +
- "object_displayname text, " +
- "object_id text, " +
- "object_objecttype text, " +
-
- "PRIMARY KEY (id, tags, published));");
- } catch (AlreadyExistsException ignored) {
- }
- }
-
- public void save(ActivityStreamsEntry entry) {
- String sql = "INSERT INTO " + configuration.getActivitystreamsColumnFamilyName() + " (" +
- "id, published, verb, tags, " +
- "actor_displayname, actor_objecttype, actor_id, actor_url, " +
- "target_displayname, target_id, target_url, " +
- "provider_url, " +
- "object_displayname, object_objecttype, object_id, object_url) " +
- "VALUES ('" +
- entry.getId() + "','" +
- entry.getPublished().getTime() + "','" +
- entry.getVerb() + "','" +
- entry.getTags() + "','" +
-
- entry.getActor().getDisplayName() + "','" +
- entry.getActor().getObjectType() + "','" +
- entry.getActor().getId() + "','" +
- entry.getActor().getUrl() + "','" +
-
- entry.getTarget().getDisplayName() + "','" +
- entry.getTarget().getId() + "','" +
- entry.getTarget().getUrl() + "','" +
-
- entry.getProvider().getUrl() + "','" +
-
- entry.getObject().getDisplayName() + "','" +
- entry.getObject().getObjectType() + "','" +
- entry.getObject().getId() + "','" +
- entry.getObject().getUrl() +
-
- "')";
- keyspace.getSession().execute(sql);
- }
-
- public List<CassandraActivityStreamsEntry> getActivitiesForFilters(List<String> filters, Date lastUpdated) {
- List<CassandraActivityStreamsEntry> results = new ArrayList<CassandraActivityStreamsEntry>();
-
- for (String tag : filters) {
- String cql = "SELECT * FROM " + configuration.getActivitystreamsColumnFamilyName() + " WHERE ";
-
- //add filters
- cql = cql + " tags = '" + tag + "' AND ";
-
- //specify last modified
- cql = cql + "published > " + lastUpdated.getTime() + " ALLOW FILTERING";
-
- //execute the cql query and store the results
- ResultSet set = keyspace.getSession().execute(cql);
-
- //iterate through the results and create a new ActivityStreamsEntry for every result returned
-
- for (Row row : set) {
- CassandraActivityStreamsEntry entry = new CassandraActivityStreamsEntry();
- ActivityStreamsObject actor = new ActivityStreamsObjectImpl();
- ActivityStreamsObject target = new ActivityStreamsObjectImpl();
- ActivityStreamsObject object = new ActivityStreamsObjectImpl();
- ActivityStreamsObject provider = new ActivityStreamsObjectImpl();
-
- actor.setDisplayName(row.getString("actor_displayname"));
- actor.setId(row.getString("actor_id"));
- actor.setObjectType(row.getString("actor_objecttype"));
- actor.setUrl(row.getString("actor_url"));
-
- target.setDisplayName(row.getString("target_displayname"));
- target.setId(row.getString("target_id"));
- target.setUrl(row.getString("target_url"));
-
- object.setDisplayName(row.getString("object_displayname"));
- object.setObjectType(row.getString("object_objecttype"));
- object.setUrl(row.getString("object_url"));
- object.setId(row.getString("object_id"));
-
- provider.setUrl(row.getString("provider_url"));
-
- entry.setPublished(row.getDate("published"));
- entry.setVerb(row.getString("verb"));
- entry.setId(row.getString("id"));
- entry.setTags(row.getString("tags"));
- entry.setActor(actor);
- entry.setTarget(target);
- entry.setObject(object);
- entry.setProvider(provider);
-
- results.add(entry);
- }
- }
-
- return results;
- }
-
- public void dropTable(String table) {
- String cql = "DROP TABLE " + table;
- keyspace.getSession().execute(cql);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/adb43b29/streams-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraKeyspace.java
----------------------------------------------------------------------
diff --git a/streams-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraKeyspace.java b/streams-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraKeyspace.java
deleted file mode 100644
index 0551bf2..0000000
--- a/streams-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraKeyspace.java
+++ /dev/null
@@ -1,43 +0,0 @@
-package org.apache.streams.cassandra.repository.impl;
-
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.Session;
-import com.datastax.driver.core.exceptions.AlreadyExistsException;
-import org.apache.streams.cassandra.configuration.CassandraConfiguration;
-import org.springframework.beans.factory.annotation.Autowired;
-
-public class CassandraKeyspace {
- private CassandraConfiguration configuration;
- private Cluster cluster;
- private Session session;
-
- @Autowired
- public CassandraKeyspace(CassandraConfiguration configuration){
- this.configuration = configuration;
-
- cluster = Cluster.builder().addContactPoint(configuration.getCassandraPort()).build();
- session = cluster.connect();
-
- //TODO: cassandra 2 will have support for CREATE KEYSPACE IF NOT EXISTS
- try {
- session.execute("CREATE KEYSPACE " + configuration.getKeyspaceName() + " WITH replication = { 'class': 'SimpleStrategy','replication_factor' : 1 };");
- } catch (AlreadyExistsException ignored) {
- }
-
- //connect to the keyspace
- session = cluster.connect(configuration.getKeyspaceName());
- }
-
- public Session getSession(){
- return session;
- }
-
- @Override
- protected void finalize() throws Throwable {
- try {
- cluster.shutdown();
- } finally {
- super.finalize();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/adb43b29/streams-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraSubscriptionRepository.java
----------------------------------------------------------------------
diff --git a/streams-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraSubscriptionRepository.java b/streams-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraSubscriptionRepository.java
deleted file mode 100644
index f5fe471..0000000
--- a/streams-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraSubscriptionRepository.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.cassandra.repository.impl;
-
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.exceptions.AlreadyExistsException;
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.streams.cassandra.configuration.CassandraConfiguration;
-import org.apache.streams.osgi.components.activitysubscriber.ActivityStreamsSubscription;
-import org.springframework.beans.factory.annotation.Autowired;
-
-public class CassandraSubscriptionRepository {
- private static final Log LOG = LogFactory.getLog(CassandraSubscriptionRepository.class);
-
- private CassandraKeyspace keyspace;
- private CassandraConfiguration configuration;
-
- @Autowired
- public CassandraSubscriptionRepository(CassandraKeyspace keyspace, CassandraConfiguration configuration) {
- this.keyspace = keyspace;
- this.configuration = configuration;
-
- try {
- keyspace.getSession().execute("CREATE TABLE " + configuration.getSubscriptionColumnFamilyName() + " (" +
- "id text, " +
- "filters text, " +
-
- "PRIMARY KEY (id));");
- } catch (AlreadyExistsException ignored) {
- }
- }
-
- public String getFilters(String id){
- String cql = "SELECT * FROM " + configuration.getSubscriptionColumnFamilyName() + " WHERE id = '" + id+"';";
-
- ResultSet set = keyspace.getSession().execute(cql);
-
- return set.one().getString("filters");
- }
-
- public void save(ActivityStreamsSubscription subscription){
- String cql = "INSERT INTO " + configuration.getSubscriptionColumnFamilyName() + " (" +
- "id, filters) " +
- "VALUES ('" +
- subscription.getAuthToken() + "','" +
- StringUtils.join(subscription.getFilters(), " ") +
-
- "')";
- keyspace.getSession().execute(cql);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/adb43b29/streams-cassandra/src/main/resources/META_INF/spring/streams-cassandra-context.xml
----------------------------------------------------------------------
diff --git a/streams-cassandra/src/main/resources/META_INF/spring/streams-cassandra-context.xml b/streams-cassandra/src/main/resources/META_INF/spring/streams-cassandra-context.xml
deleted file mode 100644
index 842c918..0000000
--- a/streams-cassandra/src/main/resources/META_INF/spring/streams-cassandra-context.xml
+++ /dev/null
@@ -1,25 +0,0 @@
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one
- ~ or more contributor license agreements. See the NOTICE file
- ~ distributed with this work for additional information
- ~ regarding copyright ownership. The ASF licenses this file
- ~ to you under the Apache License, Version 2.0 (the
- ~ "License"); you may not use this file except in compliance
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing,
- ~ software distributed under the License is distributed on an
- ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- ~ KIND, either express or implied. See the License for the
- ~ specific language governing permissions and limitations
- ~ under the License.
- -->
-<beans
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xmlns="http://www.springframework.org/schema/beans"
- xmlns:context="http://www.springframework.org/schema/context"
- xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
- http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
-
-</beans>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/adb43b29/streams-cassandra/src/test/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepositoryTest.java
----------------------------------------------------------------------
diff --git a/streams-cassandra/src/test/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepositoryTest.java b/streams-cassandra/src/test/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepositoryTest.java
deleted file mode 100644
index 978af10..0000000
--- a/streams-cassandra/src/test/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepositoryTest.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.cassandra.repository.impl;
-
-import com.datastax.driver.core.ResultSet;
-import org.apache.rave.model.ActivityStreamsEntry;
-import org.apache.rave.model.ActivityStreamsObject;
-import org.apache.rave.portal.model.impl.ActivityStreamsEntryImpl;
-import org.apache.rave.portal.model.impl.ActivityStreamsObjectImpl;
-import org.apache.streams.cassandra.configuration.CassandraConfiguration;
-import org.apache.streams.cassandra.model.CassandraActivityStreamsEntry;
-import static org.easymock.EasyMock.*;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.Date;
-import java.util.List;
-
-public class CassandraActivityStreamsRepositoryTest {
-
- private CassandraActivityStreamsRepository repository;
-
-
- @Before
- public void setup() {
- CassandraKeyspace keyspace = createMock(CassandraKeyspace.class);
- CassandraConfiguration configuration = createMock(CassandraConfiguration.class);
- repository = new CassandraActivityStreamsRepository(keyspace, configuration);
- }
-
- @Ignore
- @Test
- public void saveActivity() {
- ActivityStreamsEntry entry = new ActivityStreamsEntryImpl();
- ActivityStreamsObject actor = new ActivityStreamsObjectImpl();
- ActivityStreamsObject target = new ActivityStreamsObjectImpl();
- ActivityStreamsObject object = new ActivityStreamsObjectImpl();
- ActivityStreamsObject provider = new ActivityStreamsObjectImpl();
-
- actor.setId("actorid1");
- actor.setUrl("actorurl1");
- actor.setDisplayName("actorname1");
-
- target.setId("targetid1");
- target.setUrl("targeturl1");
- target.setDisplayName("r501");
-
- provider.setUrl("providerurl");
-
- object.setId("objectid1");
- object.setDisplayName("objectname1");
-
- entry.setId("dink");
- entry.setVerb("verb1");
- entry.setTags("r501");
- entry.setProvider(provider);
- Date d = new Date();
- entry.setPublished(d);
- entry.setActor(actor);
- entry.setObject(object);
- entry.setTarget(target);
-
- repository.save(entry);
- }
-
- @Ignore
- @Test
- public void getActivity() {
- String cql = "tags";
- String other = "r501";
- List<String> f = Arrays.asList(cql, other);
- Date d = new Date(0);
- List<CassandraActivityStreamsEntry> results = repository.getActivitiesForFilters(f,d);
- }
-
- @Ignore
- @Test
- public void dropTableTest(){
- repository.dropTable("coltest");
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/adb43b29/streams-cassandra/src/test/java/org/apache/streams/cassandra/repository/impl/CassandraActivitySubscriptionTest.java
----------------------------------------------------------------------
diff --git a/streams-cassandra/src/test/java/org/apache/streams/cassandra/repository/impl/CassandraActivitySubscriptionTest.java b/streams-cassandra/src/test/java/org/apache/streams/cassandra/repository/impl/CassandraActivitySubscriptionTest.java
deleted file mode 100644
index 2a90462..0000000
--- a/streams-cassandra/src/test/java/org/apache/streams/cassandra/repository/impl/CassandraActivitySubscriptionTest.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.cassandra.repository.impl;
-
-import org.apache.streams.osgi.components.activitysubscriber.ActivityStreamsSubscription;
-import org.apache.streams.osgi.components.activitysubscriber.impl.ActivityStreamsSubscriptionImpl;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import java.util.Arrays;
-
-public class CassandraActivitySubscriptionTest {
-
- public CassandraSubscriptionRepository repository;
-
-
- @Before
- public void setup() {
-// repository = new CassandraSubscriptionRepository();
- }
-
- @Ignore
- @Test
- public void saveTest(){
- ActivityStreamsSubscription subscription = new ActivityStreamsSubscriptionImpl();
- subscription.setFilters(Arrays.asList("thisis", "atest"));
- subscription.setAuthToken("subid");
-
- repository.save(subscription);
- }
-
- @Ignore
- @Test
- public void getTest(){
- String filters = repository.getFilters("subid");
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/adb43b29/streams-config-graph/pom.xml
----------------------------------------------------------------------
diff --git a/streams-config-graph/pom.xml b/streams-config-graph/pom.xml
deleted file mode 100644
index d4a3dbb..0000000
--- a/streams-config-graph/pom.xml
+++ /dev/null
@@ -1,71 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one
- ~ or more contributor license agreements. See the NOTICE file
- ~ distributed with this work for additional information
- ~ regarding copyright ownership. The ASF licenses this file
- ~ to you under the Apache License, Version 2.0 (the
- ~ "License"); you may not use this file except in compliance
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing,
- ~ software distributed under the License is distributed on an
- ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- ~ KIND, either express or implied. See the License for the
- ~ specific language governing permissions and limitations
- ~ under the License.
- -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.streams</groupId>
- <artifactId>streams-project</artifactId>
- <version>0.1-SNAPSHOT</version>
- </parent>
-
- <artifactId>streams-config-graph</artifactId>
-
- <dependencies>
- <dependency>
- <artifactId>gs-core</artifactId>
- <groupId>org.graphstream</groupId>
- <version>1.2</version>
- </dependency>
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- </dependency>
- <dependency>
- <groupId>commons-io</groupId>
- <artifactId>commons-io</artifactId>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- </dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- </dependency>
- </dependencies>
-
- <build>
- <sourceDirectory>src/main/java</sourceDirectory>
- <testSourceDirectory>src/test/java</testSourceDirectory>
- <resources>
- <resource>
- <directory>src/main/resources</directory>
- </resource>
- </resources>
- <testResources>
- <testResource>
- <directory>src/test/resources</directory>
- </testResource>
- </testResources>
- </build>
-</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/adb43b29/streams-config-graph/src/main/java/org/apache/streams/config/graph/PipelineGraphConfigurator.java
----------------------------------------------------------------------
diff --git a/streams-config-graph/src/main/java/org/apache/streams/config/graph/PipelineGraphConfigurator.java b/streams-config-graph/src/main/java/org/apache/streams/config/graph/PipelineGraphConfigurator.java
deleted file mode 100644
index 3d7b6dd..0000000
--- a/streams-config-graph/src/main/java/org/apache/streams/config/graph/PipelineGraphConfigurator.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.config.graph;
-
-import org.graphstream.graph.Graph;
-import org.graphstream.graph.implementations.SingleGraph;
-import org.graphstream.stream.file.FileSource;
-import org.graphstream.stream.file.FileSourceFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.util.Collections;
-import java.util.Enumeration;
-
-/**
- * Created with IntelliJ IDEA.
- * User: sblackmon
- * Date: 9/23/13
- * Time: 10:44 AM
- * To change this template use File | Settings | File Templates.
- */
-public class PipelineGraphConfigurator {
-
- public static Graph pipeline = loadPipeline();
-
- private static Graph loadPipeline() {
-
- Graph pipeline = new SingleGraph("pipelines");
-
- // this class looks for any pipelines specified with a graph definition
- // each is loaded into the execution graph
- // the application is responsible for launching each
- Enumeration<URL> pipelineFiles;
- try {
- pipelineFiles = PipelineGraphConfigurator.class.getClassLoader().getResources("*.dot");
-
- for( URL pipelineFile : Collections.list(pipelineFiles) ) {
- File file = new File(pipelineFile.toURI());
- String filePath = file.getAbsolutePath();
- FileSource fileSource = FileSourceFactory.sourceFor(filePath);
-
- fileSource.addSink(pipeline);
-
- try {
- fileSource.begin(filePath);
-
- while (fileSource.nextEvents()) {
- // Optionally some code here ...
- }
- } catch( IOException e) {
- e.printStackTrace();
- }
-
- try {
- fileSource.end();
- } catch( IOException e) {
- e.printStackTrace();
- } finally {
- fileSource.removeSink(pipeline);
- }
- }
- } catch (IOException e) {
- e.printStackTrace();
- } catch (URISyntaxException e) {
- e.printStackTrace();
- }
-
- return pipeline;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/adb43b29/streams-contrib/streams-persist-cassandra/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-cassandra/pom.xml b/streams-contrib/streams-persist-cassandra/pom.xml
new file mode 100644
index 0000000..fd6711f
--- /dev/null
+++ b/streams-contrib/streams-persist-cassandra/pom.xml
@@ -0,0 +1,142 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ -->
+
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns="http://maven.apache.org/POM/4.0.0"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-contrib</artifactId>
+ <version>0.1-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>streams-persist-cassandra</artifactId>
+
+ <name>${bundle.symbolicName} [${bundle.namespace}]</name>
+
+ <properties>
+ <bundle.symbolicName>streams-persist-cassandra</bundle.symbolicName>
+ <bundle.namespace>org.apache.streams</bundle.namespace>
+ <easymock.version>3.2</easymock.version>
+ </properties>
+
+ <packaging>bundle</packaging>
+ <build>
+ <resources>
+ <resource>
+ <directory>src/main/resources</directory>
+ </resource>
+
+ <resource>
+ <directory>.</directory>
+ <includes>
+ <include>plugin.xml</include>
+ <include>plugin.properties</include>
+ <include>icons/**</include>
+ </includes>
+ </resource>
+ </resources>
+ <plugins>
+ <plugin>
+ <groupId>org.ops4j</groupId>
+ <artifactId>maven-pax-plugin</artifactId>
+ <!--
+ | enable improved OSGi compilation support for the bundle life-cycle.
+ | to switch back to the standard bundle life-cycle, move this setting
+ | down to the maven-bundle-plugin section
+ -->
+ <extensions>true</extensions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>maven-bundle-plugin</artifactId>
+ <version>1.4.3</version>
+ <!--
+ | the following instructions build a simple set of public/private classes into an OSGi bundle
+ -->
+ <configuration>
+ <instructions>
+ <Bundle-SymbolicName>${bundle.symbolicName}</Bundle-SymbolicName>
+ <Bundle-Version>${project.version}</Bundle-Version>
+ <Export-Package>
+ ${bundle.namespace};version="${project.version}",org.apache.streams.cassandra.repository.impl, org.apache.streams.cassandra.model, org.apache.streams.cassandra.configuration
+ </Export-Package>
+ <Private-Package>${bundle.namespace}.cassandra.repository.impl.*,${bundle.namespace}.cassandra.model, ${bundle.namespace}.cassandra.configuration </Private-Package>
+ <Import-Package>
+ org.apache.rave.model,org.apache.rave.portal.model.impl,
+ com.datastax.driver.core, com.datastax.driver.core.exceptions, org.codehaus.jackson.map.annotate,
+ javax.persistence, org.apache.commons.logging, com.google.common.collect, org.codehaus.jackson.map,
+ org.apache.commons.lang,
+ org.apache.streams.osgi.components.activitysubscriber,
+ org.springframework.beans.factory.annotation, org.springframework.stereotype
+ </Import-Package>
+ </instructions>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.rave</groupId>
+ <artifactId>rave-core-api</artifactId>
+ <version>${rave.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.rave</groupId>
+ <artifactId>rave-core</artifactId>
+ <version>${rave.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.8.2</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.datastax.cassandra</groupId>
+ <artifactId>cassandra-driver-core</artifactId>
+ <version>${datastax.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.jboss.netty</groupId>
+ <artifactId>netty</artifactId>
+ <version>3.2.9.Final</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams.osgi.components</groupId>
+ <artifactId>activity-subscriber</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.easymock</groupId>
+ <artifactId>easymock</artifactId>
+ <version>${easymock.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/adb43b29/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/configuration/CassandraConfiguration.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/configuration/CassandraConfiguration.java b/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/configuration/CassandraConfiguration.java
new file mode 100644
index 0000000..39e74a7
--- /dev/null
+++ b/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/configuration/CassandraConfiguration.java
@@ -0,0 +1,63 @@
+package org.apache.streams.cassandra.configuration;
+
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+@Component
+public class CassandraConfiguration {
+ @Value("${keyspaceName}")
+ private String keyspaceName;
+
+ @Value("${activitystreamsColumnFamilyName}")
+ private String activitystreamsColumnFamilyName;
+
+ @Value("${subscriptionColumnFamilyName}")
+ private String subscriptionColumnFamilyName;
+
+ @Value("${publisherColumnFamilyName}")
+ private String publisherColumnFamilyName;
+
+ @Value("${cassandraPort}")
+ private String cassandraPort;
+
+ public String getKeyspaceName() {
+ return keyspaceName;
+ }
+
+ public void setKeyspaceName(String keyspaceName) {
+ this.keyspaceName = keyspaceName;
+ }
+
+ public String getActivitystreamsColumnFamilyName() {
+ return activitystreamsColumnFamilyName;
+ }
+
+ public void setActivitystreamsColumnFamilyName(String activitystreamsColumnFamilyName) {
+ this.activitystreamsColumnFamilyName = activitystreamsColumnFamilyName;
+ }
+
+ public String getSubscriptionColumnFamilyName() {
+ return subscriptionColumnFamilyName;
+ }
+
+ public void setSubscriptionColumnFamilyName(String subscriptionColumnFamilyName) {
+ this.subscriptionColumnFamilyName = subscriptionColumnFamilyName;
+ }
+
+ public String getPublisherColumnFamilyName() {
+ return publisherColumnFamilyName;
+ }
+
+ public void setPublisherColumnFamilyName(String publisherColumnFamilyName) {
+ this.publisherColumnFamilyName = publisherColumnFamilyName;
+ }
+
+ public String getCassandraPort() {
+ return cassandraPort;
+ }
+
+ public void setCassandraPort(String cassandraPort) {
+ this.cassandraPort = cassandraPort;
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/adb43b29/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/model/CassandraActivityStreamsEntry.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/model/CassandraActivityStreamsEntry.java b/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/model/CassandraActivityStreamsEntry.java
new file mode 100644
index 0000000..c97fb82
--- /dev/null
+++ b/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/model/CassandraActivityStreamsEntry.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.cassandra.model;
+
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.ActivityObject;
+import org.apache.streams.pojo.json.Actor;
+import org.codehaus.jackson.map.annotate.JsonDeserialize;
+
+public class CassandraActivityStreamsEntry extends Activity implements Comparable<CassandraActivityStreamsEntry>{
+
+ @JsonDeserialize(as=ActivityObject.class)
+ private ActivityObject object;
+
+ @JsonDeserialize(as=ActivityObject.class)
+ private ActivityObject target;
+
+ @JsonDeserialize(as=Actor.class)
+ private Actor actor;
+
+ @JsonDeserialize(as=String.class)
+ private String provider;
+
+ public int compareTo(CassandraActivityStreamsEntry entry){
+ return (this.getPublished()).compareTo(entry.getPublished());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/adb43b29/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepository.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepository.java b/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepository.java
new file mode 100644
index 0000000..56e5416
--- /dev/null
+++ b/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepository.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.cassandra.repository.impl;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.AlreadyExistsException;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.rave.model.ActivityStreamsEntry;
+import org.apache.rave.model.ActivityStreamsObject;
+import org.apache.rave.portal.model.impl.ActivityStreamsEntryImpl;
+import org.apache.rave.portal.model.impl.ActivityStreamsObjectImpl;
+import org.apache.streams.cassandra.configuration.CassandraConfiguration;
+import org.apache.streams.cassandra.model.CassandraActivityStreamsEntry;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+
+public class CassandraActivityStreamsRepository {
+
+ private static final Log LOG = LogFactory.getLog(CassandraActivityStreamsRepository.class);
+
+ private CassandraKeyspace keyspace;
+ private CassandraConfiguration configuration;
+
+ @Autowired
+ public CassandraActivityStreamsRepository(CassandraKeyspace keyspace, CassandraConfiguration configuration) {
+ this.configuration = configuration;
+ this.keyspace = keyspace;
+
+ try {
+ keyspace.getSession().execute("CREATE TABLE " + configuration.getActivitystreamsColumnFamilyName() + " (" +
+ "id text, " +
+ "published timestamp, " +
+ "verb text, " +
+ "tags text, " +
+
+ "actor_displayname text, " +
+ "actor_id text, " +
+ "actor_url text, " +
+ "actor_objecttype text, " +
+
+ "target_displayname text, " +
+ "target_id text, " +
+ "target_url text, " +
+
+ "provider_url text, " +
+
+ "object_url text, " +
+ "object_displayname text, " +
+ "object_id text, " +
+ "object_objecttype text, " +
+
+ "PRIMARY KEY (id, tags, published));");
+ } catch (AlreadyExistsException ignored) {
+ }
+ }
+
+ public void save(ActivityStreamsEntry entry) {
+ String sql = "INSERT INTO " + configuration.getActivitystreamsColumnFamilyName() + " (" +
+ "id, published, verb, tags, " +
+ "actor_displayname, actor_objecttype, actor_id, actor_url, " +
+ "target_displayname, target_id, target_url, " +
+ "provider_url, " +
+ "object_displayname, object_objecttype, object_id, object_url) " +
+ "VALUES ('" +
+ entry.getId() + "','" +
+ entry.getPublished().getTime() + "','" +
+ entry.getVerb() + "','" +
+ entry.getTags() + "','" +
+
+ entry.getActor().getDisplayName() + "','" +
+ entry.getActor().getObjectType() + "','" +
+ entry.getActor().getId() + "','" +
+ entry.getActor().getUrl() + "','" +
+
+ entry.getTarget().getDisplayName() + "','" +
+ entry.getTarget().getId() + "','" +
+ entry.getTarget().getUrl() + "','" +
+
+ entry.getProvider().getUrl() + "','" +
+
+ entry.getObject().getDisplayName() + "','" +
+ entry.getObject().getObjectType() + "','" +
+ entry.getObject().getId() + "','" +
+ entry.getObject().getUrl() +
+
+ "')";
+ keyspace.getSession().execute(sql);
+ }
+
+ public List<CassandraActivityStreamsEntry> getActivitiesForFilters(List<String> filters, Date lastUpdated) {
+ List<CassandraActivityStreamsEntry> results = new ArrayList<CassandraActivityStreamsEntry>();
+
+ for (String tag : filters) {
+ String cql = "SELECT * FROM " + configuration.getActivitystreamsColumnFamilyName() + " WHERE ";
+
+ //add filters
+ cql = cql + " tags = '" + tag + "' AND ";
+
+ //specify last modified
+ cql = cql + "published > " + lastUpdated.getTime() + " ALLOW FILTERING";
+
+ //execute the cql query and store the results
+ ResultSet set = keyspace.getSession().execute(cql);
+
+ //iterate through the results and create a new ActivityStreamsEntry for every result returned
+
+ for (Row row : set) {
+ CassandraActivityStreamsEntry entry = new CassandraActivityStreamsEntry();
+ ActivityStreamsObject actor = new ActivityStreamsObjectImpl();
+ ActivityStreamsObject target = new ActivityStreamsObjectImpl();
+ ActivityStreamsObject object = new ActivityStreamsObjectImpl();
+ ActivityStreamsObject provider = new ActivityStreamsObjectImpl();
+
+ actor.setDisplayName(row.getString("actor_displayname"));
+ actor.setId(row.getString("actor_id"));
+ actor.setObjectType(row.getString("actor_objecttype"));
+ actor.setUrl(row.getString("actor_url"));
+
+ target.setDisplayName(row.getString("target_displayname"));
+ target.setId(row.getString("target_id"));
+ target.setUrl(row.getString("target_url"));
+
+ object.setDisplayName(row.getString("object_displayname"));
+ object.setObjectType(row.getString("object_objecttype"));
+ object.setUrl(row.getString("object_url"));
+ object.setId(row.getString("object_id"));
+
+ provider.setUrl(row.getString("provider_url"));
+
+ entry.setPublished(row.getDate("published"));
+ entry.setVerb(row.getString("verb"));
+ entry.setId(row.getString("id"));
+ entry.setTags(row.getString("tags"));
+ entry.setActor(actor);
+ entry.setTarget(target);
+ entry.setObject(object);
+ entry.setProvider(provider);
+
+ results.add(entry);
+ }
+ }
+
+ return results;
+ }
+
+ public void dropTable(String table) {
+ String cql = "DROP TABLE " + table;
+ keyspace.getSession().execute(cql);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/adb43b29/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraKeyspace.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraKeyspace.java b/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraKeyspace.java
new file mode 100644
index 0000000..0551bf2
--- /dev/null
+++ b/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraKeyspace.java
@@ -0,0 +1,43 @@
+package org.apache.streams.cassandra.repository.impl;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.AlreadyExistsException;
+import org.apache.streams.cassandra.configuration.CassandraConfiguration;
+import org.springframework.beans.factory.annotation.Autowired;
+
+public class CassandraKeyspace {
+ private CassandraConfiguration configuration;
+ private Cluster cluster;
+ private Session session;
+
+ @Autowired
+ public CassandraKeyspace(CassandraConfiguration configuration){
+ this.configuration = configuration;
+
+ cluster = Cluster.builder().addContactPoint(configuration.getCassandraPort()).build();
+ session = cluster.connect();
+
+ //TODO: cassandra 2 will have support for CREATE KEYSPACE IF NOT EXISTS
+ try {
+ session.execute("CREATE KEYSPACE " + configuration.getKeyspaceName() + " WITH replication = { 'class': 'SimpleStrategy','replication_factor' : 1 };");
+ } catch (AlreadyExistsException ignored) {
+ }
+
+ //connect to the keyspace
+ session = cluster.connect(configuration.getKeyspaceName());
+ }
+
+ public Session getSession(){
+ return session;
+ }
+
+ @Override
+ protected void finalize() throws Throwable {
+ try {
+ cluster.shutdown();
+ } finally {
+ super.finalize();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/adb43b29/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraSubscriptionRepository.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraSubscriptionRepository.java b/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraSubscriptionRepository.java
new file mode 100644
index 0000000..f5fe471
--- /dev/null
+++ b/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraSubscriptionRepository.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.cassandra.repository.impl;
+
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.exceptions.AlreadyExistsException;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.streams.cassandra.configuration.CassandraConfiguration;
+import org.apache.streams.osgi.components.activitysubscriber.ActivityStreamsSubscription;
+import org.springframework.beans.factory.annotation.Autowired;
+
+public class CassandraSubscriptionRepository {
+ private static final Log LOG = LogFactory.getLog(CassandraSubscriptionRepository.class);
+
+ private CassandraKeyspace keyspace;
+ private CassandraConfiguration configuration;
+
+ @Autowired
+ public CassandraSubscriptionRepository(CassandraKeyspace keyspace, CassandraConfiguration configuration) {
+ this.keyspace = keyspace;
+ this.configuration = configuration;
+
+ try {
+ keyspace.getSession().execute("CREATE TABLE " + configuration.getSubscriptionColumnFamilyName() + " (" +
+ "id text, " +
+ "filters text, " +
+
+ "PRIMARY KEY (id));");
+ } catch (AlreadyExistsException ignored) {
+ }
+ }
+
+ public String getFilters(String id){
+ String cql = "SELECT * FROM " + configuration.getSubscriptionColumnFamilyName() + " WHERE id = '" + id+"';";
+
+ ResultSet set = keyspace.getSession().execute(cql);
+
+ return set.one().getString("filters");
+ }
+
+ public void save(ActivityStreamsSubscription subscription){
+ String cql = "INSERT INTO " + configuration.getSubscriptionColumnFamilyName() + " (" +
+ "id, filters) " +
+ "VALUES ('" +
+ subscription.getAuthToken() + "','" +
+ StringUtils.join(subscription.getFilters(), " ") +
+
+ "')";
+ keyspace.getSession().execute(cql);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/adb43b29/streams-contrib/streams-persist-cassandra/src/main/resources/META_INF/spring/streams-cassandra-context.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-cassandra/src/main/resources/META_INF/spring/streams-cassandra-context.xml b/streams-contrib/streams-persist-cassandra/src/main/resources/META_INF/spring/streams-cassandra-context.xml
new file mode 100644
index 0000000..842c918
--- /dev/null
+++ b/streams-contrib/streams-persist-cassandra/src/main/resources/META_INF/spring/streams-cassandra-context.xml
@@ -0,0 +1,25 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ -->
+<beans
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns="http://www.springframework.org/schema/beans"
+ xmlns:context="http://www.springframework.org/schema/context"
+ xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
+
+</beans>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/adb43b29/streams-contrib/streams-persist-cassandra/src/test/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepositoryTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-cassandra/src/test/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepositoryTest.java b/streams-contrib/streams-persist-cassandra/src/test/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepositoryTest.java
new file mode 100644
index 0000000..978af10
--- /dev/null
+++ b/streams-contrib/streams-persist-cassandra/src/test/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepositoryTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.cassandra.repository.impl;
+
+import com.datastax.driver.core.ResultSet;
+import org.apache.rave.model.ActivityStreamsEntry;
+import org.apache.rave.model.ActivityStreamsObject;
+import org.apache.rave.portal.model.impl.ActivityStreamsEntryImpl;
+import org.apache.rave.portal.model.impl.ActivityStreamsObjectImpl;
+import org.apache.streams.cassandra.configuration.CassandraConfiguration;
+import org.apache.streams.cassandra.model.CassandraActivityStreamsEntry;
+import static org.easymock.EasyMock.*;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+
+public class CassandraActivityStreamsRepositoryTest { // Drives CassandraActivityStreamsRepository against EasyMock collaborators; every test method here is @Ignore'd.
+
+ private CassandraActivityStreamsRepository repository; // instance under test, assigned in setup()
+
+
+ @Before
+ public void setup() { // builds the repository from a mocked keyspace and a mocked configuration
+ CassandraKeyspace keyspace = createMock(CassandraKeyspace.class);
+ CassandraConfiguration configuration = createMock(CassandraConfiguration.class);
+ repository = new CassandraActivityStreamsRepository(keyspace, configuration); // NOTE(review): no expectations are recorded and replay() is never called on the mocks - confirm before enabling the tests
+ }
+
+ @Ignore
+ @Test
+ public void saveActivity() { // populates one entry (actor, target, object, provider) and passes it to repository.save(); nothing is asserted
+ ActivityStreamsEntry entry = new ActivityStreamsEntryImpl();
+ ActivityStreamsObject actor = new ActivityStreamsObjectImpl();
+ ActivityStreamsObject target = new ActivityStreamsObjectImpl();
+ ActivityStreamsObject object = new ActivityStreamsObjectImpl();
+ ActivityStreamsObject provider = new ActivityStreamsObjectImpl();
+
+ actor.setId("actorid1");
+ actor.setUrl("actorurl1");
+ actor.setDisplayName("actorname1");
+
+ target.setId("targetid1");
+ target.setUrl("targeturl1");
+ target.setDisplayName("r501");
+
+ provider.setUrl("providerurl"); // the provider only gets a URL; no id or display name is set
+
+ object.setId("objectid1");
+ object.setDisplayName("objectname1"); // the object gets no URL, unlike actor and target
+
+ entry.setId("dink");
+ entry.setVerb("verb1");
+ entry.setTags("r501"); // same literal as the target display name above
+ entry.setProvider(provider);
+ Date d = new Date(); // publish timestamp is the current time
+ entry.setPublished(d);
+ entry.setActor(actor);
+ entry.setObject(object);
+ entry.setTarget(target);
+
+ repository.save(entry);
+ }
+
+ @Ignore
+ @Test
+ public void getActivity() { // calls getActivitiesForFilters with two filter strings and the epoch date; the result is never inspected
+ String cql = "tags"; // NOTE(review): presumably a column name paired with the value below - confirm against getActivitiesForFilters
+ String other = "r501";
+ List<String> f = Arrays.asList(cql, other);
+ Date d = new Date(0); // 0 ms since the epoch
+ List<CassandraActivityStreamsEntry> results = repository.getActivitiesForFilters(f,d); // results is unused: there are no assertions
+ }
+
+ @Ignore
+ @Test
+ public void dropTableTest(){ // calls dropTable("coltest"); nothing is asserted
+ repository.dropTable("coltest");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/adb43b29/streams-contrib/streams-persist-cassandra/src/test/java/org/apache/streams/cassandra/repository/impl/CassandraActivitySubscriptionTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-cassandra/src/test/java/org/apache/streams/cassandra/repository/impl/CassandraActivitySubscriptionTest.java b/streams-contrib/streams-persist-cassandra/src/test/java/org/apache/streams/cassandra/repository/impl/CassandraActivitySubscriptionTest.java
new file mode 100644
index 0000000..2a90462
--- /dev/null
+++ b/streams-contrib/streams-persist-cassandra/src/test/java/org/apache/streams/cassandra/repository/impl/CassandraActivitySubscriptionTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.cassandra.repository.impl;
+
+import org.apache.streams.osgi.components.activitysubscriber.ActivityStreamsSubscription;
+import org.apache.streams.osgi.components.activitysubscriber.impl.ActivityStreamsSubscriptionImpl;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.Arrays;
+
+public class CassandraActivitySubscriptionTest { // Calls save() and getFilters() on CassandraSubscriptionRepository; both tests are @Ignore'd and assert nothing.
+
+ public CassandraSubscriptionRepository repository; // never assigned in this class: the only assignment is commented out in setup(), so it stays null
+
+
+ @Before
+ public void setup() { // currently a no-op: its only statement is commented out
+// repository = new CassandraSubscriptionRepository();
+ }
+
+ @Ignore
+ @Test
+ public void saveTest(){ // builds a subscription with two filters and auth token "subid", then saves it
+ ActivityStreamsSubscription subscription = new ActivityStreamsSubscriptionImpl();
+ subscription.setFilters(Arrays.asList("thisis", "atest"));
+ subscription.setAuthToken("subid");
+
+ repository.save(subscription); // NOTE(review): repository is null here unless set from outside, so this would throw NullPointerException if @Ignore were removed
+ }
+
+ @Ignore
+ @Test
+ public void getTest(){ // looks up filters by the same "subid" literal that saveTest() uses as its auth token
+ String filters = repository.getFilters("subid"); // filters is unused: there are no assertions
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/adb43b29/streams-eip-routes/ReadMe.txt
----------------------------------------------------------------------
diff --git a/streams-eip-routes/ReadMe.txt b/streams-eip-routes/ReadMe.txt
deleted file mode 100644
index 19a1d19..0000000
--- a/streams-eip-routes/ReadMe.txt
+++ /dev/null
@@ -1,32 +0,0 @@
-Camel Router WAR Project with Web Console and REST Support
-==========================================================
-
-This project bundles the Camel Web Console, REST API, and some
-sample routes as a WAR. You can build the WAR by running
-
-mvn install
-
-You can then run the project by dropping the WAR into your
-favorite web container or just run
-
-mvn jetty:run
-
-to start up and deploy to Jetty.
-
-
-Web Console
-===========
-
-You can view the Web Console by pointing your browser to http://localhost:8080/
-
-You should be able to do things like
-
- * browse the available endpoints
- * browse the messages on an endpoint if it is a BrowsableEndpoint
- * send a message to an endpoint
- * create new endpoints
-
-For more help see the Apache Camel documentation
-
- http://camel.apache.org/
-