You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by le...@apache.org on 2013/02/04 14:42:20 UTC

svn commit: r1442114 - in /incubator/streams/trunk: streams-eip-routes/ streams-eip-routes/src/main/java/org/apache/streams/messaging/processors/ streams-eip-routes/src/main/java/org/apache/streams/messaging/routers/impl/ streams-osgi-components/activi...

Author: letourneau
Date: Mon Feb  4 13:42:19 2013
New Revision: 1442114

URL: http://svn.apache.org/viewvc?rev=1442114&view=rev
Log:
adding configuration of subscribers to be json object

Added:
    incubator/streams/trunk/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/ActivityStreamsSubscriberConfiguration.java
Modified:
    incubator/streams/trunk/streams-eip-routes/pom.xml
    incubator/streams/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/processors/ActivityStreamsSubscriberRegistrationProcessor.java
    incubator/streams/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/routers/impl/ActivityStreamsSubscriberRouter.java
    incubator/streams/trunk/streams-osgi-components/activity-registration/src/main/java/org/apache/streams/osgi/components/impl/ActivityStreamsSubscriberRegistrationImpl.java
    incubator/streams/trunk/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/ActivityStreamsSubscriber.java
    incubator/streams/trunk/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/ActivityStreamsSubscriberWarehouse.java
    incubator/streams/trunk/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/impl/ActivityStreamsSubscriberDelegate.java
    incubator/streams/trunk/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/impl/ActivityStreamsSubscriberWarehouseImpl.java

Modified: incubator/streams/trunk/streams-eip-routes/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-eip-routes/pom.xml?rev=1442114&r1=1442113&r2=1442114&view=diff
==============================================================================
--- incubator/streams/trunk/streams-eip-routes/pom.xml (original)
+++ incubator/streams/trunk/streams-eip-routes/pom.xml Mon Feb  4 13:42:19 2013
@@ -6,6 +6,7 @@
     <properties>
         <bundle.symbolicName>streams-eip-routes</bundle.symbolicName>
         <bundle.namespace>org.apache.streams</bundle.namespace>
+        <jackson.version>1.9.11</jackson.version>
     </properties>
 
     <modelVersion>4.0.0</modelVersion>
@@ -111,6 +112,18 @@
         </dependency>
 
         <dependency>
+            <groupId>org.codehaus.jackson</groupId>
+            <artifactId>jackson-mrbean</artifactId>
+            <version>${jackson.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.codehaus.jackson</groupId>
+            <artifactId>jackson-mapper-asl</artifactId>
+            <version>${jackson.version}</version>
+        </dependency>
+
+        <dependency>
             <groupId>org.osgi</groupId>
             <artifactId>osgi_R4_core</artifactId>
             <version>1.0</version>

Modified: incubator/streams/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/processors/ActivityStreamsSubscriberRegistrationProcessor.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/processors/ActivityStreamsSubscriberRegistrationProcessor.java?rev=1442114&r1=1442113&r2=1442114&view=diff
==============================================================================
--- incubator/streams/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/processors/ActivityStreamsSubscriberRegistrationProcessor.java (original)
+++ incubator/streams/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/processors/ActivityStreamsSubscriberRegistrationProcessor.java Mon Feb  4 13:42:19 2013
@@ -3,7 +3,12 @@ package org.apache.streams.messaging.pro
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.commons.logging.impl.SimpleLog;
+import org.apache.streams.osgi.components.activitysubscriber.ActivityStreamsSubscriberConfiguration;
+import org.codehaus.jackson.JsonGenerationException;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.codehaus.jackson.map.ObjectMapper;
 
+import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.HashMap;
@@ -31,20 +36,21 @@ public class ActivityStreamsSubscriberRe
             //OAuth token? What does subscriber post to init a subscription URL?
             //maybe its a list of URLs to subscribe to subscriptions=1,2,3,4&auth_token=XXXX
 
+            ObjectMapper mapper = new ObjectMapper();
 
+            try {
+                // read from file, convert it to user class
+                ActivityStreamsSubscriberConfiguration configuration = mapper.readValue(body, ActivityStreamsSubscriberConfiguration.class);
+                exchange.getOut().setBody(configuration);
 
-            try{
-                HashMap<String, String[]> parsedBody = parseBody(body);
-                if (parsedBody.get("subscriptions")==null){
-                    throw new Exception();
-                }
-                exchange.getOut().setBody(body);
-            }catch(Exception e){
+            } catch (Exception e) {
                 exchange.getOut().setFault(true);
                 exchange.getOut().setHeader(Exchange.HTTP_RESPONSE_CODE,400);
-                exchange.getOut().setBody("POST should contain subscriptions and auth_token key/value pair.");
+                exchange.getOut().setBody("POST should contain a valid Subscription configuration object.");
             }
 
+
+
             //just pass this on to the route creator, body will be the dedicated URL for this subscriber
 
         }
@@ -53,17 +59,5 @@ public class ActivityStreamsSubscriberRe
 
     }
 
