You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2024/04/09 08:43:48 UTC

(iotdb) branch load--1-time-partition updated: Update IoTDBPipeClusterIT.java

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch load--1-time-partition
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/load--1-time-partition by this push:
     new 96c84365e56 Update IoTDBPipeClusterIT.java
96c84365e56 is described below

commit 96c84365e569d93ad03a67cb28efd1831f8660e9
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Tue Apr 9 16:43:40 2024 +0800

    Update IoTDBPipeClusterIT.java
---
 .../pipe/it/autocreate/IoTDBPipeClusterIT.java     | 63 ++++++++++++++++++++++
 1 file changed, 63 insertions(+)

diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeClusterIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeClusterIT.java
index e0194f8c591..9113e3669ba 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeClusterIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeClusterIT.java
@@ -862,4 +862,67 @@ public class IoTDBPipeClusterIT extends AbstractPipeDualAutoIT {
       Assert.assertEquals(pipeCount, showPipeResult.size());
     }
   }
+
+  /**
+   * Verifies that pipe "p1" delivers points with negative (pre-epoch) timestamps to the
+   * receiver, both for data written before the pipe is created and after it has been started.
+   */
+  @Test
+  public void testNegativeTimestamp() throws Exception {
+    final DataNodeWrapper targetNode = receiverEnv.getDataNodeWrapper(0);
+    final String sinkHost = targetNode.getIp();
+    final int sinkPort = targetNode.getPort();
+    final String sql = "select count(*) from root.**";
+    final String header = "count(root.db.d1.s1),";
+
+    try (final SyncConfigNodeIServiceClient configNodeClient =
+        (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
+      // Seed the sender before the pipe exists: timestamp zero plus two pre-epoch timestamps.
+      final boolean seeded =
+          TestUtils.tryExecuteNonQueriesWithRetry(
+              senderEnv,
+              Arrays.asList(
+                  "insert into root.db.d1(time, s1) values (0, 1)",
+                  "insert into root.db.d1(time, s1) values (-1, 2)",
+                  "insert into root.db.d1(time, s1) values (1960-01-02T10:00:00+08:00, 2)",
+                  "flush"));
+      if (!seeded) {
+        return;
+      }
+
+      final Map<String, String> sourceAttrs = new HashMap<>();
+      sourceAttrs.put("extractor", "iotdb-extractor");
+      final Map<String, String> transformAttrs = new HashMap<>();
+      transformAttrs.put("processor", "do-nothing-processor");
+      final Map<String, String> sinkAttrs = new HashMap<>();
+      sinkAttrs.put("connector", "iotdb-thrift-connector");
+      sinkAttrs.put("connector.ip", sinkHost);
+      sinkAttrs.put("connector.port", Integer.toString(sinkPort));
+
+      final int successCode = TSStatusCode.SUCCESS_STATUS.getStatusCode();
+      final TCreatePipeReq createRequest =
+          new TCreatePipeReq("p1", sinkAttrs)
+              .setExtractorAttributes(sourceAttrs)
+              .setProcessorAttributes(transformAttrs);
+      Assert.assertEquals(successCode, configNodeClient.createPipe(createRequest).getCode());
+      Assert.assertEquals(successCode, configNodeClient.startPipe("p1").getCode());
+
+      // All three seeded points, negative timestamps included, must show up on the receiver.
+      TestUtils.assertDataEventuallyOnEnv(receiverEnv, sql, header, Collections.singleton("3,"));
+
+      // Write one more negative timestamp and one current timestamp after the pipe is started.
+      final boolean appended =
+          TestUtils.tryExecuteNonQueriesWithRetry(
+              senderEnv,
+              Arrays.asList(
+                  "insert into root.db.d1(time, s1) values (-123, 3)",
+                  "insert into root.db.d1(time, s1) values (now(), 3)",
+                  "flush"));
+      if (!appended) {
+        return;
+      }
+
+      TestUtils.assertDataEventuallyOnEnv(receiverEnv, sql, header, Collections.singleton("5,"));
+    }
+  }
 }