You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by le...@apache.org on 2013/02/05 22:18:41 UTC

svn commit: r1442747 - in /incubator/streams/trunk: streams-eip-routes/ streams-eip-routes/src/main/java/org/apache/streams/messaging/processors/ streams-eip-routes/src/main/java/org/apache/streams/messaging/routers/impl/ streams-osgi-components/activi...

Author: letourneau
Date: Tue Feb  5 21:18:41 2013
New Revision: 1442747

URL: http://svn.apache.org/viewvc?rev=1442747&view=rev
Log:
added publisher setup with JSON post

Modified:
    incubator/streams/trunk/streams-eip-routes/pom.xml
    incubator/streams/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/processors/ActivityPublisherRegistrationProcessor.java
    incubator/streams/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/routers/impl/ActivityConsumerRouter.java
    incubator/streams/trunk/streams-osgi-components/activity-consumer/pom.xml
    incubator/streams/trunk/streams-osgi-components/activity-consumer/src/main/java/org/apache/streams/osgi/components/activityconsumer/ActivityConsumer.java
    incubator/streams/trunk/streams-osgi-components/activity-consumer/src/main/java/org/apache/streams/osgi/components/activityconsumer/impl/ActivityConsumerWarehouseImpl.java
    incubator/streams/trunk/streams-osgi-components/activity-consumer/src/main/java/org/apache/streams/osgi/components/activityconsumer/impl/PushActivityConsumer.java
    incubator/streams/trunk/streams-osgi-components/activity-registration/pom.xml
    incubator/streams/trunk/streams-osgi-components/activity-registration/src/main/java/org/apache/streams/osgi/components/impl/ActivityPublisherRegistrationImpl.java

Modified: incubator/streams/trunk/streams-eip-routes/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-eip-routes/pom.xml?rev=1442747&r1=1442746&r2=1442747&view=diff
==============================================================================
--- incubator/streams/trunk/streams-eip-routes/pom.xml (original)
+++ incubator/streams/trunk/streams-eip-routes/pom.xml Tue Feb  5 21:18:41 2013
@@ -72,7 +72,7 @@
                         <Bundle-Version>${pom.version}</Bundle-Version>
                         <Export-Package>${bundle.namespace};version="${pom.version}",org.apache.streams.messaging.configuration,org.apache.streams.messaging.routers,org.apache.streams.messaging.rules,org.apache.streams.messaging.processors,org.apache.streams.messaging.aggregation,org.apache.activemq,org.codehaus.jackson.*;version="${jackson.version}"</Export-Package>
                         <Private-Package>${bundle.namespace}.messaging.routers.impl.*,${bundle.namespace}.messaging.rules.impl.*</Private-Package>
