You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/05/09 10:03:28 UTC

[iotdb] branch master updated: [IOTDB-2772] Add testcontainer e2e test for newsync (#5416)

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new eca0b03b0e [IOTDB-2772] Add testcontainer e2e test for newsync (#5416)
eca0b03b0e is described below

commit eca0b03b0eda301180bb8cf36e64cc8139413636
Author: Chen YZ <43...@users.noreply.github.com>
AuthorDate: Mon May 9 18:03:22 2022 +0800

    [IOTDB-2772] Add testcontainer e2e test for newsync (#5416)
    
    Co-authored-by: yschengzi <87...@users.noreply.github.com>
    Co-authored-by: Jamber <ja...@sina.com>
    Co-authored-by: haiyi.zb <ha...@alibaba-inc.com>
    Co-authored-by: wangjunqing <wa...@alibaba-inc.com>
    Co-authored-by: Irvine <ir...@gmail.com>
    Co-authored-by: yschengzi <ys...@126.com>
    Co-authored-by: Haonan <hh...@outlook.com>
---
 .github/workflows/e2e.yml                          |   4 +-
 .github/workflows/sync.yml                         |  54 +++
 docker/src/main/Dockerfile-single-tc               |  45 +++
 pom.xml                                            |   2 +-
 .../apache/iotdb/db/metadata/tag/TagLogFile.java   |   7 +-
 testcontainer/Readme.md                            |   2 +
 testcontainer/pom.xml                              | 103 +++++
 .../test/java/org/apache/iotdb/db/sync/SyncIT.java | 426 +++++++++++++++++++++
 .../apache/iotdb/db/sync/SyncWeakNetworkIT.java    |  40 ++
 .../src/test/resources/sync/docker-compose.yaml    |  52 +++
 10 files changed, 731 insertions(+), 4 deletions(-)

diff --git a/.github/workflows/e2e.yml b/.github/workflows/e2e.yml
index bb2f0120c4..bd82f858fb 100644
--- a/.github/workflows/e2e.yml
+++ b/.github/workflows/e2e.yml
@@ -5,12 +5,12 @@ name: E2E Tests
 
 on:
   push:
-    branches: 
+    branches:
       - test/e2e
     paths-ignore:
       - 'docs/**'
   pull_request:
-    branches: 
+    branches:
       - cluster_new
     paths-ignore:
       - 'docs/**'
diff --git a/.github/workflows/sync.yml b/.github/workflows/sync.yml
new file mode 100644
index 0000000000..14636454b9
--- /dev/null
+++ b/.github/workflows/sync.yml
@@ -0,0 +1,54 @@
+name: Sync Tests
+
+on:
+  push:
+    branches:
+      - master
+    paths-ignore:
+      - 'docs/**'
+  pull_request:
+    branches:
+      - master
+    paths-ignore:
+      - 'docs/**'
+  # allow running the workflow manually:
+  workflow_dispatch:
+
+concurrency:
+  group: ${{ github.workflow }}-${{ github.ref }}
+  cancel-in-progress: true
+
+env:
+  MAVEN_OPTS: -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.http.retryHandler.class=standard -Dmaven.wagon.http.retryHandler.count=3
+
+jobs:
+  E2E:
+    runs-on: ubuntu-latest
+    strategy:
+      fail-fast: true
+
+    steps:
+      - uses: actions/checkout@v2
+      - name: Set up JDK 11
+        uses: actions/setup-java@v1
+        with:
+          java-version: 11
+
+      - name: Cache Maven packages
+        uses: actions/cache@v2
+        with:
+          path: ~/.m2
+          key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}
+          restore-keys: ${{ runner.os }}-m2-
+
+      - name: Build IoTDB server distribution zip
+        run:  mvn -B clean install -pl distribution -am -DskipTests
+
+      - name: Build Docker Image
+        run: |
+          docker build . -f docker/src/main/Dockerfile-single-tc -t "iotdb:$GITHUB_SHA"
+          docker images
+
+      - name: Test Sync
+        run: |
+          mvn -B -T 4 integration-test -pl testcontainer -P test-sync
\ No newline at end of file
diff --git a/docker/src/main/Dockerfile-single-tc b/docker/src/main/Dockerfile-single-tc
new file mode 100644
index 0000000000..f31c81b8e8
--- /dev/null
+++ b/docker/src/main/Dockerfile-single-tc
@@ -0,0 +1,45 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# docker build context is the root path of the repository
+
+FROM openjdk:11-jre-slim
+
+ADD distribution/target/apache-iotdb-*-server-bin.zip /
+
+RUN apt update \
+  && apt install lsof dos2unix procps unzip iproute2 iputils-ping -y \
+  && unzip /apache-iotdb-*-bin.zip -d / \
+  && rm /apache-iotdb-*-bin.zip \
+  && mv /apache-iotdb-* /iotdb \
+  && apt remove unzip -y \
+  && apt autoremove -y \
+  && apt purge --auto-remove -y \
+  && apt clean -y
+RUN dos2unix /iotdb/sbin/start-server.sh
+RUN dos2unix /iotdb/sbin/../conf/iotdb-env.sh
+EXPOSE 6667
+EXPOSE 31999
+EXPOSE 5555
+EXPOSE 8086
+EXPOSE 8181
+VOLUME /iotdb/data
+VOLUME /iotdb/logs
+ENV PATH="/iotdb/sbin/:/iotdb/tools/:${PATH}"
+ENTRYPOINT ["/iotdb/sbin/start-server.sh"]
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index d752972cc1..d2265f6d50 100644
--- a/pom.xml
+++ b/pom.xml
@@ -204,7 +204,7 @@
         <jackson-core-asl.version>1.9.13</jackson-core-asl.version>
         <mqtt-client.version>1.12</mqtt-client.version>
         <google.code.findbugs.jsr305.version>3.0.2</google.code.findbugs.jsr305.version>
-        <jna.version>5.5.0</jna.version>
+        <jna.version>5.8.0</jna.version>
         <zookeeper.version>3.4.9</zookeeper.version>
         <commons-beanutils.version>1.9.4</commons-beanutils.version>
         <commons-compress.version>1.21</commons-compress.version>
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/tag/TagLogFile.java b/server/src/main/java/org/apache/iotdb/db/metadata/tag/TagLogFile.java
index 87345427c2..2fa968ddf7 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/tag/TagLogFile.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/tag/TagLogFile.java
@@ -31,6 +31,7 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.BufferOverflowException;
 import java.nio.ByteBuffer;
+import java.nio.channels.ClosedByInterruptException;
 import java.nio.channels.FileChannel;
 import java.nio.file.StandardOpenOption;
 import java.util.Collections;
@@ -71,7 +72,11 @@ public class TagLogFile implements AutoCloseable {
             StandardOpenOption.WRITE,
             StandardOpenOption.CREATE);
     // move the current position to the tail of the file
-    this.fileChannel.position(fileChannel.size());
+    try {
+      this.fileChannel.position(fileChannel.size());
+    } catch (ClosedByInterruptException e) {
+      // ignore
+    }
   }
 
   /** @return tags map, attributes map */
diff --git a/testcontainer/Readme.md b/testcontainer/Readme.md
index 234c6861b6..46cf905fc6 100644
--- a/testcontainer/Readme.md
+++ b/testcontainer/Readme.md
@@ -58,6 +58,8 @@ For testing a cluster, we use `docker-compose` and `testcontainer`.
 The docker-compose file is located at  `src/test/resources/1nodes`,  `src/test/resources/3nodes` and `src/test/resources/5nodes`, 
 in which one is for 1 node with replica number =1 , 3 nodes with replica number=3, and the last one is for 5 nodes with replica number =3.
 
+To test only the sync module, run `mvn integration-test -P test-sync`.
+
 TestContainer can start the docker (or docker compose) automatically.
 
 But these docker compose files can also be used independently.
diff --git a/testcontainer/pom.xml b/testcontainer/pom.xml
index 02ac9daabd..8bb032df4c 100644
--- a/testcontainer/pom.xml
+++ b/testcontainer/pom.xml
@@ -35,6 +35,8 @@
         <docker.clean.single.argument>image rm apache/iotdb:maven-development</docker.clean.single.argument>
         <docker.build.cluster.argument>build -t apache/iotdb:cluster-maven-development -f ${basedir}/../docker/src/main/Dockerfile-cluster ${basedir}/../.</docker.build.cluster.argument>
         <docker.clean.cluster.argument>image rm apache/iotdb:cluster-maven-development</docker.clean.cluster.argument>
+        <docker.build.sync.argument>build -t apache/iotdb:sync-maven-development -f ${basedir}/../docker/src/main/Dockerfile-single-tc ${basedir}/../.</docker.build.sync.argument>
+        <docker.clean.sync.argument>image rm apache/iotdb:sync-maven-development</docker.clean.sync.argument>
     </properties>
     <dependencies>
         <dependency>
@@ -100,6 +102,18 @@
                                     <commandlineArgs>${docker.build.cluster.argument}</commandlineArgs>
                                 </configuration>
                             </execution>
+                            <execution>
+                                <id>build-sync-docker-image</id>
+                                <phase>pre-integration-test</phase>
+                                <goals>
+                                    <goal>exec</goal>
+                                </goals>
+                                <configuration>
+                                    <skip>${docker.test.skip}</skip>
+                                    <executable>${docker.build.executable}</executable>
+                                    <commandlineArgs>${docker.build.sync.argument}</commandlineArgs>
+                                </configuration>
+                            </execution>
                             <execution>
                                 <id>clean-docker-image</id>
                                 <phase>post-integration-test</phase>
@@ -124,6 +138,90 @@
                                     <commandlineArgs>${docker.clean.cluster.argument}</commandlineArgs>
                                 </configuration>
                             </execution>
+                            <execution>
+                                <id>clean-sync-docker-image</id>
+                                <phase>post-integration-test</phase>
+                                <goals>
+                                    <goal>exec</goal>
+                                </goals>
+                                <configuration>
+                                    <skip>${docker.test.skip}</skip>
+                                    <executable>${docker.build.executable}</executable>
+                                    <commandlineArgs>${docker.clean.sync.argument}</commandlineArgs>
+                                </configuration>
+                            </execution>
+                        </executions>
+                    </plugin>
+                    <plugin>
+                        <groupId>org.codehaus.mojo</groupId>
+                        <artifactId>build-helper-maven-plugin</artifactId>
+                        <version>3.2.0</version>
+                        <executions>
+                            <execution>
+                                <id>add-test-container-source</id>
+                                <phase>generate-test-sources</phase>
+                                <goals>
+                                    <goal>add-test-source</goal>
+                                </goals>
+                                <configuration>
+                                    <sources>
+                                        <source>${basedir}/src/test/java</source>
+                                    </sources>
+                                </configuration>
+                            </execution>
+                        </executions>
+                    </plugin>
+                    <plugin>
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-failsafe-plugin</artifactId>
+                        <executions>
+                            <execution>
+                                <id>run-integration-tests</id>
+                                <phase>integration-test</phase>
+                                <goals>
+                                    <goal>integration-test</goal>
+                                    <goal>verify</goal>
+                                </goals>
+                            </execution>
+                        </executions>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+        <profile>
+            <id>test-sync</id>
+            <build>
+                <plugins>
+                    <!-- before integration test, we build the docker image -->
+                    <plugin>
+                        <groupId>org.codehaus.mojo</groupId>
+                        <artifactId>exec-maven-plugin</artifactId>
+                        <version>1.6.0</version>
+                        <executions>
+                            <execution>
+                                <id>build-sync-docker-image</id>
+                                <phase>pre-integration-test</phase>
+                                <goals>
+                                    <goal>exec</goal>
+                                </goals>
+                                <configuration>
+                                    <skip>${docker.test.skip}</skip>
+                                    <executable>${docker.build.executable}</executable>
+                                    <commandlineArgs>${docker.build.sync.argument}</commandlineArgs>
+                                </configuration>
+                            </execution>
+                            <execution>
+                                <id>clean-sync-docker-image</id>
+                                <phase>post-integration-test</phase>
+                                <goals>
+                                    <goal>exec</goal>
+                                </goals>
+                                <configuration>
+                                    <skip>${docker.test.skip}</skip>
+                                    <executable>${docker.build.executable}</executable>
+                                    <commandlineArgs>${docker.clean.sync.argument}</commandlineArgs>
+                                </configuration>
+                            </execution>
                         </executions>
                     </plugin>
                     <plugin>
@@ -158,6 +256,11 @@
                                 </goals>
                             </execution>
                         </executions>
+                        <configuration>
+                            <includes>
+                                <include>**/org/apache/iotdb/db/sync/**</include>
+                            </includes>
+                        </configuration>
                     </plugin>
                 </plugins>
             </build>
diff --git a/testcontainer/src/test/java/org/apache/iotdb/db/sync/SyncIT.java b/testcontainer/src/test/java/org/apache/iotdb/db/sync/SyncIT.java
new file mode 100644
index 0000000000..c7caf1918d
--- /dev/null
+++ b/testcontainer/src/test/java/org/apache/iotdb/db/sync/SyncIT.java
@@ -0,0 +1,426 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.sync;
+
+import org.apache.iotdb.jdbc.Config;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.DockerComposeContainer;
+import org.testcontainers.containers.NoProjectNameDockerComposeContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
+
+import java.io.File;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.Statement;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class SyncIT {
+  private static Logger receiverLogger = LoggerFactory.getLogger("iotdb-receiver_1");
+  private static Logger senderLogger = LoggerFactory.getLogger("iotdb-sender_1");
+  private static int RETRY_TIME = 30;
+
+  protected Statement senderStatement;
+  protected Connection senderConnection;
+  protected Statement receiverStatement;
+  protected Connection receiverConnection;
+
+  // TestContainers' documentation declares the environment `public static` with @ClassRule,
+  // i.e. one environment per class; with @Rule a fresh environment is started for every test.
+  @Rule
+  public DockerComposeContainer environment =
+      new NoProjectNameDockerComposeContainer(
+              "sync", new File("src/test/resources/sync/docker-compose.yaml"))
+          .withExposedService("iotdb-sender_1", 6667, Wait.forListeningPort())
+          .withLogConsumer("iotdb-sender_1", new Slf4jLogConsumer(senderLogger))
+          .withExposedService("iotdb-receiver_1", 6667, Wait.forListeningPort())
+          .withLogConsumer("iotdb-receiver_1", new Slf4jLogConsumer(receiverLogger))
+          .withLocalCompose(true);
+
+  protected int getSenderRpcPort() {
+    return environment.getServicePort("iotdb-sender_1", 6667);
+  }
+
+  protected String getSenderIp() {
+    return environment.getServiceHost("iotdb-sender_1", 6667);
+  }
+
+  protected int getReceiverRpcPort() {
+    return environment.getServicePort("iotdb-receiver_1", 6667);
+  }
+
+  protected String getReceiverIp() {
+    return environment.getServiceHost("iotdb-receiver_1", 6667);
+  }
+
+  @Before
+  public void init() throws Exception {
+    Class.forName(Config.JDBC_DRIVER_NAME);
+    senderConnection =
+        DriverManager.getConnection(
+            "jdbc:iotdb://" + getSenderIp() + ":" + getSenderRpcPort(), "root", "root");
+    senderStatement = senderConnection.createStatement();
+    receiverConnection =
+        DriverManager.getConnection(
+            "jdbc:iotdb://" + getReceiverIp() + ":" + getReceiverRpcPort(), "root", "root");
+    receiverStatement = receiverConnection.createStatement();
+  }
+
+  @After
+  public void clean() throws Exception {
+    senderStatement.close();
+    senderConnection.close();
+    receiverStatement.close();
+    receiverConnection.close();
+  }
+
+  private void prepareSchema() throws Exception {
+    senderStatement.execute("set storage group to root.sg1");
+    senderStatement.execute("set storage group to root.sg2");
+    senderStatement.execute("create timeseries root.sg1.d1.s1 with datatype=int32, encoding=PLAIN");
+    senderStatement.execute("create timeseries root.sg1.d1.s2 with datatype=float, encoding=RLE");
+    senderStatement.execute("create timeseries root.sg1.d1.s3 with datatype=TEXT, encoding=PLAIN");
+    senderStatement.execute("create timeseries root.sg1.d2.s4 with datatype=int64, encoding=PLAIN");
+    senderStatement.execute("create timeseries root.sg2.d1.s0 with datatype=int32, encoding=PLAIN");
+    senderStatement.execute(
+        "create timeseries root.sg2.d2.s1 with datatype=boolean, encoding=PLAIN");
+  }
+
+  /* add one seq tsfile in sg1 */
+  private void prepareIns1() throws Exception {
+    senderStatement.execute(
+        "insert into root.sg1.d1(timestamp, s1, s2, s3) values(1, 1, 16.0, 'a')");
+    senderStatement.execute(
+        "insert into root.sg1.d1(timestamp, s1, s2, s3) values(2, 2, 25.16, 'b')");
+    senderStatement.execute(
+        "insert into root.sg1.d1(timestamp, s1, s2, s3) values(3, 3, 65.25, 'c')");
+    senderStatement.execute(
+        "insert into root.sg1.d1(timestamp, s1, s2, s3) values(16, 25, 100.0, 'd')");
+    senderStatement.execute("insert into root.sg1.d2(timestamp, s4) values(1, 1)");
+    senderStatement.execute("flush");
+  }
+
+  /* add one seq tsfile in sg1 */
+  private void prepareIns2() throws Exception {
+    senderStatement.execute(
+        "insert into root.sg1.d1(timestamp, s1, s2, s3) values(100, 65, 16.25, 'e')");
+    senderStatement.execute(
+        "insert into root.sg1.d1(timestamp, s1, s2, s3) values(65, 100, 25.0, 'f')");
+    senderStatement.execute("insert into root.sg1.d2(timestamp, s4) values(200, 100)");
+    senderStatement.execute("flush");
+  }
+
+  /* add one seq tsfile in sg1, one unseq tsfile in sg1, one seq tsfile in sg2 */
+  private void prepareIns3() throws Exception {
+    senderStatement.execute("insert into root.sg2.d1(timestamp, s0) values(100, 100)");
+    senderStatement.execute("insert into root.sg2.d1(timestamp, s0) values(65, 65)");
+    senderStatement.execute("insert into root.sg2.d2(timestamp, s1) values(1, true)");
+    senderStatement.execute(
+        "insert into root.sg1.d1(timestamp, s1, s2, s3) values(25, 16, 65.16, 'g')");
+    senderStatement.execute(
+        "insert into root.sg1.d1(timestamp, s1, s2, s3) values(200, 100, 16.65, 'h')");
+    senderStatement.execute("flush");
+  }
+
+  private void prepareDel1() throws Exception { // after ins1, add 2 deletions
+    senderStatement.execute("delete from root.sg1.d1.s1 where time == 3");
+    senderStatement.execute("delete from root.sg1.d1.s2 where time >= 1 and time <= 2");
+  }
+
+  private void prepareDel2() throws Exception { // after ins2, add 3 deletions
+    senderStatement.execute("delete from root.sg1.d1.s3 where time <= 65");
+  }
+
+  private void prepareDel3() throws Exception { // after ins3, add 5 deletions, 2 schemas
+    senderStatement.execute("delete from root.sg1.d1.* where time <= 2"); // data deletion on d1
+    senderStatement.execute("delete timeseries root.sg1.d2.*"); // schema deletion
+    senderStatement.execute("delete storage group root.sg2"); // schema deletion
+  }
+
+  private void preparePipe() throws Exception {
+    receiverStatement.execute("start pipeserver");
+    senderStatement.execute(
+        "create pipesink my_iotdb as iotdb(ip='sync_iotdb-receiver_1',port=6670)");
+    senderStatement.execute("create pipe p to my_iotdb");
+  }
+
+  private void startPipe() throws Exception {
+    senderStatement.execute("start pipe p");
+  }
+
+  private void stopPipe() throws Exception {
+    senderStatement.execute("stop pipe p");
+  }
+
+  private void dropPipe() throws Exception {
+    senderStatement.execute("drop pipe p");
+  }
+
+  private void checkResult() throws Exception {
+    String[] columnNames =
+        new String[] {
+          "root.sg1.d1.s3",
+          "root.sg1.d1.s1",
+          "root.sg1.d1.s2",
+          "root.sg1.d2.s4",
+          "root.sg2.d1.s0",
+          "root.sg2.d2.s1"
+        };
+    String[] results =
+        new String[] {
+          "1,a,1,16.0,1,null,true",
+          "2,b,2,25.16,null,null,null",
+          "3,c,3,65.25,null,null,null",
+          "16,d,25,100.0,null,null,null",
+          "25,g,16,65.16,null,null,null",
+          "65,f,100,25.0,null,65,null",
+          "100,e,65,16.25,null,100,null",
+          "200,h,100,16.65,100,null,null"
+        };
+    checkResult(receiverStatement, "select ** from root", columnNames, results, true);
+  }
+
+  private void checkResultWithDeletion() throws Exception {
+    String[] columnNames =
+        new String[] {
+          "root.sg1.d1.s3", "root.sg1.d1.s1", "root.sg1.d1.s2",
+        };
+    String[] results =
+        new String[] {
+          "3,null,null,65.25",
+          "16,null,25,100.0",
+          "25,null,16,65.16",
+          "65,null,100,25.0",
+          "100,e,65,16.25",
+          "200,h,100,16.65"
+        };
+    checkResult(receiverStatement, "select ** from root", columnNames, results, true);
+  }
+
+  @Test
+  public void testCreatePipe() throws Exception {
+    preparePipe();
+    checkResult(
+        receiverStatement,
+        "show pipe",
+        new String[] {"name", "role", "status"},
+        new String[] {"p,receiver,STOP"},
+        false);
+    dropPipe();
+    checkResult(
+        senderStatement,
+        "show pipe",
+        new String[] {"name", "role", "remote", "status"},
+        new String[] {"p,sender,my_iotdb,DROP"},
+        false);
+  }
+
+  @Test
+  public void testHistoryInsert() {
+    try {
+      prepareSchema();
+      prepareIns1();
+      prepareIns2();
+      prepareIns3();
+      preparePipe();
+      startPipe();
+      checkResult();
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testHistoryAndRealTimeInsert() {
+    try {
+      prepareSchema();
+      prepareIns1();
+      prepareIns2();
+      preparePipe();
+      startPipe();
+      prepareIns3();
+      checkResult();
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testStopAndStartInsert() {
+    try {
+      prepareSchema();
+      prepareIns1();
+      preparePipe();
+      startPipe();
+      prepareIns2();
+      stopPipe();
+      prepareIns3();
+      startPipe();
+      checkResult();
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testRealTimeAndStopInsert() {
+    try {
+      preparePipe(); // realtime
+      startPipe();
+      prepareSchema();
+      prepareIns1();
+      stopPipe();
+      prepareIns2();
+      startPipe();
+      prepareIns3();
+      checkResult();
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testHistoryDel() {
+    try {
+      prepareSchema(); // history
+      prepareIns1();
+      prepareIns2();
+      prepareIns3();
+      prepareDel1();
+      prepareDel2();
+      prepareDel3();
+      preparePipe(); // realtime
+      startPipe();
+      checkResultWithDeletion();
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testRealtimeDel() { // deletions issued on the sender must show up on the receiver
+    try {
+      prepareSchema(); // history
+      prepareIns1();
+      preparePipe(); // realtime
+      startPipe();
+      prepareIns2();
+      prepareDel1();
+      stopPipe(); // ins3 below is written while the pipe is stopped
+      prepareIns3();
+      startPipe();
+      prepareDel2();
+      prepareDel3();
+      checkResultWithDeletion();
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.fail(e.getMessage()); // report the cause, as the other tests in this class do
+    }
+  }
+
+  /**
+   * Execute sql in IoTDB and compare resultSet with expected result. This method only check columns
+   * that is explicitly declared in columnNames. This method will compare expected result with
+   * actual result RETRY_TIME times. Interval of each run is 1000ms.
+   *
+   * @param statement Statement of IoTDB.
+   * @param sql SQL to be executed.
+   * @param columnNames Columns to be compared with.
+   * @param retArray Expected result set. Order of columns is as same as columnNames.
+   * @param hasTimeColumn If result set contains time column (e.g. timeserires query), set
+   *     hasTimeColumn = true.
+   */
+  private static void checkResult(
+      Statement statement,
+      String sql,
+      String[] columnNames,
+      String[] retArray,
+      boolean hasTimeColumn)
+      throws Exception {
+    loop:
+    for (int loop = 0; loop < RETRY_TIME; loop++) {
+      try {
+        Thread.sleep(1000);
+        boolean hasResultSet = statement.execute(sql);
+        if (!assertOrCompareEqual(true, hasResultSet, loop)) {
+          continue;
+        }
+        ResultSet resultSet = statement.getResultSet();
+        ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+        Map<String, Integer> map = new HashMap<>();
+        for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+          map.put(resultSetMetaData.getColumnName(i), i);
+        }
+        int cnt = 0;
+        while (resultSet.next()) {
+          StringBuilder builder = new StringBuilder();
+          if (hasTimeColumn) {
+            builder.append(resultSet.getString(1)).append(",");
+          }
+          for (String columnName : columnNames) {
+            int index = map.get(columnName);
+            builder.append(resultSet.getString(index)).append(",");
+          }
+          if (builder.length() > 0) {
+            builder.deleteCharAt(builder.length() - 1);
+          }
+          if (!assertOrCompareEqual(retArray[cnt], builder.toString(), loop)) {
+            continue loop;
+          }
+          cnt++;
+        }
+        if (!assertOrCompareEqual(retArray.length, cnt, loop)) {
+          continue;
+        }
+        return;
+      } catch (Exception e) {
+        if (loop == RETRY_TIME - 1) {
+          throw e;
+        }
+      }
+    }
+    Assert.fail();
+  }
+
+  private static boolean assertOrCompareEqual(Object expected, Object actual, int loop) {
+    boolean lastAttempt = loop + 1 == RETRY_TIME; // earlier attempts only report, never fail
+    if (!lastAttempt) {
+      return expected.equals(actual);
+    }
+    assertEquals(expected, actual); // throws AssertionError on mismatch
+    return true;
+  }
+}
diff --git a/testcontainer/src/test/java/org/apache/iotdb/db/sync/SyncWeakNetworkIT.java b/testcontainer/src/test/java/org/apache/iotdb/db/sync/SyncWeakNetworkIT.java
new file mode 100644
index 0000000000..09effada3a
--- /dev/null
+++ b/testcontainer/src/test/java/org/apache/iotdb/db/sync/SyncWeakNetworkIT.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.sync;
+
+import org.junit.Assert;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.ContainerState;
+
+/** Simulate network delay and loss. */
+public class SyncWeakNetworkIT extends SyncIT {
+  @Override
+  public void init() throws Exception {
+    super.init();
+    // Delay every packet by 200±50ms (normal distribution, 25% correlation), and apply
+    // a 10% loss rate, 10% duplicate rate, 10% reorder rate and 10% corrupt rate.
+    Container.ExecResult res =
+        ((ContainerState) environment.getContainerByServiceName("iotdb-sender_1").get())
+            .execInContainer(
+                "sh",
+                "-c",
+                "tc qdisc add dev eth0 root netem delay 200ms 50ms 25% distribution normal loss random 10% duplicate 10% reorder 10% corrupt 10%");
+    Assert.assertEquals(0, res.getExitCode());
+  }
+}
diff --git a/testcontainer/src/test/resources/sync/docker-compose.yaml b/testcontainer/src/test/resources/sync/docker-compose.yaml
new file mode 100644
index 0000000000..fa0cedc8a4
--- /dev/null
+++ b/testcontainer/src/test/resources/sync/docker-compose.yaml
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+version: '3.8'
+
+services:
+  iotdb-sender:  # node the tests write to; its pipe pushes data to iotdb-receiver
+    image: apache/iotdb:sync-maven-development  # built from Dockerfile-single-tc by testcontainer/pom.xml
+    networks:
+      - iotdb
+    cap_add:
+      - NET_ADMIN  # needed by SyncWeakNetworkIT to run `tc qdisc` (netem) in this container
+    healthcheck:  # passes once bash can open a TCP connection to local port 6667
+      test: [ "CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/6667" ]
+      interval: 5s
+      timeout: 60s
+      retries: 120
+    volumes:
+      - ../logback-container.xml:/iotdb/conf/logback.xml
+  iotdb-receiver:  # node the tests read back from to verify what was synced
+    image: apache/iotdb:sync-maven-development
+    networks:
+      - iotdb
+    cap_add:
+      - NET_ADMIN
+    healthcheck:  # same probe as the sender
+      test: [ "CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/6667" ]
+      interval: 5s
+      timeout: 60s
+      retries: 120
+    volumes:
+      - ../logback-container.xml:/iotdb/conf/logback.xml
+
+
+networks:
+  iotdb:  # shared bridge network so the sender can reach the receiver
+    driver: bridge
\ No newline at end of file