You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/05/09 10:03:28 UTC
[iotdb] branch master updated: [IOTDB-2772] Add testcontainer e2e test for newsync (#5416)
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new eca0b03b0e [IOTDB-2772] Add testcontainer e2e test for newsync (#5416)
eca0b03b0e is described below
commit eca0b03b0eda301180bb8cf36e64cc8139413636
Author: Chen YZ <43...@users.noreply.github.com>
AuthorDate: Mon May 9 18:03:22 2022 +0800
[IOTDB-2772] Add testcontainer e2e test for newsync (#5416)
Co-authored-by: yschengzi <87...@users.noreply.github.com>
Co-authored-by: Jamber <ja...@sina.com>
Co-authored-by: haiyi.zb <ha...@alibaba-inc.com>
Co-authored-by: wangjunqing <wa...@alibaba-inc.com>
Co-authored-by: Irvine <ir...@gmail.com>
Co-authored-by: yschengzi <ys...@126.com>
Co-authored-by: Haonan <hh...@outlook.com>
---
.github/workflows/e2e.yml | 4 +-
.github/workflows/sync.yml | 54 +++
docker/src/main/Dockerfile-single-tc | 45 +++
pom.xml | 2 +-
.../apache/iotdb/db/metadata/tag/TagLogFile.java | 7 +-
testcontainer/Readme.md | 2 +
testcontainer/pom.xml | 103 +++++
.../test/java/org/apache/iotdb/db/sync/SyncIT.java | 426 +++++++++++++++++++++
.../apache/iotdb/db/sync/SyncWeakNetworkIT.java | 40 ++
.../src/test/resources/sync/docker-compose.yaml | 52 +++
10 files changed, 731 insertions(+), 4 deletions(-)
diff --git a/.github/workflows/e2e.yml b/.github/workflows/e2e.yml
index bb2f0120c4..bd82f858fb 100644
--- a/.github/workflows/e2e.yml
+++ b/.github/workflows/e2e.yml
@@ -5,12 +5,12 @@ name: E2E Tests
on:
push:
- branches:
+ branches:
- test/e2e
paths-ignore:
- 'docs/**'
pull_request:
- branches:
+ branches:
- cluster_new
paths-ignore:
- 'docs/**'
diff --git a/.github/workflows/sync.yml b/.github/workflows/sync.yml
new file mode 100644
index 0000000000..14636454b9
--- /dev/null
+++ b/.github/workflows/sync.yml
@@ -0,0 +1,54 @@
+name: Sync Tests
+
+on:
+ push:
+ branches:
+ - master
+ paths-ignore:
+ - 'docs/**'
+ pull_request:
+ branches:
+ - master
+ paths-ignore:
+ - 'docs/**'
+ # allow manually run the action:
+ workflow_dispatch:
+
+concurrency:
+ group: ${{ github.workflow }}-${{ github.ref }}
+ cancel-in-progress: true
+
+env:
+ MAVEN_OPTS: -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.http.retryHandler.class=standard -Dmaven.wagon.http.retryHandler.count=3
+
+jobs:
+ E2E:
+ runs-on: ubuntu-latest
+ strategy:
+ fail-fast: true
+
+ steps:
+ - uses: actions/checkout@v2
+ - name: Set up JDK 11
+ uses: actions/setup-java@v1
+ with:
+ java-version: 11
+
+ - name: Cache Maven packages
+ uses: actions/cache@v2
+ with:
+ path: ~/.m2
+ key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}
+ restore-keys: ${{ runner.os }}-m2-
+
+ - name: Build IoTDB server distribution zip
+ run: mvn -B clean install -pl distribution -am -DskipTests
+
+ - name: Build Docker Image
+ run: |
+ docker build . -f docker/src/main/Dockerfile-single-tc -t "iotdb:$GITHUB_SHA"
+ docker images
+
+ - name: Test Sync
+ run: |
+ mvn -B -T 4 integration-test -pl testcontainer -P test-sync
\ No newline at end of file
diff --git a/docker/src/main/Dockerfile-single-tc b/docker/src/main/Dockerfile-single-tc
new file mode 100644
index 0000000000..f31c81b8e8
--- /dev/null
+++ b/docker/src/main/Dockerfile-single-tc
@@ -0,0 +1,45 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# docker build context is the root path of the repository
+
+FROM openjdk:11-jre-slim
+
+# COPY is sufficient here: the zip is a local file and is extracted explicitly below.
+COPY distribution/target/apache-iotdb-*-server-bin.zip /
+
+# Install the runtime tools, unpack the distribution to /iotdb, then drop unzip and the apt caches
+# in the same layer so they do not end up in the image. apt-get is used because the `apt`
+# front end does not offer a stable interface for scripts.
+RUN apt-get update \
+  && apt-get install -y lsof dos2unix procps unzip iproute2 iputils-ping \
+  && unzip /apache-iotdb-*-bin.zip -d / \
+  && rm /apache-iotdb-*-bin.zip \
+  && mv /apache-iotdb-* /iotdb \
+  && apt-get purge -y --auto-remove unzip \
+  && apt-get clean \
+  && rm -rf /var/lib/apt/lists/*
+RUN dos2unix /iotdb/sbin/start-server.sh
+RUN dos2unix /iotdb/sbin/../conf/iotdb-env.sh
+EXPOSE 6667
+EXPOSE 31999
+EXPOSE 5555
+EXPOSE 8086
+EXPOSE 8181
+VOLUME /iotdb/data
+VOLUME /iotdb/logs
+ENV PATH="/iotdb/sbin/:/iotdb/tools/:${PATH}"
+ENTRYPOINT ["/iotdb/sbin/start-server.sh"]
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index d752972cc1..d2265f6d50 100644
--- a/pom.xml
+++ b/pom.xml
@@ -204,7 +204,7 @@
<jackson-core-asl.version>1.9.13</jackson-core-asl.version>
<mqtt-client.version>1.12</mqtt-client.version>
<google.code.findbugs.jsr305.version>3.0.2</google.code.findbugs.jsr305.version>
- <jna.version>5.5.0</jna.version>
+ <jna.version>5.8.0</jna.version>
<zookeeper.version>3.4.9</zookeeper.version>
<commons-beanutils.version>1.9.4</commons-beanutils.version>
<commons-compress.version>1.21</commons-compress.version>
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/tag/TagLogFile.java b/server/src/main/java/org/apache/iotdb/db/metadata/tag/TagLogFile.java
index 87345427c2..2fa968ddf7 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/tag/TagLogFile.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/tag/TagLogFile.java
@@ -31,6 +31,7 @@ import java.io.File;
import java.io.IOException;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
+import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.FileChannel;
import java.nio.file.StandardOpenOption;
import java.util.Collections;
@@ -71,7 +72,11 @@ public class TagLogFile implements AutoCloseable {
StandardOpenOption.WRITE,
StandardOpenOption.CREATE);
// move the current position to the tail of the file
- this.fileChannel.position(fileChannel.size());
+ try {
+ this.fileChannel.position(fileChannel.size());
+ } catch (ClosedByInterruptException e) {
+ // ignore
+ }
}
/** @return tags map, attributes map */
diff --git a/testcontainer/Readme.md b/testcontainer/Readme.md
index 234c6861b6..46cf905fc6 100644
--- a/testcontainer/Readme.md
+++ b/testcontainer/Readme.md
@@ -58,6 +58,8 @@ For testing a cluster, we use `docker-compose` and `testcontainer`.
The docker-compose file is located at `src/test/resources/1nodes`, `src/test/resources/3nodes` and `src/test/resources/5nodes`,
in which one is for 1 node with replica number =1 , 3 nodes with replica number=3, and the last one is for 5 nodes with replica number =3.
+For testing the sync module only, use `mvn integration-test -P test-sync`.
+
TestContainer can start the docker (or docker compose) automatically.
But these docker compose files can also be used independently.
diff --git a/testcontainer/pom.xml b/testcontainer/pom.xml
index 02ac9daabd..8bb032df4c 100644
--- a/testcontainer/pom.xml
+++ b/testcontainer/pom.xml
@@ -35,6 +35,8 @@
<docker.clean.single.argument>image rm apache/iotdb:maven-development</docker.clean.single.argument>
<docker.build.cluster.argument>build -t apache/iotdb:cluster-maven-development -f ${basedir}/../docker/src/main/Dockerfile-cluster ${basedir}/../.</docker.build.cluster.argument>
<docker.clean.cluster.argument>image rm apache/iotdb:cluster-maven-development</docker.clean.cluster.argument>
+ <docker.build.sync.argument>build -t apache/iotdb:sync-maven-development -f ${basedir}/../docker/src/main/Dockerfile-single-tc ${basedir}/../.</docker.build.sync.argument>
+ <docker.clean.sync.argument>image rm apache/iotdb:sync-maven-development</docker.clean.sync.argument>
</properties>
<dependencies>
<dependency>
@@ -100,6 +102,18 @@
<commandlineArgs>${docker.build.cluster.argument}</commandlineArgs>
</configuration>
</execution>
+ <execution>
+ <id>build-sync-docker-image</id>
+ <phase>pre-integration-test</phase>
+ <goals>
+ <goal>exec</goal>
+ </goals>
+ <configuration>
+ <skip>${docker.test.skip}</skip>
+ <executable>${docker.build.executable}</executable>
+ <commandlineArgs>${docker.build.sync.argument}</commandlineArgs>
+ </configuration>
+ </execution>
<execution>
<id>clean-docker-image</id>
<phase>post-integration-test</phase>
@@ -124,6 +138,90 @@
<commandlineArgs>${docker.clean.cluster.argument}</commandlineArgs>
</configuration>
</execution>
+ <execution>
+ <id>clean-sync-docker-image</id>
+ <phase>post-integration-test</phase>
+ <goals>
+ <goal>exec</goal>
+ </goals>
+ <configuration>
+ <skip>${docker.test.skip}</skip>
+ <executable>${docker.build.executable}</executable>
+ <commandlineArgs>${docker.clean.sync.argument}</commandlineArgs>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <version>3.2.0</version>
+ <executions>
+ <execution>
+ <id>add-test-container-source</id>
+ <phase>generate-test-sources</phase>
+ <goals>
+ <goal>add-test-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>${basedir}/src/test/java</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>run-integration-tests</id>
+ <phase>integration-test</phase>
+ <goals>
+ <goal>integration-test</goal>
+ <goal>verify</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ <profile>
+ <id>test-sync</id>
+ <build>
+ <plugins>
+ <!-- before integration test, we build the docker image -->
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>exec-maven-plugin</artifactId>
+ <version>1.6.0</version>
+ <executions>
+ <execution>
+ <id>build-sync-docker-image</id>
+ <phase>pre-integration-test</phase>
+ <goals>
+ <goal>exec</goal>
+ </goals>
+ <configuration>
+ <skip>${docker.test.skip}</skip>
+ <executable>${docker.build.executable}</executable>
+ <commandlineArgs>${docker.build.sync.argument}</commandlineArgs>
+ </configuration>
+ </execution>
+ <execution>
+ <id>clean-sync-docker-image</id>
+ <phase>post-integration-test</phase>
+ <goals>
+ <goal>exec</goal>
+ </goals>
+ <configuration>
+ <skip>${docker.test.skip}</skip>
+ <executable>${docker.build.executable}</executable>
+ <commandlineArgs>${docker.clean.sync.argument}</commandlineArgs>
+ </configuration>
+ </execution>
</executions>
</plugin>
<plugin>
@@ -158,6 +256,11 @@
</goals>
</execution>
</executions>
+ <configuration>
+ <includes>
+ <include>**/org/apache/iotdb/db/sync/**</include>
+ </includes>
+ </configuration>
</plugin>
</plugins>
</build>
diff --git a/testcontainer/src/test/java/org/apache/iotdb/db/sync/SyncIT.java b/testcontainer/src/test/java/org/apache/iotdb/db/sync/SyncIT.java
new file mode 100644
index 0000000000..c7caf1918d
--- /dev/null
+++ b/testcontainer/src/test/java/org/apache/iotdb/db/sync/SyncIT.java
@@ -0,0 +1,426 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.sync;
+
+import org.apache.iotdb.jdbc.Config;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.DockerComposeContainer;
+import org.testcontainers.containers.NoProjectNameDockerComposeContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
+
+import java.io.File;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.Statement;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class SyncIT {
+ private static Logger receiverLogger = LoggerFactory.getLogger("iotdb-receiver_1");
+ private static Logger senderLogger = LoggerFactory.getLogger("iotdb-sender_1");
+ private static int RETRY_TIME = 30;
+
+ protected Statement senderStatement;
+ protected Connection senderConnection;
+ protected Statement receiverStatement;
+ protected Connection receiverConnection;
+
+ // TestContainers' documentation declares the environment as a `public static` @ClassRule, which
+ // shares one environment across the whole class; with @Rule it is started anew for every test.
+ @Rule
+ public DockerComposeContainer environment =
+ new NoProjectNameDockerComposeContainer(
+ "sync", new File("src/test/resources/sync/docker-compose.yaml"))
+ .withExposedService("iotdb-sender_1", 6667, Wait.forListeningPort())
+ .withLogConsumer("iotdb-sender_1", new Slf4jLogConsumer(senderLogger))
+ .withExposedService("iotdb-receiver_1", 6667, Wait.forListeningPort())
+ .withLogConsumer("iotdb-receiver_1", new Slf4jLogConsumer(receiverLogger))
+ .withLocalCompose(true);
+
+ protected int getSenderRpcPort() {
+ return environment.getServicePort("iotdb-sender_1", 6667);
+ }
+
+ protected String getSenderIp() {
+ return environment.getServiceHost("iotdb-sender_1", 6667);
+ }
+
+ protected int getReceiverRpcPort() {
+ return environment.getServicePort("iotdb-receiver_1", 6667);
+ }
+
+ protected String getReceiverIp() {
+ return environment.getServiceHost("iotdb-receiver_1", 6667);
+ }
+
+ @Before
+ public void init() throws Exception {
+ Class.forName(Config.JDBC_DRIVER_NAME);
+ senderConnection =
+ DriverManager.getConnection(
+ "jdbc:iotdb://" + getSenderIp() + ":" + getSenderRpcPort(), "root", "root");
+ senderStatement = senderConnection.createStatement();
+ receiverConnection =
+ DriverManager.getConnection(
+ "jdbc:iotdb://" + getReceiverIp() + ":" + getReceiverRpcPort(), "root", "root");
+ receiverStatement = receiverConnection.createStatement();
+ }
+
+ @After
+ public void clean() throws Exception {
+ senderStatement.close();
+ senderConnection.close();
+ receiverStatement.close();
+ receiverConnection.close();
+ }
+
+ private void prepareSchema() throws Exception {
+ senderStatement.execute("set storage group to root.sg1");
+ senderStatement.execute("set storage group to root.sg2");
+ senderStatement.execute("create timeseries root.sg1.d1.s1 with datatype=int32, encoding=PLAIN");
+ senderStatement.execute("create timeseries root.sg1.d1.s2 with datatype=float, encoding=RLE");
+ senderStatement.execute("create timeseries root.sg1.d1.s3 with datatype=TEXT, encoding=PLAIN");
+ senderStatement.execute("create timeseries root.sg1.d2.s4 with datatype=int64, encoding=PLAIN");
+ senderStatement.execute("create timeseries root.sg2.d1.s0 with datatype=int32, encoding=PLAIN");
+ senderStatement.execute(
+ "create timeseries root.sg2.d2.s1 with datatype=boolean, encoding=PLAIN");
+ }
+
+ /* add one seq tsfile in sg1 */
+ private void prepareIns1() throws Exception {
+ senderStatement.execute(
+ "insert into root.sg1.d1(timestamp, s1, s2, s3) values(1, 1, 16.0, 'a')");
+ senderStatement.execute(
+ "insert into root.sg1.d1(timestamp, s1, s2, s3) values(2, 2, 25.16, 'b')");
+ senderStatement.execute(
+ "insert into root.sg1.d1(timestamp, s1, s2, s3) values(3, 3, 65.25, 'c')");
+ senderStatement.execute(
+ "insert into root.sg1.d1(timestamp, s1, s2, s3) values(16, 25, 100.0, 'd')");
+ senderStatement.execute("insert into root.sg1.d2(timestamp, s4) values(1, 1)");
+ senderStatement.execute("flush");
+ }
+
+ /* add one seq tsfile in sg1 */
+ private void prepareIns2() throws Exception {
+ senderStatement.execute(
+ "insert into root.sg1.d1(timestamp, s1, s2, s3) values(100, 65, 16.25, 'e')");
+ senderStatement.execute(
+ "insert into root.sg1.d1(timestamp, s1, s2, s3) values(65, 100, 25.0, 'f')");
+ senderStatement.execute("insert into root.sg1.d2(timestamp, s4) values(200, 100)");
+ senderStatement.execute("flush");
+ }
+
+ /* add one seq tsfile in sg1, one unseq tsfile in sg1, one seq tsfile in sg2 */
+ private void prepareIns3() throws Exception {
+ senderStatement.execute("insert into root.sg2.d1(timestamp, s0) values(100, 100)");
+ senderStatement.execute("insert into root.sg2.d1(timestamp, s0) values(65, 65)");
+ senderStatement.execute("insert into root.sg2.d2(timestamp, s1) values(1, true)");
+ senderStatement.execute(
+ "insert into root.sg1.d1(timestamp, s1, s2, s3) values(25, 16, 65.16, 'g')");
+ senderStatement.execute(
+ "insert into root.sg1.d1(timestamp, s1, s2, s3) values(200, 100, 16.65, 'h')");
+ senderStatement.execute("flush");
+ }
+
+ private void prepareDel1() throws Exception { // after ins1, add 2 deletions
+ senderStatement.execute("delete from root.sg1.d1.s1 where time == 3");
+ senderStatement.execute("delete from root.sg1.d1.s2 where time >= 1 and time <= 2");
+ }
+
+ private void prepareDel2() throws Exception { // after ins2, add 3 deletions
+ senderStatement.execute("delete from root.sg1.d1.s3 where time <= 65");
+ }
+
+ private void prepareDel3() throws Exception { // after ins3, add 5 deletions, 2 schemas
+ senderStatement.execute("delete from root.sg1.d1.* where time <= 2");
+ senderStatement.execute("delete timeseries root.sg1.d2.*");
+ senderStatement.execute("delete storage group root.sg2");
+ }
+
+ private void preparePipe() throws Exception {
+ receiverStatement.execute("start pipeserver");
+ senderStatement.execute(
+ "create pipesink my_iotdb as iotdb(ip='sync_iotdb-receiver_1',port=6670)");
+ senderStatement.execute("create pipe p to my_iotdb");
+ }
+
+ private void startPipe() throws Exception {
+ senderStatement.execute("start pipe p");
+ }
+
+ private void stopPipe() throws Exception {
+ senderStatement.execute("stop pipe p");
+ }
+
+ private void dropPipe() throws Exception {
+ senderStatement.execute("drop pipe p");
+ }
+
+ private void checkResult() throws Exception {
+ String[] columnNames =
+ new String[] {
+ "root.sg1.d1.s3",
+ "root.sg1.d1.s1",
+ "root.sg1.d1.s2",
+ "root.sg1.d2.s4",
+ "root.sg2.d1.s0",
+ "root.sg2.d2.s1"
+ };
+ String[] results =
+ new String[] {
+ "1,a,1,16.0,1,null,true",
+ "2,b,2,25.16,null,null,null",
+ "3,c,3,65.25,null,null,null",
+ "16,d,25,100.0,null,null,null",
+ "25,g,16,65.16,null,null,null",
+ "65,f,100,25.0,null,65,null",
+ "100,e,65,16.25,null,100,null",
+ "200,h,100,16.65,100,null,null"
+ };
+ checkResult(receiverStatement, "select ** from root", columnNames, results, true);
+ }
+
+ private void checkResultWithDeletion() throws Exception {
+ String[] columnNames =
+ new String[] {
+ "root.sg1.d1.s3", "root.sg1.d1.s1", "root.sg1.d1.s2",
+ };
+ String[] results =
+ new String[] {
+ "3,null,null,65.25",
+ "16,null,25,100.0",
+ "25,null,16,65.16",
+ "65,null,100,25.0",
+ "100,e,65,16.25",
+ "200,h,100,16.65"
+ };
+ checkResult(receiverStatement, "select ** from root", columnNames, results, true);
+ }
+
+ @Test
+ public void testCreatePipe() throws Exception {
+ preparePipe();
+ checkResult(
+ receiverStatement,
+ "show pipe",
+ new String[] {"name", "role", "status"},
+ new String[] {"p,receiver,STOP"},
+ false);
+ dropPipe();
+ checkResult(
+ senderStatement,
+ "show pipe",
+ new String[] {"name", "role", "remote", "status"},
+ new String[] {"p,sender,my_iotdb,DROP"},
+ false);
+ }
+
+ @Test
+ public void testHistoryInsert() {
+ try {
+ prepareSchema();
+ prepareIns1();
+ prepareIns2();
+ prepareIns3();
+ preparePipe();
+ startPipe();
+ checkResult();
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testHistoryAndRealTimeInsert() {
+ try {
+ prepareSchema();
+ prepareIns1();
+ prepareIns2();
+ preparePipe();
+ startPipe();
+ prepareIns3();
+ checkResult();
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testStopAndStartInsert() {
+ try {
+ prepareSchema();
+ prepareIns1();
+ preparePipe();
+ startPipe();
+ prepareIns2();
+ stopPipe();
+ prepareIns3();
+ startPipe();
+ checkResult();
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testRealTimeAndStopInsert() {
+ try {
+ preparePipe(); // realtime
+ startPipe();
+ prepareSchema();
+ prepareIns1();
+ stopPipe();
+ prepareIns2();
+ startPipe();
+ prepareIns3();
+ checkResult();
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testHistoryDel() {
+ try {
+ prepareSchema(); // history
+ prepareIns1();
+ prepareIns2();
+ prepareIns3();
+ prepareDel1();
+ prepareDel2();
+ prepareDel3();
+ preparePipe(); // realtime
+ startPipe();
+ checkResultWithDeletion();
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ /**
+  * Writes history data, starts the pipe, then issues further inserts and deletions with the pipe
+  * being stopped and restarted in between, and finally checks the receiver against the expected
+  * post-deletion rows.
+  */
+ @Test
+ public void testRealtimeDel() {
+   try {
+     prepareSchema(); // history
+     prepareIns1();
+     preparePipe(); // realtime
+     startPipe();
+     prepareIns2();
+     prepareDel1();
+     stopPipe();
+     prepareIns3();
+     startPipe();
+     prepareDel2();
+     prepareDel3();
+     checkResultWithDeletion();
+   } catch (Exception e) {
+     e.printStackTrace();
+     // report the cause in the failure message, as the other tests in this class do
+     Assert.fail(e.getMessage());
+   }
+ }
+
+ /**
+  * Executes {@code sql} and compares the result set with the expected rows, retrying until they
+  * match. Only the columns listed in {@code columnNames} are compared. The query is attempted up
+  * to RETRY_TIME times with a 1000 ms pause before each attempt; a mismatch or exception on an
+  * earlier attempt triggers a retry, while on the final attempt it fails the test.
+  *
+  * @param statement Statement of IoTDB.
+  * @param sql SQL to be executed.
+  * @param columnNames Columns to be compared.
+  * @param retArray Expected rows, in result order. Each row is a comma-separated string whose
+  *     values follow the order of columnNames, preceded by the value of the first result column
+  *     when hasTimeColumn is true.
+  * @param hasTimeColumn Whether the result set contains a time column (e.g. a timeseries query)
+  *     that should be prepended to each row.
+  * @throws Exception the failure of the final attempt, or InterruptedException if the thread is
+  *     interrupted while waiting between attempts.
+  */
+ private static void checkResult(
+     Statement statement,
+     String sql,
+     String[] columnNames,
+     String[] retArray,
+     boolean hasTimeColumn)
+     throws Exception {
+   retry:
+   for (int loop = 0; loop < RETRY_TIME; loop++) {
+     // Sleep outside the try block so that an interrupt aborts the check instead of being
+     // swallowed and retried.
+     Thread.sleep(1000);
+     try {
+       boolean hasResultSet = statement.execute(sql);
+       if (!assertOrCompareEqual(true, hasResultSet, loop)) {
+         continue;
+       }
+       // Close the result set explicitly once this attempt is finished.
+       try (ResultSet resultSet = statement.getResultSet()) {
+         ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+         Map<String, Integer> columnIndexes = new HashMap<>();
+         for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+           columnIndexes.put(resultSetMetaData.getColumnName(i), i);
+         }
+         int cnt = 0;
+         while (resultSet.next()) {
+           // Surplus rows are only counted; the row-count check below reports them.
+           if (cnt < retArray.length) {
+             StringBuilder builder = new StringBuilder();
+             if (hasTimeColumn) {
+               builder.append(resultSet.getString(1)).append(",");
+             }
+             for (String columnName : columnNames) {
+               Integer index = columnIndexes.get(columnName);
+               if (index == null) {
+                 // Thrown inside the try so it is retried like any other failed attempt.
+                 throw new IllegalStateException(
+                     "Column " + columnName + " is missing from the result of: " + sql);
+               }
+               builder.append(resultSet.getString(index)).append(",");
+             }
+             if (builder.length() > 0) {
+               builder.deleteCharAt(builder.length() - 1);
+             }
+             if (!assertOrCompareEqual(retArray[cnt], builder.toString(), loop)) {
+               continue retry;
+             }
+           }
+           cnt++;
+         }
+         if (!assertOrCompareEqual(retArray.length, cnt, loop)) {
+           continue;
+         }
+         return;
+       }
+     } catch (Exception e) {
+       if (loop == RETRY_TIME - 1) {
+         throw e;
+       }
+     }
+   }
+   Assert.fail("Result of \"" + sql + "\" did not match the expected rows");
+ }
+
+ /**
+  * Compares {@code expected} with {@code actual} for one retry attempt.
+  *
+  * <p>On every attempt but the last, the result of the comparison is returned so the caller can
+  * retry. On the last attempt (loop == RETRY_TIME - 1) the comparison is a JUnit assertion, so a
+  * mismatch fails the test.
+  *
+  * @param expected expected value.
+  * @param actual actual value.
+  * @param loop zero-based index of the current attempt.
+  * @return whether the two values are equal; always true when the last-attempt assertion passes.
+  */
+ private static boolean assertOrCompareEqual(Object expected, Object actual, int loop) {
+   boolean lastAttempt = loop == RETRY_TIME - 1;
+   if (!lastAttempt) {
+     return expected.equals(actual);
+   }
+   assertEquals(expected, actual);
+   return true;
+ }
+}
diff --git a/testcontainer/src/test/java/org/apache/iotdb/db/sync/SyncWeakNetworkIT.java b/testcontainer/src/test/java/org/apache/iotdb/db/sync/SyncWeakNetworkIT.java
new file mode 100644
index 0000000000..09effada3a
--- /dev/null
+++ b/testcontainer/src/test/java/org/apache/iotdb/db/sync/SyncWeakNetworkIT.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.sync;
+
+import org.junit.Assert;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.ContainerState;
+
+/** Simulate network delay and loss. */
+public class SyncWeakNetworkIT extends SyncIT {
+ @Override
+ public void init() throws Exception {
+ super.init();
+ // add a netem rule on the sender's eth0: 200ms delay with 50ms jitter (25% correlation, normal
+ // distribution), plus 10% random packet loss, 10% duplication, 10% reordering and 10% corruption
+ Container.ExecResult res =
+ ((ContainerState) environment.getContainerByServiceName("iotdb-sender_1").get())
+ .execInContainer(
+ "sh",
+ "-c",
+ "tc qdisc add dev eth0 root netem delay 200ms 50ms 25% distribution normal loss random 10% duplicate 10% reorder 10% corrupt 10%");
+ Assert.assertEquals(0, res.getExitCode());
+ }
+}
diff --git a/testcontainer/src/test/resources/sync/docker-compose.yaml b/testcontainer/src/test/resources/sync/docker-compose.yaml
new file mode 100644
index 0000000000..fa0cedc8a4
--- /dev/null
+++ b/testcontainer/src/test/resources/sync/docker-compose.yaml
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+version: '3.8'
+
+services:
+ iotdb-sender:
+ image: apache/iotdb:sync-maven-development
+ networks:
+ - iotdb
+ cap_add:
+ - NET_ADMIN
+ healthcheck:
+ test: [ "CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/6667" ]
+ interval: 5s
+ timeout: 60s
+ retries: 120
+ volumes:
+ - ../logback-container.xml:/iotdb/conf/logback.xml
+ iotdb-receiver:
+ image: apache/iotdb:sync-maven-development
+ networks:
+ - iotdb
+ cap_add:
+ - NET_ADMIN
+ healthcheck:
+ test: [ "CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/6667" ]
+ interval: 5s
+ timeout: 60s
+ retries: 120
+ volumes:
+ - ../logback-container.xml:/iotdb/conf/logback.xml
+
+
+networks:
+ iotdb:
+ driver: bridge
\ No newline at end of file