You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/05/07 18:05:53 UTC
[iotdb] 01/04: merge master
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch IOTDB-5787
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit f39f67447738f09099bc14bc17b36303153493d1
Merge: a9ca134f3b6 478e4d17952
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Sun May 7 21:29:09 2023 +0800
merge master
.../confignode/client/DataNodeRequestType.java | 7 +--
.../client/async/AsyncDataNodeClientPool.java | 15 ++---
.../client/async/handlers/AsyncClientHandler.java | 1 +
.../confignode/persistence/pipe/PipeTaskInfo.java | 4 ++
.../procedure/env/ConfigNodeProcedureEnv.java | 23 ++------
.../pipe/task/AbstractOperatePipeProcedureV2.java | 43 +++++++++++++-
.../impl/pipe/task/CreatePipeProcedureV2.java | 28 +---------
.../impl/pipe/task/DropPipeProcedureV2.java | 16 +-----
.../impl/pipe/task/StartPipeProcedureV2.java | 28 ++--------
.../impl/pipe/task/StopPipeProcedureV2.java | 28 ++--------
.../iotdb/confignode/persistence/PipeInfoTest.java | 1 +
.../iotdb/commons/pipe/task/meta/PipeMeta.java | 10 +++-
.../commons/pipe/task/meta/PipeMetaKeeper.java | 4 ++
.../commons/pipe/task/meta/PipeRuntimeMeta.java | 20 +------
.../commons/pipe/task/meta/PipeStaticMeta.java | 65 +++++++++++-----------
.../iotdb/commons/pipe/task/meta/PipeTaskMeta.java | 51 ++++++++++++++++-
.../iotdb/pipe/api/customizer/PipeParameters.java | 22 ++++++++
.../exception/PipeRuntimeCriticalException.java | 40 +++++++++++++
.../pipe/api/exception/PipeRuntimeException.java | 40 +++++++++++++
.../exception/PipeRuntimeNonCriticalException.java | 40 +++++++++++++
.../iotdb/db/pipe/agent/task/PipeTaskAgent.java | 1 +
.../impl/DataNodeInternalRPCServiceImpl.java | 31 ++---------
thrift/src/main/thrift/datanode.thrift | 19 ++-----
23 files changed, 320 insertions(+), 217 deletions(-)
diff --cc node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java
index d04549c4c37,14185dd8a5e..eab74083860
--- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java
@@@ -32,12 -32,8 +32,8 @@@ import java.util.Map
public class PipeStaticMeta {
private String pipeName;
- private long createTime;
+ private long creationTime;
- private Map<String, String> collectorAttributes = new HashMap<>();
- private Map<String, String> processorAttributes = new HashMap<>();
- private Map<String, String> connectorAttributes = new HashMap<>();
-
private PipeParameters collectorParameters;
private PipeParameters processorParameters;
private PipeParameters connectorParameters;
@@@ -51,10 -47,7 +47,7 @@@
Map<String, String> processorAttributes,
Map<String, String> connectorAttributes) {
this.pipeName = pipeName.toUpperCase();
- this.createTime = createTime;
+ this.creationTime = creationTime;
- this.collectorAttributes = collectorAttributes;
- this.processorAttributes = processorAttributes;
- this.connectorAttributes = connectorAttributes;
collectorParameters = new PipeParameters(collectorAttributes);
processorParameters = new PipeParameters(processorAttributes);
connectorParameters = new PipeParameters(connectorAttributes);
@@@ -89,10 -82,10 +82,10 @@@
public void serialize(DataOutputStream outputStream) throws IOException {
ReadWriteIOUtils.write(pipeName, outputStream);
- ReadWriteIOUtils.write(createTime, outputStream);
+ ReadWriteIOUtils.write(creationTime, outputStream);
- outputStream.writeInt(collectorAttributes.size());
- for (Map.Entry<String, String> entry : collectorAttributes.entrySet()) {
+ outputStream.writeInt(collectorParameters.getAttribute().size());
+ for (Map.Entry<String, String> entry : collectorParameters.getAttribute().entrySet()) {
ReadWriteIOUtils.write(entry.getKey(), outputStream);
ReadWriteIOUtils.write(entry.getValue(), outputStream);
}
@@@ -117,12 -110,18 +110,18 @@@
final PipeStaticMeta pipeStaticMeta = new PipeStaticMeta();
pipeStaticMeta.pipeName = ReadWriteIOUtils.readString(byteBuffer);
- pipeStaticMeta.createTime = ReadWriteIOUtils.readLong(byteBuffer);
+ pipeStaticMeta.creationTime = ReadWriteIOUtils.readLong(byteBuffer);
+ pipeStaticMeta.collectorParameters = new PipeParameters(new HashMap<>());
+ pipeStaticMeta.processorParameters = new PipeParameters(new HashMap<>());
+ pipeStaticMeta.connectorParameters = new PipeParameters(new HashMap<>());
+
int size = byteBuffer.getInt();
for (int i = 0; i < size; ++i) {
- pipeStaticMeta.collectorAttributes.put(
- ReadWriteIOUtils.readString(byteBuffer), ReadWriteIOUtils.readString(byteBuffer));
+ pipeStaticMeta
+ .collectorParameters
+ .getAttribute()
+ .put(ReadWriteIOUtils.readString(byteBuffer), ReadWriteIOUtils.readString(byteBuffer));
}
size = byteBuffer.getInt();
for (int i = 0; i < size; ++i) {
@@@ -152,10 -151,10 +151,10 @@@
}
PipeStaticMeta that = (PipeStaticMeta) obj;
return pipeName.equals(that.pipeName)
- && createTime == that.createTime
+ && creationTime == that.creationTime
- && collectorAttributes.equals(that.collectorAttributes)
- && processorAttributes.equals(that.processorAttributes)
- && connectorAttributes.equals(that.connectorAttributes);
+ && collectorParameters.equals(that.collectorParameters)
+ && processorParameters.equals(that.processorParameters)
+ && connectorParameters.equals(that.connectorParameters);
}
@Override
@@@ -169,14 -168,14 +168,14 @@@
+ "pipeName='"
+ pipeName
+ '\''
-- + ", createTime="
- + createTime
++ + ", creationTime="
+ + creationTime
- + ", collectorAttributes="
- + collectorAttributes
- + ", processorAttributes="
- + processorAttributes
- + ", connectorAttributes="
- + connectorAttributes
+ + ", collectorParameters="
+ + collectorParameters.getAttribute()
+ + ", processorParameters="
+ + processorParameters.getAttribute()
+ + ", connectorParameters="
+ + connectorParameters.getAttribute()
+ '}';
}
}
diff --cc server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
index fd401a325ef,9fffa867062..c9be5f146a1
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
@@@ -19,14 -19,8 +19,15 @@@
package org.apache.iotdb.db.pipe.agent.task;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeMetaKeeper;
+import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
+import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
+import org.apache.iotdb.pipe.api.exception.PipeManagementException;
++
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class PipeTaskAgent {