You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by ds...@apache.org on 2013/11/08 22:30:09 UTC

svn commit: r1540193 - in /incubator/streams/branches/webservice/streams-persistence/src: main/java/org/apache/streams/persistence/repository/cassandra/ test/java/org/apache/streams/persistence/repository/cassandra/

Author: dsullivan
Date: Fri Nov  8 21:30:09 2013
New Revision: 1540193

URL: http://svn.apache.org/r1540193
Log:
changing to new datastax version and transitioning to query builder in the ActivityStreamsRepository

Modified:
    incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/cassandra/CassandraActivityStreamsRepository.java
    incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/cassandra/CassandraKeyspace.java
    incubator/streams/branches/webservice/streams-persistence/src/test/java/org/apache/streams/persistence/repository/cassandra/CassandraActiivtyStreamsRepositoryTest.java

Modified: incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/cassandra/CassandraActivityStreamsRepository.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/cassandra/CassandraActivityStreamsRepository.java?rev=1540193&r1=1540192&r2=1540193&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/cassandra/CassandraActivityStreamsRepository.java (original)
+++ incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/cassandra/CassandraActivityStreamsRepository.java Fri Nov  8 21:30:09 2013
@@ -3,6 +3,9 @@ package org.apache.streams.persistence.r
 import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Row;
 import com.datastax.driver.core.exceptions.AlreadyExistsException;
+import com.datastax.driver.core.querybuilder.Insert;
+import com.datastax.driver.core.querybuilder.QueryBuilder;
+import com.datastax.driver.core.querybuilder.Select;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.streams.persistence.configuration.CassandraConfiguration;
@@ -14,10 +17,7 @@ import org.apache.streams.persistence.re
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-import java.util.Set;
+import java.util.*;
 
 @Component
 public class CassandraActivityStreamsRepository implements ActivityStreamsRepository {
@@ -33,7 +33,7 @@ public class CassandraActivityStreamsRep
         this.keyspace = keyspace;
 
         try {
-            keyspace.getSession().execute("CREATE TABLE " + configuration.getActivitystreamsColumnFamilyName() + " (" +
+            String createKeyspaceCql = "CREATE TABLE " + configuration.getActivitystreamsColumnFamilyName() + " (" +
                     "id text, " +
                     "published timestamp, " +
                     "verb text, " +
@@ -55,44 +55,42 @@ public class CassandraActivityStreamsRep
                     "object_id text, " +
                     "object_objecttype text, " +
 
-                    "PRIMARY KEY (id, tags, published));");
+                    "PRIMARY KEY (id));";
+            String publishedIndexCql = "CREATE INDEX ON " + configuration.getActivitystreamsColumnFamilyName() + " (published)";
+            String tagsIndexCql = "CREATE INDEX ON " + configuration.getActivitystreamsColumnFamilyName() + " (tags)";
+
+            keyspace.getSession().execute(createKeyspaceCql);
+            keyspace.getSession().execute(tagsIndexCql);
+            keyspace.getSession().execute(publishedIndexCql);
         } catch (AlreadyExistsException ignored) {
         }
     }
 
     @Override
     public void save(ActivityStreamsEntry entry) {
-        //TODO: this should be random UUID
-        String sql = "INSERT INTO " + configuration.getActivitystreamsColumnFamilyName() + " (" +
-                "id, published, verb, tags, " +
-                "actor_displayname, actor_objecttype, actor_id, actor_url, " +
-                "target_displayname, target_id, target_url, " +
-                "provider_url, " +
-                "object_displayname, object_objecttype, object_id, object_url) " +
-                "VALUES ('" +
-                entry.getId() + "','" +
-                entry.getPublished().getTime() + "','" +
-                entry.getVerb() + "','" +
-                entry.getTags() + "','" +
-
-                entry.getActor().getDisplayName() + "','" +
-                entry.getActor().getObjectType() + "','" +
-                entry.getActor().getId() + "','" +
-                entry.getActor().getUrl() + "','" +
-
-                entry.getTarget().getDisplayName() + "','" +
-                entry.getTarget().getId() + "','" +
-                entry.getTarget().getUrl() + "','" +
-
-                entry.getProvider().getUrl() + "','" +
-
-                entry.getObject().getDisplayName() + "','" +
-                entry.getObject().getObjectType() + "','" +
-                entry.getObject().getId() + "','" +
-                entry.getObject().getUrl() +
+        Insert query = QueryBuilder.insertInto(configuration.getActivitystreamsColumnFamilyName())
+                .value("id", entry.getId())
+                .value("published", entry.getPublished().getTime())
+                .value("verb", entry.getVerb())
+                .value("tags", entry.getTags())
+
+                .value("actor_displayname", entry.getActor().getDisplayName())
+                .value("actor_objecttype", entry.getActor().getObjectType())
+                .value("actor_id", entry.getActor().getId())
+                .value("actor_url", entry.getActor().getUrl())
+
+                .value("target_displayname", entry.getTarget().getDisplayName())
+                .value("target_id", entry.getTarget().getId())
+                .value("target_url", entry.getTarget().getUrl())
+
+                .value("provider_url", entry.getProvider().getUrl())
+
+                .value("object_displayname", entry.getObject().getDisplayName())
+                .value("object_objecttype", entry.getObject().getObjectType())
+                .value("object_id", entry.getObject().getId())
+                .value("object_url", entry.getObject().getUrl());
 
-                "')";
-        keyspace.getSession().execute(sql);
+        keyspace.getSession().execute(query);
     }
 
     @Override
