You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/10/17 10:38:55 UTC

[2/6] camel git commit: Parse only the topic from the endpoint URI (excluding the query options), and change the Exchange header value so that the MQTT topic is available to later processing.

Parse only the topic from the endpoint URI (excluding the query options), and change the Exchange header value so that the MQTT topic is available to later processing.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/78b7681f
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/78b7681f
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/78b7681f

Branch: refs/heads/master
Commit: 78b7681f3d64f3c9f8667a55f233d8dd9bdcdf13
Parents: e2ef05d
Author: Takanori Suzuki <ta...@gmail.com>
Authored: Mon Oct 12 02:20:46 2015 +0900
Committer: Claus Ibsen <da...@apache.org>
Committed: Sat Oct 17 10:41:18 2015 +0200

----------------------------------------------------------------------
 .../camel/component/paho/MqttProperties.java    | 71 ++++++++++++++++++++
 .../camel/component/paho/PahoComponent.java     | 15 +++++
 .../camel/component/paho/PahoConstants.java     |  2 +
 .../camel/component/paho/PahoConsumer.java      | 21 +++++-
 .../camel/component/paho/PahoEndpoint.java      | 21 +++++-
 .../camel/component/paho/PahoComponentTest.java | 51 +++++++++++++-
 6 files changed, 175 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/78b7681f/components/camel-paho/src/main/java/org/apache/camel/component/paho/MqttProperties.java
