You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/10/24 08:01:36 UTC

[iotdb] 01/02: Change previous IT from server module to integration-test

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch UTToIT
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit b457c0a7afe01f3830465d78c3684a672e5c6c6f
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Mon Oct 24 15:42:07 2022 +0800

    Change previous IT from server module to integration-test
---
 integration-test/import-control.xml                |   9 +
 .../java/org/apache/iotdb/it/env/AbstractEnv.java  |   6 +
 .../org/apache/iotdb/it/env/DataNodeWrapper.java   |   8 +
 .../java/org/apache/iotdb/it/env/MppConfig.java    |  30 +++
 .../org/apache/iotdb/it/env/RemoteServerEnv.java   |   5 +
 .../org/apache/iotdb/itbase/env/BaseConfig.java    |  40 ++++
 .../java/org/apache/iotdb/itbase/env/BaseEnv.java  |   2 +
 .../org/apache/iotdb/db/it/env/StandaloneEnv.java  |   5 +
 .../iotdb/db/it/env/StandaloneEnvConfig.java       |  55 ++++++
 .../apache/iotdb/db/it/mqtt/IoTDBMQTTSinkIT.java   |  68 +++----
 .../iotdb/db/it/watermark/IoTDBWatermarkIT.java    | 179 ++++++++---------
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |   2 +-
 .../iotdb/db/protocol/mqtt/PublishHandler.java     |   4 -
 .../iotdb/db/protocol/mqtt/PublishHandlerTest.java | 132 -------------
 .../apache/iotdb/db/sink/LocalIoTDBSinkTest.java   | 218 ---------------------
 15 files changed, 286 insertions(+), 477 deletions(-)

diff --git a/integration-test/import-control.xml b/integration-test/import-control.xml
index d33888162c..95c2749b8f 100644
--- a/integration-test/import-control.xml
+++ b/integration-test/import-control.xml
@@ -35,6 +35,11 @@
   <allow pkg="org\.slf4j.*" regex="true" />
   <subpackage name="db.it">
     <disallow pkg="org.apache.iotdb.jdbc.*"/>
+    <allow class="org.apache.iotdb.db.tools.watermark.WatermarkDetector" />
+    <allow class="io.netty.buffer.ByteBuf" />
+    <allow class="io.netty.buffer.Unpooled" />
+    <allow class="org.fusesource.mqtt.client.QoS" />
+    <allow class="org.apache.iotdb.commons.path.PartialPath" />
     <allow pkg="java.text"/>
     <allow pkg="org.apache.iotdb.db.it.utils" />
     <allow pkg="org\.apache\.iotdb\.db\.it\.utils\.TestUtils.*" regex="true"/>
@@ -46,6 +51,10 @@
     <allow pkg="org\.apache\.iotdb\.tsfile\.read.*" regex="true" />
     <allow pkg="org\.apache\.iotdb\.tsfile\.utils.*" regex="true" />
     <allow pkg="org\.apache\.iotdb\.tsfile\.write.*" regex="true" />
