You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by ds...@apache.org on 2013/10/28 14:09:53 UTC
svn commit: r1536347 - in /incubator/streams/branches/webservice:
streams-components/src/main/java/org/apache/streams/components/activityconsumer/
streams-components/src/main/java/org/apache/streams/components/activitysubscriber/
streams-components/src...
Author: dsullivan
Date: Mon Oct 28 13:09:52 2013
New Revision: 1536347
URL: http://svn.apache.org/r1536347
Log:
Publisher and subscriber URLs no longer need to be recreated.
Added:
incubator/streams/branches/webservice/streams-persistence/src/test/java/org/apache/streams/persistence/repository/cassandra/CassandraSubscriptionRepositoryTest.java
Removed:
incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/activityconsumer/
incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/aggregation/
Modified:
incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/activitysubscriber/ActivityStreamsSubscriber.java
incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/activitysubscriber/ActivityStreamsSubscriberWarehouse.java
incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/activitysubscriber/impl/ActivityStreamsSubscriberDelegate.java
incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/activitysubscriber/impl/ActivityStreamsSubscriberWarehouseImpl.java
incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/StreamsSubscriberRegistrationService.java
incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/StreamsSubscriptionRepositoryService.java
incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/CassandraSubscriptionService.java
incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/StreamsActivityReceivingServiceImpl.java
incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/StreamsSubscriberRegistrationServiceImpl.java
incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/storm/StormSubscriberBolt.java
incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/storm/StormSubscriberSpout.java
incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/ActivityStreamsSubscription.java
incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/cassandra/CassandraSubscription.java
incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/SubscriptionRepository.java
incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/cassandra/CassandraSubscriptionRepository.java
incubator/streams/branches/webservice/streams-persistence/src/main/resources/cassandra.properties
incubator/streams/branches/webservice/streams-web/src/test/java/org/apache/streams/mvc/integration/integration/IntegrationTest.java
Modified: incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/activitysubscriber/ActivityStreamsSubscriber.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/activitysubscriber/ActivityStreamsSubscriber.java?rev=1536347&r1=1536346&r2=1536347&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/activitysubscriber/ActivityStreamsSubscriber.java (original)
+++ incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/activitysubscriber/ActivityStreamsSubscriber.java Mon Oct 28 13:09:52 2013
@@ -1,22 +1,11 @@
package org.apache.streams.components.activitysubscriber;
-
-import org.apache.streams.persistence.model.ActivityStreamsSubscription;
-
import java.util.Date;
import java.util.List;
public interface ActivityStreamsSubscriber {
- public void receive(List<String> activity);
- public String getStream();
- public void init();
- public void setInRoute(String route);
- public String getInRoute();
- public void setSubscription(ActivityStreamsSubscription config);
- public void updateSubscription(String config);
- public boolean isAuthenticated();
- public void setAuthenticated(boolean authenticated);
- public ActivityStreamsSubscription getSubscription();
+ void receive(List<String> activity);
+ String getStream();
Date getLastUpdated();
void setLastUpdated(Date lastUpdated);
}
Modified: incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/activitysubscriber/ActivityStreamsSubscriberWarehouse.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/activitysubscriber/ActivityStreamsSubscriberWarehouse.java?rev=1536347&r1=1536346&r2=1536347&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/activitysubscriber/ActivityStreamsSubscriberWarehouse.java (original)
+++ incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/activitysubscriber/ActivityStreamsSubscriberWarehouse.java Mon Oct 28 13:09:52 2013
@@ -1,16 +1,15 @@
package org.apache.streams.components.activitysubscriber;
-import java.util.Collection;
+import org.apache.streams.persistence.model.ActivityStreamsSubscription;
/**
* Public API representing an example OSGi service
*/
public interface ActivityStreamsSubscriberWarehouse {
- public void register(ActivityStreamsSubscriber activitySubscriber);
-
- public ActivityStreamsSubscriber findSubscribersByID(String id);
-
- public Collection<ActivityStreamsSubscriber> getAllSubscribers();
+ String getStream(String inRoute);
+ ActivityStreamsSubscriber getSubscriber(String inRoute);
+ void updateSubscriber(ActivityStreamsSubscription subscription);
+ void register(ActivityStreamsSubscription subscription);
}
Modified: incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/activitysubscriber/impl/ActivityStreamsSubscriberDelegate.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/activitysubscriber/impl/ActivityStreamsSubscriberDelegate.java?rev=1536347&r1=1536346&r2=1536347&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/activitysubscriber/impl/ActivityStreamsSubscriberDelegate.java (original)
+++ incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/activitysubscriber/impl/ActivityStreamsSubscriberDelegate.java Mon Oct 28 13:09:52 2013
@@ -3,10 +3,6 @@ package org.apache.streams.components.ac
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.streams.components.activitysubscriber.ActivityStreamsSubscriber;
-import org.apache.streams.persistence.model.ActivityStreamsSubscription;
-import org.apache.streams.persistence.model.cassandra.CassandraSubscription;
-import org.codehaus.jackson.map.DeserializationConfig;
-import org.codehaus.jackson.map.ObjectMapper;
import java.util.ArrayList;
import java.util.Date;
@@ -16,62 +12,16 @@ public class ActivityStreamsSubscriberDe
private static final transient Log LOG = LogFactory.getLog(ActivityStreamsSubscriberDelegate.class);
- private boolean authenticated;
-
- private ActivityStreamsSubscription subscription;
-
- private String inRoute;
-
//an individual subscriber gets ONE stream which is an aggregation of all its SRCs
private List<String> stream;
private Date lastUpdated;
- public ActivityStreamsSubscriberDelegate(ActivityStreamsSubscription subscription){
- this.subscription = subscription;
+ public ActivityStreamsSubscriberDelegate(){
this.stream = new ArrayList<String>();
this.lastUpdated = new Date(0);
}
-
- public void updateSubscription(String subscriptionJSON) {
- ObjectMapper mapper = new ObjectMapper();
- mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES,false);
-
- try {
- // read from file, convert it to user class
- ActivityStreamsSubscription subscription = mapper.readValue(subscriptionJSON, CassandraSubscription.class);
- this.subscription = subscription;
-
- } catch (Exception e) {
- LOG.info("exception" + e);
-
- }
-
- }
-
- public boolean isAuthenticated() {
- return authenticated;
- }
-
- public void setAuthenticated(boolean authenticated) {
- this.authenticated = authenticated;
- }
-
- public String getInRoute() {
- return inRoute;
- }
-
- public void setInRoute(String inRoute) {
- this.inRoute = inRoute;
- }
-
- public void receive (List<String> activity){
- //add new activities to stream
- LOG.info("adding activities to subscription stream");
- stream.addAll(0,activity);
- }
-
//return the list of activities (stream) as a json string
public String getStream() {
@@ -86,18 +36,7 @@ public class ActivityStreamsSubscriberDe
this.lastUpdated = lastUpdated;
}
- public void init(){
- //any initialization... gets called directly after registration
-
-
-
- }
-
- public ActivityStreamsSubscription getSubscription() {
- return subscription;
- }
-
- public void setSubscription(ActivityStreamsSubscription subscription) {
- this.subscription = subscription;
+ public void receive(List<String> activities){
+ stream.addAll(activities);
}
}
Modified: incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/activitysubscriber/impl/ActivityStreamsSubscriberWarehouseImpl.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/activitysubscriber/impl/ActivityStreamsSubscriberWarehouseImpl.java?rev=1536347&r1=1536346&r2=1536347&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/activitysubscriber/impl/ActivityStreamsSubscriberWarehouseImpl.java (original)
+++ incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/activitysubscriber/impl/ActivityStreamsSubscriberWarehouseImpl.java Mon Oct 28 13:09:52 2013
@@ -1,13 +1,13 @@
package org.apache.streams.components.activitysubscriber.impl;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.streams.components.activitysubscriber.ActivityStreamsSubscriber;
import org.apache.streams.components.activitysubscriber.ActivityStreamsSubscriberWarehouse;
+import org.apache.streams.components.service.StreamsActivityRepositoryService;
+import org.apache.streams.persistence.model.ActivityStreamsSubscription;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@@ -16,29 +16,46 @@ public class ActivityStreamsSubscriberWa
private static final transient Log LOG = LogFactory.getLog(ActivityStreamsSubscriberWarehouseImpl.class);
private Map<String, ActivityStreamsSubscriber> subscribers;
+ private StreamsActivityRepositoryService activityService;
- public ActivityStreamsSubscriberWarehouseImpl(){
+ @Autowired
+ public ActivityStreamsSubscriberWarehouseImpl(StreamsActivityRepositoryService activityService) {
+ this.activityService = activityService;
subscribers = new HashMap<String, ActivityStreamsSubscriber>();
}
- public void register(ActivityStreamsSubscriber activitySubscriber) {
- if (!subscribers.containsKey(activitySubscriber.getInRoute())){
- subscribers.put(activitySubscriber.getInRoute(), activitySubscriber);
- activitySubscriber.init();
+ @Override
+ public void register(ActivityStreamsSubscription subscription) {
+ if (!subscribers.containsKey(subscription.getInRoute())) {
+ ActivityStreamsSubscriber subscriber = new ActivityStreamsSubscriberDelegate();
+ subscribers.put(subscription.getInRoute(), subscriber);
}
-
}
- //the warehouse can do some interesting things to make the filtering efficient i think...
- public ActivityStreamsSubscriber findSubscribersByID(String id){
- return subscribers.get(id);
+ @Override
+ public String getStream(String inRoute) {
+ ActivityStreamsSubscriber subscriber = getSubscriber(inRoute);
+ if (subscriber != null) {
+ return subscriber.getStream();
+ } else {
+ return "Registration Needed";
+ }
}
-
- public Collection<ActivityStreamsSubscriber> getAllSubscribers(){
- return subscribers.values();
+ @Override
+ public ActivityStreamsSubscriber getSubscriber(String inRoute) {
+ return subscribers.get(inRoute);
}
-
-
+ @Override
+ public void updateSubscriber(ActivityStreamsSubscription subscription) {
+ ActivityStreamsSubscriber subscriber = getSubscriber(subscription.getInRoute());
+ if (subscriber != null) {
+ //TODO: an activity posted in between the cql query and setting the lastUpdated field will be lost
+ Set<String> activities = new TreeSet<String>();
+ activities.addAll(activityService.getActivitiesForFilters(subscription.getFilters(), subscriber.getLastUpdated()));
+ subscriber.setLastUpdated(new Date());
+ subscriber.receive(new ArrayList<String>(activities));
+ }
+ }
}
Modified: incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/StreamsSubscriberRegistrationService.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/StreamsSubscriberRegistrationService.java?rev=1536347&r1=1536346&r2=1536347&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/StreamsSubscriberRegistrationService.java (original)
+++ incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/StreamsSubscriberRegistrationService.java Mon Oct 28 13:09:52 2013
@@ -1,7 +1,5 @@
package org.apache.streams.components.service;
-import org.codehaus.jackson.JsonParseException;
-
import java.io.IOException;
public interface StreamsSubscriberRegistrationService {
Modified: incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/StreamsSubscriptionRepositoryService.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/StreamsSubscriptionRepositoryService.java?rev=1536347&r1=1536346&r2=1536347&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/StreamsSubscriptionRepositoryService.java (original)
+++ incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/StreamsSubscriptionRepositoryService.java Mon Oct 28 13:09:52 2013
@@ -5,7 +5,6 @@ import org.apache.streams.persistence.mo
import java.util.List;
public interface StreamsSubscriptionRepositoryService{
-
- List<String> getFilters(String authToken);
- void saveFilters(ActivityStreamsSubscription subscription);
+ void saveSubscription(ActivityStreamsSubscription subscription);
+ List<ActivityStreamsSubscription> getAllSubscriptions();
}
Modified: incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/CassandraSubscriptionService.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/CassandraSubscriptionService.java?rev=1536347&r1=1536346&r2=1536347&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/CassandraSubscriptionService.java (original)
+++ incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/CassandraSubscriptionService.java Mon Oct 28 13:09:52 2013
@@ -20,12 +20,14 @@ public class CassandraSubscriptionServic
this.repository = repository;
}
- public List<String> getFilters(String authToken) {
- return Arrays.asList(repository.getFilters(authToken).split(" "));
- }
-
- public void saveFilters(ActivityStreamsSubscription subscription) {
+ @Override
+ public void saveSubscription(ActivityStreamsSubscription subscription) {
subscription.setId("" + UUID.randomUUID());
repository.save(subscription);
}
+
+ @Override
+ public List<ActivityStreamsSubscription> getAllSubscriptions(){
+ return repository.getAllSubscriptions();
+ }
}
Modified: incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/StreamsActivityReceivingServiceImpl.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/StreamsActivityReceivingServiceImpl.java?rev=1536347&r1=1536346&r2=1536347&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/StreamsActivityReceivingServiceImpl.java (original)
+++ incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/StreamsActivityReceivingServiceImpl.java Mon Oct 28 13:09:52 2013
@@ -25,7 +25,6 @@ public class StreamsActivityReceivingSer
* @return the stream list in string form
* */
public String getActivity(String subscriberID){
- ActivityStreamsSubscriber subscriber = activityStreamsSubscriberWarehouse.findSubscribersByID(subscriberID);
- return subscriber.getStream();
+ return activityStreamsSubscriberWarehouse.getStream(subscriberID);
}
}
Modified: incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/StreamsSubscriberRegistrationServiceImpl.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/StreamsSubscriberRegistrationServiceImpl.java?rev=1536347&r1=1536346&r2=1536347&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/StreamsSubscriberRegistrationServiceImpl.java (original)
+++ incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/StreamsSubscriberRegistrationServiceImpl.java Mon Oct 28 13:09:52 2013
@@ -3,12 +3,9 @@ package org.apache.streams.components.se
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.streams.components.service.StreamsSubscriberRegistrationService;
-import org.apache.streams.components.aggregation.ActivityAggregator;
import org.apache.streams.components.service.StreamsSubscriptionRepositoryService;
-import org.apache.streams.components.activitysubscriber.ActivityStreamsSubscriber;
import org.apache.streams.components.activitysubscriber.ActivityStreamsSubscriberWarehouse;
import org.apache.streams.persistence.model.ActivityStreamsSubscription;
-import org.apache.streams.components.activitysubscriber.impl.ActivityStreamsSubscriberDelegate;
import org.apache.streams.persistence.model.cassandra.CassandraSubscription;
import org.codehaus.jackson.map.DeserializationConfig;
import org.codehaus.jackson.map.ObjectMapper;
@@ -23,17 +20,14 @@ public class StreamsSubscriberRegistrati
private Log log = LogFactory.getLog(StreamsSubscriberRegistrationServiceImpl.class);
private StreamsSubscriptionRepositoryService subscriptionService;
- private ActivityAggregator activityAggregator;
private ActivityStreamsSubscriberWarehouse activityStreamsSubscriberWarehouse;
@Autowired
public StreamsSubscriberRegistrationServiceImpl(
StreamsSubscriptionRepositoryService subscriptionService,
- ActivityAggregator activityAggregator,
ActivityStreamsSubscriberWarehouse activityStreamsSubscriberWarehouse
) {
this.subscriptionService = subscriptionService;
- this.activityAggregator = activityAggregator;
this.activityStreamsSubscriberWarehouse = activityStreamsSubscriberWarehouse;
}
@@ -47,18 +41,10 @@ public class StreamsSubscriberRegistrati
mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false);
ActivityStreamsSubscription subscription = mapper.readValue(subscriberJSON, CassandraSubscription.class);
- if (subscription.getFilters() == null) {
- subscription.setFilters(subscriptionService.getFilters(subscription.getId()));
- } else {
- subscriptionService.saveFilters(subscription);
- }
-
- ActivityStreamsSubscriber subscriber = new ActivityStreamsSubscriberDelegate(subscription);
- subscriber.setAuthenticated(true);
- subscriber.setInRoute("" + UUID.randomUUID());
- activityAggregator.updateSubscriber(subscriber);
- activityStreamsSubscriberWarehouse.register(subscriber);
+ subscription.setInRoute("" + UUID.randomUUID());
+ subscriptionService.saveSubscription(subscription);
+ activityStreamsSubscriberWarehouse.register(subscription);
- return subscriber.getInRoute();
+ return subscription.getInRoute();
}
}
Modified: incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/storm/StormSubscriberBolt.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/storm/StormSubscriberBolt.java?rev=1536347&r1=1536346&r2=1536347&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/storm/StormSubscriberBolt.java (original)
+++ incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/storm/StormSubscriberBolt.java Mon Oct 28 13:09:52 2013
@@ -7,8 +7,10 @@ import backtype.storm.topology.OutputFie
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
+import org.apache.streams.components.activitysubscriber.ActivityStreamsSubscriberWarehouse;
import org.apache.streams.components.service.StreamsActivityRepositoryService;
import org.apache.streams.components.activitysubscriber.ActivityStreamsSubscriber;
+import org.apache.streams.persistence.model.ActivityStreamsSubscription;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;
@@ -19,7 +21,7 @@ import java.util.*;
public class StormSubscriberBolt extends BaseBasicBolt {
private static ApplicationContext appContext;
BatchOutputCollector _collector;
- StreamsActivityRepositoryService activityService;
+ private ActivityStreamsSubscriberWarehouse subscriberWarehouse;
@Autowired
public StormSubscriberBolt(ApplicationContext ctx) {
@@ -28,24 +30,23 @@ public class StormSubscriberBolt extends
@Override
public void prepare(Map stormConf, TopologyContext context) {
- activityService = (StreamsActivityRepositoryService) appContext.getBean("cassandraActivityService");
+ subscriberWarehouse = (ActivityStreamsSubscriberWarehouse) appContext.getBean("activityStreamsSubscriberWarehouseImpl");
super.prepare(stormConf, context);
}
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
- if(tuple.getValue(0) instanceof ActivityStreamsSubscriber){
- ActivityStreamsSubscriber subscriber = (ActivityStreamsSubscriber) tuple.getValue(0);
- this.updateSubscriber(subscriber);
+ if(tuple.getValue(0) instanceof ActivityStreamsSubscription){
+ ActivityStreamsSubscription subscription = (ActivityStreamsSubscription) tuple.getValue(0);
+ if(subscriberWarehouse.getSubscriber(subscription.getInRoute()) == null){
+ subscriberWarehouse.register(subscription);
+ }
+ subscriberWarehouse.updateSubscriber(subscription);
}
}
- public void updateSubscriber(ActivityStreamsSubscriber subscriber) {
- Set<String> activities = new TreeSet<String>();
- activities.addAll(activityService.getActivitiesForFilters(subscriber.getSubscription().getFilters(), subscriber.getLastUpdated()));
- //TODO: an activity posted in between the cql query and setting the lastUpdated field will be lost
- subscriber.setLastUpdated(new Date());
- subscriber.receive(new ArrayList<String>(activities));
+ public void updateSubscription(ActivityStreamsSubscription subscription) {
+
}
@Override
Modified: incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/storm/StormSubscriberSpout.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/storm/StormSubscriberSpout.java?rev=1536347&r1=1536346&r2=1536347&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/storm/StormSubscriberSpout.java (original)
+++ incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/storm/StormSubscriberSpout.java Mon Oct 28 13:09:52 2013
@@ -11,6 +11,9 @@ import org.apache.streams.components.act
import org.apache.streams.components.activitysubscriber.ActivityStreamsSubscriberWarehouse;
import org.apache.streams.components.activitysubscriber.impl.ActivityStreamsSubscriberWarehouseImpl;
import org.apache.streams.components.service.StreamsActivityRepositoryService;
+import org.apache.streams.components.service.StreamsSubscriptionRepositoryService;
+import org.apache.streams.persistence.model.ActivityStreamsSubscription;
+import org.apache.streams.persistence.repository.SubscriptionRepository;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.xml.XmlBeanFactory;
@@ -26,7 +29,7 @@ import java.util.Map;
public class StormSubscriberSpout extends BaseRichSpout {
private static ApplicationContext appContext;
- private ActivityStreamsSubscriberWarehouse activityStreamsSubscriberWarehouse;
+ private StreamsSubscriptionRepositoryService repositoryService;
private SpoutOutputCollector _collector;
private Iterator iterator;
@@ -37,7 +40,7 @@ public class StormSubscriberSpout extend
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
- activityStreamsSubscriberWarehouse = (ActivityStreamsSubscriberWarehouse)appContext.getBean("activityStreamsSubscriberWarehouseImpl");
+ repositoryService = (StreamsSubscriptionRepositoryService)appContext.getBean("cassandraSubscriptionService");
_collector = collector;
}
@@ -45,14 +48,13 @@ public class StormSubscriberSpout extend
@Override
public void nextTuple() {
Utils.sleep(10000);
- for (ActivityStreamsSubscriber activityStreamsSubscriber : activityStreamsSubscriberWarehouse.getAllSubscribers()) {
- _collector.emit(new Values(activityStreamsSubscriber));
+ for (ActivityStreamsSubscription subscription : repositoryService.getAllSubscriptions()) {
+ _collector.emit(new Values(subscription));
}
}
@Override
public void ack(Object id) {
- System.out.println("RandomSentenceSpout.ack: "+ id);
}
@Override
Modified: incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/ActivityStreamsSubscription.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/ActivityStreamsSubscription.java?rev=1536347&r1=1536346&r2=1536347&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/ActivityStreamsSubscription.java (original)
+++ incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/ActivityStreamsSubscription.java Mon Oct 28 13:09:52 2013
@@ -1,15 +1,16 @@
package org.apache.streams.persistence.model;
-import org.codehaus.jackson.annotate.JsonTypeInfo;
-
import java.util.List;
public interface ActivityStreamsSubscription {
- public void setFilters(List<String> filters);
- public List<String> getFilters();
+ void setFilters(List<String> filters);
+ List<String> getFilters();
+
+ String getId();
+ void setId(String id);
- public String getId();
- public void setId(String id);
+ String getInRoute();
+ void setInRoute(String inRoute);
}
Modified: incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/cassandra/CassandraSubscription.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/cassandra/CassandraSubscription.java?rev=1536347&r1=1536346&r2=1536347&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/cassandra/CassandraSubscription.java (original)
+++ incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/cassandra/CassandraSubscription.java Mon Oct 28 13:09:52 2013
@@ -13,6 +13,8 @@ public class CassandraSubscription imple
private String id;
+ private String inRoute;
+
public void setFilters(List<String> filters) {
//TODO: it's possible that this could be null
this.filters = filters;
@@ -33,4 +35,14 @@ public class CassandraSubscription imple
public void setId(String id) {
this.id = id;
}
+
+ @Override
+ public String getInRoute() {
+ return inRoute;
+ }
+
+ @Override
+ public void setInRoute(String inRoute) {
+ this.inRoute = inRoute;
+ }
}
Modified: incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/SubscriptionRepository.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/SubscriptionRepository.java?rev=1536347&r1=1536346&r2=1536347&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/SubscriptionRepository.java (original)
+++ incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/SubscriptionRepository.java Mon Oct 28 13:09:52 2013
@@ -2,7 +2,10 @@ package org.apache.streams.persistence.r
import org.apache.streams.persistence.model.ActivityStreamsSubscription;
+import java.util.List;
+
public interface SubscriptionRepository {
- String getFilters(String id);
+ String getSubscriptionForId(String id);
+ List<ActivityStreamsSubscription> getAllSubscriptions();
void save(ActivityStreamsSubscription subscription);
}
Modified: incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/cassandra/CassandraSubscriptionRepository.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/cassandra/CassandraSubscriptionRepository.java?rev=1536347&r1=1536346&r2=1536347&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/cassandra/CassandraSubscriptionRepository.java (original)
+++ incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/cassandra/CassandraSubscriptionRepository.java Mon Oct 28 13:09:52 2013
@@ -1,16 +1,22 @@
package org.apache.streams.persistence.repository.cassandra;
import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
import com.datastax.driver.core.exceptions.AlreadyExistsException;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.streams.persistence.configuration.CassandraConfiguration;
import org.apache.streams.persistence.model.ActivityStreamsSubscription;
+import org.apache.streams.persistence.model.cassandra.CassandraSubscription;
import org.apache.streams.persistence.repository.SubscriptionRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
@Component
public class CassandraSubscriptionRepository implements SubscriptionRepository{
private static final Log LOG = LogFactory.getLog(CassandraSubscriptionRepository.class);
@@ -26,6 +32,7 @@ public class CassandraSubscriptionReposi
try {
keyspace.getSession().execute("CREATE TABLE " + configuration.getSubscriptionColumnFamilyName() + " (" +
"id text, " +
+ "inroute text, " +
"filters text, " +
"PRIMARY KEY (id));");
@@ -33,7 +40,7 @@ public class CassandraSubscriptionReposi
}
}
- public String getFilters(String id){
+ public String getSubscriptionForId(String id){
String cql = "SELECT * FROM " + configuration.getSubscriptionColumnFamilyName() + " WHERE id = '" + id+"';";
ResultSet set = keyspace.getSession().execute(cql);
@@ -41,13 +48,33 @@ public class CassandraSubscriptionReposi
return set.one().getString("filters");
}
+ public List<ActivityStreamsSubscription> getAllSubscriptions(){
+ String cql = "SELECT * FROM " + configuration.getSubscriptionColumnFamilyName();
+
+ ResultSet set = keyspace.getSession().execute(cql);
+ List<ActivityStreamsSubscription> results = new ArrayList<ActivityStreamsSubscription>();
+
+ for (Row row : set) {
+ ActivityStreamsSubscription subscription = new CassandraSubscription();
+
+ subscription.setId(row.getString("id"));
+ subscription.setInRoute(row.getString("inroute"));
+ subscription.setFilters(Arrays.asList(row.getString("filters").split(",")));
+
+ results.add(subscription);
+ }
+
+ return results;
+ }
+
public void save(ActivityStreamsSubscription subscription){
//TODO: will this overwrite?
String cql = "INSERT INTO " + configuration.getSubscriptionColumnFamilyName() + " (" +
- "id, filters) " +
+ "id, inroute, filters) " +
"VALUES ('" +
subscription.getId() + "','" +
- StringUtils.join(subscription.getFilters(), " ") +
+ subscription.getInRoute() + "','" +
+ StringUtils.join(subscription.getFilters(), ",") +
"')";
keyspace.getSession().execute(cql);
Modified: incubator/streams/branches/webservice/streams-persistence/src/main/resources/cassandra.properties
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-persistence/src/main/resources/cassandra.properties?rev=1536347&r1=1536346&r2=1536347&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-persistence/src/main/resources/cassandra.properties (original)
+++ incubator/streams/branches/webservice/streams-persistence/src/main/resources/cassandra.properties Mon Oct 28 13:09:52 2013
@@ -1,5 +1,5 @@
-cassandra.keyspaceName=keyspacetest
-cassandra.activitystreamsColumnFamilyName=activitystreams
-cassandra.subscriptionColumnFamilyName=subscriptions
-cassandra.publisherColumnFamilyName=publishers
+cassandra.keyspaceName=keyspaceB
+cassandra.activitystreamsColumnFamilyName=activitystreamsB
+cassandra.subscriptionColumnFamilyName=subscriptionsB
+cassandra.publisherColumnFamilyName=publishersB
cassandra.cassandraPort=127.0.0.1
\ No newline at end of file
Added: incubator/streams/branches/webservice/streams-persistence/src/test/java/org/apache/streams/persistence/repository/cassandra/CassandraSubscriptionRepositoryTest.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-persistence/src/test/java/org/apache/streams/persistence/repository/cassandra/CassandraSubscriptionRepositoryTest.java?rev=1536347&view=auto
==============================================================================
--- incubator/streams/branches/webservice/streams-persistence/src/test/java/org/apache/streams/persistence/repository/cassandra/CassandraSubscriptionRepositoryTest.java (added)
+++ incubator/streams/branches/webservice/streams-persistence/src/test/java/org/apache/streams/persistence/repository/cassandra/CassandraSubscriptionRepositoryTest.java Mon Oct 28 13:09:52 2013
@@ -0,0 +1,46 @@
+package org.apache.streams.persistence.repository.cassandra;
+
+import org.apache.streams.persistence.configuration.CassandraConfiguration;
+import org.apache.streams.persistence.model.ActivityStreamsSubscription;
+import org.apache.streams.persistence.model.cassandra.CassandraSubscription;
+import org.apache.streams.persistence.repository.SubscriptionRepository;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+public class CassandraSubscriptionRepositoryTest {
+
+ private SubscriptionRepository repository;
+
+ @Before
+ public void setup() {
+ CassandraConfiguration configuration = new CassandraConfiguration();
+ configuration.setCassandraPort("127.0.0.1");
+ configuration.setSubscriptionColumnFamilyName("subscriptionsA");
+ configuration.setKeyspaceName("keyspacetest");
+ CassandraKeyspace keyspace = new CassandraKeyspace(configuration);
+
+ repository = new CassandraSubscriptionRepository(keyspace, configuration);
+ }
+
+ @Ignore
+ @Test
+ public void getAllSubscriptionsTest() {
+ List<ActivityStreamsSubscription> subscriptions = repository.getAllSubscriptions();
+
+ }
+
+ @Ignore
+ @Test
+ public void addSubscriptionsTest() {
+ ActivityStreamsSubscription subscription = new CassandraSubscription();
+ subscription.setFilters(Arrays.asList("r501", "tags"));
+ subscription.setId("newID2");
+ subscription.setInRoute("randomID");
+
+ repository.save(subscription);
+ }
+}
Modified: incubator/streams/branches/webservice/streams-web/src/test/java/org/apache/streams/mvc/integration/integration/IntegrationTest.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-web/src/test/java/org/apache/streams/mvc/integration/integration/IntegrationTest.java?rev=1536347&r1=1536346&r2=1536347&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-web/src/test/java/org/apache/streams/mvc/integration/integration/IntegrationTest.java (original)
+++ incubator/streams/branches/webservice/streams-web/src/test/java/org/apache/streams/mvc/integration/integration/IntegrationTest.java Mon Oct 28 13:09:52 2013
@@ -17,11 +17,9 @@ import org.springframework.web.context.W
import org.junit.Before;
import javax.servlet.ServletException;
-import java.io.IOException;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.*;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
-import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.view;
@RunWith(SpringJUnit4ClassRunner.class)
@WebAppConfiguration