You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2024/04/09 08:43:48 UTC
(iotdb) branch load--1-time-partition updated: Update IoTDBPipeClusterIT.java
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch load--1-time-partition
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/load--1-time-partition by this push:
new 96c84365e56 Update IoTDBPipeClusterIT.java
96c84365e56 is described below
commit 96c84365e569d93ad03a67cb28efd1831f8660e9
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Tue Apr 9 16:43:40 2024 +0800
Update IoTDBPipeClusterIT.java
---
.../pipe/it/autocreate/IoTDBPipeClusterIT.java | 63 ++++++++++++++++++++++
1 file changed, 63 insertions(+)
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeClusterIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeClusterIT.java
index e0194f8c591..9113e3669ba 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeClusterIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeClusterIT.java
@@ -862,4 +862,67 @@ public class IoTDBPipeClusterIT extends AbstractPipeDualAutoIT {
Assert.assertEquals(pipeCount, showPipeResult.size());
}
}
+
+ @Test
+ public void testNegativeTimestamp() throws Exception { // Checks that rows written on the sender with zero, negative and pre-1970 timestamps are counted on the receiver after going through pipe "p1".
+ final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0); // Receiver DataNode at index 0; its ip/port are used as the connector target below.
+
+ final String receiverIp = receiverDataNode.getIp();
+ final int receiverPort = receiverDataNode.getPort();
+
+ try (final SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
+ if (!TestUtils.tryExecuteNonQueriesWithRetry( // Seed the sender before the pipe is created: times 0, -1 and a 1960 datetime, then flush.
+ senderEnv,
+ Arrays.asList(
+ "insert into root.db.d1(time, s1) values (0, 1)",
+ "insert into root.db.d1(time, s1) values (-1, 2)",
+ "insert into root.db.d1(time, s1) values (1960-01-02T10:00:00+08:00, 2)",
+ "flush"))) {
+ return; // NOTE(review): ends the test without any assertion when the statements could not be executed; presumably a deliberate skip, confirm.
+ }
+ final Map<String, String> extractorAttributes = new HashMap<>();
+ final Map<String, String> processorAttributes = new HashMap<>();
+ final Map<String, String> connectorAttributes = new HashMap<>();
+
+ extractorAttributes.put("extractor", "iotdb-extractor");
+
+ processorAttributes.put("processor", "do-nothing-processor");
+
+ connectorAttributes.put("connector", "iotdb-thrift-connector");
+ connectorAttributes.put("connector.ip", receiverIp);
+ connectorAttributes.put("connector.port", Integer.toString(receiverPort));
+
+ final TSStatus status =
+ client.createPipe(
+ new TCreatePipeReq("p1", connectorAttributes) // Connector attributes go through the constructor; extractor/processor attributes through the setters.
+ .setExtractorAttributes(extractorAttributes)
+ .setProcessorAttributes(processorAttributes));
+
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); // Pipe creation must report success.
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode()); // Starting the pipe must report success as well.
+
+ TestUtils.assertDataEventuallyOnEnv( // The three rows inserted before pipe creation are expected on the receiver (helper presumably retries until the result matches; confirm).
+ receiverEnv,
+ "select count(*) from root.**",
+ "count(root.db.d1.s1),",
+ Collections.singleton("3,"));
+
+ if (!TestUtils.tryExecuteNonQueriesWithRetry( // Second batch, written while the pipe is running: time -123 and now(), then flush.
+ senderEnv,
+ Arrays.asList(
+ "insert into root.db.d1(time, s1) values (-123, 3)",
+ "insert into root.db.d1(time, s1) values (now(), 3)",
+ "flush"))) {
+ return; // NOTE(review): same silent early exit as above; the final count check is skipped in that case.
+ }
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "select count(*) from root.**",
+ "count(root.db.d1.s1),",
+ Collections.singleton("5,")); // 3 rows from the first batch plus 2 from the second.
+ }
+ }
}