You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by ds...@apache.org on 2013/08/08 21:53:28 UTC

svn commit: r1512000 - in /incubator/streams/branches/cassandra: streams-cassandra/ streams-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/ streams-cassandra/src/test/ streams-cassandra/src/test/java/ streams-cassandra/src/test/ja...

Author: dsullivan
Date: Thu Aug  8 19:53:27 2013
New Revision: 1512000

URL: http://svn.apache.org/r1512000
Log:
Adding methods to retrieve activities from the repository

Added:
    incubator/streams/branches/cassandra/streams-cassandra/src/test/
    incubator/streams/branches/cassandra/streams-cassandra/src/test/java/
    incubator/streams/branches/cassandra/streams-cassandra/src/test/java/org/
    incubator/streams/branches/cassandra/streams-cassandra/src/test/java/org/apache/
    incubator/streams/branches/cassandra/streams-cassandra/src/test/java/org/apache/streams/
    incubator/streams/branches/cassandra/streams-cassandra/src/test/java/org/apache/streams/cassandra/
    incubator/streams/branches/cassandra/streams-cassandra/src/test/java/org/apache/streams/cassandra/repository/
    incubator/streams/branches/cassandra/streams-cassandra/src/test/java/org/apache/streams/cassandra/repository/impl/
    incubator/streams/branches/cassandra/streams-cassandra/src/test/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepositoryTest.java
    incubator/streams/branches/cassandra/streams-eip-routes/src/test/
    incubator/streams/branches/cassandra/streams-eip-routes/src/test/java/
    incubator/streams/branches/cassandra/streams-eip-routes/src/test/java/org/
    incubator/streams/branches/cassandra/streams-eip-routes/src/test/java/org/apache/
    incubator/streams/branches/cassandra/streams-eip-routes/src/test/java/org/apache/streams/
    incubator/streams/branches/cassandra/streams-eip-routes/src/test/java/org/apache/streams/messaging/
    incubator/streams/branches/cassandra/streams-eip-routes/src/test/java/org/apache/streams/messaging/service/
    incubator/streams/branches/cassandra/streams-eip-routes/src/test/java/org/apache/streams/messaging/service/impl/
    incubator/streams/branches/cassandra/streams-eip-routes/src/test/java/org/apache/streams/messaging/service/impl/CassandraActivityServiceTest.java
    incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/impl/ActivityStreamsSubscriptionCassandraFilterImpl.java
Modified:
    incubator/streams/branches/cassandra/streams-cassandra/pom.xml
    incubator/streams/branches/cassandra/streams-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepository.java
    incubator/streams/branches/cassandra/streams-eip-routes/pom.xml
    incubator/streams/branches/cassandra/streams-eip-routes/src/main/java/org/apache/streams/messaging/aggregation/ActivityAggregator.java
    incubator/streams/branches/cassandra/streams-eip-routes/src/main/java/org/apache/streams/messaging/service/ActivityService.java
    incubator/streams/branches/cassandra/streams-eip-routes/src/main/java/org/apache/streams/messaging/service/impl/CassandraActivityService.java
    incubator/streams/branches/cassandra/streams-eip-routes/src/main/resources/META-INF/spring/streams-eip-applicationContext.xml
    incubator/streams/branches/cassandra/streams-osgi-components/activity-consumer/src/main/java/org/apache/streams/osgi/components/activityconsumer/impl/PushActivityConsumer.java
    incubator/streams/branches/cassandra/streams-osgi-components/activity-registration/pom.xml
    incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/ActivityStreamsSubscriber.java
    incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/ActivityStreamsSubscription.java
    incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/ActivityStreamsSubscriptionFilter.java
    incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/impl/ActivityStreamsSubscriberDelegate.java
    incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/impl/ActivityStreamsSubscriptionLuceneFilterImpl.java

