You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by ds...@apache.org on 2013/08/30 18:38:17 UTC
svn commit: r1519019 - in /incubator/streams/branches/cassandra: ./
streams-cassandra/
streams-cassandra/src/main/java/org/apache/streams/cassandra/model/
streams-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/
streams-cassandra/s...
Author: dsullivan
Date: Fri Aug 30 16:38:16 2013
New Revision: 1519019
URL: http://svn.apache.org/r1519019
Log:
Changed from the Astyanax driver to the DataStax driver. After subscribing, the subscriber's stream is populated with 10 streams from the database.
Modified:
incubator/streams/branches/cassandra/pom.xml
incubator/streams/branches/cassandra/streams-cassandra/pom.xml
incubator/streams/branches/cassandra/streams-cassandra/src/main/java/org/apache/streams/cassandra/model/CassandraActivityStreamsEntry.java
incubator/streams/branches/cassandra/streams-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepository.java
incubator/streams/branches/cassandra/streams-cassandra/src/test/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepositoryTest.java
incubator/streams/branches/cassandra/streams-eip-routes/pom.xml
incubator/streams/branches/cassandra/streams-eip-routes/src/main/java/org/apache/streams/messaging/aggregation/ActivityAggregator.java
incubator/streams/branches/cassandra/streams-eip-routes/src/main/java/org/apache/streams/messaging/service/impl/CassandraActivityService.java
incubator/streams/branches/cassandra/streams-eip-routes/src/test/java/org/apache/streams/messaging/service/impl/CassandraActivityServiceTest.java
incubator/streams/branches/cassandra/streams-osgi-components/activity-consumer/pom.xml
incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/ActivityStreamsSubscriber.java
incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/impl/ActivityStreamsSubscriberDelegate.java
incubator/streams/branches/cassandra/streams-web/pom.xml
Modified: incubator/streams/branches/cassandra/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/branches/cassandra/pom.xml?rev=1519019&r1=1519018&r2=1519019&view=diff
==============================================================================
--- incubator/streams/branches/cassandra/pom.xml (original)
+++ incubator/streams/branches/cassandra/pom.xml Fri Aug 30 16:38:16 2013
@@ -17,57 +17,60 @@
specific language governing permissions and limitations
under the License.
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
- <modelVersion>4.0.0</modelVersion>
+ <modelVersion>4.0.0</modelVersion>
- <parent>
+ <parent>
<groupId>org.apache.streams</groupId>
<artifactId>streams-master</artifactId>
<version>0.2-incubating-SNAPSHOT</version>
- </parent>
+ </parent>
- <groupId>org.apache.streams</groupId>
- <artifactId>streams-project</artifactId>
- <version>0.1-SNAPSHOT</version>
-
- <name>Apache Streams Project</name>
-
- <description>Apache Streams Project</description>
-
- <properties>
- <org.osgi.service.http.port>8080</org.osgi.service.http.port>
- <org.osgi.service.http.port.secure>8443</org.osgi.service.http.port.secure>
- </properties>
-
- <packaging>pom</packaging>
-
- <modules>
- <module>poms</module>
- <module>provision</module>
- <module>streams-osgi-components</module>
- <module>streams-eip-routes</module>
- <module>streams-cassandra</module>
- <module>streams-web</module>
- </modules>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.ops4j</groupId>
- <artifactId>maven-pax-plugin</artifactId>
- <version>1.5</version>
- <extensions>true</extensions>
- <configuration>
- <compilerVersion>1.5</compilerVersion>
- <provision>
- <param>--platform=felix</param>
- </provision>
- </configuration>
- <executions>
- </executions>
- </plugin>
- </plugins>
- </build>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-project</artifactId>
+ <version>0.1-SNAPSHOT</version>
+
+ <name>Apache Streams Project</name>
+
+ <description>Apache Streams Project</description>
+
+ <properties>
+ <org.osgi.service.http.port>8080</org.osgi.service.http.port>
+ <org.osgi.service.http.port.secure>8443</org.osgi.service.http.port.secure>
+ <rave.version>0.22-SNAPSHOT</rave.version>
+ <datastax.version>1.0.2</datastax.version>
+ </properties>
+
+ <packaging>pom</packaging>
+
+ <modules>
+ <module>poms</module>
+ <module>provision</module>
+ <module>streams-osgi-components</module>
+ <module>streams-eip-routes</module>
+ <module>streams-cassandra</module>
+ <module>streams-web</module>
+ </modules>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.ops4j</groupId>
+ <artifactId>maven-pax-plugin</artifactId>
+ <version>1.5</version>
+ <extensions>true</extensions>
+ <configuration>
+ <compilerVersion>1.5</compilerVersion>
+ <provision>
+ <param>--platform=felix</param>
+ </provision>
+ </configuration>
+ <executions>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
</project>
\ No newline at end of file
Modified: incubator/streams/branches/cassandra/streams-cassandra/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/branches/cassandra/streams-cassandra/pom.xml?rev=1519019&r1=1519018&r2=1519019&view=diff
==============================================================================
--- incubator/streams/branches/cassandra/streams-cassandra/pom.xml (original)
+++ incubator/streams/branches/cassandra/streams-cassandra/pom.xml Fri Aug 30 16:38:16 2013
@@ -58,12 +58,12 @@
<Bundle-SymbolicName>${bundle.symbolicName}</Bundle-SymbolicName>
<Bundle-Version>${project.version}</Bundle-Version>
<Export-Package>
- ${bundle.namespace};version="${project.version}",org.apache.streams.cassandra.repository.impl, org.apache.streams.cassandra.model, org.apache.streams.cassandra.service
+ ${bundle.namespace};version="${project.version}",org.apache.streams.cassandra.repository.impl, org.apache.streams.cassandra.model
</Export-Package>
- <Private-Package>${bundle.namespace}.cassandra.repository.impl.*,${bundle.namespace}.cassandra.model, ${bundle.namespace}.cassandra.service </Private-Package>
+ <Private-Package>${bundle.namespace}.cassandra.repository.impl.*,${bundle.namespace}.cassandra.model </Private-Package>
<Import-Package>
org.apache.rave.model,org.apache.rave.portal.model.impl,
- com.netflix.astyanax, com.netflix.astyanax.connectionpool, com.netflix.astyanax.connectionpool.impl, com.netflix.astyanax.impl, com.netflix.astyanax.thrift, com.netflix.astyanax.connectionpool.exceptions, com.netflix.astyanax.model, com.netflix.astyanax.serializers, com.netflix.astyanax.entitystore, com.netflix.astyanax.ddl,
+ com.datastax.driver.core, com.datastax.driver.core.exceptions, org.codehaus.jackson.map.annotate,
javax.persistence, org.apache.commons.logging, com.google.common.collect, org.codehaus.jackson.map
</Import-Package>
</instructions>
@@ -76,50 +76,31 @@
<dependency>
<groupId>org.apache.rave</groupId>
<artifactId>rave-core-api</artifactId>
- <version>0.22-SNAPSHOT</version>
- <scope>provided</scope>
+ <version>${rave.version}</version>
</dependency>
<dependency>
<groupId>org.apache.rave</groupId>
<artifactId>rave-core</artifactId>
- <version>0.22-SNAPSHOT</version>
- <scope>provided</scope>
+ <version>${rave.version}</version>
</dependency>
- <!--astyanax-->
<dependency>
- <groupId>com.netflix.astyanax</groupId>
- <artifactId>astyanax-core</artifactId>
- <version>1.56.43-SNAPSHOT</version>
- <scope>provided</scope>
- </dependency>
-
- <dependency>
- <groupId>com.netflix.astyanax</groupId>
- <artifactId>astyanax-thrift</artifactId>
- <version>1.56.43-SNAPSHOT</version>
- <scope>provided</scope>
- </dependency>
-
- <dependency>
- <groupId>com.netflix.astyanax</groupId>
- <artifactId>astyanax-cassandra</artifactId>
- <version>1.56.43-SNAPSHOT</version>
- <scope>provided</scope>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.8.2</version>
</dependency>
<dependency>
- <groupId>com.netflix.astyanax</groupId>
- <artifactId>astyanax-entity-mapper</artifactId>
- <version>1.56.43-SNAPSHOT</version>
- <scope>provided</scope>
+ <groupId>com.datastax.cassandra</groupId>
+ <artifactId>cassandra-driver-core</artifactId>
+ <version>${datastax.version}</version>
</dependency>
<dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <version>4.8.2</version>
+ <groupId>org.jboss.netty</groupId>
+ <artifactId>netty</artifactId>
+ <version>3.2.9.Final</version>
</dependency>
</dependencies>
Modified: incubator/streams/branches/cassandra/streams-cassandra/src/main/java/org/apache/streams/cassandra/model/CassandraActivityStreamsEntry.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/cassandra/streams-cassandra/src/main/java/org/apache/streams/cassandra/model/CassandraActivityStreamsEntry.java?rev=1519019&r1=1519018&r2=1519019&view=diff
==============================================================================
--- incubator/streams/branches/cassandra/streams-cassandra/src/main/java/org/apache/streams/cassandra/model/CassandraActivityStreamsEntry.java (original)
+++ incubator/streams/branches/cassandra/streams-cassandra/src/main/java/org/apache/streams/cassandra/model/CassandraActivityStreamsEntry.java Fri Aug 30 16:38:16 2013
@@ -1,688 +1,17 @@
package org.apache.streams.cassandra.model;
-import org.apache.rave.model.ActivityStreamsEntry;
-import org.apache.rave.model.ActivityStreamsMediaLink;
import org.apache.rave.model.ActivityStreamsObject;
+import org.apache.rave.portal.model.impl.ActivityStreamsEntryImpl;
+import org.apache.rave.portal.model.impl.ActivityStreamsObjectImpl;
+import org.codehaus.jackson.map.annotate.JsonDeserialize;
-import javax.persistence.Column;
-import javax.persistence.Entity;
-import javax.persistence.Id;
-import java.io.Serializable;
-import java.util.Date;
-import java.util.Map;
-import java.util.logging.Logger;
-
-@Entity
-public class CassandraActivityStreamsEntry implements ActivityStreamsEntry, Serializable{
- private static final long serialVersionUID = 1L;
- private static Logger log = Logger.getLogger(CassandraActivityStreamsEntry.class.getName());
-
- @Id
- private String id;
-
- @Column
- private ActivityStreamsObject actor;
-
- @Column
- private String content;
-
- @Column
- private ActivityStreamsObject generator;
-
- @Column
- private ActivityStreamsMediaLink icon;
-
- @Column
+public class CassandraActivityStreamsEntry extends ActivityStreamsEntryImpl{
+ @JsonDeserialize(as=ActivityStreamsObjectImpl.class)
private ActivityStreamsObject object;
- @Column
- private ActivityStreamsObject provider;
-
- @Column
+ @JsonDeserialize(as=ActivityStreamsObjectImpl.class)
private ActivityStreamsObject target;
- @Column
- private String title;
-
- @Column
- private String verb;
-
- //The user who verb'd this activity
- @Column
- private String userId;
-
- //If this activity was generated as part of a group, this indicates the group's id
- @Column
- private String groupId;
-
- //The id of the application that published this activity
- @Column
- private String appId;
-
- @Column
- private String bcc;
-
- @Column
- private String bto;
-
- @Column
- private String cc;
-
- @Column
- private String context;
-
- @Column
- private String dc;
-
- private Date endTime;
-
- @Column
- private String geojson;
-
- @Column
- private String inReplyTo;
-
- @Column
- private String ld;
-
- @Column
- private String links;
-
- @Column
- private String location;
-
- @Column
- private String mood;
-
- @Column
- private String odata;
-
- @Column
- private String opengraph;
-
- @Column
- private String priority;
-
- @Column
- private String rating;
-
- @Column
- private String result;
-
- @Column
- private String schema_org;
-
- @Column
- private String source;
-
- @Column
- private Date startTime;
-
- @Column
- private String tags;
-
- @Column
- private String to;
-
- @Column
- private Date published;
-
- @Column
- private Date updated;
-
- @Column
- private String url;
-
- @Column
- private String objectType;
-
- private Map openSocial;
-
- private Map extensions;
-
- /**
- * Create a new empty DeserializableActivityEntry
- */
- public CassandraActivityStreamsEntry() {
- }
-
- /** {@inheritDoc} */
-
- public Date getPublished() {
- return published;
- }
-
- /** {@inheritDoc} */
- public void setPublished(Date published) {
- this.published = published;
- }
-
- public Date getUpdated(){
- return updated;
- }
-
- public void setUpdated(Date updated){
- this.updated=updated;
- }
-
- public String getUrl(){
- return this.url;
- }
-
- public void setUrl(String url){
- this.url=url;
- }
-
- public String getObjectType(){
- return this.objectType;
- }
-
- public void setObjectType(String objectType){
- this.objectType=objectType;
- }
-
- /** {@inheritDoc} */
-
- public Map getOpenSocial() {
- return openSocial;
- }
-
- /**
- * {@inheritDoc}
- */
- public void setOpenSocial(Map openSocial) {
-
- this.openSocial = openSocial;
- }
-
- /** {@inheritDoc} */
-
- public Map getExtensions() {
- return extensions;
- }
-
- /** {@inheritDoc} */
- public void setExtensions(Map extensions) {
-
-
- this.extensions = extensions;
- }
-
- public ActivityStreamsObject getActor() {
- return actor;
- }
-
- /** {@inheritDoc} */
- public void setActor(ActivityStreamsObject actor) {
- this.actor = actor;
- }
-
- /** {@inheritDoc} */
- public String getContent() {
- return content;
- }
-
- /** {@inheritDoc} */
- public void setContent(String content) {
- this.content = content;
- }
-
-
- /** {@inheritDoc} */
- public ActivityStreamsObject getGenerator() {
- return generator;
- }
-
- /** {@inheritDoc} */
- public void setGenerator(ActivityStreamsObject generator) {
- this.generator = generator;
- }
-
- /** {@inheritDoc} */
- public ActivityStreamsMediaLink getIcon() {
- return icon;
- }
-
- /** {@inheritDoc} */
- public void setIcon(ActivityStreamsMediaLink icon) {
- this.icon = icon;
- }
-
- /** {@inheritDoc} */
-
- public String getId() {
- return id;
- }
-
- /** {@inheritDoc} */
- public void setId(String id) {
- this.id = id;
- }
-
- /** {@inheritDoc} */
- public ActivityStreamsObject getObject() {
- return object;
- }
-
- /** {@inheritDoc} */
- public void setObject(ActivityStreamsObject object) {
- this.object = object;
- }
-
-
-
- /** {@inheritDoc} */
- public ActivityStreamsObject getProvider() {
- return provider;
- }
-
- /** {@inheritDoc} */
- public void setProvider(ActivityStreamsObject provider) {
- this.provider = provider;
- }
-
- /** {@inheritDoc} */
- public ActivityStreamsObject getTarget() {
- return target;
- }
-
- /** {@inheritDoc} */
- public void setTarget(ActivityStreamsObject target) {
- this.target = target;
- }
-
- /** {@inheritDoc} */
-
- public String getTitle() {
- return title;
- }
-
- /** {@inheritDoc} */
- public void setTitle(String title) {
- this.title = title;
- }
-
-
- /** {@inheritDoc} */
-
- public String getVerb() {
- return verb;
- }
-
- /** {@inheritDoc} */
- public void setVerb(String verb) {
- this.verb = verb;
- }
-
- /** {@inheritDoc} */
- public String getUserId() {
- return userId;
- }
-
- /** {@inheritDoc} */
- public void setUserId(String userId) {
- this.userId = userId;
- }
-
- /** {@inheritDoc} */
- public String getGroupId() {
- return groupId;
- }
-
- /** {@inheritDoc} */
- public void setGroupId(String groupId) {
- this.groupId = groupId;
- }
-
-
- /** {@inheritDoc} */
- public String getAppId() {
- return appId;
- }
-
- /** {@inheritDoc} */
- public void setAppId(String appId) {
- this.appId = appId;
- }
-
- /** {@inheritDoc} */
- public String getBcc() {
- return bcc;
- }
-
- /** {@inheritDoc} */
- public void setBcc(String bcc) {
- this.bcc = bcc;
- }
-
- /** {@inheritDoc} */
- public String getBto() {
- return bto;
- }
-
- /** {@inheritDoc} */
- public void setBto(String bto) {
- this.bto = bto;
- }
-
- /** {@inheritDoc} */
- public String getCc() {
- return cc;
- }
-
- /** {@inheritDoc} */
- public void setCc(String cc) {
- this.cc = cc;
- }
-
- /** {@inheritDoc} */
- public String getContext() {
- return context;
- }
-
- /** {@inheritDoc} */
- public void setContext(String context) {
- this.context = context;
- }
-
- /** {@inheritDoc} */
- public String getDc() {
- return dc;
- }
-
- /** {@inheritDoc} */
- public void setDc(String dc) {
- this.dc = dc;
- }
-
- /** {@inheritDoc} */
- public Date getEndTime() {
- return endTime;
- }
-
- /** {@inheritDoc} */
- public void setEndTime(Date endTime) {
- this.endTime = endTime;
- }
-
- /** {@inheritDoc} */
- public String getGeojson() {
- return geojson;
- }
-
- /** {@inheritDoc} */
- public void setGeojson(String geojson) {
- this.geojson = geojson;
- }
-
- /** {@inheritDoc} */
- public String getInReplyTo() {
- return inReplyTo;
- }
-
- /** {@inheritDoc} */
- public void setInReplyTo(String inReplyTo) {
- this.inReplyTo = inReplyTo;
- }
-
- /** {@inheritDoc} */
- public String getLd() {
- return ld;
- }
-
- /** {@inheritDoc} */
- public void setLd(String ld) {
- this.ld = ld;
- }
-
- /** {@inheritDoc} */
- public String getLinks() {
- return links;
- }
-
- /** {@inheritDoc} */
- public void setLinks(String links) {
- this.links = links;
- }
-
- /** {@inheritDoc} */
- public String getLocation() {
- return location;
- }
-
- /** {@inheritDoc} */
- public void setLocation(String location) {
- this.location = location;
- }
-
- /** {@inheritDoc} */
- public String getMood() {
- return mood;
- }
-
- /** {@inheritDoc} */
- public void setMood(String mood) {
- this.mood = mood;
- }
-
- /** {@inheritDoc} */
- public String getOdata() {
- return odata;
- }
-
- /** {@inheritDoc} */
- public void setOdata(String odata) {
- this.odata = odata;
- }
-
- /** {@inheritDoc} */
- public String getOpengraph() {
- return opengraph;
- }
-
- /** {@inheritDoc} */
- public void setOpengraph(String opengraph) {
- this.opengraph = opengraph;
- }
-
- /** {@inheritDoc} */
- public String getPriority() {
- return priority;
- }
-
- /** {@inheritDoc} */
- public void setPriority(String priority) {
- this.priority = priority;
- }
-
- /** {@inheritDoc} */
- public String getRating() {
- return rating;
- }
-
- /** {@inheritDoc} */
- public void setRating(String rating) {
- this.rating = rating;
- }
-
- /** {@inheritDoc} */
- public String getResult() {
- return result;
- }
-
- /** {@inheritDoc} */
- public void setResult(String result) {
- this.result = result;
- }
-
- /** {@inheritDoc} */
- public String getSchema_org() {
- return schema_org;
- }
-
- /** {@inheritDoc} */
- public void setSchema_org(String schema_org) {
- this.schema_org = schema_org;
- }
-
- /** {@inheritDoc} */
- public String getSource() {
- return source;
- }
-
- /** {@inheritDoc} */
- public void setSource(String source) {
- this.source = source;
- }
-
- /** {@inheritDoc} */
- public Date getStartTime() {
- return startTime;
- }
-
- /** {@inheritDoc} */
- public void setStartTime(Date startTime) {
- this.startTime = startTime;
- }
-
- /** {@inheritDoc} */
- public String getTags() {
- return tags;
- }
-
- /** {@inheritDoc} */
- public void setTags(String tags) {
- this.tags = tags;
- }
-
- /** {@inheritDoc} */
- public String getTo() {
- return to;
- }
-
- /** {@inheritDoc} */
- public void setTo(String to) {
- this.to = to;
- }
-
-
-
- /**
- * Sorts ActivityEntries in ascending order based on publish date.
- *
- * @param that
- * is the DeserializableActivityEntry to compare to this DeserializableActivityEntry
- *
- * @return int represents how the ActivityEntries compare
- */
-
- public int compareTo(ActivityStreamsEntry that) {
- if (this.getPublished() == null && that.getPublished() == null) {
- return 0; // both are null, equal
- } else if (this.getPublished() == null) {
- return -1; // this is null, comes before real date
- } else if (that.getPublished() == null) {
- return 1; // that is null, this comes after
- } else { // compare publish dates in lexicographical order
- return this.getPublished().compareTo(that.getPublished());
- }
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
- CassandraActivityStreamsEntry that = (CassandraActivityStreamsEntry) o;
-
- if (actor != null ? !actor.equals(that.actor) : that.actor != null) return false;
- if (appId != null ? !appId.equals(that.appId) : that.appId != null) return false;
- if (bcc != null ? !bcc.equals(that.bcc) : that.bcc != null) return false;
- if (bto != null ? !bto.equals(that.bto) : that.bto != null) return false;
- if (cc != null ? !cc.equals(that.cc) : that.cc != null) return false;
- if (content != null ? !content.equals(that.content) : that.content != null) return false;
- if (context != null ? !context.equals(that.context) : that.context != null) return false;
- if (dc != null ? !dc.equals(that.dc) : that.dc != null) return false;
- if (endTime != null ? !endTime.equals(that.endTime) : that.endTime != null) return false;
- if (extensions != null ? !extensions.equals(that.extensions) : that.extensions != null) return false;
- if (generator != null ? !generator.equals(that.generator) : that.generator != null) return false;
- if (geojson != null ? !geojson.equals(that.geojson) : that.geojson != null) return false;
- if (groupId != null ? !groupId.equals(that.groupId) : that.groupId != null) return false;
- if (icon != null ? !icon.equals(that.icon) : that.icon != null) return false;
- if (id != null ? !id.equals(that.id) : that.id != null) return false;
- if (inReplyTo != null ? !inReplyTo.equals(that.inReplyTo) : that.inReplyTo != null) return false;
- if (ld != null ? !ld.equals(that.ld) : that.ld != null) return false;
- if (links != null ? !links.equals(that.links) : that.links != null) return false;
- if (location != null ? !location.equals(that.location) : that.location != null) return false;
- if (mood != null ? !mood.equals(that.mood) : that.mood != null) return false;
- if (object != null ? !object.equals(that.object) : that.object != null) return false;
- if (objectType != null ? !objectType.equals(that.objectType) : that.objectType != null) return false;
- if (odata != null ? !odata.equals(that.odata) : that.odata != null) return false;
- if (openSocial != null ? !openSocial.equals(that.openSocial) : that.openSocial != null) return false;
- if (opengraph != null ? !opengraph.equals(that.opengraph) : that.opengraph != null) return false;
- if (priority != null ? !priority.equals(that.priority) : that.priority != null) return false;
- if (provider != null ? !provider.equals(that.provider) : that.provider != null) return false;
- if (published != null ? !published.equals(that.published) : that.published != null) return false;
- if (rating != null ? !rating.equals(that.rating) : that.rating != null) return false;
- if (result != null ? !result.equals(that.result) : that.result != null) return false;
- if (schema_org != null ? !schema_org.equals(that.schema_org) : that.schema_org != null) return false;
- if (source != null ? !source.equals(that.source) : that.source != null) return false;
- if (startTime != null ? !startTime.equals(that.startTime) : that.startTime != null) return false;
- if (tags != null ? !tags.equals(that.tags) : that.tags != null) return false;
- if (target != null ? !target.equals(that.target) : that.target != null) return false;
- if (title != null ? !title.equals(that.title) : that.title != null) return false;
- if (to != null ? !to.equals(that.to) : that.to != null) return false;
- if (updated != null ? !updated.equals(that.updated) : that.updated != null) return false;
- if (url != null ? !url.equals(that.url) : that.url != null) return false;
- if (userId != null ? !userId.equals(that.userId) : that.userId != null) return false;
- if (verb != null ? !verb.equals(that.verb) : that.verb != null) return false;
-
- return true;
- }
-
- @Override
- public int hashCode() {
- int result1 = id != null ? id.hashCode() : 0;
- result1 = 31 * result1 + (actor != null ? actor.hashCode() : 0);
- result1 = 31 * result1 + (content != null ? content.hashCode() : 0);
- result1 = 31 * result1 + (generator != null ? generator.hashCode() : 0);
- result1 = 31 * result1 + (icon != null ? icon.hashCode() : 0);
- result1 = 31 * result1 + (object != null ? object.hashCode() : 0);
- result1 = 31 * result1 + (provider != null ? provider.hashCode() : 0);
- result1 = 31 * result1 + (target != null ? target.hashCode() : 0);
- result1 = 31 * result1 + (title != null ? title.hashCode() : 0);
- result1 = 31 * result1 + (verb != null ? verb.hashCode() : 0);
- result1 = 31 * result1 + (userId != null ? userId.hashCode() : 0);
- result1 = 31 * result1 + (groupId != null ? groupId.hashCode() : 0);
- result1 = 31 * result1 + (appId != null ? appId.hashCode() : 0);
- result1 = 31 * result1 + (bcc != null ? bcc.hashCode() : 0);
- result1 = 31 * result1 + (bto != null ? bto.hashCode() : 0);
- result1 = 31 * result1 + (cc != null ? cc.hashCode() : 0);
- result1 = 31 * result1 + (context != null ? context.hashCode() : 0);
- result1 = 31 * result1 + (dc != null ? dc.hashCode() : 0);
- result1 = 31 * result1 + (endTime != null ? endTime.hashCode() : 0);
- result1 = 31 * result1 + (geojson != null ? geojson.hashCode() : 0);
- result1 = 31 * result1 + (inReplyTo != null ? inReplyTo.hashCode() : 0);
- result1 = 31 * result1 + (ld != null ? ld.hashCode() : 0);
- result1 = 31 * result1 + (links != null ? links.hashCode() : 0);
- result1 = 31 * result1 + (location != null ? location.hashCode() : 0);
- result1 = 31 * result1 + (mood != null ? mood.hashCode() : 0);
- result1 = 31 * result1 + (odata != null ? odata.hashCode() : 0);
- result1 = 31 * result1 + (opengraph != null ? opengraph.hashCode() : 0);
- result1 = 31 * result1 + (priority != null ? priority.hashCode() : 0);
- result1 = 31 * result1 + (rating != null ? rating.hashCode() : 0);
- result1 = 31 * result1 + (result != null ? result.hashCode() : 0);
- result1 = 31 * result1 + (schema_org != null ? schema_org.hashCode() : 0);
- result1 = 31 * result1 + (source != null ? source.hashCode() : 0);
- result1 = 31 * result1 + (startTime != null ? startTime.hashCode() : 0);
- result1 = 31 * result1 + (tags != null ? tags.hashCode() : 0);
- result1 = 31 * result1 + (to != null ? to.hashCode() : 0);
- result1 = 31 * result1 + (published != null ? published.hashCode() : 0);
- result1 = 31 * result1 + (updated != null ? updated.hashCode() : 0);
- result1 = 31 * result1 + (url != null ? url.hashCode() : 0);
- result1 = 31 * result1 + (objectType != null ? objectType.hashCode() : 0);
- result1 = 31 * result1 + (openSocial != null ? openSocial.hashCode() : 0);
- result1 = 31 * result1 + (extensions != null ? extensions.hashCode() : 0);
- return result1;
- }
+ @JsonDeserialize(as=ActivityStreamsObjectImpl.class)
+ private ActivityStreamsObject actor;
}
Modified: incubator/streams/branches/cassandra/streams-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepository.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/cassandra/streams-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepository.java?rev=1519019&r1=1519018&r2=1519019&view=diff
==============================================================================
--- incubator/streams/branches/cassandra/streams-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepository.java (original)
+++ incubator/streams/branches/cassandra/streams-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepository.java Fri Aug 30 16:38:16 2013
@@ -1,129 +1,115 @@
package org.apache.streams.cassandra.repository.impl;
-import com.google.common.collect.ImmutableMap;
-import com.netflix.astyanax.AstyanaxContext;
-import com.netflix.astyanax.Keyspace;
-import com.netflix.astyanax.connectionpool.NodeDiscoveryType;
-import com.netflix.astyanax.connectionpool.exceptions.BadRequestException;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-import com.netflix.astyanax.connectionpool.impl.ConnectionPoolConfigurationImpl;
-import com.netflix.astyanax.connectionpool.impl.CountingConnectionPoolMonitor;
-import com.netflix.astyanax.entitystore.DefaultEntityManager;
-import com.netflix.astyanax.entitystore.EntityManager;
-import com.netflix.astyanax.impl.AstyanaxConfigurationImpl;
-import com.netflix.astyanax.model.ColumnFamily;
-import com.netflix.astyanax.serializers.StringSerializer;
-import com.netflix.astyanax.thrift.ThriftFamilyFactory;
+import com.datastax.driver.core.*;
+
+import com.datastax.driver.core.exceptions.AlreadyExistsException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.rave.model.ActivityStreamsEntry;
+import org.apache.rave.model.ActivityStreamsObject;
+import org.apache.rave.portal.model.impl.ActivityStreamsEntryImpl;
+import org.apache.rave.portal.model.impl.ActivityStreamsObjectImpl;
import org.apache.streams.cassandra.model.CassandraActivityStreamsEntry;
+import java.util.ArrayList;
import java.util.List;
+
public class CassandraActivityStreamsRepository {
- private final String KEYSPACE_NAME = "ActivityStreams";
- private final String CLUSTER_NAME = "Cluster";
- private final String COLUMN_FAMILY_NAME = "Activities";
+ private final String KEYSPACE_NAME = "streams";
+ private final String TABLE_NAME = "activities";
private static final Log LOG = LogFactory.getLog(CassandraActivityStreamsRepository.class);
- private Keyspace keyspace;
- private AstyanaxContext<Keyspace> context;
- private ColumnFamily<String, String> columnFamily;
- private EntityManager<CassandraActivityStreamsEntry, String> entityManager;
+ private Cluster cluster;
+ private Session session;
public CassandraActivityStreamsRepository() {
+ cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
+ session = cluster.connect();
- //Cassandra Context Initialization
- context = getContext();
-
- //start the context
- context.start();
- keyspace = context.getClient();
- columnFamily = ColumnFamily.newColumnFamily(COLUMN_FAMILY_NAME, StringSerializer.get(), StringSerializer.get());
-
- //create the keyspace and column if they don't exist
+ //TODO: cassandra 2 will have support for CREATE KEYSPACE IF NOT EXISTS
try {
- createKeyspace();
- createColumn();
- } catch (ConnectionException e) {
- LOG.error("An error occured while trying to connect to the database", e);
+ session.execute("CREATE KEYSPACE " + KEYSPACE_NAME + " WITH replication = { 'class': 'SimpleStrategy','replication_factor' : 1 };");
+ } catch (AlreadyExistsException ignored) {
}
-
- //initialize entitymanager resposible for translating Entry objects into database ready entries
- entityManager = new DefaultEntityManager.Builder<CassandraActivityStreamsEntry, String>()
- .withEntityType(CassandraActivityStreamsEntry.class)
- .withKeyspace(keyspace)
- .withColumnFamily(columnFamily)
- .build();
- }
-
- //creates the context object
- private AstyanaxContext<Keyspace> getContext() {
- if (context != null) {
- return context;
- } else {
- return new AstyanaxContext.Builder()
- .forCluster(CLUSTER_NAME)
- .forKeyspace(KEYSPACE_NAME)
- .withAstyanaxConfiguration(new AstyanaxConfigurationImpl()
- .setDiscoveryType(NodeDiscoveryType.RING_DESCRIBE)
- )
- .withConnectionPoolConfiguration(new ConnectionPoolConfigurationImpl("ActivityStreamsConnectionPool")
- .setPort(9160)
- .setMaxConnsPerHost(1)
- .setSeeds("127.0.0.1:9160")
- )
- .withConnectionPoolMonitor(new CountingConnectionPoolMonitor())
- .buildKeyspace(ThriftFamilyFactory.getInstance());
+ //connect to the keyspace
+ session = cluster.connect(KEYSPACE_NAME);
+ try {
+ session.execute("CREATE TABLE " + TABLE_NAME + " (" +
+ "id text, " +
+ "published timestamp, " +
+ "verb text, " +
+ "actor_displayname text, " +
+ "actor_id text, " +
+ "object_displayname text, " +
+ "object_id text, " +
+ "target_displayname text, " +
+ "target_id text, " +
+ "PRIMARY KEY (id, published));");
+ } catch (AlreadyExistsException ignored) {
}
}
- //creates the ActivityStreams Keyspace if it doesn't exist already
- private void createKeyspace() throws ConnectionException {
- //try describing the Keyspace, a BadRequestException will be thrown if the keyspace does not exist
- try {
- keyspace.describeKeyspace();
- } catch (BadRequestException e1) {
- //the keyspace does not exist
- //create the keyspace
- keyspace.createKeyspace(ImmutableMap.<String, Object>builder()
- .put("strategy_options", ImmutableMap.<String, Object>builder()
- .put("replication_factor", "1")
- .build())
- .put("strategy_class", "SimpleStrategy")
- .build()
- );
+    /**
+     * Persists a single activity entry as one row of the activities table.
+     *
+     * <p>The statement is assembled as a CQL string, so every text value goes through
+     * {@code quote} to double embedded single quotes before it is concatenated. The values
+     * are listed in the same order as the column list: actor, then object, then target.
+     *
+     * @param entry the activity to store; its published date, actor, object and target
+     *              are dereferenced and therefore must not be null
+     */
+    public void save(ActivityStreamsEntry entry) {
+        String sql = "INSERT INTO " + TABLE_NAME + " (id, published, verb, actor_displayname, actor_id, object_displayname, object_id, target_displayname, target_id) VALUES (" +
+                quote(entry.getId()) + "," +
+                "'" + entry.getPublished().getTime() + "'," +
+                quote(entry.getVerb()) + "," +
+                quote(entry.getActor().getDisplayName()) + "," +
+                quote(entry.getActor().getId()) + "," +
+                // object before target: this must mirror the column order above
+                quote(entry.getObject().getDisplayName()) + "," +
+                quote(entry.getObject().getId()) + "," +
+                quote(entry.getTarget().getDisplayName()) + "," +
+                quote(entry.getTarget().getId()) + ")";
+        session.execute(sql);
+    }
+
+    /**
+     * Renders a value as a single-quoted CQL string literal, doubling any embedded single
+     * quote so the value cannot terminate the literal early. A null value is rendered as
+     * the text {@code null}, matching plain string concatenation.
+     */
+    private static String quote(Object value) {
+        return "'" + String.valueOf(value).replace("'", "''") + "'";
+    }
+
+
+    /**
+     * Runs the given CQL query and maps every returned row to an activity entry.
+     *
+     * <p>Each row must expose the columns read below: id, published, verb, and the
+     * displayname/id pair for actor, object and target.
+     *
+     * @param cql a complete CQL statement, executed as-is
+     * @return the mapped entries in result-set order; empty when no rows are returned
+     */
+    public List<ActivityStreamsEntry> getActivitiesForQuery(String cql) {
+        ResultSet set = session.execute(cql);
+        List<ActivityStreamsEntry> results = new ArrayList<ActivityStreamsEntry>();
+        for (Row row : set) {
+            ActivityStreamsEntry entry = new ActivityStreamsEntryImpl();
+            ActivityStreamsObject actor = new ActivityStreamsObjectImpl();
+            ActivityStreamsObject target = new ActivityStreamsObjectImpl();
+            ActivityStreamsObject object = new ActivityStreamsObjectImpl();
+
+            // NOTE(review): save() fills the *_id columns from getId(), yet they are read
+            // back into the url field here - confirm which field is intended.
+            actor.setDisplayName(row.getString("actor_displayname"));
+            actor.setUrl(row.getString("actor_id"));
+
+            target.setDisplayName(row.getString("target_displayname"));
+            target.setUrl(row.getString("target_id"));
+
+            object.setDisplayName(row.getString("object_displayname"));
+            // object_id belongs to the object; assigning it to the target would clobber target_id
+            object.setUrl(row.getString("object_id"));
+
+            entry.setPublished(row.getDate("published"));
+            entry.setVerb(row.getString("verb"));
+            entry.setId(row.getString("id"));
+            entry.setActor(actor);
+            entry.setTarget(target);
+            entry.setObject(object);
+
+            results.add(entry);
         }
+
+        return results;
     }
- //creates the Activities column if it doesn't exist already
- private void createColumn() throws ConnectionException {
- if (keyspace.describeKeyspace().getColumnFamily(COLUMN_FAMILY_NAME) == null) {
- //the column does not exist
- keyspace.createColumnFamily(columnFamily, ImmutableMap.<String, Object>builder()
- .put("default_validation_class", "UTF8Type")
- .put("key_validation_class", "UTF8Type")
- .put("comparator_type", "UTF8Type")
- .build());
- }
+    /**
+     * Drops the named table from the keyspace this repository's session is connected to.
+     *
+     * @param table name of the table to drop; concatenated into the statement unchanged
+     */
+    public void dropTable(String table){
+        session.execute("DROP TABLE " + table);
     }
- public void save(CassandraActivityStreamsEntry entry) {
+
+ @Override
+ protected void finalize() throws Throwable {
try {
- // Inserting data
- entityManager.put(entry);
- LOG.info("Insertion of the entry with the id, " + entry.getId() + ", was a success");
- } catch (Exception e) {
- LOG.info("An error occured while inserting the entry with id, " + entry.getId(), e);
+ cluster.shutdown();
+ } finally {
+ super.finalize();
}
}
- public List<CassandraActivityStreamsEntry> getActivitiesForQuery(String query) {
- //return entities that match the given cql query
- LOG.info("executing the query: "+query);
- return entityManager.find(query);
- }
-
}
Modified: incubator/streams/branches/cassandra/streams-cassandra/src/test/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepositoryTest.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/cassandra/streams-cassandra/src/test/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepositoryTest.java?rev=1519019&r1=1519018&r2=1519019&view=diff
==============================================================================
--- incubator/streams/branches/cassandra/streams-cassandra/src/test/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepositoryTest.java (original)
+++ incubator/streams/branches/cassandra/streams-cassandra/src/test/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepositoryTest.java Fri Aug 30 16:38:16 2013
@@ -1,11 +1,14 @@
package org.apache.streams.cassandra.repository.impl;
-import com.netflix.astyanax.connectionpool.OperationResult;
-import com.netflix.astyanax.model.CqlResult;
+import org.apache.rave.model.ActivityStreamsEntry;
+import org.apache.rave.model.ActivityStreamsObject;
+import org.apache.rave.portal.model.impl.ActivityStreamsEntryImpl;
+import org.apache.rave.portal.model.impl.ActivityStreamsObjectImpl;
import org.apache.streams.cassandra.model.CassandraActivityStreamsEntry;
import org.junit.Before;
import org.junit.Test;
+import java.util.Date;
import java.util.List;
public class CassandraActivityStreamsRepositoryTest {
@@ -19,9 +22,40 @@ public class CassandraActivityStreamsRep
}
@Test
- public void getActivitiesForQuery() {
- String cql = "select * from Activities";
- List<CassandraActivityStreamsEntry> list= repository.getActivitiesForQuery(cql);
- assert(list != null);
+    /**
+     * Builds a fully populated activity entry (actor, object and target with ids and
+     * display names). The actual save call is left disabled.
+     */
+    public void saveActivity() {
+        ActivityStreamsObject actor = new ActivityStreamsObjectImpl();
+        actor.setId("actorid1");
+        actor.setDisplayName("actorname1");
+
+        ActivityStreamsObject target = new ActivityStreamsObjectImpl();
+        target.setId("targetid1");
+        target.setDisplayName("targetname1");
+
+        ActivityStreamsObject object = new ActivityStreamsObjectImpl();
+        object.setId("objectid1");
+        object.setDisplayName("objectname1");
+
+        ActivityStreamsEntry entry = new ActivityStreamsEntryImpl();
+        entry.setId("one");
+        entry.setVerb("verb1");
+        entry.setPublished(new Date());
+        entry.setActor(actor);
+        entry.setObject(object);
+        entry.setTarget(target);
+
+        //repository.save(entry);
+    }
+
+ @Test
+ public void getActivity() {
+ String cql = "SELECT * FROM coltest WHERE published > '2010-10-10' LIMIT 1 ALLOW FILTERING";
+ List<ActivityStreamsEntry> results = repository.getActivitiesForQuery(cql);
+ }
+
+ @Test
+ public void dropTableTest(){
+ //repository.dropTable("coltest");
}
}
Modified: incubator/streams/branches/cassandra/streams-eip-routes/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/branches/cassandra/streams-eip-routes/pom.xml?rev=1519019&r1=1519018&r2=1519019&view=diff
==============================================================================
--- incubator/streams/branches/cassandra/streams-eip-routes/pom.xml (original)
+++ incubator/streams/branches/cassandra/streams-eip-routes/pom.xml Fri Aug 30 16:38:16 2013
@@ -95,7 +95,7 @@
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-core</artifactId>
- <version>2.8.5</version>
+ <version>2.9.0</version>
</dependency>
<dependency>
@@ -189,33 +189,10 @@
<scope>provided</scope>
</dependency>
- <!--astyanax-->
<dependency>
- <groupId>com.netflix.astyanax</groupId>
- <artifactId>astyanax-core</artifactId>
- <version>1.56.43-SNAPSHOT</version>
- <scope>provided</scope>
- </dependency>
-
- <dependency>
- <groupId>com.netflix.astyanax</groupId>
- <artifactId>astyanax-thrift</artifactId>
- <version>1.56.43-SNAPSHOT</version>
- <scope>provided</scope>
- </dependency>
-
- <dependency>
- <groupId>com.netflix.astyanax</groupId>
- <artifactId>astyanax-cassandra</artifactId>
- <version>1.56.43-SNAPSHOT</version>
- <scope>provided</scope>
- </dependency>
-
- <dependency>
- <groupId>com.netflix.astyanax</groupId>
- <artifactId>astyanax-entity-mapper</artifactId>
- <version>1.56.43-SNAPSHOT</version>
- <scope>provided</scope>
+ <groupId>org.easymock</groupId>
+ <artifactId>easymock</artifactId>
+ <version>3.1</version>
</dependency>
</dependencies>
Modified: incubator/streams/branches/cassandra/streams-eip-routes/src/main/java/org/apache/streams/messaging/aggregation/ActivityAggregator.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/cassandra/streams-eip-routes/src/main/java/org/apache/streams/messaging/aggregation/ActivityAggregator.java?rev=1519019&r1=1519018&r2=1519019&view=diff
==============================================================================
--- incubator/streams/branches/cassandra/streams-eip-routes/src/main/java/org/apache/streams/messaging/aggregation/ActivityAggregator.java (original)
+++ incubator/streams/branches/cassandra/streams-eip-routes/src/main/java/org/apache/streams/messaging/aggregation/ActivityAggregator.java Fri Aug 30 16:38:16 2013
@@ -1,23 +1,15 @@
package org.apache.streams.messaging.aggregation;
-import org.apache.camel.Exchange;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.streams.cassandra.model.CassandraActivityStreamsEntry;
-import org.apache.streams.cassandra.repository.impl.CassandraActivityStreamsRepository;
import org.apache.streams.messaging.service.impl.CassandraActivityService;
import org.apache.streams.osgi.components.activitysubscriber.ActivityStreamsSubscriber;
import org.apache.streams.osgi.components.activitysubscriber.ActivityStreamsSubscriberWarehouse;
import org.apache.streams.osgi.components.activitysubscriber.ActivityStreamsSubscriptionFilter;
-import org.codehaus.jackson.map.ObjectMapper;
import org.springframework.scheduling.annotation.Scheduled;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
+import java.util.*;
public class ActivityAggregator {
@@ -39,8 +31,10 @@ public class ActivityAggregator {
Set<String> activities = new HashSet<String>();
for (ActivityStreamsSubscriptionFilter filter: subscriber.getActivityStreamsSubscriberConfiguration().getActivityStreamsSubscriptionFilters()){
//send the query of each filter to the service to receive the activities of that filter
- activities.addAll(activityService.getActivitiesForQuery(filter.getQuery()));
+ activities.addAll(activityService.getActivitiesForQuery(filter.getQuery() + " WHERE published > " + subscriber.getLastUpdated().getTime() + " LIMIT 10 ALLOW FILTERING"));
}
+ //TODO: an activity posted in between the cql query and setting the lastUpdated field will be lost
+ subscriber.setLastUpdated(new Date());
subscriber.receive(new ArrayList<String>(activities));
}
}
Modified: incubator/streams/branches/cassandra/streams-eip-routes/src/main/java/org/apache/streams/messaging/service/impl/CassandraActivityService.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/cassandra/streams-eip-routes/src/main/java/org/apache/streams/messaging/service/impl/CassandraActivityService.java?rev=1519019&r1=1519018&r2=1519019&view=diff
==============================================================================
--- incubator/streams/branches/cassandra/streams-eip-routes/src/main/java/org/apache/streams/messaging/service/impl/CassandraActivityService.java (original)
+++ incubator/streams/branches/cassandra/streams-eip-routes/src/main/java/org/apache/streams/messaging/service/impl/CassandraActivityService.java Fri Aug 30 16:38:16 2013
@@ -3,6 +3,7 @@ package org.apache.streams.messaging.ser
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.camel.Exchange;
+import org.apache.rave.model.ActivityStreamsEntry;
import org.apache.streams.cassandra.model.CassandraActivityStreamsEntry;
import org.apache.streams.cassandra.repository.impl.CassandraActivityStreamsRepository;
import org.apache.streams.messaging.service.ActivityService;
@@ -11,6 +12,7 @@ import org.codehaus.jackson.map.ObjectMa
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Date;
import java.util.List;
public class CassandraActivityService implements ActivityService {
@@ -40,7 +42,8 @@ public class CassandraActivityService im
String activityJson = e.getIn().getBody(String.class);
try {
- CassandraActivityStreamsEntry streamsEntry = mapper.readValue(activityJson, CassandraActivityStreamsEntry.class);
+ ActivityStreamsEntry streamsEntry = mapper.readValue(activityJson, CassandraActivityStreamsEntry.class);
+ streamsEntry.setPublished(new Date());
cassandraActivityStreamsRepository.save(streamsEntry);
} catch (IOException err) {
LOG.error("there was an error while converting the json string to an object and saving to the database", err);
@@ -50,13 +53,13 @@ public class CassandraActivityService im
}
public List<String> getActivitiesForQuery(String query) {
- List<CassandraActivityStreamsEntry> activityObjects = cassandraActivityStreamsRepository.getActivitiesForQuery(query);
+ List<ActivityStreamsEntry> activityObjects = cassandraActivityStreamsRepository.getActivitiesForQuery(query);
return getJsonList(activityObjects);
}
- private List<String> getJsonList(List<CassandraActivityStreamsEntry> activities) {
+ private List<String> getJsonList(List<ActivityStreamsEntry> activities) {
List<String> jsonList = new ArrayList<String>();
- for (CassandraActivityStreamsEntry entry : activities) {
+ for (ActivityStreamsEntry entry : activities) {
try {
jsonList.add(mapper.writeValueAsString(entry));
} catch (IOException e) {
Modified: incubator/streams/branches/cassandra/streams-eip-routes/src/test/java/org/apache/streams/messaging/service/impl/CassandraActivityServiceTest.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/cassandra/streams-eip-routes/src/test/java/org/apache/streams/messaging/service/impl/CassandraActivityServiceTest.java?rev=1519019&r1=1519018&r2=1519019&view=diff
==============================================================================
--- incubator/streams/branches/cassandra/streams-eip-routes/src/test/java/org/apache/streams/messaging/service/impl/CassandraActivityServiceTest.java (original)
+++ incubator/streams/branches/cassandra/streams-eip-routes/src/test/java/org/apache/streams/messaging/service/impl/CassandraActivityServiceTest.java Fri Aug 30 16:38:16 2013
@@ -1,8 +1,13 @@
package org.apache.streams.messaging.service.impl;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
import org.junit.Before;
import org.junit.Test;
+import static org.easymock.EasyMock.*;
+
+import java.util.ArrayList;
import java.util.List;
public class CassandraActivityServiceTest {
@@ -16,7 +21,44 @@ public class CassandraActivityServiceTes
@Test
public void getActivititiesForQueryTest(){
- List<String> activities = cassandraActivityService.getActivitiesForQuery("select * from Activities");
- assert(activities != null);
+ //List<String> activities = cassandraActivityService.getActivitiesForQuery("select * from Activities");
+ //assert(activities != null);
+ }
+
+ @Test
+ public void receiveExchangeTest(){
+ Exchange e = createMock(Exchange.class);
+ List<Exchange> grouped = new ArrayList<Exchange>();
+ Exchange e2 = createMock(Exchange.class);
+ grouped.add(e2);
+ Message m = createMock(Message.class);
+
+ String activityJson = "{\n" +
+ "\"id\":\"id2\",\n" +
+ "\"verb\":\"verb2\",\n" +
+ "\"displayName\":\"displayname2\",\n" +
+ "\"target\":{\n" +
+ "\t\"id\":\"targetid2\",\n" +
+ "\t\"displayName\":\"targetname2\"\n" +
+ "\t},\n" +
+ "\t\"object\":{\n" +
+ "\t\"id\":\"objectid2\",\n" +
+ "\t\"displayName\":\"objectname2\"\n" +
+ "\t},\n" +
+ "\t\"actor\":{\n" +
+ "\t\"id\":\"actorid2\",\n" +
+ "\t\"displayName\":\"actorname2\"\n" +
+ "\t}\n" +
+ "\t\n" +
+ "\t}";
+
+ expect(e.getProperty(Exchange.GROUPED_EXCHANGE, List.class)).andReturn(grouped);
+ expect(e2.getIn()).andReturn(m);
+ expect(m.getBody(String.class)).andReturn(activityJson);
+
+ replay(e, e2, m);
+
+ //cassandraActivityService.receiveExchange(e);
+ List<String> myTest = cassandraActivityService.getActivitiesForQuery("select * from coltest");
}
}
Modified: incubator/streams/branches/cassandra/streams-osgi-components/activity-consumer/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/branches/cassandra/streams-osgi-components/activity-consumer/pom.xml?rev=1519019&r1=1519018&r2=1519019&view=diff
==============================================================================
--- incubator/streams/branches/cassandra/streams-osgi-components/activity-consumer/pom.xml (original)
+++ incubator/streams/branches/cassandra/streams-osgi-components/activity-consumer/pom.xml Fri Aug 30 16:38:16 2013
@@ -129,22 +129,19 @@
<dependency>
<groupId>org.apache.rave</groupId>
<artifactId>rave-core</artifactId>
- <version>0.22-SNAPSHOT</version>
- <scope>provided</scope>
+ <version>${rave.version}</version>
</dependency>
<dependency>
<groupId>org.apache.rave</groupId>
<artifactId>rave-core-api</artifactId>
- <version>0.22-SNAPSHOT</version>
- <scope>provided</scope>
+ <version>${rave.version}</version>
</dependency>
<dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-cassandra</artifactId>
<version>0.1-SNAPSHOT</version>
- <scope>provided</scope>
</dependency>
</dependencies>
Modified: incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/ActivityStreamsSubscriber.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/ActivityStreamsSubscriber.java?rev=1519019&r1=1519018&r2=1519019&view=diff
==============================================================================
--- incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/ActivityStreamsSubscriber.java (original)
+++ incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/ActivityStreamsSubscriber.java Fri Aug 30 16:38:16 2013
@@ -1,6 +1,7 @@
package org.apache.streams.osgi.components.activitysubscriber;
+import java.util.Date;
import java.util.List;
public interface ActivityStreamsSubscriber {
@@ -14,4 +15,6 @@ public interface ActivityStreamsSubscrib
public boolean isAuthenticated();
public void setAuthenticated(boolean authenticated);
public ActivityStreamsSubscription getActivityStreamsSubscriberConfiguration();
+ Date getLastUpdated();
+ void setLastUpdated(Date lastUpdated);
}
Modified: incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/impl/ActivityStreamsSubscriberDelegate.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/impl/ActivityStreamsSubscriberDelegate.java?rev=1519019&r1=1519018&r2=1519019&view=diff
==============================================================================
--- incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/impl/ActivityStreamsSubscriberDelegate.java (original)
+++ incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/impl/ActivityStreamsSubscriberDelegate.java Fri Aug 30 16:38:16 2013
@@ -8,6 +8,7 @@ import org.codehaus.jackson.map.Deserial
import org.codehaus.jackson.map.ObjectMapper;
import java.util.ArrayList;
+import java.util.Date;
import java.util.List;
public class ActivityStreamsSubscriberDelegate implements ActivityStreamsSubscriber {
@@ -23,10 +24,13 @@ public class ActivityStreamsSubscriberDe
//an individual subscriber gets ONE stream which is an aggregation of all its SRCs
private ArrayList<String> stream;
+ private Date lastUpdated;
+
public ActivityStreamsSubscriberDelegate(ActivityStreamsSubscription configuration){
setActivityStreamsSubscriberConfiguration(configuration);
stream = new ArrayList<String>();
+ lastUpdated = new Date(0);
}
@@ -83,7 +87,13 @@ public class ActivityStreamsSubscriberDe
return stream.toString();
}
+    /**
+     * Returns the timestamp currently stored as this subscriber's last update.
+     *
+     * @return the stored date instance itself, not a copy
+     */
+    public Date getLastUpdated() {
+        return this.lastUpdated;
+    }
+    /**
+     * Replaces the stored last-update timestamp.
+     *
+     * @param lastUpdated the new timestamp; kept by reference, not copied
+     */
+    public void setLastUpdated(Date lastUpdated) {
+        this.lastUpdated = lastUpdated;
+    }
public void init(){
//any initialization... gets called directly after registration
Modified: incubator/streams/branches/cassandra/streams-web/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/branches/cassandra/streams-web/pom.xml?rev=1519019&r1=1519018&r2=1519019&view=diff
==============================================================================
--- incubator/streams/branches/cassandra/streams-web/pom.xml (original)
+++ incubator/streams/branches/cassandra/streams-web/pom.xml Fri Aug 30 16:38:16 2013
@@ -48,7 +48,6 @@
<groupId>org.apache.streams</groupId>
<artifactId>streams-eip-routes</artifactId>
<version>${project.version}</version>
- <type>bundle</type>
</dependency>
<dependency>
@@ -143,31 +142,6 @@
<version>${spring.version}</version>
</dependency>
- <!--astyanax-->
- <dependency>
- <groupId>com.netflix.astyanax</groupId>
- <artifactId>astyanax-core</artifactId>
- <version>1.56.43-SNAPSHOT</version>
- </dependency>
-
- <dependency>
- <groupId>com.netflix.astyanax</groupId>
- <artifactId>astyanax-thrift</artifactId>
- <version>1.56.43-SNAPSHOT</version>
- </dependency>
-
- <dependency>
- <groupId>com.netflix.astyanax</groupId>
- <artifactId>astyanax-cassandra</artifactId>
- <version>1.56.43-SNAPSHOT</version>
- </dependency>
-
- <dependency>
- <groupId>com.netflix.astyanax</groupId>
- <artifactId>astyanax-entity-mapper</artifactId>
- <version>1.56.43-SNAPSHOT</version>
- </dependency>
-
<!--rave-->
<dependency>
<groupId>org.apache.rave</groupId>