-    private HashMap<String, String[]> parseBody(String body) {
-        HashMap<String,String[]> parts = new HashMap<String, String[]>();
-        String[] segments = body.split("&");
-        for (String seg : segments){
-            String[] query = seg.split("=");
-            if (query.length>0) {
-                parts.put(query[0],query[1].split(","));
-            }
-        }
 
-        if (parts.isEmpty()){return null;}
-        return parts;
-    }
 }

Modified: incubator/streams/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/routers/impl/ActivityStreamsSubscriberRouter.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/routers/impl/ActivityStreamsSubscriberRouter.java?rev=1442114&r1=1442113&r2=1442114&view=diff
==============================================================================
--- incubator/streams/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/routers/impl/ActivityStreamsSubscriberRouter.java (original)
+++ incubator/streams/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/routers/impl/ActivityStreamsSubscriberRouter.java Mon Feb  4 13:42:19 2013
@@ -40,24 +40,15 @@ public class ActivityStreamsSubscriberRo
         //todo: add some better scheme then getCount for URL...
         //todo: make the route again if subscriber exists...and context doesn't have route
 
-
-      //  ActivityStreamsSubscriber existingConsumer = activityStreamsSubscriberWarehouse.findSubscriberBySrc(activityStreamsSubscriber.getSrc());
-
-
-
             activityStreamsSubscriber.setInRoute("http://" + configuration.getSubscriberInRouteHost()+ ":" + configuration.getSubscriberInRoutePort() + EipConfigurator.SUBSCRIBER_URL_RESOURCE + "/" + UUID.randomUUID());
 
