You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by ds...@apache.org on 2013/10/28 14:09:53 UTC

svn commit: r1536347 - in /incubator/streams/branches/webservice: streams-components/src/main/java/org/apache/streams/components/activityconsumer/ streams-components/src/main/java/org/apache/streams/components/activitysubscriber/ streams-components/src...

Author: dsullivan
Date: Mon Oct 28 13:09:52 2013
New Revision: 1536347

URL: http://svn.apache.org/r1536347
Log:
Publisher and subscriber URLs no longer need to be recreated

Added:
    incubator/streams/branches/webservice/streams-persistence/src/test/java/org/apache/streams/persistence/repository/cassandra/CassandraSubscriptionRepositoryTest.java
Removed:
    incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/activityconsumer/
    incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/aggregation/
Modified:
    incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/activitysubscriber/ActivityStreamsSubscriber.java
    incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/activitysubscriber/ActivityStreamsSubscriberWarehouse.java
    incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/activitysubscriber/impl/ActivityStreamsSubscriberDelegate.java
    incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/activitysubscriber/impl/ActivityStreamsSubscriberWarehouseImpl.java
    incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/StreamsSubscriberRegistrationService.java
    incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/StreamsSubscriptionRepositoryService.java
    incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/CassandraSubscriptionService.java
    incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/StreamsActivityReceivingServiceImpl.java
    incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/StreamsSubscriberRegistrationServiceImpl.java
    incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/storm/StormSubscriberBolt.java
    incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/storm/StormSubscriberSpout.java
    incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/ActivityStreamsSubscription.java
    incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/cassandra/CassandraSubscription.java
    incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/SubscriptionRepository.java
    incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/cassandra/CassandraSubscriptionRepository.java
    incubator/streams/branches/webservice/streams-persistence/src/main/resources/cassandra.properties
    incubator/streams/branches/webservice/streams-web/src/test/java/org/apache/streams/mvc/integration/integration/IntegrationTest.java

Modified: incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/activitysubscriber/ActivityStreamsSubscriber.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/activitysubscriber/ActivityStreamsSubscriber.java?rev=1536347&r1=1536346&r2=1536347&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/activitysubscriber/ActivityStreamsSubscriber.java (original)
+++ incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/activitysubscriber/ActivityStreamsSubscriber.java Mon Oct 28 13:09:52 2013
@@ -1,22 +1,11 @@
 package org.apache.streams.components.activitysubscriber;
 
-
-import org.apache.streams.persistence.model.ActivityStreamsSubscription;
-
 import java.util.Date;
 import java.util.List;
 
 public interface ActivityStreamsSubscriber {
-    public void receive(List<String> activity);
-    public String getStream();
-    public void init();
-    public void setInRoute(String route);
-    public String getInRoute();
-    public void setSubscription(ActivityStreamsSubscription config);
-    public void updateSubscription(String config);
-    public boolean isAuthenticated();
-    public void setAuthenticated(boolean authenticated);
-    public ActivityStreamsSubscription getSubscription();
+    void receive(List<String> activity);
+    String getStream();
     Date getLastUpdated();
     void setLastUpdated(Date lastUpdated);
 }

Modified: incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/activitysubscriber/ActivityStreamsSubscriberWarehouse.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/activitysubscriber/ActivityStreamsSubscriberWarehouse.java?rev=1536347&r1=1536346&r2=1536347&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/activitysubscriber/ActivityStreamsSubscriberWarehouse.java (original)
+++ incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/activitysubscriber/ActivityStreamsSubscriberWarehouse.java Mon Oct 28 13:09:52 2013
@@ -1,16 +1,15 @@
 package org.apache.streams.components.activitysubscriber;
 
-import java.util.Collection;
+import org.apache.streams.persistence.model.ActivityStreamsSubscription;
 
 /**
  * Public API representing an example OSGi service
  */
 public interface ActivityStreamsSubscriberWarehouse {
 
-    public void register(ActivityStreamsSubscriber activitySubscriber);
-
-    public ActivityStreamsSubscriber findSubscribersByID(String id);
-
-    public Collection<ActivityStreamsSubscriber> getAllSubscribers();
+    String getStream(String inRoute);
+    ActivityStreamsSubscriber getSubscriber(String inRoute);
+    void updateSubscriber(ActivityStreamsSubscription subscription);
+    void register(ActivityStreamsSubscription subscription);
 }
 