Modified: incubator/streams/branches/cassandra/streams-cassandra/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/branches/cassandra/streams-cassandra/pom.xml?rev=1512000&r1=1511999&r2=1512000&view=diff
==============================================================================
--- incubator/streams/branches/cassandra/streams-cassandra/pom.xml (original)
+++ incubator/streams/branches/cassandra/streams-cassandra/pom.xml Thu Aug  8 19:53:27 2013
@@ -116,6 +116,12 @@
             <scope>provided</scope>
         </dependency>
 
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.8.2</version>
+        </dependency>
+
     </dependencies>
 
 </project>
\ No newline at end of file

Modified: incubator/streams/branches/cassandra/streams-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepository.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/cassandra/streams-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepository.java?rev=1512000&r1=1511999&r2=1512000&view=diff
==============================================================================
--- incubator/streams/branches/cassandra/streams-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepository.java (original)
+++ incubator/streams/branches/cassandra/streams-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepository.java Thu Aug  8 19:53:27 2013
@@ -18,6 +18,8 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.streams.cassandra.model.CassandraActivityStreamsEntry;
 
+import java.util.List;
+
 public class CassandraActivityStreamsRepository {
 
     private final String KEYSPACE_NAME = "ActivityStreams";
@@ -118,4 +120,10 @@ public class CassandraActivityStreamsRep
         }
     }
 
+    public List<CassandraActivityStreamsEntry> getActivitiesForQuery(String query) {
+            //return entities that match the given cql query
+            LOG.info("executing the query: "+query);
+            return entityManager.find(query);
+    }
+
 }

Added: incubator/streams/branches/cassandra/streams-cassandra/src/test/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepositoryTest.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/cassandra/streams-cassandra/src/test/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepositoryTest.java?rev=1512000&view=auto
==============================================================================
--- incubator/streams/branches/cassandra/streams-cassandra/src/test/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepositoryTest.java (added)
+++ incubator/streams/branches/cassandra/streams-cassandra/src/test/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepositoryTest.java Thu Aug  8 19:53:27 2013
@@ -0,0 +1,27 @@
+package org.apache.streams.cassandra.repository.impl;
+
+import com.netflix.astyanax.connectionpool.OperationResult;
+import com.netflix.astyanax.model.CqlResult;
+import org.apache.streams.cassandra.model.CassandraActivityStreamsEntry;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+
+public class CassandraActivityStreamsRepositoryTest {
+
+    public CassandraActivityStreamsRepository repository;
+
+
+    @Before
+    public void setup() {
+        repository = new CassandraActivityStreamsRepository();
+    }
+
+    @Test
+    public void getActivitiesForQuery() {
+        String cql = "select * from Activities";
+        List<CassandraActivityStreamsEntry> list= repository.getActivitiesForQuery(cql);
+        assert(list != null);
+    }
+}

Modified: incubator/streams/branches/cassandra/streams-eip-routes/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/branches/cassandra/streams-eip-routes/pom.xml?rev=1512000&r1=1511999&r2=1512000&view=diff
==============================================================================
--- incubator/streams/branches/cassandra/streams-eip-routes/pom.xml (original)
+++ incubator/streams/branches/cassandra/streams-eip-routes/pom.xml Thu Aug  8 19:53:27 2013
@@ -188,6 +188,35 @@
             <version>0.22-SNAPSHOT</version>
             <scope>provided</scope>
         </dependency>
+
+        <!--astyanax-->
+        <dependency>
+            <groupId>com.netflix.astyanax</groupId>
+            <artifactId>astyanax-core</artifactId>
+            <version>1.56.43-SNAPSHOT</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.netflix.astyanax</groupId>
+            <artifactId>astyanax-thrift</artifactId>
+            <version>1.56.43-SNAPSHOT</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.netflix.astyanax</groupId>
+            <artifactId>astyanax-cassandra</artifactId>
+            <version>1.56.43-SNAPSHOT</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.netflix.astyanax</groupId>
+            <artifactId>astyanax-entity-mapper</artifactId>
+            <version>1.56.43-SNAPSHOT</version>
+            <scope>provided</scope>
+        </dependency>
     </dependencies>
 
 </project>
\ No newline at end of file