-
             try{
 
                 //setup a message queue for this consumer.getInRoute()
                 camelContext.addRoutes(new DynamicSubscriberRouteBuilder(configuration,camelContext, "jetty:" + activityStreamsSubscriber.getInRoute(), activityStreamsSubscriber));
                 //set the body to the url the producer should post to
                 exchange.getOut().setBody(activityStreamsSubscriber.getInRoute());
-                log.info("subs : " + activityStreamsSubscriber.getSubscriptions());
-                activityStreamsSubscriber.setActivityStreamsSubscriberWarehouse(activityStreamsSubscriberWarehouse);
-                //only add the route to the warehouse after its been created in messaging system...
-               activityStreamsSubscriberWarehouse.register(activityStreamsSubscriber.getSubscriptions(),activityStreamsSubscriber);
+                activityStreamsSubscriberWarehouse.register(activityStreamsSubscriber);
             }catch (Exception e){
                 exchange.getOut().setHeader(Exchange.HTTP_RESPONSE_CODE,500);
                 exchange.getOut().setBody("error creating route: " + e);

Modified: incubator/streams/trunk/streams-osgi-components/activity-registration/src/main/java/org/apache/streams/osgi/components/impl/ActivityStreamsSubscriberRegistrationImpl.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-osgi-components/activity-registration/src/main/java/org/apache/streams/osgi/components/impl/ActivityStreamsSubscriberRegistrationImpl.java?rev=1442114&r1=1442113&r2=1442114&view=diff
==============================================================================
--- incubator/streams/trunk/streams-osgi-components/activity-registration/src/main/java/org/apache/streams/osgi/components/impl/ActivityStreamsSubscriberRegistrationImpl.java (original)
+++ incubator/streams/trunk/streams-osgi-components/activity-registration/src/main/java/org/apache/streams/osgi/components/impl/ActivityStreamsSubscriberRegistrationImpl.java Mon Feb  4 13:42:19 2013
@@ -3,6 +3,7 @@ package org.apache.streams.osgi.componen
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.streams.osgi.components.ActivityStreamsSubscriberRegistration;
+import org.apache.streams.osgi.components.activitysubscriber.ActivityStreamsSubscriberConfiguration;
 import org.apache.streams.osgi.components.activitysubscriber.impl.ActivityStreamsSubscriberDelegate;
 
 import java.util.Date;
@@ -20,26 +21,12 @@ public class ActivityStreamsSubscriberRe
         //using the URI supplied to set it up...
         //return the consumer for addition to the consumer warehouse
 
-        HashMap<String,String[]> bodyParts = parseBody(body.toString());
-        String answer = prefix + " set body:  " + body + " " + new Date();
-        LOG.info(">> setting up subscriptions >>" + bodyParts.get("subscriptions"));
-        if (bodyParts.get("subscriptions")!=null){return new ActivityStreamsSubscriberDelegate(bodyParts.get("subscriptions"));}
-        return new ActivityStreamsSubscriberDelegate();
+        ActivityStreamsSubscriberConfiguration configuration = (ActivityStreamsSubscriberConfiguration)body;
+
+        return new ActivityStreamsSubscriberDelegate(configuration);
     }
 
-    private HashMap<String, String[]> parseBody(String body) {
-        HashMap<String,String[]> parts = new HashMap<String, String[]>();
-        String[] segments = body.split("&");
-        for (String seg : segments){
-            String[] query = seg.split("=");
-            if (query.length>0) {
-                parts.put(query[0],query[1].split(","));
-            }
-        }
 
-        if (parts.isEmpty()){return null;}
-        return parts;
-    }
 
     public boolean isVerbose() {
         return verbose;

Modified: incubator/streams/trunk/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/ActivityStreamsSubscriber.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/ActivityStreamsSubscriber.java?rev=1442114&r1=1442113&r2=1442114&view=diff
==============================================================================
--- incubator/streams/trunk/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/ActivityStreamsSubscriber.java (original)
+++ incubator/streams/trunk/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/ActivityStreamsSubscriber.java Mon Feb  4 13:42:19 2013
@@ -8,11 +8,9 @@ public interface ActivityStreamsSubscrib
     public void receive(String activity);
     public String getStream();
     public void init();
-    public void setActivityStreamsSubscriberWarehouse(ActivityStreamsSubscriberWarehouse warehouse);
-    public void addSrc(String[] src);
-    public void addSrc(String src);
-    public String[] getSubscriptions();
     public void setInRoute(String route);
     public String getInRoute();
+    public void setActivityStreamsSubscriberConfiguration(ActivityStreamsSubscriberConfiguration config);
+    public void updateActivityStreamsSubscriberConfiguration(ActivityStreamsSubscriberConfiguration config);
 
 }

Added: incubator/streams/trunk/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/ActivityStreamsSubscriberConfiguration.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/ActivityStreamsSubscriberConfiguration.java?rev=1442114&view=auto
==============================================================================
--- incubator/streams/trunk/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/ActivityStreamsSubscriberConfiguration.java (added)
+++ incubator/streams/trunk/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/ActivityStreamsSubscriberConfiguration.java Mon Feb  4 13:42:19 2013
@@ -0,0 +1,4 @@
+package org.apache.streams.osgi.components.activitysubscriber;
+
+public interface ActivityStreamsSubscriberConfiguration {
+}