+    <allow pkg="org\.apache\.iotdb\.db\.protocol\.mqtt.*" regex="true" />
+    <allow pkg="io\.moquette\.interception\.messages.*" regex="true" />
+    <allow pkg="io\.netty\.handler\.codec\.mqtt.*" regex="true" />
+    <allow pkg="org\.apache\.iotdb\.db\.engine\.trigger\.sink\.mqtt.*" regex="true" />
   </subpackage>
   <subpackage name="confignode.it">
     <allow class="java.nio.ByteBuffer" />
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractEnv.java
index ffaa5898f3..994eb323c5 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractEnv.java
@@ -479,4 +479,10 @@ public abstract class AbstractEnv implements BaseEnv {
   public void shutdownDataNode(int index) {
     dataNodeWrapperList.get(index).stop();
   }
+
+  @Override
+  public int getMqttPort() {
+    int randomIndex = new Random(System.currentTimeMillis()).nextInt(dataNodeWrapperList.size());
+    return dataNodeWrapperList.get(randomIndex).getMqttPort();
+  }
 }
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/DataNodeWrapper.java b/integration-test/src/main/java/org/apache/iotdb/it/env/DataNodeWrapper.java
index 27f34f275b..b11b829219 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/DataNodeWrapper.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/DataNodeWrapper.java
@@ -32,6 +32,7 @@ public class DataNodeWrapper extends AbstractNodeWrapper {
   private final int internalPort;
   private final int dataRegionConsensusPort;
   private final int schemaRegionConsensusPort;
+  private final int mqttPort;
 
   public DataNodeWrapper(
       String targetConfigNode, String testClassName, String testMethodName, int[] portList) {
@@ -41,6 +42,7 @@ public class DataNodeWrapper extends AbstractNodeWrapper {
     this.internalPort = portList[2];
     this.dataRegionConsensusPort = portList[3];
     this.schemaRegionConsensusPort = portList[4];
+    this.mqttPort = portList[5];
   }
 
   @Override
@@ -54,6 +56,8 @@ public class DataNodeWrapper extends AbstractNodeWrapper {
         "data_region_consensus_port", String.valueOf(this.dataRegionConsensusPort));
     properties.setProperty(
         "schema_region_consensus_port", String.valueOf(this.schemaRegionConsensusPort));
+    properties.setProperty("mqtt_host", super.getIp());
+    properties.setProperty("mqtt_port", String.valueOf(this.mqttPort));
     properties.setProperty("connection_timeout_ms", "30000");
     if (this.targetConfigNode != null) {
       properties.setProperty(IoTDBConstant.TARGET_CONFIG_NODES, this.targetConfigNode);
@@ -106,4 +110,8 @@ public class DataNodeWrapper extends AbstractNodeWrapper {
   public int getSchemaRegionConsensusPort() {
     return schemaRegionConsensusPort;
   }
+
+  public int getMqttPort() {
+    return mqttPort;
+  }
 }
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/MppConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/MppConfig.java
index a1c1b8dcfd..5db9045ba4 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/MppConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/MppConfig.java
@@ -281,4 +281,34 @@ public class MppConfig implements BaseConfig {
     engineProperties.setProperty("max_degree_of_index_node", String.valueOf(maxDegreeOfIndexNode));
     return this;
   }
+
+  @Override
+  public BaseConfig setEnableWatermark(boolean enableWatermark) {
+    engineProperties.setProperty("watermark_module_opened", String.valueOf(enableWatermark));
+    return this;
+  }
+
+  @Override
+  public BaseConfig setWatermarkSecretKey(String watermarkSecretKey) {
+    engineProperties.setProperty("watermark_secret_key", watermarkSecretKey);
+    return this;
+  }
+
+  @Override
+  public BaseConfig setWatermarkBitString(String watermarkBitString) {
+    engineProperties.setProperty("watermark_bit_string", watermarkBitString);
+    return this;
+  }
+
+  @Override
+  public BaseConfig setWatermarkMethod(String watermarkMethod) {
+    engineProperties.setProperty("watermark_method", watermarkMethod);
+    return this;
+  }
+
+  @Override
+  public BaseConfig setEnableMQTTService(boolean enableMQTTService) {
+    engineProperties.setProperty("enable_mqtt_service", String.valueOf(enableMQTTService));
+    return this;
+  }
 }
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/RemoteServerEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/RemoteServerEnv.java
index f3d4ad410e..a7ac83d9de 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/RemoteServerEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/RemoteServerEnv.java
@@ -182,4 +182,9 @@ public class RemoteServerEnv implements BaseEnv {
   public void shutdownDataNode(int index) {
     getDataNodeWrapperList().get(index).stop();
   }
+
+  @Override
+  public int getMqttPort() {
+    throw new UnsupportedOperationException();
+  }
 }
diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java
index 90a69dbc26..8b560e4068 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java
@@ -286,4 +286,44 @@ public interface BaseConfig {
   default int getMaxDegreeOfIndexNode() {
     return 256;
   }
+
+  default BaseConfig setEnableWatermark(boolean enableWatermark) {
+    return this;
+  }
+
+  default boolean isEnableWatermark() {
+    return false;
+  }
+
+  default String getWatermarkSecretKey() {
+    return "IoTDB*2019@Beijing";
+  }
+
+  default BaseConfig setWatermarkSecretKey(String watermarkSecretKey) {
+    return this;
+  }
+
+  default String getWatermarkBitString() {
+    return "100101110100";
+  }
+
+  default BaseConfig setWatermarkBitString(String watermarkBitString) {
+    return this;
+  }
+
+  default String getWatermarkMethod() {
+    return "GroupBasedLSBMethod(embed_row_cycle=2,embed_lsb_num=5)";
+  }
+
+  default BaseConfig setWatermarkMethod(String watermarkMethod) {
+    return this;
+  }
+
+  default boolean isEnableMQTTService() {
+    return false;
+  }
+
+  default BaseConfig setEnableMQTTService(boolean enableMQTTService) {
+    return this;
+  }
 }
diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
index 91ca65b99e..2f97f3b37c 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
@@ -143,4 +143,6 @@ public interface BaseEnv {
   void startDataNode(int index);
 
   void shutdownDataNode(int index);
+
+  int getMqttPort();
 }
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/env/StandaloneEnv.java b/integration-test/src/test/java/org/apache/iotdb/db/it/env/StandaloneEnv.java
index 02210a36ca..c11a6cb937 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/env/StandaloneEnv.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/env/StandaloneEnv.java
@@ -202,4 +202,9 @@ public class StandaloneEnv implements BaseEnv {
   public void shutdownDataNode(int index) {
     // Do nothing
   }
+
+  @Override
+  public int getMqttPort() {
+    return 1883;
+  }
 }
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/env/StandaloneEnvConfig.java b/integration-test/src/test/java/org/apache/iotdb/db/it/env/StandaloneEnvConfig.java
index c40e3141d2..9157c6922e 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/env/StandaloneEnvConfig.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/env/StandaloneEnvConfig.java
@@ -271,4 +271,59 @@ public class StandaloneEnvConfig implements BaseConfig {
   public int getMaxDegreeOfIndexNode() {
     return TSFileDescriptor.getInstance().getConfig().getMaxDegreeOfIndexNode();
   }
+
+  @Override
+  public BaseConfig setEnableWatermark(boolean enableWatermark) {
+    IoTDBDescriptor.getInstance().getConfig().setEnableWatermark(enableWatermark);
+    return this;
+  }
+
+  @Override
+  public boolean isEnableWatermark() {
+    return IoTDBDescriptor.getInstance().getConfig().isEnableWatermark();
+  }
+
+  @Override
+  public String getWatermarkSecretKey() {
+    return IoTDBDescriptor.getInstance().getConfig().getWatermarkSecretKey();
+  }
+
+  @Override
+  public BaseConfig setWatermarkSecretKey(String watermarkSecretKey) {
+    IoTDBDescriptor.getInstance().getConfig().setWatermarkSecretKey(watermarkSecretKey);
+    return this;
+  }
+
+  @Override
+  public String getWatermarkBitString() {
+    return IoTDBDescriptor.getInstance().getConfig().getWatermarkBitString();
+  }
+
+  @Override
+  public BaseConfig setWatermarkBitString(String watermarkBitString) {
+    IoTDBDescriptor.getInstance().getConfig().setWatermarkBitString(watermarkBitString);
+    return this;
+  }
+
+  @Override
+  public String getWatermarkMethod() {
+    return IoTDBDescriptor.getInstance().getConfig().getWatermarkMethod();
+  }
+
+  @Override
+  public BaseConfig setWatermarkMethod(String watermarkMethod) {
+    IoTDBDescriptor.getInstance().getConfig().setWatermarkMethod(watermarkMethod);
+    return this;
+  }
+
+  @Override
+  public boolean isEnableMQTTService() {
+    return IoTDBDescriptor.getInstance().getConfig().isEnableMQTTService();
+  }
+
+  @Override
+  public BaseConfig setEnableMQTTService(boolean enableMQTTService) {
+    IoTDBDescriptor.getInstance().getConfig().setEnableMQTTService(enableMQTTService);
+    return this;
+  }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/sink/MQTTSinkTest.java b/integration-test/src/test/java/org/apache/iotdb/db/it/mqtt/IoTDBMQTTSinkIT.java
similarity index 76%
rename from server/src/test/java/org/apache/iotdb/db/sink/MQTTSinkTest.java
rename to integration-test/src/test/java/org/apache/iotdb/db/it/mqtt/IoTDBMQTTSinkIT.java
index 6ae304ad96..d4c6bccf77 100644
--- a/server/src/test/java/org/apache/iotdb/db/sink/MQTTSinkTest.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/mqtt/IoTDBMQTTSinkIT.java
@@ -16,25 +16,26 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
-package org.apache.iotdb.db.sink;
+package org.apache.iotdb.db.it.mqtt;
 
 import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.trigger.sink.mqtt.MQTTConfiguration;
 import org.apache.iotdb.db.engine.trigger.sink.mqtt.MQTTEvent;
 import org.apache.iotdb.db.engine.trigger.sink.mqtt.MQTTHandler;
-import org.apache.iotdb.db.utils.EnvironmentUtils;
-import org.apache.iotdb.jdbc.Config;
+import org.apache.iotdb.it.env.ConfigFactory;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
 
 import org.fusesource.mqtt.client.QoS;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
 
 import java.sql.Connection;
-import java.sql.DriverManager;
 import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
@@ -42,24 +43,28 @@ import java.sql.Statement;
 import java.sql.Types;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
-import static java.util.concurrent.TimeUnit.MINUTES;
-import static org.awaitility.Awaitility.await;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
-@SuppressWarnings("squid:S2925")
-public class MQTTSinkTest {
+@RunWith(IoTDBTestRunner.class)
+@Category({ClusterIT.class})
+public class IoTDBMQTTSinkIT {
+
+  private boolean enableMQTTService;
 
   @Before
   public void setUp() throws Exception {
-    IoTDBDescriptor.getInstance().getConfig().setEnableMQTTService(true);
-    EnvironmentUtils.envSetUp();
+    enableMQTTService = ConfigFactory.getConfig().isEnableMQTTService();
+    ConfigFactory.getConfig().setEnableMQTTService(true);
+    EnvFactory.getEnv().initBeforeTest();
   }
 
   @After
   public void tearDown() throws Exception {
-    EnvironmentUtils.cleanEnv();
+    EnvFactory.getEnv().cleanAfterTest();
+    ConfigFactory.getConfig().setEnableMQTTService(enableMQTTService);
   }
 
   @Test
@@ -68,31 +73,29 @@ public class MQTTSinkTest {
     mqttHandler.open(
         new MQTTConfiguration(
             "127.0.0.1",
-            1883,
+            EnvFactory.getEnv().getMqttPort(),
             "root",
             "root",
             new PartialPath("root.sg1.d1"),
             new String[] {"s1"}));
 
-    for (int i = 0; i < 10000; ++i) {
+    for (int i = 0; i < 5; ++i) {
       mqttHandler.onEvent(new MQTTEvent("test", QoS.EXACTLY_ONCE, false, i, i));
     }
 
     mqttHandler.close();
 
-    await().atMost(1, MINUTES).until(() -> 10000 == checkSingleSensorHandlerResult());
+    TimeUnit.SECONDS.sleep(10);
+
+    assertEquals(5, checkSingleSensorHandlerResult());
   }
 
-  private int checkSingleSensorHandlerResult() throws ClassNotFoundException {
+  private int checkSingleSensorHandlerResult() {
     int count = 0;
-    Class.forName(Config.JDBC_DRIVER_NAME);
-    try (Connection connection =
-            DriverManager.getConnection(
-                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+    try (Connection connection = EnvFactory.getEnv().getConnection();
         Statement statement = connection.createStatement()) {
-      Assert.assertTrue(statement.execute("select * from root.**"));
+      try (ResultSet resultSet = statement.executeQuery("select * from root.**")) {
 
-      try (ResultSet resultSet = statement.getResultSet()) {
         ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
 
         checkHeader(
@@ -122,13 +125,13 @@ public class MQTTSinkTest {
     mqttHandler.open(
         new MQTTConfiguration(
             "127.0.0.1",
-            1883,
+            EnvFactory.getEnv().getMqttPort(),
             "root",
             "root",
             new PartialPath("root.sg1.d1"),
             new String[] {"s1", "s2", "s3", "s4", "s5", "s6"}));
 
-    for (int i = 0; i < 10000; ++i) {
+    for (int i = 0; i < 5; ++i) {
       mqttHandler.onEvent(
           new MQTTEvent(
               "test",
@@ -145,19 +148,18 @@ public class MQTTSinkTest {
 
     mqttHandler.close();
 
-    await().atMost(1, MINUTES).until(() -> 10000 == checkMultiSensorsHandlerResult());
+    TimeUnit.SECONDS.sleep(10);
+
+    assertEquals(5, checkMultiSensorsHandlerResult());
   }
 
-  private int checkMultiSensorsHandlerResult() throws ClassNotFoundException {
+  private int checkMultiSensorsHandlerResult() {
     int count = 0;
-    Class.forName(Config.JDBC_DRIVER_NAME);
-    try (Connection connection =
-            DriverManager.getConnection(
-                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+    try (Connection connection = EnvFactory.getEnv().getConnection();
         Statement statement = connection.createStatement()) {
-      Assert.assertTrue(statement.execute("select * from root.**"));
 
-      try (ResultSet resultSet = statement.getResultSet()) {
+      try (ResultSet resultSet = statement.executeQuery("select * from root.**")) {
+
         ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
 
         checkHeader(
diff --git a/server/src/test/java/org/apache/iotdb/db/tools/IoTDBWatermarkTest.java b/integration-test/src/test/java/org/apache/iotdb/db/it/watermark/IoTDBWatermarkIT.java
similarity index 53%
rename from server/src/test/java/org/apache/iotdb/db/tools/IoTDBWatermarkTest.java
rename to integration-test/src/test/java/org/apache/iotdb/db/it/watermark/IoTDBWatermarkIT.java
index 30a09168e5..1b54f3cf67 100644
--- a/server/src/test/java/org/apache/iotdb/db/tools/IoTDBWatermarkTest.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/watermark/IoTDBWatermarkIT.java
@@ -16,41 +16,40 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.tools;
+package org.apache.iotdb.db.it.watermark;
 
-import org.apache.iotdb.commons.conf.IoTDBConstant;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.constant.TestConstant;
-import org.apache.iotdb.db.exception.query.LogicalOperatorException;
 import org.apache.iotdb.db.tools.watermark.WatermarkDetector;
-import org.apache.iotdb.db.utils.EnvironmentUtils;
-import org.apache.iotdb.jdbc.Config;
+import org.apache.iotdb.it.env.ConfigFactory;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.constant.TestConstant;
 
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
 
 import java.io.File;
-import java.io.IOException;
 import java.io.PrintWriter;
 import java.sql.Connection;
-import java.sql.DriverManager;
 import java.sql.ResultSet;
 import java.sql.Statement;
 
+import static org.apache.iotdb.itbase.constant.TestConstant.BASE_OUTPUT_PATH;
 import static org.junit.Assert.fail;
 
-/**
- * Notice that, all test begins with "IoTDB" is integration test. All test which will start the
- * IoTDB server should be defined as integration test.
- */
-public class IoTDBWatermarkTest {
+@Ignore // TODO add it back when we support watermark in mpp mode
+@RunWith(IoTDBTestRunner.class)
+@Category({ClusterIT.class})
+public class IoTDBWatermarkIT {
 
-  private static String filePath1 =
-      TestConstant.BASE_OUTPUT_PATH.concat("watermarked_query_result.csv");
-  private static String filePath2 =
-      TestConstant.BASE_OUTPUT_PATH.concat("notWatermarked_query_result.csv");
+  private static final String filePath1 = BASE_OUTPUT_PATH.concat("watermarked_query_result.csv");
+  private static final String filePath2 =
+      BASE_OUTPUT_PATH.concat(File.separator).concat("notWatermarked_query_result.csv");
   private static PrintWriter writer1;
   private static PrintWriter writer2;
   private static String secretKey = "ASDFGHJKL";
@@ -58,19 +57,33 @@ public class IoTDBWatermarkTest {
   private static int embed_row_cycle = 5;
   private static int embed_lsb_num = 5;
 
+  private boolean originEnableWatermark;
+
+  private String originWatermarkSecretKey;
+
+  private String originWatermarkBitString;
+
+  private String originWatermarkMethod;
+
   @Before
   public void setUp() throws Exception {
-    IoTDBDescriptor.getInstance().getConfig().setEnableWatermark(true); // default false
-    IoTDBDescriptor.getInstance().getConfig().setWatermarkSecretKey(secretKey);
-    IoTDBDescriptor.getInstance().getConfig().setWatermarkBitString(watermarkBitString);
-    IoTDBDescriptor.getInstance()
-        .getConfig()
+
+    originEnableWatermark = ConfigFactory.getConfig().isEnableWatermark();
+    originWatermarkSecretKey = ConfigFactory.getConfig().getWatermarkSecretKey();
+    originWatermarkBitString = ConfigFactory.getConfig().getWatermarkBitString();
+    originWatermarkMethod = ConfigFactory.getConfig().getWatermarkMethod();
+
+    ConfigFactory.getConfig().setEnableWatermark(true);
+    ConfigFactory.getConfig().setWatermarkSecretKey(secretKey);
+    ConfigFactory.getConfig().setWatermarkBitString(watermarkBitString);
+    ConfigFactory.getConfig()
         .setWatermarkMethod(
             String.format(
                 "GroupBasedLSBMethod" + "(embed_row_cycle=%d,embed_lsb_num=%d)",
                 embed_row_cycle, embed_lsb_num));
 
-    EnvironmentUtils.envSetUp();
+    EnvFactory.getEnv().initBeforeTest();
+
     insertData();
 
     File file1 = new File(filePath1);
@@ -98,14 +111,16 @@ public class IoTDBWatermarkTest {
     if (file2.exists()) {
       file2.delete();
     }
-    EnvironmentUtils.cleanEnv();
+    EnvFactory.getEnv().cleanAfterTest();
+
+    ConfigFactory.getConfig().setEnableWatermark(originEnableWatermark);
+    ConfigFactory.getConfig().setWatermarkSecretKey(originWatermarkSecretKey);
+    ConfigFactory.getConfig().setWatermarkBitString(originWatermarkBitString);
+    ConfigFactory.getConfig().setWatermarkMethod(originWatermarkMethod);
   }
 
-  private static void insertData() throws ClassNotFoundException {
-    Class.forName(Config.JDBC_DRIVER_NAME);
-    try (Connection connection =
-            DriverManager.getConnection(
-                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+  private static void insertData() {
+    try (Connection connection = EnvFactory.getEnv().getConnection();
         Statement statement = connection.createStatement()) {
 
       String[] create_sql =
@@ -123,7 +138,7 @@ public class IoTDBWatermarkTest {
         String sql =
             String.format(
                 "insert into root.vehicle.d0(timestamp,s0,s2) values(%s,%s,%s)",
-                time, time % 50, time % 50, time % 50);
+                time, time % 50, time % 50);
         statement.execute(sql);
         if (time % 10 == 0) {
           sql =
@@ -139,36 +154,24 @@ public class IoTDBWatermarkTest {
   }
 
   @Test
-  public void EncodeAndDecodeTest1()
-      throws IOException, ClassNotFoundException, LogicalOperatorException {
+  public void EncodeAndDecodeTest1() {
     // Watermark Embedding
-    Class.forName(Config.JDBC_DRIVER_NAME);
-    try (Connection connection =
-            DriverManager.getConnection(
-                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+    try (Connection connection = EnvFactory.getEnv().getConnection();
         Statement statement = connection.createStatement()) {
       statement.execute("GRANT WATERMARK_EMBEDDING TO root");
-      boolean hasResultSet = statement.execute("SELECT s0,s1,s2 FROM root.vehicle.d0");
-      Assert.assertTrue(hasResultSet);
-      ResultSet resultSet = statement.getResultSet();
-      try {
+      try (ResultSet resultSet = statement.executeQuery("SELECT s0,s1,s2 FROM root.vehicle.d0")) {
         while (resultSet.next()) {
           String ans =
               resultSet.getString(TestConstant.TIMESTAMP_STR)
                   + ","
-                  + resultSet.getString(
-                      TestConstant.d0 + IoTDBConstant.PATH_SEPARATOR + TestConstant.s0)
+                  + resultSet.getString(TestConstant.d0 + "." + TestConstant.s0)
                   + ","
-                  + resultSet.getString(
-                      TestConstant.d0 + IoTDBConstant.PATH_SEPARATOR + TestConstant.s1)
+                  + resultSet.getString(TestConstant.d0 + "." + TestConstant.s1)
                   + ","
-                  + resultSet.getString(
-                      TestConstant.d0 + IoTDBConstant.PATH_SEPARATOR + TestConstant.s2);
+                  + resultSet.getString(TestConstant.d0 + "." + TestConstant.s2);
           writer1.println(ans);
         }
         writer1.close();
-      } finally {
-        resultSet.close();
       }
     } catch (Exception e) {
       e.printStackTrace();
@@ -178,50 +181,43 @@ public class IoTDBWatermarkTest {
     // Watermark Detection
     double alpha = 0.1;
     int columnIndex = 1;
-    boolean isWatermarked =
-        WatermarkDetector.isWatermarked(
-            filePath1,
-            secretKey,
-            watermarkBitString,
-            embed_row_cycle,
-            embed_lsb_num,
-            alpha,
-            columnIndex,
-            "int");
-    Assert.assertTrue(isWatermarked);
+    try {
+      boolean isWatermarked =
+          WatermarkDetector.isWatermarked(
+              filePath1,
+              secretKey,
+              watermarkBitString,
+              embed_row_cycle,
+              embed_lsb_num,
+              alpha,
+              columnIndex,
+              "int");
+      Assert.assertTrue(isWatermarked);
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
   }
 
   @Test
-  public void EncodeAndDecodeTest2()
-      throws IOException, ClassNotFoundException, LogicalOperatorException {
+  public void EncodeAndDecodeTest2() {
     // No Watermark Embedding
-    Class.forName(Config.JDBC_DRIVER_NAME);
-    try (Connection connection =
-            DriverManager.getConnection(
-                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+    try (Connection connection = EnvFactory.getEnv().getConnection();
         Statement statement = connection.createStatement()) {
       statement.execute("REVOKE WATERMARK_EMBEDDING FROM root");
-      boolean hasResultSet = statement.execute("SELECT s0,s1,s2 FROM root.vehicle.d0");
-      Assert.assertTrue(hasResultSet);
-      ResultSet resultSet = statement.getResultSet();
-      try {
+      try (ResultSet resultSet = statement.executeQuery("SELECT s0,s1,s2 FROM root.vehicle.d0")) {
         while (resultSet.next()) {
           String ans =
               resultSet.getString(TestConstant.TIMESTAMP_STR)
                   + ","
-                  + resultSet.getString(
-                      TestConstant.d0 + IoTDBConstant.PATH_SEPARATOR + TestConstant.s0)
+                  + resultSet.getString(TestConstant.d0 + "." + TestConstant.s0)
                   + ","
-                  + resultSet.getString(
-                      TestConstant.d0 + IoTDBConstant.PATH_SEPARATOR + TestConstant.s1)
+                  + resultSet.getString(TestConstant.d0 + "." + TestConstant.s1)
                   + ","
-                  + resultSet.getString(
-                      TestConstant.d0 + IoTDBConstant.PATH_SEPARATOR + TestConstant.s2);
+                  + resultSet.getString(TestConstant.d0 + "." + TestConstant.s2);
           writer2.println(ans);
         }
         writer2.close();
-      } finally {
-        resultSet.close();
       }
     } catch (Exception e) {
       e.printStackTrace();
@@ -231,16 +227,21 @@ public class IoTDBWatermarkTest {
     // Watermark Detection
     double alpha = 0.1;
     int columnIndex = 1;
-    boolean isWatermarked =
-        WatermarkDetector.isWatermarked(
-            filePath2,
-            secretKey,
-            watermarkBitString,
-            embed_row_cycle,
-            embed_lsb_num,
-            alpha,
-            columnIndex,
-            "int");
-    Assert.assertFalse(isWatermarked);
+    try {
+      boolean isWatermarked =
+          WatermarkDetector.isWatermarked(
+              filePath2,
+              secretKey,
+              watermarkBitString,
+              embed_row_cycle,
+              embed_lsb_num,
+              alpha,
+              columnIndex,
+              "int");
+      Assert.assertFalse(isWatermarked);
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 72c006533b..16a2890956 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -2123,7 +2123,7 @@ public class IoTDBConfig {
     this.watermarkBitString = watermarkBitString;
   }
 
-  String getWatermarkMethod() {
+  public String getWatermarkMethod() {
     return this.watermarkMethod;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/PublishHandler.java b/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/PublishHandler.java
index 7fb4ee82b9..2a1f544101 100644
--- a/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/PublishHandler.java
+++ b/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/PublishHandler.java
@@ -53,10 +53,6 @@ public class PublishHandler extends AbstractInterceptHandler {
     this.payloadFormat = PayloadFormatManager.getPayloadFormat(config.getMqttPayloadFormatter());
   }
 
-  protected PublishHandler(PayloadFormatter payloadFormat) {
-    this.payloadFormat = payloadFormat;
-  }
-
   @Override
   public String getID() {
     return "iotdb-mqtt-broker-listener";
diff --git a/server/src/test/java/org/apache/iotdb/db/protocol/mqtt/PublishHandlerTest.java b/server/src/test/java/org/apache/iotdb/db/protocol/mqtt/PublishHandlerTest.java
deleted file mode 100644
index f1b1a20b1a..0000000000
--- a/server/src/test/java/org/apache/iotdb/db/protocol/mqtt/PublishHandlerTest.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.iotdb.db.protocol.mqtt;
-
-import org.apache.iotdb.db.utils.EnvironmentUtils;
-import org.apache.iotdb.jdbc.Config;
-
-import io.moquette.interception.messages.InterceptConnectMessage;
-import io.moquette.interception.messages.InterceptDisconnectMessage;
-import io.moquette.interception.messages.InterceptPublishMessage;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-import io.netty.handler.codec.mqtt.MqttConnectMessage;
-import io.netty.handler.codec.mqtt.MqttConnectPayload;
-import io.netty.handler.codec.mqtt.MqttFixedHeader;
-import io.netty.handler.codec.mqtt.MqttMessageType;
-import io.netty.handler.codec.mqtt.MqttPublishMessage;
-import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
-import io.netty.handler.codec.mqtt.MqttQoS;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.nio.charset.StandardCharsets;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
-import java.sql.Statement;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-public class PublishHandlerTest {
-
-  @BeforeClass
-  public static void setUp() throws Exception {
-    EnvironmentUtils.envSetUp();
-  }
-
-  @AfterClass
-  public static void tearDown() throws Exception {
-    EnvironmentUtils.cleanEnv();
-  }
-
-  @Test
-  public void onPublish() throws ClassNotFoundException {
-    PayloadFormatter payloadFormat = PayloadFormatManager.getPayloadFormat("json");
-    PublishHandler handler = new PublishHandler(payloadFormat);
-    String clientId = "clientId";
-
-    String payload =
-        "{\n"
-            + "\"device\":\"root.sg.d1\",\n"
-            + "\"timestamp\":1586076045524,\n"
-            + "\"measurements\":[\"s1\"],\n"
-            + "\"values\":[0.530635]\n"
-            + "}";
-
-    ByteBuf buf = Unpooled.copiedBuffer(payload, StandardCharsets.UTF_8);
-
-    // connect
-    MqttConnectPayload mqttConnectPayload =
-        new MqttConnectPayload(
-            clientId,
-            null,
-            "test".getBytes(StandardCharsets.UTF_8),
-            "root",
-            "root".getBytes(StandardCharsets.UTF_8));
-    MqttConnectMessage mqttConnectMessage = new MqttConnectMessage(null, null, mqttConnectPayload);
-    InterceptConnectMessage interceptConnectMessage =
-        new InterceptConnectMessage(mqttConnectMessage);
-    handler.onConnect(interceptConnectMessage);
-
-    // publish
-    MqttPublishVariableHeader variableHeader = new MqttPublishVariableHeader("root.sg.d1", 1);
-    MqttFixedHeader fixedHeader =
-        new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.AT_LEAST_ONCE, false, 1);
-    MqttPublishMessage publishMessage = new MqttPublishMessage(fixedHeader, variableHeader, buf);
-    InterceptPublishMessage message = new InterceptPublishMessage(publishMessage, clientId, null);
-    handler.onPublish(message);
-
-    // disconnect
-    InterceptDisconnectMessage interceptDisconnectMessage =
-        new InterceptDisconnectMessage(clientId, null);
-    handler.onDisconnect(interceptDisconnectMessage);
-
-    String[] retArray = new String[] {"1586076045524,0.530635,"};
-
-    Class.forName(Config.JDBC_DRIVER_NAME);
-    try (Connection connection =
-            DriverManager.getConnection(
-                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
-        Statement statement = connection.createStatement()) {
-      boolean hasResultSet = statement.execute("select * from root.sg.d1");
-      Assert.assertTrue(hasResultSet);
-
-      try (ResultSet resultSet = statement.getResultSet()) {
-        ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
-        int cnt = 0;
-        while (resultSet.next()) {
-          StringBuilder builder = new StringBuilder();
-          for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
-            builder.append(resultSet.getString(i)).append(",");
-          }
-          assertEquals(retArray[cnt], builder.toString());
-          cnt++;
-        }
-        assertEquals(retArray.length, cnt);
-      }
-    } catch (Exception e) {
-      e.printStackTrace();
-      fail(e.getMessage());
-    }
-  }
-}
diff --git a/server/src/test/java/org/apache/iotdb/db/sink/LocalIoTDBSinkTest.java b/server/src/test/java/org/apache/iotdb/db/sink/LocalIoTDBSinkTest.java
deleted file mode 100644
index 83f2c273d8..0000000000
--- a/server/src/test/java/org/apache/iotdb/db/sink/LocalIoTDBSinkTest.java
+++ /dev/null
@@ -1,218 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.sink;
-
-import org.apache.iotdb.db.engine.trigger.sink.local.LocalIoTDBConfiguration;
-import org.apache.iotdb.db.engine.trigger.sink.local.LocalIoTDBEvent;
-import org.apache.iotdb.db.engine.trigger.sink.local.LocalIoTDBHandler;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.utils.EnvironmentUtils;
-import org.apache.iotdb.jdbc.Config;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.utils.Binary;
-
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.sql.Types;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-public class LocalIoTDBSinkTest {
-
-  @Before
-  public void setUp() throws Exception {
-    EnvironmentUtils.envSetUp();
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    EnvironmentUtils.cleanEnv();
-  }
-
-  @Test
-  public void onEventUsingSingleSensorHandler() throws Exception {
-    LocalIoTDBHandler localIoTDBHandler = new LocalIoTDBHandler();
-    localIoTDBHandler.open(
-        new LocalIoTDBConfiguration(
-            "root.sg1.d1", new String[] {"s1"}, new TSDataType[] {TSDataType.INT32}));
-
-    for (int i = 0; i < 10000; ++i) {
-      localIoTDBHandler.onEvent(new LocalIoTDBEvent(i, i));
-    }
-
-    localIoTDBHandler.close();
-
-    Class.forName(Config.JDBC_DRIVER_NAME);
-    try (Connection connection =
-            DriverManager.getConnection(
-                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
-        Statement statement = connection.createStatement()) {
-      Assert.assertTrue(statement.execute("select * from root.**"));
-
-      try (ResultSet resultSet = statement.getResultSet()) {
-        ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
-
-        checkHeader(
-            resultSetMetaData,
-            "Time,root.sg1.d1.s1,",
-            new int[] {
-              Types.TIMESTAMP, Types.INTEGER,
-            });
-
-        int count = 0;
-        while (resultSet.next()) {
-          for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
-            assertEquals(count, Double.parseDouble(resultSet.getString(i)), 0.0);
-          }
-          count++;
-        }
-        Assert.assertEquals(10000, count);
-      }
-    } catch (Exception e) {
-      e.printStackTrace();
-      fail(e.getMessage());
-    }
-  }
-
-  @Test
-  public void onEventUsingMultiSensorsHandler() throws Exception {
-    LocalIoTDBHandler localIoTDBHandler = new LocalIoTDBHandler();
-    localIoTDBHandler.open(
-        new LocalIoTDBConfiguration(
-            "root.sg1.d1",
-            new String[] {"s1", "s2", "s3", "s4", "s5", "s6"},
-            new TSDataType[] {
-              TSDataType.INT32,
-              TSDataType.INT64,
-              TSDataType.FLOAT,
-              TSDataType.DOUBLE,
-              TSDataType.BOOLEAN,
-              TSDataType.TEXT
-            }));
-
-    for (int i = 0; i < 10000; ++i) {
-      localIoTDBHandler.onEvent(
-          new LocalIoTDBEvent(
-              i,
-              i,
-              (long) i,
-              (float) i,
-              (double) i,
-              i % 2 == 0,
-              Binary.valueOf(String.valueOf(i))));
-    }
-
-    localIoTDBHandler.close();
-
-    Class.forName(Config.JDBC_DRIVER_NAME);
-    try (Connection connection =
-            DriverManager.getConnection(
-                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
-        Statement statement = connection.createStatement()) {
-      Assert.assertTrue(statement.execute("select * from root.**"));
-
-      try (ResultSet resultSet = statement.getResultSet()) {
-        ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
-
-        checkHeader(
-            resultSetMetaData,
-            "Time,root.sg1.d1.s1,root.sg1.d1.s2,root.sg1.d1.s3,"
-                + "root.sg1.d1.s4,root.sg1.d1.s5,root.sg1.d1.s6,",
-            new int[] {
-              Types.TIMESTAMP,
-              Types.INTEGER,
-              Types.BIGINT,
-              Types.FLOAT,
-              Types.DOUBLE,
-              Types.BOOLEAN,
-              Types.VARCHAR,
-            });
-
-        int count = 0;
-        while (resultSet.next()) {
-          for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
-            try {
-              assertEquals(count, Double.parseDouble(resultSet.getString(i)), 0.0);
-            } catch (NumberFormatException e) {
-              assertEquals(count % 2 == 0, Boolean.parseBoolean(resultSet.getString(i)));
-            }
-          }
-          count++;
-        }
-        Assert.assertEquals(10000, count);
-      }
-    } catch (Exception e) {
-      e.printStackTrace();
-      fail(e.getMessage());
-    }
-  }
-
-  private void checkHeader(
-      ResultSetMetaData resultSetMetaData, String expectedHeaderStrings, int[] expectedTypes)
-      throws SQLException {
-    String[] expectedHeaders = expectedHeaderStrings.split(",");
-    Map<String, Integer> expectedHeaderToTypeIndexMap = new HashMap<>();
-    for (int i = 0; i < expectedHeaders.length; ++i) {
-      expectedHeaderToTypeIndexMap.put(expectedHeaders[i], i);
-    }
-
-    for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
-      Integer typeIndex = expectedHeaderToTypeIndexMap.get(resultSetMetaData.getColumnName(i));
-      Assert.assertNotNull(typeIndex);
-      Assert.assertEquals(expectedTypes[typeIndex], resultSetMetaData.getColumnType(i));
-    }
-  }
-
-  @Test(expected = QueryProcessException.class)
-  public void onEventWithWrongType1() throws Exception {
-    LocalIoTDBHandler localIoTDBHandler = new LocalIoTDBHandler();
-    localIoTDBHandler.open(
-        new LocalIoTDBConfiguration(
-            "root.sg1.d1", new String[] {"s1"}, new TSDataType[] {TSDataType.INT32}));
-
-    localIoTDBHandler.onEvent(new LocalIoTDBEvent(0, Binary.valueOf(String.valueOf(0))));
-
-    localIoTDBHandler.close();
-  }
-
-  @Test(expected = ClassCastException.class)
-  public void onEventWithWrongType2() throws Exception {
-    LocalIoTDBHandler localIoTDBHandler = new LocalIoTDBHandler();
-    localIoTDBHandler.open(
-        new LocalIoTDBConfiguration(
-            "root.sg1.d1", new String[] {"s1"}, new TSDataType[] {TSDataType.TEXT}));
-
-    localIoTDBHandler.onEvent(new LocalIoTDBEvent(0, String.valueOf(0)));
-
-    localIoTDBHandler.close();
-  }
-}