----------------------------------------------------------------------
diff --git a/components/camel-paho/src/main/java/org/apache/camel/component/paho/MqttProperties.java b/components/camel-paho/src/main/java/org/apache/camel/component/paho/MqttProperties.java
new file mode 100644
index 0000000..782654a
--- /dev/null
+++ b/components/camel-paho/src/main/java/org/apache/camel/component/paho/MqttProperties.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.paho;
+
+/**
+ * MQTT message properties.
+ */
+public class MqttProperties {
+
+    private String  topic;
+
+    private int     qos;
+
+    private boolean retain;
+
+    private boolean duplicate;
+
+    public MqttProperties() {}
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    public int getQos() {
+        return qos;
+    }
+
+    public void setQos(int qos) {
+        this.qos = qos;
+    }
+
+    public boolean isRetain() {
+        return retain;
+    }
+
+    public void setRetain(boolean retain) {
+        this.retain = retain;
+    }
+
+    public boolean isDuplicate() {
+        return duplicate;
+    }
+
+    public void setDuplicate(boolean duplicate) {
+        this.duplicate = duplicate;
+    }
+    
+    @Override
+    public String toString() {
+        return "PahoMqttProperties [topic=" + topic + ", qos=" + qos + "]";
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/78b7681f/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoComponent.java b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoComponent.java
index 232d38e..ea0b627 100644
--- a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoComponent.java
+++ b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoComponent.java
@@ -27,6 +27,7 @@ public class PahoComponent extends UriEndpointComponent {
     private String brokerUrl;
     private String clientId;
     private MqttConnectOptions connectOptions;
+    private String headerType;
 
     public PahoComponent() {
         super(PahoEndpoint.class);
@@ -45,6 +46,9 @@ public class PahoComponent extends UriEndpointComponent {
         if (connectOptions != null) {
             answer.setConnectOptions(connectOptions);
         }
+        if (headerType != null) {
+            answer.setHeaderType(headerType);
+        }
 
         setProperties(answer, parameters);
         return answer;
@@ -82,4 +86,15 @@ public class PahoComponent extends UriEndpointComponent {
     public void setConnectOptions(MqttConnectOptions connectOptions) {
         this.connectOptions = connectOptions;
     }
+    
+    public String getHeaderType() {
+        return headerType;
+    }
+
+    /**
+     * Exchange header type.
+     */
+    public void setHeaderType(String headerType) {
+        this.headerType = headerType;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/78b7681f/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConstants.java b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConstants.java
index 35b6a49..89d9994 100644
--- a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConstants.java
+++ b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConstants.java
@@ -18,6 +18,8 @@ package org.apache.camel.component.paho;
 
 public final class PahoConstants {
 
+    public static final String HEASER_MQTT_PROPERTIES = "MqttProperties";
+
     public static final String HEADER_ORIGINAL_MESSAGE = "PahoOriginalMessage";
 
     private PahoConstants() {

http://git-wip-us.apache.org/repos/asf/camel/blob/78b7681f/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java
index 100644c..86dee14 100644
--- a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java
+++ b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java
@@ -28,8 +28,6 @@ import org.eclipse.paho.client.mqttv3.MqttMessage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.camel.component.paho.PahoConstants.HEADER_ORIGINAL_MESSAGE;
-
 public class PahoConsumer extends DefaultConsumer {
 
     private static final Logger LOG = LoggerFactory.getLogger(PahoConsumer.class);
@@ -51,9 +49,26 @@ public class PahoConsumer extends DefaultConsumer {
 
             @Override
             public void messageArrived(String topic, MqttMessage message) throws Exception {
+                String headerKey;
+                Object headerValue;
+                String headerType = getEndpoint().getHeaderType();
+                if (PahoConstants.HEADER_ORIGINAL_MESSAGE.equals(headerType)) {
+                    headerKey = PahoConstants.HEADER_ORIGINAL_MESSAGE;
+                    headerValue = message;
+                } else {
+                    MqttProperties props = new MqttProperties();
+                    props.setTopic(topic);
+                    props.setQos(message.getQos());
+                    props.setRetain(message.isRetained());
+                    props.setDuplicate(message.isDuplicate());
+                    
+                    headerKey = PahoConstants.HEASER_MQTT_PROPERTIES;
+                    headerValue = props;
+                }
+                
                 Exchange exchange = ExchangeBuilder.anExchange(getEndpoint().getCamelContext()).
                         withBody(message.getPayload()).
-                        withHeader(HEADER_ORIGINAL_MESSAGE, message).
+                        withHeader(headerKey, headerValue).
                         build();
                 getAsyncProcessor().process(exchange, new AsyncCallback() {
                     @Override

http://git-wip-us.apache.org/repos/asf/camel/blob/78b7681f/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoEndpoint.java b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoEndpoint.java
index 5ca943a..0562c9a 100644
--- a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoEndpoint.java
+++ b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoEndpoint.java
@@ -64,6 +64,8 @@ public class PahoEndpoint extends DefaultEndpoint {
     private int qos = DEFAULT_QOS;
     @UriParam(defaultValue = "MEMORY")
     private PahoPersistence persistence = MEMORY;
+    @UriParam(defaultValue = PahoConstants.HEASER_MQTT_PROPERTIES)
+    private String headerType = PahoConstants.HEASER_MQTT_PROPERTIES;
 
     // Collaboration members
     @UriParam
@@ -76,7 +78,12 @@ public class PahoEndpoint extends DefaultEndpoint {
     public PahoEndpoint(String uri, Component component) {
         super(uri, component);
         if (topic == null) {
-            topic = uri.substring(7);
+            int optionIndex = uri.indexOf("?");
+            if (optionIndex > 0) {
+                topic = uri.substring(7, optionIndex);
+            } else {
+                topic = uri.substring(7);
+            }
         }
     }
 
@@ -217,4 +224,16 @@ public class PahoEndpoint extends DefaultEndpoint {
     public void setConnectOptions(MqttConnectOptions connOpts) {
         this.connectOptions = connOpts;
     }
+
+    public String getHeaderType() {
+        return headerType;
+    }
+
+    /**
+     * Exchange header type.
+     */
+    public void setHeaderType(String headerType) {
+        this.headerType = headerType;
+    }
+    
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/78b7681f/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoComponentTest.java
----------------------------------------------------------------------
diff --git a/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoComponentTest.java b/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoComponentTest.java
index 905f6fd..14e1515 100644
--- a/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoComponentTest.java
+++ b/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoComponentTest.java
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.component.paho;
 
+import java.io.UnsupportedEncodingException;
+
 import org.apache.activemq.broker.BrokerService;
 import org.apache.camel.EndpointInject;
 import org.apache.camel.Exchange;
@@ -34,6 +36,9 @@ public class PahoComponentTest extends CamelTestSupport {
 
     @EndpointInject(uri = "mock:test")
     MockEndpoint mock;
+    
+    @EndpointInject(uri = "mock:test2")
+    MockEndpoint mock2;
 
     BrokerService broker;
 
@@ -65,9 +70,11 @@ public class PahoComponentTest extends CamelTestSupport {
             @Override
             public void configure() throws Exception {
                 from("direct:test").to("paho:queue?brokerUrl=tcp://localhost:" + mqttPort);
-
                 from("paho:queue?brokerUrl=tcp://localhost:" + mqttPort).to("mock:test");
 
+                from("direct:test2").to("paho:queue?brokerUrl=tcp://localhost:" + mqttPort);
+                from("paho:queue?headerType=PahoOriginalMessage&brokerUrl=tcp://localhost:" + mqttPort).to("mock:test2");
+
                 from("paho:persistenceTest?persistence=FILE&brokerUrl=tcp://localhost:" + mqttPort).to("mock:persistenceTest");
 
                 from("direct:connectOptions").to("paho:registryConnectOptions?connectOptions=#connectOptions&brokerUrl=tcp://localhost:" + mqttPort);
@@ -85,6 +92,26 @@ public class PahoComponentTest extends CamelTestSupport {
     // Tests
 
     @Test
+    public void checkOptions() {
+        String uri = "paho:/test/topic"
+                + "?clientId=sampleClient"
+                + "&brokerUrl=tcp://localhost:" + mqttPort
+                + "&qos=2"
+                + "&persistence=file"
+                + "&headerType=testType";
+        
+        PahoEndpoint endpoint = getMandatoryEndpoint(uri, PahoEndpoint.class);
+
+        // Then
+        assertEquals("/test/topic", endpoint.getTopic());
+        assertEquals("sampleClient", endpoint.getClientId());
+        assertEquals("tcp://localhost:" + mqttPort, endpoint.getBrokerUrl());
+        assertEquals(2, endpoint.getQos());
+        assertEquals(PahoPersistence.FILE, endpoint.getPersistence());
+        assertEquals("testType", endpoint.getHeaderType());
+    }
+
+    @Test
     public void shouldReadMessageFromMqtt() throws InterruptedException {
         // Given
         String msg = "msg";
@@ -132,7 +159,7 @@ public class PahoComponentTest extends CamelTestSupport {
     }
 
     @Test
-    public void shouldKeepOriginalMessageInHeader() throws InterruptedException {
+    public void shouldKeepDefaultMessageInHeader() throws InterruptedException, UnsupportedEncodingException {
         // Given
         final String msg = "msg";
         mock.expectedBodiesReceived(msg);
@@ -143,6 +170,26 @@ public class PahoComponentTest extends CamelTestSupport {
         // Then
         mock.assertIsSatisfied();
         Exchange exchange = mock.getExchanges().get(0);
+        MqttProperties mqttProperties = exchange.getIn().getHeader(PahoConstants.HEASER_MQTT_PROPERTIES,
+                MqttProperties.class);
+        String payload = new String((byte[]) exchange.getIn().getBody(), "utf-8");
+
+        assertEquals("queue", new String(mqttProperties.getTopic()));
+        assertEquals(msg, new String(payload));
+    }
+
+    @Test
+    public void shouldKeepOriginalMessageInHeader() throws InterruptedException {
+        // Given
+        final String msg = "msg";
+        mock2.expectedBodiesReceived(msg);
+
+        // When
+        template.sendBody("direct:test2", msg);
+
+        // Then
+        mock2.assertIsSatisfied();
+        Exchange exchange = mock2.getExchanges().get(0);
         MqttMessage message = exchange.getIn().getHeader(PahoConstants.HEADER_ORIGINAL_MESSAGE, MqttMessage.class);
         assertEquals(msg, new String(message.getPayload()));
     }