You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/10/17 10:38:54 UTC
[1/6] camel git commit: Add logging to camel-paho test
Repository: camel
Updated Branches:
refs/heads/master e2ef05d41 -> 20200df36
Add logging to camel-paho test
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/dbe9aa58
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/dbe9aa58
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/dbe9aa58
Branch: refs/heads/master
Commit: dbe9aa58c8c9c505cfbf031f2075b5350f07a148
Parents: 78b7681
Author: Claus Ibsen <da...@apache.org>
Authored: Sat Oct 17 10:09:15 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Sat Oct 17 10:41:18 2015 +0200
----------------------------------------------------------------------
components/camel-paho/pom.xml | 13 +++++++
.../src/test/resources/log4j.properties | 36 ++++++++++++++++++++
2 files changed, 49 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/dbe9aa58/components/camel-paho/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-paho/pom.xml b/components/camel-paho/pom.xml
index 01816a9..8ee8120 100644
--- a/components/camel-paho/pom.xml
+++ b/components/camel-paho/pom.xml
@@ -62,6 +62,19 @@
<version>${activemq-version}</version>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
<repositories>
http://git-wip-us.apache.org/repos/asf/camel/blob/dbe9aa58/components/camel-paho/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/components/camel-paho/src/test/resources/log4j.properties b/components/camel-paho/src/test/resources/log4j.properties
new file mode 100644
index 0000000..d6cf91a
--- /dev/null
+++ b/components/camel-paho/src/test/resources/log4j.properties
@@ -0,0 +1,36 @@
+## ------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ------------------------------------------------------------------------
+
+#
+# The logging properties used for testing
+#
+log4j.rootLogger=INFO, file
+
+# uncomment the following to enable camel debugging
+#log4j.logger.org.apache.camel.component.paho=TRACE
+
+# CONSOLE appender not used by default
+log4j.appender.out=org.apache.log4j.ConsoleAppender
+log4j.appender.out.layout=org.apache.log4j.PatternLayout
+#log4j.appender.out.layout.ConversionPattern=[%30.30t] %-30.30c{1} %-5p %m%n
+log4j.appender.out.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
+
+# File appender
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
+log4j.appender.file.file=target/camel-paho-test.log
[6/6] camel git commit: Fixed potential NPE if components create
exchange the wrong way.
Posted by da...@apache.org.
Fixed potential NPE if components create exchange the wrong way.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/20200df3
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/20200df3
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/20200df3
Branch: refs/heads/master
Commit: 20200df3630a4393768df3b1f526aef04692f843
Parents: 2a90c3e
Author: Claus Ibsen <da...@apache.org>
Authored: Sat Oct 17 10:41:10 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Sat Oct 17 10:41:20 2015 +0200
----------------------------------------------------------------------
.../camel/impl/DefaultRuntimeEndpointRegistry.java | 12 +++++++-----
1 file changed, 7 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/20200df3/camel-core/src/main/java/org/apache/camel/impl/DefaultRuntimeEndpointRegistry.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultRuntimeEndpointRegistry.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultRuntimeEndpointRegistry.java
index d866fea..9fd4c31 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultRuntimeEndpointRegistry.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultRuntimeEndpointRegistry.java
@@ -241,11 +241,13 @@ public class DefaultRuntimeEndpointRegistry extends EventNotifierSupport impleme
// we only capture details in extended mode
ExchangeCreatedEvent ece = (ExchangeCreatedEvent) event;
Endpoint endpoint = ece.getExchange().getFromEndpoint();
- String routeId = ece.getExchange().getFromRouteId();
- String uri = endpoint.getEndpointUri();
- String key = asUtilizationKey(routeId, uri);
- if (key != null) {
- inputUtilization.onHit(key);
+ if (endpoint != null) {
+ String routeId = ece.getExchange().getFromRouteId();
+ String uri = endpoint.getEndpointUri();
+ String key = asUtilizationKey(routeId, uri);
+ if (key != null) {
+ inputUtilization.onHit(key);
+ }
}
} else if (event instanceof ExchangeSendingEvent) {
ExchangeSendingEvent ese = (ExchangeSendingEvent) event;
[4/6] camel git commit: Fixes #635. Create exchange the correct way.
Posted by da...@apache.org.
Fixes #635. Create exchange the correct way.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/9e5a51ca
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/9e5a51ca
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/9e5a51ca
Branch: refs/heads/master
Commit: 9e5a51ca51c495e1038f7bd861372e032cd1dbc9
Parents: dbe9aa5
Author: Claus Ibsen <da...@apache.org>
Authored: Sat Oct 17 10:15:40 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Sat Oct 17 10:41:19 2015 +0200
----------------------------------------------------------------------
.../apache/camel/component/paho/PahoConsumer.java | 16 ++++++++--------
1 file changed, 8 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/9e5a51ca/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java
index 86dee14..82b6c5f 100644
--- a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java
+++ b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java
@@ -20,7 +20,6 @@ import org.apache.camel.AsyncCallback;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
-import org.apache.camel.builder.ExchangeBuilder;
import org.apache.camel.impl.DefaultConsumer;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
@@ -44,7 +43,7 @@ public class PahoConsumer extends DefaultConsumer {
getEndpoint().getClient().setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
- LOG.debug("MQTT broker connection lost:", cause);
+ LOG.debug("MQTT broker connection lost due " + cause.getMessage(), cause);
}
@Override
@@ -65,15 +64,15 @@ public class PahoConsumer extends DefaultConsumer {
headerKey = PahoConstants.HEASER_MQTT_PROPERTIES;
headerValue = props;
}
-
- Exchange exchange = ExchangeBuilder.anExchange(getEndpoint().getCamelContext()).
- withBody(message.getPayload()).
- withHeader(headerKey, headerValue).
- build();
+
+ Exchange exchange = getEndpoint().createExchange();
+ exchange.getIn().setBody(message.getPayload());
+ exchange.getIn().setHeader(headerKey, headerValue);
+
getAsyncProcessor().process(exchange, new AsyncCallback() {
@Override
public void done(boolean doneSync) {
-
+ // noop
}
});
}
@@ -88,6 +87,7 @@ public class PahoConsumer extends DefaultConsumer {
@Override
protected void doStop() throws Exception {
super.doStop();
+
if (getEndpoint().getClient().isConnected()) {
String topic = getEndpoint().getTopic();
getEndpoint().getClient().unsubscribe(topic);
[2/6] camel git commit: Modify parsing topic in the uri,
and Exchange header value is changed because mqtt topic is able to
use in after process.
Posted by da...@apache.org.
Modify parsing of the topic in the URI, and change the Exchange header value so that the MQTT topic can be used in subsequent processing.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/78b7681f
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/78b7681f
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/78b7681f
Branch: refs/heads/master
Commit: 78b7681f3d64f3c9f8667a55f233d8dd9bdcdf13
Parents: e2ef05d
Author: Takanori Suzuki <ta...@gmail.com>
Authored: Mon Oct 12 02:20:46 2015 +0900
Committer: Claus Ibsen <da...@apache.org>
Committed: Sat Oct 17 10:41:18 2015 +0200
----------------------------------------------------------------------
.../camel/component/paho/MqttProperties.java | 71 ++++++++++++++++++++
.../camel/component/paho/PahoComponent.java | 15 +++++
.../camel/component/paho/PahoConstants.java | 2 +
.../camel/component/paho/PahoConsumer.java | 21 +++++-
.../camel/component/paho/PahoEndpoint.java | 21 +++++-
.../camel/component/paho/PahoComponentTest.java | 51 +++++++++++++-
6 files changed, 175 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/78b7681f/components/camel-paho/src/main/java/org/apache/camel/component/paho/MqttProperties.java
----------------------------------------------------------------------
diff --git a/components/camel-paho/src/main/java/org/apache/camel/component/paho/MqttProperties.java b/components/camel-paho/src/main/java/org/apache/camel/component/paho/MqttProperties.java
new file mode 100644
index 0000000..782654a
--- /dev/null
+++ b/components/camel-paho/src/main/java/org/apache/camel/component/paho/MqttProperties.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.paho;
+
+/**
+ * MQTT message properties.
+ */
+public class MqttProperties {
+
+ private String topic;
+
+ private int qos;
+
+ private boolean retain;
+
+ private boolean duplicate;
+
+ public MqttProperties() {}
+
+ public String getTopic() {
+ return topic;
+ }
+
+ public void setTopic(String topic) {
+ this.topic = topic;
+ }
+
+ public int getQos() {
+ return qos;
+ }
+
+ public void setQos(int qos) {
+ this.qos = qos;
+ }
+
+ public boolean isRetain() {
+ return retain;
+ }
+
+ public void setRetain(boolean retain) {
+ this.retain = retain;
+ }
+
+ public boolean isDuplicate() {
+ return duplicate;
+ }
+
+ public void setDuplicate(boolean duplicate) {
+ this.duplicate = duplicate;
+ }
+
+ @Override
+ public String toString() {
+ return "PahoMqttProperties [topic=" + topic + ", qos=" + qos + "]";
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/78b7681f/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoComponent.java b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoComponent.java
index 232d38e..ea0b627 100644
--- a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoComponent.java
+++ b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoComponent.java
@@ -27,6 +27,7 @@ public class PahoComponent extends UriEndpointComponent {
private String brokerUrl;
private String clientId;
private MqttConnectOptions connectOptions;
+ private String headerType;
public PahoComponent() {
super(PahoEndpoint.class);
@@ -45,6 +46,9 @@ public class PahoComponent extends UriEndpointComponent {
if (connectOptions != null) {
answer.setConnectOptions(connectOptions);
}
+ if (headerType != null) {
+ answer.setHeaderType(headerType);
+ }
setProperties(answer, parameters);
return answer;
@@ -82,4 +86,15 @@ public class PahoComponent extends UriEndpointComponent {
public void setConnectOptions(MqttConnectOptions connectOptions) {
this.connectOptions = connectOptions;
}
+
+ public String getHeaderType() {
+ return headerType;
+ }
+
+ /**
+ * Exchange header type.
+ */
+ public void setHeaderType(String headerType) {
+ this.headerType = headerType;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/78b7681f/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConstants.java b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConstants.java
index 35b6a49..89d9994 100644
--- a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConstants.java
+++ b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConstants.java
@@ -18,6 +18,8 @@ package org.apache.camel.component.paho;
public final class PahoConstants {
+ public static final String HEASER_MQTT_PROPERTIES = "MqttProperties";
+
public static final String HEADER_ORIGINAL_MESSAGE = "PahoOriginalMessage";
private PahoConstants() {
http://git-wip-us.apache.org/repos/asf/camel/blob/78b7681f/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java
index 100644c..86dee14 100644
--- a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java
+++ b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java
@@ -28,8 +28,6 @@ import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.camel.component.paho.PahoConstants.HEADER_ORIGINAL_MESSAGE;
-
public class PahoConsumer extends DefaultConsumer {
private static final Logger LOG = LoggerFactory.getLogger(PahoConsumer.class);
@@ -51,9 +49,26 @@ public class PahoConsumer extends DefaultConsumer {
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
+ String headerKey;
+ Object headerValue;
+ String headerType = getEndpoint().getHeaderType();
+ if (PahoConstants.HEADER_ORIGINAL_MESSAGE.equals(headerType)) {
+ headerKey = PahoConstants.HEADER_ORIGINAL_MESSAGE;
+ headerValue = message;
+ } else {
+ MqttProperties props = new MqttProperties();
+ props.setTopic(topic);
+ props.setQos(message.getQos());
+ props.setRetain(message.isRetained());
+ props.setDuplicate(message.isDuplicate());
+
+ headerKey = PahoConstants.HEASER_MQTT_PROPERTIES;
+ headerValue = props;
+ }
+
Exchange exchange = ExchangeBuilder.anExchange(getEndpoint().getCamelContext()).
withBody(message.getPayload()).
- withHeader(HEADER_ORIGINAL_MESSAGE, message).
+ withHeader(headerKey, headerValue).
build();
getAsyncProcessor().process(exchange, new AsyncCallback() {
@Override
http://git-wip-us.apache.org/repos/asf/camel/blob/78b7681f/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoEndpoint.java b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoEndpoint.java
index 5ca943a..0562c9a 100644
--- a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoEndpoint.java
+++ b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoEndpoint.java
@@ -64,6 +64,8 @@ public class PahoEndpoint extends DefaultEndpoint {
private int qos = DEFAULT_QOS;
@UriParam(defaultValue = "MEMORY")
private PahoPersistence persistence = MEMORY;
+ @UriParam(defaultValue = PahoConstants.HEASER_MQTT_PROPERTIES)
+ private String headerType = PahoConstants.HEASER_MQTT_PROPERTIES;
// Collaboration members
@UriParam
@@ -76,7 +78,12 @@ public class PahoEndpoint extends DefaultEndpoint {
public PahoEndpoint(String uri, Component component) {
super(uri, component);
if (topic == null) {
- topic = uri.substring(7);
+ int optionIndex = uri.indexOf("?");
+ if (optionIndex > 0) {
+ topic = uri.substring(7, optionIndex);
+ } else {
+ topic = uri.substring(7);
+ }
}
}
@@ -217,4 +224,16 @@ public class PahoEndpoint extends DefaultEndpoint {
public void setConnectOptions(MqttConnectOptions connOpts) {
this.connectOptions = connOpts;
}
+
+ public String getHeaderType() {
+ return headerType;
+ }
+
+ /**
+ * Exchange header type.
+ */
+ public void setHeaderType(String headerType) {
+ this.headerType = headerType;
+ }
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/78b7681f/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoComponentTest.java
----------------------------------------------------------------------
diff --git a/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoComponentTest.java b/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoComponentTest.java
index 905f6fd..14e1515 100644
--- a/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoComponentTest.java
+++ b/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoComponentTest.java
@@ -16,6 +16,8 @@
*/
package org.apache.camel.component.paho;
+import java.io.UnsupportedEncodingException;
+
import org.apache.activemq.broker.BrokerService;
import org.apache.camel.EndpointInject;
import org.apache.camel.Exchange;
@@ -34,6 +36,9 @@ public class PahoComponentTest extends CamelTestSupport {
@EndpointInject(uri = "mock:test")
MockEndpoint mock;
+
+ @EndpointInject(uri = "mock:test2")
+ MockEndpoint mock2;
BrokerService broker;
@@ -65,9 +70,11 @@ public class PahoComponentTest extends CamelTestSupport {
@Override
public void configure() throws Exception {
from("direct:test").to("paho:queue?brokerUrl=tcp://localhost:" + mqttPort);
-
from("paho:queue?brokerUrl=tcp://localhost:" + mqttPort).to("mock:test");
+ from("direct:test2").to("paho:queue?brokerUrl=tcp://localhost:" + mqttPort);
+ from("paho:queue?headerType=PahoOriginalMessage&brokerUrl=tcp://localhost:" + mqttPort).to("mock:test2");
+
from("paho:persistenceTest?persistence=FILE&brokerUrl=tcp://localhost:" + mqttPort).to("mock:persistenceTest");
from("direct:connectOptions").to("paho:registryConnectOptions?connectOptions=#connectOptions&brokerUrl=tcp://localhost:" + mqttPort);
@@ -85,6 +92,26 @@ public class PahoComponentTest extends CamelTestSupport {
// Tests
@Test
+ public void checkOptions() {
+ String uri = "paho:/test/topic"
+ + "?clientId=sampleClient"
+ + "&brokerUrl=tcp://localhost:" + mqttPort
+ + "&qos=2"
+ + "&persistence=file"
+ + "&headerType=testType";
+
+ PahoEndpoint endpoint = getMandatoryEndpoint(uri, PahoEndpoint.class);
+
+ // Then
+ assertEquals("/test/topic", endpoint.getTopic());
+ assertEquals("sampleClient", endpoint.getClientId());
+ assertEquals("tcp://localhost:" + mqttPort, endpoint.getBrokerUrl());
+ assertEquals(2, endpoint.getQos());
+ assertEquals(PahoPersistence.FILE, endpoint.getPersistence());
+ assertEquals("testType", endpoint.getHeaderType());
+ }
+
+ @Test
public void shouldReadMessageFromMqtt() throws InterruptedException {
// Given
String msg = "msg";
@@ -132,7 +159,7 @@ public class PahoComponentTest extends CamelTestSupport {
}
@Test
- public void shouldKeepOriginalMessageInHeader() throws InterruptedException {
+ public void shouldKeepDefaultMessageInHeader() throws InterruptedException, UnsupportedEncodingException {
// Given
final String msg = "msg";
mock.expectedBodiesReceived(msg);
@@ -143,6 +170,26 @@ public class PahoComponentTest extends CamelTestSupport {
// Then
mock.assertIsSatisfied();
Exchange exchange = mock.getExchanges().get(0);
+ MqttProperties mqttProperties = exchange.getIn().getHeader(PahoConstants.HEASER_MQTT_PROPERTIES,
+ MqttProperties.class);
+ String payload = new String((byte[]) exchange.getIn().getBody(), "utf-8");
+
+ assertEquals("queue", new String(mqttProperties.getTopic()));
+ assertEquals(msg, new String(payload));
+ }
+
+ @Test
+ public void shouldKeepOriginalMessageInHeader() throws InterruptedException {
+ // Given
+ final String msg = "msg";
+ mock2.expectedBodiesReceived(msg);
+
+ // When
+ template.sendBody("direct:test2", msg);
+
+ // Then
+ mock2.assertIsSatisfied();
+ Exchange exchange = mock2.getExchanges().get(0);
MqttMessage message = exchange.getIn().getHeader(PahoConstants.HEADER_ORIGINAL_MESSAGE, MqttMessage.class);
assertEquals(msg, new String(message.getPayload()));
}
[5/6] camel git commit: Fixed CS
Posted by da...@apache.org.
Fixed CS
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/2a90c3e6
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/2a90c3e6
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/2a90c3e6
Branch: refs/heads/master
Commit: 2a90c3e606fcde1e5d8742d05d7b981cdf7fa714
Parents: d2429e7
Author: Claus Ibsen <da...@apache.org>
Authored: Sat Oct 17 10:36:50 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Sat Oct 17 10:41:19 2015 +0200
----------------------------------------------------------------------
.../main/java/org/apache/camel/component/paho/PahoEndpoint.java | 3 +--
1 file changed, 1 insertion(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/2a90c3e6/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoEndpoint.java b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoEndpoint.java
index 97dad4d..4264522 100644
--- a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoEndpoint.java
+++ b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoEndpoint.java
@@ -18,7 +18,7 @@ package org.apache.camel.component.paho;
import java.util.Set;
-import javax.xml.xpath.XPathConstants;
+import static java.lang.System.nanoTime;
import org.apache.camel.Component;
import org.apache.camel.Consumer;
@@ -39,7 +39,6 @@ import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static java.lang.System.nanoTime;
import static org.apache.camel.component.paho.PahoPersistence.MEMORY;
@UriEndpoint(scheme = "paho", title = "Paho", consumerClass = PahoConsumer.class, label = "messaging", syntax = "paho:topic")
[3/6] camel git commit: CAMEL-9232: camel-paho - Create exchange
correct. Fixes #635.
Posted by da...@apache.org.
CAMEL-9232: camel-paho - Create exchange correctly. Fixes #635.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/d2429e7a
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/d2429e7a
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/d2429e7a
Branch: refs/heads/master
Commit: d2429e7a475a6fd41bc39ee3dcaabd8af2d19362
Parents: 9e5a51c
Author: Claus Ibsen <da...@apache.org>
Authored: Sat Oct 17 10:36:15 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Sat Oct 17 10:41:19 2015 +0200
----------------------------------------------------------------------
.../camel/component/paho/MqttProperties.java | 71 --------------------
.../camel/component/paho/PahoComponent.java | 14 ----
.../camel/component/paho/PahoConstants.java | 3 +-
.../camel/component/paho/PahoConsumer.java | 24 +------
.../camel/component/paho/PahoEndpoint.java | 33 +++++----
.../camel/component/paho/PahoMessage.java | 47 +++++++++++++
.../camel/component/paho/PahoComponentTest.java | 27 +++-----
7 files changed, 78 insertions(+), 141 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/d2429e7a/components/camel-paho/src/main/java/org/apache/camel/component/paho/MqttProperties.java
----------------------------------------------------------------------
diff --git a/components/camel-paho/src/main/java/org/apache/camel/component/paho/MqttProperties.java b/components/camel-paho/src/main/java/org/apache/camel/component/paho/MqttProperties.java
deleted file mode 100644
index 782654a..0000000
--- a/components/camel-paho/src/main/java/org/apache/camel/component/paho/MqttProperties.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.paho;
-
-/**
- * MQTT message properties.
- */
-public class MqttProperties {
-
- private String topic;
-
- private int qos;
-
- private boolean retain;
-
- private boolean duplicate;
-
- public MqttProperties() {}
-
- public String getTopic() {
- return topic;
- }
-
- public void setTopic(String topic) {
- this.topic = topic;
- }
-
- public int getQos() {
- return qos;
- }
-
- public void setQos(int qos) {
- this.qos = qos;
- }
-
- public boolean isRetain() {
- return retain;
- }
-
- public void setRetain(boolean retain) {
- this.retain = retain;
- }
-
- public boolean isDuplicate() {
- return duplicate;
- }
-
- public void setDuplicate(boolean duplicate) {
- this.duplicate = duplicate;
- }
-
- @Override
- public String toString() {
- return "PahoMqttProperties [topic=" + topic + ", qos=" + qos + "]";
- }
-
-}
http://git-wip-us.apache.org/repos/asf/camel/blob/d2429e7a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoComponent.java b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoComponent.java
index ea0b627..c9cbc41 100644
--- a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoComponent.java
+++ b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoComponent.java
@@ -27,7 +27,6 @@ public class PahoComponent extends UriEndpointComponent {
private String brokerUrl;
private String clientId;
private MqttConnectOptions connectOptions;
- private String headerType;
public PahoComponent() {
super(PahoEndpoint.class);
@@ -46,9 +45,6 @@ public class PahoComponent extends UriEndpointComponent {
if (connectOptions != null) {
answer.setConnectOptions(connectOptions);
}
- if (headerType != null) {
- answer.setHeaderType(headerType);
- }
setProperties(answer, parameters);
return answer;
@@ -87,14 +83,4 @@ public class PahoComponent extends UriEndpointComponent {
this.connectOptions = connectOptions;
}
- public String getHeaderType() {
- return headerType;
- }
-
- /**
- * Exchange header type.
- */
- public void setHeaderType(String headerType) {
- this.headerType = headerType;
- }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/d2429e7a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConstants.java b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConstants.java
index 89d9994..4c8d1ff 100644
--- a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConstants.java
+++ b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConstants.java
@@ -18,8 +18,9 @@ package org.apache.camel.component.paho;
public final class PahoConstants {
- public static final String HEASER_MQTT_PROPERTIES = "MqttProperties";
+ public static final String MQTT_TOPIC = "CamelMqttTopic";
+ @Deprecated
public static final String HEADER_ORIGINAL_MESSAGE = "PahoOriginalMessage";
private PahoConstants() {
http://git-wip-us.apache.org/repos/asf/camel/blob/d2429e7a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java
index 82b6c5f..75d6092 100644
--- a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java
+++ b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java
@@ -48,26 +48,8 @@ public class PahoConsumer extends DefaultConsumer {
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
- String headerKey;
- Object headerValue;
- String headerType = getEndpoint().getHeaderType();
- if (PahoConstants.HEADER_ORIGINAL_MESSAGE.equals(headerType)) {
- headerKey = PahoConstants.HEADER_ORIGINAL_MESSAGE;
- headerValue = message;
- } else {
- MqttProperties props = new MqttProperties();
- props.setTopic(topic);
- props.setQos(message.getQos());
- props.setRetain(message.isRetained());
- props.setDuplicate(message.isDuplicate());
-
- headerKey = PahoConstants.HEASER_MQTT_PROPERTIES;
- headerValue = props;
- }
-
- Exchange exchange = getEndpoint().createExchange();
- exchange.getIn().setBody(message.getPayload());
- exchange.getIn().setHeader(headerKey, headerValue);
+ LOG.debug("Message arrived on topic: {} -> {}", topic, message);
+ Exchange exchange = getEndpoint().createExchange(message, topic);
getAsyncProcessor().process(exchange, new AsyncCallback() {
@Override
@@ -79,7 +61,7 @@ public class PahoConsumer extends DefaultConsumer {
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
- LOG.debug("Delivery complete. Token: {}.", token);
+ LOG.debug("Delivery complete. Token: {}", token);
}
});
}
http://git-wip-us.apache.org/repos/asf/camel/blob/d2429e7a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoEndpoint.java b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoEndpoint.java
index 0562c9a..97dad4d 100644
--- a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoEndpoint.java
+++ b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoEndpoint.java
@@ -18,10 +18,11 @@ package org.apache.camel.component.paho;
import java.util.Set;
-import static java.lang.System.nanoTime;
+import javax.xml.xpath.XPathConstants;
import org.apache.camel.Component;
import org.apache.camel.Consumer;
+import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.impl.DefaultEndpoint;
@@ -32,11 +33,13 @@ import org.apache.camel.spi.UriPath;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static java.lang.System.nanoTime;
import static org.apache.camel.component.paho.PahoPersistence.MEMORY;
@UriEndpoint(scheme = "paho", title = "Paho", consumerClass = PahoConsumer.class, label = "messaging", syntax = "paho:topic")
@@ -45,15 +48,11 @@ public class PahoEndpoint extends DefaultEndpoint {
private static final Logger LOG = LoggerFactory.getLogger(PahoEndpoint.class);
// Constants
-
private static final String DEFAULT_BROKER_URL = "tcp://localhost:1883";
-
private static final int DEFAULT_QOS = 2;
-
private static final String DEFAULT_QOS_STRING = DEFAULT_QOS + "";
// Configuration members
-
@UriPath @Metadata(required = "true")
private String topic;
@UriParam
@@ -64,8 +63,6 @@ public class PahoEndpoint extends DefaultEndpoint {
private int qos = DEFAULT_QOS;
@UriParam(defaultValue = "MEMORY")
private PahoPersistence persistence = MEMORY;
- @UriParam(defaultValue = PahoConstants.HEASER_MQTT_PROPERTIES)
- private String headerType = PahoConstants.HEASER_MQTT_PROPERTIES;
// Collaboration members
@UriParam
@@ -144,6 +141,17 @@ public class PahoEndpoint extends DefaultEndpoint {
return new MqttConnectOptions();
}
+ public Exchange createExchange(MqttMessage mqttMessage, String topic) {
+ PahoMessage paho = new PahoMessage();
+ paho.setMqttMessage(mqttMessage);
+ paho.setBody(mqttMessage.getPayload());
+ paho.setHeader(PahoConstants.MQTT_TOPIC, topic);
+
+ Exchange exchange = createExchange();
+ exchange.setIn(paho);
+ return exchange;
+ }
+
// Configuration getters & setters
public String getClientId() {
@@ -225,15 +233,4 @@ public class PahoEndpoint extends DefaultEndpoint {
this.connectOptions = connOpts;
}
- public String getHeaderType() {
- return headerType;
- }
-
- /**
- * Exchange header type.
- */
- public void setHeaderType(String headerType) {
- this.headerType = headerType;
- }
-
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/d2429e7a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoMessage.java
----------------------------------------------------------------------
diff --git a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoMessage.java b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoMessage.java
new file mode 100644
index 0000000..a89e470
--- /dev/null
+++ b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoMessage.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.paho;
+
+import org.apache.camel.impl.DefaultMessage;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+
+public class PahoMessage extends DefaultMessage {
+
+ private transient MqttMessage mqttMessage;
+
+ public PahoMessage() {
+ }
+
+ public PahoMessage(MqttMessage mqttMessage) {
+ this.mqttMessage = mqttMessage;
+ }
+
+ public MqttMessage getMqttMessage() {
+ return mqttMessage;
+ }
+
+ public void setMqttMessage(MqttMessage mqttMessage) {
+ this.mqttMessage = mqttMessage;
+ }
+
+ @Override
+ public PahoMessage newInstance() {
+ return new PahoMessage(mqttMessage);
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/d2429e7a/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoComponentTest.java
----------------------------------------------------------------------
diff --git a/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoComponentTest.java b/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoComponentTest.java
index 14e1515..8013f8f 100644
--- a/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoComponentTest.java
+++ b/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoComponentTest.java
@@ -37,9 +37,6 @@ public class PahoComponentTest extends CamelTestSupport {
@EndpointInject(uri = "mock:test")
MockEndpoint mock;
- @EndpointInject(uri = "mock:test2")
- MockEndpoint mock2;
-
BrokerService broker;
int mqttPort = AvailablePortFinder.getNextAvailable();
@@ -73,7 +70,6 @@ public class PahoComponentTest extends CamelTestSupport {
from("paho:queue?brokerUrl=tcp://localhost:" + mqttPort).to("mock:test");
from("direct:test2").to("paho:queue?brokerUrl=tcp://localhost:" + mqttPort);
- from("paho:queue?headerType=PahoOriginalMessage&brokerUrl=tcp://localhost:" + mqttPort).to("mock:test2");
from("paho:persistenceTest?persistence=FILE&brokerUrl=tcp://localhost:" + mqttPort).to("mock:persistenceTest");
@@ -97,9 +93,8 @@ public class PahoComponentTest extends CamelTestSupport {
+ "?clientId=sampleClient"
+ "&brokerUrl=tcp://localhost:" + mqttPort
+ "&qos=2"
- + "&persistence=file"
- + "&headerType=testType";
-
+ + "&persistence=file";
+
PahoEndpoint endpoint = getMandatoryEndpoint(uri, PahoEndpoint.class);
// Then
@@ -108,7 +103,6 @@ public class PahoComponentTest extends CamelTestSupport {
assertEquals("tcp://localhost:" + mqttPort, endpoint.getBrokerUrl());
assertEquals(2, endpoint.getQos());
assertEquals(PahoPersistence.FILE, endpoint.getPersistence());
- assertEquals("testType", endpoint.getHeaderType());
}
@Test
@@ -169,28 +163,29 @@ public class PahoComponentTest extends CamelTestSupport {
// Then
mock.assertIsSatisfied();
+
Exchange exchange = mock.getExchanges().get(0);
- MqttProperties mqttProperties = exchange.getIn().getHeader(PahoConstants.HEASER_MQTT_PROPERTIES,
- MqttProperties.class);
String payload = new String((byte[]) exchange.getIn().getBody(), "utf-8");
- assertEquals("queue", new String(mqttProperties.getTopic()));
- assertEquals(msg, new String(payload));
+ assertEquals("queue", exchange.getIn().getHeader(PahoConstants.MQTT_TOPIC));
+ assertEquals(msg, payload);
}
@Test
public void shouldKeepOriginalMessageInHeader() throws InterruptedException {
// Given
final String msg = "msg";
- mock2.expectedBodiesReceived(msg);
+ mock.expectedBodiesReceived(msg);
// When
template.sendBody("direct:test2", msg);
// Then
- mock2.assertIsSatisfied();
- Exchange exchange = mock2.getExchanges().get(0);
- MqttMessage message = exchange.getIn().getHeader(PahoConstants.HEADER_ORIGINAL_MESSAGE, MqttMessage.class);
+ mock.assertIsSatisfied();
+ Exchange exchange = mock.getExchanges().get(0);
+
+ MqttMessage message = exchange.getIn(PahoMessage.class).getMqttMessage();
+ assertNotNull(message);
assertEquals(msg, new String(message.getPayload()));
}