You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by ds...@apache.org on 2013/08/08 21:53:28 UTC
svn commit: r1512000 - in /incubator/streams/branches/cassandra:
streams-cassandra/
streams-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/
streams-cassandra/src/test/ streams-cassandra/src/test/java/
streams-cassandra/src/test/ja...
Author: dsullivan
Date: Thu Aug 8 19:53:27 2013
New Revision: 1512000
URL: http://svn.apache.org/r1512000
Log:
adding methods to receive activities from the repository
Added:
incubator/streams/branches/cassandra/streams-cassandra/src/test/
incubator/streams/branches/cassandra/streams-cassandra/src/test/java/
incubator/streams/branches/cassandra/streams-cassandra/src/test/java/org/
incubator/streams/branches/cassandra/streams-cassandra/src/test/java/org/apache/
incubator/streams/branches/cassandra/streams-cassandra/src/test/java/org/apache/streams/
incubator/streams/branches/cassandra/streams-cassandra/src/test/java/org/apache/streams/cassandra/
incubator/streams/branches/cassandra/streams-cassandra/src/test/java/org/apache/streams/cassandra/repository/
incubator/streams/branches/cassandra/streams-cassandra/src/test/java/org/apache/streams/cassandra/repository/impl/
incubator/streams/branches/cassandra/streams-cassandra/src/test/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepositoryTest.java
incubator/streams/branches/cassandra/streams-eip-routes/src/test/
incubator/streams/branches/cassandra/streams-eip-routes/src/test/java/
incubator/streams/branches/cassandra/streams-eip-routes/src/test/java/org/
incubator/streams/branches/cassandra/streams-eip-routes/src/test/java/org/apache/
incubator/streams/branches/cassandra/streams-eip-routes/src/test/java/org/apache/streams/
incubator/streams/branches/cassandra/streams-eip-routes/src/test/java/org/apache/streams/messaging/
incubator/streams/branches/cassandra/streams-eip-routes/src/test/java/org/apache/streams/messaging/service/
incubator/streams/branches/cassandra/streams-eip-routes/src/test/java/org/apache/streams/messaging/service/impl/
incubator/streams/branches/cassandra/streams-eip-routes/src/test/java/org/apache/streams/messaging/service/impl/CassandraActivityServiceTest.java
incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/impl/ActivityStreamsSubscriptionCassandraFilterImpl.java
Modified:
incubator/streams/branches/cassandra/streams-cassandra/pom.xml
incubator/streams/branches/cassandra/streams-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepository.java
incubator/streams/branches/cassandra/streams-eip-routes/pom.xml
incubator/streams/branches/cassandra/streams-eip-routes/src/main/java/org/apache/streams/messaging/aggregation/ActivityAggregator.java
incubator/streams/branches/cassandra/streams-eip-routes/src/main/java/org/apache/streams/messaging/service/ActivityService.java
incubator/streams/branches/cassandra/streams-eip-routes/src/main/java/org/apache/streams/messaging/service/impl/CassandraActivityService.java
incubator/streams/branches/cassandra/streams-eip-routes/src/main/resources/META-INF/spring/streams-eip-applicationContext.xml
incubator/streams/branches/cassandra/streams-osgi-components/activity-consumer/src/main/java/org/apache/streams/osgi/components/activityconsumer/impl/PushActivityConsumer.java
incubator/streams/branches/cassandra/streams-osgi-components/activity-registration/pom.xml
incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/ActivityStreamsSubscriber.java
incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/ActivityStreamsSubscription.java
incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/ActivityStreamsSubscriptionFilter.java
incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/impl/ActivityStreamsSubscriberDelegate.java
incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/impl/ActivityStreamsSubscriptionLuceneFilterImpl.java
Modified: incubator/streams/branches/cassandra/streams-cassandra/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/branches/cassandra/streams-cassandra/pom.xml?rev=1512000&r1=1511999&r2=1512000&view=diff
==============================================================================
--- incubator/streams/branches/cassandra/streams-cassandra/pom.xml (original)
+++ incubator/streams/branches/cassandra/streams-cassandra/pom.xml Thu Aug 8 19:53:27 2013
@@ -116,6 +116,12 @@
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.8.2</version>
+ </dependency>
+
</dependencies>
</project>
\ No newline at end of file
Modified: incubator/streams/branches/cassandra/streams-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepository.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/cassandra/streams-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepository.java?rev=1512000&r1=1511999&r2=1512000&view=diff
==============================================================================
--- incubator/streams/branches/cassandra/streams-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepository.java (original)
+++ incubator/streams/branches/cassandra/streams-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepository.java Thu Aug 8 19:53:27 2013
@@ -18,6 +18,8 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.streams.cassandra.model.CassandraActivityStreamsEntry;
+import java.util.List;
+
public class CassandraActivityStreamsRepository {
private final String KEYSPACE_NAME = "ActivityStreams";
@@ -118,4 +120,10 @@ public class CassandraActivityStreamsRep
}
}
+ public List<CassandraActivityStreamsEntry> getActivitiesForQuery(String query) {
+ //return entities that match the given cql query
+ LOG.info("executing the query: "+query);
+ return entityManager.find(query);
+ }
+
}
Added: incubator/streams/branches/cassandra/streams-cassandra/src/test/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepositoryTest.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/cassandra/streams-cassandra/src/test/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepositoryTest.java?rev=1512000&view=auto
==============================================================================
--- incubator/streams/branches/cassandra/streams-cassandra/src/test/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepositoryTest.java (added)
+++ incubator/streams/branches/cassandra/streams-cassandra/src/test/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepositoryTest.java Thu Aug 8 19:53:27 2013
@@ -0,0 +1,27 @@
+package org.apache.streams.cassandra.repository.impl;
+
+import com.netflix.astyanax.connectionpool.OperationResult;
+import com.netflix.astyanax.model.CqlResult;
+import org.apache.streams.cassandra.model.CassandraActivityStreamsEntry;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+
+public class CassandraActivityStreamsRepositoryTest {
+
+ public CassandraActivityStreamsRepository repository;
+
+
+ @Before
+ public void setup() {
+ repository = new CassandraActivityStreamsRepository();
+ }
+
+ @Test
+ public void getActivitiesForQuery() {
+ String cql = "select * from Activities";
+ List<CassandraActivityStreamsEntry> list= repository.getActivitiesForQuery(cql);
+ assert(list != null);
+ }
+}
Modified: incubator/streams/branches/cassandra/streams-eip-routes/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/branches/cassandra/streams-eip-routes/pom.xml?rev=1512000&r1=1511999&r2=1512000&view=diff
==============================================================================
--- incubator/streams/branches/cassandra/streams-eip-routes/pom.xml (original)
+++ incubator/streams/branches/cassandra/streams-eip-routes/pom.xml Thu Aug 8 19:53:27 2013
@@ -188,6 +188,35 @@
<version>0.22-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
+
+ <!--astyanax-->
+ <dependency>
+ <groupId>com.netflix.astyanax</groupId>
+ <artifactId>astyanax-core</artifactId>
+ <version>1.56.43-SNAPSHOT</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.netflix.astyanax</groupId>
+ <artifactId>astyanax-thrift</artifactId>
+ <version>1.56.43-SNAPSHOT</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.netflix.astyanax</groupId>
+ <artifactId>astyanax-cassandra</artifactId>
+ <version>1.56.43-SNAPSHOT</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.netflix.astyanax</groupId>
+ <artifactId>astyanax-entity-mapper</artifactId>
+ <version>1.56.43-SNAPSHOT</version>
+ <scope>provided</scope>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
Modified: incubator/streams/branches/cassandra/streams-eip-routes/src/main/java/org/apache/streams/messaging/aggregation/ActivityAggregator.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/cassandra/streams-eip-routes/src/main/java/org/apache/streams/messaging/aggregation/ActivityAggregator.java?rev=1512000&r1=1511999&r2=1512000&view=diff
==============================================================================
--- incubator/streams/branches/cassandra/streams-eip-routes/src/main/java/org/apache/streams/messaging/aggregation/ActivityAggregator.java (original)
+++ incubator/streams/branches/cassandra/streams-eip-routes/src/main/java/org/apache/streams/messaging/aggregation/ActivityAggregator.java Thu Aug 8 19:53:27 2013
@@ -5,47 +5,43 @@ import org.apache.camel.Exchange;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.streams.cassandra.model.CassandraActivityStreamsEntry;
+import org.apache.streams.cassandra.repository.impl.CassandraActivityStreamsRepository;
+import org.apache.streams.messaging.service.impl.CassandraActivityService;
import org.apache.streams.osgi.components.activitysubscriber.ActivityStreamsSubscriber;
import org.apache.streams.osgi.components.activitysubscriber.ActivityStreamsSubscriberWarehouse;
+import org.apache.streams.osgi.components.activitysubscriber.ActivityStreamsSubscriptionFilter;
import org.codehaus.jackson.map.ObjectMapper;
+import org.springframework.scheduling.annotation.Scheduled;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
public class ActivityAggregator {
private ActivityStreamsSubscriberWarehouse activityStreamsSubscriberWarehouse;
+ private CassandraActivityService activityService;
private static final transient Log LOG = LogFactory.getLog(ActivityAggregator.class);
public void setActivityStreamsSubscriberWarehouse(ActivityStreamsSubscriberWarehouse activityStreamsSubscriberWarehouse) {
this.activityStreamsSubscriberWarehouse = activityStreamsSubscriberWarehouse;
}
- public void distributeToSubscribers(Exchange exchange) {
-
- //iterate over the aggregated messages and send to subscribers in warehouse...they will evaluate and determine if they keep it
- List<Exchange> grouped = exchange.getProperty(Exchange.GROUPED_EXCHANGE, List.class);
-
- //initialize the ObjectMapper
- ObjectMapper mapper = new ObjectMapper();
+ public void setActivityService(CassandraActivityService activityService) {
+ this.activityService = activityService;
+ }
- for (Exchange e : grouped){
- //get activity off of exchange
- LOG.info("Processing the activity...");
- LOG.info("Exchange: "+e);
-
- try {
- //extract the ActivityStreamsEntry object and translate to JSON
- CassandraActivityStreamsEntry activity = e.getIn().getBody(CassandraActivityStreamsEntry.class);
- String activityJSON = mapper.writeValueAsString(activity);
- LOG.info("Activity Object: "+activityJSON);
- for(ActivityStreamsSubscriber subscriber:activityStreamsSubscriberWarehouse.getAllSubscribers()){
- subscriber.receive(activityJSON);
- }
- } catch (IOException err) {
- LOG.warn("There was an error translating the java object to JSON");
- LOG.warn(err);
+ @Scheduled(fixedRate=30000)
+ public void distributeToSubscribers() {
+ for (ActivityStreamsSubscriber subscriber : activityStreamsSubscriberWarehouse.getAllSubscribers()) {
+ Set<String> activities = new HashSet<String>();
+ for (ActivityStreamsSubscriptionFilter filter: subscriber.getActivityStreamsSubscriberConfiguration().getActivityStreamsSubscriptionFilters()){
+ //send the query of each filter to the service to receive the activities of that filter
+ activities.addAll(activityService.getActivitiesForQuery(filter.getQuery()));
}
+ subscriber.receive(new ArrayList<String>(activities));
}
}
-}
\ No newline at end of file
+}
Modified: incubator/streams/branches/cassandra/streams-eip-routes/src/main/java/org/apache/streams/messaging/service/ActivityService.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/cassandra/streams-eip-routes/src/main/java/org/apache/streams/messaging/service/ActivityService.java?rev=1512000&r1=1511999&r2=1512000&view=diff
==============================================================================
--- incubator/streams/branches/cassandra/streams-eip-routes/src/main/java/org/apache/streams/messaging/service/ActivityService.java (original)
+++ incubator/streams/branches/cassandra/streams-eip-routes/src/main/java/org/apache/streams/messaging/service/ActivityService.java Thu Aug 8 19:53:27 2013
@@ -2,7 +2,11 @@ package org.apache.streams.messaging.ser
import org.apache.camel.Exchange;
+import java.util.List;
+
public interface ActivityService {
void receiveExchange(Exchange exchange);
+
+ List<String> getActivitiesForQuery(String query);
}
Modified: incubator/streams/branches/cassandra/streams-eip-routes/src/main/java/org/apache/streams/messaging/service/impl/CassandraActivityService.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/cassandra/streams-eip-routes/src/main/java/org/apache/streams/messaging/service/impl/CassandraActivityService.java?rev=1512000&r1=1511999&r2=1512000&view=diff
==============================================================================
--- incubator/streams/branches/cassandra/streams-eip-routes/src/main/java/org/apache/streams/messaging/service/impl/CassandraActivityService.java (original)
+++ incubator/streams/branches/cassandra/streams-eip-routes/src/main/java/org/apache/streams/messaging/service/impl/CassandraActivityService.java Thu Aug 8 19:53:27 2013
@@ -6,9 +6,11 @@ import org.apache.camel.Exchange;
import org.apache.streams.cassandra.model.CassandraActivityStreamsEntry;
import org.apache.streams.cassandra.repository.impl.CassandraActivityStreamsRepository;
import org.apache.streams.messaging.service.ActivityService;
+import org.codehaus.jackson.map.DeserializationConfig;
import org.codehaus.jackson.map.ObjectMapper;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
public class CassandraActivityService implements ActivityService {
@@ -16,9 +18,12 @@ public class CassandraActivityService im
private static final transient Log LOG = LogFactory.getLog(CassandraActivityService.class);
private CassandraActivityStreamsRepository cassandraActivityStreamsRepository;
+ private ObjectMapper mapper;
- public CassandraActivityService(){
+ public CassandraActivityService() {
this.cassandraActivityStreamsRepository = new CassandraActivityStreamsRepository();
+ this.mapper = new ObjectMapper();
+ mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false);
}
public void receiveExchange(Exchange exchange) {
@@ -26,18 +31,38 @@ public class CassandraActivityService im
//receive the exchange as a list
List<Exchange> grouped = exchange.getProperty(Exchange.GROUPED_EXCHANGE, List.class);
- //initialize the ObjectMapper
- ObjectMapper mapper = new ObjectMapper();
-
for (Exchange e : grouped) {
//get activity off of exchange
- LOG.info("Processing the activity...");
LOG.info("Exchange: " + e);
//extract the ActivityStreamsEntry object and save it in the database
- CassandraActivityStreamsEntry activity = e.getIn().getBody(CassandraActivityStreamsEntry.class);
- LOG.info("The activity object with id: " + activity.getId() + "was received from the exchange");
- cassandraActivityStreamsRepository.save(activity);
+ LOG.info("About to preform the translation to JSON Object");
+ String activityJson = e.getIn().getBody(String.class);
+
+ try {
+ CassandraActivityStreamsEntry streamsEntry = mapper.readValue(activityJson, CassandraActivityStreamsEntry.class);
+ cassandraActivityStreamsRepository.save(streamsEntry);
+ } catch (IOException err) {
+ LOG.error("there was an error while converting the json string to an object and saving to the database", err);
+ }
+
+ }
+ }
+
+ public List<String> getActivitiesForQuery(String query) {
+ List<CassandraActivityStreamsEntry> activityObjects = cassandraActivityStreamsRepository.getActivitiesForQuery(query);
+ return getJsonList(activityObjects);
+ }
+
+ private List<String> getJsonList(List<CassandraActivityStreamsEntry> activities) {
+ List<String> jsonList = new ArrayList<String>();
+ for (CassandraActivityStreamsEntry entry : activities) {
+ try {
+ jsonList.add(mapper.writeValueAsString(entry));
+ } catch (IOException e) {
+ LOG.error("There was an error while trying to convert the java object to a string: " + entry, e);
+ }
}
+ return jsonList;
}
}
Modified: incubator/streams/branches/cassandra/streams-eip-routes/src/main/resources/META-INF/spring/streams-eip-applicationContext.xml
URL: http://svn.apache.org/viewvc/incubator/streams/branches/cassandra/streams-eip-routes/src/main/resources/META-INF/spring/streams-eip-applicationContext.xml?rev=1512000&r1=1511999&r2=1512000&view=diff
==============================================================================
--- incubator/streams/branches/cassandra/streams-eip-routes/src/main/resources/META-INF/spring/streams-eip-applicationContext.xml (original)
+++ incubator/streams/branches/cassandra/streams-eip-routes/src/main/resources/META-INF/spring/streams-eip-applicationContext.xml Thu Aug 8 19:53:27 2013
@@ -18,11 +18,12 @@
-->
<beans
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xmlns="http://www.springframework.org/schema/beans"
- xmlns:context="http://www.springframework.org/schema/context"
- xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
- http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns="http://www.springframework.org/schema/beans"
+ xmlns:context="http://www.springframework.org/schema/context"
+ xmlns:task="http://www.springframework.org/schema/task"
+ xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">
<bean id="activityConsumerRouter" class="org.apache.streams.messaging.routers.impl.ActivityConsumerRouter">
@@ -39,10 +40,14 @@
</bean>
<bean id="subscriberRegistrationProcessor" class="org.apache.streams.messaging.processors.ActivityStreamsSubscriberRegistrationProcessor"/>
- <bean id="activityService" class="org.apache.streams.messaging.service.impl.CassandraActivityService">
+ <bean id="activityService" class="org.apache.streams.messaging.service.impl.CassandraActivityService"></bean>
+ <bean id="activityAggregator" class="org.apache.streams.messaging.aggregation.ActivityAggregator">
+ <property name="activityService" ref="activityService"/>
+ <property name="activityStreamsSubscriberWarehouse" ref="activityStreamsSubscriberWarehouse"/>
</bean>
+ <task:annotation-driven/>
<bean id="jmsConnectionFactory"
class="org.apache.activemq.ActiveMQConnectionFactory">
Added: incubator/streams/branches/cassandra/streams-eip-routes/src/test/java/org/apache/streams/messaging/service/impl/CassandraActivityServiceTest.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/cassandra/streams-eip-routes/src/test/java/org/apache/streams/messaging/service/impl/CassandraActivityServiceTest.java?rev=1512000&view=auto
==============================================================================
--- incubator/streams/branches/cassandra/streams-eip-routes/src/test/java/org/apache/streams/messaging/service/impl/CassandraActivityServiceTest.java (added)
+++ incubator/streams/branches/cassandra/streams-eip-routes/src/test/java/org/apache/streams/messaging/service/impl/CassandraActivityServiceTest.java Thu Aug 8 19:53:27 2013
@@ -0,0 +1,22 @@
+package org.apache.streams.messaging.service.impl;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+
+public class CassandraActivityServiceTest {
+
+ private CassandraActivityService cassandraActivityService;
+
+ @Before
+ public void setup(){
+ cassandraActivityService = new CassandraActivityService();
+ }
+
+ @Test
+ public void getActivititiesForQueryTest(){
+ List<String> activities = cassandraActivityService.getActivitiesForQuery("select * from Activities");
+ assert(activities != null);
+ }
+}
Modified: incubator/streams/branches/cassandra/streams-osgi-components/activity-consumer/src/main/java/org/apache/streams/osgi/components/activityconsumer/impl/PushActivityConsumer.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/cassandra/streams-osgi-components/activity-consumer/src/main/java/org/apache/streams/osgi/components/activityconsumer/impl/PushActivityConsumer.java?rev=1512000&r1=1511999&r2=1512000&view=diff
==============================================================================
--- incubator/streams/branches/cassandra/streams-osgi-components/activity-consumer/src/main/java/org/apache/streams/osgi/components/activityconsumer/impl/PushActivityConsumer.java (original)
+++ incubator/streams/branches/cassandra/streams-osgi-components/activity-consumer/src/main/java/org/apache/streams/osgi/components/activityconsumer/impl/PushActivityConsumer.java Thu Aug 8 19:53:27 2013
@@ -66,20 +66,11 @@ public class PushActivityConsumer implem
}
- public List<CassandraActivityStreamsEntry> split(String activities) {
+ public List<String> split(String activities) {
LOG.info("I am going to split this message: " + activities);
- ObjectMapper mapper = new ObjectMapper();
- mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false);
- List<CassandraActivityStreamsEntry> activitiesList = new ArrayList<CassandraActivityStreamsEntry>();
-
- try {
- //attempt to convert the activity to a java object
- LOG.info("About to preform the translation to JSON Object");
- CassandraActivityStreamsEntry streamsEntry = mapper.readValue(activities, CassandraActivityStreamsEntry.class);
- activitiesList.add(streamsEntry);
- } catch (Exception e) {
- LOG.info("Error while converting the JSON object to POJO and saving to database",e);
- }
+
+ List<String> activitiesList = new ArrayList<String>();
+ activitiesList.add(activities);
return activitiesList;
}
Modified: incubator/streams/branches/cassandra/streams-osgi-components/activity-registration/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/branches/cassandra/streams-osgi-components/activity-registration/pom.xml?rev=1512000&r1=1511999&r2=1512000&view=diff
==============================================================================
--- incubator/streams/branches/cassandra/streams-osgi-components/activity-registration/pom.xml (original)
+++ incubator/streams/branches/cassandra/streams-osgi-components/activity-registration/pom.xml Thu Aug 8 19:53:27 2013
@@ -82,7 +82,7 @@
<Bundle-Version>${project.version}</Bundle-Version>
<Export-Package>${bundle.namespace};version="${project.version}"</Export-Package>
<Private-Package>${bundle.namespace}.impl.*</Private-Package>
- <Import-Package>org.apache.streams.osgi.components.activityconsumer.impl,org.apache.streams.osgi.components.activityconsumer,org.apache.streams.osgi.components.activitysubscriber,org.apache.streams.osgi.components.activitysubscriber.impl,org.apache.commons.logging</Import-Package>
+ <Import-Package>org.apache.streams.osgi.components.activityconsumer.impl,org.apache.streams.osgi.components.activityconsumer,org.apache.streams.osgi.components.activitysubscriber,org.apache.streams.osgi.components.activitysubscriber.impl,org.apache.commons.logging, org.codehaus.jackson.map</Import-Package>
</instructions>
</configuration>
</plugin>
Modified: incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/ActivityStreamsSubscriber.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/ActivityStreamsSubscriber.java?rev=1512000&r1=1511999&r2=1512000&view=diff
==============================================================================
--- incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/ActivityStreamsSubscriber.java (original)
+++ incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/ActivityStreamsSubscriber.java Thu Aug 8 19:53:27 2013
@@ -1,8 +1,10 @@
package org.apache.streams.osgi.components.activitysubscriber;
+import java.util.List;
+
public interface ActivityStreamsSubscriber {
- public void receive(String activity);
+ public void receive(List<String> activity);
public String getStream();
public void init();
public void setInRoute(String route);
@@ -11,5 +13,5 @@ public interface ActivityStreamsSubscrib
public void updateActivityStreamsSubscriberConfiguration(String config);
public boolean isAuthenticated();
public void setAuthenticated(boolean authenticated);
-
+ public ActivityStreamsSubscription getActivityStreamsSubscriberConfiguration();
}
Modified: incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/ActivityStreamsSubscription.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/ActivityStreamsSubscription.java?rev=1512000&r1=1511999&r2=1512000&view=diff
==============================================================================
--- incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/ActivityStreamsSubscription.java (original)
+++ incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/ActivityStreamsSubscription.java Thu Aug 8 19:53:27 2013
@@ -15,7 +15,4 @@ public interface ActivityStreamsSubscrip
public String getAuthToken();
public void setAuthToken(String token);
-
-
-
}
Modified: incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/ActivityStreamsSubscriptionFilter.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/ActivityStreamsSubscriptionFilter.java?rev=1512000&r1=1511999&r2=1512000&view=diff
==============================================================================
--- incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/ActivityStreamsSubscriptionFilter.java (original)
+++ incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/ActivityStreamsSubscriptionFilter.java Thu Aug 8 19:53:27 2013
@@ -9,6 +9,8 @@ public interface ActivityStreamsSubscrip
public void setQuery(String query);
+ public String getQuery();
+
public boolean evaluate(String activity);
}
Modified: incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/impl/ActivityStreamsSubscriberDelegate.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/impl/ActivityStreamsSubscriberDelegate.java?rev=1512000&r1=1511999&r2=1512000&view=diff
==============================================================================
--- incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/impl/ActivityStreamsSubscriberDelegate.java (original)
+++ incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/impl/ActivityStreamsSubscriberDelegate.java Thu Aug 8 19:53:27 2013
@@ -8,6 +8,7 @@ import org.codehaus.jackson.map.Deserial
import org.codehaus.jackson.map.ObjectMapper;
import java.util.ArrayList;
+import java.util.List;
public class ActivityStreamsSubscriberDelegate implements ActivityStreamsSubscriber {
@@ -69,11 +70,10 @@ public class ActivityStreamsSubscriberDe
this.inRoute = inRoute;
}
- public void receive (String activity){
- //receive activities...do anything that is necessary
- LOG.info("got a message i subscribed to: " + activity);
- //guarenteed unique?
- stream.add(activity);
+ public void receive (List<String> activity){
+ //add new activities to stream
+ LOG.info("adding activities to subscription stream");
+ stream.addAll(activity);
}
Added: incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/impl/ActivityStreamsSubscriptionCassandraFilterImpl.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/impl/ActivityStreamsSubscriptionCassandraFilterImpl.java?rev=1512000&view=auto
==============================================================================
--- incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/impl/ActivityStreamsSubscriptionCassandraFilterImpl.java (added)
+++ incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/impl/ActivityStreamsSubscriptionCassandraFilterImpl.java Thu Aug 8 19:53:27 2013
@@ -0,0 +1,21 @@
+package org.apache.streams.osgi.components.activitysubscriber.impl;
+
+import org.apache.streams.osgi.components.activitysubscriber.ActivityStreamsSubscriptionFilter;
+
+public class ActivityStreamsSubscriptionCassandraFilterImpl implements ActivityStreamsSubscriptionFilter {
+ private String query;
+
+ public ActivityStreamsSubscriptionCassandraFilterImpl(){}
+
+ public void setQuery(String query) {
+ this.query=query;
+ }
+
+ public String getQuery() {
+ return query;
+ }
+
+ public boolean evaluate(String activity){
+ return true;
+ }
+}
Modified: incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/impl/ActivityStreamsSubscriptionLuceneFilterImpl.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/impl/ActivityStreamsSubscriptionLuceneFilterImpl.java?rev=1512000&r1=1511999&r2=1512000&view=diff
==============================================================================
--- incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/impl/ActivityStreamsSubscriptionLuceneFilterImpl.java (original)
+++ incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/impl/ActivityStreamsSubscriptionLuceneFilterImpl.java Thu Aug 8 19:53:27 2013
@@ -13,6 +13,10 @@ public class ActivityStreamsSubscription
this.query=query;
}
+ public String getQuery() {
+ return query;
+ }
+
public boolean evaluate(String activity){
return true;
}