Modified: incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/activitysubscriber/impl/ActivityStreamsSubscriberDelegate.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/activitysubscriber/impl/ActivityStreamsSubscriberDelegate.java?rev=1536347&r1=1536346&r2=1536347&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/activitysubscriber/impl/ActivityStreamsSubscriberDelegate.java (original)
+++ incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/activitysubscriber/impl/ActivityStreamsSubscriberDelegate.java Mon Oct 28 13:09:52 2013
@@ -3,10 +3,6 @@ package org.apache.streams.components.ac
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.streams.components.activitysubscriber.ActivityStreamsSubscriber;
-import org.apache.streams.persistence.model.ActivityStreamsSubscription;
-import org.apache.streams.persistence.model.cassandra.CassandraSubscription;
-import org.codehaus.jackson.map.DeserializationConfig;
-import org.codehaus.jackson.map.ObjectMapper;
 
 import java.util.ArrayList;
 import java.util.Date;
@@ -16,62 +12,16 @@ public class ActivityStreamsSubscriberDe
 
     private static final transient Log LOG = LogFactory.getLog(ActivityStreamsSubscriberDelegate.class);
 
-    private boolean authenticated;
-
-    private ActivityStreamsSubscription subscription;
-
-    private String inRoute;
-
     //an individual subscriber gets ONE stream which is an aggregation of all its SRCs
     private List<String> stream;
 
     private Date lastUpdated;
 
-    public ActivityStreamsSubscriberDelegate(ActivityStreamsSubscription subscription){
-        this.subscription = subscription;
+    public ActivityStreamsSubscriberDelegate(){
         this.stream = new ArrayList<String>();
         this.lastUpdated = new Date(0);
     }
 
-
-    public void updateSubscription(String subscriptionJSON) {
-        ObjectMapper mapper = new ObjectMapper();
-        mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES,false);
-
-        try {
-            // read from file, convert it to user class
-            ActivityStreamsSubscription subscription = mapper.readValue(subscriptionJSON, CassandraSubscription.class);
-            this.subscription = subscription;
-
-        } catch (Exception e) {
-            LOG.info("exception" + e);
-
-        }
-
-    }
-
-    public boolean isAuthenticated() {
-        return authenticated;
-    }
-
-    public void setAuthenticated(boolean authenticated) {
-        this.authenticated = authenticated;
-    }
-
-    public String getInRoute() {
-        return inRoute;
-    }
-
-    public void setInRoute(String inRoute) {
-        this.inRoute = inRoute;
-    }
-
-    public void receive (List<String> activity){
-        //add new activities to stream
-        LOG.info("adding activities to subscription stream");
-        stream.addAll(0,activity);
-    }
-
     //return the list of activities (stream) as a json string
     public String getStream() {
 
@@ -86,18 +36,7 @@ public class ActivityStreamsSubscriberDe
         this.lastUpdated = lastUpdated;
     }
 
-    public void init(){
-        //any initialization... gets called directly after registration
-
-
-
-    }
-
-    public ActivityStreamsSubscription getSubscription() {
-        return subscription;
-    }
-
-    public void setSubscription(ActivityStreamsSubscription subscription) {
-        this.subscription = subscription;
+    public void receive(List<String> activities){
+        stream.addAll(activities);
     }
 }

Modified: incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/activitysubscriber/impl/ActivityStreamsSubscriberWarehouseImpl.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/activitysubscriber/impl/ActivityStreamsSubscriberWarehouseImpl.java?rev=1536347&r1=1536346&r2=1536347&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/activitysubscriber/impl/ActivityStreamsSubscriberWarehouseImpl.java (original)
+++ incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/activitysubscriber/impl/ActivityStreamsSubscriberWarehouseImpl.java Mon Oct 28 13:09:52 2013
@@ -1,13 +1,13 @@
 package org.apache.streams.components.activitysubscriber.impl;
 
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.*;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.streams.components.activitysubscriber.ActivityStreamsSubscriber;
 import org.apache.streams.components.activitysubscriber.ActivityStreamsSubscriberWarehouse;