Modified: incubator/streams/trunk/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/ActivityStreamsSubscriberWarehouse.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/ActivityStreamsSubscriberWarehouse.java?rev=1442114&r1=1442113&r2=1442114&view=diff
==============================================================================
--- incubator/streams/trunk/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/ActivityStreamsSubscriberWarehouse.java (original)
+++ incubator/streams/trunk/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/ActivityStreamsSubscriberWarehouse.java Mon Feb  4 13:42:19 2013
@@ -7,9 +7,9 @@ import java.util.ArrayList;
  */
 public interface ActivityStreamsSubscriberWarehouse {
 
-    public void register(String src, ActivityStreamsSubscriber activitySubscriber);
-    public void register(String[] src, ActivityStreamsSubscriber activitySubscriber);
-    public ArrayList<ActivityStreamsSubscriber> findSubscribersBySrc(String src);
+    public void register(ActivityStreamsSubscriber activitySubscriber);
+
+    public ArrayList<ActivityStreamsSubscriber> findSubscribersByFilters(String src);
 
 
 }

Modified: incubator/streams/trunk/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/impl/ActivityStreamsSubscriberDelegate.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/impl/ActivityStreamsSubscriberDelegate.java?rev=1442114&r1=1442113&r2=1442114&view=diff
==============================================================================
--- incubator/streams/trunk/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/impl/ActivityStreamsSubscriberDelegate.java (original)
+++ incubator/streams/trunk/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/impl/ActivityStreamsSubscriberDelegate.java Mon Feb  4 13:42:19 2013
@@ -3,6 +3,7 @@ package org.apache.streams.osgi.componen
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.streams.osgi.components.activitysubscriber.ActivityStreamsSubscriber;
+import org.apache.streams.osgi.components.activitysubscriber.ActivityStreamsSubscriberConfiguration;
 import org.apache.streams.osgi.components.activitysubscriber.ActivityStreamsSubscriberWarehouse;
 
 import java.util.ArrayList;
@@ -13,8 +14,7 @@ public class ActivityStreamsSubscriberDe
     private static final transient Log LOG = LogFactory.getLog(ActivityStreamsSubscriberDelegate.class);
 
 
-
-    private ActivityStreamsSubscriberWarehouse activityStreamsSubscriberWarehouse;
+    private ActivityStreamsSubscriberConfiguration activityStreamsSubscriberConfiguration;
 
     private String inRoute;
 
@@ -22,25 +22,23 @@ public class ActivityStreamsSubscriberDe
     private ArrayList<String> stream;
 
 