Modified: incubator/streams/branches/cassandra/streams-eip-routes/src/main/java/org/apache/streams/messaging/aggregation/ActivityAggregator.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/cassandra/streams-eip-routes/src/main/java/org/apache/streams/messaging/aggregation/ActivityAggregator.java?rev=1512000&r1=1511999&r2=1512000&view=diff
==============================================================================
--- incubator/streams/branches/cassandra/streams-eip-routes/src/main/java/org/apache/streams/messaging/aggregation/ActivityAggregator.java (original)
+++ incubator/streams/branches/cassandra/streams-eip-routes/src/main/java/org/apache/streams/messaging/aggregation/ActivityAggregator.java Thu Aug  8 19:53:27 2013
@@ -5,47 +5,43 @@ import org.apache.camel.Exchange;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.streams.cassandra.model.CassandraActivityStreamsEntry;
+import org.apache.streams.cassandra.repository.impl.CassandraActivityStreamsRepository;
+import org.apache.streams.messaging.service.impl.CassandraActivityService;
 import org.apache.streams.osgi.components.activitysubscriber.ActivityStreamsSubscriber;
 import org.apache.streams.osgi.components.activitysubscriber.ActivityStreamsSubscriberWarehouse;
+import org.apache.streams.osgi.components.activitysubscriber.ActivityStreamsSubscriptionFilter;
 import org.codehaus.jackson.map.ObjectMapper;
