You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hx...@apache.org on 2020/04/12 10:44:28 UTC

[incubator-iotdb] branch master updated: [IOTDB-589] Upgrade moquette version to 0.13 (#1031)

This is an automated email from the ASF dual-hosted git repository.

hxd pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new ac5fcd6  [IOTDB-589] Upgrade moquette version to 0.13 (#1031)
ac5fcd6 is described below

commit ac5fcd6d7b0b792a4efa0fc261d87563d5751966
Author: Xin Wang <xi...@apache.org>
AuthorDate: Sun Apr 12 18:44:19 2020 +0800

    [IOTDB-589] Upgrade moquette version to 0.13 (#1031)
    
    * [IOTDB-589] Upgrade moquette version to 0.13
---
 LICENSE                                            |   8 -
 distribution/pom.xml                               |  13 +
 pom.xml                                            |  18 -
 server/pom.xml                                     |  14 +
 .../java/io/moquette/broker/MQTTConnection.java    | 503 ---------------------
 .../org/apache/iotdb/db/service/MQTTService.java   |   1 +
 6 files changed, 28 insertions(+), 529 deletions(-)

diff --git a/LICENSE b/LICENSE
index d784353..b88d620 100644
--- a/LICENSE
+++ b/LICENSE
@@ -215,11 +215,3 @@ The following class is modified from Apache commons-collections
 
 ./tsfile/src/main/java/org/apache/iotdb/tsfile/utils/Murmur128Hash.java
 Relevant pr is: https://github.com/apache/commons-collections/pull/83/
-
-------------
-
-The following class is modified from moquette (https://github.com/moquette-io/moquette),
-which is under Apache License 2.0:
-
-./server/src/main/java/io/moquette/broker/MQTTConnection.java
-Relevant pr is: https://github.com/moquette-io/moquette/pull/454
diff --git a/distribution/pom.xml b/distribution/pom.xml
index 5faf8e3..8e3c140 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -83,6 +83,19 @@
             </plugin>
         </plugins>
     </build>
+    <repositories>
+        <!--   repository for moquette    -->
+        <repository>
+            <id>bintray</id>
+            <url>https://jcenter.bintray.com</url>
+            <releases>
+                <enabled>true</enabled>
+            </releases>
+            <snapshots>
+                <enabled>false</enabled>
+            </snapshots>
+        </repository>
+    </repositories>
     <dependencies>
         <dependency>
             <groupId>org.apache.iotdb</groupId>
diff --git a/pom.xml b/pom.xml
index 56692a8..12e5bcc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -136,19 +136,6 @@
         <!-- By default, the argLine is empty-->
         <argLine/>
     </properties>
-    <repositories>
-        <!--   repository for moquette    -->
-        <repository>
-            <id>bintray</id>
-            <url>https://jcenter.bintray.com</url>
-            <releases>
-                <enabled>true</enabled>
-            </releases>
-            <snapshots>
-                <enabled>false</enabled>
-            </snapshots>
-        </repository>
-    </repositories>
     <!--
         if we claim dependencies in dependencyManagement, then we do not claim
         their version in sub-project's pom, but we have to claim themselves again
@@ -474,11 +461,6 @@
                 <version>${airline.version}</version>
             </dependency>
             <dependency>
-                <groupId>io.moquette</groupId>
-                <artifactId>moquette-broker</artifactId>
-                <version>0.12.1</version>
-            </dependency>
-            <dependency>
                 <groupId>org.fusesource.mqtt-client</groupId>
                 <artifactId>mqtt-client</artifactId>
                 <version>1.12</version>
diff --git a/server/pom.xml b/server/pom.xml
index d18760a..b6cb127 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -34,6 +34,19 @@
         <iotdb.it.skip>${iotdb.test.skip}</iotdb.it.skip>
         <iotdb.ut.skip>${iotdb.test.skip}</iotdb.ut.skip>
     </properties>
+    <repositories>
+        <!--   repository for moquette    -->
+        <repository>
+            <id>bintray</id>
+            <url>https://jcenter.bintray.com</url>
+            <releases>
+                <enabled>true</enabled>
+            </releases>
+            <snapshots>
+                <enabled>false</enabled>
+            </snapshots>
+        </repository>
+    </repositories>
     <dependencies>
         <dependency>
             <groupId>org.apache.iotdb</groupId>
@@ -104,6 +117,7 @@
         <dependency>
             <groupId>io.moquette</groupId>
             <artifactId>moquette-broker</artifactId>
+            <version>0.13</version>
         </dependency>
         <!-- for mocked test-->
         <dependency>
diff --git a/server/src/main/java/io/moquette/broker/MQTTConnection.java b/server/src/main/java/io/moquette/broker/MQTTConnection.java
deleted file mode 100644
index 4c4a8fa..0000000
--- a/server/src/main/java/io/moquette/broker/MQTTConnection.java
+++ /dev/null
@@ -1,503 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.moquette.broker;
-
-import io.moquette.broker.subscriptions.Topic;
-import io.moquette.broker.security.IAuthenticator;
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelPipeline;
-import io.netty.handler.codec.mqtt.*;
-import io.netty.handler.timeout.IdleStateHandler;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.InetSocketAddress;
-import java.nio.charset.StandardCharsets;
-import java.util.*;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static io.netty.channel.ChannelFutureListener.CLOSE_ON_FAILURE;
-import static io.netty.channel.ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE;
-import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.*;
-import static io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader.from;
-import static io.netty.handler.codec.mqtt.MqttQoS.*;
-
-// NOTE:
-// override the MQTTConnection class in the moquette 0.12.1 jar to fix the PUBACK flush issue
-// https://github.com/moquette-io/moquette/pull/454
-// when moquette fixed version released, we can remove this.
-final class MQTTConnection {
-
-    private static final Logger LOG = LoggerFactory.getLogger(MQTTConnection.class);
-
-    final Channel channel;
-    private BrokerConfiguration brokerConfig;
-    private IAuthenticator authenticator;
-    private SessionRegistry sessionRegistry;
-    private final PostOffice postOffice;
-    private boolean connected;
-    private final AtomicInteger lastPacketId = new AtomicInteger(0);
-
-    MQTTConnection(Channel channel, BrokerConfiguration brokerConfig, IAuthenticator authenticator,
-                   SessionRegistry sessionRegistry, PostOffice postOffice) {
-        this.channel = channel;
-        this.brokerConfig = brokerConfig;
-        this.authenticator = authenticator;
-        this.sessionRegistry = sessionRegistry;
-        this.postOffice = postOffice;
-        this.connected = false;
-    }
-
-    void handleMessage(MqttMessage msg) {
-        MqttMessageType messageType = msg.fixedHeader().messageType();
-        LOG.debug("Received MQTT message, type: {}, channel: {}", messageType, channel);
-        switch (messageType) {
-            case CONNECT:
-                processConnect((MqttConnectMessage) msg);
-                break;
-            case SUBSCRIBE:
-                processSubscribe((MqttSubscribeMessage) msg);
-                break;
-            case UNSUBSCRIBE:
-                processUnsubscribe((MqttUnsubscribeMessage) msg);
-                break;
-            case PUBLISH:
-                processPublish((MqttPublishMessage) msg);
-                break;
-            case PUBREC:
-                processPubRec(msg);
-                break;
-            case PUBCOMP:
-                processPubComp(msg);
-                break;
-            case PUBREL:
-                processPubRel(msg);
-                break;
-            case DISCONNECT:
-                processDisconnect(msg);
-                break;
-            case PUBACK:
-                processPubAck(msg);
-                break;
-            case PINGREQ:
-                MqttFixedHeader pingHeader = new MqttFixedHeader(MqttMessageType.PINGRESP, false, AT_MOST_ONCE,
-                        false, 0);
-                MqttMessage pingResp = new MqttMessage(pingHeader);
-                channel.writeAndFlush(pingResp).addListener(CLOSE_ON_FAILURE);
-                break;
-            default:
-                LOG.error("Unknown MessageType: {}, channel: {}", messageType, channel);
-                break;
-        }
-    }
-
-    private void processPubComp(MqttMessage msg) {
-        final int messageID = ((MqttMessageIdVariableHeader) msg.variableHeader()).messageId();
-        final Session session = sessionRegistry.retrieve(getClientId());
-        session.processPubComp(messageID);
-    }
-
-    private void processPubRec(MqttMessage msg) {
-        final int messageID = ((MqttMessageIdVariableHeader) msg.variableHeader()).messageId();
-        final Session session = sessionRegistry.retrieve(getClientId());
-        session.processPubRec(messageID);
-    }
-
-    static MqttMessage pubrel(int messageID) {
-        MqttFixedHeader pubRelHeader = new MqttFixedHeader(MqttMessageType.PUBREL, false, AT_LEAST_ONCE, false, 0);
-        return new MqttMessage(pubRelHeader, from(messageID));
-    }
-
-    private void processPubAck(MqttMessage msg) {
-        final int messageID = ((MqttMessageIdVariableHeader) msg.variableHeader()).messageId();
-        Session session = sessionRegistry.retrieve(getClientId());
-        session.pubAckReceived(messageID);
-    }
-
-    void processConnect(MqttConnectMessage msg) {
-        MqttConnectPayload payload = msg.payload();
-        String clientId = payload.clientIdentifier();
-        final String username = payload.userName();
-        LOG.trace("Processing CONNECT message. CId={} username: {} channel: {}", clientId, username, channel);
-
-        if (isNotProtocolVersion(msg, MqttVersion.MQTT_3_1) && isNotProtocolVersion(msg, MqttVersion.MQTT_3_1_1)) {
-            LOG.warn("MQTT protocol version is not valid. CId={} channel: {}", clientId, channel);
-            abortConnection(CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION);
-            return;
-        }
-        final boolean cleanSession = msg.variableHeader().isCleanSession();
-        if (clientId == null || clientId.length() == 0) {
-            if (!brokerConfig.isAllowZeroByteClientId()) {
-                LOG.warn("Broker doesn't permit MQTT empty client ID. Username: {}, channel: {}", username, channel);
-                abortConnection(CONNECTION_REFUSED_IDENTIFIER_REJECTED);
-                return;
-            }
-
-            if (!cleanSession) {
-                LOG.warn("MQTT client ID cannot be empty for persistent session. Username: {}, channel: {}",
-                        username, channel);
-                abortConnection(CONNECTION_REFUSED_IDENTIFIER_REJECTED);
-                return;
-            }
-
-            // Generating client id.
-            clientId = UUID.randomUUID().toString().replace("-", "");
-            LOG.debug("Client has connected with integration generated id: {}, username: {}, channel: {}", clientId,
-                    username, channel);
-        }
-
-        if (!login(msg, clientId)) {
-            abortConnection(CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
-            channel.close().addListener(CLOSE_ON_FAILURE);
-            return;
-        }
-
-        try {
-            LOG.trace("Binding MQTTConnection (channel: {}) to session", channel);
-            sessionRegistry.bindToSession(this, msg, clientId);
-
-            initializeKeepAliveTimeout(channel, msg, clientId);
-            setupInflightResender(channel);
-
-            NettyUtils.clientID(channel, clientId);
-            LOG.trace("CONNACK sent, channel: {}", channel);
-            postOffice.dispatchConnection(msg);
-            LOG.trace("dispatch connection: {}", msg.toString());
-        } catch (SessionCorruptedException scex) {
-            LOG.warn("MQTT session for client ID {} cannot be created, channel: {}", clientId, channel);
-            abortConnection(CONNECTION_REFUSED_SERVER_UNAVAILABLE);
-        }
-    }
-
-    private void setupInflightResender(Channel channel) {
-        channel.pipeline()
-                .addFirst("inflightResender", new InflightResender(5_000, TimeUnit.MILLISECONDS));
-    }
-
-    private void initializeKeepAliveTimeout(Channel channel, MqttConnectMessage msg, String clientId) {
-        int keepAlive = msg.variableHeader().keepAliveTimeSeconds();
-        NettyUtils.keepAlive(channel, keepAlive);
-        NettyUtils.cleanSession(channel, msg.variableHeader().isCleanSession());
-        NettyUtils.clientID(channel, clientId);
-        int idleTime = Math.round(keepAlive * 1.5f);
-        setIdleTime(channel.pipeline(), idleTime);
-
-        LOG.debug("Connection has been configured CId={}, keepAlive={}, removeTemporaryQoS2={}, idleTime={}",
-                clientId, keepAlive, msg.variableHeader().isCleanSession(), idleTime);
-    }
-
-    private void setIdleTime(ChannelPipeline pipeline, int idleTime) {
-        if (pipeline.names().contains("idleStateHandler")) {
-            pipeline.remove("idleStateHandler");
-        }
-        pipeline.addFirst("idleStateHandler", new IdleStateHandler(idleTime, 0, 0));
-    }
-
-    private boolean isNotProtocolVersion(MqttConnectMessage msg, MqttVersion version) {
-        return msg.variableHeader().version() != version.protocolLevel();
-    }
-
-    private void abortConnection(MqttConnectReturnCode returnCode) {
-        MqttConnAckMessage badProto = connAck(returnCode, false);
-        channel.writeAndFlush(badProto).addListener(FIRE_EXCEPTION_ON_FAILURE);
-        channel.close().addListener(CLOSE_ON_FAILURE);
-    }
-
-    private MqttConnAckMessage connAck(MqttConnectReturnCode returnCode, boolean sessionPresent) {
-        MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE,
-                false, 0);
-        MqttConnAckVariableHeader mqttConnAckVariableHeader = new MqttConnAckVariableHeader(returnCode, sessionPresent);
-        return new MqttConnAckMessage(mqttFixedHeader, mqttConnAckVariableHeader);
-    }
-
-    private boolean login(MqttConnectMessage msg, final String clientId) {
-        // handle user authentication
-        if (msg.variableHeader().hasUserName()) {
-            byte[] pwd = null;
-            if (msg.variableHeader().hasPassword()) {
-                pwd = msg.payload().password().getBytes(StandardCharsets.UTF_8);
-            } else if (!brokerConfig.isAllowAnonymous()) {
-                LOG.error("Client didn't supply any password and MQTT anonymous mode is disabled CId={}", clientId);
-                return false;
-            }
-            final String login = msg.payload().userName();
-            if (!authenticator.checkValid(clientId, login, pwd)) {
-                LOG.error("Authenticator has rejected the MQTT credentials CId={}, username={}", clientId, login);
-                return false;
-            }
-            NettyUtils.userName(channel, login);
-        } else if (!brokerConfig.isAllowAnonymous()) {
-            LOG.error("Client didn't supply any credentials and MQTT anonymous mode is disabled. CId={}", clientId);
-            return false;
-        }
-        return true;
-    }
-
-    void handleConnectionLost() {
-        String clientID = NettyUtils.clientID(channel);
-        if (clientID == null || clientID.isEmpty()) {
-            return;
-        }
-        LOG.info("Notifying connection lost event. CId: {}, channel: {}", clientID, channel);
-        Session session = sessionRegistry.retrieve(clientID);
-        if (session.hasWill()) {
-            postOffice.fireWill(session.getWill());
-        }
-        if (session.isClean()) {
-            sessionRegistry.remove(clientID);
-        } else {
-            sessionRegistry.disconnect(clientID);
-        }
-        connected = false;
-        //dispatch connection lost to intercept.
-        String userName = NettyUtils.userName(channel);
-        postOffice.dispatchConnectionLost(clientID,userName);
-        LOG.trace("dispatch disconnection: clientId={}, userName={}", clientID, userName);
-    }
-
-    void sendConnAck(boolean isSessionAlreadyPresent) {
-        connected = true;
-        final MqttConnAckMessage ackMessage = connAck(CONNECTION_ACCEPTED, isSessionAlreadyPresent);
-        channel.writeAndFlush(ackMessage).addListener(FIRE_EXCEPTION_ON_FAILURE);
-    }
-
-    boolean isConnected() {
-        return connected;
-    }
-
-    void dropConnection() {
-        channel.close().addListener(FIRE_EXCEPTION_ON_FAILURE);
-    }
-
-    void processDisconnect(MqttMessage msg) {
-        final String clientID = NettyUtils.clientID(channel);
-        LOG.trace("Start DISCONNECT CId={}, channel: {}", clientID, channel);
-        if (!connected) {
-            LOG.info("DISCONNECT received on already closed connection, CId={}, channel: {}", clientID, channel);
-            return;
-        }
-        sessionRegistry.disconnect(clientID);
-        connected = false;
-        channel.close().addListener(FIRE_EXCEPTION_ON_FAILURE);
-        LOG.trace("Processed DISCONNECT CId={}, channel: {}", clientID, channel);
-        String userName = NettyUtils.userName(channel);
-        postOffice.dispatchDisconnection(clientID,userName);
-        LOG.trace("dispatch disconnection: clientId={}, userName={}", clientID, userName);
-    }
-
-    void processSubscribe(MqttSubscribeMessage msg) {
-        final String clientID = NettyUtils.clientID(channel);
-        if (!connected) {
-            LOG.warn("SUBSCRIBE received on already closed connection, CId={}, channel: {}", clientID, channel);
-            dropConnection();
-            return;
-        }
-        postOffice.subscribeClientToTopics(msg, clientID, NettyUtils.userName(channel), this);
-    }
-
-    void sendSubAckMessage(int messageID, MqttSubAckMessage ackMessage) {
-        final String clientId = NettyUtils.clientID(channel);
-        LOG.trace("Sending SUBACK response CId={}, messageId: {}", clientId, messageID);
-        channel.writeAndFlush(ackMessage).addListener(FIRE_EXCEPTION_ON_FAILURE);
-    }
-
-    private void processUnsubscribe(MqttUnsubscribeMessage msg) {
-        List<String> topics = msg.payload().topics();
-        String clientID = NettyUtils.clientID(channel);
-
-        LOG.trace("Processing UNSUBSCRIBE message. CId={}, topics: {}", clientID, topics);
-        postOffice.unsubscribe(topics, this, msg.variableHeader().messageId());
-    }
-
-    void sendUnsubAckMessage(List<String> topics, String clientID, int messageID) {
-        MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.UNSUBACK, false, AT_MOST_ONCE,
-                false, 0);
-        MqttUnsubAckMessage ackMessage = new MqttUnsubAckMessage(fixedHeader, from(messageID));
-
-        LOG.trace("Sending UNSUBACK message. CId={}, messageId: {}, topics: {}", clientID, messageID, topics);
-        channel.writeAndFlush(ackMessage).addListener(FIRE_EXCEPTION_ON_FAILURE);
-        LOG.trace("Client <{}> unsubscribed from topics <{}>", clientID, topics);
-    }
-
-    void processPublish(MqttPublishMessage msg) {
-        final MqttQoS qos = msg.fixedHeader().qosLevel();
-        final String username = NettyUtils.userName(channel);
-        final String topicName = msg.variableHeader().topicName();
-        final String clientId = getClientId();
-        LOG.trace("Processing PUBLISH message. CId={}, topic: {}, messageId: {}, qos: {}", clientId, topicName,
-                msg.variableHeader().packetId(), qos);
-        ByteBuf payload = msg.payload();
-        final boolean retain = msg.fixedHeader().isRetain();
-        final Topic topic = new Topic(topicName);
-        if (!topic.isValid()) {
-            LOG.debug("Drop connection because of invalid topic format");
-            dropConnection();
-        }
-        switch (qos) {
-            case AT_MOST_ONCE:
-                postOffice.receivedPublishQos0(topic, username, clientId, payload, retain, msg);
-                break;
-            case AT_LEAST_ONCE: {
-                final int messageID = msg.variableHeader().packetId();
-                postOffice.receivedPublishQos1(this, topic, username, payload, messageID, retain, msg);
-                break;
-            }
-            case EXACTLY_ONCE: {
-                final int messageID = msg.variableHeader().packetId();
-                final Session session = sessionRegistry.retrieve(clientId);
-                session.receivedPublishQos2(messageID, msg);
-                postOffice.receivedPublishQos2(this, msg, username);
-//                msg.release();
-                break;
-            }
-            default:
-                LOG.error("Unknown QoS-Type:{}", qos);
-                break;
-        }
-    }
-
-    void sendPublishReceived(int messageID) {
-        LOG.trace("sendPubRec invoked on channel: {}", channel);
-        MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBREC, false, AT_MOST_ONCE,
-                false, 0);
-        MqttPubAckMessage pubRecMessage = new MqttPubAckMessage(fixedHeader, from(messageID));
-        sendIfWritableElseDrop(pubRecMessage);
-    }
-
-    private void processPubRel(MqttMessage msg) {
-        final Session session = sessionRegistry.retrieve(getClientId());
-        final int messageID = ((MqttMessageIdVariableHeader) msg.variableHeader()).messageId();
-        session.receivedPubRelQos2(messageID);
-        sendPubCompMessage(messageID);
-    }
-
-    void sendPublish(MqttPublishMessage publishMsg) {
-        final int packetId = publishMsg.variableHeader().packetId();
-        final String topicName = publishMsg.variableHeader().topicName();
-        final String clientId = getClientId();
-        MqttQoS qos = publishMsg.fixedHeader().qosLevel();
-        if (LOG.isTraceEnabled()) {
-            LOG.trace("Sending PUBLISH({}) message. MessageId={}, CId={}, topic={}, payload={}", qos, packetId,
-                    clientId, topicName, DebugUtils.payload2Str(publishMsg.payload()));
-        } else {
-            LOG.debug("Sending PUBLISH({}) message. MessageId={}, CId={}, topic={}", qos, packetId, clientId,
-                    topicName);
-        }
-        sendIfWritableElseDrop(publishMsg);
-    }
-
-    void sendIfWritableElseDrop(MqttMessage msg) {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("OUT {} on channel {}", msg.fixedHeader().messageType(), channel);
-        }
-        if (channel.isWritable()) {
-            // flushing PUBACK immediately
-            channel.writeAndFlush(msg).addListener(FIRE_EXCEPTION_ON_FAILURE);
-        }
-    }
-
-    public void writabilityChanged() {
-        if (channel.isWritable()) {
-            LOG.debug("Channel {} is again writable", channel);
-            final Session session = sessionRegistry.retrieve(getClientId());
-            session.writabilityChanged();
-        }
-    }
-
-    void sendPubAck(int messageID) {
-        LOG.trace("sendPubAck invoked");
-        MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBACK, false, AT_MOST_ONCE,
-                false, 0);
-        MqttPubAckMessage pubAckMessage = new MqttPubAckMessage(fixedHeader, from(messageID));
-        sendIfWritableElseDrop(pubAckMessage);
-    }
-
-    private void sendPubCompMessage(int messageID) {
-        LOG.trace("Sending PUBCOMP message on channel: {}, messageId: {}", channel, messageID);
-        MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBCOMP, false, AT_MOST_ONCE, false, 0);
-        MqttMessage pubCompMessage = new MqttMessage(fixedHeader, from(messageID));
-        sendIfWritableElseDrop(pubCompMessage);
-    }
-
-    String getClientId() {
-        return NettyUtils.clientID(channel);
-    }
-
-    String getUsername() {
-        return NettyUtils.userName(channel);
-    }
-
-    public void sendPublishRetainedQos0(Topic topic, MqttQoS qos, ByteBuf payload) {
-        MqttPublishMessage publishMsg = retainedPublish(topic.toString(), qos, payload);
-        sendPublish(publishMsg);
-    }
-
-    public void sendPublishRetainedWithPacketId(Topic topic, MqttQoS qos, ByteBuf payload) {
-        final int packetId = nextPacketId();
-        MqttPublishMessage publishMsg = retainedPublishWithMessageId(topic.toString(), qos, payload, packetId);
-        sendPublish(publishMsg);
-    }
-
-    private static MqttPublishMessage retainedPublish(String topic, MqttQoS qos, ByteBuf message) {
-        return retainedPublishWithMessageId(topic, qos, message, 0);
-    }
-
-    private static MqttPublishMessage retainedPublishWithMessageId(String topic, MqttQoS qos, ByteBuf message,
-                                                                   int messageId) {
-        MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, qos, true, 0);
-        MqttPublishVariableHeader varHeader = new MqttPublishVariableHeader(topic, messageId);
-        return new MqttPublishMessage(fixedHeader, varHeader, message);
-    }
-
-    // TODO move this method in Session
-    void sendPublishNotRetainedQos0(Topic topic, MqttQoS qos, ByteBuf payload) {
-        MqttPublishMessage publishMsg = notRetainedPublish(topic.toString(), qos, payload);
-        sendPublish(publishMsg);
-    }
-
-    static MqttPublishMessage notRetainedPublish(String topic, MqttQoS qos, ByteBuf message) {
-        return notRetainedPublishWithMessageId(topic, qos, message, 0);
-    }
-
-    static MqttPublishMessage notRetainedPublishWithMessageId(String topic, MqttQoS qos, ByteBuf message,
-                                                              int messageId) {
-        MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, qos, false, 0);
-        MqttPublishVariableHeader varHeader = new MqttPublishVariableHeader(topic, messageId);
-        return new MqttPublishMessage(fixedHeader, varHeader, message);
-    }
-
-    public void resendNotAckedPublishes() {
-        final Session session = sessionRegistry.retrieve(getClientId());
-        session.resendInflightNotAcked();
-    }
-
-    int nextPacketId() {
-        return lastPacketId.incrementAndGet();
-    }
-
-    @Override
-    public String toString() {
-        return "MQTTConnection{channel=" + channel + ", connected=" + connected + '}';
-    }
-
-    InetSocketAddress remoteAddress() {
-        return (InetSocketAddress) channel.remoteAddress();
-    }
-}
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/iotdb/db/service/MQTTService.java b/server/src/main/java/org/apache/iotdb/db/service/MQTTService.java
index a49c02c..a024cef 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/MQTTService.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/MQTTService.java
@@ -76,6 +76,7 @@ public class MQTTService implements IService {
         properties.setProperty(BrokerConstants.HOST_PROPERTY_NAME, iotDBConfig.getMqttHost());
         properties.setProperty(BrokerConstants.PORT_PROPERTY_NAME, String.valueOf(iotDBConfig.getMqttPort()));
         properties.setProperty(BrokerConstants.BROKER_INTERCEPTOR_THREAD_POOL_SIZE, String.valueOf(iotDBConfig.getMqttHandlerPoolSize()));
+        properties.setProperty(BrokerConstants.IMMEDIATE_BUFFER_FLUSH_PROPERTY_NAME, "true");
         return new MemoryConfig(properties);
     }