You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by le...@apache.org on 2013/02/05 15:31:52 UTC

svn commit: r1442607 - in /incubator/streams/trunk: streams-eip-routes/ streams-eip-routes/src/main/java/org/apache/streams/messaging/aggregation/ streams-eip-routes/src/main/resources/META-INF/spring/ streams-osgi-components/activity-subscriber/src/ma...

Author: letourneau
Date: Tue Feb  5 14:31:52 2013
New Revision: 1442607

URL: http://svn.apache.org/viewvc?rev=1442607&view=rev
Log:
added aggregator

Added:
    incubator/streams/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/aggregation/
    incubator/streams/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/aggregation/ActivityAggregator.java
Modified:
    incubator/streams/trunk/streams-eip-routes/pom.xml
    incubator/streams/trunk/streams-eip-routes/src/main/resources/META-INF/spring/camelContext.xml
    incubator/streams/trunk/streams-eip-routes/src/main/resources/META-INF/spring/osgi-component-import.xml
    incubator/streams/trunk/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/ActivityStreamsSubscriberWarehouse.java
    incubator/streams/trunk/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/impl/ActivityStreamsSubscriberWarehouseImpl.java

Modified: incubator/streams/trunk/streams-eip-routes/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-eip-routes/pom.xml?rev=1442607&r1=1442606&r2=1442607&view=diff
==============================================================================
--- incubator/streams/trunk/streams-eip-routes/pom.xml (original)
+++ incubator/streams/trunk/streams-eip-routes/pom.xml Tue Feb  5 14:31:52 2013
@@ -70,9 +70,9 @@
                     <instructions>
                         <Bundle-SymbolicName>${bundle.symbolicName}</Bundle-SymbolicName>
                         <Bundle-Version>${pom.version}</Bundle-Version>
-                        <Export-Package>${bundle.namespace};version="${pom.version}",org.apache.streams.messaging.configuration,org.apache.streams.messaging.routers,org.apache.streams.messaging.rules,org.apache.streams.messaging.processors,org.apache.activemq,org.codehaus.jackson.*;version="${jackson.version}"</Export-Package>
+                        <Export-Package>${bundle.namespace};version="${pom.version}",org.apache.streams.messaging.configuration,org.apache.streams.messaging.routers,org.apache.streams.messaging.rules,org.apache.streams.messaging.processors,org.apache.streams.messaging.aggregation,org.apache.activemq,org.codehaus.jackson.*;version="${jackson.version}"</Export-Package>
                         <Private-Package>${bundle.namespace}.messaging.routers.impl.*,${bundle.namespace}.messaging.rules.impl.*</Private-Package>
-                        <Import-Package>org.apache.camel.*;version="2.8.5",org.apache.streams.messaging.configuration,org.apache.activemq.camel.component,org.apache.activemq,org.apache.activemq.pool,org.apache.camel.component.jms,org.springframework.*;version="3.0.6.RELEASE",org.apache.commons.logging,org.apache.streams.*,org.apache.streams.osgi.components,org.apache.streams.osgi.components.activityconsumer,org.apache.streams.osgi.components.activitysubscriber,org.apache.streams.osgi.components.activitysubscriber.impl,org.apache.streams.messaging.processors,javax.jms, javax.net.ssl, javax.transaction.xa, org.apache.activemq.advisory, org.apache.activemq.blob, org.apache.activemq.broker, org.apache.activemq.broker.region, org.apache.activemq.command, org.apache.activemq.filter, org.apache.activemq.jndi, org.apache.activemq.management, org.apache.activemq.selector, org.apache.activemq.state, org.apache.activemq.thread, org.apache.activemq.transaction, org.apache.activemq.transport, org.apache.activemq.transport.failover, org.apache.activemq.transport.tcp, org.apache.activemq.usage, org.apache.activemq.util, org.slf4j,org.codehaus.jackson;version="${jackson.version}",javax.xml.datatype, javax.xml.namespace, javax.xml.parsers, org.joda.time, org.joda.time.format, org.w3c.dom, org.w3c.dom.bootstrap, org.w3c.dom.ls, org.xml.sax</Import-Package>
+                        <Import-Package>org.apache.camel.*;version="2.8.5",org.apache.streams.messaging.configuration,org.apache.activemq.camel.component,org.apache.activemq,org.apache.activemq.pool,org.apache.camel.component.jms,org.springframework.*;version="3.0.6.RELEASE",org.apache.commons.logging,org.apache.streams.*,org.apache.streams.osgi.components,org.apache.streams.osgi.components.activityconsumer,org.apache.streams.osgi.components.activitysubscriber,org.apache.streams.osgi.components.activitysubscriber.impl,org.apache.streams.messaging.processors,org.apache.streams.messaging.aggregation,javax.jms, javax.net.ssl, javax.transaction.xa, org.apache.activemq.advisory, org.apache.activemq.blob, org.apache.activemq.broker, org.apache.activemq.broker.region, org.apache.activemq.command, org.apache.activemq.filter, org.apache.activemq.jndi, org.apache.activemq.management, org.apache.activemq.selector, org.apache.activemq.state, org.apache.activemq.thread, org.apache.activemq.transaction, org.apache.activemq.transport, org.apache.activemq.transport.failover, org.apache.activemq.transport.tcp, org.apache.activemq.usage, org.apache.activemq.util, org.slf4j,org.codehaus.jackson;version="${jackson.version}",javax.xml.datatype, javax.xml.namespace, javax.xml.parsers, org.joda.time, org.joda.time.format, org.w3c.dom, org.w3c.dom.bootstrap, org.w3c.dom.ls, org.xml.sax</Import-Package>
                         <Include-Resource>src/main/resources</Include-Resource>
                     </instructions>
                 </configuration>

