You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by ds...@apache.org on 2013/11/15 20:26:41 UTC

svn commit: r1542378 [1/2] - in /incubator/streams/branches/webservice: ./ streams-components/ streams-components/src/main/java/org/apache/streams/components/service/ streams-components/src/main/java/org/apache/streams/components/service/impl/ streams-...

Author: dsullivan
Date: Fri Nov 15 19:26:40 2013
New Revision: 1542378

URL: http://svn.apache.org/r1542378
Log:
now runs on mongo, should be easy enough to switch back and forth between mongo and cassandra

Added:
    incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/ActivityRepositoryServiceImpl.java
    incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/configuration/MongoConfiguration.java
    incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/mongo/
    incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/mongo/MongoActivityStreamsEntry.java
    incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/mongo/MongoActivityStreamsObject.java
    incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/mongo/MongoPublisher.java
    incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/mongo/MongoSubscription.java
    incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/mongo/
    incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/mongo/MongoActivityStreamsRepository.java
    incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/mongo/MongoDatabase.java
    incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/mongo/MongoPublisherRepository.java
    incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/mongo/MongoSubscriptionRepository.java
    incubator/streams/branches/webservice/streams-persistence/src/main/resources/mongo.properties
    incubator/streams/branches/webservice/streams-persistence/src/test/java/org/apache/streams/persistence/repository/mongo/
    incubator/streams/branches/webservice/streams-persistence/src/test/java/org/apache/streams/persistence/repository/mongo/MongoActivityStreamsRepositoryTest.java
    incubator/streams/branches/webservice/streams-persistence/src/test/java/org/apache/streams/persistence/repository/mongo/MongoPublisherRepositoryTest.java
    incubator/streams/branches/webservice/streams-persistence/src/test/java/org/apache/streams/persistence/repository/mongo/MongoSubscriptionRepositoryTest.java
Removed:
    incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/StreamsPublisherRepositoryService.java
    incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/StreamsSubscriptionRepositoryService.java
    incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/CassandraPublisherService.java
    incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/CassandraSubscriptionService.java
    incubator/streams/branches/webservice/streams-components/src/test/java/org/apache/streams/components/service/StreamsActivityRepositoryServiceTest.java
    incubator/streams/branches/webservice/streams-components/src/test/java/org/apache/streams/components/service/StreamsPublisherRegistrationServiceIntegrationTest.java
    incubator/streams/branches/webservice/streams-components/src/test/java/org/apache/streams/components/service/StreamsPublisherRepositoryServiceTest.java
    incubator/streams/branches/webservice/streams-components/src/test/java/org/apache/streams/components/service/StreamsSubscriptionRepositoryServiceTest.java
Modified:
    incubator/streams/branches/webservice/pom.xml
    incubator/streams/branches/webservice/streams-components/pom.xml
    incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/StreamsActivityRepositoryService.java
    incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/StreamsActivityPublishingServiceImpl.java
    incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/StreamsFiltersServiceImpl.java
    incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/StreamsPublisherRegistrationServiceImpl.java
    incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/StreamsSubscriberRegistrationServiceImpl.java
    incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/storm/StormSubscriberSpout.java
    incubator/streams/branches/webservice/streams-components/src/main/resources/streams_components_applicationContext.xml
    incubator/streams/branches/webservice/streams-components/src/test/java/org/apache/streams/components/service/StreamsActivityPublishingServiceTest.java
    incubator/streams/branches/webservice/streams-components/src/test/java/org/apache/streams/components/service/StreamsFiltersServiceTest.java
    incubator/streams/branches/webservice/streams-components/src/test/java/org/apache/streams/components/service/StreamsPublisherRegistrationServiceTest.java
    incubator/streams/branches/webservice/streams-components/src/test/java/org/apache/streams/components/service/StreamsSubscriberRegistrationServiceIntegrationTest.java
    incubator/streams/branches/webservice/streams-components/src/test/java/org/apache/streams/components/service/StreamsSubscriberRegistrationServiceTest.java
    incubator/streams/branches/webservice/streams-persistence/pom.xml
    incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/ActivityStreamsSubscription.java
    incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/ActivityStreamsRepository.java
    incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/cassandra/CassandraActivityStreamsRepository.java
    incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/cassandra/CassandraKeyspace.java
    incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/cassandra/CassandraPublisherRepository.java
    incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/cassandra/CassandraSubscriptionRepository.java
    incubator/streams/branches/webservice/streams-persistence/src/main/resources/streams_persistence_applicationContext.xml
    incubator/streams/branches/webservice/streams-persistence/src/test/java/org/apache/streams/persistence/repository/cassandra/CassandraActiivtyStreamsRepositoryTest.java
    incubator/streams/branches/webservice/streams-web/pom.xml

Modified: incubator/streams/branches/webservice/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/pom.xml?rev=1542378&r1=1542377&r2=1542378&view=diff
==============================================================================
--- incubator/streams/branches/webservice/pom.xml (original)
+++ incubator/streams/branches/webservice/pom.xml Fri Nov 15 19:26:40 2013
@@ -63,6 +63,8 @@
         <camel.version>2.12.1</camel.version>
         <logging.version>1.7.5</logging.version>
         <easymock.version>3.2</easymock.version>
+        <mongo-java-driver.version>2.11.3</mongo-java-driver.version>
+        <javax-servlet.version>2.5</javax-servlet.version>
     </properties>
 
     <packaging>pom</packaging>

Modified: incubator/streams/branches/webservice/streams-components/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-components/pom.xml?rev=1542378&r1=1542377&r2=1542378&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-components/pom.xml (original)
+++ incubator/streams/branches/webservice/streams-components/pom.xml Fri Nov 15 19:26:40 2013
@@ -52,6 +52,14 @@
                     <groupId>org.slf4j</groupId>
                     <artifactId>slf4j-log4j12</artifactId>
                 </exclusion>
+                <!--<exclusion>-->
+                    <!--<groupId>javax.servlet</groupId>-->
+                    <!--<artifactId>servlet-api</artifactId>-->
+                <!--</exclusion>-->
+                <!--<exclusion>-->
+                    <!--<groupId>org.mortbay.jetty</groupId>-->
+                    <!--<artifactId>servlet-api</artifactId>-->
+                <!--</exclusion>-->
             </exclusions>
         </dependency>
 

Modified: incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/StreamsActivityRepositoryService.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/StreamsActivityRepositoryService.java?rev=1542378&r1=1542377&r2=1542378&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/StreamsActivityRepositoryService.java (original)
+++ incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/StreamsActivityRepositoryService.java Fri Nov 15 19:26:40 2013
@@ -10,7 +10,7 @@ import java.util.Set;
 
 public interface StreamsActivityRepositoryService {
 
-    void receiveActivity(ActivityStreamsPublisher publisher, String activityJSON) throws Exception;
+    void receiveActivity(String activityJSON) throws Exception;
 
     List<ActivityStreamsEntry> getActivitiesForProviders(Set<String> providers, Date lastUpdated);
 }

Added: incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/ActivityRepositoryServiceImpl.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/ActivityRepositoryServiceImpl.java?rev=1542378&view=auto
==============================================================================
--- incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/ActivityRepositoryServiceImpl.java (added)
+++ incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/ActivityRepositoryServiceImpl.java Fri Nov 15 19:26:40 2013
@@ -0,0 +1,47 @@
+package org.apache.streams.components.service.impl;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.streams.components.service.StreamsActivityRepositoryService;
+import org.apache.streams.persistence.model.ActivityStreamsEntry;
+import org.apache.streams.persistence.model.ActivityStreamsPublisher;
+import org.apache.streams.persistence.model.cassandra.CassandraActivityStreamsEntry;
+import org.apache.streams.persistence.model.mongo.MongoActivityStreamsEntry;
+import org.apache.streams.persistence.repository.ActivityStreamsRepository;
+import org.codehaus.jackson.map.DeserializationConfig;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.*;
+
+/** Spring component that turns activity JSON into stored entries and reads entries back per provider. */
+@Component
+public class ActivityRepositoryServiceImpl implements StreamsActivityRepositoryService {
+    private static final Log LOG = LogFactory.getLog(ActivityRepositoryServiceImpl.class);
+    private final ActivityStreamsRepository activityStreamsRepository;
+    // Concrete entry type Jackson binds to; typed (not raw) so readValue needs no cast.
+    private final Class<? extends ActivityStreamsEntry> activityClass;
+    private final ObjectMapper mapper;
+
+    @Autowired
+    public ActivityRepositoryServiceImpl(ActivityStreamsRepository activityStreamsRepository) {
+        this.activityStreamsRepository = activityStreamsRepository;
+        this.activityClass = MongoActivityStreamsEntry.class;
+        this.mapper = new ObjectMapper();
+        mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+    }
+    /** Parses the JSON into an entry, sets published to now and id to a new random UUID, then saves it. */
+    @Override
+    public void receiveActivity(String activityJSON) throws Exception {
+        ActivityStreamsEntry streamsEntry = mapper.readValue(activityJSON, activityClass);
+        streamsEntry.setPublished(new Date());
+        streamsEntry.setId(UUID.randomUUID().toString());
+        activityStreamsRepository.save(streamsEntry);
+    }
+    /** Returns the repository's getActivitiesForFilters result for the given providers and date. */
+    @Override
+    public List<ActivityStreamsEntry> getActivitiesForProviders(Set<String> providers, Date lastUpdated){
+        return activityStreamsRepository.getActivitiesForFilters(providers, lastUpdated);
+    }
+}

Modified: incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/StreamsActivityPublishingServiceImpl.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/StreamsActivityPublishingServiceImpl.java?rev=1542378&r1=1542377&r2=1542378&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/StreamsActivityPublishingServiceImpl.java (original)
+++ incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/StreamsActivityPublishingServiceImpl.java Fri Nov 15 19:26:40 2013
@@ -4,8 +4,8 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.streams.components.service.StreamsActivityPublishingService;
 import org.apache.streams.components.service.StreamsActivityRepositoryService;
