You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by ds...@apache.org on 2013/11/15 20:26:41 UTC
svn commit: r1542378 [1/2] - in /incubator/streams/branches/webservice: ./
streams-components/
streams-components/src/main/java/org/apache/streams/components/service/
streams-components/src/main/java/org/apache/streams/components/service/impl/
streams-...
Author: dsullivan
Date: Fri Nov 15 19:26:40 2013
New Revision: 1542378
URL: http://svn.apache.org/r1542378
Log:
Now runs on Mongo; it should be easy enough to switch back and forth between Mongo and Cassandra.
Added:
incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/ActivityRepositoryServiceImpl.java
incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/configuration/MongoConfiguration.java
incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/mongo/
incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/mongo/MongoActivityStreamsEntry.java
incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/mongo/MongoActivityStreamsObject.java
incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/mongo/MongoPublisher.java
incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/mongo/MongoSubscription.java
incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/mongo/
incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/mongo/MongoActivityStreamsRepository.java
incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/mongo/MongoDatabase.java
incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/mongo/MongoPublisherRepository.java
incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/mongo/MongoSubscriptionRepository.java
incubator/streams/branches/webservice/streams-persistence/src/main/resources/mongo.properties
incubator/streams/branches/webservice/streams-persistence/src/test/java/org/apache/streams/persistence/repository/mongo/
incubator/streams/branches/webservice/streams-persistence/src/test/java/org/apache/streams/persistence/repository/mongo/MongoActivityStreamsRepositoryTest.java
incubator/streams/branches/webservice/streams-persistence/src/test/java/org/apache/streams/persistence/repository/mongo/MongoPublisherRepositoryTest.java
incubator/streams/branches/webservice/streams-persistence/src/test/java/org/apache/streams/persistence/repository/mongo/MongoSubscriptionRepositoryTest.java
Removed:
incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/StreamsPublisherRepositoryService.java
incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/StreamsSubscriptionRepositoryService.java
incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/CassandraPublisherService.java
incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/CassandraSubscriptionService.java
incubator/streams/branches/webservice/streams-components/src/test/java/org/apache/streams/components/service/StreamsActivityRepositoryServiceTest.java
incubator/streams/branches/webservice/streams-components/src/test/java/org/apache/streams/components/service/StreamsPublisherRegistrationServiceIntegrationTest.java
incubator/streams/branches/webservice/streams-components/src/test/java/org/apache/streams/components/service/StreamsPublisherRepositoryServiceTest.java
incubator/streams/branches/webservice/streams-components/src/test/java/org/apache/streams/components/service/StreamsSubscriptionRepositoryServiceTest.java
Modified:
incubator/streams/branches/webservice/pom.xml
incubator/streams/branches/webservice/streams-components/pom.xml
incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/StreamsActivityRepositoryService.java
incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/StreamsActivityPublishingServiceImpl.java
incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/StreamsFiltersServiceImpl.java
incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/StreamsPublisherRegistrationServiceImpl.java
incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/StreamsSubscriberRegistrationServiceImpl.java
incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/storm/StormSubscriberSpout.java
incubator/streams/branches/webservice/streams-components/src/main/resources/streams_components_applicationContext.xml
incubator/streams/branches/webservice/streams-components/src/test/java/org/apache/streams/components/service/StreamsActivityPublishingServiceTest.java
incubator/streams/branches/webservice/streams-components/src/test/java/org/apache/streams/components/service/StreamsFiltersServiceTest.java
incubator/streams/branches/webservice/streams-components/src/test/java/org/apache/streams/components/service/StreamsPublisherRegistrationServiceTest.java
incubator/streams/branches/webservice/streams-components/src/test/java/org/apache/streams/components/service/StreamsSubscriberRegistrationServiceIntegrationTest.java
incubator/streams/branches/webservice/streams-components/src/test/java/org/apache/streams/components/service/StreamsSubscriberRegistrationServiceTest.java
incubator/streams/branches/webservice/streams-persistence/pom.xml
incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/ActivityStreamsSubscription.java
incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/ActivityStreamsRepository.java
incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/cassandra/CassandraActivityStreamsRepository.java
incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/cassandra/CassandraKeyspace.java
incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/cassandra/CassandraPublisherRepository.java
incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/cassandra/CassandraSubscriptionRepository.java
incubator/streams/branches/webservice/streams-persistence/src/main/resources/streams_persistence_applicationContext.xml
incubator/streams/branches/webservice/streams-persistence/src/test/java/org/apache/streams/persistence/repository/cassandra/CassandraActiivtyStreamsRepositoryTest.java
incubator/streams/branches/webservice/streams-web/pom.xml
Modified: incubator/streams/branches/webservice/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/pom.xml?rev=1542378&r1=1542377&r2=1542378&view=diff
==============================================================================
--- incubator/streams/branches/webservice/pom.xml (original)
+++ incubator/streams/branches/webservice/pom.xml Fri Nov 15 19:26:40 2013
@@ -63,6 +63,8 @@
<camel.version>2.12.1</camel.version>
<logging.version>1.7.5</logging.version>
<easymock.version>3.2</easymock.version>
+ <mongo-java-driver.version>2.11.3</mongo-java-driver.version>
+ <javax-servlet.version>2.5</javax-servlet.version>
</properties>
<packaging>pom</packaging>
Modified: incubator/streams/branches/webservice/streams-components/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-components/pom.xml?rev=1542378&r1=1542377&r2=1542378&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-components/pom.xml (original)
+++ incubator/streams/branches/webservice/streams-components/pom.xml Fri Nov 15 19:26:40 2013
@@ -52,6 +52,14 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
+ <!--<exclusion>-->
+ <!--<groupId>javax.servlet</groupId>-->
+ <!--<artifactId>servlet-api</artifactId>-->
+ <!--</exclusion>-->
+ <!--<exclusion>-->
+ <!--<groupId>org.mortbay.jetty</groupId>-->
+ <!--<artifactId>servlet-api</artifactId>-->
+ <!--</exclusion>-->
</exclusions>
</dependency>
Modified: incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/StreamsActivityRepositoryService.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/StreamsActivityRepositoryService.java?rev=1542378&r1=1542377&r2=1542378&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/StreamsActivityRepositoryService.java (original)
+++ incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/StreamsActivityRepositoryService.java Fri Nov 15 19:26:40 2013
@@ -10,7 +10,7 @@ import java.util.Set;
public interface StreamsActivityRepositoryService {
- void receiveActivity(ActivityStreamsPublisher publisher, String activityJSON) throws Exception;
+ void receiveActivity(String activityJSON) throws Exception;
List<ActivityStreamsEntry> getActivitiesForProviders(Set<String> providers, Date lastUpdated);
}
Added: incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/ActivityRepositoryServiceImpl.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/ActivityRepositoryServiceImpl.java?rev=1542378&view=auto
==============================================================================
--- incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/ActivityRepositoryServiceImpl.java (added)
+++ incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/ActivityRepositoryServiceImpl.java Fri Nov 15 19:26:40 2013
@@ -0,0 +1,47 @@
+package org.apache.streams.components.service.impl;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.streams.components.service.StreamsActivityRepositoryService;
+import org.apache.streams.persistence.model.ActivityStreamsEntry;
+import org.apache.streams.persistence.model.ActivityStreamsPublisher;
+import org.apache.streams.persistence.model.cassandra.CassandraActivityStreamsEntry;
+import org.apache.streams.persistence.model.mongo.MongoActivityStreamsEntry;
+import org.apache.streams.persistence.repository.ActivityStreamsRepository;
+import org.codehaus.jackson.map.DeserializationConfig;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.*;
+
+@Component
+public class ActivityRepositoryServiceImpl implements StreamsActivityRepositoryService {
+
+ private static final transient Log LOG = LogFactory.getLog(ActivityRepositoryServiceImpl.class);
+
+ private ActivityStreamsRepository activityStreamsRepository;
+ private Class activityClass;
+ private ObjectMapper mapper;
+
+ @Autowired
+ public ActivityRepositoryServiceImpl(ActivityStreamsRepository activityStreamsRepository) {
+ this.activityStreamsRepository = activityStreamsRepository;
+ this.activityClass = MongoActivityStreamsEntry.class;
+ this.mapper = new ObjectMapper();
+ mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+ }
+
+ @Override
+ public void receiveActivity(String activityJSON) throws Exception {
+ ActivityStreamsEntry streamsEntry = (ActivityStreamsEntry)mapper.readValue(activityJSON, activityClass);
+ streamsEntry.setPublished(new Date());
+ streamsEntry.setId(""+UUID.randomUUID());
+ activityStreamsRepository.save(streamsEntry);
+ }
+
+ @Override
+ public List<ActivityStreamsEntry> getActivitiesForProviders(Set<String> providers, Date lastUpdated){
+ return activityStreamsRepository.getActivitiesForFilters(providers, lastUpdated);
+ }
+}
Modified: incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/StreamsActivityPublishingServiceImpl.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/StreamsActivityPublishingServiceImpl.java?rev=1542378&r1=1542377&r2=1542378&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/StreamsActivityPublishingServiceImpl.java (original)
+++ incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/StreamsActivityPublishingServiceImpl.java Fri Nov 15 19:26:40 2013
@@ -4,8 +4,8 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.streams.components.service.StreamsActivityPublishingService;
import org.apache.streams.components.service.StreamsActivityRepositoryService;
-import org.apache.streams.components.service.StreamsPublisherRepositoryService;
import org.apache.streams.persistence.model.ActivityStreamsPublisher;
+import org.apache.streams.persistence.repository.PublisherRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@@ -14,12 +14,12 @@ public class StreamsActivityPublishingSe
private Log log = LogFactory.getLog(StreamsActivityPublishingServiceImpl.class);
private StreamsActivityRepositoryService activityService;
- private StreamsPublisherRepositoryService publisherService;
+ private PublisherRepository publisherRepository;
@Autowired
- public StreamsActivityPublishingServiceImpl(StreamsActivityRepositoryService activityService, StreamsPublisherRepositoryService publisherService) {
+ public StreamsActivityPublishingServiceImpl(StreamsActivityRepositoryService activityService, PublisherRepository publisherRepository) {
this.activityService = activityService;
- this.publisherService = publisherService;
+ this.publisherRepository = publisherRepository;
}
/**
@@ -29,8 +29,9 @@ public class StreamsActivityPublishingSe
* @return a success message if no errors were thrown
* */
public String publish(String publisherID, String activityJSON) throws Exception {
- ActivityStreamsPublisher publisher = publisherService.getActivityStreamsPublisherByInRoute(publisherID);
- activityService.receiveActivity(publisher,activityJSON);
+ //TODO: this should eventually authenticate
+ ActivityStreamsPublisher publisher = publisherRepository.getPublisherByInRoute(publisherID);
+ activityService.receiveActivity(activityJSON);
return activityJSON;
}
}
Modified: incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/StreamsFiltersServiceImpl.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/StreamsFiltersServiceImpl.java?rev=1542378&r1=1542377&r2=1542378&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/StreamsFiltersServiceImpl.java (original)
+++ incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/StreamsFiltersServiceImpl.java Fri Nov 15 19:26:40 2013
@@ -4,8 +4,8 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.streams.components.activitysubscriber.ActivityStreamsSubscriberWarehouse;
import org.apache.streams.components.service.StreamsFiltersService;
-import org.apache.streams.components.service.StreamsSubscriptionRepositoryService;
import org.apache.streams.persistence.model.ActivityStreamsSubscription;
+import org.apache.streams.persistence.repository.SubscriptionRepository;
import org.codehaus.jackson.map.DeserializationConfig;
import org.codehaus.jackson.map.ObjectMapper;
import org.springframework.beans.factory.annotation.Autowired;
@@ -20,13 +20,13 @@ import java.util.Map;
public class StreamsFiltersServiceImpl implements StreamsFiltersService {
private static final Log LOG = LogFactory.getLog(StreamsFiltersServiceImpl.class);
- private StreamsSubscriptionRepositoryService repositoryService;
+ private SubscriptionRepository subscriptionRepository;
private ActivityStreamsSubscriberWarehouse subscriberWarehouse;
private ObjectMapper mapper;
@Autowired
- public StreamsFiltersServiceImpl(StreamsSubscriptionRepositoryService repositoryService, ActivityStreamsSubscriberWarehouse subscriberWarehouse) {
- this.repositoryService = repositoryService;
+ public StreamsFiltersServiceImpl(SubscriptionRepository subscriptionRepository, ActivityStreamsSubscriberWarehouse subscriberWarehouse) {
+ this.subscriptionRepository = subscriptionRepository;
this.subscriberWarehouse = subscriberWarehouse;
this.mapper = new ObjectMapper();
mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false);
@@ -34,7 +34,7 @@ public class StreamsFiltersServiceImpl i
@Override
public String getFilters(String subscriberId) throws Exception{
- ActivityStreamsSubscription subscription = repositoryService.getSubscriptionByInRoute(subscriberId);
+ ActivityStreamsSubscription subscription = subscriptionRepository.getSubscriptionByInRoute(subscriberId);
return mapper.writeValueAsString(subscription.getFilters());
}
@@ -43,9 +43,9 @@ public class StreamsFiltersServiceImpl i
LOG.info("updating filters for " + subscriberId);
Map<String, List> updateFilters = (Map<String, List>) mapper.readValue(tagsJson, Map.class);
- repositoryService.updateFilters(subscriberId, new HashSet<String>(updateFilters.get("add")), new HashSet<String>(updateFilters.get("remove")));
+ subscriptionRepository.updateFilters(subscriberId, new HashSet<String>(updateFilters.get("add")), new HashSet<String>(updateFilters.get("remove")));
subscriberWarehouse.getSubscriber(subscriberId).setLastUpdated(new Date(0));
- subscriberWarehouse.updateSubscriber(repositoryService.getSubscriptionByInRoute(subscriberId));
+ subscriberWarehouse.updateSubscriber(subscriptionRepository.getSubscriptionByInRoute(subscriberId));
return "Filters Updated Successfully!";
}
Modified: incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/StreamsPublisherRegistrationServiceImpl.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/StreamsPublisherRegistrationServiceImpl.java?rev=1542378&r1=1542377&r2=1542378&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/StreamsPublisherRegistrationServiceImpl.java (original)
+++ incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/StreamsPublisherRegistrationServiceImpl.java Fri Nov 15 19:26:40 2013
@@ -4,9 +4,10 @@ package org.apache.streams.components.se
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.streams.components.service.StreamsPublisherRegistrationService;
-import org.apache.streams.components.service.StreamsPublisherRepositoryService;
import org.apache.streams.persistence.model.ActivityStreamsPublisher;
import org.apache.streams.persistence.model.cassandra.CassandraPublisher;
+import org.apache.streams.persistence.model.mongo.MongoPublisher;
+import org.apache.streams.persistence.repository.PublisherRepository;
import org.codehaus.jackson.map.DeserializationConfig;
import org.codehaus.jackson.map.ObjectMapper;
import org.springframework.beans.factory.annotation.Autowired;
@@ -18,13 +19,15 @@ import java.util.UUID;
public class StreamsPublisherRegistrationServiceImpl implements StreamsPublisherRegistrationService {
private Log log = LogFactory.getLog(StreamsPublisherRegistrationServiceImpl.class);
- private StreamsPublisherRepositoryService publisherRepositoryService;
+ private PublisherRepository publisherRepository;
+ private Class publisherClass;
private ObjectMapper mapper;
@Autowired
- public StreamsPublisherRegistrationServiceImpl(StreamsPublisherRepositoryService publisherRepositoryService) {
- this.publisherRepositoryService = publisherRepositoryService;
+ public StreamsPublisherRegistrationServiceImpl(PublisherRepository publisherRepositoryService) {
+ this.publisherRepository = publisherRepositoryService;
this.mapper = new ObjectMapper();
+ this.publisherClass = MongoPublisher.class;
mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false);
}
@@ -37,20 +40,20 @@ public class StreamsPublisherRegistratio
log.info("attempting to register publisher json: " + publisherJSON);
// read from file, convert it to user class
- ActivityStreamsPublisher publisher = mapper.readValue(publisherJSON, CassandraPublisher.class);
+ ActivityStreamsPublisher publisher = (ActivityStreamsPublisher)mapper.readValue(publisherJSON, publisherClass);
if (publisher.getSrc() == null) {
log.info("configuration src is null");
throw new Exception("configuration src is null");
}
- ActivityStreamsPublisher fromDb = publisherRepositoryService.getActivityStreamsPublisherBySrc(publisher.getSrc());
+ ActivityStreamsPublisher fromDb = publisherRepository.getPublisherBySrc(publisher.getSrc());
if(fromDb != null){
return fromDb.getInRoute();
}else{
publisher.setInRoute("" + UUID.randomUUID());
- publisherRepositoryService.savePublisher(publisher);
+ publisherRepository.save(publisher);
return publisher.getInRoute();
}
Modified: incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/StreamsSubscriberRegistrationServiceImpl.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/StreamsSubscriberRegistrationServiceImpl.java?rev=1542378&r1=1542377&r2=1542378&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/StreamsSubscriberRegistrationServiceImpl.java (original)
+++ incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/StreamsSubscriberRegistrationServiceImpl.java Fri Nov 15 19:26:40 2013
@@ -4,9 +4,10 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.streams.components.activitysubscriber.ActivityStreamsSubscriberWarehouse;
import org.apache.streams.components.service.StreamsSubscriberRegistrationService;
-import org.apache.streams.components.service.StreamsSubscriptionRepositoryService;
import org.apache.streams.persistence.model.ActivityStreamsSubscription;
import org.apache.streams.persistence.model.cassandra.CassandraSubscription;
+import org.apache.streams.persistence.model.mongo.MongoSubscription;
+import org.apache.streams.persistence.repository.SubscriptionRepository;
import org.codehaus.jackson.map.DeserializationConfig;
import org.codehaus.jackson.map.ObjectMapper;
import org.springframework.beans.factory.annotation.Autowired;
@@ -18,18 +19,20 @@ import java.util.UUID;
public class StreamsSubscriberRegistrationServiceImpl implements StreamsSubscriberRegistrationService {
private Log log = LogFactory.getLog(StreamsSubscriberRegistrationServiceImpl.class);
- private StreamsSubscriptionRepositoryService subscriptionRepositoryService;
+ private SubscriptionRepository subscriptionRepository;
private ActivityStreamsSubscriberWarehouse activityStreamsSubscriberWarehouse;
+ private Class subscriptionClass;
private ObjectMapper mapper;
@Autowired
public StreamsSubscriberRegistrationServiceImpl(
- StreamsSubscriptionRepositoryService subscriptionRepositoryService,
+ SubscriptionRepository subscriptionRepository,
ActivityStreamsSubscriberWarehouse activityStreamsSubscriberWarehouse
) {
- this.subscriptionRepositoryService = subscriptionRepositoryService;
+ this.subscriptionRepository = subscriptionRepository;
this.activityStreamsSubscriberWarehouse = activityStreamsSubscriberWarehouse;
this.mapper = new ObjectMapper();
+ this.subscriptionClass = MongoSubscription.class;
mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false);
}
@@ -43,14 +46,14 @@ public class StreamsSubscriberRegistrati
public String register(String subscriberJSON) throws Exception {
log.info("registering subscriber: "+subscriberJSON);
- ActivityStreamsSubscription subscription = mapper.readValue(subscriberJSON, CassandraSubscription.class);
- ActivityStreamsSubscription fromDB = subscriptionRepositoryService.getSubscriptionByUsername(subscription.getUsername());
+ ActivityStreamsSubscription subscription = (ActivityStreamsSubscription)mapper.readValue(subscriberJSON, subscriptionClass);
+ ActivityStreamsSubscription fromDB = subscriptionRepository.getSubscriptionByUsername(subscription.getUsername());
if (fromDB != null) {
return fromDB.getInRoute();
} else {
subscription.setInRoute("" + UUID.randomUUID());
- subscriptionRepositoryService.saveSubscription(subscription);
+ subscriptionRepository.save(subscription);
activityStreamsSubscriberWarehouse.register(subscription);
return subscription.getInRoute();
Modified: incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/storm/StormSubscriberSpout.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/storm/StormSubscriberSpout.java?rev=1542378&r1=1542377&r2=1542378&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/storm/StormSubscriberSpout.java (original)
+++ incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/storm/StormSubscriberSpout.java Fri Nov 15 19:26:40 2013
@@ -7,8 +7,8 @@ import backtype.storm.topology.base.Base
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
-import org.apache.streams.components.service.StreamsSubscriptionRepositoryService;
import org.apache.streams.persistence.model.ActivityStreamsSubscription;
+import org.apache.streams.persistence.repository.SubscriptionRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;
@@ -20,9 +20,8 @@ import java.util.Map;
public class StormSubscriberSpout extends BaseRichSpout {
private static ApplicationContext appContext;
- private StreamsSubscriptionRepositoryService repositoryService;
+ private SubscriptionRepository subscriptionRepository;
private SpoutOutputCollector _collector;
- private Iterator iterator;
@Autowired
public StormSubscriberSpout(ApplicationContext ctx){
@@ -31,7 +30,7 @@ public class StormSubscriberSpout extend
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
- repositoryService = (StreamsSubscriptionRepositoryService)appContext.getBean("cassandraSubscriptionService");
+ subscriptionRepository = (SubscriptionRepository)appContext.getBean("mongoSubscriptionRepository");
_collector = collector;
}
@@ -39,7 +38,7 @@ public class StormSubscriberSpout extend
@Override
public void nextTuple() {
Utils.sleep(10000);
- for (ActivityStreamsSubscription subscription : repositoryService.getAllSubscriptions()) {
+ for (ActivityStreamsSubscription subscription : subscriptionRepository.getAllSubscriptions()) {
_collector.emit(new Values(subscription));
}
}
Modified: incubator/streams/branches/webservice/streams-components/src/main/resources/streams_components_applicationContext.xml
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-components/src/main/resources/streams_components_applicationContext.xml?rev=1542378&r1=1542377&r2=1542378&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-components/src/main/resources/streams_components_applicationContext.xml (original)
+++ incubator/streams/branches/webservice/streams-components/src/main/resources/streams_components_applicationContext.xml Fri Nov 15 19:26:40 2013
@@ -10,4 +10,8 @@
<context:annotation-config />
<context:component-scan base-package="org.apache.streams.components"/>
+ <!--<bean id="conversionClass" class="java.lang.Class" factory-method="forName">-->
+ <!--<constructor-arg value="org.apache.streams.persistence.model.mongo.MongoActivityStreamsEntry"/>-->
+ <!--</bean>-->
+
</beans>
\ No newline at end of file
Modified: incubator/streams/branches/webservice/streams-components/src/test/java/org/apache/streams/components/service/StreamsActivityPublishingServiceTest.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-components/src/test/java/org/apache/streams/components/service/StreamsActivityPublishingServiceTest.java?rev=1542378&r1=1542377&r2=1542378&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-components/src/test/java/org/apache/streams/components/service/StreamsActivityPublishingServiceTest.java (original)
+++ incubator/streams/branches/webservice/streams-components/src/test/java/org/apache/streams/components/service/StreamsActivityPublishingServiceTest.java Fri Nov 15 19:26:40 2013
@@ -2,6 +2,7 @@ package org.apache.streams.components.se
import org.apache.streams.components.service.impl.StreamsActivityPublishingServiceImpl;
import org.apache.streams.persistence.model.ActivityStreamsPublisher;
+import org.apache.streams.persistence.repository.PublisherRepository;
import org.junit.Before;
import org.junit.Test;
@@ -14,14 +15,14 @@ public class StreamsActivityPublishingSe
private StreamsActivityPublishingService activityPublishingService;
private StreamsActivityRepositoryService activityService;
- private StreamsPublisherRepositoryService publisherService;
+ private PublisherRepository publisherRepository;
@Before
public void setup(){
activityService = createMock(StreamsActivityRepositoryService.class);
- publisherService = createMock(StreamsPublisherRepositoryService.class);
+ publisherRepository = createMock(PublisherRepository.class);
- activityPublishingService = new StreamsActivityPublishingServiceImpl(activityService, publisherService);
+ activityPublishingService = new StreamsActivityPublishingServiceImpl(activityService, publisherRepository);
}
@Test
@@ -30,11 +31,11 @@ public class StreamsActivityPublishingSe
String activityJson = "myActionJson";
ActivityStreamsPublisher publisher = createMock(ActivityStreamsPublisher.class);
- expect(publisherService.getActivityStreamsPublisherByInRoute(inRoute)).andReturn(publisher);
- activityService.receiveActivity(publisher, activityJson);
+ expect(publisherRepository.getPublisherByInRoute(inRoute)).andReturn(publisher);
+ activityService.receiveActivity(activityJson);
expectLastCall();
- replay(publisherService,activityService);
+ replay(publisherRepository,activityService);
assertThat(activityJson, is(equalTo(activityPublishingService.publish(inRoute, activityJson))));
}
Modified: incubator/streams/branches/webservice/streams-components/src/test/java/org/apache/streams/components/service/StreamsFiltersServiceTest.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-components/src/test/java/org/apache/streams/components/service/StreamsFiltersServiceTest.java?rev=1542378&r1=1542377&r2=1542378&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-components/src/test/java/org/apache/streams/components/service/StreamsFiltersServiceTest.java (original)
+++ incubator/streams/branches/webservice/streams-components/src/test/java/org/apache/streams/components/service/StreamsFiltersServiceTest.java Fri Nov 15 19:26:40 2013
@@ -5,6 +5,7 @@ import org.apache.streams.components.act
import org.apache.streams.components.activitysubscriber.impl.ActivityStreamsSubscriberDelegate;
import org.apache.streams.components.service.impl.StreamsFiltersServiceImpl;
import org.apache.streams.persistence.model.ActivityStreamsSubscription;
+import org.apache.streams.persistence.repository.SubscriptionRepository;
import org.junit.Before;
import org.junit.Test;
@@ -19,14 +20,14 @@ import static org.junit.Assert.assertTha
public class StreamsFiltersServiceTest {
private StreamsFiltersService filtersService;
- private StreamsSubscriptionRepositoryService repositoryService;
+ private SubscriptionRepository subscriptionRepository;
private ActivityStreamsSubscriberWarehouse subscriberWarehouse;
@Before
public void setup(){
- repositoryService = createMock(StreamsSubscriptionRepositoryService.class);
+ subscriptionRepository = createMock(SubscriptionRepository.class);
subscriberWarehouse = createMock(ActivityStreamsSubscriberWarehouse.class);
- filtersService = new StreamsFiltersServiceImpl(repositoryService, subscriberWarehouse);
+ filtersService = new StreamsFiltersServiceImpl(subscriptionRepository, subscriberWarehouse);
}
@Test
@@ -36,13 +37,13 @@ public class StreamsFiltersServiceTest {
ActivityStreamsSubscription subscription = createMock(ActivityStreamsSubscription.class);
ActivityStreamsSubscriber subscriber = new ActivityStreamsSubscriberDelegate();
- repositoryService.updateFilters(eq(subscriberId), isA(Set.class), isA(Set.class));
+ subscriptionRepository.updateFilters(eq(subscriberId), isA(Set.class), isA(Set.class));
expectLastCall();
- expect(repositoryService.getSubscriptionByInRoute(subscriberId)).andReturn(subscription);
+ expect(subscriptionRepository.getSubscriptionByInRoute(subscriberId)).andReturn(subscription);
expect(subscriberWarehouse.getSubscriber(subscriberId)).andReturn(subscriber);
subscriberWarehouse.updateSubscriber(subscription);
expectLastCall();
- replay(repositoryService,subscriberWarehouse);
+ replay(subscriptionRepository,subscriberWarehouse);
String returned = filtersService.updateFilters(subscriberId,tagsJson);
@@ -56,9 +57,9 @@ public class StreamsFiltersServiceTest {
ActivityStreamsSubscription subscription = createMock(ActivityStreamsSubscription.class);
Set<String> filters = new HashSet<String>(Arrays.asList("tags"));
- expect(repositoryService.getSubscriptionByInRoute(subscriberId)).andReturn(subscription);
+ expect(subscriptionRepository.getSubscriptionByInRoute(subscriberId)).andReturn(subscription);
expect(subscription.getFilters()).andReturn(filters);
- replay(subscription,repositoryService);
+ replay(subscription,subscriptionRepository);
String returned = filtersService.getFilters(subscriberId);
Modified: incubator/streams/branches/webservice/streams-components/src/test/java/org/apache/streams/components/service/StreamsPublisherRegistrationServiceTest.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-components/src/test/java/org/apache/streams/components/service/StreamsPublisherRegistrationServiceTest.java?rev=1542378&r1=1542377&r2=1542378&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-components/src/test/java/org/apache/streams/components/service/StreamsPublisherRegistrationServiceTest.java (original)
+++ incubator/streams/branches/webservice/streams-components/src/test/java/org/apache/streams/components/service/StreamsPublisherRegistrationServiceTest.java Fri Nov 15 19:26:40 2013
@@ -1,8 +1,8 @@
package org.apache.streams.components.service;
-import org.apache.streams.components.service.impl.CassandraPublisherService;
import org.apache.streams.components.service.impl.StreamsPublisherRegistrationServiceImpl;
import org.apache.streams.persistence.model.ActivityStreamsPublisher;
+import org.apache.streams.persistence.repository.PublisherRepository;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -16,15 +16,15 @@ import static org.junit.Assert.assertTha
public class StreamsPublisherRegistrationServiceTest {
private StreamsPublisherRegistrationService publisherRegistrationService;
- private StreamsPublisherRepositoryService publisherRepositoryService;
+ private PublisherRepository publisherRepository;
@Rule
public ExpectedException thrown = ExpectedException.none();
@Before
public void setup(){
- publisherRepositoryService = createMock(CassandraPublisherService.class);
- publisherRegistrationService = new StreamsPublisherRegistrationServiceImpl(publisherRepositoryService);
+ publisherRepository = createMock(PublisherRepository.class);
+ publisherRegistrationService = new StreamsPublisherRegistrationServiceImpl(publisherRepository);
}
@Test
@@ -43,9 +43,9 @@ public class StreamsPublisherRegistratio
String inRoute = "this is returned inRoute";
ActivityStreamsPublisher publisher= createMock(ActivityStreamsPublisher.class);
- expect(publisherRepositoryService.getActivityStreamsPublisherBySrc("this is my src!")).andReturn(publisher);
+ expect(publisherRepository.getPublisherBySrc("this is my src!")).andReturn(publisher);
expect(publisher.getInRoute()).andReturn(inRoute);
- replay(publisherRepositoryService,publisher);
+ replay(publisherRepository,publisher);
String returned = publisherRegistrationService.register(publisherJson);
@@ -57,14 +57,14 @@ public class StreamsPublisherRegistratio
String publisherJson = "{\"src\":\"this is my src!\"}";
String inRoute = "this is returned inRoute";
- expect(publisherRepositoryService.getActivityStreamsPublisherBySrc("this is my src!")).andReturn(null);
- publisherRepositoryService.savePublisher(isA(ActivityStreamsPublisher.class));
+ expect(publisherRepository.getPublisherBySrc("this is my src!")).andReturn(null);
+ publisherRepository.save(isA(ActivityStreamsPublisher.class));
expectLastCall();
- replay(publisherRepositoryService);
+ replay(publisherRepository);
String returned = publisherRegistrationService.register(publisherJson);
assertThat(returned, is(instanceOf(String.class)));
- verify(publisherRepositoryService);
+ verify(publisherRepository);
}
}
Modified: incubator/streams/branches/webservice/streams-components/src/test/java/org/apache/streams/components/service/StreamsSubscriberRegistrationServiceIntegrationTest.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-components/src/test/java/org/apache/streams/components/service/StreamsSubscriberRegistrationServiceIntegrationTest.java?rev=1542378&r1=1542377&r2=1542378&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-components/src/test/java/org/apache/streams/components/service/StreamsSubscriberRegistrationServiceIntegrationTest.java (original)
+++ incubator/streams/branches/webservice/streams-components/src/test/java/org/apache/streams/components/service/StreamsSubscriberRegistrationServiceIntegrationTest.java Fri Nov 15 19:26:40 2013
@@ -1,7 +1,6 @@
package org.apache.streams.components.service;
import org.apache.streams.components.activitysubscriber.ActivityStreamsSubscriberWarehouse;
-import org.apache.streams.components.service.impl.CassandraSubscriptionService;
import org.apache.streams.components.service.impl.StreamsSubscriberRegistrationServiceImpl;
import org.apache.streams.persistence.configuration.CassandraConfiguration;
import org.apache.streams.persistence.repository.SubscriptionRepository;
@@ -28,9 +27,8 @@ public class StreamsSubscriberRegistrati
SubscriptionRepository subscriptionRepository = new CassandraSubscriptionRepository(keyspace,configuration);
ActivityStreamsSubscriberWarehouse warehouse = createMock(ActivityStreamsSubscriberWarehouse.class);
- StreamsSubscriptionRepositoryService repositoryService = new CassandraSubscriptionService(subscriptionRepository);
- streamsSubscriberRegistrationService = new StreamsSubscriberRegistrationServiceImpl(repositoryService, warehouse);
+ streamsSubscriberRegistrationService = new StreamsSubscriberRegistrationServiceImpl(subscriptionRepository, warehouse);
}
@Ignore
Modified: incubator/streams/branches/webservice/streams-components/src/test/java/org/apache/streams/components/service/StreamsSubscriberRegistrationServiceTest.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-components/src/test/java/org/apache/streams/components/service/StreamsSubscriberRegistrationServiceTest.java?rev=1542378&r1=1542377&r2=1542378&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-components/src/test/java/org/apache/streams/components/service/StreamsSubscriberRegistrationServiceTest.java (original)
+++ incubator/streams/branches/webservice/streams-components/src/test/java/org/apache/streams/components/service/StreamsSubscriberRegistrationServiceTest.java Fri Nov 15 19:26:40 2013
@@ -3,6 +3,7 @@ package org.apache.streams.components.se
import org.apache.streams.components.activitysubscriber.ActivityStreamsSubscriberWarehouse;
import org.apache.streams.components.service.impl.StreamsSubscriberRegistrationServiceImpl;
import org.apache.streams.persistence.model.ActivityStreamsSubscription;
+import org.apache.streams.persistence.repository.SubscriptionRepository;
import org.junit.Before;
import org.junit.Test;
@@ -14,15 +15,15 @@ import static org.junit.Assert.assertTha
public class StreamsSubscriberRegistrationServiceTest {
private StreamsSubscriberRegistrationService subscriberRegistrationService;
- private StreamsSubscriptionRepositoryService subscriptionRepositoryService;
+ private SubscriptionRepository subscriptionRepository;
private ActivityStreamsSubscriberWarehouse activityStreamsSubscriberWarehouse;
@Before
public void setup(){
- subscriptionRepositoryService = createMock(StreamsSubscriptionRepositoryService.class);
+ subscriptionRepository = createMock(SubscriptionRepository.class);
activityStreamsSubscriberWarehouse = createMock(ActivityStreamsSubscriberWarehouse.class);
- subscriberRegistrationService = new StreamsSubscriberRegistrationServiceImpl(subscriptionRepositoryService,activityStreamsSubscriberWarehouse);
+ subscriberRegistrationService = new StreamsSubscriberRegistrationServiceImpl(subscriptionRepository,activityStreamsSubscriberWarehouse);
}
@Test
@@ -32,9 +33,9 @@ public class StreamsSubscriberRegistrati
String subscriberJson = "{\"username\":\"blah\"}";
ActivityStreamsSubscription subscription = createMock(ActivityStreamsSubscription.class);
- expect(subscriptionRepositoryService.getSubscriptionByUsername(username)).andReturn(subscription);
+ expect(subscriptionRepository.getSubscriptionByUsername(username)).andReturn(subscription);
expect(subscription.getInRoute()).andReturn(inRoute);
- replay(subscriptionRepositoryService, subscription);
+ replay(subscriptionRepository, subscription);
String returned = subscriberRegistrationService.register(subscriberJson);
@@ -46,16 +47,16 @@ public class StreamsSubscriberRegistrati
String username = "blah";
String subscriberJson = "{\"username\":\"blah\"}";
- expect(subscriptionRepositoryService.getSubscriptionByUsername(username)).andReturn(null);
- subscriptionRepositoryService.saveSubscription(isA(ActivityStreamsSubscription.class));
+ expect(subscriptionRepository.getSubscriptionByUsername(username)).andReturn(null);
+ subscriptionRepository.save(isA(ActivityStreamsSubscription.class));
expectLastCall();
activityStreamsSubscriberWarehouse.register(isA(ActivityStreamsSubscription.class));
expectLastCall();
- replay(subscriptionRepositoryService, activityStreamsSubscriberWarehouse);
+ replay(subscriptionRepository, activityStreamsSubscriberWarehouse);
String returned = subscriberRegistrationService.register(subscriberJson);
assertThat(returned, is(instanceOf(String.class)));
- verify(subscriptionRepositoryService,activityStreamsSubscriberWarehouse);
+ verify(subscriptionRepository,activityStreamsSubscriberWarehouse);
}
}
Modified: incubator/streams/branches/webservice/streams-persistence/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-persistence/pom.xml?rev=1542378&r1=1542377&r2=1542378&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-persistence/pom.xml (original)
+++ incubator/streams/branches/webservice/streams-persistence/pom.xml Fri Nov 15 19:26:40 2013
@@ -42,6 +42,12 @@
<artifactId>junit</artifactId>
<version>${junit.version}</version>
</dependency>
+
+ <dependency>
+ <groupId>org.mongodb</groupId>
+ <artifactId>mongo-java-driver</artifactId>
+ <version>${mongo-java-driver.version}</version>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
Added: incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/configuration/MongoConfiguration.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/configuration/MongoConfiguration.java?rev=1542378&view=auto
==============================================================================
--- incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/configuration/MongoConfiguration.java (added)
+++ incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/configuration/MongoConfiguration.java Fri Nov 15 19:26:40 2013
@@ -0,0 +1,74 @@
+ package org.apache.streams.persistence.configuration;
+
+ import org.springframework.beans.factory.annotation.Value;
+ import org.springframework.context.annotation.Bean;
+ import org.springframework.context.annotation.Configuration;
+ import org.springframework.context.annotation.PropertySource;
+ import org.springframework.context.support.PropertySourcesPlaceholderConfigurer;
+
+ /**
+  * Spring configuration exposing the MongoDB database and collection names.
+  * Every value is injected from a "mongo.*" key of classpath:mongo.properties.
+  */
+ @Configuration
+ @PropertySource("classpath:mongo.properties")
+ public class MongoConfiguration {
+
+     @Value("${mongo.dbName}")
+     private String dbName;
+
+     @Value("${mongo.activityStreamsCollectionName}")
+     private String activitystreamsCollectionName;
+
+     @Value("${mongo.subscriptionCollectionName}")
+     private String subscriptionCollectionName;
+
+     @Value("${mongo.publisherCollectionName}")
+     private String publisherCollectionName;
+
+     /** Registers the configurer Spring uses to resolve the ${...} placeholders above. */
+     @Bean
+     public static PropertySourcesPlaceholderConfigurer propertySourcesPlaceholderConfigurer() {
+         return new PropertySourcesPlaceholderConfigurer();
+     }
+
+     /** Returns the database name (property mongo.dbName). */
+     public String getDbName() {
+         return dbName;
+     }
+
+     /** Replaces the database name. */
+     public void setDbName(String dbName) {
+         this.dbName = dbName;
+     }
+
+     /** Returns the activity streams collection name (property mongo.activityStreamsCollectionName). */
+     public String getActivitystreamsCollectionName() {
+         return activitystreamsCollectionName;
+     }
+
+     /** Replaces the activity streams collection name. */
+     public void setActivitystreamsCollectionName(String activitystreamsCollectionName) {
+         this.activitystreamsCollectionName = activitystreamsCollectionName;
+     }
+
+     /** Returns the subscription collection name (property mongo.subscriptionCollectionName). */
+     public String getSubscriptionCollectionName() {
+         return subscriptionCollectionName;
+     }
+
+     /** Replaces the subscription collection name. */
+     public void setSubscriptionCollectionName(String subscriptionCollectionName) {
+         this.subscriptionCollectionName = subscriptionCollectionName;
+     }
+
+     /** Returns the publisher collection name (property mongo.publisherCollectionName). */
+     public String getPublisherCollectionName() {
+         return publisherCollectionName;
+     }
+
+     /** Replaces the publisher collection name. */
+     public void setPublisherCollectionName(String publisherCollectionName) {
+         this.publisherCollectionName = publisherCollectionName;
+     }
+ }
Modified: incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/ActivityStreamsSubscription.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/ActivityStreamsSubscription.java?rev=1542378&r1=1542377&r2=1542378&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/ActivityStreamsSubscription.java (original)
+++ incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/ActivityStreamsSubscription.java Fri Nov 15 19:26:40 2013
@@ -4,7 +4,7 @@ import java.util.Set;
public interface ActivityStreamsSubscription {
- void setFilters(Set<String> tags);
+ void setFilters(Set<String> filters);
Set<String> getFilters();
String getInRoute();
Added: incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/mongo/MongoActivityStreamsEntry.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/mongo/MongoActivityStreamsEntry.java?rev=1542378&view=auto
==============================================================================
--- incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/mongo/MongoActivityStreamsEntry.java (added)
+++ incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/mongo/MongoActivityStreamsEntry.java Fri Nov 15 19:26:40 2013
@@ -0,0 +1,98 @@
+ package org.apache.streams.persistence.model.mongo;
+
+ import com.mongodb.BasicDBObject;
+ import org.apache.streams.persistence.model.ActivityStreamsEntry;
+ import org.apache.streams.persistence.model.ActivityStreamsObject;
+
+ import java.util.Date;
+
+ /**
+  * Activity entry that keeps each property as a field of the underlying
+  * BasicDBObject, keyed by the property name.
+  */
+ public class MongoActivityStreamsEntry extends BasicDBObject implements ActivityStreamsEntry {
+
+     /** Returns the "id" field. */
+     public String getId() {
+         return (String) get("id");
+     }
+
+     /** Stores the "id" field. */
+     public void setId(String id) {
+         put("id", id);
+     }
+
+     /** Returns the "tags" field. */
+     public String getTags() {
+         return (String) get("tags");
+     }
+
+     /** Stores the "tags" field. */
+     public void setTags(String tags) {
+         put("tags", tags);
+     }
+
+     /** Returns the "published" field. */
+     public Date getPublished() {
+         return (Date) get("published");
+     }
+
+     /** Stores the "published" field. */
+     public void setPublished(Date published) {
+         put("published", published);
+     }
+
+     /** Returns the "verb" field. */
+     public String getVerb() {
+         return (String) get("verb");
+     }
+
+     /** Stores the "verb" field. */
+     public void setVerb(String verb) {
+         put("verb", verb);
+     }
+
+     // NOTE(review): the four getters below cast the stored value to
+     // ActivityStreamsObject; confirm that is the type present after a document
+     // is read back from the database.
+
+     /** Returns the "actor" field. */
+     public ActivityStreamsObject getActor() {
+         return (ActivityStreamsObject) get("actor");
+     }
+
+     /** Stores the "actor" field. */
+     public void setActor(ActivityStreamsObject actor) {
+         put("actor", actor);
+     }
+
+     /** Returns the "object" field. */
+     public ActivityStreamsObject getObject() {
+         return (ActivityStreamsObject) get("object");
+     }
+
+     /** Stores the "object" field. */
+     public void setObject(ActivityStreamsObject object) {
+         put("object", object);
+     }
+
+     /** Returns the "provider" field. */
+     public ActivityStreamsObject getProvider() {
+         return (ActivityStreamsObject) get("provider");
+     }
+
+     /** Stores the "provider" field. */
+     public void setProvider(ActivityStreamsObject provider) {
+         put("provider", provider);
+     }
+
+     /** Returns the "target" field. */
+     public ActivityStreamsObject getTarget() {
+         return (ActivityStreamsObject) get("target");
+     }
+
+     /** Stores the "target" field. */
+     public void setTarget(ActivityStreamsObject target) {
+         put("target", target);
+     }
+ }
Added: incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/mongo/MongoActivityStreamsObject.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/mongo/MongoActivityStreamsObject.java?rev=1542378&view=auto
==============================================================================
--- incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/mongo/MongoActivityStreamsObject.java (added)
+++ incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/mongo/MongoActivityStreamsObject.java Fri Nov 15 19:26:40 2013
@@ -0,0 +1,56 @@
+ package org.apache.streams.persistence.model.mongo;
+
+ import com.mongodb.BasicDBObject;
+ import org.apache.streams.persistence.model.ActivityStreamsObject;
+
+ /**
+  * Activity streams object backed by a BasicDBObject; every property is stored
+  * under the field name held in the matching constant.
+  */
+ public class MongoActivityStreamsObject extends BasicDBObject implements ActivityStreamsObject {
+
+     private static final String DISPLAY_NAME = "displayName";
+     private static final String ID = "id";
+     private static final String OBJECT_TYPE = "objectType";
+     private static final String URL = "url";
+
+     /** Returns the "displayName" field. */
+     public String getDisplayName() {
+         return (String) get(DISPLAY_NAME);
+     }
+
+     /** Stores the "displayName" field. */
+     public void setDisplayName(String displayName) {
+         put(DISPLAY_NAME, displayName);
+     }
+
+     /** Returns the "id" field. */
+     public String getId() {
+         return (String) get(ID);
+     }
+
+     /** Stores the "id" field. */
+     public void setId(String id) {
+         put(ID, id);
+     }
+
+     /** Returns the "objectType" field. */
+     public String getObjectType() {
+         return (String) get(OBJECT_TYPE);
+     }
+
+     /** Stores the "objectType" field. */
+     public void setObjectType(String objectType) {
+         put(OBJECT_TYPE, objectType);
+     }
+
+     /** Returns the "url" field. */
+     public String getUrl() {
+         return (String) get(URL);
+     }
+
+     /** Stores the "url" field. */
+     public void setUrl(String url) {
+         put(URL, url);
+     }
+ }
Added: incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/mongo/MongoPublisher.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/mongo/MongoPublisher.java?rev=1542378&view=auto
==============================================================================
--- incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/mongo/MongoPublisher.java (added)
+++ incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/mongo/MongoPublisher.java Fri Nov 15 19:26:40 2013
@@ -0,0 +1,51 @@
+ package org.apache.streams.persistence.model.mongo;
+
+ import com.mongodb.BasicDBObject;
+ import org.apache.streams.persistence.model.ActivityStreamsPublisher;
+
+ /**
+  * Publisher record backed by a BasicDBObject. The id, src and inRoute
+  * properties are stored as fields of the same name.
+  */
+ public class MongoPublisher extends BasicDBObject implements ActivityStreamsPublisher {
+
+     private static final String ID_FIELD = "id";
+     private static final String SRC_FIELD = "src";
+     private static final String IN_ROUTE_FIELD = "inRoute";
+
+     /** Returns the "id" field. */
+     @Override
+     public String getId() {
+         return (String) get(ID_FIELD);
+     }
+
+     /** Stores the "id" field. */
+     @Override
+     public void setId(String id) {
+         put(ID_FIELD, id);
+     }
+
+     /** Returns the "src" field. */
+     @Override
+     public String getSrc() {
+         return (String) get(SRC_FIELD);
+     }
+
+     /** Stores the "src" field. */
+     @Override
+     public void setSrc(String src) {
+         put(SRC_FIELD, src);
+     }
+
+     /** Returns the "inRoute" field. */
+     @Override
+     public String getInRoute() {
+         return (String) get(IN_ROUTE_FIELD);
+     }
+
+     /** Stores the "inRoute" field. */
+     @Override
+     public void setInRoute(String inRoute) {
+         put(IN_ROUTE_FIELD, inRoute);
+     }
+ }
Added: incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/mongo/MongoSubscription.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/mongo/MongoSubscription.java?rev=1542378&view=auto
==============================================================================
--- incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/mongo/MongoSubscription.java (added)
+++ incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/mongo/MongoSubscription.java Fri Nov 15 19:26:40 2013
@@ -0,0 +1,76 @@
+ package org.apache.streams.persistence.model.mongo;
+
+ import com.mongodb.BasicDBObject;
+ import org.apache.streams.persistence.model.ActivityStreamsSubscription;
+
+ import java.util.Collection;
+ import java.util.HashSet;
+ import java.util.Set;
+
+ /**
+  * Subscription backed by a BasicDBObject; filters, inRoute and username are
+  * stored as fields of the same name.
+  */
+ public class MongoSubscription extends BasicDBObject implements ActivityStreamsSubscription {
+
+     /** Creates a subscription whose "filters" field starts as an empty set. */
+     public MongoSubscription() {
+         put("filters", new HashSet<String>());
+     }
+
+     /** Stores the given set under "filters", replacing the previous value. */
+     @Override
+     public void setFilters(Set<String> filters) {
+         put("filters", filters);
+     }
+
+     /**
+      * Returns the "filters" field as a set.
+      * <p>
+      * A stored Set is returned as is. Any other collection is copied into a
+      * HashSet, and a missing (null) value yields an empty HashSet instead of a
+      * NullPointerException; in both cases the new set replaces the stored value.
+      */
+     @Override
+     public Set<String> getFilters() {
+         Object stored = get("filters");
+         if (stored instanceof Set) {
+             @SuppressWarnings("unchecked")
+             Set<String> existing = (Set<String>) stored;
+             return existing;
+         }
+
+         Set<String> normalized = new HashSet<String>();
+         if (stored != null) {
+             @SuppressWarnings("unchecked")
+             Collection<String> values = (Collection<String>) stored;
+             normalized.addAll(values);
+         }
+         put("filters", normalized);
+         return normalized;
+     }
+
+     /** Returns the "inRoute" field. */
+     @Override
+     public String getInRoute() {
+         return (String) get("inRoute");
+     }
+
+     /** Stores the "inRoute" field. */
+     @Override
+     public void setInRoute(String inRoute) {
+         put("inRoute", inRoute);
+     }
+
+     /** Returns the "username" field. */
+     @Override
+     public String getUsername() {
+         return (String) get("username");
+     }
+
+     /** Stores the "username" field. */
+     @Override
+     public void setUsername(String username) {
+         put("username", username);
+     }
+ }
Modified: incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/ActivityStreamsRepository.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/ActivityStreamsRepository.java?rev=1542378&r1=1542377&r2=1542378&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/ActivityStreamsRepository.java (original)
+++ incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/ActivityStreamsRepository.java Fri Nov 15 19:26:40 2013
@@ -9,7 +9,7 @@ import java.util.Set;
public interface ActivityStreamsRepository {
void save(ActivityStreamsEntry entry);
- List<ActivityStreamsEntry> getActivitiesForProviders(Set<String> providers, Date lastUpdated);
+ List<ActivityStreamsEntry> getActivitiesForFilters(Set<String> filters, Date lastUpdated);
void dropTable(String table);
}
Modified: incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/cassandra/CassandraActivityStreamsRepository.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/cassandra/CassandraActivityStreamsRepository.java?rev=1542378&r1=1542377&r2=1542378&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/cassandra/CassandraActivityStreamsRepository.java (original)
+++ incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/cassandra/CassandraActivityStreamsRepository.java Fri Nov 15 19:26:40 2013
@@ -19,7 +19,6 @@ import org.springframework.stereotype.Co
import java.util.*;
-@Component
public class CassandraActivityStreamsRepository implements ActivityStreamsRepository {
private static final Log LOG = LogFactory.getLog(CassandraActivityStreamsRepository.class);
@@ -90,10 +89,10 @@ public class CassandraActivityStreamsRep
}
@Override
- public List<ActivityStreamsEntry> getActivitiesForProviders(Set<String> providers, Date lastUpdated) {
+ public List<ActivityStreamsEntry> getActivitiesForFilters(Set<String> filters, Date lastUpdated) {
List<ActivityStreamsEntry> results = new ArrayList<ActivityStreamsEntry>();
- for (String providerName : providers) {
+ for (String providerName : filters) {
Select query = QueryBuilder.select().from(configuration.getActivitystreamsColumnFamilyName())
.where(QueryBuilder.eq("provider_displayname", providerName))
.and(QueryBuilder.gt("published", lastUpdated))
Modified: incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/cassandra/CassandraKeyspace.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/cassandra/CassandraKeyspace.java?rev=1542378&r1=1542377&r2=1542378&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/cassandra/CassandraKeyspace.java (original)
+++ incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/cassandra/CassandraKeyspace.java Fri Nov 15 19:26:40 2013
@@ -3,15 +3,9 @@ package org.apache.streams.persistence.r
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.exceptions.AlreadyExistsException;
-import com.datastax.driver.core.policies.ConstantReconnectionPolicy;
-import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy;
import org.apache.streams.persistence.configuration.CassandraConfiguration;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-import java.net.InetAddress;
-
-@Component
public class CassandraKeyspace {
private CassandraConfiguration configuration;
private Cluster cluster;
Modified: incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/cassandra/CassandraPublisherRepository.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/cassandra/CassandraPublisherRepository.java?rev=1542378&r1=1542377&r2=1542378&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/cassandra/CassandraPublisherRepository.java (original)
+++ incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/cassandra/CassandraPublisherRepository.java Fri Nov 15 19:26:40 2013
@@ -12,7 +12,6 @@ import org.apache.streams.persistence.re
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-@Component
public class CassandraPublisherRepository implements PublisherRepository {
private static final Log LOG = LogFactory.getLog(CassandraActivityStreamsRepository.class);
Modified: incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/cassandra/CassandraSubscriptionRepository.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/cassandra/CassandraSubscriptionRepository.java?rev=1542378&r1=1542377&r2=1542378&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/cassandra/CassandraSubscriptionRepository.java (original)
+++ incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/cassandra/CassandraSubscriptionRepository.java Fri Nov 15 19:26:40 2013
@@ -22,7 +22,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
-@Component
public class CassandraSubscriptionRepository implements SubscriptionRepository {
private Log LOG = LogFactory.getLog(CassandraSubscriptionRepository.class);
Added: incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/mongo/MongoActivityStreamsRepository.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/mongo/MongoActivityStreamsRepository.java?rev=1542378&view=auto
==============================================================================
--- incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/mongo/MongoActivityStreamsRepository.java (added)
+++ incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/mongo/MongoActivityStreamsRepository.java Fri Nov 15 19:26:40 2013
@@ -0,0 +1,62 @@
+package org.apache.streams.persistence.repository.mongo;
+
+import com.mongodb.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.streams.persistence.configuration.MongoConfiguration;
+import org.apache.streams.persistence.model.ActivityStreamsEntry;
+import org.apache.streams.persistence.model.mongo.MongoActivityStreamsEntry;
+import org.apache.streams.persistence.repository.ActivityStreamsRepository;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.*;
+
+/**
+ * MongoDB implementation of {@link ActivityStreamsRepository}.
+ *
+ * <p>Works against the collection named by
+ * {@link MongoConfiguration#getActivitystreamsCollectionName()}; documents read from that
+ * collection are materialised as {@link MongoActivityStreamsEntry} instances.
+ */
+@Component
+public class MongoActivityStreamsRepository implements ActivityStreamsRepository {
+    private static final Log log = LogFactory.getLog(MongoActivityStreamsRepository.class);
+
+    private MongoDatabase database;
+    private MongoConfiguration configuration;
+    private DBCollection activityStreamsCollection;
+
+    /**
+     * @param database      holder of the Mongo {@code DB} handle to read from and write to
+     * @param configuration supplies the name of the activity streams collection
+     */
+    @Autowired
+    public MongoActivityStreamsRepository(MongoDatabase database, MongoConfiguration configuration) {
+        this.database = database;
+        // Previously this field was declared but never assigned.
+        this.configuration = configuration;
+        this.activityStreamsCollection = database.getDb().getCollection(configuration.getActivitystreamsCollectionName());
+        // Makes find()/findOne() hand back MongoActivityStreamsEntry objects, which the cast
+        // in getActivitiesForFilters relies on.
+        activityStreamsCollection.setObjectClass(MongoActivityStreamsEntry.class);
+    }
+
+    /**
+     * Persists the entry if it is a Mongo {@link DBObject}; any other implementation cannot be
+     * written by the driver and is skipped with a warning instead of being dropped silently.
+     *
+     * @param entry the activity to store
+     */
+    @Override
+    public void save(ActivityStreamsEntry entry) {
+        if (entry instanceof DBObject) {
+            activityStreamsCollection.save((DBObject) entry);
+        } else {
+            log.warn("Activity entry was not saved because it is not a DBObject: "
+                    + (entry == null ? "null" : entry.getClass().getName()));
+        }
+    }
+
+    /**
+     * Finds entries whose {@code published} value is greater than {@code lastUpdated} and whose
+     * {@code provider.displayName} or {@code actor.displayName} is one of {@code filters}.
+     *
+     * @param filters     display names to match; {@code null} or empty yields an empty result
+     * @param lastUpdated lower (exclusive) bound for the {@code published} field
+     * @return the matching entries, never {@code null}
+     */
+    @Override
+    public List<ActivityStreamsEntry> getActivitiesForFilters(Set<String> filters, Date lastUpdated) {
+        List<ActivityStreamsEntry> results = new ArrayList<ActivityStreamsEntry>();
+
+        // An empty $in can never match, and a null set used to throw a NullPointerException,
+        // so both cases short-circuit to "no results" without querying.
+        if (filters == null || filters.isEmpty()) {
+            return results;
+        }
+
+        // One copy of the filter names is shared by both $in clauses.
+        List<String> names = new ArrayList<String>(filters);
+
+        DBObject query = QueryBuilder.start("published").greaterThan(lastUpdated).and(QueryBuilder.start().or(
+                QueryBuilder.start("provider.displayName").in(names).get(),
+                QueryBuilder.start("actor.displayName").in(names).get()
+        ).get()).get();
+
+        DBCursor cursor = activityStreamsCollection.find(query);
+        try {
+            while (cursor.hasNext()) {
+                results.add((MongoActivityStreamsEntry) cursor.next());
+            }
+        } finally {
+            // Release the cursor even when iteration or the cast fails.
+            cursor.close();
+        }
+
+        return results;
+    }
+
+    /**
+     * Currently a no-op for the Mongo backend.
+     *
+     * @param table ignored
+     */
+    @Override
+    public void dropTable(String table) {
+    }
+}
Added: incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/mongo/MongoDatabase.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/mongo/MongoDatabase.java?rev=1542378&view=auto
==============================================================================
--- incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/mongo/MongoDatabase.java (added)
+++ incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/mongo/MongoDatabase.java Fri Nov 15 19:26:40 2013
@@ -0,0 +1,35 @@
+package org.apache.streams.persistence.repository.mongo;
+
+import com.mongodb.DB;
+import com.mongodb.MongoClient;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.streams.persistence.configuration.MongoConfiguration;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.net.UnknownHostException;
+
+/**
+ * Spring-managed holder for the Mongo {@link DB} handle shared by the Mongo repositories.
+ */
+@Component
+public class MongoDatabase {
+    private static final Log log = LogFactory.getLog(MongoDatabase.class);
+    private DB db;
+
+    /**
+     * Opens the database named by {@link MongoConfiguration#getDbName()} using a
+     * {@link MongoClient} created with its no-argument constructor (driver default host/port).
+     *
+     * @param configuration supplies the database name
+     * @throws IllegalStateException if the client cannot be created; failing here avoids handing
+     *                               out a {@code null} {@link DB} that would only surface later
+     *                               as a NullPointerException in a repository constructor
+     */
+    @Autowired
+    public MongoDatabase(MongoConfiguration configuration) {
+        try {
+            MongoClient mongoClient = new MongoClient();
+            db = mongoClient.getDB(configuration.getDbName());
+        } catch (UnknownHostException e) {
+            log.error("There was an error connecting to the database", e);
+            throw new IllegalStateException("There was an error connecting to the database", e);
+        }
+    }
+
+    /**
+     * @return the database handle used by the repositories
+     */
+    public DB getDb() {
+        return db;
+    }
+
+    /**
+     * Replaces the database handle.
+     *
+     * @param db the handle subsequent {@link #getDb()} calls will return
+     */
+    public void setDb(DB db) {
+        this.db = db;
+    }
+}
Added: incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/mongo/MongoPublisherRepository.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/mongo/MongoPublisherRepository.java?rev=1542378&view=auto
==============================================================================
--- incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/mongo/MongoPublisherRepository.java (added)
+++ incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/mongo/MongoPublisherRepository.java Fri Nov 15 19:26:40 2013
@@ -0,0 +1,45 @@
+package org.apache.streams.persistence.repository.mongo;
+
+import com.mongodb.DBCollection;
+import com.mongodb.DBObject;
+import com.mongodb.QueryBuilder;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.streams.persistence.configuration.MongoConfiguration;
+import org.apache.streams.persistence.model.ActivityStreamsPublisher;
+import org.apache.streams.persistence.model.mongo.MongoPublisher;
+import org.apache.streams.persistence.repository.PublisherRepository;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * MongoDB implementation of {@link PublisherRepository}.
+ *
+ * <p>Works against the collection named by
+ * {@link MongoConfiguration#getPublisherCollectionName()}; documents read from that collection
+ * are materialised as {@link MongoPublisher} instances.
+ */
+@Component
+public class MongoPublisherRepository implements PublisherRepository {
+    private static final Log log = LogFactory.getLog(MongoPublisherRepository.class);
+
+    private DBCollection publisherCollection;
+
+    /**
+     * @param database      holder of the Mongo {@code DB} handle to read from and write to
+     * @param configuration supplies the name of the publisher collection
+     */
+    @Autowired
+    public MongoPublisherRepository(MongoDatabase database, MongoConfiguration configuration) {
+        this.publisherCollection = database.getDb().getCollection(configuration.getPublisherCollectionName());
+        // The casts in the finder methods rely on the driver returning MongoPublisher objects.
+        publisherCollection.setObjectClass(MongoPublisher.class);
+    }
+
+    /**
+     * @param inRoute value to match against the {@code inRoute} field
+     * @return one matching publisher, or {@code null} when none exists
+     */
+    @Override
+    public ActivityStreamsPublisher getPublisherByInRoute(String inRoute) {
+        DBObject query = QueryBuilder.start("inRoute").is(inRoute).get();
+        return (ActivityStreamsPublisher) publisherCollection.findOne(query);
+    }
+
+    /**
+     * @param src value to match against the {@code src} field
+     * @return one matching publisher, or {@code null} when none exists
+     */
+    @Override
+    public ActivityStreamsPublisher getPublisherBySrc(String src) {
+        DBObject query = QueryBuilder.start("src").is(src).get();
+        return (ActivityStreamsPublisher) publisherCollection.findOne(query);
+    }
+
+    /**
+     * Persists the publisher if it is a Mongo {@link DBObject}; any other implementation cannot
+     * be written by the driver and is skipped with a warning instead of being dropped silently.
+     *
+     * @param publisher the publisher to store
+     */
+    @Override
+    public void save(ActivityStreamsPublisher publisher) {
+        if (publisher instanceof DBObject) {
+            publisherCollection.save((DBObject) publisher);
+        } else {
+            log.warn("Publisher was not saved because it is not a DBObject: "
+                    + (publisher == null ? "null" : publisher.getClass().getName()));
+        }
+    }
+}
Added: incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/mongo/MongoSubscriptionRepository.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/mongo/MongoSubscriptionRepository.java?rev=1542378&view=auto
==============================================================================
--- incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/mongo/MongoSubscriptionRepository.java (added)
+++ incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/mongo/MongoSubscriptionRepository.java Fri Nov 15 19:26:40 2013
@@ -0,0 +1,60 @@
+package org.apache.streams.persistence.repository.mongo;
+
+import com.mongodb.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.streams.persistence.configuration.MongoConfiguration;
+import org.apache.streams.persistence.model.ActivityStreamsSubscription;
+import org.apache.streams.persistence.model.mongo.MongoSubscription;
+import org.apache.streams.persistence.repository.SubscriptionRepository;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.List;
+import java.util.Set;
+
+/**
+ * MongoDB implementation of {@link SubscriptionRepository}.
+ *
+ * <p>Works against the collection named by
+ * {@link MongoConfiguration#getSubscriptionCollectionName()}; documents read from that
+ * collection are materialised as {@link MongoSubscription} instances.
+ */
+@Component
+public class MongoSubscriptionRepository implements SubscriptionRepository {
+    // Was created for MongoActivityStreamsRepository, which mislabelled every log line.
+    private static final Log log = LogFactory.getLog(MongoSubscriptionRepository.class);
+
+    private DBCollection subscriptionCollection;
+
+    /**
+     * @param database      holder of the Mongo {@code DB} handle to read from and write to
+     * @param configuration supplies the name of the subscription collection
+     */
+    @Autowired
+    public MongoSubscriptionRepository(MongoDatabase database, MongoConfiguration configuration) {
+        this.subscriptionCollection = database.getDb().getCollection(configuration.getSubscriptionCollectionName());
+        // The casts in the finder methods rely on the driver returning MongoSubscription objects.
+        subscriptionCollection.setObjectClass(MongoSubscription.class);
+    }
+
+    /**
+     * @param inRoute value to match against the {@code inRoute} field
+     * @return one matching subscription, or {@code null} when none exists
+     */
+    @Override
+    public ActivityStreamsSubscription getSubscriptionByInRoute(String inRoute) {
+        DBObject query = QueryBuilder.start("inRoute").is(inRoute).get();
+        return (ActivityStreamsSubscription) subscriptionCollection.findOne(query);
+    }
+
+    /**
+     * @param username value to match against the {@code username} field
+     * @return one matching subscription, or {@code null} when none exists
+     */
+    @Override
+    public ActivityStreamsSubscription getSubscriptionByUsername(String username) {
+        DBObject query = QueryBuilder.start("username").is(username).get();
+        return (ActivityStreamsSubscription) subscriptionCollection.findOne(query);
+    }
+
+    /**
+     * @return every document in the subscription collection
+     */
+    @Override
+    @SuppressWarnings("unchecked") // elements are MongoSubscription because of setObjectClass above
+    public List<ActivityStreamsSubscription> getAllSubscriptions() {
+        return (List<ActivityStreamsSubscription>) (List<?>) subscriptionCollection.find().toArray();
+    }
+
+    /**
+     * Persists the subscription if it is a Mongo {@link DBObject}; any other implementation
+     * cannot be written by the driver and is skipped with a warning instead of being dropped
+     * silently.
+     *
+     * @param subscription the subscription to store
+     */
+    @Override
+    public void save(ActivityStreamsSubscription subscription) {
+        if (subscription instanceof DBObject) {
+            subscriptionCollection.save((DBObject) subscription);
+        } else {
+            log.warn("Subscription was not saved because it is not a DBObject: "
+                    + (subscription == null ? "null" : subscription.getClass().getName()));
+        }
+    }
+
+    /**
+     * Adds and removes values in the {@code filters} array of the subscription whose
+     * {@code inRoute} equals {@code subscriberId}.
+     *
+     * <p>Additions are applied before removals, so a value present in both sets ends up removed.
+     * The two changes are sent as separate updates because both operators target the same field.
+     *
+     * @param subscriberId value matched against the {@code inRoute} field
+     * @param add          filters to add; {@code null} or empty means nothing is added
+     * @param remove       filters to remove; {@code null} or empty means nothing is removed
+     */
+    @Override
+    public void updateFilters(String subscriberId, Set<String> add, Set<String> remove) {
+        DBObject query = QueryBuilder.start("inRoute").is(subscriberId).get();
+
+        // Skip the round trip when there is nothing to change; a null set would otherwise be
+        // sent to the server as the operand of $each / $pullAll.
+        if (add != null && !add.isEmpty()) {
+            BasicDBObject dbAdd = new BasicDBObject().append("$addToSet", new BasicDBObject().append("filters", new BasicDBObject().append("$each", add)));
+            subscriptionCollection.update(query, dbAdd);
+        }
+
+        if (remove != null && !remove.isEmpty()) {
+            BasicDBObject dbRemove = new BasicDBObject().append("$pullAll", new BasicDBObject().append("filters", remove));
+            subscriptionCollection.update(query, dbRemove);
+        }
+    }
+}
Added: incubator/streams/branches/webservice/streams-persistence/src/main/resources/mongo.properties
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-persistence/src/main/resources/mongo.properties?rev=1542378&view=auto
==============================================================================
--- incubator/streams/branches/webservice/streams-persistence/src/main/resources/mongo.properties (added)
+++ incubator/streams/branches/webservice/streams-persistence/src/main/resources/mongo.properties Fri Nov 15 19:26:40 2013
@@ -0,0 +1,4 @@
+mongo.dbName=mongodatabase1
+mongo.activityStreamsCollectionName=mongoacitivytstreams1
+mongo.subscriptionCollectionName=mongosubscription1
+mongo.publisherCollectionName=mongopublisher1
\ No newline at end of file
Modified: incubator/streams/branches/webservice/streams-persistence/src/main/resources/streams_persistence_applicationContext.xml
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-persistence/src/main/resources/streams_persistence_applicationContext.xml?rev=1542378&r1=1542377&r2=1542378&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-persistence/src/main/resources/streams_persistence_applicationContext.xml (original)
+++ incubator/streams/branches/webservice/streams-persistence/src/main/resources/streams_persistence_applicationContext.xml Fri Nov 15 19:26:40 2013
@@ -9,4 +9,8 @@
<context:annotation-config />
<context:component-scan base-package="org.apache.streams.persistence"/>
+ <!--<bean id="activityStreamsRepository" class="org.apache.streams.persistence.repository.mongo.MongoActivityStreamsRepository"/>-->
+ <!--<bean id="publisherRepository" class="org.apache.streams.persistence.repository.mongo.MongoPublisherRepository"/>-->
+ <!--<bean id="subscriberRepository" class="org.apache.streams.persistence.repository.mongo.MongoSubscriptionRepository"/>-->
+ <!--<bean id="mongoDatabase" class="org.apache.streams.persistence.repository.mongo.MongoDatabase"/>-->
</beans>
\ No newline at end of file
Modified: incubator/streams/branches/webservice/streams-persistence/src/test/java/org/apache/streams/persistence/repository/cassandra/CassandraActiivtyStreamsRepositoryTest.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-persistence/src/test/java/org/apache/streams/persistence/repository/cassandra/CassandraActiivtyStreamsRepositoryTest.java?rev=1542378&r1=1542377&r2=1542378&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-persistence/src/test/java/org/apache/streams/persistence/repository/cassandra/CassandraActiivtyStreamsRepositoryTest.java (original)
+++ incubator/streams/branches/webservice/streams-persistence/src/test/java/org/apache/streams/persistence/repository/cassandra/CassandraActiivtyStreamsRepositoryTest.java Fri Nov 15 19:26:40 2013
@@ -35,13 +35,13 @@ public class CassandraActiivtyStreamsRep
@Ignore
@Test
public void testNullTags(){
- repository.getActivitiesForProviders(null,new Date(0));
+ repository.getActivitiesForFilters(null,new Date(0));
}
@Ignore
@Test
public void getActivitiesForTagsTest(){
- List<ActivityStreamsEntry> activities = repository.getActivitiesForProviders(new HashSet<String>(Arrays.asList("provider_displayname")),new Date(0));
+ List<ActivityStreamsEntry> activities = repository.getActivitiesForFilters(new HashSet<String>(Arrays.asList("provider_displayname")),new Date(0));
}
@Ignore
Added: incubator/streams/branches/webservice/streams-persistence/src/test/java/org/apache/streams/persistence/repository/mongo/MongoActivityStreamsRepositoryTest.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-persistence/src/test/java/org/apache/streams/persistence/repository/mongo/MongoActivityStreamsRepositoryTest.java?rev=1542378&view=auto
==============================================================================
--- incubator/streams/branches/webservice/streams-persistence/src/test/java/org/apache/streams/persistence/repository/mongo/MongoActivityStreamsRepositoryTest.java (added)
+++ incubator/streams/branches/webservice/streams-persistence/src/test/java/org/apache/streams/persistence/repository/mongo/MongoActivityStreamsRepositoryTest.java Fri Nov 15 19:26:40 2013
@@ -0,0 +1,76 @@
+package org.apache.streams.persistence.repository.mongo;
+
+
+import org.apache.streams.persistence.configuration.MongoConfiguration;
+import org.apache.streams.persistence.model.ActivityStreamsEntry;
+import org.apache.streams.persistence.model.ActivityStreamsObject;
+import org.apache.streams.persistence.model.mongo.MongoActivityStreamsEntry;
+import org.apache.streams.persistence.model.mongo.MongoActivityStreamsObject;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+
+/**
+ * Manual tests for {@link MongoActivityStreamsRepository}. Both tests are marked
+ * {@code @Ignore}; the fixture builds the repository directly against a {@link MongoDatabase}.
+ */
+public class MongoActivityStreamsRepositoryTest {
+    private MongoActivityStreamsRepository repository;
+    private MongoConfiguration configuration;
+    private MongoDatabase database;
+
+    /** Builds the repository against database "testdb1" and collection "activitystreamstest2". */
+    @Before
+    public void setup() {
+        this.configuration = new MongoConfiguration();
+        this.configuration.setDbName("testdb1");
+        this.configuration.setActivitystreamsCollectionName("activitystreamstest2");
+
+        this.database = new MongoDatabase(this.configuration);
+        this.repository = new MongoActivityStreamsRepository(this.database, this.configuration);
+    }
+
+    /** Saves one fully populated activity entry. */
+    @Ignore
+    @Test
+    public void saveActivity(){
+        ActivityStreamsObject actorObj = new MongoActivityStreamsObject();
+        actorObj.setDisplayName("actor_displayname");
+        actorObj.setId("actor_id");
+        actorObj.setObjectType("actor_objecttype");
+        actorObj.setUrl("actor_url");
+
+        ActivityStreamsObject targetObj = new MongoActivityStreamsObject();
+        targetObj.setDisplayName("target_displayname");
+        targetObj.setId("target_id");
+        targetObj.setUrl("target_url");
+
+        ActivityStreamsObject objectObj = new MongoActivityStreamsObject();
+        objectObj.setDisplayName("object_displayname");
+        objectObj.setObjectType("object_objecttype");
+        objectObj.setUrl("object_url");
+        objectObj.setId("object_id");
+
+        ActivityStreamsObject providerObj = new MongoActivityStreamsObject();
+        providerObj.setUrl("provider_url");
+        providerObj.setDisplayName("provider_displayname");
+
+        // Setter order is kept as-is in case it determines field order in the stored document.
+        ActivityStreamsEntry activity = new MongoActivityStreamsEntry();
+        activity.setPublished(new Date());
+        activity.setVerb("verb");
+        activity.setId("newid");
+        activity.setTags("tags");
+        activity.setActor(actorObj);
+        activity.setTarget(targetObj);
+        activity.setObject(objectObj);
+        activity.setProvider(providerObj);
+
+        repository.save(activity);
+    }
+
+    /** Queries for the provider display name used by {@link #saveActivity()}, from the epoch on. */
+    @Ignore
+    @Test
+    public void getActivitiesForProvidersTest(){
+        HashSet<String> providerNames = new HashSet<String>(Arrays.asList("provider_displayname"));
+        Date sinceEpoch = new Date(0);
+        List<ActivityStreamsEntry> activities = repository.getActivitiesForFilters(providerNames, sinceEpoch);
+    }
+
+}
Added: incubator/streams/branches/webservice/streams-persistence/src/test/java/org/apache/streams/persistence/repository/mongo/MongoPublisherRepositoryTest.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-persistence/src/test/java/org/apache/streams/persistence/repository/mongo/MongoPublisherRepositoryTest.java?rev=1542378&view=auto
==============================================================================
--- incubator/streams/branches/webservice/streams-persistence/src/test/java/org/apache/streams/persistence/repository/mongo/MongoPublisherRepositoryTest.java (added)
+++ incubator/streams/branches/webservice/streams-persistence/src/test/java/org/apache/streams/persistence/repository/mongo/MongoPublisherRepositoryTest.java Fri Nov 15 19:26:40 2013
@@ -0,0 +1,54 @@
+package org.apache.streams.persistence.repository.mongo;
+
+
+import org.apache.streams.persistence.configuration.MongoConfiguration;
+import org.apache.streams.persistence.model.ActivityStreamsPublisher;
+import org.apache.streams.persistence.model.cassandra.CassandraPublisher;
+import org.apache.streams.persistence.model.mongo.MongoPublisher;
+import org.apache.streams.persistence.repository.PublisherRepository;
+import org.apache.streams.persistence.repository.SubscriptionRepository;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Manual tests for {@link MongoPublisherRepository}. Both tests are marked {@code @Ignore};
+ * the fixture builds the repository directly against a {@link MongoDatabase}.
+ */
+public class MongoPublisherRepositoryTest {
+
+    private PublisherRepository repository;
+    private MongoConfiguration configuration;
+    private MongoDatabase database;
+
+    /** Builds the repository against database "testdb1" and collection "publishertest2". */
+    @Before
+    public void setup() {
+        configuration = new MongoConfiguration();
+        configuration.setDbName("testdb1");
+        configuration.setPublisherCollectionName("publishertest2");
+        database = new MongoDatabase(configuration);
+        repository = new MongoPublisherRepository(database, configuration);
+    }
+
+    /** Saves one publisher with a known id, source and in-route. */
+    @Ignore
+    @Test
+    public void saveTest(){
+        ActivityStreamsPublisher publisher = new MongoPublisher();
+
+        publisher.setId("newId");
+        publisher.setSrc("http://www.google.comd");
+        publisher.setInRoute("inRoute");
+
+        repository.save(publisher);
+    }
+
+    /**
+     * Looks the publisher up by in-route and checks its fields.
+     * NOTE(review): presumably relies on the document written by {@link #saveTest()} already
+     * being in the collection - confirm before re-enabling.
+     */
+    @Ignore
+    @Test
+    public void getPublisherTest(){
+
+        ActivityStreamsPublisher publisher = repository.getPublisherByInRoute("inRoute");
+
+        // assertEquals takes (expected, actual); the original order produced misleading
+        // "expected X but was Y" messages on failure.
+        assertEquals("http://www.google.comd", publisher.getSrc());
+        assertEquals("newId", publisher.getId());
+        assertEquals("inRoute", publisher.getInRoute());
+    }
+
+}