@@ -100,17 +98,12 @@ public class CassandraActivityStreamsRep
         List<ActivityStreamsEntry> results = new ArrayList<ActivityStreamsEntry>();
 
         for (String tag : tags) {
-            String cql = "SELECT * FROM " + configuration.getActivitystreamsColumnFamilyName() + " WHERE ";
-
-            //add tags
-            cql = cql + " tags = '" + tag + "' AND ";
-
-            //specify last modified
-            cql = cql + "published > " + lastUpdated.getTime() + " ALLOW FILTERING";
-
-            //execute the cql query and store the results
-            //TODO: will this ever return a null ResultSet
-            ResultSet set = keyspace.getSession().execute(cql);
+            Select query = QueryBuilder.select().from(configuration.getActivitystreamsColumnFamilyName())
+                    .where(QueryBuilder.eq("tags", tag))
+                    .and(QueryBuilder.gt("published", lastUpdated))
+                    .limit(10)
+                    .allowFiltering();
+            ResultSet set = keyspace.getSession().execute(query);
 
             //iterate through the results and create a new ActivityStreamsEntry for every result returned
 

Modified: incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/cassandra/CassandraKeyspace.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/cassandra/CassandraKeyspace.java?rev=1540193&r1=1540192&r2=1540193&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/cassandra/CassandraKeyspace.java (original)
+++ incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/cassandra/CassandraKeyspace.java Fri Nov  8 21:30:09 2013
@@ -3,10 +3,14 @@ package org.apache.streams.persistence.r
 import com.datastax.driver.core.Cluster;
 import com.datastax.driver.core.Session;
 import com.datastax.driver.core.exceptions.AlreadyExistsException;
+import com.datastax.driver.core.policies.ConstantReconnectionPolicy;
+import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy;
 import org.apache.streams.persistence.configuration.CassandraConfiguration;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
+import java.net.InetAddress;
+
 @Component
 public class CassandraKeyspace {
     private CassandraConfiguration configuration;

Modified: incubator/streams/branches/webservice/streams-persistence/src/test/java/org/apache/streams/persistence/repository/cassandra/CassandraActiivtyStreamsRepositoryTest.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-persistence/src/test/java/org/apache/streams/persistence/repository/cassandra/CassandraActiivtyStreamsRepositoryTest.java?rev=1540193&r1=1540192&r2=1540193&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-persistence/src/test/java/org/apache/streams/persistence/repository/cassandra/CassandraActiivtyStreamsRepositoryTest.java (original)
+++ incubator/streams/branches/webservice/streams-persistence/src/test/java/org/apache/streams/persistence/repository/cassandra/CassandraActiivtyStreamsRepositoryTest.java Fri Nov  8 21:30:09 2013
@@ -9,7 +9,9 @@ import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
 
+import java.util.Arrays;
 import java.util.Date;
+import java.util.HashSet;
 
 public class CassandraActiivtyStreamsRepositoryTest {
     private CassandraActivityStreamsRepository repository;
@@ -18,7 +20,7 @@ public class CassandraActiivtyStreamsRep
     public void setup(){
         CassandraConfiguration configuration = new CassandraConfiguration();
         configuration.setCassandraPort("127.0.0.1");
-        configuration.setActivitystreamsColumnFamilyName("acitivites_Test1");
+        configuration.setActivitystreamsColumnFamilyName("acitivites_Test34");
         configuration.setKeyspaceName("keyspacetest");
         CassandraKeyspace keyspace = new CassandraKeyspace(configuration);
 
@@ -33,6 +35,12 @@ public class CassandraActiivtyStreamsRep
 
     @Ignore
     @Test
+    public void getActivitiesForTagsTest(){
+        repository.getActivitiesForTags(new HashSet<String>(Arrays.asList("tags")),new Date(0));
+    }
+
+    @Ignore
+    @Test
     public void saveActivity(){
         ActivityStreamsEntry entry = new CassandraActivityStreamsEntry();
         ActivityStreamsObject actor = new CassandraActivityStreamsObject();