You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hx...@apache.org on 2020/04/09 00:17:39 UTC
[incubator-iotdb] branch master updated: [IOTDB-565] MQTT Protocol
Support (#929)
This is an automated email from the ASF dual-hosted git repository.
hxd pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new d6207c0 [IOTDB-565] MQTT Protocol Support (#929)
d6207c0 is described below
commit d6207c0745dcf4332a6763c80587808ee861b902
Author: Xin Wang <xi...@apache.org>
AuthorDate: Thu Apr 9 08:17:13 2020 +0800
[IOTDB-565] MQTT Protocol Support (#929)
* [IOTDB-503] Add checkTimeseriesExists for session
* [IOTDB-565] MQTT Protocol Support
* [IOTDB-565] upgrade fastjson to latest version
---
LICENSE | 8 +
LICENSE-binary | 2 +-
docs/UserGuide/4-Client/6-Programming - MQTT.md | 100 ++++
.../{6-Status Codes.md => 7-Status Codes.md} | 0
example/flink/README.md | 3 +-
example/flink/pom.xml | 5 -
.../org/apache/iotdb/flink/FlinkIoTDBSink.java | 6 -
example/{flink => mqtt}/README.md | 15 +-
example/{flink => mqtt}/pom.xml | 24 +-
.../java/org/apache/iotdb/mqtt/MQTTClient.java | 50 ++
example/pom.xml | 1 +
pom.xml | 26 +-
server/pom.xml | 10 +-
.../resources/conf/iotdb-engine.properties | 22 +-
.../java/io/moquette/broker/MQTTConnection.java | 503 +++++++++++++++++++++
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 68 +++
.../org/apache/iotdb/db/conf/IoTDBConstant.java | 7 +
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 17 +
.../apache/iotdb/db/mqtt/BrokerAuthenticator.java | 48 ++
.../apache/iotdb/db/mqtt/JSONPayloadFormatter.java | 90 ++++
.../java/org/apache/iotdb/db/mqtt/Message.java | 73 +++
.../apache/iotdb/db/mqtt/PayloadFormatManager.java | 47 ++
.../org/apache/iotdb/db/mqtt/PayloadFormatter.java | 40 ++
.../org/apache/iotdb/db/mqtt/PublishHandler.java | 110 +++++
.../java/org/apache/iotdb/db/service/IoTDB.java | 3 +
.../apache/iotdb/db/service/IoTDBShutdownHook.java | 2 +-
.../org/apache/iotdb/db/service/MQTTService.java | 102 +++++
.../org/apache/iotdb/db/service/ServiceType.java | 1 +
.../org.apache.iotdb.db.mqtt.PayloadFormatter | 20 +
.../iotdb/db/mqtt/BrokerAuthenticatorTest.java | 35 ++
.../iotdb/db/mqtt/JSONPayloadFormatTest.java | 69 +++
.../iotdb/db/mqtt/PayloadFormatManagerTest.java | 35 ++
.../apache/iotdb/db/mqtt/PublishHandlerTest.java | 59 +++
site/src/main/.vuepress/config.js | 3 +-
34 files changed, 1555 insertions(+), 49 deletions(-)
diff --git a/LICENSE b/LICENSE
index b88d620..d784353 100644
--- a/LICENSE
+++ b/LICENSE
@@ -215,3 +215,11 @@ The following class is modified from Apache commons-collections
./tsfile/src/main/java/org/apache/iotdb/tsfile/utils/Murmur128Hash.java
Relevant pr is: https://github.com/apache/commons-collections/pull/83/
+
+------------
+
+The following class is modified from moquette (https://github.com/moquette-io/moquette),
+which is under Apache License 2.0:
+
+./server/src/main/java/io/moquette/broker/MQTTConnection.java
+Relevant pr is: https://github.com/moquette-io/moquette/pull/454
diff --git a/LICENSE-binary b/LICENSE-binary
index 35127f4..d2a39ea 100644
--- a/LICENSE-binary
+++ b/LICENSE-binary
@@ -221,7 +221,7 @@ org.apache.commons:commons-collections4:4.0
org.apache.commons:commons-lang3:3.1
org.apache.thrift:libthrift:0.9.3
org.xerial.snappy:snappy-java:1.0.5-M1
-com.alibaba:fastjson:1.2.31
+com.alibaba:fastjson:1.2.67
com.sun.xml.fastinfoset:FastInfoset:1.2.14
io.airlift.airline:0.8
com.google.guava.guava:21.0
diff --git a/docs/UserGuide/4-Client/6-Programming - MQTT.md b/docs/UserGuide/4-Client/6-Programming - MQTT.md
new file mode 100644
index 0000000..0fe588e
--- /dev/null
+++ b/docs/UserGuide/4-Client/6-Programming - MQTT.md
@@ -0,0 +1,100 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+# MQTT Protocol
+
+[MQTT](http://mqtt.org/) is a machine-to-machine (M2M)/"Internet of Things" connectivity protocol.
+It was designed as an extremely lightweight publish/subscribe messaging transport.
+It is useful for connections with remote locations where a small code footprint is required and/or network bandwidth is at a premium.
+
+IoTDB supports the MQTT v3.1 (an OASIS Standard) protocol.
+The IoTDB server includes a built-in MQTT service that allows remote devices to send messages directly to the IoTDB server.
+
+<img style="width:100%; max-width:800px; max-height:600px; margin-left:auto; margin-right:auto; display:block;" src="https://user-images.githubusercontent.com/6711230/78357432-0c71cf80-75e4-11ea-98aa-c43a54d469ce.png">
+
+
+## Built-in MQTT Service
+The built-in MQTT service provides the ability to connect directly to IoTDB through MQTT. It listens for publish messages from MQTT clients
+ and then writes the data into storage immediately.
+The MQTT topic corresponds to an IoTDB timeseries.
+The message payload is converted into events by a `PayloadFormatter`, which is loaded via Java SPI; the default implementation is `JSONPayloadFormatter`.
+The default `json` formatter supports two JSON formats. The following is an example of an MQTT message payload:
+
+```json
+ {
+ "device":"root.sg.d1",
+ "timestamp":1586076045524,
+ "measurements":["s1","s2"],
+ "values":[0.530635,0.530635]
+ }
+```
+or
+```json
+{
+ "device":"root.sg.d1",
+ "timestamps":[1586076045524,1586076065526],
+ "measurements":["s1","s2"],
+ "values":[[0.530635,0.530635], [0.530655,0.530695]]
+ }
+```
+
+<img style="width:100%; max-width:800px; max-height:600px; margin-left:auto; margin-right:auto; display:block;" src="https://user-images.githubusercontent.com/6711230/78357469-1bf11880-75e4-11ea-978f-a53996667a0d.png">
+
+## MQTT Configurations
+The IoTDB MQTT service loads its configuration from `${IOTDB_HOME}/${IOTDB_CONF}/iotdb-engine.properties` by default.
+
+The configuration options are as follows:
+
+| NAME | DESCRIPTION | DEFAULT |
+| ------------- |:-------------:|:------:|
+| enable_mqtt_service | whether to enable the mqtt service | true |
+| mqtt_host | the mqtt service binding host | 0.0.0.0 |
+| mqtt_port | the mqtt service binding port | 1883 |
+| mqtt_handler_pool_size | the handler pool size for handling the mqtt messages | 1 |
+| mqtt_payload_formatter | the mqtt message payload formatter | json |
+
+
+## Examples
+The following is an example in which an MQTT client sends messages to the IoTDB server.
+
+ ```java
+ MQTT mqtt = new MQTT();
+ mqtt.setHost("127.0.0.1", 1883);
+ mqtt.setUserName("root");
+ mqtt.setPassword("root");
+
+ BlockingConnection connection = mqtt.blockingConnection();
+ connection.connect();
+
+ Random random = new Random();
+ for (int i = 0; i < 10; i++) {
+ String payload = String.format("{\n" +
+ "\"device\":\"root.sg.d1\",\n" +
+ "\"timestamp\":%d,\n" +
+ "\"measurements\":[\"s1\"],\n" +
+ "\"values\":[%f]\n" +
+ "}", System.currentTimeMillis(), random.nextDouble());
+
+ connection.publish("root.sg.d1.s1", payload.getBytes(), QoS.AT_LEAST_ONCE, false);
+ }
+
+ connection.disconnect();
+ ```
diff --git a/docs/UserGuide/4-Client/6-Status Codes.md b/docs/UserGuide/4-Client/7-Status Codes.md
similarity index 100%
rename from docs/UserGuide/4-Client/6-Status Codes.md
rename to docs/UserGuide/4-Client/7-Status Codes.md
diff --git a/example/flink/README.md b/example/flink/README.md
index 56b92e4..5059946 100644
--- a/example/flink/README.md
+++ b/example/flink/README.md
@@ -27,7 +27,8 @@ The example is to show how to send data to a IoTDB server from a Flink job.
## Usage
-* Run `org.apache.iotdb.flink.FlinkIoTDBSink.java` to launch the local iotDB server and run the flink job on local mini cluster.
+* Launch the IoTDB server.
+* Run `org.apache.iotdb.flink.FlinkIoTDBSink.java` to run the flink job on local mini cluster.
# TsFile-Flink-Connector Example
diff --git a/example/flink/pom.xml b/example/flink/pom.xml
index e46e834..308edfb 100644
--- a/example/flink/pom.xml
+++ b/example/flink/pom.xml
@@ -38,11 +38,6 @@
</dependency>
<dependency>
<groupId>org.apache.iotdb</groupId>
- <artifactId>iotdb-server</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.iotdb</groupId>
<artifactId>tsfile</artifactId>
<version>${project.version}</version>
</dependency>
diff --git a/example/flink/src/main/java/org/apache/iotdb/flink/FlinkIoTDBSink.java b/example/flink/src/main/java/org/apache/iotdb/flink/FlinkIoTDBSink.java
index 0d8bb2f..1048079 100644
--- a/example/flink/src/main/java/org/apache/iotdb/flink/FlinkIoTDBSink.java
+++ b/example/flink/src/main/java/org/apache/iotdb/flink/FlinkIoTDBSink.java
@@ -20,7 +20,6 @@ package org.apache.iotdb.flink;
import com.google.common.collect.Lists;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.iotdb.db.service.IoTDB;
import java.util.HashMap;
import java.util.Map;
@@ -28,11 +27,6 @@ import java.util.Random;
public class FlinkIoTDBSink {
public static void main(String[] args) throws Exception {
- // launch the local iotDB server at default port: 6667
- IoTDB.main(args);
-
- Thread.sleep(3000);
-
// run the flink job on local mini cluster
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
diff --git a/example/flink/README.md b/example/mqtt/README.md
similarity index 60%
copy from example/flink/README.md
copy to example/mqtt/README.md
index 56b92e4..dfdf08f 100644
--- a/example/flink/README.md
+++ b/example/mqtt/README.md
@@ -18,20 +18,15 @@
under the License.
-->
-# IoTDB-Flink-Connector Example
+# IoTDB-MQTT-Broker Example
## Function
```
-The example is to show how to send data to a IoTDB server from a Flink job.
+This example shows how to send data to IoTDB from an MQTT client.
```
## Usage
-* Run `org.apache.iotdb.flink.FlinkIoTDBSink.java` to launch the local iotDB server and run the flink job on local mini cluster.
-
-# TsFile-Flink-Connector Example
-
-## Usage
-
-* Run `org.apache.iotdb.flink.FlinkTsFileBatchSource.java` to create a tsfile and read it via a flink DataSet job on local mini cluster.
-* Run `org.apache.iotdb.flink.FlinkTsFileStreamSource.java` to create a tsfile and read it via a flink DataStream job on local mini cluster.
+* Launch the IoTDB server.
+* Set up the storage group with `SET STORAGE GROUP TO root.sg` and create the timeseries with `CREATE TIMESERIES root.sg.d1.s1 WITH DATATYPE=DOUBLE, ENCODING=PLAIN`.
+* Run `org.apache.iotdb.mqtt.MQTTClient` to start the MQTT client and send events to the server.
diff --git a/example/flink/pom.xml b/example/mqtt/pom.xml
similarity index 63%
copy from example/flink/pom.xml
copy to example/mqtt/pom.xml
index e46e834..485352b 100644
--- a/example/flink/pom.xml
+++ b/example/mqtt/pom.xml
@@ -27,29 +27,13 @@
<version>0.10.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
- <artifactId>flink-example</artifactId>
- <name>IoTDB-Flink Examples</name>
+ <artifactId>mqtt-example</artifactId>
+ <name>IoTDB-MQTT Examples</name>
<packaging>jar</packaging>
<dependencies>
<dependency>
- <groupId>org.apache.iotdb</groupId>
- <artifactId>flink-iotdb-connector</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.iotdb</groupId>
- <artifactId>iotdb-server</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.iotdb</groupId>
- <artifactId>tsfile</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.iotdb</groupId>
- <artifactId>flink-tsfile-connector</artifactId>
- <version>${project.version}</version>
+ <groupId>org.fusesource.mqtt-client</groupId>
+ <artifactId>mqtt-client</artifactId>
</dependency>
</dependencies>
</project>
diff --git a/example/mqtt/src/main/java/org/apache/iotdb/mqtt/MQTTClient.java b/example/mqtt/src/main/java/org/apache/iotdb/mqtt/MQTTClient.java
new file mode 100644
index 0000000..54f0665
--- /dev/null
+++ b/example/mqtt/src/main/java/org/apache/iotdb/mqtt/MQTTClient.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.mqtt;
+
+import org.fusesource.mqtt.client.BlockingConnection;
+import org.fusesource.mqtt.client.MQTT;
+import org.fusesource.mqtt.client.QoS;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Locale;
+import java.util.Random;
+
+/**
+ * Minimal example of an MQTT client that publishes JSON-formatted data points to the
+ * MQTT service of an IoTDB server listening on 127.0.0.1:1883.
+ */
+public class MQTTClient {
+    /**
+     * Connects as user "root", publishes ten random values for the measurement
+     * {@code s1} of device {@code root.sg.d1} and disconnects.
+     *
+     * @param args unused
+     * @throws Exception if connecting, publishing or disconnecting fails
+     */
+    public static void main(String[] args) throws Exception {
+        MQTT mqtt = new MQTT();
+        mqtt.setHost("127.0.0.1", 1883);
+        mqtt.setUserName("root");
+        mqtt.setPassword("root");
+
+        BlockingConnection connection = mqtt.blockingConnection();
+        connection.connect();
+        try {
+            Random random = new Random();
+            for (int i = 0; i < 10; i++) {
+                // Locale.ROOT keeps '.' as the decimal separator; with a default locale that
+                // uses ',' (e.g. de_DE) "%f" would otherwise produce invalid JSON.
+                String payload = String.format(Locale.ROOT, "{\n" +
+                        "\"device\":\"root.sg.d1\",\n" +
+                        "\"timestamp\":%d,\n" +
+                        "\"measurements\":[\"s1\"],\n" +
+                        "\"values\":[%f]\n" +
+                        "}", System.currentTimeMillis(), random.nextDouble());
+
+                // Encode explicitly as UTF-8 instead of relying on the platform charset.
+                connection.publish("root.sg.d1.s1", payload.getBytes(StandardCharsets.UTF_8),
+                        QoS.AT_LEAST_ONCE, false);
+            }
+        } finally {
+            // Always release the connection, even if a publish fails.
+            connection.disconnect();
+        }
+    }
+}
diff --git a/example/pom.xml b/example/pom.xml
index 283f2d8..4b336bd 100644
--- a/example/pom.xml
+++ b/example/pom.xml
@@ -41,6 +41,7 @@
<module>jdbc</module>
<module>hadoop</module>
<module>flink</module>
+ <module>mqtt</module>
</modules>
<build>
<pluginManagement>
diff --git a/pom.xml b/pom.xml
index 647466f..56692a8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -123,6 +123,7 @@
<common.lang3.version>3.8.1</common.lang3.version>
<common.logging.version>1.1.3</common.logging.version>
<guava.version>21.0</guava.version>
+ <fastjson.version>1.2.67</fastjson.version>
<jline.version>2.14.5</jline.version>
<jetty.version>9.4.24.v20191120</jetty.version>
<metrics.version>3.2.6</metrics.version>
@@ -135,6 +136,19 @@
<!-- By default, the argLine is empty-->
<argLine/>
</properties>
+ <repositories>
+ <!-- repository for moquette -->
+ <repository>
+ <id>bintray</id>
+ <url>https://jcenter.bintray.com</url>
+ <releases>
+ <enabled>true</enabled>
+ </releases>
+ <snapshots>
+ <enabled>false</enabled>
+ </snapshots>
+ </repository>
+ </repositories>
<!--
if we claim dependencies in dependencyManagement, then we do not claim
their version in sub-project's pom, but we have to claim themselves again
@@ -154,7 +168,7 @@
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
- <version>1.2.31</version>
+ <version>${fastjson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
@@ -459,6 +473,16 @@
<artifactId>airline</artifactId>
<version>${airline.version}</version>
</dependency>
+ <dependency>
+ <groupId>io.moquette</groupId>
+ <artifactId>moquette-broker</artifactId>
+ <version>0.12.1</version>
+ </dependency>
+ <dependency>
+ <groupId>org.fusesource.mqtt-client</groupId>
+ <artifactId>mqtt-client</artifactId>
+ <version>1.12</version>
+ </dependency>
</dependencies>
</dependencyManagement>
<dependencies>
diff --git a/server/pom.xml b/server/pom.xml
index b231975..618f768 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -61,12 +61,14 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
</dependency>
- <!-- https://mvnrepository.com/artifact/org.antlr/antlr-runtime4 -->
+ <dependency>
+ <groupId>commons-lang</groupId>
+ <artifactId>commons-lang</artifactId>
+ </dependency>
<dependency>
<groupId>org.antlr</groupId>
<artifactId>antlr4-runtime</artifactId>
</dependency>
- <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
@@ -95,6 +97,10 @@
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-json</artifactId>
</dependency>
+ <dependency>
+ <groupId>io.moquette</groupId>
+ <artifactId>moquette-broker</artifactId>
+ </dependency>
<!-- for mocked test-->
<dependency>
<groupId>org.powermock</groupId>
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index 39ac594..e0a13d7 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -451,4 +451,24 @@ partition_interval=604800
# For example, your partitionInterval is 86400 and you want to insert data in 3 different days,
# you should set this param >= 6 (for sequence and unsequence)
# default number is 10
-memtable_num_in_each_storage_group=10
\ No newline at end of file
+memtable_num_in_each_storage_group=10
+
+
+####################
+### MQTT Broker Configuration
+####################
+
+# whether to enable the mqtt service.
+enable_mqtt_service=true
+
+# the mqtt service binding host.
+mqtt_host=0.0.0.0
+
+# the mqtt service binding port.
+mqtt_port=1883
+
+# the handler pool size for handling the mqtt messages.
+mqtt_handler_pool_size=1
+
+# the mqtt message payload formatter.
+mqtt_payload_formatter=json
diff --git a/server/src/main/java/io/moquette/broker/MQTTConnection.java b/server/src/main/java/io/moquette/broker/MQTTConnection.java
new file mode 100644
index 0000000..4c4a8fa
--- /dev/null
+++ b/server/src/main/java/io/moquette/broker/MQTTConnection.java
@@ -0,0 +1,503 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.moquette.broker;
+
+import io.moquette.broker.subscriptions.Topic;
+import io.moquette.broker.security.IAuthenticator;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelPipeline;
+import io.netty.handler.codec.mqtt.*;
+import io.netty.handler.timeout.IdleStateHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static io.netty.channel.ChannelFutureListener.CLOSE_ON_FAILURE;
+import static io.netty.channel.ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE;
+import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.*;
+import static io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader.from;
+import static io.netty.handler.codec.mqtt.MqttQoS.*;
+
+// NOTE:
+// This class overrides the MQTTConnection class shipped in the moquette 0.12.1 jar in order to fix the PUBACK flush issue:
+// https://github.com/moquette-io/moquette/pull/454
+// Once a moquette release containing the fix is available, this class can be removed.
+final class MQTTConnection {
+
+ private static final Logger LOG = LoggerFactory.getLogger(MQTTConnection.class);
+
+ // Netty channel of the client this connection serves; all responses are written to it.
+ final Channel channel;
+ // Broker settings consulted during CONNECT (empty client id and anonymous access checks).
+ private BrokerConfiguration brokerConfig;
+ // Validates the credentials supplied in the CONNECT message.
+ private IAuthenticator authenticator;
+ // Used to bind this connection to a session and to look sessions up by client id.
+ private SessionRegistry sessionRegistry;
+ // Receives publish/subscribe requests and connection lifecycle notifications.
+ private final PostOffice postOffice;
+ // Set to true when the CONNACK is sent, reset on DISCONNECT or when the connection is lost.
+ private boolean connected;
+ // NOTE(review): not referenced in the visible part of this class; presumably the source of
+ // packet ids for outgoing messages - confirm against the rest of the file.
+ private final AtomicInteger lastPacketId = new AtomicInteger(0);
+
+    /**
+     * Creates the handler for a single client channel. The connection starts out in the
+     * not-connected state.
+     */
+    MQTTConnection(Channel channel, BrokerConfiguration brokerConfig, IAuthenticator authenticator,
+                   SessionRegistry sessionRegistry, PostOffice postOffice) {
+        this.connected = false;
+        this.postOffice = postOffice;
+        this.sessionRegistry = sessionRegistry;
+        this.authenticator = authenticator;
+        this.brokerConfig = brokerConfig;
+        this.channel = channel;
+    }
+
+ void handleMessage(MqttMessage msg) {
+ MqttMessageType messageType = msg.fixedHeader().messageType();
+ LOG.debug("Received MQTT message, type: {}, channel: {}", messageType, channel);
+ switch (messageType) {
+ case CONNECT:
+ processConnect((MqttConnectMessage) msg);
+ break;
+ case SUBSCRIBE:
+ processSubscribe((MqttSubscribeMessage) msg);
+ break;
+ case UNSUBSCRIBE:
+ processUnsubscribe((MqttUnsubscribeMessage) msg);
+ break;
+ case PUBLISH:
+ processPublish((MqttPublishMessage) msg);
+ break;
+ case PUBREC:
+ processPubRec(msg);
+ break;
+ case PUBCOMP:
+ processPubComp(msg);
+ break;
+ case PUBREL:
+ processPubRel(msg);
+ break;
+ case DISCONNECT:
+ processDisconnect(msg);
+ break;
+ case PUBACK:
+ processPubAck(msg);
+ break;
+ case PINGREQ:
+ MqttFixedHeader pingHeader = new MqttFixedHeader(MqttMessageType.PINGRESP, false, AT_MOST_ONCE,
+ false, 0);
+ MqttMessage pingResp = new MqttMessage(pingHeader);
+ channel.writeAndFlush(pingResp).addListener(CLOSE_ON_FAILURE);
+ break;
+ default:
+ LOG.error("Unknown MessageType: {}, channel: {}", messageType, channel);
+ break;
+ }
+ }
+
+    /** Forwards a received PUBCOMP to the session of the current client. */
+    private void processPubComp(MqttMessage msg) {
+        final MqttMessageIdVariableHeader idHeader = (MqttMessageIdVariableHeader) msg.variableHeader();
+        final int packetId = idHeader.messageId();
+        sessionRegistry.retrieve(getClientId()).processPubComp(packetId);
+    }
+
+    /** Forwards a received PUBREC to the session of the current client. */
+    private void processPubRec(MqttMessage msg) {
+        final MqttMessageIdVariableHeader idHeader = (MqttMessageIdVariableHeader) msg.variableHeader();
+        final int packetId = idHeader.messageId();
+        sessionRegistry.retrieve(getClientId()).processPubRec(packetId);
+    }
+
+    /**
+     * Builds a PUBREL message for the given message id.
+     *
+     * @param messageID id of the message being released
+     * @return the PUBREL message
+     */
+    static MqttMessage pubrel(int messageID) {
+        return new MqttMessage(
+            new MqttFixedHeader(MqttMessageType.PUBREL, false, AT_LEAST_ONCE, false, 0),
+            from(messageID));
+    }
+
+    /** Notifies the session of the current client that a PUBACK was received. */
+    private void processPubAck(MqttMessage msg) {
+        final MqttMessageIdVariableHeader idHeader = (MqttMessageIdVariableHeader) msg.variableHeader();
+        final int packetId = idHeader.messageId();
+        final Session target = sessionRegistry.retrieve(getClientId());
+        target.pubAckReceived(packetId);
+    }
+
+    /**
+     * Handles a CONNECT message: validates the protocol version and client id, authenticates
+     * the client, binds this connection to a session and notifies the post office.
+     * On any validation or authentication failure a refusing CONNACK is sent and the channel
+     * is closed (see {@code abortConnection}).
+     *
+     * @param msg the CONNECT message received from the client
+     */
+    void processConnect(MqttConnectMessage msg) {
+        MqttConnectPayload payload = msg.payload();
+        String clientId = payload.clientIdentifier();
+        final String username = payload.userName();
+        LOG.trace("Processing CONNECT message. CId={} username: {} channel: {}", clientId, username, channel);
+
+        if (isNotProtocolVersion(msg, MqttVersion.MQTT_3_1) && isNotProtocolVersion(msg, MqttVersion.MQTT_3_1_1)) {
+            LOG.warn("MQTT protocol version is not valid. CId={} channel: {}", clientId, channel);
+            abortConnection(CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION);
+            return;
+        }
+        final boolean cleanSession = msg.variableHeader().isCleanSession();
+        if (clientId == null || clientId.length() == 0) {
+            if (!brokerConfig.isAllowZeroByteClientId()) {
+                LOG.warn("Broker doesn't permit MQTT empty client ID. Username: {}, channel: {}", username, channel);
+                abortConnection(CONNECTION_REFUSED_IDENTIFIER_REJECTED);
+                return;
+            }
+
+            if (!cleanSession) {
+                LOG.warn("MQTT client ID cannot be empty for persistent session. Username: {}, channel: {}",
+                         username, channel);
+                abortConnection(CONNECTION_REFUSED_IDENTIFIER_REJECTED);
+                return;
+            }
+
+            // Generating client id.
+            clientId = UUID.randomUUID().toString().replace("-", "");
+            LOG.debug("Client has connected with integration generated id: {}, username: {}, channel: {}", clientId,
+                      username, channel);
+        }
+
+        if (!login(msg, clientId)) {
+            // abortConnection() already closes the channel after writing the CONNACK,
+            // so no additional close is needed here.
+            abortConnection(CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
+            return;
+        }
+
+        try {
+            LOG.trace("Binding MQTTConnection (channel: {}) to session", channel);
+            sessionRegistry.bindToSession(this, msg, clientId);
+
+            initializeKeepAliveTimeout(channel, msg, clientId);
+            setupInflightResender(channel);
+
+            NettyUtils.clientID(channel, clientId);
+            LOG.trace("CONNACK sent, channel: {}", channel);
+            postOffice.dispatchConnection(msg);
+            // Pass the message itself so that toString() is only evaluated when TRACE is enabled.
+            LOG.trace("dispatch connection: {}", msg);
+        } catch (SessionCorruptedException scex) {
+            LOG.warn("MQTT session for client ID {} cannot be created, channel: {}", clientId, channel);
+            abortConnection(CONNECTION_REFUSED_SERVER_UNAVAILABLE);
+        }
+    }
+
+ private void setupInflightResender(Channel channel) {
+ channel.pipeline()
+ .addFirst("inflightResender", new InflightResender(5_000, TimeUnit.MILLISECONDS));
+ }
+
+ private void initializeKeepAliveTimeout(Channel channel, MqttConnectMessage msg, String clientId) {
+ int keepAlive = msg.variableHeader().keepAliveTimeSeconds();
+ NettyUtils.keepAlive(channel, keepAlive);
+ NettyUtils.cleanSession(channel, msg.variableHeader().isCleanSession());
+ NettyUtils.clientID(channel, clientId);
+ int idleTime = Math.round(keepAlive * 1.5f);
+ setIdleTime(channel.pipeline(), idleTime);
+
+ LOG.debug("Connection has been configured CId={}, keepAlive={}, removeTemporaryQoS2={}, idleTime={}",
+ clientId, keepAlive, msg.variableHeader().isCleanSession(), idleTime);
+ }
+
+ private void setIdleTime(ChannelPipeline pipeline, int idleTime) {
+ if (pipeline.names().contains("idleStateHandler")) {
+ pipeline.remove("idleStateHandler");
+ }
+ pipeline.addFirst("idleStateHandler", new IdleStateHandler(idleTime, 0, 0));
+ }
+
+ private boolean isNotProtocolVersion(MqttConnectMessage msg, MqttVersion version) {
+ return msg.variableHeader().version() != version.protocolLevel();
+ }
+
+ private void abortConnection(MqttConnectReturnCode returnCode) {
+ MqttConnAckMessage badProto = connAck(returnCode, false);
+ channel.writeAndFlush(badProto).addListener(FIRE_EXCEPTION_ON_FAILURE);
+ channel.close().addListener(CLOSE_ON_FAILURE);
+ }
+
+    /**
+     * Builds a CONNACK message.
+     *
+     * @param returnCode     the connect return code to report
+     * @param sessionPresent value of the session-present flag
+     * @return the assembled CONNACK message
+     */
+    private MqttConnAckMessage connAck(MqttConnectReturnCode returnCode, boolean sessionPresent) {
+        return new MqttConnAckMessage(
+            new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
+            new MqttConnAckVariableHeader(returnCode, sessionPresent));
+    }
+
+    /**
+     * Authenticates the client of a CONNECT message.
+     * Without a user name the login succeeds only if anonymous access is allowed. With a user
+     * name, a missing password is tolerated only in anonymous mode, and the credentials must
+     * be accepted by the authenticator; on success the user name is stored on the channel.
+     *
+     * @return true if the client may connect, false otherwise
+     */
+    private boolean login(MqttConnectMessage msg, final String clientId) {
+        // handle user authentication
+        if (!msg.variableHeader().hasUserName()) {
+            if (brokerConfig.isAllowAnonymous()) {
+                return true;
+            }
+            LOG.error("Client didn't supply any credentials and MQTT anonymous mode is disabled. CId={}", clientId);
+            return false;
+        }
+
+        byte[] secret = null;
+        if (msg.variableHeader().hasPassword()) {
+            secret = msg.payload().password().getBytes(StandardCharsets.UTF_8);
+        } else if (!brokerConfig.isAllowAnonymous()) {
+            LOG.error("Client didn't supply any password and MQTT anonymous mode is disabled CId={}", clientId);
+            return false;
+        }
+
+        final String user = msg.payload().userName();
+        final boolean accepted = authenticator.checkValid(clientId, user, secret);
+        if (!accepted) {
+            LOG.error("Authenticator has rejected the MQTT credentials CId={}, username={}", clientId, user);
+            return false;
+        }
+        NettyUtils.userName(channel, user);
+        return true;
+    }
+
+ void handleConnectionLost() {
+ String clientID = NettyUtils.clientID(channel);
+ if (clientID == null || clientID.isEmpty()) {
+ return;
+ }
+ LOG.info("Notifying connection lost event. CId: {}, channel: {}", clientID, channel);
+ Session session = sessionRegistry.retrieve(clientID);
+ if (session.hasWill()) {
+ postOffice.fireWill(session.getWill());
+ }
+ if (session.isClean()) {
+ sessionRegistry.remove(clientID);
+ } else {
+ sessionRegistry.disconnect(clientID);
+ }
+ connected = false;
+ //dispatch connection lost to intercept.
+ String userName = NettyUtils.userName(channel);
+ postOffice.dispatchConnectionLost(clientID,userName);
+ LOG.trace("dispatch disconnection: clientId={}, userName={}", clientID, userName);
+ }
+
+    /**
+     * Marks the connection as connected and writes a CONNACK with CONNECTION_ACCEPTED.
+     *
+     * @param isSessionAlreadyPresent value of the session-present flag in the CONNACK
+     */
+    void sendConnAck(boolean isSessionAlreadyPresent) {
+        this.connected = true;
+        final MqttConnAckMessage accepted = connAck(CONNECTION_ACCEPTED, isSessionAlreadyPresent);
+        this.channel.writeAndFlush(accepted).addListener(FIRE_EXCEPTION_ON_FAILURE);
+    }
+
+    /** Returns whether a CONNACK has been sent and the connection has not been closed since. */
+    boolean isConnected() {
+        return this.connected;
+    }
+
+    /** Closes the underlying channel; a failure of the close is fired as an exception on the pipeline. */
+    void dropConnection() {
+        this.channel.close().addListener(FIRE_EXCEPTION_ON_FAILURE);
+    }
+
+    /**
+     * Handles a DISCONNECT message: disconnects the session, closes the channel and notifies
+     * the post office. A DISCONNECT on a connection that is not connected is only logged.
+     *
+     * @param msg the DISCONNECT message (not inspected)
+     */
+    void processDisconnect(MqttMessage msg) {
+        final String cid = NettyUtils.clientID(channel);
+        LOG.trace("Start DISCONNECT CId={}, channel: {}", cid, channel);
+        if (connected) {
+            sessionRegistry.disconnect(cid);
+            connected = false;
+            channel.close().addListener(FIRE_EXCEPTION_ON_FAILURE);
+            LOG.trace("Processed DISCONNECT CId={}, channel: {}", cid, channel);
+            final String user = NettyUtils.userName(channel);
+            postOffice.dispatchDisconnection(cid, user);
+            LOG.trace("dispatch disconnection: clientId={}, userName={}", cid, user);
+        } else {
+            LOG.info("DISCONNECT received on already closed connection, CId={}, channel: {}", cid, channel);
+        }
+    }
+
+    /**
+     * Handles a SUBSCRIBE message by delegating to the post office; if the connection is not
+     * connected the channel is dropped instead.
+     */
+    void processSubscribe(MqttSubscribeMessage msg) {
+        final String cid = NettyUtils.clientID(channel);
+        if (connected) {
+            final String user = NettyUtils.userName(channel);
+            postOffice.subscribeClientToTopics(msg, cid, user, this);
+        } else {
+            LOG.warn("SUBSCRIBE received on already closed connection, CId={}, channel: {}", cid, channel);
+            dropConnection();
+        }
+    }
+
+    /**
+     * Writes the given SUBACK to the client.
+     *
+     * @param messageID  id of the acknowledged SUBSCRIBE (used for logging only)
+     * @param ackMessage the SUBACK to send
+     */
+    void sendSubAckMessage(int messageID, MqttSubAckMessage ackMessage) {
+        LOG.trace("Sending SUBACK response CId={}, messageId: {}", NettyUtils.clientID(channel), messageID);
+        this.channel.writeAndFlush(ackMessage).addListener(FIRE_EXCEPTION_ON_FAILURE);
+    }
+
+ private void processUnsubscribe(MqttUnsubscribeMessage msg) {
+ List<String> topics = msg.payload().topics();
+ String clientID = NettyUtils.clientID(channel);
+
+ LOG.trace("Processing UNSUBSCRIBE message. CId={}, topics: {}", clientID, topics);
+ postOffice.unsubscribe(topics, this, msg.variableHeader().messageId());
+ }
+
+ void sendUnsubAckMessage(List<String> topics, String clientID, int messageID) {
+ MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.UNSUBACK, false, AT_MOST_ONCE,
+ false, 0);
+ MqttUnsubAckMessage ackMessage = new MqttUnsubAckMessage(fixedHeader, from(messageID));
+
+ LOG.trace("Sending UNSUBACK message. CId={}, messageId: {}, topics: {}", clientID, messageID, topics);
+ channel.writeAndFlush(ackMessage).addListener(FIRE_EXCEPTION_ON_FAILURE);
+ LOG.trace("Client <{}> unsubscribed from topics <{}>", clientID, topics);
+ }
+
+    /**
+     * Handles an incoming PUBLISH message and routes it according to its QoS level.
+     * A message with an invalid topic causes the connection to be dropped and is not routed.
+     *
+     * @param msg the PUBLISH message received from the client
+     */
+    void processPublish(MqttPublishMessage msg) {
+        final MqttQoS qos = msg.fixedHeader().qosLevel();
+        final String username = NettyUtils.userName(channel);
+        final String topicName = msg.variableHeader().topicName();
+        final String clientId = getClientId();
+        LOG.trace("Processing PUBLISH message. CId={}, topic: {}, messageId: {}, qos: {}", clientId, topicName,
+                  msg.variableHeader().packetId(), qos);
+        ByteBuf payload = msg.payload();
+        final boolean retain = msg.fixedHeader().isRetain();
+        final Topic topic = new Topic(topicName);
+        if (!topic.isValid()) {
+            LOG.debug("Drop connection because of invalid topic format");
+            dropConnection();
+            // Stop here: a message whose topic was just rejected must not be handed to the post office.
+            return;
+        }
+        switch (qos) {
+            case AT_MOST_ONCE:
+                postOffice.receivedPublishQos0(topic, username, clientId, payload, retain, msg);
+                break;
+            case AT_LEAST_ONCE: {
+                final int messageID = msg.variableHeader().packetId();
+                postOffice.receivedPublishQos1(this, topic, username, payload, messageID, retain, msg);
+                break;
+            }
+            case EXACTLY_ONCE: {
+                final int messageID = msg.variableHeader().packetId();
+                final Session session = sessionRegistry.retrieve(clientId);
+                session.receivedPublishQos2(messageID, msg);
+                postOffice.receivedPublishQos2(this, msg, username);
+//                msg.release();
+                break;
+            }
+            default:
+                LOG.error("Unknown QoS-Type:{}", qos);
+                break;
+        }
+    }
+
+ void sendPublishReceived(int messageID) {
+ LOG.trace("sendPubRec invoked on channel: {}", channel);
+ MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBREC, false, AT_MOST_ONCE,
+ false, 0);
+ MqttPubAckMessage pubRecMessage = new MqttPubAckMessage(fixedHeader, from(messageID));
+ sendIfWritableElseDrop(pubRecMessage);
+ }
+
+ /**
+  * Handles a PUBREL: notifies the client's session and answers with a PUBCOMP.
+  *
+  * @param msg the PUBREL message; its variable header must carry a message id
+  */
+ private void processPubRel(MqttMessage msg) {
+     final Session ownerSession = sessionRegistry.retrieve(getClientId());
+     final MqttMessageIdVariableHeader idHeader = (MqttMessageIdVariableHeader) msg.variableHeader();
+     final int packetId = idHeader.messageId();
+     ownerSession.receivedPubRelQos2(packetId);
+     sendPubCompMessage(packetId);
+ }
+
+ /**
+  * Logs and sends a PUBLISH message on this connection if the channel is writable.
+  *
+  * <p>The payload is included in the log output only when trace logging is enabled.
+  *
+  * @param publishMsg the PUBLISH message to send
+  */
+ void sendPublish(MqttPublishMessage publishMsg) {
+     final MqttQoS qos = publishMsg.fixedHeader().qosLevel();
+     final String clientId = getClientId();
+     final String topicName = publishMsg.variableHeader().topicName();
+     final int packetId = publishMsg.variableHeader().packetId();
+     if (!LOG.isTraceEnabled()) {
+         LOG.debug("Sending PUBLISH({}) message. MessageId={}, CId={}, topic={}", qos, packetId, clientId,
+             topicName);
+     } else {
+         LOG.trace("Sending PUBLISH({}) message. MessageId={}, CId={}, topic={}, payload={}", qos, packetId,
+             clientId, topicName, DebugUtils.payload2Str(publishMsg.payload()));
+     }
+     sendIfWritableElseDrop(publishMsg);
+ }
+
+ /**
+  * Writes and flushes the message when the channel is writable; otherwise the message is
+  * silently discarded.
+  *
+  * @param msg the message to send
+  */
+ void sendIfWritableElseDrop(MqttMessage msg) {
+     if (LOG.isDebugEnabled()) {
+         LOG.debug("OUT {} on channel {}", msg.fixedHeader().messageType(), channel);
+     }
+     if (!channel.isWritable()) {
+         // the channel cannot accept writes right now: drop the message
+         return;
+     }
+     // flush immediately instead of waiting for a later flush
+     channel.writeAndFlush(msg).addListener(FIRE_EXCEPTION_ON_FAILURE);
+ }
+
+ /**
+  * Notifies the client's session when the channel has become writable again; does nothing
+  * while the channel is not writable.
+  */
+ public void writabilityChanged() {
+     if (!channel.isWritable()) {
+         return;
+     }
+     LOG.debug("Channel {} is again writable", channel);
+     final Session ownerSession = sessionRegistry.retrieve(getClientId());
+     ownerSession.writabilityChanged();
+ }
+
+ void sendPubAck(int messageID) {
+ LOG.trace("sendPubAck invoked");
+ MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBACK, false, AT_MOST_ONCE,
+ false, 0);
+ MqttPubAckMessage pubAckMessage = new MqttPubAckMessage(fixedHeader, from(messageID));
+ sendIfWritableElseDrop(pubAckMessage);
+ }
+
+ private void sendPubCompMessage(int messageID) {
+ LOG.trace("Sending PUBCOMP message on channel: {}, messageId: {}", channel, messageID);
+ MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBCOMP, false, AT_MOST_ONCE, false, 0);
+ MqttMessage pubCompMessage = new MqttMessage(fixedHeader, from(messageID));
+ sendIfWritableElseDrop(pubCompMessage);
+ }
+
+ /** Returns the client identifier that {@code NettyUtils} reports for this connection's channel. */
+ String getClientId() {
+ return NettyUtils.clientID(channel);
+ }
+
+ /** Returns the user name that {@code NettyUtils} reports for this connection's channel. */
+ String getUsername() {
+ return NettyUtils.userName(channel);
+ }
+
+ /**
+  * Sends a retained PUBLISH with packet id 0 on this connection.
+  *
+  * @param topic the topic to publish on
+  * @param qos the QoS level placed in the fixed header
+  * @param payload the message payload
+  */
+ public void sendPublishRetainedQos0(Topic topic, MqttQoS qos, ByteBuf payload) {
+     sendPublish(retainedPublish(topic.toString(), qos, payload));
+ }
+
+ /**
+  * Sends a retained PUBLISH that carries a freshly allocated packet id.
+  *
+  * @param topic the topic to publish on
+  * @param qos the QoS level placed in the fixed header
+  * @param payload the message payload
+  */
+ public void sendPublishRetainedWithPacketId(Topic topic, MqttQoS qos, ByteBuf payload) {
+     final int freshPacketId = nextPacketId();
+     sendPublish(retainedPublishWithMessageId(topic.toString(), qos, payload, freshPacketId));
+ }
+
+ private static MqttPublishMessage retainedPublish(String topic, MqttQoS qos, ByteBuf message) {
+ return retainedPublishWithMessageId(topic, qos, message, 0);
+ }
+
+ /**
+  * Builds a PUBLISH message with the retain flag set.
+  *
+  * @param topic the topic name placed in the variable header
+  * @param qos the QoS level placed in the fixed header
+  * @param message the payload
+  * @param messageId the message id placed in the variable header
+  * @return the assembled PUBLISH message
+  */
+ private static MqttPublishMessage retainedPublishWithMessageId(String topic, MqttQoS qos, ByteBuf message,
+     int messageId) {
+     return new MqttPublishMessage(
+         new MqttFixedHeader(MqttMessageType.PUBLISH, false, qos, true, 0),
+         new MqttPublishVariableHeader(topic, messageId),
+         message);
+ }
+
+ // TODO move this method in Session
+ /**
+  * Sends a non-retained PUBLISH with packet id 0 on this connection.
+  *
+  * @param topic the topic to publish on
+  * @param qos the QoS level placed in the fixed header
+  * @param payload the message payload
+  */
+ void sendPublishNotRetainedQos0(Topic topic, MqttQoS qos, ByteBuf payload) {
+     sendPublish(notRetainedPublish(topic.toString(), qos, payload));
+ }
+
+ static MqttPublishMessage notRetainedPublish(String topic, MqttQoS qos, ByteBuf message) {
+ return notRetainedPublishWithMessageId(topic, qos, message, 0);
+ }
+
+ /**
+  * Builds a PUBLISH message with the retain flag cleared.
+  *
+  * @param topic the topic name placed in the variable header
+  * @param qos the QoS level placed in the fixed header
+  * @param message the payload
+  * @param messageId the message id placed in the variable header
+  * @return the assembled PUBLISH message
+  */
+ static MqttPublishMessage notRetainedPublishWithMessageId(String topic, MqttQoS qos, ByteBuf message,
+     int messageId) {
+     return new MqttPublishMessage(
+         new MqttFixedHeader(MqttMessageType.PUBLISH, false, qos, false, 0),
+         new MqttPublishVariableHeader(topic, messageId),
+         message);
+ }
+
+ /** Asks this client's session to resend its in-flight messages that were not acknowledged. */
+ public void resendNotAckedPublishes() {
+     sessionRegistry.retrieve(getClientId()).resendInflightNotAcked();
+ }
+
+ /**
+  * Returns the next packet identifier for messages sent on this connection.
+  *
+  * <p>MQTT packet identifiers are non-zero 16-bit values, so the counter wraps from 65535 back
+  * to 1 instead of growing without bound.
+  *
+  * @return a packet identifier in the range 1..65535
+  */
+ int nextPacketId() {
+     // assumes lastPacketId is an AtomicInteger (it already exposes incrementAndGet) - TODO confirm
+     return lastPacketId.updateAndGet(current -> current >= 65535 ? 1 : current + 1);
+ }
+
+ /** Returns a description containing the channel and the connected flag. */
+ @Override
+ public String toString() {
+     final StringBuilder text = new StringBuilder("MQTTConnection{channel=");
+     text.append(channel).append(", connected=").append(connected).append('}');
+     return text.toString();
+ }
+
+ /** Returns the channel's remote address, cast to {@link InetSocketAddress}. */
+ InetSocketAddress remoteAddress() {
+ return (InetSocketAddress) channel.remoteAddress();
+ }
+}
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index c0ce887..fa3013a 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -50,6 +50,34 @@ public class IoTDBConfig {
private boolean enableMetricService = false;
+ /**
+ * whether to enable the mqtt service.
+ */
+ private boolean enableMQTTService = true;
+
+ /**
+ * the mqtt service binding host.
+ */
+ private String mqttHost = "0.0.0.0";
+
+ /**
+ * the mqtt service binding port.
+ */
+ private int mqttPort = 1883;
+
+ /**
+ * the handler pool size for handling the mqtt messages.
+ */
+ private int mqttHandlerPoolSize = 1;
+
+ /**
+ * the mqtt message payload formatter.
+ */
+ private String mqttPayloadFormatter = "json";
+
+ /**
+ * Rpc binding address.
+ */
private String rpcAddress = "0.0.0.0";
/**
@@ -1413,4 +1441,44 @@ public class IoTDBConfig {
public void setQueryCacheSizeInMetric(int queryCacheSizeInMetric) {
this.queryCacheSizeInMetric = queryCacheSizeInMetric;
}
+
+ /** Returns whether the MQTT service is enabled. */
+ public boolean isEnableMQTTService() {
+ return enableMQTTService;
+ }
+
+ /** Sets whether the MQTT service is enabled. */
+ public void setEnableMQTTService(boolean enableMQTTService) {
+ this.enableMQTTService = enableMQTTService;
+ }
+
+ /** Returns the host the MQTT service binds to. */
+ public String getMqttHost() {
+ return mqttHost;
+ }
+
+ /** Sets the host the MQTT service binds to. */
+ public void setMqttHost(String mqttHost) {
+ this.mqttHost = mqttHost;
+ }
+
+ /** Returns the port the MQTT service binds to. */
+ public int getMqttPort() {
+ return mqttPort;
+ }
+
+ /** Sets the port the MQTT service binds to. */
+ public void setMqttPort(int mqttPort) {
+ this.mqttPort = mqttPort;
+ }
+
+ /** Returns the size of the pool that handles MQTT messages. */
+ public int getMqttHandlerPoolSize() {
+ return mqttHandlerPoolSize;
+ }
+
+ /** Sets the size of the pool that handles MQTT messages. */
+ public void setMqttHandlerPoolSize(int mqttHandlerPoolSize) {
+ this.mqttHandlerPoolSize = mqttHandlerPoolSize;
+ }
+
+ /** Returns the name of the MQTT message payload formatter. */
+ public String getMqttPayloadFormatter() {
+ return mqttPayloadFormatter;
+ }
+
+ /** Sets the name of the MQTT message payload formatter. */
+ public void setMqttPayloadFormatter(String mqttPayloadFormatter) {
+ this.mqttPayloadFormatter = mqttPayloadFormatter;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
index c52692f..0088ddf 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
@@ -88,4 +88,11 @@ public class IoTDBConstant {
public static final String SCHEMA_FOLDER_NAME = "schema";
public static final String SYNC_FOLDER_NAME = "sync";
public static final String QUERY_FOLDER_NAME = "query";
+
+ // mqtt
+ public static final String ENABLE_MQTT = "enable_mqtt_service";
+ public static final String MQTT_HOST_NAME = "mqtt_host";
+ public static final String MQTT_PORT_NAME = "mqtt_port";
+ public static final String MQTT_HANDLER_POOL_SIZE_NAME = "mqtt_handler_pool_size";
+ public static final String MQTT_PAYLOAD_FORMATTER_NAME = "mqtt_payload_formatter";
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index f40a82d..0c94ac7 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -339,6 +339,23 @@ public class IoTDBDescriptor {
Integer.parseInt(properties.getProperty("default_fill_interval",
String.valueOf(conf.getDefaultFillInterval()))));
+ // mqtt
+ if (properties.getProperty(IoTDBConstant.MQTT_HOST_NAME) != null) {
+ conf.setMqttHost(properties.getProperty(IoTDBConstant.MQTT_HOST_NAME));
+ }
+ if (properties.getProperty(IoTDBConstant.MQTT_PORT_NAME) != null) {
+ conf.setMqttPort(Integer.parseInt(properties.getProperty(IoTDBConstant.MQTT_PORT_NAME)));
+ }
+ if (properties.getProperty(IoTDBConstant.MQTT_HANDLER_POOL_SIZE_NAME) != null) {
+ conf.setMqttHandlerPoolSize(Integer.parseInt(properties.getProperty(IoTDBConstant.MQTT_HANDLER_POOL_SIZE_NAME)));
+ }
+ if (properties.getProperty(IoTDBConstant.MQTT_PAYLOAD_FORMATTER_NAME) != null) {
+ conf.setMqttPayloadFormatter(properties.getProperty(IoTDBConstant.MQTT_PAYLOAD_FORMATTER_NAME));
+ }
+ if (properties.getProperty(IoTDBConstant.ENABLE_MQTT) != null) {
+ conf.setEnableMQTTService(Boolean.parseBoolean(properties.getProperty(IoTDBConstant.ENABLE_MQTT)));
+ }
+
// At the same time, set TSFileConfig
TSFileDescriptor.getInstance().getConfig()
.setTSFileStorageFs(FSType.valueOf(
diff --git a/server/src/main/java/org/apache/iotdb/db/mqtt/BrokerAuthenticator.java b/server/src/main/java/org/apache/iotdb/db/mqtt/BrokerAuthenticator.java
new file mode 100644
index 0000000..8d2d0ff
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mqtt/BrokerAuthenticator.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.db.mqtt;
+
+import io.moquette.broker.security.IAuthenticator;
+import org.apache.commons.lang.StringUtils;
+import org.apache.iotdb.db.auth.AuthException;
+import org.apache.iotdb.db.auth.authorizer.IAuthorizer;
+import org.apache.iotdb.db.auth.authorizer.LocalFileAuthorizer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The MQTT broker authenticator.
+ */
+public class BrokerAuthenticator implements IAuthenticator {
+ private static final Logger LOG = LoggerFactory.getLogger(BrokerAuthenticator.class);
+
+ @Override
+ public boolean checkValid(String clientId, String username, byte[] password) {
+ if (StringUtils.isBlank(username) || password == null) {
+ return false;
+ }
+
+ try {
+ IAuthorizer authorizer = LocalFileAuthorizer.getInstance();
+ return authorizer.login(username, new String(password));
+ } catch (AuthException e) {
+ LOG.info("meet error while logging in.", e);
+ return false;
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mqtt/JSONPayloadFormatter.java b/server/src/main/java/org/apache/iotdb/db/mqtt/JSONPayloadFormatter.java
new file mode 100644
index 0000000..88e0966
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mqtt/JSONPayloadFormatter.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.db.mqtt;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
+import com.google.common.collect.Lists;
+import io.netty.buffer.ByteBuf;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The JSON payload formatter.
+ * two json format supported:
+ * {
+ * "device":"root.sg.d1",
+ * "timestamp":1586076045524,
+ * "measurements":["s1","s2"],
+ * "values":[0.530635,0.530635]
+ * }
+ *
+ * {
+ * "device":"root.sg.d1",
+ * "timestamps":[1586076045524,1586076065526],
+ * "measurements":["s1","s2"],
+ * "values":[[0.530635,0.530635], [0.530655,0.530695]]
+ * }
+ */
+public class JSONPayloadFormatter implements PayloadFormatter {
+ private static final String JSON_KEY_DEVICE = "device";
+ private static final String JSON_KEY_TIMESTAMP = "timestamp";
+ private static final String JSON_KEY_TIMESTAMPS = "timestamps";
+ private static final String JSON_KEY_MEASUREMENTS = "measurements";
+ private static final String JSON_KEY_VALUES = "values";
+
+ @Override
+ public List<Message> format(ByteBuf payload) {
+ if (payload == null) {
+ return null;
+ }
+ String txt = payload.toString(StandardCharsets.UTF_8);
+ JSONObject jsonObject = JSON.parseObject(txt);
+
+ Object timestamp = jsonObject.get(JSON_KEY_TIMESTAMP);
+ if (timestamp != null) {
+ return Lists.newArrayList(JSON.parseObject(txt, Message.class));
+ }
+
+ String device = jsonObject.getString(JSON_KEY_DEVICE);
+ JSONArray timestamps = jsonObject.getJSONArray(JSON_KEY_TIMESTAMPS);
+ JSONArray measurements = jsonObject.getJSONArray(JSON_KEY_MEASUREMENTS);
+ JSONArray values = jsonObject.getJSONArray(JSON_KEY_VALUES);
+
+ List<Message> ret = new ArrayList<>();
+ for (int i = 0; i < timestamps.size(); i++) {
+ Long ts = timestamps.getLong(i);
+
+ Message message = new Message();
+ message.setDevice(device);
+ message.setTimestamp(ts);
+ message.setMeasurements(measurements.toJavaList(String.class));
+ message.setValues(((JSONArray)values.get(i)).toJavaList(String.class));
+ ret.add(message);
+ }
+ return ret;
+ }
+
+ @Override
+ public String getName() {
+ return "json";
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mqtt/Message.java b/server/src/main/java/org/apache/iotdb/db/mqtt/Message.java
new file mode 100644
index 0000000..d6a43a6
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mqtt/Message.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iotdb.db.mqtt;
+
+import java.util.List;
+
+/**
+ * Message describes the information sometime sent from the devices.
+ */
+public class Message {
+ private String device;
+ private Long timestamp;
+ private List<String> measurements;
+ private List<String> values;
+
+ public String getDevice() {
+ return device;
+ }
+
+ public void setDevice(String device) {
+ this.device = device;
+ }
+
+ public Long getTimestamp() {
+ return timestamp;
+ }
+
+ public void setTimestamp(Long timestamp) {
+ this.timestamp = timestamp;
+ }
+
+ public List<String> getMeasurements() {
+ return measurements;
+ }
+
+ public void setMeasurements(List<String> measurements) {
+ this.measurements = measurements;
+ }
+
+ public List<String> getValues() {
+ return values;
+ }
+
+ public void setValues(List<String> values) {
+ this.values = values;
+ }
+
+ @Override
+ public String toString() {
+ return "Message{" +
+ "device='" + device + '\'' +
+ ", timestamp=" + timestamp +
+ ", measurements=" + measurements +
+ ", values=" + values +
+ '}';
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mqtt/PayloadFormatManager.java b/server/src/main/java/org/apache/iotdb/db/mqtt/PayloadFormatManager.java
new file mode 100644
index 0000000..eb0c5d6
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mqtt/PayloadFormatManager.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.db.mqtt;
+
+import com.google.common.base.Preconditions;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.ServiceLoader;
+
+/**
+ * PayloadFormatManager loads payload formatter from SPI services.
+ */
+public class PayloadFormatManager {
+ private static Map<String, PayloadFormatter> map = new HashMap<>();
+
+ static {
+ init();
+ }
+
+ private static void init() {
+ ServiceLoader<PayloadFormatter> formats = ServiceLoader.load(PayloadFormatter.class);
+ for (PayloadFormatter format : formats) {
+ map.put(format.getName(), format);
+ }
+ }
+
+ public static PayloadFormatter getPayloadFormat(String name) {
+ Preconditions.checkArgument(map.containsKey(name), "Unknown payload format named: " + name);
+ return map.get(name);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mqtt/PayloadFormatter.java b/server/src/main/java/org/apache/iotdb/db/mqtt/PayloadFormatter.java
new file mode 100644
index 0000000..d99e2d5
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mqtt/PayloadFormatter.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.db.mqtt;
+
+import io.netty.buffer.ByteBuf;
+
+import java.util.List;
+
+/**
+ * PayloadFormatter format the payload to the messages.
+ */
+public interface PayloadFormatter {
+ /**
+ * format a payload to a list of messages
+ * @param payload
+ * @return
+ */
+ List<Message> format(ByteBuf payload);
+
+ /**
+ * get the formatter name
+ * @return
+ */
+ String getName();
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mqtt/PublishHandler.java b/server/src/main/java/org/apache/iotdb/db/mqtt/PublishHandler.java
new file mode 100644
index 0000000..57cb3aa
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mqtt/PublishHandler.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.db.mqtt;
+
+import io.moquette.interception.AbstractInterceptHandler;
+import io.moquette.interception.messages.InterceptPublishMessage;
+import io.netty.buffer.ByteBuf;
+import io.netty.handler.codec.mqtt.MqttQoS;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.qp.executor.IPlanExecutor;
+import org.apache.iotdb.db.qp.executor.PlanExecutor;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * PublishHandler handle the messages from MQTT clients.
+ */
+public class PublishHandler extends AbstractInterceptHandler {
+ private static final Logger LOG = LoggerFactory.getLogger(PublishHandler.class);
+
+ private IPlanExecutor executor;
+ private PayloadFormatter payloadFormat;
+
+ public PublishHandler(IoTDBConfig config) {
+ this.payloadFormat = PayloadFormatManager.getPayloadFormat(config.getMqttPayloadFormatter());
+ try {
+ this.executor = new PlanExecutor();
+ } catch (QueryProcessException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ protected PublishHandler(IPlanExecutor executor, PayloadFormatter payloadFormat) {
+ this.executor = executor;
+ this.payloadFormat = payloadFormat;
+ }
+
+ @Override
+ public String getID() {
+ return "iotdb-mqtt-broker-listener";
+ }
+
+ @Override
+ public void onPublish(InterceptPublishMessage msg) {
+ String clientId = msg.getClientID();
+ ByteBuf payload = msg.getPayload();
+ String topic = msg.getTopicName();
+ String username = msg.getUsername();
+ MqttQoS qos = msg.getQos();
+
+ LOG.debug("Receive publish message. clientId: {}, username: {}, qos: {}, topic: {}, payload: {}",
+ clientId, username, qos, topic, payload);
+
+ List<Message> events = payloadFormat.format(payload);
+ if (events == null) {
+ return;
+ }
+
+ // since device ids from messages maybe different, so we use the InsertPlan not BatchInsertPlan.
+ for (Message event : events) {
+ if (event == null) {
+ continue;
+ }
+
+ InsertPlan plan = new InsertPlan();
+ plan.setDeviceId(event.getDevice());
+ plan.setTime(event.getTimestamp());
+ plan.setMeasurements(event.getMeasurements().toArray(new String[event.getMeasurements().size()]));
+ plan.setValues(event.getValues().toArray(new String[event.getValues().size()]));
+
+ boolean status;
+ try {
+ status = executeNonQuery(plan);
+ } catch (QueryProcessException e) {
+ throw new RuntimeException(e);
+ }
+
+ LOG.debug("event process result: {}", status);
+ }
+ }
+
+ private boolean executeNonQuery(PhysicalPlan plan) throws QueryProcessException {
+ if (IoTDBDescriptor.getInstance().getConfig().isReadOnly()) {
+ throw new QueryProcessException(
+ "Current system mode is read-only, does not support non-query operation");
+ }
+ return executor.processNonQuery(plan);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
index 6f15bfb..a0cbd48 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
@@ -99,6 +99,9 @@ public class IoTDB implements IoTDBMBean {
if (IoTDBDescriptor.getInstance().getConfig().isEnableMetricService()) {
registerManager.register(MetricsService.getInstance());
}
+ if (IoTDBDescriptor.getInstance().getConfig().isEnableMQTTService()) {
+ registerManager.register(MQTTService.getInstance());
+ }
JMXService.registerMBean(getInstance(), mbeanName);
registerManager.register(StorageEngine.getInstance());
diff --git a/server/src/main/java/org/apache/iotdb/db/service/IoTDBShutdownHook.java b/server/src/main/java/org/apache/iotdb/db/service/IoTDBShutdownHook.java
index caa0439..af560d5 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/IoTDBShutdownHook.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/IoTDBShutdownHook.java
@@ -22,7 +22,7 @@ import org.apache.iotdb.db.utils.MemUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class IoTDBShutdownHook extends Thread{
+public class IoTDBShutdownHook extends Thread {
private static final Logger logger = LoggerFactory.getLogger(IoTDBShutdownHook.class);
diff --git a/server/src/main/java/org/apache/iotdb/db/service/MQTTService.java b/server/src/main/java/org/apache/iotdb/db/service/MQTTService.java
new file mode 100644
index 0000000..a49c02c
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/service/MQTTService.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.service;
+
+import com.google.common.collect.Lists;
+import io.moquette.BrokerConstants;
+import io.moquette.broker.Server;
+import io.moquette.broker.config.IConfig;
+import io.moquette.broker.config.MemoryConfig;
+import io.moquette.broker.security.IAuthenticator;
+import io.moquette.interception.InterceptHandler;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.StartupException;
+import org.apache.iotdb.db.mqtt.BrokerAuthenticator;
+import org.apache.iotdb.db.mqtt.PublishHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * The IoTDB MQTT Service.
+ */
+public class MQTTService implements IService {
+ private static final Logger LOG = LoggerFactory.getLogger(MQTTService.class);
+ private Server server = new Server();
+
+ @Override
+ public void start() throws StartupException {
+ startup();
+ }
+
+ @Override
+ public void stop() {
+ shutdown();
+ }
+
+ public void startup() {
+ IoTDBConfig iotDBConfig = IoTDBDescriptor.getInstance().getConfig();
+ IConfig config = createBrokerConfig(iotDBConfig);
+ List<InterceptHandler> handlers = Lists.newArrayList(new PublishHandler(iotDBConfig));
+ IAuthenticator authenticator = new BrokerAuthenticator();
+
+ server.startServer(config, handlers, null, authenticator, null);
+
+ LOG.info("Start MQTT service successfully, listening on ip {} port {}",
+ iotDBConfig.getMqttHost(), iotDBConfig.getMqttPort());
+
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ LOG.info("Stopping IoTDB MQTT service...");
+ shutdown();
+ LOG.info("IoTDB MQTT service stopped.");
+ }));
+ }
+
+ private IConfig createBrokerConfig(IoTDBConfig iotDBConfig) {
+ Properties properties = new Properties();
+ properties.setProperty(BrokerConstants.HOST_PROPERTY_NAME, iotDBConfig.getMqttHost());
+ properties.setProperty(BrokerConstants.PORT_PROPERTY_NAME, String.valueOf(iotDBConfig.getMqttPort()));
+ properties.setProperty(BrokerConstants.BROKER_INTERCEPTOR_THREAD_POOL_SIZE, String.valueOf(iotDBConfig.getMqttHandlerPoolSize()));
+ return new MemoryConfig(properties);
+ }
+
+ public void shutdown() {
+ server.stopServer();
+ }
+
+ @Override
+ public ServiceType getID() {
+ return ServiceType.MQTT_SERVICE;
+ }
+
+ public static final MQTTService getInstance() {
+ return MQTTServiceHolder.INSTANCE;
+ }
+
+ private static class MQTTServiceHolder {
+
+ private static final MQTTService INSTANCE = new MQTTService();
+
+ private MQTTServiceHolder() {
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java b/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
index 0b0dea5..256cccb 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
@@ -26,6 +26,7 @@ public enum ServiceType {
JMX_SERVICE("JMX ServerService", "JMX ServerService"),
METRICS_SERVICE("Metrics ServerService","MetricsService"),
JDBC_SERVICE("JDBC ServerService", "JDBCService"),
+ MQTT_SERVICE("MQTTService", ""),
MONITOR_SERVICE("Monitor ServerService", "Monitor"),
STAT_MONITOR_SERVICE("Statistics ServerService", ""),
WAL_SERVICE("WAL ServerService", ""),
diff --git a/server/src/main/resources/META-INF/services/org.apache.iotdb.db.mqtt.PayloadFormatter b/server/src/main/resources/META-INF/services/org.apache.iotdb.db.mqtt.PayloadFormatter
new file mode 100644
index 0000000..d115e3e
--- /dev/null
+++ b/server/src/main/resources/META-INF/services/org.apache.iotdb.db.mqtt.PayloadFormatter
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+org.apache.iotdb.db.mqtt.JSONPayloadFormatter
diff --git a/server/src/test/java/org/apache/iotdb/db/mqtt/BrokerAuthenticatorTest.java b/server/src/test/java/org/apache/iotdb/db/mqtt/BrokerAuthenticatorTest.java
new file mode 100644
index 0000000..fb36c19
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mqtt/BrokerAuthenticatorTest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.db.mqtt;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Checks which username/password combinations {@link BrokerAuthenticator} accepts.
+ */
+public class BrokerAuthenticatorTest {
+
+  /**
+   * Expects {@code root}/{@code root} to be accepted and an empty username, a {@code null}
+   * password and an unknown user to be rejected. The first argument is {@code null} in
+   * every call.
+   */
+  @Test
+  public void checkValid() {
+    BrokerAuthenticator authenticator = new BrokerAuthenticator();
+    assertTrue(authenticator.checkValid(null, "root", utf8("root")));
+    assertFalse(authenticator.checkValid(null, "", utf8("foo")));
+    assertFalse(authenticator.checkValid(null, "root", null));
+    assertFalse(authenticator.checkValid(null, "foo", utf8("foo")));
+  }
+
+  /**
+   * Encodes a password with an explicit charset so the bytes handed to the authenticator
+   * do not depend on the platform default encoding.
+   */
+  private static byte[] utf8(String text) {
+    return text.getBytes(java.nio.charset.StandardCharsets.UTF_8);
+  }
+}
\ No newline at end of file
diff --git a/server/src/test/java/org/apache/iotdb/db/mqtt/JSONPayloadFormatTest.java b/server/src/test/java/org/apache/iotdb/db/mqtt/JSONPayloadFormatTest.java
new file mode 100644
index 0000000..9b8ce2b
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mqtt/JSONPayloadFormatTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.db.mqtt;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Exercises {@link JSONPayloadFormatter} with a single-point and a batch payload.
+ */
+public class JSONPayloadFormatTest {
+
+  /**
+   * A payload with one {@code timestamp} is decoded; the first resulting message must carry
+   * the device, the timestamp and the first measurement/value pair.
+   */
+  @Test
+  public void formatJson() {
+    String json = " {\n" +
+        " \"device\":\"root.sg.d1\",\n" +
+        " \"timestamp\":1586076045524,\n" +
+        " \"measurements\":[\"s1\",\"s2\"],\n" +
+        " \"values\":[0.530635,0.530635]\n" +
+        " }";
+
+    Message first = decode(json, 0);
+
+    assertEquals("root.sg.d1", first.getDevice());
+    assertEquals(Long.valueOf(1586076045524L), first.getTimestamp());
+    assertEquals("s1", first.getMeasurements().get(0));
+    assertEquals(0.530635D, Double.parseDouble(first.getValues().get(0)), 0);
+  }
+
+  /**
+   * A payload with a {@code timestamps} array is decoded; the second resulting message must
+   * carry the device, the second timestamp and the second measurement/value pair of that row.
+   */
+  @Test
+  public void formatBatchJson() {
+    String json = " {\n" +
+        " \"device\":\"root.sg.d1\",\n" +
+        " \"timestamps\":[1586076045524,1586076065526],\n" +
+        " \"measurements\":[\"s1\",\"s2\"],\n" +
+        " \"values\":[[0.530635,0.530635], [0.530655,0.530695]]\n" +
+        " }";
+
+    Message second = decode(json, 1);
+
+    assertEquals("root.sg.d1", second.getDevice());
+    assertEquals(Long.valueOf(1586076065526L), second.getTimestamp());
+    assertEquals("s2", second.getMeasurements().get(1));
+    assertEquals(0.530695D, Double.parseDouble(second.getValues().get(1)), 0);
+  }
+
+  /**
+   * Wraps the JSON text in a UTF-8 buffer, runs it through a fresh formatter and returns
+   * the message at the given position of the result.
+   */
+  private static Message decode(String json, int index) {
+    ByteBuf buffer = Unpooled.copiedBuffer(json, StandardCharsets.UTF_8);
+    return new JSONPayloadFormatter().format(buffer).get(index);
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mqtt/PayloadFormatManagerTest.java b/server/src/test/java/org/apache/iotdb/db/mqtt/PayloadFormatManagerTest.java
new file mode 100644
index 0000000..3882344
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mqtt/PayloadFormatManagerTest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.db.mqtt;
+
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+/**
+ * Checks the lookup of payload formatters by name through {@link PayloadFormatManager}.
+ */
+public class PayloadFormatManagerTest {
+
+  /** Looking up the name {@code "txt"} must be rejected with an IllegalArgumentException. */
+  @Test
+  public void getPayloadFormat() {
+    try {
+      PayloadFormatManager.getPayloadFormat("txt");
+    } catch (IllegalArgumentException expected) {
+      // This is the outcome the test requires.
+      return;
+    }
+    fail("Expected exception: java.lang.IllegalArgumentException");
+  }
+
+  /** Looking up the name {@code "json"} must yield a formatter. */
+  @Test
+  public void getDefaultPayloadFormat() {
+    PayloadFormatter jsonFormatter = PayloadFormatManager.getPayloadFormat("json");
+    assertNotNull(jsonFormatter);
+  }
+}
\ No newline at end of file
diff --git a/server/src/test/java/org/apache/iotdb/db/mqtt/PublishHandlerTest.java b/server/src/test/java/org/apache/iotdb/db/mqtt/PublishHandlerTest.java
new file mode 100644
index 0000000..44c07ec
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mqtt/PublishHandlerTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.db.mqtt;
+
+import io.moquette.interception.messages.InterceptPublishMessage;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.handler.codec.mqtt.*;
+import org.apache.iotdb.db.qp.executor.IPlanExecutor;
+import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Verifies that {@link PublishHandler} forwards a published JSON payload to the plan executor.
+ */
+public class PublishHandlerTest {
+
+  /**
+   * Publishes one single-point JSON message and checks that the mocked executor was asked
+   * to process an {@link InsertPlan} as a non-query.
+   */
+  @Test
+  public void onPublish() throws Exception {
+    // Build the intercepted PUBLISH message.
+    String json = "{\n" +
+        "\"device\":\"root.sg.d1\",\n" +
+        "\"timestamp\":1586076045524,\n" +
+        "\"measurements\":[\"s1\"],\n" +
+        "\"values\":[0.530635]\n" +
+        "}";
+    ByteBuf body = Unpooled.copiedBuffer(json, StandardCharsets.UTF_8);
+    MqttPublishVariableHeader topicHeader = new MqttPublishVariableHeader("root.sg.d1", 1);
+    MqttFixedHeader publishHeader =
+        new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.AT_LEAST_ONCE, false, 1);
+    InterceptPublishMessage intercepted = new InterceptPublishMessage(
+        new MqttPublishMessage(publishHeader, topicHeader, body), null, null);
+
+    // Wire the handler to a mocked executor and the JSON formatter.
+    IPlanExecutor planExecutor = mock(IPlanExecutor.class);
+    PublishHandler handler =
+        new PublishHandler(planExecutor, PayloadFormatManager.getPayloadFormat("json"));
+
+    handler.onPublish(intercepted);
+
+    verify(planExecutor).processNonQuery(any(InsertPlan.class));
+  }
+}
\ No newline at end of file
diff --git a/site/src/main/.vuepress/config.js b/site/src/main/.vuepress/config.js
index 719fcfe..818155a 100644
--- a/site/src/main/.vuepress/config.js
+++ b/site/src/main/.vuepress/config.js
@@ -347,7 +347,8 @@ var config = {
'4-Client/3-Programming - JDBC',
'4-Client/4-Programming - Other Languages',
'4-Client/5-Programming - TsFile API',
- '4-Client/6-Status Codes',
+ '4-Client/6-Programming - MQTT',
+ '4-Client/7-Status Codes',
]
},
{