+import org.springframework.scheduling.annotation.Scheduled;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 public class ActivityAggregator {
 
     private ActivityStreamsSubscriberWarehouse activityStreamsSubscriberWarehouse;
+    private CassandraActivityService activityService;
     private static final transient Log LOG = LogFactory.getLog(ActivityAggregator.class);
 
     public void setActivityStreamsSubscriberWarehouse(ActivityStreamsSubscriberWarehouse activityStreamsSubscriberWarehouse) {
         this.activityStreamsSubscriberWarehouse = activityStreamsSubscriberWarehouse;
     }
 
-    public void distributeToSubscribers(Exchange exchange) {
-
-        //iterate over the aggregated messages and send to subscribers in warehouse...they will evaluate and determine if they keep it
-        List<Exchange> grouped = exchange.getProperty(Exchange.GROUPED_EXCHANGE, List.class);
-
-        //initialize the ObjectMapper
-        ObjectMapper mapper = new ObjectMapper();
+    public void setActivityService(CassandraActivityService activityService) {
+        this.activityService = activityService;
+    }
 
-        for (Exchange e : grouped){
-            //get activity off of exchange
-           LOG.info("Processing the activity...");
-           LOG.info("Exchange: "+e);
-
-            try {
-                //extract the ActivityStreamsEntry object and translate to JSON
-                CassandraActivityStreamsEntry activity = e.getIn().getBody(CassandraActivityStreamsEntry.class);
-                String activityJSON = mapper.writeValueAsString(activity);
-                LOG.info("Activity Object: "+activityJSON);
-                for(ActivityStreamsSubscriber subscriber:activityStreamsSubscriberWarehouse.getAllSubscribers()){
-                    subscriber.receive(activityJSON);
-                }
-            } catch (IOException err) {
-                LOG.warn("There was an error translating the java object to JSON");
-                LOG.warn(err);
+    @Scheduled(fixedRate=30000)
+    public void distributeToSubscribers() {
+        for (ActivityStreamsSubscriber subscriber : activityStreamsSubscriberWarehouse.getAllSubscribers()) {
+            Set<String> activities = new HashSet<String>();
+            for (ActivityStreamsSubscriptionFilter filter: subscriber.getActivityStreamsSubscriberConfiguration().getActivityStreamsSubscriptionFilters()){
+                //send the query of each filter to the service to receive the activities of that filter
+                activities.addAll(activityService.getActivitiesForQuery(filter.getQuery()));
             }
+            subscriber.receive(new ArrayList<String>(activities));
         }
     }
-}
\ No newline at end of file
+}

Modified: incubator/streams/branches/cassandra/streams-eip-routes/src/main/java/org/apache/streams/messaging/service/ActivityService.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/cassandra/streams-eip-routes/src/main/java/org/apache/streams/messaging/service/ActivityService.java?rev=1512000&r1=1511999&r2=1512000&view=diff
==============================================================================
--- incubator/streams/branches/cassandra/streams-eip-routes/src/main/java/org/apache/streams/messaging/service/ActivityService.java (original)
+++ incubator/streams/branches/cassandra/streams-eip-routes/src/main/java/org/apache/streams/messaging/service/ActivityService.java Thu Aug  8 19:53:27 2013
@@ -2,7 +2,11 @@ package org.apache.streams.messaging.ser
 
 import org.apache.camel.Exchange;
 
+import java.util.List;
+
 public interface ActivityService {
 
     void receiveExchange(Exchange exchange);
+
+    List<String> getActivitiesForQuery(String query);
 }

Modified: incubator/streams/branches/cassandra/streams-eip-routes/src/main/java/org/apache/streams/messaging/service/impl/CassandraActivityService.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/cassandra/streams-eip-routes/src/main/java/org/apache/streams/messaging/service/impl/CassandraActivityService.java?rev=1512000&r1=1511999&r2=1512000&view=diff
==============================================================================
--- incubator/streams/branches/cassandra/streams-eip-routes/src/main/java/org/apache/streams/messaging/service/impl/CassandraActivityService.java (original)
+++ incubator/streams/branches/cassandra/streams-eip-routes/src/main/java/org/apache/streams/messaging/service/impl/CassandraActivityService.java Thu Aug  8 19:53:27 2013
@@ -6,9 +6,11 @@ import org.apache.camel.Exchange;
 import org.apache.streams.cassandra.model.CassandraActivityStreamsEntry;
 import org.apache.streams.cassandra.repository.impl.CassandraActivityStreamsRepository;
 import org.apache.streams.messaging.service.ActivityService;
+import org.codehaus.jackson.map.DeserializationConfig;
 import org.codehaus.jackson.map.ObjectMapper;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 
 public class CassandraActivityService implements ActivityService {
@@ -16,9 +18,12 @@ public class CassandraActivityService im
     private static final transient Log LOG = LogFactory.getLog(CassandraActivityService.class);
 
     private CassandraActivityStreamsRepository cassandraActivityStreamsRepository;
+    private ObjectMapper mapper;
 
-    public CassandraActivityService(){
+    public CassandraActivityService() {
         this.cassandraActivityStreamsRepository = new CassandraActivityStreamsRepository();
+        this.mapper = new ObjectMapper();
+        mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false);
     }
 
     public void receiveExchange(Exchange exchange) {
@@ -26,18 +31,38 @@ public class CassandraActivityService im
         //receive the exchange as a list
         List<Exchange> grouped = exchange.getProperty(Exchange.GROUPED_EXCHANGE, List.class);
 
-        //initialize the ObjectMapper
-        ObjectMapper mapper = new ObjectMapper();
-
         for (Exchange e : grouped) {
             //get activity off of exchange
-            LOG.info("Processing the activity...");
             LOG.info("Exchange: " + e);
 
             //extract the ActivityStreamsEntry object and save it in the database
-            CassandraActivityStreamsEntry activity = e.getIn().getBody(CassandraActivityStreamsEntry.class);
-            LOG.info("The activity object with id: " + activity.getId() + "was received from the exchange");
-            cassandraActivityStreamsRepository.save(activity);
+            LOG.info("About to preform the translation to JSON Object");
+            String activityJson = e.getIn().getBody(String.class);
+
+            try {
+                CassandraActivityStreamsEntry streamsEntry = mapper.readValue(activityJson, CassandraActivityStreamsEntry.class);
+                cassandraActivityStreamsRepository.save(streamsEntry);
+            } catch (IOException err) {
+                LOG.error("there was an error while converting the json string to an object and saving to the database", err);
+            }
+
+        }
+    }
+
+    public List<String> getActivitiesForQuery(String query) {
+        List<CassandraActivityStreamsEntry> activityObjects = cassandraActivityStreamsRepository.getActivitiesForQuery(query);
+        return getJsonList(activityObjects);
+    }
+
+    private List<String> getJsonList(List<CassandraActivityStreamsEntry> activities) {
+        List<String> jsonList = new ArrayList<String>();
+        for (CassandraActivityStreamsEntry entry : activities) {
+            try {
+                jsonList.add(mapper.writeValueAsString(entry));
+            } catch (IOException e) {
+                LOG.error("There was an error while trying to convert the java object to a string: " + entry, e);
+            }
         }
+        return jsonList;
     }
 }

Modified: incubator/streams/branches/cassandra/streams-eip-routes/src/main/resources/META-INF/spring/streams-eip-applicationContext.xml
URL: http://svn.apache.org/viewvc/incubator/streams/branches/cassandra/streams-eip-routes/src/main/resources/META-INF/spring/streams-eip-applicationContext.xml?rev=1512000&r1=1511999&r2=1512000&view=diff
==============================================================================
--- incubator/streams/branches/cassandra/streams-eip-routes/src/main/resources/META-INF/spring/streams-eip-applicationContext.xml (original)
+++ incubator/streams/branches/cassandra/streams-eip-routes/src/main/resources/META-INF/spring/streams-eip-applicationContext.xml Thu Aug  8 19:53:27 2013
@@ -18,11 +18,12 @@
   -->
 
 <beans
-       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-       xmlns="http://www.springframework.org/schema/beans"
-       xmlns:context="http://www.springframework.org/schema/context"
-       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
-        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
+        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+        xmlns="http://www.springframework.org/schema/beans"
+        xmlns:context="http://www.springframework.org/schema/context"
+        xmlns:task="http://www.springframework.org/schema/task"
+        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">
 
 
     <bean id="activityConsumerRouter" class="org.apache.streams.messaging.routers.impl.ActivityConsumerRouter">
@@ -39,10 +40,14 @@
     </bean>
     <bean id="subscriberRegistrationProcessor" class="org.apache.streams.messaging.processors.ActivityStreamsSubscriberRegistrationProcessor"/>
 
-    <bean id="activityService" class="org.apache.streams.messaging.service.impl.CassandraActivityService">
+    <bean id="activityService" class="org.apache.streams.messaging.service.impl.CassandraActivityService"></bean>
 
+    <bean id="activityAggregator" class="org.apache.streams.messaging.aggregation.ActivityAggregator">
+        <property name="activityService" ref="activityService"/>
+        <property name="activityStreamsSubscriberWarehouse" ref="activityStreamsSubscriberWarehouse"/>
     </bean>
 
+    <task:annotation-driven/>
 
     <bean id="jmsConnectionFactory"
           class="org.apache.activemq.ActiveMQConnectionFactory">

Added: incubator/streams/branches/cassandra/streams-eip-routes/src/test/java/org/apache/streams/messaging/service/impl/CassandraActivityServiceTest.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/cassandra/streams-eip-routes/src/test/java/org/apache/streams/messaging/service/impl/CassandraActivityServiceTest.java?rev=1512000&view=auto
==============================================================================
--- incubator/streams/branches/cassandra/streams-eip-routes/src/test/java/org/apache/streams/messaging/service/impl/CassandraActivityServiceTest.java (added)
+++ incubator/streams/branches/cassandra/streams-eip-routes/src/test/java/org/apache/streams/messaging/service/impl/CassandraActivityServiceTest.java Thu Aug  8 19:53:27 2013
@@ -0,0 +1,22 @@
+package org.apache.streams.messaging.service.impl;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+
+public class CassandraActivityServiceTest {
+
+    private CassandraActivityService cassandraActivityService;
+
+    @Before
+    public void setup(){
+        cassandraActivityService = new CassandraActivityService();
+    }
+
+    @Test
+    public void getActivititiesForQueryTest(){
+        List<String> activities = cassandraActivityService.getActivitiesForQuery("select * from Activities");
+        assert(activities != null);
+    }
+}

Modified: incubator/streams/branches/cassandra/streams-osgi-components/activity-consumer/src/main/java/org/apache/streams/osgi/components/activityconsumer/impl/PushActivityConsumer.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/cassandra/streams-osgi-components/activity-consumer/src/main/java/org/apache/streams/osgi/components/activityconsumer/impl/PushActivityConsumer.java?rev=1512000&r1=1511999&r2=1512000&view=diff
==============================================================================
--- incubator/streams/branches/cassandra/streams-osgi-components/activity-consumer/src/main/java/org/apache/streams/osgi/components/activityconsumer/impl/PushActivityConsumer.java (original)
+++ incubator/streams/branches/cassandra/streams-osgi-components/activity-consumer/src/main/java/org/apache/streams/osgi/components/activityconsumer/impl/PushActivityConsumer.java Thu Aug  8 19:53:27 2013
@@ -66,20 +66,11 @@ public class PushActivityConsumer implem
 
     }
 
-    public List<CassandraActivityStreamsEntry> split(String activities) {
+    public List<String> split(String activities) {
         LOG.info("I am going to split this message: " + activities);
-        ObjectMapper mapper = new ObjectMapper();
-        mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-        List<CassandraActivityStreamsEntry> activitiesList = new ArrayList<CassandraActivityStreamsEntry>();
-
-        try {
-            //attempt to convert the activity to a java object
-            LOG.info("About to preform the translation to JSON Object");
-            CassandraActivityStreamsEntry streamsEntry = mapper.readValue(activities, CassandraActivityStreamsEntry.class);
-            activitiesList.add(streamsEntry);
-        } catch (Exception e) {
-            LOG.info("Error while converting the JSON object to POJO and saving to database",e);
-        }
+
+        List<String> activitiesList = new ArrayList<String>();
+        activitiesList.add(activities);
         return activitiesList;
     }
 

Modified: incubator/streams/branches/cassandra/streams-osgi-components/activity-registration/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/branches/cassandra/streams-osgi-components/activity-registration/pom.xml?rev=1512000&r1=1511999&r2=1512000&view=diff
==============================================================================
--- incubator/streams/branches/cassandra/streams-osgi-components/activity-registration/pom.xml (original)
+++ incubator/streams/branches/cassandra/streams-osgi-components/activity-registration/pom.xml Thu Aug  8 19:53:27 2013
@@ -82,7 +82,7 @@
                         <Bundle-Version>${project.version}</Bundle-Version>
                         <Export-Package>${bundle.namespace};version="${project.version}"</Export-Package>
                         <Private-Package>${bundle.namespace}.impl.*</Private-Package>
-                        <Import-Package>org.apache.streams.osgi.components.activityconsumer.impl,org.apache.streams.osgi.components.activityconsumer,org.apache.streams.osgi.components.activitysubscriber,org.apache.streams.osgi.components.activitysubscriber.impl,org.apache.commons.logging</Import-Package>
+                        <Import-Package>org.apache.streams.osgi.components.activityconsumer.impl,org.apache.streams.osgi.components.activityconsumer,org.apache.streams.osgi.components.activitysubscriber,org.apache.streams.osgi.components.activitysubscriber.impl,org.apache.commons.logging, org.codehaus.jackson.map</Import-Package>
                     </instructions>
                 </configuration>
             </plugin>

Modified: incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/ActivityStreamsSubscriber.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/ActivityStreamsSubscriber.java?rev=1512000&r1=1511999&r2=1512000&view=diff
==============================================================================
--- incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/ActivityStreamsSubscriber.java (original)
+++ incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/ActivityStreamsSubscriber.java Thu Aug  8 19:53:27 2013
@@ -1,8 +1,10 @@
 package org.apache.streams.osgi.components.activitysubscriber;
 
 
+import java.util.List;
+
 public interface ActivityStreamsSubscriber {
-    public void receive(String activity);
+    public void receive(List<String> activity);
     public String getStream();
     public void init();
     public void setInRoute(String route);
@@ -11,5 +13,5 @@ public interface ActivityStreamsSubscrib
     public void updateActivityStreamsSubscriberConfiguration(String config);
     public boolean isAuthenticated();
     public void setAuthenticated(boolean authenticated);
-
+    public ActivityStreamsSubscription getActivityStreamsSubscriberConfiguration();
 }

Modified: incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/ActivityStreamsSubscription.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/ActivityStreamsSubscription.java?rev=1512000&r1=1511999&r2=1512000&view=diff
==============================================================================
--- incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/ActivityStreamsSubscription.java (original)
+++ incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/ActivityStreamsSubscription.java Thu Aug  8 19:53:27 2013
@@ -15,7 +15,4 @@ public interface ActivityStreamsSubscrip
     public String getAuthToken();
     public void setAuthToken(String token);
 
-
-
-
 }

Modified: incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/ActivityStreamsSubscriptionFilter.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/ActivityStreamsSubscriptionFilter.java?rev=1512000&r1=1511999&r2=1512000&view=diff
==============================================================================
--- incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/ActivityStreamsSubscriptionFilter.java (original)
+++ incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/ActivityStreamsSubscriptionFilter.java Thu Aug  8 19:53:27 2013
@@ -9,6 +9,8 @@ public interface ActivityStreamsSubscrip
 
     public void setQuery(String query);
 
+    public String getQuery();
+
     public boolean evaluate(String activity);
 
 }

