You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by le...@apache.org on 2013/02/04 14:42:20 UTC
svn commit: r1442114 - in /incubator/streams/trunk: streams-eip-routes/
streams-eip-routes/src/main/java/org/apache/streams/messaging/processors/
streams-eip-routes/src/main/java/org/apache/streams/messaging/routers/impl/
streams-osgi-components/activi...
Author: letourneau
Date: Mon Feb 4 13:42:19 2013
New Revision: 1442114
URL: http://svn.apache.org/viewvc?rev=1442114&view=rev
Log:
adding configuration of subscribers to be json object
Added:
incubator/streams/trunk/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/ActivityStreamsSubscriberConfiguration.java
Modified:
incubator/streams/trunk/streams-eip-routes/pom.xml
incubator/streams/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/processors/ActivityStreamsSubscriberRegistrationProcessor.java
incubator/streams/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/routers/impl/ActivityStreamsSubscriberRouter.java
incubator/streams/trunk/streams-osgi-components/activity-registration/src/main/java/org/apache/streams/osgi/components/impl/ActivityStreamsSubscriberRegistrationImpl.java
incubator/streams/trunk/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/ActivityStreamsSubscriber.java
incubator/streams/trunk/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/ActivityStreamsSubscriberWarehouse.java
incubator/streams/trunk/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/impl/ActivityStreamsSubscriberDelegate.java
incubator/streams/trunk/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/impl/ActivityStreamsSubscriberWarehouseImpl.java
Modified: incubator/streams/trunk/streams-eip-routes/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-eip-routes/pom.xml?rev=1442114&r1=1442113&r2=1442114&view=diff
==============================================================================
--- incubator/streams/trunk/streams-eip-routes/pom.xml (original)
+++ incubator/streams/trunk/streams-eip-routes/pom.xml Mon Feb 4 13:42:19 2013
@@ -6,6 +6,7 @@
<properties>
<bundle.symbolicName>streams-eip-routes</bundle.symbolicName>
<bundle.namespace>org.apache.streams</bundle.namespace>
+ <jackson.version>1.9.11</jackson.version>
</properties>
<modelVersion>4.0.0</modelVersion>
@@ -111,6 +112,18 @@
</dependency>
<dependency>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-mrbean</artifactId>
+ <version>${jackson.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-mapper-asl</artifactId>
+ <version>${jackson.version}</version>
+ </dependency>
+
+ <dependency>
<groupId>org.osgi</groupId>
<artifactId>osgi_R4_core</artifactId>
<version>1.0</version>
Modified: incubator/streams/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/processors/ActivityStreamsSubscriberRegistrationProcessor.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/processors/ActivityStreamsSubscriberRegistrationProcessor.java?rev=1442114&r1=1442113&r2=1442114&view=diff
==============================================================================
--- incubator/streams/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/processors/ActivityStreamsSubscriberRegistrationProcessor.java (original)
+++ incubator/streams/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/processors/ActivityStreamsSubscriberRegistrationProcessor.java Mon Feb 4 13:42:19 2013
@@ -3,7 +3,12 @@ package org.apache.streams.messaging.pro
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.commons.logging.impl.SimpleLog;
+import org.apache.streams.osgi.components.activitysubscriber.ActivityStreamsSubscriberConfiguration;
+import org.codehaus.jackson.JsonGenerationException;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.codehaus.jackson.map.ObjectMapper;
+import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
@@ -31,20 +36,21 @@ public class ActivityStreamsSubscriberRe
//OAuth token? What does subscriber post to init a subscription URL?
//maybe its a list of URLs to subscribe to subscriptions=1,2,3,4&auth_token=XXXX
+ ObjectMapper mapper = new ObjectMapper();
+ try {
+ // read from file, convert it to user class
+ ActivityStreamsSubscriberConfiguration configuration = mapper.readValue(body, ActivityStreamsSubscriberConfiguration.class);
+ exchange.getOut().setBody(configuration);
- try{
- HashMap<String, String[]> parsedBody = parseBody(body);
- if (parsedBody.get("subscriptions")==null){
- throw new Exception();
- }
- exchange.getOut().setBody(body);
- }catch(Exception e){
+ } catch (Exception e) {
exchange.getOut().setFault(true);
exchange.getOut().setHeader(Exchange.HTTP_RESPONSE_CODE,400);
- exchange.getOut().setBody("POST should contain subscriptions and auth_token key/value pair.");
+ exchange.getOut().setBody("POST should contain a valid Subscription configuration object.");
}
+
+
//just pass this on to the route creator, body will be the dedicated URL for this subscriber
}
@@ -53,17 +59,5 @@ public class ActivityStreamsSubscriberRe
}
- private HashMap<String, String[]> parseBody(String body) {
- HashMap<String,String[]> parts = new HashMap<String, String[]>();
- String[] segments = body.split("&");
- for (String seg : segments){
- String[] query = seg.split("=");
- if (query.length>0) {
- parts.put(query[0],query[1].split(","));
- }
- }
- if (parts.isEmpty()){return null;}
- return parts;
- }
}
Modified: incubator/streams/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/routers/impl/ActivityStreamsSubscriberRouter.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/routers/impl/ActivityStreamsSubscriberRouter.java?rev=1442114&r1=1442113&r2=1442114&view=diff
==============================================================================
--- incubator/streams/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/routers/impl/ActivityStreamsSubscriberRouter.java (original)
+++ incubator/streams/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/routers/impl/ActivityStreamsSubscriberRouter.java Mon Feb 4 13:42:19 2013
@@ -40,24 +40,15 @@ public class ActivityStreamsSubscriberRo
//todo: add some better scheme then getCount for URL...
//todo: make the route again if subscriber exists...and context doesn't have route
-
- // ActivityStreamsSubscriber existingConsumer = activityStreamsSubscriberWarehouse.findSubscriberBySrc(activityStreamsSubscriber.getSrc());
-
-
-
activityStreamsSubscriber.setInRoute("http://" + configuration.getSubscriberInRouteHost()+ ":" + configuration.getSubscriberInRoutePort() + EipConfigurator.SUBSCRIBER_URL_RESOURCE + "/" + UUID.randomUUID());
-
try{
//setup a message queue for this consumer.getInRoute()
camelContext.addRoutes(new DynamicSubscriberRouteBuilder(configuration,camelContext, "jetty:" + activityStreamsSubscriber.getInRoute(), activityStreamsSubscriber));
//set the body to the url the producer should post to
exchange.getOut().setBody(activityStreamsSubscriber.getInRoute());
- log.info("subs : " + activityStreamsSubscriber.getSubscriptions());
- activityStreamsSubscriber.setActivityStreamsSubscriberWarehouse(activityStreamsSubscriberWarehouse);
- //only add the route to the warehouse after its been created in messaging system...
- activityStreamsSubscriberWarehouse.register(activityStreamsSubscriber.getSubscriptions(),activityStreamsSubscriber);
+ activityStreamsSubscriberWarehouse.register(activityStreamsSubscriber);
}catch (Exception e){
exchange.getOut().setHeader(Exchange.HTTP_RESPONSE_CODE,500);
exchange.getOut().setBody("error creating route: " + e);
Modified: incubator/streams/trunk/streams-osgi-components/activity-registration/src/main/java/org/apache/streams/osgi/components/impl/ActivityStreamsSubscriberRegistrationImpl.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-osgi-components/activity-registration/src/main/java/org/apache/streams/osgi/components/impl/ActivityStreamsSubscriberRegistrationImpl.java?rev=1442114&r1=1442113&r2=1442114&view=diff
==============================================================================
--- incubator/streams/trunk/streams-osgi-components/activity-registration/src/main/java/org/apache/streams/osgi/components/impl/ActivityStreamsSubscriberRegistrationImpl.java (original)
+++ incubator/streams/trunk/streams-osgi-components/activity-registration/src/main/java/org/apache/streams/osgi/components/impl/ActivityStreamsSubscriberRegistrationImpl.java Mon Feb 4 13:42:19 2013
@@ -3,6 +3,7 @@ package org.apache.streams.osgi.componen
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.streams.osgi.components.ActivityStreamsSubscriberRegistration;
+import org.apache.streams.osgi.components.activitysubscriber.ActivityStreamsSubscriberConfiguration;
import org.apache.streams.osgi.components.activitysubscriber.impl.ActivityStreamsSubscriberDelegate;
import java.util.Date;
@@ -20,26 +21,12 @@ public class ActivityStreamsSubscriberRe
//using the URI supplied to set it up...
//return the consumer for addition to the consumer warehouse
- HashMap<String,String[]> bodyParts = parseBody(body.toString());
- String answer = prefix + " set body: " + body + " " + new Date();
- LOG.info(">> setting up subscriptions >>" + bodyParts.get("subscriptions"));
- if (bodyParts.get("subscriptions")!=null){return new ActivityStreamsSubscriberDelegate(bodyParts.get("subscriptions"));}
- return new ActivityStreamsSubscriberDelegate();
+ ActivityStreamsSubscriberConfiguration configuration = (ActivityStreamsSubscriberConfiguration)body;
+
+ return new ActivityStreamsSubscriberDelegate(configuration);
}
- private HashMap<String, String[]> parseBody(String body) {
- HashMap<String,String[]> parts = new HashMap<String, String[]>();
- String[] segments = body.split("&");
- for (String seg : segments){
- String[] query = seg.split("=");
- if (query.length>0) {
- parts.put(query[0],query[1].split(","));
- }
- }
- if (parts.isEmpty()){return null;}
- return parts;
- }
public boolean isVerbose() {
return verbose;
Modified: incubator/streams/trunk/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/ActivityStreamsSubscriber.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/ActivityStreamsSubscriber.java?rev=1442114&r1=1442113&r2=1442114&view=diff
==============================================================================
--- incubator/streams/trunk/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/ActivityStreamsSubscriber.java (original)
+++ incubator/streams/trunk/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/ActivityStreamsSubscriber.java Mon Feb 4 13:42:19 2013
@@ -8,11 +8,9 @@ public interface ActivityStreamsSubscrib
public void receive(String activity);
public String getStream();
public void init();
- public void setActivityStreamsSubscriberWarehouse(ActivityStreamsSubscriberWarehouse warehouse);
- public void addSrc(String[] src);
- public void addSrc(String src);
- public String[] getSubscriptions();
public void setInRoute(String route);
public String getInRoute();
+ public void setActivityStreamsSubscriberConfiguration(ActivityStreamsSubscriberConfiguration config);
+ public void updateActivityStreamsSubscriberConfiguration(ActivityStreamsSubscriberConfiguration config);
}
Added: incubator/streams/trunk/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/ActivityStreamsSubscriberConfiguration.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/ActivityStreamsSubscriberConfiguration.java?rev=1442114&view=auto
==============================================================================
--- incubator/streams/trunk/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/ActivityStreamsSubscriberConfiguration.java (added)
+++ incubator/streams/trunk/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/ActivityStreamsSubscriberConfiguration.java Mon Feb 4 13:42:19 2013
@@ -0,0 +1,4 @@
+package org.apache.streams.osgi.components.activitysubscriber;
+
+public interface ActivityStreamsSubscriberConfiguration {
+}
Modified: incubator/streams/trunk/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/ActivityStreamsSubscriberWarehouse.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/ActivityStreamsSubscriberWarehouse.java?rev=1442114&r1=1442113&r2=1442114&view=diff
==============================================================================
--- incubator/streams/trunk/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/ActivityStreamsSubscriberWarehouse.java (original)
+++ incubator/streams/trunk/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/ActivityStreamsSubscriberWarehouse.java Mon Feb 4 13:42:19 2013
@@ -7,9 +7,9 @@ import java.util.ArrayList;
*/
public interface ActivityStreamsSubscriberWarehouse {
- public void register(String src, ActivityStreamsSubscriber activitySubscriber);
- public void register(String[] src, ActivityStreamsSubscriber activitySubscriber);
- public ArrayList<ActivityStreamsSubscriber> findSubscribersBySrc(String src);
+ public void register(ActivityStreamsSubscriber activitySubscriber);
+
+ public ArrayList<ActivityStreamsSubscriber> findSubscribersByFilters(String src);
}
Modified: incubator/streams/trunk/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/impl/ActivityStreamsSubscriberDelegate.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/impl/ActivityStreamsSubscriberDelegate.java?rev=1442114&r1=1442113&r2=1442114&view=diff
==============================================================================
--- incubator/streams/trunk/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/impl/ActivityStreamsSubscriberDelegate.java (original)
+++ incubator/streams/trunk/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/impl/ActivityStreamsSubscriberDelegate.java Mon Feb 4 13:42:19 2013
@@ -3,6 +3,7 @@ package org.apache.streams.osgi.componen
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.streams.osgi.components.activitysubscriber.ActivityStreamsSubscriber;
+import org.apache.streams.osgi.components.activitysubscriber.ActivityStreamsSubscriberConfiguration;
import org.apache.streams.osgi.components.activitysubscriber.ActivityStreamsSubscriberWarehouse;
import java.util.ArrayList;
@@ -13,8 +14,7 @@ public class ActivityStreamsSubscriberDe
private static final transient Log LOG = LogFactory.getLog(ActivityStreamsSubscriberDelegate.class);
-
- private ActivityStreamsSubscriberWarehouse activityStreamsSubscriberWarehouse;
+ private ActivityStreamsSubscriberConfiguration activityStreamsSubscriberConfiguration;
private String inRoute;
@@ -22,25 +22,23 @@ public class ActivityStreamsSubscriberDe
private ArrayList<String> stream;
-
- private String[] subscriptions;
-
- public ActivityStreamsSubscriberDelegate(){
-
+ public ActivityStreamsSubscriberDelegate(ActivityStreamsSubscriberConfiguration configuration){
+ setActivityStreamsSubscriberConfiguration(configuration);
stream = new ArrayList<String>();
}
- public ActivityStreamsSubscriberDelegate(String[] subscriptions){
-
- this();
- this.subscriptions=subscriptions;
+ public ActivityStreamsSubscriberConfiguration getActivityStreamsSubscriberConfiguration() {
+ return activityStreamsSubscriberConfiguration;
}
- public void setActivityStreamsSubscriberWarehouse(ActivityStreamsSubscriberWarehouse activityStreamsSubscriberWarehouse) {
- this.activityStreamsSubscriberWarehouse = activityStreamsSubscriberWarehouse;
+ public void setActivityStreamsSubscriberConfiguration(ActivityStreamsSubscriberConfiguration activityStreamsSubscriberConfiguration) {
+ this.activityStreamsSubscriberConfiguration = activityStreamsSubscriberConfiguration;
}
+ public void updateActivityStreamsSubscriberConfiguration(ActivityStreamsSubscriberConfiguration activityStreamsSubscriberConfiguration) {
+ this.activityStreamsSubscriberConfiguration = activityStreamsSubscriberConfiguration;
+ }
public String getInRoute() {
return inRoute;
@@ -50,22 +48,6 @@ public class ActivityStreamsSubscriberDe
this.inRoute = inRoute;
}
- public String[] getSubscriptions() {
- return subscriptions;
- }
-
- public void addSrc(String src){
- HashMap<String,String[]> bodyParts = parseBody(src);
-
- activityStreamsSubscriberWarehouse.register(bodyParts.get("subscriptions"),this);
- }
-
- public void addSrc(String[] src){
-
- activityStreamsSubscriberWarehouse.register(src,this);
- }
-
-
public void receive (String activity){
//receive activities...do anything that is necessary
LOG.info("got a message i subscribed to: " + activity);
@@ -89,19 +71,7 @@ public class ActivityStreamsSubscriberDe
}
- private HashMap<String, String[]> parseBody(String body) {
- HashMap<String,String[]> parts = new HashMap<String, String[]>();
- String[] segments = body.split("&");
- for (String seg : segments){
- String[] query = seg.split("=");
- if (query.length>0) {
- parts.put(query[0],query[1].split(","));
- }
- }
- if (parts.isEmpty()){return null;}
- return parts;
- }
Modified: incubator/streams/trunk/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/impl/ActivityStreamsSubscriberWarehouseImpl.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/impl/ActivityStreamsSubscriberWarehouseImpl.java?rev=1442114&r1=1442113&r2=1442114&view=diff
==============================================================================
--- incubator/streams/trunk/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/impl/ActivityStreamsSubscriberWarehouseImpl.java (original)
+++ incubator/streams/trunk/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/impl/ActivityStreamsSubscriberWarehouseImpl.java Mon Feb 4 13:42:19 2013
@@ -11,41 +11,24 @@ import org.apache.streams.osgi.component
public class ActivityStreamsSubscriberWarehouseImpl implements ActivityStreamsSubscriberWarehouse {
private static final transient Log LOG = LogFactory.getLog(ActivityStreamsSubscriberWarehouseImpl.class);
- private HashMap<String,ArrayList<ActivityStreamsSubscriber>> subscribers;
+ private ArrayList<ActivityStreamsSubscriber> subscribers;
public ActivityStreamsSubscriberWarehouseImpl(){
- subscribers = new HashMap<String, ArrayList<ActivityStreamsSubscriber>>();
- subscribers.put(null,new ArrayList<ActivityStreamsSubscriber>());
+ subscribers = new ArrayList<ActivityStreamsSubscriber>();
}
- public void register(String src, ActivityStreamsSubscriber activitySubscriber) {
+ public void register(ActivityStreamsSubscriber activitySubscriber) {
- ArrayList<ActivityStreamsSubscriber> registeredSubscribers = subscribers.get(src);
-
- if (registeredSubscribers==null){
- registeredSubscribers = new ArrayList<ActivityStreamsSubscriber>();
+ if (!subscribers.contains(activitySubscriber)){
+ subscribers.add(activitySubscriber);
+ activitySubscriber.init();
}
- registeredSubscribers.add(activitySubscriber);
- subscribers.put(src,registeredSubscribers);
- activitySubscriber.init();
-
-
}
- public void register(String[] src, ActivityStreamsSubscriber activitySubscriber) {
-
- for (String s : src){
- register(s,activitySubscriber);
- }
-
-
- }
-
-
- public ArrayList<ActivityStreamsSubscriber> findSubscribersBySrc(String src){
- return subscribers.get(src);
+ public ArrayList<ActivityStreamsSubscriber> findSubscribersByFilter(String src){
+ return null;
}