You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2012/02/22 05:09:42 UTC

svn commit: r1292120 - in /camel/trunk/components/camel-cometd/src: main/java/org/apache/camel/component/cometd/ test/java/org/apache/camel/component/cometd/

Author: ningjiang
Date: Wed Feb 22 04:09:41 2012
New Revision: 1292120

URL: http://svn.apache.org/viewvc?rev=1292120&view=rev
Log:
CAMEL-4993 Add session information from cometd to camel message headers

Modified:
    camel/trunk/components/camel-cometd/src/main/java/org/apache/camel/component/cometd/CometdBinding.java
    camel/trunk/components/camel-cometd/src/main/java/org/apache/camel/component/cometd/CometdEndpoint.java
    camel/trunk/components/camel-cometd/src/test/java/org/apache/camel/component/cometd/CometdProducerConsumerTest.java

Modified: camel/trunk/components/camel-cometd/src/main/java/org/apache/camel/component/cometd/CometdBinding.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-cometd/src/main/java/org/apache/camel/component/cometd/CometdBinding.java?rev=1292120&r1=1292119&r2=1292120&view=diff
==============================================================================
--- camel/trunk/components/camel-cometd/src/main/java/org/apache/camel/component/cometd/CometdBinding.java (original)
+++ camel/trunk/components/camel-cometd/src/main/java/org/apache/camel/component/cometd/CometdBinding.java Wed Feb 22 04:09:41 2012
@@ -19,6 +19,7 @@ package org.apache.camel.component.comet
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
@@ -27,21 +28,36 @@ import org.cometd.bayeux.server.ServerCh
 import org.cometd.bayeux.server.ServerMessage;
 import org.cometd.bayeux.server.ServerSession;
 import org.cometd.server.BayeuxServerImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A Strategy used to convert between a Camel {@link Exchange} and
  * to and from a Cometd messages
  */
 public class CometdBinding {
+    
     public static final String HEADERS_FIELD = "CamelHeaders";
     public static final String COMETD_CLIENT_ID_HEADER_NAME = "CometdClientId";
-    public static final String SUBSCRIPTION_HEADER_NAME = "subscription";
+    public static final String COMETD_SUBSCRIPTION_HEADER_NAME = "subscription";
+    public static final String COMETD_SESSION_ATTR_HEADER_NAME = "CometdSessionAttr";
+    
+    private static final String IMPROPER_SESSTION_ATTRIBUTE_TYPE_MESSAGE = "Sesstion attribute %s has a value of %s which cannot be included as at header because it is not an int, string, or long.";
+    private static final transient Logger LOG = LoggerFactory.getLogger(CometdBinding.class);
 
     private final BayeuxServerImpl bayeux;
+    private boolean enableSessionHeader;
 
     public CometdBinding(BayeuxServerImpl bayeux) {
+        this(bayeux, false);
+    }
+
+    
+    public CometdBinding(BayeuxServerImpl bayeux, boolean enableSessionHeader) {
         this.bayeux = bayeux;
+        this.enableSessionHeader = enableSessionHeader;
     }
+    
 
     public ServerMessage.Mutable createCometdMessage(ServerChannel channel, ServerSession serverSession, Message camelMessage) {
         ServerMessage.Mutable mutable = bayeux.newMessage();
@@ -65,18 +81,38 @@ public class CometdBinding {
         message.setHeaders(getHeadersFromMessage(cometdMessage));
         message.setHeader(COMETD_CLIENT_ID_HEADER_NAME, remote.getId());
 
-        if (cometdMessage.get(SUBSCRIPTION_HEADER_NAME) != null) {
-            message.setHeader(SUBSCRIPTION_HEADER_NAME, cometdMessage.get(SUBSCRIPTION_HEADER_NAME));
+        if (cometdMessage.get(COMETD_SUBSCRIPTION_HEADER_NAME) != null) {
+            message.setHeader(COMETD_SUBSCRIPTION_HEADER_NAME, cometdMessage.get(COMETD_SUBSCRIPTION_HEADER_NAME));
+        }
+        
+        if (enableSessionHeader) {
+            addSessionAttributesToMessageHeaders(remote, message);
         }
 
         return message;
     }
 
 
+    private void addSessionAttributesToMessageHeaders(ServerSession remote, Message message) {
+        Set<String> attributeNames = remote.getAttributeNames();
+        for (String attributeName : attributeNames) {
+            Object attribute = remote.getAttribute(attributeName);
+
+            if (attribute instanceof Integer || attribute instanceof String || attribute instanceof Long
+                || attribute instanceof Double) {
+                message.setHeader(attributeName, attribute);
+            } else {
+                // Do we need to support other type of session objects ?
+                LOG.info(String.format(IMPROPER_SESSTION_ATTRIBUTE_TYPE_MESSAGE, attributeName, attribute));
+            }
+
+        }
+    }
+
+
     public void addHeadersToMessage(ServerMessage.Mutable cometdMessage, Message camelMessage) {
         if (camelMessage.hasHeaders()) {
             Map<String, Object> ext = cometdMessage.getExt(true);
-
             ext.put(HEADERS_FIELD, filterHeaders(camelMessage.getHeaders()));
         }
     }

Modified: camel/trunk/components/camel-cometd/src/main/java/org/apache/camel/component/cometd/CometdEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-cometd/src/main/java/org/apache/camel/component/cometd/CometdEndpoint.java?rev=1292120&r1=1292119&r2=1292120&view=diff
==============================================================================
--- camel/trunk/components/camel-cometd/src/main/java/org/apache/camel/component/cometd/CometdEndpoint.java (original)
+++ camel/trunk/components/camel-cometd/src/main/java/org/apache/camel/component/cometd/CometdEndpoint.java Wed Feb 22 04:09:41 2012
@@ -37,6 +37,7 @@ public class CometdEndpoint extends Defa
     private int maxInterval = 30000;
     private int multiFrameInterval = 1500;
     private boolean jsonCommented = true;
+    private boolean sessionHeadersEnabled;
     private int logLevel = 1;
     private URI uri;
     private CometdComponent component;
@@ -149,6 +150,14 @@ public class CometdEndpoint extends Defa
     public void setJsonCommented(boolean commented) {
         jsonCommented = commented;
     }
+    
+    public void setSessionHeadersEnabled(boolean enable) {
+        this.sessionHeadersEnabled = enable;
+    }
+
+    public boolean areSessionHeadersEnabled() {
+        return sessionHeadersEnabled;
+    }
 
     public int getLogLevel() {
         return logLevel;

Modified: camel/trunk/components/camel-cometd/src/test/java/org/apache/camel/component/cometd/CometdProducerConsumerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-cometd/src/test/java/org/apache/camel/component/cometd/CometdProducerConsumerTest.java?rev=1292120&r1=1292119&r2=1292120&view=diff
==============================================================================
--- camel/trunk/components/camel-cometd/src/test/java/org/apache/camel/component/cometd/CometdProducerConsumerTest.java (original)
+++ camel/trunk/components/camel-cometd/src/test/java/org/apache/camel/component/cometd/CometdProducerConsumerTest.java Wed Feb 22 04:09:41 2012
@@ -18,6 +18,7 @@ package org.apache.camel.component.comet
 
 import java.util.List;
 
+import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.builder.RouteBuilder;
@@ -71,13 +72,31 @@ public class CometdProducerConsumerTest 
             assertNotNull(message.getHeader(CometdBinding.COMETD_CLIENT_ID_HEADER_NAME));
         }
     }
+    
+    @Test
+    public void testSessionHeaderArgumentSet() throws Exception {
+        // setup
+        CometdComponent component = context.getComponent("cometd", CometdComponent.class);
+
+        // act
+        Endpoint result = component
+            .createEndpoint("cometd://127.0.0.1:"
+                            + port
+                            + "/service/testArgs?baseResource=file:./target/test-classes/webapp&"
+                            + "timeout=240000&interval=0&maxInterval=30000&multiFrameInterval=1500&jsonCommented=true&sessionHeadersEnabled=true&logLevel=2");
+
+        // assert
+        assertTrue(result instanceof CometdEndpoint);
+        CometdEndpoint cometdEndpoint = (CometdEndpoint)result;
+        assertTrue(cometdEndpoint.areSessionHeadersEnabled());
+    }
 
     @Override
     @Before
     public void setUp() throws Exception {
         port = AvailablePortFinder.getNextAvailable(23500);
         uri = "cometd://127.0.0.1:" + port + "/service/test?baseResource=file:./target/test-classes/webapp&"
-                + "timeout=240000&interval=0&maxInterval=30000&multiFrameInterval=1500&jsonCommented=true&logLevel=2";
+                + "timeout=240000&interval=0&maxInterval=30000&multiFrameInterval=1500&jsonCommented=true&sessionHeadersEnabled=true&logLevel=2";
 
         super.setUp();
     }
@@ -121,3 +140,4 @@ public class CometdProducerConsumerTest 
         }
     }
 }
+