+import org.apache.streams.components.service.StreamsActivityRepositoryService;
+import org.apache.streams.persistence.model.ActivityStreamsSubscription;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
@@ -16,29 +16,46 @@ public class ActivityStreamsSubscriberWa
     private static final transient Log LOG = LogFactory.getLog(ActivityStreamsSubscriberWarehouseImpl.class);
 
     private Map<String, ActivityStreamsSubscriber> subscribers;
+    private StreamsActivityRepositoryService activityService;
 
-    public ActivityStreamsSubscriberWarehouseImpl(){
+    @Autowired
+    public ActivityStreamsSubscriberWarehouseImpl(StreamsActivityRepositoryService activityService) {
+        this.activityService = activityService;
         subscribers = new HashMap<String, ActivityStreamsSubscriber>();
     }
 
-    public void register(ActivityStreamsSubscriber activitySubscriber) {
-        if (!subscribers.containsKey(activitySubscriber.getInRoute())){
-            subscribers.put(activitySubscriber.getInRoute(), activitySubscriber);
-            activitySubscriber.init();
+    @Override
+    public void register(ActivityStreamsSubscription subscription) {
+        if (!subscribers.containsKey(subscription.getInRoute())) {
+            ActivityStreamsSubscriber subscriber = new ActivityStreamsSubscriberDelegate();
+            subscribers.put(subscription.getInRoute(), subscriber);
         }
-
     }
 
-    //the warehouse can do some interesting things to make the filtering efficient i think...
-    public ActivityStreamsSubscriber findSubscribersByID(String id){
-        return subscribers.get(id);
+    @Override
+    public String getStream(String inRoute) {
+        ActivityStreamsSubscriber subscriber = getSubscriber(inRoute);
+        if (subscriber != null) {
+            return subscriber.getStream();
+        } else {
+            return "Registration Needed";
+        }
     }
 
-
-    public Collection<ActivityStreamsSubscriber> getAllSubscribers(){
-        return subscribers.values();
+    @Override
+    public ActivityStreamsSubscriber getSubscriber(String inRoute) {
+        return subscribers.get(inRoute);
     }
 
-
-
+    @Override
+    public void updateSubscriber(ActivityStreamsSubscription subscription) {
+        ActivityStreamsSubscriber subscriber = getSubscriber(subscription.getInRoute());
+        if (subscriber != null) {
+            //TODO: an activity posted in between the cql query and setting the lastUpdated field will be lost
+            Set<String> activities = new TreeSet<String>();
+            activities.addAll(activityService.getActivitiesForFilters(subscription.getFilters(), subscriber.getLastUpdated()));
+            subscriber.setLastUpdated(new Date());
+            subscriber.receive(new ArrayList<String>(activities));
+        }
+    }
 }

Modified: incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/StreamsSubscriberRegistrationService.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/StreamsSubscriberRegistrationService.java?rev=1536347&r1=1536346&r2=1536347&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/StreamsSubscriberRegistrationService.java (original)
+++ incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/StreamsSubscriberRegistrationService.java Mon Oct 28 13:09:52 2013
@@ -1,7 +1,5 @@
 package org.apache.streams.components.service;
 
-import org.codehaus.jackson.JsonParseException;
-
 import java.io.IOException;
 
 public interface StreamsSubscriberRegistrationService {

Modified: incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/StreamsSubscriptionRepositoryService.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/StreamsSubscriptionRepositoryService.java?rev=1536347&r1=1536346&r2=1536347&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/StreamsSubscriptionRepositoryService.java (original)
+++ incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/StreamsSubscriptionRepositoryService.java Mon Oct 28 13:09:52 2013
@@ -5,7 +5,6 @@ import org.apache.streams.persistence.mo
 import java.util.List;
 
 public interface StreamsSubscriptionRepositoryService{
-
-    List<String> getFilters(String authToken);
-    void saveFilters(ActivityStreamsSubscription subscription);
+    void saveSubscription(ActivityStreamsSubscription subscription);
+    List<ActivityStreamsSubscription> getAllSubscriptions();
 }

Modified: incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/CassandraSubscriptionService.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/CassandraSubscriptionService.java?rev=1536347&r1=1536346&r2=1536347&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/CassandraSubscriptionService.java (original)
+++ incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/CassandraSubscriptionService.java Mon Oct 28 13:09:52 2013
@@ -20,12 +20,14 @@ public class CassandraSubscriptionServic
         this.repository = repository;
     }
 
-    public List<String> getFilters(String authToken) {
-        return Arrays.asList(repository.getFilters(authToken).split(" "));
-    }
-
-    public void saveFilters(ActivityStreamsSubscription subscription) {
+    @Override
+    public void saveSubscription(ActivityStreamsSubscription subscription) {
         subscription.setId("" + UUID.randomUUID());
         repository.save(subscription);
     }
+
+    @Override
+    public List<ActivityStreamsSubscription> getAllSubscriptions(){
+        return repository.getAllSubscriptions();
+    }
 }

Modified: incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/StreamsActivityReceivingServiceImpl.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/StreamsActivityReceivingServiceImpl.java?rev=1536347&r1=1536346&r2=1536347&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/StreamsActivityReceivingServiceImpl.java (original)
+++ incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/StreamsActivityReceivingServiceImpl.java Mon Oct 28 13:09:52 2013
@@ -25,7 +25,6 @@ public class StreamsActivityReceivingSer
      * @return the stream list in string form
      * */
     public  String getActivity(String subscriberID){
-        ActivityStreamsSubscriber subscriber = activityStreamsSubscriberWarehouse.findSubscribersByID(subscriberID);
-        return subscriber.getStream();
+        return activityStreamsSubscriberWarehouse.getStream(subscriberID);
     }
 }

Modified: incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/StreamsSubscriberRegistrationServiceImpl.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/StreamsSubscriberRegistrationServiceImpl.java?rev=1536347&r1=1536346&r2=1536347&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/StreamsSubscriberRegistrationServiceImpl.java (original)
+++ incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/StreamsSubscriberRegistrationServiceImpl.java Mon Oct 28 13:09:52 2013
@@ -3,12 +3,9 @@ package org.apache.streams.components.se
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.streams.components.service.StreamsSubscriberRegistrationService;
-import org.apache.streams.components.aggregation.ActivityAggregator;
 import org.apache.streams.components.service.StreamsSubscriptionRepositoryService;
-import org.apache.streams.components.activitysubscriber.ActivityStreamsSubscriber;
 import org.apache.streams.components.activitysubscriber.ActivityStreamsSubscriberWarehouse;
 import org.apache.streams.persistence.model.ActivityStreamsSubscription;
-import org.apache.streams.components.activitysubscriber.impl.ActivityStreamsSubscriberDelegate;
 import org.apache.streams.persistence.model.cassandra.CassandraSubscription;
 import org.codehaus.jackson.map.DeserializationConfig;
 import org.codehaus.jackson.map.ObjectMapper;
@@ -23,17 +20,14 @@ public class StreamsSubscriberRegistrati
     private Log log = LogFactory.getLog(StreamsSubscriberRegistrationServiceImpl.class);
 
     private StreamsSubscriptionRepositoryService subscriptionService;
-    private ActivityAggregator activityAggregator;
     private ActivityStreamsSubscriberWarehouse activityStreamsSubscriberWarehouse;
 
     @Autowired
     public StreamsSubscriberRegistrationServiceImpl(
             StreamsSubscriptionRepositoryService subscriptionService,
-            ActivityAggregator activityAggregator,
             ActivityStreamsSubscriberWarehouse activityStreamsSubscriberWarehouse
     ) {
         this.subscriptionService = subscriptionService;
-        this.activityAggregator = activityAggregator;
         this.activityStreamsSubscriberWarehouse = activityStreamsSubscriberWarehouse;
     }
 
@@ -47,18 +41,10 @@ public class StreamsSubscriberRegistrati
         mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false);
 
         ActivityStreamsSubscription subscription = mapper.readValue(subscriberJSON, CassandraSubscription.class);
-        if (subscription.getFilters() == null) {
-            subscription.setFilters(subscriptionService.getFilters(subscription.getId()));
-        } else {
-            subscriptionService.saveFilters(subscription);
-        }
-
-        ActivityStreamsSubscriber subscriber = new ActivityStreamsSubscriberDelegate(subscription);
-        subscriber.setAuthenticated(true);
-        subscriber.setInRoute("" + UUID.randomUUID());
-        activityAggregator.updateSubscriber(subscriber);
-        activityStreamsSubscriberWarehouse.register(subscriber);
+        subscription.setInRoute("" + UUID.randomUUID());
+        subscriptionService.saveSubscription(subscription);
+        activityStreamsSubscriberWarehouse.register(subscription);
 
-        return subscriber.getInRoute();
+        return subscription.getInRoute();
     }
 }

