You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by ds...@apache.org on 2013/08/30 18:38:17 UTC

svn commit: r1519019 - in /incubator/streams/branches/cassandra: ./ streams-cassandra/ streams-cassandra/src/main/java/org/apache/streams/cassandra/model/ streams-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/ streams-cassandra/s...

Author: dsullivan
Date: Fri Aug 30 16:38:16 2013
New Revision: 1519019

URL: http://svn.apache.org/r1519019
Log:
changed from astyanax to datastax driver. After subscribing, the subscribers stream is populated with 10 streams from the database.

Modified:
    incubator/streams/branches/cassandra/pom.xml
    incubator/streams/branches/cassandra/streams-cassandra/pom.xml
    incubator/streams/branches/cassandra/streams-cassandra/src/main/java/org/apache/streams/cassandra/model/CassandraActivityStreamsEntry.java
    incubator/streams/branches/cassandra/streams-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepository.java
    incubator/streams/branches/cassandra/streams-cassandra/src/test/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepositoryTest.java
    incubator/streams/branches/cassandra/streams-eip-routes/pom.xml
    incubator/streams/branches/cassandra/streams-eip-routes/src/main/java/org/apache/streams/messaging/aggregation/ActivityAggregator.java
    incubator/streams/branches/cassandra/streams-eip-routes/src/main/java/org/apache/streams/messaging/service/impl/CassandraActivityService.java
    incubator/streams/branches/cassandra/streams-eip-routes/src/test/java/org/apache/streams/messaging/service/impl/CassandraActivityServiceTest.java
    incubator/streams/branches/cassandra/streams-osgi-components/activity-consumer/pom.xml
    incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/ActivityStreamsSubscriber.java
    incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/impl/ActivityStreamsSubscriberDelegate.java
    incubator/streams/branches/cassandra/streams-web/pom.xml

Modified: incubator/streams/branches/cassandra/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/branches/cassandra/pom.xml?rev=1519019&r1=1519018&r2=1519019&view=diff
==============================================================================
--- incubator/streams/branches/cassandra/pom.xml (original)
+++ incubator/streams/branches/cassandra/pom.xml Fri Aug 30 16:38:16 2013
@@ -17,57 +17,60 @@
   specific language governing permissions and limitations
   under the License.
 -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
 
-  <modelVersion>4.0.0</modelVersion>
+    <modelVersion>4.0.0</modelVersion>
 
-  <parent>
+    <parent>
         <groupId>org.apache.streams</groupId>
         <artifactId>streams-master</artifactId>
         <version>0.2-incubating-SNAPSHOT</version>
-   </parent>
+    </parent>
 
-  <groupId>org.apache.streams</groupId>
-  <artifactId>streams-project</artifactId>
-  <version>0.1-SNAPSHOT</version>
-
-  <name>Apache Streams Project</name>
-
-  <description>Apache Streams Project</description>
-
-  <properties>
-    <org.osgi.service.http.port>8080</org.osgi.service.http.port>
-    <org.osgi.service.http.port.secure>8443</org.osgi.service.http.port.secure>
-  </properties>
-
-  <packaging>pom</packaging>
-
-  <modules>
-    <module>poms</module>
-    <module>provision</module>
-    <module>streams-osgi-components</module>
-    <module>streams-eip-routes</module>
-    <module>streams-cassandra</module>
-    <module>streams-web</module>
-  </modules>
-
-  <build>
-    <plugins>
-      <plugin>
-        <groupId>org.ops4j</groupId>
-        <artifactId>maven-pax-plugin</artifactId>
-        <version>1.5</version>
-          <extensions>true</extensions>
-          <configuration>
-            <compilerVersion>1.5</compilerVersion>
-            <provision>
-              <param>--platform=felix</param>
-            </provision>
-        </configuration>
-       <executions>
-        </executions>
-      </plugin>
-    </plugins>
-  </build>
+    <groupId>org.apache.streams</groupId>
+    <artifactId>streams-project</artifactId>
+    <version>0.1-SNAPSHOT</version>
+
+    <name>Apache Streams Project</name>
+
+    <description>Apache Streams Project</description>
+
+    <properties>
+        <org.osgi.service.http.port>8080</org.osgi.service.http.port>
+        <org.osgi.service.http.port.secure>8443</org.osgi.service.http.port.secure>
+        <rave.version>0.22-SNAPSHOT</rave.version>
+        <datastax.version>1.0.2</datastax.version>
+    </properties>
+
+    <packaging>pom</packaging>
+
+    <modules>
+        <module>poms</module>
+        <module>provision</module>
+        <module>streams-osgi-components</module>
+        <module>streams-eip-routes</module>
+        <module>streams-cassandra</module>
+        <module>streams-web</module>
+    </modules>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.ops4j</groupId>
+                <artifactId>maven-pax-plugin</artifactId>
+                <version>1.5</version>
+                <extensions>true</extensions>
+                <configuration>
+                    <compilerVersion>1.5</compilerVersion>
+                    <provision>
+                        <param>--platform=felix</param>
+                    </provision>
+                </configuration>
+                <executions>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
 
 </project>
\ No newline at end of file

Modified: incubator/streams/branches/cassandra/streams-cassandra/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/branches/cassandra/streams-cassandra/pom.xml?rev=1519019&r1=1519018&r2=1519019&view=diff
==============================================================================
--- incubator/streams/branches/cassandra/streams-cassandra/pom.xml (original)
+++ incubator/streams/branches/cassandra/streams-cassandra/pom.xml Fri Aug 30 16:38:16 2013
@@ -58,12 +58,12 @@
                     <Bundle-SymbolicName>${bundle.symbolicName}</Bundle-SymbolicName>
                     <Bundle-Version>${project.version}</Bundle-Version>
                     <Export-Package>
-                        ${bundle.namespace};version="${project.version}",org.apache.streams.cassandra.repository.impl, org.apache.streams.cassandra.model, org.apache.streams.cassandra.service
+                        ${bundle.namespace};version="${project.version}",org.apache.streams.cassandra.repository.impl, org.apache.streams.cassandra.model
                     </Export-Package>
-                    <Private-Package>${bundle.namespace}.cassandra.repository.impl.*,${bundle.namespace}.cassandra.model, ${bundle.namespace}.cassandra.service </Private-Package>
+                    <Private-Package>${bundle.namespace}.cassandra.repository.impl.*,${bundle.namespace}.cassandra.model </Private-Package>
                     <Import-Package>
                         org.apache.rave.model,org.apache.rave.portal.model.impl,
-                        com.netflix.astyanax, com.netflix.astyanax.connectionpool, com.netflix.astyanax.connectionpool.impl, com.netflix.astyanax.impl, com.netflix.astyanax.thrift, com.netflix.astyanax.connectionpool.exceptions, com.netflix.astyanax.model, com.netflix.astyanax.serializers, com.netflix.astyanax.entitystore,  com.netflix.astyanax.ddl,
+                        com.datastax.driver.core, com.datastax.driver.core.exceptions, org.codehaus.jackson.map.annotate,
                         javax.persistence, org.apache.commons.logging, com.google.common.collect, org.codehaus.jackson.map
                     </Import-Package>
                 </instructions>
