You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/09/25 11:51:35 UTC
[iotdb] branch master updated: Pipe: Add IT for different cluster config and consensus protocols (#11097)
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 5c7718ea8fa Pipe: Add IT for different cluster config and consensus protocols (#11097)
5c7718ea8fa is described below
commit 5c7718ea8fa65fa6495f204f56f02c021ea5deca
Author: 马子坤 <55...@users.noreply.github.com>
AuthorDate: Mon Sep 25 19:51:28 2023 +0800
Pipe: Add IT for different cluster config and consensus protocols (#11097)
---
.../iotdb/it/env/cluster/ClusterConstant.java | 2 +
.../it/env/cluster/config/MppCommonConfig.java | 6 +
.../env/cluster/config/MppSharedCommonConfig.java | 7 +
.../iotdb/it/env/cluster/env/AbstractEnv.java | 10 +-
.../iotdb/it/env/cluster/env/Cluster1Env.java | 6 +
.../iotdb/it/env/cluster/env/MultiClusterEnv.java | 6 +
.../apache/iotdb/it/env/cluster/env/SimpleEnv.java | 6 +
.../iotdb/it/env/cluster/node/DataNodeWrapper.java | 9 +
.../it/env/remote/config/RemoteCommonConfig.java | 5 +
.../iotdb/it/env/remote/env/RemoteServerEnv.java | 6 +
.../java/org/apache/iotdb/itbase/env/BaseEnv.java | 9 +
.../org/apache/iotdb/itbase/env/CommonConfig.java | 2 +
.../org/apache/iotdb/db/it/utils/TestUtils.java | 98 +++
.../apache/iotdb/pipe/it/IoTDBPipeClusterIT.java | 766 +++++++++++++++++++++
.../apache/iotdb/pipe/it/IoTDBPipeLifeCycleIT.java | 469 ++++++++-----
.../apache/iotdb/pipe/it/IoTDBPipeProtocolIT.java | 415 +++++++++++
.../iotdb/pipe/it/IoTDBPipeSwitchStatusIT.java | 23 +-
.../pipe/it/extractor/IoTDBPipeExtractorIT.java | 151 ++--
18 files changed, 1704 insertions(+), 292 deletions(-)
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/ClusterConstant.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/ClusterConstant.java
index 2bf3161aabb..1ce6143887c 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/ClusterConstant.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/ClusterConstant.java
@@ -165,6 +165,8 @@ public class ClusterConstant {
public static final String DN_MPP_DATA_EXCHANGE_PORT = "dn_mpp_data_exchange_port";
public static final String DN_DATA_REGION_CONSENSUS_PORT = "dn_data_region_consensus_port";
public static final String DN_SCHEMA_REGION_CONSENSUS_PORT = "dn_schema_region_consensus_port";
+ public static final String PIPE_AIR_GAP_RECEIVER_ENABLED = "pipe_air_gap_receiver_enabled";
+ public static final String PIPE_AIR_GAP_RECEIVER_PORT = "pipe_air_gap_receiver_port";
public static final String MAX_TSBLOCK_SIZE_IN_BYTES = "max_tsblock_size_in_bytes";
public static final String PAGE_SIZE_IN_BYTE = "page_size_in_byte";
public static final String DN_JOIN_CLUSTER_RETRY_INTERVAL_MS =
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
index ecf17099ba5..02f89b74652 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
@@ -379,6 +379,12 @@ public class MppCommonConfig extends MppBaseConfig implements CommonConfig {
return this;
}
+ @Override
+ public CommonConfig setPipeAirGapReceiverEnabled(boolean isPipeAirGapReceiverEnabled) {
+ setProperty("pipe_air_gap_receiver_enabled", String.valueOf(isPipeAirGapReceiverEnabled));
+ return this;
+ }
+
// For part of the log directory
public String getClusterConfigStr() {
return fromConsensusFullNameToAbbr(properties.getProperty(CONFIG_NODE_CONSENSUS_PROTOCOL_CLASS))
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
index 2e577044749..27d973311e5 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
@@ -385,4 +385,11 @@ public class MppSharedCommonConfig implements CommonConfig {
cnConfig.setSchemaRegionPerDataNode(schemaRegionPerDataNode);
return this;
}
+
+ @Override
+ public CommonConfig setPipeAirGapReceiverEnabled(boolean isPipeAirGapReceiverEnabled) {
+ dnConfig.setPipeAirGapReceiverEnabled(isPipeAirGapReceiverEnabled);
+ cnConfig.setPipeAirGapReceiverEnabled(isPipeAirGapReceiverEnabled);
+ return this;
+ }
}
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
index 9d077b46f7a..95a55a5108a 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
@@ -95,6 +95,7 @@ public abstract class AbstractEnv implements BaseEnv {
protected String testMethodName = null;
protected int index = 0;
protected long startTime;
+ protected int testWorkingRetryCount = 30;
private IClientManager<TEndPoint, SyncConfigNodeIServiceClient> clientManager;
@@ -122,6 +123,11 @@ public abstract class AbstractEnv implements BaseEnv {
}
protected void initEnvironment(int configNodesNum, int dataNodesNum) {
+ initEnvironment(configNodesNum, dataNodesNum, 30);
+ }
+
+ protected void initEnvironment(int configNodesNum, int dataNodesNum, int testWorkingRetryCount) {
+ this.testWorkingRetryCount = testWorkingRetryCount;
this.configNodeWrapperList = new ArrayList<>();
this.dataNodeWrapperList = new ArrayList<>();
@@ -275,7 +281,7 @@ public abstract class AbstractEnv implements BaseEnv {
testDelegate.addRequest(
() -> {
Exception lastException = null;
- for (int i = 0; i < 30; i++) {
+ for (int i = 0; i < testWorkingRetryCount; i++) {
try (Connection ignored = getConnection(dataNodeEndpoint, PROBE_TIMEOUT_MS)) {
logger.info("Successfully connecting to DataNode: {}.", dataNodeEndpoint);
return null;
@@ -296,7 +302,7 @@ public abstract class AbstractEnv implements BaseEnv {
logger.info("Start cluster costs: {}s", (System.currentTimeMillis() - startTime) / 1000.0);
} catch (Exception e) {
logger.error("exception in testWorking of ClusterID, message: {}", e.getMessage(), e);
- fail("After 30 times retry, the cluster can't work!");
+ fail(String.format("After %d times retry, the cluster can't work!", testWorkingRetryCount));
}
}
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/Cluster1Env.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/Cluster1Env.java
index 52888c744e4..20f22e0b5ad 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/Cluster1Env.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/Cluster1Env.java
@@ -34,4 +34,10 @@ public class Cluster1Env extends AbstractEnv {
public void initClusterEnvironment(int configNodesNum, int dataNodesNum) {
super.initEnvironment(configNodesNum, dataNodesNum);
}
+
+ @Override
+ public void initClusterEnvironment(
+ int configNodesNum, int dataNodesNum, int testWorkingRetryCount) {
+ super.initEnvironment(configNodesNum, dataNodesNum, testWorkingRetryCount);
+ }
}
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/MultiClusterEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/MultiClusterEnv.java
index c6f907633ac..8eecd4ea00d 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/MultiClusterEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/MultiClusterEnv.java
@@ -40,4 +40,10 @@ public class MultiClusterEnv extends AbstractEnv {
public void initClusterEnvironment(int configNodesNum, int dataNodesNum) {
super.initEnvironment(configNodesNum, dataNodesNum);
}
+
+ @Override
+ public void initClusterEnvironment(
+ int configNodesNum, int dataNodesNum, int testWorkingRetryCount) {
+ super.initEnvironment(configNodesNum, dataNodesNum, testWorkingRetryCount);
+ }
}
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/SimpleEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/SimpleEnv.java
index f11ec341fce..eb9a1828a73 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/SimpleEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/SimpleEnv.java
@@ -30,4 +30,10 @@ public class SimpleEnv extends AbstractEnv {
public void initClusterEnvironment(int configNodesNum, int dataNodesNum) {
super.initEnvironment(configNodesNum, dataNodesNum);
}
+
+ @Override
+ public void initClusterEnvironment(
+ int configNodesNum, int dataNodesNum, int testWorkingRetryCount) {
+ super.initEnvironment(configNodesNum, dataNodesNum, testWorkingRetryCount);
+ }
}
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/DataNodeWrapper.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/DataNodeWrapper.java
index fa99044667a..ebd9934a16d 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/DataNodeWrapper.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/DataNodeWrapper.java
@@ -59,6 +59,7 @@ import static org.apache.iotdb.it.env.cluster.ClusterConstant.MAX_TSBLOCK_SIZE_I
import static org.apache.iotdb.it.env.cluster.ClusterConstant.MQTT_HOST;
import static org.apache.iotdb.it.env.cluster.ClusterConstant.MQTT_PORT;
import static org.apache.iotdb.it.env.cluster.ClusterConstant.PAGE_SIZE_IN_BYTE;
+import static org.apache.iotdb.it.env.cluster.ClusterConstant.PIPE_AIR_GAP_RECEIVER_PORT;
import static org.apache.iotdb.it.env.cluster.ClusterConstant.SCHEMA_REGION_CONSENSUS_PROTOCOL_CLASS;
import static org.apache.iotdb.it.env.cluster.ClusterConstant.SCHEMA_REPLICATION_FACTOR;
import static org.apache.iotdb.it.env.cluster.ClusterConstant.SYSTEM_PROPERTIES_FILE;
@@ -72,6 +73,7 @@ public class DataNodeWrapper extends AbstractNodeWrapper {
private final int dataRegionConsensusPort;
private final int schemaRegionConsensusPort;
private final int mqttPort;
+ private final int pipeAirGapReceiverPort;
private final String defaultNodePropertiesFile;
@@ -92,6 +94,7 @@ public class DataNodeWrapper extends AbstractNodeWrapper {
this.dataRegionConsensusPort = portList[3];
this.schemaRegionConsensusPort = portList[4];
this.mqttPort = portList[5];
+ this.pipeAirGapReceiverPort = portList[6];
this.defaultNodePropertiesFile =
EnvUtils.getFilePathFromSysVar(DEFAULT_DATA_NODE_PROPERTIES, clusterIndex);
this.defaultCommonPropertiesFile =
@@ -103,6 +106,8 @@ public class DataNodeWrapper extends AbstractNodeWrapper {
// Override mqtt properties of super class
immutableCommonProperties.setProperty(MQTT_HOST, super.getIp());
immutableCommonProperties.setProperty(MQTT_PORT, String.valueOf(this.mqttPort));
+ immutableCommonProperties.setProperty(
+ PIPE_AIR_GAP_RECEIVER_PORT, String.valueOf(this.pipeAirGapReceiverPort));
immutableNodeProperties.setProperty(IoTDBConstant.DN_TARGET_CONFIG_NODE_LIST, targetConfigNode);
immutableNodeProperties.setProperty(DN_SYSTEM_DIR, MppBaseConfig.NULL_VALUE);
@@ -253,4 +258,8 @@ public class DataNodeWrapper extends AbstractNodeWrapper {
public int getMqttPort() {
return mqttPort;
}
+
+ public int getPipeAirGapReceiverPort() {
+ return pipeAirGapReceiverPort;
+ }
}
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
index 7e8069a74e7..81d1bb8d1f2 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
@@ -275,4 +275,9 @@ public class RemoteCommonConfig implements CommonConfig {
public CommonConfig setSchemaRegionPerDataNode(double schemaRegionPerDataNode) {
return this;
}
+
+ @Override
+ public CommonConfig setPipeAirGapReceiverEnabled(boolean isPipeAirGapReceiverEnabled) {
+ return this;
+ }
}
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java
index 687e89e1614..e25e97835f5 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java
@@ -81,6 +81,12 @@ public class RemoteServerEnv implements BaseEnv {
initClusterEnvironment();
}
+ @Override
+ public void initClusterEnvironment(
+ int configNodesNum, int dataNodesNum, int testWorkingRetryCount) {
+ initClusterEnvironment();
+ }
+
@Override
public void cleanClusterEnvironment() {
clientManager.close();
diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
index 1a38310335c..5e3f18f450f 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
@@ -47,6 +47,15 @@ public interface BaseEnv {
*/
void initClusterEnvironment(int configNodesNum, int dataNodesNum);
+ /**
+ * Init a cluster with the specified number of ConfigNodes and DataNodes.
+ *
+ * @param configNodesNum the number of ConfigNodes.
+ * @param dataNodesNum the number of DataNodes.
+ * @param testWorkingRetryCount the retry count when testing the availability of the cluster.
+ */
+ void initClusterEnvironment(int configNodesNum, int dataNodesNum, int testWorkingRetryCount);
+
/** Destroy the cluster and all the configurations. */
void cleanClusterEnvironment();
diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
index 943ab1d7c24..61ab834539e 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
@@ -123,4 +123,6 @@ public interface CommonConfig {
CommonConfig setDataRegionPerDataNode(double dataRegionPerDataNode);
CommonConfig setSchemaRegionPerDataNode(double schemaRegionPerDataNode);
+
+ CommonConfig setPipeAirGapReceiverEnabled(boolean isPipeAirGapReceiverEnabled);
}
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java b/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java
index 0b6f6be0dac..4b9be28d4a1 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java
@@ -22,6 +22,9 @@ package org.apache.iotdb.db.it.utils;
import org.apache.iotdb.commons.auth.entity.PrivilegeType;
import org.apache.iotdb.isession.SessionDataSet;
import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.env.cluster.env.AbstractEnv;
+import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
+import org.apache.iotdb.itbase.env.BaseEnv;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.tsfile.read.common.RowRecord;
@@ -39,10 +42,12 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import static org.apache.iotdb.itbase.constant.TestConstant.DELTA;
import static org.apache.iotdb.itbase.constant.TestConstant.NULL;
import static org.apache.iotdb.itbase.constant.TestConstant.TIMESTAMP_STR;
+import static org.awaitility.Awaitility.await;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
@@ -338,6 +343,47 @@ public class TestUtils {
}
}
+ public static void executeNonQueryWithRetry(BaseEnv env, String sql) {
+ for (int retryCountLeft = 10; retryCountLeft >= 0; retryCountLeft--) {
+ try (Connection connection = env.getConnection();
+ Statement statement = connection.createStatement()) {
+ statement.execute(sql);
+ break;
+ } catch (SQLException e) {
+ if (retryCountLeft > 0) {
+ try {
+ Thread.sleep(10000);
+ } catch (InterruptedException ignored) {
+ }
+ } else {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+ }
+ }
+
+ public static void executeNonQueryOnSpecifiedDataNodeWithRetry(
+ BaseEnv env, DataNodeWrapper wrapper, String sql) {
+ for (int retryCountLeft = 10; retryCountLeft >= 0; retryCountLeft--) {
+ try (Connection connection = env.getConnectionWithSpecifiedDataNode(wrapper);
+ Statement statement = connection.createStatement()) {
+ statement.execute(sql);
+ break;
+ } catch (SQLException e) {
+ if (retryCountLeft > 0) {
+ try {
+ Thread.sleep(10000);
+ } catch (InterruptedException ignored) {
+ }
+ } else {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+ }
+ }
+
public static void executeQuery(String sql) {
executeQuery(sql, "root", "root");
}
@@ -352,6 +398,25 @@ public class TestUtils {
}
}
+ public static ResultSet executeQueryWithRetry(Statement statement, String sql) {
+ for (int retryCountLeft = 10; retryCountLeft >= 0; retryCountLeft--) {
+ try {
+ return statement.executeQuery(sql);
+ } catch (SQLException e) {
+ if (retryCountLeft > 0) {
+ try {
+ Thread.sleep(10000);
+ } catch (InterruptedException ignored) {
+ }
+ } else {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+ }
+ return null;
+ }
+
public static void assertResultSetEqual(
SessionDataSet actualResultSet,
List<String> expectedColumnNames,
@@ -420,4 +485,37 @@ public class TestUtils {
fail(e.getMessage());
}
}
+
+ public static void restartCluster(BaseEnv env) throws Exception {
+ for (int i = 0; i < env.getConfigNodeWrapperList().size(); ++i) {
+ env.shutdownConfigNode(i);
+ }
+ for (int i = 0; i < env.getDataNodeWrapperList().size(); ++i) {
+ env.shutdownDataNode(i);
+ }
+ TimeUnit.SECONDS.sleep(1);
+ for (int i = 0; i < env.getConfigNodeWrapperList().size(); ++i) {
+ env.startConfigNode(i);
+ }
+ for (int i = 0; i < env.getDataNodeWrapperList().size(); ++i) {
+ env.startDataNode(i);
+ }
+ ((AbstractEnv) env).testWorkingNoUnknown();
+ }
+
+ public static void assertDataOnEnv(
+ BaseEnv env, String sql, String expectedHeader, Set<String> expectedResSet) {
+ try (Connection connection = env.getConnection();
+ Statement statement = connection.createStatement()) {
+ await()
+ .atMost(600, TimeUnit.SECONDS)
+ .untilAsserted(
+ () ->
+ TestUtils.assertResultSetEqual(
+ executeQueryWithRetry(statement, sql), expectedHeader, expectedResSet));
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
}
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeClusterIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeClusterIT.java
new file mode 100644
index 00000000000..5792ea3598c
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeClusterIT.java
@@ -0,0 +1,766 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.it;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.commons.cluster.RegionRoleType;
+import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
+import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp;
+import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.db.it.utils.TestUtils;
+import org.apache.iotdb.it.env.MultiEnvFactory;
+import org.apache.iotdb.it.env.cluster.env.AbstractEnv;
+import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.MultiClusterIT2;
+import org.apache.iotdb.itbase.env.BaseEnv;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.apache.thrift.TException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({MultiClusterIT2.class})
+public class IoTDBPipeClusterIT {
+
+ private BaseEnv senderEnv;
+ private BaseEnv receiverEnv;
+
+ @Before
+ public void setUp() throws Exception {
+ MultiEnvFactory.createEnv(2);
+ senderEnv = MultiEnvFactory.getEnv(0);
+ receiverEnv = MultiEnvFactory.getEnv(1);
+
+ senderEnv
+ .getConfig()
+ .getCommonConfig()
+ .setAutoCreateSchemaEnabled(true)
+ .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+ .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+ .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS);
+
+ receiverEnv
+ .getConfig()
+ .getCommonConfig()
+ .setAutoCreateSchemaEnabled(true)
+ .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+ .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+ .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS);
+
+ senderEnv.initClusterEnvironment(3, 3, 180);
+ receiverEnv.initClusterEnvironment(3, 3, 180);
+ }
+
+ @After
+ public void tearDown() {
+ senderEnv.cleanClusterEnvironment();
+ receiverEnv.cleanClusterEnvironment();
+ }
+
+ @Test
+ public void testWithAllParametersInLogMode() throws Exception {
+ testWithAllParameters("log");
+ }
+
+ @Test
+ public void testWithAllParametersInFileMode() throws Exception {
+ testWithAllParameters("file");
+ }
+
+ @Test
+ public void testWithAllParametersInHybridMode() throws Exception {
+ testWithAllParameters("hybrid");
+ }
+
+ public void testWithAllParameters(String realtimeMode) throws Exception {
+ DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+ String receiverIp = receiverDataNode.getIp();
+ int receiverPort = receiverDataNode.getPort();
+
+ try (SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, "insert into root.db.d1(time, s1) values (2010-01-01T10:00:00+08:00, 1)");
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, "insert into root.db.d1(time, s1) values (2010-01-02T10:00:00+08:00, 2)");
+ TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
+
+ Map<String, String> extractorAttributes = new HashMap<>();
+ Map<String, String> processorAttributes = new HashMap<>();
+ Map<String, String> connectorAttributes = new HashMap<>();
+
+ extractorAttributes.put("extractor", "iotdb-extractor");
+ extractorAttributes.put("extractor.pattern", "root.db.d1");
+ extractorAttributes.put("extractor.history.enable", "true");
+ extractorAttributes.put("extractor.history.start-time", "2010-01-01T08:00:00+08:00");
+ extractorAttributes.put("extractor.history.end-time", "2010-01-02T08:00:00+08:00");
+ extractorAttributes.put("extractor.realtime.enable", "true");
+ extractorAttributes.put("extractor.realtime.mode", realtimeMode);
+
+ processorAttributes.put("processor", "do-nothing-processor");
+
+ connectorAttributes.put("connector", "iotdb-thrift-connector");
+ connectorAttributes.put("connector.batch.enable", "false");
+ connectorAttributes.put("connector.ip", receiverIp);
+ connectorAttributes.put("connector.port", Integer.toString(receiverPort));
+ connectorAttributes.put("connector.user", "root");
+ connectorAttributes.put("connector.password", "root");
+
+ TSStatus status =
+ client.createPipe(
+ new TCreatePipeReq("p1", connectorAttributes)
+ .setExtractorAttributes(extractorAttributes)
+ .setProcessorAttributes(processorAttributes));
+
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode());
+
+ TestUtils.assertDataOnEnv(
+ receiverEnv,
+ "select count(*) from root.**",
+ "count(root.db.d1.s1),",
+ Collections.singleton("1,"));
+
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, "insert into root.db.d1(time, s1) values (now(), 3)");
+ TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
+
+ TestUtils.assertDataOnEnv(
+ receiverEnv,
+ "select count(*) from root.**",
+ "count(root.db.d1.s1),",
+ Collections.singleton("2,"));
+ }
+ }
+
+ @Test
+ public void testPipeAfterDataRegionLeaderStop() throws Exception {
+ DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+ String receiverIp = receiverDataNode.getIp();
+ int receiverPort = receiverDataNode.getPort();
+
+ try (SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
+ Map<String, String> extractorAttributes = new HashMap<>();
+ Map<String, String> processorAttributes = new HashMap<>();
+ Map<String, String> connectorAttributes = new HashMap<>();
+
+ extractorAttributes.put("extractor.pattern", "root.db.d1");
+
+ connectorAttributes.put("connector", "iotdb-thrift-connector");
+ connectorAttributes.put("connector.batch.enable", "false");
+ connectorAttributes.put("connector.ip", receiverIp);
+ connectorAttributes.put("connector.port", Integer.toString(receiverPort));
+
+ TSStatus status =
+ client.createPipe(
+ new TCreatePipeReq("p1", connectorAttributes)
+ .setExtractorAttributes(extractorAttributes)
+ .setProcessorAttributes(processorAttributes));
+
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode());
+
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, "insert into root.db.d1(time, s1) values (1, 1)");
+ TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
+
+ AtomicInteger leaderPort = new AtomicInteger(-1);
+ TShowRegionResp showRegionResp = client.showRegion(new TShowRegionReq());
+ showRegionResp
+ .getRegionInfoList()
+ .forEach(
+ regionInfo -> {
+ if (RegionRoleType.Leader.getRoleType().equals(regionInfo.getRoleType())) {
+ leaderPort.set(regionInfo.getClientRpcPort());
+ }
+ });
+
+ int leaderIndex = -1;
+ for (int i = 0; i < 3; ++i) {
+ if (senderEnv.getDataNodeWrapper(i).getPort() == leaderPort.get()) {
+ leaderIndex = i;
+ senderEnv.shutdownDataNode(i);
+ try {
+ TimeUnit.SECONDS.sleep(1);
+ } catch (InterruptedException ignored) {
+ }
+ senderEnv.startDataNode(i);
+ ((AbstractEnv) senderEnv).testWorkingNoUnknown();
+ }
+ }
+ if (leaderIndex == -1) { // ensure the leader is stopped
+ fail();
+ }
+
+ TestUtils.executeNonQueryOnSpecifiedDataNodeWithRetry(
+ senderEnv,
+ senderEnv.getDataNodeWrapper(leaderIndex),
+ "insert into root.db.d1(time, s1) values (2, 2)");
+ TestUtils.executeNonQueryOnSpecifiedDataNodeWithRetry(
+ senderEnv, senderEnv.getDataNodeWrapper(leaderIndex), "flush");
+
+ TestUtils.assertDataOnEnv(
+ receiverEnv,
+ "select count(*) from root.db.d1",
+ "count(root.db.d1.s1),",
+ Collections.singleton("2,"));
+ }
+
+ TestUtils.restartCluster(senderEnv);
+ TestUtils.restartCluster(receiverEnv);
+
+ try (SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
+ // create a new pipe and write new data
+ Map<String, String> extractorAttributes = new HashMap<>();
+ Map<String, String> processorAttributes = new HashMap<>();
+ Map<String, String> connectorAttributes = new HashMap<>();
+
+ extractorAttributes.put("extractor.pattern", "root.db.d2");
+
+ connectorAttributes.put("connector", "iotdb-thrift-connector");
+ connectorAttributes.put("connector.batch.enable", "false");
+ connectorAttributes.put("connector.ip", receiverIp);
+ connectorAttributes.put("connector.port", Integer.toString(receiverPort));
+
+ TSStatus status =
+ client.createPipe(
+ new TCreatePipeReq("p2", connectorAttributes)
+ .setExtractorAttributes(extractorAttributes)
+ .setProcessorAttributes(processorAttributes));
+
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p2").getCode());
+
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, "insert into root.db.d2(time, s1) values (1, 1)");
+ TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
+
+ TestUtils.assertDataOnEnv(
+ receiverEnv,
+ "select count(*) from root.db.d2",
+ "count(root.db.d2.s1),",
+ Collections.singleton("1,"));
+ }
+ }
+
+ /**
+ * Checks that pipe "p1" (pattern root.db.d1) still delivers data that is written through a
+ * data node registered after the pipe was started, and that a second pipe "p2" (pattern
+ * root.db.d2) can be created and used after both clusters have been restarted.
+ */
+ @Test
+ public void testPipeAfterRegisterNewDataNode() throws Exception {
+ DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+ String receiverIp = receiverDataNode.getIp();
+ int receiverPort = receiverDataNode.getPort();
+
+ try (SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
+ Map<String, String> extractorAttributes = new HashMap<>();
+ Map<String, String> processorAttributes = new HashMap<>();
+ Map<String, String> connectorAttributes = new HashMap<>();
+
+ extractorAttributes.put("extractor.pattern", "root.db.d1");
+
+ // The connector targets the first data node of the receiver cluster.
+ connectorAttributes.put("connector", "iotdb-thrift-connector");
+ connectorAttributes.put("connector.batch.enable", "false");
+ connectorAttributes.put("connector.ip", receiverIp);
+ connectorAttributes.put("connector.port", Integer.toString(receiverPort));
+
+ TSStatus status =
+ client.createPipe(
+ new TCreatePipeReq("p1", connectorAttributes)
+ .setExtractorAttributes(extractorAttributes)
+ .setProcessorAttributes(processorAttributes));
+
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode());
+
+ // First row is written before the new data node exists.
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, "insert into root.db.d1(time, s1) values (1, 1)");
+ TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
+
+ // NOTE(review): the boolean presumably asks the env to start the node right away - confirm
+ // against BaseEnv#registerNewDataNode.
+ senderEnv.registerNewDataNode(true);
+ // The newly registered node is taken to be the last wrapper in the sender's list.
+ DataNodeWrapper newDataNode =
+ senderEnv.getDataNodeWrapper(senderEnv.getDataNodeWrapperList().size() - 1);
+ // Second row is written (and flushed) through the new data node specifically.
+ TestUtils.executeNonQueryOnSpecifiedDataNodeWithRetry(
+ senderEnv, newDataNode, "insert into root.db.d1(time, s1) values (2, 2)");
+ TestUtils.executeNonQueryOnSpecifiedDataNodeWithRetry(senderEnv, newDataNode, "flush");
+ // Both rows are expected on the receiver.
+ TestUtils.assertDataOnEnv(
+ receiverEnv,
+ "select count(*) from root.db.d1",
+ "count(root.db.d1.s1),",
+ Collections.singleton("2,"));
+ }
+
+ TestUtils.restartCluster(senderEnv);
+ TestUtils.restartCluster(receiverEnv);
+
+ // A fresh config node connection is opened after the restart.
+ try (SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
+ // create a new pipe and write new data
+ Map<String, String> extractorAttributes = new HashMap<>();
+ Map<String, String> processorAttributes = new HashMap<>();
+ Map<String, String> connectorAttributes = new HashMap<>();
+
+ extractorAttributes.put("extractor.pattern", "root.db.d2");
+
+ connectorAttributes.put("connector", "iotdb-thrift-connector");
+ connectorAttributes.put("connector.batch.enable", "false");
+ connectorAttributes.put("connector.ip", receiverIp);
+ connectorAttributes.put("connector.port", Integer.toString(receiverPort));
+
+ TSStatus status =
+ client.createPipe(
+ new TCreatePipeReq("p2", connectorAttributes)
+ .setExtractorAttributes(extractorAttributes)
+ .setProcessorAttributes(processorAttributes));
+
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p2").getCode());
+
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, "insert into root.db.d2(time, s1) values (1, 1)");
+ TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
+
+ // Only the single root.db.d2 row written after the restart is expected.
+ TestUtils.assertDataOnEnv(
+ receiverEnv,
+ "select count(*) from root.db.d2",
+ "count(root.db.d2.s1),",
+ Collections.singleton("1,"));
+ }
+ }
+
+ /**
+ * Creates 30 pipes (p0..p29) from a background thread while a new data node is being
+ * registered on the sender, then checks that show pipe reports all 30 of them.
+ */
+ @Test
+ public void testCreatePipeWhenRegisteringNewDataNode() throws Exception {
+ DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+ String receiverIp = receiverDataNode.getIp();
+ int receiverPort = receiverDataNode.getPort();
+
+ try (SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
+
+ Map<String, String> extractorAttributes = new HashMap<>();
+ Map<String, String> processorAttributes = new HashMap<>();
+ Map<String, String> connectorAttributes = new HashMap<>();
+
+ connectorAttributes.put("connector", "iotdb-thrift-connector");
+ connectorAttributes.put("connector.batch.enable", "false");
+ connectorAttributes.put("connector.ip", receiverIp);
+ connectorAttributes.put("connector.port", Integer.toString(receiverPort));
+
+ // JUnit only observes failures raised on the test thread: calling fail() inside the worker
+ // would merely kill that thread. The worker therefore records its failures here and the
+ // test thread asserts on them after join().
+ List<Throwable> workerFailures = Collections.synchronizedList(new ArrayList<>());
+
+ Thread t =
+ new Thread(
+ () -> {
+ for (int i = 0; i < 30; ++i) {
+ try {
+ client.createPipe(
+ new TCreatePipeReq("p" + i, connectorAttributes)
+ .setExtractorAttributes(extractorAttributes)
+ .setProcessorAttributes(processorAttributes));
+ } catch (TException e) {
+ e.printStackTrace();
+ workerFailures.add(e);
+ // Stop creating pipes after the first RPC failure.
+ return;
+ }
+ try {
+ // Spread the requests out so that they overlap with the node registration.
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ // Preserve the interrupt status and report the aborted run.
+ Thread.currentThread().interrupt();
+ workerFailures.add(e);
+ return;
+ }
+ }
+ });
+ t.start();
+ senderEnv.registerNewDataNode(true);
+ // The client is closed when this try block ends, so wait for the worker first.
+ t.join();
+
+ Assert.assertTrue(
+ "Failures in the pipe-creating thread: " + workerFailures, workerFailures.isEmpty());
+ }
+
+ try (SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
+ List<TShowPipeInfo> showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList;
+ Assert.assertEquals(30, showPipeResult.size());
+ }
+ }
+
+ /**
+ * Registers a new data node on the sender while a background thread is inserting 100 rows
+ * into root.db.d1, then checks that the receiver ends up with all 100 rows.
+ */
+ @Test
+ public void testRegisteringNewDataNodeWhenTransferringData() throws Exception {
+ DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+ String receiverIp = receiverDataNode.getIp();
+ int receiverPort = receiverDataNode.getPort();
+
+ try (SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
+ // No extractor or processor attributes are set for this pipe.
+ Map<String, String> extractorAttributes = new HashMap<>();
+ Map<String, String> processorAttributes = new HashMap<>();
+ Map<String, String> connectorAttributes = new HashMap<>();
+
+ connectorAttributes.put("connector", "iotdb-thrift-connector");
+ connectorAttributes.put("connector.batch.enable", "false");
+ connectorAttributes.put("connector.ip", receiverIp);
+ connectorAttributes.put("connector.port", Integer.toString(receiverPort));
+
+ TSStatus status =
+ client.createPipe(
+ new TCreatePipeReq("p1", connectorAttributes)
+ .setExtractorAttributes(extractorAttributes)
+ .setProcessorAttributes(processorAttributes));
+
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode());
+
+ // Writer thread: one row every 100 ms with timestamps 0..99, so the inserts overlap with
+ // the node registration performed on the test thread below.
+ Thread t =
+ new Thread(
+ () -> {
+ try {
+ for (int i = 0; i < 100; ++i) {
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv,
+ String.format("insert into root.db.d1(time, s1) values (%s, 1)", i));
+ Thread.sleep(100);
+ }
+ } catch (InterruptedException ignored) {
+ // NOTE(review): an interrupt silently ends the writer early; the count assertion
+ // below is what would surface the missing rows.
+ }
+ });
+ t.start();
+ senderEnv.registerNewDataNode(true);
+ t.join();
+
+ TestUtils.assertDataOnEnv(
+ receiverEnv,
+ "select count(*) from root.db.d1",
+ "count(root.db.d1.s1),",
+ Collections.singleton("100,"));
+
+ // Shut down the last data node in the list (the one registered above) and drop its wrapper
+ // from the env's list.
+ senderEnv.shutdownDataNode(senderEnv.getDataNodeWrapperList().size() - 1);
+ senderEnv.getDataNodeWrapperList().remove(senderEnv.getDataNodeWrapperList().size() - 1);
+ }
+ }
+
+ /**
+ * Inserts 100 rows into root.db.d1 on the sender, registers a new data node afterwards, and
+ * checks that the receiver ends up with all 100 rows.
+ */
+ @Test
+ public void testRegisteringNewDataNodeAfterTransferringData() throws Exception {
+ DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+ String receiverIp = receiverDataNode.getIp();
+ int receiverPort = receiverDataNode.getPort();
+
+ try (SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
+ // No extractor or processor attributes are set for this pipe.
+ Map<String, String> extractorAttributes = new HashMap<>();
+ Map<String, String> processorAttributes = new HashMap<>();
+ Map<String, String> connectorAttributes = new HashMap<>();
+
+ connectorAttributes.put("connector", "iotdb-thrift-connector");
+ connectorAttributes.put("connector.batch.enable", "false");
+ connectorAttributes.put("connector.ip", receiverIp);
+ connectorAttributes.put("connector.port", Integer.toString(receiverPort));
+
+ TSStatus status =
+ client.createPipe(
+ new TCreatePipeReq("p1", connectorAttributes)
+ .setExtractorAttributes(extractorAttributes)
+ .setProcessorAttributes(processorAttributes));
+
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode());
+
+ // All rows (timestamps 0..99) are written before the new node is registered.
+ for (int i = 0; i < 100; ++i) {
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, String.format("insert into root.db.d1(time, s1) values (%s, 1)", i));
+ }
+
+ senderEnv.registerNewDataNode(true);
+
+ TestUtils.assertDataOnEnv(
+ receiverEnv,
+ "select count(*) from root.db.d1",
+ "count(root.db.d1.s1),",
+ Collections.singleton("100,"));
+
+ // Shut down the last data node in the list (the one registered above) and drop its wrapper
+ // from the env's list.
+ senderEnv.shutdownDataNode(senderEnv.getDataNodeWrapperList().size() - 1);
+ senderEnv.getDataNodeWrapperList().remove(senderEnv.getDataNodeWrapperList().size() - 1);
+ }
+ }
+
+ /**
+ * Inserts 100 rows on the sender, then registers a new data node that is started and shut
+ * down again right away, and checks that the receiver still gets all 100 rows and that pipe
+ * "p1" is still the only pipe listed.
+ */
+ @Test
+ public void testNewDataNodeFailureAfterTransferringData() throws Exception {
+ DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+ String receiverIp = receiverDataNode.getIp();
+ int receiverPort = receiverDataNode.getPort();
+
+ try (SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
+ // No extractor or processor attributes are set for this pipe.
+ Map<String, String> extractorAttributes = new HashMap<>();
+ Map<String, String> processorAttributes = new HashMap<>();
+ Map<String, String> connectorAttributes = new HashMap<>();
+
+ connectorAttributes.put("connector", "iotdb-thrift-connector");
+ connectorAttributes.put("connector.batch.enable", "false");
+ connectorAttributes.put("connector.ip", receiverIp);
+ connectorAttributes.put("connector.port", Integer.toString(receiverPort));
+
+ TSStatus status =
+ client.createPipe(
+ new TCreatePipeReq("p1", connectorAttributes)
+ .setExtractorAttributes(extractorAttributes)
+ .setProcessorAttributes(processorAttributes));
+
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode());
+
+ // Timestamps are 0, 1000, ..., 99000.
+ for (int i = 0; i < 100; ++i) {
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, String.format("insert into root.db.d1(time, s1) values (%s, 1)", i * 1000));
+ }
+
+ // NOTE(review): registerNewDataNode(false) is followed by an explicit startDataNode, so
+ // the flag presumably means "do not start / wait for the node" - confirm against BaseEnv.
+ senderEnv.registerNewDataNode(false);
+ senderEnv.startDataNode(senderEnv.getDataNodeWrapperList().size() - 1);
+ senderEnv.shutdownDataNode(senderEnv.getDataNodeWrapperList().size() - 1); // ctrl + c
+ senderEnv.getDataNodeWrapperList().remove(senderEnv.getDataNodeWrapperList().size() - 1);
+ // Called after the stopped node's wrapper has been removed from the list; the cast is
+ // needed because the method is invoked on AbstractEnv rather than on the env interface.
+ ((AbstractEnv) senderEnv).testWorkingNoUnknown();
+
+ TestUtils.assertDataOnEnv(
+ receiverEnv,
+ "select count(*) from root.db.d1",
+ "count(root.db.d1.s1),",
+ Collections.singleton("100,"));
+
+ // The pipe created above must still be the only one reported.
+ List<TShowPipeInfo> showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList;
+ Assert.assertEquals(1, showPipeResult.size());
+ }
+ }
+
+ /**
+ * Starts pipe "p1", inserts and flushes 100 rows on the sender, restarts the sender cluster,
+ * and checks that the receiver ends up with all 100 rows.
+ */
+ @Test
+ public void testSenderRestartWhenTransferring() throws Exception {
+ DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+ String receiverIp = receiverDataNode.getIp();
+ int receiverPort = receiverDataNode.getPort();
+
+ // The config node client is only needed to create and start the pipe; it is closed before
+ // any data is written.
+ try (SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
+
+ Map<String, String> extractorAttributes = new HashMap<>();
+ Map<String, String> processorAttributes = new HashMap<>();
+ Map<String, String> connectorAttributes = new HashMap<>();
+
+ connectorAttributes.put("connector", "iotdb-thrift-connector");
+ connectorAttributes.put("connector.batch.enable", "false");
+ connectorAttributes.put("connector.ip", receiverIp);
+ connectorAttributes.put("connector.port", Integer.toString(receiverPort));
+
+ TSStatus status =
+ client.createPipe(
+ new TCreatePipeReq("p1", connectorAttributes)
+ .setExtractorAttributes(extractorAttributes)
+ .setProcessorAttributes(processorAttributes));
+
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode());
+ }
+
+ // Timestamps are 0, 1000, ..., 99000.
+ for (int i = 0; i < 100; ++i) {
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, String.format("insert into root.db.d1(time, s1) values (%s, 1)", i * 1000));
+ }
+ TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
+
+ // Only the sender is restarted; the receiver keeps running.
+ TestUtils.restartCluster(senderEnv);
+
+ // The query covers root.**, while the expected header names only root.db.d1.s1, i.e. no
+ // other series is expected on the receiver.
+ TestUtils.assertDataOnEnv(
+ receiverEnv,
+ "select count(*) from root.**",
+ "count(root.db.d1.s1),",
+ Collections.singleton("100,"));
+ }
+
+ /**
+ * Issues 10 concurrent createPipe requests for the same pipe name "p1" and expects exactly
+ * one of them to succeed, then issues 10 concurrent dropPipe requests for "p1" and expects
+ * all of them to report success and no pipe to remain.
+ */
+ @Test
+ public void testConcurrentlyCreatePipeOfSameName() throws Exception {
+ DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+ String receiverIp = receiverDataNode.getIp();
+ int receiverPort = receiverDataNode.getPort();
+
+ Map<String, String> extractorAttributes = new HashMap<>();
+ Map<String, String> processorAttributes = new HashMap<>();
+ Map<String, String> connectorAttributes = new HashMap<>();
+
+ connectorAttributes.put("connector", "iotdb-thrift-connector");
+ connectorAttributes.put("connector.batch.enable", "false");
+ connectorAttributes.put("connector.ip", receiverIp);
+ connectorAttributes.put("connector.port", Integer.toString(receiverPort));
+
+ AtomicInteger successCount = new AtomicInteger(0);
+ // JUnit only observes failures raised on the test thread: calling fail() inside a worker
+ // would merely kill that thread. Workers record their failures here and the test thread
+ // asserts on them after joining.
+ List<Throwable> workerFailures = Collections.synchronizedList(new ArrayList<>());
+ List<Thread> threads = new ArrayList<>();
+ for (int i = 0; i < 10; ++i) {
+ Thread t =
+ new Thread(
+ () -> {
+ // Each worker uses its own config node connection.
+ try (SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
+ TSStatus status =
+ client.createPipe(
+ new TCreatePipeReq("p1", connectorAttributes)
+ .setExtractorAttributes(extractorAttributes)
+ .setProcessorAttributes(processorAttributes));
+ if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ successCount.incrementAndGet();
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ workerFailures.add(e);
+ }
+ });
+ t.start();
+ threads.add(t);
+ }
+
+ for (Thread t : threads) {
+ t.join();
+ }
+ Assert.assertTrue(
+ "Failures in the pipe-creating threads: " + workerFailures, workerFailures.isEmpty());
+ Assert.assertEquals(1, successCount.get());
+
+ successCount.set(0);
+ // Start the drop phase with a fresh thread list instead of re-joining the create threads.
+ threads.clear();
+ for (int i = 0; i < 10; ++i) {
+ Thread t =
+ new Thread(
+ () -> {
+ try (SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
+ TSStatus status = client.dropPipe("p1");
+ if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ successCount.incrementAndGet();
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ workerFailures.add(e);
+ }
+ });
+ t.start();
+ threads.add(t);
+ }
+ for (Thread t : threads) {
+ t.join();
+ }
+
+ Assert.assertTrue(
+ "Failures in the pipe-dropping threads: " + workerFailures, workerFailures.isEmpty());
+ // Every drop request is expected to report success, including those that arrive after the
+ // pipe is already gone.
+ Assert.assertEquals(10, successCount.get());
+ try (SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
+ List<TShowPipeInfo> showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList;
+ Assert.assertEquals(0, showPipeResult.size());
+ }
+ }
+
+ /** Runs the shared same-connector scenario with 10 concurrently created pipes. */
+ @Test
+ public void testCreate10PipesWithSameConnector() throws Exception {
+ testCreatePipesWithSameConnector(10);
+ }
+
+ /** Runs the shared same-connector scenario with 50 concurrently created pipes. */
+ @Test
+ public void testCreate50PipesWithSameConnector() throws Exception {
+ testCreatePipesWithSameConnector(50);
+ }
+
+ /** Runs the shared same-connector scenario with 100 concurrently created pipes. */
+ @Test
+ public void testCreate100PipesWithSameConnector() throws Exception {
+ testCreatePipesWithSameConnector(100);
+ }
+
+ /**
+ * Concurrently creates {@code pipeCount} pipes (p0..p{pipeCount-1}) that all use identical
+ * connector attributes, then checks that show pipe lists every one of them, both without a
+ * filter and when queried by pipe name "p1" with the where clause enabled.
+ *
+ * @param pipeCount number of pipes to create, one worker thread per pipe
+ * @throws Exception if opening a config node connection or joining a worker fails
+ */
+ private void testCreatePipesWithSameConnector(int pipeCount) throws Exception {
+ DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+ String receiverIp = receiverDataNode.getIp();
+ int receiverPort = receiverDataNode.getPort();
+
+ Map<String, String> extractorAttributes = new HashMap<>();
+ Map<String, String> processorAttributes = new HashMap<>();
+ Map<String, String> connectorAttributes = new HashMap<>();
+
+ connectorAttributes.put("connector", "iotdb-thrift-connector");
+ connectorAttributes.put("connector.batch.enable", "false");
+ connectorAttributes.put("connector.ip", receiverIp);
+ connectorAttributes.put("connector.port", Integer.toString(receiverPort));
+
+ // JUnit only observes failures raised on the test thread: an assertion or fail() inside a
+ // worker would merely kill that thread. Workers record their failures here and the test
+ // thread asserts on them after joining.
+ List<Throwable> workerFailures = Collections.synchronizedList(new ArrayList<>());
+ List<Thread> threads = new ArrayList<>();
+ for (int i = 0; i < pipeCount; ++i) {
+ int finalI = i;
+ Thread t =
+ new Thread(
+ () -> {
+ // Each worker uses its own config node connection.
+ try (SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
+ TSStatus status =
+ client.createPipe(
+ new TCreatePipeReq("p" + finalI, connectorAttributes)
+ .setExtractorAttributes(extractorAttributes)
+ .setProcessorAttributes(processorAttributes));
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ workerFailures.add(
+ new AssertionError("Creating pipe p" + finalI + " failed: " + status));
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ workerFailures.add(e);
+ }
+ });
+ t.start();
+ threads.add(t);
+ }
+ for (Thread t : threads) {
+ t.join();
+ }
+ Assert.assertTrue(
+ "Failures in the pipe-creating threads: " + workerFailures, workerFailures.isEmpty());
+
+ try (SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
+ List<TShowPipeInfo> showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList;
+ Assert.assertEquals(pipeCount, showPipeResult.size());
+ // NOTE(review): with the where clause enabled the request presumably returns every pipe
+ // that shares p1's connector, which is why pipeCount (not 1) is expected - confirm
+ // against the TShowPipeReq semantics.
+ showPipeResult =
+ client.showPipe(new TShowPipeReq().setPipeName("p1").setWhereClause(true)).pipeInfoList;
+ Assert.assertEquals(pipeCount, showPipeResult.size());
+ }
+ }
+}
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeLifeCycleIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeLifeCycleIT.java
index a906cf61abf..e0c789b4f9d 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeLifeCycleIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeLifeCycleIT.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.db.it.utils.TestUtils;
import org.apache.iotdb.it.env.MultiEnvFactory;
-import org.apache.iotdb.it.env.cluster.env.AbstractEnv;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.MultiClusterIT2;
@@ -38,17 +37,11 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
-import java.sql.Connection;
-import java.sql.SQLException;
-import java.sql.Statement;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-import static org.awaitility.Awaitility.await;
-import static org.junit.Assert.fail;
@RunWith(IoTDBTestRunner.class)
@Category({MultiClusterIT2.class})
@@ -86,13 +79,8 @@ public class IoTDBPipeLifeCycleIT {
try (SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
- try (Connection connection = senderEnv.getConnection();
- Statement statement = connection.createStatement()) {
- statement.execute("insert into root.sg1.d1(time, at1) values (1, 1)");
- } catch (SQLException e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, "insert into root.db.d1(time, s1) values (1, 1)");
Map<String, String> extractorAttributes = new HashMap<>();
Map<String, String> processorAttributes = new HashMap<>();
@@ -116,37 +104,31 @@ public class IoTDBPipeLifeCycleIT {
Set<String> expectedResSet = new HashSet<>();
expectedResSet.add("1,1.0,");
- assertDataOnReceiver(receiverEnv, expectedResSet);
+ TestUtils.assertDataOnEnv(
+ receiverEnv, "select * from root.**", "Time,root.db.d1.s1,", expectedResSet);
- try (Connection connection = senderEnv.getConnection();
- Statement statement = connection.createStatement()) {
- statement.execute("insert into root.sg1.d1(time, at1) values (2, 2)");
- } catch (SQLException e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, "insert into root.db.d1(time, s1) values (2, 2)");
expectedResSet.add("2,2.0,");
- assertDataOnReceiver(receiverEnv, expectedResSet);
+ TestUtils.assertDataOnEnv(
+ receiverEnv, "select * from root.**", "Time,root.db.d1.s1,", expectedResSet);
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.stopPipe("p1").getCode());
- try (Connection connection = senderEnv.getConnection();
- Statement statement = connection.createStatement()) {
- statement.execute("insert into root.sg1.d1(time, at1) values (3, 3)");
- } catch (SQLException e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, "insert into root.db.d1(time, s1) values (3, 3)");
- assertDataOnReceiver(receiverEnv, expectedResSet);
+ TestUtils.assertDataOnEnv(
+ receiverEnv, "select * from root.**", "Time,root.db.d1.s1,", expectedResSet);
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode());
expectedResSet.add("3,3.0,");
- assertDataOnReceiver(receiverEnv, expectedResSet);
+ TestUtils.assertDataOnEnv(
+ receiverEnv, "select * from root.**", "Time,root.db.d1.s1,", expectedResSet);
}
}
@@ -160,14 +142,9 @@ public class IoTDBPipeLifeCycleIT {
try (SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
- try (Connection connection = senderEnv.getConnection();
- Statement statement = connection.createStatement()) {
- statement.execute("insert into root.sg1.d1(time, at1) values (1, 1)");
- statement.execute("flush");
- } catch (SQLException e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, "insert into root.db.d1(time, s1) values (1, 1)");
+ TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
Map<String, String> extractorAttributes = new HashMap<>();
Map<String, String> processorAttributes = new HashMap<>();
@@ -194,30 +171,22 @@ public class IoTDBPipeLifeCycleIT {
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode());
- try (Connection connection = senderEnv.getConnection();
- Statement statement = connection.createStatement()) {
- statement.execute("insert into root.sg1.d1(time, at1) values (2, 2)");
- } catch (SQLException e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, "insert into root.db.d1(time, s1) values (2, 2)");
Set<String> expectedResSet = new HashSet<>();
expectedResSet.add("2,2.0,");
- assertDataOnReceiver(receiverEnv, expectedResSet);
+ TestUtils.assertDataOnEnv(
+ receiverEnv, "select * from root.**", "Time,root.db.d1.s1,", expectedResSet);
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.stopPipe("p1").getCode());
- try (Connection connection = senderEnv.getConnection();
- Statement statement = connection.createStatement()) {
- statement.execute("insert into root.sg1.d1(time, at1) values (3, 3)");
- } catch (SQLException e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, "insert into root.db.d1(time, s1) values (3, 3)");
- assertDataOnReceiver(receiverEnv, expectedResSet);
+ TestUtils.assertDataOnEnv(
+ receiverEnv, "select * from root.**", "Time,root.db.d1.s1,", expectedResSet);
}
}
@@ -231,13 +200,8 @@ public class IoTDBPipeLifeCycleIT {
try (SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
- try (Connection connection = senderEnv.getConnection();
- Statement statement = connection.createStatement()) {
- statement.execute("insert into root.sg1.d1(time, at1) values (1, 1)");
- } catch (SQLException e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, "insert into root.db.d1(time, s1) values (1, 1)");
Map<String, String> extractorAttributes = new HashMap<>();
Map<String, String> processorAttributes = new HashMap<>();
@@ -263,31 +227,24 @@ public class IoTDBPipeLifeCycleIT {
Set<String> expectedResSet = new HashSet<>();
expectedResSet.add("1,1.0,");
- assertDataOnReceiver(receiverEnv, expectedResSet);
+ TestUtils.assertDataOnEnv(
+ receiverEnv, "select * from root.**", "Time,root.db.d1.s1,", expectedResSet);
- try (Connection connection = senderEnv.getConnection();
- Statement statement = connection.createStatement()) {
- statement.execute("insert into root.sg1.d1(time, at1) values (2, 2)");
- } catch (SQLException e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, "insert into root.db.d1(time, s1) values (2, 2)");
expectedResSet.add("2,2.0,");
- assertDataOnReceiver(receiverEnv, expectedResSet);
+ TestUtils.assertDataOnEnv(
+ receiverEnv, "select * from root.**", "Time,root.db.d1.s1,", expectedResSet);
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.stopPipe("p1").getCode());
- try (Connection connection = senderEnv.getConnection();
- Statement statement = connection.createStatement()) {
- statement.execute("insert into root.sg1.d1(time, at1) values (3, 3)");
- } catch (SQLException e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, "insert into root.db.d1(time, s1) values (3, 3)");
- assertDataOnReceiver(receiverEnv, expectedResSet);
+ TestUtils.assertDataOnEnv(
+ receiverEnv, "select * from root.**", "Time,root.db.d1.s1,", expectedResSet);
}
}
@@ -301,14 +258,8 @@ public class IoTDBPipeLifeCycleIT {
try (SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
- try (Connection connection = senderEnv.getConnection();
- Statement statement = connection.createStatement()) {
- statement.execute("insert into root.sg1.d1(time, at1) values (1, 1)");
- statement.execute("flush");
- } catch (SQLException e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, "insert into root.db.d1(time, s1) values (1, 1)");
Map<String, String> extractorAttributes = new HashMap<>();
Map<String, String> processorAttributes = new HashMap<>();
@@ -334,33 +285,24 @@ public class IoTDBPipeLifeCycleIT {
Set<String> expectedResSet = new HashSet<>();
expectedResSet.add("1,1.0,");
- assertDataOnReceiver(receiverEnv, expectedResSet);
+ TestUtils.assertDataOnEnv(
+ receiverEnv, "select * from root.**", "Time,root.db.d1.s1,", expectedResSet);
- try (Connection connection = senderEnv.getConnection();
- Statement statement = connection.createStatement()) {
- statement.execute("insert into root.sg1.d1(time, at1) values (2, 2)");
- statement.execute("flush");
- } catch (SQLException e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, "insert into root.db.d1(time, s1) values (2, 2)");
expectedResSet.add("2,2.0,");
- assertDataOnReceiver(receiverEnv, expectedResSet);
+ TestUtils.assertDataOnEnv(
+ receiverEnv, "select * from root.**", "Time,root.db.d1.s1,", expectedResSet);
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.stopPipe("p1").getCode());
- try (Connection connection = senderEnv.getConnection();
- Statement statement = connection.createStatement()) {
- statement.execute("insert into root.sg1.d1(time, at1) values (3, 3)");
- statement.execute("flush");
- } catch (SQLException e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, "insert into root.db.d1(time, s1) values (3, 3)");
- assertDataOnReceiver(receiverEnv, expectedResSet);
+ TestUtils.assertDataOnEnv(
+ receiverEnv, "select * from root.**", "Time,root.db.d1.s1,", expectedResSet);
}
}
@@ -374,13 +316,8 @@ public class IoTDBPipeLifeCycleIT {
try (SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
- try (Connection connection = senderEnv.getConnection();
- Statement statement = connection.createStatement()) {
- statement.execute("insert into root.sg1.d1(time, at1) values (1, 1)");
- } catch (SQLException e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, "insert into root.db.d1(time, s1) values (1, 1)");
Map<String, String> extractorAttributes = new HashMap<>();
Map<String, String> processorAttributes = new HashMap<>();
@@ -406,31 +343,24 @@ public class IoTDBPipeLifeCycleIT {
Set<String> expectedResSet = new HashSet<>();
expectedResSet.add("1,1.0,");
- assertDataOnReceiver(receiverEnv, expectedResSet);
+ TestUtils.assertDataOnEnv(
+ receiverEnv, "select * from root.**", "Time,root.db.d1.s1,", expectedResSet);
- try (Connection connection = senderEnv.getConnection();
- Statement statement = connection.createStatement()) {
- statement.execute("insert into root.sg1.d1(time, at1) values (2, 2)");
- } catch (SQLException e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, "insert into root.db.d1(time, s1) values (2, 2)");
expectedResSet.add("2,2.0,");
- assertDataOnReceiver(receiverEnv, expectedResSet);
+ TestUtils.assertDataOnEnv(
+ receiverEnv, "select * from root.**", "Time,root.db.d1.s1,", expectedResSet);
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.stopPipe("p1").getCode());
- try (Connection connection = senderEnv.getConnection();
- Statement statement = connection.createStatement()) {
- statement.execute("insert into root.sg1.d1(time, at1) values (3, 3)");
- } catch (SQLException e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, "insert into root.db.d1(time, s1) values (3, 3)");
- assertDataOnReceiver(receiverEnv, expectedResSet);
+ TestUtils.assertDataOnEnv(
+ receiverEnv, "select * from root.**", "Time,root.db.d1.s1,", expectedResSet);
}
}
@@ -445,13 +375,8 @@ public class IoTDBPipeLifeCycleIT {
try (SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
- try (Connection connection = senderEnv.getConnection();
- Statement statement = connection.createStatement()) {
- statement.execute("insert into root.sg1.d1(time, at1) values (1, 1)");
- } catch (SQLException e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, "insert into root.db.d1(time, s1) values (1, 1)");
Map<String, String> extractorAttributes = new HashMap<>();
Map<String, String> processorAttributes = new HashMap<>();
@@ -474,73 +399,247 @@ public class IoTDBPipeLifeCycleIT {
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode());
expectedResSet.add("1,1.0,");
- assertDataOnReceiver(receiverEnv, expectedResSet);
+ TestUtils.assertDataOnEnv(
+ receiverEnv, "select * from root.**", "Time,root.db.d1.s1,", expectedResSet);
- try (Connection connection = senderEnv.getConnection();
- Statement statement = connection.createStatement()) {
- statement.execute("insert into root.sg1.d1(time, at1) values (2, 2)");
- } catch (SQLException e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, "insert into root.db.d1(time, s1) values (2, 2)");
expectedResSet.add("2,2.0,");
- assertDataOnReceiver(receiverEnv, expectedResSet);
+ TestUtils.assertDataOnEnv(
+ receiverEnv, "select * from root.**", "Time,root.db.d1.s1,", expectedResSet);
}
- restartCluster(senderEnv);
- restartCluster(receiverEnv);
+ TestUtils.restartCluster(senderEnv);
+ TestUtils.restartCluster(receiverEnv);
try (SyncConfigNodeIServiceClient ignored =
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
- try (Connection connection = senderEnv.getConnection();
- Statement statement = connection.createStatement()) {
- statement.execute("insert into root.sg1.d1(time, at1) values (3, 3)");
- } catch (SQLException e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, "insert into root.db.d1(time, s1) values (3, 3)");
expectedResSet.add("3,3.0,");
- assertDataOnReceiver(receiverEnv, expectedResSet);
+ TestUtils.assertDataOnEnv(
+ receiverEnv, "select * from root.**", "Time,root.db.d1.s1,", expectedResSet);
}
}
- private void assertDataOnReceiver(BaseEnv receiverEnv, Set<String> expectedResSet) {
- try (Connection connection = receiverEnv.getConnection();
- Statement statement = connection.createStatement()) {
- await()
- .atMost(600, TimeUnit.SECONDS)
- .untilAsserted(
- () ->
- TestUtils.assertResultSetEqual(
- statement.executeQuery("select * from root.**"),
- "Time,root.sg1.d1.at1,",
- expectedResSet));
- } catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
+ /**
+  * Verifies that no rows are lost when the receiver cluster is restarted while the sender is still
+  * writing.
+  *
+  * <p>After pipe {@code p1} is created and started, a background thread inserts 100 rows into the
+  * sender, sleeping 100 ms after each insert, while the receiver cluster is restarted. Once the
+  * writer has finished, the receiver must report a count of 100.
+  */
+ @Test
+ public void testReceiverRestartWhenTransferring() throws Exception {
+   DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+   String receiverIp = receiverDataNode.getIp();
+   int receiverPort = receiverDataNode.getPort();
+
+   try (SyncConfigNodeIServiceClient client =
+       (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
+
+     Map<String, String> extractorAttributes = new HashMap<>();
+     Map<String, String> processorAttributes = new HashMap<>();
+     Map<String, String> connectorAttributes = new HashMap<>();
+
+     connectorAttributes.put("connector", "iotdb-thrift-connector");
+     connectorAttributes.put("connector.batch.enable", "false");
+     connectorAttributes.put("connector.ip", receiverIp);
+     connectorAttributes.put("connector.port", Integer.toString(receiverPort));
+
+     TSStatus status =
+         client.createPipe(
+             new TCreatePipeReq("p1", connectorAttributes)
+                 .setExtractorAttributes(extractorAttributes)
+                 .setProcessorAttributes(processorAttributes));
+
+     Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+     Assert.assertEquals(
+         TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode());
+
+     // Writer thread: keeps inserting on the sender while the receiver is being restarted.
+     Thread t =
+         new Thread(
+             () -> {
+               try {
+                 for (int i = 0; i < 100; ++i) {
+                   TestUtils.executeNonQueryWithRetry(
+                       senderEnv,
+                       String.format("insert into root.db.d1(time, s1) values (%s, 1)", i));
+                   Thread.sleep(100);
+                 }
+               } catch (InterruptedException e) {
+                 // Stop writing early, but keep the interrupt status instead of swallowing it.
+                 Thread.currentThread().interrupt();
+               }
+             });
+     t.start();
+
+     TestUtils.restartCluster(receiverEnv);
+     // Wait for the writer so that all 100 inserts have been issued before counting.
+     t.join();
+
+     TestUtils.assertDataOnEnv(
+         receiverEnv,
+         "select count(*) from root.**",
+         "count(root.db.d1.s1),",
+         Collections.singleton("100,"));
    }
  }
- private void restartCluster(BaseEnv env) {
- for (int i = 0; i < env.getConfigNodeWrapperList().size(); ++i) {
- env.shutdownConfigNode(i);
+ /**
+  * Verifies that a pipe still delivers data when the receiver already contains a row of the same
+  * time series, and that nothing more arrives once the pipe has been stopped.
+  */
+ @Test
+ public void testReceiverAlreadyHaveTimeSeries() throws Exception {
+ DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+ String receiverIp = receiverDataNode.getIp();
+ int receiverPort = receiverDataNode.getPort();
+
+ try (SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
+ // Write directly to the receiver so root.db.d1.s1 exists there before the pipe is created.
+ TestUtils.executeNonQueryWithRetry(
+ receiverEnv, "insert into root.db.d1(time, s1) values (1, 1)");
+
+ Map<String, String> extractorAttributes = new HashMap<>();
+ Map<String, String> processorAttributes = new HashMap<>();
+ Map<String, String> connectorAttributes = new HashMap<>();
+
+ connectorAttributes.put("connector", "iotdb-thrift-connector");
+ connectorAttributes.put("connector.batch.enable", "false");
+ connectorAttributes.put("connector.ip", receiverIp);
+ connectorAttributes.put("connector.port", Integer.toString(receiverPort));
+
+ TSStatus status =
+ client.createPipe(
+ new TCreatePipeReq("p1", connectorAttributes)
+ .setExtractorAttributes(extractorAttributes)
+ .setProcessorAttributes(processorAttributes));
+
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode());
+
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, "insert into root.db.d1(time, s1) values (2, 2)");
+
+ // Expect 2 rows: the one written locally on the receiver plus the one sent through the pipe.
+ TestUtils.assertDataOnEnv(
+ receiverEnv,
+ "select count(*) from root.**",
+ "count(root.db.d1.s1),",
+ Collections.singleton("2,"));
+
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.stopPipe("p1").getCode());
+
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, "insert into root.db.d1(time, s1) values (3, 3)");
+
+ // Give a stopped pipe 5 seconds to (wrongly) deliver row 3; the count must remain 2.
+ Thread.sleep(5000);
+ TestUtils.assertDataOnEnv(
+ receiverEnv,
+ "select count(*) from root.**",
+ "count(root.db.d1.s1),",
+ Collections.singleton("2,"));
}
- for (int i = 0; i < env.getDataNodeWrapperList().size(); ++i) {
- env.shutdownDataNode(i);
+ }
+
+ /**
+  * Verifies bidirectional synchronization: each cluster runs a pipe named {@code p1} that targets
+  * the other cluster, and both clusters must end up holding the same rows, before and after both
+  * clusters are restarted.
+  */
+ @Test
+ public void testDoubleLiving() throws Exception {
+ // "Double living": two clusters, each with a pipe that sends its data to the other one.
+ DataNodeWrapper senderDataNode = senderEnv.getDataNodeWrapper(0);
+ DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+ String senderIp = senderDataNode.getIp();
+ int senderPort = senderDataNode.getPort();
+ String receiverIp = receiverDataNode.getIp();
+ int receiverPort = receiverDataNode.getPort();
+
+ // Rows 0..99: written on the sender and flushed before its pipe exists.
+ for (int i = 0; i < 100; ++i) {
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, String.format("insert into root.db.d1(time, s1) values (%s, 1)", i));
}
- try {
- TimeUnit.SECONDS.sleep(1);
- } catch (InterruptedException ignored) {
+ TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
+
+ // Pipe p1 on the sender cluster, targeting the receiver's first DataNode.
+ try (SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
+ Map<String, String> extractorAttributes = new HashMap<>();
+ Map<String, String> processorAttributes = new HashMap<>();
+ Map<String, String> connectorAttributes = new HashMap<>();
+
+ connectorAttributes.put("connector", "iotdb-thrift-connector");
+ connectorAttributes.put("connector.batch.enable", "false");
+ connectorAttributes.put("connector.ip", receiverIp);
+ connectorAttributes.put("connector.port", Integer.toString(receiverPort));
+
+ TSStatus status =
+ client.createPipe(
+ new TCreatePipeReq("p1", connectorAttributes)
+ .setExtractorAttributes(extractorAttributes)
+ .setProcessorAttributes(processorAttributes));
+
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode());
}
- for (int i = 0; i < env.getConfigNodeWrapperList().size(); ++i) {
- env.startConfigNode(i);
+ // Rows 100..199: written on the sender after its pipe has been started.
+ for (int i = 100; i < 200; ++i) {
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, String.format("insert into root.db.d1(time, s1) values (%s, 1)", i));
}
- for (int i = 0; i < env.getDataNodeWrapperList().size(); ++i) {
- env.startDataNode(i);
+
+ // Rows 200..299: written on the receiver and flushed before its pipe exists.
+ for (int i = 200; i < 300; ++i) {
+ TestUtils.executeNonQueryWithRetry(
+ receiverEnv, String.format("insert into root.db.d1(time, s1) values (%s, 1)", i));
+ }
+ TestUtils.executeNonQueryWithRetry(receiverEnv, "flush");
+
+ // Pipe p1 on the receiver cluster, targeting the sender's first DataNode.
+ try (SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient) receiverEnv.getLeaderConfigNodeConnection()) {
+ Map<String, String> extractorAttributes = new HashMap<>();
+ Map<String, String> processorAttributes = new HashMap<>();
+ Map<String, String> connectorAttributes = new HashMap<>();
+
+ connectorAttributes.put("connector", "iotdb-thrift-connector");
+ connectorAttributes.put("connector.batch.enable", "false");
+ connectorAttributes.put("connector.ip", senderIp);
+ connectorAttributes.put("connector.port", Integer.toString(senderPort));
+
+ TSStatus status =
+ client.createPipe(
+ new TCreatePipeReq("p1", connectorAttributes)
+ .setExtractorAttributes(extractorAttributes)
+ .setProcessorAttributes(processorAttributes));
+
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode());
+ }
+ // Rows 300..399: written on the receiver after its pipe has been started.
+ for (int i = 300; i < 400; ++i) {
+ TestUtils.executeNonQueryWithRetry(
+ receiverEnv, String.format("insert into root.db.d1(time, s1) values (%s, 1)", i));
+ }
+
+ Set<String> expectedResSet = new HashSet<>();
+ for (int i = 0; i < 400; ++i) {
+ expectedResSet.add(i + ",1.0,");
+ }
+
+ // Both clusters must now hold all 400 rows, wherever each row was first written.
+ TestUtils.assertDataOnEnv(
+ senderEnv, "select * from root.**", "Time,root.db.d1.s1,", expectedResSet);
+ TestUtils.assertDataOnEnv(
+ receiverEnv, "select * from root.**", "Time,root.db.d1.s1,", expectedResSet);
+
+ TestUtils.restartCluster(senderEnv);
+ TestUtils.restartCluster(receiverEnv);
+
+ // After the restart no pipe is re-created; rows 400..599 are written to check that
+ // synchronization continues in both directions.
+ for (int i = 400; i < 500; ++i) {
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, String.format("insert into root.db.d1(time, s1) values (%s, 1)", i));
+ }
+ TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
+ for (int i = 500; i < 600; ++i) {
+ TestUtils.executeNonQueryWithRetry(
+ receiverEnv, String.format("insert into root.db.d1(time, s1) values (%s, 1)", i));
+ }
+ TestUtils.executeNonQueryWithRetry(receiverEnv, "flush");
+
+ for (int i = 400; i < 600; ++i) {
+ expectedResSet.add(i + ",1.0,");
}
- ((AbstractEnv) env).testWorkingNoUnknown();
+ TestUtils.assertDataOnEnv(
+ senderEnv, "select * from root.**", "Time,root.db.d1.s1,", expectedResSet);
+ TestUtils.assertDataOnEnv(
+ receiverEnv, "select * from root.**", "Time,root.db.d1.s1,", expectedResSet);
}
}
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeProtocolIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeProtocolIT.java
new file mode 100644
index 00000000000..c112aeb51b1
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeProtocolIT.java
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.it;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin;
+import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
+import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.db.it.utils.TestUtils;
+import org.apache.iotdb.it.env.MultiEnvFactory;
+import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.MultiClusterIT2;
+import org.apache.iotdb.itbase.env.BaseEnv;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/** Tests the pipe's basic functionality under multiple cluster and consensus protocol settings. */
+@RunWith(IoTDBTestRunner.class)
+@Category({MultiClusterIT2.class})
+public class IoTDBPipeProtocolIT {
+
+ private BaseEnv senderEnv;
+ private BaseEnv receiverEnv;
+
+ /**
+  * Creates two environments before each test: index 0 is used as the sender, index 1 as the
+  * receiver. The clusters themselves are started later by the individual tests.
+  */
+ @Before
+ public void setUp() throws Exception {
+   final int envCount = 2;
+   MultiEnvFactory.createEnv(envCount);
+
+   final int senderIndex = 0;
+   final int receiverIndex = 1;
+   senderEnv = MultiEnvFactory.getEnv(senderIndex);
+   receiverEnv = MultiEnvFactory.getEnv(receiverIndex);
+ }
+
+ /**
+  * Cleans up both cluster environments after each test.
+  *
+  * <p>The receiver is cleaned in a {@code finally} block so that an exception thrown while
+  * cleaning the sender does not leave the receiver environment behind.
+  */
+ @After
+ public void tearDown() {
+   try {
+     senderEnv.cleanClusterEnvironment();
+   } finally {
+     receiverEnv.cleanClusterEnvironment();
+   }
+ }
+
+ private void innerSetUp(
+ String configNodeConsensus,
+ String schemaRegionConsensus,
+ String dataRegionConsensus,
+ int configNodesNum,
+ int dataNodesNum,
+ int schemaRegionReplicationFactor,
+ int dataRegionReplicationFactor) {
+ schemaRegionReplicationFactor = Math.min(schemaRegionReplicationFactor, dataNodesNum);
+ dataRegionReplicationFactor = Math.min(dataRegionReplicationFactor, dataNodesNum);
+ senderEnv
+ .getConfig()
+ .getCommonConfig()
+ .setAutoCreateSchemaEnabled(true)
+ .setConfigNodeConsensusProtocolClass(configNodeConsensus)
+ .setSchemaRegionConsensusProtocolClass(schemaRegionConsensus)
+ .setDataRegionConsensusProtocolClass(dataRegionConsensus)
+ .setSchemaReplicationFactor(schemaRegionReplicationFactor)
+ .setDataReplicationFactor(dataRegionReplicationFactor);
+ receiverEnv
+ .getConfig()
+ .getCommonConfig()
+ .setAutoCreateSchemaEnabled(true)
+ .setConfigNodeConsensusProtocolClass(configNodeConsensus)
+ .setSchemaRegionConsensusProtocolClass(schemaRegionConsensus)
+ .setDataRegionConsensusProtocolClass(dataRegionConsensus)
+ .setSchemaReplicationFactor(schemaRegionReplicationFactor)
+ .setDataReplicationFactor(dataRegionReplicationFactor);
+
+ senderEnv.initClusterEnvironment(configNodesNum, dataNodesNum);
+ receiverEnv.initClusterEnvironment(configNodesNum, dataNodesNum);
+ }
+
+ /** 1 ConfigNode and 1 DataNode; Ratis for ConfigNode and schema regions, IoT for data regions. */
+ @Test
+ public void test1C1DWithRatisRatisIot() throws Exception {
+   final int nodeCount = 1;
+   final int replicationFactor = 1;
+   innerSetUp(
+       ConsensusFactory.RATIS_CONSENSUS,
+       ConsensusFactory.RATIS_CONSENSUS,
+       ConsensusFactory.IOT_CONSENSUS,
+       nodeCount,
+       nodeCount,
+       replicationFactor,
+       replicationFactor);
+   doTest();
+ }
+
+ /** 1 ConfigNode and 1 DataNode; Simple for ConfigNode and schema regions, IoT for data regions. */
+ @Test
+ public void test1C1DWithSimpleSimpleIot() throws Exception {
+   final int nodeCount = 1;
+   final int replicationFactor = 1;
+   innerSetUp(
+       ConsensusFactory.SIMPLE_CONSENSUS,
+       ConsensusFactory.SIMPLE_CONSENSUS,
+       ConsensusFactory.IOT_CONSENSUS,
+       nodeCount,
+       nodeCount,
+       replicationFactor,
+       replicationFactor);
+   doTest();
+ }
+
+ /** 1 ConfigNode and 1 DataNode; Ratis for ConfigNode and schema regions, Simple for data regions. */
+ @Test
+ public void test1C1DWithRatisRatisSimple() throws Exception {
+   final int nodeCount = 1;
+   final int replicationFactor = 1;
+   innerSetUp(
+       ConsensusFactory.RATIS_CONSENSUS,
+       ConsensusFactory.RATIS_CONSENSUS,
+       ConsensusFactory.SIMPLE_CONSENSUS,
+       nodeCount,
+       nodeCount,
+       replicationFactor,
+       replicationFactor);
+   doTest();
+ }
+
+ /** 3 ConfigNodes and 3 DataNodes; schema replication factor 3, data replication factor 3. */
+ @Test
+ public void test3C3DWith3SchemaRegionFactor3DataRegionFactor() throws Exception {
+   final int nodeCount = 3;
+   final int schemaReplicationFactor = 3;
+   final int dataReplicationFactor = 3;
+   innerSetUp(
+       ConsensusFactory.RATIS_CONSENSUS,
+       ConsensusFactory.RATIS_CONSENSUS,
+       ConsensusFactory.IOT_CONSENSUS,
+       nodeCount,
+       nodeCount,
+       schemaReplicationFactor,
+       dataReplicationFactor);
+   doTest();
+ }
+
+ /** 3 ConfigNodes and 3 DataNodes; schema replication factor 3, data replication factor 2. */
+ @Test
+ public void test3C3DWith3SchemaRegionFactor2DataRegionFactor() throws Exception {
+   final int nodeCount = 3;
+   final int schemaReplicationFactor = 3;
+   final int dataReplicationFactor = 2;
+   innerSetUp(
+       ConsensusFactory.RATIS_CONSENSUS,
+       ConsensusFactory.RATIS_CONSENSUS,
+       ConsensusFactory.IOT_CONSENSUS,
+       nodeCount,
+       nodeCount,
+       schemaReplicationFactor,
+       dataReplicationFactor);
+   doTest();
+ }
+
+ /**
+  * Runs a pipe in each direction between a 3-ConfigNode/3-DataNode cluster and a
+  * 1-ConfigNode/1-DataNode cluster.
+  *
+  * <p>First, pipe {@code p1} on the sender delivers one row to the receiver and is stopped. Then a
+  * second row is written on the receiver and pipe {@code p1} on the receiver is started towards
+  * the sender, after which the sender must report two rows.
+  */
+ @Test
+ public void testPipeOnBothSenderAndReceiver() throws Exception {
+   final String countSql = "select count(*) from root.**";
+   final String countHeader = "count(root.db.d1.s1),";
+
+   senderEnv
+       .getConfig()
+       .getCommonConfig()
+       .setAutoCreateSchemaEnabled(true)
+       .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+       .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+       .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
+       .setSchemaReplicationFactor(3)
+       .setDataReplicationFactor(2);
+   receiverEnv
+       .getConfig()
+       .getCommonConfig()
+       .setAutoCreateSchemaEnabled(true)
+       .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+       .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+       .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
+       .setSchemaReplicationFactor(1)
+       .setDataReplicationFactor(1);
+
+   senderEnv.initClusterEnvironment(3, 3);
+   receiverEnv.initClusterEnvironment(1, 1);
+
+   // Direction 1: sender -> receiver.
+   DataNodeWrapper receiverNode = receiverEnv.getDataNodeWrapper(0);
+   String receiverHost = receiverNode.getIp();
+   int receiverRpcPort = receiverNode.getPort();
+
+   try (SyncConfigNodeIServiceClient senderClient =
+       (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
+
+     TestUtils.executeNonQueryWithRetry(
+         senderEnv, "insert into root.db.d1(time, s1) values (1, 1)");
+
+     Map<String, String> extractorConf = new HashMap<>();
+     Map<String, String> processorConf = new HashMap<>();
+     Map<String, String> connectorConf = new HashMap<>();
+     connectorConf.put("connector", "iotdb-thrift-connector");
+     connectorConf.put("connector.batch.enable", "false");
+     connectorConf.put("connector.ip", receiverHost);
+     connectorConf.put("connector.port", Integer.toString(receiverRpcPort));
+
+     TCreatePipeReq createReq =
+         new TCreatePipeReq("p1", connectorConf)
+             .setExtractorAttributes(extractorConf)
+             .setProcessorAttributes(processorConf);
+     TSStatus createResult = senderClient.createPipe(createReq);
+
+     Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), createResult.getCode());
+     Assert.assertEquals(
+         TSStatusCode.SUCCESS_STATUS.getStatusCode(), senderClient.startPipe("p1").getCode());
+
+     TestUtils.assertDataOnEnv(
+         receiverEnv, countSql, countHeader, Collections.singleton("1,"));
+
+     Assert.assertEquals(
+         TSStatusCode.SUCCESS_STATUS.getStatusCode(), senderClient.stopPipe("p1").getCode());
+   }
+
+   // Direction 2: receiver -> sender.
+   DataNodeWrapper senderNode = senderEnv.getDataNodeWrapper(0);
+   String senderHost = senderNode.getIp();
+   int senderRpcPort = senderNode.getPort();
+
+   try (SyncConfigNodeIServiceClient receiverClient =
+       (SyncConfigNodeIServiceClient) receiverEnv.getLeaderConfigNodeConnection()) {
+
+     TestUtils.executeNonQueryWithRetry(
+         receiverEnv, "insert into root.db.d1(time, s1) values (2, 2)");
+
+     Map<String, String> extractorConf = new HashMap<>();
+     Map<String, String> processorConf = new HashMap<>();
+     Map<String, String> connectorConf = new HashMap<>();
+     connectorConf.put("connector", "iotdb-thrift-connector");
+     connectorConf.put("connector.batch.enable", "false");
+     connectorConf.put("connector.ip", senderHost);
+     connectorConf.put("connector.port", Integer.toString(senderRpcPort));
+
+     TCreatePipeReq createReq =
+         new TCreatePipeReq("p1", connectorConf)
+             .setExtractorAttributes(extractorConf)
+             .setProcessorAttributes(processorConf);
+     TSStatus createResult = receiverClient.createPipe(createReq);
+
+     Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), createResult.getCode());
+     Assert.assertEquals(
+         TSStatusCode.SUCCESS_STATUS.getStatusCode(), receiverClient.startPipe("p1").getCode());
+
+     TestUtils.assertDataOnEnv(senderEnv, countSql, countHeader, Collections.singleton("2,"));
+   }
+ }
+
+ private void doTest() throws Exception {
+ DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+ String receiverIp = receiverDataNode.getIp();
+ int receiverPort = receiverDataNode.getPort();
+
+ try (SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
+
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, "insert into root.db.d1(time, s1) values (1, 1)");
+
+ Map<String, String> extractorAttributes = new HashMap<>();
+ Map<String, String> processorAttributes = new HashMap<>();
+ Map<String, String> connectorAttributes = new HashMap<>();
+
+ connectorAttributes.put("connector", "iotdb-thrift-connector");
+ connectorAttributes.put("connector.batch.enable", "false");
+ connectorAttributes.put("connector.ip", receiverIp);
+ connectorAttributes.put("connector.port", Integer.toString(receiverPort));
+
+ TSStatus status =
+ client.createPipe(
+ new TCreatePipeReq("p1", connectorAttributes)
+ .setExtractorAttributes(extractorAttributes)
+ .setProcessorAttributes(processorAttributes));
+
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode());
+
+ TestUtils.assertDataOnEnv(
+ receiverEnv,
+ "select count(*) from root.**",
+ "count(root.db.d1.s1),",
+ Collections.singleton("1,"));
+
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, "insert into root.db.d1(time, s1) values (2, 2)");
+
+ TestUtils.assertDataOnEnv(
+ receiverEnv,
+ "select count(*) from root.**",
+ "count(root.db.d1.s1),",
+ Collections.singleton("2,"));
+
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.stopPipe("p1").getCode());
+
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, "insert into root.db.d1(time, s1) values (3, 3)");
+
+ Thread.sleep(5000);
+ TestUtils.assertDataOnEnv(
+ receiverEnv,
+ "select count(*) from root.**",
+ "count(root.db.d1.s1),",
+ Collections.singleton("2,"));
+ }
+ }
+
+ /** Runs the node-urls scenario with the {@code IOTDB_THRIFT_SYNC_CONNECTOR} built-in plugin. */
+ @Test
+ public void testSyncConnectorUseNodeUrls() throws Exception {
+   final String pluginName =
+       BuiltinPipePlugin.IOTDB_THRIFT_SYNC_CONNECTOR.getPipePluginName();
+   doTestUseNodeUrls(pluginName);
+ }
+
+ /** Runs the node-urls scenario with the {@code IOTDB_THRIFT_ASYNC_CONNECTOR} built-in plugin. */
+ @Test
+ public void testAsyncConnectorUseNodeUrls() throws Exception {
+   final String pluginName =
+       BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName();
+   doTestUseNodeUrls(pluginName);
+ }
+
+ /** Runs the node-urls scenario with the {@code IOTDB_AIR_GAP_CONNECTOR} built-in plugin. */
+ @Test
+ public void testAirGapConnectorUseNodeUrls() throws Exception {
+   final String pluginName = BuiltinPipePlugin.IOTDB_AIR_GAP_CONNECTOR.getPipePluginName();
+   doTestUseNodeUrls(pluginName);
+ }
+
+ private void doTestUseNodeUrls(String connectorName) throws Exception {
+ senderEnv
+ .getConfig()
+ .getCommonConfig()
+ .setAutoCreateSchemaEnabled(true)
+ .setPipeAirGapReceiverEnabled(true)
+ .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+ .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+ .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
+ .setSchemaReplicationFactor(1)
+ .setDataReplicationFactor(1);
+ receiverEnv
+ .getConfig()
+ .getCommonConfig()
+ .setAutoCreateSchemaEnabled(true)
+ .setPipeAirGapReceiverEnabled(true)
+ .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+ .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+ .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
+ .setSchemaReplicationFactor(3)
+ .setDataReplicationFactor(2);
+
+ senderEnv.initClusterEnvironment(1, 1);
+ receiverEnv.initClusterEnvironment(1, 3);
+
+ StringBuilder nodeUrlsBuilder = new StringBuilder();
+ for (DataNodeWrapper wrapper : receiverEnv.getDataNodeWrapperList()) {
+ if (connectorName.equals(BuiltinPipePlugin.IOTDB_AIR_GAP_CONNECTOR.getPipePluginName())) {
+ // use default port for convenience
+ nodeUrlsBuilder
+ .append(wrapper.getIp())
+ .append(":")
+ .append(wrapper.getPipeAirGapReceiverPort())
+ .append(",");
+ } else {
+ nodeUrlsBuilder.append(wrapper.getIpAndPortString()).append(",");
+ }
+ }
+
+ try (SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
+
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, "insert into root.db.d1(time, s1) values (1, 1)");
+
+ Map<String, String> extractorAttributes = new HashMap<>();
+ Map<String, String> processorAttributes = new HashMap<>();
+ Map<String, String> connectorAttributes = new HashMap<>();
+
+ connectorAttributes.put("connector", connectorName);
+ connectorAttributes.put("connector.batch.enable", "false");
+ connectorAttributes.put("connector.node-urls", nodeUrlsBuilder.toString());
+
+ TSStatus status =
+ client.createPipe(
+ new TCreatePipeReq("p1", connectorAttributes)
+ .setExtractorAttributes(extractorAttributes)
+ .setProcessorAttributes(processorAttributes));
+
+ System.out.println(status.getMessage());
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode());
+
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, "insert into root.db.d1(time, s1) values (2, 2)");
+ TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
+
+ TestUtils.assertDataOnEnv(
+ receiverEnv,
+ "select count(*) from root.**",
+ "count(root.db.d1.s1),",
+ Collections.singleton("2,"));
+ }
+ }
+}
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeSwitchStatusIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeSwitchStatusIT.java
index b4c9e5c53fa..5849c40dc31 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeSwitchStatusIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeSwitchStatusIT.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
+import org.apache.iotdb.db.it.utils.TestUtils;
import org.apache.iotdb.it.env.MultiEnvFactory;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
@@ -38,15 +39,10 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
-import java.sql.Connection;
-import java.sql.SQLException;
-import java.sql.Statement;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import static org.junit.Assert.fail;
-
@RunWith(IoTDBTestRunner.class)
@Category({MultiClusterIT2.class})
public class IoTDBPipeSwitchStatusIT {
@@ -216,13 +212,8 @@ public class IoTDBPipeSwitchStatusIT {
try (SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
- try (Connection connection = senderEnv.getConnection();
- Statement statement = connection.createStatement()) {
- statement.execute("insert into root.sg1.d1(time, at1) values (1, 1)");
- } catch (SQLException e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, "insert into root.db.d1(time, s1) values (1, 1)");
Map<String, String> extractorAttributes = new HashMap<>();
Map<String, String> processorAttributes = new HashMap<>();
@@ -245,13 +236,7 @@ public class IoTDBPipeSwitchStatusIT {
Assert.assertTrue(
showPipeResult.stream().anyMatch((o) -> o.id.equals("p1") && o.state.equals("RUNNING")));
- try (Connection connection = senderEnv.getConnection();
- Statement statement = connection.createStatement()) {
- statement.execute("drop database root.**");
- } catch (SQLException e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ TestUtils.executeNonQueryWithRetry(senderEnv, "drop database root.**");
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.dropPipe("p1").getCode());
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/extractor/IoTDBPipeExtractorIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/extractor/IoTDBPipeExtractorIT.java
index 2c2ac032526..e127225e11e 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/extractor/IoTDBPipeExtractorIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/extractor/IoTDBPipeExtractorIT.java
@@ -167,13 +167,8 @@ public class IoTDBPipeExtractorIT {
try (SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
// insert data to create data region
- try (Connection connection = senderEnv.getConnection();
- Statement statement = connection.createStatement()) {
- statement.execute("insert into root.sg1.d1(time, at1) values (1, 1)");
- } catch (SQLException e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, "insert into root.sg1.d1(time, at1) values (1, 1)");
String formatString =
String.format(
@@ -214,33 +209,33 @@ public class IoTDBPipeExtractorIT {
try (SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
- try (Connection connection = senderEnv.getConnection();
- Statement statement = connection.createStatement()) {
- statement.execute("insert into root.nonAligned.1TS (time, s_float) values (now(), 0.5)");
- statement.execute("insert into root.nonAligned.100TS (time, s_float) values (now(), 0.5)");
- statement.execute("insert into root.nonAligned.1000TS (time, s_float) values (now(), 0.5)");
- statement.execute(
- "insert into root.nonAligned.`1(TS)` (time, s_float) values (now(), 0.5)");
- statement.execute(
- "insert into root.nonAligned.6TS.`6` ("
- + "time, `s_float(1)`, `s_int(1)`, `s_double(1)`, `s_long(1)`, `s_text(1)`, `s_bool(1)`) "
- + "values (now(), 0.5, 1, 1.5, 2, \"text1\", true)");
- statement.execute(
- "insert into root.aligned.1TS (time, s_float) aligned values (now(), 0.5)");
- statement.execute(
- "insert into root.aligned.100TS (time, s_float) aligned values (now(), 0.5)");
- statement.execute(
- "insert into root.aligned.1000TS (time, s_float) aligned values (now(), 0.5)");
- statement.execute(
- "insert into root.aligned.`1(TS)` (time, s_float) aligned values (now(), 0.5)");
- statement.execute(
- "insert into root.aligned.6TS.`6` ("
- + "time, `s_float(1)`, `s_int(1)`, `s_double(1)`, `s_long(1)`, `s_text(1)`, `s_bool(1)`) "
- + "aligned values (now(), 0.5, 1, 1.5, 2, \"text1\", true)");
- } catch (SQLException e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, "insert into root.nonAligned.1TS (time, s_float) values (now(), 0.5)");
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, "insert into root.nonAligned.100TS (time, s_float) values (now(), 0.5)");
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, "insert into root.nonAligned.1000TS (time, s_float) values (now(), 0.5)");
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, "insert into root.nonAligned.`1(TS)` (time, s_float) values (now(), 0.5)");
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv,
+ "insert into root.nonAligned.6TS.`6` ("
+ + "time, `s_float(1)`, `s_int(1)`, `s_double(1)`, `s_long(1)`, `s_text(1)`, `s_bool(1)`) "
+ + "values (now(), 0.5, 1, 1.5, 2, \"text1\", true)");
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, "insert into root.aligned.1TS (time, s_float) aligned values (now(), 0.5)");
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, "insert into root.aligned.100TS (time, s_float) aligned values (now(), 0.5)");
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, "insert into root.aligned.1000TS (time, s_float) aligned values (now(), 0.5)");
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv,
+ "insert into root.aligned.`1(TS)` (time, s_float) aligned values (now(), 0.5)");
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv,
+ "insert into root.aligned.6TS.`6` ("
+ + "time, `s_float(1)`, `s_int(1)`, `s_double(1)`, `s_long(1)`, `s_text(1)`, `s_bool(1)`) "
+ + "aligned values (now(), 0.5, 1, 1.5, 2, \"text1\", true)");
Map<String, String> extractorAttributes = new HashMap<>();
Map<String, String> processorAttributes = new HashMap<>();
@@ -343,15 +338,11 @@ public class IoTDBPipeExtractorIT {
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode());
assertTimeseriesCountOnReceiver(receiverEnv, 0);
- try (Connection connection = senderEnv.getConnection();
- Statement statement = connection.createStatement()) {
- statement.execute("insert into root.db1.d1 (time, at1) values (1, 10)");
- statement.execute("insert into root.db2.d1 (time, at1) values (1, 20)");
- statement.execute("flush");
- } catch (SQLException e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, "insert into root.db1.d1 (time, at1) values (1, 10)");
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, "insert into root.db2.d1 (time, at1) values (1, 20)");
+ TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
extractorAttributes.replace("extractor.pattern", "root.db2");
status =
@@ -369,15 +360,11 @@ public class IoTDBPipeExtractorIT {
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.dropPipe("p2").getCode());
- try (Connection connection = senderEnv.getConnection();
- Statement statement = connection.createStatement()) {
- statement.execute("insert into root.db1.d1 (time, at1) values (2, 11)");
- statement.execute("insert into root.db2.d1 (time, at1) values (2, 21)");
- statement.execute("flush");
- } catch (SQLException e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, "insert into root.db1.d1 (time, at1) values (2, 11)");
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, "insert into root.db2.d1 (time, at1) values (2, 21)");
+ TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
extractorAttributes.remove("extractor.pattern"); // no pattern, will match all databases
status =
@@ -415,17 +402,15 @@ public class IoTDBPipeExtractorIT {
try (SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
- try (Connection connection = senderEnv.getConnection();
- Statement statement = connection.createStatement()) {
- statement.execute("insert into root.db.d1 (time, at1) values (1, 10)");
- statement.execute("insert into root.db.d2 (time, at1) values (1, 20)");
- statement.execute("insert into root.db.d3 (time, at1) values (1, 30)");
- statement.execute("insert into root.db.d4 (time, at1) values (1, 40)");
- statement.execute("flush");
- } catch (SQLException e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, "insert into root.db.d1 (time, at1) values (1, 10)");
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, "insert into root.db.d2 (time, at1) values (1, 20)");
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, "insert into root.db.d3 (time, at1) values (1, 30)");
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, "insert into root.db.d4 (time, at1) values (1, 40)");
+ TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
Map<String, String> extractorAttributes = new HashMap<>();
Map<String, String> processorAttributes = new HashMap<>();
@@ -483,16 +468,14 @@ public class IoTDBPipeExtractorIT {
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p4").getCode());
- try (Connection connection = senderEnv.getConnection();
- Statement statement = connection.createStatement()) {
- statement.execute("insert into root.db.d1 (time, at1) values (2, 11)");
- statement.execute("insert into root.db.d2 (time, at1) values (2, 21)");
- statement.execute("insert into root.db.d3 (time, at1) values (2, 31)");
- statement.execute("insert into root.db.d4 (time, at1) values (2, 41)");
- } catch (SQLException e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, "insert into root.db.d1 (time, at1) values (2, 11)");
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, "insert into root.db.d2 (time, at1) values (2, 21)");
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, "insert into root.db.d3 (time, at1) values (2, 31)");
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv, "insert into root.db.d4 (time, at1) values (2, 41)");
try (Connection connection = receiverEnv.getConnection();
Statement statement = connection.createStatement()) {
@@ -528,19 +511,15 @@ public class IoTDBPipeExtractorIT {
try (SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
- try (Connection connection = senderEnv.getConnection();
- Statement statement = connection.createStatement()) {
- statement.execute(
- "insert into root.db.d1 (time, at1)"
- + " values (1000, 1), (2000, 2), (3000, 3), (4000, 4), (5000, 5)");
- statement.execute(
- "insert into root.db.d2 (time, at1)"
- + " values (1000, 1), (2000, 2), (3000, 3), (4000, 4), (5000, 5)");
- statement.execute("flush");
- } catch (SQLException e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv,
+ "insert into root.db.d1 (time, at1)"
+ + " values (1000, 1), (2000, 2), (3000, 3), (4000, 4), (5000, 5)");
+ TestUtils.executeNonQueryWithRetry(
+ senderEnv,
+ "insert into root.db.d2 (time, at1)"
+ + " values (1000, 1), (2000, 2), (3000, 3), (4000, 4), (5000, 5)");
+ TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
Map<String, String> extractorAttributes = new HashMap<>();
Map<String, String> processorAttributes = new HashMap<>();