-                        <Import-Package>org.apache.camel.*;version="2.8.5",org.apache.streams.messaging.configuration,org.apache.activemq.camel.component,org.apache.activemq,org.apache.activemq.pool,org.apache.camel.component.jms,org.springframework.*;version="3.0.6.RELEASE",org.apache.commons.logging,org.apache.streams.*,org.apache.streams.osgi.components,org.apache.streams.osgi.components.activityconsumer,org.apache.streams.osgi.components.activitysubscriber,org.apache.streams.osgi.components.activitysubscriber.impl,org.apache.streams.messaging.processors,org.apache.streams.messaging.aggregation,javax.jms, javax.net.ssl, javax.transaction.xa, org.apache.activemq.advisory, org.apache.activemq.blob, org.apache.activemq.broker, org.apache.activemq.broker.region, org.apache.activemq.command, org.apache.activemq.filter, org.apache.activemq.jndi, org.apache.activemq.management, org.apache.activemq.selector, org.apache.activemq.state, org.apache.activemq.thread, org.apache.activemq.transaction, org.apache.activemq.transport, org.apache.activemq.transport.failover, org.apache.activemq.transport.tcp, org.apache.activemq.usage, org.apache.activemq.util, org.slf4j,org.codehaus.jackson;version="${jackson.version}",javax.xml.datatype, javax.xml.namespace, javax.xml.parsers, org.joda.time, org.joda.time.format, org.w3c.dom, org.w3c.dom.bootstrap, org.w3c.dom.ls, org.xml.sax</Import-Package>
+                        <Import-Package>org.apache.camel.*;version="2.8.5",org.apache.streams.messaging.configuration,org.apache.activemq.camel.component,org.apache.activemq,org.apache.activemq.pool,org.apache.camel.component.jms,org.springframework.*;version="3.0.6.RELEASE",org.apache.commons.logging,org.apache.streams.*,org.apache.streams.osgi.components,org.apache.streams.osgi.components.activityconsumer,org.apache.streams.osgi.components.activityconsumer.impl,org.apache.streams.osgi.components.activitysubscriber,org.apache.streams.osgi.components.activitysubscriber.impl,org.apache.streams.messaging.processors,org.apache.streams.messaging.aggregation,javax.jms, javax.net.ssl, javax.transaction.xa, org.apache.activemq.advisory, org.apache.activemq.blob, org.apache.activemq.broker, org.apache.activemq.broker.region, org.apache.activemq.command, org.apache.activemq.filter, org.apache.activemq.jndi, org.apache.activemq.management, org.apache.activemq.selector, org.apache.activemq.state, org.apache.activemq.thread, org.apache.activemq.transaction, org.apache.activemq.transport, org.apache.activemq.transport.failover, org.apache.activemq.transport.tcp, org.apache.activemq.usage, org.apache.activemq.util, org.slf4j,org.codehaus.jackson;version="${jackson.version}",javax.xml.datatype, javax.xml.namespace, javax.xml.parsers, org.joda.time, org.joda.time.format, org.w3c.dom, org.w3c.dom.bootstrap, org.w3c.dom.ls, org.xml.sax</Import-Package>
                         <Include-Resource>src/main/resources</Include-Resource>
                     </instructions>
                 </configuration>

