You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/05/20 07:14:36 UTC
[iotdb] branch master updated: [IOTDB-5847] Pipe: IoTDB Thrift Connector and PipeReceiverAgent (#9829)
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 738a1389842 [IOTDB-5847] Pipe: IoTDB Thrift Connector and PipeReceiverAgent (#9829)
738a1389842 is described below
commit 738a1389842b06ecd3747045c57dc3d5c7dd50bc
Author: yschengzi <87...@users.noreply.github.com>
AuthorDate: Sat May 20 15:14:29 2023 +0800
[IOTDB-5847] Pipe: IoTDB Thrift Connector and PipeReceiverAgent (#9829)
Co-authored-by: Steve Yurong Su <ro...@apache.org>
---
.../persistence/pipe/PipePluginInfo.java | 44 ++--
.../confignode/persistence/pipe/PipeTaskInfo.java | 67 ++---
.../runtime/PipeHandleLeaderChangeProcedure.java | 3 +-
.../pipe/task/AbstractOperatePipeProcedureV2.java | 11 +-
.../impl/pipe/task/CreatePipeProcedureV2.java | 16 +-
.../impl/pipe/task/DropPipeProcedureV2.java | 4 +-
.../impl/pipe/task/StartPipeProcedureV2.java | 4 +-
.../impl/pipe/task/StopPipeProcedureV2.java | 4 +-
.../pipe/plugin/builtin/BuiltinPipePlugin.java | 6 +-
.../{DefaultCollector.java => IoTDBCollector.java} | 2 +-
.../IoTDBThriftConnector.java} | 48 ++--
.../org/apache/iotdb/pipe/api/PipeConnector.java | 4 +
.../org/apache/iotdb/db/engine/StorageEngine.java | 41 +++
.../db/engine/storagegroup/TsFileResource.java | 2 +-
.../org/apache/iotdb/db/pipe/agent/PipeAgent.java | 12 +
.../db/pipe/agent/plugin/PipePluginAgent.java | 2 +-
.../receiver/IoTDBThriftReceiver.java} | 19 +-
.../db/pipe/agent/receiver/PipeReceiverAgent.java | 90 +++++++
.../apache/iotdb/db/pipe/config/PipeConfig.java | 16 ++
.../db/pipe/config/PipeConnectorConstant.java | 3 +
.../core/collector/IoTDBDataRegionCollector.java | 39 ++-
.../PipeHistoricalDataRegionTsFileCollector.java | 13 +-
.../PipeRealtimeDataRegionHybridCollector.java | 6 +-
.../realtime/assigner/PipeDataRegionAssigner.java | 3 +-
.../listener/PipeInsertionDataNodeListener.java | 4 +-
.../impl/iotdb/IoTDBThriftConnectorClient.java | 70 ++++++
.../impl/iotdb/IoTDBThriftConnectorVersion.java} | 16 +-
.../impl/iotdb/v1/IoTDBThriftConnectorV1.java | 231 +++++++++++++++++
.../impl/iotdb/v1/IoTDBThriftReceiverV1.java | 275 +++++++++++++++++++++
.../connector/impl/iotdb/v1/PipeRequestType.java | 56 +++++
.../v1/reponse/PipeTransferFilePieceResp.java | 80 ++++++
.../iotdb/v1/request/PipeTransferFilePieceReq.java | 88 +++++++
.../iotdb/v1/request/PipeTransferFileSealReq.java | 79 ++++++
.../iotdb/v1/request/PipeTransferHandshakeReq.java | 71 ++++++
.../v1/request/PipeTransferInsertNodeReq.java | 102 ++++++++
.../PipeConnectorSubtaskLifeCycle.java | 2 +-
.../{ => manager}/PipeConnectorSubtaskManager.java | 18 +-
.../core/event/impl/PipeTabletInsertionEvent.java | 4 +
.../core/event/impl/PipeTsFileInsertionEvent.java | 37 ++-
.../realtime/PipeRealtimeCollectEventFactory.java | 6 +-
.../event/view/collector/PipeEventCollector.java | 3 +-
.../execution/executor/PipeSubtaskExecutor.java | 15 +-
.../org/apache/iotdb/db/pipe/task/PipeBuilder.java | 8 +-
.../db/pipe/task/stage/PipeTaskCollectorStage.java | 25 +-
.../db/pipe/task/stage/PipeTaskConnectorStage.java | 11 +-
.../db/pipe/task/stage/PipeTaskProcessorStage.java | 19 +-
.../db/pipe/task/subtask/PipeConnectorSubtask.java | 29 ++-
.../db/pipe/task/subtask/PipeProcessorSubtask.java | 21 +-
.../iotdb/db/pipe/task/subtask/PipeSubtask.java | 28 ++-
.../service/thrift/impl/ClientRPCServiceImpl.java | 9 +
.../pipe/core/connector/PipeThriftRequestTest.java | 118 +++++++++
.../java/org/apache/iotdb/rpc/TSStatusCode.java | 9 +-
thrift/src/main/thrift/client.thrift | 13 +
53 files changed, 1727 insertions(+), 179 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
index c954df5770e..f839f6251dc 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
@@ -56,8 +56,8 @@ import java.util.List;
import java.util.Objects;
import java.util.concurrent.locks.ReentrantLock;
-import static org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin.DEFAULT_COLLECTOR;
import static org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin.DO_NOTHING_PROCESSOR;
+import static org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin.IOTDB_COLLECTOR;
public class PipePluginInfo implements SnapshotProcessor {
@@ -121,17 +121,19 @@ public class PipePluginInfo implements SnapshotProcessor {
return !pipePluginMetaKeeper.containsJar(jarName);
}
- public boolean checkBeforeCreatePipe(TCreatePipeReq createPipeRequest) {
+ public void checkBeforeCreatePipe(TCreatePipeReq createPipeRequest) {
final PipeParameters collectorParameters =
new PipeParameters(createPipeRequest.getCollectorAttributes());
final String collectorPluginName =
collectorParameters.getStringOrDefault(
- PipeCollectorConstant.COLLECTOR_KEY, DEFAULT_COLLECTOR.getPipePluginName());
+ PipeCollectorConstant.COLLECTOR_KEY, IOTDB_COLLECTOR.getPipePluginName());
if (!pipePluginMetaKeeper.containsPipePlugin(collectorPluginName)) {
- LOGGER.warn(
- "Failed to create pipe, the pipe collector plugin {} does not exist",
- collectorPluginName);
- return false;
+ final String exceptionMessage =
+ String.format(
+ "Failed to create pipe, the pipe collector plugin %s does not exist",
+ collectorPluginName);
+ LOGGER.warn(exceptionMessage);
+ throw new PipeManagementException(exceptionMessage);
}
final PipeParameters processorParameters =
@@ -140,28 +142,32 @@ public class PipePluginInfo implements SnapshotProcessor {
processorParameters.getStringOrDefault(
PipeProcessorConstant.PROCESSOR_KEY, DO_NOTHING_PROCESSOR.getPipePluginName());
if (!pipePluginMetaKeeper.containsPipePlugin(processorPluginName)) {
- LOGGER.warn(
- "Failed to create pipe, the pipe processor plugin {} does not exist",
- processorPluginName);
- return false;
+ final String exceptionMessage =
+ String.format(
+ "Failed to create pipe, the pipe processor plugin %s does not exist",
+ processorPluginName);
+ LOGGER.warn(exceptionMessage);
+ throw new PipeManagementException(exceptionMessage);
}
final PipeParameters connectorParameters =
new PipeParameters(createPipeRequest.getConnectorAttributes());
if (!connectorParameters.hasAttribute(PipeConnectorConstant.CONNECTOR_KEY)) {
- LOGGER.warn("Failed to create pipe, the pipe connector plugin is not specified");
- return false;
+ final String exceptionMessage =
+ "Failed to create pipe, the pipe connector plugin is not specified";
+ LOGGER.warn(exceptionMessage);
+ throw new PipeManagementException(exceptionMessage);
}
final String connectorPluginName =
connectorParameters.getString(PipeConnectorConstant.CONNECTOR_KEY);
if (!pipePluginMetaKeeper.containsPipePlugin(connectorPluginName)) {
- LOGGER.warn(
- "Failed to create pipe, the pipe connector plugin {} does not exist",
- connectorPluginName);
- return false;
+ final String exceptionMessage =
+ String.format(
+ "Failed to create pipe, the pipe connector plugin %s does not exist",
+ connectorPluginName);
+ LOGGER.warn(exceptionMessage);
+ throw new PipeManagementException(exceptionMessage);
}
-
- return true;
}
/////////////////////////////// Pipe Plugin Management ///////////////////////////////
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
index 528725b1327..5ac5b7fca21 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStat
import org.apache.iotdb.confignode.consensus.response.pipe.task.PipeTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.pipe.api.exception.PipeManagementException;
import org.apache.iotdb.rpc.TSStatusCode;
import org.slf4j.Logger;
@@ -71,59 +72,67 @@ public class PipeTaskInfo implements SnapshotProcessor {
/////////////////////////////// Validator ///////////////////////////////
- public boolean checkBeforeCreatePipe(TCreatePipeReq createPipeRequest) {
+ public void checkBeforeCreatePipe(TCreatePipeReq createPipeRequest)
+ throws PipeManagementException {
if (!isPipeExisted(createPipeRequest.getPipeName())) {
- return true;
+ return;
}
- LOGGER.info(
+ final String exceptionMessage =
String.format(
- "Failed to create pipe [%s], the pipe with the same name has been created",
- createPipeRequest.getPipeName()));
- return false;
+ "Failed to create pipe %s, the pipe with the same name has been created",
+ createPipeRequest.getPipeName());
+ LOGGER.info(exceptionMessage);
+ throw new PipeManagementException(exceptionMessage);
}
- public boolean checkBeforeStartPipe(String pipeName) {
+ public void checkBeforeStartPipe(String pipeName) throws PipeManagementException {
if (!isPipeExisted(pipeName)) {
- LOGGER.info(String.format("Failed to start pipe [%s], the pipe does not exist", pipeName));
- return false;
+ final String exceptionMessage =
+ String.format("Failed to start pipe %s, the pipe does not exist", pipeName);
+ LOGGER.info(exceptionMessage);
+ throw new PipeManagementException(exceptionMessage);
}
final PipeStatus pipeStatus = getPipeStatus(pipeName);
if (pipeStatus == PipeStatus.RUNNING) {
- LOGGER.info(
- String.format("Failed to start pipe [%s], the pipe is already running", pipeName));
- return false;
+ final String exceptionMessage =
+ String.format("Failed to start pipe %s, the pipe is already running", pipeName);
+ LOGGER.info(exceptionMessage);
+ throw new PipeManagementException(exceptionMessage);
}
if (pipeStatus == PipeStatus.DROPPED) {
- LOGGER.info(
- String.format("Failed to start pipe [%s], the pipe is already dropped", pipeName));
- return false;
+ final String exceptionMessage =
+ String.format("Failed to start pipe %s, the pipe is already dropped", pipeName);
+ LOGGER.info(exceptionMessage);
+ throw new PipeManagementException(exceptionMessage);
}
-
- return true;
}
- public boolean checkBeforeStopPipe(String pipeName) {
+ public void checkBeforeStopPipe(String pipeName) throws PipeManagementException {
if (!isPipeExisted(pipeName)) {
- LOGGER.info(String.format("Failed to stop pipe [%s], the pipe does not exist", pipeName));
- return false;
+ final String exceptionMessage =
+ String.format("Failed to stop pipe %s, the pipe does not exist", pipeName);
+ LOGGER.info(exceptionMessage);
+ throw new PipeManagementException(exceptionMessage);
}
final PipeStatus pipeStatus = getPipeStatus(pipeName);
if (pipeStatus == PipeStatus.STOPPED) {
- LOGGER.info(String.format("Failed to stop pipe [%s], the pipe is already stop", pipeName));
- return false;
+ final String exceptionMessage =
+ String.format("Failed to stop pipe %s, the pipe is already stop", pipeName);
+ LOGGER.info(exceptionMessage);
+ throw new PipeManagementException(exceptionMessage);
}
if (pipeStatus == PipeStatus.DROPPED) {
- LOGGER.info(String.format("Failed to stop pipe [%s], the pipe is already dropped", pipeName));
- return false;
+ final String exceptionMessage =
+ String.format("Failed to stop pipe %s, the pipe is already dropped", pipeName);
+ LOGGER.info(exceptionMessage);
+ throw new PipeManagementException(exceptionMessage);
}
-
- return true;
}
- public boolean checkBeforeDropPipe(String pipeName) {
+ public void checkBeforeDropPipe(String pipeName) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
"Check before drop pipe {}, pipe exists: {}.",
@@ -131,8 +140,8 @@ public class PipeTaskInfo implements SnapshotProcessor {
isPipeExisted(pipeName) ? "true" : "false");
}
// no matter whether the pipe exists, we allow the drop operation executed on all nodes to
- // ensure the consistency
- return true;
+ // ensure the consistency.
+ // DO NOTHING HERE!
}
private boolean isPipeExisted(String pipeName) {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
index 42de84cc9e5..3fbe3091101 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
@@ -65,11 +65,10 @@ public class PipeHandleLeaderChangeProcedure extends AbstractOperatePipeProcedur
}
@Override
- protected boolean executeFromValidateTask(ConfigNodeProcedureEnv env) {
+ protected void executeFromValidateTask(ConfigNodeProcedureEnv env) {
LOGGER.info("PipeHandleLeaderChangeProcedure: executeFromValidateTask");
// nothing needs to be checked
- return true;
}
@Override
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AbstractOperatePipeProcedureV2.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AbstractOperatePipeProcedureV2.java
index 402a9d43bf3..15065499562 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AbstractOperatePipeProcedureV2.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AbstractOperatePipeProcedureV2.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.confignode.procedure.impl.pipe.task;
import org.apache.iotdb.commons.exception.sync.PipeException;
-import org.apache.iotdb.commons.exception.sync.PipeSinkException;
import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
import org.apache.iotdb.confignode.persistence.pipe.PipeTaskOperation;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
@@ -66,8 +65,7 @@ public abstract class AbstractOperatePipeProcedureV2
*
* @return true if procedure can finish directly
*/
- protected abstract boolean executeFromValidateTask(ConfigNodeProcedureEnv env)
- throws PipeException, PipeSinkException;
+ protected abstract void executeFromValidateTask(ConfigNodeProcedureEnv env) throws PipeException;
/** Execute at state CALCULATE_INFO_FOR_TASK */
protected abstract void executeFromCalculateInfoForTask(ConfigNodeProcedureEnv env)
@@ -88,10 +86,7 @@ public abstract class AbstractOperatePipeProcedureV2
switch (state) {
case VALIDATE_TASK:
env.getConfigManager().getPipeManager().getPipeTaskCoordinator().lock();
- if (!executeFromValidateTask(env)) {
- env.getConfigManager().getPipeManager().getPipeTaskCoordinator().unlock();
- return Flow.NO_MORE_STATE;
- }
+ executeFromValidateTask(env);
setNextState(OperatePipeTaskState.CALCULATE_INFO_FOR_TASK);
break;
case CALCULATE_INFO_FOR_TASK:
@@ -107,7 +102,7 @@ public abstract class AbstractOperatePipeProcedureV2
env.getConfigManager().getPipeManager().getPipeTaskCoordinator().unlock();
return Flow.NO_MORE_STATE;
}
- } catch (PipeException | PipeSinkException | IOException e) {
+ } catch (Exception e) {
if (isRollbackSupported(state)) {
LOGGER.error("Fail in OperatePipeProcedure", e);
setFailure(new ProcedureException(e.getMessage()));
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
index ec45d1c26b4..e0de835fb07 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
@@ -69,19 +69,17 @@ public class CreatePipeProcedureV2 extends AbstractOperatePipeProcedureV2 {
}
@Override
- protected boolean executeFromValidateTask(ConfigNodeProcedureEnv env) {
+ protected void executeFromValidateTask(ConfigNodeProcedureEnv env)
+ throws PipeManagementException {
LOGGER.info(
"CreatePipeProcedureV2: executeFromValidateTask({})", createPipeRequest.getPipeName());
final PipeManager pipeManager = env.getConfigManager().getPipeManager();
- return pipeManager
- .getPipePluginCoordinator()
- .getPipePluginInfo()
- .checkBeforeCreatePipe(createPipeRequest)
- && pipeManager
- .getPipeTaskCoordinator()
- .getPipeTaskInfo()
- .checkBeforeCreatePipe(createPipeRequest);
+ pipeManager
+ .getPipePluginCoordinator()
+ .getPipePluginInfo()
+ .checkBeforeCreatePipe(createPipeRequest);
+ pipeManager.getPipeTaskCoordinator().getPipeTaskInfo().checkBeforeCreatePipe(createPipeRequest);
}
@Override
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java
index d5826af1081..5a715ae3534 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java
@@ -55,11 +55,11 @@ public class DropPipeProcedureV2 extends AbstractOperatePipeProcedureV2 {
}
@Override
- protected boolean executeFromValidateTask(ConfigNodeProcedureEnv env)
+ protected void executeFromValidateTask(ConfigNodeProcedureEnv env)
throws PipeManagementException {
LOGGER.info("DropPipeProcedureV2: executeFromValidateTask({})", pipeName);
- return env.getConfigManager()
+ env.getConfigManager()
.getPipeManager()
.getPipeTaskCoordinator()
.getPipeTaskInfo()
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java
index 8163f625ef2..ecd4cdc57c0 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java
@@ -56,11 +56,11 @@ public class StartPipeProcedureV2 extends AbstractOperatePipeProcedureV2 {
}
@Override
- protected boolean executeFromValidateTask(ConfigNodeProcedureEnv env)
+ protected void executeFromValidateTask(ConfigNodeProcedureEnv env)
throws PipeManagementException {
LOGGER.info("StartPipeProcedureV2: executeFromValidateTask({})", pipeName);
- return env.getConfigManager()
+ env.getConfigManager()
.getPipeManager()
.getPipeTaskCoordinator()
.getPipeTaskInfo()
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java
index 4a8279d583b..fcd015fb940 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java
@@ -56,11 +56,11 @@ public class StopPipeProcedureV2 extends AbstractOperatePipeProcedureV2 {
}
@Override
- protected boolean executeFromValidateTask(ConfigNodeProcedureEnv env)
+ protected void executeFromValidateTask(ConfigNodeProcedureEnv env)
throws PipeManagementException {
LOGGER.info("StopPipeProcedureV2: executeFromValidateTask({})", pipeName);
- return env.getConfigManager()
+ env.getConfigManager()
.getPipeManager()
.getPipeTaskCoordinator()
.getPipeTaskInfo()
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
index 6e59cab398e..d1a95bb45ba 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
@@ -19,20 +19,22 @@
package org.apache.iotdb.commons.pipe.plugin.builtin;
-import org.apache.iotdb.commons.pipe.plugin.builtin.collector.DefaultCollector;
+import org.apache.iotdb.commons.pipe.plugin.builtin.collector.IoTDBCollector;
import org.apache.iotdb.commons.pipe.plugin.builtin.connector.DoNothingConnector;
+import org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBThriftConnector;
import org.apache.iotdb.commons.pipe.plugin.builtin.processor.DoNothingProcessor;
public enum BuiltinPipePlugin {
// collectors
- DEFAULT_COLLECTOR("default_collector", DefaultCollector.class),
+ IOTDB_COLLECTOR("iotdb_collector", IoTDBCollector.class),
// processors
DO_NOTHING_PROCESSOR("do_nothing_processor", DoNothingProcessor.class),
// connectors
DO_NOTHING_CONNECTOR("do_nothing_connector", DoNothingConnector.class),
+ IOTDB_THRIFT_CONNECTOR("iotdb_thrift_connector", IoTDBThriftConnector.class);
;
private final String pipePluginName;
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/collector/DefaultCollector.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/collector/IoTDBCollector.java
similarity index 97%
copy from node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/collector/DefaultCollector.java
copy to node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/collector/IoTDBCollector.java
index fed08ae2197..9902cce4ca0 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/collector/DefaultCollector.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/collector/IoTDBCollector.java
@@ -31,7 +31,7 @@ import org.apache.iotdb.pipe.api.event.Event;
* imported here. The pipe agent in the server module will replace this class with the real
* implementation when initializing the collector.
*/
-public class DefaultCollector implements PipeCollector {
+public class IoTDBCollector implements PipeCollector {
@Override
public void validate(PipeParameterValidator validator) throws Exception {
throw new UnsupportedOperationException("This class is a placeholder and should not be used.");
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/collector/DefaultCollector.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftConnector.java
similarity index 51%
rename from node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/collector/DefaultCollector.java
rename to node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftConnector.java
index fed08ae2197..e252e5be726 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/collector/DefaultCollector.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftConnector.java
@@ -17,44 +17,62 @@
* under the License.
*/
-package org.apache.iotdb.commons.pipe.plugin.builtin.collector;
+package org.apache.iotdb.commons.pipe.plugin.builtin.connector;
-import org.apache.iotdb.pipe.api.PipeCollector;
+import org.apache.iotdb.pipe.api.PipeConnector;
import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.PipeParameters;
-import org.apache.iotdb.pipe.api.customizer.collector.PipeCollectorRuntimeConfiguration;
-import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.pipe.api.customizer.connector.PipeConnectorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
/**
- * This class is a placeholder and should not be initialized. It represents the default collector
- * when no collector is specified. There is a real implementation in the server module but cannot be
- * imported here. The pipe agent in the server module will replace this class with the real
- * implementation when initializing the collector.
+ * This class is a placeholder and should not be initialized. It represents the IoTDB Thrift
+ * connector. There is a real implementation in the server module but cannot be imported here. The
+ * pipe agent in the server module will replace this class with the real implementation when
+ * initializing the IoTDB Thrift connector.
*/
-public class DefaultCollector implements PipeCollector {
+public class IoTDBThriftConnector implements PipeConnector {
+
+ @Override
+ public void validate(PipeParameterValidator validator) {
+ throw new UnsupportedOperationException("This class is a placeholder and should not be used.");
+ }
+
+ @Override
+ public void customize(
+ PipeParameters parameters, PipeConnectorRuntimeConfiguration configuration) {
+ throw new UnsupportedOperationException("This class is a placeholder and should not be used.");
+ }
+
+ @Override
+ public void handshake() {
+ throw new UnsupportedOperationException("This class is a placeholder and should not be used.");
+ }
+
@Override
- public void validate(PipeParameterValidator validator) throws Exception {
+ public void heartbeat() {
throw new UnsupportedOperationException("This class is a placeholder and should not be used.");
}
@Override
- public void customize(PipeParameters parameters, PipeCollectorRuntimeConfiguration configuration)
- throws Exception {
+ public void transfer(TabletInsertionEvent tabletInsertionEvent) {
throw new UnsupportedOperationException("This class is a placeholder and should not be used.");
}
@Override
- public void start() throws Exception {
+ public void transfer(TsFileInsertionEvent tsFileInsertionEvent) {
throw new UnsupportedOperationException("This class is a placeholder and should not be used.");
}
@Override
- public Event supply() throws Exception {
+ public void transfer(DeletionEvent deletionEvent) {
throw new UnsupportedOperationException("This class is a placeholder and should not be used.");
}
@Override
- public void close() throws Exception {
+ public void close() {
throw new UnsupportedOperationException("This class is a placeholder and should not be used.");
}
}
diff --git a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeConnector.java b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeConnector.java
index 2c9bbff5aa6..5502de8bb37 100644
--- a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeConnector.java
+++ b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeConnector.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.pipe.api.customizer.connector.PipeConnectorRuntimeConfig
import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
+import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
/**
* PipeConnector
@@ -114,6 +115,7 @@ public interface PipeConnector extends PipePlugin {
* This method is used to transfer the TabletInsertionEvent.
*
* @param tabletInsertionEvent TabletInsertionEvent to be transferred
+ * @throws PipeConnectionException if the connection is broken
* @throws Exception the user can throw errors if necessary
*/
void transfer(TabletInsertionEvent tabletInsertionEvent) throws Exception;
@@ -122,6 +124,7 @@ public interface PipeConnector extends PipePlugin {
* This method is used to transfer the TsFileInsertionEvent.
*
* @param tsFileInsertionEvent TsFileInsertionEvent to be transferred
+ * @throws PipeConnectionException if the connection is broken
* @throws Exception the user can throw errors if necessary
*/
void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws Exception;
@@ -130,6 +133,7 @@ public interface PipeConnector extends PipePlugin {
* This method is used to transfer the DeletionEvent.
*
* @param deletionEvent DeletionEvent to be transferred
+ * @throws PipeConnectionException if the connection is broken
* @throws Exception the user can throw errors if necessary
*/
void transfer(DeletionEvent deletionEvent) throws Exception;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 6ebfff6ffca..c8131ebf286 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -94,6 +94,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
import static org.apache.iotdb.commons.conf.IoTDBConstant.FILE_NAME_SEPARATOR;
@@ -700,6 +701,46 @@ public class StorageEngine implements IService {
}
}
+ /**
+ * Run the runnable if the region is absent. If the region is present, do nothing.
+ *
+ * <p>computeIfAbsent is called only to run the runnable in a synchronized way: the mapping
+ * function returns null, so no new region is created when the region is absent.
+ *
+ * @return true if the region is absent and the runnable is run. false if the region is present.
+ */
+ public boolean runIfAbsent(DataRegionId regionId, Runnable runnable) {
+ final AtomicBoolean result = new AtomicBoolean(false);
+ dataRegionMap.computeIfAbsent(
+ regionId,
+ k -> {
+ runnable.run();
+ result.set(true);
+ return null;
+ });
+ return result.get();
+ }
+
+ /**
+ * Run the consumer if the region is present. If the region is absent, do nothing.
+ *
+ * <p>computeIfPresent is called only to run the consumer in a synchronized way: the remapping
+ * function always returns the existing region, so the mapping is never removed or replaced.
+ *
+ * @return true if the region is present and the consumer is run. false if the region is absent.
+ */
+ public boolean runIfPresent(DataRegionId regionId, Consumer<DataRegion> consumer) {
+ final AtomicBoolean result = new AtomicBoolean(false);
+ dataRegionMap.computeIfPresent(
+ regionId,
+ (id, region) -> {
+ consumer.accept(region);
+ result.set(true);
+ return region;
+ });
+ return result.get();
+ }
+
public DataRegion getDataRegion(DataRegionId regionId) {
return dataRegionMap.get(regionId);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index e91fe3ffc87..6b03f3ddede 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -482,7 +482,7 @@ public class TsFileResource {
timeIndex.close();
}
- TsFileProcessor getProcessor() {
+ public TsFileProcessor getProcessor() {
return processor;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeAgent.java
index 501cbf0016d..2db8738c557 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeAgent.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.pipe.agent;
import org.apache.iotdb.db.pipe.agent.plugin.PipePluginAgent;
+import org.apache.iotdb.db.pipe.agent.receiver.PipeReceiverAgent;
import org.apache.iotdb.db.pipe.agent.runtime.PipeRuntimeAgent;
import org.apache.iotdb.db.pipe.agent.task.PipeTaskAgent;
@@ -29,12 +30,14 @@ public class PipeAgent {
private final PipePluginAgent pipePluginAgent;
private final PipeTaskAgent pipeTaskAgent;
private final PipeRuntimeAgent pipeRuntimeAgent;
+ private final PipeReceiverAgent pipeReceiverAgent;
/** Private constructor to prevent users from creating a new instance. */
private PipeAgent() {
pipePluginAgent = new PipePluginAgent();
pipeTaskAgent = new PipeTaskAgent();
pipeRuntimeAgent = new PipeRuntimeAgent();
+ pipeReceiverAgent = new PipeReceiverAgent();
}
/** The singleton holder of PipeAgent. */
@@ -68,4 +71,13 @@ public class PipeAgent {
public static PipeRuntimeAgent runtime() {
return PipeAgentHolder.HANDLE.pipeRuntimeAgent;
}
+
+ /**
+ * Get the PipeReceiverAgent owned by the PipeAgent singleton.
+ *
+ * <p>This is a plain accessor: it reads the pipeReceiverAgent field of PipeAgentHolder.HANDLE and
+ * does not create anything itself (the field is assigned in the private PipeAgent constructor).
+ *
+ * @return the pipeReceiverAgent field of PipeAgentHolder.HANDLE
+ */
+ public static PipeReceiverAgent receiver() {
+ return PipeAgentHolder.HANDLE.pipeReceiverAgent;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
index 3474edddf3d..b0fe93b4533 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
@@ -198,7 +198,7 @@ public class PipePluginAgent {
reflect(
collectorParameters.getStringOrDefault(
PipeCollectorConstant.COLLECTOR_KEY,
- BuiltinPipePlugin.DEFAULT_COLLECTOR.getPipePluginName()));
+ BuiltinPipePlugin.IOTDB_COLLECTOR.getPipePluginName()));
}
public PipeProcessor reflectProcessor(PipeParameters processorParameters) {
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeConnectorConstant.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/IoTDBThriftReceiver.java
similarity index 56%
copy from server/src/main/java/org/apache/iotdb/db/pipe/config/PipeConnectorConstant.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/IoTDBThriftReceiver.java
index 67a637503b2..81d6fef23d0 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeConnectorConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/IoTDBThriftReceiver.java
@@ -17,13 +17,20 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.config;
+package org.apache.iotdb.db.pipe.agent.receiver;
-public class PipeConnectorConstant {
+import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
+import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.IoTDBThriftConnectorVersion;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
- public static final String CONNECTOR_KEY = "connector";
+public interface IoTDBThriftReceiver {
- private PipeConnectorConstant() {
- throw new IllegalStateException("Utility class");
- }
+ IoTDBThriftConnectorVersion getVersion();
+
+ TPipeTransferResp handleTransferReq(
+ TPipeTransferReq req, IPartitionFetcher partitionFetcher, ISchemaFetcher schemaFetcher);
+
+ void handleExit();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/PipeReceiverAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/PipeReceiverAgent.java
new file mode 100644
index 00000000000..ab48a9f6149
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/PipeReceiverAgent.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.agent.receiver;
+
+import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
+import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.IoTDBThriftConnectorVersion;
+import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.IoTDBThriftReceiverV1;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Dispatches pipe transfer requests to a per-thread {@link IoTDBThriftReceiver}.
+ *
+ * <p>Each calling thread keeps its own receiver in a {@link ThreadLocal}. The receiver is created
+ * lazily on the first supported request, replaced when a request arrives with a different version
+ * than the current receiver's, and dropped again in {@link #handleClientExit()}.
+ */
+public class PipeReceiverAgent {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(PipeReceiverAgent.class);
+
+  private final ThreadLocal<IoTDBThriftReceiver> receiverThreadLocal = new ThreadLocal<>();
+
+  /**
+   * Handles one transfer request on the calling thread.
+   *
+   * @param req the transfer request; its version selects the receiver implementation
+   * @param partitionFetcher passed through unchanged to the receiver
+   * @param schemaFetcher passed through unchanged to the receiver
+   * @return the receiver's response, or a response with status PIPE_VERSION_ERROR when the request
+   *     version is not supported
+   */
+  public TPipeTransferResp transfer(
+      TPipeTransferReq req, IPartitionFetcher partitionFetcher, ISchemaFetcher schemaFetcher) {
+    final byte reqVersion = req.getVersion();
+    if (reqVersion == IoTDBThriftConnectorVersion.VERSION_ONE.getVersion()) {
+      return getReceiver(reqVersion).handleTransferReq(req, partitionFetcher, schemaFetcher);
+    } else {
+      return new TPipeTransferResp(
+          RpcUtils.getStatus(
+              TSStatusCode.PIPE_VERSION_ERROR,
+              String.format("Unsupported pipe version %d", reqVersion)));
+    }
+  }
+
+  /**
+   * Returns the calling thread's receiver for the given version, creating it when the thread has
+   * none and replacing it when the existing receiver reports a different version.
+   */
+  private IoTDBThriftReceiver getReceiver(byte reqVersion) {
+    final IoTDBThriftReceiver currentReceiver = receiverThreadLocal.get();
+    if (currentReceiver == null) {
+      return setAndGetReceiver(reqVersion);
+    }
+
+    final byte currentVersion = currentReceiver.getVersion().getVersion();
+    if (currentVersion == reqVersion) {
+      return currentReceiver;
+    }
+
+    LOGGER.warn(
+        "The receiver version {} is different from the sender version {},"
+            + " the receiver will be reset to the sender version.",
+        currentVersion,
+        reqVersion);
+    releaseReceiver(currentReceiver);
+    return setAndGetReceiver(reqVersion);
+  }
+
+  /**
+   * Creates a receiver for the given version and binds it to the calling thread.
+   *
+   * @throws UnsupportedOperationException if the version is not VERSION_ONE
+   */
+  private IoTDBThriftReceiver setAndGetReceiver(byte reqVersion) {
+    if (reqVersion != IoTDBThriftConnectorVersion.VERSION_ONE.getVersion()) {
+      throw new UnsupportedOperationException(
+          String.format("Unsupported pipe version %d", reqVersion));
+    }
+    final IoTDBThriftReceiver receiver = new IoTDBThriftReceiverV1();
+    receiverThreadLocal.set(receiver);
+    return receiver;
+  }
+
+  /** Lets the calling thread's receiver (if any) handle the exit, then unbinds it. */
+  public void handleClientExit() {
+    final IoTDBThriftReceiver receiver = receiverThreadLocal.get();
+    if (receiver != null) {
+      releaseReceiver(receiver);
+    }
+  }
+
+  /**
+   * Calls handleExit() on the receiver and always clears the thread-local afterwards.
+   *
+   * <p>The removal sits in a finally block so that an exception thrown by handleExit() cannot
+   * leave a stale receiver bound to the thread, where a later request on the same thread would
+   * pick it up again.
+   */
+  private void releaseReceiver(IoTDBThriftReceiver receiver) {
+    try {
+      receiver.handleExit();
+    } finally {
+      receiverThreadLocal.remove();
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeConfig.java b/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeConfig.java
index c0018f8767d..5690692e165 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeConfig.java
@@ -19,6 +19,10 @@
package org.apache.iotdb.db.pipe.config;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+
+import java.io.File;
+
// TODO: make these parameters configurable
// TODO: make all pipe related parameters in one place
// TODO: set the default value of the parameters in IoTDBDescriptor
@@ -41,6 +45,8 @@ public class PipeConfig {
private final int realtimeCollectorPendingQueueTabletLimit =
realtimeCollectorPendingQueueCapacity / 2;
+ private final int readFileBufferSize = 8388608; // 8 MiB (8 * 1024 * 1024 bytes)
+
public int getDefaultRingBufferSize() {
return defaultRingBufferSize;
}
@@ -57,6 +63,16 @@ public class PipeConfig {
return realtimeCollectorPendingQueueTabletLimit;
}
+ /**
+  * Returns the path formed by the configured system dir, the platform file separator and the
+  * fixed sub-directory name "pipe".
+  */
+ public String getReceiveFileDir() {
+   // TODO: replace with resource manager
+   final String systemDir = IoTDBDescriptor.getInstance().getConfig().getSystemDir();
+   return systemDir + File.separator + "pipe";
+ }
+
+ /** Returns the value of the readFileBufferSize field. */
+ public int getReadFileBufferSize() {
+   return this.readFileBufferSize;
+ }
+
/////////////////////////////// Singleton ///////////////////////////////
private PipeConfig() {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeConnectorConstant.java b/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeConnectorConstant.java
index 67a637503b2..7b88c56ef20 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeConnectorConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeConnectorConstant.java
@@ -23,6 +23,9 @@ public class PipeConnectorConstant {
public static final String CONNECTOR_KEY = "connector";
+ public static final String CONNECTOR_IOTDB_IP_KEY = "connector.ip";
+ public static final String CONNECTOR_IOTDB_PORT_KEY = "connector.port";
+
private PipeConnectorConstant() {
throw new IllegalStateException("Utility class");
}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/IoTDBDataRegionCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/IoTDBDataRegionCollector.java
index d7526fa1877..fe1c92638be 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/IoTDBDataRegionCollector.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/IoTDBDataRegionCollector.java
@@ -19,6 +19,9 @@
package org.apache.iotdb.db.pipe.core.collector;
+import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
import org.apache.iotdb.db.pipe.core.collector.historical.PipeHistoricalDataRegionTsFileCollector;
import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionCollector;
import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionHybridCollector;
@@ -39,6 +42,8 @@ public class IoTDBDataRegionCollector implements PipeCollector {
// TODO: support pattern in historical collector
private final PipeHistoricalDataRegionTsFileCollector historicalCollector;
+ private int dataRegionId;
+
public IoTDBDataRegionCollector(ListenableUnblockingPendingQueue<Event> collectorPendingQueue) {
hasBeenStarted = new AtomicBoolean(false);
realtimeCollector = new PipeRealtimeDataRegionHybridCollector(collectorPendingQueue);
@@ -47,6 +52,8 @@ public class IoTDBDataRegionCollector implements PipeCollector {
@Override
public void validate(PipeParameterValidator validator) throws Exception {
+ validator.validateRequiredAttribute(PipeCollectorConstant.DATA_REGION_KEY);
+
// TODO: require more attributes
realtimeCollector.validate(validator);
historicalCollector.validate(validator);
@@ -55,6 +62,8 @@ public class IoTDBDataRegionCollector implements PipeCollector {
@Override
public void customize(
PipeParameters parameters, PipeCollectorRuntimeConfiguration configuration) {
+ dataRegionId = parameters.getInt(PipeCollectorConstant.DATA_REGION_KEY);
+
realtimeCollector.customize(parameters, configuration);
historicalCollector.customize(parameters, configuration);
}
@@ -66,8 +75,36 @@ public class IoTDBDataRegionCollector implements PipeCollector {
}
hasBeenStarted.set(true);
- realtimeCollector.start();
+ while (true) {
+ // try to start collectors in the data region ...
+ // first try to run if data region exists, then try to run if data region does not exist.
+ // both attempts failing is uncommon: it means the data region was created between the
+ // runIfPresent and runIfAbsent calls. in this case, we need to retry.
+ if (StorageEngine.getInstance()
+ .runIfPresent(
+ new DataRegionId(dataRegionId),
+ (dataRegion -> {
+ dataRegion.writeLock(
+ String.format(
+ "Pipe: starting %s", IoTDBDataRegionCollector.class.getName()));
+ try {
+ startHistoricalCollectorAndRealtimeCollector();
+ } finally {
+ dataRegion.writeUnlock();
+ }
+ }))
+ || StorageEngine.getInstance()
+ .runIfAbsent(
+ new DataRegionId(dataRegionId),
+ this::startHistoricalCollectorAndRealtimeCollector)) {
+ return;
+ }
+ }
+ }
+
+ public void startHistoricalCollectorAndRealtimeCollector() {
historicalCollector.start();
+ realtimeCollector.start();
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionTsFileCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
index 4559ca8a537..ed0d37c6c14 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
@@ -56,6 +56,11 @@ public class PipeHistoricalDataRegionTsFileCollector implements PipeCollector {
public synchronized void start() {
final DataRegion dataRegion =
StorageEngine.getInstance().getDataRegion(new DataRegionId(dataRegionId));
+ if (dataRegion == null) {
+ pendingQueue = new ArrayDeque<>();
+ return;
+ }
+
dataRegion.writeLock("Pipe: collect historical TsFile");
try {
dataRegion.syncCloseAllWorkingTsFileProcessors();
@@ -66,12 +71,16 @@ public class PipeHistoricalDataRegionTsFileCollector implements PipeCollector {
pendingQueue = new ArrayDeque<>(tsFileManager.size(true) + tsFileManager.size(false));
pendingQueue.addAll(
tsFileManager.getTsFileList(true).stream()
- .map(o -> new PipeTsFileInsertionEvent(o.getTsFile()))
+ .map(PipeTsFileInsertionEvent::new)
.collect(Collectors.toList()));
pendingQueue.addAll(
tsFileManager.getTsFileList(false).stream()
- .map(o -> new PipeTsFileInsertionEvent(o.getTsFile()))
+ .map(PipeTsFileInsertionEvent::new)
.collect(Collectors.toList()));
+ pendingQueue.forEach(
+ event ->
+ event.increaseReferenceCount(
+ PipeHistoricalDataRegionTsFileCollector.class.getName()));
} finally {
tsFileManager.readUnlock();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
index ab7b705ce87..67d461e8c8a 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.db.pipe.config.PipeConfig;
import org.apache.iotdb.db.pipe.core.event.realtime.PipeRealtimeCollectEvent;
import org.apache.iotdb.db.pipe.core.event.realtime.TsFileEpoch;
import org.apache.iotdb.db.pipe.task.queue.ListenableUnblockingPendingQueue;
-import org.apache.iotdb.db.pipe.task.subtask.PipeProcessorSubtask;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.exception.PipeRuntimeNonCriticalException;
@@ -123,7 +122,6 @@ public class PipeRealtimeDataRegionHybridCollector extends PipeRealtimeDataRegio
}
collectEvent.decreaseReferenceCount(PipeRealtimeDataRegionHybridCollector.class.getName());
-
if (suppliedEvent != null) {
return suppliedEvent;
}
@@ -144,7 +142,7 @@ public class PipeRealtimeDataRegionHybridCollector extends PipeRealtimeDataRegio
(state.equals(TsFileEpoch.State.EMPTY)) ? TsFileEpoch.State.USING_TABLET : state);
if (event.getTsFileEpoch().getState(this).equals(TsFileEpoch.State.USING_TABLET)) {
- if (event.increaseReferenceCount(PipeProcessorSubtask.class.getName())) {
+ if (event.increaseReferenceCount(PipeRealtimeDataRegionHybridCollector.class.getName())) {
return event.getEvent();
} else {
// if the event's reference count can not be increased, it means the data represented by
@@ -174,7 +172,7 @@ public class PipeRealtimeDataRegionHybridCollector extends PipeRealtimeDataRegio
});
if (event.getTsFileEpoch().getState(this).equals(TsFileEpoch.State.USING_TSFILE)) {
- if (event.increaseReferenceCount(PipeProcessorSubtask.class.getName())) {
+ if (event.increaseReferenceCount(PipeRealtimeDataRegionHybridCollector.class.getName())) {
return event.getEvent();
} else {
// if the event's reference count can not be increased, it means the data represented by
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/assigner/PipeDataRegionAssigner.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/assigner/PipeDataRegionAssigner.java
index 4fc03f371cd..2d805310f30 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/assigner/PipeDataRegionAssigner.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/assigner/PipeDataRegionAssigner.java
@@ -20,7 +20,6 @@
package org.apache.iotdb.db.pipe.core.collector.realtime.assigner;
import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionCollector;
-import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionHybridCollector;
import org.apache.iotdb.db.pipe.core.collector.realtime.matcher.CachedSchemaPatternMatcher;
import org.apache.iotdb.db.pipe.core.collector.realtime.matcher.PipeDataRegionMatcher;
import org.apache.iotdb.db.pipe.core.event.realtime.PipeRealtimeCollectEvent;
@@ -54,7 +53,7 @@ public class PipeDataRegionAssigner {
.match(event)
.forEach(
collector -> {
- event.increaseReferenceCount(PipeRealtimeDataRegionHybridCollector.class.getName());
+ event.increaseReferenceCount(PipeDataRegionAssigner.class.getName());
collector.collect(event);
});
event.gcSchemaInfo();
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/PipeInsertionDataNodeListener.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/PipeInsertionDataNodeListener.java
index bfa764c3711..778a583f1c7 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/PipeInsertionDataNodeListener.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/PipeInsertionDataNodeListener.java
@@ -81,9 +81,7 @@ public class PipeInsertionDataNodeListener {
return;
}
- assigner.publishToAssign(
- PipeRealtimeCollectEventFactory.createCollectEvent(
- tsFileResource.getTsFile(), tsFileResource));
+ assigner.publishToAssign(PipeRealtimeCollectEventFactory.createCollectEvent(tsFileResource));
}
// TODO: check whether the method is called on the right place.
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/IoTDBThriftConnectorClient.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/IoTDBThriftConnectorClient.java
new file mode 100644
index 00000000000..16ebfb2d74d
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/IoTDBThriftConnectorClient.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.connector.impl.iotdb;
+
+import org.apache.iotdb.commons.client.ThriftClient;
+import org.apache.iotdb.commons.client.property.ThriftClientProperty;
+import org.apache.iotdb.rpc.RpcTransportFactory;
+import org.apache.iotdb.rpc.TConfigurationConst;
+import org.apache.iotdb.service.rpc.thrift.IClientRPCService;
+
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransportException;
+
+/**
+ * {@link IClientRPCService.Client} used by the IoTDB thrift connector.
+ *
+ * <p>The constructor builds the protocol on top of a {@link TSocket} to the given address and
+ * opens the transport immediately. Closing or invalidating the client closes that transport.
+ */
+public class IoTDBThriftConnectorClient extends IClientRPCService.Client
+    implements ThriftClient, AutoCloseable {
+
+  /**
+   * Creates the client and opens its transport.
+   *
+   * @param property supplies the protocol factory and the connection timeout
+   * @param ipAddress address handed to the socket
+   * @param port port handed to the socket
+   * @throws TTransportException declared by the transport construction and open calls
+   */
+  public IoTDBThriftConnectorClient(ThriftClientProperty property, String ipAddress, int port)
+      throws TTransportException {
+    super(
+        property
+            .getProtocolFactory()
+            .getProtocol(
+                RpcTransportFactory.INSTANCE.getTransport(
+                    new TSocket(
+                        TConfigurationConst.defaultTConfiguration,
+                        ipAddress,
+                        port,
+                        property.getConnectionTimeoutMs()))));
+    getInputProtocol().getTransport().open();
+  }
+
+  /** Same effect as {@link #invalidate()}. */
+  @Override
+  public void close() throws Exception {
+    invalidate();
+  }
+
+  /** Closes the input protocol's transport when it is still open; otherwise does nothing. */
+  @Override
+  public void invalidate() {
+    if (!getInputProtocol().getTransport().isOpen()) {
+      return;
+    }
+    getInputProtocol().getTransport().close();
+  }
+
+  /** This client is not pooled here, so invalidating "all" only invalidates this instance. */
+  @Override
+  public void invalidateAll() {
+    invalidate();
+  }
+
+  /** Always returns true. */
+  @Override
+  public boolean printLogWhenEncounterException() {
+    return true;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeConnectorConstant.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/IoTDBThriftConnectorVersion.java
similarity index 74%
copy from server/src/main/java/org/apache/iotdb/db/pipe/config/PipeConnectorConstant.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/IoTDBThriftConnectorVersion.java
index 67a637503b2..d05fd374544 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeConnectorConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/IoTDBThriftConnectorVersion.java
@@ -17,13 +17,19 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.config;
+package org.apache.iotdb.db.pipe.core.connector.impl.iotdb;
-public class PipeConnectorConstant {
+public enum IoTDBThriftConnectorVersion {
+ VERSION_ONE((byte) 1),
+ ;
- public static final String CONNECTOR_KEY = "connector";
+ private final byte version;
- private PipeConnectorConstant() {
- throw new IllegalStateException("Utility class");
+ IoTDBThriftConnectorVersion(byte type) {
+ this.version = type;
+ }
+
+ public byte getVersion() {
+ return version;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java
new file mode 100644
index 00000000000..32408769579
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1;
+
+import org.apache.iotdb.commons.client.property.ThriftClientProperty;
+import org.apache.iotdb.commons.conf.CommonConfig;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.pipe.config.PipeConfig;
+import org.apache.iotdb.db.pipe.config.PipeConnectorConstant;
+import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.IoTDBThriftConnectorClient;
+import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.reponse.PipeTransferFilePieceResp;
+import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransferFilePieceReq;
+import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransferFileSealReq;
+import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransferHandshakeReq;
+import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransferInsertNodeReq;
+import org.apache.iotdb.db.pipe.core.event.impl.PipeTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.core.event.impl.PipeTsFileInsertionEvent;
+import org.apache.iotdb.pipe.api.PipeConnector;
+import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.PipeParameters;
+import org.apache.iotdb.pipe.api.customizer.connector.PipeConnectorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
+import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.Arrays;
+
+/**
+ * {@link PipeConnector} that sends pipe events to an IoTDB receiver through an
+ * {@link IoTDBThriftConnectorClient}.
+ *
+ * <p>The target address is read from the {@code connector.ip} and {@code connector.port}
+ * parameters. Tablet insertion events are sent as one insert-node request. TsFile insertion events
+ * are sent piece by piece and completed with a seal request. Deletion events are not implemented.
+ */
+public class IoTDBThriftConnectorV1 implements PipeConnector {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBThriftConnectorV1.class);
+
+  private static final CommonConfig COMMON_CONFIG = CommonDescriptor.getInstance().getConfig();
+  private static final IoTDBConfig IOTDB_CONFIG = IoTDBDescriptor.getInstance().getConfig();
+
+  private String ipAddress;
+  private int port;
+
+  // created (and re-created) in handshake(); null until the first handshake
+  private IoTDBThriftConnectorClient client;
+
+  public IoTDBThriftConnectorV1() {}
+
+  /** Requires both the ip and the port attribute to be present. */
+  @Override
+  public void validate(PipeParameterValidator validator) throws Exception {
+    validator
+        .validateRequiredAttribute(PipeConnectorConstant.CONNECTOR_IOTDB_IP_KEY)
+        .validateRequiredAttribute(PipeConnectorConstant.CONNECTOR_IOTDB_PORT_KEY);
+  }
+
+  /** Stores the ip and port parameters; no connection is opened here. */
+  @Override
+  public void customize(PipeParameters parameters, PipeConnectorRuntimeConfiguration configuration)
+      throws Exception {
+    this.ipAddress = parameters.getString(PipeConnectorConstant.CONNECTOR_IOTDB_IP_KEY);
+    this.port = parameters.getInt(PipeConnectorConstant.CONNECTOR_IOTDB_PORT_KEY);
+  }
+
+  /**
+   * Closes the previous client (if any), opens a new one to the configured address and sends a
+   * handshake request carrying the local timestamp precision.
+   *
+   * @throws PipeException if the response status is not SUCCESS_STATUS
+   */
+  @Override
+  public void handshake() throws Exception {
+    if (client != null) {
+      client.close();
+    }
+
+    client =
+        new IoTDBThriftConnectorClient(
+            new ThriftClientProperty.Builder()
+                .setConnectionTimeoutMs(COMMON_CONFIG.getConnectionTimeoutInMS())
+                .setRpcThriftCompressionEnabled(COMMON_CONFIG.isRpcThriftCompressionEnabled())
+                .build(),
+            ipAddress,
+            port);
+
+    final TPipeTransferResp resp =
+        client.pipeTransfer(
+            PipeTransferHandshakeReq.toTPipeTransferReq(IOTDB_CONFIG.getTimestampPrecision()));
+    if (resp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      throw new PipeException(String.format("Handshake error, result status %s.", resp.status));
+    }
+  }
+
+  /** Intentionally empty: this connector sends no heartbeat. */
+  @Override
+  public void heartbeat() throws Exception {}
+
+  /**
+   * Sends one tablet insertion event.
+   *
+   * @throws NotImplementedException if the event is not a PipeTabletInsertionEvent
+   * @throws PipeConnectionException if the thrift call fails with a TException
+   * @throws PipeException if the response status is not SUCCESS_STATUS
+   */
+  @Override
+  public void transfer(TabletInsertionEvent tabletInsertionEvent) throws Exception {
+    // TODO: support more TabletInsertionEvent
+    // PipeProcessor can change the type of TabletInsertionEvent
+    if (!(tabletInsertionEvent instanceof PipeTabletInsertionEvent)) {
+      throw new NotImplementedException(
+          "IoTDBThriftConnectorV1 only support PipeTabletInsertionEvent.");
+    }
+
+    try {
+      doTransfer((PipeTabletInsertionEvent) tabletInsertionEvent);
+    } catch (TException e) {
+      LOGGER.error(
+          "Network error when transfer tablet insertion event: {}.", tabletInsertionEvent, e);
+      // the connection may be broken, try to reconnect by catching PipeConnectionException
+      throw new PipeConnectionException("Network error when transfer tablet insertion event.", e);
+    }
+  }
+
+  /** Sends the event's insert node in a single request and checks the response status. */
+  private void doTransfer(PipeTabletInsertionEvent pipeTabletInsertionEvent)
+      throws PipeException, TException {
+    final TPipeTransferResp resp =
+        client.pipeTransfer(
+            PipeTransferInsertNodeReq.toTPipeTransferReq(pipeTabletInsertionEvent.getInsertNode()));
+
+    if (resp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      throw new PipeException(
+          String.format(
+              "Transfer tablet insertion event %s error, result status %s",
+              pipeTabletInsertionEvent, resp.status));
+    }
+  }
+
+  /**
+   * Sends one TsFile insertion event.
+   *
+   * @throws NotImplementedException if the event is not a PipeTsFileInsertionEvent
+   * @throws PipeConnectionException if a thrift call fails with a TException
+   * @throws PipeException if a response status signals an error
+   */
+  @Override
+  public void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws Exception {
+    // TODO: support more TsFileInsertionEvent
+    // PipeProcessor can change the type of TsFileInsertionEvent
+    if (!(tsFileInsertionEvent instanceof PipeTsFileInsertionEvent)) {
+      throw new NotImplementedException(
+          "IoTDBThriftConnectorV1 only support PipeTsFileInsertionEvent.");
+    }
+
+    try {
+      doTransfer((PipeTsFileInsertionEvent) tsFileInsertionEvent);
+    } catch (TException e) {
+      LOGGER.error(
+          "Network error when transfer tsfile insertion event: {}.", tsFileInsertionEvent, e);
+      // the connection may be broken, try to reconnect by catching PipeConnectionException
+      throw new PipeConnectionException("Network error when transfer tsfile insertion event.", e);
+    }
+  }
+
+  /**
+   * Waits for the TsFile to be closed, streams it to the receiver in pieces of at most
+   * readFileBufferSize bytes, then sends the seal request with the file name and length.
+   *
+   * <p>When the receiver answers a piece with PIPE_TRANSFER_FILE_OFFSET_RESET, reading resumes
+   * from the offset reported in the response.
+   */
+  private void doTransfer(PipeTsFileInsertionEvent pipeTsFileInsertionEvent)
+      throws PipeException, TException, InterruptedException, IOException {
+    pipeTsFileInsertionEvent.waitForTsFileClose();
+
+    final File tsFile = pipeTsFileInsertionEvent.getTsFile();
+
+    // 1. transfer file piece by piece
+    final int readFileBufferSize = PipeConfig.getInstance().getReadFileBufferSize();
+    final byte[] readBuffer = new byte[readFileBufferSize];
+    long position = 0;
+    try (final RandomAccessFile reader = new RandomAccessFile(tsFile, "r")) {
+      while (true) {
+        final int readLength = reader.read(readBuffer);
+        if (readLength == -1) {
+          break;
+        }
+
+        // a short read is copied into a right-sized array so that only valid bytes are sent
+        final PipeTransferFilePieceResp resp =
+            PipeTransferFilePieceResp.fromTPipeTransferResp(
+                client.pipeTransfer(
+                    PipeTransferFilePieceReq.toTPipeTransferReq(
+                        tsFile.getName(),
+                        position,
+                        readLength == readFileBufferSize
+                            ? readBuffer
+                            : Arrays.copyOfRange(readBuffer, 0, readLength))));
+        position += readLength;
+
+        // this case only happens when the connection is broken, and the connector is reconnected
+        // to the receiver, then the receiver will redirect the file position to the last position
+        if (resp.getStatus().getCode()
+            == TSStatusCode.PIPE_TRANSFER_FILE_OFFSET_RESET.getStatusCode()) {
+          position = resp.getEndWritingOffset();
+          reader.seek(position);
+          LOGGER.info("Redirect file position to {}.", position);
+          continue;
+        }
+
+        if (resp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+          throw new PipeException(
+              String.format("Transfer file %s error, result status %s.", tsFile, resp.getStatus()));
+        }
+      }
+    }
+
+    // 2. transfer file seal signal, which means the file is transferred completely
+    final TPipeTransferResp resp =
+        client.pipeTransfer(
+            PipeTransferFileSealReq.toTPipeTransferReq(tsFile.getName(), tsFile.length()));
+    if (resp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      throw new PipeException(
+          String.format("Seal file %s error, result status %s.", tsFile, resp.getStatus()));
+    }
+  }
+
+  /** Deletion events are not supported by this connector; always throws. */
+  @Override
+  public void transfer(DeletionEvent deletionEvent) throws Exception {
+    throw new NotImplementedException("Not implement for deletion event.");
+  }
+
+  /** Closes the client if one has been created. */
+  @Override
+  public void close() throws Exception {
+    if (client != null) {
+      client.close();
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftReceiverV1.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftReceiverV1.java
new file mode 100644
index 00000000000..2cbcc748db7
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftReceiverV1.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.mpp.plan.Coordinator;
+import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
+import org.apache.iotdb.db.mpp.plan.statement.Statement;
+import org.apache.iotdb.db.mpp.plan.statement.crud.LoadTsFileStatement;
+import org.apache.iotdb.db.pipe.agent.receiver.IoTDBThriftReceiver;
+import org.apache.iotdb.db.pipe.config.PipeConfig;
+import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.IoTDBThriftConnectorVersion;
+import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.reponse.PipeTransferFilePieceResp;
+import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransferFilePieceReq;
+import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransferFileSealReq;
+import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransferHandshakeReq;
+import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransferInsertNodeReq;
+import org.apache.iotdb.db.query.control.SessionManager;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+
+ /**
+  * Receiver for version one of the IoTDB thrift pipe transfer protocol.
+  *
+  * <p>Each incoming {@link TPipeTransferReq} is dispatched by its raw type to a handshake,
+  * insert-node, file-piece or file-seal handler. At most one file is received at a time; it is
+  * tracked by {@code writingFile} and {@code writingFileWriter}. Both {@link #handleTransferReq} and
+  * {@link #handleExit} are synchronized on this receiver.
+  */
+ public class IoTDBThriftReceiverV1 implements IoTDBThriftReceiver {
+
+   private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBThriftReceiverV1.class);
+
+   private static final IoTDBConfig IOTDB_CONFIG = IoTDBDescriptor.getInstance().getConfig();
+   private static final String RECEIVE_DIR = PipeConfig.getInstance().getReceiveFileDir();
+
+   // File currently being received, or null when no received file is being tracked.
+   private File writingFile;
+   // Open writer on writingFile; null before the first piece, after a seal and after handleExit().
+   private RandomAccessFile writingFileWriter;
+
+   /**
+    * Dispatches a transfer request to the handler matching its type.
+    *
+    * @param req the raw transfer request
+    * @param partitionFetcher passed through to statement execution
+    * @param schemaFetcher passed through to statement execution
+    * @return the handler's response, or a {@code PIPE_TYPE_ERROR} response for an unknown type
+    */
+   @Override
+   public synchronized TPipeTransferResp handleTransferReq(
+       TPipeTransferReq req, IPartitionFetcher partitionFetcher, ISchemaFetcher schemaFetcher) {
+     final short rawRequestType = req.getType();
+     if (PipeRequestType.isValidatedRequestType(rawRequestType)) {
+       switch (PipeRequestType.valueOf(rawRequestType)) {
+         case HANDSHAKE:
+           return handleTransferHandshake(PipeTransferHandshakeReq.fromTPipeTransferReq(req));
+         case TRANSFER_INSERT_NODE:
+           return handleTransferInsertNode(
+               PipeTransferInsertNodeReq.fromTPipeTransferReq(req), partitionFetcher, schemaFetcher);
+         case TRANSFER_FILE_PIECE:
+           return handleTransferFilePiece(PipeTransferFilePieceReq.fromTPipeTransferReq(req));
+         case TRANSFER_FILE_SEAL:
+           return handleTransferFileSeal(
+               PipeTransferFileSealReq.fromTPipeTransferReq(req), partitionFetcher, schemaFetcher);
+         default:
+           break;
+       }
+     }
+
+     // unknown request type, which means the request can not be handled by this receiver,
+     // maybe the version of the receiver is not compatible with the sender
+     return new TPipeTransferResp(
+         RpcUtils.getStatus(
+             TSStatusCode.PIPE_TYPE_ERROR,
+             String.format("Unknown transfer type %s.", rawRequestType)));
+   }
+
+   /** Accepts the handshake only when both sides use the same timestamp precision. */
+   private TPipeTransferResp handleTransferHandshake(PipeTransferHandshakeReq req) {
+     if (!IOTDB_CONFIG.getTimestampPrecision().equals(req.getTimestampPrecision())) {
+       String msg =
+           String.format(
+               "IoTDB receiver's timestamp precision %s, connector's timestamp precision %s. validation fails.",
+               IOTDB_CONFIG.getTimestampPrecision(), req.getTimestampPrecision());
+       LOGGER.warn(msg);
+       return new TPipeTransferResp(RpcUtils.getStatus(TSStatusCode.PIPE_HANDSHAKE_ERROR, msg));
+     }
+
+     LOGGER.info("Handshake successfully.");
+     return new TPipeTransferResp(RpcUtils.SUCCESS_STATUS);
+   }
+
+   /** Rebuilds the insert statement carried by the request and executes it. */
+   private TPipeTransferResp handleTransferInsertNode(
+       PipeTransferInsertNodeReq req,
+       IPartitionFetcher partitionFetcher,
+       ISchemaFetcher schemaFetcher) {
+     return new TPipeTransferResp(
+         executeStatement(req.constructStatement(), partitionFetcher, schemaFetcher));
+   }
+
+   /**
+    * Executes a statement through the coordinator under a freshly requested query id.
+    *
+    * @return the status of the execution result; a non-success status is also logged
+    */
+   private TSStatus executeStatement(
+       Statement statement, IPartitionFetcher partitionFetcher, ISchemaFetcher schemaFetcher) {
+     final long queryId = SessionManager.getInstance().requestQueryId();
+     final ExecutionResult result =
+         Coordinator.getInstance()
+             .execute(
+                 statement,
+                 queryId,
+                 null,
+                 "",
+                 partitionFetcher,
+                 schemaFetcher,
+                 IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold());
+     if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+       LOGGER.warn(
+           "failed to execute statement, statement: {}, result status is: {}",
+           statement,
+           result.status);
+     }
+     return result.status;
+   }
+
+   /**
+    * Appends one file piece to the file being received.
+    *
+    * <p>When the sender's start offset differs from the current length of the received file, nothing
+    * is written and a {@code PIPE_TRANSFER_FILE_OFFSET_RESET} response carrying the current length is
+    * returned, so the sender can continue from there.
+    */
+   private TPipeTransferResp handleTransferFilePiece(PipeTransferFilePieceReq req) {
+     try {
+       updateWritingFileIfNeeded(req.getFileName());
+
+       if (!isWritingFileOffsetCorrect(req.getStartWritingOffset())) {
+         return PipeTransferFilePieceResp.toTPipeTransferResp(
+             RpcUtils.getStatus(
+                 TSStatusCode.PIPE_TRANSFER_FILE_OFFSET_RESET,
+                 String.format(
+                     "request sender reset file reader's offset from %s to %s.",
+                     req.getStartWritingOffset(), writingFileWriter.length())),
+             writingFileWriter.length());
+       }
+
+       writingFileWriter.write(req.getFilePiece());
+       return PipeTransferFilePieceResp.toTPipeTransferResp(
+           RpcUtils.SUCCESS_STATUS, writingFileWriter.length());
+     } catch (Exception e) {
+       LOGGER.warn("failed to write file piece from req {}.", req, e);
+       final TSStatus status =
+           RpcUtils.getStatus(
+               TSStatusCode.PIPE_TRANSFER_FILE_ERROR,
+               String.format("failed to write file piece, because %s", e.getMessage()));
+       try {
+         return PipeTransferFilePieceResp.toTPipeTransferResp(
+             status, PipeTransferFilePieceResp.ERROR_END_OFFSET);
+       } catch (IOException ex) {
+         // the offset could not be serialized; fall back to a response without a body
+         return PipeTransferFilePieceResp.toTPipeTransferResp(status);
+       }
+     }
+   }
+
+   /**
+    * Makes {@code writingFile}/{@code writingFileWriter} point at an open, appendable file named
+    * {@code fileName} inside the receive dir.
+    *
+    * <p>The current file is reused only when it has that name and its writer is still open. In every
+    * other case the previous file is closed and deleted, and a new, empty file is started.
+    *
+    * @throws IOException if the name is not a plain file name, or the file cannot be prepared
+    */
+   private void updateWritingFileIfNeeded(String fileName) throws IOException {
+     // The name comes from the remote sender: never let it address anything outside RECEIVE_DIR.
+     if (!isPlainFileName(fileName)) {
+       throw new IOException(
+           String.format(
+               "illegal file name %s, a plain file name without any path component is expected.",
+               fileName));
+     }
+
+     // A sealed file keeps its name but has no writer any more, so it must not be reused.
+     if (writingFileWriter != null && isFileExistedAndNameCorrect(fileName)) {
+       return;
+     }
+
+     if (writingFileWriter != null) {
+       try {
+         writingFileWriter.close();
+       } finally {
+         writingFileWriter = null;
+       }
+     }
+     if (writingFile != null) {
+       if (writingFile.exists()) {
+         if (writingFile.delete()) {
+           LOGGER.info("original file {} was deleted.", writingFile.getPath());
+         } else {
+           LOGGER.warn("failed to delete original file {}.", writingFile.getPath());
+         }
+       }
+       writingFile = null;
+     }
+
+     final File receiveDir = new File(RECEIVE_DIR);
+     // mkdirs() also returns false when the dir was created concurrently, hence the last check
+     if (!receiveDir.exists() && !receiveDir.mkdirs() && !receiveDir.isDirectory()) {
+       throw new IOException(String.format("failed to create receive dir %s.", RECEIVE_DIR));
+     }
+
+     final File newWritingFile = new File(RECEIVE_DIR, fileName);
+     final RandomAccessFile newWritingFileWriter = new RandomAccessFile(newWritingFile, "rw");
+     try {
+       // Drop any stale content left under the same name, so that the writer's position and the
+       // file length (which drives the offset handshake with the sender) both start at 0.
+       newWritingFileWriter.setLength(0);
+     } catch (IOException e) {
+       try {
+         newWritingFileWriter.close();
+       } catch (IOException closeException) {
+         e.addSuppressed(closeException);
+       }
+       throw e;
+     }
+
+     // Publish the new state only once the file is really usable.
+     writingFile = newWritingFile;
+     writingFileWriter = newWritingFileWriter;
+     LOGGER.info("start to write transferring file {}.", writingFile.getPath());
+   }
+
+   /** Tells whether {@code fileName} is a bare file name without any path component. */
+   private static boolean isPlainFileName(String fileName) {
+     return fileName != null
+         && !fileName.isEmpty()
+         && !".".equals(fileName)
+         && !"..".equals(fileName)
+         && fileName.indexOf('/') < 0
+         && fileName.indexOf('\\') < 0;
+   }
+
+   private boolean isFileExistedAndNameCorrect(String fileName) {
+     return writingFile != null && writingFile.getName().equals(fileName);
+   }
+
+   private boolean isWritingFileOffsetCorrect(long offset) throws IOException {
+     return writingFileWriter.length() == offset;
+   }
+
+   /**
+    * Seals the received file and loads it through a {@link LoadTsFileStatement}.
+    *
+    * <p>The seal is refused with {@code PIPE_TRANSFER_FILE_ERROR} when no file is being written, when
+    * the name does not match, or when the received length differs from the length in the request.
+    */
+   private TPipeTransferResp handleTransferFileSeal(
+       PipeTransferFileSealReq req,
+       IPartitionFetcher partitionFetcher,
+       ISchemaFetcher schemaFetcher) {
+     try {
+       if (!isWritingFileAvailable()) {
+         return new TPipeTransferResp(
+             RpcUtils.getStatus(
+                 TSStatusCode.PIPE_TRANSFER_FILE_ERROR,
+                 String.format(
+                     "failed to seal file, because writing file %s is not available.",
+                     req.getFileName())));
+       }
+
+       if (!isFileExistedAndNameCorrect(req.getFileName())) {
+         return new TPipeTransferResp(
+             RpcUtils.getStatus(
+                 TSStatusCode.PIPE_TRANSFER_FILE_ERROR,
+                 String.format(
+                     "failed to seal file %s, but writing file is %s.",
+                     req.getFileName(), writingFile)));
+       }
+
+       if (!isWritingFileOffsetCorrect(req.getFileLength())) {
+         return new TPipeTransferResp(
+             RpcUtils.getStatus(
+                 TSStatusCode.PIPE_TRANSFER_FILE_ERROR,
+                 String.format(
+                     "failed to seal file because the length of file is not correct. "
+                         + "the original file has length %s, but receiver file has length %s.",
+                     req.getFileLength(), writingFileWriter.length())));
+       }
+
+       // Forget the writer before closing it: a sealed file must never be appended to again, and a
+       // closed writer left in place would make every later transfer of the same name fail.
+       final RandomAccessFile sealedFileWriter = writingFileWriter;
+       writingFileWriter = null;
+       sealedFileWriter.close();
+
+       final LoadTsFileStatement statement = new LoadTsFileStatement(writingFile.getAbsolutePath());
+       statement.setDeleteAfterLoad(true);
+       statement.setVerifySchema(true);
+       statement.setAutoCreateDatabase(false);
+       final TSStatus status = executeStatement(statement, partitionFetcher, schemaFetcher);
+
+       // The statement asks for deletion after load, so the file may already be gone. Keep the
+       // reference only while the file is still on disk, so that the next transfer or handleExit()
+       // can clean it up.
+       if (!writingFile.exists()) {
+         writingFile = null;
+       }
+       return new TPipeTransferResp(status);
+     } catch (IOException e) {
+       LOGGER.warn("failed to seal file {} from req {}.", writingFile, req, e);
+       return new TPipeTransferResp(
+           RpcUtils.getStatus(
+               TSStatusCode.PIPE_TRANSFER_FILE_ERROR,
+               String.format("failed to seal file %s because %s", writingFile, e.getMessage())));
+     }
+   }
+
+   private boolean isWritingFileAvailable() {
+     return writingFile != null && writingFile.exists() && writingFileWriter != null;
+   }
+
+   /**
+    * Releases the file state of this receiver: closes the writer and deletes the file that was being
+    * received, if any. The file is deleted even when closing the writer fails.
+    */
+   @Override
+   public synchronized void handleExit() {
+     try {
+       if (writingFileWriter != null) {
+         writingFileWriter.close();
+       }
+     } catch (IOException e) {
+       LOGGER.warn("IoTDBThriftReceiverV1#handleExit: meeting errors on handleExit().", e);
+     } finally {
+       writingFileWriter = null;
+     }
+
+     if (writingFile != null) {
+       // a file that is already gone is not an error
+       if (writingFile.exists() && !writingFile.delete()) {
+         LOGGER.warn(
+             String.format(
+                 "IoTDBThriftReceiverV1#handleExit: delete file %s error.",
+                 writingFile.getPath()));
+       }
+       writingFile = null;
+     }
+   }
+
+   @Override
+   public IoTDBThriftConnectorVersion getVersion() {
+     return IoTDBThriftConnectorVersion.VERSION_ONE;
+   }
+ }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/PipeRequestType.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/PipeRequestType.java
new file mode 100644
index 00000000000..982243cd87e
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/PipeRequestType.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+ /** Request types of the version-one pipe transfer protocol, each identified by a short code. */
+ public enum PipeRequestType {
+   HANDSHAKE((short) 1),
+
+   TRANSFER_INSERT_NODE((short) 2),
+
+   TRANSFER_FILE_PIECE((short) 3),
+   TRANSFER_FILE_SEAL((short) 4),
+   ;
+
+   /** Lookup table from short code to enum constant, filled once when the enum is initialized. */
+   private static final Map<Short, PipeRequestType> TYPE_MAP = new HashMap<>();
+
+   static {
+     Arrays.stream(PipeRequestType.values())
+         .forEach(requestType -> TYPE_MAP.put(requestType.getType(), requestType));
+   }
+
+   private final short type;
+
+   PipeRequestType(short code) {
+     this.type = code;
+   }
+
+   /** Returns the short code of this request type. */
+   public short getType() {
+     return type;
+   }
+
+   /** Tells whether {@code type} is the code of one of the constants of this enum. */
+   public static boolean isValidatedRequestType(short type) {
+     return TYPE_MAP.get(type) != null;
+   }
+
+   /** Returns the constant whose code is {@code type}, or {@code null} when there is none. */
+   public static PipeRequestType valueOf(short type) {
+     return TYPE_MAP.get(type);
+   }
+ }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/reponse/PipeTransferFilePieceResp.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/reponse/PipeTransferFilePieceResp.java
new file mode 100644
index 00000000000..56c3b8a4cd5
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/reponse/PipeTransferFilePieceResp.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.reponse;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+ /**
+  * Response to a file-piece transfer request. On top of the status it can carry the receiver's end
+  * writing offset, which is serialized as a single long into the response body.
+  */
+ public class PipeTransferFilePieceResp extends TPipeTransferResp {
+
+   public static final long ERROR_END_OFFSET = -1;
+
+   private long endWritingOffset;
+
+   private PipeTransferFilePieceResp() {}
+
+   /** Returns the end writing offset; it stays 0 when the response was built without a body. */
+   public long getEndWritingOffset() {
+     return endWritingOffset;
+   }
+
+   /**
+    * Builds a response carrying both a status and an end writing offset.
+    *
+    * @param status the status to report
+    * @param endWritingOffset the offset to report, also written into the body
+    * @throws IOException if the offset cannot be serialized
+    */
+   public static PipeTransferFilePieceResp toTPipeTransferResp(
+       TSStatus status, long endWritingOffset) throws IOException {
+     final PipeTransferFilePieceResp resp = toTPipeTransferResp(status);
+     resp.endWritingOffset = endWritingOffset;
+
+     try (PublicBAOS bytes = new PublicBAOS();
+         DataOutputStream out = new DataOutputStream(bytes)) {
+       ReadWriteIOUtils.write(endWritingOffset, out);
+       resp.body = ByteBuffer.wrap(bytes.getBuf(), 0, bytes.size());
+     }
+
+     return resp;
+   }
+
+   /** Builds a response that only carries a status; the body is left unset. */
+   public static PipeTransferFilePieceResp toTPipeTransferResp(TSStatus status) {
+     final PipeTransferFilePieceResp resp = new PipeTransferFilePieceResp();
+     resp.status = status;
+     return resp;
+   }
+
+   /**
+    * Rebuilds a file-piece response from a raw transfer response. The offset is only decoded when
+    * the raw response has a body.
+    */
+   public static PipeTransferFilePieceResp fromTPipeTransferResp(TPipeTransferResp transferResp) {
+     final PipeTransferFilePieceResp resp = toTPipeTransferResp(transferResp.status);
+     if (!transferResp.isSetBody()) {
+       return resp;
+     }
+
+     // NOTE(review): readLong presumably advances the position of the shared body buffer - confirm
+     resp.endWritingOffset = ReadWriteIOUtils.readLong(transferResp.body);
+     resp.body = transferResp.body;
+     return resp;
+   }
+ }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/request/PipeTransferFilePieceReq.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/request/PipeTransferFilePieceReq.java
new file mode 100644
index 00000000000..6f6266fa1e9
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/request/PipeTransferFilePieceReq.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request;
+
+import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.IoTDBThriftConnectorVersion;
+import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.PipeRequestType;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+ /**
+  * Transfer request carrying one piece of a file: the file name, the offset at which the piece
+  * starts, and the bytes of the piece. The three values are serialized into the body in that order.
+  */
+ public class PipeTransferFilePieceReq extends TPipeTransferReq {
+
+   private String fileName;
+   private long startWritingOffset;
+   private byte[] filePiece;
+
+   private PipeTransferFilePieceReq() {}
+
+   public String getFileName() {
+     return fileName;
+   }
+
+   public long getStartWritingOffset() {
+     return startWritingOffset;
+   }
+
+   public byte[] getFilePiece() {
+     return filePiece;
+   }
+
+   /**
+    * Builds the request sent by the connector.
+    *
+    * @param fileName name of the file the piece belongs to
+    * @param startWritingOffset offset of the first byte of the piece
+    * @param filePiece content of the piece
+    * @throws IOException if the body cannot be serialized
+    */
+   public static PipeTransferFilePieceReq toTPipeTransferReq(
+       String fileName, long startWritingOffset, byte[] filePiece) throws IOException {
+     final ByteBuffer serialized;
+     try (final PublicBAOS bytes = new PublicBAOS();
+         final DataOutputStream out = new DataOutputStream(bytes)) {
+       ReadWriteIOUtils.write(fileName, out);
+       ReadWriteIOUtils.write(startWritingOffset, out);
+       ReadWriteIOUtils.write(new Binary(filePiece), out);
+       serialized = ByteBuffer.wrap(bytes.getBuf(), 0, bytes.size());
+     }
+
+     final PipeTransferFilePieceReq req = new PipeTransferFilePieceReq();
+     req.version = IoTDBThriftConnectorVersion.VERSION_ONE.getVersion();
+     req.type = PipeRequestType.TRANSFER_FILE_PIECE.getType();
+     req.body = serialized;
+     req.fileName = fileName;
+     req.startWritingOffset = startWritingOffset;
+     req.filePiece = filePiece;
+     return req;
+   }
+
+   /** Decodes a file-piece request from the body of a raw transfer request. */
+   public static PipeTransferFilePieceReq fromTPipeTransferReq(TPipeTransferReq transferReq) {
+     // the values are read in the order in which toTPipeTransferReq wrote them
+     final ByteBuffer payload = transferReq.body;
+     final String decodedFileName = ReadWriteIOUtils.readString(payload);
+     final long decodedOffset = ReadWriteIOUtils.readLong(payload);
+     final byte[] decodedPiece = ReadWriteIOUtils.readBinary(payload).getValues();
+
+     final PipeTransferFilePieceReq req = new PipeTransferFilePieceReq();
+     req.fileName = decodedFileName;
+     req.startWritingOffset = decodedOffset;
+     req.filePiece = decodedPiece;
+     req.version = transferReq.version;
+     req.type = transferReq.type;
+     req.body = payload;
+     return req;
+   }
+ }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/request/PipeTransferFileSealReq.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/request/PipeTransferFileSealReq.java
new file mode 100644
index 00000000000..935131db950
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/request/PipeTransferFileSealReq.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request;
+
+import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.IoTDBThriftConnectorVersion;
+import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.PipeRequestType;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+ /**
+  * Transfer request signalling that a file has been sent completely. It carries the file name and
+  * the file length, serialized into the body in that order.
+  */
+ public class PipeTransferFileSealReq extends TPipeTransferReq {
+
+   private String fileName;
+   private long fileLength;
+
+   private PipeTransferFileSealReq() {}
+
+   public String getFileName() {
+     return fileName;
+   }
+
+   public long getFileLength() {
+     return fileLength;
+   }
+
+   /**
+    * Builds the seal request sent by the connector.
+    *
+    * @param fileName name of the transferred file
+    * @param fileLength length of the transferred file
+    * @throws IOException if the body cannot be serialized
+    */
+   public static PipeTransferFileSealReq toTPipeTransferReq(String fileName, long fileLength)
+       throws IOException {
+     final ByteBuffer serialized;
+     try (final PublicBAOS bytes = new PublicBAOS();
+         final DataOutputStream out = new DataOutputStream(bytes)) {
+       ReadWriteIOUtils.write(fileName, out);
+       ReadWriteIOUtils.write(fileLength, out);
+       serialized = ByteBuffer.wrap(bytes.getBuf(), 0, bytes.size());
+     }
+
+     final PipeTransferFileSealReq sealReq = new PipeTransferFileSealReq();
+     sealReq.version = IoTDBThriftConnectorVersion.VERSION_ONE.getVersion();
+     sealReq.type = PipeRequestType.TRANSFER_FILE_SEAL.getType();
+     sealReq.body = serialized;
+     sealReq.fileName = fileName;
+     sealReq.fileLength = fileLength;
+     return sealReq;
+   }
+
+   /** Decodes a seal request from the body of a raw transfer request. */
+   public static PipeTransferFileSealReq fromTPipeTransferReq(TPipeTransferReq req) {
+     // the values are read in the order in which toTPipeTransferReq wrote them
+     final ByteBuffer payload = req.body;
+     final String decodedFileName = ReadWriteIOUtils.readString(payload);
+     final long decodedFileLength = ReadWriteIOUtils.readLong(payload);
+
+     final PipeTransferFileSealReq sealReq = new PipeTransferFileSealReq();
+     sealReq.fileName = decodedFileName;
+     sealReq.fileLength = decodedFileLength;
+     sealReq.version = req.version;
+     sealReq.type = req.type;
+     sealReq.body = payload;
+     return sealReq;
+   }
+ }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/request/PipeTransferHandshakeReq.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/request/PipeTransferHandshakeReq.java
new file mode 100644
index 00000000000..f258c5036f9
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/request/PipeTransferHandshakeReq.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request;
+
+import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.IoTDBThriftConnectorVersion;
+import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.PipeRequestType;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+ /**
+  * Handshake request of the transfer protocol. Its body holds a single string, the timestamp
+  * precision of the sending side.
+  */
+ public class PipeTransferHandshakeReq extends TPipeTransferReq {
+
+   private String timestampPrecision;
+
+   private PipeTransferHandshakeReq() {}
+
+   public String getTimestampPrecision() {
+     return timestampPrecision;
+   }
+
+   /**
+    * Builds the handshake request sent by the connector.
+    *
+    * @param timestampPrecision the timestamp precision to announce
+    * @throws IOException if the body cannot be serialized
+    */
+   public static PipeTransferHandshakeReq toTPipeTransferReq(String timestampPrecision)
+       throws IOException {
+     final ByteBuffer serialized;
+     try (final PublicBAOS bytes = new PublicBAOS();
+         final DataOutputStream out = new DataOutputStream(bytes)) {
+       ReadWriteIOUtils.write(timestampPrecision, out);
+       serialized = ByteBuffer.wrap(bytes.getBuf(), 0, bytes.size());
+     }
+
+     final PipeTransferHandshakeReq req = new PipeTransferHandshakeReq();
+     req.version = IoTDBThriftConnectorVersion.VERSION_ONE.getVersion();
+     req.type = PipeRequestType.HANDSHAKE.getType();
+     req.body = serialized;
+     req.timestampPrecision = timestampPrecision;
+     return req;
+   }
+
+   /** Decodes a handshake request from the body of a raw transfer request. */
+   public static PipeTransferHandshakeReq fromTPipeTransferReq(TPipeTransferReq transferReq) {
+     final ByteBuffer payload = transferReq.body;
+     final String decodedPrecision = ReadWriteIOUtils.readString(payload);
+
+     final PipeTransferHandshakeReq req = new PipeTransferHandshakeReq();
+     req.timestampPrecision = decodedPrecision;
+     req.version = transferReq.version;
+     req.type = transferReq.type;
+     req.body = payload;
+     return req;
+   }
+ }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/request/PipeTransferInsertNodeReq.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/request/PipeTransferInsertNodeReq.java
new file mode 100644
index 00000000000..90af3b2fae6
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/request/PipeTransferInsertNodeReq.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request;
+
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode;
+import org.apache.iotdb.db.mpp.plan.statement.Statement;
+import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowStatement;
+import org.apache.iotdb.db.mpp.plan.statement.crud.InsertTabletStatement;
+import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.IoTDBThriftConnectorVersion;
+import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.PipeRequestType;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+
+ /**
+  * Transfer request carrying a serialized {@link InsertNode}. The receiving side turns the node back
+  * into an insert statement through {@link #constructStatement()}.
+  */
+ public class PipeTransferInsertNodeReq extends TPipeTransferReq {
+
+   private InsertNode insertNode;
+
+   private PipeTransferInsertNodeReq() {}
+
+   public InsertNode getInsertNode() {
+     return insertNode;
+   }
+
+   /**
+    * Converts the carried insert node into the matching insert statement.
+    *
+    * @return an {@link InsertRowStatement} for a row node, an {@link InsertTabletStatement} for a
+    *     tablet node
+    * @throws UnsupportedOperationException if the node is neither a row node nor a tablet node
+    */
+   public Statement constructStatement() {
+     if (insertNode instanceof InsertRowNode) {
+       return toRowStatement((InsertRowNode) insertNode);
+     } else if (insertNode instanceof InsertTabletNode) {
+       return toTabletStatement((InsertTabletNode) insertNode);
+     }
+
+     final String reason =
+         String.format(
+             "unknown InsertNode type %s when constructing statement from insert node.",
+             insertNode);
+     throw new UnsupportedOperationException(reason);
+   }
+
+   /** Copies the content of a row node into a new row statement. */
+   private static InsertRowStatement toRowStatement(InsertRowNode rowNode) {
+     final InsertRowStatement rowStatement = new InsertRowStatement();
+     rowStatement.setDevicePath(rowNode.getDevicePath());
+     rowStatement.setTime(rowNode.getTime());
+     rowStatement.setMeasurements(rowNode.getMeasurements());
+     rowStatement.setDataTypes(rowNode.getDataTypes());
+     rowStatement.setValues(rowNode.getValues());
+     rowStatement.setNeedInferType(true);
+     rowStatement.setAligned(rowNode.isAligned());
+     return rowStatement;
+   }
+
+   /** Copies the content of a tablet node into a new tablet statement. */
+   private static InsertTabletStatement toTabletStatement(InsertTabletNode tabletNode) {
+     final InsertTabletStatement tabletStatement = new InsertTabletStatement();
+     tabletStatement.setDevicePath(tabletNode.getDevicePath());
+     tabletStatement.setMeasurements(tabletNode.getMeasurements());
+     tabletStatement.setTimes(tabletNode.getTimes());
+     tabletStatement.setColumns(tabletNode.getColumns());
+     tabletStatement.setBitMaps(tabletNode.getBitMaps());
+     tabletStatement.setRowCount(tabletNode.getRowCount());
+     tabletStatement.setDataTypes(tabletNode.getDataTypes());
+     tabletStatement.setAligned(tabletNode.isAligned());
+     return tabletStatement;
+   }
+
+   /** Builds the request sent by the connector; the body is the serialized form of the node. */
+   public static PipeTransferInsertNodeReq toTPipeTransferReq(InsertNode insertNode) {
+     final PipeTransferInsertNodeReq insertNodeReq = new PipeTransferInsertNodeReq();
+     insertNodeReq.insertNode = insertNode;
+     insertNodeReq.version = IoTDBThriftConnectorVersion.VERSION_ONE.getVersion();
+     insertNodeReq.type = PipeRequestType.TRANSFER_INSERT_NODE.getType();
+     insertNodeReq.body = insertNode.serializeToByteBuffer();
+     return insertNodeReq;
+   }
+
+   /**
+    * Decodes an insert-node request from the body of a raw transfer request.
+    *
+    * <p>NOTE(review): the cast presumably fails with a ClassCastException when the body holds a plan
+    * node that is not an insert node - confirm that callers only pass insert-node requests.
+    */
+   public static PipeTransferInsertNodeReq fromTPipeTransferReq(TPipeTransferReq transferReq) {
+     final InsertNode decodedNode = (InsertNode) PlanNodeType.deserialize(transferReq.body);
+
+     final PipeTransferInsertNodeReq insertNodeReq = new PipeTransferInsertNodeReq();
+     insertNodeReq.insertNode = decodedNode;
+     insertNodeReq.version = transferReq.version;
+     insertNodeReq.type = transferReq.type;
+     insertNodeReq.body = transferReq.body;
+     return insertNodeReq;
+   }
+ }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorSubtaskLifeCycle.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/manager/PipeConnectorSubtaskLifeCycle.java
similarity index 98%
rename from server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorSubtaskLifeCycle.java
rename to server/src/main/java/org/apache/iotdb/db/pipe/core/connector/manager/PipeConnectorSubtaskLifeCycle.java
index 9ed53ae310e..a805844f855 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorSubtaskLifeCycle.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/manager/PipeConnectorSubtaskLifeCycle.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.core.connector;
+package org.apache.iotdb.db.pipe.core.connector.manager;
import org.apache.iotdb.db.pipe.execution.executor.PipeConnectorSubtaskExecutor;
import org.apache.iotdb.db.pipe.task.queue.ListenableBlockingPendingQueue;
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorSubtaskManager.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/manager/PipeConnectorSubtaskManager.java
similarity index 86%
rename from server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorSubtaskManager.java
rename to server/src/main/java/org/apache/iotdb/db/pipe/core/connector/manager/PipeConnectorSubtaskManager.java
index 33a298c5d02..06d9942f525 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorSubtaskManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/manager/PipeConnectorSubtaskManager.java
@@ -17,9 +17,12 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.core.connector;
+package org.apache.iotdb.db.pipe.core.connector.manager;
+import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
+import org.apache.iotdb.db.pipe.config.PipeConnectorConstant;
+import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.IoTDBThriftConnectorV1;
import org.apache.iotdb.db.pipe.execution.executor.PipeConnectorSubtaskExecutor;
import org.apache.iotdb.db.pipe.task.queue.ListenableBlockingPendingQueue;
import org.apache.iotdb.db.pipe.task.subtask.PipeConnectorSubtask;
@@ -46,15 +49,24 @@ public class PipeConnectorSubtaskManager {
new TreeMap<>(pipeConnectorParameters.getAttribute()).toString();
if (!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) {
- // 1. construct, validate and customize PipeConnector
+ // TODO: construct all PipeConnector with the same reflection method, avoid using if-else
+ // 1. construct, validate and customize PipeConnector, and then handshake (create connection)
+ // with the target
final PipeConnector pipeConnector =
- PipeAgent.plugin().reflectConnector(pipeConnectorParameters);
+ pipeConnectorParameters
+ .getStringOrDefault(
+ PipeConnectorConstant.CONNECTOR_KEY,
+ BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR.getPipePluginName())
+ .equals(BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR.getPipePluginName())
+ ? new IoTDBThriftConnectorV1()
+ : PipeAgent.plugin().reflectConnector(pipeConnectorParameters);
try {
pipeConnector.validate(new PipeParameterValidator(pipeConnectorParameters));
final PipeConnectorRuntimeConfiguration runtimeConfiguration =
new PipeConnectorRuntimeConfiguration();
pipeConnector.customize(pipeConnectorParameters, runtimeConfiguration);
// TODO: use runtimeConfiguration to configure PipeConnector
+ pipeConnector.handshake();
} catch (Exception e) {
throw new PipeManagementException(
"Failed to construct PipeConnector, because of " + e.getMessage(), e);
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletInsertionEvent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletInsertionEvent.java
index e97e0da9b18..62c599b66c1 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletInsertionEvent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletInsertionEvent.java
@@ -41,6 +41,10 @@ public class PipeTabletInsertionEvent implements TabletInsertionEvent, EnrichedE
this.referenceCount = new AtomicInteger(0);
}
+ public InsertNode getInsertNode() {
+ return insertNode;
+ }
+
@Override
public TabletInsertionEvent processRowByRow(BiConsumer<Row, RowCollector> consumer) {
throw new UnsupportedOperationException("Not implemented yet");
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTsFileInsertionEvent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTsFileInsertionEvent.java
index 1a460b6b0a2..65f78cebac2 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTsFileInsertionEvent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTsFileInsertionEvent.java
@@ -19,6 +19,8 @@
package org.apache.iotdb.db.pipe.core.event.impl;
+import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.pipe.core.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
@@ -28,15 +30,46 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
+import java.util.concurrent.atomic.AtomicBoolean;
public class PipeTsFileInsertionEvent implements TsFileInsertionEvent, EnrichedEvent {
private static final Logger LOGGER = LoggerFactory.getLogger(PipeTsFileInsertionEvent.class);
private File tsFile;
+ private final AtomicBoolean isClosed;
- public PipeTsFileInsertionEvent(File tsFile) {
- this.tsFile = tsFile;
+ public PipeTsFileInsertionEvent(TsFileResource resource) {
+ tsFile = resource.getTsFile();
+
+ isClosed = new AtomicBoolean(resource.isClosed());
+ // register close listener if TsFile is not closed
+ if (!isClosed.get()) {
+ final TsFileProcessor processor = resource.getProcessor();
+ if (processor != null) {
+ processor.addCloseFileListener(
+ o -> {
+ synchronized (isClosed) {
+ isClosed.set(true);
+ isClosed.notifyAll();
+ }
+ });
+ }
+ }
+ }
+
+ public void waitForTsFileClose() throws InterruptedException {
+ if (!isClosed.get()) {
+ synchronized (isClosed) {
+ while (!isClosed.get()) {
+ isClosed.wait();
+ }
+ }
+ }
+ }
+
+ public File getTsFile() {
+ return tsFile;
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEventFactory.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEventFactory.java
index 29cbefa8d14..4c98c5193be 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEventFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEventFactory.java
@@ -24,15 +24,13 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.pipe.core.event.impl.PipeTabletInsertionEvent;
import org.apache.iotdb.db.pipe.core.event.impl.PipeTsFileInsertionEvent;
-import java.io.File;
-
public class PipeRealtimeCollectEventFactory {
private static final TsFileEpochManager TS_FILE_EPOCH_MANAGER = new TsFileEpochManager();
- public static PipeRealtimeCollectEvent createCollectEvent(File tsFile, TsFileResource resource) {
+ public static PipeRealtimeCollectEvent createCollectEvent(TsFileResource resource) {
return TS_FILE_EPOCH_MANAGER.bindPipeTsFileInsertionEvent(
- new PipeTsFileInsertionEvent(tsFile), resource);
+ new PipeTsFileInsertionEvent(resource), resource);
}
public static PipeRealtimeCollectEvent createCollectEvent(
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeEventCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeEventCollector.java
index 8bde5dda46c..a1f443a9d43 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeEventCollector.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeEventCollector.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.pipe.core.event.view.collector;
import org.apache.iotdb.db.pipe.core.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.task.queue.ListenableBlockingPendingQueue;
-import org.apache.iotdb.db.pipe.task.subtask.PipeConnectorSubtask;
import org.apache.iotdb.pipe.api.collector.EventCollector;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent;
@@ -65,7 +64,7 @@ public class PipeEventCollector implements EventCollector {
private synchronized void collect(Event event) {
if (event instanceof EnrichedEvent) {
- ((EnrichedEvent) event).increaseReferenceCount(PipeConnectorSubtask.class.getName());
+ ((EnrichedEvent) event).increaseReferenceCount(PipeEventCollector.class.getName());
}
while (!bufferQueue.isEmpty()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutor.java
index bf4f8daf2e1..2952476dbbe 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutor.java
@@ -29,13 +29,10 @@ import com.google.common.util.concurrent.MoreExecutors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.annotation.concurrent.NotThreadSafe;
-
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
-@NotThreadSafe
public abstract class PipeSubtaskExecutor {
private static final Logger LOGGER = LoggerFactory.getLogger(PipeSubtaskExecutor.class);
@@ -71,7 +68,7 @@ public abstract class PipeSubtaskExecutor {
subtask.bindExecutors(subtaskWorkerThreadPoolExecutor, subtaskCallbackListeningExecutor);
}
- public final void start(String subTaskID) {
+ public final synchronized void start(String subTaskID) {
if (!registeredIdSubtaskMapper.containsKey(subTaskID)) {
LOGGER.warn("The subtask {} is not registered.", subTaskID);
return;
@@ -79,7 +76,9 @@ public abstract class PipeSubtaskExecutor {
final PipeSubtask subtask = registeredIdSubtaskMapper.get(subTaskID);
if (subtask.isSubmittingSelf()) {
- LOGGER.info("The subtask {} is already running.", subTaskID);
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("The subtask {} is already running.", subTaskID);
+ }
} else {
subtask.allowSubmittingSelf();
subtask.submitSelf();
@@ -87,7 +86,7 @@ public abstract class PipeSubtaskExecutor {
}
}
- public final void stop(String subTaskID) {
+ public final synchronized void stop(String subTaskID) {
if (!registeredIdSubtaskMapper.containsKey(subTaskID)) {
LOGGER.warn("The subtask {} is not registered.", subTaskID);
return;
@@ -96,7 +95,7 @@ public abstract class PipeSubtaskExecutor {
registeredIdSubtaskMapper.get(subTaskID).disallowSubmittingSelf();
}
- public final void deregister(String subTaskID) {
+ public final synchronized void deregister(String subTaskID) {
stop(subTaskID);
final PipeSubtask subtask = registeredIdSubtaskMapper.remove(subTaskID);
@@ -122,7 +121,7 @@ public abstract class PipeSubtaskExecutor {
/////////////////////// executor management ///////////////////////
- public final void shutdown() {
+ public final synchronized void shutdown() {
if (isShutdown()) {
return;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeBuilder.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeBuilder.java
index b8257845266..447ae5bdf09 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeBuilder.java
@@ -47,14 +47,14 @@ public class PipeBuilder {
final Map<TConsensusGroupId, PipeTask> consensusGroupIdToPipeTaskMap = new HashMap<>();
final PipeRuntimeMeta pipeRuntimeMeta = pipeMeta.getRuntimeMeta();
- for (Map.Entry<TConsensusGroupId, PipeTaskMeta> consensusGroupIdPipeTaskMeta :
+ for (Map.Entry<TConsensusGroupId, PipeTaskMeta> consensusGroupIdToPipeTaskMeta :
pipeRuntimeMeta.getConsensusGroupIdToTaskMetaMap().entrySet()) {
consensusGroupIdToPipeTaskMap.put(
- consensusGroupIdPipeTaskMeta.getKey(),
+ consensusGroupIdToPipeTaskMeta.getKey(),
new PipeTaskBuilder(
pipeName,
- Integer.toString(consensusGroupIdPipeTaskMeta.getValue().getRegionLeader()),
- // TODO: consensusGroupIdPipeTaskMeta.getValue().getProgressIndex() is not used
+ Integer.toString(consensusGroupIdToPipeTaskMeta.getKey().getId()),
+ // TODO: consensusGroupIdToPipeTaskMeta.getValue().getProgressIndex() is not used
collectorParameters,
processorParameters,
connectorParameters)
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
index 31e935256ff..a4866e87e8e 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
@@ -32,6 +32,8 @@ import org.apache.iotdb.pipe.api.customizer.collector.PipeCollectorRuntimeConfig
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.exception.PipeException;
+import java.util.HashMap;
+
public class PipeTaskCollectorStage extends PipeTaskStage {
private final PipeParameters collectorParameters;
@@ -52,19 +54,28 @@ public class PipeTaskCollectorStage extends PipeTaskStage {
private final PipeCollector pipeCollector;
public PipeTaskCollectorStage(String dataRegionId, PipeParameters collectorParameters) {
- this.collectorParameters = collectorParameters;
- // set data region id to collector parameters, so that collector can get data region id inside
- // collector
- collectorParameters.getAttribute().put(PipeCollectorConstant.DATA_REGION_KEY, dataRegionId);
-
+ // TODO: avoid if-else, use reflection to create collector all the time
if (collectorParameters
.getStringOrDefault(
PipeCollectorConstant.COLLECTOR_KEY,
- BuiltinPipePlugin.DEFAULT_COLLECTOR.getPipePluginName())
- .equals(BuiltinPipePlugin.DEFAULT_COLLECTOR.getPipePluginName())) {
+ BuiltinPipePlugin.IOTDB_COLLECTOR.getPipePluginName())
+ .equals(BuiltinPipePlugin.IOTDB_COLLECTOR.getPipePluginName())) {
+ // we want to pass data region id to collector, so we need to copy the collector parameters
+ // and put data region id into the copy. we can't put data region id into the original
+ // parameters directly, because they may be used by other pipe tasks.
+ this.collectorParameters =
+ new PipeParameters(new HashMap<>(collectorParameters.getAttribute()));
+ // set data region id to collector parameters, so that collector can get data region id inside
+ // collector
+ this.collectorParameters
+ .getAttribute()
+ .put(PipeCollectorConstant.DATA_REGION_KEY, dataRegionId);
+
collectorPendingQueue = new ListenableUnblockingPendingQueue<>();
this.pipeCollector = new IoTDBDataRegionCollector(collectorPendingQueue);
} else {
+ this.collectorParameters = collectorParameters;
+
this.pipeCollector = PipeAgent.plugin().reflectCollector(collectorParameters);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
index 62cadbde87d..7f147b130bf 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.db.pipe.task.stage;
-import org.apache.iotdb.db.pipe.core.connector.PipeConnectorSubtaskManager;
+import org.apache.iotdb.db.pipe.core.connector.manager.PipeConnectorSubtaskManager;
import org.apache.iotdb.db.pipe.execution.executor.PipeSubtaskExecutorManager;
import org.apache.iotdb.db.pipe.task.queue.ListenableBlockingPendingQueue;
import org.apache.iotdb.pipe.api.customizer.PipeParameters;
@@ -34,17 +34,16 @@ public class PipeTaskConnectorStage extends PipeTaskStage {
public PipeTaskConnectorStage(PipeParameters pipeConnectorParameters) {
this.pipeConnectorParameters = pipeConnectorParameters;
- }
-
- @Override
- public void createSubtask() throws PipeException {
connectorSubtaskId =
PipeConnectorSubtaskManager.instance()
.register(
PipeSubtaskExecutorManager.getInstance().getConnectorSubtaskExecutor(),
- pipeConnectorParameters);
+ pipeConnectorParameters); // TODO: should split to create
}
+ @Override
+ public void createSubtask() throws PipeException {}
+
@Override
public void startSubtask() throws PipeException {
PipeConnectorSubtaskManager.instance().start(connectorSubtaskId);
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
index 3ba031a23a4..6bbec4480c0 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
@@ -79,14 +79,18 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
pipeProcessor,
pipeConnectorOutputEventCollector);
+ final PipeTaskStage pipeTaskStage = this;
this.pipeCollectorInputPendingQueue =
pipeCollectorInputPendingQueue != null
? pipeCollectorInputPendingQueue
.registerEmptyToNotEmptyListener(
taskId,
() -> {
- if (status == PipeStatus.RUNNING) {
- executor.start(pipeProcessorSubtask.getTaskID());
+ // status can be changed by other threads calling pipeTaskStage's methods
+ synchronized (pipeTaskStage) {
+ if (status == PipeStatus.RUNNING) {
+ executor.start(pipeProcessorSubtask.getTaskID());
+ }
}
})
.registerNotEmptyToEmptyListener(
@@ -99,10 +103,13 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
.registerFullToNotFullListener(
taskId,
() -> {
- // only start when the pipe is running
- if (status == PipeStatus.RUNNING) {
- pipeConnectorOutputEventCollector.tryCollectBufferedEvents();
- executor.start(pipeProcessorSubtask.getTaskID());
+ // status can be changed by other threads calling pipeTaskStage's methods
+ synchronized (pipeTaskStage) {
+ // only start when the pipe is running
+ if (status == PipeStatus.RUNNING) {
+ pipeConnectorOutputEventCollector.tryCollectBufferedEvents();
+ executor.start(pipeProcessorSubtask.getTaskID());
+ }
}
});
}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
index d9f2ea84bc7..80f366eb096 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
@@ -20,7 +20,6 @@
package org.apache.iotdb.db.pipe.task.subtask;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
-import org.apache.iotdb.db.pipe.core.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.task.queue.ListenableBlockingPendingQueue;
import org.apache.iotdb.pipe.api.PipeConnector;
import org.apache.iotdb.pipe.api.event.Event;
@@ -53,7 +52,7 @@ public class PipeConnectorSubtask extends PipeSubtask {
// TODO: for a while
@Override
- protected void executeForAWhile() {
+ protected synchronized void executeForAWhile() {
try {
// TODO: reduce the frequency of heartbeat
outputPipeConnector.heartbeat();
@@ -62,7 +61,9 @@ public class PipeConnectorSubtask extends PipeSubtask {
"PipeConnector: failed to connect to the target system.", e);
}
- final Event event = inputPendingQueue.poll();
+ final Event event = lastEvent != null ? lastEvent : inputPendingQueue.poll();
+ // record this event for retrying on connection failure or other exceptions
+ lastEvent = event;
if (event == null) {
return;
}
@@ -82,15 +83,15 @@ public class PipeConnectorSubtask extends PipeSubtask {
throw new UnsupportedOperationException(
"Unsupported event type: " + event.getClass().getName());
}
+
+ releaseLastEvent();
+ } catch (PipeConnectionException e) {
+ throw e;
} catch (Exception e) {
e.printStackTrace();
throw new PipeException(
"Error occurred during executing PipeConnector#transfer, perhaps need to check whether the implementation of PipeConnector is correct according to the pipe-api description.",
e);
- } finally {
- if (event instanceof EnrichedEvent) {
- ((EnrichedEvent) event).decreaseReferenceCount(PipeConnectorSubtask.class.getName());
- }
}
}
@@ -125,7 +126,13 @@ public class PipeConnectorSubtask extends PipeSubtask {
MAX_RETRY_TIMES,
taskID);
lastFailedCause = throwable;
+
PipeAgent.runtime().report(this);
+
+ // although the pipe task will be stopped, we still don't release the last event here
+ // because we need to keep it for the next retry. if the user restarts the task,
+ // the last event will be processed again. the last event will be released when the task
+ // is dropped or the event is processed successfully.
return;
}
}
@@ -135,9 +142,15 @@ public class PipeConnectorSubtask extends PipeSubtask {
}
@Override
- public void close() {
+ // synchronized for outputPipeConnector.close() and releaseLastEvent() in super.close().
+ // make sure that the lastEvent will not be updated after outputPipeConnector.close() to avoid
+ // a resource leak caused by the lastEvent not being released.
+ public synchronized void close() {
try {
outputPipeConnector.close();
+
+ // should be called after outputPipeConnector.close()
+ super.close();
} catch (Exception e) {
e.printStackTrace();
LOGGER.info(
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java
index 4d15d3a3528..feb584fcaff 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.db.pipe.task.subtask;
-import org.apache.iotdb.db.pipe.core.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.task.queue.EventSupplier;
import org.apache.iotdb.pipe.api.PipeProcessor;
import org.apache.iotdb.pipe.api.collector.EventCollector;
@@ -52,8 +51,10 @@ public class PipeProcessorSubtask extends PipeSubtask {
}
@Override
- protected void executeForAWhile() throws Exception {
- Event event = inputEventSupplier.supply();
+ protected synchronized void executeForAWhile() throws Exception {
+ final Event event = lastEvent != null ? lastEvent : inputEventSupplier.supply();
+ // record the last event for retry when exception occurs
+ lastEvent = event;
if (event == null) {
return;
}
@@ -73,22 +74,26 @@ public class PipeProcessorSubtask extends PipeSubtask {
throw new UnsupportedOperationException(
"Unsupported event type: " + event.getClass().getName());
}
+
+ releaseLastEvent();
} catch (Exception e) {
e.printStackTrace();
throw new PipeException(
"Error occurred during executing PipeProcessor#process, perhaps need to check whether the implementation of PipeProcessor is correct according to the pipe-api description.",
e);
- } finally {
- if (event instanceof EnrichedEvent) {
- ((EnrichedEvent) event).decreaseReferenceCount(PipeProcessorSubtask.class.getName());
- }
}
}
@Override
- public void close() {
+ // synchronized for pipeProcessor.close() and releaseLastEvent() in super.close().
+ // make sure that the lastEvent will not be updated after pipeProcessor.close() to avoid
+ // a resource leak caused by the lastEvent not being released.
+ public synchronized void close() {
try {
pipeProcessor.close();
+
+ // should be called after pipeProcessor.close()
+ super.close();
} catch (Exception e) {
e.printStackTrace();
LOGGER.info(
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java
index e01c1390186..c770b983b9a 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java
@@ -20,6 +20,8 @@
package org.apache.iotdb.db.pipe.task.subtask;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
+import org.apache.iotdb.db.pipe.core.event.EnrichedEvent;
+import org.apache.iotdb.pipe.api.event.Event;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
@@ -42,15 +44,14 @@ public abstract class PipeSubtask implements FutureCallback<Void>, Callable<Void
private ListeningExecutorService subtaskWorkerThreadPoolExecutor;
private ExecutorService subtaskCallbackListeningExecutor;
-
private final DecoratingLock callbackDecoratingLock = new DecoratingLock();
+ private final AtomicBoolean shouldStopSubmittingSelf = new AtomicBoolean(true);
protected static final int MAX_RETRY_TIMES = 5;
private final AtomicInteger retryCount = new AtomicInteger(0);
-
protected Throwable lastFailedCause;
- private final AtomicBoolean shouldStopSubmittingSelf = new AtomicBoolean(true);
+ protected Event lastEvent;
public PipeSubtask(String taskID) {
super();
@@ -95,7 +96,13 @@ public abstract class PipeSubtask implements FutureCallback<Void>, Callable<Void
retryCount,
throwable);
lastFailedCause = throwable;
+
PipeAgent.runtime().report(this);
+
+ // although the pipe task will be stopped, we still don't release the last event here
+ // because we need to keep it for the next retry. if the user restarts the task,
+ // the last event will be processed again. the last event will be released when the task
+ // is dropped or the event is processed successfully.
}
}
@@ -114,6 +121,7 @@ public abstract class PipeSubtask implements FutureCallback<Void>, Callable<Void
}
public void allowSubmittingSelf() {
+ retryCount.set(0);
shouldStopSubmittingSelf.set(false);
}
@@ -125,6 +133,20 @@ public abstract class PipeSubtask implements FutureCallback<Void>, Callable<Void
return !shouldStopSubmittingSelf.get();
}
+ @Override
+ public synchronized void close() {
+ releaseLastEvent();
+ }
+
+ protected void releaseLastEvent() {
+ if (lastEvent != null) {
+ if (lastEvent instanceof EnrichedEvent) {
+ ((EnrichedEvent) lastEvent).decreaseReferenceCount(PipeSubtask.class.getName());
+ }
+ lastEvent = null;
+ }
+ }
+
public String getTaskID() {
return taskID;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
index c5e26996703..28bec0979bd 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
@@ -62,6 +62,7 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.template.CreateSchemaTemp
import org.apache.iotdb.db.mpp.plan.statement.metadata.template.DropSchemaTemplateStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.template.SetSchemaTemplateStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.template.UnsetSchemaTemplateStatement;
+import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.query.control.SessionManager;
import org.apache.iotdb.db.query.control.clientsession.IClientSession;
import org.apache.iotdb.db.quotas.DataNodeThrottleQuotaManager;
@@ -75,6 +76,8 @@ import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.ServerProperties;
import org.apache.iotdb.service.rpc.thrift.TCreateTimeseriesUsingSchemaTemplateReq;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
import org.apache.iotdb.service.rpc.thrift.TSAggregationQueryReq;
import org.apache.iotdb.service.rpc.thrift.TSAppendSchemaTemplateReq;
import org.apache.iotdb.service.rpc.thrift.TSBackupConfigurationResp;
@@ -2106,6 +2109,11 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
return SyncService.getInstance().transportFile(metaInfo, buff);
}
+ @Override
+ public TPipeTransferResp pipeTransfer(TPipeTransferReq req) {
+ return PipeAgent.receiver().transfer(req, partitionFetcher, schemaFetcher);
+ }
+
@Override
public TSBackupConfigurationResp getBackupConfiguration() {
return new TSBackupConfigurationResp(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
@@ -2244,5 +2252,6 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
closeSession(req);
}
SyncService.getInstance().handleClientExit();
+ PipeAgent.receiver().handleClientExit();
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/core/connector/PipeThriftRequestTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/core/connector/PipeThriftRequestTest.java
new file mode 100644
index 00000000000..a3b65a66bfd
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/pipe/core/connector/PipeThriftRequestTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.connector;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
+import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.reponse.PipeTransferFilePieceResp;
+import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransferFilePieceReq;
+import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransferFileSealReq;
+import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransferHandshakeReq;
+import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransferInsertNodeReq;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+
+public class PipeThriftRequestTest {
+
+ private static final String TIME_PRECISION = "ms";
+
+ @Test
+ public void testPipeValidateHandshakeReq() throws IOException {
+ PipeTransferHandshakeReq req = PipeTransferHandshakeReq.toTPipeTransferReq(TIME_PRECISION);
+ PipeTransferHandshakeReq deserializeReq = PipeTransferHandshakeReq.fromTPipeTransferReq(req);
+
+ Assert.assertEquals(req.getVersion(), deserializeReq.getVersion());
+ Assert.assertEquals(req.getType(), deserializeReq.getType());
+ Assert.assertArrayEquals(req.getBody(), deserializeReq.getBody());
+
+ Assert.assertEquals(req.getTimestampPrecision(), deserializeReq.getTimestampPrecision());
+ }
+
+ @Test
+ public void testPipeTransferInsertNodeReq() {
+ PipeTransferInsertNodeReq req =
+ PipeTransferInsertNodeReq.toTPipeTransferReq(
+ new InsertRowNode(
+ new PlanNodeId(""),
+ new PartialPath(new String[] {"root", "sg", "d"}),
+ false,
+ new String[] {"s"},
+ new TSDataType[] {TSDataType.INT32},
+ 1,
+ new Object[] {1},
+ false));
+ PipeTransferInsertNodeReq deserializeReq = PipeTransferInsertNodeReq.fromTPipeTransferReq(req);
+
+ Assert.assertEquals(req.getVersion(), deserializeReq.getVersion());
+ Assert.assertEquals(req.getType(), deserializeReq.getType());
+ Assert.assertArrayEquals(req.getBody(), deserializeReq.getBody());
+
+ Assert.assertEquals(req.getInsertNode(), deserializeReq.getInsertNode());
+ }
+
+ @Test
+ public void testPipeTransferFilePieceReq() throws IOException {
+ byte[] body = "testPipeTransferFilePieceReq".getBytes();
+ String fileName = "1.tsfile";
+
+ PipeTransferFilePieceReq req = PipeTransferFilePieceReq.toTPipeTransferReq(fileName, 0, body);
+ PipeTransferFilePieceReq deserializeReq = PipeTransferFilePieceReq.fromTPipeTransferReq(req);
+
+ Assert.assertEquals(req.getVersion(), deserializeReq.getVersion());
+ Assert.assertEquals(req.getType(), deserializeReq.getType());
+ Assert.assertArrayEquals(req.getBody(), deserializeReq.getBody());
+
+ Assert.assertEquals(req.getFileName(), deserializeReq.getFileName());
+ Assert.assertEquals(req.getStartWritingOffset(), deserializeReq.getStartWritingOffset());
+ Assert.assertArrayEquals(req.getFilePiece(), deserializeReq.getFilePiece());
+ }
+
+ @Test
+ public void testPipeTransferFileSealReq() throws IOException {
+ String fileName = "1.tsfile";
+
+ PipeTransferFileSealReq req = PipeTransferFileSealReq.toTPipeTransferReq(fileName, 100);
+ PipeTransferFileSealReq deserializeReq = PipeTransferFileSealReq.fromTPipeTransferReq(req);
+
+ Assert.assertEquals(req.getVersion(), deserializeReq.getVersion());
+ Assert.assertEquals(req.getType(), deserializeReq.getType());
+ Assert.assertArrayEquals(req.getBody(), deserializeReq.getBody());
+
+ Assert.assertEquals(req.getFileName(), deserializeReq.getFileName());
+ Assert.assertEquals(req.getFileLength(), deserializeReq.getFileLength());
+ }
+
+ @Test
+ public void testPIpeTransferFilePieceResp() throws IOException {
+ PipeTransferFilePieceResp resp =
+ PipeTransferFilePieceResp.toTPipeTransferResp(RpcUtils.SUCCESS_STATUS, 100);
+ PipeTransferFilePieceResp deserializeResp =
+ PipeTransferFilePieceResp.fromTPipeTransferResp(resp);
+
+ Assert.assertEquals(resp.getStatus(), deserializeResp.getStatus());
+ Assert.assertEquals(resp.getEndWritingOffset(), deserializeResp.getEndWritingOffset());
+ }
+}
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index 937b5ea676a..79871f857af 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -207,7 +207,14 @@ public enum TSStatusCode {
WRITE_SIZE_EXCEEDED(1705),
READ_SIZE_EXCEEDED(1706),
QUOTA_MEM_QUERY_NOT_ENOUGH(1707),
- QUERY_CPU_QUERY_NOT_ENOUGH(1708);
+ QUERY_CPU_QUERY_NOT_ENOUGH(1708),
+
+ // Pipe
+ PIPE_VERSION_ERROR(1800),
+ PIPE_TYPE_ERROR(1801),
+ PIPE_HANDSHAKE_ERROR(1802),
+ PIPE_TRANSFER_FILE_OFFSET_RESET(1803),
+ PIPE_TRANSFER_FILE_ERROR(1804);
private final int statusCode;
diff --git a/thrift/src/main/thrift/client.thrift b/thrift/src/main/thrift/client.thrift
index 6779bbf89cc..8f6cf8e6297 100644
--- a/thrift/src/main/thrift/client.thrift
+++ b/thrift/src/main/thrift/client.thrift
@@ -455,6 +455,17 @@ struct TSyncTransportMetaInfo{
2:required i64 startIndex
}
+struct TPipeTransferReq {
+ 1:required i8 version
+ 2:required i16 type
+ 3:required binary body
+}
+
+struct TPipeTransferResp {
+ 1:required common.TSStatus status
+ 2:optional binary body
+}
+
struct TSBackupConfigurationResp {
1: required common.TSStatus status
2: optional bool enableOperationSync
@@ -595,6 +606,8 @@ service IClientRPCService {
common.TSStatus sendFile(1:TSyncTransportMetaInfo metaInfo, 2:binary buff);
+ TPipeTransferResp pipeTransfer(TPipeTransferReq req);
+
TSBackupConfigurationResp getBackupConfiguration();
TSConnectionInfoResp fetchAllConnectionsInfo();