You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hx...@apache.org on 2021/07/14 16:16:33 UTC
[iotdb] 01/01: add an example to customize MQTT Message
This is an automated email from the ASF dual-hosted git repository.
hxd pushed a commit to branch mqtt_example
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit b76e59015cf5a819de14ff94733b4f20f16be0e6
Author: xiangdong huang <sa...@gmail.com>
AuthorDate: Thu Jul 15 00:16:01 2021 +0800
add an example to customize MQTT Message
---
.../Programming-MQTT.md | 79 +++++++++++++++++++++-
.../Programming-MQTT.md | 74 +++++++++++++++++++-
example/mqtt-customize/README.md | 42 ++++++++++++
example/mqtt-customize/pom.xml | 42 ++++++++++++
.../server/CustomizedJsonPayloadFormatter.java | 61 +++++++++++++++++
.../org.apache.iotdb.db.mqtt.PayloadFormatter | 1 +
example/pom.xml | 1 +
7 files changed, 298 insertions(+), 2 deletions(-)
diff --git a/docs/UserGuide/Communication-Service-Protocol/Programming-MQTT.md b/docs/UserGuide/Communication-Service-Protocol/Programming-MQTT.md
index 98a5b96..f063bce 100644
--- a/docs/UserGuide/Communication-Service-Protocol/Programming-MQTT.md
+++ b/docs/UserGuide/Communication-Service-Protocol/Programming-MQTT.md
@@ -100,5 +100,82 @@ connection.disconnect();
```
-## Rest
+### Customize your MQTT Message Format
+
+If you do not like the above Json format, you can customize your MQTT Message format by just writing several lines
+of codes. An example can be found in `example/mqtt-customize` project.
+
+Steps:
+* Create a java project, and add dependency:
+```xml
+ <dependency>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>iotdb-server</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+```
+* Define your implementation which implements `org.apache.iotdb.db.mqtt.PayloadFormatter.java`
+e.g.,
+```java
+package org.apache.iotdb.mqtt.server;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.iotdb.db.mqtt.Message;
+import org.apache.iotdb.db.mqtt.PayloadFormatter;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class CustomizedJsonPayloadFormatter implements PayloadFormatter {
+
+ @Override
+ public List<Message> format(ByteBuf payload) {
+ // Suppose the payload is a json format
+ if (payload == null) {
+ return null;
+ }
+
+ String json = payload.toString(StandardCharsets.UTF_8);
+ // parse data from the json and generate Messages and put them into List<Meesage> ret
+ List<Message> ret = new ArrayList<>();
+ // this is just an example, so we just generate some Messages directly
+ for (int i = 0; i < 2; i++) {
+ long ts = i;
+ Message message = new Message();
+ message.setDevice("d" + i);
+ message.setTimestamp(ts);
+ message.setMeasurements(Arrays.asList("s1", "s2"));
+ message.setValues(Arrays.asList("4.0" + i, "5.0" + i));
+ ret.add(message);
+ }
+ return ret;
+ }
+
+ @Override
+ public String getName() {
+ // set the value of mqtt_payload_formatter in iotdb-engine.properties as the following string:
+ return "CustomizedJson";
+ }
+}
+```
+* modify the file in `src/main/resources/META-INF/services/org.apache.iotdb.db.mqtt.PayloadFormatter`:
+ clean the file and put your implementation class name into the file.
+ In this example, the content is: `org.apache.iotdb.mqtt.server.CustomizedJsonPayloadFormatter`
+* compile your implementation as a jar file: `mvn package -DskipTests`
+
+
+Then, in your server:
+* put the jar into your server's lib folder
+* Update configuration to enable MQTT service. (`enable_mqtt_service=true` in `conf/iotdb-engine.properties`)
+* Set the value of `mqtt_payload_formatter` in `conf/iotdb-engine.properties` as the value of getName() in your implementation
+ , in this example, the value is `CustomizedJson`
+* Launch the IoTDB server.
+* Now IoTDB will use your implementation to parse the MQTT message.
+
+More: the message format can be anything you want. For example, if it is a binary format,
+just use `payload.forEachByte()` or `payload.array` to get bytes content.
+
+
diff --git a/docs/zh/UserGuide/Communication-Service-Protocol/Programming-MQTT.md b/docs/zh/UserGuide/Communication-Service-Protocol/Programming-MQTT.md
index 5fc4ef9..b5d7f6d 100644
--- a/docs/zh/UserGuide/Communication-Service-Protocol/Programming-MQTT.md
+++ b/docs/zh/UserGuide/Communication-Service-Protocol/Programming-MQTT.md
@@ -100,7 +100,79 @@ for (int i = 0; i < 10; i++) {
connection.disconnect();
```
-## Rest
+### 自定义MQTT消息格式
+事实上可以通过简单编程来实现MQTT消息的格式自定义。
+可以在源码的`example/mqtt-customize`项目中找到一个简单示例。
+步骤:
+* 创建一个Java项目,增加如下依赖
+```xml
+ <dependency>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>iotdb-server</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+```
+* 创建一个实现类,实现接口 `org.apache.iotdb.db.mqtt.PayloadFormatter.java`
+```java
+package org.apache.iotdb.mqtt.server;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.iotdb.db.mqtt.Message;
+import org.apache.iotdb.db.mqtt.PayloadFormatter;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class CustomizedJsonPayloadFormatter implements PayloadFormatter {
+
+ @Override
+ public List<Message> format(ByteBuf payload) {
+ // Suppose the payload is a json format
+ if (payload == null) {
+ return null;
+ }
+
+ String json = payload.toString(StandardCharsets.UTF_8);
+ // parse data from the json and generate Messages and put them into List<Meesage> ret
+ List<Message> ret = new ArrayList<>();
+ // this is just an example, so we just generate some Messages directly
+ for (int i = 0; i < 2; i++) {
+ long ts = i;
+ Message message = new Message();
+ message.setDevice("d" + i);
+ message.setTimestamp(ts);
+ message.setMeasurements(Arrays.asList("s1", "s2"));
+ message.setValues(Arrays.asList("4.0" + i, "5.0" + i));
+ ret.add(message);
+ }
+ return ret;
+ }
+
+ @Override
+ public String getName() {
+ // set the value of mqtt_payload_formatter in iotdb-engine.properties as the following string:
+ return "CustomizedJson";
+ }
+}
+```
+* 修改项目中的 `src/main/resources/META-INF/services/org.apache.iotdb.db.mqtt.PayloadFormatter`文件:
+ 将示例中的文件内容清除,并将刚才的实现类的全名(包名.类名)写入文件中。注意,这个文件中只有一行。
+ 在本例中,文件内容为: `org.apache.iotdb.mqtt.server.CustomizedJsonPayloadFormatter`
+* 编译项目生成一个jar包: `mvn package -DskipTests`
+
+
+在IoTDB服务端:
+* 将刚才的jar包放入IoTDB的lib文件夹内
+* 打开MQTT服务参数. (`enable_mqtt_service=true` in `conf/iotdb-engine.properties`)
+* 用刚才的实现类中的getName() 方法的返回值 设置为 `conf/iotdb-engine.properties`中`mqtt_payload_formatter`的值,
+ , 在本例中,为 `CustomizedJson`
+* 启动IoTDB
+* 搞定.
+
+More: MQTT协议的消息不限于json,你还可以用任意二进制。通过如下函数获得:
+`payload.forEachByte()` or `payload.array`。
\ No newline at end of file
diff --git a/example/mqtt-customize/README.md b/example/mqtt-customize/README.md
new file mode 100644
index 0000000..069df51
--- /dev/null
+++ b/example/mqtt-customize/README.md
@@ -0,0 +1,42 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+# Customized IoTDB-MQTT-Broker Example
+
+## Function
+```
+The example is to show how to customize your MQTT message format
+```
+
+## Usage
+
+* Define your implementation which implements `PayloadFormatter.java`
+* modify the file in `src/main/resources/META-INF/services/org.apache.iotdb.db.mqtt.PayloadFormatter`:
+ clean the file and put your implementation class name into the file
+* compile your implementation as a jar file
+
+
+Then, in your server:
+* put the jar into your server's lib folder
+* Update configuration to enable MQTT service. (`enable_mqtt_service=true` in iotdb-engine.properties)
+* et the value of `mqtt_payload_formatter` in `conf/iotdb-engine.properties` as the value of getName() in your implementation
+* Launch the IoTDB server.
+* Now IoTDB will use your implementation to parse the MQTT message.
+
diff --git a/example/mqtt-customize/pom.xml b/example/mqtt-customize/pom.xml
new file mode 100644
index 0000000..7098667
--- /dev/null
+++ b/example/mqtt-customize/pom.xml
@@ -0,0 +1,42 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>iotdb-examples</artifactId>
+ <version>0.13.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+ <artifactId>customize-mqtt-example</artifactId>
+ <name>Customized IoTDB-MQTT Examples</name>
+ <packaging>jar</packaging>
+ <dependencies>
+ <!-- used by the server-->
+ <dependency>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>iotdb-server</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ </dependencies>
+</project>
diff --git a/example/mqtt-customize/src/main/java/org/apache/iotdb/mqtt/server/CustomizedJsonPayloadFormatter.java b/example/mqtt-customize/src/main/java/org/apache/iotdb/mqtt/server/CustomizedJsonPayloadFormatter.java
new file mode 100644
index 0000000..e111a25
--- /dev/null
+++ b/example/mqtt-customize/src/main/java/org/apache/iotdb/mqtt/server/CustomizedJsonPayloadFormatter.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.mqtt.server;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.iotdb.db.mqtt.Message;
+import org.apache.iotdb.db.mqtt.PayloadFormatter;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class CustomizedJsonPayloadFormatter implements PayloadFormatter {
+
+ @Override
+ public List<Message> format(ByteBuf payload) {
+ // Suppose the payload is a json format
+ if (payload == null) {
+ return null;
+ }
+
+ String json = payload.toString(StandardCharsets.UTF_8);
+
+ // parse data from the json and generate Messages and put them into List<Meesage> ret
+ List<Message> ret = new ArrayList<>();
+ // this is just an example, so we just generate some Messages directly
+ for (int i = 0; i < 2; i++) {
+ long ts = i;
+ Message message = new Message();
+ message.setDevice("d" + i);
+ message.setTimestamp(ts);
+ message.setMeasurements(Arrays.asList("s1", "s2"));
+ message.setValues(Arrays.asList("4.0" + i, "5.0" + i));
+ ret.add(message);
+ }
+ return ret;
+ }
+
+ @Override
+ public String getName() {
+ // set the value of mqtt_payload_formatter in iotdb-engine.properties as the following string:
+ return "CustomizedJson";
+ }
+}
diff --git a/example/mqtt-customize/src/main/resources/META-INF/services/org.apache.iotdb.db.mqtt.PayloadFormatter b/example/mqtt-customize/src/main/resources/META-INF/services/org.apache.iotdb.db.mqtt.PayloadFormatter
new file mode 100644
index 0000000..d434765
--- /dev/null
+++ b/example/mqtt-customize/src/main/resources/META-INF/services/org.apache.iotdb.db.mqtt.PayloadFormatter
@@ -0,0 +1 @@
+org.apache.iotdb.mqtt.server.CustomizedJsonPayloadFormatter
\ No newline at end of file
diff --git a/example/pom.xml b/example/pom.xml
index 75fc3ed..f727f2c 100644
--- a/example/pom.xml
+++ b/example/pom.xml
@@ -42,6 +42,7 @@
<module>hadoop</module>
<module>flink</module>
<module>mqtt</module>
+ <module>mqtt-customize</module>
<module>pulsar</module>
<module>udf</module>
<module>trigger</module>