-
-    private String[] subscriptions;
-
-    public ActivityStreamsSubscriberDelegate(){
-
+    public ActivityStreamsSubscriberDelegate(ActivityStreamsSubscriberConfiguration configuration){
+        setActivityStreamsSubscriberConfiguration(configuration);
         stream = new ArrayList<String>();
     }
 
-    public ActivityStreamsSubscriberDelegate(String[] subscriptions){
-
-        this();
-        this.subscriptions=subscriptions;
 
+    public ActivityStreamsSubscriberConfiguration getActivityStreamsSubscriberConfiguration() {
+        return activityStreamsSubscriberConfiguration;
     }
 
-    public void setActivityStreamsSubscriberWarehouse(ActivityStreamsSubscriberWarehouse activityStreamsSubscriberWarehouse) {
-        this.activityStreamsSubscriberWarehouse = activityStreamsSubscriberWarehouse;
+    public void setActivityStreamsSubscriberConfiguration(ActivityStreamsSubscriberConfiguration activityStreamsSubscriberConfiguration) {
+        this.activityStreamsSubscriberConfiguration = activityStreamsSubscriberConfiguration;
     }
 
+    public void updateActivityStreamsSubscriberConfiguration(ActivityStreamsSubscriberConfiguration activityStreamsSubscriberConfiguration) {
+        this.activityStreamsSubscriberConfiguration = activityStreamsSubscriberConfiguration;
+    }
 
     public String getInRoute() {
         return inRoute;
@@ -50,22 +48,6 @@ public class ActivityStreamsSubscriberDe
         this.inRoute = inRoute;
     }
 
-    public String[] getSubscriptions() {
-        return subscriptions;
-    }
-
-    public void addSrc(String src){
-        HashMap<String,String[]> bodyParts = parseBody(src);
-
-        activityStreamsSubscriberWarehouse.register(bodyParts.get("subscriptions"),this);
-    }
-
-    public void addSrc(String[] src){
-
-        activityStreamsSubscriberWarehouse.register(src,this);
-    }
-
-
     public void receive (String activity){
         //receive activities...do anything that is necessary
         LOG.info("got a message i subscribed to: " + activity);
@@ -89,19 +71,7 @@ public class ActivityStreamsSubscriberDe
 
     }
 
-    private HashMap<String, String[]> parseBody(String body) {
-        HashMap<String,String[]> parts = new HashMap<String, String[]>();
-        String[] segments = body.split("&");
-        for (String seg : segments){
-            String[] query = seg.split("=");
-            if (query.length>0) {
-                parts.put(query[0],query[1].split(","));
-            }
-        }
 
-        if (parts.isEmpty()){return null;}
-        return parts;
-    }
 
 
 

Modified: incubator/streams/trunk/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/impl/ActivityStreamsSubscriberWarehouseImpl.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/impl/ActivityStreamsSubscriberWarehouseImpl.java?rev=1442114&r1=1442113&r2=1442114&view=diff
==============================================================================
--- incubator/streams/trunk/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/impl/ActivityStreamsSubscriberWarehouseImpl.java (original)
+++ incubator/streams/trunk/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/impl/ActivityStreamsSubscriberWarehouseImpl.java Mon Feb  4 13:42:19 2013
@@ -11,41 +11,24 @@ import org.apache.streams.osgi.component
 public class ActivityStreamsSubscriberWarehouseImpl implements ActivityStreamsSubscriberWarehouse {
     private static final transient Log LOG = LogFactory.getLog(ActivityStreamsSubscriberWarehouseImpl.class);
 
-    private HashMap<String,ArrayList<ActivityStreamsSubscriber>> subscribers;
+    private ArrayList<ActivityStreamsSubscriber> subscribers;
 
     public ActivityStreamsSubscriberWarehouseImpl(){
-        subscribers = new HashMap<String, ArrayList<ActivityStreamsSubscriber>>();
-        subscribers.put(null,new ArrayList<ActivityStreamsSubscriber>());
+        subscribers = new ArrayList<ActivityStreamsSubscriber>();
     }
 
-    public void register(String src, ActivityStreamsSubscriber activitySubscriber) {
+    public void register(ActivityStreamsSubscriber activitySubscriber) {
 
-        ArrayList<ActivityStreamsSubscriber> registeredSubscribers = subscribers.get(src);
-
-        if (registeredSubscribers==null){
-            registeredSubscribers = new ArrayList<ActivityStreamsSubscriber>();
+        if (!subscribers.contains(activitySubscriber)){
+            subscribers.add(activitySubscriber);
+            activitySubscriber.init();
         }
 
-        registeredSubscribers.add(activitySubscriber);
-        subscribers.put(src,registeredSubscribers);
-        activitySubscriber.init();
-
-
     }
 
-    public void register(String[] src, ActivityStreamsSubscriber activitySubscriber) {
-
-       for (String s : src){
-           register(s,activitySubscriber);
-       }
-
-
-    }
-
-
 
-    public ArrayList<ActivityStreamsSubscriber> findSubscribersBySrc(String src){
-        return subscribers.get(src);
+    public ArrayList<ActivityStreamsSubscriber> findSubscribersByFilter(String src){
+        return null;
     }