You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/05/07 18:05:53 UTC

[iotdb] 01/04: merge master

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch IOTDB-5787
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit f39f67447738f09099bc14bc17b36303153493d1
Merge: a9ca134f3b6 478e4d17952
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Sun May 7 21:29:09 2023 +0800

    merge master

 .../confignode/client/DataNodeRequestType.java     |  7 +--
 .../client/async/AsyncDataNodeClientPool.java      | 15 ++---
 .../client/async/handlers/AsyncClientHandler.java  |  1 +
 .../confignode/persistence/pipe/PipeTaskInfo.java  |  4 ++
 .../procedure/env/ConfigNodeProcedureEnv.java      | 23 ++------
 .../pipe/task/AbstractOperatePipeProcedureV2.java  | 43 +++++++++++++-
 .../impl/pipe/task/CreatePipeProcedureV2.java      | 28 +---------
 .../impl/pipe/task/DropPipeProcedureV2.java        | 16 +-----
 .../impl/pipe/task/StartPipeProcedureV2.java       | 28 ++--------
 .../impl/pipe/task/StopPipeProcedureV2.java        | 28 ++--------
 .../iotdb/confignode/persistence/PipeInfoTest.java |  1 +
 .../iotdb/commons/pipe/task/meta/PipeMeta.java     | 10 +++-
 .../commons/pipe/task/meta/PipeMetaKeeper.java     |  4 ++
 .../commons/pipe/task/meta/PipeRuntimeMeta.java    | 20 +------
 .../commons/pipe/task/meta/PipeStaticMeta.java     | 65 +++++++++++-----------
 .../iotdb/commons/pipe/task/meta/PipeTaskMeta.java | 51 ++++++++++++++++-
 .../iotdb/pipe/api/customizer/PipeParameters.java  | 22 ++++++++
 .../exception/PipeRuntimeCriticalException.java    | 40 +++++++++++++
 .../pipe/api/exception/PipeRuntimeException.java   | 40 +++++++++++++
 .../exception/PipeRuntimeNonCriticalException.java | 40 +++++++++++++
 .../iotdb/db/pipe/agent/task/PipeTaskAgent.java    |  1 +
 .../impl/DataNodeInternalRPCServiceImpl.java       | 31 ++---------
 thrift/src/main/thrift/datanode.thrift             | 19 ++-----
 23 files changed, 320 insertions(+), 217 deletions(-)

diff --cc node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java
index d04549c4c37,14185dd8a5e..eab74083860
--- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java
@@@ -32,12 -32,8 +32,8 @@@ import java.util.Map
  public class PipeStaticMeta {
  
    private String pipeName;
 -  private long createTime;
 +  private long creationTime;
  
-   private Map<String, String> collectorAttributes = new HashMap<>();
-   private Map<String, String> processorAttributes = new HashMap<>();
-   private Map<String, String> connectorAttributes = new HashMap<>();
- 
    private PipeParameters collectorParameters;
    private PipeParameters processorParameters;
    private PipeParameters connectorParameters;
@@@ -51,10 -47,7 +47,7 @@@
        Map<String, String> processorAttributes,
        Map<String, String> connectorAttributes) {
      this.pipeName = pipeName.toUpperCase();
 -    this.createTime = createTime;
 +    this.creationTime = creationTime;
-     this.collectorAttributes = collectorAttributes;
-     this.processorAttributes = processorAttributes;
-     this.connectorAttributes = connectorAttributes;
      collectorParameters = new PipeParameters(collectorAttributes);
      processorParameters = new PipeParameters(processorAttributes);
      connectorParameters = new PipeParameters(connectorAttributes);
@@@ -89,10 -82,10 +82,10 @@@
  
    public void serialize(DataOutputStream outputStream) throws IOException {
      ReadWriteIOUtils.write(pipeName, outputStream);
 -    ReadWriteIOUtils.write(createTime, outputStream);
 +    ReadWriteIOUtils.write(creationTime, outputStream);
  
-     outputStream.writeInt(collectorAttributes.size());
-     for (Map.Entry<String, String> entry : collectorAttributes.entrySet()) {
+     outputStream.writeInt(collectorParameters.getAttribute().size());
+     for (Map.Entry<String, String> entry : collectorParameters.getAttribute().entrySet()) {
        ReadWriteIOUtils.write(entry.getKey(), outputStream);
        ReadWriteIOUtils.write(entry.getValue(), outputStream);
      }
@@@ -117,12 -110,18 +110,18 @@@
      final PipeStaticMeta pipeStaticMeta = new PipeStaticMeta();
  
      pipeStaticMeta.pipeName = ReadWriteIOUtils.readString(byteBuffer);
 -    pipeStaticMeta.createTime = ReadWriteIOUtils.readLong(byteBuffer);
 +    pipeStaticMeta.creationTime = ReadWriteIOUtils.readLong(byteBuffer);
  
+     pipeStaticMeta.collectorParameters = new PipeParameters(new HashMap<>());
+     pipeStaticMeta.processorParameters = new PipeParameters(new HashMap<>());
+     pipeStaticMeta.connectorParameters = new PipeParameters(new HashMap<>());
+ 
      int size = byteBuffer.getInt();
      for (int i = 0; i < size; ++i) {
-       pipeStaticMeta.collectorAttributes.put(
-           ReadWriteIOUtils.readString(byteBuffer), ReadWriteIOUtils.readString(byteBuffer));
+       pipeStaticMeta
+           .collectorParameters
+           .getAttribute()
+           .put(ReadWriteIOUtils.readString(byteBuffer), ReadWriteIOUtils.readString(byteBuffer));
      }
      size = byteBuffer.getInt();
      for (int i = 0; i < size; ++i) {
@@@ -152,10 -151,10 +151,10 @@@
      }
      PipeStaticMeta that = (PipeStaticMeta) obj;
      return pipeName.equals(that.pipeName)
 -        && createTime == that.createTime
 +        && creationTime == that.creationTime
-         && collectorAttributes.equals(that.collectorAttributes)
-         && processorAttributes.equals(that.processorAttributes)
-         && connectorAttributes.equals(that.connectorAttributes);
+         && collectorParameters.equals(that.collectorParameters)
+         && processorParameters.equals(that.processorParameters)
+         && connectorParameters.equals(that.connectorParameters);
    }
  
    @Override
@@@ -169,14 -168,14 +168,14 @@@
          + "pipeName='"
          + pipeName
          + '\''
--        + ", createTime="
 -        + createTime
++        + ", creationTime="
 +        + creationTime
-         + ", collectorAttributes="
-         + collectorAttributes
-         + ", processorAttributes="
-         + processorAttributes
-         + ", connectorAttributes="
-         + connectorAttributes
+         + ", collectorParameters="
+         + collectorParameters.getAttribute()
+         + ", processorParameters="
+         + processorParameters.getAttribute()
+         + ", connectorParameters="
+         + connectorParameters.getAttribute()
          + '}';
    }
  }
diff --cc server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
index fd401a325ef,9fffa867062..c9be5f146a1
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
@@@ -19,14 -19,8 +19,15 @@@
  
  package org.apache.iotdb.db.pipe.agent.task;
  
 +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
  import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
  import org.apache.iotdb.commons.pipe.task.meta.PipeMetaKeeper;
 +import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
 +import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 +import org.apache.iotdb.pipe.api.exception.PipeManagementException;
++
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
  
  public class PipeTaskAgent {