@@ -76,50 +76,31 @@
         <dependency>
             <groupId>org.apache.rave</groupId>
             <artifactId>rave-core-api</artifactId>
-            <version>0.22-SNAPSHOT</version>
-            <scope>provided</scope>
+            <version>${rave.version}</version>
         </dependency>
 
         <dependency>
             <groupId>org.apache.rave</groupId>
             <artifactId>rave-core</artifactId>
-            <version>0.22-SNAPSHOT</version>
-            <scope>provided</scope>
+            <version>${rave.version}</version>
         </dependency>
 
-        <!--astyanax-->
         <dependency>
-            <groupId>com.netflix.astyanax</groupId>
-            <artifactId>astyanax-core</artifactId>
-            <version>1.56.43-SNAPSHOT</version>
-            <scope>provided</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>com.netflix.astyanax</groupId>
-            <artifactId>astyanax-thrift</artifactId>
-            <version>1.56.43-SNAPSHOT</version>
-            <scope>provided</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>com.netflix.astyanax</groupId>
-            <artifactId>astyanax-cassandra</artifactId>
-            <version>1.56.43-SNAPSHOT</version>
-            <scope>provided</scope>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.8.2</version>
         </dependency>
 
         <dependency>
-            <groupId>com.netflix.astyanax</groupId>
-            <artifactId>astyanax-entity-mapper</artifactId>
-            <version>1.56.43-SNAPSHOT</version>
-            <scope>provided</scope>
+            <groupId>com.datastax.cassandra</groupId>
+            <artifactId>cassandra-driver-core</artifactId>
+            <version>${datastax.version}</version>
         </dependency>
 
         <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-            <version>4.8.2</version>
+            <groupId>org.jboss.netty</groupId>
+            <artifactId>netty</artifactId>
+            <version>3.2.9.Final</version>
         </dependency>
 
     </dependencies>

Modified: incubator/streams/branches/cassandra/streams-cassandra/src/main/java/org/apache/streams/cassandra/model/CassandraActivityStreamsEntry.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/cassandra/streams-cassandra/src/main/java/org/apache/streams/cassandra/model/CassandraActivityStreamsEntry.java?rev=1519019&r1=1519018&r2=1519019&view=diff
==============================================================================
--- incubator/streams/branches/cassandra/streams-cassandra/src/main/java/org/apache/streams/cassandra/model/CassandraActivityStreamsEntry.java (original)
+++ incubator/streams/branches/cassandra/streams-cassandra/src/main/java/org/apache/streams/cassandra/model/CassandraActivityStreamsEntry.java Fri Aug 30 16:38:16 2013
@@ -1,688 +1,17 @@
 package org.apache.streams.cassandra.model;
 
-import org.apache.rave.model.ActivityStreamsEntry;
-import org.apache.rave.model.ActivityStreamsMediaLink;
 import org.apache.rave.model.ActivityStreamsObject;
+import org.apache.rave.portal.model.impl.ActivityStreamsEntryImpl;
+import org.apache.rave.portal.model.impl.ActivityStreamsObjectImpl;
+import org.codehaus.jackson.map.annotate.JsonDeserialize;
 
