You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by ds...@apache.org on 2013/10/11 16:26:32 UTC
svn commit: r1531306 - in /incubator/streams/branches/webservice: ./
streams-components/src/main/java/org/apache/streams/components/service/
streams-components/src/main/java/org/apache/streams/components/service/impl/
streams-osgi-components/ streams-p...
Author: dsullivan
Date: Fri Oct 11 14:26:31 2013
New Revision: 1531306
URL: http://svn.apache.org/r1531306
Log:
adding publisher persistence
Added:
incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/StreamsPublisherRepositoryService.java
incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/CassandraPublisherService.java
incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/ActivityStreamsPublisher.java
incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/cassandra/CassandraPublisher.java
incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/PublisherRepository.java
incubator/streams/branches/webservice/streams-persistence/src/test/java/org/
incubator/streams/branches/webservice/streams-persistence/src/test/java/org/apache/
incubator/streams/branches/webservice/streams-persistence/src/test/java/org/apache/streams/
incubator/streams/branches/webservice/streams-persistence/src/test/java/org/apache/streams/persistence/
incubator/streams/branches/webservice/streams-persistence/src/test/java/org/apache/streams/persistence/repository/
incubator/streams/branches/webservice/streams-persistence/src/test/java/org/apache/streams/persistence/repository/cassandra/
incubator/streams/branches/webservice/streams-persistence/src/test/java/org/apache/streams/persistence/repository/cassandra/CassandraPublisherRepositoryTest.java
Removed:
incubator/streams/branches/webservice/streams-osgi-components/
Modified:
incubator/streams/branches/webservice/pom.xml
incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/StreamsActivityPublishingService.java
incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/StreamsActivityRepositoryService.java
incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/CassandraActivityService.java
incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/CassandraSubscriptionService.java
incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/StreamsActivityPublishingServiceImpl.java
incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/StreamsPublisherRegistrationServiceImpl.java
incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/StreamsSubscriberRegistrationServiceImpl.java
incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/ActivityStreamsSubscription.java
incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/cassandra/CassandraSubscription.java
incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/cassandra/CassandraPublisherRepository.java
incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/cassandra/CassandraSubscriptionRepository.java
incubator/streams/branches/webservice/streams-web/src/main/java/org/apache/streams/mvc/controller/StreamsWebController.java
Modified: incubator/streams/branches/webservice/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/pom.xml?rev=1531306&r1=1531305&r2=1531306&view=diff
==============================================================================
--- incubator/streams/branches/webservice/pom.xml (original)
+++ incubator/streams/branches/webservice/pom.xml Fri Oct 11 14:26:31 2013
@@ -53,6 +53,7 @@
<rave.version>0.22</rave.version>
<datastax.version>1.0.3</datastax.version>
<spring.version>3.0.5.RELEASE</spring.version>
+ <junit.version>4.9</junit.version>
</properties>
<packaging>pom</packaging>
@@ -69,4 +70,12 @@
</build>
+ <dependencies>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>${junit.version}</version>
+ </dependency>
+ </dependencies>
+
</project>
Modified: incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/StreamsActivityPublishingService.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/StreamsActivityPublishingService.java?rev=1531306&r1=1531305&r2=1531306&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/StreamsActivityPublishingService.java (original)
+++ incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/StreamsActivityPublishingService.java Fri Oct 11 14:26:31 2013
@@ -3,5 +3,5 @@ package org.apache.streams.components.se
import java.io.IOException;
public interface StreamsActivityPublishingService {
- String publish(String publisherID, String activityJSON) throws IOException;
+ String publish(String publisherID, String activityJSON) throws Exception;
}
Modified: incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/StreamsActivityRepositoryService.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/StreamsActivityRepositoryService.java?rev=1531306&r1=1531305&r2=1531306&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/StreamsActivityRepositoryService.java (original)
+++ incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/StreamsActivityRepositoryService.java Fri Oct 11 14:26:31 2013
@@ -1,5 +1,7 @@
package org.apache.streams.components.service;
+import org.apache.streams.persistence.model.ActivityStreamsPublisher;
+
import java.io.IOException;
import java.util.Date;
import java.util.List;
@@ -7,7 +9,7 @@ import java.util.List;
public interface StreamsActivityRepositoryService {
- void receiveActivity(String activityJSON) throws IOException;
+ void receiveActivity(ActivityStreamsPublisher publisher, String activityJSON) throws Exception;
List<String> getActivitiesForFilters(List<String> filters, Date lastUpdated);
}
Added: incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/StreamsPublisherRepositoryService.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/StreamsPublisherRepositoryService.java?rev=1531306&view=auto
==============================================================================
--- incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/StreamsPublisherRepositoryService.java (added)
+++ incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/StreamsPublisherRepositoryService.java Fri Oct 11 14:26:31 2013
@@ -0,0 +1,8 @@
+package org.apache.streams.components.service;
+
+import org.apache.streams.persistence.model.ActivityStreamsPublisher;
+
+public interface StreamsPublisherRepositoryService {
+ void savePublisher(ActivityStreamsPublisher publisher);
+ ActivityStreamsPublisher getActivityStreamsPublisher(String inRoute);
+}
Modified: incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/CassandraActivityService.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/CassandraActivityService.java?rev=1531306&r1=1531305&r2=1531306&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/CassandraActivityService.java (original)
+++ incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/CassandraActivityService.java Fri Oct 11 14:26:31 2013
@@ -4,6 +4,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.rave.model.ActivityStreamsEntry;
import org.apache.streams.components.service.StreamsActivityRepositoryService;
+import org.apache.streams.persistence.model.ActivityStreamsPublisher;
import org.apache.streams.persistence.model.cassandra.CassandraActivityStreamsEntry;
import org.apache.streams.persistence.repository.ActivityStreamsRepository;
import org.codehaus.jackson.map.DeserializationConfig;
@@ -12,10 +13,7 @@ import org.springframework.beans.factory
import org.springframework.stereotype.Component;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Date;
-import java.util.List;
+import java.util.*;
@Component
public class CassandraActivityService implements StreamsActivityRepositoryService {
@@ -33,9 +31,13 @@ public class CassandraActivityService im
}
@Override
- public void receiveActivity(String activityJSON) throws IOException {
+ public void receiveActivity(ActivityStreamsPublisher publisher, String activityJSON) throws Exception {
ActivityStreamsEntry streamsEntry = mapper.readValue(activityJSON, CassandraActivityStreamsEntry.class);
+ if(!publisher.getSrc().equals(streamsEntry.getProvider().getUrl())){
+ throw new Exception("The Publisher source: "+ publisher.getSrc() +" and Activity Provider source: " + streamsEntry.getProvider().getUrl() + " were not equal");
+ }
streamsEntry.setPublished(new Date());
+ streamsEntry.setId(""+UUID.randomUUID());
activityStreamsRepository.save(streamsEntry);
}
@@ -44,7 +46,6 @@ public class CassandraActivityService im
List<ActivityStreamsEntry> activityObjects = activityStreamsRepository.getActivitiesForFilters(filters, lastUpdated);
Collections.sort(activityObjects, Collections.reverseOrder());
//TODO: make the number of streams returned configurable
- //TODO: what happens if this .subList(0,0)?
return getJsonList(activityObjects.subList(0, Math.min(activityObjects.size(), 10)));
}
Added: incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/CassandraPublisherService.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/CassandraPublisherService.java?rev=1531306&view=auto
==============================================================================
--- incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/CassandraPublisherService.java (added)
+++ incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/CassandraPublisherService.java Fri Oct 11 14:26:31 2013
@@ -0,0 +1,30 @@
+package org.apache.streams.components.service.impl;
+
+import org.apache.streams.components.service.StreamsPublisherRepositoryService;
+import org.apache.streams.persistence.model.ActivityStreamsPublisher;
+import org.apache.streams.persistence.repository.PublisherRepository;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.UUID;
+
+@Component
+public class CassandraPublisherService implements StreamsPublisherRepositoryService {
+ private PublisherRepository repository;
+
+ @Autowired
+ public CassandraPublisherService(PublisherRepository repository) {
+ this.repository = repository;
+ }
+
+ @Override
+ public void savePublisher(ActivityStreamsPublisher publisher) {
+ publisher.setId("" + UUID.randomUUID());
+ repository.save(publisher);
+ }
+
+ @Override
+ public ActivityStreamsPublisher getActivityStreamsPublisher(String inRoute) {
+ return repository.getPublisher(inRoute);
+ }
+}
Modified: incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/CassandraSubscriptionService.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/CassandraSubscriptionService.java?rev=1531306&r1=1531305&r2=1531306&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/CassandraSubscriptionService.java (original)
+++ incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/CassandraSubscriptionService.java Fri Oct 11 14:26:31 2013
@@ -8,6 +8,7 @@ import org.springframework.stereotype.Co
import java.util.Arrays;
import java.util.List;
+import java.util.UUID;
@Component
public class CassandraSubscriptionService implements StreamsSubscriptionRepositoryService {
@@ -15,15 +16,16 @@ public class CassandraSubscriptionServic
private SubscriptionRepository repository;
@Autowired
- public CassandraSubscriptionService(SubscriptionRepository repository){
+ public CassandraSubscriptionService(SubscriptionRepository repository) {
this.repository = repository;
}
- public List<String> getFilters(String authToken){
- return Arrays.asList(repository.getFilters(authToken).split(" "));
+ public List<String> getFilters(String authToken) {
+ return Arrays.asList(repository.getFilters(authToken).split(" "));
}
- public void saveFilters(ActivityStreamsSubscription subscription){
- //repository.save(subscription);
+ public void saveFilters(ActivityStreamsSubscription subscription) {
+ subscription.setId("" + UUID.randomUUID());
+ repository.save(subscription);
}
}
Modified: incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/StreamsActivityPublishingServiceImpl.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/StreamsActivityPublishingServiceImpl.java?rev=1531306&r1=1531305&r2=1531306&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/StreamsActivityPublishingServiceImpl.java (original)
+++ incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/StreamsActivityPublishingServiceImpl.java Fri Oct 11 14:26:31 2013
@@ -4,6 +4,8 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.streams.components.service.StreamsActivityPublishingService;
import org.apache.streams.components.service.StreamsActivityRepositoryService;
+import org.apache.streams.components.service.StreamsPublisherRepositoryService;
+import org.apache.streams.persistence.model.ActivityStreamsPublisher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@@ -14,10 +16,12 @@ public class StreamsActivityPublishingSe
private Log log = LogFactory.getLog(StreamsActivityPublishingServiceImpl.class);
private StreamsActivityRepositoryService activityService;
+ private StreamsPublisherRepositoryService publisherService;
@Autowired
- public StreamsActivityPublishingServiceImpl(StreamsActivityRepositoryService activityService) {
+ public StreamsActivityPublishingServiceImpl(StreamsActivityRepositoryService activityService, StreamsPublisherRepositoryService publisherService) {
this.activityService = activityService;
+ this.publisherService = publisherService;
}
/**
@@ -26,8 +30,9 @@ public class StreamsActivityPublishingSe
* @param activityJSON the activityJSON being published
* @return a success message if no errors were thrown
* */
- public String publish(String publisherID, String activityJSON) throws IOException {
- activityService.receiveActivity(activityJSON);
+ public String publish(String publisherID, String activityJSON) throws Exception {
+ ActivityStreamsPublisher publisher = publisherService.getActivityStreamsPublisher(publisherID);
+ activityService.receiveActivity(publisher,activityJSON);
return "The activity was successfully published!";
}
}
Modified: incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/StreamsPublisherRegistrationServiceImpl.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/StreamsPublisherRegistrationServiceImpl.java?rev=1531306&r1=1531305&r2=1531306&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/StreamsPublisherRegistrationServiceImpl.java (original)
+++ incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/StreamsPublisherRegistrationServiceImpl.java Fri Oct 11 14:26:31 2013
@@ -4,8 +4,9 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.streams.components.service.StreamsPublisherRegistrationService;
import org.apache.streams.components.configuration.StreamsConfiguration;
-import org.apache.streams.components.activityconsumer.ActivityConsumer;
-import org.apache.streams.components.activityconsumer.ActivityConsumerWarehouse;
+import org.apache.streams.components.service.StreamsPublisherRepositoryService;
+import org.apache.streams.persistence.model.ActivityStreamsPublisher;
+import org.apache.streams.persistence.model.cassandra.CassandraPublisher;
import org.codehaus.jackson.map.DeserializationConfig;
import org.codehaus.jackson.map.ObjectMapper;
import org.springframework.beans.factory.annotation.Autowired;
@@ -17,12 +18,12 @@ import java.util.UUID;
public class StreamsPublisherRegistrationServiceImpl implements StreamsPublisherRegistrationService {
private Log log = LogFactory.getLog(StreamsPublisherRegistrationServiceImpl.class);
- private ActivityConsumerWarehouse activityConsumerWarehouse;
+ private StreamsPublisherRepositoryService publisherRepositoryService;
private StreamsConfiguration configuration;
@Autowired
- public StreamsPublisherRegistrationServiceImpl(ActivityConsumerWarehouse activityConsumerWarehouse, StreamsConfiguration configuration) {
- this.activityConsumerWarehouse = activityConsumerWarehouse;
+ public StreamsPublisherRegistrationServiceImpl(StreamsPublisherRepositoryService publisherRepositoryService, StreamsConfiguration configuration) {
+ this.publisherRepositoryService = publisherRepositoryService;
this.configuration = configuration;
}
@@ -37,16 +38,16 @@ public class StreamsPublisherRegistratio
mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false);
// read from file, convert it to user class
- ActivityConsumer activityConsumer = mapper.readValue(publisherJSON, ActivityConsumer.class);
+ ActivityStreamsPublisher publisher = mapper.readValue(publisherJSON, CassandraPublisher.class);
- if (activityConsumer.getSrc() == null) {
+ if (publisher.getSrc() == null) {
log.info("configuration src is null");
throw new Exception("configuration src is null");
}
- activityConsumer.setAuthenticated(true);
- activityConsumer.setInRoute("" + UUID.randomUUID());
- activityConsumerWarehouse.register(activityConsumer);
- return configuration.getBaseUrlPath() + "postActivity/" + activityConsumer.getInRoute();
+ publisher.setInRoute("" + UUID.randomUUID());
+ publisherRepositoryService.savePublisher(publisher);
+
+ return configuration.getBaseUrlPath() + "postActivity/" + publisher.getInRoute();
}
}
Modified: incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/StreamsSubscriberRegistrationServiceImpl.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/StreamsSubscriberRegistrationServiceImpl.java?rev=1531306&r1=1531305&r2=1531306&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/StreamsSubscriberRegistrationServiceImpl.java (original)
+++ incubator/streams/branches/webservice/streams-components/src/main/java/org/apache/streams/components/service/impl/StreamsSubscriberRegistrationServiceImpl.java Fri Oct 11 14:26:31 2013
@@ -51,7 +51,7 @@ public class StreamsSubscriberRegistrati
ActivityStreamsSubscription subscription = mapper.readValue(subscriberJSON, ActivityStreamsSubscription.class);
if (subscription.getFilters() == null) {
- subscription.setFilters(subscriptionService.getFilters(subscription.getAuthToken()));
+ subscription.setFilters(subscriptionService.getFilters(subscription.getId()));
} else {
subscriptionService.saveFilters(subscription);
}
Added: incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/ActivityStreamsPublisher.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/ActivityStreamsPublisher.java?rev=1531306&view=auto
==============================================================================
--- incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/ActivityStreamsPublisher.java (added)
+++ incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/ActivityStreamsPublisher.java Fri Oct 11 14:26:31 2013
@@ -0,0 +1,10 @@
+package org.apache.streams.persistence.model;
+
+public interface ActivityStreamsPublisher {
+ String getInRoute();
+ String getId();
+ String getSrc();
+ void setInRoute(String inRoute);
+ void setId(String id);
+ void setSrc(String src);
+}
Modified: incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/ActivityStreamsSubscription.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/ActivityStreamsSubscription.java?rev=1531306&r1=1531305&r2=1531306&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/ActivityStreamsSubscription.java (original)
+++ incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/ActivityStreamsSubscription.java Fri Oct 11 14:26:31 2013
@@ -10,7 +10,7 @@ public interface ActivityStreamsSubscrip
public void setFilters(List<String> filters);
public List<String> getFilters();
- public String getAuthToken();
- public void setAuthToken(String token);
+ public String getId();
+ public void setId(String id);
}
Added: incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/cassandra/CassandraPublisher.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/cassandra/CassandraPublisher.java?rev=1531306&view=auto
==============================================================================
--- incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/cassandra/CassandraPublisher.java (added)
+++ incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/cassandra/CassandraPublisher.java Fri Oct 11 14:26:31 2013
@@ -0,0 +1,33 @@
+package org.apache.streams.persistence.model.cassandra;
+
+import org.apache.streams.persistence.model.ActivityStreamsPublisher;
+
+public class CassandraPublisher implements ActivityStreamsPublisher {
+ private String id;
+ private String inRoute;
+ private String src;
+
+ public String getId() {
+ return id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ public String getInRoute() {
+ return inRoute;
+ }
+
+ public void setInRoute(String inRoute) {
+ this.inRoute = inRoute;
+ }
+
+ public String getSrc() {
+ return src;
+ }
+
+ public void setSrc(String src) {
+ this.src = src;
+ }
+}
Modified: incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/cassandra/CassandraSubscription.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/cassandra/CassandraSubscription.java?rev=1531306&r1=1531305&r2=1531306&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/cassandra/CassandraSubscription.java (original)
+++ incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/model/cassandra/CassandraSubscription.java Fri Oct 11 14:26:31 2013
@@ -11,7 +11,7 @@ public class CassandraSubscription imple
@JsonDeserialize(as=ArrayList.class)
private List<String> filters;
- private String authToken;
+ private String id;
public void setFilters(List<String> filters) {
//TODO: it's possible that this could be null
@@ -25,12 +25,12 @@ public class CassandraSubscription imple
}
@Override
- public String getAuthToken() {
- return authToken;
+ public String getId() {
+ return id;
}
@Override
- public void setAuthToken(String auth_token) {
- this.authToken = auth_token;
+ public void setId(String id) {
+ this.id = id;
}
}
Added: incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/PublisherRepository.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/PublisherRepository.java?rev=1531306&view=auto
==============================================================================
--- incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/PublisherRepository.java (added)
+++ incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/PublisherRepository.java Fri Oct 11 14:26:31 2013
@@ -0,0 +1,8 @@
+package org.apache.streams.persistence.repository;
+
+import org.apache.streams.persistence.model.ActivityStreamsPublisher;
+
+public interface PublisherRepository {
+ ActivityStreamsPublisher getPublisher(String inRoute);
+ void save(ActivityStreamsPublisher publisher);
+}
Modified: incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/cassandra/CassandraPublisherRepository.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/cassandra/CassandraPublisherRepository.java?rev=1531306&r1=1531305&r2=1531306&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/cassandra/CassandraPublisherRepository.java (original)
+++ incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/cassandra/CassandraPublisherRepository.java Fri Oct 11 14:26:31 2013
@@ -1,14 +1,19 @@
package org.apache.streams.persistence.repository.cassandra;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
import com.datastax.driver.core.exceptions.AlreadyExistsException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.streams.persistence.configuration.CassandraConfiguration;
+import org.apache.streams.persistence.model.ActivityStreamsPublisher;
+import org.apache.streams.persistence.model.cassandra.CassandraPublisher;
+import org.apache.streams.persistence.repository.PublisherRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
-public class CassandraPublisherRepository {
+public class CassandraPublisherRepository implements PublisherRepository{
private static final Log LOG = LogFactory.getLog(CassandraActivityStreamsRepository.class);
@@ -20,13 +25,47 @@ public class CassandraPublisherRepositor
this.configuration = configuration;
this.keyspace = keyspace;
-// try {
-// keyspace.getSession().execute("CREATE TABLE " + configuration.getActivitystreamsColumnFamilyName() + " (" +
-// "id text, " +
-// "published timestamp, " +
-//
-// "PRIMARY KEY (id, tags, published));");
-// } catch (AlreadyExistsException ignored) {
-// }
+ try {
+ keyspace.getSession().execute("CREATE TABLE " + configuration.getPublisherColumnFamilyName() + " (" +
+ "id text, " +
+ "inroute text, " +
+ "src text, " +
+
+ "PRIMARY KEY (id, inroute));");
+ } catch (AlreadyExistsException ignored) {
+ }
+ }
+
+ @Override
+ public void save(ActivityStreamsPublisher publisher){
+ String cql = "INSERT INTO " + configuration.getPublisherColumnFamilyName() + " (" +
+ "id, inroute, src) " +
+ "VALUES ('" +
+ publisher.getId() + "','" +
+ publisher.getInRoute() + "','" +
+ publisher.getSrc() +
+
+ "')";
+ keyspace.getSession().execute(cql);
+ }
+
+ @Override
+ public ActivityStreamsPublisher getPublisher(String inRoute){
+ String cql = "SELECT * FROM " + configuration.getPublisherColumnFamilyName() + " WHERE inroute = '" + inRoute+"' ALLOW FILTERING;";
+
+ ResultSet set = keyspace.getSession().execute(cql);
+ Row row = set.one();
+
+ ActivityStreamsPublisher publisher = new CassandraPublisher();
+ publisher.setId(row.getString("id"));
+ publisher.setSrc(row.getString("src"));
+ publisher.setInRoute(row.getString("inroute"));
+
+ return publisher;
+ }
+
+ public void dropTable() {
+ String cql = "TRUNCATE " + configuration.getPublisherColumnFamilyName();
+ keyspace.getSession().execute(cql);
}
}
Modified: incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/cassandra/CassandraSubscriptionRepository.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/cassandra/CassandraSubscriptionRepository.java?rev=1531306&r1=1531305&r2=1531306&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/cassandra/CassandraSubscriptionRepository.java (original)
+++ incubator/streams/branches/webservice/streams-persistence/src/main/java/org/apache/streams/persistence/repository/cassandra/CassandraSubscriptionRepository.java Fri Oct 11 14:26:31 2013
@@ -46,7 +46,7 @@ public class CassandraSubscriptionReposi
String cql = "INSERT INTO " + configuration.getSubscriptionColumnFamilyName() + " (" +
"id, filters) " +
"VALUES ('" +
- subscription.getAuthToken() + "','" +
+ subscription.getId() + "','" +
StringUtils.join(subscription.getFilters(), " ") +
"')";
Added: incubator/streams/branches/webservice/streams-persistence/src/test/java/org/apache/streams/persistence/repository/cassandra/CassandraPublisherRepositoryTest.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-persistence/src/test/java/org/apache/streams/persistence/repository/cassandra/CassandraPublisherRepositoryTest.java?rev=1531306&view=auto
==============================================================================
--- incubator/streams/branches/webservice/streams-persistence/src/test/java/org/apache/streams/persistence/repository/cassandra/CassandraPublisherRepositoryTest.java (added)
+++ incubator/streams/branches/webservice/streams-persistence/src/test/java/org/apache/streams/persistence/repository/cassandra/CassandraPublisherRepositoryTest.java Fri Oct 11 14:26:31 2013
@@ -0,0 +1,54 @@
+package org.apache.streams.persistence.repository.cassandra;
+
+import org.apache.streams.persistence.configuration.CassandraConfiguration;
+import org.apache.streams.persistence.model.ActivityStreamsPublisher;
+import org.apache.streams.persistence.model.cassandra.CassandraPublisher;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Exercises {@link CassandraPublisherRepository} against the keyspace configured in
+ * {@link #setup()}.
+ *
+ * <p>All tests are marked {@code @Ignore}; presumably because they need a reachable
+ * Cassandra node -- TODO confirm before enabling them.
+ */
+public class CassandraPublisherRepositoryTest {
+    // One shared fixture so the values written and the values asserted cannot drift apart.
+    private static final String PUBLISHER_ID = "newId";
+    private static final String PUBLISHER_SRC = "http://www.google.com";
+    private static final String PUBLISHER_IN_ROUTE = "inRoute";
+
+    private CassandraPublisherRepository repository;
+
+    @Before
+    public void setup(){
+        CassandraConfiguration configuration = new CassandraConfiguration();
+        // NOTE(review): an IP address is handed to setCassandraPort -- confirm this setter
+        // really expects the host rather than a port number.
+        configuration.setCassandraPort("127.0.0.1");
+        configuration.setPublisherColumnFamilyName("publishersA");
+        configuration.setKeyspaceName("keyspacetest");
+        CassandraKeyspace keyspace = new CassandraKeyspace(configuration);
+
+        repository = new CassandraPublisherRepository(keyspace,configuration);
+    }
+
+    @Ignore
+    @Test
+    public void saveTest(){
+        repository.save(newPublisher());
+    }
+
+    @Ignore
+    @Test
+    public void getPublisherTest(){
+        // Store the fixture here as well: JUnit gives no guarantee that saveTest ran first.
+        repository.save(newPublisher());
+
+        ActivityStreamsPublisher publisher = repository.getPublisher(PUBLISHER_IN_ROUTE);
+
+        // assertEquals takes (expected, actual)
+        assertEquals(PUBLISHER_SRC, publisher.getSrc());
+        assertEquals(PUBLISHER_ID, publisher.getId());
+        assertEquals(PUBLISHER_IN_ROUTE, publisher.getInRoute());
+    }
+
+    @Ignore
+    @Test
+    public void dropTableTest(){
+        repository.dropTable();
+    }
+
+    /** Builds the publisher used as the fixture by the save and read tests. */
+    private static ActivityStreamsPublisher newPublisher(){
+        ActivityStreamsPublisher publisher = new CassandraPublisher();
+        publisher.setId(PUBLISHER_ID);
+        publisher.setSrc(PUBLISHER_SRC);
+        publisher.setInRoute(PUBLISHER_IN_ROUTE);
+        return publisher;
+    }
+}
Modified: incubator/streams/branches/webservice/streams-web/src/main/java/org/apache/streams/mvc/controller/StreamsWebController.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/webservice/streams-web/src/main/java/org/apache/streams/mvc/controller/StreamsWebController.java?rev=1531306&r1=1531305&r2=1531306&view=diff
==============================================================================
--- incubator/streams/branches/webservice/streams-web/src/main/java/org/apache/streams/mvc/controller/StreamsWebController.java (original)
+++ incubator/streams/branches/webservice/streams-web/src/main/java/org/apache/streams/mvc/controller/StreamsWebController.java Fri Oct 11 14:26:31 2013
@@ -50,7 +50,7 @@ public class StreamsWebController {
return new ResponseEntity<String>(publisherRegistrationService.register(payload), HttpStatus.OK);
}catch(Exception e){
log.error(e);
- return new ResponseEntity<String>("Verify the POST contains a valid JSON object", HttpStatus.BAD_REQUEST);
+ return new ResponseEntity<String>(e.getMessage(), HttpStatus.BAD_REQUEST);
}
}
@@ -67,7 +67,7 @@ public class StreamsWebController {
return new ResponseEntity<String>(subscriberRegistrationService.register(payload), HttpStatus.OK);
}catch(Exception e){
log.error(e);
- return new ResponseEntity<String>("Verify the POST contains a valid JSON object", HttpStatus.BAD_REQUEST);
+ return new ResponseEntity<String>(e.getMessage(), HttpStatus.BAD_REQUEST);
}
}
@@ -85,7 +85,7 @@ public class StreamsWebController {
return new ResponseEntity<String>(activityPublishingService.publish(publisherID, payload), HttpStatus.OK);
}catch(Exception e){
log.error(e);
- return new ResponseEntity<String>("Verify the POST contains a valid JSON object", HttpStatus.BAD_REQUEST);
+ return new ResponseEntity<String>(e.getMessage(), HttpStatus.BAD_REQUEST);
}
}