You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/10/24 08:01:36 UTC
[iotdb] 01/02: Change previous IT from server module to integration-test
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch UTToIT
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit b457c0a7afe01f3830465d78c3684a672e5c6c6f
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Mon Oct 24 15:42:07 2022 +0800
Change previous IT from server module to integration-test
---
integration-test/import-control.xml | 9 +
.../java/org/apache/iotdb/it/env/AbstractEnv.java | 6 +
.../org/apache/iotdb/it/env/DataNodeWrapper.java | 8 +
.../java/org/apache/iotdb/it/env/MppConfig.java | 30 +++
.../org/apache/iotdb/it/env/RemoteServerEnv.java | 5 +
.../org/apache/iotdb/itbase/env/BaseConfig.java | 40 ++++
.../java/org/apache/iotdb/itbase/env/BaseEnv.java | 2 +
.../org/apache/iotdb/db/it/env/StandaloneEnv.java | 5 +
.../iotdb/db/it/env/StandaloneEnvConfig.java | 55 ++++++
.../apache/iotdb/db/it/mqtt/IoTDBMQTTSinkIT.java | 68 +++----
.../iotdb/db/it/watermark/IoTDBWatermarkIT.java | 179 ++++++++---------
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 2 +-
.../iotdb/db/protocol/mqtt/PublishHandler.java | 4 -
.../iotdb/db/protocol/mqtt/PublishHandlerTest.java | 132 -------------
.../apache/iotdb/db/sink/LocalIoTDBSinkTest.java | 218 ---------------------
15 files changed, 286 insertions(+), 477 deletions(-)
diff --git a/integration-test/import-control.xml b/integration-test/import-control.xml
index d33888162c..95c2749b8f 100644
--- a/integration-test/import-control.xml
+++ b/integration-test/import-control.xml
@@ -35,6 +35,11 @@
<allow pkg="org\.slf4j.*" regex="true" />
<subpackage name="db.it">
<disallow pkg="org.apache.iotdb.jdbc.*"/>
+ <allow class="org.apache.iotdb.db.tools.watermark.WatermarkDetector" />
+ <allow class="io.netty.buffer.ByteBuf" />
+ <allow class="io.netty.buffer.Unpooled" />
+ <allow class="org.fusesource.mqtt.client.QoS" />
+ <allow class="org.apache.iotdb.commons.path.PartialPath" />
<allow pkg="java.text"/>
<allow pkg="org.apache.iotdb.db.it.utils" />
<allow pkg="org\.apache\.iotdb\.db\.it\.utils\.TestUtils.*" regex="true"/>
@@ -46,6 +51,10 @@
<allow pkg="org\.apache\.iotdb\.tsfile\.read.*" regex="true" />
<allow pkg="org\.apache\.iotdb\.tsfile\.utils.*" regex="true" />
<allow pkg="org\.apache\.iotdb\.tsfile\.write.*" regex="true" />
+ <allow pkg="org\.apache\.iotdb\.db\.protocol\.mqtt.*" regex="true" />
+ <allow pkg="io\.moquette\.interception\.messages.*" regex="true" />
+ <allow pkg="io\.netty\.handler\.codec\.mqtt.*" regex="true" />
+ <allow pkg="org\.apache\.iotdb\.db\.engine\.trigger\.sink\.mqtt.*" regex="true" />
</subpackage>
<subpackage name="confignode.it">
<allow class="java.nio.ByteBuffer" />
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractEnv.java
index ffaa5898f3..994eb323c5 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractEnv.java
@@ -479,4 +479,10 @@ public abstract class AbstractEnv implements BaseEnv {
public void shutdownDataNode(int index) {
dataNodeWrapperList.get(index).stop();
}
+
+ @Override
+ public int getMqttPort() {
+ int randomIndex = new Random(System.currentTimeMillis()).nextInt(dataNodeWrapperList.size());
+ return dataNodeWrapperList.get(randomIndex).getMqttPort();
+ }
}
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/DataNodeWrapper.java b/integration-test/src/main/java/org/apache/iotdb/it/env/DataNodeWrapper.java
index 27f34f275b..b11b829219 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/DataNodeWrapper.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/DataNodeWrapper.java
@@ -32,6 +32,7 @@ public class DataNodeWrapper extends AbstractNodeWrapper {
private final int internalPort;
private final int dataRegionConsensusPort;
private final int schemaRegionConsensusPort;
+ private final int mqttPort;
public DataNodeWrapper(
String targetConfigNode, String testClassName, String testMethodName, int[] portList) {
@@ -41,6 +42,7 @@ public class DataNodeWrapper extends AbstractNodeWrapper {
this.internalPort = portList[2];
this.dataRegionConsensusPort = portList[3];
this.schemaRegionConsensusPort = portList[4];
+ this.mqttPort = portList[5];
}
@Override
@@ -54,6 +56,8 @@ public class DataNodeWrapper extends AbstractNodeWrapper {
"data_region_consensus_port", String.valueOf(this.dataRegionConsensusPort));
properties.setProperty(
"schema_region_consensus_port", String.valueOf(this.schemaRegionConsensusPort));
+ properties.setProperty("mqtt_host", super.getIp());
+ properties.setProperty("mqtt_port", String.valueOf(this.mqttPort));
properties.setProperty("connection_timeout_ms", "30000");
if (this.targetConfigNode != null) {
properties.setProperty(IoTDBConstant.TARGET_CONFIG_NODES, this.targetConfigNode);
@@ -106,4 +110,8 @@ public class DataNodeWrapper extends AbstractNodeWrapper {
public int getSchemaRegionConsensusPort() {
return schemaRegionConsensusPort;
}
+
+ public int getMqttPort() {
+ return mqttPort;
+ }
}
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/MppConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/MppConfig.java
index a1c1b8dcfd..5db9045ba4 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/MppConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/MppConfig.java
@@ -281,4 +281,34 @@ public class MppConfig implements BaseConfig {
engineProperties.setProperty("max_degree_of_index_node", String.valueOf(maxDegreeOfIndexNode));
return this;
}
+
+ @Override
+ public BaseConfig setEnableWatermark(boolean enableWatermark) {
+ engineProperties.setProperty("watermark_module_opened", String.valueOf(enableWatermark));
+ return this;
+ }
+
+ @Override
+ public BaseConfig setWatermarkSecretKey(String watermarkSecretKey) {
+ engineProperties.setProperty("watermark_secret_key", watermarkSecretKey);
+ return this;
+ }
+
+ @Override
+ public BaseConfig setWatermarkBitString(String watermarkBitString) {
+ engineProperties.setProperty("watermark_bit_string", watermarkBitString);
+ return this;
+ }
+
+ @Override
+ public BaseConfig setWatermarkMethod(String watermarkMethod) {
+ engineProperties.setProperty("watermark_method", watermarkMethod);
+ return this;
+ }
+
+ @Override
+ public BaseConfig setEnableMQTTService(boolean enableMQTTService) {
+ engineProperties.setProperty("enable_mqtt_service", String.valueOf(enableMQTTService));
+ return this;
+ }
}
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/RemoteServerEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/RemoteServerEnv.java
index f3d4ad410e..a7ac83d9de 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/RemoteServerEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/RemoteServerEnv.java
@@ -182,4 +182,9 @@ public class RemoteServerEnv implements BaseEnv {
public void shutdownDataNode(int index) {
getDataNodeWrapperList().get(index).stop();
}
+
+ @Override
+ public int getMqttPort() {
+ throw new UnsupportedOperationException();
+ }
}
diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java
index 90a69dbc26..8b560e4068 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java
@@ -286,4 +286,44 @@ public interface BaseConfig {
default int getMaxDegreeOfIndexNode() {
return 256;
}
+
+ default BaseConfig setEnableWatermark(boolean enableWatermark) {
+ return this;
+ }
+
+ default boolean isEnableWatermark() {
+ return false;
+ }
+
+ default String getWatermarkSecretKey() {
+ return "IoTDB*2019@Beijing";
+ }
+
+ default BaseConfig setWatermarkSecretKey(String watermarkSecretKey) {
+ return this;
+ }
+
+ default String getWatermarkBitString() {
+ return "100101110100";
+ }
+
+ default BaseConfig setWatermarkBitString(String watermarkBitString) {
+ return this;
+ }
+
+ default String getWatermarkMethod() {
+ return "GroupBasedLSBMethod(embed_row_cycle=2,embed_lsb_num=5)";
+ }
+
+ default BaseConfig setWatermarkMethod(String watermarkMethod) {
+ return this;
+ }
+
+ default boolean isEnableMQTTService() {
+ return false;
+ }
+
+ default BaseConfig setEnableMQTTService(boolean enableMQTTService) {
+ return this;
+ }
}
diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
index 91ca65b99e..2f97f3b37c 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
@@ -143,4 +143,6 @@ public interface BaseEnv {
void startDataNode(int index);
void shutdownDataNode(int index);
+
+ int getMqttPort();
}
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/env/StandaloneEnv.java b/integration-test/src/test/java/org/apache/iotdb/db/it/env/StandaloneEnv.java
index 02210a36ca..c11a6cb937 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/env/StandaloneEnv.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/env/StandaloneEnv.java
@@ -202,4 +202,9 @@ public class StandaloneEnv implements BaseEnv {
public void shutdownDataNode(int index) {
// Do nothing
}
+
+ @Override
+ public int getMqttPort() {
+ return 1883;
+ }
}
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/env/StandaloneEnvConfig.java b/integration-test/src/test/java/org/apache/iotdb/db/it/env/StandaloneEnvConfig.java
index c40e3141d2..9157c6922e 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/env/StandaloneEnvConfig.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/env/StandaloneEnvConfig.java
@@ -271,4 +271,59 @@ public class StandaloneEnvConfig implements BaseConfig {
public int getMaxDegreeOfIndexNode() {
return TSFileDescriptor.getInstance().getConfig().getMaxDegreeOfIndexNode();
}
+
+ @Override
+ public BaseConfig setEnableWatermark(boolean enableWatermark) {
+ IoTDBDescriptor.getInstance().getConfig().setEnableWatermark(enableWatermark);
+ return this;
+ }
+
+ @Override
+ public boolean isEnableWatermark() {
+ return IoTDBDescriptor.getInstance().getConfig().isEnableWatermark();
+ }
+
+ @Override
+ public String getWatermarkSecretKey() {
+ return IoTDBDescriptor.getInstance().getConfig().getWatermarkSecretKey();
+ }
+
+ @Override
+ public BaseConfig setWatermarkSecretKey(String watermarkSecretKey) {
+ IoTDBDescriptor.getInstance().getConfig().setWatermarkSecretKey(watermarkSecretKey);
+ return this;
+ }
+
+ @Override
+ public String getWatermarkBitString() {
+ return IoTDBDescriptor.getInstance().getConfig().getWatermarkBitString();
+ }
+
+ @Override
+ public BaseConfig setWatermarkBitString(String watermarkBitString) {
+ IoTDBDescriptor.getInstance().getConfig().setWatermarkBitString(watermarkBitString);
+ return this;
+ }
+
+ @Override
+ public String getWatermarkMethod() {
+ return IoTDBDescriptor.getInstance().getConfig().getWatermarkMethod();
+ }
+
+ @Override
+ public BaseConfig setWatermarkMethod(String watermarkMethod) {
+ IoTDBDescriptor.getInstance().getConfig().setWatermarkMethod(watermarkMethod);
+ return this;
+ }
+
+ @Override
+ public boolean isEnableMQTTService() {
+ return IoTDBDescriptor.getInstance().getConfig().isEnableMQTTService();
+ }
+
+ @Override
+ public BaseConfig setEnableMQTTService(boolean enableMQTTService) {
+ IoTDBDescriptor.getInstance().getConfig().setEnableMQTTService(enableMQTTService);
+ return this;
+ }
}
diff --git a/server/src/test/java/org/apache/iotdb/db/sink/MQTTSinkTest.java b/integration-test/src/test/java/org/apache/iotdb/db/it/mqtt/IoTDBMQTTSinkIT.java
similarity index 76%
rename from server/src/test/java/org/apache/iotdb/db/sink/MQTTSinkTest.java
rename to integration-test/src/test/java/org/apache/iotdb/db/it/mqtt/IoTDBMQTTSinkIT.java
index 6ae304ad96..d4c6bccf77 100644
--- a/server/src/test/java/org/apache/iotdb/db/sink/MQTTSinkTest.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/mqtt/IoTDBMQTTSinkIT.java
@@ -16,25 +16,26 @@
* specific language governing permissions and limitations
* under the License.
*/
-
-package org.apache.iotdb.db.sink;
+package org.apache.iotdb.db.it.mqtt;
import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.trigger.sink.mqtt.MQTTConfiguration;
import org.apache.iotdb.db.engine.trigger.sink.mqtt.MQTTEvent;
import org.apache.iotdb.db.engine.trigger.sink.mqtt.MQTTHandler;
-import org.apache.iotdb.db.utils.EnvironmentUtils;
-import org.apache.iotdb.jdbc.Config;
+import org.apache.iotdb.it.env.ConfigFactory;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
import org.fusesource.mqtt.client.QoS;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
import java.sql.Connection;
-import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
@@ -42,24 +43,28 @@ import java.sql.Statement;
import java.sql.Types;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
-import static java.util.concurrent.TimeUnit.MINUTES;
-import static org.awaitility.Awaitility.await;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
-@SuppressWarnings("squid:S2925")
-public class MQTTSinkTest {
+@RunWith(IoTDBTestRunner.class)
+@Category({ClusterIT.class})
+public class IoTDBMQTTSinkIT {
+
+ private boolean enableMQTTService;
@Before
public void setUp() throws Exception {
- IoTDBDescriptor.getInstance().getConfig().setEnableMQTTService(true);
- EnvironmentUtils.envSetUp();
+ enableMQTTService = ConfigFactory.getConfig().isEnableMQTTService();
+ ConfigFactory.getConfig().setEnableMQTTService(true);
+ EnvFactory.getEnv().initBeforeTest();
}
@After
public void tearDown() throws Exception {
- EnvironmentUtils.cleanEnv();
+ EnvFactory.getEnv().cleanAfterTest();
+ ConfigFactory.getConfig().setEnableMQTTService(enableMQTTService);
}
@Test
@@ -68,31 +73,29 @@ public class MQTTSinkTest {
mqttHandler.open(
new MQTTConfiguration(
"127.0.0.1",
- 1883,
+ EnvFactory.getEnv().getMqttPort(),
"root",
"root",
new PartialPath("root.sg1.d1"),
new String[] {"s1"}));
- for (int i = 0; i < 10000; ++i) {
+ for (int i = 0; i < 5; ++i) {
mqttHandler.onEvent(new MQTTEvent("test", QoS.EXACTLY_ONCE, false, i, i));
}
mqttHandler.close();
- await().atMost(1, MINUTES).until(() -> 10000 == checkSingleSensorHandlerResult());
+ TimeUnit.SECONDS.sleep(10);
+
+ assertEquals(5, checkSingleSensorHandlerResult());
}
- private int checkSingleSensorHandlerResult() throws ClassNotFoundException {
+ private int checkSingleSensorHandlerResult() {
int count = 0;
- Class.forName(Config.JDBC_DRIVER_NAME);
- try (Connection connection =
- DriverManager.getConnection(
- Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+ try (Connection connection = EnvFactory.getEnv().getConnection();
Statement statement = connection.createStatement()) {
- Assert.assertTrue(statement.execute("select * from root.**"));
+ try (ResultSet resultSet = statement.executeQuery("select * from root.**")) {
- try (ResultSet resultSet = statement.getResultSet()) {
ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
checkHeader(
@@ -122,13 +125,13 @@ public class MQTTSinkTest {
mqttHandler.open(
new MQTTConfiguration(
"127.0.0.1",
- 1883,
+ EnvFactory.getEnv().getMqttPort(),
"root",
"root",
new PartialPath("root.sg1.d1"),
new String[] {"s1", "s2", "s3", "s4", "s5", "s6"}));
- for (int i = 0; i < 10000; ++i) {
+ for (int i = 0; i < 5; ++i) {
mqttHandler.onEvent(
new MQTTEvent(
"test",
@@ -145,19 +148,18 @@ public class MQTTSinkTest {
mqttHandler.close();
- await().atMost(1, MINUTES).until(() -> 10000 == checkMultiSensorsHandlerResult());
+ TimeUnit.SECONDS.sleep(10);
+
+ assertEquals(5, checkMultiSensorsHandlerResult());
}
- private int checkMultiSensorsHandlerResult() throws ClassNotFoundException {
+ private int checkMultiSensorsHandlerResult() {
int count = 0;
- Class.forName(Config.JDBC_DRIVER_NAME);
- try (Connection connection =
- DriverManager.getConnection(
- Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+ try (Connection connection = EnvFactory.getEnv().getConnection();
Statement statement = connection.createStatement()) {
- Assert.assertTrue(statement.execute("select * from root.**"));
- try (ResultSet resultSet = statement.getResultSet()) {
+ try (ResultSet resultSet = statement.executeQuery("select * from root.**")) {
+
ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
checkHeader(
diff --git a/server/src/test/java/org/apache/iotdb/db/tools/IoTDBWatermarkTest.java b/integration-test/src/test/java/org/apache/iotdb/db/it/watermark/IoTDBWatermarkIT.java
similarity index 53%
rename from server/src/test/java/org/apache/iotdb/db/tools/IoTDBWatermarkTest.java
rename to integration-test/src/test/java/org/apache/iotdb/db/it/watermark/IoTDBWatermarkIT.java
index 30a09168e5..1b54f3cf67 100644
--- a/server/src/test/java/org/apache/iotdb/db/tools/IoTDBWatermarkTest.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/watermark/IoTDBWatermarkIT.java
@@ -16,41 +16,40 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.tools;
+package org.apache.iotdb.db.it.watermark;
-import org.apache.iotdb.commons.conf.IoTDBConstant;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.constant.TestConstant;
-import org.apache.iotdb.db.exception.query.LogicalOperatorException;
import org.apache.iotdb.db.tools.watermark.WatermarkDetector;
-import org.apache.iotdb.db.utils.EnvironmentUtils;
-import org.apache.iotdb.jdbc.Config;
+import org.apache.iotdb.it.env.ConfigFactory;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.constant.TestConstant;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
import java.io.File;
-import java.io.IOException;
import java.io.PrintWriter;
import java.sql.Connection;
-import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
+import static org.apache.iotdb.itbase.constant.TestConstant.BASE_OUTPUT_PATH;
import static org.junit.Assert.fail;
-/**
- * Notice that, all test begins with "IoTDB" is integration test. All test which will start the
- * IoTDB server should be defined as integration test.
- */
-public class IoTDBWatermarkTest {
+@Ignore // TODO add it back when we support watermark in mpp mode
+@RunWith(IoTDBTestRunner.class)
+@Category({ClusterIT.class})
+public class IoTDBWatermarkIT {
- private static String filePath1 =
- TestConstant.BASE_OUTPUT_PATH.concat("watermarked_query_result.csv");
- private static String filePath2 =
- TestConstant.BASE_OUTPUT_PATH.concat("notWatermarked_query_result.csv");
+ private static final String filePath1 = BASE_OUTPUT_PATH.concat("watermarked_query_result.csv");
+ private static final String filePath2 =
+ BASE_OUTPUT_PATH.concat(File.separator).concat("notWatermarked_query_result.csv");
private static PrintWriter writer1;
private static PrintWriter writer2;
private static String secretKey = "ASDFGHJKL";
@@ -58,19 +57,33 @@ public class IoTDBWatermarkTest {
private static int embed_row_cycle = 5;
private static int embed_lsb_num = 5;
+ private boolean originEnableWatermark;
+
+ private String originWatermarkSecretKey;
+
+ private String originWatermarkBitString;
+
+ private String originWatermarkMethod;
+
@Before
public void setUp() throws Exception {
- IoTDBDescriptor.getInstance().getConfig().setEnableWatermark(true); // default false
- IoTDBDescriptor.getInstance().getConfig().setWatermarkSecretKey(secretKey);
- IoTDBDescriptor.getInstance().getConfig().setWatermarkBitString(watermarkBitString);
- IoTDBDescriptor.getInstance()
- .getConfig()
+
+ originEnableWatermark = ConfigFactory.getConfig().isEnableWatermark();
+ originWatermarkSecretKey = ConfigFactory.getConfig().getWatermarkSecretKey();
+ originWatermarkBitString = ConfigFactory.getConfig().getWatermarkBitString();
+ originWatermarkMethod = ConfigFactory.getConfig().getWatermarkMethod();
+
+ ConfigFactory.getConfig().setEnableWatermark(true);
+ ConfigFactory.getConfig().setWatermarkSecretKey(secretKey);
+ ConfigFactory.getConfig().setWatermarkBitString(watermarkBitString);
+ ConfigFactory.getConfig()
.setWatermarkMethod(
String.format(
"GroupBasedLSBMethod" + "(embed_row_cycle=%d,embed_lsb_num=%d)",
embed_row_cycle, embed_lsb_num));
- EnvironmentUtils.envSetUp();
+ EnvFactory.getEnv().initBeforeTest();
+
insertData();
File file1 = new File(filePath1);
@@ -98,14 +111,16 @@ public class IoTDBWatermarkTest {
if (file2.exists()) {
file2.delete();
}
- EnvironmentUtils.cleanEnv();
+ EnvFactory.getEnv().cleanAfterTest();
+
+ ConfigFactory.getConfig().setEnableWatermark(originEnableWatermark);
+ ConfigFactory.getConfig().setWatermarkSecretKey(originWatermarkSecretKey);
+ ConfigFactory.getConfig().setWatermarkBitString(originWatermarkBitString);
+ ConfigFactory.getConfig().setWatermarkMethod(originWatermarkMethod);
}
- private static void insertData() throws ClassNotFoundException {
- Class.forName(Config.JDBC_DRIVER_NAME);
- try (Connection connection =
- DriverManager.getConnection(
- Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+ private static void insertData() {
+ try (Connection connection = EnvFactory.getEnv().getConnection();
Statement statement = connection.createStatement()) {
String[] create_sql =
@@ -123,7 +138,7 @@ public class IoTDBWatermarkTest {
String sql =
String.format(
"insert into root.vehicle.d0(timestamp,s0,s2) values(%s,%s,%s)",
- time, time % 50, time % 50, time % 50);
+ time, time % 50, time % 50);
statement.execute(sql);
if (time % 10 == 0) {
sql =
@@ -139,36 +154,24 @@ public class IoTDBWatermarkTest {
}
@Test
- public void EncodeAndDecodeTest1()
- throws IOException, ClassNotFoundException, LogicalOperatorException {
+ public void EncodeAndDecodeTest1() {
// Watermark Embedding
- Class.forName(Config.JDBC_DRIVER_NAME);
- try (Connection connection =
- DriverManager.getConnection(
- Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+ try (Connection connection = EnvFactory.getEnv().getConnection();
Statement statement = connection.createStatement()) {
statement.execute("GRANT WATERMARK_EMBEDDING TO root");
- boolean hasResultSet = statement.execute("SELECT s0,s1,s2 FROM root.vehicle.d0");
- Assert.assertTrue(hasResultSet);
- ResultSet resultSet = statement.getResultSet();
- try {
+ try (ResultSet resultSet = statement.executeQuery("SELECT s0,s1,s2 FROM root.vehicle.d0")) {
while (resultSet.next()) {
String ans =
resultSet.getString(TestConstant.TIMESTAMP_STR)
+ ","
- + resultSet.getString(
- TestConstant.d0 + IoTDBConstant.PATH_SEPARATOR + TestConstant.s0)
+ + resultSet.getString(TestConstant.d0 + "." + TestConstant.s0)
+ ","
- + resultSet.getString(
- TestConstant.d0 + IoTDBConstant.PATH_SEPARATOR + TestConstant.s1)
+ + resultSet.getString(TestConstant.d0 + "." + TestConstant.s1)
+ ","
- + resultSet.getString(
- TestConstant.d0 + IoTDBConstant.PATH_SEPARATOR + TestConstant.s2);
+ + resultSet.getString(TestConstant.d0 + "." + TestConstant.s2);
writer1.println(ans);
}
writer1.close();
- } finally {
- resultSet.close();
}
} catch (Exception e) {
e.printStackTrace();
@@ -178,50 +181,43 @@ public class IoTDBWatermarkTest {
// Watermark Detection
double alpha = 0.1;
int columnIndex = 1;
- boolean isWatermarked =
- WatermarkDetector.isWatermarked(
- filePath1,
- secretKey,
- watermarkBitString,
- embed_row_cycle,
- embed_lsb_num,
- alpha,
- columnIndex,
- "int");
- Assert.assertTrue(isWatermarked);
+ try {
+ boolean isWatermarked =
+ WatermarkDetector.isWatermarked(
+ filePath1,
+ secretKey,
+ watermarkBitString,
+ embed_row_cycle,
+ embed_lsb_num,
+ alpha,
+ columnIndex,
+ "int");
+ Assert.assertTrue(isWatermarked);
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
}
@Test
- public void EncodeAndDecodeTest2()
- throws IOException, ClassNotFoundException, LogicalOperatorException {
+ public void EncodeAndDecodeTest2() {
// No Watermark Embedding
- Class.forName(Config.JDBC_DRIVER_NAME);
- try (Connection connection =
- DriverManager.getConnection(
- Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+ try (Connection connection = EnvFactory.getEnv().getConnection();
Statement statement = connection.createStatement()) {
statement.execute("REVOKE WATERMARK_EMBEDDING FROM root");
- boolean hasResultSet = statement.execute("SELECT s0,s1,s2 FROM root.vehicle.d0");
- Assert.assertTrue(hasResultSet);
- ResultSet resultSet = statement.getResultSet();
- try {
+ try (ResultSet resultSet = statement.executeQuery("SELECT s0,s1,s2 FROM root.vehicle.d0")) {
while (resultSet.next()) {
String ans =
resultSet.getString(TestConstant.TIMESTAMP_STR)
+ ","
- + resultSet.getString(
- TestConstant.d0 + IoTDBConstant.PATH_SEPARATOR + TestConstant.s0)
+ + resultSet.getString(TestConstant.d0 + "." + TestConstant.s0)
+ ","
- + resultSet.getString(
- TestConstant.d0 + IoTDBConstant.PATH_SEPARATOR + TestConstant.s1)
+ + resultSet.getString(TestConstant.d0 + "." + TestConstant.s1)
+ ","
- + resultSet.getString(
- TestConstant.d0 + IoTDBConstant.PATH_SEPARATOR + TestConstant.s2);
+ + resultSet.getString(TestConstant.d0 + "." + TestConstant.s2);
writer2.println(ans);
}
writer2.close();
- } finally {
- resultSet.close();
}
} catch (Exception e) {
e.printStackTrace();
@@ -231,16 +227,21 @@ public class IoTDBWatermarkTest {
// Watermark Detection
double alpha = 0.1;
int columnIndex = 1;
- boolean isWatermarked =
- WatermarkDetector.isWatermarked(
- filePath2,
- secretKey,
- watermarkBitString,
- embed_row_cycle,
- embed_lsb_num,
- alpha,
- columnIndex,
- "int");
- Assert.assertFalse(isWatermarked);
+ try {
+ boolean isWatermarked =
+ WatermarkDetector.isWatermarked(
+ filePath2,
+ secretKey,
+ watermarkBitString,
+ embed_row_cycle,
+ embed_lsb_num,
+ alpha,
+ columnIndex,
+ "int");
+ Assert.assertFalse(isWatermarked);
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 72c006533b..16a2890956 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -2123,7 +2123,7 @@ public class IoTDBConfig {
this.watermarkBitString = watermarkBitString;
}
- String getWatermarkMethod() {
+ public String getWatermarkMethod() {
return this.watermarkMethod;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/PublishHandler.java b/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/PublishHandler.java
index 7fb4ee82b9..2a1f544101 100644
--- a/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/PublishHandler.java
+++ b/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/PublishHandler.java
@@ -53,10 +53,6 @@ public class PublishHandler extends AbstractInterceptHandler {
this.payloadFormat = PayloadFormatManager.getPayloadFormat(config.getMqttPayloadFormatter());
}
- protected PublishHandler(PayloadFormatter payloadFormat) {
- this.payloadFormat = payloadFormat;
- }
-
@Override
public String getID() {
return "iotdb-mqtt-broker-listener";
diff --git a/server/src/test/java/org/apache/iotdb/db/protocol/mqtt/PublishHandlerTest.java b/server/src/test/java/org/apache/iotdb/db/protocol/mqtt/PublishHandlerTest.java
deleted file mode 100644
index f1b1a20b1a..0000000000
--- a/server/src/test/java/org/apache/iotdb/db/protocol/mqtt/PublishHandlerTest.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.iotdb.db.protocol.mqtt;
-
-import org.apache.iotdb.db.utils.EnvironmentUtils;
-import org.apache.iotdb.jdbc.Config;
-
-import io.moquette.interception.messages.InterceptConnectMessage;
-import io.moquette.interception.messages.InterceptDisconnectMessage;
-import io.moquette.interception.messages.InterceptPublishMessage;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-import io.netty.handler.codec.mqtt.MqttConnectMessage;
-import io.netty.handler.codec.mqtt.MqttConnectPayload;
-import io.netty.handler.codec.mqtt.MqttFixedHeader;
-import io.netty.handler.codec.mqtt.MqttMessageType;
-import io.netty.handler.codec.mqtt.MqttPublishMessage;
-import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
-import io.netty.handler.codec.mqtt.MqttQoS;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.nio.charset.StandardCharsets;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
-import java.sql.Statement;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-public class PublishHandlerTest {
-
- @BeforeClass
- public static void setUp() throws Exception {
- EnvironmentUtils.envSetUp();
- }
-
- @AfterClass
- public static void tearDown() throws Exception {
- EnvironmentUtils.cleanEnv();
- }
-
- @Test
- public void onPublish() throws ClassNotFoundException {
- PayloadFormatter payloadFormat = PayloadFormatManager.getPayloadFormat("json");
- PublishHandler handler = new PublishHandler(payloadFormat);
- String clientId = "clientId";
-
- String payload =
- "{\n"
- + "\"device\":\"root.sg.d1\",\n"
- + "\"timestamp\":1586076045524,\n"
- + "\"measurements\":[\"s1\"],\n"
- + "\"values\":[0.530635]\n"
- + "}";
-
- ByteBuf buf = Unpooled.copiedBuffer(payload, StandardCharsets.UTF_8);
-
- // connect
- MqttConnectPayload mqttConnectPayload =
- new MqttConnectPayload(
- clientId,
- null,
- "test".getBytes(StandardCharsets.UTF_8),
- "root",
- "root".getBytes(StandardCharsets.UTF_8));
- MqttConnectMessage mqttConnectMessage = new MqttConnectMessage(null, null, mqttConnectPayload);
- InterceptConnectMessage interceptConnectMessage =
- new InterceptConnectMessage(mqttConnectMessage);
- handler.onConnect(interceptConnectMessage);
-
- // publish
- MqttPublishVariableHeader variableHeader = new MqttPublishVariableHeader("root.sg.d1", 1);
- MqttFixedHeader fixedHeader =
- new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.AT_LEAST_ONCE, false, 1);
- MqttPublishMessage publishMessage = new MqttPublishMessage(fixedHeader, variableHeader, buf);
- InterceptPublishMessage message = new InterceptPublishMessage(publishMessage, clientId, null);
- handler.onPublish(message);
-
- // disconnect
- InterceptDisconnectMessage interceptDisconnectMessage =
- new InterceptDisconnectMessage(clientId, null);
- handler.onDisconnect(interceptDisconnectMessage);
-
- String[] retArray = new String[] {"1586076045524,0.530635,"};
-
- Class.forName(Config.JDBC_DRIVER_NAME);
- try (Connection connection =
- DriverManager.getConnection(
- Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
- Statement statement = connection.createStatement()) {
- boolean hasResultSet = statement.execute("select * from root.sg.d1");
- Assert.assertTrue(hasResultSet);
-
- try (ResultSet resultSet = statement.getResultSet()) {
- ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
- int cnt = 0;
- while (resultSet.next()) {
- StringBuilder builder = new StringBuilder();
- for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
- builder.append(resultSet.getString(i)).append(",");
- }
- assertEquals(retArray[cnt], builder.toString());
- cnt++;
- }
- assertEquals(retArray.length, cnt);
- }
- } catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-}
diff --git a/server/src/test/java/org/apache/iotdb/db/sink/LocalIoTDBSinkTest.java b/server/src/test/java/org/apache/iotdb/db/sink/LocalIoTDBSinkTest.java
deleted file mode 100644
index 83f2c273d8..0000000000
--- a/server/src/test/java/org/apache/iotdb/db/sink/LocalIoTDBSinkTest.java
+++ /dev/null
@@ -1,218 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.sink;
-
-import org.apache.iotdb.db.engine.trigger.sink.local.LocalIoTDBConfiguration;
-import org.apache.iotdb.db.engine.trigger.sink.local.LocalIoTDBEvent;
-import org.apache.iotdb.db.engine.trigger.sink.local.LocalIoTDBHandler;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.utils.EnvironmentUtils;
-import org.apache.iotdb.jdbc.Config;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.utils.Binary;
-
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.sql.Types;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-public class LocalIoTDBSinkTest {
-
- @Before
- public void setUp() throws Exception {
- EnvironmentUtils.envSetUp();
- }
-
- @After
- public void tearDown() throws Exception {
- EnvironmentUtils.cleanEnv();
- }
-
- @Test
- public void onEventUsingSingleSensorHandler() throws Exception {
- LocalIoTDBHandler localIoTDBHandler = new LocalIoTDBHandler();
- localIoTDBHandler.open(
- new LocalIoTDBConfiguration(
- "root.sg1.d1", new String[] {"s1"}, new TSDataType[] {TSDataType.INT32}));
-
- for (int i = 0; i < 10000; ++i) {
- localIoTDBHandler.onEvent(new LocalIoTDBEvent(i, i));
- }
-
- localIoTDBHandler.close();
-
- Class.forName(Config.JDBC_DRIVER_NAME);
- try (Connection connection =
- DriverManager.getConnection(
- Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
- Statement statement = connection.createStatement()) {
- Assert.assertTrue(statement.execute("select * from root.**"));
-
- try (ResultSet resultSet = statement.getResultSet()) {
- ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
-
- checkHeader(
- resultSetMetaData,
- "Time,root.sg1.d1.s1,",
- new int[] {
- Types.TIMESTAMP, Types.INTEGER,
- });
-
- int count = 0;
- while (resultSet.next()) {
- for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
- assertEquals(count, Double.parseDouble(resultSet.getString(i)), 0.0);
- }
- count++;
- }
- Assert.assertEquals(10000, count);
- }
- } catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void onEventUsingMultiSensorsHandler() throws Exception {
- LocalIoTDBHandler localIoTDBHandler = new LocalIoTDBHandler();
- localIoTDBHandler.open(
- new LocalIoTDBConfiguration(
- "root.sg1.d1",
- new String[] {"s1", "s2", "s3", "s4", "s5", "s6"},
- new TSDataType[] {
- TSDataType.INT32,
- TSDataType.INT64,
- TSDataType.FLOAT,
- TSDataType.DOUBLE,
- TSDataType.BOOLEAN,
- TSDataType.TEXT
- }));
-
- for (int i = 0; i < 10000; ++i) {
- localIoTDBHandler.onEvent(
- new LocalIoTDBEvent(
- i,
- i,
- (long) i,
- (float) i,
- (double) i,
- i % 2 == 0,
- Binary.valueOf(String.valueOf(i))));
- }
-
- localIoTDBHandler.close();
-
- Class.forName(Config.JDBC_DRIVER_NAME);
- try (Connection connection =
- DriverManager.getConnection(
- Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
- Statement statement = connection.createStatement()) {
- Assert.assertTrue(statement.execute("select * from root.**"));
-
- try (ResultSet resultSet = statement.getResultSet()) {
- ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
-
- checkHeader(
- resultSetMetaData,
- "Time,root.sg1.d1.s1,root.sg1.d1.s2,root.sg1.d1.s3,"
- + "root.sg1.d1.s4,root.sg1.d1.s5,root.sg1.d1.s6,",
- new int[] {
- Types.TIMESTAMP,
- Types.INTEGER,
- Types.BIGINT,
- Types.FLOAT,
- Types.DOUBLE,
- Types.BOOLEAN,
- Types.VARCHAR,
- });
-
- int count = 0;
- while (resultSet.next()) {
- for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
- try {
- assertEquals(count, Double.parseDouble(resultSet.getString(i)), 0.0);
- } catch (NumberFormatException e) {
- assertEquals(count % 2 == 0, Boolean.parseBoolean(resultSet.getString(i)));
- }
- }
- count++;
- }
- Assert.assertEquals(10000, count);
- }
- } catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- private void checkHeader(
- ResultSetMetaData resultSetMetaData, String expectedHeaderStrings, int[] expectedTypes)
- throws SQLException {
- String[] expectedHeaders = expectedHeaderStrings.split(",");
- Map<String, Integer> expectedHeaderToTypeIndexMap = new HashMap<>();
- for (int i = 0; i < expectedHeaders.length; ++i) {
- expectedHeaderToTypeIndexMap.put(expectedHeaders[i], i);
- }
-
- for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
- Integer typeIndex = expectedHeaderToTypeIndexMap.get(resultSetMetaData.getColumnName(i));
- Assert.assertNotNull(typeIndex);
- Assert.assertEquals(expectedTypes[typeIndex], resultSetMetaData.getColumnType(i));
- }
- }
-
- @Test(expected = QueryProcessException.class)
- public void onEventWithWrongType1() throws Exception {
- LocalIoTDBHandler localIoTDBHandler = new LocalIoTDBHandler();
- localIoTDBHandler.open(
- new LocalIoTDBConfiguration(
- "root.sg1.d1", new String[] {"s1"}, new TSDataType[] {TSDataType.INT32}));
-
- localIoTDBHandler.onEvent(new LocalIoTDBEvent(0, Binary.valueOf(String.valueOf(0))));
-
- localIoTDBHandler.close();
- }
-
- @Test(expected = ClassCastException.class)
- public void onEventWithWrongType2() throws Exception {
- LocalIoTDBHandler localIoTDBHandler = new LocalIoTDBHandler();
- localIoTDBHandler.open(
- new LocalIoTDBConfiguration(
- "root.sg1.d1", new String[] {"s1"}, new TSDataType[] {TSDataType.TEXT}));
-
- localIoTDBHandler.onEvent(new LocalIoTDBEvent(0, String.valueOf(0)));
-
- localIoTDBHandler.close();
- }
-}