You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@streams.apache.org by ds...@apache.org on 2013/10/11 16:26:32 UTC

svn commit: r1531306 - in /incubator/streams/branches/webservice: ./ streams-components/src/main/java/org/apache/streams/components/service/ streams-components/src/main/java/org/apache/streams/components/service/impl/ streams-osgi-components/ streams-p...

Author: dsullivan
Date: Fri Oct 11 14:26:31 2013
New Revision: 1531306

URL: http://svn.apache.org/r1531306
Log:
adding publisher persistence

Added:
    incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/StreamsPublisherRepositoryService.java
    incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/CassandraPublisherService.java
    incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/ActivityStreamsPublisher.java
    incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/cassandra/CassandraPublisher.java
    incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/PublisherRepository.java
    incubator/streams/branches/webservice/streams-persistence/src/test/java/org/
    incubator/streams/branches/webservice/streams-persistence/src/test/java/org/apache/
    incubator/streams/branches/webservice/streams-persistence/src/test/java/org/apache/streams/
    incubator/streams/branches/webservice/streams-persistence/src/test/java/org/apache/streams/persistence/
    incubator/streams/branches/webservice/streams-persistence/src/test/java/org/apache/streams/persistence/repository/
    incubator/streams/branches/webservice/streams-persistence/src/test/java/org/apache/streams/persistence/repository/cassandra/
    incubator/streams/branches/webservice/streams-persistence/src/test/java/org/apache/streams/persistence/repository/cassandra/CassandraPublisherRepositoryTest.java
Removed:
    incubator/streams/branches/webservice/streams-osgi-components/
Modified:
    incubator/streams/branches/webservice/pom.xml
    incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/StreamsActivityPublishingService.java
    incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/StreamsActivityRepositoryService.java
    incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/CassandraActivityService.java
    incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/CassandraSubscriptionService.java
    incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/StreamsActivityPublishingServiceImpl.java
    incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/StreamsPublisherRegistrationServiceImpl.java
    incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/StreamsSubscriberRegistrationServiceImpl.java
    incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/ActivityStreamsSubscription.java
    incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/cassandra/CassandraSubscription.java
    incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/cassandra/CassandraPublisherRepository.java
    incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/cassandra/CassandraSubscriptionRepository.java
    incubator/streams/branches/webservice/streams-web/src/main/java/org/apache/streams/mvc/controller/StreamsWebController.java

Modified: incubator/streams/branches/webservice/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/pom.xml?rev=1531306&r1=1531305&r2=1531306&view=diff
==============================================================================
--- incubator/streams/branches/webservice/pom.xml (original)
+++ incubator/streams/branches/webservice/pom.xml Fri Oct 11 14:26:31 2013
@@ -53,6 +53,7 @@
         <rave.version>0.22</rave.version>
         <datastax.version>1.0.3</datastax.version>
         <spring.version>3.0.5.RELEASE</spring.version>
+        <junit.version>4.9</junit.version>
     </properties>
 
     <packaging>pom</packaging>
@@ -69,4 +70,12 @@
 
     </build>
 
+    <dependencies>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>${junit.version}</version>
+        </dependency>
+    </dependencies>
+
 </project>

Modified: incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/StreamsActivityPublishingService.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/StreamsActivityPublishingService.java?rev=1531306&r1=1531305&r2=1531306&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/StreamsActivityPublishingService.java (original)
+++ incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/StreamsActivityPublishingService.java Fri Oct 11 14:26:31 2013
@@ -3,5 +3,5 @@ package org.apache.streams.components.se
 import java.io.IOException;
 
 public interface StreamsActivityPublishingService {
-    String publish(String publisherID, String activityJSON) throws IOException;
+    String publish(String publisherID, String activityJSON) throws Exception;
 }

Modified: incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/StreamsActivityRepositoryService.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/StreamsActivityRepositoryService.java?rev=1531306&r1=1531305&r2=1531306&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/StreamsActivityRepositoryService.java (original)
+++ incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/StreamsActivityRepositoryService.java Fri Oct 11 14:26:31 2013
@@ -1,5 +1,7 @@
 package org.apache.streams.components.service;
 
+import org.apache.streams.persistence.model.ActivityStreamsPublisher;
+
 import java.io.IOException;
 import java.util.Date;
 import java.util.List;