Added: incubator/streams/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/aggregation/ActivityAggregator.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/aggregation/ActivityAggregator.java?rev=1442607&view=auto
==============================================================================
--- incubator/streams/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/aggregation/ActivityAggregator.java (added)
+++ incubator/streams/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/aggregation/ActivityAggregator.java Tue Feb  5 14:31:52 2013
@@ -0,0 +1,33 @@
+package org.apache.streams.messaging.aggregation;
+
+
+import org.apache.camel.Exchange;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.streams.osgi.components.activitysubscriber.ActivityStreamsSubscriber;
+import org.apache.streams.osgi.components.activitysubscriber.ActivityStreamsSubscriberWarehouse;
+
+import java.util.List;
+
+public class ActivityAggregator {
+
+    private ActivityStreamsSubscriberWarehouse activityStreamsSubscriberWarehouse;
+
+    public void setActivityStreamsSubscriberWarehouse(ActivityStreamsSubscriberWarehouse activityStreamsSubscriberWarehouse) {
+        this.activityStreamsSubscriberWarehouse = activityStreamsSubscriberWarehouse;
+    }
+
+    public void distributeToSubscribers(Exchange exchange) {
+
+        //iterate over the aggregated messages and send to subscribers in warehouse...they will evaluate and determine if they keep it
+        List<Exchange> grouped = exchange.getProperty(Exchange.GROUPED_EXCHANGE, List.class);
+
+        for (Exchange e : grouped){
+            //get activity off of exchange
+           String activity= e.getIn().getBody(String.class);
+
+            for(ActivityStreamsSubscriber subscriber:activityStreamsSubscriberWarehouse.getAllSubscribers()){
+                subscriber.receive(activity);
+            }
+        }
+    }
+}
\ No newline at end of file

Modified: incubator/streams/trunk/streams-eip-routes/src/main/resources/META-INF/spring/camelContext.xml
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-eip-routes/src/main/resources/META-INF/spring/camelContext.xml?rev=1442607&r1=1442606&r2=1442607&view=diff
==============================================================================
--- incubator/streams/trunk/streams-eip-routes/src/main/resources/META-INF/spring/camelContext.xml (original)
+++ incubator/streams/trunk/streams-eip-routes/src/main/resources/META-INF/spring/camelContext.xml Tue Feb  5 14:31:52 2013
@@ -40,7 +40,15 @@
             <inOnly uri="activemq:queue:activities"/>
         </route>
 
-
+        <route>
+            <from uri="activemq:queue:activities"/>
+            <aggregate completionTimeout="500" groupExchanges="true">
+                <correlationExpression>
+                    <constant>true</constant>
+                </correlationExpression>
+                <bean ref="activityWarehouseDistributor" method="distributeToSubscribers"/>
+            </aggregate>
+        </route>
 
 
         <!-- register as a subscriber - returned the endpoint to poll and add to subscription sources - GET/POST -->

Modified: incubator/streams/trunk/streams-eip-routes/src/main/resources/META-INF/spring/osgi-component-import.xml
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-eip-routes/src/main/resources/META-INF/spring/osgi-component-import.xml?rev=1442607&r1=1442606&r2=1442607&view=diff
==============================================================================
--- incubator/streams/trunk/streams-eip-routes/src/main/resources/META-INF/spring/osgi-component-import.xml (original)
+++ incubator/streams/trunk/streams-eip-routes/src/main/resources/META-INF/spring/osgi-component-import.xml Tue Feb  5 14:31:52 2013
@@ -25,6 +25,10 @@
     </bean>
      <bean id="subscriberRegistrationProcessor" class="org.apache.streams.messaging.processors.ActivityStreamsSubscriberRegistrationProcessor"/>
 
+    <bean id="activityWarehouseDistributor" class="org.apache.streams.messaging.aggregation.ActivityAggregator">
+        <property name="activityStreamsSubscriberWarehouse" ref="activityStreamsSubscriberWarehouse"/>
+    </bean>
+
 
 
 

Modified: incubator/streams/trunk/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/ActivityStreamsSubscriberWarehouse.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/ActivityStreamsSubscriberWarehouse.java?rev=1442607&r1=1442606&r2=1442607&view=diff
==============================================================================
--- incubator/streams/trunk/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/ActivityStreamsSubscriberWarehouse.java (original)
+++ incubator/streams/trunk/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/ActivityStreamsSubscriberWarehouse.java Tue Feb  5 14:31:52 2013
@@ -11,6 +11,6 @@ public interface ActivityStreamsSubscrib
 
     public ArrayList<ActivityStreamsSubscriber> findSubscribersByFilters(String src);
 
-
+    public ArrayList<ActivityStreamsSubscriber> getAllSubscribers();
 }
 

Modified: incubator/streams/trunk/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/impl/ActivityStreamsSubscriberWarehouseImpl.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/impl/ActivityStreamsSubscriberWarehouseImpl.java?rev=1442607&r1=1442606&r2=1442607&view=diff
==============================================================================
--- incubator/streams/trunk/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/impl/ActivityStreamsSubscriberWarehouseImpl.java (original)
+++ incubator/streams/trunk/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/impl/ActivityStreamsSubscriberWarehouseImpl.java Tue Feb  5 14:31:52 2013
@@ -29,7 +29,12 @@ public class ActivityStreamsSubscriberWa
 
     //the warehouse can do some interesting things to make the filtering efficient i think...
     public ArrayList<ActivityStreamsSubscriber> findSubscribersByFilters(String src){
-        return null;
+        return subscribers;
+    }
+
+
+    public ArrayList<ActivityStreamsSubscriber> getAllSubscribers(){
+        return subscribers;
     }