You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by ds...@apache.org on 2013/11/08 22:30:09 UTC
svn commit: r1540193 - in
/incubator/streams/branches/webservice/streams-persistence/src:
main/java/org/apache/streams/persistence/repository/cassandra/
test/java/org/apache/streams/persistence/repository/cassandra/
Author: dsullivan
Date: Fri Nov 8 21:30:09 2013
New Revision: 1540193
URL: http://svn.apache.org/r1540193
Log:
Changing to the new DataStax driver version and transitioning to the query builder in CassandraActivityStreamsRepository
Modified:
incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/cassandra/CassandraActivityStreamsRepository.java
incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/cassandra/CassandraKeyspace.java
incubator/streams/branches/webservice/streams-persistence/src/test/java/org/apache/streams/persistence/repository/cassandra/CassandraActiivtyStreamsRepositoryTest.java
Modified: incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/cassandra/CassandraActivityStreamsRepository.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/cassandra/CassandraActivityStreamsRepository.java?rev=1540193&r1=1540192&r2=1540193&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/cassandra/CassandraActivityStreamsRepository.java (original)
+++ incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/cassandra/CassandraActivityStreamsRepository.java Fri Nov 8 21:30:09 2013
@@ -3,6 +3,9 @@ package org.apache.streams.persistence.r
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.exceptions.AlreadyExistsException;
+import com.datastax.driver.core.querybuilder.Insert;
+import com.datastax.driver.core.querybuilder.QueryBuilder;
+import com.datastax.driver.core.querybuilder.Select;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.streams.persistence.configuration.CassandraConfiguration;
@@ -14,10 +17,7 @@ import org.apache.streams.persistence.re
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-import java.util.Set;
+import java.util.*;
@Component
public class CassandraActivityStreamsRepository implements ActivityStreamsRepository {
@@ -33,7 +33,7 @@ public class CassandraActivityStreamsRep
this.keyspace = keyspace;
try {
- keyspace.getSession().execute("CREATE TABLE " + configuration.getActivitystreamsColumnFamilyName() + " (" +
+ String createKeyspaceCql = "CREATE TABLE " + configuration.getActivitystreamsColumnFamilyName() + " (" +
"id text, " +
"published timestamp, " +
"verb text, " +
@@ -55,44 +55,42 @@ public class CassandraActivityStreamsRep
"object_id text, " +
"object_objecttype text, " +
- "PRIMARY KEY (id, tags, published));");
+ "PRIMARY KEY (id));";
+ String publishedIndexCql = "CREATE INDEX ON " + configuration.getActivitystreamsColumnFamilyName() + " (published)";
+ String tagsIndexCql = "CREATE INDEX ON " + configuration.getActivitystreamsColumnFamilyName() + " (tags)";
+
+ keyspace.getSession().execute(createKeyspaceCql);
+ keyspace.getSession().execute(tagsIndexCql);
+ keyspace.getSession().execute(publishedIndexCql);
} catch (AlreadyExistsException ignored) {
}
}
@Override
public void save(ActivityStreamsEntry entry) {
- //TODO: this should be random UUID
- String sql = "INSERT INTO " + configuration.getActivitystreamsColumnFamilyName() + " (" +
- "id, published, verb, tags, " +
- "actor_displayname, actor_objecttype, actor_id, actor_url, " +
- "target_displayname, target_id, target_url, " +
- "provider_url, " +
- "object_displayname, object_objecttype, object_id, object_url) " +
- "VALUES ('" +
- entry.getId() + "','" +
- entry.getPublished().getTime() + "','" +
- entry.getVerb() + "','" +
- entry.getTags() + "','" +
-
- entry.getActor().getDisplayName() + "','" +
- entry.getActor().getObjectType() + "','" +
- entry.getActor().getId() + "','" +
- entry.getActor().getUrl() + "','" +
-
- entry.getTarget().getDisplayName() + "','" +
- entry.getTarget().getId() + "','" +
- entry.getTarget().getUrl() + "','" +
-
- entry.getProvider().getUrl() + "','" +
-
- entry.getObject().getDisplayName() + "','" +
- entry.getObject().getObjectType() + "','" +
- entry.getObject().getId() + "','" +
- entry.getObject().getUrl() +
+ Insert query = QueryBuilder.insertInto(configuration.getActivitystreamsColumnFamilyName())
+ .value("id", entry.getId())
+ .value("published", entry.getPublished().getTime())
+ .value("verb", entry.getVerb())
+ .value("tags", entry.getTags())
+
+ .value("actor_displayname", entry.getActor().getDisplayName())
+ .value("actor_objecttype", entry.getActor().getObjectType())
+ .value("actor_id", entry.getActor().getId())
+ .value("actor_url", entry.getActor().getUrl())
+
+ .value("target_displayname", entry.getTarget().getDisplayName())
+ .value("target_id", entry.getTarget().getId())
+ .value("target_url", entry.getTarget().getUrl())
+
+ .value("provider_url", entry.getProvider().getUrl())
+
+ .value("object_displayname", entry.getObject().getDisplayName())
+ .value("object_objecttype", entry.getObject().getObjectType())
+ .value("object_id", entry.getObject().getId())
+ .value("object_url", entry.getObject().getUrl());
- "')";
- keyspace.getSession().execute(sql);
+ keyspace.getSession().execute(query);
}
@Override
@@ -100,17 +98,12 @@ public class CassandraActivityStreamsRep
List<ActivityStreamsEntry> results = new ArrayList<ActivityStreamsEntry>();
for (String tag : tags) {
- String cql = "SELECT * FROM " + configuration.getActivitystreamsColumnFamilyName() + " WHERE ";
-
- //add tags
- cql = cql + " tags = '" + tag + "' AND ";
-
- //specify last modified
- cql = cql + "published > " + lastUpdated.getTime() + " ALLOW FILTERING";
-
- //execute the cql query and store the results
- //TODO: will this ever return a null ResultSet
- ResultSet set = keyspace.getSession().execute(cql);
+ Select query = QueryBuilder.select().from(configuration.getActivitystreamsColumnFamilyName())
+ .where(QueryBuilder.eq("tags", tag))
+ .and(QueryBuilder.gt("published", lastUpdated))
+ .limit(10)
+ .allowFiltering();
+ ResultSet set = keyspace.getSession().execute(query);
//iterate through the results and create a new ActivityStreamsEntry for every result returned
Modified: incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/cassandra/CassandraKeyspace.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/cassandra/CassandraKeyspace.java?rev=1540193&r1=1540192&r2=1540193&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/cassandra/CassandraKeyspace.java (original)
+++ incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/cassandra/CassandraKeyspace.java Fri Nov 8 21:30:09 2013
@@ -3,10 +3,14 @@ package org.apache.streams.persistence.r
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.exceptions.AlreadyExistsException;
+import com.datastax.driver.core.policies.ConstantReconnectionPolicy;
+import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy;
import org.apache.streams.persistence.configuration.CassandraConfiguration;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
+import java.net.InetAddress;
+
@Component
public class CassandraKeyspace {
private CassandraConfiguration configuration;
Modified: incubator/streams/branches/webservice/streams-persistence/src/test/java/org/apache/streams/persistence/repository/cassandra/CassandraActiivtyStreamsRepositoryTest.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-persistence/src/test/java/org/apache/streams/persistence/repository/cassandra/CassandraActiivtyStreamsRepositoryTest.java?rev=1540193&r1=1540192&r2=1540193&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-persistence/src/test/java/org/apache/streams/persistence/repository/cassandra/CassandraActiivtyStreamsRepositoryTest.java (original)
+++ incubator/streams/branches/webservice/streams-persistence/src/test/java/org/apache/streams/persistence/repository/cassandra/CassandraActiivtyStreamsRepositoryTest.java Fri Nov 8 21:30:09 2013
@@ -9,7 +9,9 @@ import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
+import java.util.Arrays;
import java.util.Date;
+import java.util.HashSet;
public class CassandraActiivtyStreamsRepositoryTest {
private CassandraActivityStreamsRepository repository;
@@ -18,7 +20,7 @@ public class CassandraActiivtyStreamsRep
public void setup(){
CassandraConfiguration configuration = new CassandraConfiguration();
configuration.setCassandraPort("127.0.0.1");
- configuration.setActivitystreamsColumnFamilyName("acitivites_Test1");
+ configuration.setActivitystreamsColumnFamilyName("acitivites_Test34");
configuration.setKeyspaceName("keyspacetest");
CassandraKeyspace keyspace = new CassandraKeyspace(configuration);
@@ -33,6 +35,12 @@ public class CassandraActiivtyStreamsRep
@Ignore
@Test
+ public void getActivitiesForTagsTest(){
+ repository.getActivitiesForTags(new HashSet<String>(Arrays.asList("tags")),new Date(0));
+ }
+
+ @Ignore
+ @Test
public void saveActivity(){
ActivityStreamsEntry entry = new CassandraActivityStreamsEntry();
ActivityStreamsObject actor = new CassandraActivityStreamsObject();