You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/07/29 19:43:48 UTC

[camel-spring-boot-examples] branch main updated: Added camel-paho-mqtt5 with shared subscription example (#37)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-spring-boot-examples.git


The following commit(s) were added to refs/heads/main by this push:
     new dfad93a  Added camel-paho-mqtt5 with shared subscription example (#37)
dfad93a is described below

commit dfad93a20f2b7496e2f80ebb25528f763c5069f9
Author: Luigi De Masi <55...@users.noreply.github.com>
AuthorDate: Thu Jul 29 21:41:14 2021 +0200

    Added camel-paho-mqtt5 with shared subscription example (#37)
---
 README.adoc                                        |   4 +-
 paho-mqtt5-shared-subscriptions/README.adoc        |  53 +++++++++++++
 .../img/nonSharedSubscription.gif                  | Bin 0 -> 581177 bytes
 .../img/sharedSubscription.gif                     | Bin 0 -> 949203 bytes
 paho-mqtt5-shared-subscriptions/pom.xml            |  84 +++++++++++++++++++++
 .../spring/boot/paho/mqtt5/Application.java        |  34 +++++++++
 .../boot/paho/mqtt5/PahoMqtt5RouteBuilder.java     |  69 +++++++++++++++++
 .../src/main/resources/application.properties      |  26 +++++++
 pom.xml                                            |   3 +-
 9 files changed, 271 insertions(+), 2 deletions(-)

diff --git a/README.adoc b/README.adoc
index 78d8d89..0a56a58 100644
--- a/README.adoc
+++ b/README.adoc
@@ -27,7 +27,7 @@ readme's instructions.
 === Examples
 
 // examples: START
-Number of Examples: 49 (0 deprecated)
+Number of Examples: 50 (0 deprecated)
 
 [width="100%",cols="4,2,4",options="header"]
 |===
@@ -118,6 +118,8 @@ Number of Examples: 49 (0 deprecated)
 
 | link:kafka-offsetrepository/README.adoc[Kafka Offsetrepository] (kafka-offsetrepository) | Messaging | An example for Kafka offsetrepository
 
+| link:paho-mqtt5-shared-subscriptions/README.adoc[Paho Mqtt5 Shared Subscriptions] (paho-mqtt5-shared-subscriptions) | Messaging | An example showing  how to set up multiple mqtt5 consumers that use shared subscription feature of MQTT5
+
 | link:rabbitmq/readme.adoc[Rabbitmq] (rabbitmq) | Messaging | An example showing how to work with Camel and RabbitMQ
 
 | link:strimzi/README.adoc[Strimzi] (strimzi) | Messaging | Camel example which a route is defined in XML for Strimzi integration on Openshift/Kubernetes
diff --git a/paho-mqtt5-shared-subscriptions/README.adoc b/paho-mqtt5-shared-subscriptions/README.adoc
new file mode 100644
index 0000000..05f13e5
--- /dev/null
+++ b/paho-mqtt5-shared-subscriptions/README.adoc
@@ -0,0 +1,53 @@
+= Spring Boot Camel PAHO MQTT5 Shared Subscription Quickstart
+
+This quickstart demonstrates how to set up multiple mqtt5 consumers that use shared subscription feature of mqtt5 in order to balance the load across the consumers.
+
+=== MQTT 5.0 protocol introduction-shared subscription
+Shared subscription is a new feature introduced by MQTT 5.0 protocol, which is equivalent to the load balancing function of the subscribers.
+
+We know that the general non-shared subscription message publishing process is like this:
+
+[#img-nonshared]
+image::img/nonSharedSubscription.gif[]
+
+Under this structure, if the subscription node fails, it will cause the publisher's message to be lost (QoS 0) or accumulate in the server (QoS 1, 2). Under normal circumstances, the solution to this problem is to directly increase the subscription node, but this generates a large number of duplicate messages, not only wasting performance, in some business scenarios, the subscription node also needs to remove itself, further increasing the the complexity.
+
+Secondly, when the publisher's production capacity is strong, there may be a situation where the subscriber's consumption capacity cannot keep up in time. At this time, the subscriber can only solve it by implementing load balancing, which again increases the user's development cost.
+
+=== Protocol specification
+Now, in the MQTT 5.0 protocol, you can solve the problems mentioned above through the shared subscription feature. When you use a shared subscription, the flow of messages becomes:
+
+[#img-nonshared]
+image::img/sharedSubscription.gif[]
+
+
+Like non-shared subscriptions, shared subscriptions include a topic filter and subscription options, the only difference is that the topic filter format for shared subscriptions must be in this form *$share/{ShareName}/{filter}*. The meanings of these fields are:
+
+* *$share* The prefix indicates that this will be a shared subscription
+* *{ShareName}* It is a string that does not contain "/", "+" and "#". Subscribe to the session by using the same *{ShareName}* Indicates sharing the same subscription, and messages matching the subscription will only be published to one of the sessions at a time
+* *{filter}* Is the topic filter in non-shared subscriptions
+
+It should be noted that if the server is sending a QoS 2 message to the selected subscriber, and the network is interrupted before the distribution is completed, the server will continue to complete the distribution of the message when the subscriber reconnects. If the subscriber's session is terminated before it reconnects, the server will discard the message without attempting to send it to other subscribers. If it is a QoS 1 message, the server can wait for the subscriber to reconnect [...]
+
+
+== Run the Quickstart
+
+This quickstart use a public MQTT 5 test server available at the time of writing.
+It contains one producer and six consumers that are part of the same conversation group (group1)
+
+
+. Build the project
++
+....
+ mvn clean package
+....
+
+. Start the fat jar
++
+....
+ mvn spring-boot:run
+....
++
+In the logs, you should see something like this:
++
+image::https://asciinema.org/a/427793.png[link="https://asciinema.org/a/427793"]
diff --git a/paho-mqtt5-shared-subscriptions/img/nonSharedSubscription.gif b/paho-mqtt5-shared-subscriptions/img/nonSharedSubscription.gif
new file mode 100644
index 0000000..a3640f8
Binary files /dev/null and b/paho-mqtt5-shared-subscriptions/img/nonSharedSubscription.gif differ
diff --git a/paho-mqtt5-shared-subscriptions/img/sharedSubscription.gif b/paho-mqtt5-shared-subscriptions/img/sharedSubscription.gif
new file mode 100644
index 0000000..2fa4c0c
Binary files /dev/null and b/paho-mqtt5-shared-subscriptions/img/sharedSubscription.gif differ
diff --git a/paho-mqtt5-shared-subscriptions/pom.xml b/paho-mqtt5-shared-subscriptions/pom.xml
new file mode 100644
index 0000000..47fbf4d
--- /dev/null
+++ b/paho-mqtt5-shared-subscriptions/pom.xml
@@ -0,0 +1,84 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>examples</artifactId>
+        <groupId>org.apache.camel.springboot.example</groupId>
+        <version>3.12.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>camel-example-spring-boot-paho-mqtt5-shared-subscriptions</artifactId>
+    <packaging>jar</packaging>
+
+    <name>Camel SB Examples :: Paho MQTT5 Shared Subscriptions</name>
+    <description>An example showing  how to set up multiple mqtt5 consumers that use shared subscription feature of MQTT5</description>
+
+    <properties>
+        <category>Messaging</category>
+        <spring.boot-version>${spring-boot-version}</spring.boot-version>
+    </properties>
+
+    <dependencyManagement>
+        <dependencies>
+            <dependency>
+                <groupId>org.springframework.boot</groupId>
+                <artifactId>spring-boot-dependencies</artifactId>
+                <version>${spring.boot-version}</version>
+                <type>pom</type>
+                <scope>import</scope>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.camel.springboot</groupId>
+                <artifactId>camel-spring-boot-dependencies</artifactId>
+                <version>${project.version}</version>
+                <type>pom</type>
+                <scope>import</scope>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>org.apache.camel.springboot</groupId>
+            <artifactId>camel-spring-boot-starter</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.camel.springboot</groupId>
+            <artifactId>camel-core-starter</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.camel.springboot</groupId>
+            <artifactId>camel-paho-mqtt5-starter</artifactId>
+        </dependency>
+    </dependencies>
+
+    <build>
+
+        <!-- we do not want version in the JAR name -->
+        <finalName>${project.artifactId}</finalName>
+
+        <plugins>
+            <plugin>
+                <groupId>org.springframework.boot</groupId>
+                <artifactId>spring-boot-maven-plugin</artifactId>
+                <version>${spring-boot-version}</version>
+                <configuration>
+                    <mainClass>org.apache.camel.example.spring.boot.paho.mqtt5.Application</mainClass>
+                </configuration>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>repackage</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
diff --git a/paho-mqtt5-shared-subscriptions/src/main/java/org/apache/camel/example/spring/boot/paho/mqtt5/Application.java b/paho-mqtt5-shared-subscriptions/src/main/java/org/apache/camel/example/spring/boot/paho/mqtt5/Application.java
new file mode 100644
index 0000000..ac85c7d
--- /dev/null
+++ b/paho-mqtt5-shared-subscriptions/src/main/java/org/apache/camel/example/spring/boot/paho/mqtt5/Application.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.example.spring.boot.paho.mqtt5;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+/**
+ * The Spring-boot main class.public class Application {
+ */
+
+@SpringBootApplication
+public class Application {
+
+    private String topic = "$share/numbers/topic";
+
+    public static void main(String[] args) {
+        SpringApplication.run(Application.class, args);
+    }
+}
diff --git a/paho-mqtt5-shared-subscriptions/src/main/java/org/apache/camel/example/spring/boot/paho/mqtt5/PahoMqtt5RouteBuilder.java b/paho-mqtt5-shared-subscriptions/src/main/java/org/apache/camel/example/spring/boot/paho/mqtt5/PahoMqtt5RouteBuilder.java
new file mode 100644
index 0000000..b644e92
--- /dev/null
+++ b/paho-mqtt5-shared-subscriptions/src/main/java/org/apache/camel/example/spring/boot/paho/mqtt5/PahoMqtt5RouteBuilder.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.example.spring.boot.paho.mqtt5;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.springframework.stereotype.Component;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.camel.LoggingLevel.DEBUG;
+import static org.apache.camel.LoggingLevel.INFO;
+
+@Component
+public class PahoMqtt5RouteBuilder extends RouteBuilder {
+
+    AtomicInteger counter = new AtomicInteger(0);
+
+    @Override
+    public void configure() throws Exception {
+
+        // The following consumers share the same conversation group (group1) and will be loadbalanced
+        from("paho-mqtt5:{{consumer.topic}}?brokerUrl={{broker.url}}&clientId=consumerA")
+                .id("consumerA")
+                .log(INFO, "CONSUMER A - MESSAGE: ${body}");
+
+        from("paho-mqtt5:{{consumer.topic}}?brokerUrl={{broker.url}}&clientId=consumerB")
+                .id("consumerB")
+                .log(INFO, "CONSUMER B - MESSAGE: ${body}");
+
+        from("paho-mqtt5:{{consumer.topic}}?brokerUrl={{broker.url}}&clientId=consumerC")
+                .id("consumerC")
+                .log(INFO, "CONSUMER C - MESSAGE: ${body}");
+
+        from("paho-mqtt5:{{consumer.topic}}?brokerUrl={{broker.url}}&clientId=consumerD")
+                .id("consumerD")
+                .log(INFO, "CONSUMER D - MESSAGE: ${body}");
+
+        from("paho-mqtt5:{{consumer.topic}}?brokerUrl={{broker.url}}&clientId=consumerE")
+                .id("consumerE")
+                .log(INFO, "CONSUMER E - MESSAGE: ${body}");
+
+        from("paho-mqtt5:{{consumer.topic}}?brokerUrl={{broker.url}}&clientId=consumerF")
+                .id("consumerF")
+                .log(INFO, "CONSUMER F - MESSAGE: ${body}");
+
+        // the producer
+        from("timer://foo?fixedRate=true&period={{producer.period}}")
+                .id("producer")
+                .process(exchange -> {
+                    exchange.getIn().setBody(counter.getAndIncrement()+" - hello world");
+                })
+                .log(DEBUG,"PRODUCER   - MESSAGE: ${body}")
+                .to("paho-mqtt5:{{producer.topic}}?brokerUrl={{broker.url}}&clientId=producer");
+    }
+}
diff --git a/paho-mqtt5-shared-subscriptions/src/main/resources/application.properties b/paho-mqtt5-shared-subscriptions/src/main/resources/application.properties
new file mode 100644
index 0000000..9622629
--- /dev/null
+++ b/paho-mqtt5-shared-subscriptions/src/main/resources/application.properties
@@ -0,0 +1,26 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+##      http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+logging.level.org.eclipse.paho.mqttv5.client.internal=WARN
+
+broker.url=tcp://broker.emqx.io:1883
+
+consumer.topic=$share/gr1/topic
+
+producer.topic=topic
+
+producer.period=1000
diff --git a/pom.xml b/pom.xml
index 1e66e3a..db3e40f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -59,6 +59,7 @@
 		<module>master</module>
 		<module>metrics</module>
 		<module>opentracing</module>
+		<module>paho-mqtt5-shared-subscriptions</module>
 		<module>pojo</module>
 		<module>rabbitmq</module>
 		<module>reactive-streams</module>
@@ -84,7 +85,7 @@
 		<module>splitter-eip</module>
 		<module>widget-gadget</module>
 		<module>unit-testing</module>
-	</modules>
+    </modules>
 
 	<properties>
 		<camel-version>3.12.0-SNAPSHOT</camel-version>