-import javax.persistence.Column;
-import javax.persistence.Entity;
-import javax.persistence.Id;
-import java.io.Serializable;
-import java.util.Date;
-import java.util.Map;
-import java.util.logging.Logger;
-
-@Entity
-public class CassandraActivityStreamsEntry implements ActivityStreamsEntry, Serializable{
-    private static final long serialVersionUID = 1L;
-    private static Logger log = Logger.getLogger(CassandraActivityStreamsEntry.class.getName());
-
-    @Id
-    private String id;
-
-    @Column
-    private ActivityStreamsObject actor;
-
-    @Column
-    private String content;
-
-    @Column
-    private ActivityStreamsObject generator;
-
-    @Column
-    private ActivityStreamsMediaLink icon;
-
-    @Column
+public class CassandraActivityStreamsEntry extends ActivityStreamsEntryImpl{
+    @JsonDeserialize(as=ActivityStreamsObjectImpl.class)
     private ActivityStreamsObject object;
 
-    @Column
-    private ActivityStreamsObject provider;
-
-    @Column
+    @JsonDeserialize(as=ActivityStreamsObjectImpl.class)
     private ActivityStreamsObject target;
 
-    @Column
-    private String title;
-
-    @Column
-    private String verb;
-
-    //The user who verb'd this activity
-    @Column
-    private String userId;
-
-    //If this activity was generated as part of a group, this indicates the group's id
-    @Column
-    private String groupId;
-
-    //The id of the application that published this activity
-    @Column
-    private String appId;
-
-    @Column
-    private String bcc;
-
-    @Column
-    private String bto;
-
-    @Column
-    private String cc;
-
-    @Column
-    private String context;
-
-    @Column
-    private String dc;
-
-    private Date endTime;
-
-    @Column
-    private String geojson;
-
-    @Column
-    private String inReplyTo;
-
-    @Column
-    private String ld;
-
-    @Column
-    private String links;
-
-    @Column
-    private String location;
-
-    @Column
-    private String mood;
-
-    @Column
-    private String odata;
-
-    @Column
-    private String opengraph;
-
-    @Column
-    private String priority;
-
-    @Column
-    private String rating;
-
-    @Column
-    private String result;
-
-    @Column
-    private String schema_org;
-
-    @Column
-    private String source;
-
-    @Column
-    private Date startTime;
-
-    @Column
-    private String tags;
-
-    @Column
-    private String to;
-
-    @Column
-    private Date published;
-
-    @Column
-    private Date updated;
-
-    @Column
-    private String url;
-
-    @Column
-    private String objectType;
-
-    private Map openSocial;
-
-    private Map extensions;
-
-    /**
-     * Create a new empty DeserializableActivityEntry
-     */
-    public CassandraActivityStreamsEntry() {
-    }
-
-    /** {@inheritDoc} */
-
-    public Date getPublished() {
-        return published;
-    }
-
-    /** {@inheritDoc} */
-    public void setPublished(Date published) {
-        this.published = published;
-    }
-
-    public Date getUpdated(){
-        return updated;
-    }
-
-    public void setUpdated(Date updated){
-        this.updated=updated;
-    }
-
-    public String getUrl(){
-        return this.url;
-    }
-
-    public void setUrl(String url){
-        this.url=url;
-    }
-
-    public String getObjectType(){
-        return this.objectType;
-    }
-
-    public void setObjectType(String objectType){
-        this.objectType=objectType;
-    }
-
-    /** {@inheritDoc} */
-
-    public Map getOpenSocial() {
-        return openSocial;
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    public void setOpenSocial(Map openSocial) {
-
-        this.openSocial = openSocial;
-    }
-
-    /** {@inheritDoc} */
-
-    public Map getExtensions() {
-        return extensions;
-    }
-
-    /** {@inheritDoc} */
-    public void setExtensions(Map extensions) {
-
-
-        this.extensions = extensions;
-    }
-
-    public ActivityStreamsObject getActor() {
-        return actor;
-    }
-
-    /** {@inheritDoc} */
-    public void setActor(ActivityStreamsObject actor) {
-        this.actor = actor;
-    }
-
-    /** {@inheritDoc} */
-    public String getContent() {
-        return content;
-    }
-
-    /** {@inheritDoc} */
-    public void setContent(String content) {
-        this.content = content;
-    }
-
-
-    /** {@inheritDoc} */
-    public ActivityStreamsObject getGenerator() {
-        return generator;
-    }
-
-    /** {@inheritDoc} */
-    public void setGenerator(ActivityStreamsObject generator) {
-        this.generator = generator;
-    }
-
-    /** {@inheritDoc} */
-    public ActivityStreamsMediaLink getIcon() {
-        return icon;
-    }
-
-    /** {@inheritDoc} */
-    public void setIcon(ActivityStreamsMediaLink icon) {
-        this.icon = icon;
-    }
-
-    /** {@inheritDoc} */
-
-    public String getId() {
-        return id;
-    }
-
-    /** {@inheritDoc} */
-    public void setId(String id) {
-        this.id = id;
-    }
-
-    /** {@inheritDoc} */
-    public ActivityStreamsObject getObject() {
-        return object;
-    }
-
-    /** {@inheritDoc} */
-    public void setObject(ActivityStreamsObject object) {
-        this.object = object;
-    }
-
-
-
-    /** {@inheritDoc} */
-    public ActivityStreamsObject getProvider() {
-        return provider;
-    }
-
-    /** {@inheritDoc} */
-    public void setProvider(ActivityStreamsObject provider) {
-        this.provider = provider;
-    }
-
-    /** {@inheritDoc} */
-    public ActivityStreamsObject getTarget() {
-        return target;
-    }
-
-    /** {@inheritDoc} */
-    public void setTarget(ActivityStreamsObject target) {
-        this.target = target;
-    }
-
-    /** {@inheritDoc} */
-
-    public String getTitle() {
-        return title;
-    }
-
-    /** {@inheritDoc} */
-    public void setTitle(String title) {
-        this.title = title;
-    }
-
-
-    /** {@inheritDoc} */
-
-    public String getVerb() {
-        return verb;
-    }
-
-    /** {@inheritDoc} */
-    public void setVerb(String verb) {
-        this.verb = verb;
-    }
-
-    /** {@inheritDoc} */
-    public String getUserId() {
-        return userId;
-    }
-
-    /** {@inheritDoc} */
-    public void setUserId(String userId) {
-        this.userId = userId;
-    }
-
-    /** {@inheritDoc} */
-    public String getGroupId() {
-        return groupId;
-    }
-
-    /** {@inheritDoc} */
-    public void setGroupId(String groupId) {
-        this.groupId = groupId;
-    }
-
-
-    /** {@inheritDoc} */
-    public String getAppId() {
-        return appId;
-    }
-
-    /** {@inheritDoc} */
-    public void setAppId(String appId) {
-        this.appId = appId;
-    }
-
-    /** {@inheritDoc} */
-    public String getBcc() {
-        return bcc;
-    }
-
-    /** {@inheritDoc} */
-    public void setBcc(String bcc) {
-        this.bcc = bcc;
-    }
-
-    /** {@inheritDoc} */
-    public String getBto() {
-        return bto;
-    }
-
-    /** {@inheritDoc} */
-    public void setBto(String bto) {
-        this.bto = bto;
-    }
-
-    /** {@inheritDoc} */
-    public String getCc() {
-        return cc;
-    }
-
-    /** {@inheritDoc} */
-    public void setCc(String cc) {
-        this.cc = cc;
-    }
-
-    /** {@inheritDoc} */
-    public String getContext() {
-        return context;
-    }
-
-    /** {@inheritDoc} */
-    public void setContext(String context) {
-        this.context = context;
-    }
-
-    /** {@inheritDoc} */
-    public String getDc() {
-        return dc;
-    }
-
-    /** {@inheritDoc} */
-    public void setDc(String dc) {
-        this.dc = dc;
-    }
-
-    /** {@inheritDoc} */
-    public Date getEndTime() {
-        return endTime;
-    }
-
-    /** {@inheritDoc} */
-    public void setEndTime(Date endTime) {
-        this.endTime = endTime;
-    }
-
-    /** {@inheritDoc} */
-    public String getGeojson() {
-        return geojson;
-    }
-
-    /** {@inheritDoc} */
-    public void setGeojson(String geojson) {
-        this.geojson = geojson;
-    }
-
-    /** {@inheritDoc} */
-    public String getInReplyTo() {
-        return inReplyTo;
-    }
-
-    /** {@inheritDoc} */
-    public void setInReplyTo(String inReplyTo) {
-        this.inReplyTo = inReplyTo;
-    }
-
-    /** {@inheritDoc} */
-    public String getLd() {
-        return ld;
-    }
-
-    /** {@inheritDoc} */
-    public void setLd(String ld) {
-        this.ld = ld;
-    }
-
-    /** {@inheritDoc} */
-    public String getLinks() {
-        return links;
-    }
-
-    /** {@inheritDoc} */
-    public void setLinks(String links) {
-        this.links = links;
-    }
-
-    /** {@inheritDoc} */
-    public String getLocation() {
-        return location;
-    }
-
-    /** {@inheritDoc} */
-    public void setLocation(String location) {
-        this.location = location;
-    }
-
-    /** {@inheritDoc} */
-    public String getMood() {
-        return mood;
-    }
-
-    /** {@inheritDoc} */
-    public void setMood(String mood) {
-        this.mood = mood;
-    }
-
-    /** {@inheritDoc} */
-    public String getOdata() {
-        return odata;
-    }
-
-    /** {@inheritDoc} */
-    public void setOdata(String odata) {
-        this.odata = odata;
-    }
-
-    /** {@inheritDoc} */
-    public String getOpengraph() {
-        return opengraph;
-    }
-
-    /** {@inheritDoc} */
-    public void setOpengraph(String opengraph) {
-        this.opengraph = opengraph;
-    }
-
-    /** {@inheritDoc} */
-    public String getPriority() {
-        return priority;
-    }
-
-    /** {@inheritDoc} */
-    public void setPriority(String priority) {
-        this.priority = priority;
-    }
-
-    /** {@inheritDoc} */
-    public String getRating() {
-        return rating;
-    }
-
-    /** {@inheritDoc} */
-    public void setRating(String rating) {
-        this.rating = rating;
-    }
-
-    /** {@inheritDoc} */
-    public String getResult() {
-        return result;
-    }
-
-    /** {@inheritDoc} */
-    public void setResult(String result) {
-        this.result = result;
-    }
-
-    /** {@inheritDoc} */
-    public String getSchema_org() {
-        return schema_org;
-    }
-
-    /** {@inheritDoc} */
-    public void setSchema_org(String schema_org) {
-        this.schema_org = schema_org;
-    }
-
-    /** {@inheritDoc} */
-    public String getSource() {
-        return source;
-    }
-
-    /** {@inheritDoc} */
-    public void setSource(String source) {
-        this.source = source;
-    }
-
-    /** {@inheritDoc} */
-    public Date getStartTime() {
-        return startTime;
-    }
-
-    /** {@inheritDoc} */
-    public void setStartTime(Date startTime) {
-        this.startTime = startTime;
-    }
-
-    /** {@inheritDoc} */
-    public String getTags() {
-        return tags;
-    }
-
-    /** {@inheritDoc} */
-    public void setTags(String tags) {
-        this.tags = tags;
-    }
-
-    /** {@inheritDoc} */
-    public String getTo() {
-        return to;
-    }
-
-    /** {@inheritDoc} */
-    public void setTo(String to) {
-        this.to = to;
-    }
-
-
-
-    /**
-     * Sorts ActivityEntries in ascending order based on publish date.
-     *
-     * @param that
-     *            is the DeserializableActivityEntry to compare to this DeserializableActivityEntry
-     *
-     * @return int represents how the ActivityEntries compare
-     */
-
-    public int compareTo(ActivityStreamsEntry that) {
-        if (this.getPublished() == null && that.getPublished() == null) {
-            return 0; // both are null, equal
-        } else if (this.getPublished() == null) {
-            return -1; // this is null, comes before real date
-        } else if (that.getPublished() == null) {
-            return 1; // that is null, this comes after
-        } else { // compare publish dates in lexicographical order
-            return this.getPublished().compareTo(that.getPublished());
-        }
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-
-        CassandraActivityStreamsEntry that = (CassandraActivityStreamsEntry) o;
-
-        if (actor != null ? !actor.equals(that.actor) : that.actor != null) return false;
-        if (appId != null ? !appId.equals(that.appId) : that.appId != null) return false;
-        if (bcc != null ? !bcc.equals(that.bcc) : that.bcc != null) return false;
-        if (bto != null ? !bto.equals(that.bto) : that.bto != null) return false;
-        if (cc != null ? !cc.equals(that.cc) : that.cc != null) return false;
-        if (content != null ? !content.equals(that.content) : that.content != null) return false;
-        if (context != null ? !context.equals(that.context) : that.context != null) return false;
-        if (dc != null ? !dc.equals(that.dc) : that.dc != null) return false;
-        if (endTime != null ? !endTime.equals(that.endTime) : that.endTime != null) return false;
-        if (extensions != null ? !extensions.equals(that.extensions) : that.extensions != null) return false;
-        if (generator != null ? !generator.equals(that.generator) : that.generator != null) return false;
-        if (geojson != null ? !geojson.equals(that.geojson) : that.geojson != null) return false;
-        if (groupId != null ? !groupId.equals(that.groupId) : that.groupId != null) return false;
-        if (icon != null ? !icon.equals(that.icon) : that.icon != null) return false;
-        if (id != null ? !id.equals(that.id) : that.id != null) return false;
-        if (inReplyTo != null ? !inReplyTo.equals(that.inReplyTo) : that.inReplyTo != null) return false;
-        if (ld != null ? !ld.equals(that.ld) : that.ld != null) return false;
-        if (links != null ? !links.equals(that.links) : that.links != null) return false;
-        if (location != null ? !location.equals(that.location) : that.location != null) return false;
-        if (mood != null ? !mood.equals(that.mood) : that.mood != null) return false;
-        if (object != null ? !object.equals(that.object) : that.object != null) return false;
-        if (objectType != null ? !objectType.equals(that.objectType) : that.objectType != null) return false;
-        if (odata != null ? !odata.equals(that.odata) : that.odata != null) return false;
-        if (openSocial != null ? !openSocial.equals(that.openSocial) : that.openSocial != null) return false;
-        if (opengraph != null ? !opengraph.equals(that.opengraph) : that.opengraph != null) return false;
-        if (priority != null ? !priority.equals(that.priority) : that.priority != null) return false;
-        if (provider != null ? !provider.equals(that.provider) : that.provider != null) return false;
-        if (published != null ? !published.equals(that.published) : that.published != null) return false;
-        if (rating != null ? !rating.equals(that.rating) : that.rating != null) return false;
-        if (result != null ? !result.equals(that.result) : that.result != null) return false;
-        if (schema_org != null ? !schema_org.equals(that.schema_org) : that.schema_org != null) return false;
-        if (source != null ? !source.equals(that.source) : that.source != null) return false;
-        if (startTime != null ? !startTime.equals(that.startTime) : that.startTime != null) return false;
-        if (tags != null ? !tags.equals(that.tags) : that.tags != null) return false;
-        if (target != null ? !target.equals(that.target) : that.target != null) return false;
-        if (title != null ? !title.equals(that.title) : that.title != null) return false;
-        if (to != null ? !to.equals(that.to) : that.to != null) return false;
-        if (updated != null ? !updated.equals(that.updated) : that.updated != null) return false;
-        if (url != null ? !url.equals(that.url) : that.url != null) return false;
-        if (userId != null ? !userId.equals(that.userId) : that.userId != null) return false;
-        if (verb != null ? !verb.equals(that.verb) : that.verb != null) return false;
-
-        return true;
-    }
-
-    @Override
-    public int hashCode() {
-        int result1 = id != null ? id.hashCode() : 0;
-        result1 = 31 * result1 + (actor != null ? actor.hashCode() : 0);
-        result1 = 31 * result1 + (content != null ? content.hashCode() : 0);
-        result1 = 31 * result1 + (generator != null ? generator.hashCode() : 0);
-        result1 = 31 * result1 + (icon != null ? icon.hashCode() : 0);
-        result1 = 31 * result1 + (object != null ? object.hashCode() : 0);
-        result1 = 31 * result1 + (provider != null ? provider.hashCode() : 0);
-        result1 = 31 * result1 + (target != null ? target.hashCode() : 0);
-        result1 = 31 * result1 + (title != null ? title.hashCode() : 0);
-        result1 = 31 * result1 + (verb != null ? verb.hashCode() : 0);
-        result1 = 31 * result1 + (userId != null ? userId.hashCode() : 0);
-        result1 = 31 * result1 + (groupId != null ? groupId.hashCode() : 0);
-        result1 = 31 * result1 + (appId != null ? appId.hashCode() : 0);
-        result1 = 31 * result1 + (bcc != null ? bcc.hashCode() : 0);
-        result1 = 31 * result1 + (bto != null ? bto.hashCode() : 0);
-        result1 = 31 * result1 + (cc != null ? cc.hashCode() : 0);
-        result1 = 31 * result1 + (context != null ? context.hashCode() : 0);
-        result1 = 31 * result1 + (dc != null ? dc.hashCode() : 0);
-        result1 = 31 * result1 + (endTime != null ? endTime.hashCode() : 0);
-        result1 = 31 * result1 + (geojson != null ? geojson.hashCode() : 0);
-        result1 = 31 * result1 + (inReplyTo != null ? inReplyTo.hashCode() : 0);
-        result1 = 31 * result1 + (ld != null ? ld.hashCode() : 0);
-        result1 = 31 * result1 + (links != null ? links.hashCode() : 0);
-        result1 = 31 * result1 + (location != null ? location.hashCode() : 0);
-        result1 = 31 * result1 + (mood != null ? mood.hashCode() : 0);
-        result1 = 31 * result1 + (odata != null ? odata.hashCode() : 0);
-        result1 = 31 * result1 + (opengraph != null ? opengraph.hashCode() : 0);
-        result1 = 31 * result1 + (priority != null ? priority.hashCode() : 0);
-        result1 = 31 * result1 + (rating != null ? rating.hashCode() : 0);
-        result1 = 31 * result1 + (result != null ? result.hashCode() : 0);
-        result1 = 31 * result1 + (schema_org != null ? schema_org.hashCode() : 0);
-        result1 = 31 * result1 + (source != null ? source.hashCode() : 0);
-        result1 = 31 * result1 + (startTime != null ? startTime.hashCode() : 0);
-        result1 = 31 * result1 + (tags != null ? tags.hashCode() : 0);
-        result1 = 31 * result1 + (to != null ? to.hashCode() : 0);
-        result1 = 31 * result1 + (published != null ? published.hashCode() : 0);
-        result1 = 31 * result1 + (updated != null ? updated.hashCode() : 0);
-        result1 = 31 * result1 + (url != null ? url.hashCode() : 0);
-        result1 = 31 * result1 + (objectType != null ? objectType.hashCode() : 0);
-        result1 = 31 * result1 + (openSocial != null ? openSocial.hashCode() : 0);
-        result1 = 31 * result1 + (extensions != null ? extensions.hashCode() : 0);
-        return result1;
-    }
+    @JsonDeserialize(as=ActivityStreamsObjectImpl.class)
+    private ActivityStreamsObject actor;
 }

Modified: incubator/streams/branches/cassandra/streams-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepository.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/cassandra/streams-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepository.java?rev=1519019&r1=1519018&r2=1519019&view=diff
==============================================================================
--- incubator/streams/branches/cassandra/streams-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepository.java (original)
+++ incubator/streams/branches/cassandra/streams-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepository.java Fri Aug 30 16:38:16 2013
@@ -1,129 +1,115 @@
 package org.apache.streams.cassandra.repository.impl;
 
-import com.google.common.collect.ImmutableMap;
-import com.netflix.astyanax.AstyanaxContext;
-import com.netflix.astyanax.Keyspace;
-import com.netflix.astyanax.connectionpool.NodeDiscoveryType;
-import com.netflix.astyanax.connectionpool.exceptions.BadRequestException;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-import com.netflix.astyanax.connectionpool.impl.ConnectionPoolConfigurationImpl;
-import com.netflix.astyanax.connectionpool.impl.CountingConnectionPoolMonitor;
-import com.netflix.astyanax.entitystore.DefaultEntityManager;
-import com.netflix.astyanax.entitystore.EntityManager;
-import com.netflix.astyanax.impl.AstyanaxConfigurationImpl;
-import com.netflix.astyanax.model.ColumnFamily;
-import com.netflix.astyanax.serializers.StringSerializer;
-import com.netflix.astyanax.thrift.ThriftFamilyFactory;
+import com.datastax.driver.core.*;
+
+import com.datastax.driver.core.exceptions.AlreadyExistsException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.rave.model.ActivityStreamsEntry;
+import org.apache.rave.model.ActivityStreamsObject;
+import org.apache.rave.portal.model.impl.ActivityStreamsEntryImpl;
+import org.apache.rave.portal.model.impl.ActivityStreamsObjectImpl;
 import org.apache.streams.cassandra.model.CassandraActivityStreamsEntry;
 
+import java.util.ArrayList;
 import java.util.List;
 
+
 public class CassandraActivityStreamsRepository {
 
-    private final String KEYSPACE_NAME = "ActivityStreams";
-    private final String CLUSTER_NAME = "Cluster";
-    private final String COLUMN_FAMILY_NAME = "Activities";
+    private final String KEYSPACE_NAME = "streams";
+    private final String TABLE_NAME = "activities";
 
     private static final Log LOG = LogFactory.getLog(CassandraActivityStreamsRepository.class);
 
-    private Keyspace keyspace;
-    private AstyanaxContext<Keyspace> context;
-    private ColumnFamily<String, String> columnFamily;
-    private EntityManager<CassandraActivityStreamsEntry, String> entityManager;
+    private Cluster cluster;
+    private Session session;
 
     public CassandraActivityStreamsRepository() {
+        cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
+        session = cluster.connect();
 
-        //Cassandra Context Initialization
-        context = getContext();
-
-        //start the context
-        context.start();
-        keyspace = context.getClient();
-        columnFamily = ColumnFamily.newColumnFamily(COLUMN_FAMILY_NAME, StringSerializer.get(), StringSerializer.get());
-
-        //create the keyspace and column if they don't exist
+        //TODO: cassandra 2 will have support for CREATE KEYSPACE IF NOT EXISTS
         try {
-            createKeyspace();
-            createColumn();
-        } catch (ConnectionException e) {
-            LOG.error("An error occured while trying to connect to the database", e);
+            session.execute("CREATE KEYSPACE " + KEYSPACE_NAME + " WITH replication = { 'class': 'SimpleStrategy','replication_factor' : 1 };");
+        } catch (AlreadyExistsException ignored) {
         }
-
-        //initialize entitymanager resposible for translating Entry objects into database ready entries
-        entityManager = new DefaultEntityManager.Builder<CassandraActivityStreamsEntry, String>()
-                .withEntityType(CassandraActivityStreamsEntry.class)
-                .withKeyspace(keyspace)
-                .withColumnFamily(columnFamily)
-                .build();
-    }
-
-    //creates the context object
-    private AstyanaxContext<Keyspace> getContext() {
-        if (context != null) {
-            return context;
-        } else {
-            return new AstyanaxContext.Builder()
-                    .forCluster(CLUSTER_NAME)
-                    .forKeyspace(KEYSPACE_NAME)
-                    .withAstyanaxConfiguration(new AstyanaxConfigurationImpl()
-                            .setDiscoveryType(NodeDiscoveryType.RING_DESCRIBE)
-                    )
-                    .withConnectionPoolConfiguration(new ConnectionPoolConfigurationImpl("ActivityStreamsConnectionPool")
-                            .setPort(9160)
-                            .setMaxConnsPerHost(1)
-                            .setSeeds("127.0.0.1:9160")
-                    )
-                    .withConnectionPoolMonitor(new CountingConnectionPoolMonitor())
-                    .buildKeyspace(ThriftFamilyFactory.getInstance());
+        //connect to the keyspace
+        session = cluster.connect(KEYSPACE_NAME);
+        try {
+            session.execute("CREATE TABLE " + TABLE_NAME + " (" +
+                    "id text, " +
+                    "published timestamp, " +
+                    "verb text, " +
+                    "actor_displayname text, " +
+                    "actor_id text, " +
+                    "object_displayname text, " +
+                    "object_id text, " +
+                    "target_displayname text, " +
+                    "target_id text, " +
+                    "PRIMARY KEY (id, published));");
+        } catch (AlreadyExistsException ignored) {
         }
     }
 
-    //creates the ActivityStreams Keyspace if it doesn't exist already
-    private void createKeyspace() throws ConnectionException {
-        //try describing the Keyspace, a BadRequestException will be thrown if the keyspace does not exist
-        try {
-            keyspace.describeKeyspace();
-        } catch (BadRequestException e1) {
-            //the keyspace does not exist
-            //create the keyspace
-            keyspace.createKeyspace(ImmutableMap.<String, Object>builder()
-                    .put("strategy_options", ImmutableMap.<String, Object>builder()
-                            .put("replication_factor", "1")
-                            .build())
-                    .put("strategy_class", "SimpleStrategy")
-                    .build()
-            );
+    public void save(ActivityStreamsEntry entry) {
+        String sql = "INSERT INTO " + TABLE_NAME + " (id, published, verb, actor_displayname, actor_id, object_displayname, object_id, target_displayname, target_id) VALUES ('" +
+                entry.getId() + "','" +
+                entry.getPublished().getTime() + "','" +
+                entry.getVerb() + "','" +
+                entry.getActor().getDisplayName() + "','" +
+                entry.getActor().getId() + "','" +
+                entry.getTarget().getDisplayName() + "','" +
+                entry.getTarget().getId() + "','" +
+                entry.getObject().getDisplayName() + "','" +
+                entry.getObject().getId() + "')";
+        session.execute(sql);
+    }
+
+    public List<ActivityStreamsEntry> getActivitiesForQuery(String cql) {
+        ResultSet set = session.execute(cql);
+        List<ActivityStreamsEntry> results = new ArrayList<ActivityStreamsEntry>();
+        for (Row row : set) {
+            ActivityStreamsEntry entry = new ActivityStreamsEntryImpl();
+            ActivityStreamsObject actor = new ActivityStreamsObjectImpl();
+            ActivityStreamsObject target = new ActivityStreamsObjectImpl();
+            ActivityStreamsObject object = new ActivityStreamsObjectImpl();
+
+            actor.setDisplayName(row.getString("actor_displayname"));
+            actor.setUrl(row.getString("actor_id"));
+
+            target.setDisplayName(row.getString("target_displayname"));
+            target.setUrl(row.getString("target_id"));
+
+            object.setDisplayName(row.getString("object_displayname"));
+            target.setUrl(row.getString("object_id"));
+
+            entry.setPublished(row.getDate("published"));
+            entry.setVerb(row.getString("verb"));
+            entry.setId(row.getString("id"));
+            entry.setActor(actor);
+            entry.setTarget(target);
+            entry.setObject(object);
+
+            results.add(entry);
         }
+
+        return results;
     }
 
-    //creates the Activities column if it doesn't exist already
-    private void createColumn() throws ConnectionException {
-        if (keyspace.describeKeyspace().getColumnFamily(COLUMN_FAMILY_NAME) == null) {
-            //the column does not exist
-            keyspace.createColumnFamily(columnFamily, ImmutableMap.<String, Object>builder()
-                    .put("default_validation_class", "UTF8Type")
-                    .put("key_validation_class", "UTF8Type")
-                    .put("comparator_type", "UTF8Type")
-                    .build());
-        }
+    public void dropTable(String table){
+        String cql = "DROP TABLE " + table;
+        session.execute(cql);
     }
 
-    public void save(CassandraActivityStreamsEntry entry) {
+
+    @Override
+    protected void finalize() throws Throwable {
         try {
-            // Inserting data
-            entityManager.put(entry);
-            LOG.info("Insertion of the entry with the id, " + entry.getId() + ", was a success");
-        } catch (Exception e) {
-            LOG.info("An error occured while inserting the entry with id, " + entry.getId(), e);
+            cluster.shutdown();
+        } finally {
+            super.finalize();
         }
     }
 
-    public List<CassandraActivityStreamsEntry> getActivitiesForQuery(String query) {
-            //return entities that match the given cql query
-            LOG.info("executing the query: "+query);
-            return entityManager.find(query);
-    }
-
 }

Modified: incubator/streams/branches/cassandra/streams-cassandra/src/test/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepositoryTest.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/cassandra/streams-cassandra/src/test/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepositoryTest.java?rev=1519019&r1=1519018&r2=1519019&view=diff
==============================================================================
--- incubator/streams/branches/cassandra/streams-cassandra/src/test/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepositoryTest.java (original)
+++ incubator/streams/branches/cassandra/streams-cassandra/src/test/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepositoryTest.java Fri Aug 30 16:38:16 2013
@@ -1,11 +1,14 @@
 package org.apache.streams.cassandra.repository.impl;
 
-import com.netflix.astyanax.connectionpool.OperationResult;
-import com.netflix.astyanax.model.CqlResult;
+import org.apache.rave.model.ActivityStreamsEntry;
+import org.apache.rave.model.ActivityStreamsObject;
+import org.apache.rave.portal.model.impl.ActivityStreamsEntryImpl;
+import org.apache.rave.portal.model.impl.ActivityStreamsObjectImpl;
 import org.apache.streams.cassandra.model.CassandraActivityStreamsEntry;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.Date;
 import java.util.List;
 
 public class CassandraActivityStreamsRepositoryTest {
@@ -19,9 +22,40 @@ public class CassandraActivityStreamsRep
     }
 
     @Test
-    public void getActivitiesForQuery() {
-        String cql = "select * from Activities";
-        List<CassandraActivityStreamsEntry> list= repository.getActivitiesForQuery(cql);
-        assert(list != null);
+    public void saveActivity() {
+        ActivityStreamsEntry entry = new ActivityStreamsEntryImpl();
+        ActivityStreamsObject actor = new ActivityStreamsObjectImpl();
+        ActivityStreamsObject target = new ActivityStreamsObjectImpl();
+        ActivityStreamsObject object = new ActivityStreamsObjectImpl();
+
+        actor.setId("actorid1");
+        actor.setDisplayName("actorname1");
+
+        target.setId("targetid1");
+        target.setDisplayName("targetname1");
+
+        object.setId("objectid1");
+        object.setDisplayName("objectname1");
+
+        entry.setId("one");
+        entry.setVerb("verb1");
+        Date d = new Date();
+        entry.setPublished(d);
+        entry.setActor(actor);
+        entry.setObject(object);
+        entry.setTarget(target);
+
+        //repository.save(entry);
+    }
+
+    @Test
+    public void getActivity() {
+        String cql = "SELECT * FROM coltest WHERE published > '2010-10-10' LIMIT 1 ALLOW FILTERING";
+        List<ActivityStreamsEntry> results = repository.getActivitiesForQuery(cql);
+    }
+
+    @Test
+    public void dropTableTest(){
+        //repository.dropTable("coltest");
     }
 }

Modified: incubator/streams/branches/cassandra/streams-eip-routes/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/branches/cassandra/streams-eip-routes/pom.xml?rev=1519019&r1=1519018&r2=1519019&view=diff
==============================================================================
--- incubator/streams/branches/cassandra/streams-eip-routes/pom.xml (original)
+++ incubator/streams/branches/cassandra/streams-eip-routes/pom.xml Fri Aug 30 16:38:16 2013
@@ -95,7 +95,7 @@
         <dependency>
             <groupId>org.apache.camel</groupId>
             <artifactId>camel-core</artifactId>
-            <version>2.8.5</version>
+            <version>2.9.0</version>
         </dependency>
 
         <dependency>
@@ -189,33 +189,10 @@
             <scope>provided</scope>
         </dependency>
 
-        <!--astyanax-->
         <dependency>
-            <groupId>com.netflix.astyanax</groupId>
-            <artifactId>astyanax-core</artifactId>
-            <version>1.56.43-SNAPSHOT</version>
-            <scope>provided</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>com.netflix.astyanax</groupId>
-            <artifactId>astyanax-thrift</artifactId>
-            <version>1.56.43-SNAPSHOT</version>
-            <scope>provided</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>com.netflix.astyanax</groupId>
-            <artifactId>astyanax-cassandra</artifactId>
-            <version>1.56.43-SNAPSHOT</version>
-            <scope>provided</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>com.netflix.astyanax</groupId>
-            <artifactId>astyanax-entity-mapper</artifactId>
-            <version>1.56.43-SNAPSHOT</version>
-            <scope>provided</scope>
+            <groupId>org.easymock</groupId>
+            <artifactId>easymock</artifactId>
+            <version>3.1</version>
         </dependency>
     </dependencies>
 

Modified: incubator/streams/branches/cassandra/streams-eip-routes/src/main/java/org/apache/streams/messaging/aggregation/ActivityAggregator.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/cassandra/streams-eip-routes/src/main/java/org/apache/streams/messaging/aggregation/ActivityAggregator.java?rev=1519019&r1=1519018&r2=1519019&view=diff
==============================================================================
--- incubator/streams/branches/cassandra/streams-eip-routes/src/main/java/org/apache/streams/messaging/aggregation/ActivityAggregator.java (original)
+++ incubator/streams/branches/cassandra/streams-eip-routes/src/main/java/org/apache/streams/messaging/aggregation/ActivityAggregator.java Fri Aug 30 16:38:16 2013
@@ -1,23 +1,15 @@
 package org.apache.streams.messaging.aggregation;
 
 
-import org.apache.camel.Exchange;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.streams.cassandra.model.CassandraActivityStreamsEntry;
-import org.apache.streams.cassandra.repository.impl.CassandraActivityStreamsRepository;
 import org.apache.streams.messaging.service.impl.CassandraActivityService;
 import org.apache.streams.osgi.components.activitysubscriber.ActivityStreamsSubscriber;
 import org.apache.streams.osgi.components.activitysubscriber.ActivityStreamsSubscriberWarehouse;
 import org.apache.streams.osgi.components.activitysubscriber.ActivityStreamsSubscriptionFilter;
-import org.codehaus.jackson.map.ObjectMapper;
 import org.springframework.scheduling.annotation.Scheduled;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
+import java.util.*;
 
 public class ActivityAggregator {
 
@@ -39,8 +31,10 @@ public class ActivityAggregator {
             Set<String> activities = new HashSet<String>();
             for (ActivityStreamsSubscriptionFilter filter: subscriber.getActivityStreamsSubscriberConfiguration().getActivityStreamsSubscriptionFilters()){
                 //send the query of each filter to the service to receive the activities of that filter
-                activities.addAll(activityService.getActivitiesForQuery(filter.getQuery()));
+                activities.addAll(activityService.getActivitiesForQuery(filter.getQuery() + " WHERE published > " + subscriber.getLastUpdated().getTime() + " LIMIT 10 ALLOW FILTERING"));
             }
+            //TODO: an activity posted in between the cql query and setting the lastUpdated field will be lost
+            subscriber.setLastUpdated(new Date());
             subscriber.receive(new ArrayList<String>(activities));
         }
     }

Modified: incubator/streams/branches/cassandra/streams-eip-routes/src/main/java/org/apache/streams/messaging/service/impl/CassandraActivityService.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/cassandra/streams-eip-routes/src/main/java/org/apache/streams/messaging/service/impl/CassandraActivityService.java?rev=1519019&r1=1519018&r2=1519019&view=diff
==============================================================================
--- incubator/streams/branches/cassandra/streams-eip-routes/src/main/java/org/apache/streams/messaging/service/impl/CassandraActivityService.java (original)
+++ incubator/streams/branches/cassandra/streams-eip-routes/src/main/java/org/apache/streams/messaging/service/impl/CassandraActivityService.java Fri Aug 30 16:38:16 2013
@@ -3,6 +3,7 @@ package org.apache.streams.messaging.ser
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.camel.Exchange;
+import org.apache.rave.model.ActivityStreamsEntry;
 import org.apache.streams.cassandra.model.CassandraActivityStreamsEntry;
 import org.apache.streams.cassandra.repository.impl.CassandraActivityStreamsRepository;
 import org.apache.streams.messaging.service.ActivityService;
@@ -11,6 +12,7 @@ import org.codehaus.jackson.map.ObjectMa
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Date;
 import java.util.List;
 
 public class CassandraActivityService implements ActivityService {
@@ -40,7 +42,8 @@ public class CassandraActivityService im
             String activityJson = e.getIn().getBody(String.class);
 
             try {
-                CassandraActivityStreamsEntry streamsEntry = mapper.readValue(activityJson, CassandraActivityStreamsEntry.class);
+                ActivityStreamsEntry streamsEntry = mapper.readValue(activityJson, CassandraActivityStreamsEntry.class);
+                streamsEntry.setPublished(new Date());
                 cassandraActivityStreamsRepository.save(streamsEntry);
             } catch (IOException err) {
                 LOG.error("there was an error while converting the json string to an object and saving to the database", err);
@@ -50,13 +53,13 @@ public class CassandraActivityService im
     }
 
     public List<String> getActivitiesForQuery(String query) {
-        List<CassandraActivityStreamsEntry> activityObjects = cassandraActivityStreamsRepository.getActivitiesForQuery(query);
+        List<ActivityStreamsEntry> activityObjects = cassandraActivityStreamsRepository.getActivitiesForQuery(query);
         return getJsonList(activityObjects);
     }
 
-    private List<String> getJsonList(List<CassandraActivityStreamsEntry> activities) {
+    private List<String> getJsonList(List<ActivityStreamsEntry> activities) {
         List<String> jsonList = new ArrayList<String>();
-        for (CassandraActivityStreamsEntry entry : activities) {
+        for (ActivityStreamsEntry entry : activities) {
             try {
                 jsonList.add(mapper.writeValueAsString(entry));
             } catch (IOException e) {

Modified: incubator/streams/branches/cassandra/streams-eip-routes/src/test/java/org/apache/streams/messaging/service/impl/CassandraActivityServiceTest.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/cassandra/streams-eip-routes/src/test/java/org/apache/streams/messaging/service/impl/CassandraActivityServiceTest.java?rev=1519019&r1=1519018&r2=1519019&view=diff
==============================================================================
--- incubator/streams/branches/cassandra/streams-eip-routes/src/test/java/org/apache/streams/messaging/service/impl/CassandraActivityServiceTest.java (original)
+++ incubator/streams/branches/cassandra/streams-eip-routes/src/test/java/org/apache/streams/messaging/service/impl/CassandraActivityServiceTest.java Fri Aug 30 16:38:16 2013
@@ -1,8 +1,13 @@
 package org.apache.streams.messaging.service.impl;
 
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
 import org.junit.Before;
 import org.junit.Test;
 
+import static org.easymock.EasyMock.*;
+
+import java.util.ArrayList;
 import java.util.List;
 
 public class CassandraActivityServiceTest {
@@ -16,7 +21,44 @@ public class CassandraActivityServiceTes
 
     @Test
     public void getActivititiesForQueryTest(){
-        List<String> activities = cassandraActivityService.getActivitiesForQuery("select * from Activities");
-        assert(activities != null);
+        //List<String> activities = cassandraActivityService.getActivitiesForQuery("select * from Activities");
+        //assert(activities != null);
+    }
+
+    @Test
+    public void receiveExchangeTest(){
+        Exchange e = createMock(Exchange.class);
+        List<Exchange> grouped = new ArrayList<Exchange>();
+        Exchange e2 = createMock(Exchange.class);
+        grouped.add(e2);
+        Message m = createMock(Message.class);
+
+        String activityJson = "{\n" +
+                "\"id\":\"id2\",\n" +
+                "\"verb\":\"verb2\",\n" +
+                "\"displayName\":\"displayname2\",\n" +
+                "\"target\":{\n" +
+                "\t\"id\":\"targetid2\",\n" +
+                "\t\"displayName\":\"targetname2\"\n" +
+                "\t},\n" +
+                "\t\"object\":{\n" +
+                "\t\"id\":\"objectid2\",\n" +
+                "\t\"displayName\":\"objectname2\"\n" +
+                "\t},\n" +
+                "\t\"actor\":{\n" +
+                "\t\"id\":\"actorid2\",\n" +
+                "\t\"displayName\":\"actorname2\"\n" +
+                "\t}\n" +
+                "\t\n" +
+                "\t}";
+
+        expect(e.getProperty(Exchange.GROUPED_EXCHANGE, List.class)).andReturn(grouped);
+        expect(e2.getIn()).andReturn(m);
+        expect(m.getBody(String.class)).andReturn(activityJson);
+
+        replay(e, e2, m);
+
+        //cassandraActivityService.receiveExchange(e);
+        List<String> myTest = cassandraActivityService.getActivitiesForQuery("select * from coltest");
     }
 }

Modified: incubator/streams/branches/cassandra/streams-osgi-components/activity-consumer/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/branches/cassandra/streams-osgi-components/activity-consumer/pom.xml?rev=1519019&r1=1519018&r2=1519019&view=diff
==============================================================================
--- incubator/streams/branches/cassandra/streams-osgi-components/activity-consumer/pom.xml (original)
+++ incubator/streams/branches/cassandra/streams-osgi-components/activity-consumer/pom.xml Fri Aug 30 16:38:16 2013
@@ -129,22 +129,19 @@
         <dependency>
             <groupId>org.apache.rave</groupId>
             <artifactId>rave-core</artifactId>
-            <version>0.22-SNAPSHOT</version>
-            <scope>provided</scope>
+            <version>${rave.version}</version>
         </dependency>
 
         <dependency>
             <groupId>org.apache.rave</groupId>
             <artifactId>rave-core-api</artifactId>
-            <version>0.22-SNAPSHOT</version>
-            <scope>provided</scope>
+            <version>${rave.version}</version>
         </dependency>
 
         <dependency>
             <groupId>org.apache.streams</groupId>
             <artifactId>streams-cassandra</artifactId>
             <version>0.1-SNAPSHOT</version>
-            <scope>provided</scope>
         </dependency>
 
     </dependencies>

Modified: incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/ActivityStreamsSubscriber.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/ActivityStreamsSubscriber.java?rev=1519019&r1=1519018&r2=1519019&view=diff
==============================================================================
--- incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/ActivityStreamsSubscriber.java (original)
+++ incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/ActivityStreamsSubscriber.java Fri Aug 30 16:38:16 2013
@@ -1,6 +1,7 @@
 package org.apache.streams.osgi.components.activitysubscriber;
 
 
+import java.util.Date;
 import java.util.List;
 
 public interface ActivityStreamsSubscriber {
@@ -14,4 +15,6 @@ public interface ActivityStreamsSubscrib
     public boolean isAuthenticated();
     public void setAuthenticated(boolean authenticated);
     public ActivityStreamsSubscription getActivityStreamsSubscriberConfiguration();
+    Date getLastUpdated();
+    void setLastUpdated(Date lastUpdated);
 }

Modified: incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/impl/ActivityStreamsSubscriberDelegate.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/impl/ActivityStreamsSubscriberDelegate.java?rev=1519019&r1=1519018&r2=1519019&view=diff
==============================================================================
--- incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/impl/ActivityStreamsSubscriberDelegate.java (original)
+++ incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/impl/ActivityStreamsSubscriberDelegate.java Fri Aug 30 16:38:16 2013
@@ -8,6 +8,7 @@ import org.codehaus.jackson.map.Deserial
 import org.codehaus.jackson.map.ObjectMapper;
 
 import java.util.ArrayList;
+import java.util.Date;
 import java.util.List;
 
 public class ActivityStreamsSubscriberDelegate implements ActivityStreamsSubscriber {
@@ -23,10 +24,13 @@ public class ActivityStreamsSubscriberDe
     //an individual subscriber gets ONE stream which is an aggregation of all its SRCs
     private ArrayList<String> stream;
 
+    private Date lastUpdated;
+
 
     public ActivityStreamsSubscriberDelegate(ActivityStreamsSubscription configuration){
         setActivityStreamsSubscriberConfiguration(configuration);
         stream = new ArrayList<String>();
+        lastUpdated = new Date(0);
     }
 
 
@@ -83,7 +87,13 @@ public class ActivityStreamsSubscriberDe
         return stream.toString();
     }
 
+    public Date getLastUpdated() {
+        return lastUpdated;
+    }
 
+    public void setLastUpdated(Date lastUpdated) {
+        this.lastUpdated = lastUpdated;
+    }
 
     public void init(){
         //any initialization... gets called directly after registration

Modified: incubator/streams/branches/cassandra/streams-web/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/branches/cassandra/streams-web/pom.xml?rev=1519019&r1=1519018&r2=1519019&view=diff
==============================================================================
--- incubator/streams/branches/cassandra/streams-web/pom.xml (original)
+++ incubator/streams/branches/cassandra/streams-web/pom.xml Fri Aug 30 16:38:16 2013
@@ -48,7 +48,6 @@
          <groupId>org.apache.streams</groupId>
          <artifactId>streams-eip-routes</artifactId>
          <version>${project.version}</version>
-         <type>bundle</type>
      </dependency>
 
       <dependency>
@@ -143,31 +142,6 @@
           <version>${spring.version}</version>
       </dependency>
 
-      <!--astyanax-->
-      <dependency>
-          <groupId>com.netflix.astyanax</groupId>
-          <artifactId>astyanax-core</artifactId>
-          <version>1.56.43-SNAPSHOT</version>
-      </dependency>
-
-      <dependency>
-          <groupId>com.netflix.astyanax</groupId>
-          <artifactId>astyanax-thrift</artifactId>
-          <version>1.56.43-SNAPSHOT</version>
-      </dependency>
-
-      <dependency>
-          <groupId>com.netflix.astyanax</groupId>
-          <artifactId>astyanax-cassandra</artifactId>
-          <version>1.56.43-SNAPSHOT</version>
-      </dependency>
-
-      <dependency>
-          <groupId>com.netflix.astyanax</groupId>
-          <artifactId>astyanax-entity-mapper</artifactId>
-          <version>1.56.43-SNAPSHOT</version>
-      </dependency>
-
       <!--rave-->
       <dependency>
           <groupId>org.apache.rave</groupId>