-import org.apache.streams.components.service.StreamsPublisherRepositoryService;
 import org.apache.streams.persistence.model.ActivityStreamsPublisher;
+import org.apache.streams.persistence.repository.PublisherRepository;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
@@ -14,12 +14,12 @@ public class StreamsActivityPublishingSe
     private Log log = LogFactory.getLog(StreamsActivityPublishingServiceImpl.class);
 
     private StreamsActivityRepositoryService activityService;
-    private StreamsPublisherRepositoryService publisherService;
+    private PublisherRepository publisherRepository;
 
     @Autowired
-    public StreamsActivityPublishingServiceImpl(StreamsActivityRepositoryService activityService, StreamsPublisherRepositoryService publisherService) {
+    public StreamsActivityPublishingServiceImpl(StreamsActivityRepositoryService activityService, PublisherRepository publisherRepository) {
         this.activityService = activityService;
-        this.publisherService = publisherService;
+        this.publisherRepository = publisherRepository;
     }
 
     /**
@@ -29,8 +29,9 @@ public class StreamsActivityPublishingSe
      * @return a success message if no errors were thrown
      * */
     public String publish(String publisherID, String activityJSON) throws Exception {
-        ActivityStreamsPublisher publisher = publisherService.getActivityStreamsPublisherByInRoute(publisherID);
-        activityService.receiveActivity(publisher,activityJSON);
+        //TODO: this should eventually authenticate
+        ActivityStreamsPublisher publisher = publisherRepository.getPublisherByInRoute(publisherID);
+        activityService.receiveActivity(activityJSON);
         return activityJSON;
     }
 }

Modified: incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/StreamsFiltersServiceImpl.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/StreamsFiltersServiceImpl.java?rev=1542378&r1=1542377&r2=1542378&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/StreamsFiltersServiceImpl.java (original)
+++ incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/StreamsFiltersServiceImpl.java Fri Nov 15 19:26:40 2013
@@ -4,8 +4,8 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.streams.components.activitysubscriber.ActivityStreamsSubscriberWarehouse;
 import org.apache.streams.components.service.StreamsFiltersService;
-import org.apache.streams.components.service.StreamsSubscriptionRepositoryService;
 import org.apache.streams.persistence.model.ActivityStreamsSubscription;
