You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hx...@apache.org on 2021/07/14 16:16:33 UTC

[iotdb] 01/01: add an example to customize MQTT Message

This is an automated email from the ASF dual-hosted git repository.

hxd pushed a commit to branch mqtt_example
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit b76e59015cf5a819de14ff94733b4f20f16be0e6
Author: xiangdong huang <sa...@gmail.com>
AuthorDate: Thu Jul 15 00:16:01 2021 +0800

    add an example to customize MQTT Message
---
 .../Programming-MQTT.md                            | 79 +++++++++++++++++++++-
 .../Programming-MQTT.md                            | 74 +++++++++++++++++++-
 example/mqtt-customize/README.md                   | 42 ++++++++++++
 example/mqtt-customize/pom.xml                     | 42 ++++++++++++
 .../server/CustomizedJsonPayloadFormatter.java     | 61 +++++++++++++++++
 .../org.apache.iotdb.db.mqtt.PayloadFormatter      |  1 +
 example/pom.xml                                    |  1 +
 7 files changed, 298 insertions(+), 2 deletions(-)

diff --git a/docs/UserGuide/Communication-Service-Protocol/Programming-MQTT.md b/docs/UserGuide/Communication-Service-Protocol/Programming-MQTT.md
index 98a5b96..f063bce 100644
--- a/docs/UserGuide/Communication-Service-Protocol/Programming-MQTT.md
+++ b/docs/UserGuide/Communication-Service-Protocol/Programming-MQTT.md
@@ -100,5 +100,82 @@ connection.disconnect();
 
 ```
 
-## Rest
+### Customize your MQTT Message Format
+
+If you do not like the above Json format, you can customize your MQTT Message format by just writing several lines 
+of codes. An example can be found in `example/mqtt-customize` project.
+
+Steps:
+* Create a java project, and add dependency:
+```xml
+        <dependency>
+            <groupId>org.apache.iotdb</groupId>
+            <artifactId>iotdb-server</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+```
+* Define your implementation which implements `org.apache.iotdb.db.mqtt.PayloadFormatter.java`
+e.g.,
+```java
+package org.apache.iotdb.mqtt.server;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.iotdb.db.mqtt.Message;
+import org.apache.iotdb.db.mqtt.PayloadFormatter;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class CustomizedJsonPayloadFormatter implements PayloadFormatter {
+
+  @Override
+  public List<Message> format(ByteBuf payload) {
+    // Suppose the payload is a json format
+    if (payload == null) {
+      return null;
+    }
+
+    String json = payload.toString(StandardCharsets.UTF_8);
+    // parse data from the json and generate Messages and put them into List<Meesage> ret
+    List<Message> ret = new ArrayList<>();
+    // this is just an example, so we just generate some Messages directly
+    for (int i = 0; i < 2; i++) {
+      long ts = i;
+      Message message = new Message();
+      message.setDevice("d" + i);
+      message.setTimestamp(ts);
+      message.setMeasurements(Arrays.asList("s1", "s2"));
+      message.setValues(Arrays.asList("4.0" + i, "5.0" + i));
+      ret.add(message);
+    }
+    return ret;
+  }
+
+  @Override
+  public String getName() {
+    // set the value of mqtt_payload_formatter in iotdb-engine.properties as the following string:
+    return "CustomizedJson";
+  }
+}
+```
+* modify the file in `src/main/resources/META-INF/services/org.apache.iotdb.db.mqtt.PayloadFormatter`:
+  clean the file and put your implementation class name into the file.
+  In this example, the content is: `org.apache.iotdb.mqtt.server.CustomizedJsonPayloadFormatter`
+* compile your implementation as a jar file: `mvn package -DskipTests`
+
+
+Then, in your server:
+* put the jar into your server's lib folder
+* Update configuration to enable MQTT service. (`enable_mqtt_service=true` in `conf/iotdb-engine.properties`)
+* Set the value of `mqtt_payload_formatter` in `conf/iotdb-engine.properties` as the value of getName() in your implementation
+  , in this example, the value is `CustomizedJson`
+* Launch the IoTDB server.
+* Now IoTDB will use your implementation to parse the MQTT message.
+
+More: the message format can be anything you want. For example, if it is a binary format, 
+just use `payload.forEachByte()` or `payload.array` to get bytes content. 
+
+
 
diff --git a/docs/zh/UserGuide/Communication-Service-Protocol/Programming-MQTT.md b/docs/zh/UserGuide/Communication-Service-Protocol/Programming-MQTT.md
index 5fc4ef9..b5d7f6d 100644
--- a/docs/zh/UserGuide/Communication-Service-Protocol/Programming-MQTT.md
+++ b/docs/zh/UserGuide/Communication-Service-Protocol/Programming-MQTT.md
@@ -100,7 +100,79 @@ for (int i = 0; i < 10; i++) {
 connection.disconnect();
  ```
 