@@ -7,7 +9,7 @@ import java.util.List;
 
 public interface StreamsActivityRepositoryService {
 
-    void receiveActivity(String activityJSON) throws IOException;
+    void receiveActivity(ActivityStreamsPublisher publisher, String activityJSON) throws Exception;
 
     List<String> getActivitiesForFilters(List<String> filters, Date lastUpdated);
 }

Added: incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/StreamsPublisherRepositoryService.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/StreamsPublisherRepositoryService.java?rev=1531306&view=auto
==============================================================================
--- incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/StreamsPublisherRepositoryService.java (added)
+++ incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/StreamsPublisherRepositoryService.java Fri Oct 11 14:26:31 2013
@@ -0,0 +1,8 @@
+package org.apache.streams.components.service;
+
+import org.apache.streams.persistence.model.ActivityStreamsPublisher;
+
+public interface StreamsPublisherRepositoryService {
+    void savePublisher(ActivityStreamsPublisher publisher);
+    ActivityStreamsPublisher getActivityStreamsPublisher(String inRoute);
+}

Modified: incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/CassandraActivityService.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/CassandraActivityService.java?rev=1531306&r1=1531305&r2=1531306&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/CassandraActivityService.java (original)
+++ incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/CassandraActivityService.java Fri Oct 11 14:26:31 2013
@@ -4,6 +4,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.rave.model.ActivityStreamsEntry;
 import org.apache.streams.components.service.StreamsActivityRepositoryService;
+import org.apache.streams.persistence.model.ActivityStreamsPublisher;
 import org.apache.streams.persistence.model.cassandra.CassandraActivityStreamsEntry;
 import org.apache.streams.persistence.repository.ActivityStreamsRepository;
 import org.codehaus.jackson.map.DeserializationConfig;
@@ -12,10 +13,7 @@ import org.springframework.beans.factory
 import org.springframework.stereotype.Component;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Date;
-import java.util.List;
+import java.util.*;
 
 @Component
 public class CassandraActivityService implements StreamsActivityRepositoryService {
@@ -33,9 +31,13 @@ public class CassandraActivityService im
     }
 
     @Override
-    public void receiveActivity(String activityJSON) throws IOException {
+    public void receiveActivity(ActivityStreamsPublisher publisher, String activityJSON) throws Exception {
         ActivityStreamsEntry streamsEntry = mapper.readValue(activityJSON, CassandraActivityStreamsEntry.class);
+        if(!publisher.getSrc().equals(streamsEntry.getProvider().getUrl())){
+            throw new Exception("The Publisher source: "+ publisher.getSrc() +" and Activity Provider source: " + streamsEntry.getProvider().getUrl() + " were not equal");
+        }
         streamsEntry.setPublished(new Date());
+        streamsEntry.setId(""+UUID.randomUUID());
         activityStreamsRepository.save(streamsEntry);
     }
 
@@ -44,7 +46,6 @@ public class CassandraActivityService im
         List<ActivityStreamsEntry> activityObjects = activityStreamsRepository.getActivitiesForFilters(filters, lastUpdated);
         Collections.sort(activityObjects, Collections.reverseOrder());
         //TODO: make the number of streams returned configurable
-        //TODO: what happens if this .subList(0,0)?
         return getJsonList(activityObjects.subList(0, Math.min(activityObjects.size(), 10)));
     }
 

Added: incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/CassandraPublisherService.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/CassandraPublisherService.java?rev=1531306&view=auto
==============================================================================
--- incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/CassandraPublisherService.java (added)
+++ incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/CassandraPublisherService.java Fri Oct 11 14:26:31 2013
@@ -0,0 +1,30 @@
+package org.apache.streams.components.service.impl;
+
+import org.apache.streams.components.service.StreamsPublisherRepositoryService;
+import org.apache.streams.persistence.model.ActivityStreamsPublisher;
+import org.apache.streams.persistence.repository.PublisherRepository;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.UUID;
+
+@Component
+public class CassandraPublisherService implements StreamsPublisherRepositoryService {
+    private PublisherRepository repository;
+
+    @Autowired
+    public CassandraPublisherService(PublisherRepository repository) {
+        this.repository = repository;
+    }
+
+    @Override
+    public void savePublisher(ActivityStreamsPublisher publisher) {
+        publisher.setId("" + UUID.randomUUID());
+        repository.save(publisher);
+    }
+
+    @Override
+    public ActivityStreamsPublisher getActivityStreamsPublisher(String inRoute) {
+        return repository.getPublisher(inRoute);
+    }
+}