+import org.apache.streams.persistence.repository.SubscriptionRepository;
 import org.codehaus.jackson.map.DeserializationConfig;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -20,13 +20,13 @@ import java.util.Map;
 public class StreamsFiltersServiceImpl implements StreamsFiltersService {
     private static final Log LOG = LogFactory.getLog(StreamsFiltersServiceImpl.class);
 
-    private StreamsSubscriptionRepositoryService repositoryService;
+    private SubscriptionRepository subscriptionRepository;
     private ActivityStreamsSubscriberWarehouse subscriberWarehouse;
     private ObjectMapper mapper;
 
     @Autowired
-    public StreamsFiltersServiceImpl(StreamsSubscriptionRepositoryService repositoryService, ActivityStreamsSubscriberWarehouse subscriberWarehouse) {
-        this.repositoryService = repositoryService;
+    public StreamsFiltersServiceImpl(SubscriptionRepository subscriptionRepository, ActivityStreamsSubscriberWarehouse subscriberWarehouse) {
+        this.subscriptionRepository = subscriptionRepository;
         this.subscriberWarehouse = subscriberWarehouse;
         this.mapper = new ObjectMapper();
         mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false);
@@ -34,7 +34,7 @@ public class StreamsFiltersServiceImpl i
 
     @Override
     public String getFilters(String subscriberId) throws Exception{
-        ActivityStreamsSubscription subscription = repositoryService.getSubscriptionByInRoute(subscriberId);
+        ActivityStreamsSubscription subscription = subscriptionRepository.getSubscriptionByInRoute(subscriberId);
         return mapper.writeValueAsString(subscription.getFilters());
     }
 
@@ -43,9 +43,9 @@ public class StreamsFiltersServiceImpl i
         LOG.info("updating filters for " + subscriberId);
 
         Map<String, List> updateFilters = (Map<String, List>) mapper.readValue(tagsJson, Map.class);
-        repositoryService.updateFilters(subscriberId, new HashSet<String>(updateFilters.get("add")), new HashSet<String>(updateFilters.get("remove")));
+        subscriptionRepository.updateFilters(subscriberId, new HashSet<String>(updateFilters.get("add")), new HashSet<String>(updateFilters.get("remove")));
         subscriberWarehouse.getSubscriber(subscriberId).setLastUpdated(new Date(0));
-        subscriberWarehouse.updateSubscriber(repositoryService.getSubscriptionByInRoute(subscriberId));
+        subscriberWarehouse.updateSubscriber(subscriptionRepository.getSubscriptionByInRoute(subscriberId));
 
         return "Filters Updated Successfully!";
     }

Modified: incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/StreamsPublisherRegistrationServiceImpl.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/StreamsPublisherRegistrationServiceImpl.java?rev=1542378&r1=1542377&r2=1542378&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/StreamsPublisherRegistrationServiceImpl.java (original)
+++ incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/StreamsPublisherRegistrationServiceImpl.java Fri Nov 15 19:26:40 2013
@@ -4,9 +4,10 @@ package org.apache.streams.components.se
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.streams.components.service.StreamsPublisherRegistrationService;
-import org.apache.streams.components.service.StreamsPublisherRepositoryService;
 import org.apache.streams.persistence.model.ActivityStreamsPublisher;
 import org.apache.streams.persistence.model.cassandra.CassandraPublisher;
+import org.apache.streams.persistence.model.mongo.MongoPublisher;
+import org.apache.streams.persistence.repository.PublisherRepository;
 import org.codehaus.jackson.map.DeserializationConfig;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -18,13 +19,15 @@ import java.util.UUID;
 public class StreamsPublisherRegistrationServiceImpl implements StreamsPublisherRegistrationService {
     private Log log = LogFactory.getLog(StreamsPublisherRegistrationServiceImpl.class);
 
-    private StreamsPublisherRepositoryService publisherRepositoryService;
+    private PublisherRepository publisherRepository;
+    private Class publisherClass;
     private ObjectMapper mapper;
 
     @Autowired
-    public StreamsPublisherRegistrationServiceImpl(StreamsPublisherRepositoryService publisherRepositoryService) {
-        this.publisherRepositoryService = publisherRepositoryService;
+    public StreamsPublisherRegistrationServiceImpl(PublisherRepository publisherRepositoryService) {
+        this.publisherRepository = publisherRepositoryService;
         this.mapper = new ObjectMapper();
+        this.publisherClass = MongoPublisher.class;
         mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false);
     }
 
@@ -37,20 +40,20 @@ public class StreamsPublisherRegistratio
         log.info("attempting to register publisher json: " + publisherJSON);
 
         // read from file, convert it to user class
-        ActivityStreamsPublisher publisher = mapper.readValue(publisherJSON, CassandraPublisher.class);
+        ActivityStreamsPublisher publisher = (ActivityStreamsPublisher)mapper.readValue(publisherJSON, publisherClass);
 
         if (publisher.getSrc() == null) {
             log.info("configuration src is null");
             throw new Exception("configuration src is null");
         }
 
-        ActivityStreamsPublisher fromDb = publisherRepositoryService.getActivityStreamsPublisherBySrc(publisher.getSrc());
+        ActivityStreamsPublisher fromDb = publisherRepository.getPublisherBySrc(publisher.getSrc());
 
         if(fromDb != null){
             return fromDb.getInRoute();
         }else{
             publisher.setInRoute("" + UUID.randomUUID());
-            publisherRepositoryService.savePublisher(publisher);
+            publisherRepository.save(publisher);
 
             return publisher.getInRoute();
         }

Modified: incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/StreamsSubscriberRegistrationServiceImpl.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/StreamsSubscriberRegistrationServiceImpl.java?rev=1542378&r1=1542377&r2=1542378&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/StreamsSubscriberRegistrationServiceImpl.java (original)
+++ incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/StreamsSubscriberRegistrationServiceImpl.java Fri Nov 15 19:26:40 2013
@@ -4,9 +4,10 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.streams.components.activitysubscriber.ActivityStreamsSubscriberWarehouse;
 import org.apache.streams.components.service.StreamsSubscriberRegistrationService;
-import org.apache.streams.components.service.StreamsSubscriptionRepositoryService;
 import org.apache.streams.persistence.model.ActivityStreamsSubscription;
 import org.apache.streams.persistence.model.cassandra.CassandraSubscription;
+import org.apache.streams.persistence.model.mongo.MongoSubscription;
+import org.apache.streams.persistence.repository.SubscriptionRepository;
 import org.codehaus.jackson.map.DeserializationConfig;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -18,18 +19,20 @@ import java.util.UUID;
 public class StreamsSubscriberRegistrationServiceImpl implements StreamsSubscriberRegistrationService {
     private Log log = LogFactory.getLog(StreamsSubscriberRegistrationServiceImpl.class);
 
-    private StreamsSubscriptionRepositoryService subscriptionRepositoryService;
+    private SubscriptionRepository subscriptionRepository;
     private ActivityStreamsSubscriberWarehouse activityStreamsSubscriberWarehouse;
+    private Class subscriptionClass;
     private ObjectMapper mapper;
 
     @Autowired
     public StreamsSubscriberRegistrationServiceImpl(
-            StreamsSubscriptionRepositoryService subscriptionRepositoryService,
+            SubscriptionRepository subscriptionRepository,
             ActivityStreamsSubscriberWarehouse activityStreamsSubscriberWarehouse
     ) {
-        this.subscriptionRepositoryService = subscriptionRepositoryService;
+        this.subscriptionRepository = subscriptionRepository;
         this.activityStreamsSubscriberWarehouse = activityStreamsSubscriberWarehouse;
         this.mapper = new ObjectMapper();
+        this.subscriptionClass = MongoSubscription.class;
         mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false);
     }
 
@@ -43,14 +46,14 @@ public class StreamsSubscriberRegistrati
     public String register(String subscriberJSON) throws Exception {
         log.info("registering subscriber: "+subscriberJSON);
 
-        ActivityStreamsSubscription subscription = mapper.readValue(subscriberJSON, CassandraSubscription.class);
-        ActivityStreamsSubscription fromDB = subscriptionRepositoryService.getSubscriptionByUsername(subscription.getUsername());
+        ActivityStreamsSubscription subscription = (ActivityStreamsSubscription)mapper.readValue(subscriberJSON, subscriptionClass);
+        ActivityStreamsSubscription fromDB = subscriptionRepository.getSubscriptionByUsername(subscription.getUsername());
 
         if (fromDB != null) {
             return fromDB.getInRoute();
         } else {
             subscription.setInRoute("" + UUID.randomUUID());
-            subscriptionRepositoryService.saveSubscription(subscription);
+            subscriptionRepository.save(subscription);
             activityStreamsSubscriberWarehouse.register(subscription);
 
             return subscription.getInRoute();

Modified: incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/storm/StormSubscriberSpout.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/storm/StormSubscriberSpout.java?rev=1542378&r1=1542377&r2=1542378&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/storm/StormSubscriberSpout.java (original)
+++ incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/storm/StormSubscriberSpout.java Fri Nov 15 19:26:40 2013
@@ -7,8 +7,8 @@ import backtype.storm.topology.base.Base
 import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Values;
 import backtype.storm.utils.Utils;
-import org.apache.streams.components.service.StreamsSubscriptionRepositoryService;
 import org.apache.streams.persistence.model.ActivityStreamsSubscription;
+import org.apache.streams.persistence.repository.SubscriptionRepository;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.ApplicationContext;
 import org.springframework.stereotype.Component;
@@ -20,9 +20,8 @@ import java.util.Map;
 public class StormSubscriberSpout extends BaseRichSpout {
 
     private static ApplicationContext appContext;
-    private StreamsSubscriptionRepositoryService repositoryService;
+    private SubscriptionRepository subscriptionRepository;
     private SpoutOutputCollector _collector;
-    private Iterator iterator;
 
     @Autowired
     public StormSubscriberSpout(ApplicationContext ctx){
@@ -31,7 +30,7 @@ public class StormSubscriberSpout extend
 
     @Override
     public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
-        repositoryService = (StreamsSubscriptionRepositoryService)appContext.getBean("cassandraSubscriptionService");
+        subscriptionRepository = (SubscriptionRepository)appContext.getBean("mongoSubscriptionRepository");
 
         _collector = collector;
     }
@@ -39,7 +38,7 @@ public class StormSubscriberSpout extend
     @Override
     public void nextTuple() {
         Utils.sleep(10000);
-        for (ActivityStreamsSubscription subscription : repositoryService.getAllSubscriptions()) {
+        for (ActivityStreamsSubscription subscription : subscriptionRepository.getAllSubscriptions()) {
             _collector.emit(new Values(subscription));
         }
     }

Modified: incubator/streams/branches/webservice/streams-components/src/main/resources/streams_components_applicationContext.xml
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-components/src/main/resources/streams_components_applicationContext.xml?rev=1542378&r1=1542377&r2=1542378&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-components/src/main/resources/streams_components_applicationContext.xml (original)
+++ incubator/streams/branches/webservice/streams-components/src/main/resources/streams_components_applicationContext.xml Fri Nov 15 19:26:40 2013
@@ -10,4 +10,8 @@
     <context:annotation-config />
     <context:component-scan base-package="org.apache.streams.components"/>
 
+    <!--<bean id="conversionClass" class="java.lang.Class" factory-method="forName">-->
+        <!--<constructor-arg value="org.apache.streams.persistence.model.mongo.MongoActivityStreamsEntry"/>-->
+    <!--</bean>-->
+
 </beans>
\ No newline at end of file

Modified: incubator/streams/branches/webservice/streams-components/src/test/java/org/apache/streams/components/service/StreamsActivityPublishingServiceTest.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-components/src/test/java/org/apache/streams/components/service/StreamsActivityPublishingServiceTest.java?rev=1542378&r1=1542377&r2=1542378&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-components/src/test/java/org/apache/streams/components/service/StreamsActivityPublishingServiceTest.java (original)
+++ incubator/streams/branches/webservice/streams-components/src/test/java/org/apache/streams/components/service/StreamsActivityPublishingServiceTest.java Fri Nov 15 19:26:40 2013
@@ -2,6 +2,7 @@ package org.apache.streams.components.se
 
 import org.apache.streams.components.service.impl.StreamsActivityPublishingServiceImpl;
 import org.apache.streams.persistence.model.ActivityStreamsPublisher;
+import org.apache.streams.persistence.repository.PublisherRepository;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -14,14 +15,14 @@ public class StreamsActivityPublishingSe
 
     private StreamsActivityPublishingService activityPublishingService;
     private StreamsActivityRepositoryService activityService;
-    private StreamsPublisherRepositoryService publisherService;
+    private PublisherRepository publisherRepository;
 
     @Before
     public void setup(){
         activityService = createMock(StreamsActivityRepositoryService.class);
-        publisherService = createMock(StreamsPublisherRepositoryService.class);
+        publisherRepository = createMock(PublisherRepository.class);
 
-        activityPublishingService = new StreamsActivityPublishingServiceImpl(activityService, publisherService);
+        activityPublishingService = new StreamsActivityPublishingServiceImpl(activityService, publisherRepository);
     }
 
     @Test
@@ -30,11 +31,11 @@ public class StreamsActivityPublishingSe
         String activityJson = "myActionJson";
         ActivityStreamsPublisher publisher = createMock(ActivityStreamsPublisher.class);
 
-        expect(publisherService.getActivityStreamsPublisherByInRoute(inRoute)).andReturn(publisher);
-        activityService.receiveActivity(publisher, activityJson);
+        expect(publisherRepository.getPublisherByInRoute(inRoute)).andReturn(publisher);
+        activityService.receiveActivity(activityJson);
         expectLastCall();
 
-        replay(publisherService,activityService);
+        replay(publisherRepository,activityService);
 
         assertThat(activityJson, is(equalTo(activityPublishingService.publish(inRoute, activityJson))));
     }

Modified: incubator/streams/branches/webservice/streams-components/src/test/java/org/apache/streams/components/service/StreamsFiltersServiceTest.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-components/src/test/java/org/apache/streams/components/service/StreamsFiltersServiceTest.java?rev=1542378&r1=1542377&r2=1542378&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-components/src/test/java/org/apache/streams/components/service/StreamsFiltersServiceTest.java (original)
+++ incubator/streams/branches/webservice/streams-components/src/test/java/org/apache/streams/components/service/StreamsFiltersServiceTest.java Fri Nov 15 19:26:40 2013
@@ -5,6 +5,7 @@ import org.apache.streams.components.act
 import org.apache.streams.components.activitysubscriber.impl.ActivityStreamsSubscriberDelegate;
 import org.apache.streams.components.service.impl.StreamsFiltersServiceImpl;
 import org.apache.streams.persistence.model.ActivityStreamsSubscription;
+import org.apache.streams.persistence.repository.SubscriptionRepository;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -19,14 +20,14 @@ import static org.junit.Assert.assertTha
 
 public class StreamsFiltersServiceTest {
     private StreamsFiltersService filtersService;
-    private StreamsSubscriptionRepositoryService repositoryService;
+    private SubscriptionRepository subscriptionRepository;
     private ActivityStreamsSubscriberWarehouse subscriberWarehouse;
 
     @Before
     public void setup(){
-        repositoryService = createMock(StreamsSubscriptionRepositoryService.class);
+        subscriptionRepository = createMock(SubscriptionRepository.class);
         subscriberWarehouse = createMock(ActivityStreamsSubscriberWarehouse.class);
-        filtersService = new StreamsFiltersServiceImpl(repositoryService, subscriberWarehouse);
+        filtersService = new StreamsFiltersServiceImpl(subscriptionRepository, subscriberWarehouse);
     }
 
     @Test
@@ -36,13 +37,13 @@ public class StreamsFiltersServiceTest {
         ActivityStreamsSubscription subscription = createMock(ActivityStreamsSubscription.class);
         ActivityStreamsSubscriber subscriber = new ActivityStreamsSubscriberDelegate();
 
-        repositoryService.updateFilters(eq(subscriberId), isA(Set.class), isA(Set.class));
+        subscriptionRepository.updateFilters(eq(subscriberId), isA(Set.class), isA(Set.class));
         expectLastCall();
-        expect(repositoryService.getSubscriptionByInRoute(subscriberId)).andReturn(subscription);
+        expect(subscriptionRepository.getSubscriptionByInRoute(subscriberId)).andReturn(subscription);
         expect(subscriberWarehouse.getSubscriber(subscriberId)).andReturn(subscriber);
         subscriberWarehouse.updateSubscriber(subscription);
         expectLastCall();
-        replay(repositoryService,subscriberWarehouse);
+        replay(subscriptionRepository,subscriberWarehouse);
 
         String returned = filtersService.updateFilters(subscriberId,tagsJson);
 
@@ -56,9 +57,9 @@ public class StreamsFiltersServiceTest {
         ActivityStreamsSubscription subscription = createMock(ActivityStreamsSubscription.class);
         Set<String> filters = new HashSet<String>(Arrays.asList("tags"));
 
-        expect(repositoryService.getSubscriptionByInRoute(subscriberId)).andReturn(subscription);
+        expect(subscriptionRepository.getSubscriptionByInRoute(subscriberId)).andReturn(subscription);
         expect(subscription.getFilters()).andReturn(filters);
-        replay(subscription,repositoryService);
+        replay(subscription,subscriptionRepository);
 
         String returned = filtersService.getFilters(subscriberId);
 

Modified: incubator/streams/branches/webservice/streams-components/src/test/java/org/apache/streams/components/service/StreamsPublisherRegistrationServiceTest.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-components/src/test/java/org/apache/streams/components/service/StreamsPublisherRegistrationServiceTest.java?rev=1542378&r1=1542377&r2=1542378&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-components/src/test/java/org/apache/streams/components/service/StreamsPublisherRegistrationServiceTest.java (original)
+++ incubator/streams/branches/webservice/streams-components/src/test/java/org/apache/streams/components/service/StreamsPublisherRegistrationServiceTest.java Fri Nov 15 19:26:40 2013
@@ -1,8 +1,8 @@
 package org.apache.streams.components.service;
 
-import org.apache.streams.components.service.impl.CassandraPublisherService;
 import org.apache.streams.components.service.impl.StreamsPublisherRegistrationServiceImpl;
 import org.apache.streams.persistence.model.ActivityStreamsPublisher;
+import org.apache.streams.persistence.repository.PublisherRepository;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -16,15 +16,15 @@ import static org.junit.Assert.assertTha
 
 public class StreamsPublisherRegistrationServiceTest {
     private StreamsPublisherRegistrationService publisherRegistrationService;
-    private StreamsPublisherRepositoryService publisherRepositoryService;
+    private PublisherRepository publisherRepository;
 
     @Rule
     public ExpectedException thrown = ExpectedException.none();
 
     @Before
     public void setup(){
-        publisherRepositoryService = createMock(CassandraPublisherService.class);
-        publisherRegistrationService = new StreamsPublisherRegistrationServiceImpl(publisherRepositoryService);
+        publisherRepository = createMock(PublisherRepository.class);
+        publisherRegistrationService = new StreamsPublisherRegistrationServiceImpl(publisherRepository);
     }
 
     @Test
@@ -43,9 +43,9 @@ public class StreamsPublisherRegistratio
         String inRoute = "this is returned inRoute";
         ActivityStreamsPublisher publisher= createMock(ActivityStreamsPublisher.class);
 
-        expect(publisherRepositoryService.getActivityStreamsPublisherBySrc("this is my src!")).andReturn(publisher);
+        expect(publisherRepository.getPublisherBySrc("this is my src!")).andReturn(publisher);
         expect(publisher.getInRoute()).andReturn(inRoute);
-        replay(publisherRepositoryService,publisher);
+        replay(publisherRepository,publisher);
 
         String returned = publisherRegistrationService.register(publisherJson);
 
@@ -57,14 +57,14 @@ public class StreamsPublisherRegistratio
         String publisherJson = "{\"src\":\"this is my src!\"}";
         String inRoute = "this is returned inRoute";
 
-        expect(publisherRepositoryService.getActivityStreamsPublisherBySrc("this is my src!")).andReturn(null);
-        publisherRepositoryService.savePublisher(isA(ActivityStreamsPublisher.class));
+        expect(publisherRepository.getPublisherBySrc("this is my src!")).andReturn(null);
+        publisherRepository.save(isA(ActivityStreamsPublisher.class));
         expectLastCall();
-        replay(publisherRepositoryService);
+        replay(publisherRepository);
 
         String returned = publisherRegistrationService.register(publisherJson);
 
         assertThat(returned, is(instanceOf(String.class)));
-        verify(publisherRepositoryService);
+        verify(publisherRepository);
     }
 }

Modified: incubator/streams/branches/webservice/streams-components/src/test/java/org/apache/streams/components/service/StreamsSubscriberRegistrationServiceIntegrationTest.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-components/src/test/java/org/apache/streams/components/service/StreamsSubscriberRegistrationServiceIntegrationTest.java?rev=1542378&r1=1542377&r2=1542378&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-components/src/test/java/org/apache/streams/components/service/StreamsSubscriberRegistrationServiceIntegrationTest.java (original)
+++ incubator/streams/branches/webservice/streams-components/src/test/java/org/apache/streams/components/service/StreamsSubscriberRegistrationServiceIntegrationTest.java Fri Nov 15 19:26:40 2013
@@ -1,7 +1,6 @@
 package org.apache.streams.components.service;
 
 import org.apache.streams.components.activitysubscriber.ActivityStreamsSubscriberWarehouse;
-import org.apache.streams.components.service.impl.CassandraSubscriptionService;
 import org.apache.streams.components.service.impl.StreamsSubscriberRegistrationServiceImpl;
 import org.apache.streams.persistence.configuration.CassandraConfiguration;
 import org.apache.streams.persistence.repository.SubscriptionRepository;
@@ -28,9 +27,8 @@ public class StreamsSubscriberRegistrati
         SubscriptionRepository subscriptionRepository = new CassandraSubscriptionRepository(keyspace,configuration);
 
         ActivityStreamsSubscriberWarehouse warehouse = createMock(ActivityStreamsSubscriberWarehouse.class);
-        StreamsSubscriptionRepositoryService repositoryService = new CassandraSubscriptionService(subscriptionRepository);
 
-        streamsSubscriberRegistrationService = new StreamsSubscriberRegistrationServiceImpl(repositoryService, warehouse);
+        streamsSubscriberRegistrationService = new StreamsSubscriberRegistrationServiceImpl(subscriptionRepository, warehouse);
     }
 
     @Ignore

Modified: incubator/streams/branches/webservice/streams-components/src/test/java/org/apache/streams/components/service/StreamsSubscriberRegistrationServiceTest.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-components/src/test/java/org/apache/streams/components/service/StreamsSubscriberRegistrationServiceTest.java?rev=1542378&r1=1542377&r2=1542378&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-components/src/test/java/org/apache/streams/components/service/StreamsSubscriberRegistrationServiceTest.java (original)
+++ incubator/streams/branches/webservice/streams-components/src/test/java/org/apache/streams/components/service/StreamsSubscriberRegistrationServiceTest.java Fri Nov 15 19:26:40 2013
@@ -3,6 +3,7 @@ package org.apache.streams.components.se
 import org.apache.streams.components.activitysubscriber.ActivityStreamsSubscriberWarehouse;
 import org.apache.streams.components.service.impl.StreamsSubscriberRegistrationServiceImpl;
 import org.apache.streams.persistence.model.ActivityStreamsSubscription;
+import org.apache.streams.persistence.repository.SubscriptionRepository;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -14,15 +15,15 @@ import static org.junit.Assert.assertTha
 
 public class StreamsSubscriberRegistrationServiceTest {
     private StreamsSubscriberRegistrationService subscriberRegistrationService;
-    private StreamsSubscriptionRepositoryService subscriptionRepositoryService;
+    private SubscriptionRepository subscriptionRepository;
     private ActivityStreamsSubscriberWarehouse activityStreamsSubscriberWarehouse;
 
     @Before
     public void setup(){
-        subscriptionRepositoryService = createMock(StreamsSubscriptionRepositoryService.class);
+        subscriptionRepository = createMock(SubscriptionRepository.class);
         activityStreamsSubscriberWarehouse = createMock(ActivityStreamsSubscriberWarehouse.class);
 
-        subscriberRegistrationService = new StreamsSubscriberRegistrationServiceImpl(subscriptionRepositoryService,activityStreamsSubscriberWarehouse);
+        subscriberRegistrationService = new StreamsSubscriberRegistrationServiceImpl(subscriptionRepository,activityStreamsSubscriberWarehouse);
     }
 
     @Test
@@ -32,9 +33,9 @@ public class StreamsSubscriberRegistrati
         String subscriberJson = "{\"username\":\"blah\"}";
 
         ActivityStreamsSubscription subscription = createMock(ActivityStreamsSubscription.class);
-        expect(subscriptionRepositoryService.getSubscriptionByUsername(username)).andReturn(subscription);
+        expect(subscriptionRepository.getSubscriptionByUsername(username)).andReturn(subscription);
         expect(subscription.getInRoute()).andReturn(inRoute);
-        replay(subscriptionRepositoryService, subscription);
+        replay(subscriptionRepository, subscription);
 
         String returned = subscriberRegistrationService.register(subscriberJson);
 
@@ -46,16 +47,16 @@ public class StreamsSubscriberRegistrati
         String username = "blah";
         String subscriberJson = "{\"username\":\"blah\"}";
 
-        expect(subscriptionRepositoryService.getSubscriptionByUsername(username)).andReturn(null);
-        subscriptionRepositoryService.saveSubscription(isA(ActivityStreamsSubscription.class));
+        expect(subscriptionRepository.getSubscriptionByUsername(username)).andReturn(null);
+        subscriptionRepository.save(isA(ActivityStreamsSubscription.class));
         expectLastCall();
         activityStreamsSubscriberWarehouse.register(isA(ActivityStreamsSubscription.class));
         expectLastCall();
-        replay(subscriptionRepositoryService, activityStreamsSubscriberWarehouse);
+        replay(subscriptionRepository, activityStreamsSubscriberWarehouse);
 
         String returned = subscriberRegistrationService.register(subscriberJson);
 
         assertThat(returned, is(instanceOf(String.class)));
-        verify(subscriptionRepositoryService,activityStreamsSubscriberWarehouse);
+        verify(subscriptionRepository,activityStreamsSubscriberWarehouse);
     }
 }

Modified: incubator/streams/branches/webservice/streams-persistence/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-persistence/pom.xml?rev=1542378&r1=1542377&r2=1542378&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-persistence/pom.xml (original)
+++ incubator/streams/branches/webservice/streams-persistence/pom.xml Fri Nov 15 19:26:40 2013
@@ -42,6 +42,12 @@
             <artifactId>junit</artifactId>
             <version>${junit.version}</version>
         </dependency>
+
+        <dependency>
+            <groupId>org.mongodb</groupId>
+            <artifactId>mongo-java-driver</artifactId>
+            <version>${mongo-java-driver.version}</version>
+        </dependency>
     </dependencies>
 
 </project>
\ No newline at end of file

Added: incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/configuration/MongoConfiguration.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/configuration/MongoConfiguration.java?rev=1542378&view=auto
==============================================================================
--- incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/configuration/MongoConfiguration.java (added)
+++ incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/configuration/MongoConfiguration.java Fri Nov 15 19:26:40 2013
@@ -0,0 +1,74 @@
+package org.apache.streams.persistence.configuration;
+
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.PropertySource;
+import org.springframework.context.support.PropertySourcesPlaceholderConfigurer;
+
+/**
+ * Spring configuration exposing the Mongo database name and collection names.
+ * Each value is injected from a {@code mongo.*} placeholder; the property
+ * source declared for this class is {@code classpath:mongo.properties}.
+ */
+@Configuration
+@PropertySource("classpath:mongo.properties")
+public class MongoConfiguration {
+
+    @Value("${mongo.dbName}")
+    private String dbName;
+
+    @Value("${mongo.activityStreamsCollectionName}")
+    private String activitystreamsCollectionName;
+
+    @Value("${mongo.subscriptionCollectionName}")
+    private String subscriptionCollectionName;
+
+    @Value("${mongo.publisherCollectionName}")
+    private String publisherCollectionName;
+
+    /**
+     * Registers the placeholder configurer used to resolve the
+     * {@code ${...}} expressions on the {@code @Value} fields of this class.
+     */
+    @Bean
+    public static PropertySourcesPlaceholderConfigurer propertySourcesPlaceholderConfigurer() {
+        return new PropertySourcesPlaceholderConfigurer();
+    }
+
+    /** @return the value bound to {@code mongo.dbName}, or the last value set */
+    public String getDbName() {
+        return dbName;
+    }
+
+    public void setDbName(String dbName) {
+        this.dbName = dbName;
+    }
+
+    /** @return the value bound to {@code mongo.activityStreamsCollectionName}, or the last value set */
+    public String getActivitystreamsCollectionName() {
+        return activitystreamsCollectionName;
+    }
+
+    public void setActivitystreamsCollectionName(String activitystreamsCollectionName) {
+        this.activitystreamsCollectionName = activitystreamsCollectionName;
+    }
+
+    /** @return the value bound to {@code mongo.subscriptionCollectionName}, or the last value set */
+    public String getSubscriptionCollectionName() {
+        return subscriptionCollectionName;
+    }
+
+    public void setSubscriptionCollectionName(String subscriptionCollectionName) {
+        this.subscriptionCollectionName = subscriptionCollectionName;
+    }
+
+    /** @return the value bound to {@code mongo.publisherCollectionName}, or the last value set */
+    public String getPublisherCollectionName() {
+        return publisherCollectionName;
+    }
+
+    public void setPublisherCollectionName(String publisherCollectionName) {
+        this.publisherCollectionName = publisherCollectionName;
+    }
+}

Modified: incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/ActivityStreamsSubscription.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/ActivityStreamsSubscription.java?rev=1542378&r1=1542377&r2=1542378&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/ActivityStreamsSubscription.java (original)
+++ incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/ActivityStreamsSubscription.java Fri Nov 15 19:26:40 2013
@@ -4,7 +4,7 @@ import java.util.Set;
 
 public interface ActivityStreamsSubscription {
 
-    void setFilters(Set<String> tags);
+    void setFilters(Set<String> filters);
     Set<String> getFilters();
 
     String getInRoute();

Added: incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/mongo/MongoActivityStreamsEntry.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/mongo/MongoActivityStreamsEntry.java?rev=1542378&view=auto
==============================================================================
--- incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/mongo/MongoActivityStreamsEntry.java (added)
+++ incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/mongo/MongoActivityStreamsEntry.java Fri Nov 15 19:26:40 2013
@@ -0,0 +1,88 @@
+package org.apache.streams.persistence.model.mongo;
+
+import com.mongodb.BasicDBObject;
+import org.apache.streams.persistence.model.ActivityStreamsEntry;
+import org.apache.streams.persistence.model.ActivityStreamsObject;
+
+import java.util.Date;
+
+/**
+ * {@link ActivityStreamsEntry} kept as a Mongo document: each accessor reads
+ * or writes a single key of the underlying {@link BasicDBObject}.
+ */
+public class MongoActivityStreamsEntry extends BasicDBObject implements ActivityStreamsEntry {
+
+    // Document keys backing the accessors below.
+    private static final String KEY_ID = "id";
+    private static final String KEY_TAGS = "tags";
+    private static final String KEY_PUBLISHED = "published";
+    private static final String KEY_VERB = "verb";
+    private static final String KEY_ACTOR = "actor";
+    private static final String KEY_OBJECT = "object";
+    private static final String KEY_PROVIDER = "provider";
+    private static final String KEY_TARGET = "target";
+
+    public String getId() {
+        return (String) get(KEY_ID);
+    }
+
+    public void setId(String id) {
+        put(KEY_ID, id);
+    }
+
+    public String getTags() {
+        return (String) get(KEY_TAGS);
+    }
+
+    public void setTags(String tags) {
+        put(KEY_TAGS, tags);
+    }
+
+    public Date getPublished() {
+        return (Date) get(KEY_PUBLISHED);
+    }
+
+    public void setPublished(Date published) {
+        put(KEY_PUBLISHED, published);
+    }
+
+    public String getVerb() {
+        return (String) get(KEY_VERB);
+    }
+
+    public void setVerb(String verb) {
+        put(KEY_VERB, verb);
+    }
+
+    public ActivityStreamsObject getActor() {
+        return (ActivityStreamsObject) get(KEY_ACTOR);
+    }
+
+    public void setActor(ActivityStreamsObject actor) {
+        put(KEY_ACTOR, actor);
+    }
+
+    public ActivityStreamsObject getObject() {
+        return (ActivityStreamsObject) get(KEY_OBJECT);
+    }
+
+    public void setObject(ActivityStreamsObject object) {
+        put(KEY_OBJECT, object);
+    }
+
+    public ActivityStreamsObject getProvider() {
+        return (ActivityStreamsObject) get(KEY_PROVIDER);
+    }
+
+    public void setProvider(ActivityStreamsObject provider) {
+        put(KEY_PROVIDER, provider);
+    }
+
+    public ActivityStreamsObject getTarget() {
+        return (ActivityStreamsObject) get(KEY_TARGET);
+    }
+
+    public void setTarget(ActivityStreamsObject target) {
+        put(KEY_TARGET, target);
+    }
+}

Added: incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/mongo/MongoActivityStreamsObject.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/mongo/MongoActivityStreamsObject.java?rev=1542378&view=auto
==============================================================================
--- incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/mongo/MongoActivityStreamsObject.java (added)
+++ incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/mongo/MongoActivityStreamsObject.java Fri Nov 15 19:26:40 2013
@@ -0,0 +1,49 @@
+package org.apache.streams.persistence.model.mongo;
+
+import com.mongodb.BasicDBObject;
+import org.apache.streams.persistence.model.ActivityStreamsObject;
+
+/**
+ * {@link ActivityStreamsObject} kept as a Mongo document: each accessor reads
+ * or writes a single key of the underlying {@link BasicDBObject}.
+ */
+public class MongoActivityStreamsObject extends BasicDBObject implements ActivityStreamsObject {
+
+    // Document keys backing the accessors below.
+    private static final String KEY_DISPLAY_NAME = "displayName";
+    private static final String KEY_ID = "id";
+    private static final String KEY_OBJECT_TYPE = "objectType";
+    private static final String KEY_URL = "url";
+
+    public String getDisplayName() {
+        return (String) get(KEY_DISPLAY_NAME);
+    }
+
+    public void setDisplayName(String displayName) {
+        put(KEY_DISPLAY_NAME, displayName);
+    }
+
+    public String getId() {
+        return (String) get(KEY_ID);
+    }
+
+    public void setId(String id) {
+        put(KEY_ID, id);
+    }
+
+    public String getObjectType() {
+        return (String) get(KEY_OBJECT_TYPE);
+    }
+
+    public void setObjectType(String objectType) {
+        put(KEY_OBJECT_TYPE, objectType);
+    }
+
+    public String getUrl() {
+        return (String) get(KEY_URL);
+    }
+
+    public void setUrl(String url) {
+        put(KEY_URL, url);
+    }
+}

Added: incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/mongo/MongoPublisher.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/mongo/MongoPublisher.java?rev=1542378&view=auto
==============================================================================
--- incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/mongo/MongoPublisher.java (added)
+++ incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/mongo/MongoPublisher.java Fri Nov 15 19:26:40 2013
@@ -0,0 +1,46 @@
+package org.apache.streams.persistence.model.mongo;
+
+import com.mongodb.BasicDBObject;
+import org.apache.streams.persistence.model.ActivityStreamsPublisher;
+
+/**
+ * {@link ActivityStreamsPublisher} kept as a Mongo document: each accessor
+ * reads or writes a single key of the underlying {@link BasicDBObject}.
+ */
+public class MongoPublisher extends BasicDBObject implements ActivityStreamsPublisher {
+
+    // Document keys backing the accessors below.
+    private static final String KEY_ID = "id";
+    private static final String KEY_IN_ROUTE = "inRoute";
+    private static final String KEY_SRC = "src";
+
+    @Override
+    public String getId() {
+        return (String) get(KEY_ID);
+    }
+
+    @Override
+    public void setId(String id) {
+        put(KEY_ID, id);
+    }
+
+    @Override
+    public String getInRoute() {
+        return (String) get(KEY_IN_ROUTE);
+    }
+
+    @Override
+    public void setInRoute(String inRoute) {
+        put(KEY_IN_ROUTE, inRoute);
+    }
+
+    @Override
+    public String getSrc() {
+        return (String) get(KEY_SRC);
+    }
+
+    @Override
+    public void setSrc(String src) {
+        put(KEY_SRC, src);
+    }
+}

Added: incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/mongo/MongoSubscription.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/mongo/MongoSubscription.java?rev=1542378&view=auto
==============================================================================
--- incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/mongo/MongoSubscription.java (added)
+++ incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/mongo/MongoSubscription.java Fri Nov 15 19:26:40 2013
@@ -0,0 +1,74 @@
+package org.apache.streams.persistence.model.mongo;
+
+import com.mongodb.BasicDBObject;
+import org.apache.streams.persistence.model.ActivityStreamsSubscription;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * {@link ActivityStreamsSubscription} kept as a Mongo document under the keys
+ * {@code filters}, {@code inRoute} and {@code username}.
+ */
+public class MongoSubscription extends BasicDBObject implements ActivityStreamsSubscription {
+
+    /** Creates a subscription whose {@code filters} key holds an empty set. */
+    public MongoSubscription(){
+        put("filters",new HashSet<String>());
+    }
+
+    @Override
+    public void setFilters(Set<String> filters) {
+        put("filters",filters);
+    }
+
+    /**
+     * Returns the stored filters as a {@link Set}.
+     *
+     * <p>A stored {@code Set} is returned as-is. Any other stored collection is
+     * copied into a {@link HashSet}, and a missing or {@code null} value
+     * becomes an empty {@link HashSet}; in both cases the new set replaces
+     * the stored value so later calls return the same instance.
+     *
+     * @return the filter set, never {@code null}
+     */
+    @Override
+    @SuppressWarnings("unchecked")
+    public Set<String> getFilters() {
+        Object stored = get("filters");
+
+        if (stored instanceof Set) {
+            return (Set<String>) stored;
+        }
+
+        // Guard against null: new HashSet<String>(null) would throw a
+        // NullPointerException, e.g. after setFilters(null).
+        Set<String> filters = new HashSet<String>();
+        if (stored != null) {
+            filters.addAll((Collection<String>) stored);
+        }
+        put("filters", filters);
+        return filters;
+    }
+
+    @Override
+    public String getInRoute() {
+        return (String)get("inRoute");
+    }
+
+    @Override
+    public void setInRoute(String inRoute) {
+        put("inRoute",inRoute);
+    }
+
+    @Override
+    public String getUsername() {
+        return (String)get("username");
+    }
+
+    @Override
+    public void setUsername(String username) {
+        put("username",username);
+    }
+}

Modified: incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/ActivityStreamsRepository.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/ActivityStreamsRepository.java?rev=1542378&r1=1542377&r2=1542378&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/ActivityStreamsRepository.java (original)
+++ incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/ActivityStreamsRepository.java Fri Nov 15 19:26:40 2013
@@ -9,7 +9,7 @@ import java.util.Set;
 
 public interface ActivityStreamsRepository {
     void save(ActivityStreamsEntry entry);
-    List<ActivityStreamsEntry> getActivitiesForProviders(Set<String> providers, Date lastUpdated);
+    List<ActivityStreamsEntry> getActivitiesForFilters(Set<String> filters, Date lastUpdated);
     void dropTable(String table);
 
 }

Modified: incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/cassandra/CassandraActivityStreamsRepository.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/cassandra/CassandraActivityStreamsRepository.java?rev=1542378&r1=1542377&r2=1542378&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/cassandra/CassandraActivityStreamsRepository.java (original)
+++ incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/cassandra/CassandraActivityStreamsRepository.java Fri Nov 15 19:26:40 2013
@@ -19,7 +19,6 @@ import org.springframework.stereotype.Co
 
 import java.util.*;
 
-@Component
 public class CassandraActivityStreamsRepository implements ActivityStreamsRepository {
 
     private static final Log LOG = LogFactory.getLog(CassandraActivityStreamsRepository.class);
@@ -90,10 +89,10 @@ public class CassandraActivityStreamsRep
     }
 
     @Override
-    public List<ActivityStreamsEntry> getActivitiesForProviders(Set<String> providers, Date lastUpdated) {
+    public List<ActivityStreamsEntry> getActivitiesForFilters(Set<String> filters, Date lastUpdated) {
         List<ActivityStreamsEntry> results = new ArrayList<ActivityStreamsEntry>();
 
-        for (String providerName : providers) {
+        for (String providerName : filters) {
             Select query = QueryBuilder.select().from(configuration.getActivitystreamsColumnFamilyName())
                     .where(QueryBuilder.eq("provider_displayname", providerName))
                     .and(QueryBuilder.gt("published", lastUpdated))

Modified: incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/cassandra/CassandraKeyspace.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/cassandra/CassandraKeyspace.java?rev=1542378&r1=1542377&r2=1542378&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/cassandra/CassandraKeyspace.java (original)
+++ incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/cassandra/CassandraKeyspace.java Fri Nov 15 19:26:40 2013
@@ -3,15 +3,9 @@ package org.apache.streams.persistence.r
 import com.datastax.driver.core.Cluster;
 import com.datastax.driver.core.Session;
 import com.datastax.driver.core.exceptions.AlreadyExistsException;
-import com.datastax.driver.core.policies.ConstantReconnectionPolicy;
-import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy;
 import org.apache.streams.persistence.configuration.CassandraConfiguration;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
 
-import java.net.InetAddress;
-
-@Component
 public class CassandraKeyspace {
     private CassandraConfiguration configuration;
     private Cluster cluster;

Modified: incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/cassandra/CassandraPublisherRepository.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/cassandra/CassandraPublisherRepository.java?rev=1542378&r1=1542377&r2=1542378&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/cassandra/CassandraPublisherRepository.java (original)
+++ incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/cassandra/CassandraPublisherRepository.java Fri Nov 15 19:26:40 2013
@@ -12,7 +12,6 @@ import org.apache.streams.persistence.re
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
-@Component
 public class CassandraPublisherRepository implements PublisherRepository {
 
     private static final Log LOG = LogFactory.getLog(CassandraActivityStreamsRepository.class);

Modified: incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/cassandra/CassandraSubscriptionRepository.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/cassandra/CassandraSubscriptionRepository.java?rev=1542378&r1=1542377&r2=1542378&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/cassandra/CassandraSubscriptionRepository.java (original)
+++ incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/cassandra/CassandraSubscriptionRepository.java Fri Nov 15 19:26:40 2013
@@ -22,7 +22,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
-@Component
 public class CassandraSubscriptionRepository implements SubscriptionRepository {
     private Log LOG = LogFactory.getLog(CassandraSubscriptionRepository.class);
 

Added: incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/mongo/MongoActivityStreamsRepository.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/mongo/MongoActivityStreamsRepository.java?rev=1542378&view=auto
==============================================================================
--- incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/mongo/MongoActivityStreamsRepository.java (added)
+++ incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/mongo/MongoActivityStreamsRepository.java Fri Nov 15 19:26:40 2013
@@ -0,0 +1,99 @@
+package org.apache.streams.persistence.repository.mongo;
+
+import com.mongodb.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.streams.persistence.configuration.MongoConfiguration;
+import org.apache.streams.persistence.model.ActivityStreamsEntry;
+import org.apache.streams.persistence.model.mongo.MongoActivityStreamsEntry;
+import org.apache.streams.persistence.repository.ActivityStreamsRepository;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.*;
+
+/**
+ * MongoDB-backed {@link ActivityStreamsRepository}. Documents in the configured collection are
+ * decoded as {@link MongoActivityStreamsEntry} instances.
+ */
+@Component
+public class MongoActivityStreamsRepository implements ActivityStreamsRepository {
+    private static final Log log = LogFactory.getLog(MongoActivityStreamsRepository.class);
+
+    private final DBCollection activityStreamsCollection;
+
+    /**
+     * Resolves the activity streams collection and registers {@link MongoActivityStreamsEntry} as its
+     * object class so that query results can be cast to the model type.
+     *
+     * @param database      wrapper that supplies the database handle
+     * @param configuration supplies the name of the activity streams collection
+     */
+    @Autowired
+    public MongoActivityStreamsRepository(MongoDatabase database, MongoConfiguration configuration) {
+        this.activityStreamsCollection = database.getDb().getCollection(configuration.getActivitystreamsCollectionName());
+        activityStreamsCollection.setObjectClass(MongoActivityStreamsEntry.class);
+    }
+
+    /**
+     * Persists the entry. Only entries that are also {@link DBObject}s can be handed to the driver;
+     * anything else is skipped, and the skip is now logged instead of passing silently.
+     *
+     * @param entry the activity to store
+     */
+    @Override
+    public void save(ActivityStreamsEntry entry) {
+        if (entry instanceof DBObject) {
+            activityStreamsCollection.save((DBObject) entry);
+        } else {
+            log.warn("Not saving activity entry because it is not a DBObject: "
+                    + (entry == null ? "null" : entry.getClass().getName()));
+        }
+    }
+
+    /**
+     * Finds activities published after {@code lastUpdated} whose provider or actor display name is
+     * one of {@code filters}.
+     *
+     * @param filters     display names to match; {@code null} or empty yields an empty result
+     * @param lastUpdated exclusive lower bound on the {@code published} field
+     * @return the matching entries, never {@code null}
+     */
+    @Override
+    public List<ActivityStreamsEntry> getActivitiesForFilters(Set<String> filters, Date lastUpdated) {
+        List<ActivityStreamsEntry> results = new ArrayList<ActivityStreamsEntry>();
+
+        // An empty $in list cannot match anything and a null set used to fail with a
+        // NullPointerException, so both cases are answered without querying the database.
+        if (filters == null || filters.isEmpty()) {
+            return results;
+        }
+
+        List<String> names = new ArrayList<String>(filters);
+        DBObject query = QueryBuilder.start("published").greaterThan(lastUpdated).and(QueryBuilder.start().or(
+                QueryBuilder.start("provider.displayName").in(names).get(),
+                QueryBuilder.start("actor.displayName").in(names).get()
+        ).get()).get();
+
+        DBCursor cursor = activityStreamsCollection.find(query);
+        try {
+            while (cursor.hasNext()) {
+                results.add((MongoActivityStreamsEntry) cursor.next());
+            }
+        } finally {
+            // Release the cursor even if iteration or the cast fails.
+            cursor.close();
+        }
+
+        return results;
+    }
+
+    /**
+     * Currently a no-op for the Mongo backend.
+     *
+     * @param table ignored
+     */
+    @Override
+    public void dropTable(String table) {
+    }
+}

Added: incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/mongo/MongoDatabase.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/mongo/MongoDatabase.java?rev=1542378&view=auto
==============================================================================
--- incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/mongo/MongoDatabase.java (added)
+++ incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/mongo/MongoDatabase.java Fri Nov 15 19:26:40 2013
@@ -0,0 +1,50 @@
+package org.apache.streams.persistence.repository.mongo;
+
+import com.mongodb.DB;
+import com.mongodb.MongoClient;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.streams.persistence.configuration.MongoConfiguration;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.net.UnknownHostException;
+
+/**
+ * Spring-managed holder for the {@link DB} handle used by the Mongo repositories.
+ */
+@Component
+public class MongoDatabase {
+    private static final Log log = LogFactory.getLog(MongoDatabase.class);
+
+    private DB db;
+
+    /**
+     * Creates a client with the driver's no-argument constructor and selects the configured database.
+     *
+     * @param configuration supplies the database name
+     * @throws IllegalStateException if the client cannot be created; previously the error was only
+     *                               logged, which left the handle null for every dependent repository
+     */
+    @Autowired
+    public MongoDatabase(MongoConfiguration configuration) {
+        try {
+            MongoClient mongoClient = new MongoClient();
+            db = mongoClient.getDB(configuration.getDbName());
+        } catch (UnknownHostException e) {
+            log.error("There was an error connecting to the database", e);
+            // Fail fast: a null handle would only resurface later as a NullPointerException.
+            throw new IllegalStateException("There was an error connecting to the database", e);
+        }
+    }
+
+    /** @return the database handle */
+    public DB getDb() {
+        return db;
+    }
+
+    /** @param db replacement database handle */
+    public void setDb(DB db) {
+        this.db = db;
+    }
+}

Added: incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/mongo/MongoPublisherRepository.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/mongo/MongoPublisherRepository.java?rev=1542378&view=auto
==============================================================================
--- incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/mongo/MongoPublisherRepository.java (added)
+++ incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/mongo/MongoPublisherRepository.java Fri Nov 15 19:26:40 2013
@@ -0,0 +1,75 @@
+package org.apache.streams.persistence.repository.mongo;
+
+import com.mongodb.DBCollection;
+import com.mongodb.DBObject;
+import com.mongodb.QueryBuilder;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.streams.persistence.configuration.MongoConfiguration;
+import org.apache.streams.persistence.model.ActivityStreamsPublisher;
+import org.apache.streams.persistence.model.mongo.MongoPublisher;
+import org.apache.streams.persistence.repository.PublisherRepository;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * MongoDB-backed {@link PublisherRepository}; documents are decoded as {@link MongoPublisher} instances.
+ */
+@Component
+public class MongoPublisherRepository implements PublisherRepository {
+    private static final Log log = LogFactory.getLog(MongoPublisherRepository.class);
+
+    private final DBCollection publisherCollection;
+
+    /**
+     * Resolves the publisher collection and registers {@link MongoPublisher} as its object class.
+     *
+     * @param database      wrapper that supplies the database handle
+     * @param configuration supplies the name of the publisher collection
+     */
+    @Autowired
+    public MongoPublisherRepository(MongoDatabase database, MongoConfiguration configuration) {
+        this.publisherCollection = database.getDb().getCollection(configuration.getPublisherCollectionName());
+        publisherCollection.setObjectClass(MongoPublisher.class);
+    }
+
+    /**
+     * @param inRoute value of the {@code inRoute} field to look up
+     * @return the first matching publisher, or {@code null} if none matches
+     */
+    @Override
+    public ActivityStreamsPublisher getPublisherByInRoute(String inRoute) {
+        return findOneByField("inRoute", inRoute);
+    }
+
+    /**
+     * @param src value of the {@code src} field to look up
+     * @return the first matching publisher, or {@code null} if none matches
+     */
+    @Override
+    public ActivityStreamsPublisher getPublisherBySrc(String src) {
+        return findOneByField("src", src);
+    }
+
+    /**
+     * Persists the publisher. Only publishers that are also {@link DBObject}s can be handed to the
+     * driver; anything else is skipped, and the skip is now logged instead of passing silently.
+     *
+     * @param publisher the publisher to store
+     */
+    @Override
+    public void save(ActivityStreamsPublisher publisher) {
+        if (publisher instanceof DBObject) {
+            publisherCollection.save((DBObject) publisher);
+        } else {
+            log.warn("Not saving publisher because it is not a DBObject: "
+                    + (publisher == null ? "null" : publisher.getClass().getName()));
+        }
+    }
+
+    /** Returns the first document whose {@code field} equals {@code value}, or {@code null}. */
+    private ActivityStreamsPublisher findOneByField(String field, String value) {
+        DBObject query = QueryBuilder.start(field).is(value).get();
+        return (ActivityStreamsPublisher) publisherCollection.findOne(query);
+    }
+}

Added: incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/mongo/MongoSubscriptionRepository.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/mongo/MongoSubscriptionRepository.java?rev=1542378&view=auto
==============================================================================
--- incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/mongo/MongoSubscriptionRepository.java (added)
+++ incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/mongo/MongoSubscriptionRepository.java Fri Nov 15 19:26:40 2013
@@ -0,0 +1,105 @@
+package org.apache.streams.persistence.repository.mongo;
+
+import com.mongodb.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.streams.persistence.configuration.MongoConfiguration;
+import org.apache.streams.persistence.model.ActivityStreamsSubscription;
+import org.apache.streams.persistence.model.mongo.MongoSubscription;
+import org.apache.streams.persistence.repository.SubscriptionRepository;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.List;
+import java.util.Set;
+
+/**
+ * MongoDB-backed {@link SubscriptionRepository}; documents are decoded as {@link MongoSubscription}
+ * instances.
+ */
+@Component
+public class MongoSubscriptionRepository implements SubscriptionRepository {
+    // Was created with MongoActivityStreamsRepository.class, which mislabelled this class's log output.
+    private static final Log log = LogFactory.getLog(MongoSubscriptionRepository.class);
+
+    private final DBCollection subscriptionCollection;
+
+    /**
+     * Resolves the subscription collection and registers {@link MongoSubscription} as its object class.
+     *
+     * @param database      wrapper that supplies the database handle
+     * @param configuration supplies the name of the subscription collection
+     */
+    @Autowired
+    public MongoSubscriptionRepository(MongoDatabase database, MongoConfiguration configuration) {
+        this.subscriptionCollection = database.getDb().getCollection(configuration.getSubscriptionCollectionName());
+        subscriptionCollection.setObjectClass(MongoSubscription.class);
+    }
+
+    /**
+     * @param inRoute value of the {@code inRoute} field to look up
+     * @return the first matching subscription, or {@code null} if none matches
+     */
+    @Override
+    public ActivityStreamsSubscription getSubscriptionByInRoute(String inRoute) {
+        DBObject query = QueryBuilder.start("inRoute").is(inRoute).get();
+        return (ActivityStreamsSubscription) subscriptionCollection.findOne(query);
+    }
+
+    /**
+     * @param username value of the {@code username} field to look up
+     * @return the first matching subscription, or {@code null} if none matches
+     */
+    @Override
+    public ActivityStreamsSubscription getSubscriptionByUsername(String username) {
+        DBObject query = QueryBuilder.start("username").is(username).get();
+        return (ActivityStreamsSubscription) subscriptionCollection.findOne(query);
+    }
+
+    /**
+     * @return every document in the subscription collection
+     */
+    @Override
+    @SuppressWarnings("unchecked") // relies on setObjectClass(MongoSubscription.class) in the constructor
+    public List<ActivityStreamsSubscription> getAllSubscriptions() {
+        return (List<ActivityStreamsSubscription>) (List<?>) subscriptionCollection.find().toArray();
+    }
+
+    /**
+     * Persists the subscription. Only subscriptions that are also {@link DBObject}s can be handed to
+     * the driver; anything else is skipped, and the skip is now logged instead of passing silently.
+     *
+     * @param subscription the subscription to store
+     */
+    @Override
+    public void save(ActivityStreamsSubscription subscription) {
+        if (subscription instanceof DBObject) {
+            subscriptionCollection.save((DBObject) subscription);
+        } else {
+            log.warn("Not saving subscription because it is not a DBObject: "
+                    + (subscription == null ? "null" : subscription.getClass().getName()));
+        }
+    }
+
+    /**
+     * Adds and removes filter values on the subscription whose {@code inRoute} equals {@code subscriberId}.
+     * The two changes are sent as separate updates, additions first.
+     *
+     * @param subscriberId value of the subscription's {@code inRoute} field
+     * @param add          filters to add; {@code null} skips the add step
+     * @param remove       filters to remove; {@code null} skips the remove step
+     */
+    @Override
+    public void updateFilters(String subscriberId, Set<String> add, Set<String> remove) {
+        DBObject query = QueryBuilder.start("inRoute").is(subscriberId).get();
+
+        // A null set would be sent as a null operand, which is not a valid $each / $pullAll argument.
+        if (add != null) {
+            BasicDBObject each = new BasicDBObject("$each", add);
+            subscriptionCollection.update(query, new BasicDBObject("$addToSet", new BasicDBObject("filters", each)));
+        }
+        if (remove != null) {
+            subscriptionCollection.update(query, new BasicDBObject("$pullAll", new BasicDBObject("filters", remove)));
+        }
+    }
+}

Added: incubator/streams/branches/webservice/streams-persistence/src/main/resources/mongo.properties
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-persistence/src/main/resources/mongo.properties?rev=1542378&view=auto
==============================================================================
--- incubator/streams/branches/webservice/streams-persistence/src/main/resources/mongo.properties (added)
+++ incubator/streams/branches/webservice/streams-persistence/src/main/resources/mongo.properties Fri Nov 15 19:26:40 2013
@@ -0,0 +1,4 @@
+mongo.dbName=mongodatabase1
+mongo.activityStreamsCollectionName=mongoacitivytstreams1
+mongo.subscriptionCollectionName=mongosubscription1
+mongo.publisherCollectionName=mongopublisher1
\ No newline at end of file

Modified: incubator/streams/branches/webservice/streams-persistence/src/main/resources/streams_persistence_applicationContext.xml
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-persistence/src/main/resources/streams_persistence_applicationContext.xml?rev=1542378&r1=1542377&r2=1542378&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-persistence/src/main/resources/streams_persistence_applicationContext.xml (original)
+++ incubator/streams/branches/webservice/streams-persistence/src/main/resources/streams_persistence_applicationContext.xml Fri Nov 15 19:26:40 2013
@@ -9,4 +9,8 @@
     <context:annotation-config />
     <context:component-scan base-package="org.apache.streams.persistence"/>
 
+    <!--<bean id="activityStreamsRepository" class="org.apache.streams.persistence.repository.mongo.MongoActivityStreamsRepository"/>-->
+    <!--<bean id="publisherRepository" class="org.apache.streams.persistence.repository.mongo.MongoPublisherRepository"/>-->
+    <!--<bean id="subscriberRepository" class="org.apache.streams.persistence.repository.mongo.MongoSubscriptionRepository"/>-->
+    <!--<bean id="mongoDatabase" class="org.apache.streams.persistence.repository.mongo.MongoDatabase"/>-->
 </beans>
\ No newline at end of file

Modified: incubator/streams/branches/webservice/streams-persistence/src/test/java/org/apache/streams/persistence/repository/cassandra/CassandraActiivtyStreamsRepositoryTest.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-persistence/src/test/java/org/apache/streams/persistence/repository/cassandra/CassandraActiivtyStreamsRepositoryTest.java?rev=1542378&r1=1542377&r2=1542378&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-persistence/src/test/java/org/apache/streams/persistence/repository/cassandra/CassandraActiivtyStreamsRepositoryTest.java (original)
+++ incubator/streams/branches/webservice/streams-persistence/src/test/java/org/apache/streams/persistence/repository/cassandra/CassandraActiivtyStreamsRepositoryTest.java Fri Nov 15 19:26:40 2013
@@ -35,13 +35,13 @@ public class CassandraActiivtyStreamsRep
     @Ignore
     @Test
     public void testNullTags(){
-       repository.getActivitiesForProviders(null,new Date(0));
+       repository.getActivitiesForFilters(null,new Date(0));
     }
 
     @Ignore
     @Test
     public void getActivitiesForTagsTest(){
-        List<ActivityStreamsEntry> activities = repository.getActivitiesForProviders(new HashSet<String>(Arrays.asList("provider_displayname")),new Date(0));
+        List<ActivityStreamsEntry> activities = repository.getActivitiesForFilters(new HashSet<String>(Arrays.asList("provider_displayname")),new Date(0));
     }
 
     @Ignore

Added: incubator/streams/branches/webservice/streams-persistence/src/test/java/org/apache/streams/persistence/repository/mongo/MongoActivityStreamsRepositoryTest.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-persistence/src/test/java/org/apache/streams/persistence/repository/mongo/MongoActivityStreamsRepositoryTest.java?rev=1542378&view=auto
==============================================================================
--- incubator/streams/branches/webservice/streams-persistence/src/test/java/org/apache/streams/persistence/repository/mongo/MongoActivityStreamsRepositoryTest.java (added)
+++ incubator/streams/branches/webservice/streams-persistence/src/test/java/org/apache/streams/persistence/repository/mongo/MongoActivityStreamsRepositoryTest.java Fri Nov 15 19:26:40 2013
@@ -0,0 +1,97 @@
+package org.apache.streams.persistence.repository.mongo;
+
+
+import org.apache.streams.persistence.configuration.MongoConfiguration;
+import org.apache.streams.persistence.model.ActivityStreamsEntry;
+import org.apache.streams.persistence.model.ActivityStreamsObject;
+import org.apache.streams.persistence.model.mongo.MongoActivityStreamsEntry;
+import org.apache.streams.persistence.model.mongo.MongoActivityStreamsObject;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+
+/**
+ * Exercises {@link MongoActivityStreamsRepository}; both tests are {@code @Ignore}d and make no
+ * assertions.
+ */
+public class MongoActivityStreamsRepositoryTest {
+    private MongoActivityStreamsRepository repository;
+    private MongoConfiguration configuration;
+    private MongoDatabase database;
+
+    /** Builds a repository over database {@code testdb1}, collection {@code activitystreamstest2}. */
+    @Before
+    public void setup() {
+        configuration = new MongoConfiguration();
+        configuration.setDbName("testdb1");
+        configuration.setActivitystreamsCollectionName("activitystreamstest2");
+        database = new MongoDatabase(configuration);
+        repository = new MongoActivityStreamsRepository(database, configuration);
+    }
+
+    /** Saves one entry with actor, target, object and provider populated. */
+    @Ignore
+    @Test
+    public void saveActivity(){
+        ActivityStreamsEntry activity = new MongoActivityStreamsEntry();
+
+        // Setter order is unchanged from the original test.
+        activity.setPublished(new Date());
+        activity.setVerb("verb");
+        activity.setId("newid");
+        activity.setTags("tags");
+        activity.setActor(sampleActor());
+        activity.setTarget(sampleTarget());
+        activity.setObject(sampleObject());
+        activity.setProvider(sampleProvider());
+
+        repository.save(activity);
+    }
+
+    /** Queries by the provider display name used in {@link #saveActivity()}. */
+    @Ignore
+    @Test
+    public void getActivitiesForProvidersTest(){
+        HashSet<String> names = new HashSet<String>(Arrays.asList("provider_displayname"));
+        List<ActivityStreamsEntry> activities = repository.getActivitiesForFilters(names, new Date(0));
+    }
+
+    private static ActivityStreamsObject sampleActor() {
+        ActivityStreamsObject actor = new MongoActivityStreamsObject();
+        actor.setDisplayName("actor_displayname");
+        actor.setId("actor_id");
+        actor.setObjectType("actor_objecttype");
+        actor.setUrl("actor_url");
+        return actor;
+    }
+
+    private static ActivityStreamsObject sampleTarget() {
+        ActivityStreamsObject target = new MongoActivityStreamsObject();
+        target.setDisplayName("target_displayname");
+        target.setId("target_id");
+        target.setUrl("target_url");
+        return target;
+    }
+
+    private static ActivityStreamsObject sampleObject() {
+        ActivityStreamsObject object = new MongoActivityStreamsObject();
+        object.setDisplayName("object_displayname");
+        object.setObjectType("object_objecttype");
+        object.setUrl("object_url");
+        object.setId("object_id");
+        return object;
+    }
+
+    private static ActivityStreamsObject sampleProvider() {
+        ActivityStreamsObject provider = new MongoActivityStreamsObject();
+        provider.setUrl("provider_url");
+        provider.setDisplayName("provider_displayname");
+        return provider;
+    }
+
+}

Added: incubator/streams/branches/webservice/streams-persistence/src/test/java/org/apache/streams/persistence/repository/mongo/MongoPublisherRepositoryTest.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-persistence/src/test/java/org/apache/streams/persistence/repository/mongo/MongoPublisherRepositoryTest.java?rev=1542378&view=auto
==============================================================================
--- incubator/streams/branches/webservice/streams-persistence/src/test/java/org/apache/streams/persistence/repository/mongo/MongoPublisherRepositoryTest.java (added)
+++ incubator/streams/branches/webservice/streams-persistence/src/test/java/org/apache/streams/persistence/repository/mongo/MongoPublisherRepositoryTest.java Fri Nov 15 19:26:40 2013
@@ -0,0 +1,67 @@
+package org.apache.streams.persistence.repository.mongo;
+
+
+import org.apache.streams.persistence.configuration.MongoConfiguration;
+import org.apache.streams.persistence.model.ActivityStreamsPublisher;
+import org.apache.streams.persistence.model.cassandra.CassandraPublisher;
+import org.apache.streams.persistence.model.mongo.MongoPublisher;
+import org.apache.streams.persistence.repository.PublisherRepository;
+import org.apache.streams.persistence.repository.SubscriptionRepository;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Exercises {@link MongoPublisherRepository}; both tests are {@code @Ignore}d.
+ */
+public class MongoPublisherRepositoryTest {
+
+    private PublisherRepository repository;
+    private MongoConfiguration configuration;
+    private MongoDatabase database;
+
+    /** Builds a repository over database {@code testdb1}, collection {@code publishertest2}. */
+    @Before
+    public void setup() {
+        configuration = new MongoConfiguration();
+        configuration.setDbName("testdb1");
+        configuration.setPublisherCollectionName("publishertest2");
+        database = new MongoDatabase(configuration);
+        repository = new MongoPublisherRepository(database, configuration);
+    }
+
+    /** Saves the publisher that {@link #getPublisherTest()} looks up. */
+    @Ignore
+    @Test
+    public void saveTest(){
+        ActivityStreamsPublisher publisher = new MongoPublisher();
+
+        publisher.setId("newId");
+        publisher.setSrc("http://www.google.comd");
+        publisher.setInRoute("inRoute");
+
+        repository.save(publisher);
+    }
+
+    /**
+     * Looks up the publisher by in-route and checks its fields. The values match those written by
+     * {@link #saveTest()}, so this presumably needs that document to already be in the collection.
+     */
+    @Ignore
+    @Test
+    public void getPublisherTest(){
+
+        ActivityStreamsPublisher publisher = repository.getPublisherByInRoute("inRoute");
+
+        // Fail with a clear message rather than a NullPointerException when nothing was found.
+        assertNotNull("no publisher found for inRoute", publisher);
+        // JUnit's signature is assertEquals(expected, actual); the arguments were previously swapped.
+        assertEquals("http://www.google.comd", publisher.getSrc());
+        assertEquals("newId", publisher.getId());
+        assertEquals("inRoute", publisher.getInRoute());
+    }
+
+}