You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/10/17 10:38:54 UTC

[1/6] camel git commit: Add logging to camel-paho test

Repository: camel
Updated Branches:
  refs/heads/master e2ef05d41 -> 20200df36


Add logging to camel-paho test


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/dbe9aa58
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/dbe9aa58
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/dbe9aa58

Branch: refs/heads/master
Commit: dbe9aa58c8c9c505cfbf031f2075b5350f07a148
Parents: 78b7681
Author: Claus Ibsen <da...@apache.org>
Authored: Sat Oct 17 10:09:15 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Sat Oct 17 10:41:18 2015 +0200

----------------------------------------------------------------------
 components/camel-paho/pom.xml                   | 13 +++++++
 .../src/test/resources/log4j.properties         | 36 ++++++++++++++++++++
 2 files changed, 49 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/dbe9aa58/components/camel-paho/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-paho/pom.xml b/components/camel-paho/pom.xml
index 01816a9..8ee8120 100644
--- a/components/camel-paho/pom.xml
+++ b/components/camel-paho/pom.xml
@@ -62,6 +62,19 @@
       <version>${activemq-version}</version>
       <scope>test</scope>
     </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+      <scope>test</scope>
+    </dependency>
+
   </dependencies>
 
   <repositories>

http://git-wip-us.apache.org/repos/asf/camel/blob/dbe9aa58/components/camel-paho/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/components/camel-paho/src/test/resources/log4j.properties b/components/camel-paho/src/test/resources/log4j.properties
new file mode 100644
index 0000000..d6cf91a
--- /dev/null
+++ b/components/camel-paho/src/test/resources/log4j.properties
@@ -0,0 +1,36 @@
+## ------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ------------------------------------------------------------------------
+
+#
+# The logging properties used for testing
+#
+log4j.rootLogger=INFO, file
+
+# uncomment the following to enable camel debugging
+#log4j.logger.org.apache.camel.component.paho=TRACE
+
+# CONSOLE appender not used by default
+log4j.appender.out=org.apache.log4j.ConsoleAppender
+log4j.appender.out.layout=org.apache.log4j.PatternLayout
+#log4j.appender.out.layout.ConversionPattern=[%30.30t] %-30.30c{1} %-5p %m%n
+log4j.appender.out.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
+
+# File appender
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
+log4j.appender.file.file=target/camel-paho-test.log


[6/6] camel git commit: Fixed potential NPE if components create exchange the wrong way.

Posted by da...@apache.org.
Fixed potential NPE if components create exchange the wrong way.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/20200df3
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/20200df3
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/20200df3

Branch: refs/heads/master
Commit: 20200df3630a4393768df3b1f526aef04692f843
Parents: 2a90c3e
Author: Claus Ibsen <da...@apache.org>
Authored: Sat Oct 17 10:41:10 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Sat Oct 17 10:41:20 2015 +0200

----------------------------------------------------------------------
 .../camel/impl/DefaultRuntimeEndpointRegistry.java      | 12 +++++++-----
 1 file changed, 7 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/20200df3/camel-core/src/main/java/org/apache/camel/impl/DefaultRuntimeEndpointRegistry.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultRuntimeEndpointRegistry.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultRuntimeEndpointRegistry.java
index d866fea..9fd4c31 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultRuntimeEndpointRegistry.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultRuntimeEndpointRegistry.java
@@ -241,11 +241,13 @@ public class DefaultRuntimeEndpointRegistry extends EventNotifierSupport impleme
             // we only capture details in extended mode
             ExchangeCreatedEvent ece = (ExchangeCreatedEvent) event;
             Endpoint endpoint = ece.getExchange().getFromEndpoint();