Modified: incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/CassandraSubscriptionService.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/CassandraSubscriptionService.java?rev=1531306&r1=1531305&r2=1531306&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/CassandraSubscriptionService.java (original)
+++ incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/CassandraSubscriptionService.java Fri Oct 11 14:26:31 2013
@@ -8,6 +8,7 @@ import org.springframework.stereotype.Co
 
 import java.util.Arrays;
 import java.util.List;
+import java.util.UUID;
 
 @Component
 public class CassandraSubscriptionService implements StreamsSubscriptionRepositoryService {
@@ -15,15 +16,16 @@ public class CassandraSubscriptionServic
     private SubscriptionRepository repository;
 
     @Autowired
-    public CassandraSubscriptionService(SubscriptionRepository repository){
+    public CassandraSubscriptionService(SubscriptionRepository repository) {
         this.repository = repository;
     }
 
-    public List<String> getFilters(String authToken){
-          return Arrays.asList(repository.getFilters(authToken).split(" "));
+    public List<String> getFilters(String authToken) {
+        return Arrays.asList(repository.getFilters(authToken).split(" "));
     }
 
-    public void saveFilters(ActivityStreamsSubscription subscription){
-          //repository.save(subscription);
+    public void saveFilters(ActivityStreamsSubscription subscription) {
+        subscription.setId("" + UUID.randomUUID());
+        repository.save(subscription);
     }
 }

Modified: incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/StreamsActivityPublishingServiceImpl.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/StreamsActivityPublishingServiceImpl.java?rev=1531306&r1=1531305&r2=1531306&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/StreamsActivityPublishingServiceImpl.java (original)
+++ incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/StreamsActivityPublishingServiceImpl.java Fri Oct 11 14:26:31 2013
@@ -4,6 +4,8 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.streams.components.service.StreamsActivityPublishingService;
 import org.apache.streams.components.service.StreamsActivityRepositoryService;
+import org.apache.streams.components.service.StreamsPublisherRepositoryService;
+import org.apache.streams.persistence.model.ActivityStreamsPublisher;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
@@ -14,10 +16,12 @@ public class StreamsActivityPublishingSe
     private Log log = LogFactory.getLog(StreamsActivityPublishingServiceImpl.class);
 
     private StreamsActivityRepositoryService activityService;
+    private StreamsPublisherRepositoryService publisherService;
 
     @Autowired
-    public StreamsActivityPublishingServiceImpl(StreamsActivityRepositoryService activityService) {
+    public StreamsActivityPublishingServiceImpl(StreamsActivityRepositoryService activityService, StreamsPublisherRepositoryService publisherService) {
         this.activityService = activityService;
+        this.publisherService = publisherService;
     }
 
     /**
@@ -26,8 +30,9 @@ public class StreamsActivityPublishingSe
      * @param activityJSON the activityJSON being published
      * @return a success message if no errors were thrown
      * */
-    public String publish(String publisherID, String activityJSON) throws IOException {
-        activityService.receiveActivity(activityJSON);
+    public String publish(String publisherID, String activityJSON) throws Exception {
+        ActivityStreamsPublisher publisher = publisherService.getActivityStreamsPublisher(publisherID);
+        activityService.receiveActivity(publisher,activityJSON);
         return "The activity was successfully published!";
     }
 }

Modified: incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/StreamsPublisherRegistrationServiceImpl.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/StreamsPublisherRegistrationServiceImpl.java?rev=1531306&r1=1531305&r2=1531306&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/StreamsPublisherRegistrationServiceImpl.java (original)
+++ incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/StreamsPublisherRegistrationServiceImpl.java Fri Oct 11 14:26:31 2013
@@ -4,8 +4,9 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.streams.components.service.StreamsPublisherRegistrationService;
 import org.apache.streams.components.configuration.StreamsConfiguration;
-import org.apache.streams.components.activityconsumer.ActivityConsumer;
-import org.apache.streams.components.activityconsumer.ActivityConsumerWarehouse;
+import org.apache.streams.components.service.StreamsPublisherRepositoryService;
+import org.apache.streams.persistence.model.ActivityStreamsPublisher;
+import org.apache.streams.persistence.model.cassandra.CassandraPublisher;
 import org.codehaus.jackson.map.DeserializationConfig;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -17,12 +18,12 @@ import java.util.UUID;
 public class StreamsPublisherRegistrationServiceImpl implements StreamsPublisherRegistrationService {
     private Log log = LogFactory.getLog(StreamsPublisherRegistrationServiceImpl.class);
 
-    private ActivityConsumerWarehouse activityConsumerWarehouse;
+    private StreamsPublisherRepositoryService publisherRepositoryService;
     private StreamsConfiguration configuration;
 
     @Autowired
-    public StreamsPublisherRegistrationServiceImpl(ActivityConsumerWarehouse activityConsumerWarehouse, StreamsConfiguration configuration) {
-        this.activityConsumerWarehouse = activityConsumerWarehouse;
+    public StreamsPublisherRegistrationServiceImpl(StreamsPublisherRepositoryService publisherRepositoryService, StreamsConfiguration configuration) {
+        this.publisherRepositoryService = publisherRepositoryService;
         this.configuration = configuration;
     }
 
@@ -37,16 +38,16 @@ public class StreamsPublisherRegistratio
         mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false);
 
         // read from file, convert it to user class
-        ActivityConsumer activityConsumer = mapper.readValue(publisherJSON, ActivityConsumer.class);
+        ActivityStreamsPublisher publisher = mapper.readValue(publisherJSON, CassandraPublisher.class);
 
-        if (activityConsumer.getSrc() == null) {
+        if (publisher.getSrc() == null) {
             log.info("configuration src is null");
             throw new Exception("configuration src is null");
         }
 
-        activityConsumer.setAuthenticated(true);
-        activityConsumer.setInRoute("" + UUID.randomUUID());
-        activityConsumerWarehouse.register(activityConsumer);
-        return configuration.getBaseUrlPath() + "postActivity/" + activityConsumer.getInRoute();
+        publisher.setInRoute("" + UUID.randomUUID());
+        publisherRepositoryService.savePublisher(publisher);
+
+        return configuration.getBaseUrlPath() + "postActivity/" + publisher.getInRoute();
     }
 }

Modified: incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/StreamsSubscriberRegistrationServiceImpl.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/StreamsSubscriberRegistrationServiceImpl.java?rev=1531306&r1=1531305&r2=1531306&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/StreamsSubscriberRegistrationServiceImpl.java (original)
+++ incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/StreamsSubscriberRegistrationServiceImpl.java Fri Oct 11 14:26:31 2013
@@ -51,7 +51,7 @@ public class StreamsSubscriberRegistrati
 
         ActivityStreamsSubscription subscription = mapper.readValue(subscriberJSON, ActivityStreamsSubscription.class);
         if (subscription.getFilters() == null) {
-            subscription.setFilters(subscriptionService.getFilters(subscription.getAuthToken()));
+            subscription.setFilters(subscriptionService.getFilters(subscription.getId()));
         } else {
             subscriptionService.saveFilters(subscription);
         }

Added: incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/ActivityStreamsPublisher.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/ActivityStreamsPublisher.java?rev=1531306&view=auto
==============================================================================
--- incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/ActivityStreamsPublisher.java (added)
+++ incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/ActivityStreamsPublisher.java Fri Oct 11 14:26:31 2013
@@ -0,0 +1,10 @@
+package org.apache.streams.persistence.model;
+
+public interface ActivityStreamsPublisher {
+    String getInRoute();
+    String getId();
+    String getSrc();
+    void setInRoute(String inRoute);
+    void setId(String id);
+    void setSrc(String src);
+}

Modified: incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/ActivityStreamsSubscription.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/ActivityStreamsSubscription.java?rev=1531306&r1=1531305&r2=1531306&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/ActivityStreamsSubscription.java (original)
+++ incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/ActivityStreamsSubscription.java Fri Oct 11 14:26:31 2013
@@ -10,7 +10,7 @@ public interface ActivityStreamsSubscrip
     public void setFilters(List<String> filters);
     public List<String> getFilters();
 
-    public String getAuthToken();
-    public void setAuthToken(String token);
+    public String getId();
+    public void setId(String id);
 
 }

Added: incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/cassandra/CassandraPublisher.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/cassandra/CassandraPublisher.java?rev=1531306&view=auto
==============================================================================
--- incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/cassandra/CassandraPublisher.java (added)
+++ incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/cassandra/CassandraPublisher.java Fri Oct 11 14:26:31 2013
@@ -0,0 +1,33 @@
+package org.apache.streams.persistence.model.cassandra;
+
+import org.apache.streams.persistence.model.ActivityStreamsPublisher;
+
+public class CassandraPublisher implements ActivityStreamsPublisher {
+    private String id;
+    private String inRoute;
+    private String src;
+
+    public String getId() {
+        return id;
+    }
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    public String getInRoute() {
+        return inRoute;
+    }
+
+    public void setInRoute(String inRoute) {
+        this.inRoute = inRoute;
+    }
+
+    public String getSrc() {
+        return src;
+    }
+
+    public void setSrc(String src) {
+        this.src = src;
+    }
+}

Modified: incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/cassandra/CassandraSubscription.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/cassandra/CassandraSubscription.java?rev=1531306&r1=1531305&r2=1531306&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/cassandra/CassandraSubscription.java (original)
+++ incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/cassandra/CassandraSubscription.java Fri Oct 11 14:26:31 2013
@@ -11,7 +11,7 @@ public class CassandraSubscription imple
     @JsonDeserialize(as=ArrayList.class)
     private List<String> filters;
 
-    private String authToken;
+    private String id;
 
     public void setFilters(List<String> filters) {
         //TODO: it's possible that this could be null
@@ -25,12 +25,12 @@ public class CassandraSubscription imple
     }
 
     @Override
-    public String getAuthToken() {
-        return authToken;
+    public String getId() {
+        return id;
     }
 
     @Override
-    public void setAuthToken(String auth_token) {
-        this.authToken = auth_token;
+    public void setId(String id) {
+        this.id = id;
     }
 }

Added: incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/PublisherRepository.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/PublisherRepository.java?rev=1531306&view=auto
==============================================================================
--- incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/PublisherRepository.java (added)
+++ incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/PublisherRepository.java Fri Oct 11 14:26:31 2013
@@ -0,0 +1,8 @@
+package org.apache.streams.persistence.repository;
+
+import org.apache.streams.persistence.model.ActivityStreamsPublisher;
+
+public interface PublisherRepository {
+    ActivityStreamsPublisher getPublisher(String inRoute);
+    void save(ActivityStreamsPublisher publisher);
+}

Modified: incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/cassandra/CassandraPublisherRepository.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/cassandra/CassandraPublisherRepository.java?rev=1531306&r1=1531305&r2=1531306&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/cassandra/CassandraPublisherRepository.java (original)
+++ incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/cassandra/CassandraPublisherRepository.java Fri Oct 11 14:26:31 2013
@@ -1,14 +1,19 @@
 package org.apache.streams.persistence.repository.cassandra;
 
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
 import com.datastax.driver.core.exceptions.AlreadyExistsException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.streams.persistence.configuration.CassandraConfiguration;
+import org.apache.streams.persistence.model.ActivityStreamsPublisher;
+import org.apache.streams.persistence.model.cassandra.CassandraPublisher;
+import org.apache.streams.persistence.repository.PublisherRepository;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
 @Component
-public class CassandraPublisherRepository {
+public class CassandraPublisherRepository implements PublisherRepository{
 
     private static final Log LOG = LogFactory.getLog(CassandraActivityStreamsRepository.class);
 
@@ -20,13 +25,47 @@ public class CassandraPublisherRepositor
         this.configuration = configuration;
         this.keyspace = keyspace;
 
-//        try {
-//            keyspace.getSession().execute("CREATE TABLE " + configuration.getActivitystreamsColumnFamilyName() + " (" +
-//                    "id text, " +
-//                    "published timestamp, " +
-//
-//                    "PRIMARY KEY (id, tags, published));");
-//        } catch (AlreadyExistsException ignored) {
-//        }
+        try {
+            keyspace.getSession().execute("CREATE TABLE " + configuration.getPublisherColumnFamilyName() + " (" +
+                    "id text, " +
+                    "inroute text, " +
+                    "src text, " +
+
+                    "PRIMARY KEY (id, inroute));");
+        } catch (AlreadyExistsException ignored) {
+        }
+    }
+
+    /**
+     * Inserts the publisher's id, inRoute and src as one row of the publisher
+     * column family.
+     *
+     * @param publisher the publisher whose id, inRoute and src are written
+     */
+    @Override
+    public void save(ActivityStreamsPublisher publisher){
+        String cql = "INSERT INTO " + configuration.getPublisherColumnFamilyName()  + " (" +
+                "id, inroute, src) " +
+                "VALUES (" +
+                quoteCqlText(publisher.getId()) + "," +
+                quoteCqlText(publisher.getInRoute()) + "," +
+                quoteCqlText(publisher.getSrc()) +
+                ")";
+        keyspace.getSession().execute(cql);
+    }
+
+    /**
+     * Renders a value as a single-quoted CQL text literal. Embedded single
+     * quotes are doubled so a value such as {@code it's} cannot terminate the
+     * literal early and alter the statement.
+     *
+     * @param value the value to render; {@code null} is rendered as the text {@code null}
+     * @return the quoted literal, including the surrounding quotes
+     */
+    private static String quoteCqlText(Object value) {
+        return "'" + String.valueOf(value).replace("'", "''") + "'";
+    }
+
+    /**
+     * Fetches the first row whose inroute column equals {@code inRoute} and
+     * copies its id, src and inroute columns into a new publisher.
+     *
+     * @param inRoute the inroute value to match
+     * @return the populated publisher, or {@code null} when no row matches
+     */
+    @Override
+    public ActivityStreamsPublisher getPublisher(String inRoute){
+        // inRoute is spliced into a single-quoted CQL literal; double any embedded
+        // quote so it cannot end the literal early and change the query.
+        String escapedRoute = String.valueOf(inRoute).replace("'", "''");
+        String cql = "SELECT * FROM " + configuration.getPublisherColumnFamilyName()  + " WHERE inroute = '" + escapedRoute+"' ALLOW FILTERING;";
+
+        ResultSet set = keyspace.getSession().execute(cql);
+        Row row = set.one();
+        if (row == null) {
+            // An empty result yields a null row; report "not found" instead of
+            // failing with a NullPointerException on the column reads below.
+            return null;
+        }
+
+        ActivityStreamsPublisher publisher = new CassandraPublisher();
+        publisher.setId(row.getString("id"));
+        publisher.setSrc(row.getString("src"));
+        publisher.setInRoute(row.getString("inroute"));
+
+        return publisher;
+    }
+
+    /**
+     * Issues a CQL {@code TRUNCATE} against the publisher column family.
+     * Despite the method name, the statement executed is TRUNCATE, not DROP.
+     */
+    public void dropTable() {
+        keyspace.getSession().execute("TRUNCATE " + configuration.getPublisherColumnFamilyName());
     }
 }

Modified: incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/cassandra/CassandraSubscriptionRepository.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/cassandra/CassandraSubscriptionRepository.java?rev=1531306&r1=1531305&r2=1531306&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/cassandra/CassandraSubscriptionRepository.java (original)
+++ incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/cassandra/CassandraSubscriptionRepository.java Fri Oct 11 14:26:31 2013
@@ -46,7 +46,7 @@ public class CassandraSubscriptionReposi
         String cql = "INSERT INTO " + configuration.getSubscriptionColumnFamilyName()  + " (" +
                 "id, filters) " +
                 "VALUES ('" +
-                subscription.getAuthToken() + "','" +
+                subscription.getId() + "','" +
                 StringUtils.join(subscription.getFilters(), " ") +
 
                 "')";

Added: incubator/streams/branches/webservice/streams-persistence/src/test/java/org/apache/streams/persistence/repository/cassandra/CassandraPublisherRepositoryTest.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-persistence/src/test/java/org/apache/streams/persistence/repository/cassandra/CassandraPublisherRepositoryTest.java?rev=1531306&view=auto
==============================================================================
--- incubator/streams/branches/webservice/streams-persistence/src/test/java/org/apache/streams/persistence/repository/cassandra/CassandraPublisherRepositoryTest.java (added)
+++ incubator/streams/branches/webservice/streams-persistence/src/test/java/org/apache/streams/persistence/repository/cassandra/CassandraPublisherRepositoryTest.java Fri Oct 11 14:26:31 2013
@@ -0,0 +1,54 @@
+package org.apache.streams.persistence.repository.cassandra;
+
+import org.apache.streams.persistence.configuration.CassandraConfiguration;
+import org.apache.streams.persistence.model.ActivityStreamsPublisher;
+import org.apache.streams.persistence.model.cassandra.CassandraPublisher;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link CassandraPublisherRepository}. Every test is marked
+ * {@code @Ignore}; presumably they need a reachable Cassandra instance (the
+ * setup builds a real {@link CassandraKeyspace}) — confirm before enabling.
+ */
+public class CassandraPublisherRepositoryTest {
+    // One shared fixture so the values written and the values asserted cannot drift apart.
+    private static final String ID = "newId";
+    private static final String SRC = "http://www.google.com";
+    private static final String IN_ROUTE = "inRoute";
+
+    private CassandraPublisherRepository repository;
+
+    /** Builds a repository backed by the "keyspacetest" keyspace and "publishersA" column family. */
+    @Before
+    public void setup(){
+        CassandraConfiguration configuration = new CassandraConfiguration();
+        // NOTE(review): an IP address is passed to setCassandraPort — confirm this is the intended setter.
+        configuration.setCassandraPort("127.0.0.1");
+        configuration.setPublisherColumnFamilyName("publishersA");
+        configuration.setKeyspaceName("keyspacetest");
+        CassandraKeyspace keyspace = new CassandraKeyspace(configuration);
+
+        repository = new CassandraPublisherRepository(keyspace,configuration);
+    }
+
+    /** Saving a fully populated publisher completes without throwing. */
+    @Ignore
+    @Test
+    public void saveTest(){
+        repository.save(samplePublisher());
+    }
+
+    /** A saved publisher can be read back by its inRoute with every field intact. */
+    @Ignore
+    @Test
+    public void getPublisherTest(){
+        // Seed the row here: JUnit does not guarantee saveTest runs first.
+        repository.save(samplePublisher());
+
+        ActivityStreamsPublisher publisher = repository.getPublisher(IN_ROUTE);
+
+        // assertEquals takes (expected, actual); this order keeps failure messages accurate.
+        assertEquals(SRC, publisher.getSrc());
+        assertEquals(ID, publisher.getId());
+        assertEquals(IN_ROUTE, publisher.getInRoute());
+    }
+
+    /** Calling dropTable completes without throwing. */
+    @Ignore
+    @Test
+    public void dropTableTest(){
+        repository.dropTable();
+    }
+
+    /** Creates the publisher fixture used by the save and read tests. */
+    private static ActivityStreamsPublisher samplePublisher() {
+        ActivityStreamsPublisher publisher = new CassandraPublisher();
+        publisher.setId(ID);
+        publisher.setSrc(SRC);
+        publisher.setInRoute(IN_ROUTE);
+        return publisher;
+    }
+}

Modified: incubator/streams/branches/webservice/streams-web/src/main/java/org/apache/streams/mvc/controller/StreamsWebController.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-web/src/main/java/org/apache/streams/mvc/controller/StreamsWebController.java?rev=1531306&r1=1531305&r2=1531306&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-web/src/main/java/org/apache/streams/mvc/controller/StreamsWebController.java (original)
+++ incubator/streams/branches/webservice/streams-web/src/main/java/org/apache/streams/mvc/controller/StreamsWebController.java Fri Oct 11 14:26:31 2013
@@ -50,7 +50,7 @@ public class StreamsWebController {
             return new ResponseEntity<String>(publisherRegistrationService.register(payload), HttpStatus.OK);
         }catch(Exception e){
             log.error(e);
-            return new ResponseEntity<String>("Verify the POST contains a valid JSON object", HttpStatus.BAD_REQUEST);
+            return new ResponseEntity<String>(e.getMessage(), HttpStatus.BAD_REQUEST);
         }
     }
 
@@ -67,7 +67,7 @@ public class StreamsWebController {
             return new ResponseEntity<String>(subscriberRegistrationService.register(payload), HttpStatus.OK);
         }catch(Exception e){
             log.error(e);
-            return new ResponseEntity<String>("Verify the POST contains a valid JSON object", HttpStatus.BAD_REQUEST);
+            return new ResponseEntity<String>(e.getMessage(), HttpStatus.BAD_REQUEST);
         }
     }
 
@@ -85,7 +85,7 @@ public class StreamsWebController {
             return new ResponseEntity<String>(activityPublishingService.publish(publisherID, payload), HttpStatus.OK);
         }catch(Exception e){
             log.error(e);
-            return new ResponseEntity<String>("Verify the POST contains a valid JSON object", HttpStatus.BAD_REQUEST);
+            return new ResponseEntity<String>(e.getMessage(), HttpStatus.BAD_REQUEST);
         }
     }