Modified: incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/storm/StormSubscriberBolt.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/storm/StormSubscriberBolt.java?rev=1536347&r1=1536346&r2=1536347&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/storm/StormSubscriberBolt.java (original)
+++ incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/storm/StormSubscriberBolt.java Mon Oct 28 13:09:52 2013
@@ -7,8 +7,10 @@ import backtype.storm.topology.OutputFie
 import backtype.storm.topology.base.BaseBasicBolt;
 import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Tuple;
+import org.apache.streams.components.activitysubscriber.ActivityStreamsSubscriberWarehouse;
 import org.apache.streams.components.service.StreamsActivityRepositoryService;
 import org.apache.streams.components.activitysubscriber.ActivityStreamsSubscriber;
+import org.apache.streams.persistence.model.ActivityStreamsSubscription;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.ApplicationContext;
 import org.springframework.stereotype.Component;
@@ -19,7 +21,7 @@ import java.util.*;
 public class StormSubscriberBolt extends BaseBasicBolt {
     private static ApplicationContext appContext;
     BatchOutputCollector _collector;
-    StreamsActivityRepositoryService activityService;
+    private ActivityStreamsSubscriberWarehouse subscriberWarehouse;
 
     @Autowired
     public StormSubscriberBolt(ApplicationContext ctx) {
@@ -28,24 +30,23 @@ public class StormSubscriberBolt extends
 
     @Override
     public void prepare(Map stormConf, TopologyContext context) {
-        activityService = (StreamsActivityRepositoryService) appContext.getBean("cassandraActivityService");
+        subscriberWarehouse = (ActivityStreamsSubscriberWarehouse) appContext.getBean("activityStreamsSubscriberWarehouseImpl");
         super.prepare(stormConf, context);
     }
 
     @Override
     public void execute(Tuple tuple, BasicOutputCollector collector) {
-        if(tuple.getValue(0) instanceof ActivityStreamsSubscriber){
-            ActivityStreamsSubscriber subscriber = (ActivityStreamsSubscriber) tuple.getValue(0);
-            this.updateSubscriber(subscriber);
+        if(tuple.getValue(0) instanceof ActivityStreamsSubscription){
+            ActivityStreamsSubscription subscription = (ActivityStreamsSubscription) tuple.getValue(0);
+            if(subscriberWarehouse.getSubscriber(subscription.getInRoute()) == null){
+                subscriberWarehouse.register(subscription);
+            }
+            subscriberWarehouse.updateSubscriber(subscription);
         }
     }
 
-    public void updateSubscriber(ActivityStreamsSubscriber subscriber) {
-        Set<String> activities = new TreeSet<String>();
-        activities.addAll(activityService.getActivitiesForFilters(subscriber.getSubscription().getFilters(), subscriber.getLastUpdated()));
-        //TODO: an activity posted in between the cql query and setting the lastUpdated field will be lost
-        subscriber.setLastUpdated(new Date());
-        subscriber.receive(new ArrayList<String>(activities));
+    public void updateSubscription(ActivityStreamsSubscription subscription) {
+
     }
 
     @Override

Modified: incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/storm/StormSubscriberSpout.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/storm/StormSubscriberSpout.java?rev=1536347&r1=1536346&r2=1536347&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/storm/StormSubscriberSpout.java (original)
+++ incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/storm/StormSubscriberSpout.java Mon Oct 28 13:09:52 2013
@@ -11,6 +11,9 @@ import org.apache.streams.components.act
 import org.apache.streams.components.activitysubscriber.ActivityStreamsSubscriberWarehouse;
 import org.apache.streams.components.activitysubscriber.impl.ActivityStreamsSubscriberWarehouseImpl;
 import org.apache.streams.components.service.StreamsActivityRepositoryService;
+import org.apache.streams.components.service.StreamsSubscriptionRepositoryService;
+import org.apache.streams.persistence.model.ActivityStreamsSubscription;
+import org.apache.streams.persistence.repository.SubscriptionRepository;
 import org.springframework.beans.factory.BeanFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.xml.XmlBeanFactory;
@@ -26,7 +29,7 @@ import java.util.Map;
 public class StormSubscriberSpout extends BaseRichSpout {
 
     private static ApplicationContext appContext;
-    private ActivityStreamsSubscriberWarehouse activityStreamsSubscriberWarehouse;
+    private StreamsSubscriptionRepositoryService repositoryService;
     private SpoutOutputCollector _collector;
     private Iterator iterator;
 
@@ -37,7 +40,7 @@ public class StormSubscriberSpout extend
 
     @Override
     public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
-        activityStreamsSubscriberWarehouse = (ActivityStreamsSubscriberWarehouse)appContext.getBean("activityStreamsSubscriberWarehouseImpl");
+        repositoryService = (StreamsSubscriptionRepositoryService)appContext.getBean("cassandraSubscriptionService");
 
         _collector = collector;
     }
@@ -45,14 +48,13 @@ public class StormSubscriberSpout extend
     @Override
     public void nextTuple() {
         Utils.sleep(10000);
-        for (ActivityStreamsSubscriber activityStreamsSubscriber : activityStreamsSubscriberWarehouse.getAllSubscribers()) {
-            _collector.emit(new Values(activityStreamsSubscriber));
+        for (ActivityStreamsSubscription subscription : repositoryService.getAllSubscriptions()) {
+            _collector.emit(new Values(subscription));
         }
     }
 
     @Override
     public void ack(Object id) {
-        System.out.println("RandomSentenceSpout.ack: "+ id);
     }
 
     @Override

Modified: incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/ActivityStreamsSubscription.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/ActivityStreamsSubscription.java?rev=1536347&r1=1536346&r2=1536347&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/ActivityStreamsSubscription.java (original)
+++ incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/ActivityStreamsSubscription.java Mon Oct 28 13:09:52 2013
@@ -1,15 +1,16 @@
 package org.apache.streams.persistence.model;
 
-import org.codehaus.jackson.annotate.JsonTypeInfo;
-
 import java.util.List;
 
 public interface ActivityStreamsSubscription {
 
-    public void setFilters(List<String> filters);
-    public List<String> getFilters();
+    void setFilters(List<String> filters);
+    List<String> getFilters();
+
+    String getId();
+    void setId(String id);
 
-    public String getId();
-    public void setId(String id);
+    String getInRoute();
+    void setInRoute(String inRoute);
 
 }

Modified: incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/cassandra/CassandraSubscription.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/cassandra/CassandraSubscription.java?rev=1536347&r1=1536346&r2=1536347&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/cassandra/CassandraSubscription.java (original)
+++ incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/cassandra/CassandraSubscription.java Mon Oct 28 13:09:52 2013
@@ -13,6 +13,8 @@ public class CassandraSubscription imple
 
     private String id;
 
+    private String inRoute;
+
     public void setFilters(List<String> filters) {
         //TODO: it's possible that this could be null
         this.filters = filters;
@@ -33,4 +35,14 @@ public class CassandraSubscription imple
     public void setId(String id) {
         this.id = id;
     }
+
+    @Override
+    public String getInRoute() {
+        return inRoute;
+    }
+
+    @Override
+    public void setInRoute(String inRoute) {
+        this.inRoute = inRoute;
+    }
 }

Modified: incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/SubscriptionRepository.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/SubscriptionRepository.java?rev=1536347&r1=1536346&r2=1536347&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/SubscriptionRepository.java (original)
+++ incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/SubscriptionRepository.java Mon Oct 28 13:09:52 2013
@@ -2,7 +2,10 @@ package org.apache.streams.persistence.r
 
 import org.apache.streams.persistence.model.ActivityStreamsSubscription;
 
+import java.util.List;
+
 public interface SubscriptionRepository {
-    String getFilters(String id);
+    String getSubscriptionForId(String id);
+    List<ActivityStreamsSubscription> getAllSubscriptions();
     void save(ActivityStreamsSubscription subscription);
 }

Modified: incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/cassandra/CassandraSubscriptionRepository.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/cassandra/CassandraSubscriptionRepository.java?rev=1536347&r1=1536346&r2=1536347&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/cassandra/CassandraSubscriptionRepository.java (original)
+++ incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/cassandra/CassandraSubscriptionRepository.java Mon Oct 28 13:09:52 2013
@@ -1,16 +1,22 @@
 package org.apache.streams.persistence.repository.cassandra;
 
 import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
 import com.datastax.driver.core.exceptions.AlreadyExistsException;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.streams.persistence.configuration.CassandraConfiguration;
 import org.apache.streams.persistence.model.ActivityStreamsSubscription;
+import org.apache.streams.persistence.model.cassandra.CassandraSubscription;
 import org.apache.streams.persistence.repository.SubscriptionRepository;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
 @Component
 public class CassandraSubscriptionRepository implements SubscriptionRepository{
     private static final Log LOG = LogFactory.getLog(CassandraSubscriptionRepository.class);
@@ -26,6 +32,7 @@ public class CassandraSubscriptionReposi
         try {
             keyspace.getSession().execute("CREATE TABLE " + configuration.getSubscriptionColumnFamilyName() + " (" +
                     "id text, " +
+                    "inroute text, " +
                     "filters text, " +
 
                     "PRIMARY KEY (id));");
@@ -33,7 +40,7 @@ public class CassandraSubscriptionReposi
         }
     }
 
-    public String getFilters(String id){
+    public String getSubscriptionForId(String id){
         String cql = "SELECT * FROM " + configuration.getSubscriptionColumnFamilyName()  + " WHERE id = '" + id+"';";
 
         ResultSet set = keyspace.getSession().execute(cql);
@@ -41,13 +48,33 @@ public class CassandraSubscriptionReposi
         return set.one().getString("filters");
     }
 
+    /** Loads every stored subscription; a null or empty "filters" column maps to an empty list. */
+    public List<ActivityStreamsSubscription> getAllSubscriptions(){
+        String cql = "SELECT * FROM " + configuration.getSubscriptionColumnFamilyName();
+        ResultSet set = keyspace.getSession().execute(cql);
+        List<ActivityStreamsSubscription> results = new ArrayList<ActivityStreamsSubscription>();
+        for (Row row : set) {
+            // Row.getString yields null for a null column, and "".split(",") would yield [""]
+            String storedFilters = row.getString("filters");
+            boolean hasFilters = storedFilters != null && !storedFilters.isEmpty();
+
+            ActivityStreamsSubscription subscription = new CassandraSubscription();
+            subscription.setId(row.getString("id"));
+            subscription.setInRoute(row.getString("inroute"));
+            subscription.setFilters(hasFilters ? Arrays.asList(storedFilters.split(",")) : new ArrayList<String>());
+            results.add(subscription);
+        }
+        return results;
+    }
+
     public void save(ActivityStreamsSubscription subscription){
         //TODO: will this overwrite?
         String cql = "INSERT INTO " + configuration.getSubscriptionColumnFamilyName()  + " (" +
-                "id, filters) " +
+                "id, inroute, filters) " +
                 "VALUES ('" +
                 subscription.getId() + "','" +
-                StringUtils.join(subscription.getFilters(), " ") +
+                subscription.getInRoute() + "','" +
+                StringUtils.join(subscription.getFilters(), ",") +
 
                 "')";
         keyspace.getSession().execute(cql);

Modified: incubator/streams/branches/webservice/streams-persistence/src/main/resources/cassandra.properties
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-persistence/src/main/resources/cassandra.properties?rev=1536347&r1=1536346&r2=1536347&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-persistence/src/main/resources/cassandra.properties (original)
+++ incubator/streams/branches/webservice/streams-persistence/src/main/resources/cassandra.properties Mon Oct 28 13:09:52 2013
@@ -1,5 +1,5 @@
-cassandra.keyspaceName=keyspacetest
-cassandra.activitystreamsColumnFamilyName=activitystreams
-cassandra.subscriptionColumnFamilyName=subscriptions
-cassandra.publisherColumnFamilyName=publishers
+cassandra.keyspaceName=keyspaceB
+cassandra.activitystreamsColumnFamilyName=activitystreamsB
+cassandra.subscriptionColumnFamilyName=subscriptionsB
+cassandra.publisherColumnFamilyName=publishersB
 cassandra.cassandraPort=127.0.0.1
\ No newline at end of file

Added: incubator/streams/branches/webservice/streams-persistence/src/test/java/org/apache/streams/persistence/repository/cassandra/CassandraSubscriptionRepositoryTest.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-persistence/src/test/java/org/apache/streams/persistence/repository/cassandra/CassandraSubscriptionRepositoryTest.java?rev=1536347&view=auto
==============================================================================
--- incubator/streams/branches/webservice/streams-persistence/src/test/java/org/apache/streams/persistence/repository/cassandra/CassandraSubscriptionRepositoryTest.java (added)
+++ incubator/streams/branches/webservice/streams-persistence/src/test/java/org/apache/streams/persistence/repository/cassandra/CassandraSubscriptionRepositoryTest.java Mon Oct 28 13:09:52 2013
@@ -0,0 +1,46 @@
+package org.apache.streams.persistence.repository.cassandra;
+
+import org.apache.streams.persistence.configuration.CassandraConfiguration;
+import org.apache.streams.persistence.model.ActivityStreamsSubscription;
+import org.apache.streams.persistence.model.cassandra.CassandraSubscription;
+import org.apache.streams.persistence.repository.SubscriptionRepository;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+/** Manual checks for CassandraSubscriptionRepository; NOTE(review): presumably needs a live Cassandra node, hence @Ignore - confirm. */
+public class CassandraSubscriptionRepositoryTest {
+
+    private SubscriptionRepository repository;
+
+    @Before
+    public void setup() {
+        CassandraConfiguration configuration = new CassandraConfiguration();
+        configuration.setCassandraPort("127.0.0.1");
+        configuration.setSubscriptionColumnFamilyName("subscriptionsA");
+        configuration.setKeyspaceName("keyspacetest");
+        CassandraKeyspace keyspace = new CassandraKeyspace(configuration);
+
+        repository = new CassandraSubscriptionRepository(keyspace, configuration);
+    }
+
+    @Ignore
+    @Test
+    public void getAllSubscriptionsTest() {
+        List<ActivityStreamsSubscription> subscriptions = repository.getAllSubscriptions();
+        org.junit.Assert.assertNotNull(subscriptions);
+    }
+
+    @Ignore
+    @Test
+    public void addSubscriptionsTest() {
+        ActivityStreamsSubscription subscription = new CassandraSubscription();
+        subscription.setFilters(Arrays.asList("r501", "tags"));
+        subscription.setId("newID2");
+        subscription.setInRoute("randomID");
+        repository.save(subscription);
+    }
+}

Modified: incubator/streams/branches/webservice/streams-web/src/test/java/org/apache/streams/mvc/integration/integration/IntegrationTest.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-web/src/test/java/org/apache/streams/mvc/integration/integration/IntegrationTest.java?rev=1536347&r1=1536346&r2=1536347&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-web/src/test/java/org/apache/streams/mvc/integration/integration/IntegrationTest.java (original)
+++ incubator/streams/branches/webservice/streams-web/src/test/java/org/apache/streams/mvc/integration/integration/IntegrationTest.java Mon Oct 28 13:09:52 2013
@@ -17,11 +17,9 @@ import org.springframework.web.context.W
 import org.junit.Before;
 
 import javax.servlet.ServletException;
-import java.io.IOException;
 
 import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.*;
 import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
-import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.view;
 
 @RunWith(SpringJUnit4ClassRunner.class)
 @WebAppConfiguration