You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hx...@apache.org on 2020/04/09 00:17:39 UTC

[incubator-iotdb] branch master updated: [IOTDB-565] MQTT Protocol Support (#929)

This is an automated email from the ASF dual-hosted git repository.

hxd pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new d6207c0  [IOTDB-565] MQTT Protocol Support (#929)
d6207c0 is described below

commit d6207c0745dcf4332a6763c80587808ee861b902
Author: Xin Wang <xi...@apache.org>
AuthorDate: Thu Apr 9 08:17:13 2020 +0800

    [IOTDB-565] MQTT Protocol Support (#929)
    
    * [IOTDB-503] Add checkTimeseriesExists for session
    
    * [IOTDB-565] MQTT Protocol Support
    
    * [IOTDB-565] upgrade fastjson to latest version
---
 LICENSE                                            |   8 +
 LICENSE-binary                                     |   2 +-
 docs/UserGuide/4-Client/6-Programming - MQTT.md    | 100 ++++
 .../{6-Status Codes.md => 7-Status Codes.md}       |   0
 example/flink/README.md                            |   3 +-
 example/flink/pom.xml                              |   5 -
 .../org/apache/iotdb/flink/FlinkIoTDBSink.java     |   6 -
 example/{flink => mqtt}/README.md                  |  15 +-
 example/{flink => mqtt}/pom.xml                    |  24 +-
 .../java/org/apache/iotdb/mqtt/MQTTClient.java     |  50 ++
 example/pom.xml                                    |   1 +
 pom.xml                                            |  26 +-
 server/pom.xml                                     |  10 +-
 .../resources/conf/iotdb-engine.properties         |  22 +-
 .../java/io/moquette/broker/MQTTConnection.java    | 503 +++++++++++++++++++++
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  68 +++
 .../org/apache/iotdb/db/conf/IoTDBConstant.java    |   7 +
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  17 +
 .../apache/iotdb/db/mqtt/BrokerAuthenticator.java  |  48 ++
 .../apache/iotdb/db/mqtt/JSONPayloadFormatter.java |  90 ++++
 .../java/org/apache/iotdb/db/mqtt/Message.java     |  73 +++
 .../apache/iotdb/db/mqtt/PayloadFormatManager.java |  47 ++
 .../org/apache/iotdb/db/mqtt/PayloadFormatter.java |  40 ++
 .../org/apache/iotdb/db/mqtt/PublishHandler.java   | 110 +++++
 .../java/org/apache/iotdb/db/service/IoTDB.java    |   3 +
 .../apache/iotdb/db/service/IoTDBShutdownHook.java |   2 +-
 .../org/apache/iotdb/db/service/MQTTService.java   | 102 +++++
 .../org/apache/iotdb/db/service/ServiceType.java   |   1 +
 .../org.apache.iotdb.db.mqtt.PayloadFormatter      |  20 +
 .../iotdb/db/mqtt/BrokerAuthenticatorTest.java     |  35 ++
 .../iotdb/db/mqtt/JSONPayloadFormatTest.java       |  69 +++
 .../iotdb/db/mqtt/PayloadFormatManagerTest.java    |  35 ++
 .../apache/iotdb/db/mqtt/PublishHandlerTest.java   |  59 +++
 site/src/main/.vuepress/config.js                  |   3 +-
 34 files changed, 1555 insertions(+), 49 deletions(-)

diff --git a/LICENSE b/LICENSE
index b88d620..d784353 100644
--- a/LICENSE
+++ b/LICENSE
@@ -215,3 +215,11 @@ The following class is modified from Apache commons-collections
 
 ./tsfile/src/main/java/org/apache/iotdb/tsfile/utils/Murmur128Hash.java
 Relevant pr is: https://github.com/apache/commons-collections/pull/83/
+
+------------
+
+The following class is modified from moquette (https://github.com/moquette-io/moquette),
+which is under Apache License 2.0:
+
+./server/src/main/java/io/moquette/broker/MQTTConnection.java
+Relevant pr is: https://github.com/moquette-io/moquette/pull/454
diff --git a/LICENSE-binary b/LICENSE-binary
index 35127f4..d2a39ea 100644
--- a/LICENSE-binary
+++ b/LICENSE-binary
@@ -221,7 +221,7 @@ org.apache.commons:commons-collections4:4.0
 org.apache.commons:commons-lang3:3.1
 org.apache.thrift:libthrift:0.9.3
 org.xerial.snappy:snappy-java:1.0.5-M1
-com.alibaba:fastjson:1.2.31
+com.alibaba:fastjson:1.2.67
 com.sun.xml.fastinfoset:FastInfoset:1.2.14
 io.airlift.airline:0.8
 com.google.guava.guava:21.0
diff --git a/docs/UserGuide/4-Client/6-Programming - MQTT.md b/docs/UserGuide/4-Client/6-Programming - MQTT.md
new file mode 100644
index 0000000..0fe588e
--- /dev/null
+++ b/docs/UserGuide/4-Client/6-Programming - MQTT.md	
@@ -0,0 +1,100 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+        http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+# MQTT Protocol
+
+[MQTT](http://mqtt.org/) is a machine-to-machine (M2M)/"Internet of Things" connectivity protocol.
+It was designed as an extremely lightweight publish/subscribe messaging transport.
+It is useful for connections with remote locations where a small code footprint is required and/or network bandwidth is at a premium.
+
+IoTDB supports the MQTT v3.1 (an OASIS Standard) protocol.
+The IoTDB server includes a built-in MQTT service that allows remote devices to send messages to the IoTDB server directly.
+
+<img style="width:100%; max-width:800px; max-height:600px; margin-left:auto; margin-right:auto; display:block;" src="https://user-images.githubusercontent.com/6711230/78357432-0c71cf80-75e4-11ea-98aa-c43a54d469ce.png">
+
+
+## Built-in MQTT Service
+The built-in MQTT service provides the ability to connect directly to IoTDB through MQTT. It listens for publish messages from MQTT clients
+ and then writes the data into storage immediately.
+The MQTT topic corresponds to an IoTDB timeseries.
+The message payload can be formatted into events by a `PayloadFormatter`, which is loaded via Java SPI; the default implementation is `JSONPayloadFormatter`.
+The default `json` formatter supports two JSON formats. The following is an example MQTT message payload:
+
+```json
+ {
+      "device":"root.sg.d1",
+      "timestamp":1586076045524,
+      "measurements":["s1","s2"],
+      "values":[0.530635,0.530635]
+ }
+```
+or
+```json
+{
+      "device":"root.sg.d1",
+      "timestamps":[1586076045524,1586076065526],
+      "measurements":["s1","s2"],
+      "values":[[0.530635,0.530635], [0.530655,0.530695]]
+  }
+```
+
+<img style="width:100%; max-width:800px; max-height:600px; margin-left:auto; margin-right:auto; display:block;" src="https://user-images.githubusercontent.com/6711230/78357469-1bf11880-75e4-11ea-978f-a53996667a0d.png">
+
+## MQTT Configurations
+The IoTDB MQTT service loads its configuration from `${IOTDB_HOME}/${IOTDB_CONF}/iotdb-engine.properties` by default.
+
+The configuration items are as follows:
+
+| NAME        | DESCRIPTION           | DEFAULT  |
+| ------------- |:-------------:|:------:|
+| enable_mqtt_service      | whether to enable the mqtt service | true |
+| mqtt_host      | the mqtt service binding host | 0.0.0.0 |
+| mqtt_port      | the mqtt service binding port    |   1883 |
+| mqtt_handler_pool_size | the handler pool size for handling the mqtt messages      |    1 |
+| mqtt_payload_formatter | the mqtt message payload formatter     |    json |
+
+
+## Examples
+The following is an example in which an MQTT client sends messages to the IoTDB server.
+
+ ```java
+        MQTT mqtt = new MQTT();
+        mqtt.setHost("127.0.0.1", 1883);
+        mqtt.setUserName("root");
+        mqtt.setPassword("root");
+
+        BlockingConnection connection = mqtt.blockingConnection();
+        connection.connect();
+
+        Random random = new Random();
+        for (int i = 0; i < 10; i++) {
+            String payload = String.format("{\n" +
+                    "\"device\":\"root.sg.d1\",\n" +
+                    "\"timestamp\":%d,\n" +
+                    "\"measurements\":[\"s1\"],\n" +
+                    "\"values\":[%f]\n" +
+                    "}", System.currentTimeMillis(), random.nextDouble());
+
+            connection.publish("root.sg.d1.s1", payload.getBytes(), QoS.AT_LEAST_ONCE, false);
+        }
+
+        connection.disconnect();
+    }
+ ```
diff --git a/docs/UserGuide/4-Client/6-Status Codes.md b/docs/UserGuide/4-Client/7-Status Codes.md
similarity index 100%
rename from docs/UserGuide/4-Client/6-Status Codes.md
rename to docs/UserGuide/4-Client/7-Status Codes.md
diff --git a/example/flink/README.md b/example/flink/README.md
index 56b92e4..5059946 100644
--- a/example/flink/README.md
+++ b/example/flink/README.md
@@ -27,7 +27,8 @@ The example is to show how to send data to a IoTDB server from a Flink job.
 
 ## Usage
 
-* Run `org.apache.iotdb.flink.FlinkIoTDBSink.java` to launch the local iotDB server and run the flink job on local mini cluster.
+* Launch the IoTDB server.
+* Run `org.apache.iotdb.flink.FlinkIoTDBSink.java` to run the flink job on local mini cluster.
 
 # TsFile-Flink-Connector Example
 
diff --git a/example/flink/pom.xml b/example/flink/pom.xml
index e46e834..308edfb 100644
--- a/example/flink/pom.xml
+++ b/example/flink/pom.xml
@@ -38,11 +38,6 @@
         </dependency>
         <dependency>
             <groupId>org.apache.iotdb</groupId>
-            <artifactId>iotdb-server</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.iotdb</groupId>
             <artifactId>tsfile</artifactId>
             <version>${project.version}</version>
         </dependency>
diff --git a/example/flink/src/main/java/org/apache/iotdb/flink/FlinkIoTDBSink.java b/example/flink/src/main/java/org/apache/iotdb/flink/FlinkIoTDBSink.java
index 0d8bb2f..1048079 100644
--- a/example/flink/src/main/java/org/apache/iotdb/flink/FlinkIoTDBSink.java
+++ b/example/flink/src/main/java/org/apache/iotdb/flink/FlinkIoTDBSink.java
@@ -20,7 +20,6 @@ package org.apache.iotdb.flink;
 import com.google.common.collect.Lists;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.iotdb.db.service.IoTDB;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -28,11 +27,6 @@ import java.util.Random;
 
 public class FlinkIoTDBSink {
     public static void main(String[] args) throws Exception {
-        // launch the local iotDB server at default port: 6667
-        IoTDB.main(args);
-
-        Thread.sleep(3000);
-
         // run the flink job on local mini cluster
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
diff --git a/example/flink/README.md b/example/mqtt/README.md
similarity index 60%
copy from example/flink/README.md
copy to example/mqtt/README.md
index 56b92e4..dfdf08f 100644
--- a/example/flink/README.md
+++ b/example/mqtt/README.md
@@ -18,20 +18,15 @@
     under the License.
 
 -->
-# IoTDB-Flink-Connector Example
+# IoTDB-MQTT-Broker Example
 
 ## Function
 ```
-The example is to show how to send data to a IoTDB server from a Flink job.
+The example shows how to send data to IoTDB from an MQTT client.
 ```
 
 ## Usage
 
-* Run `org.apache.iotdb.flink.FlinkIoTDBSink.java` to launch the local iotDB server and run the flink job on local mini cluster.
-
-# TsFile-Flink-Connector Example
-
-## Usage
-
-* Run `org.apache.iotdb.flink.FlinkTsFileBatchSource.java` to create a tsfile and read it via a flink DataSet job on local mini cluster.
-* Run `org.apache.iotdb.flink.FlinkTsFileStreamSource.java` to create a tsfile and read it via a flink DataStream job on local mini cluster.
+* Launch the IoTDB server.
+* Set up the storage group with `SET STORAGE GROUP TO root.sg` and create the timeseries with `CREATE TIMESERIES root.sg.d1.s1 WITH DATATYPE=DOUBLE, ENCODING=PLAIN`.
+* Run `org.apache.iotdb.mqtt.MQTTClient` to run the mqtt client and send events to server.
diff --git a/example/flink/pom.xml b/example/mqtt/pom.xml
similarity index 63%
copy from example/flink/pom.xml
copy to example/mqtt/pom.xml
index e46e834..485352b 100644
--- a/example/flink/pom.xml
+++ b/example/mqtt/pom.xml
@@ -27,29 +27,13 @@
         <version>0.10.0-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
-    <artifactId>flink-example</artifactId>
-    <name>IoTDB-Flink Examples</name>
+    <artifactId>mqtt-example</artifactId>
+    <name>IoTDB-MQTT Examples</name>
     <packaging>jar</packaging>
     <dependencies>
         <dependency>
-            <groupId>org.apache.iotdb</groupId>
-            <artifactId>flink-iotdb-connector</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.iotdb</groupId>
-            <artifactId>iotdb-server</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.iotdb</groupId>
-            <artifactId>tsfile</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.iotdb</groupId>
-            <artifactId>flink-tsfile-connector</artifactId>
-            <version>${project.version}</version>
+            <groupId>org.fusesource.mqtt-client</groupId>
+            <artifactId>mqtt-client</artifactId>
         </dependency>
     </dependencies>
 </project>
diff --git a/example/mqtt/src/main/java/org/apache/iotdb/mqtt/MQTTClient.java b/example/mqtt/src/main/java/org/apache/iotdb/mqtt/MQTTClient.java
new file mode 100644
index 0000000..54f0665
--- /dev/null
+++ b/example/mqtt/src/main/java/org/apache/iotdb/mqtt/MQTTClient.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.mqtt;
+
+import org.fusesource.mqtt.client.BlockingConnection;
+import org.fusesource.mqtt.client.MQTT;
+import org.fusesource.mqtt.client.QoS;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Locale;
+import java.util.Random;
+
+/**
+ * Example MQTT client that connects to the MQTT service at 127.0.0.1:1883 as user
+ * {@code root} and publishes ten JSON messages, each carrying one random value for
+ * measurement {@code s1} of device {@code root.sg.d1}.
+ */
+public class MQTTClient {
+    /**
+     * Connects, publishes ten messages to topic {@code root.sg.d1.s1} with QoS
+     * AT_LEAST_ONCE, and disconnects.
+     *
+     * @param args unused
+     * @throws Exception if connecting, publishing or disconnecting fails
+     */
+    public static void main(String[] args) throws Exception {
+        MQTT mqtt = new MQTT();
+        mqtt.setHost("127.0.0.1", 1883);
+        mqtt.setUserName("root");
+        mqtt.setPassword("root");
+
+        BlockingConnection connection = mqtt.blockingConnection();
+        connection.connect();
+        try {
+            Random random = new Random();
+            for (int i = 0; i < 10; i++) {
+                // Locale.ROOT keeps '.' as the decimal separator; the default locale may
+                // emit ',' for %f, which would make the payload invalid JSON.
+                String payload = String.format(Locale.ROOT, "{\n" +
+                        "\"device\":\"root.sg.d1\",\n" +
+                        "\"timestamp\":%d,\n" +
+                        "\"measurements\":[\"s1\"],\n" +
+                        "\"values\":[%f]\n" +
+                        "}", System.currentTimeMillis(), random.nextDouble());
+
+                // Encode explicitly as UTF-8 instead of relying on the platform charset.
+                connection.publish("root.sg.d1.s1", payload.getBytes(StandardCharsets.UTF_8),
+                        QoS.AT_LEAST_ONCE, false);
+            }
+        } finally {
+            // Always release the connection, even when a publish fails.
+            connection.disconnect();
+        }
+    }
+}
diff --git a/example/pom.xml b/example/pom.xml
index 283f2d8..4b336bd 100644
--- a/example/pom.xml
+++ b/example/pom.xml
@@ -41,6 +41,7 @@
         <module>jdbc</module>
         <module>hadoop</module>
         <module>flink</module>
+        <module>mqtt</module>
     </modules>
     <build>
         <pluginManagement>
diff --git a/pom.xml b/pom.xml
index 647466f..56692a8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -123,6 +123,7 @@
         <common.lang3.version>3.8.1</common.lang3.version>
         <common.logging.version>1.1.3</common.logging.version>
         <guava.version>21.0</guava.version>
+        <fastjson.version>1.2.67</fastjson.version>
         <jline.version>2.14.5</jline.version>
         <jetty.version>9.4.24.v20191120</jetty.version>
         <metrics.version>3.2.6</metrics.version>
@@ -135,6 +136,19 @@
         <!-- By default, the argLine is empty-->
         <argLine/>
     </properties>
+    <repositories>
+        <!--   repository for moquette    -->
+        <repository>
+            <id>bintray</id>
+            <url>https://jcenter.bintray.com</url>
+            <releases>
+                <enabled>true</enabled>
+            </releases>
+            <snapshots>
+                <enabled>false</enabled>
+            </snapshots>
+        </repository>
+    </repositories>
     <!--
         if we claim dependencies in dependencyManagement, then we do not claim
         their version in sub-project's pom, but we have to claim themselves again
@@ -154,7 +168,7 @@
             <dependency>
                 <groupId>com.alibaba</groupId>
                 <artifactId>fastjson</artifactId>
-                <version>1.2.31</version>
+                <version>${fastjson.version}</version>
             </dependency>
             <dependency>
                 <groupId>com.fasterxml.jackson.core</groupId>
@@ -459,6 +473,16 @@
                 <artifactId>airline</artifactId>
                 <version>${airline.version}</version>
             </dependency>
+            <dependency>
+                <groupId>io.moquette</groupId>
+                <artifactId>moquette-broker</artifactId>
+                <version>0.12.1</version>
+            </dependency>
+            <dependency>
+                <groupId>org.fusesource.mqtt-client</groupId>
+                <artifactId>mqtt-client</artifactId>
+                <version>1.12</version>
+            </dependency>
         </dependencies>
     </dependencyManagement>
     <dependencies>
diff --git a/server/pom.xml b/server/pom.xml
index b231975..618f768 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -61,12 +61,14 @@
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-collections4</artifactId>
         </dependency>
-        <!-- https://mvnrepository.com/artifact/org.antlr/antlr-runtime4 -->
+        <dependency>
+            <groupId>commons-lang</groupId>
+            <artifactId>commons-lang</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.antlr</groupId>
             <artifactId>antlr4-runtime</artifactId>
         </dependency>
-        <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
         <dependency>
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-lang3</artifactId>
@@ -95,6 +97,10 @@
             <groupId>io.dropwizard.metrics</groupId>
             <artifactId>metrics-json</artifactId>
         </dependency>
+        <dependency>
+            <groupId>io.moquette</groupId>
+            <artifactId>moquette-broker</artifactId>
+        </dependency>
         <!-- for mocked test-->
         <dependency>
             <groupId>org.powermock</groupId>
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index 39ac594..e0a13d7 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -451,4 +451,24 @@ partition_interval=604800
 # For example, your partitionInterval is 86400 and you want to insert data in 3 different days,
 # you should set this param >= 6 (for sequence and unsequence)
 # default number is 10
-memtable_num_in_each_storage_group=10
\ No newline at end of file
+memtable_num_in_each_storage_group=10
+
+
+####################
+### MQTT Broker Configuration
+####################
+
+# whether to enable the mqtt service.
+enable_mqtt_service=true
+
+# the mqtt service binding host.
+mqtt_host=0.0.0.0
+
+# the mqtt service binding port.
+mqtt_port=1883
+
+# the handler pool size for handling the mqtt messages.
+mqtt_handler_pool_size=1
+
+# the mqtt message payload formatter.
+mqtt_payload_formatter=json
diff --git a/server/src/main/java/io/moquette/broker/MQTTConnection.java b/server/src/main/java/io/moquette/broker/MQTTConnection.java
new file mode 100644
index 0000000..4c4a8fa
--- /dev/null
+++ b/server/src/main/java/io/moquette/broker/MQTTConnection.java
@@ -0,0 +1,503 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.moquette.broker;
+
+import io.moquette.broker.subscriptions.Topic;
+import io.moquette.broker.security.IAuthenticator;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelPipeline;
+import io.netty.handler.codec.mqtt.*;
+import io.netty.handler.timeout.IdleStateHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static io.netty.channel.ChannelFutureListener.CLOSE_ON_FAILURE;
+import static io.netty.channel.ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE;
+import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.*;
+import static io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader.from;
+import static io.netty.handler.codec.mqtt.MqttQoS.*;
+
+// NOTE:
+// This class overrides the MQTTConnection class in the moquette 0.12.1 jar to fix the PUBACK flush issue:
+// https://github.com/moquette-io/moquette/pull/454
+// Once a moquette release containing the fix is available, this class can be removed.
+final class MQTTConnection {
+
+    // Class-wide SLF4J logger.
+    private static final Logger LOG = LoggerFactory.getLogger(MQTTConnection.class);
+
+    // Netty channel this connection writes its responses to.
+    final Channel channel;
+    // Broker settings consulted during CONNECT (zero-byte client IDs, anonymous access).
+    private BrokerConfiguration brokerConfig;
+    // Validates the client ID / username / password triple during login.
+    private IAuthenticator authenticator;
+    // Registry used to bind, look up, remove and disconnect sessions by client ID.
+    private SessionRegistry sessionRegistry;
+    // Receives connection, connection-lost and will notifications from this connection.
+    private final PostOffice postOffice;
+    // Set to true by sendConnAck() and back to false when the connection is lost.
+    private boolean connected;
+    // Counter starting at 0; presumably the source of outgoing packet IDs - TODO confirm,
+    // its use lies outside the visible part of this class.
+    private final AtomicInteger lastPacketId = new AtomicInteger(0);
+
+    /**
+     * Creates a connection wrapper around the given channel. The connection starts
+     * in the not-connected state.
+     *
+     * @param channel         the Netty channel of the client
+     * @param brokerConfig    broker settings consulted while processing CONNECT
+     * @param authenticator   validator for the credentials supplied by the client
+     * @param sessionRegistry registry of client sessions
+     * @param postOffice      receiver of connection-related notifications
+     */
+    MQTTConnection(Channel channel, BrokerConfiguration brokerConfig, IAuthenticator authenticator,
+                   SessionRegistry sessionRegistry, PostOffice postOffice) {
+        this.connected = false;
+        this.postOffice = postOffice;
+        this.sessionRegistry = sessionRegistry;
+        this.authenticator = authenticator;
+        this.brokerConfig = brokerConfig;
+        this.channel = channel;
+    }
+
+    void handleMessage(MqttMessage msg) {
+        MqttMessageType messageType = msg.fixedHeader().messageType();
+        LOG.debug("Received MQTT message, type: {}, channel: {}", messageType, channel);
+        switch (messageType) {
+            case CONNECT:
+                processConnect((MqttConnectMessage) msg);
+                break;
+            case SUBSCRIBE:
+                processSubscribe((MqttSubscribeMessage) msg);
+                break;
+            case UNSUBSCRIBE:
+                processUnsubscribe((MqttUnsubscribeMessage) msg);
+                break;
+            case PUBLISH:
+                processPublish((MqttPublishMessage) msg);
+                break;
+            case PUBREC:
+                processPubRec(msg);
+                break;
+            case PUBCOMP:
+                processPubComp(msg);
+                break;
+            case PUBREL:
+                processPubRel(msg);
+                break;
+            case DISCONNECT:
+                processDisconnect(msg);
+                break;
+            case PUBACK:
+                processPubAck(msg);
+                break;
+            case PINGREQ:
+                MqttFixedHeader pingHeader = new MqttFixedHeader(MqttMessageType.PINGRESP, false, AT_MOST_ONCE,
+                        false, 0);
+                MqttMessage pingResp = new MqttMessage(pingHeader);
+                channel.writeAndFlush(pingResp).addListener(CLOSE_ON_FAILURE);
+                break;
+            default:
+                LOG.error("Unknown MessageType: {}, channel: {}", messageType, channel);
+                break;
+        }
+    }
+
+    /**
+     * Forwards the packet ID of a PUBCOMP message to the session of this connection's client.
+     *
+     * @param msg the PUBCOMP message; its variable header must carry a message ID
+     */
+    private void processPubComp(MqttMessage msg) {
+        final MqttMessageIdVariableHeader idHeader = (MqttMessageIdVariableHeader) msg.variableHeader();
+        final int packetId = idHeader.messageId();
+        sessionRegistry.retrieve(getClientId()).processPubComp(packetId);
+    }
+
+    /**
+     * Forwards the packet ID of a PUBREC message to the session of this connection's client.
+     *
+     * @param msg the PUBREC message; its variable header must carry a message ID
+     */
+    private void processPubRec(MqttMessage msg) {
+        final MqttMessageIdVariableHeader idHeader = (MqttMessageIdVariableHeader) msg.variableHeader();
+        final int packetId = idHeader.messageId();
+        sessionRegistry.retrieve(getClientId()).processPubRec(packetId);
+    }
+
+    /**
+     * Builds a PUBREL message (QoS AT_LEAST_ONCE, not duplicate, not retained) for the given ID.
+     *
+     * @param messageID the packet ID to place in the variable header
+     * @return the PUBREL message
+     */
+    static MqttMessage pubrel(int messageID) {
+        return new MqttMessage(
+                new MqttFixedHeader(MqttMessageType.PUBREL, false, MqttQoS.AT_LEAST_ONCE, false, 0),
+                MqttMessageIdVariableHeader.from(messageID));
+    }
+
+    /**
+     * Forwards the packet ID of a PUBACK message to the session of this connection's client.
+     *
+     * @param msg the PUBACK message; its variable header must carry a message ID
+     */
+    private void processPubAck(MqttMessage msg) {
+        final MqttMessageIdVariableHeader idHeader = (MqttMessageIdVariableHeader) msg.variableHeader();
+        final int packetId = idHeader.messageId();
+        sessionRegistry.retrieve(getClientId()).pubAckReceived(packetId);
+    }
+
+    /**
+     * Handles an MQTT CONNECT message.
+     *
+     * <p>The connection is refused (CONNACK with an error code, then channel close) when:
+     * the protocol version is neither 3.1 nor 3.1.1; the client ID is empty and the broker
+     * does not allow that, or the client asks for a non-clean session; the login check fails;
+     * or the session cannot be created. For an accepted empty client ID a random one is
+     * generated. On success the connection is bound to a session, keep-alive handling and
+     * the in-flight resender are installed on the channel, and the connection is dispatched
+     * to the post office.
+     *
+     * @param msg the CONNECT message sent by the client
+     */
+    void processConnect(MqttConnectMessage msg) {
+        MqttConnectPayload payload = msg.payload();
+        String clientId = payload.clientIdentifier();
+        final String username = payload.userName();
+        LOG.trace("Processing CONNECT message. CId={} username: {} channel: {}", clientId, username, channel);
+
+        if (isNotProtocolVersion(msg, MqttVersion.MQTT_3_1) && isNotProtocolVersion(msg, MqttVersion.MQTT_3_1_1)) {
+            LOG.warn("MQTT protocol version is not valid. CId={} channel: {}", clientId, channel);
+            abortConnection(CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION);
+            return;
+        }
+        final boolean cleanSession = msg.variableHeader().isCleanSession();
+        if (clientId == null || clientId.isEmpty()) {
+            if (!brokerConfig.isAllowZeroByteClientId()) {
+                LOG.warn("Broker doesn't permit MQTT empty client ID. Username: {}, channel: {}", username, channel);
+                abortConnection(CONNECTION_REFUSED_IDENTIFIER_REJECTED);
+                return;
+            }
+
+            if (!cleanSession) {
+                LOG.warn("MQTT client ID cannot be empty for persistent session. Username: {}, channel: {}",
+                        username, channel);
+                abortConnection(CONNECTION_REFUSED_IDENTIFIER_REJECTED);
+                return;
+            }
+
+            // Generating client id.
+            clientId = UUID.randomUUID().toString().replace("-", "");
+            LOG.debug("Client has connected with integration generated id: {}, username: {}, channel: {}", clientId,
+                    username, channel);
+        }
+
+        if (!login(msg, clientId)) {
+            // abortConnection() already writes the CONNACK and closes the channel,
+            // so no additional close is issued here.
+            abortConnection(CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
+            return;
+        }
+
+        try {
+            LOG.trace("Binding MQTTConnection (channel: {}) to session", channel);
+            sessionRegistry.bindToSession(this, msg, clientId);
+
+            initializeKeepAliveTimeout(channel, msg, clientId);
+            setupInflightResender(channel);
+
+            NettyUtils.clientID(channel, clientId);
+            // NOTE(review): the CONNACK itself is presumably sent while binding the session - confirm.
+            LOG.trace("CONNACK sent, channel: {}", channel);
+            postOffice.dispatchConnection(msg);
+            // Pass the message itself so toString() only runs when TRACE is enabled.
+            LOG.trace("dispatch connection: {}", msg);
+        } catch (SessionCorruptedException scex) {
+            LOG.warn("MQTT session for client ID {} cannot be created, channel: {}", clientId, channel);
+            abortConnection(CONNECTION_REFUSED_SERVER_UNAVAILABLE);
+        }
+    }
+
+    private void setupInflightResender(Channel channel) {
+        channel.pipeline()
+                .addFirst("inflightResender", new InflightResender(5_000, TimeUnit.MILLISECONDS));
+    }
+
+    private void initializeKeepAliveTimeout(Channel channel, MqttConnectMessage msg, String clientId) {
+        int keepAlive = msg.variableHeader().keepAliveTimeSeconds();
+        NettyUtils.keepAlive(channel, keepAlive);
+        NettyUtils.cleanSession(channel, msg.variableHeader().isCleanSession());
+        NettyUtils.clientID(channel, clientId);
+        int idleTime = Math.round(keepAlive * 1.5f);
+        setIdleTime(channel.pipeline(), idleTime);
+
+        LOG.debug("Connection has been configured CId={}, keepAlive={}, removeTemporaryQoS2={}, idleTime={}",
+                clientId, keepAlive, msg.variableHeader().isCleanSession(), idleTime);
+    }
+
+    private void setIdleTime(ChannelPipeline pipeline, int idleTime) {
+        if (pipeline.names().contains("idleStateHandler")) {
+            pipeline.remove("idleStateHandler");
+        }
+        pipeline.addFirst("idleStateHandler", new IdleStateHandler(idleTime, 0, 0));
+    }
+
+    private boolean isNotProtocolVersion(MqttConnectMessage msg, MqttVersion version) {
+        return msg.variableHeader().version() != version.protocolLevel();
+    }
+
+    private void abortConnection(MqttConnectReturnCode returnCode) {
+        MqttConnAckMessage badProto = connAck(returnCode, false);
+        channel.writeAndFlush(badProto).addListener(FIRE_EXCEPTION_ON_FAILURE);
+        channel.close().addListener(CLOSE_ON_FAILURE);
+    }
+
+    /**
+     * Builds a CONNACK message (QoS AT_MOST_ONCE, not duplicate, not retained).
+     *
+     * @param returnCode     the connect return code to report
+     * @param sessionPresent value of the session-present flag
+     * @return the CONNACK message
+     */
+    private MqttConnAckMessage connAck(MqttConnectReturnCode returnCode, boolean sessionPresent) {
+        return new MqttConnAckMessage(
+                new MqttFixedHeader(MqttMessageType.CONNACK, false, AT_MOST_ONCE, false, 0),
+                new MqttConnAckVariableHeader(returnCode, sessionPresent));
+    }
+
+    /**
+     * Performs user authentication for a CONNECT message.
+     *
+     * <p>Without a username the login succeeds only if the broker allows anonymous access.
+     * With a username, a missing password is tolerated only in anonymous mode (the
+     * authenticator then receives a {@code null} password). On success the username is
+     * stored on the channel.
+     *
+     * @param msg      the CONNECT message holding the credentials
+     * @param clientId the client ID passed to the authenticator and used in log output
+     * @return {@code true} if the client may connect, {@code false} otherwise
+     */
+    private boolean login(MqttConnectMessage msg, final String clientId) {
+        if (!msg.variableHeader().hasUserName()) {
+            if (brokerConfig.isAllowAnonymous()) {
+                return true;
+            }
+            LOG.error("Client didn't supply any credentials and MQTT anonymous mode is disabled. CId={}", clientId);
+            return false;
+        }
+
+        // A username was supplied: collect the password, if any.
+        byte[] passwordBytes = null;
+        if (msg.variableHeader().hasPassword()) {
+            passwordBytes = msg.payload().password().getBytes(StandardCharsets.UTF_8);
+        } else if (!brokerConfig.isAllowAnonymous()) {
+            LOG.error("Client didn't supply any password and MQTT anonymous mode is disabled CId={}", clientId);
+            return false;
+        }
+
+        final String userName = msg.payload().userName();
+        if (authenticator.checkValid(clientId, userName, passwordBytes)) {
+            NettyUtils.userName(channel, userName);
+            return true;
+        }
+        LOG.error("Authenticator has rejected the MQTT credentials CId={}, username={}", clientId, userName);
+        return false;
+    }
+
+    void handleConnectionLost() {
+        String clientID = NettyUtils.clientID(channel);
+        if (clientID == null || clientID.isEmpty()) {
+            return;
+        }
+        LOG.info("Notifying connection lost event. CId: {}, channel: {}", clientID, channel);
+        Session session = sessionRegistry.retrieve(clientID);
+        if (session.hasWill()) {
+            postOffice.fireWill(session.getWill());
+        }
+        if (session.isClean()) {
+            sessionRegistry.remove(clientID);
+        } else {
+            sessionRegistry.disconnect(clientID);
+        }
+        connected = false;
+        //dispatch connection lost to intercept.
+        String userName = NettyUtils.userName(channel);
+        postOffice.dispatchConnectionLost(clientID,userName);
+        LOG.trace("dispatch disconnection: clientId={}, userName={}", clientID, userName);
+    }
+
+    /**
+     * Marks this connection as connected and writes a CONNECTION_ACCEPTED CONNACK to the channel.
+     *
+     * @param isSessionAlreadyPresent value of the session-present flag in the CONNACK
+     */
+    void sendConnAck(boolean isSessionAlreadyPresent) {
+        connected = true;
+        final MqttConnAckMessage accepted = connAck(CONNECTION_ACCEPTED, isSessionAlreadyPresent);
+        channel.writeAndFlush(accepted).addListener(FIRE_EXCEPTION_ON_FAILURE);
+    }
+
+    /**
+     * @return the current value of the {@code connected} flag of this connection
+     */
+    boolean isConnected() {
+        return connected;
+    }
+
+    /**
+     * Closes the channel; FIRE_EXCEPTION_ON_FAILURE is registered as listener on the close future.
+     */
+    void dropConnection() {
+        channel.close().addListener(FIRE_EXCEPTION_ON_FAILURE);
+    }
+
+    /**
+     * Handles a DISCONNECT packet: marks the session as disconnected in the registry, closes the
+     * channel and notifies the post office. A DISCONNECT on a connection that is not flagged as
+     * connected is only logged.
+     *
+     * @param msg the DISCONNECT message (not inspected)
+     */
+    void processDisconnect(MqttMessage msg) {
+        final String cid = NettyUtils.clientID(channel);
+        LOG.trace("Start DISCONNECT CId={}, channel: {}", cid, channel);
+        if (connected) {
+            sessionRegistry.disconnect(cid);
+            connected = false;
+            channel.close().addListener(FIRE_EXCEPTION_ON_FAILURE);
+            LOG.trace("Processed DISCONNECT CId={}, channel: {}", cid, channel);
+            final String user = NettyUtils.userName(channel);
+            postOffice.dispatchDisconnection(cid, user);
+            LOG.trace("dispatch disconnection: clientId={}, userName={}", cid, user);
+        } else {
+            LOG.info("DISCONNECT received on already closed connection, CId={}, channel: {}", cid, channel);
+        }
+    }
+
+    /**
+     * Handles a SUBSCRIBE packet by delegating to the post office. When the connection is not
+     * flagged as connected, the event is logged and the channel is dropped instead.
+     *
+     * @param msg the SUBSCRIBE message
+     */
+    void processSubscribe(MqttSubscribeMessage msg) {
+        final String cid = NettyUtils.clientID(channel);
+        if (connected) {
+            postOffice.subscribeClientToTopics(msg, cid, NettyUtils.userName(channel), this);
+        } else {
+            LOG.warn("SUBSCRIBE received on already closed connection, CId={}, channel: {}", cid, channel);
+            dropConnection();
+        }
+    }
+
+    /**
+     * Writes the given SUBACK message to the channel.
+     *
+     * @param messageID  the message id, used for logging only
+     * @param ackMessage the SUBACK message to send
+     */
+    void sendSubAckMessage(int messageID, MqttSubAckMessage ackMessage) {
+        LOG.trace("Sending SUBACK response CId={}, messageId: {}", NettyUtils.clientID(channel), messageID);
+        channel.writeAndFlush(ackMessage).addListener(FIRE_EXCEPTION_ON_FAILURE);
+    }
+
+    private void processUnsubscribe(MqttUnsubscribeMessage msg) {
+        List<String> topics = msg.payload().topics();
+        String clientID = NettyUtils.clientID(channel);
+
+        LOG.trace("Processing UNSUBSCRIBE message. CId={}, topics: {}", clientID, topics);
+        postOffice.unsubscribe(topics, this, msg.variableHeader().messageId());
+    }
+
+    void sendUnsubAckMessage(List<String> topics, String clientID, int messageID) {
+        MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.UNSUBACK, false, AT_MOST_ONCE,
+                false, 0);
+        MqttUnsubAckMessage ackMessage = new MqttUnsubAckMessage(fixedHeader, from(messageID));
+
+        LOG.trace("Sending UNSUBACK message. CId={}, messageId: {}, topics: {}", clientID, messageID, topics);
+        channel.writeAndFlush(ackMessage).addListener(FIRE_EXCEPTION_ON_FAILURE);
+        LOG.trace("Client <{}> unsubscribed from topics <{}>", clientID, topics);
+    }
+
+    /**
+     * Dispatches an incoming PUBLISH according to its QoS level.
+     *
+     * <p>A PUBLISH whose topic is not valid makes the connection be dropped and the message is
+     * discarded without being routed. QoS 0 and QoS 1 messages are handed to the post office;
+     * a QoS 2 message is first recorded on the client's session and then handed to the post
+     * office.
+     *
+     * @param msg the PUBLISH message received from the client
+     */
+    void processPublish(MqttPublishMessage msg) {
+        final MqttQoS qos = msg.fixedHeader().qosLevel();
+        final String username = NettyUtils.userName(channel);
+        final String topicName = msg.variableHeader().topicName();
+        final String clientId = getClientId();
+        LOG.trace("Processing PUBLISH message. CId={}, topic: {}, messageId: {}, qos: {}", clientId, topicName,
+                msg.variableHeader().packetId(), qos);
+        ByteBuf payload = msg.payload();
+        final boolean retain = msg.fixedHeader().isRetain();
+        final Topic topic = new Topic(topicName);
+        if (!topic.isValid()) {
+            LOG.debug("Drop connection because of invalid topic format");
+            dropConnection();
+            // stop here: a message with an invalid topic must not be routed once the
+            // connection has been dropped
+            return;
+        }
+        switch (qos) {
+            case AT_MOST_ONCE:
+                postOffice.receivedPublishQos0(topic, username, clientId, payload, retain, msg);
+                break;
+            case AT_LEAST_ONCE: {
+                final int messageID = msg.variableHeader().packetId();
+                postOffice.receivedPublishQos1(this, topic, username, payload, messageID, retain, msg);
+                break;
+            }
+            case EXACTLY_ONCE: {
+                final int messageID = msg.variableHeader().packetId();
+                final Session session = sessionRegistry.retrieve(clientId);
+                session.receivedPublishQos2(messageID, msg);
+                postOffice.receivedPublishQos2(this, msg, username);
+                // NOTE(review): the release below is disabled in this copy - confirm it is intentional
+//                msg.release();
+                break;
+            }
+            default:
+                LOG.error("Unknown QoS-Type:{}", qos);
+                break;
+        }
+    }
+
+    void sendPublishReceived(int messageID) {
+        LOG.trace("sendPubRec invoked on channel: {}", channel);
+        MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBREC, false, AT_MOST_ONCE,
+                false, 0);
+        MqttPubAckMessage pubRecMessage = new MqttPubAckMessage(fixedHeader, from(messageID));
+        sendIfWritableElseDrop(pubRecMessage);
+    }
+
+    /**
+     * Handles a PUBREL: informs the client's session and answers with a PUBCOMP.
+     *
+     * @param msg the PUBREL message; its variable header must be a message-id header
+     */
+    private void processPubRel(MqttMessage msg) {
+        final Session ownSession = sessionRegistry.retrieve(getClientId());
+        final MqttMessageIdVariableHeader idHeader = (MqttMessageIdVariableHeader) msg.variableHeader();
+        final int packetId = idHeader.messageId();
+        ownSession.receivedPubRelQos2(packetId);
+        sendPubCompMessage(packetId);
+    }
+
+    /**
+     * Logs and sends a PUBLISH to the client, if the channel is writable. The payload is
+     * included in the log line only at trace level.
+     *
+     * @param publishMsg the PUBLISH message to send
+     */
+    void sendPublish(MqttPublishMessage publishMsg) {
+        final MqttPublishVariableHeader header = publishMsg.variableHeader();
+        final int packetId = header.packetId();
+        final String topicName = header.topicName();
+        final String clientId = getClientId();
+        final MqttQoS qos = publishMsg.fixedHeader().qosLevel();
+        if (!LOG.isTraceEnabled()) {
+            LOG.debug("Sending PUBLISH({}) message. MessageId={}, CId={}, topic={}", qos, packetId, clientId,
+                    topicName);
+        } else {
+            LOG.trace("Sending PUBLISH({}) message. MessageId={}, CId={}, topic={}, payload={}", qos, packetId,
+                    clientId, topicName, DebugUtils.payload2Str(publishMsg.payload()));
+        }
+        sendIfWritableElseDrop(publishMsg);
+    }
+
+    /**
+     * Writes and flushes the message when the channel is writable; otherwise the message is
+     * not sent at all.
+     *
+     * @param msg the message to send
+     */
+    void sendIfWritableElseDrop(MqttMessage msg) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("OUT {} on channel {}", msg.fixedHeader().messageType(), channel);
+        }
+        if (!channel.isWritable()) {
+            // channel not writable: the message is dropped
+            return;
+        }
+        // write and flush right away
+        channel.writeAndFlush(msg).addListener(FIRE_EXCEPTION_ON_FAILURE);
+    }
+
+    /**
+     * Notifies the client's session when the channel is writable at the time of the call.
+     */
+    public void writabilityChanged() {
+        if (!channel.isWritable()) {
+            return;
+        }
+        LOG.debug("Channel {} is again writable", channel);
+        sessionRegistry.retrieve(getClientId()).writabilityChanged();
+    }
+
+    void sendPubAck(int messageID) {
+        LOG.trace("sendPubAck invoked");
+        MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBACK, false, AT_MOST_ONCE,
+                false, 0);
+        MqttPubAckMessage pubAckMessage = new MqttPubAckMessage(fixedHeader, from(messageID));
+        sendIfWritableElseDrop(pubAckMessage);
+    }
+
+    private void sendPubCompMessage(int messageID) {
+        LOG.trace("Sending PUBCOMP message on channel: {}, messageId: {}", channel, messageID);
+        MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBCOMP, false, AT_MOST_ONCE, false, 0);
+        MqttMessage pubCompMessage = new MqttMessage(fixedHeader, from(messageID));
+        sendIfWritableElseDrop(pubCompMessage);
+    }
+
+    /**
+     * @return the client identifier that {@code NettyUtils} reads from this connection's channel
+     */
+    String getClientId() {
+        return NettyUtils.clientID(channel);
+    }
+
+    /**
+     * @return the user name that {@code NettyUtils} reads from this connection's channel
+     */
+    String getUsername() {
+        return NettyUtils.userName(channel);
+    }
+
+    /**
+     * Sends a PUBLISH with the retain flag set and message id 0.
+     *
+     * @param topic   the topic of the message
+     * @param qos     the QoS to put in the fixed header
+     * @param payload the message payload
+     */
+    public void sendPublishRetainedQos0(Topic topic, MqttQoS qos, ByteBuf payload) {
+        sendPublish(retainedPublish(topic.toString(), qos, payload));
+    }
+
+    /**
+     * Sends a PUBLISH with the retain flag set, using the next packet id of this connection.
+     *
+     * @param topic   the topic of the message
+     * @param qos     the QoS to put in the fixed header
+     * @param payload the message payload
+     */
+    public void sendPublishRetainedWithPacketId(Topic topic, MqttQoS qos, ByteBuf payload) {
+        final int freshId = nextPacketId();
+        final String topicName = topic.toString();
+        sendPublish(retainedPublishWithMessageId(topicName, qos, payload, freshId));
+    }
+
+    /**
+     * Builds a PUBLISH with the retain flag set and message id 0.
+     *
+     * @param topic   the topic name
+     * @param qos     the QoS to put in the fixed header
+     * @param message the payload
+     * @return the assembled PUBLISH message
+     */
+    private static MqttPublishMessage retainedPublish(String topic, MqttQoS qos, ByteBuf message) {
+        return retainedPublishWithMessageId(topic, qos, message, 0);
+    }
+
+    /**
+     * Builds a PUBLISH with the retain flag set and the given message id.
+     *
+     * @param topic     the topic name
+     * @param qos       the QoS to put in the fixed header
+     * @param message   the payload
+     * @param messageId the message id to put in the variable header
+     * @return the assembled PUBLISH message
+     */
+    private static MqttPublishMessage retainedPublishWithMessageId(String topic, MqttQoS qos, ByteBuf message,
+                                                                   int messageId) {
+        return new MqttPublishMessage(
+                new MqttFixedHeader(MqttMessageType.PUBLISH, false, qos, true, 0),
+                new MqttPublishVariableHeader(topic, messageId),
+                message);
+    }
+
+    // TODO move this method in Session
+    /**
+     * Sends a PUBLISH with the retain flag cleared and message id 0.
+     *
+     * @param topic   the topic of the message
+     * @param qos     the QoS to put in the fixed header
+     * @param payload the message payload
+     */
+    void sendPublishNotRetainedQos0(Topic topic, MqttQoS qos, ByteBuf payload) {
+        sendPublish(notRetainedPublish(topic.toString(), qos, payload));
+    }
+
+    /**
+     * Builds a PUBLISH with the retain flag cleared and message id 0.
+     *
+     * @param topic   the topic name
+     * @param qos     the QoS to put in the fixed header
+     * @param message the payload
+     * @return the assembled PUBLISH message
+     */
+    static MqttPublishMessage notRetainedPublish(String topic, MqttQoS qos, ByteBuf message) {
+        return notRetainedPublishWithMessageId(topic, qos, message, 0);
+    }
+
+    /**
+     * Builds a PUBLISH with the retain flag cleared and the given message id.
+     *
+     * @param topic     the topic name
+     * @param qos       the QoS to put in the fixed header
+     * @param message   the payload
+     * @param messageId the message id to put in the variable header
+     * @return the assembled PUBLISH message
+     */
+    static MqttPublishMessage notRetainedPublishWithMessageId(String topic, MqttQoS qos, ByteBuf message,
+                                                              int messageId) {
+        return new MqttPublishMessage(
+                new MqttFixedHeader(MqttMessageType.PUBLISH, false, qos, false, 0),
+                new MqttPublishVariableHeader(topic, messageId),
+                message);
+    }
+
+    /**
+     * Asks the client's session to resend its in-flight messages that were not acknowledged.
+     */
+    public void resendNotAckedPublishes() {
+        sessionRegistry.retrieve(getClientId()).resendInflightNotAcked();
+    }
+
+    /**
+     * @return the value of {@code lastPacketId} after incrementing it
+     */
+    // NOTE(review): no wrap-around is applied here - confirm ids stay inside the MQTT packet-id range
+    int nextPacketId() {
+        return lastPacketId.incrementAndGet();
+    }
+
+    /**
+     * @return a short description holding the channel and the connected flag
+     */
+    @Override
+    public String toString() {
+        final StringBuilder text = new StringBuilder("MQTTConnection{channel=");
+        text.append(channel);
+        text.append(", connected=").append(connected);
+        text.append('}');
+        return text.toString();
+    }
+
+    /**
+     * @return the channel's remote address, cast to {@code InetSocketAddress}
+     */
+    InetSocketAddress remoteAddress() {
+        return (InetSocketAddress) channel.remoteAddress();
+    }
+}
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index c0ce887..fa3013a 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -50,6 +50,34 @@ public class IoTDBConfig {
 
   private boolean enableMetricService = false;
 
+  /**
+   * whether to enable the mqtt service.
+   */
+  private boolean enableMQTTService = true;
+
+  /**
+   * the mqtt service binding host.
+   */
+  private String mqttHost = "0.0.0.0";
+
+  /**
+   * the mqtt service binding port.
+   */
+  private int mqttPort = 1883;
+
+  /**
+   * the handler pool size for handling the mqtt messages.
+   */
+  private int mqttHandlerPoolSize = 1;
+
+  /**
+   * the mqtt message payload formatter.
+   */
+  private String mqttPayloadFormatter = "json";
+
+  /**
+   * Rpc binding address.
+   */
   private String rpcAddress = "0.0.0.0";
 
   /**
@@ -1413,4 +1441,44 @@ public class IoTDBConfig {
   public void setQueryCacheSizeInMetric(int queryCacheSizeInMetric) {
     this.queryCacheSizeInMetric = queryCacheSizeInMetric;
   }
+
+  /** @return whether the mqtt service is enabled. */
+  public boolean isEnableMQTTService() {
+    return enableMQTTService;
+  }
+
+  /** @param enableMQTTService whether to enable the mqtt service. */
+  public void setEnableMQTTService(boolean enableMQTTService) {
+    this.enableMQTTService = enableMQTTService;
+  }
+
+  /** @return the mqtt service binding host. */
+  public String getMqttHost() {
+    return mqttHost;
+  }
+
+  /** @param mqttHost the mqtt service binding host. */
+  public void setMqttHost(String mqttHost) {
+    this.mqttHost = mqttHost;
+  }
+
+  /** @return the mqtt service binding port. */
+  public int getMqttPort() {
+    return mqttPort;
+  }
+
+  /** @param mqttPort the mqtt service binding port. */
+  public void setMqttPort(int mqttPort) {
+    this.mqttPort = mqttPort;
+  }
+
+  /** @return the handler pool size for handling the mqtt messages. */
+  public int getMqttHandlerPoolSize() {
+    return mqttHandlerPoolSize;
+  }
+
+  /** @param mqttHandlerPoolSize the handler pool size for handling the mqtt messages. */
+  public void setMqttHandlerPoolSize(int mqttHandlerPoolSize) {
+    this.mqttHandlerPoolSize = mqttHandlerPoolSize;
+  }
+
+  /** @return the name of the mqtt message payload formatter. */
+  public String getMqttPayloadFormatter() {
+    return mqttPayloadFormatter;
+  }
+
+  /** @param mqttPayloadFormatter the name of the mqtt message payload formatter. */
+  public void setMqttPayloadFormatter(String mqttPayloadFormatter) {
+    this.mqttPayloadFormatter = mqttPayloadFormatter;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
index c52692f..0088ddf 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
@@ -88,4 +88,11 @@ public class IoTDBConstant {
   public static final String SCHEMA_FOLDER_NAME = "schema";
   public static final String SYNC_FOLDER_NAME = "sync";
   public static final String QUERY_FOLDER_NAME = "query";
+
+  // mqtt: names of the MQTT-related entries looked up in the configuration properties
+  public static final String ENABLE_MQTT = "enable_mqtt_service";
+  public static final String MQTT_HOST_NAME = "mqtt_host";
+  public static final String MQTT_PORT_NAME = "mqtt_port";
+  public static final String MQTT_HANDLER_POOL_SIZE_NAME = "mqtt_handler_pool_size";
+  public static final String MQTT_PAYLOAD_FORMATTER_NAME = "mqtt_payload_formatter";
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index f40a82d..0c94ac7 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -339,6 +339,23 @@ public class IoTDBDescriptor {
           Integer.parseInt(properties.getProperty("default_fill_interval",
               String.valueOf(conf.getDefaultFillInterval()))));
 
+      // mqtt
+      if (properties.getProperty(IoTDBConstant.MQTT_HOST_NAME) != null) {
+        conf.setMqttHost(properties.getProperty(IoTDBConstant.MQTT_HOST_NAME));
+      }
+      if (properties.getProperty(IoTDBConstant.MQTT_PORT_NAME) != null) {
+        conf.setMqttPort(Integer.parseInt(properties.getProperty(IoTDBConstant.MQTT_PORT_NAME)));
+      }
+      if (properties.getProperty(IoTDBConstant.MQTT_HANDLER_POOL_SIZE_NAME) != null) {
+        conf.setMqttHandlerPoolSize(Integer.parseInt(properties.getProperty(IoTDBConstant.MQTT_HANDLER_POOL_SIZE_NAME)));
+      }
+      if (properties.getProperty(IoTDBConstant.MQTT_PAYLOAD_FORMATTER_NAME) != null) {
+        conf.setMqttPayloadFormatter(properties.getProperty(IoTDBConstant.MQTT_PAYLOAD_FORMATTER_NAME));
+      }
+      if (properties.getProperty(IoTDBConstant.ENABLE_MQTT) != null) {
+        conf.setEnableMQTTService(Boolean.parseBoolean(properties.getProperty(IoTDBConstant.ENABLE_MQTT)));
+      }
+      
       // At the same time, set TSFileConfig
       TSFileDescriptor.getInstance().getConfig()
           .setTSFileStorageFs(FSType.valueOf(
diff --git a/server/src/main/java/org/apache/iotdb/db/mqtt/BrokerAuthenticator.java b/server/src/main/java/org/apache/iotdb/db/mqtt/BrokerAuthenticator.java
new file mode 100644
index 0000000..8d2d0ff
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mqtt/BrokerAuthenticator.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.db.mqtt;
+
+import io.moquette.broker.security.IAuthenticator;
+import org.apache.commons.lang.StringUtils;
+import org.apache.iotdb.db.auth.AuthException;
+import org.apache.iotdb.db.auth.authorizer.IAuthorizer;
+import org.apache.iotdb.db.auth.authorizer.LocalFileAuthorizer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The MQTT broker authenticator. It validates the credentials of an MQTT client through the
+ * {@link LocalFileAuthorizer}.
+ */
+public class BrokerAuthenticator implements IAuthenticator {
+    private static final Logger LOG = LoggerFactory.getLogger(BrokerAuthenticator.class);
+
+    /**
+     * Checks the credentials supplied by an MQTT client.
+     *
+     * @param clientId the MQTT client identifier (not used for the check)
+     * @param username the user name; a blank name is rejected
+     * @param password the password bytes; {@code null} is rejected
+     * @return {@code true} when the authorizer accepts the login, {@code false} when the
+     *         credentials are missing, rejected, or the authorizer raises an
+     *         {@link AuthException}
+     */
+    @Override
+    public boolean checkValid(String clientId, String username, byte[] password) {
+        if (StringUtils.isBlank(username) || password == null) {
+            return false;
+        }
+
+        try {
+            IAuthorizer authorizer = LocalFileAuthorizer.getInstance();
+            // The broker encodes the CONNECT password as UTF-8 bytes, so decode with the same
+            // charset instead of the platform default.
+            return authorizer.login(username,
+                    new String(password, java.nio.charset.StandardCharsets.UTF_8));
+        } catch (AuthException e) {
+            LOG.info("meet error while logging in.", e);
+            return false;
+        }
+    }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mqtt/JSONPayloadFormatter.java b/server/src/main/java/org/apache/iotdb/db/mqtt/JSONPayloadFormatter.java
new file mode 100644
index 0000000..88e0966
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mqtt/JSONPayloadFormatter.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.db.mqtt;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
+import com.google.common.collect.Lists;
+import io.netty.buffer.ByteBuf;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The JSON payload formatter.
+ * Two JSON formats are supported.
+ *
+ * A single data point:
+ * {
+ *     "device":"root.sg.d1",
+ *     "timestamp":1586076045524,
+ *     "measurements":["s1","s2"],
+ *     "values":[0.530635,0.530635]
+ * }
+ *
+ * A batch of data points, one row of values per timestamp:
+ * {
+ *     "device":"root.sg.d1",
+ *     "timestamps":[1586076045524,1586076065526],
+ *     "measurements":["s1","s2"],
+ *     "values":[[0.530635,0.530635], [0.530655,0.530695]]
+ * }
+ */
+public class JSONPayloadFormatter implements PayloadFormatter {
+    private static final String JSON_KEY_DEVICE = "device";
+    private static final String JSON_KEY_TIMESTAMP = "timestamp";
+    private static final String JSON_KEY_TIMESTAMPS = "timestamps";
+    private static final String JSON_KEY_MEASUREMENTS = "measurements";
+    private static final String JSON_KEY_VALUES = "values";
+
+    /**
+     * Parses a UTF-8 JSON payload into messages.
+     *
+     * @param payload the raw payload; may be {@code null}
+     * @return {@code null} when {@code payload} is {@code null}; otherwise one message for the
+     *         single-point format, or one message per timestamp for the batch format
+     * @throws IllegalArgumentException when the payload is not a JSON object, when the batch
+     *         format lacks "timestamps", "measurements" or "values", or when "values" does not
+     *         hold exactly one row per timestamp
+     */
+    @Override
+    public List<Message> format(ByteBuf payload) {
+        if (payload == null) {
+            return null;
+        }
+        String txt = payload.toString(StandardCharsets.UTF_8);
+        JSONObject jsonObject = JSON.parseObject(txt);
+        if (jsonObject == null) {
+            // fail with a clear message instead of a NullPointerException further down
+            throw new IllegalArgumentException("MQTT payload is not a JSON object: " + txt);
+        }
+
+        Object timestamp = jsonObject.get(JSON_KEY_TIMESTAMP);
+        if (timestamp != null) {
+            return Lists.newArrayList(JSON.parseObject(txt, Message.class));
+        }
+
+        String device = jsonObject.getString(JSON_KEY_DEVICE);
+        JSONArray timestamps = jsonObject.getJSONArray(JSON_KEY_TIMESTAMPS);
+        JSONArray measurements = jsonObject.getJSONArray(JSON_KEY_MEASUREMENTS);
+        JSONArray values = jsonObject.getJSONArray(JSON_KEY_VALUES);
+        if (timestamps == null || measurements == null || values == null) {
+            throw new IllegalArgumentException("MQTT JSON payload must contain either \""
+                    + JSON_KEY_TIMESTAMP + "\" or all of \"" + JSON_KEY_TIMESTAMPS + "\", \""
+                    + JSON_KEY_MEASUREMENTS + "\" and \"" + JSON_KEY_VALUES + "\": " + txt);
+        }
+        if (values.size() != timestamps.size()) {
+            throw new IllegalArgumentException("MQTT JSON payload has " + timestamps.size()
+                    + " timestamps but " + values.size() + " rows of values");
+        }
+
+        List<Message> ret = new ArrayList<>();
+        for (int i = 0; i < timestamps.size(); i++) {
+            Long ts = timestamps.getLong(i);
+
+            Message message = new Message();
+            message.setDevice(device);
+            message.setTimestamp(ts);
+            message.setMeasurements(measurements.toJavaList(String.class));
+            message.setValues(((JSONArray)values.get(i)).toJavaList(String.class));
+            ret.add(message);
+        }
+        return ret;
+    }
+
+    /**
+     * @return the formatter name, "json"
+     */
+    @Override
+    public String getName() {
+        return "json";
+    }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mqtt/Message.java b/server/src/main/java/org/apache/iotdb/db/mqtt/Message.java
new file mode 100644
index 0000000..d6a43a6
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mqtt/Message.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iotdb.db.mqtt;
+
+import java.util.List;
+
+/**
+ * Message describes one data point sent from a device: the device name, a timestamp, and the
+ * measurement names with their values.
+ */
+public class Message {
+    private String device;
+    private Long timestamp;
+    private List<String> measurements;
+    private List<String> values;
+
+    /** @return the device name */
+    public String getDevice() {
+        return this.device;
+    }
+
+    /** @param device the device name */
+    public void setDevice(String device) {
+        this.device = device;
+    }
+
+    /** @return the timestamp of the data point */
+    public Long getTimestamp() {
+        return this.timestamp;
+    }
+
+    /** @param timestamp the timestamp of the data point */
+    public void setTimestamp(Long timestamp) {
+        this.timestamp = timestamp;
+    }
+
+    /** @return the measurement names */
+    public List<String> getMeasurements() {
+        return this.measurements;
+    }
+
+    /** @param measurements the measurement names */
+    public void setMeasurements(List<String> measurements) {
+        this.measurements = measurements;
+    }
+
+    /** @return the values */
+    public List<String> getValues() {
+        return this.values;
+    }
+
+    /** @param values the values */
+    public void setValues(List<String> values) {
+        this.values = values;
+    }
+
+    /** @return a textual form listing all four fields */
+    @Override
+    public String toString() {
+        final StringBuilder text = new StringBuilder("Message{");
+        text.append("device='").append(device).append('\'');
+        text.append(", timestamp=").append(timestamp);
+        text.append(", measurements=").append(measurements);
+        text.append(", values=").append(values);
+        text.append('}');
+        return text.toString();
+    }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mqtt/PayloadFormatManager.java b/server/src/main/java/org/apache/iotdb/db/mqtt/PayloadFormatManager.java
new file mode 100644
index 0000000..eb0c5d6
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mqtt/PayloadFormatManager.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.db.mqtt;
+
+import com.google.common.base.Preconditions;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.ServiceLoader;
+
+/**
+ * PayloadFormatManager discovers the payload formatters through {@link ServiceLoader} when the
+ * class is initialized and hands them out by name.
+ */
+public class PayloadFormatManager {
+    private static Map<String, PayloadFormatter> map = new HashMap<>();
+
+    static {
+        // register every formatter found through the service loader under its own name
+        ServiceLoader.load(PayloadFormatter.class)
+                .forEach(formatter -> map.put(formatter.getName(), formatter));
+    }
+
+    /**
+     * Looks up a registered payload formatter.
+     *
+     * @param name the formatter name
+     * @return the formatter registered under {@code name}
+     * @throws IllegalArgumentException when no formatter is registered under {@code name}
+     */
+    public static PayloadFormatter getPayloadFormat(String name) {
+        final boolean registered = map.containsKey(name);
+        Preconditions.checkArgument(registered, "Unknown payload format named: " + name);
+        return map.get(name);
+    }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mqtt/PayloadFormatter.java b/server/src/main/java/org/apache/iotdb/db/mqtt/PayloadFormatter.java
new file mode 100644
index 0000000..d99e2d5
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mqtt/PayloadFormatter.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.db.mqtt;
+
+import io.netty.buffer.ByteBuf;
+
+import java.util.List;
+
+/**
+ * PayloadFormatter formats a payload into messages.
+ */
+public interface PayloadFormatter {
+    /**
+     * Formats a payload into a list of messages.
+     *
+     * @param payload the raw payload
+     * @return the messages parsed from the payload
+     */
+    List<Message> format(ByteBuf payload);
+
+    /**
+     * Gets the formatter name.
+     *
+     * @return the name of this formatter
+     */
+    String getName();
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mqtt/PublishHandler.java b/server/src/main/java/org/apache/iotdb/db/mqtt/PublishHandler.java
new file mode 100644
index 0000000..57cb3aa
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mqtt/PublishHandler.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.db.mqtt;
+
+import io.moquette.interception.AbstractInterceptHandler;
+import io.moquette.interception.messages.InterceptPublishMessage;
+import io.netty.buffer.ByteBuf;
+import io.netty.handler.codec.mqtt.MqttQoS;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.qp.executor.IPlanExecutor;
+import org.apache.iotdb.db.qp.executor.PlanExecutor;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * PublishHandler handles the messages published by MQTT clients: the payload is turned into
+ * {@link Message}s by the configured {@link PayloadFormatter} and every message is executed as
+ * an {@link InsertPlan}.
+ */
+public class PublishHandler extends AbstractInterceptHandler {
+    private static final Logger LOG = LoggerFactory.getLogger(PublishHandler.class);
+
+    private IPlanExecutor executor;
+    private PayloadFormatter payloadFormat;
+
+    /**
+     * Creates a handler that uses the payload formatter named in the configuration.
+     *
+     * @param config the IoTDB configuration
+     * @throws RuntimeException wrapping the {@link QueryProcessException} raised while creating
+     *         the plan executor
+     */
+    public PublishHandler(IoTDBConfig config) {
+        this.payloadFormat = PayloadFormatManager.getPayloadFormat(config.getMqttPayloadFormatter());
+        try {
+            this.executor = new PlanExecutor();
+        } catch (QueryProcessException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Creates a handler with an explicit executor and formatter.
+     *
+     * @param executor      the executor that runs the insert plans
+     * @param payloadFormat the formatter that parses the payloads
+     */
+    protected PublishHandler(IPlanExecutor executor, PayloadFormatter payloadFormat) {
+        this.executor = executor;
+        this.payloadFormat = payloadFormat;
+    }
+
+    @Override
+    public String getID() {
+        return "iotdb-mqtt-broker-listener";
+    }
+
+    /**
+     * Inserts the data points carried by a published message. A data point whose insertion
+     * fails is logged and skipped, so the remaining data points of the same payload are still
+     * processed.
+     *
+     * @param msg the intercepted PUBLISH message
+     */
+    @Override
+    public void onPublish(InterceptPublishMessage msg) {
+        String clientId = msg.getClientID();
+        ByteBuf payload = msg.getPayload();
+        String topic = msg.getTopicName();
+        String username = msg.getUsername();
+        MqttQoS qos = msg.getQos();
+
+        LOG.debug("Receive publish message. clientId: {}, username: {}, qos: {}, topic: {}, payload: {}",
+                clientId, username, qos, topic, payload);
+
+        List<Message> events = payloadFormat.format(payload);
+        if (events == null) {
+            return;
+        }
+
+        // since device ids from messages maybe different, so we use the InsertPlan not BatchInsertPlan.
+        for (Message event : events) {
+            if (event == null) {
+                continue;
+            }
+
+            InsertPlan plan = new InsertPlan();
+            plan.setDeviceId(event.getDevice());
+            plan.setTime(event.getTimestamp());
+            plan.setMeasurements(event.getMeasurements().toArray(new String[event.getMeasurements().size()]));
+            plan.setValues(event.getValues().toArray(new String[event.getValues().size()]));
+
+            boolean status = false;
+            try {
+                status = executeNonQuery(plan);
+            } catch (QueryProcessException e) {
+                // one failing data point must not discard the rest of the payload
+                LOG.warn("meet error when inserting device {}, measurements {}, at time {}",
+                        event.getDevice(), event.getMeasurements(), event.getTimestamp(), e);
+            }
+
+            LOG.debug("event process result: {}", status);
+        }
+    }
+
+    /**
+     * Executes a non-query plan unless the system is in read-only mode.
+     *
+     * @param plan the plan to execute
+     * @return the result reported by the executor
+     * @throws QueryProcessException when the system is read-only or the executor fails
+     */
+    private boolean executeNonQuery(PhysicalPlan plan) throws QueryProcessException {
+        if (IoTDBDescriptor.getInstance().getConfig().isReadOnly()) {
+            throw new QueryProcessException(
+                    "Current system mode is read-only, does not support non-query operation");
+        }
+        return executor.processNonQuery(plan);
+    }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
index 6f15bfb..a0cbd48 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
@@ -99,6 +99,9 @@ public class IoTDB implements IoTDBMBean {
     if (IoTDBDescriptor.getInstance().getConfig().isEnableMetricService()) {
       registerManager.register(MetricsService.getInstance());
     }
+    if (IoTDBDescriptor.getInstance().getConfig().isEnableMQTTService()) {
+      registerManager.register(MQTTService.getInstance());
+    }
     JMXService.registerMBean(getInstance(), mbeanName);
     registerManager.register(StorageEngine.getInstance());
 
diff --git a/server/src/main/java/org/apache/iotdb/db/service/IoTDBShutdownHook.java b/server/src/main/java/org/apache/iotdb/db/service/IoTDBShutdownHook.java
index caa0439..af560d5 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/IoTDBShutdownHook.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/IoTDBShutdownHook.java
@@ -22,7 +22,7 @@ import org.apache.iotdb.db.utils.MemUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class IoTDBShutdownHook extends Thread{
+public class IoTDBShutdownHook extends Thread {
 
   private static final Logger logger = LoggerFactory.getLogger(IoTDBShutdownHook.class);
 
diff --git a/server/src/main/java/org/apache/iotdb/db/service/MQTTService.java b/server/src/main/java/org/apache/iotdb/db/service/MQTTService.java
new file mode 100644
index 0000000..a49c02c
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/service/MQTTService.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.service;
+
+import com.google.common.collect.Lists;
+import io.moquette.BrokerConstants;
+import io.moquette.broker.Server;
+import io.moquette.broker.config.IConfig;
+import io.moquette.broker.config.MemoryConfig;
+import io.moquette.broker.security.IAuthenticator;
+import io.moquette.interception.InterceptHandler;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.StartupException;
+import org.apache.iotdb.db.mqtt.BrokerAuthenticator;
+import org.apache.iotdb.db.mqtt.PublishHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * The IoTDB MQTT Service.
+ *
+ * <p>Owns an embedded moquette {@link Server}. The broker is configured from
+ * {@link IoTDBConfig} (host, port, interceptor thread pool size), uses a
+ * {@link BrokerAuthenticator} for authentication and installs a {@link PublishHandler}
+ * as its only intercept handler.
+ *
+ * <p>{@link #startup()} and {@link #shutdown()} are synchronized and idempotent: the broker
+ * is started at most once until it is shut down, it is never stopped unless it is running,
+ * and the JVM shutdown hook is registered only once no matter how often the service is
+ * started. This matters because {@link #stop()} and the shutdown hook may both request a
+ * shutdown, possibly from different threads.
+ */
+public class MQTTService implements IService {
+    private static final Logger LOG = LoggerFactory.getLogger(MQTTService.class);
+    private final Server server = new Server();
+
+    /** True between a successful {@link #startup()} and the following {@link #shutdown()}. */
+    private boolean running;
+
+    /** True once the JVM shutdown hook has been registered. */
+    private boolean shutdownHookRegistered;
+
+    /**
+     * Starts the MQTT broker; delegates to {@link #startup()}.
+     *
+     * @throws StartupException declared by {@link IService}; not thrown by this implementation
+     */
+    @Override
+    public void start() throws StartupException {
+        startup();
+    }
+
+    /**
+     * Stops the MQTT broker; delegates to {@link #shutdown()}.
+     */
+    @Override
+    public void stop() {
+        shutdown();
+    }
+
+    /**
+     * Builds the broker configuration from the current {@link IoTDBConfig} and starts the
+     * broker. Does nothing when the broker is already running.
+     */
+    public synchronized void startup() {
+        if (running) {
+            LOG.debug("MQTT service is already running, ignoring start request");
+            return;
+        }
+
+        IoTDBConfig iotDBConfig = IoTDBDescriptor.getInstance().getConfig();
+        IConfig config = createBrokerConfig(iotDBConfig);
+        List<InterceptHandler> handlers = Lists.newArrayList(new PublishHandler(iotDBConfig));
+        IAuthenticator authenticator = new BrokerAuthenticator();
+
+        server.startServer(config, handlers, null, authenticator, null);
+        // Only mark the service as running once the broker start call has returned normally.
+        running = true;
+
+        LOG.info("Start MQTT service successfully, listening on ip {} port {}",
+                iotDBConfig.getMqttHost(), iotDBConfig.getMqttPort());
+
+        // Register the hook a single time; repeated start/stop cycles must not pile up hooks.
+        if (!shutdownHookRegistered) {
+            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+                LOG.info("Stopping IoTDB MQTT service...");
+                shutdown();
+                LOG.info("IoTDB MQTT service stopped.");
+            }));
+            shutdownHookRegistered = true;
+        }
+    }
+
+    /**
+     * Translates the MQTT related IoTDB settings into a moquette broker configuration.
+     *
+     * @param iotDBConfig the IoTDB configuration to read host, port and handler pool size from
+     * @return an in-memory broker configuration holding those three properties
+     */
+    private IConfig createBrokerConfig(IoTDBConfig iotDBConfig) {
+        Properties properties = new Properties();
+        properties.setProperty(BrokerConstants.HOST_PROPERTY_NAME, iotDBConfig.getMqttHost());
+        properties.setProperty(BrokerConstants.PORT_PROPERTY_NAME, String.valueOf(iotDBConfig.getMqttPort()));
+        properties.setProperty(BrokerConstants.BROKER_INTERCEPTOR_THREAD_POOL_SIZE, String.valueOf(iotDBConfig.getMqttHandlerPoolSize()));
+        return new MemoryConfig(properties);
+    }
+
+    /**
+     * Stops the broker if it is running. Calling this before {@link #startup()} or more than
+     * once is a no-op, so the broker is never asked to stop twice.
+     */
+    public synchronized void shutdown() {
+        if (!running) {
+            return;
+        }
+        // Flip the flag first so that a failing stop is not retried by the shutdown hook.
+        running = false;
+        server.stopServer();
+    }
+
+    /**
+     * @return the service type identifying this service, {@link ServiceType#MQTT_SERVICE}
+     */
+    @Override
+    public ServiceType getID() {
+        return ServiceType.MQTT_SERVICE;
+    }
+
+    /**
+     * @return the singleton instance held by {@link MQTTServiceHolder}
+     */
+    public static final MQTTService getInstance() {
+        return MQTTServiceHolder.INSTANCE;
+    }
+
+    /** Holder class for the singleton instance. */
+    private static class MQTTServiceHolder {
+
+        private static final MQTTService INSTANCE = new MQTTService();
+
+        private MQTTServiceHolder() {
+        }
+    }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java b/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
index 0b0dea5..256cccb 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
@@ -26,6 +26,7 @@ public enum ServiceType {
   JMX_SERVICE("JMX ServerService", "JMX ServerService"),
   METRICS_SERVICE("Metrics ServerService","MetricsService"),
   JDBC_SERVICE("JDBC ServerService", "JDBCService"),
+  MQTT_SERVICE("MQTTService", ""),
   MONITOR_SERVICE("Monitor ServerService", "Monitor"),
   STAT_MONITOR_SERVICE("Statistics ServerService", ""),
   WAL_SERVICE("WAL ServerService", ""),
diff --git a/server/src/main/resources/META-INF/services/org.apache.iotdb.db.mqtt.PayloadFormatter b/server/src/main/resources/META-INF/services/org.apache.iotdb.db.mqtt.PayloadFormatter
new file mode 100644
index 0000000..d115e3e
--- /dev/null
+++ b/server/src/main/resources/META-INF/services/org.apache.iotdb.db.mqtt.PayloadFormatter
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+org.apache.iotdb.db.mqtt.JSONPayloadFormatter
diff --git a/server/src/test/java/org/apache/iotdb/db/mqtt/BrokerAuthenticatorTest.java b/server/src/test/java/org/apache/iotdb/db/mqtt/BrokerAuthenticatorTest.java
new file mode 100644
index 0000000..fb36c19
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mqtt/BrokerAuthenticatorTest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.db.mqtt;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class BrokerAuthenticatorTest {
+
+    @Test
+    public void checkValid() {
+        BrokerAuthenticator authenticator = new BrokerAuthenticator();
+        assertTrue(authenticator.checkValid(null, "root", "root".getBytes()));
+        assertFalse(authenticator.checkValid(null, "", "foo".getBytes()));
+        assertFalse(authenticator.checkValid(null, "root", null));
+        assertFalse(authenticator.checkValid(null, "foo", "foo".getBytes()));
+    }
+}
\ No newline at end of file
diff --git a/server/src/test/java/org/apache/iotdb/db/mqtt/JSONPayloadFormatTest.java b/server/src/test/java/org/apache/iotdb/db/mqtt/JSONPayloadFormatTest.java
new file mode 100644
index 0000000..9b8ce2b
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mqtt/JSONPayloadFormatTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.db.mqtt;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Unit tests for {@link JSONPayloadFormatter}.
+ */
+public class JSONPayloadFormatTest {
+
+    /**
+     * Parses a payload with a single {@code timestamp} and a flat {@code values} array and
+     * checks the fields of the first resulting message.
+     */
+    @Test
+    public void formatJson() {
+        final String json = " {\n" +
+                "      \"device\":\"root.sg.d1\",\n" +
+                "      \"timestamp\":1586076045524,\n" +
+                "      \"measurements\":[\"s1\",\"s2\"],\n" +
+                "      \"values\":[0.530635,0.530635]\n" +
+                " }";
+
+        final Message first = parsedMessageAt(json, 0);
+
+        assertEquals("root.sg.d1", first.getDevice());
+        assertEquals(Long.valueOf(1586076045524L), first.getTimestamp());
+        assertEquals("s1", first.getMeasurements().get(0));
+        assertEquals(0.530635D, Double.parseDouble(first.getValues().get(0)), 0);
+    }
+
+    /**
+     * Parses a payload with a {@code timestamps} array and nested {@code values} arrays and
+     * checks the fields of the second resulting message.
+     */
+    @Test
+    public void formatBatchJson() {
+        final String json = " {\n" +
+                "      \"device\":\"root.sg.d1\",\n" +
+                "      \"timestamps\":[1586076045524,1586076065526],\n" +
+                "      \"measurements\":[\"s1\",\"s2\"],\n" +
+                "      \"values\":[[0.530635,0.530635], [0.530655,0.530695]]\n" +
+                "  }";
+
+        final Message second = parsedMessageAt(json, 1);
+
+        assertEquals("root.sg.d1", second.getDevice());
+        assertEquals(Long.valueOf(1586076065526L), second.getTimestamp());
+        assertEquals("s2", second.getMeasurements().get(1));
+        assertEquals(0.530695D, Double.parseDouble(second.getValues().get(1)), 0);
+    }
+
+    /**
+     * Encodes the JSON text as UTF-8, runs it through a fresh {@link JSONPayloadFormatter}
+     * and returns the message at the given position of the result.
+     */
+    private static Message parsedMessageAt(String json, int index) {
+        ByteBuf encoded = Unpooled.copiedBuffer(json, StandardCharsets.UTF_8);
+        return new JSONPayloadFormatter().format(encoded).get(index);
+    }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mqtt/PayloadFormatManagerTest.java b/server/src/test/java/org/apache/iotdb/db/mqtt/PayloadFormatManagerTest.java
new file mode 100644
index 0000000..3882344
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mqtt/PayloadFormatManagerTest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.db.mqtt;
+
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class PayloadFormatManagerTest {
+
+    @Test(expected = IllegalArgumentException.class)
+    public void getPayloadFormat() {
+        PayloadFormatManager.getPayloadFormat("txt");
+    }
+
+    @Test
+    public void getDefaultPayloadFormat() {
+        assertNotNull(PayloadFormatManager.getPayloadFormat("json"));
+    }
+}
\ No newline at end of file
diff --git a/server/src/test/java/org/apache/iotdb/db/mqtt/PublishHandlerTest.java b/server/src/test/java/org/apache/iotdb/db/mqtt/PublishHandlerTest.java
new file mode 100644
index 0000000..44c07ec
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mqtt/PublishHandlerTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.db.mqtt;
+
+import io.moquette.interception.messages.InterceptPublishMessage;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.handler.codec.mqtt.*;
+import org.apache.iotdb.db.qp.executor.IPlanExecutor;
+import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Unit test for {@link PublishHandler#onPublish}.
+ */
+public class PublishHandlerTest {
+
+    /**
+     * Feeds one JSON publish message to the handler and verifies that the mocked executor
+     * is asked to process an {@link InsertPlan}.
+     */
+    @Test
+    public void onPublish() throws Exception {
+        final IPlanExecutor planExecutor = mock(IPlanExecutor.class);
+        final PublishHandler publishHandler =
+                new PublishHandler(planExecutor, PayloadFormatManager.getPayloadFormat("json"));
+
+        final String json = "{\n" +
+                "\"device\":\"root.sg.d1\",\n" +
+                "\"timestamp\":1586076045524,\n" +
+                "\"measurements\":[\"s1\"],\n" +
+                "\"values\":[0.530635]\n" +
+                "}";
+        final ByteBuf body = Unpooled.copiedBuffer(json, StandardCharsets.UTF_8);
+
+        // Assemble the MQTT PUBLISH packet wrapped by the intercepted message.
+        final MqttFixedHeader fixed = new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.AT_LEAST_ONCE, false, 1);
+        final MqttPublishVariableHeader variable = new MqttPublishVariableHeader("root.sg.d1", 1);
+        final MqttPublishMessage mqttPublish = new MqttPublishMessage(fixed, variable, body);
+
+        publishHandler.onPublish(new InterceptPublishMessage(mqttPublish, null, null));
+
+        verify(planExecutor).processNonQuery(any(InsertPlan.class));
+    }
+}
\ No newline at end of file
diff --git a/site/src/main/.vuepress/config.js b/site/src/main/.vuepress/config.js
index 719fcfe..818155a 100644
--- a/site/src/main/.vuepress/config.js
+++ b/site/src/main/.vuepress/config.js
@@ -347,7 +347,8 @@ var config = {
 							'4-Client/3-Programming - JDBC',
 							'4-Client/4-Programming - Other Languages',
 							'4-Client/5-Programming - TsFile API',
-							'4-Client/6-Status Codes',
+							'4-Client/6-Programming - MQTT',
+							'4-Client/7-Status Codes',
 						]
 					},
 					{