Modified: incubator/streams/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/processors/ActivityPublisherRegistrationProcessor.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/processors/ActivityPublisherRegistrationProcessor.java?rev=1442747&r1=1442746&r2=1442747&view=diff
==============================================================================
--- incubator/streams/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/processors/ActivityPublisherRegistrationProcessor.java (original)
+++ incubator/streams/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/processors/ActivityPublisherRegistrationProcessor.java Tue Feb  5 21:18:41 2013
@@ -2,14 +2,16 @@ package org.apache.streams.messaging.pro
 
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
-
-import java.net.MalformedURLException;
-import java.net.URI;
-import java.net.URISyntaxException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.streams.osgi.components.activityconsumer.ActivityConsumer;
+import org.apache.streams.osgi.components.activityconsumer.impl.PushActivityConsumer;
+import org.codehaus.jackson.map.DeserializationConfig;
+import org.codehaus.jackson.map.ObjectMapper;
 
 
 public class ActivityPublisherRegistrationProcessor implements Processor{
-
+    private static final transient Log LOG = LogFactory.getLog(ActivityStreamsSubscriberRegistrationProcessor.class);
     public void process(Exchange exchange){
         //add the necessary headers to the message so that the activity registration component
         //can do a lookup to either make a new processor and endpoint, or pass the message to the right one
@@ -25,14 +27,25 @@ public class ActivityPublisherRegistrati
             // authentication, all that good stuff...happens in the registration module
 
             String body = exchange.getIn().getBody(String.class);
-            try{
-                URI publisherUrl = new URI(body);
-                exchange.getOut().setHeader("activityPublisherUri",body);
-                exchange.getOut().setBody(body);
-            }catch(URISyntaxException e){
+            ObjectMapper mapper = new ObjectMapper();
+            mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES,false);
+
+            try {
+
+                // read from file, convert it to user class
+                ActivityConsumer configuration = mapper.readValue(body, ActivityConsumer.class);
+                if (configuration.getSrc()==null){
+                   LOG.info("configuration src is null");
+                   throw new Exception();
+                }
+
+                exchange.getOut().setBody(configuration);
+
+            } catch (Exception e) {
+                LOG.info("error: " + e);
                 exchange.getOut().setFault(true);
                 exchange.getOut().setHeader(Exchange.HTTP_RESPONSE_CODE,400);
-                exchange.getOut().setBody("POST should only contain a valid URI that is registering as an Activity Publisher.");
+                exchange.getOut().setBody("POST should contain a valid JSON configuration for registering as an Activity Publisher (check that src element is valid).");
             }
         }
 

Modified: incubator/streams/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/routers/impl/ActivityConsumerRouter.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/routers/impl/ActivityConsumerRouter.java?rev=1442747&r1=1442746&r2=1442747&view=diff
==============================================================================
--- incubator/streams/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/routers/impl/ActivityConsumerRouter.java (original)
+++ incubator/streams/trunk/streams-eip-routes/src/main/java/org/apache/streams/messaging/routers/impl/ActivityConsumerRouter.java Tue Feb  5 21:18:41 2013
@@ -39,7 +39,7 @@ public class ActivityConsumerRouter exte
         //todo: add some better scheme then getCount for URL...
         //todo: make the route again if consumer exists...and context doesn't have route
         if (activityConsumer.isAuthenticated()){
-                ActivityConsumer existingConsumer = activityConsumerWarehouse.findConsumerBySrc(activityConsumer.getSrc());
+                ActivityConsumer existingConsumer = activityConsumerWarehouse.findConsumerBySrc(activityConsumer.getSrc().toASCIIString());
 
                 if (existingConsumer==null){
 

Modified: incubator/streams/trunk/streams-osgi-components/activity-consumer/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-osgi-components/activity-consumer/pom.xml?rev=1442747&r1=1442746&r2=1442747&view=diff
==============================================================================
--- incubator/streams/trunk/streams-osgi-components/activity-consumer/pom.xml (original)
+++ incubator/streams/trunk/streams-osgi-components/activity-consumer/pom.xml Tue Feb  5 21:18:41 2013
@@ -7,6 +7,7 @@
         <bundle.symbolicName>activity-consumer-bundle</bundle.symbolicName>
         <bundle.namespace>org.apache.streams.osgi.components.activityconsumer</bundle.namespace>
         <commons.log>1.1</commons.log>
+        <jackson.version>1.9.11</jackson.version>
     </properties>
 
     <modelVersion>4.0.0</modelVersion>
@@ -70,7 +71,7 @@
                         <Bundle-Version>${pom.version}</Bundle-Version>
                         <Export-Package>${bundle.namespace};version="${pom.version}",org.apache.streams.osgi.components.activityconsumer.impl</Export-Package>
                         <Private-Package>${bundle.namespace}.impl.*</Private-Package>
-                        <Import-Package>org.apache.streams.osgi.components.activityconsumer,org.apache.commons.logging</Import-Package>
+                        <Import-Package>org.apache.streams.osgi.components.activityconsumer,org.apache.streams.osgi.components.activityconsumer.impl,org.apache.commons.logging,org.codehaus.jackson.*;version="${jackson.version}",javax.xml.datatype, javax.xml.namespace, javax.xml.parsers, org.joda.time, org.joda.time.format, org.w3c.dom, org.w3c.dom.bootstrap, org.w3c.dom.ls, org.xml.sax</Import-Package>
                         <Include-Resource>src/main/resources</Include-Resource>
                     </instructions>
                 </configuration>
@@ -88,6 +89,18 @@
         </dependency>
 
         <dependency>
+            <groupId>org.codehaus.jackson</groupId>
+            <artifactId>jackson-mrbean</artifactId>
+            <version>${jackson.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.codehaus.jackson</groupId>
+            <artifactId>jackson-mapper-asl</artifactId>
+            <version>${jackson.version}</version>
+        </dependency>
+
+        <dependency>
             <groupId>org.osgi</groupId>
             <artifactId>osgi_R4_compendium</artifactId>
             <version>1.0</version>
@@ -100,6 +113,7 @@
             <version>${commons.log}</version>
             <scope>provided</scope>
         </dependency>
+
     </dependencies>
 
 </project>
\ No newline at end of file

Modified: incubator/streams/trunk/streams-osgi-components/activity-consumer/src/main/java/org/apache/streams/osgi/components/activityconsumer/ActivityConsumer.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-osgi-components/activity-consumer/src/main/java/org/apache/streams/osgi/components/activityconsumer/ActivityConsumer.java?rev=1442747&r1=1442746&r2=1442747&view=diff
==============================================================================
--- incubator/streams/trunk/streams-osgi-components/activity-consumer/src/main/java/org/apache/streams/osgi/components/activityconsumer/ActivityConsumer.java (original)
+++ incubator/streams/trunk/streams-osgi-components/activity-consumer/src/main/java/org/apache/streams/osgi/components/activityconsumer/ActivityConsumer.java Tue Feb  5 21:18:41 2013
@@ -1,12 +1,21 @@
 package org.apache.streams.osgi.components.activityconsumer;
 
 
+import org.codehaus.jackson.annotate.JsonTypeInfo;
+
+import java.net.URI;
+
+
+@JsonTypeInfo(use= JsonTypeInfo.Id.CLASS, include=JsonTypeInfo.As.PROPERTY, property="@class")
 public interface ActivityConsumer {
     public String receive(String activity);
     public void init();
-    public String getSrc();
+    public URI getSrc();
+    public void setSrc(String src);
     public void setInRoute(String route);
     public String getInRoute();
+    public String getAuthToken();
+    public void setAuthToken(String token);
     public boolean isAuthenticated();
     public void setAuthenticated(boolean authenticated);
 }

Modified: incubator/streams/trunk/streams-osgi-components/activity-consumer/src/main/java/org/apache/streams/osgi/components/activityconsumer/impl/ActivityConsumerWarehouseImpl.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-osgi-components/activity-consumer/src/main/java/org/apache/streams/osgi/components/activityconsumer/impl/ActivityConsumerWarehouseImpl.java?rev=1442747&r1=1442746&r2=1442747&view=diff
==============================================================================
--- incubator/streams/trunk/streams-osgi-components/activity-consumer/src/main/java/org/apache/streams/osgi/components/activityconsumer/impl/ActivityConsumerWarehouseImpl.java (original)
+++ incubator/streams/trunk/streams-osgi-components/activity-consumer/src/main/java/org/apache/streams/osgi/components/activityconsumer/impl/ActivityConsumerWarehouseImpl.java Tue Feb  5 21:18:41 2013
@@ -20,7 +20,7 @@ public class ActivityConsumerWarehouseIm
     public void register(ActivityConsumer activityConsumer) {
 
         //key in warehouse is the activity publisher URI source
-        consumers.put(activityConsumer.getSrc(), activityConsumer);
+        consumers.put(activityConsumer.getSrc().toASCIIString(), activityConsumer);
         activityConsumer.init();
 
 

Modified: incubator/streams/trunk/streams-osgi-components/activity-consumer/src/main/java/org/apache/streams/osgi/components/activityconsumer/impl/PushActivityConsumer.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-osgi-components/activity-consumer/src/main/java/org/apache/streams/osgi/components/activityconsumer/impl/PushActivityConsumer.java?rev=1442747&r1=1442746&r2=1442747&view=diff
==============================================================================
--- incubator/streams/trunk/streams-osgi-components/activity-consumer/src/main/java/org/apache/streams/osgi/components/activityconsumer/impl/PushActivityConsumer.java (original)
+++ incubator/streams/trunk/streams-osgi-components/activity-consumer/src/main/java/org/apache/streams/osgi/components/activityconsumer/impl/PushActivityConsumer.java Tue Feb  5 21:18:41 2013
@@ -3,6 +3,12 @@ package org.apache.streams.osgi.componen
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.streams.osgi.components.activityconsumer.ActivityConsumer;
+
+import javax.tools.JavaFileManager;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
 import java.util.List;
 import java.util.ArrayList;
 
@@ -10,24 +16,40 @@ public class PushActivityConsumer implem
 
     private static final transient Log LOG = LogFactory.getLog(PushActivityConsumer.class);
 
-    private String src;
+    private URI src;
+
 
 
+    private String authToken;
 
     private boolean authenticated;
 
     private String inRoute;
 
-    public PushActivityConsumer(String src){
-        this.setSrc(src);
+    public PushActivityConsumer(){
+
     }
 
-    public String getSrc() {
+
+    public URI getSrc() {
         return src;
     }
 
     public void setSrc(String src) {
-        this.src = src;
+        try{
+            this.src = new URI(src);
+
+        } catch (URISyntaxException e) {
+           this.src=null;
+        }
+    }
+
+    public String getAuthToken() {
+        return authToken;
+    }
+
+    public void setAuthToken(String authToken) {
+        this.authToken = authToken;
     }
 
     public boolean isAuthenticated() {

Modified: incubator/streams/trunk/streams-osgi-components/activity-registration/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-osgi-components/activity-registration/pom.xml?rev=1442747&r1=1442746&r2=1442747&view=diff
==============================================================================
--- incubator/streams/trunk/streams-osgi-components/activity-registration/pom.xml (original)
+++ incubator/streams/trunk/streams-osgi-components/activity-registration/pom.xml Tue Feb  5 21:18:41 2013
@@ -69,7 +69,7 @@
                         <Bundle-Version>${pom.version}</Bundle-Version>
                         <Export-Package>${bundle.namespace};version="${pom.version}"</Export-Package>
                         <Private-Package>${bundle.namespace}.impl.*</Private-Package>
-                        <Import-Package>org.apache.streams.osgi.components.activityconsumer.impl,org.apache.streams.osgi.components.activitysubscriber,org.apache.streams.osgi.components.activitysubscriber.impl,org.apache.commons.logging</Import-Package>
+                        <Import-Package>org.apache.streams.osgi.components.activityconsumer.impl,org.apache.streams.osgi.components.activityconsumer,org.apache.streams.osgi.components.activitysubscriber,org.apache.streams.osgi.components.activitysubscriber.impl,org.apache.commons.logging</Import-Package>
                         <Include-Resource>src/main/resources</Include-Resource>
                     </instructions>
                 </configuration>

Modified: incubator/streams/trunk/streams-osgi-components/activity-registration/src/main/java/org/apache/streams/osgi/components/impl/ActivityPublisherRegistrationImpl.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-osgi-components/activity-registration/src/main/java/org/apache/streams/osgi/components/impl/ActivityPublisherRegistrationImpl.java?rev=1442747&r1=1442746&r2=1442747&view=diff
==============================================================================
--- incubator/streams/trunk/streams-osgi-components/activity-registration/src/main/java/org/apache/streams/osgi/components/impl/ActivityPublisherRegistrationImpl.java (original)
+++ incubator/streams/trunk/streams-osgi-components/activity-registration/src/main/java/org/apache/streams/osgi/components/impl/ActivityPublisherRegistrationImpl.java Tue Feb  5 21:18:41 2013
@@ -5,6 +5,7 @@ import java.util.Date;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.streams.osgi.components.ActivityPublisherRegistration;
+import org.apache.streams.osgi.components.activityconsumer.ActivityConsumer;
 import org.apache.streams.osgi.components.activityconsumer.impl.PushActivityConsumer;
 
 public class ActivityPublisherRegistrationImpl implements ActivityPublisherRegistration {
@@ -22,8 +23,8 @@ public class ActivityPublisherRegistrati
         String answer = prefix + " set body:  " + body + " " + new Date();
         LOG.info(">> call >>" + answer);
 
-        //should be configed like the subscriber = what type? polling? how often? etc...
-        PushActivityConsumer activityConsumer =new PushActivityConsumer(body.toString());
+
+        ActivityConsumer activityConsumer =(ActivityConsumer)body;
         //authenticate..
         activityConsumer.setAuthenticated(true);
         return activityConsumer;