Modified: incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/impl/ActivityStreamsSubscriberDelegate.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/impl/ActivityStreamsSubscriberDelegate.java?rev=1512000&r1=1511999&r2=1512000&view=diff
==============================================================================
--- incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/impl/ActivityStreamsSubscriberDelegate.java (original)
+++ incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/impl/ActivityStreamsSubscriberDelegate.java Thu Aug  8 19:53:27 2013
@@ -8,6 +8,7 @@ import org.codehaus.jackson.map.Deserial
 import org.codehaus.jackson.map.ObjectMapper;
 
 import java.util.ArrayList;
+import java.util.List;
 
 public class ActivityStreamsSubscriberDelegate implements ActivityStreamsSubscriber {
 
@@ -69,11 +70,10 @@ public class ActivityStreamsSubscriberDe
         this.inRoute = inRoute;
     }
 
-    public void receive (String activity){
-        //receive activities...do anything that is necessary
-        LOG.info("got a message i subscribed to: " + activity);
-        //guarenteed unique?
-        stream.add(activity);
+    public void receive (List<String> activity){
+        //add new activities to stream
+        LOG.info("adding activities to subscription stream");
+        stream.addAll(activity);
 
     }
 

Added: incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/impl/ActivityStreamsSubscriptionCassandraFilterImpl.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/impl/ActivityStreamsSubscriptionCassandraFilterImpl.java?rev=1512000&view=auto
==============================================================================
--- incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/impl/ActivityStreamsSubscriptionCassandraFilterImpl.java (added)
+++ incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/impl/ActivityStreamsSubscriptionCassandraFilterImpl.java Thu Aug  8 19:53:27 2013
@@ -0,0 +1,21 @@
+package org.apache.streams.osgi.components.activitysubscriber.impl;
+
+import org.apache.streams.osgi.components.activitysubscriber.ActivityStreamsSubscriptionFilter;
+
+public class ActivityStreamsSubscriptionCassandraFilterImpl implements ActivityStreamsSubscriptionFilter {
+    private String query;
+
+    public ActivityStreamsSubscriptionCassandraFilterImpl(){}
+
+    public void setQuery(String query) {
+        this.query=query;
+    }
+
+    public String getQuery() {
+        return query;
+    }
+
+    public boolean evaluate(String activity){
+        return true;
+    }
+}

Modified: incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/impl/ActivityStreamsSubscriptionLuceneFilterImpl.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/impl/ActivityStreamsSubscriptionLuceneFilterImpl.java?rev=1512000&r1=1511999&r2=1512000&view=diff
==============================================================================
--- incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/impl/ActivityStreamsSubscriptionLuceneFilterImpl.java (original)
+++ incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/impl/ActivityStreamsSubscriptionLuceneFilterImpl.java Thu Aug  8 19:53:27 2013
@@ -13,6 +13,10 @@ public class ActivityStreamsSubscription
         this.query=query;
     }
 
+    public String getQuery() {
+        return query;
+    }
+
     public boolean evaluate(String activity){
         return true;
     }