-            String routeId = ece.getExchange().getFromRouteId();
-            String uri = endpoint.getEndpointUri();
-            String key = asUtilizationKey(routeId, uri);
-            if (key != null) {
-                inputUtilization.onHit(key);
+            if (endpoint != null) {
+                String routeId = ece.getExchange().getFromRouteId();
+                String uri = endpoint.getEndpointUri();
+                String key = asUtilizationKey(routeId, uri);
+                if (key != null) {
+                    inputUtilization.onHit(key);
+                }
             }
         } else if (event instanceof ExchangeSendingEvent) {
             ExchangeSendingEvent ese = (ExchangeSendingEvent) event;


[4/6] camel git commit: Fixes #635. Create exchange the correct way.

Posted by da...@apache.org.
Fixes #635. Create exchange the correct way.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/9e5a51ca
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/9e5a51ca
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/9e5a51ca

Branch: refs/heads/master
Commit: 9e5a51ca51c495e1038f7bd861372e032cd1dbc9
Parents: dbe9aa5
Author: Claus Ibsen <da...@apache.org>
Authored: Sat Oct 17 10:15:40 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Sat Oct 17 10:41:19 2015 +0200

----------------------------------------------------------------------
 .../apache/camel/component/paho/PahoConsumer.java   | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/9e5a51ca/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java
index 86dee14..82b6c5f 100644
--- a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java
+++ b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java
@@ -20,7 +20,6 @@ import org.apache.camel.AsyncCallback;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
-import org.apache.camel.builder.ExchangeBuilder;
 import org.apache.camel.impl.DefaultConsumer;
 import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
 import org.eclipse.paho.client.mqttv3.MqttCallback;
@@ -44,7 +43,7 @@ public class PahoConsumer extends DefaultConsumer {
         getEndpoint().getClient().setCallback(new MqttCallback() {
             @Override
             public void connectionLost(Throwable cause) {
-                LOG.debug("MQTT broker connection lost:", cause);
+                LOG.debug("MQTT broker connection lost due " + cause.getMessage(), cause);
             }
 
             @Override
@@ -65,15 +64,15 @@ public class PahoConsumer extends DefaultConsumer {
                     headerKey = PahoConstants.HEASER_MQTT_PROPERTIES;
                     headerValue = props;
                 }
-                
-                Exchange exchange = ExchangeBuilder.anExchange(getEndpoint().getCamelContext()).
-                        withBody(message.getPayload()).
-                        withHeader(headerKey, headerValue).
-                        build();
+
+                Exchange exchange = getEndpoint().createExchange();
+                exchange.getIn().setBody(message.getPayload());
+                exchange.getIn().setHeader(headerKey, headerValue);
+
                 getAsyncProcessor().process(exchange, new AsyncCallback() {
                     @Override
                     public void done(boolean doneSync) {
-
+                        // noop
                     }
                 });
             }
@@ -88,6 +87,7 @@ public class PahoConsumer extends DefaultConsumer {
     @Override
     protected void doStop() throws Exception {
         super.doStop();
+
         if (getEndpoint().getClient().isConnected()) {
             String topic = getEndpoint().getTopic();
             getEndpoint().getClient().unsubscribe(topic);


[2/6] camel git commit: Modify parsing of the topic in the URI, and change the Exchange header value so that the MQTT topic can be used in later processing.

Posted by da...@apache.org.
Modify parsing of the topic in the URI, and change the Exchange header value so that the MQTT topic can be used in later processing.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/78b7681f
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/78b7681f
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/78b7681f

Branch: refs/heads/master
Commit: 78b7681f3d64f3c9f8667a55f233d8dd9bdcdf13
Parents: e2ef05d
Author: Takanori Suzuki <ta...@gmail.com>
Authored: Mon Oct 12 02:20:46 2015 +0900
Committer: Claus Ibsen <da...@apache.org>
Committed: Sat Oct 17 10:41:18 2015 +0200

----------------------------------------------------------------------
 .../camel/component/paho/MqttProperties.java    | 71 ++++++++++++++++++++
 .../camel/component/paho/PahoComponent.java     | 15 +++++
 .../camel/component/paho/PahoConstants.java     |  2 +
 .../camel/component/paho/PahoConsumer.java      | 21 +++++-
 .../camel/component/paho/PahoEndpoint.java      | 21 +++++-
 .../camel/component/paho/PahoComponentTest.java | 51 +++++++++++++-
 6 files changed, 175 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/78b7681f/components/camel-paho/src/main/java/org/apache/camel/component/paho/MqttProperties.java
----------------------------------------------------------------------
diff --git a/components/camel-paho/src/main/java/org/apache/camel/component/paho/MqttProperties.java b/components/camel-paho/src/main/java/org/apache/camel/component/paho/MqttProperties.java
new file mode 100644
index 0000000..782654a
--- /dev/null
+++ b/components/camel-paho/src/main/java/org/apache/camel/component/paho/MqttProperties.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.paho;
+
+/**
+ * MQTT message properties.
+ */
+public class MqttProperties {
+
+    private String  topic;
+
+    private int     qos;
+
+    private boolean retain;
+
+    private boolean duplicate;
+
+    public MqttProperties() {}
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    public int getQos() {
+        return qos;
+    }
+
+    public void setQos(int qos) {
+        this.qos = qos;
+    }
+
+    public boolean isRetain() {
+        return retain;
+    }
+
+    public void setRetain(boolean retain) {
+        this.retain = retain;
+    }
+
+    public boolean isDuplicate() {
+        return duplicate;
+    }
+
+    public void setDuplicate(boolean duplicate) {
+        this.duplicate = duplicate;
+    }
+    
+    @Override
+    public String toString() {
+        return "PahoMqttProperties [topic=" + topic + ", qos=" + qos + "]";
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/78b7681f/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoComponent.java b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoComponent.java
index 232d38e..ea0b627 100644
--- a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoComponent.java
+++ b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoComponent.java
@@ -27,6 +27,7 @@ public class PahoComponent extends UriEndpointComponent {
     private String brokerUrl;
     private String clientId;
     private MqttConnectOptions connectOptions;
+    private String headerType;
 
     public PahoComponent() {
         super(PahoEndpoint.class);
@@ -45,6 +46,9 @@ public class PahoComponent extends UriEndpointComponent {
         if (connectOptions != null) {
             answer.setConnectOptions(connectOptions);
         }
+        if (headerType != null) {
+            answer.setHeaderType(headerType);
+        }
 
         setProperties(answer, parameters);
         return answer;
@@ -82,4 +86,15 @@ public class PahoComponent extends UriEndpointComponent {
     public void setConnectOptions(MqttConnectOptions connectOptions) {
         this.connectOptions = connectOptions;
     }
+    
+    public String getHeaderType() {
+        return headerType;
+    }
+
+    /**
+     * Exchange header type.
+     */
+    public void setHeaderType(String headerType) {
+        this.headerType = headerType;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/78b7681f/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConstants.java b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConstants.java
index 35b6a49..89d9994 100644
--- a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConstants.java
+++ b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConstants.java
@@ -18,6 +18,8 @@ package org.apache.camel.component.paho;
 
 public final class PahoConstants {
 
+    public static final String HEASER_MQTT_PROPERTIES = "MqttProperties";
+
     public static final String HEADER_ORIGINAL_MESSAGE = "PahoOriginalMessage";
 
     private PahoConstants() {

http://git-wip-us.apache.org/repos/asf/camel/blob/78b7681f/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java
index 100644c..86dee14 100644
--- a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java
+++ b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java
@@ -28,8 +28,6 @@ import org.eclipse.paho.client.mqttv3.MqttMessage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.camel.component.paho.PahoConstants.HEADER_ORIGINAL_MESSAGE;
-
 public class PahoConsumer extends DefaultConsumer {
 
     private static final Logger LOG = LoggerFactory.getLogger(PahoConsumer.class);
@@ -51,9 +49,26 @@ public class PahoConsumer extends DefaultConsumer {
 
             @Override
             public void messageArrived(String topic, MqttMessage message) throws Exception {
+                String headerKey;
+                Object headerValue;
+                String headerType = getEndpoint().getHeaderType();
+                if (PahoConstants.HEADER_ORIGINAL_MESSAGE.equals(headerType)) {
+                    headerKey = PahoConstants.HEADER_ORIGINAL_MESSAGE;
+                    headerValue = message;
+                } else {
+                    MqttProperties props = new MqttProperties();
+                    props.setTopic(topic);
+                    props.setQos(message.getQos());
+                    props.setRetain(message.isRetained());
+                    props.setDuplicate(message.isDuplicate());
+                    
+                    headerKey = PahoConstants.HEASER_MQTT_PROPERTIES;
+                    headerValue = props;
+                }
+                
                 Exchange exchange = ExchangeBuilder.anExchange(getEndpoint().getCamelContext()).
                         withBody(message.getPayload()).
-                        withHeader(HEADER_ORIGINAL_MESSAGE, message).
+                        withHeader(headerKey, headerValue).
                         build();
                 getAsyncProcessor().process(exchange, new AsyncCallback() {
                     @Override

http://git-wip-us.apache.org/repos/asf/camel/blob/78b7681f/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoEndpoint.java b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoEndpoint.java
index 5ca943a..0562c9a 100644
--- a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoEndpoint.java
+++ b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoEndpoint.java
@@ -64,6 +64,8 @@ public class PahoEndpoint extends DefaultEndpoint {
     private int qos = DEFAULT_QOS;
     @UriParam(defaultValue = "MEMORY")
     private PahoPersistence persistence = MEMORY;
+    @UriParam(defaultValue = PahoConstants.HEASER_MQTT_PROPERTIES)
+    private String headerType = PahoConstants.HEASER_MQTT_PROPERTIES;
 
     // Collaboration members
     @UriParam
@@ -76,7 +78,12 @@ public class PahoEndpoint extends DefaultEndpoint {
     public PahoEndpoint(String uri, Component component) {
         super(uri, component);
         if (topic == null) {
-            topic = uri.substring(7);
+            int optionIndex = uri.indexOf("?");
+            if (optionIndex > 0) {
+                topic = uri.substring(7, optionIndex);
+            } else {
+                topic = uri.substring(7);
+            }
         }
     }
 
@@ -217,4 +224,16 @@ public class PahoEndpoint extends DefaultEndpoint {
     public void setConnectOptions(MqttConnectOptions connOpts) {
         this.connectOptions = connOpts;
     }
+
+    public String getHeaderType() {
+        return headerType;
+    }
+
+    /**
+     * Exchange header type.
+     */
+    public void setHeaderType(String headerType) {
+        this.headerType = headerType;
+    }
+    
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/78b7681f/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoComponentTest.java
----------------------------------------------------------------------
diff --git a/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoComponentTest.java b/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoComponentTest.java
index 905f6fd..14e1515 100644
--- a/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoComponentTest.java
+++ b/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoComponentTest.java
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.component.paho;
 
+import java.io.UnsupportedEncodingException;
+
 import org.apache.activemq.broker.BrokerService;
 import org.apache.camel.EndpointInject;
 import org.apache.camel.Exchange;
@@ -34,6 +36,9 @@ public class PahoComponentTest extends CamelTestSupport {
 
     @EndpointInject(uri = "mock:test")
     MockEndpoint mock;
+    
+    @EndpointInject(uri = "mock:test2")
+    MockEndpoint mock2;
 
     BrokerService broker;
 
@@ -65,9 +70,11 @@ public class PahoComponentTest extends CamelTestSupport {
             @Override
             public void configure() throws Exception {
                 from("direct:test").to("paho:queue?brokerUrl=tcp://localhost:" + mqttPort);
-
                 from("paho:queue?brokerUrl=tcp://localhost:" + mqttPort).to("mock:test");
 
+                from("direct:test2").to("paho:queue?brokerUrl=tcp://localhost:" + mqttPort);
+                from("paho:queue?headerType=PahoOriginalMessage&brokerUrl=tcp://localhost:" + mqttPort).to("mock:test2");
+
                 from("paho:persistenceTest?persistence=FILE&brokerUrl=tcp://localhost:" + mqttPort).to("mock:persistenceTest");
 
                 from("direct:connectOptions").to("paho:registryConnectOptions?connectOptions=#connectOptions&brokerUrl=tcp://localhost:" + mqttPort);
@@ -85,6 +92,26 @@ public class PahoComponentTest extends CamelTestSupport {
     // Tests
 
     @Test
+    public void checkOptions() {
+        String uri = "paho:/test/topic"
+                + "?clientId=sampleClient"
+                + "&brokerUrl=tcp://localhost:" + mqttPort
+                + "&qos=2"
+                + "&persistence=file"
+                + "&headerType=testType";
+        
+        PahoEndpoint endpoint = getMandatoryEndpoint(uri, PahoEndpoint.class);
+
+        // Then
+        assertEquals("/test/topic", endpoint.getTopic());
+        assertEquals("sampleClient", endpoint.getClientId());
+        assertEquals("tcp://localhost:" + mqttPort, endpoint.getBrokerUrl());
+        assertEquals(2, endpoint.getQos());
+        assertEquals(PahoPersistence.FILE, endpoint.getPersistence());
+        assertEquals("testType", endpoint.getHeaderType());
+    }
+
+    @Test
     public void shouldReadMessageFromMqtt() throws InterruptedException {
         // Given
         String msg = "msg";
@@ -132,7 +159,7 @@ public class PahoComponentTest extends CamelTestSupport {
     }
 
     @Test
-    public void shouldKeepOriginalMessageInHeader() throws InterruptedException {
+    public void shouldKeepDefaultMessageInHeader() throws InterruptedException, UnsupportedEncodingException {
         // Given
         final String msg = "msg";
         mock.expectedBodiesReceived(msg);
@@ -143,6 +170,26 @@ public class PahoComponentTest extends CamelTestSupport {
         // Then
         mock.assertIsSatisfied();
         Exchange exchange = mock.getExchanges().get(0);
+        MqttProperties mqttProperties = exchange.getIn().getHeader(PahoConstants.HEASER_MQTT_PROPERTIES,
+                MqttProperties.class);
+        String payload = new String((byte[]) exchange.getIn().getBody(), "utf-8");
+
+        assertEquals("queue", new String(mqttProperties.getTopic()));
+        assertEquals(msg, new String(payload));
+    }
+
+    @Test
+    public void shouldKeepOriginalMessageInHeader() throws InterruptedException {
+        // Given
+        final String msg = "msg";
+        mock2.expectedBodiesReceived(msg);
+
+        // When
+        template.sendBody("direct:test2", msg);
+
+        // Then
+        mock2.assertIsSatisfied();
+        Exchange exchange = mock2.getExchanges().get(0);
         MqttMessage message = exchange.getIn().getHeader(PahoConstants.HEADER_ORIGINAL_MESSAGE, MqttMessage.class);
         assertEquals(msg, new String(message.getPayload()));
     }


[5/6] camel git commit: Fixed CS

Posted by da...@apache.org.
Fixed CS


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/2a90c3e6
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/2a90c3e6
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/2a90c3e6

Branch: refs/heads/master
Commit: 2a90c3e606fcde1e5d8742d05d7b981cdf7fa714
Parents: d2429e7
Author: Claus Ibsen <da...@apache.org>
Authored: Sat Oct 17 10:36:50 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Sat Oct 17 10:41:19 2015 +0200

----------------------------------------------------------------------
 .../main/java/org/apache/camel/component/paho/PahoEndpoint.java   | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/2a90c3e6/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoEndpoint.java b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoEndpoint.java
index 97dad4d..4264522 100644
--- a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoEndpoint.java
+++ b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoEndpoint.java
@@ -18,7 +18,7 @@ package org.apache.camel.component.paho;
 
 import java.util.Set;
 
-import javax.xml.xpath.XPathConstants;
+import static java.lang.System.nanoTime;
 
 import org.apache.camel.Component;
 import org.apache.camel.Consumer;
@@ -39,7 +39,6 @@ import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static java.lang.System.nanoTime;
 import static org.apache.camel.component.paho.PahoPersistence.MEMORY;
 
 @UriEndpoint(scheme = "paho", title = "Paho", consumerClass = PahoConsumer.class, label = "messaging", syntax = "paho:topic")


[3/6] camel git commit: CAMEL-9232: camel-paho - Create exchange correct. Fixes #635.

Posted by da...@apache.org.
CAMEL-9232: camel-paho - Create exchange correct. Fixes #635.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/d2429e7a
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/d2429e7a
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/d2429e7a

Branch: refs/heads/master
Commit: d2429e7a475a6fd41bc39ee3dcaabd8af2d19362
Parents: 9e5a51c
Author: Claus Ibsen <da...@apache.org>
Authored: Sat Oct 17 10:36:15 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Sat Oct 17 10:41:19 2015 +0200

----------------------------------------------------------------------
 .../camel/component/paho/MqttProperties.java    | 71 --------------------
 .../camel/component/paho/PahoComponent.java     | 14 ----
 .../camel/component/paho/PahoConstants.java     |  3 +-
 .../camel/component/paho/PahoConsumer.java      | 24 +------
 .../camel/component/paho/PahoEndpoint.java      | 33 +++++----
 .../camel/component/paho/PahoMessage.java       | 47 +++++++++++++
 .../camel/component/paho/PahoComponentTest.java | 27 +++-----
 7 files changed, 78 insertions(+), 141 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/d2429e7a/components/camel-paho/src/main/java/org/apache/camel/component/paho/MqttProperties.java
----------------------------------------------------------------------
diff --git a/components/camel-paho/src/main/java/org/apache/camel/component/paho/MqttProperties.java b/components/camel-paho/src/main/java/org/apache/camel/component/paho/MqttProperties.java
deleted file mode 100644
index 782654a..0000000
--- a/components/camel-paho/src/main/java/org/apache/camel/component/paho/MqttProperties.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.paho;
-
-/**
- * MQTT message properties.
- */
-public class MqttProperties {
-
-    private String  topic;
-
-    private int     qos;
-
-    private boolean retain;
-
-    private boolean duplicate;
-
-    public MqttProperties() {}
-
-    public String getTopic() {
-        return topic;
-    }
-
-    public void setTopic(String topic) {
-        this.topic = topic;
-    }
-
-    public int getQos() {
-        return qos;
-    }
-
-    public void setQos(int qos) {
-        this.qos = qos;
-    }
-
-    public boolean isRetain() {
-        return retain;
-    }
-
-    public void setRetain(boolean retain) {
-        this.retain = retain;
-    }
-
-    public boolean isDuplicate() {
-        return duplicate;
-    }
-
-    public void setDuplicate(boolean duplicate) {
-        this.duplicate = duplicate;
-    }
-    
-    @Override
-    public String toString() {
-        return "PahoMqttProperties [topic=" + topic + ", qos=" + qos + "]";
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/d2429e7a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoComponent.java b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoComponent.java
index ea0b627..c9cbc41 100644
--- a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoComponent.java
+++ b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoComponent.java
@@ -27,7 +27,6 @@ public class PahoComponent extends UriEndpointComponent {
     private String brokerUrl;
     private String clientId;
     private MqttConnectOptions connectOptions;
-    private String headerType;
 
     public PahoComponent() {
         super(PahoEndpoint.class);
@@ -46,9 +45,6 @@ public class PahoComponent extends UriEndpointComponent {
         if (connectOptions != null) {
             answer.setConnectOptions(connectOptions);
         }
-        if (headerType != null) {
-            answer.setHeaderType(headerType);
-        }
 
         setProperties(answer, parameters);
         return answer;
@@ -87,14 +83,4 @@ public class PahoComponent extends UriEndpointComponent {
         this.connectOptions = connectOptions;
     }
     
-    public String getHeaderType() {
-        return headerType;
-    }
-
-    /**
-     * Exchange header type.
-     */
-    public void setHeaderType(String headerType) {
-        this.headerType = headerType;
-    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/d2429e7a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConstants.java b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConstants.java
index 89d9994..4c8d1ff 100644
--- a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConstants.java
+++ b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConstants.java
@@ -18,8 +18,9 @@ package org.apache.camel.component.paho;
 
 public final class PahoConstants {
 
-    public static final String HEASER_MQTT_PROPERTIES = "MqttProperties";
+    public static final String MQTT_TOPIC = "CamelMqttTopic";
 
+    @Deprecated
     public static final String HEADER_ORIGINAL_MESSAGE = "PahoOriginalMessage";
 
     private PahoConstants() {

http://git-wip-us.apache.org/repos/asf/camel/blob/d2429e7a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java
index 82b6c5f..75d6092 100644
--- a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java
+++ b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java
@@ -48,26 +48,8 @@ public class PahoConsumer extends DefaultConsumer {
 
             @Override
             public void messageArrived(String topic, MqttMessage message) throws Exception {
-                String headerKey;
-                Object headerValue;
-                String headerType = getEndpoint().getHeaderType();
-                if (PahoConstants.HEADER_ORIGINAL_MESSAGE.equals(headerType)) {
-                    headerKey = PahoConstants.HEADER_ORIGINAL_MESSAGE;
-                    headerValue = message;
-                } else {
-                    MqttProperties props = new MqttProperties();
-                    props.setTopic(topic);
-                    props.setQos(message.getQos());
-                    props.setRetain(message.isRetained());
-                    props.setDuplicate(message.isDuplicate());
-                    
-                    headerKey = PahoConstants.HEASER_MQTT_PROPERTIES;
-                    headerValue = props;
-                }
-
-                Exchange exchange = getEndpoint().createExchange();
-                exchange.getIn().setBody(message.getPayload());
-                exchange.getIn().setHeader(headerKey, headerValue);
+                LOG.debug("Message arrived on topic: {} -> {}", topic, message);
+                Exchange exchange = getEndpoint().createExchange(message, topic);
 
                 getAsyncProcessor().process(exchange, new AsyncCallback() {
                     @Override
@@ -79,7 +61,7 @@ public class PahoConsumer extends DefaultConsumer {
 
             @Override
             public void deliveryComplete(IMqttDeliveryToken token) {
-                LOG.debug("Delivery complete. Token: {}.", token);
+                LOG.debug("Delivery complete. Token: {}", token);
             }
         });
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/d2429e7a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoEndpoint.java b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoEndpoint.java
index 0562c9a..97dad4d 100644
--- a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoEndpoint.java
+++ b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoEndpoint.java
@@ -18,10 +18,11 @@ package org.apache.camel.component.paho;
 
 import java.util.Set;
 
-import static java.lang.System.nanoTime;
+import javax.xml.xpath.XPathConstants;
 
 import org.apache.camel.Component;
 import org.apache.camel.Consumer;
+import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.impl.DefaultEndpoint;
@@ -32,11 +33,13 @@ import org.apache.camel.spi.UriPath;
 import org.eclipse.paho.client.mqttv3.MqttClient;
 import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
 import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
 import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static java.lang.System.nanoTime;
 import static org.apache.camel.component.paho.PahoPersistence.MEMORY;
 
 @UriEndpoint(scheme = "paho", title = "Paho", consumerClass = PahoConsumer.class, label = "messaging", syntax = "paho:topic")
@@ -45,15 +48,11 @@ public class PahoEndpoint extends DefaultEndpoint {
     private static final Logger LOG = LoggerFactory.getLogger(PahoEndpoint.class);
 
     // Constants
-
     private static final String DEFAULT_BROKER_URL = "tcp://localhost:1883";
-
     private static final int DEFAULT_QOS = 2;
-
     private static final String DEFAULT_QOS_STRING = DEFAULT_QOS + "";
 
     // Configuration members
-
     @UriPath @Metadata(required = "true")
     private String topic;
     @UriParam
@@ -64,8 +63,6 @@ public class PahoEndpoint extends DefaultEndpoint {
     private int qos = DEFAULT_QOS;
     @UriParam(defaultValue = "MEMORY")
     private PahoPersistence persistence = MEMORY;
-    @UriParam(defaultValue = PahoConstants.HEASER_MQTT_PROPERTIES)
-    private String headerType = PahoConstants.HEASER_MQTT_PROPERTIES;
 
     // Collaboration members
     @UriParam
@@ -144,6 +141,17 @@ public class PahoEndpoint extends DefaultEndpoint {
         return new MqttConnectOptions();
     }
 
+    /** Builds an exchange whose IN message is a {@link PahoMessage} holding the MQTT message. */
+    public Exchange createExchange(MqttMessage mqttMessage, String topic) {
+        PahoMessage message = new PahoMessage(mqttMessage);
+        message.setBody(mqttMessage.getPayload());
+        // the topic travels as the CamelMqttTopic header
+        message.setHeader(PahoConstants.MQTT_TOPIC, topic);
+        Exchange answer = createExchange();
+        answer.setIn(message);
+        return answer;
+    }
+
     // Configuration getters & setters
 
     public String getClientId() {
@@ -225,15 +233,4 @@ public class PahoEndpoint extends DefaultEndpoint {
         this.connectOptions = connOpts;
     }
 
-    public String getHeaderType() {
-        return headerType;
-    }
-
-    /**
-     * Exchange header type.
-     */
-    public void setHeaderType(String headerType) {
-        this.headerType = headerType;
-    }
-    
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/d2429e7a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoMessage.java
----------------------------------------------------------------------
diff --git a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoMessage.java b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoMessage.java
new file mode 100644
index 0000000..a89e470
--- /dev/null
+++ b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoMessage.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.paho;
+
+import org.apache.camel.impl.DefaultMessage;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+
+/**
+ * {@link DefaultMessage} that additionally carries a Paho {@link MqttMessage}.
+ */
+public class PahoMessage extends DefaultMessage {
+
+    private transient MqttMessage mqttMessage;
+
+    /**
+     * Creates a message with no MQTT message attached.
+     */
+    public PahoMessage() {
+        this(null);
+    }
+
+    /**
+     * Creates a message that holds the given MQTT message.
+     */
+    public PahoMessage(MqttMessage mqttMessage) {
+        this.mqttMessage = mqttMessage;
+    }
+
+    /**
+     * Returns the MQTT message held by this message, or {@code null} if none was set.
+     */
+    public MqttMessage getMqttMessage() {
+        return this.mqttMessage;
+    }
+
+    /**
+     * Replaces the MQTT message held by this message.
+     */
+    public void setMqttMessage(MqttMessage mqttMessage) {
+        this.mqttMessage = mqttMessage;
+    }
+
+    /**
+     * Creates a new message that holds the same MQTT message as this one.
+     */
+    @Override
+    public PahoMessage newInstance() {
+        return new PahoMessage(this.mqttMessage);
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/d2429e7a/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoComponentTest.java
----------------------------------------------------------------------
diff --git a/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoComponentTest.java b/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoComponentTest.java
index 14e1515..8013f8f 100644
--- a/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoComponentTest.java
+++ b/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoComponentTest.java
@@ -37,9 +37,6 @@ public class PahoComponentTest extends CamelTestSupport {
     @EndpointInject(uri = "mock:test")
     MockEndpoint mock;
     
-    @EndpointInject(uri = "mock:test2")
-    MockEndpoint mock2;
-
     BrokerService broker;
 
     int mqttPort = AvailablePortFinder.getNextAvailable();
@@ -73,7 +70,6 @@ public class PahoComponentTest extends CamelTestSupport {
                 from("paho:queue?brokerUrl=tcp://localhost:" + mqttPort).to("mock:test");
 
                 from("direct:test2").to("paho:queue?brokerUrl=tcp://localhost:" + mqttPort);
-                from("paho:queue?headerType=PahoOriginalMessage&brokerUrl=tcp://localhost:" + mqttPort).to("mock:test2");
 
                 from("paho:persistenceTest?persistence=FILE&brokerUrl=tcp://localhost:" + mqttPort).to("mock:persistenceTest");
 
@@ -97,9 +93,8 @@ public class PahoComponentTest extends CamelTestSupport {
                 + "?clientId=sampleClient"
                 + "&brokerUrl=tcp://localhost:" + mqttPort
                 + "&qos=2"
-                + "&persistence=file"
-                + "&headerType=testType";
-        
+                + "&persistence=file";
+
         PahoEndpoint endpoint = getMandatoryEndpoint(uri, PahoEndpoint.class);
 
         // Then
@@ -108,7 +103,6 @@ public class PahoComponentTest extends CamelTestSupport {
         assertEquals("tcp://localhost:" + mqttPort, endpoint.getBrokerUrl());
         assertEquals(2, endpoint.getQos());
         assertEquals(PahoPersistence.FILE, endpoint.getPersistence());
-        assertEquals("testType", endpoint.getHeaderType());
     }
 
     @Test
@@ -169,28 +163,29 @@ public class PahoComponentTest extends CamelTestSupport {
 
         // Then
         mock.assertIsSatisfied();
+
         Exchange exchange = mock.getExchanges().get(0);
-        MqttProperties mqttProperties = exchange.getIn().getHeader(PahoConstants.HEASER_MQTT_PROPERTIES,
-                MqttProperties.class);
         String payload = new String((byte[]) exchange.getIn().getBody(), "utf-8");
 
-        assertEquals("queue", new String(mqttProperties.getTopic()));
-        assertEquals(msg, new String(payload));
+        assertEquals("queue", exchange.getIn().getHeader(PahoConstants.MQTT_TOPIC));
+        assertEquals(msg, payload);
     }
 
     @Test
     public void shouldKeepOriginalMessageInHeader() throws InterruptedException {
         // Given
         final String msg = "msg";
-        mock2.expectedBodiesReceived(msg);
+        mock.expectedBodiesReceived(msg);
 
         // When
         template.sendBody("direct:test2", msg);
 
         // Then
-        mock2.assertIsSatisfied();
-        Exchange exchange = mock2.getExchanges().get(0);
-        MqttMessage message = exchange.getIn().getHeader(PahoConstants.HEADER_ORIGINAL_MESSAGE, MqttMessage.class);
+        mock.assertIsSatisfied();
+        Exchange exchange = mock.getExchanges().get(0);
+
+        MqttMessage message = exchange.getIn(PahoMessage.class).getMqttMessage();
+        assertNotNull(message);
         assertEquals(msg, new String(message.getPayload()));
     }