You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/09/25 11:51:35 UTC

[iotdb] branch master updated: Pipe: Add IT for different cluster config and consensus protocols (#11097)

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 5c7718ea8fa Pipe: Add IT for different cluster config and consensus protocols (#11097)
5c7718ea8fa is described below

commit 5c7718ea8fa65fa6495f204f56f02c021ea5deca
Author: 马子坤 <55...@users.noreply.github.com>
AuthorDate: Mon Sep 25 19:51:28 2023 +0800

    Pipe: Add IT for different cluster config and consensus protocols (#11097)
---
 .../iotdb/it/env/cluster/ClusterConstant.java      |   2 +
 .../it/env/cluster/config/MppCommonConfig.java     |   6 +
 .../env/cluster/config/MppSharedCommonConfig.java  |   7 +
 .../iotdb/it/env/cluster/env/AbstractEnv.java      |  10 +-
 .../iotdb/it/env/cluster/env/Cluster1Env.java      |   6 +
 .../iotdb/it/env/cluster/env/MultiClusterEnv.java  |   6 +
 .../apache/iotdb/it/env/cluster/env/SimpleEnv.java |   6 +
 .../iotdb/it/env/cluster/node/DataNodeWrapper.java |   9 +
 .../it/env/remote/config/RemoteCommonConfig.java   |   5 +
 .../iotdb/it/env/remote/env/RemoteServerEnv.java   |   6 +
 .../java/org/apache/iotdb/itbase/env/BaseEnv.java  |   9 +
 .../org/apache/iotdb/itbase/env/CommonConfig.java  |   2 +
 .../org/apache/iotdb/db/it/utils/TestUtils.java    |  98 +++
 .../apache/iotdb/pipe/it/IoTDBPipeClusterIT.java   | 766 +++++++++++++++++++++
 .../apache/iotdb/pipe/it/IoTDBPipeLifeCycleIT.java | 469 ++++++++-----
 .../apache/iotdb/pipe/it/IoTDBPipeProtocolIT.java  | 415 +++++++++++
 .../iotdb/pipe/it/IoTDBPipeSwitchStatusIT.java     |  23 +-
 .../pipe/it/extractor/IoTDBPipeExtractorIT.java    | 151 ++--
 18 files changed, 1704 insertions(+), 292 deletions(-)

diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/ClusterConstant.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/ClusterConstant.java
index 2bf3161aabb..1ce6143887c 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/ClusterConstant.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/ClusterConstant.java
@@ -165,6 +165,8 @@ public class ClusterConstant {
   public static final String DN_MPP_DATA_EXCHANGE_PORT = "dn_mpp_data_exchange_port";
   public static final String DN_DATA_REGION_CONSENSUS_PORT = "dn_data_region_consensus_port";
   public static final String DN_SCHEMA_REGION_CONSENSUS_PORT = "dn_schema_region_consensus_port";
+  public static final String PIPE_AIR_GAP_RECEIVER_ENABLED = "pipe_air_gap_receiver_enabled";
+  public static final String PIPE_AIR_GAP_RECEIVER_PORT = "pipe_air_gap_receiver_port";
   public static final String MAX_TSBLOCK_SIZE_IN_BYTES = "max_tsblock_size_in_bytes";
   public static final String PAGE_SIZE_IN_BYTE = "page_size_in_byte";
   public static final String DN_JOIN_CLUSTER_RETRY_INTERVAL_MS =
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
index ecf17099ba5..02f89b74652 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
@@ -379,6 +379,12 @@ public class MppCommonConfig extends MppBaseConfig implements CommonConfig {
     return this;
   }
 
+  @Override
+  public CommonConfig setPipeAirGapReceiverEnabled(boolean isPipeAirGapReceiverEnabled) {
+    setProperty("pipe_air_gap_receiver_enabled", String.valueOf(isPipeAirGapReceiverEnabled));
+    return this;
+  }
+
   // For part of the log directory
   public String getClusterConfigStr() {
     return fromConsensusFullNameToAbbr(properties.getProperty(CONFIG_NODE_CONSENSUS_PROTOCOL_CLASS))
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
index 2e577044749..27d973311e5 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
@@ -385,4 +385,11 @@ public class MppSharedCommonConfig implements CommonConfig {
     cnConfig.setSchemaRegionPerDataNode(schemaRegionPerDataNode);
     return this;
   }
+
+  @Override
+  public CommonConfig setPipeAirGapReceiverEnabled(boolean isPipeAirGapReceiverEnabled) {
+    dnConfig.setPipeAirGapReceiverEnabled(isPipeAirGapReceiverEnabled);
+    cnConfig.setPipeAirGapReceiverEnabled(isPipeAirGapReceiverEnabled);
+    return this;
+  }
 }
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
index 9d077b46f7a..95a55a5108a 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
@@ -95,6 +95,7 @@ public abstract class AbstractEnv implements BaseEnv {
   protected String testMethodName = null;
   protected int index = 0;
   protected long startTime;
+  protected int testWorkingRetryCount = 30;
 
   private IClientManager<TEndPoint, SyncConfigNodeIServiceClient> clientManager;
 
@@ -122,6 +123,11 @@ public abstract class AbstractEnv implements BaseEnv {
   }
 
   protected void initEnvironment(int configNodesNum, int dataNodesNum) {
+    initEnvironment(configNodesNum, dataNodesNum, 30);
+  }
+
+  protected void initEnvironment(int configNodesNum, int dataNodesNum, int testWorkingRetryCount) {
+    this.testWorkingRetryCount = testWorkingRetryCount;
     this.configNodeWrapperList = new ArrayList<>();
     this.dataNodeWrapperList = new ArrayList<>();
 
@@ -275,7 +281,7 @@ public abstract class AbstractEnv implements BaseEnv {
       testDelegate.addRequest(
           () -> {
             Exception lastException = null;
-            for (int i = 0; i < 30; i++) {
+            for (int i = 0; i < testWorkingRetryCount; i++) {
               try (Connection ignored = getConnection(dataNodeEndpoint, PROBE_TIMEOUT_MS)) {
                 logger.info("Successfully connecting to DataNode: {}.", dataNodeEndpoint);
                 return null;
@@ -296,7 +302,7 @@ public abstract class AbstractEnv implements BaseEnv {
       logger.info("Start cluster costs: {}s", (System.currentTimeMillis() - startTime) / 1000.0);
     } catch (Exception e) {
       logger.error("exception in testWorking of ClusterID, message: {}", e.getMessage(), e);
-      fail("After 30 times retry, the cluster can't work!");
+      fail(String.format("After %d times retry, the cluster can't work!", testWorkingRetryCount));
     }
   }
 
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/Cluster1Env.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/Cluster1Env.java
index 52888c744e4..20f22e0b5ad 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/Cluster1Env.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/Cluster1Env.java
@@ -34,4 +34,10 @@ public class Cluster1Env extends AbstractEnv {
   public void initClusterEnvironment(int configNodesNum, int dataNodesNum) {
     super.initEnvironment(configNodesNum, dataNodesNum);
   }
+
+  @Override
+  public void initClusterEnvironment(
+      int configNodesNum, int dataNodesNum, int testWorkingRetryCount) {
+    super.initEnvironment(configNodesNum, dataNodesNum, testWorkingRetryCount);
+  }
 }
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/MultiClusterEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/MultiClusterEnv.java
index c6f907633ac..8eecd4ea00d 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/MultiClusterEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/MultiClusterEnv.java
@@ -40,4 +40,10 @@ public class MultiClusterEnv extends AbstractEnv {
   public void initClusterEnvironment(int configNodesNum, int dataNodesNum) {
     super.initEnvironment(configNodesNum, dataNodesNum);
   }
+
+  @Override
+  public void initClusterEnvironment(
+      int configNodesNum, int dataNodesNum, int testWorkingRetryCount) {
+    super.initEnvironment(configNodesNum, dataNodesNum, testWorkingRetryCount);
+  }
 }
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/SimpleEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/SimpleEnv.java
index f11ec341fce..eb9a1828a73 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/SimpleEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/SimpleEnv.java
@@ -30,4 +30,10 @@ public class SimpleEnv extends AbstractEnv {
   public void initClusterEnvironment(int configNodesNum, int dataNodesNum) {
     super.initEnvironment(configNodesNum, dataNodesNum);
   }
+
+  @Override
+  public void initClusterEnvironment(
+      int configNodesNum, int dataNodesNum, int testWorkingRetryCount) {
+    super.initEnvironment(configNodesNum, dataNodesNum, testWorkingRetryCount);
+  }
 }
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/DataNodeWrapper.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/DataNodeWrapper.java
index fa99044667a..ebd9934a16d 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/DataNodeWrapper.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/DataNodeWrapper.java
@@ -59,6 +59,7 @@ import static org.apache.iotdb.it.env.cluster.ClusterConstant.MAX_TSBLOCK_SIZE_I
 import static org.apache.iotdb.it.env.cluster.ClusterConstant.MQTT_HOST;
 import static org.apache.iotdb.it.env.cluster.ClusterConstant.MQTT_PORT;
 import static org.apache.iotdb.it.env.cluster.ClusterConstant.PAGE_SIZE_IN_BYTE;
+import static org.apache.iotdb.it.env.cluster.ClusterConstant.PIPE_AIR_GAP_RECEIVER_PORT;
 import static org.apache.iotdb.it.env.cluster.ClusterConstant.SCHEMA_REGION_CONSENSUS_PROTOCOL_CLASS;
 import static org.apache.iotdb.it.env.cluster.ClusterConstant.SCHEMA_REPLICATION_FACTOR;
 import static org.apache.iotdb.it.env.cluster.ClusterConstant.SYSTEM_PROPERTIES_FILE;
@@ -72,6 +73,7 @@ public class DataNodeWrapper extends AbstractNodeWrapper {
   private final int dataRegionConsensusPort;
   private final int schemaRegionConsensusPort;
   private final int mqttPort;
+  private final int pipeAirGapReceiverPort;
 
   private final String defaultNodePropertiesFile;
 
@@ -92,6 +94,7 @@ public class DataNodeWrapper extends AbstractNodeWrapper {
     this.dataRegionConsensusPort = portList[3];
     this.schemaRegionConsensusPort = portList[4];
     this.mqttPort = portList[5];
+    this.pipeAirGapReceiverPort = portList[6];
     this.defaultNodePropertiesFile =
         EnvUtils.getFilePathFromSysVar(DEFAULT_DATA_NODE_PROPERTIES, clusterIndex);
     this.defaultCommonPropertiesFile =
@@ -103,6 +106,8 @@ public class DataNodeWrapper extends AbstractNodeWrapper {
     // Override mqtt properties of super class
     immutableCommonProperties.setProperty(MQTT_HOST, super.getIp());
     immutableCommonProperties.setProperty(MQTT_PORT, String.valueOf(this.mqttPort));
+    immutableCommonProperties.setProperty(
+        PIPE_AIR_GAP_RECEIVER_PORT, String.valueOf(this.pipeAirGapReceiverPort));
 
     immutableNodeProperties.setProperty(IoTDBConstant.DN_TARGET_CONFIG_NODE_LIST, targetConfigNode);
     immutableNodeProperties.setProperty(DN_SYSTEM_DIR, MppBaseConfig.NULL_VALUE);
@@ -253,4 +258,8 @@ public class DataNodeWrapper extends AbstractNodeWrapper {
   public int getMqttPort() {
     return mqttPort;
   }
+
+  public int getPipeAirGapReceiverPort() {
+    return pipeAirGapReceiverPort;
+  }
 }
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
index 7e8069a74e7..81d1bb8d1f2 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
@@ -275,4 +275,9 @@ public class RemoteCommonConfig implements CommonConfig {
   public CommonConfig setSchemaRegionPerDataNode(double schemaRegionPerDataNode) {
     return this;
   }
+
+  @Override
+  public CommonConfig setPipeAirGapReceiverEnabled(boolean isPipeAirGapReceiverEnabled) {
+    return this;
+  }
 }
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java
index 687e89e1614..e25e97835f5 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java
@@ -81,6 +81,12 @@ public class RemoteServerEnv implements BaseEnv {
     initClusterEnvironment();
   }
 
+  @Override
+  public void initClusterEnvironment(
+      int configNodesNum, int dataNodesNum, int testWorkingRetryCount) {
+    initClusterEnvironment();
+  }
+
   @Override
   public void cleanClusterEnvironment() {
     clientManager.close();
diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
index 1a38310335c..5e3f18f450f 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
@@ -47,6 +47,15 @@ public interface BaseEnv {
    */
   void initClusterEnvironment(int configNodesNum, int dataNodesNum);
 
+  /**
+   * Init a cluster with the specified number of ConfigNodes and DataNodes.
+   *
+   * @param configNodesNum the number of ConfigNodes.
+   * @param dataNodesNum the number of DataNodes.
+   * @param testWorkingRetryCount the retry count when testing the availability of cluster
+   */
+  void initClusterEnvironment(int configNodesNum, int dataNodesNum, int testWorkingRetryCount);
+
   /** Destroy the cluster and all the configurations. */
   void cleanClusterEnvironment();
 
diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
index 943ab1d7c24..61ab834539e 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
@@ -123,4 +123,6 @@ public interface CommonConfig {
   CommonConfig setDataRegionPerDataNode(double dataRegionPerDataNode);
 
   CommonConfig setSchemaRegionPerDataNode(double schemaRegionPerDataNode);
+
+  CommonConfig setPipeAirGapReceiverEnabled(boolean isPipeAirGapReceiverEnabled);
 }
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java b/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java
index 0b6f6be0dac..4b9be28d4a1 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java
@@ -22,6 +22,9 @@ package org.apache.iotdb.db.it.utils;
 import org.apache.iotdb.commons.auth.entity.PrivilegeType;
 import org.apache.iotdb.isession.SessionDataSet;
 import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.env.cluster.env.AbstractEnv;
+import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
+import org.apache.iotdb.itbase.env.BaseEnv;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
 import org.apache.iotdb.rpc.StatementExecutionException;
 import org.apache.iotdb.tsfile.read.common.RowRecord;
@@ -39,10 +42,12 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.iotdb.itbase.constant.TestConstant.DELTA;
 import static org.apache.iotdb.itbase.constant.TestConstant.NULL;
 import static org.apache.iotdb.itbase.constant.TestConstant.TIMESTAMP_STR;
+import static org.awaitility.Awaitility.await;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
@@ -338,6 +343,47 @@ public class TestUtils {
     }
   }
 
+  public static void executeNonQueryWithRetry(BaseEnv env, String sql) {
+    for (int retryCountLeft = 10; retryCountLeft >= 0; retryCountLeft--) {
+      try (Connection connection = env.getConnection();
+          Statement statement = connection.createStatement()) {
+        statement.execute(sql);
+        break;
+      } catch (SQLException e) {
+        if (retryCountLeft > 0) {
+          try {
+            Thread.sleep(10000);
+          } catch (InterruptedException ignored) {
+          }
+        } else {
+          e.printStackTrace();
+          fail(e.getMessage());
+        }
+      }
+    }
+  }
+
+  public static void executeNonQueryOnSpecifiedDataNodeWithRetry(
+      BaseEnv env, DataNodeWrapper wrapper, String sql) {
+    for (int retryCountLeft = 10; retryCountLeft >= 0; retryCountLeft--) {
+      try (Connection connection = env.getConnectionWithSpecifiedDataNode(wrapper);
+          Statement statement = connection.createStatement()) {
+        statement.execute(sql);
+        break;
+      } catch (SQLException e) {
+        if (retryCountLeft > 0) {
+          try {
+            Thread.sleep(10000);
+          } catch (InterruptedException ignored) {
+          }
+        } else {
+          e.printStackTrace();
+          fail(e.getMessage());
+        }
+      }
+    }
+  }
+
   public static void executeQuery(String sql) {
     executeQuery(sql, "root", "root");
   }
@@ -352,6 +398,25 @@ public class TestUtils {
     }
   }
 
+  public static ResultSet executeQueryWithRetry(Statement statement, String sql) {
+    for (int retryCountLeft = 10; retryCountLeft >= 0; retryCountLeft--) {
+      try {
+        return statement.executeQuery(sql);
+      } catch (SQLException e) {
+        if (retryCountLeft > 0) {
+          try {
+            Thread.sleep(10000);
+          } catch (InterruptedException ignored) {
+          }
+        } else {
+          e.printStackTrace();
+          fail(e.getMessage());
+        }
+      }
+    }
+    return null;
+  }
+
   public static void assertResultSetEqual(
       SessionDataSet actualResultSet,
       List<String> expectedColumnNames,
@@ -420,4 +485,37 @@ public class TestUtils {
       fail(e.getMessage());
     }
   }
+
+  public static void restartCluster(BaseEnv env) throws Exception {
+    for (int i = 0; i < env.getConfigNodeWrapperList().size(); ++i) {
+      env.shutdownConfigNode(i);
+    }
+    for (int i = 0; i < env.getDataNodeWrapperList().size(); ++i) {
+      env.shutdownDataNode(i);
+    }
+    TimeUnit.SECONDS.sleep(1);
+    for (int i = 0; i < env.getConfigNodeWrapperList().size(); ++i) {
+      env.startConfigNode(i);
+    }
+    for (int i = 0; i < env.getDataNodeWrapperList().size(); ++i) {
+      env.startDataNode(i);
+    }
+    ((AbstractEnv) env).testWorkingNoUnknown();
+  }
+
+  public static void assertDataOnEnv(
+      BaseEnv env, String sql, String expectedHeader, Set<String> expectedResSet) {
+    try (Connection connection = env.getConnection();
+        Statement statement = connection.createStatement()) {
+      await()
+          .atMost(600, TimeUnit.SECONDS)
+          .untilAsserted(
+              () ->
+                  TestUtils.assertResultSetEqual(
+                      executeQueryWithRetry(statement, sql), expectedHeader, expectedResSet));
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
 }
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeClusterIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeClusterIT.java
new file mode 100644
index 00000000000..5792ea3598c
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeClusterIT.java
@@ -0,0 +1,766 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.it;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.commons.cluster.RegionRoleType;
+import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
+import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp;
+import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.db.it.utils.TestUtils;
+import org.apache.iotdb.it.env.MultiEnvFactory;
+import org.apache.iotdb.it.env.cluster.env.AbstractEnv;
+import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.MultiClusterIT2;
+import org.apache.iotdb.itbase.env.BaseEnv;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.apache.thrift.TException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({MultiClusterIT2.class})
+public class IoTDBPipeClusterIT {
+
+  private BaseEnv senderEnv;
+  private BaseEnv receiverEnv;
+
+  @Before
+  public void setUp() throws Exception {
+    MultiEnvFactory.createEnv(2);
+    senderEnv = MultiEnvFactory.getEnv(0);
+    receiverEnv = MultiEnvFactory.getEnv(1);
+
+    senderEnv
+        .getConfig()
+        .getCommonConfig()
+        .setAutoCreateSchemaEnabled(true)
+        .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+        .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+        .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS);
+
+    receiverEnv
+        .getConfig()
+        .getCommonConfig()
+        .setAutoCreateSchemaEnabled(true)
+        .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+        .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+        .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS);
+
+    senderEnv.initClusterEnvironment(3, 3, 180);
+    receiverEnv.initClusterEnvironment(3, 3, 180);
+  }
+
+  @After
+  public void tearDown() {
+    senderEnv.cleanClusterEnvironment();
+    receiverEnv.cleanClusterEnvironment();
+  }
+
+  @Test
+  public void testWithAllParametersInLogMode() throws Exception {
+    testWithAllParameters("log");
+  }
+
+  @Test
+  public void testWithAllParametersInFileMode() throws Exception {
+    testWithAllParameters("file");
+  }
+
+  @Test
+  public void testWithAllParametersInHybridMode() throws Exception {
+    testWithAllParameters("hybrid");
+  }
+
+  public void testWithAllParameters(String realtimeMode) throws Exception {
+    DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+    String receiverIp = receiverDataNode.getIp();
+    int receiverPort = receiverDataNode.getPort();
+
+    try (SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
+      TestUtils.executeNonQueryWithRetry(
+          senderEnv, "insert into root.db.d1(time, s1) values (2010-01-01T10:00:00+08:00, 1)");
+      TestUtils.executeNonQueryWithRetry(
+          senderEnv, "insert into root.db.d1(time, s1) values (2010-01-02T10:00:00+08:00, 2)");
+      TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
+
+      Map<String, String> extractorAttributes = new HashMap<>();
+      Map<String, String> processorAttributes = new HashMap<>();
+      Map<String, String> connectorAttributes = new HashMap<>();
+
+      extractorAttributes.put("extractor", "iotdb-extractor");
+      extractorAttributes.put("extractor.pattern", "root.db.d1");
+      extractorAttributes.put("extractor.history.enable", "true");
+      extractorAttributes.put("extractor.history.start-time", "2010-01-01T08:00:00+08:00");
+      extractorAttributes.put("extractor.history.end-time", "2010-01-02T08:00:00+08:00");
+      extractorAttributes.put("extractor.realtime.enable", "true");
+      extractorAttributes.put("extractor.realtime.mode", realtimeMode);
+
+      processorAttributes.put("processor", "do-nothing-processor");
+
+      connectorAttributes.put("connector", "iotdb-thrift-connector");
+      connectorAttributes.put("connector.batch.enable", "false");
+      connectorAttributes.put("connector.ip", receiverIp);
+      connectorAttributes.put("connector.port", Integer.toString(receiverPort));
+      connectorAttributes.put("connector.user", "root");
+      connectorAttributes.put("connector.password", "root");
+
+      TSStatus status =
+          client.createPipe(
+              new TCreatePipeReq("p1", connectorAttributes)
+                  .setExtractorAttributes(extractorAttributes)
+                  .setProcessorAttributes(processorAttributes));
+
+      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode());
+
+      TestUtils.assertDataOnEnv(
+          receiverEnv,
+          "select count(*) from root.**",
+          "count(root.db.d1.s1),",
+          Collections.singleton("1,"));
+
+      TestUtils.executeNonQueryWithRetry(
+          senderEnv, "insert into root.db.d1(time, s1) values (now(), 3)");
+      TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
+
+      TestUtils.assertDataOnEnv(
+          receiverEnv,
+          "select count(*) from root.**",
+          "count(root.db.d1.s1),",
+          Collections.singleton("2,"));
+    }
+  }
+
+  @Test
+  public void testPipeAfterDataRegionLeaderStop() throws Exception {
+    DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+    String receiverIp = receiverDataNode.getIp();
+    int receiverPort = receiverDataNode.getPort();
+
+    try (SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
+      Map<String, String> extractorAttributes = new HashMap<>();
+      Map<String, String> processorAttributes = new HashMap<>();
+      Map<String, String> connectorAttributes = new HashMap<>();
+
+      extractorAttributes.put("extractor.pattern", "root.db.d1");
+
+      connectorAttributes.put("connector", "iotdb-thrift-connector");
+      connectorAttributes.put("connector.batch.enable", "false");
+      connectorAttributes.put("connector.ip", receiverIp);
+      connectorAttributes.put("connector.port", Integer.toString(receiverPort));
+
+      TSStatus status =
+          client.createPipe(
+              new TCreatePipeReq("p1", connectorAttributes)
+                  .setExtractorAttributes(extractorAttributes)
+                  .setProcessorAttributes(processorAttributes));
+
+      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode());
+
+      TestUtils.executeNonQueryWithRetry(
+          senderEnv, "insert into root.db.d1(time, s1) values (1, 1)");
+      TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
+
+      AtomicInteger leaderPort = new AtomicInteger(-1);
+      TShowRegionResp showRegionResp = client.showRegion(new TShowRegionReq());
+      showRegionResp
+          .getRegionInfoList()
+          .forEach(
+              regionInfo -> {
+                if (RegionRoleType.Leader.getRoleType().equals(regionInfo.getRoleType())) {
+                  leaderPort.set(regionInfo.getClientRpcPort());
+                }
+              });
+
+      int leaderIndex = -1;
+      for (int i = 0; i < 3; ++i) {
+        if (senderEnv.getDataNodeWrapper(i).getPort() == leaderPort.get()) {
+          leaderIndex = i;
+          senderEnv.shutdownDataNode(i);
+          try {
+            TimeUnit.SECONDS.sleep(1);
+          } catch (InterruptedException ignored) {
+          }
+          senderEnv.startDataNode(i);
+          ((AbstractEnv) senderEnv).testWorkingNoUnknown();
+        }
+      }
+      if (leaderIndex == -1) { // ensure the leader is stopped
+        fail();
+      }
+
+      TestUtils.executeNonQueryOnSpecifiedDataNodeWithRetry(
+          senderEnv,
+          senderEnv.getDataNodeWrapper(leaderIndex),
+          "insert into root.db.d1(time, s1) values (2, 2)");
+      TestUtils.executeNonQueryOnSpecifiedDataNodeWithRetry(
+          senderEnv, senderEnv.getDataNodeWrapper(leaderIndex), "flush");
+
+      TestUtils.assertDataOnEnv(
+          receiverEnv,
+          "select count(*) from root.db.d1",
+          "count(root.db.d1.s1),",
+          Collections.singleton("2,"));
+    }
+
+    TestUtils.restartCluster(senderEnv);
+    TestUtils.restartCluster(receiverEnv);
+
+    try (SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
+      // create a new pipe and write new data
+      Map<String, String> extractorAttributes = new HashMap<>();
+      Map<String, String> processorAttributes = new HashMap<>();
+      Map<String, String> connectorAttributes = new HashMap<>();
+
+      extractorAttributes.put("extractor.pattern", "root.db.d2");
+
+      connectorAttributes.put("connector", "iotdb-thrift-connector");
+      connectorAttributes.put("connector.batch.enable", "false");
+      connectorAttributes.put("connector.ip", receiverIp);
+      connectorAttributes.put("connector.port", Integer.toString(receiverPort));
+
+      TSStatus status =
+          client.createPipe(
+              new TCreatePipeReq("p2", connectorAttributes)
+                  .setExtractorAttributes(extractorAttributes)
+                  .setProcessorAttributes(processorAttributes));
+
+      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p2").getCode());
+
+      TestUtils.executeNonQueryWithRetry(
+          senderEnv, "insert into root.db.d2(time, s1) values (1, 1)");
+      TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
+
+      TestUtils.assertDataOnEnv(
+          receiverEnv,
+          "select count(*) from root.db.d2",
+          "count(root.db.d2.s1),",
+          Collections.singleton("1,"));
+    }
+  }
+
+  @Test
+  public void testPipeAfterRegisterNewDataNode() throws Exception {
+    DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+    String receiverIp = receiverDataNode.getIp();
+    int receiverPort = receiverDataNode.getPort();
+
+    try (SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
+      Map<String, String> extractorAttributes = new HashMap<>();
+      Map<String, String> processorAttributes = new HashMap<>();
+      Map<String, String> connectorAttributes = new HashMap<>();
+
+      extractorAttributes.put("extractor.pattern", "root.db.d1");
+
+      connectorAttributes.put("connector", "iotdb-thrift-connector");
+      connectorAttributes.put("connector.batch.enable", "false");
+      connectorAttributes.put("connector.ip", receiverIp);
+      connectorAttributes.put("connector.port", Integer.toString(receiverPort));
+
+      TSStatus status =
+          client.createPipe(
+              new TCreatePipeReq("p1", connectorAttributes)
+                  .setExtractorAttributes(extractorAttributes)
+                  .setProcessorAttributes(processorAttributes));
+
+      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode());
+
+      TestUtils.executeNonQueryWithRetry(
+          senderEnv, "insert into root.db.d1(time, s1) values (1, 1)");
+      TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
+
+      senderEnv.registerNewDataNode(true);
+      DataNodeWrapper newDataNode =
+          senderEnv.getDataNodeWrapper(senderEnv.getDataNodeWrapperList().size() - 1);
+      TestUtils.executeNonQueryOnSpecifiedDataNodeWithRetry(
+          senderEnv, newDataNode, "insert into root.db.d1(time, s1) values (2, 2)");
+      TestUtils.executeNonQueryOnSpecifiedDataNodeWithRetry(senderEnv, newDataNode, "flush");
+      TestUtils.assertDataOnEnv(
+          receiverEnv,
+          "select count(*) from root.db.d1",
+          "count(root.db.d1.s1),",
+          Collections.singleton("2,"));
+    }
+
+    TestUtils.restartCluster(senderEnv);
+    TestUtils.restartCluster(receiverEnv);
+
+    try (SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
+      // create a new pipe and write new data
+      Map<String, String> extractorAttributes = new HashMap<>();
+      Map<String, String> processorAttributes = new HashMap<>();
+      Map<String, String> connectorAttributes = new HashMap<>();
+
+      extractorAttributes.put("extractor.pattern", "root.db.d2");
+
+      connectorAttributes.put("connector", "iotdb-thrift-connector");
+      connectorAttributes.put("connector.batch.enable", "false");
+      connectorAttributes.put("connector.ip", receiverIp);
+      connectorAttributes.put("connector.port", Integer.toString(receiverPort));
+
+      TSStatus status =
+          client.createPipe(
+              new TCreatePipeReq("p2", connectorAttributes)
+                  .setExtractorAttributes(extractorAttributes)
+                  .setProcessorAttributes(processorAttributes));
+
+      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p2").getCode());
+
+      TestUtils.executeNonQueryWithRetry(
+          senderEnv, "insert into root.db.d2(time, s1) values (1, 1)");
+      TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
+
+      TestUtils.assertDataOnEnv(
+          receiverEnv,
+          "select count(*) from root.db.d2",
+          "count(root.db.d2.s1),",
+          Collections.singleton("1,"));
+    }
+  }
+
+  @Test
+  public void testCreatePipeWhenRegisteringNewDataNode() throws Exception {
+    DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+    String receiverIp = receiverDataNode.getIp();
+    int receiverPort = receiverDataNode.getPort();
+
+    try (SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
+
+      Map<String, String> extractorAttributes = new HashMap<>();
+      Map<String, String> processorAttributes = new HashMap<>();
+      Map<String, String> connectorAttributes = new HashMap<>();
+
+      connectorAttributes.put("connector", "iotdb-thrift-connector");
+      connectorAttributes.put("connector.batch.enable", "false");
+      connectorAttributes.put("connector.ip", receiverIp);
+      connectorAttributes.put("connector.port", Integer.toString(receiverPort));
+
+      Thread t =
+          new Thread(
+              () -> {
+                for (int i = 0; i < 30; ++i) {
+                  try {
+                    client.createPipe(
+                        new TCreatePipeReq("p" + i, connectorAttributes)
+                            .setExtractorAttributes(extractorAttributes)
+                            .setProcessorAttributes(processorAttributes));
+                  } catch (TException e) {
+                    e.printStackTrace();
+                    fail(e.getMessage());
+                  }
+                  try {
+                    Thread.sleep(100);
+                  } catch (Exception ignored) {
+                  }
+                }
+              });
+      t.start();
+      senderEnv.registerNewDataNode(true);
+      t.join();
+    }
+
+    try (SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
+      List<TShowPipeInfo> showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList;
+      Assert.assertEquals(30, showPipeResult.size());
+    }
+  }
+
+  @Test
+  public void testRegisteringNewDataNodeWhenTransferringData() throws Exception {
+    DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+    String receiverIp = receiverDataNode.getIp();
+    int receiverPort = receiverDataNode.getPort();
+
+    try (SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
+      Map<String, String> extractorAttributes = new HashMap<>();
+      Map<String, String> processorAttributes = new HashMap<>();
+      Map<String, String> connectorAttributes = new HashMap<>();
+
+      connectorAttributes.put("connector", "iotdb-thrift-connector");
+      connectorAttributes.put("connector.batch.enable", "false");
+      connectorAttributes.put("connector.ip", receiverIp);
+      connectorAttributes.put("connector.port", Integer.toString(receiverPort));
+
+      TSStatus status =
+          client.createPipe(
+              new TCreatePipeReq("p1", connectorAttributes)
+                  .setExtractorAttributes(extractorAttributes)
+                  .setProcessorAttributes(processorAttributes));
+
+      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode());
+
+      Thread t =
+          new Thread(
+              () -> {
+                try {
+                  for (int i = 0; i < 100; ++i) {
+                    TestUtils.executeNonQueryWithRetry(
+                        senderEnv,
+                        String.format("insert into root.db.d1(time, s1) values (%s, 1)", i));
+                    Thread.sleep(100);
+                  }
+                } catch (InterruptedException ignored) {
+                }
+              });
+      t.start();
+      senderEnv.registerNewDataNode(true);
+      t.join();
+
+      TestUtils.assertDataOnEnv(
+          receiverEnv,
+          "select count(*) from root.db.d1",
+          "count(root.db.d1.s1),",
+          Collections.singleton("100,"));
+
+      senderEnv.shutdownDataNode(senderEnv.getDataNodeWrapperList().size() - 1);
+      senderEnv.getDataNodeWrapperList().remove(senderEnv.getDataNodeWrapperList().size() - 1);
+    }
+  }
+
+  @Test
+  public void testRegisteringNewDataNodeAfterTransferringData() throws Exception {
+    DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+    String receiverIp = receiverDataNode.getIp();
+    int receiverPort = receiverDataNode.getPort();
+
+    try (SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
+      Map<String, String> extractorAttributes = new HashMap<>();
+      Map<String, String> processorAttributes = new HashMap<>();
+      Map<String, String> connectorAttributes = new HashMap<>();
+
+      connectorAttributes.put("connector", "iotdb-thrift-connector");
+      connectorAttributes.put("connector.batch.enable", "false");
+      connectorAttributes.put("connector.ip", receiverIp);
+      connectorAttributes.put("connector.port", Integer.toString(receiverPort));
+
+      TSStatus status =
+          client.createPipe(
+              new TCreatePipeReq("p1", connectorAttributes)
+                  .setExtractorAttributes(extractorAttributes)
+                  .setProcessorAttributes(processorAttributes));
+
+      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode());
+
+      for (int i = 0; i < 100; ++i) {
+        TestUtils.executeNonQueryWithRetry(
+            senderEnv, String.format("insert into root.db.d1(time, s1) values (%s, 1)", i));
+      }
+
+      senderEnv.registerNewDataNode(true);
+
+      TestUtils.assertDataOnEnv(
+          receiverEnv,
+          "select count(*) from root.db.d1",
+          "count(root.db.d1.s1),",
+          Collections.singleton("100,"));
+
+      senderEnv.shutdownDataNode(senderEnv.getDataNodeWrapperList().size() - 1);
+      senderEnv.getDataNodeWrapperList().remove(senderEnv.getDataNodeWrapperList().size() - 1);
+    }
+  }
+
+  @Test
+  public void testNewDataNodeFailureAfterTransferringData() throws Exception {
+    DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+    String receiverIp = receiverDataNode.getIp();
+    int receiverPort = receiverDataNode.getPort();
+
+    try (SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
+      Map<String, String> extractorAttributes = new HashMap<>();
+      Map<String, String> processorAttributes = new HashMap<>();
+      Map<String, String> connectorAttributes = new HashMap<>();
+
+      connectorAttributes.put("connector", "iotdb-thrift-connector");
+      connectorAttributes.put("connector.batch.enable", "false");
+      connectorAttributes.put("connector.ip", receiverIp);
+      connectorAttributes.put("connector.port", Integer.toString(receiverPort));
+
+      TSStatus status =
+          client.createPipe(
+              new TCreatePipeReq("p1", connectorAttributes)
+                  .setExtractorAttributes(extractorAttributes)
+                  .setProcessorAttributes(processorAttributes));
+
+      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode());
+
+      for (int i = 0; i < 100; ++i) {
+        TestUtils.executeNonQueryWithRetry(
+            senderEnv, String.format("insert into root.db.d1(time, s1) values (%s, 1)", i * 1000));
+      }
+
+      senderEnv.registerNewDataNode(false);
+      senderEnv.startDataNode(senderEnv.getDataNodeWrapperList().size() - 1);
+      senderEnv.shutdownDataNode(senderEnv.getDataNodeWrapperList().size() - 1); // ctrl + c
+      senderEnv.getDataNodeWrapperList().remove(senderEnv.getDataNodeWrapperList().size() - 1);
+      ((AbstractEnv) senderEnv).testWorkingNoUnknown();
+
+      TestUtils.assertDataOnEnv(
+          receiverEnv,
+          "select count(*) from root.db.d1",
+          "count(root.db.d1.s1),",
+          Collections.singleton("100,"));
+
+      List<TShowPipeInfo> showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList;
+      Assert.assertEquals(1, showPipeResult.size());
+    }
+  }
+
+  @Test
+  public void testSenderRestartWhenTransferring() throws Exception {
+    DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+    String receiverIp = receiverDataNode.getIp();
+    int receiverPort = receiverDataNode.getPort();
+
+    try (SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
+
+      Map<String, String> extractorAttributes = new HashMap<>();
+      Map<String, String> processorAttributes = new HashMap<>();
+      Map<String, String> connectorAttributes = new HashMap<>();
+
+      connectorAttributes.put("connector", "iotdb-thrift-connector");
+      connectorAttributes.put("connector.batch.enable", "false");
+      connectorAttributes.put("connector.ip", receiverIp);
+      connectorAttributes.put("connector.port", Integer.toString(receiverPort));
+
+      TSStatus status =
+          client.createPipe(
+              new TCreatePipeReq("p1", connectorAttributes)
+                  .setExtractorAttributes(extractorAttributes)
+                  .setProcessorAttributes(processorAttributes));
+
+      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode());
+    }
+
+    for (int i = 0; i < 100; ++i) {
+      TestUtils.executeNonQueryWithRetry(
+          senderEnv, String.format("insert into root.db.d1(time, s1) values (%s, 1)", i * 1000));
+    }
+    TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
+
+    TestUtils.restartCluster(senderEnv);
+
+    TestUtils.assertDataOnEnv(
+        receiverEnv,
+        "select count(*) from root.**",
+        "count(root.db.d1.s1),",
+        Collections.singleton("100,"));
+  }
+
+  @Test
+  public void testConcurrentlyCreatePipeOfSameName() throws Exception {
+    DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+    String receiverIp = receiverDataNode.getIp();
+    int receiverPort = receiverDataNode.getPort();
+
+    Map<String, String> extractorAttributes = new HashMap<>();
+    Map<String, String> processorAttributes = new HashMap<>();
+    Map<String, String> connectorAttributes = new HashMap<>();
+
+    connectorAttributes.put("connector", "iotdb-thrift-connector");
+    connectorAttributes.put("connector.batch.enable", "false");
+    connectorAttributes.put("connector.ip", receiverIp);
+    connectorAttributes.put("connector.port", Integer.toString(receiverPort));
+
+    AtomicInteger successCount = new AtomicInteger(0);
+    List<Thread> threads = new ArrayList<>();
+    for (int i = 0; i < 10; ++i) {
+      Thread t =
+          new Thread(
+              () -> {
+                try (SyncConfigNodeIServiceClient client =
+                    (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
+                  TSStatus status =
+                      client.createPipe(
+                          new TCreatePipeReq("p1", connectorAttributes)
+                              .setExtractorAttributes(extractorAttributes)
+                              .setProcessorAttributes(processorAttributes));
+                  if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+                    successCount.updateAndGet(v -> v + 1);
+                  }
+                } catch (Exception e) {
+                  e.printStackTrace();
+                  fail(e.getMessage());
+                }
+              });
+      t.start();
+      threads.add(t);
+    }
+
+    for (Thread t : threads) {
+      t.join();
+    }
+    Assert.assertEquals(1, successCount.get());
+
+    successCount.set(0);
+    for (int i = 0; i < 10; ++i) {
+      Thread t =
+          new Thread(
+              () -> {
+                try (SyncConfigNodeIServiceClient client =
+                    (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
+                  TSStatus status = client.dropPipe("p1");
+                  if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+                    successCount.updateAndGet(v -> v + 1);
+                  }
+                } catch (Exception e) {
+                  e.printStackTrace();
+                  fail(e.getMessage());
+                }
+              });
+      t.start();
+      threads.add(t);
+    }
+    for (Thread t : threads) {
+      t.join();
+    }
+
+    Assert.assertEquals(10, successCount.get());
+    try (SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
+      List<TShowPipeInfo> showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList;
+      Assert.assertEquals(0, showPipeResult.size());
+    }
+  }
+
+  @Test
+  public void testCreate10PipesWithSameConnector() throws Exception {
+    testCreatePipesWithSameConnector(10);
+  }
+
+  @Test
+  public void testCreate50PipesWithSameConnector() throws Exception {
+    testCreatePipesWithSameConnector(50);
+  }
+
+  @Test
+  public void testCreate100PipesWithSameConnector() throws Exception {
+    testCreatePipesWithSameConnector(100);
+  }
+
+  private void testCreatePipesWithSameConnector(int pipeCount) throws Exception {
+    DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+    String receiverIp = receiverDataNode.getIp();
+    int receiverPort = receiverDataNode.getPort();
+
+    Map<String, String> extractorAttributes = new HashMap<>();
+    Map<String, String> processorAttributes = new HashMap<>();
+    Map<String, String> connectorAttributes = new HashMap<>();
+
+    connectorAttributes.put("connector", "iotdb-thrift-connector");
+    connectorAttributes.put("connector.batch.enable", "false");
+    connectorAttributes.put("connector.ip", receiverIp);
+    connectorAttributes.put("connector.port", Integer.toString(receiverPort));
+
+    List<Thread> threads = new ArrayList<>();
+    for (int i = 0; i < pipeCount; ++i) {
+      int finalI = i;
+      Thread t =
+          new Thread(
+              () -> {
+                try (SyncConfigNodeIServiceClient client =
+                    (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
+                  TSStatus status =
+                      client.createPipe(
+                          new TCreatePipeReq("p" + finalI, connectorAttributes)
+                              .setExtractorAttributes(extractorAttributes)
+                              .setProcessorAttributes(processorAttributes));
+                  Assert.assertEquals(
+                      TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+                } catch (Exception e) {
+                  e.printStackTrace();
+                  fail(e.getMessage());
+                }
+              });
+      t.start();
+      threads.add(t);
+    }
+    for (Thread t : threads) {
+      t.join();
+    }
+
+    try (SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
+      List<TShowPipeInfo> showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList;
+      Assert.assertEquals(pipeCount, showPipeResult.size());
+      showPipeResult =
+          client.showPipe(new TShowPipeReq().setPipeName("p1").setWhereClause(true)).pipeInfoList;
+      Assert.assertEquals(pipeCount, showPipeResult.size());
+    }
+  }
+}
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeLifeCycleIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeLifeCycleIT.java
index a906cf61abf..e0c789b4f9d 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeLifeCycleIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeLifeCycleIT.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
 import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
 import org.apache.iotdb.db.it.utils.TestUtils;
 import org.apache.iotdb.it.env.MultiEnvFactory;
-import org.apache.iotdb.it.env.cluster.env.AbstractEnv;
 import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
 import org.apache.iotdb.it.framework.IoTDBTestRunner;
 import org.apache.iotdb.itbase.category.MultiClusterIT2;
@@ -38,17 +37,11 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 
-import java.sql.Connection;
-import java.sql.SQLException;
-import java.sql.Statement;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-import static org.awaitility.Awaitility.await;
-import static org.junit.Assert.fail;
 
 @RunWith(IoTDBTestRunner.class)
 @Category({MultiClusterIT2.class})
@@ -86,13 +79,8 @@ public class IoTDBPipeLifeCycleIT {
     try (SyncConfigNodeIServiceClient client =
         (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
 
-      try (Connection connection = senderEnv.getConnection();
-          Statement statement = connection.createStatement()) {
-        statement.execute("insert into root.sg1.d1(time, at1) values (1, 1)");
-      } catch (SQLException e) {
-        e.printStackTrace();
-        fail(e.getMessage());
-      }
+      TestUtils.executeNonQueryWithRetry(
+          senderEnv, "insert into root.db.d1(time, s1) values (1, 1)");
 
       Map<String, String> extractorAttributes = new HashMap<>();
       Map<String, String> processorAttributes = new HashMap<>();
@@ -116,37 +104,31 @@ public class IoTDBPipeLifeCycleIT {
 
       Set<String> expectedResSet = new HashSet<>();
       expectedResSet.add("1,1.0,");
-      assertDataOnReceiver(receiverEnv, expectedResSet);
+      TestUtils.assertDataOnEnv(
+          receiverEnv, "select * from root.**", "Time,root.db.d1.s1,", expectedResSet);
 
-      try (Connection connection = senderEnv.getConnection();
-          Statement statement = connection.createStatement()) {
-        statement.execute("insert into root.sg1.d1(time, at1) values (2, 2)");
-      } catch (SQLException e) {
-        e.printStackTrace();
-        fail(e.getMessage());
-      }
+      TestUtils.executeNonQueryWithRetry(
+          senderEnv, "insert into root.db.d1(time, s1) values (2, 2)");
 
       expectedResSet.add("2,2.0,");
-      assertDataOnReceiver(receiverEnv, expectedResSet);
+      TestUtils.assertDataOnEnv(
+          receiverEnv, "select * from root.**", "Time,root.db.d1.s1,", expectedResSet);
 
       Assert.assertEquals(
           TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.stopPipe("p1").getCode());
 
-      try (Connection connection = senderEnv.getConnection();
-          Statement statement = connection.createStatement()) {
-        statement.execute("insert into root.sg1.d1(time, at1) values (3, 3)");
-      } catch (SQLException e) {
-        e.printStackTrace();
-        fail(e.getMessage());
-      }
+      TestUtils.executeNonQueryWithRetry(
+          senderEnv, "insert into root.db.d1(time, s1) values (3, 3)");
 
-      assertDataOnReceiver(receiverEnv, expectedResSet);
+      TestUtils.assertDataOnEnv(
+          receiverEnv, "select * from root.**", "Time,root.db.d1.s1,", expectedResSet);
 
       Assert.assertEquals(
           TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode());
 
       expectedResSet.add("3,3.0,");
-      assertDataOnReceiver(receiverEnv, expectedResSet);
+      TestUtils.assertDataOnEnv(
+          receiverEnv, "select * from root.**", "Time,root.db.d1.s1,", expectedResSet);
     }
   }
 
@@ -160,14 +142,9 @@ public class IoTDBPipeLifeCycleIT {
     try (SyncConfigNodeIServiceClient client =
         (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
 
-      try (Connection connection = senderEnv.getConnection();
-          Statement statement = connection.createStatement()) {
-        statement.execute("insert into root.sg1.d1(time, at1) values (1, 1)");
-        statement.execute("flush");
-      } catch (SQLException e) {
-        e.printStackTrace();
-        fail(e.getMessage());
-      }
+      TestUtils.executeNonQueryWithRetry(
+          senderEnv, "insert into root.db.d1(time, s1) values (1, 1)");
+      TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
 
       Map<String, String> extractorAttributes = new HashMap<>();
       Map<String, String> processorAttributes = new HashMap<>();
@@ -194,30 +171,22 @@ public class IoTDBPipeLifeCycleIT {
       Assert.assertEquals(
           TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode());
 
-      try (Connection connection = senderEnv.getConnection();
-          Statement statement = connection.createStatement()) {
-        statement.execute("insert into root.sg1.d1(time, at1) values (2, 2)");
-      } catch (SQLException e) {
-        e.printStackTrace();
-        fail(e.getMessage());
-      }
+      TestUtils.executeNonQueryWithRetry(
+          senderEnv, "insert into root.db.d1(time, s1) values (2, 2)");
 
       Set<String> expectedResSet = new HashSet<>();
       expectedResSet.add("2,2.0,");
-      assertDataOnReceiver(receiverEnv, expectedResSet);
+      TestUtils.assertDataOnEnv(
+          receiverEnv, "select * from root.**", "Time,root.db.d1.s1,", expectedResSet);
 
       Assert.assertEquals(
           TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.stopPipe("p1").getCode());
 
-      try (Connection connection = senderEnv.getConnection();
-          Statement statement = connection.createStatement()) {
-        statement.execute("insert into root.sg1.d1(time, at1) values (3, 3)");
-      } catch (SQLException e) {
-        e.printStackTrace();
-        fail(e.getMessage());
-      }
+      TestUtils.executeNonQueryWithRetry(
+          senderEnv, "insert into root.db.d1(time, s1) values (3, 3)");
 
-      assertDataOnReceiver(receiverEnv, expectedResSet);
+      TestUtils.assertDataOnEnv(
+          receiverEnv, "select * from root.**", "Time,root.db.d1.s1,", expectedResSet);
     }
   }
 
@@ -231,13 +200,8 @@ public class IoTDBPipeLifeCycleIT {
     try (SyncConfigNodeIServiceClient client =
         (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
 
-      try (Connection connection = senderEnv.getConnection();
-          Statement statement = connection.createStatement()) {
-        statement.execute("insert into root.sg1.d1(time, at1) values (1, 1)");
-      } catch (SQLException e) {
-        e.printStackTrace();
-        fail(e.getMessage());
-      }
+      TestUtils.executeNonQueryWithRetry(
+          senderEnv, "insert into root.db.d1(time, s1) values (1, 1)");
 
       Map<String, String> extractorAttributes = new HashMap<>();
       Map<String, String> processorAttributes = new HashMap<>();
@@ -263,31 +227,24 @@ public class IoTDBPipeLifeCycleIT {
 
       Set<String> expectedResSet = new HashSet<>();
       expectedResSet.add("1,1.0,");
-      assertDataOnReceiver(receiverEnv, expectedResSet);
+      TestUtils.assertDataOnEnv(
+          receiverEnv, "select * from root.**", "Time,root.db.d1.s1,", expectedResSet);
 
-      try (Connection connection = senderEnv.getConnection();
-          Statement statement = connection.createStatement()) {
-        statement.execute("insert into root.sg1.d1(time, at1) values (2, 2)");
-      } catch (SQLException e) {
-        e.printStackTrace();
-        fail(e.getMessage());
-      }
+      TestUtils.executeNonQueryWithRetry(
+          senderEnv, "insert into root.db.d1(time, s1) values (2, 2)");
 
       expectedResSet.add("2,2.0,");
-      assertDataOnReceiver(receiverEnv, expectedResSet);
+      TestUtils.assertDataOnEnv(
+          receiverEnv, "select * from root.**", "Time,root.db.d1.s1,", expectedResSet);
 
       Assert.assertEquals(
           TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.stopPipe("p1").getCode());
 
-      try (Connection connection = senderEnv.getConnection();
-          Statement statement = connection.createStatement()) {
-        statement.execute("insert into root.sg1.d1(time, at1) values (3, 3)");
-      } catch (SQLException e) {
-        e.printStackTrace();
-        fail(e.getMessage());
-      }
+      TestUtils.executeNonQueryWithRetry(
+          senderEnv, "insert into root.db.d1(time, s1) values (3, 3)");
 
-      assertDataOnReceiver(receiverEnv, expectedResSet);
+      TestUtils.assertDataOnEnv(
+          receiverEnv, "select * from root.**", "Time,root.db.d1.s1,", expectedResSet);
     }
   }
 
@@ -301,14 +258,8 @@ public class IoTDBPipeLifeCycleIT {
     try (SyncConfigNodeIServiceClient client =
         (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
 
-      try (Connection connection = senderEnv.getConnection();
-          Statement statement = connection.createStatement()) {
-        statement.execute("insert into root.sg1.d1(time, at1) values (1, 1)");
-        statement.execute("flush");
-      } catch (SQLException e) {
-        e.printStackTrace();
-        fail(e.getMessage());
-      }
+      TestUtils.executeNonQueryWithRetry(
+          senderEnv, "insert into root.db.d1(time, s1) values (1, 1)");
 
       Map<String, String> extractorAttributes = new HashMap<>();
       Map<String, String> processorAttributes = new HashMap<>();
@@ -334,33 +285,24 @@ public class IoTDBPipeLifeCycleIT {
 
       Set<String> expectedResSet = new HashSet<>();
       expectedResSet.add("1,1.0,");
-      assertDataOnReceiver(receiverEnv, expectedResSet);
+      TestUtils.assertDataOnEnv(
+          receiverEnv, "select * from root.**", "Time,root.db.d1.s1,", expectedResSet);
 
-      try (Connection connection = senderEnv.getConnection();
-          Statement statement = connection.createStatement()) {
-        statement.execute("insert into root.sg1.d1(time, at1) values (2, 2)");
-        statement.execute("flush");
-      } catch (SQLException e) {
-        e.printStackTrace();
-        fail(e.getMessage());
-      }
+      TestUtils.executeNonQueryWithRetry(
+          senderEnv, "insert into root.db.d1(time, s1) values (2, 2)");
 
       expectedResSet.add("2,2.0,");
-      assertDataOnReceiver(receiverEnv, expectedResSet);
+      TestUtils.assertDataOnEnv(
+          receiverEnv, "select * from root.**", "Time,root.db.d1.s1,", expectedResSet);
 
       Assert.assertEquals(
           TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.stopPipe("p1").getCode());
 
-      try (Connection connection = senderEnv.getConnection();
-          Statement statement = connection.createStatement()) {
-        statement.execute("insert into root.sg1.d1(time, at1) values (3, 3)");
-        statement.execute("flush");
-      } catch (SQLException e) {
-        e.printStackTrace();
-        fail(e.getMessage());
-      }
+      TestUtils.executeNonQueryWithRetry(
+          senderEnv, "insert into root.db.d1(time, s1) values (3, 3)");
 
-      assertDataOnReceiver(receiverEnv, expectedResSet);
+      TestUtils.assertDataOnEnv(
+          receiverEnv, "select * from root.**", "Time,root.db.d1.s1,", expectedResSet);
     }
   }
 
@@ -374,13 +316,8 @@ public class IoTDBPipeLifeCycleIT {
     try (SyncConfigNodeIServiceClient client =
         (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
 
-      try (Connection connection = senderEnv.getConnection();
-          Statement statement = connection.createStatement()) {
-        statement.execute("insert into root.sg1.d1(time, at1) values (1, 1)");
-      } catch (SQLException e) {
-        e.printStackTrace();
-        fail(e.getMessage());
-      }
+      TestUtils.executeNonQueryWithRetry(
+          senderEnv, "insert into root.db.d1(time, s1) values (1, 1)");
 
       Map<String, String> extractorAttributes = new HashMap<>();
       Map<String, String> processorAttributes = new HashMap<>();
@@ -406,31 +343,24 @@ public class IoTDBPipeLifeCycleIT {
 
       Set<String> expectedResSet = new HashSet<>();
       expectedResSet.add("1,1.0,");
-      assertDataOnReceiver(receiverEnv, expectedResSet);
+      TestUtils.assertDataOnEnv(
+          receiverEnv, "select * from root.**", "Time,root.db.d1.s1,", expectedResSet);
 
-      try (Connection connection = senderEnv.getConnection();
-          Statement statement = connection.createStatement()) {
-        statement.execute("insert into root.sg1.d1(time, at1) values (2, 2)");
-      } catch (SQLException e) {
-        e.printStackTrace();
-        fail(e.getMessage());
-      }
+      TestUtils.executeNonQueryWithRetry(
+          senderEnv, "insert into root.db.d1(time, s1) values (2, 2)");
 
       expectedResSet.add("2,2.0,");
-      assertDataOnReceiver(receiverEnv, expectedResSet);
+      TestUtils.assertDataOnEnv(
+          receiverEnv, "select * from root.**", "Time,root.db.d1.s1,", expectedResSet);
 
       Assert.assertEquals(
           TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.stopPipe("p1").getCode());
 
-      try (Connection connection = senderEnv.getConnection();
-          Statement statement = connection.createStatement()) {
-        statement.execute("insert into root.sg1.d1(time, at1) values (3, 3)");
-      } catch (SQLException e) {
-        e.printStackTrace();
-        fail(e.getMessage());
-      }
+      TestUtils.executeNonQueryWithRetry(
+          senderEnv, "insert into root.db.d1(time, s1) values (3, 3)");
 
-      assertDataOnReceiver(receiverEnv, expectedResSet);
+      TestUtils.assertDataOnEnv(
+          receiverEnv, "select * from root.**", "Time,root.db.d1.s1,", expectedResSet);
     }
   }
 
@@ -445,13 +375,8 @@ public class IoTDBPipeLifeCycleIT {
     try (SyncConfigNodeIServiceClient client =
         (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
 
-      try (Connection connection = senderEnv.getConnection();
-          Statement statement = connection.createStatement()) {
-        statement.execute("insert into root.sg1.d1(time, at1) values (1, 1)");
-      } catch (SQLException e) {
-        e.printStackTrace();
-        fail(e.getMessage());
-      }
+      TestUtils.executeNonQueryWithRetry(
+          senderEnv, "insert into root.db.d1(time, s1) values (1, 1)");
 
       Map<String, String> extractorAttributes = new HashMap<>();
       Map<String, String> processorAttributes = new HashMap<>();
@@ -474,73 +399,247 @@ public class IoTDBPipeLifeCycleIT {
           TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode());
 
       expectedResSet.add("1,1.0,");
-      assertDataOnReceiver(receiverEnv, expectedResSet);
+      TestUtils.assertDataOnEnv(
+          receiverEnv, "select * from root.**", "Time,root.db.d1.s1,", expectedResSet);
 
-      try (Connection connection = senderEnv.getConnection();
-          Statement statement = connection.createStatement()) {
-        statement.execute("insert into root.sg1.d1(time, at1) values (2, 2)");
-      } catch (SQLException e) {
-        e.printStackTrace();
-        fail(e.getMessage());
-      }
+      TestUtils.executeNonQueryWithRetry(
+          senderEnv, "insert into root.db.d1(time, s1) values (2, 2)");
 
       expectedResSet.add("2,2.0,");
-      assertDataOnReceiver(receiverEnv, expectedResSet);
+      TestUtils.assertDataOnEnv(
+          receiverEnv, "select * from root.**", "Time,root.db.d1.s1,", expectedResSet);
     }
 
-    restartCluster(senderEnv);
-    restartCluster(receiverEnv);
+    TestUtils.restartCluster(senderEnv);
+    TestUtils.restartCluster(receiverEnv);
 
     try (SyncConfigNodeIServiceClient ignored =
         (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
 
-      try (Connection connection = senderEnv.getConnection();
-          Statement statement = connection.createStatement()) {
-        statement.execute("insert into root.sg1.d1(time, at1) values (3, 3)");
-      } catch (SQLException e) {
-        e.printStackTrace();
-        fail(e.getMessage());
-      }
+      TestUtils.executeNonQueryWithRetry(
+          senderEnv, "insert into root.db.d1(time, s1) values (3, 3)");
 
       expectedResSet.add("3,3.0,");
-      assertDataOnReceiver(receiverEnv, expectedResSet);
+      TestUtils.assertDataOnEnv(
+          receiverEnv, "select * from root.**", "Time,root.db.d1.s1,", expectedResSet);
     }
   }
 
-  private void assertDataOnReceiver(BaseEnv receiverEnv, Set<String> expectedResSet) {
-    try (Connection connection = receiverEnv.getConnection();
-        Statement statement = connection.createStatement()) {
-      await()
-          .atMost(600, TimeUnit.SECONDS)
-          .untilAsserted(
-              () ->
-                  TestUtils.assertResultSetEqual(
-                      statement.executeQuery("select * from root.**"),
-                      "Time,root.sg1.d1.at1,",
-                      expectedResSet));
-    } catch (Exception e) {
-      e.printStackTrace();
-      fail(e.getMessage());
+  @Test
+  public void testReceiverRestartWhenTransferring() throws Exception {
+    DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+    String receiverIp = receiverDataNode.getIp();
+    int receiverPort = receiverDataNode.getPort();
+
+    try (SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
+
+      Map<String, String> extractorAttributes = new HashMap<>();
+      Map<String, String> processorAttributes = new HashMap<>();
+      Map<String, String> connectorAttributes = new HashMap<>();
+
+      connectorAttributes.put("connector", "iotdb-thrift-connector");
+      connectorAttributes.put("connector.batch.enable", "false");
+      connectorAttributes.put("connector.ip", receiverIp);
+      connectorAttributes.put("connector.port", Integer.toString(receiverPort));
+
+      TSStatus status =
+          client.createPipe(
+              new TCreatePipeReq("p1", connectorAttributes)
+                  .setExtractorAttributes(extractorAttributes)
+                  .setProcessorAttributes(processorAttributes));
+
+      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode());
+
+      Thread t =
+          new Thread(
+              () -> {
+                try {
+                  for (int i = 0; i < 100; ++i) {
+                    TestUtils.executeNonQueryWithRetry(
+                        senderEnv,
+                        String.format("insert into root.db.d1(time, s1) values (%s, 1)", i));
+                    Thread.sleep(100);
+                  }
+                } catch (InterruptedException ignored) {
+                }
+              });
+      t.start();
+
+      TestUtils.restartCluster(receiverEnv);
+      t.join();
+
+      TestUtils.assertDataOnEnv(
+          receiverEnv,
+          "select count(*) from root.**",
+          "count(root.db.d1.s1),",
+          Collections.singleton("100,"));
     }
   }
 
-  private void restartCluster(BaseEnv env) {
-    for (int i = 0; i < env.getConfigNodeWrapperList().size(); ++i) {
-      env.shutdownConfigNode(i);
+  @Test
+  public void testReceiverAlreadyHaveTimeSeries() throws Exception {
+    DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+    String receiverIp = receiverDataNode.getIp();
+    int receiverPort = receiverDataNode.getPort();
+
+    try (SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
+      TestUtils.executeNonQueryWithRetry(
+          receiverEnv, "insert into root.db.d1(time, s1) values (1, 1)");
+
+      Map<String, String> extractorAttributes = new HashMap<>();
+      Map<String, String> processorAttributes = new HashMap<>();
+      Map<String, String> connectorAttributes = new HashMap<>();
+
+      connectorAttributes.put("connector", "iotdb-thrift-connector");
+      connectorAttributes.put("connector.batch.enable", "false");
+      connectorAttributes.put("connector.ip", receiverIp);
+      connectorAttributes.put("connector.port", Integer.toString(receiverPort));
+
+      TSStatus status =
+          client.createPipe(
+              new TCreatePipeReq("p1", connectorAttributes)
+                  .setExtractorAttributes(extractorAttributes)
+                  .setProcessorAttributes(processorAttributes));
+
+      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode());
+
+      TestUtils.executeNonQueryWithRetry(
+          senderEnv, "insert into root.db.d1(time, s1) values (2, 2)");
+
+      TestUtils.assertDataOnEnv(
+          receiverEnv,
+          "select count(*) from root.**",
+          "count(root.db.d1.s1),",
+          Collections.singleton("2,"));
+
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.stopPipe("p1").getCode());
+
+      TestUtils.executeNonQueryWithRetry(
+          senderEnv, "insert into root.db.d1(time, s1) values (3, 3)");
+
+      Thread.sleep(5000);
+      TestUtils.assertDataOnEnv(
+          receiverEnv,
+          "select count(*) from root.**",
+          "count(root.db.d1.s1),",
+          Collections.singleton("2,"));
     }
-    for (int i = 0; i < env.getDataNodeWrapperList().size(); ++i) {
-      env.shutdownDataNode(i);
+  }
+
+  @Test
+  public void testDoubleLiving() throws Exception {
+    // Double living is two clusters with pipes connecting each other.
+    DataNodeWrapper senderDataNode = senderEnv.getDataNodeWrapper(0);
+    DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+    String senderIp = senderDataNode.getIp();
+    int senderPort = senderDataNode.getPort();
+    String receiverIp = receiverDataNode.getIp();
+    int receiverPort = receiverDataNode.getPort();
+
+    for (int i = 0; i < 100; ++i) {
+      TestUtils.executeNonQueryWithRetry(
+          senderEnv, String.format("insert into root.db.d1(time, s1) values (%s, 1)", i));
     }
-    try {
-      TimeUnit.SECONDS.sleep(1);
-    } catch (InterruptedException ignored) {
+    TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
+
+    try (SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
+      Map<String, String> extractorAttributes = new HashMap<>();
+      Map<String, String> processorAttributes = new HashMap<>();
+      Map<String, String> connectorAttributes = new HashMap<>();
+
+      connectorAttributes.put("connector", "iotdb-thrift-connector");
+      connectorAttributes.put("connector.batch.enable", "false");
+      connectorAttributes.put("connector.ip", receiverIp);
+      connectorAttributes.put("connector.port", Integer.toString(receiverPort));
+
+      TSStatus status =
+          client.createPipe(
+              new TCreatePipeReq("p1", connectorAttributes)
+                  .setExtractorAttributes(extractorAttributes)
+                  .setProcessorAttributes(processorAttributes));
+
+      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode());
     }
-    for (int i = 0; i < env.getConfigNodeWrapperList().size(); ++i) {
-      env.startConfigNode(i);
+    for (int i = 100; i < 200; ++i) {
+      TestUtils.executeNonQueryWithRetry(
+          senderEnv, String.format("insert into root.db.d1(time, s1) values (%s, 1)", i));
     }
-    for (int i = 0; i < env.getDataNodeWrapperList().size(); ++i) {
-      env.startDataNode(i);
+
+    for (int i = 200; i < 300; ++i) {
+      TestUtils.executeNonQueryWithRetry(
+          receiverEnv, String.format("insert into root.db.d1(time, s1) values (%s, 1)", i));
+    }
+    TestUtils.executeNonQueryWithRetry(receiverEnv, "flush");
+
+    try (SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) receiverEnv.getLeaderConfigNodeConnection()) {
+      Map<String, String> extractorAttributes = new HashMap<>();
+      Map<String, String> processorAttributes = new HashMap<>();
+      Map<String, String> connectorAttributes = new HashMap<>();
+
+      connectorAttributes.put("connector", "iotdb-thrift-connector");
+      connectorAttributes.put("connector.batch.enable", "false");
+      connectorAttributes.put("connector.ip", senderIp);
+      connectorAttributes.put("connector.port", Integer.toString(senderPort));
+
+      TSStatus status =
+          client.createPipe(
+              new TCreatePipeReq("p1", connectorAttributes)
+                  .setExtractorAttributes(extractorAttributes)
+                  .setProcessorAttributes(processorAttributes));
+
+      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode());
+    }
+    for (int i = 300; i < 400; ++i) {
+      TestUtils.executeNonQueryWithRetry(
+          receiverEnv, String.format("insert into root.db.d1(time, s1) values (%s, 1)", i));
+    }
+
+    Set<String> expectedResSet = new HashSet<>();
+    for (int i = 0; i < 400; ++i) {
+      expectedResSet.add(i + ",1.0,");
+    }
+
+    TestUtils.assertDataOnEnv(
+        senderEnv, "select * from root.**", "Time,root.db.d1.s1,", expectedResSet);
+    TestUtils.assertDataOnEnv(
+        receiverEnv, "select * from root.**", "Time,root.db.d1.s1,", expectedResSet);
+
+    TestUtils.restartCluster(senderEnv);
+    TestUtils.restartCluster(receiverEnv);
+
+    for (int i = 400; i < 500; ++i) {
+      TestUtils.executeNonQueryWithRetry(
+          senderEnv, String.format("insert into root.db.d1(time, s1) values (%s, 1)", i));
+    }
+    TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
+    for (int i = 500; i < 600; ++i) {
+      TestUtils.executeNonQueryWithRetry(
+          receiverEnv, String.format("insert into root.db.d1(time, s1) values (%s, 1)", i));
+    }
+    TestUtils.executeNonQueryWithRetry(receiverEnv, "flush");
+
+    for (int i = 400; i < 600; ++i) {
+      expectedResSet.add(i + ",1.0,");
     }
-    ((AbstractEnv) env).testWorkingNoUnknown();
+    TestUtils.assertDataOnEnv(
+        senderEnv, "select * from root.**", "Time,root.db.d1.s1,", expectedResSet);
+    TestUtils.assertDataOnEnv(
+        receiverEnv, "select * from root.**", "Time,root.db.d1.s1,", expectedResSet);
   }
 }
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeProtocolIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeProtocolIT.java
new file mode 100644
index 00000000000..c112aeb51b1
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeProtocolIT.java
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.it;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin;
+import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
+import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.db.it.utils.TestUtils;
+import org.apache.iotdb.it.env.MultiEnvFactory;
+import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.MultiClusterIT2;
+import org.apache.iotdb.itbase.env.BaseEnv;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({MultiClusterIT2.class})
+/** Test pipe's basic functionalities under multiple cluster and consensus protocol settings. */
+public class IoTDBPipeProtocolIT {
+
+  private BaseEnv senderEnv;
+  private BaseEnv receiverEnv;
+
+  @Before
+  public void setUp() throws Exception {
+    MultiEnvFactory.createEnv(2);
+    senderEnv = MultiEnvFactory.getEnv(0);
+    receiverEnv = MultiEnvFactory.getEnv(1);
+  }
+
+  @After
+  public void tearDown() {
+    senderEnv.cleanClusterEnvironment();
+    receiverEnv.cleanClusterEnvironment();
+  }
+
+  private void innerSetUp(
+      String configNodeConsensus,
+      String schemaRegionConsensus,
+      String dataRegionConsensus,
+      int configNodesNum,
+      int dataNodesNum,
+      int schemaRegionReplicationFactor,
+      int dataRegionReplicationFactor) {
+    schemaRegionReplicationFactor = Math.min(schemaRegionReplicationFactor, dataNodesNum);
+    dataRegionReplicationFactor = Math.min(dataRegionReplicationFactor, dataNodesNum);
+    senderEnv
+        .getConfig()
+        .getCommonConfig()
+        .setAutoCreateSchemaEnabled(true)
+        .setConfigNodeConsensusProtocolClass(configNodeConsensus)
+        .setSchemaRegionConsensusProtocolClass(schemaRegionConsensus)
+        .setDataRegionConsensusProtocolClass(dataRegionConsensus)
+        .setSchemaReplicationFactor(schemaRegionReplicationFactor)
+        .setDataReplicationFactor(dataRegionReplicationFactor);
+    receiverEnv
+        .getConfig()
+        .getCommonConfig()
+        .setAutoCreateSchemaEnabled(true)
+        .setConfigNodeConsensusProtocolClass(configNodeConsensus)
+        .setSchemaRegionConsensusProtocolClass(schemaRegionConsensus)
+        .setDataRegionConsensusProtocolClass(dataRegionConsensus)
+        .setSchemaReplicationFactor(schemaRegionReplicationFactor)
+        .setDataReplicationFactor(dataRegionReplicationFactor);
+
+    senderEnv.initClusterEnvironment(configNodesNum, dataNodesNum);
+    receiverEnv.initClusterEnvironment(configNodesNum, dataNodesNum);
+  }
+
+  @Test
+  public void test1C1DWithRatisRatisIot() throws Exception {
+    innerSetUp(
+        ConsensusFactory.RATIS_CONSENSUS,
+        ConsensusFactory.RATIS_CONSENSUS,
+        ConsensusFactory.IOT_CONSENSUS,
+        1,
+        1,
+        1,
+        1);
+    doTest();
+  }
+
+  @Test
+  public void test1C1DWithSimpleSimpleIot() throws Exception {
+    innerSetUp(
+        ConsensusFactory.SIMPLE_CONSENSUS,
+        ConsensusFactory.SIMPLE_CONSENSUS,
+        ConsensusFactory.IOT_CONSENSUS,
+        1,
+        1,
+        1,
+        1);
+    doTest();
+  }
+
+  @Test
+  public void test1C1DWithRatisRatisSimple() throws Exception {
+    innerSetUp(
+        ConsensusFactory.RATIS_CONSENSUS,
+        ConsensusFactory.RATIS_CONSENSUS,
+        ConsensusFactory.SIMPLE_CONSENSUS,
+        1,
+        1,
+        1,
+        1);
+    doTest();
+  }
+
+  @Test
+  public void test3C3DWith3SchemaRegionFactor3DataRegionFactor() throws Exception {
+    innerSetUp(
+        ConsensusFactory.RATIS_CONSENSUS,
+        ConsensusFactory.RATIS_CONSENSUS,
+        ConsensusFactory.IOT_CONSENSUS,
+        3,
+        3,
+        3,
+        3);
+    doTest();
+  }
+
+  @Test
+  public void test3C3DWith3SchemaRegionFactor2DataRegionFactor() throws Exception {
+    innerSetUp(
+        ConsensusFactory.RATIS_CONSENSUS,
+        ConsensusFactory.RATIS_CONSENSUS,
+        ConsensusFactory.IOT_CONSENSUS,
+        3,
+        3,
+        3,
+        2);
+    doTest();
+  }
+
+  @Test
+  public void testPipeOnBothSenderAndReceiver() throws Exception {
+    senderEnv
+        .getConfig()
+        .getCommonConfig()
+        .setAutoCreateSchemaEnabled(true)
+        .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+        .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+        .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
+        .setSchemaReplicationFactor(3)
+        .setDataReplicationFactor(2);
+    receiverEnv
+        .getConfig()
+        .getCommonConfig()
+        .setAutoCreateSchemaEnabled(true)
+        .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+        .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+        .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
+        .setSchemaReplicationFactor(1)
+        .setDataReplicationFactor(1);
+
+    senderEnv.initClusterEnvironment(3, 3);
+    receiverEnv.initClusterEnvironment(1, 1);
+
+    DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+    String receiverIp = receiverDataNode.getIp();
+    int receiverPort = receiverDataNode.getPort();
+
+    try (SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
+
+      TestUtils.executeNonQueryWithRetry(
+          senderEnv, "insert into root.db.d1(time, s1) values (1, 1)");
+
+      Map<String, String> extractorAttributes = new HashMap<>();
+      Map<String, String> processorAttributes = new HashMap<>();
+      Map<String, String> connectorAttributes = new HashMap<>();
+
+      connectorAttributes.put("connector", "iotdb-thrift-connector");
+      connectorAttributes.put("connector.batch.enable", "false");
+      connectorAttributes.put("connector.ip", receiverIp);
+      connectorAttributes.put("connector.port", Integer.toString(receiverPort));
+
+      TSStatus status =
+          client.createPipe(
+              new TCreatePipeReq("p1", connectorAttributes)
+                  .setExtractorAttributes(extractorAttributes)
+                  .setProcessorAttributes(processorAttributes));
+
+      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode());
+
+      TestUtils.assertDataOnEnv(
+          receiverEnv,
+          "select count(*) from root.**",
+          "count(root.db.d1.s1),",
+          Collections.singleton("1,"));
+
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.stopPipe("p1").getCode());
+    }
+
+    DataNodeWrapper senderDataNode = senderEnv.getDataNodeWrapper(0);
+    String senderIp = senderDataNode.getIp();
+    int senderPort = senderDataNode.getPort();
+    try (SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) receiverEnv.getLeaderConfigNodeConnection()) {
+
+      TestUtils.executeNonQueryWithRetry(
+          receiverEnv, "insert into root.db.d1(time, s1) values (2, 2)");
+
+      Map<String, String> extractorAttributes = new HashMap<>();
+      Map<String, String> processorAttributes = new HashMap<>();
+      Map<String, String> connectorAttributes = new HashMap<>();
+
+      connectorAttributes.put("connector", "iotdb-thrift-connector");
+      connectorAttributes.put("connector.batch.enable", "false");
+      connectorAttributes.put("connector.ip", senderIp);
+      connectorAttributes.put("connector.port", Integer.toString(senderPort));
+
+      TSStatus status =
+          client.createPipe(
+              new TCreatePipeReq("p1", connectorAttributes)
+                  .setExtractorAttributes(extractorAttributes)
+                  .setProcessorAttributes(processorAttributes));
+
+      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode());
+
+      TestUtils.assertDataOnEnv(
+          senderEnv,
+          "select count(*) from root.**",
+          "count(root.db.d1.s1),",
+          Collections.singleton("2,"));
+    }
+  }
+
+  private void doTest() throws Exception {
+    DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+    String receiverIp = receiverDataNode.getIp();
+    int receiverPort = receiverDataNode.getPort();
+
+    try (SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
+
+      TestUtils.executeNonQueryWithRetry(
+          senderEnv, "insert into root.db.d1(time, s1) values (1, 1)");
+
+      Map<String, String> extractorAttributes = new HashMap<>();
+      Map<String, String> processorAttributes = new HashMap<>();
+      Map<String, String> connectorAttributes = new HashMap<>();
+
+      connectorAttributes.put("connector", "iotdb-thrift-connector");
+      connectorAttributes.put("connector.batch.enable", "false");
+      connectorAttributes.put("connector.ip", receiverIp);
+      connectorAttributes.put("connector.port", Integer.toString(receiverPort));
+
+      TSStatus status =
+          client.createPipe(
+              new TCreatePipeReq("p1", connectorAttributes)
+                  .setExtractorAttributes(extractorAttributes)
+                  .setProcessorAttributes(processorAttributes));
+
+      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode());
+
+      TestUtils.assertDataOnEnv(
+          receiverEnv,
+          "select count(*) from root.**",
+          "count(root.db.d1.s1),",
+          Collections.singleton("1,"));
+
+      TestUtils.executeNonQueryWithRetry(
+          senderEnv, "insert into root.db.d1(time, s1) values (2, 2)");
+
+      TestUtils.assertDataOnEnv(
+          receiverEnv,
+          "select count(*) from root.**",
+          "count(root.db.d1.s1),",
+          Collections.singleton("2,"));
+
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.stopPipe("p1").getCode());
+
+      TestUtils.executeNonQueryWithRetry(
+          senderEnv, "insert into root.db.d1(time, s1) values (3, 3)");
+
+      Thread.sleep(5000);
+      TestUtils.assertDataOnEnv(
+          receiverEnv,
+          "select count(*) from root.**",
+          "count(root.db.d1.s1),",
+          Collections.singleton("2,"));
+    }
+  }
+
+  @Test
+  public void testSyncConnectorUseNodeUrls() throws Exception {
+    doTestUseNodeUrls(BuiltinPipePlugin.IOTDB_THRIFT_SYNC_CONNECTOR.getPipePluginName());
+  }
+
+  @Test
+  public void testAsyncConnectorUseNodeUrls() throws Exception {
+    doTestUseNodeUrls(BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName());
+  }
+
+  @Test
+  public void testAirGapConnectorUseNodeUrls() throws Exception {
+    doTestUseNodeUrls(BuiltinPipePlugin.IOTDB_AIR_GAP_CONNECTOR.getPipePluginName());
+  }
+
+  private void doTestUseNodeUrls(String connectorName) throws Exception {
+    senderEnv
+        .getConfig()
+        .getCommonConfig()
+        .setAutoCreateSchemaEnabled(true)
+        .setPipeAirGapReceiverEnabled(true)
+        .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+        .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+        .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
+        .setSchemaReplicationFactor(1)
+        .setDataReplicationFactor(1);
+    receiverEnv
+        .getConfig()
+        .getCommonConfig()
+        .setAutoCreateSchemaEnabled(true)
+        .setPipeAirGapReceiverEnabled(true)
+        .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+        .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+        .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
+        .setSchemaReplicationFactor(3)
+        .setDataReplicationFactor(2);
+
+    senderEnv.initClusterEnvironment(1, 1);
+    receiverEnv.initClusterEnvironment(1, 3);
+
+    StringBuilder nodeUrlsBuilder = new StringBuilder();
+    for (DataNodeWrapper wrapper : receiverEnv.getDataNodeWrapperList()) {
+      if (connectorName.equals(BuiltinPipePlugin.IOTDB_AIR_GAP_CONNECTOR.getPipePluginName())) {
+        // use default port for convenience
+        nodeUrlsBuilder
+            .append(wrapper.getIp())
+            .append(":")
+            .append(wrapper.getPipeAirGapReceiverPort())
+            .append(",");
+      } else {
+        nodeUrlsBuilder.append(wrapper.getIpAndPortString()).append(",");
+      }
+    }
+
+    try (SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
+
+      TestUtils.executeNonQueryWithRetry(
+          senderEnv, "insert into root.db.d1(time, s1) values (1, 1)");
+
+      Map<String, String> extractorAttributes = new HashMap<>();
+      Map<String, String> processorAttributes = new HashMap<>();
+      Map<String, String> connectorAttributes = new HashMap<>();
+
+      connectorAttributes.put("connector", connectorName);
+      connectorAttributes.put("connector.batch.enable", "false");
+      connectorAttributes.put("connector.node-urls", nodeUrlsBuilder.toString());
+
+      TSStatus status =
+          client.createPipe(
+              new TCreatePipeReq("p1", connectorAttributes)
+                  .setExtractorAttributes(extractorAttributes)
+                  .setProcessorAttributes(processorAttributes));
+
+      System.out.println(status.getMessage());
+      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode());
+
+      TestUtils.executeNonQueryWithRetry(
+          senderEnv, "insert into root.db.d1(time, s1) values (2, 2)");
+      TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
+
+      TestUtils.assertDataOnEnv(
+          receiverEnv,
+          "select count(*) from root.**",
+          "count(root.db.d1.s1),",
+          Collections.singleton("2,"));
+    }
+  }
+}
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeSwitchStatusIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeSwitchStatusIT.java
index b4c9e5c53fa..5849c40dc31 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeSwitchStatusIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeSwitchStatusIT.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
 import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
 import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
+import org.apache.iotdb.db.it.utils.TestUtils;
 import org.apache.iotdb.it.env.MultiEnvFactory;
 import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
 import org.apache.iotdb.it.framework.IoTDBTestRunner;
@@ -38,15 +39,10 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 
-import java.sql.Connection;
-import java.sql.SQLException;
-import java.sql.Statement;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import static org.junit.Assert.fail;
-
 @RunWith(IoTDBTestRunner.class)
 @Category({MultiClusterIT2.class})
 public class IoTDBPipeSwitchStatusIT {
@@ -216,13 +212,8 @@ public class IoTDBPipeSwitchStatusIT {
 
     try (SyncConfigNodeIServiceClient client =
         (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
-      try (Connection connection = senderEnv.getConnection();
-          Statement statement = connection.createStatement()) {
-        statement.execute("insert into root.sg1.d1(time, at1) values (1, 1)");
-      } catch (SQLException e) {
-        e.printStackTrace();
-        fail(e.getMessage());
-      }
+      TestUtils.executeNonQueryWithRetry(
+          senderEnv, "insert into root.db.d1(time, s1) values (1, 1)");
 
       Map<String, String> extractorAttributes = new HashMap<>();
       Map<String, String> processorAttributes = new HashMap<>();
@@ -245,13 +236,7 @@ public class IoTDBPipeSwitchStatusIT {
       Assert.assertTrue(
           showPipeResult.stream().anyMatch((o) -> o.id.equals("p1") && o.state.equals("RUNNING")));
 
-      try (Connection connection = senderEnv.getConnection();
-          Statement statement = connection.createStatement()) {
-        statement.execute("drop database root.**");
-      } catch (SQLException e) {
-        e.printStackTrace();
-        fail(e.getMessage());
-      }
+      TestUtils.executeNonQueryWithRetry(senderEnv, "drop database root.**");
 
       Assert.assertEquals(
           TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.dropPipe("p1").getCode());
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/extractor/IoTDBPipeExtractorIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/extractor/IoTDBPipeExtractorIT.java
index 2c2ac032526..e127225e11e 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/extractor/IoTDBPipeExtractorIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/extractor/IoTDBPipeExtractorIT.java
@@ -167,13 +167,8 @@ public class IoTDBPipeExtractorIT {
     try (SyncConfigNodeIServiceClient client =
         (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
       // insert data to create data region
-      try (Connection connection = senderEnv.getConnection();
-          Statement statement = connection.createStatement()) {
-        statement.execute("insert into root.sg1.d1(time, at1) values (1, 1)");
-      } catch (SQLException e) {
-        e.printStackTrace();
-        fail(e.getMessage());
-      }
+      TestUtils.executeNonQueryWithRetry(
+          senderEnv, "insert into root.sg1.d1(time, at1) values (1, 1)");
 
       String formatString =
           String.format(
@@ -214,33 +209,33 @@ public class IoTDBPipeExtractorIT {
 
     try (SyncConfigNodeIServiceClient client =
         (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
-      try (Connection connection = senderEnv.getConnection();
-          Statement statement = connection.createStatement()) {
-        statement.execute("insert into root.nonAligned.1TS (time, s_float) values (now(), 0.5)");
-        statement.execute("insert into root.nonAligned.100TS (time, s_float) values (now(), 0.5)");
-        statement.execute("insert into root.nonAligned.1000TS (time, s_float) values (now(), 0.5)");
-        statement.execute(
-            "insert into root.nonAligned.`1(TS)` (time, s_float) values (now(), 0.5)");
-        statement.execute(
-            "insert into root.nonAligned.6TS.`6` ("
-                + "time, `s_float(1)`, `s_int(1)`, `s_double(1)`, `s_long(1)`, `s_text(1)`, `s_bool(1)`) "
-                + "values (now(), 0.5, 1, 1.5, 2, \"text1\", true)");
-        statement.execute(
-            "insert into root.aligned.1TS (time, s_float) aligned values (now(), 0.5)");
-        statement.execute(
-            "insert into root.aligned.100TS (time, s_float) aligned values (now(), 0.5)");
-        statement.execute(
-            "insert into root.aligned.1000TS (time, s_float) aligned values (now(), 0.5)");
-        statement.execute(
-            "insert into root.aligned.`1(TS)` (time, s_float) aligned values (now(), 0.5)");
-        statement.execute(
-            "insert into root.aligned.6TS.`6` ("
-                + "time, `s_float(1)`, `s_int(1)`, `s_double(1)`, `s_long(1)`, `s_text(1)`, `s_bool(1)`) "
-                + "aligned values (now(), 0.5, 1, 1.5, 2, \"text1\", true)");
-      } catch (SQLException e) {
-        e.printStackTrace();
-        fail(e.getMessage());
-      }
+      TestUtils.executeNonQueryWithRetry(
+          senderEnv, "insert into root.nonAligned.1TS (time, s_float) values (now(), 0.5)");
+      TestUtils.executeNonQueryWithRetry(
+          senderEnv, "insert into root.nonAligned.100TS (time, s_float) values (now(), 0.5)");
+      TestUtils.executeNonQueryWithRetry(
+          senderEnv, "insert into root.nonAligned.1000TS (time, s_float) values (now(), 0.5)");
+      TestUtils.executeNonQueryWithRetry(
+          senderEnv, "insert into root.nonAligned.`1(TS)` (time, s_float) values (now(), 0.5)");
+      TestUtils.executeNonQueryWithRetry(
+          senderEnv,
+          "insert into root.nonAligned.6TS.`6` ("
+              + "time, `s_float(1)`, `s_int(1)`, `s_double(1)`, `s_long(1)`, `s_text(1)`, `s_bool(1)`) "
+              + "values (now(), 0.5, 1, 1.5, 2, \"text1\", true)");
+      TestUtils.executeNonQueryWithRetry(
+          senderEnv, "insert into root.aligned.1TS (time, s_float) aligned values (now(), 0.5)");
+      TestUtils.executeNonQueryWithRetry(
+          senderEnv, "insert into root.aligned.100TS (time, s_float) aligned values (now(), 0.5)");
+      TestUtils.executeNonQueryWithRetry(
+          senderEnv, "insert into root.aligned.1000TS (time, s_float) aligned values (now(), 0.5)");
+      TestUtils.executeNonQueryWithRetry(
+          senderEnv,
+          "insert into root.aligned.`1(TS)` (time, s_float) aligned values (now(), 0.5)");
+      TestUtils.executeNonQueryWithRetry(
+          senderEnv,
+          "insert into root.aligned.6TS.`6` ("
+              + "time, `s_float(1)`, `s_int(1)`, `s_double(1)`, `s_long(1)`, `s_text(1)`, `s_bool(1)`) "
+              + "aligned values (now(), 0.5, 1, 1.5, 2, \"text1\", true)");
 
       Map<String, String> extractorAttributes = new HashMap<>();
       Map<String, String> processorAttributes = new HashMap<>();
@@ -343,15 +338,11 @@ public class IoTDBPipeExtractorIT {
           TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode());
       assertTimeseriesCountOnReceiver(receiverEnv, 0);
 
-      try (Connection connection = senderEnv.getConnection();
-          Statement statement = connection.createStatement()) {
-        statement.execute("insert into root.db1.d1 (time, at1) values (1, 10)");
-        statement.execute("insert into root.db2.d1 (time, at1) values (1, 20)");
-        statement.execute("flush");
-      } catch (SQLException e) {
-        e.printStackTrace();
-        fail(e.getMessage());
-      }
+      TestUtils.executeNonQueryWithRetry(
+          senderEnv, "insert into root.db1.d1 (time, at1) values (1, 10)");
+      TestUtils.executeNonQueryWithRetry(
+          senderEnv, "insert into root.db2.d1 (time, at1) values (1, 20)");
+      TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
 
       extractorAttributes.replace("extractor.pattern", "root.db2");
       status =
@@ -369,15 +360,11 @@ public class IoTDBPipeExtractorIT {
       Assert.assertEquals(
           TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.dropPipe("p2").getCode());
 
-      try (Connection connection = senderEnv.getConnection();
-          Statement statement = connection.createStatement()) {
-        statement.execute("insert into root.db1.d1 (time, at1) values (2, 11)");
-        statement.execute("insert into root.db2.d1 (time, at1) values (2, 21)");
-        statement.execute("flush");
-      } catch (SQLException e) {
-        e.printStackTrace();
-        fail(e.getMessage());
-      }
+      TestUtils.executeNonQueryWithRetry(
+          senderEnv, "insert into root.db1.d1 (time, at1) values (2, 11)");
+      TestUtils.executeNonQueryWithRetry(
+          senderEnv, "insert into root.db2.d1 (time, at1) values (2, 21)");
+      TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
 
       extractorAttributes.remove("extractor.pattern"); // no pattern, will match all databases
       status =
@@ -415,17 +402,15 @@ public class IoTDBPipeExtractorIT {
 
     try (SyncConfigNodeIServiceClient client =
         (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
-      try (Connection connection = senderEnv.getConnection();
-          Statement statement = connection.createStatement()) {
-        statement.execute("insert into root.db.d1 (time, at1) values (1, 10)");
-        statement.execute("insert into root.db.d2 (time, at1) values (1, 20)");
-        statement.execute("insert into root.db.d3 (time, at1) values (1, 30)");
-        statement.execute("insert into root.db.d4 (time, at1) values (1, 40)");
-        statement.execute("flush");
-      } catch (SQLException e) {
-        e.printStackTrace();
-        fail(e.getMessage());
-      }
+      TestUtils.executeNonQueryWithRetry(
+          senderEnv, "insert into root.db.d1 (time, at1) values (1, 10)");
+      TestUtils.executeNonQueryWithRetry(
+          senderEnv, "insert into root.db.d2 (time, at1) values (1, 20)");
+      TestUtils.executeNonQueryWithRetry(
+          senderEnv, "insert into root.db.d3 (time, at1) values (1, 30)");
+      TestUtils.executeNonQueryWithRetry(
+          senderEnv, "insert into root.db.d4 (time, at1) values (1, 40)");
+      TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
 
       Map<String, String> extractorAttributes = new HashMap<>();
       Map<String, String> processorAttributes = new HashMap<>();
@@ -483,16 +468,14 @@ public class IoTDBPipeExtractorIT {
       Assert.assertEquals(
           TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p4").getCode());
 
-      try (Connection connection = senderEnv.getConnection();
-          Statement statement = connection.createStatement()) {
-        statement.execute("insert into root.db.d1 (time, at1) values (2, 11)");
-        statement.execute("insert into root.db.d2 (time, at1) values (2, 21)");
-        statement.execute("insert into root.db.d3 (time, at1) values (2, 31)");
-        statement.execute("insert into root.db.d4 (time, at1) values (2, 41)");
-      } catch (SQLException e) {
-        e.printStackTrace();
-        fail(e.getMessage());
-      }
+      TestUtils.executeNonQueryWithRetry(
+          senderEnv, "insert into root.db.d1 (time, at1) values (2, 11)");
+      TestUtils.executeNonQueryWithRetry(
+          senderEnv, "insert into root.db.d2 (time, at1) values (2, 21)");
+      TestUtils.executeNonQueryWithRetry(
+          senderEnv, "insert into root.db.d3 (time, at1) values (2, 31)");
+      TestUtils.executeNonQueryWithRetry(
+          senderEnv, "insert into root.db.d4 (time, at1) values (2, 41)");
 
       try (Connection connection = receiverEnv.getConnection();
           Statement statement = connection.createStatement()) {
@@ -528,19 +511,15 @@ public class IoTDBPipeExtractorIT {
 
     try (SyncConfigNodeIServiceClient client =
         (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
-      try (Connection connection = senderEnv.getConnection();
-          Statement statement = connection.createStatement()) {
-        statement.execute(
-            "insert into root.db.d1 (time, at1)"
-                + " values (1000, 1), (2000, 2), (3000, 3), (4000, 4), (5000, 5)");
-        statement.execute(
-            "insert into root.db.d2 (time, at1)"
-                + " values (1000, 1), (2000, 2), (3000, 3), (4000, 4), (5000, 5)");
-        statement.execute("flush");
-      } catch (SQLException e) {
-        e.printStackTrace();
-        fail(e.getMessage());
-      }
+      TestUtils.executeNonQueryWithRetry(
+          senderEnv,
+          "insert into root.db.d1 (time, at1)"
+              + " values (1000, 1), (2000, 2), (3000, 3), (4000, 4), (5000, 5)");
+      TestUtils.executeNonQueryWithRetry(
+          senderEnv,
+          "insert into root.db.d2 (time, at1)"
+              + " values (1000, 1), (2000, 2), (3000, 3), (4000, 4), (5000, 5)");
+      TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
 
       Map<String, String> extractorAttributes = new HashMap<>();
       Map<String, String> processorAttributes = new HashMap<>();