-## Rest 
 
+### 自定义MQTT消息格式
 
+事实上可以通过简单编程来实现MQTT消息的格式自定义。
+可以在源码的`example/mqtt-customize`项目中找到一个简单示例。
 
+步骤:
+* 创建一个Java项目,增加如下依赖
+```xml
+        <dependency>
+            <groupId>org.apache.iotdb</groupId>
+            <artifactId>iotdb-server</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+```
+* 创建一个实现类,实现接口 `org.apache.iotdb.db.mqtt.PayloadFormatter.java`
+```java
+package org.apache.iotdb.mqtt.server;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.iotdb.db.mqtt.Message;
+import org.apache.iotdb.db.mqtt.PayloadFormatter;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class CustomizedJsonPayloadFormatter implements PayloadFormatter {
+
+  @Override
+  public List<Message> format(ByteBuf payload) {
+    // Suppose the payload is a json format
+    if (payload == null) {
+      return null;
+    }
+
+    String json = payload.toString(StandardCharsets.UTF_8);
+    // parse data from the json and generate Messages and put them into List<Meesage> ret
+    List<Message> ret = new ArrayList<>();
+    // this is just an example, so we just generate some Messages directly
+    for (int i = 0; i < 2; i++) {
+      long ts = i;
+      Message message = new Message();
+      message.setDevice("d" + i);
+      message.setTimestamp(ts);
+      message.setMeasurements(Arrays.asList("s1", "s2"));
+      message.setValues(Arrays.asList("4.0" + i, "5.0" + i));
+      ret.add(message);
+    }
+    return ret;
+  }
+
+  @Override
+  public String getName() {
+    // set the value of mqtt_payload_formatter in iotdb-engine.properties as the following string:
+    return "CustomizedJson";
+  }
+}
+```
+* 修改项目中的 `src/main/resources/META-INF/services/org.apache.iotdb.db.mqtt.PayloadFormatter`文件:
+  将示例中的文件内容清除,并将刚才的实现类的全名(包名.类名)写入文件中。注意,这个文件中只有一行。
+  在本例中,文件内容为: `org.apache.iotdb.mqtt.server.CustomizedJsonPayloadFormatter`
+* 编译项目生成一个jar包: `mvn package -DskipTests`
+
+
+在IoTDB服务端:
+* 将刚才的jar包放入IoTDB的lib文件夹内
+* 打开MQTT服务参数. (`enable_mqtt_service=true` in `conf/iotdb-engine.properties`)
+* 用刚才的实现类中的getName() 方法的返回值 设置为 `conf/iotdb-engine.properties`中`mqtt_payload_formatter`的值, 
+  , 在本例中,为 `CustomizedJson`
+* 启动IoTDB
+* 搞定.
+
+More: MQTT协议的消息不限于json,你还可以用任意二进制。通过如下函数获得:
+`payload.forEachByte()` or `payload.array`。 
\ No newline at end of file
diff --git a/example/mqtt-customize/README.md b/example/mqtt-customize/README.md
new file mode 100644
index 0000000..069df51
--- /dev/null
+++ b/example/mqtt-customize/README.md
@@ -0,0 +1,42 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+        http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+# Customized IoTDB-MQTT-Broker Example
+
+## Function
+```
+The example is to show how to customize your MQTT message format
+```
+
+## Usage
+
+* Define your implementation which implements `PayloadFormatter.java`
+* modify the file in `src/main/resources/META-INF/services/org.apache.iotdb.db.mqtt.PayloadFormatter`:
+  clean the file and put your implementation class name into the file
+* compile your implementation as a jar file
+
+  
+Then, in your server: 
+* put the jar into your server's lib folder
+* Update configuration to enable MQTT service. (`enable_mqtt_service=true` in iotdb-engine.properties)
+* et the value of `mqtt_payload_formatter` in `conf/iotdb-engine.properties` as the value of getName() in your implementation 
+* Launch the IoTDB server.
+* Now IoTDB will use your implementation to parse the MQTT message.
+
diff --git a/example/mqtt-customize/pom.xml b/example/mqtt-customize/pom.xml
new file mode 100644
index 0000000..7098667
--- /dev/null
+++ b/example/mqtt-customize/pom.xml
@@ -0,0 +1,42 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+        http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.iotdb</groupId>
+        <artifactId>iotdb-examples</artifactId>
+        <version>0.13.0-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+    <artifactId>customize-mqtt-example</artifactId>
+    <name>Customized IoTDB-MQTT Examples</name>
+    <packaging>jar</packaging>
+    <dependencies>
+        <!-- used by the server-->
+        <dependency>
+            <groupId>org.apache.iotdb</groupId>
+            <artifactId>iotdb-server</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+    </dependencies>
+</project>
diff --git a/example/mqtt-customize/src/main/java/org/apache/iotdb/mqtt/server/CustomizedJsonPayloadFormatter.java b/example/mqtt-customize/src/main/java/org/apache/iotdb/mqtt/server/CustomizedJsonPayloadFormatter.java
new file mode 100644
index 0000000..e111a25
--- /dev/null
+++ b/example/mqtt-customize/src/main/java/org/apache/iotdb/mqtt/server/CustomizedJsonPayloadFormatter.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.mqtt.server;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.iotdb.db.mqtt.Message;
+import org.apache.iotdb.db.mqtt.PayloadFormatter;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class CustomizedJsonPayloadFormatter implements PayloadFormatter {
+
+  @Override
+  public List<Message> format(ByteBuf payload) {
+    // Suppose the payload is a json format
+    if (payload == null) {
+      return null;
+    }
+
+    String json = payload.toString(StandardCharsets.UTF_8);
+
+    // parse data from the json and generate Messages and put them into List<Meesage> ret
+    List<Message> ret = new ArrayList<>();
+    // this is just an example, so we just generate some Messages directly
+    for (int i = 0; i < 2; i++) {
+      long ts = i;
+      Message message = new Message();
+      message.setDevice("d" + i);
+      message.setTimestamp(ts);
+      message.setMeasurements(Arrays.asList("s1", "s2"));
+      message.setValues(Arrays.asList("4.0" + i, "5.0" + i));
+      ret.add(message);
+    }
+    return ret;
+  }
+
+  @Override
+  public String getName() {
+    // set the value of mqtt_payload_formatter in iotdb-engine.properties as the following string:
+    return "CustomizedJson";
+  }
+}
diff --git a/example/mqtt-customize/src/main/resources/META-INF/services/org.apache.iotdb.db.mqtt.PayloadFormatter b/example/mqtt-customize/src/main/resources/META-INF/services/org.apache.iotdb.db.mqtt.PayloadFormatter
new file mode 100644
index 0000000..d434765
--- /dev/null
+++ b/example/mqtt-customize/src/main/resources/META-INF/services/org.apache.iotdb.db.mqtt.PayloadFormatter
@@ -0,0 +1 @@
+org.apache.iotdb.mqtt.server.CustomizedJsonPayloadFormatter
\ No newline at end of file
diff --git a/example/pom.xml b/example/pom.xml
index 75fc3ed..f727f2c 100644
--- a/example/pom.xml
+++ b/example/pom.xml
@@ -42,6 +42,7 @@
         <module>hadoop</module>
         <module>flink</module>
         <module>mqtt</module>
+        <module>mqtt-customize</module>
         <module>pulsar</module>
         <module>udf</module>
         <module>trigger</module>