You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/08/17 07:51:58 UTC
[iotdb] branch master updated: [IOTDB-4135] Merge thrift-sync into ClientRPC (#7004)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 42b00d63c1 [IOTDB-4135] Merge thrift-sync into ClientRPC (#7004)
42b00d63c1 is described below
commit 42b00d63c17a0f232fc97253f737c59bc48be27a
Author: Chen YZ <43...@users.noreply.github.com>
AuthorDate: Wed Aug 17 15:51:53 2022 +0800
[IOTDB-4135] Merge thrift-sync into ClientRPC (#7004)
---
.../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 | 15 +-
.../antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 | 4 -
.../db/integration/sync/IoTDBSyncReceiverIT.java | 25 +--
node-commons/pom.xml | 5 -
.../apache/iotdb/commons/sync/SyncConstant.java | 6 -
.../apache/iotdb/commons/sync/SyncPathUtil.java | 6 +-
pom.xml | 1 -
.../resources/conf/iotdb-datanode.properties | 4 -
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 11 --
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 9 -
.../apache/iotdb/db/qp/executor/PlanExecutor.java | 31 ----
.../org/apache/iotdb/db/qp/logical/Operator.java | 2 -
.../db/qp/logical/sys/ShowPipeServerOperator.java | 38 -----
.../db/qp/logical/sys/StartPipeServerOperator.java | 38 -----
.../db/qp/logical/sys/StopPipeServerOperator.java | 38 -----
.../apache/iotdb/db/qp/physical/PhysicalPlan.java | 8 -
.../db/qp/physical/sys/ShowPipeServerPlan.java | 26 ---
.../apache/iotdb/db/qp/physical/sys/ShowPlan.java | 1 -
.../db/qp/physical/sys/StartPipeServerPlan.java | 56 -------
.../db/qp/physical/sys/StopPipeServerPlan.java | 56 -------
.../apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java | 18 --
.../service/thrift/impl/ClientRPCServiceImpl.java | 23 +++
.../db/service/thrift/impl/TSServiceImpl.java | 21 +++
.../java/org/apache/iotdb/db/sync/SyncService.java | 127 ++++++--------
.../iotdb/db/sync/common/ISyncInfoFetcher.java | 8 -
.../iotdb/db/sync/common/LocalSyncInfoFetcher.java | 28 ----
.../org/apache/iotdb/db/sync/common/SyncInfo.java | 33 +---
.../db/sync/common/persistence/SyncLogReader.java | 13 --
.../db/sync/common/persistence/SyncLogWriter.java | 14 --
.../db/sync/sender/service/TransportHandler.java | 4 +-
.../db/sync/transport/client/ClientWrapper.java | 25 ++-
...rtClient.java => IoTDBSinkTransportClient.java} | 52 +++---
...nsportServiceImpl.java => ReceiverManager.java} | 183 +++++++++++++--------
.../transport/server/TransportServerManager.java | 137 ---------------
.../server/TransportServerManagerMBean.java | 34 ----
.../server/TransportServerThriftHandler.java | 71 --------
.../iotdb/db/qp/physical/PhysicalPlanTest.java | 28 ----
.../db/sync/receiver/manager/SyncInfoTest.java | 1 -
.../db/sync/receiver/recovery/SyncLogTest.java | 4 -
...portServiceTest.java => SyncTransportTest.java} | 21 +--
.../java/org/apache/iotdb/rpc/TSStatusCode.java | 3 +
thrift-sync/README.md | 22 ---
thrift-sync/pom.xml | 62 -------
thrift-sync/rpc-changelist.md | 181 --------------------
thrift-sync/src/main/thrift/transport.thrift | 63 -------
thrift/src/main/thrift/client.thrift | 33 ++++
46 files changed, 310 insertions(+), 1279 deletions(-)
diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
index 18c4115d9c..71f210c4c9 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
@@ -65,8 +65,7 @@ utilityStatement
| loadConfiguration | loadTimeseries | loadFile | removeFile | unloadFile;
syncStatement
- : startPipeServer | stopPipeServer | showPipeServer
- | createPipeSink | showPipeSinkType | showPipeSink | dropPipeSink
+ : createPipeSink | showPipeSinkType | showPipeSink | dropPipeSink
| createPipe | showPipe | stopPipe | startPipe | dropPipe;
/**
@@ -764,18 +763,6 @@ syncAttributeClauses
: attributePair (COMMA attributePair)*
;
-// sync receiver
-startPipeServer
- : START PIPESERVER
- ;
-
-stopPipeServer
- : STOP PIPESERVER
- ;
-
-showPipeServer
- : SHOW PIPESERVER
- ;
/**
* 7. Common Clauses
diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
index 5ad902f8e5..65efbcecf8 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
@@ -379,10 +379,6 @@ PIPES
: P I P E S
;
-PIPESERVER
- : P I P E S E R V E R
- ;
-
PIPESINK
: P I P E S I N K
;
diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverIT.java
index 016ccb84c7..67a4ab4618 100644
--- a/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverIT.java
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverIT.java
@@ -21,19 +21,17 @@ package org.apache.iotdb.db.integration.sync;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.modification.Deletion;
-import org.apache.iotdb.db.exception.sync.PipeServerException;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
-import org.apache.iotdb.db.sync.SyncService;
import org.apache.iotdb.db.sync.pipedata.DeletionPipeData;
import org.apache.iotdb.db.sync.pipedata.PipeData;
import org.apache.iotdb.db.sync.pipedata.SchemaPipeData;
import org.apache.iotdb.db.sync.pipedata.TsFilePipeData;
import org.apache.iotdb.db.sync.sender.pipe.Pipe;
import org.apache.iotdb.db.sync.sender.pipe.TsFilePipe;
-import org.apache.iotdb.db.sync.transport.client.IoTDBSInkTransportClient;
+import org.apache.iotdb.db.sync.transport.client.IoTDBSinkTransportClient;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.itbase.category.LocalStandaloneTest;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
@@ -51,7 +49,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
-import java.net.Socket;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -71,7 +68,7 @@ public class IoTDBSyncReceiverIT {
String remoteIp1;
long createdTime1 = System.currentTimeMillis();
String showPipeSql = "SHOW PIPE";
- IoTDBSInkTransportClient client;
+ IoTDBSinkTransportClient client;
@Before
public void setUp() throws Exception {
@@ -94,15 +91,9 @@ public class IoTDBSyncReceiverIT {
FileUtils.moveDirectory(srcDir, tmpDir);
EnvironmentUtils.cleanEnv();
EnvironmentUtils.envSetUp();
- try {
- SyncService.getInstance().startPipeServer(true);
- new Socket("localhost", 6670).close();
- } catch (Exception e) {
- Assert.fail("Failed to start pipe server because " + e.getMessage());
- }
Pipe pipe = new TsFilePipe(createdTime1, pipeName1, null, 0, false);
remoteIp1 = "127.0.0.1";
- client = new IoTDBSInkTransportClient(pipe, remoteIp1, 6670, "127.0.0.1");
+ client = new IoTDBSinkTransportClient(pipe, remoteIp1, 6667, "127.0.0.1");
client.handshake();
}
@@ -120,16 +111,6 @@ public class IoTDBSyncReceiverIT {
EnvironmentUtils.cleanEnv();
}
- @Test
- public void testStopPipeServer() {
- logger.info("testStopPipeServerCheck");
- try {
- SyncService.getInstance().stopPipeServer();
- } catch (PipeServerException e) {
- Assert.fail("Can not stop pipe server");
- }
- }
-
@Test
public void testReceiveDataAndLoad() {
logger.info("testReceiveDataAndLoad");
diff --git a/node-commons/pom.xml b/node-commons/pom.xml
index 21ae448050..737ade8e06 100644
--- a/node-commons/pom.xml
+++ b/node-commons/pom.xml
@@ -77,11 +77,6 @@
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
- <dependency>
- <groupId>org.apache.iotdb</groupId>
- <artifactId>iotdb-thrift</artifactId>
- <version>${project.version}</version>
- </dependency>
<dependency>
<groupId>org.apache.iotdb</groupId>
<artifactId>iotdb-thrift-confignode</artifactId>
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/sync/SyncConstant.java b/node-commons/src/main/java/org/apache/iotdb/commons/sync/SyncConstant.java
index 8abb84765f..bf9f4afb09 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/sync/SyncConstant.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/sync/SyncConstant.java
@@ -72,12 +72,6 @@ public class SyncConstant {
public static final int DATA_CHUNK_SIZE =
Math.min(16 * 1024 * 1024, RpcUtils.THRIFT_FRAME_MAX_SIZE);
- public static final int SUCCESS_CODE = 1;
- public static final int ERROR_CODE = -1;
- public static final int REBASE_CODE = -2;
- public static final int RETRY_CODE = -3;
- public static final int CONFLICT_CODE = -4;
-
/** receiver */
public static final String RECEIVER_DIR_NAME = "receiver";
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/sync/SyncPathUtil.java b/node-commons/src/main/java/org/apache/iotdb/commons/sync/SyncPathUtil.java
index 323cc635b0..f10fc43f2d 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/sync/SyncPathUtil.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/sync/SyncPathUtil.java
@@ -20,7 +20,7 @@
package org.apache.iotdb.commons.sync;
import org.apache.iotdb.commons.conf.CommonDescriptor;
-import org.apache.iotdb.service.transport.thrift.IdentityInfo;
+import org.apache.iotdb.service.rpc.thrift.TSyncIdentityInfo;
import java.io.File;
import java.io.IOException;
@@ -105,12 +105,12 @@ public class SyncPathUtil {
+ SyncConstant.FILE_DATA_DIR_NAME;
}
- public static String getFileDataDirPath(IdentityInfo identityInfo) {
+ public static String getFileDataDirPath(TSyncIdentityInfo identityInfo) {
return SyncPathUtil.getReceiverFileDataDir(
identityInfo.getPipeName(), identityInfo.getAddress(), identityInfo.getCreateTime());
}
- public static String getPipeLogDirPath(IdentityInfo identityInfo) {
+ public static String getPipeLogDirPath(TSyncIdentityInfo identityInfo) {
return SyncPathUtil.getReceiverPipeLogDir(
identityInfo.getPipeName(), identityInfo.getAddress(), identityInfo.getCreateTime());
}
diff --git a/pom.xml b/pom.xml
index 198403d10c..d0f831bdbe 100644
--- a/pom.xml
+++ b/pom.xml
@@ -85,7 +85,6 @@
<module>thrift-commons</module>
<module>thrift-confignode</module>
<module>thrift-multi-leader-consensus</module>
- <module>thrift-sync</module>
<module>thrift-influxdb</module>
<module>service-rpc</module>
<module>jdbc</module>
diff --git a/server/src/assembly/resources/conf/iotdb-datanode.properties b/server/src/assembly/resources/conf/iotdb-datanode.properties
index 9a70ba7680..e24b1d9ed8 100644
--- a/server/src/assembly/resources/conf/iotdb-datanode.properties
+++ b/server/src/assembly/resources/conf/iotdb-datanode.properties
@@ -675,10 +675,6 @@ timestamp_precision=ms
####################
### PIPE Server Configuration
####################
-# PIPE server port to listen
-# Datatype: int
-# pipe_server_port=6670
-
# White IP list of Sync client.
# Please use the form of network segment to present the range of IP, for example: 192.168.0.0/16
# If there are more than one IP segment, please separate them by commas
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 2aa2b771ac..39bbe10647 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -501,9 +501,6 @@ public class IoTDBConfig {
*/
private int externalSortThreshold = 1000;
- /** If this IoTDB instance is a receiver of sync, set the server port. */
- private int pipeServerPort = 6670;
-
/** White list for sync */
private String ipWhiteList = "0.0.0.0/0";
@@ -1441,14 +1438,6 @@ public class IoTDBConfig {
this.mRemoteSchemaCacheSize = mRemoteSchemaCacheSize;
}
- public int getPipeServerPort() {
- return pipeServerPort;
- }
-
- public void setPipeServerPort(int pipeServerPort) {
- this.pipeServerPort = pipeServerPort;
- }
-
public int getMaxNumberOfSyncFileRetry() {
return maxNumberOfSyncFileRetry;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index dec138deb1..dc0db70e5d 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -417,11 +417,6 @@ public class IoTDBDescriptor {
properties.getProperty(
"session_timeout_threshold",
Integer.toString(conf.getSessionTimeoutThreshold()))));
- conf.setPipeServerPort(
- Integer.parseInt(
- properties
- .getProperty("pipe_server_port", Integer.toString(conf.getPipeServerPort()))
- .trim()));
conf.setMaxNumberOfSyncFileRetry(
Integer.parseInt(
properties
@@ -1417,10 +1412,6 @@ public class IoTDBDescriptor {
String.valueOf(conf.getSelectIntoInsertTabletPlanRowLimit()))));
// update sync config
- conf.setPipeServerPort(
- Integer.parseInt(
- properties.getProperty(
- "pipe_server_port", String.valueOf(conf.getPipeServerPort()))));
conf.setMaxNumberOfSyncFileRetry(
Integer.parseInt(
properties
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index 04f57311fb..96bdb1bc88 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -62,7 +62,6 @@ import org.apache.iotdb.db.exception.metadata.StorageGroupAlreadySetException;
import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.exception.sync.PipeException;
-import org.apache.iotdb.db.exception.sync.PipeServerException;
import org.apache.iotdb.db.exception.sync.PipeSinkException;
import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode;
@@ -133,7 +132,6 @@ import org.apache.iotdb.db.qp.physical.sys.ShowNodesInTemplatePlan;
import org.apache.iotdb.db.qp.physical.sys.ShowPathsSetTemplatePlan;
import org.apache.iotdb.db.qp.physical.sys.ShowPathsUsingTemplatePlan;
import org.apache.iotdb.db.qp.physical.sys.ShowPipePlan;
-import org.apache.iotdb.db.qp.physical.sys.ShowPipeServerPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowPipeSinkPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowStorageGroupPlan;
@@ -433,10 +431,6 @@ public class PlanExecutor implements IPlanExecutor {
case DROP_PIPESINK:
dropPipeSink((DropPipeSinkPlan) plan);
return true;
- case START_PIPE_SERVER:
- return operateStartPipeServer();
- case STOP_PIPE_SERVER:
- return operateStopPipeServer();
case CREATE_PIPE:
createPipe((CreatePipePlan) plan);
return true;
@@ -451,24 +445,6 @@ public class PlanExecutor implements IPlanExecutor {
}
}
- private boolean operateStopPipeServer() throws QueryProcessException {
- try {
- SyncService.getInstance().stopPipeServer();
- } catch (PipeServerException e) {
- throw new QueryProcessException(e);
- }
- return true;
- }
-
- private boolean operateStartPipeServer() throws QueryProcessException {
- try {
- SyncService.getInstance().startPipeServer(false);
- } catch (PipeServerException e) {
- throw new QueryProcessException(e);
- }
- return true;
- }
-
private boolean createTemplate(CreateTemplatePlan createTemplatePlan)
throws QueryProcessException {
try {
@@ -764,8 +740,6 @@ public class PlanExecutor implements IPlanExecutor {
return processShowPipeSink((ShowPipeSinkPlan) showPlan);
case PIPESINKTYPE:
return processShowPipeSinkType();
- case PIPESERVER:
- return processShowPipeServer((ShowPipeServerPlan) showPlan);
case PIPE:
return processShowPipes((ShowPipePlan) showPlan);
default:
@@ -773,10 +747,6 @@ public class PlanExecutor implements IPlanExecutor {
}
}
- private QueryDataSet processShowPipeServer(ShowPipeServerPlan plan) {
- return SyncService.getInstance().showPipeServer(plan);
- }
-
private QueryDataSet processCountNodes(CountPlan countPlan) throws MetadataException {
int num =
getNodesNumInGivenLevel(
@@ -1344,7 +1314,6 @@ public class PlanExecutor implements IPlanExecutor {
TSDataType.TEXT,
TSDataType.TEXT));
SyncService.getInstance().showPipe(plan, listDataSet);
- SyncService.getInstance().showPipe(plan, listDataSet);
// sort by create time
listDataSet.sort(Comparator.comparing(o -> o.getFields().get(0).getStringValue()));
return listDataSet;
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
index 55cf2828b6..48fad9c0a0 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
@@ -205,8 +205,6 @@ public abstract class Operator {
STOP_PIPE,
START_PIPE,
DROP_PIPE,
- START_PIPE_SERVER,
- STOP_PIPE_SERVER,
ACTIVATE_TEMPLATE_IN_CLUSTER
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/ShowPipeServerOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/ShowPipeServerOperator.java
deleted file mode 100644
index 310c909692..0000000000
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/ShowPipeServerOperator.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.iotdb.db.qp.logical.sys;
-
-import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.qp.physical.sys.ShowPipeServerPlan;
-import org.apache.iotdb.db.qp.strategy.PhysicalGenerator;
-
-public class ShowPipeServerOperator extends ShowOperator {
-
- public ShowPipeServerOperator(int tokenIntType) {
- super(tokenIntType);
- }
-
- @Override
- public PhysicalPlan generatePhysicalPlan(PhysicalGenerator generator)
- throws QueryProcessException {
- return new ShowPipeServerPlan();
- }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/StartPipeServerOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/StartPipeServerOperator.java
deleted file mode 100644
index e23833a0dd..0000000000
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/StartPipeServerOperator.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.qp.logical.sys;
-
-import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.qp.logical.Operator;
-import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.qp.physical.sys.StartPipeServerPlan;
-import org.apache.iotdb.db.qp.strategy.PhysicalGenerator;
-
-public class StartPipeServerOperator extends Operator {
- public StartPipeServerOperator(int tokenIntType) {
- super(tokenIntType);
- operatorType = OperatorType.START_PIPE_SERVER;
- }
-
- @Override
- public PhysicalPlan generatePhysicalPlan(PhysicalGenerator generator)
- throws QueryProcessException {
- return new StartPipeServerPlan();
- }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/StopPipeServerOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/StopPipeServerOperator.java
deleted file mode 100644
index 76746c5fcb..0000000000
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/StopPipeServerOperator.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.qp.logical.sys;
-
-import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.qp.logical.Operator;
-import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.qp.physical.sys.StopPipeServerPlan;
-import org.apache.iotdb.db.qp.strategy.PhysicalGenerator;
-
-public class StopPipeServerOperator extends Operator {
- public StopPipeServerOperator(int tokenIntType) {
- super(tokenIntType);
- operatorType = OperatorType.STOP_PIPE_SERVER;
- }
-
- @Override
- public PhysicalPlan generatePhysicalPlan(PhysicalGenerator generator)
- throws QueryProcessException {
- return new StopPipeServerPlan();
- }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
index d74059e946..e49be4304e 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
@@ -70,9 +70,7 @@ import org.apache.iotdb.db.qp.physical.sys.SetTTLPlan;
import org.apache.iotdb.db.qp.physical.sys.SetTemplatePlan;
import org.apache.iotdb.db.qp.physical.sys.ShowDevicesPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan;
-import org.apache.iotdb.db.qp.physical.sys.StartPipeServerPlan;
import org.apache.iotdb.db.qp.physical.sys.StartTriggerPlan;
-import org.apache.iotdb.db.qp.physical.sys.StopPipeServerPlan;
import org.apache.iotdb.db.qp.physical.sys.StopTriggerPlan;
import org.apache.iotdb.db.qp.physical.sys.StorageGroupMNodePlan;
import org.apache.iotdb.db.qp.physical.sys.UnsetTemplatePlan;
@@ -490,12 +488,6 @@ public abstract class PhysicalPlan implements IConsensusRequest {
case SET_SYSTEM_MODE:
plan = new SetSystemModePlan();
break;
- case START_PIPE_SERVER:
- plan = new StartPipeServerPlan();
- break;
- case STOP_PIPE_SERVER:
- plan = new StopPipeServerPlan();
- break;
case ACTIVATE_TEMPLATE_IN_CLUSTER:
plan = new ActivateTemplateInClusterPlan();
break;
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPipeServerPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPipeServerPlan.java
deleted file mode 100644
index 67b664c1b7..0000000000
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPipeServerPlan.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.qp.physical.sys;
-
-public class ShowPipeServerPlan extends ShowPlan {
-
- public ShowPipeServerPlan() {
- super(ShowContentType.PIPESERVER);
- }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPlan.java
index c7c7b7a019..0398a2f014 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPlan.java
@@ -130,6 +130,5 @@ public class ShowPlan extends PhysicalPlan {
PIPESINK,
PIPESINKTYPE,
PIPE,
- PIPESERVER
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/StartPipeServerPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/StartPipeServerPlan.java
deleted file mode 100644
index b3f2f8b4c6..0000000000
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/StartPipeServerPlan.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.qp.physical.sys;
-
-import org.apache.iotdb.commons.exception.IllegalPathException;
-import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.db.qp.logical.Operator;
-import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.List;
-
-public class StartPipeServerPlan extends PhysicalPlan {
-
- public StartPipeServerPlan() {
- super(Operator.OperatorType.START_PIPE_SERVER);
- canBeSplit = false;
- }
-
- @Override
- public List<? extends PartialPath> getPaths() {
- return Collections.emptyList();
- }
-
- @Override
- public void serialize(DataOutputStream stream) throws IOException {
- stream.writeByte((byte) PhysicalPlanType.START_PIPE_SERVER.ordinal());
- }
-
- @Override
- public void serializeImpl(ByteBuffer buffer) {
- buffer.put((byte) PhysicalPlanType.START_PIPE_SERVER.ordinal());
- }
-
- @Override
- public void deserialize(ByteBuffer buffer) throws IllegalPathException {}
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/StopPipeServerPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/StopPipeServerPlan.java
deleted file mode 100644
index 2fe41ba880..0000000000
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/StopPipeServerPlan.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.qp.physical.sys;
-
-import org.apache.iotdb.commons.exception.IllegalPathException;
-import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.db.qp.logical.Operator;
-import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.List;
-
-public class StopPipeServerPlan extends PhysicalPlan {
-
- public StopPipeServerPlan() {
- super(Operator.OperatorType.STOP_PIPE_SERVER);
- canBeSplit = false;
- }
-
- @Override
- public List<? extends PartialPath> getPaths() {
- return Collections.emptyList();
- }
-
- @Override
- public void serialize(DataOutputStream stream) throws IOException {
- stream.writeByte((byte) PhysicalPlanType.STOP_PIPE_SERVER.ordinal());
- }
-
- @Override
- public void serializeImpl(ByteBuffer buffer) {
- buffer.put((byte) PhysicalPlanType.STOP_PIPE_SERVER.ordinal());
- }
-
- @Override
- public void deserialize(ByteBuffer buffer) throws IllegalPathException {}
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java b/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java
index f348f44e9e..fd3ce0b205 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java
@@ -133,7 +133,6 @@ import org.apache.iotdb.db.qp.logical.sys.ShowOperator;
import org.apache.iotdb.db.qp.logical.sys.ShowPathsSetTemplateOperator;
import org.apache.iotdb.db.qp.logical.sys.ShowPathsUsingTemplateOperator;
import org.apache.iotdb.db.qp.logical.sys.ShowPipeOperator;
-import org.apache.iotdb.db.qp.logical.sys.ShowPipeServerOperator;
import org.apache.iotdb.db.qp.logical.sys.ShowPipeSinkOperator;
import org.apache.iotdb.db.qp.logical.sys.ShowPipeSinkTypeOperator;
import org.apache.iotdb.db.qp.logical.sys.ShowQueryResourceOperate;
@@ -143,10 +142,8 @@ import org.apache.iotdb.db.qp.logical.sys.ShowTemplatesOperator;
import org.apache.iotdb.db.qp.logical.sys.ShowTimeSeriesOperator;
import org.apache.iotdb.db.qp.logical.sys.ShowTriggersOperator;
import org.apache.iotdb.db.qp.logical.sys.StartPipeOperator;
-import org.apache.iotdb.db.qp.logical.sys.StartPipeServerOperator;
import org.apache.iotdb.db.qp.logical.sys.StartTriggerOperator;
import org.apache.iotdb.db.qp.logical.sys.StopPipeOperator;
-import org.apache.iotdb.db.qp.logical.sys.StopPipeServerOperator;
import org.apache.iotdb.db.qp.logical.sys.StopTriggerOperator;
import org.apache.iotdb.db.qp.logical.sys.UnSetTTLOperator;
import org.apache.iotdb.db.qp.logical.sys.UnloadFileOperator;
@@ -2531,21 +2528,6 @@ public class IoTDBSqlVisitor extends IoTDBSqlParserBaseVisitor<Operator> {
}
}
- @Override
- public Operator visitStartPipeServer(IoTDBSqlParser.StartPipeServerContext ctx) {
- return new StartPipeServerOperator(SQLConstant.TOK_PIPE_SERVER_START);
- }
-
- @Override
- public Operator visitStopPipeServer(IoTDBSqlParser.StopPipeServerContext ctx) {
- return new StopPipeServerOperator(SQLConstant.TOK_PIPE_SERVER_STOP);
- }
-
- @Override
- public Operator visitShowPipeServer(IoTDBSqlParser.ShowPipeServerContext ctx) {
- return new ShowPipeServerOperator(SQLConstant.TOK_SHOW_PIPE_SERVER);
- }
-
/** 7. Common Clauses */
// IoTDB Objects
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
index 3377225280..f8511ce804 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
@@ -59,6 +59,7 @@ import org.apache.iotdb.db.query.control.SessionTimeoutManager;
import org.apache.iotdb.db.service.basic.BasicOpenSessionResp;
import org.apache.iotdb.db.service.metrics.MetricService;
import org.apache.iotdb.db.service.metrics.enums.Operation;
+import org.apache.iotdb.db.sync.SyncService;
import org.apache.iotdb.db.utils.QueryDataSetUtils;
import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
import org.apache.iotdb.metrics.utils.MetricLevel;
@@ -103,6 +104,8 @@ import org.apache.iotdb.service.rpc.thrift.TSRawDataQueryReq;
import org.apache.iotdb.service.rpc.thrift.TSSetSchemaTemplateReq;
import org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneReq;
import org.apache.iotdb.service.rpc.thrift.TSUnsetSchemaTemplateReq;
+import org.apache.iotdb.service.rpc.thrift.TSyncIdentityInfo;
+import org.apache.iotdb.service.rpc.thrift.TSyncTransportMetaInfo;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
@@ -111,6 +114,7 @@ import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.nio.ByteBuffer;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Collections;
@@ -1462,6 +1466,24 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
throw new UnsupportedOperationException();
}
+ @Override
+ public TSStatus handshake(TSyncIdentityInfo info) throws TException {
+ // TODO(sync): Check permissions here
+ return SyncService.getInstance().handshake(info);
+ }
+
+ @Override
+ public TSStatus transportData(TSyncTransportMetaInfo metaInfo, ByteBuffer buff, ByteBuffer digest)
+ throws TException {
+ return SyncService.getInstance().transportData(metaInfo, buff, digest);
+ }
+
+ @Override
+ public TSStatus checkFileDigest(TSyncTransportMetaInfo metaInfo, ByteBuffer digest)
+ throws TException {
+ return SyncService.getInstance().checkFileDigest(metaInfo, digest);
+ }
+
@Override
public TSStatus insertStringRecord(TSInsertStringRecordReq req) {
long t1 = System.currentTimeMillis();
@@ -1550,6 +1572,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
TSCloseSessionReq req = new TSCloseSessionReq(sessionId);
closeSession(req);
}
+ SyncService.getInstance().handleClientExit();
}
private void cleanupQueryExecution(Long queryId) {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
index b32a23c28e..55aa206c5b 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
@@ -74,6 +74,7 @@ import org.apache.iotdb.db.service.basic.BasicOpenSessionResp;
import org.apache.iotdb.db.service.basic.ServiceProvider;
import org.apache.iotdb.db.service.metrics.MetricService;
import org.apache.iotdb.db.service.metrics.enums.Operation;
+import org.apache.iotdb.db.sync.SyncService;
import org.apache.iotdb.db.tools.watermark.GroupedLSBWatermarkEncoder;
import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
import org.apache.iotdb.db.utils.QueryDataSetUtils;
@@ -122,6 +123,8 @@ import org.apache.iotdb.service.rpc.thrift.TSSetSchemaTemplateReq;
import org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneReq;
import org.apache.iotdb.service.rpc.thrift.TSTracingInfo;
import org.apache.iotdb.service.rpc.thrift.TSUnsetSchemaTemplateReq;
+import org.apache.iotdb.service.rpc.thrift.TSyncIdentityInfo;
+import org.apache.iotdb.service.rpc.thrift.TSyncTransportMetaInfo;
import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
@@ -1119,6 +1122,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
TSCloseSessionReq req = new TSCloseSessionReq(sessionId);
closeSession(req);
}
+ SyncService.getInstance().handleClientExit();
}
@Override
@@ -2103,6 +2107,23 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
return status != null ? status : executeNonQueryPlan(plan);
}
+ @Override
+ public TSStatus handshake(TSyncIdentityInfo info) throws TException {
+ return SyncService.getInstance().handshake(info);
+ }
+
+ @Override
+ public TSStatus transportData(TSyncTransportMetaInfo metaInfo, ByteBuffer buff, ByteBuffer digest)
+ throws TException {
+ return SyncService.getInstance().transportData(metaInfo, buff, digest);
+ }
+
+ @Override
+ public TSStatus checkFileDigest(TSyncTransportMetaInfo metaInfo, ByteBuffer digest)
+ throws TException {
+ return SyncService.getInstance().checkFileDigest(metaInfo, digest);
+ }
+
protected TSStatus executeNonQueryPlan(PhysicalPlan plan) {
try {
return serviceProvider.executeNonQuery(plan)
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/SyncService.java b/server/src/main/java/org/apache/iotdb/db/sync/SyncService.java
index f78978855d..46eaa93eab 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/SyncService.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/SyncService.java
@@ -23,18 +23,15 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.ShutdownException;
import org.apache.iotdb.commons.exception.StartupException;
-import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.service.IService;
import org.apache.iotdb.commons.service.ServiceType;
import org.apache.iotdb.commons.sync.SyncConstant;
import org.apache.iotdb.commons.sync.SyncPathUtil;
import org.apache.iotdb.db.exception.sync.PipeException;
-import org.apache.iotdb.db.exception.sync.PipeServerException;
import org.apache.iotdb.db.exception.sync.PipeSinkException;
import org.apache.iotdb.db.qp.physical.sys.CreatePipePlan;
import org.apache.iotdb.db.qp.physical.sys.CreatePipeSinkPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowPipePlan;
-import org.apache.iotdb.db.qp.physical.sys.ShowPipeServerPlan;
import org.apache.iotdb.db.qp.utils.DatetimeUtils;
import org.apache.iotdb.db.query.dataset.ListDataSet;
import org.apache.iotdb.db.sync.common.ISyncInfoFetcher;
@@ -50,27 +47,26 @@ import org.apache.iotdb.db.sync.sender.pipe.PipeInfo;
import org.apache.iotdb.db.sync.sender.pipe.PipeSink;
import org.apache.iotdb.db.sync.sender.pipe.TsFilePipe;
import org.apache.iotdb.db.sync.sender.service.TransportHandler;
-import org.apache.iotdb.db.sync.transport.server.TransportServerManager;
+import org.apache.iotdb.db.sync.transport.server.ReceiverManager;
import org.apache.iotdb.db.utils.sync.SyncPipeUtil;
import org.apache.iotdb.pipe.external.api.IExternalPipeSinkWriterFactory;
import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TSyncIdentityInfo;
+import org.apache.iotdb.service.rpc.thrift.TSyncTransportMetaInfo;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.common.Field;
import org.apache.iotdb.tsfile.read.common.RowRecord;
-import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
-import java.util.Collections;
+import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.TimeUnit;
-import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_PIPESERVER_STATUS;
-
public class SyncService implements IService {
private static final Logger logger = LoggerFactory.getLogger(SyncService.class);
@@ -83,7 +79,12 @@ public class SyncService implements IService {
private ISyncInfoFetcher syncInfoFetcher = LocalSyncInfoFetcher.getInstance();
- private SyncService() {}
+ /* handle rpc in receiver-side*/
+ private ReceiverManager receiverManager;
+
+ private SyncService() {
+ receiverManager = new ReceiverManager();
+ }
private static class SyncServiceHolder {
private static final SyncService INSTANCE = new SyncService();
@@ -95,6 +96,30 @@ public class SyncService implements IService {
return SyncServiceHolder.INSTANCE;
}
+ // region Interfaces and Implementation of Transport Layer
+
+ public TSStatus handshake(TSyncIdentityInfo identityInfo) {
+ return receiverManager.handshake(identityInfo);
+ }
+
+ public TSStatus transportData(TSyncTransportMetaInfo metaInfo, ByteBuffer buff, ByteBuffer digest)
+ throws TException {
+ return receiverManager.transportData(metaInfo, buff, digest);
+ }
+
+ // TODO: this will be deleted later
+ public TSStatus checkFileDigest(TSyncTransportMetaInfo metaInfo, ByteBuffer digest)
+ throws TException {
+ return receiverManager.checkFileDigest(metaInfo, digest);
+ }
+
+ public void handleClientExit() {
+ // Handle client exit here.
+ receiverManager.handleClientExit();
+ }
+
+ // endregion
+
// region Interfaces and Implementation of PipeSink
public PipeSink getPipeSink(String name) {
@@ -121,60 +146,6 @@ public class SyncService implements IService {
// endregion
- // region Interfaces and Implementation of PipeServer
-
- /**
- * start receiver service
- *
- * @param isRecovery if isRecovery, it will ignore check and force a start
- */
- public synchronized void startPipeServer(boolean isRecovery) throws PipeServerException {
- if (syncInfoFetcher.isPipeServerEnable() && !isRecovery) {
- return;
- }
- try {
- TransportServerManager.getInstance().startService();
- TSStatus status = syncInfoFetcher.startPipeServer();
- if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- throw new PipeServerException("Failed to start pipe server because " + status.getMessage());
- }
- } catch (StartupException e) {
- throw new PipeServerException("Failed to start pipe server because " + e.getMessage());
- }
- }
-
- /** stop receiver service */
- public synchronized void stopPipeServer() throws PipeServerException {
- if (!syncInfoFetcher.isPipeServerEnable()) {
- return;
- }
- TransportServerManager.getInstance().stopService();
- TSStatus status = syncInfoFetcher.stopPipeServer();
- if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- throw new PipeServerException("Failed to stop pipe server because " + status.getMessage());
- }
- }
-
- /**
- * query by sql SHOW PIPESERVER STATUS
- *
- * @return QueryDataSet contained one column: enable
- */
- public QueryDataSet showPipeServer(ShowPipeServerPlan plan) {
- ListDataSet dataSet =
- new ListDataSet(
- Collections.singletonList(new PartialPath(COLUMN_PIPESERVER_STATUS, false)),
- Collections.singletonList(TSDataType.BOOLEAN));
- RowRecord rowRecord = new RowRecord(0);
- Field status = new Field(TSDataType.BOOLEAN);
- status.setBoolV(syncInfoFetcher.isPipeServerEnable());
- rowRecord.addField(status);
- dataSet.putRecord(rowRecord);
- return dataSet;
- }
-
- // endregion
-
// region Interfaces and Implementation of Pipe
public synchronized void addPipe(CreatePipePlan plan) throws PipeException {
@@ -334,6 +305,7 @@ public class SyncService implements IService {
public void showPipe(ShowPipePlan plan, ListDataSet listDataSet) {
boolean showAll = "".equals(plan.getPipeName());
+ // show pipe in sender
for (PipeInfo pipe : SyncService.getInstance().getAllPipeInfos()) {
if (showAll || plan.getPipeName().equals(pipe.getPipeName())) {
RowRecord record = new RowRecord(0);
@@ -376,7 +348,23 @@ public class SyncService implements IService {
listDataSet.putRecord(record);
}
}
- // TODO: implement show pipe in receiver
+ // show pipe in receiver
+ List<TSyncIdentityInfo> identityInfoList = receiverManager.getAllTSyncIdentityInfos();
+ for (TSyncIdentityInfo identityInfo : identityInfoList) {
+ // TODO(sync): Removing duplicate rows
+ RowRecord record = new RowRecord(0);
+ record.addField(
+ Binary.valueOf(DatetimeUtils.convertLongToDate(identityInfo.getCreateTime())),
+ TSDataType.TEXT);
+ record.addField(Binary.valueOf(identityInfo.getPipeName()), TSDataType.TEXT);
+ record.addField(Binary.valueOf(IoTDBConstant.SYNC_RECEIVER_ROLE), TSDataType.TEXT);
+ record.addField(Binary.valueOf(identityInfo.getAddress()), TSDataType.TEXT);
+ record.addField(Binary.valueOf(Pipe.PipeStatus.RUNNING.name()), TSDataType.TEXT);
+ record.addField(Binary.valueOf(""), TSDataType.TEXT);
+ record.addField(Binary.valueOf("N/A"), TSDataType.TEXT);
+ record.addField(Binary.valueOf("N/A"), TSDataType.TEXT);
+ listDataSet.putRecord(record);
+ }
}
// endregion
@@ -432,15 +420,6 @@ public class SyncService implements IService {
/** IService * */
@Override
public void start() throws StartupException {
- // recover receiver
- if (syncInfoFetcher.isPipeServerEnable()) {
- try {
- startPipeServer(true);
- } catch (PipeServerException e) {
- throw new StartupException(e.getMessage());
- }
- }
- // recover sender
// == Check whether loading extPipe plugin successfully.
ExtPipePluginRegister extPipePluginRegister = ExtPipePluginRegister.getInstance();
if (extPipePluginRegister == null) {
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/common/ISyncInfoFetcher.java b/server/src/main/java/org/apache/iotdb/db/sync/common/ISyncInfoFetcher.java
index b1bf9e1c90..f13483444d 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/common/ISyncInfoFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/common/ISyncInfoFetcher.java
@@ -29,14 +29,6 @@ import java.util.List;
public interface ISyncInfoFetcher {
- // region Interfaces of PipeServer
- TSStatus startPipeServer();
-
- TSStatus stopPipeServer();
-
- boolean isPipeServerEnable();
- // endregion
-
// region Interfaces of PipeSink
// TODO: use CreatePipeSinkNode as parameter
TSStatus addPipeSink(CreatePipeSinkPlan plan);
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/common/LocalSyncInfoFetcher.java b/server/src/main/java/org/apache/iotdb/db/sync/common/LocalSyncInfoFetcher.java
index 93e12d4c38..e74c6748ea 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/common/LocalSyncInfoFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/common/LocalSyncInfoFetcher.java
@@ -46,34 +46,6 @@ public class LocalSyncInfoFetcher implements ISyncInfoFetcher {
syncInfo = new SyncInfo();
}
- // region Interfaces of PipeServer
- @Override
- public TSStatus startPipeServer() {
- try {
- syncInfo.startServer();
- } catch (IOException e) {
- RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage());
- }
- return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
- }
-
- @Override
- public TSStatus stopPipeServer() {
- try {
- syncInfo.stopServer();
- } catch (IOException e) {
- RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage());
- }
- return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
- }
-
- @Override
- public boolean isPipeServerEnable() {
- return syncInfo.isPipeServerEnable();
- }
-
- // endregion
-
// region Implement of PipeSink
@Override
public TSStatus addPipeSink(CreatePipeSinkPlan plan) {
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/common/SyncInfo.java b/server/src/main/java/org/apache/iotdb/db/sync/common/SyncInfo.java
index 8f00ca3ec9..936d4a7ea7 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/common/SyncInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/common/SyncInfo.java
@@ -47,7 +47,6 @@ public class SyncInfo {
protected static final Logger LOGGER = LoggerFactory.getLogger(SyncInfo.class);
- protected boolean pipeServerEnable;
// <pipeFolderName, pipeMsg>
protected Map<String, List<PipeMessage>> pipeMessageMap;
@@ -60,21 +59,19 @@ public class SyncInfo {
public SyncInfo() {
syncLogWriter = SyncLogWriter.getInstance();
- SyncLogReader analyzer = new SyncLogReader();
+ SyncLogReader logReader = new SyncLogReader();
try {
- analyzer.recover();
- pipeSinks = analyzer.getAllPipeSinks();
- pipes = analyzer.getAllPipeInfos();
- runningPipe = analyzer.getRunningPipeInfo();
- pipeServerEnable = analyzer.isPipeServerEnable();
- pipeMessageMap = analyzer.getPipeMessageMap();
+ logReader.recover();
+ pipeSinks = logReader.getAllPipeSinks();
+ pipes = logReader.getAllPipeInfos();
+ runningPipe = logReader.getRunningPipeInfo();
+ pipeMessageMap = logReader.getPipeMessageMap();
} catch (StartupException e) {
LOGGER.error(
"Cannot recover ReceiverInfo because {}. Use default info values.", e.getMessage());
pipeSinks = new ConcurrentHashMap<>();
pipes = new ArrayList<>();
pipeMessageMap = new ConcurrentHashMap<>();
- pipeServerEnable = false;
}
}
@@ -82,20 +79,6 @@ public class SyncInfo {
syncLogWriter.close();
}
- // region Implement of PipeServer
-
- public void startServer() throws IOException {
- pipeServerEnable = true;
- syncLogWriter.startPipeServer();
- }
-
- public void stopServer() throws IOException {
- pipeServerEnable = false;
- syncLogWriter.stopPipeServer();
- }
-
- // endregion
-
// region Implement of PipeSink
private boolean isPipeSinkExist(String name) {
return pipeSinks.containsKey(name);
@@ -280,10 +263,6 @@ public class SyncInfo {
return message;
}
- public boolean isPipeServerEnable() {
- return pipeServerEnable;
- }
-
private void createDir(String pipeName, String remoteIp, long createTime) {
File f = new File(SyncPathUtil.getReceiverFileDataDir(pipeName, remoteIp, createTime));
if (!f.exists()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/common/persistence/SyncLogReader.java b/server/src/main/java/org/apache/iotdb/db/sync/common/persistence/SyncLogReader.java
index 66f8fe3a03..ac5cd39397 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/common/persistence/SyncLogReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/common/persistence/SyncLogReader.java
@@ -44,8 +44,6 @@ import java.util.concurrent.ConcurrentHashMap;
public class SyncLogReader {
private static final Logger logger = LoggerFactory.getLogger(SyncLogReader.class);
- // record recovery result of receiver server status
- private boolean pipeServerEnable = false;
// <pipeFolderName, pipeMsg>
private Map<String, List<PipeMessage>> pipeMessageMap = new ConcurrentHashMap<>();
// <pipeSinkName, PipeSink>
@@ -56,7 +54,6 @@ public class SyncLogReader {
public void recover() throws StartupException {
logger.info("Start to recover all sync state for sync.");
this.pipeMessageMap = new ConcurrentHashMap<>();
- this.pipeServerEnable = false;
this.pipeSinks = new ConcurrentHashMap<>();
this.pipes = new ArrayList<>();
File serviceLogFile = new File(SyncPathUtil.getSysDir(), SyncConstant.SYNC_LOG_NAME);
@@ -86,10 +83,6 @@ public class SyncLogReader {
}
}
- public boolean isPipeServerEnable() {
- return pipeServerEnable;
- }
-
public Map<String, List<PipeMessage>> getPipeMessageMap() {
return pipeMessageMap;
}
@@ -149,12 +142,6 @@ public class SyncLogReader {
case DROP_PIPE:
runningPipe.drop();
break;
- case START_PIPE_SERVER:
- pipeServerEnable = true;
- break;
- case STOP_PIPE_SERVER:
- pipeServerEnable = false;
- break;
default:
throw new UnsupportedOperationException(
String.format("Can not recognize type %s.", type.name()));
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/common/persistence/SyncLogWriter.java b/server/src/main/java/org/apache/iotdb/db/sync/common/persistence/SyncLogWriter.java
index 8cda3e7669..3408d2d5ef 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/common/persistence/SyncLogWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/common/persistence/SyncLogWriter.java
@@ -54,20 +54,6 @@ public class SyncLogWriter {
}
}
- public void startPipeServer() throws IOException {
- getBufferedWriter();
- pipeInfoWriter.write(Operator.OperatorType.START_PIPE_SERVER.name());
- pipeInfoWriter.newLine();
- pipeInfoWriter.flush();
- }
-
- public void stopPipeServer() throws IOException {
- getBufferedWriter();
- pipeInfoWriter.write(Operator.OperatorType.STOP_PIPE_SERVER.name());
- pipeInfoWriter.newLine();
- pipeInfoWriter.flush();
- }
-
public synchronized void addPipeSink(CreatePipeSinkPlan plan) throws IOException {
getBufferedWriter();
pipeInfoWriter.write(Operator.OperatorType.CREATE_PIPESINK.name());
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/service/TransportHandler.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/service/TransportHandler.java
index 6fb16770a1..d8f96ac28e 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/service/TransportHandler.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/service/TransportHandler.java
@@ -26,7 +26,7 @@ import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.sync.sender.pipe.IoTDBPipeSink;
import org.apache.iotdb.db.sync.sender.pipe.Pipe;
import org.apache.iotdb.db.sync.transport.client.ITransportClient;
-import org.apache.iotdb.db.sync.transport.client.IoTDBSInkTransportClient;
+import org.apache.iotdb.db.sync.transport.client.IoTDBSinkTransportClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -63,7 +63,7 @@ public class TransportHandler {
this.localIP = getLocalIP(pipeSink);
this.transportClient =
- new IoTDBSInkTransportClient(pipe, pipeSink.getIp(), pipeSink.getPort(), localIP);
+ new IoTDBSinkTransportClient(pipe, pipeSink.getIp(), pipeSink.getPort(), localIP);
}
private String getLocalIP(IoTDBPipeSink pipeSink) {
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/transport/client/ClientWrapper.java b/server/src/main/java/org/apache/iotdb/db/sync/transport/client/ClientWrapper.java
index 670c3b6ad6..b96d6b3ead 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/transport/client/ClientWrapper.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/transport/client/ClientWrapper.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.sync.transport.client;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.sync.SyncConstant;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -25,9 +26,9 @@ import org.apache.iotdb.db.exception.SyncConnectionException;
import org.apache.iotdb.db.sync.sender.pipe.Pipe;
import org.apache.iotdb.rpc.RpcTransportFactory;
import org.apache.iotdb.rpc.TConfigurationConst;
-import org.apache.iotdb.service.transport.thrift.IdentityInfo;
-import org.apache.iotdb.service.transport.thrift.TransportService;
-import org.apache.iotdb.service.transport.thrift.TransportStatus;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.IClientRPCService;
+import org.apache.iotdb.service.rpc.thrift.TSyncIdentityInfo;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
@@ -38,14 +39,12 @@ import org.apache.thrift.transport.TTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.iotdb.commons.sync.SyncConstant.SUCCESS_CODE;
-
public class ClientWrapper {
private static final Logger logger = LoggerFactory.getLogger(ClientWrapper.class);
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private TTransport transport = null;
- private volatile TransportService.Client serviceClient = null;
+ private volatile IClientRPCService.Client serviceClient = null;
/* remote IP address*/
private final String ipAddress;
@@ -63,7 +62,7 @@ public class ClientWrapper {
this.localIP = localIP;
}
- public TransportService.Client getClient() {
+ public IClientRPCService.Client getClient() {
return serviceClient;
}
@@ -93,19 +92,19 @@ public class ClientWrapper {
} else {
protocol = new TBinaryProtocol(transport);
}
- serviceClient = new TransportService.Client(protocol);
+ serviceClient = new IClientRPCService.Client(protocol);
// Underlay socket open.
if (!transport.isOpen()) {
transport.open();
}
- IdentityInfo identityInfo =
- new IdentityInfo(
+ TSyncIdentityInfo identityInfo =
+ new TSyncIdentityInfo(
localIP, pipe.getName(), pipe.getCreateTime(), config.getIoTDBMajorVersion());
- TransportStatus status = serviceClient.handshake(identityInfo);
- if (status.code != SUCCESS_CODE) {
- logger.error("The receiver rejected the synchronization task because {}", status.msg);
+ TSStatus status = serviceClient.handshake(identityInfo);
+ if (status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ logger.error("The receiver rejected the synchronization task because {}", status.message);
return false;
}
} catch (TException e) {
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/transport/client/IoTDBSInkTransportClient.java b/server/src/main/java/org/apache/iotdb/db/sync/transport/client/IoTDBSinkTransportClient.java
similarity index 91%
rename from server/src/main/java/org/apache/iotdb/db/sync/transport/client/IoTDBSInkTransportClient.java
rename to server/src/main/java/org/apache/iotdb/db/sync/transport/client/IoTDBSinkTransportClient.java
index b68cd44cf4..14c5da4b50 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/transport/client/IoTDBSInkTransportClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/transport/client/IoTDBSinkTransportClient.java
@@ -19,6 +19,7 @@
*/
package org.apache.iotdb.db.sync.transport.client;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.sync.SyncConstant;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -29,9 +30,9 @@ import org.apache.iotdb.db.sync.pipedata.TsFilePipeData;
import org.apache.iotdb.db.sync.receiver.manager.PipeMessage;
import org.apache.iotdb.db.sync.sender.pipe.Pipe;
import org.apache.iotdb.rpc.RpcTransportFactory;
-import org.apache.iotdb.service.transport.thrift.MetaInfo;
-import org.apache.iotdb.service.transport.thrift.TransportStatus;
-import org.apache.iotdb.service.transport.thrift.Type;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TSyncTransportMetaInfo;
+import org.apache.iotdb.service.rpc.thrift.TSyncTransportType;
import org.apache.thrift.TException;
import org.slf4j.Logger;
@@ -49,14 +50,11 @@ import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import static org.apache.iotdb.commons.sync.SyncConstant.DATA_CHUNK_SIZE;
-import static org.apache.iotdb.commons.sync.SyncConstant.REBASE_CODE;
-import static org.apache.iotdb.commons.sync.SyncConstant.RETRY_CODE;
-import static org.apache.iotdb.commons.sync.SyncConstant.SUCCESS_CODE;
import static org.apache.iotdb.db.sync.transport.conf.TransportConfig.isCheckFileDegistAgain;
-public class IoTDBSInkTransportClient implements ITransportClient {
+public class IoTDBSinkTransportClient implements ITransportClient {
- private static final Logger logger = LoggerFactory.getLogger(IoTDBSInkTransportClient.class);
+ private static final Logger logger = LoggerFactory.getLogger(IoTDBSinkTransportClient.class);
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
@@ -80,7 +78,7 @@ public class IoTDBSInkTransportClient implements ITransportClient {
* @param port remote port
* @param localIP local ip address
*/
- public IoTDBSInkTransportClient(Pipe pipe, String ipAddress, int port, String localIP) {
+ public IoTDBSinkTransportClient(Pipe pipe, String ipAddress, int port, String localIP) {
RpcTransportFactory.setThriftMaxFrameSize(config.getThriftMaxFrameSize());
this.pipe = pipe;
this.ipAddress = ipAddress;
@@ -258,9 +256,10 @@ public class IoTDBSInkTransportClient implements ITransportClient {
messageDigest.reset();
messageDigest.update(buffer, 0, dataLength);
ByteBuffer buffToSend = ByteBuffer.wrap(buffer, 0, dataLength);
- MetaInfo metaInfo = new MetaInfo(Type.FILE, file.getName(), position);
+ TSyncTransportMetaInfo metaInfo =
+ new TSyncTransportMetaInfo(TSyncTransportType.FILE, file.getName(), position);
- TransportStatus status = null;
+ TSStatus status = null;
int retryCount = 0;
while (true) {
retryCount++;
@@ -283,21 +282,21 @@ public class IoTDBSInkTransportClient implements ITransportClient {
break;
}
- if (status.code == REBASE_CODE) {
- position = Long.parseLong(status.msg);
+ if (status.code == TSStatusCode.SYNC_FILE_REBASE.getStatusCode()) {
+ position = Long.parseLong(status.message);
continue outer;
- } else if (status.code == RETRY_CODE) {
+ } else if (status.code == TSStatusCode.SYNC_FILE_RETRY.getStatusCode()) {
logger.info(
"Receiver failed to receive data from {} because {}, retry.",
file.getAbsoluteFile(),
- status.msg);
+ status.message);
continue outer;
- } else if (status.code != SUCCESS_CODE) {
+ } else if (status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
logger.info(
"Receiver failed to receive data from {} because {}, abort.",
file.getAbsoluteFile(),
- status.msg);
- throw new SyncConnectionException(status.msg);
+ status.message);
+ throw new SyncConnectionException(status.message);
} else { // Success
position += dataLength;
if (position >= limit) {
@@ -339,8 +338,10 @@ public class IoTDBSInkTransportClient implements ITransportClient {
}
}
- MetaInfo metaInfo = new MetaInfo(Type.FILE, file.getName(), 0);
- TransportStatus status;
+ TSyncTransportMetaInfo metaInfo =
+ new TSyncTransportMetaInfo(TSyncTransportType.FILE, file.getName(), 0);
+
+ TSStatus status;
int retryCount = 0;
while (true) {
@@ -364,7 +365,7 @@ public class IoTDBSInkTransportClient implements ITransportClient {
break;
}
- if (status.code != SUCCESS_CODE) {
+ if (status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
logger.error("Digest check of tsfile {} failed, retry", file.getAbsoluteFile());
return false;
}
@@ -392,14 +393,15 @@ public class IoTDBSInkTransportClient implements ITransportClient {
messageDigest.update(buffer);
ByteBuffer buffToSend = ByteBuffer.wrap(buffer);
- MetaInfo metaInfo =
- new MetaInfo(Type.findByValue(pipeData.getType().ordinal()), "fileName", 0);
- TransportStatus status =
+ TSyncTransportMetaInfo metaInfo =
+ new TSyncTransportMetaInfo(
+ TSyncTransportType.findByValue(pipeData.getType().ordinal()), "fileName", 0);
+ TSStatus status =
serviceClient
.getClient()
.transportData(metaInfo, buffToSend, ByteBuffer.wrap(messageDigest.digest()));
- if (status.code == SUCCESS_CODE) {
+ if (status.code == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
break;
} else {
logger.error("Digest check of pipeData failed, retry");
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/transport/server/TransportServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/sync/transport/server/ReceiverManager.java
similarity index 72%
rename from server/src/main/java/org/apache/iotdb/db/sync/transport/server/TransportServiceImpl.java
rename to server/src/main/java/org/apache/iotdb/db/sync/transport/server/ReceiverManager.java
index b4eabd3b2d..2a970a67b9 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/transport/server/TransportServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/transport/server/ReceiverManager.java
@@ -19,6 +19,7 @@
*/
package org.apache.iotdb.db.sync.transport.server;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.sync.SyncConstant;
import org.apache.iotdb.commons.sync.SyncPathUtil;
@@ -27,11 +28,11 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.sync.PipeDataLoadException;
import org.apache.iotdb.db.sync.pipedata.PipeData;
import org.apache.iotdb.db.sync.pipedata.TsFilePipeData;
-import org.apache.iotdb.service.transport.thrift.IdentityInfo;
-import org.apache.iotdb.service.transport.thrift.MetaInfo;
-import org.apache.iotdb.service.transport.thrift.TransportService;
-import org.apache.iotdb.service.transport.thrift.TransportStatus;
-import org.apache.iotdb.service.transport.thrift.Type;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TSyncIdentityInfo;
+import org.apache.iotdb.service.rpc.thrift.TSyncTransportMetaInfo;
+import org.apache.iotdb.service.rpc.thrift.TSyncTransportType;
import org.apache.thrift.TException;
import org.slf4j.Logger;
@@ -52,31 +53,42 @@ import java.nio.file.StandardCopyOption;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.text.DecimalFormat;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
-import static org.apache.iotdb.commons.sync.SyncConstant.CONFLICT_CODE;
import static org.apache.iotdb.commons.sync.SyncConstant.DATA_CHUNK_SIZE;
-import static org.apache.iotdb.commons.sync.SyncConstant.ERROR_CODE;
-import static org.apache.iotdb.commons.sync.SyncConstant.REBASE_CODE;
-import static org.apache.iotdb.commons.sync.SyncConstant.RETRY_CODE;
-import static org.apache.iotdb.commons.sync.SyncConstant.SUCCESS_CODE;
-public class TransportServiceImpl implements TransportService.Iface {
- private static Logger logger = LoggerFactory.getLogger(TransportServiceImpl.class);
+/**
+ * This class is responsible for implementing the RPC processing on the receiver-side. It should
+ * only be accessed by the {@linkplain org.apache.iotdb.db.sync.SyncService}
+ */
+public class ReceiverManager {
+ private static Logger logger = LoggerFactory.getLogger(ReceiverManager.class);
private IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private static final String RECORD_SUFFIX = ".record";
private static final String PATCH_SUFFIX = ".patch";
- private final ThreadLocal<IdentityInfo> identityInfoThreadLocal;
- private final Map<IdentityInfo, Integer> identityInfoCounter;
- public TransportServiceImpl() {
- identityInfoThreadLocal = new ThreadLocal<>();
- identityInfoCounter = new ConcurrentHashMap<>();
+ // When the client abnormally exits, we can still know who to disconnect
+ private final ThreadLocal<Long> currentConnectionId;
+ // Record the remote message for every rpc connection
+ private final Map<Long, TSyncIdentityInfo> connectionIdToIdentityInfoMap;
+
+ // The sync connectionId is unique in one IoTDB instance.
+ private final AtomicLong connectionIdGenerator;
+
+ public ReceiverManager() {
+ currentConnectionId = new ThreadLocal<>();
+ connectionIdToIdentityInfoMap = new ConcurrentHashMap<>();
+ connectionIdGenerator = new AtomicLong();
}
+ // region Interfaces and Implementation of Index Checker
+
private class CheckResult {
boolean result;
String index;
@@ -138,21 +150,22 @@ public class TransportServiceImpl implements TransportService.Iface {
return new CheckResult(true, "0");
}
- @Override
- public TransportStatus handshake(IdentityInfo identityInfo) throws TException {
+ // endregion
+
+ // region Interfaces and Implementation of RPC Handler
+
+ public TSStatus handshake(TSyncIdentityInfo identityInfo) {
logger.debug("Invoke handshake method from client ip = {}", identityInfo.address);
- identityInfoThreadLocal.set(identityInfo);
- identityInfoCounter.compute(identityInfo, (k, v) -> v == null ? 1 : v + 1);
// check ip address
if (!verifyIPSegment(config.getIpWhiteList(), identityInfo.address)) {
- return new TransportStatus(
- ERROR_CODE,
+ return RpcUtils.getStatus(
+ TSStatusCode.PIPESERVER_ERROR,
"Sender IP is not in the white list of receiver IP and synchronization tasks are not allowed.");
}
// Version check
if (!config.getIoTDBMajorVersion(identityInfo.version).equals(config.getIoTDBMajorVersion())) {
- return new TransportStatus(
- ERROR_CODE,
+ return RpcUtils.getStatus(
+ TSStatusCode.PIPESERVER_ERROR,
String.format(
"Version mismatch: the sender <%s>, the receiver <%s>",
identityInfo.version, config.getIoTDBVersion()));
@@ -161,7 +174,8 @@ public class TransportServiceImpl implements TransportService.Iface {
if (!new File(SyncPathUtil.getFileDataDirPath(identityInfo)).exists()) {
new File(SyncPathUtil.getFileDataDirPath(identityInfo)).mkdirs();
}
- return new TransportStatus(SUCCESS_CODE, "");
+ createConnection(identityInfo);
+ return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "");
}
/**
@@ -204,26 +218,29 @@ public class TransportServiceImpl implements TransportService.Iface {
return ipAddressBinary.equals(ipSegmentBinary);
}
- @Override
- public TransportStatus transportData(MetaInfo metaInfo, ByteBuffer buff, ByteBuffer digest) {
- IdentityInfo identityInfo = identityInfoThreadLocal.get();
+ public TSStatus transportData(TSyncTransportMetaInfo metaInfo, ByteBuffer buff, ByteBuffer digest)
+ throws TException {
+ TSyncIdentityInfo identityInfo = getCurrentTSyncIdentityInfo();
+ if (identityInfo == null) {
+ throw new TException("Thrift connection is not alive.");
+ }
logger.debug("Invoke transportData method from client ip = {}", identityInfo.address);
String fileDir = SyncPathUtil.getFileDataDirPath(identityInfo);
- Type type = metaInfo.type;
+ TSyncTransportType type = metaInfo.type;
String fileName = metaInfo.fileName;
long startIndex = metaInfo.startIndex;
// Check file start index valid
- if (type == Type.FILE) {
+ if (type == TSyncTransportType.FILE) {
try {
CheckResult result = checkStartIndexValid(new File(fileDir, fileName), startIndex);
if (!result.isResult()) {
- return new TransportStatus(REBASE_CODE, result.getIndex());
+ return RpcUtils.getStatus(TSStatusCode.SYNC_FILE_REBASE, result.getIndex());
}
} catch (IOException e) {
logger.error(e.getMessage());
- return new TransportStatus(ERROR_CODE, e.getMessage());
+ return RpcUtils.getStatus(TSStatusCode.SYNC_FILE_ERROR, e.getMessage());
}
}
@@ -234,23 +251,23 @@ public class TransportServiceImpl implements TransportService.Iface {
messageDigest = MessageDigest.getInstance("SHA-256");
} catch (NoSuchAlgorithmException e) {
logger.error(e.getMessage());
- return new TransportStatus(ERROR_CODE, e.getMessage());
+ return RpcUtils.getStatus(TSStatusCode.SYNC_FILE_ERROR, e.getMessage());
}
messageDigest.update(buff);
byte[] digestBytes = new byte[digest.capacity()];
digest.get(digestBytes);
if (!Arrays.equals(messageDigest.digest(), digestBytes)) {
- return new TransportStatus(RETRY_CODE, "Data digest check error, retry.");
+ return RpcUtils.getStatus(TSStatusCode.SYNC_FILE_RETRY, "Data digest check error");
}
- if (type != Type.FILE) {
+ if (type != TSyncTransportType.FILE) {
buff.position(pos);
int length = buff.capacity();
byte[] byteArray = new byte[length];
buff.get(byteArray);
try {
PipeData pipeData = PipeData.createPipeData(byteArray);
- if (type == Type.TSFILE) {
+ if (type == TSyncTransportType.TSFILE) {
// Do with file
handleTsFilePipeData((TsFilePipeData) pipeData, fileDir);
}
@@ -264,10 +281,12 @@ public class TransportServiceImpl implements TransportService.Iface {
"Load pipeData with serialize number {} successfully.", pipeData.getSerialNumber());
} catch (IOException | IllegalPathException e) {
logger.error("Pipe data transport error, {}", e.getMessage());
- return new TransportStatus(RETRY_CODE, "Data digest transport error " + e.getMessage());
+ return RpcUtils.getStatus(
+ TSStatusCode.SYNC_FILE_RETRY, "Data digest transport error " + e.getMessage());
} catch (PipeDataLoadException e) {
logger.error("Fail to load pipeData because {}.", e.getMessage());
- return new TransportStatus(ERROR_CODE, "Fail to load pipeData because " + e.getMessage());
+ return RpcUtils.getStatus(
+ TSStatusCode.SYNC_FILE_ERROR, "Fail to load pipeData because " + e.getMessage());
}
} else {
// Write buff to {file}.patch
@@ -290,15 +309,18 @@ public class TransportServiceImpl implements TransportService.Iface {
+ " is done.");
} catch (IOException e) {
logger.error(e.getMessage());
- return new TransportStatus(ERROR_CODE, e.getMessage());
+ return RpcUtils.getStatus(TSStatusCode.SYNC_FILE_ERROR, e.getMessage());
}
}
- return new TransportStatus(SUCCESS_CODE, "");
+ return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "");
}
- @Override
- public TransportStatus checkFileDigest(MetaInfo metaInfo, ByteBuffer digest) throws TException {
- IdentityInfo identityInfo = identityInfoThreadLocal.get();
+ public TSStatus checkFileDigest(TSyncTransportMetaInfo metaInfo, ByteBuffer digest)
+ throws TException {
+ TSyncIdentityInfo identityInfo = getCurrentTSyncIdentityInfo();
+ if (identityInfo == null) {
+ throw new TException("Thrift connection is not alive.");
+ }
logger.debug("Invoke checkFileDigest method from client ip = {}", identityInfo.address);
String fileDir = SyncPathUtil.getFileDataDirPath(identityInfo);
synchronized (fileDir.intern()) {
@@ -308,7 +330,7 @@ public class TransportServiceImpl implements TransportService.Iface {
messageDigest = MessageDigest.getInstance("SHA-256");
} catch (NoSuchAlgorithmException e) {
logger.error(e.getMessage());
- return new TransportStatus(ERROR_CODE, e.getMessage());
+ return RpcUtils.getStatus(TSStatusCode.PIPESERVER_ERROR, e.getMessage());
}
try (InputStream inputStream =
@@ -330,14 +352,14 @@ public class TransportServiceImpl implements TransportService.Iface {
localDigest,
digest);
new File(fileDir, fileName + RECORD_SUFFIX).delete();
- return new TransportStatus(CONFLICT_CODE, "File digest check error.");
+ return RpcUtils.getStatus(TSStatusCode.PIPESERVER_ERROR, "File digest check error.");
}
} catch (IOException e) {
logger.error(e.getMessage());
- return new TransportStatus(ERROR_CODE, e.getMessage());
+ return RpcUtils.getStatus(TSStatusCode.PIPESERVER_ERROR, e.getMessage());
}
- return new TransportStatus(SUCCESS_CODE, "");
+ return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "");
}
}
@@ -349,25 +371,6 @@ public class TransportServiceImpl implements TransportService.Iface {
Files.move(tmpFile.toPath(), recordFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
}
- /**
- * release resources or cleanup when a client (a sender) is disconnected (normally or abnormally).
- */
- public void handleClientExit() {
- // Handle client exit here.
- IdentityInfo identityInfo = identityInfoThreadLocal.get();
- if (identityInfo != null) {
- // if all connections exit, stop pipe
- identityInfoThreadLocal.remove();
- synchronized (identityInfoCounter) {
- identityInfoCounter.compute(identityInfo, (k, v) -> v == null ? 0 : v - 1);
- if (identityInfoCounter.get(identityInfo) == 0) {
- identityInfoCounter.remove(identityInfo);
- // TODO:发送端退出
- }
- }
- }
- }
-
/**
* handle when successfully receive tsFilePipeData. Rename .patch file and reset tsFilePipeData's
* path.
@@ -400,4 +403,50 @@ public class TransportServiceImpl implements TransportService.Iface {
String.format("Delete record file %s error, because %s.", recordFile.getPath(), e));
}
}
+
+ // endregion
+
+ // region Interfaces and Implementation of Connection Manager
+
+ /** Check whether the calling thread has a connection id, which handshake sets on success. */
+ private boolean checkConnection() {
+ return currentConnectionId.get() != null;
+ }
+
+ /**
+ * Get the TSyncIdentityInfo registered for the calling thread's connection id.
+ *
+ * @return the identity info, or null if the thread has no connection id or the id is unmapped
+ */
+ private TSyncIdentityInfo getCurrentTSyncIdentityInfo() {
+ Long id = currentConnectionId.get(); // thread-local value; may be null
+ if (id != null) {
+ return connectionIdToIdentityInfoMap.get(id);
+ } else {
+ return null;
+ }
+ }
+
+ private void createConnection(TSyncIdentityInfo identityInfo) { // called by handshake on success
+ long connectionId = connectionIdGenerator.incrementAndGet(); // fresh id from the shared counter
+ currentConnectionId.set(connectionId); // NOTE(review): an id already bound to this thread stays in the map
+ connectionIdToIdentityInfoMap.put(connectionId, identityInfo);
+ }
+
+ /**
+ * Release resources or clean up when a client (a sender) disconnects (normally or abnormally).
+ */
+ public void handleClientExit() {
+ if (checkConnection()) { // nothing to do when this thread holds no connection id
+ long id = currentConnectionId.get();
+ connectionIdToIdentityInfoMap.remove(id); // forget the identity info stored under this id
+ currentConnectionId.remove(); // unbind the id from the calling thread
+ }
+ }
+
+ public List<TSyncIdentityInfo> getAllTSyncIdentityInfos() {
+ return new ArrayList<>(connectionIdToIdentityInfoMap.values()); // copy, not a live view of the map
+ }
+
+ // endregion
}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/transport/server/TransportServerManager.java b/server/src/main/java/org/apache/iotdb/db/sync/transport/server/TransportServerManager.java
deleted file mode 100644
index c0663f8d13..0000000000
--- a/server/src/main/java/org/apache/iotdb/db/sync/transport/server/TransportServerManager.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.iotdb.db.sync.transport.server;
-
-import org.apache.iotdb.commons.concurrent.ThreadName;
-import org.apache.iotdb.commons.exception.StartupException;
-import org.apache.iotdb.commons.service.AbstractThriftServiceThread;
-import org.apache.iotdb.commons.service.ServiceType;
-import org.apache.iotdb.commons.service.ThriftService;
-import org.apache.iotdb.commons.service.ThriftServiceThread;
-import org.apache.iotdb.commons.utils.TestOnly;
-import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.service.metrics.MetricService;
-import org.apache.iotdb.db.service.metrics.enums.Metric;
-import org.apache.iotdb.db.service.metrics.enums.Tag;
-import org.apache.iotdb.metrics.utils.MetricLevel;
-import org.apache.iotdb.service.transport.thrift.TransportService;
-
-import org.apache.thrift.transport.TTransportException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class TransportServerManager extends ThriftService
- implements Runnable, TransportServerManagerMBean {
-
- private static final Logger logger = LoggerFactory.getLogger(TransportServerManager.class);
- private TransportServiceImpl serviceImpl;
-
- @Override
- public void run() {
- TransportServerManager serverManager = new TransportServerManager();
- try {
- serverManager.start();
- } catch (StartupException e) {
- e.printStackTrace();
- }
- }
-
- private static class ServiceManagerHolder {
- private static final TransportServerManager INSTANCE = new TransportServerManager();
- }
-
- public static TransportServerManager getInstance() {
- return TransportServerManager.ServiceManagerHolder.INSTANCE;
- }
-
- @Override
- public ServiceType getID() {
- return ServiceType.SYNC_RPC_SERVICE;
- }
-
- @Override
- public void initTProcessor() {
- initSyncedServiceImpl(null);
- serviceImpl = new TransportServiceImpl();
- processor = new TransportService.Processor<>(serviceImpl);
- }
-
- @Override
- public void initThriftServiceThread() {
- IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
- thriftServiceThread =
- new ThriftServiceThread(
- processor,
- getID().getName(),
- ThreadName.SYNC_CLIENT.getName(),
- config.getRpcAddress(),
- config.getPipeServerPort(),
- Integer.MAX_VALUE,
- config.getThriftServerAwaitTimeForStopService(),
- new TransportServerThriftHandler(serviceImpl),
- config.isRpcThriftCompressionEnable());
- thriftServiceThread.setName(ThreadName.SYNC_SERVER.getName());
- MetricService.getInstance()
- .getOrCreateAutoGauge(
- Metric.THRIFT_ACTIVE_THREADS.toString(),
- MetricLevel.CORE,
- thriftServiceThread,
- AbstractThriftServiceThread::getActiveThreadCount,
- Tag.NAME.toString(),
- ThreadName.SYNC_SERVER.getName());
- }
-
- @Override
- public String getBindIP() {
- // TODO: Whether to change this config here
- return IoTDBDescriptor.getInstance().getConfig().getRpcAddress();
- }
-
- @Override
- public int getBindPort() {
- // TODO: Whether to change this config here
- return IoTDBDescriptor.getInstance().getConfig().getPipeServerPort();
- }
-
- // @Override
- public int getRPCPort() {
- return getBindPort();
- }
-
- @Override
- public void startService() throws StartupException {
- // TODO: Whether to change this config here
- super.startService();
- }
-
- @Override
- public void stopService() {
- // TODO: Whether to change this config here
- super.stopService();
- }
-
- @TestOnly
- public static void main(String[] args) throws TTransportException, StartupException {
- logger.info("Transport server for testing only.");
- TransportServerManager serverManager = new TransportServerManager();
- serverManager.start();
- }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/transport/server/TransportServerManagerMBean.java b/server/src/main/java/org/apache/iotdb/db/sync/transport/server/TransportServerManagerMBean.java
deleted file mode 100644
index 4559ea07f9..0000000000
--- a/server/src/main/java/org/apache/iotdb/db/sync/transport/server/TransportServerManagerMBean.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.iotdb.db.sync.transport.server;
-
-import org.apache.iotdb.commons.exception.StartupException;
-
-public interface TransportServerManagerMBean {
- String getRPCServiceStatus();
-
- int getRPCPort();
-
- void startService() throws StartupException;
-
- void restartService() throws StartupException;
-
- void stopService();
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/transport/server/TransportServerThriftHandler.java b/server/src/main/java/org/apache/iotdb/db/sync/transport/server/TransportServerThriftHandler.java
deleted file mode 100644
index 1045a71c6b..0000000000
--- a/server/src/main/java/org/apache/iotdb/db/sync/transport/server/TransportServerThriftHandler.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.iotdb.db.sync.transport.server;
-
-import org.apache.iotdb.db.service.metrics.MetricService;
-import org.apache.iotdb.db.service.metrics.enums.Metric;
-import org.apache.iotdb.db.service.metrics.enums.Tag;
-import org.apache.iotdb.metrics.utils.MetricLevel;
-
-import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.server.ServerContext;
-import org.apache.thrift.server.TServerEventHandler;
-import org.apache.thrift.transport.TTransport;
-
-public class TransportServerThriftHandler implements TServerEventHandler {
-
- private TransportServiceImpl serviceImpl;
-
- public TransportServerThriftHandler(TransportServiceImpl serviceImpl) {
- this.serviceImpl = serviceImpl;
- }
-
- @Override
- public void preServe() {}
-
- @Override
- public ServerContext createContext(TProtocol input, TProtocol output) {
- MetricService.getInstance()
- .getOrCreateGauge(
- Metric.THRIFT_CONNECTIONS.toString(),
- MetricLevel.CORE,
- Tag.NAME.toString(),
- "Transport")
- .incr(1L);
- return null;
- }
-
- @Override
- public void deleteContext(ServerContext serverContext, TProtocol input, TProtocol output) {
- // release query resources.
- serviceImpl.handleClientExit();
- MetricService.getInstance()
- .getOrCreateGauge(
- Metric.THRIFT_CONNECTIONS.toString(),
- MetricLevel.CORE,
- Tag.NAME.toString(),
- "Transport")
- .decr(1L);
- }
-
- @Override
- public void processContext(
- ServerContext serverContext, TTransport inputTransport, TTransport outputTransport) {}
-}
diff --git a/server/src/test/java/org/apache/iotdb/db/qp/physical/PhysicalPlanTest.java b/server/src/test/java/org/apache/iotdb/db/qp/physical/PhysicalPlanTest.java
index 9fce97b490..ec05de5d72 100644
--- a/server/src/test/java/org/apache/iotdb/db/qp/physical/PhysicalPlanTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/qp/physical/PhysicalPlanTest.java
@@ -28,7 +28,6 @@ import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.exception.sql.SQLParserException;
import org.apache.iotdb.db.qp.Planner;
-import org.apache.iotdb.db.qp.logical.Operator;
import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
@@ -55,12 +54,9 @@ import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowContinuousQueriesPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowDevicesPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowFunctionsPlan;
-import org.apache.iotdb.db.qp.physical.sys.ShowPipeServerPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowTriggersPlan;
-import org.apache.iotdb.db.qp.physical.sys.StartPipeServerPlan;
import org.apache.iotdb.db.qp.physical.sys.StartTriggerPlan;
-import org.apache.iotdb.db.qp.physical.sys.StopPipeServerPlan;
import org.apache.iotdb.db.qp.physical.sys.StopTriggerPlan;
import org.apache.iotdb.db.query.executor.fill.PreviousFill;
import org.apache.iotdb.db.service.IoTDB;
@@ -1214,30 +1210,6 @@ public class PhysicalPlanTest {
Assert.assertEquals(ShowPlan.ShowContentType.TRIGGERS, plan.getShowContentType());
}
- @Test
- public void testShowPipeServer() throws QueryProcessException {
- String sql1 = "SHOW PIPESERVER";
- ShowPipeServerPlan plan1 = (ShowPipeServerPlan) processor.parseSQLToPhysicalPlan(sql1);
- Assert.assertTrue(plan1.isQuery());
- Assert.assertEquals(ShowPlan.ShowContentType.PIPESERVER, plan1.getShowContentType());
- }
-
- @Test
- public void testStartPipeServer() throws QueryProcessException {
- String sql = "START PIPESERVER";
- StartPipeServerPlan plan = (StartPipeServerPlan) processor.parseSQLToPhysicalPlan(sql);
- Assert.assertFalse(plan.isQuery());
- Assert.assertEquals(Operator.OperatorType.START_PIPE_SERVER, plan.getOperatorType());
- }
-
- @Test
- public void testStopPipeServer() throws QueryProcessException {
- String sql = "STOP PIPESERVER";
- StopPipeServerPlan plan = (StopPipeServerPlan) processor.parseSQLToPhysicalPlan(sql);
- Assert.assertFalse(plan.isQuery());
- Assert.assertEquals(OperatorType.STOP_PIPE_SERVER, plan.getOperatorType());
- }
-
@Test
public void testCreateCQ1() throws QueryProcessException {
String sql =
diff --git a/server/src/test/java/org/apache/iotdb/db/sync/receiver/manager/SyncInfoTest.java b/server/src/test/java/org/apache/iotdb/db/sync/receiver/manager/SyncInfoTest.java
index f21190a275..fc68a92324 100644
--- a/server/src/test/java/org/apache/iotdb/db/sync/receiver/manager/SyncInfoTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/sync/receiver/manager/SyncInfoTest.java
@@ -54,7 +54,6 @@ public class SyncInfoTest {
public void testOperatePipe() throws Exception {
SyncInfo syncInfo = new SyncInfo();
try {
- syncInfo.startServer();
CreatePipeSinkPlan createPipeSinkPlan = new CreatePipeSinkPlan("demo", "iotdb");
createPipeSinkPlan.addPipeSinkAttribute("ip", "127.0.0.1");
createPipeSinkPlan.addPipeSinkAttribute("port", "6670");
diff --git a/server/src/test/java/org/apache/iotdb/db/sync/receiver/recovery/SyncLogTest.java b/server/src/test/java/org/apache/iotdb/db/sync/receiver/recovery/SyncLogTest.java
index ad15ad606b..0d53ecf0dc 100644
--- a/server/src/test/java/org/apache/iotdb/db/sync/receiver/recovery/SyncLogTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/sync/receiver/recovery/SyncLogTest.java
@@ -64,15 +64,12 @@ public class SyncLogTest {
public void testServiceLog() {
try {
SyncLogWriter log = SyncLogWriter.getInstance();
- log.startPipeServer();
CreatePipeSinkPlan createPipeSinkPlan = new CreatePipeSinkPlan("demo", "iotdb");
createPipeSinkPlan.addPipeSinkAttribute("ip", "127.0.0.1");
createPipeSinkPlan.addPipeSinkAttribute("port", "6670");
log.addPipeSink(createPipeSinkPlan);
log.addPipe(new CreatePipePlan(pipe1, "demo"), 1);
log.operatePipe(pipe1, Operator.OperatorType.DROP_PIPE);
- log.stopPipeServer();
- log.startPipeServer();
log.addPipe(new CreatePipePlan(pipe2, "demo"), 2);
log.operatePipe(pipe1, Operator.OperatorType.STOP_PIPE);
@@ -83,7 +80,6 @@ public class SyncLogTest {
List<PipeInfo> pipes = syncLogReader.getAllPipeInfos();
Map<String, PipeSink> allPipeSinks = syncLogReader.getAllPipeSinks();
PipeInfo runningPipe = syncLogReader.getRunningPipeInfo();
- Assert.assertTrue(syncLogReader.isPipeServerEnable());
Assert.assertEquals(1, allPipeSinks.size());
Assert.assertEquals(2, pipes.size());
Assert.assertEquals(pipe2, runningPipe.getPipeName());
diff --git a/server/src/test/java/org/apache/iotdb/db/sync/transport/TransportServiceTest.java b/server/src/test/java/org/apache/iotdb/db/sync/transport/SyncTransportTest.java
similarity index 93%
rename from server/src/test/java/org/apache/iotdb/db/sync/transport/TransportServiceTest.java
rename to server/src/test/java/org/apache/iotdb/db/sync/transport/SyncTransportTest.java
index 33a6712908..afb932cf27 100644
--- a/server/src/test/java/org/apache/iotdb/db/sync/transport/TransportServiceTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/sync/transport/SyncTransportTest.java
@@ -31,8 +31,7 @@ import org.apache.iotdb.db.sync.pipedata.SchemaPipeData;
import org.apache.iotdb.db.sync.pipedata.TsFilePipeData;
import org.apache.iotdb.db.sync.sender.pipe.Pipe;
import org.apache.iotdb.db.sync.sender.pipe.TsFilePipe;
-import org.apache.iotdb.db.sync.transport.client.IoTDBSInkTransportClient;
-import org.apache.iotdb.db.sync.transport.server.TransportServerManager;
+import org.apache.iotdb.db.sync.transport.client.IoTDBSinkTransportClient;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.session.Session;
import org.apache.iotdb.session.SessionDataSet;
@@ -53,7 +52,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-public class TransportServiceTest {
+public class SyncTransportTest {
/** create tsfile and move to tmpDir for sync test */
File tmpDir = new File("target/synctest");
@@ -130,23 +129,17 @@ public class TransportServiceTest {
Deletion deletion = new Deletion(new PartialPath("root.vehicle.**"), 0, 33, 38);
pipeDataList.add(new DeletionPipeData(deletion, serialNum++));
- // 3. start server
- TransportServerManager.getInstance().startService();
-
- // 4. start client
+ // 3. start client
Pipe pipe = new TsFilePipe(createdTime1, pipeName1, null, 0, false);
- IoTDBSInkTransportClient client =
- new IoTDBSInkTransportClient(
- pipe,
- "127.0.0.1",
- IoTDBDescriptor.getInstance().getConfig().getPipeServerPort(),
- "127.0.0.1");
+ IoTDBSinkTransportClient client =
+ new IoTDBSinkTransportClient(
+ pipe, "127.0.0.1", IoTDBDescriptor.getInstance().getConfig().getRpcPort(), "127.0.0.1");
client.handshake();
for (PipeData pipeData : pipeDataList) {
client.senderTransport(pipeData);
}
- // 5. check result
+ // 4. check result
checkResult(
"select ** from root.vehicle",
new String[] {"Time", "root.vehicle.d0.s0"},
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index b455746b09..7827dae848 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -70,6 +70,9 @@ public enum TSStatusCode {
MEASUREMENT_ALREADY_EXIST(338),
TEMPLATE_NOT_EXIST(339),
CREATE_TEMPLATE_ERROR(340),
+ SYNC_FILE_REBASE(341),
+ SYNC_FILE_RETRY(342),
+ SYNC_FILE_ERROR(343),
EXECUTE_STATEMENT_ERROR(400),
SQL_PARSE_ERROR(401),
diff --git a/thrift-sync/README.md b/thrift-sync/README.md
deleted file mode 100644
index 94c1de6654..0000000000
--- a/thrift-sync/README.md
+++ /dev/null
@@ -1,22 +0,0 @@
-<!--
-
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
-
--->
-
-This modules maintains all RPC interfaces for data synchronization among servers.
\ No newline at end of file
diff --git a/thrift-sync/pom.xml b/thrift-sync/pom.xml
deleted file mode 100644
index 28958a86e8..0000000000
--- a/thrift-sync/pom.xml
+++ /dev/null
@@ -1,62 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
-
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.apache.iotdb</groupId>
- <artifactId>iotdb-parent</artifactId>
- <version>0.14.0-SNAPSHOT</version>
- <relativePath>../pom.xml</relativePath>
- </parent>
- <artifactId>iotdb-thrift-sync</artifactId>
- <name>rpc-thrift-sync</name>
- <description>RPC (Thrift) framework among servers for data synchronization.</description>
- <dependencies>
- <dependency>
- <groupId>org.apache.thrift</groupId>
- <artifactId>libthrift</artifactId>
- </dependency>
- </dependencies>
- <build>
- <plugins>
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>build-helper-maven-plugin</artifactId>
- <version>3.2.0</version>
- <executions>
- <execution>
- <id>add-source</id>
- <phase>generate-sources</phase>
- <goals>
- <goal>add-source</goal>
- </goals>
- <configuration>
- <sources>
- <source>${project.build.directory}/generated-sources/thrift</source>
- </sources>
- </configuration>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
-</project>
diff --git a/thrift-sync/rpc-changelist.md b/thrift-sync/rpc-changelist.md
deleted file mode 100644
index 797c0b6767..0000000000
--- a/thrift-sync/rpc-changelist.md
+++ /dev/null
@@ -1,181 +0,0 @@
-<!--
-
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
-
--->
-# 0.11.x(version-2) -> 0.12.x(version-1)
-
-Last Updated on 2021.01.19 by Xiangwei Wei.
-
-
-## 1. Delete Old
-
-| Latest Changes | Related Committers |
-| ---------------------------------- | ------------------ |
-
-
-## 2. Add New
-
-| Latest Changes | Related Committers |
-| ------------------------------------------------------------ | ---------------------- |
-| Add timeout in TSFetchResultsReq and TSExecuteStatementReq | Xiangwei Wei |
-
-
-## 3. Update
-
-| Latest Changes | Related Committers |
-| ------------------------------------------------------------ | ---------------------- |
-
-
-# 0.10.x (version-2) -> 0.11.x (version-3)
-
-Last Updated on 2020-10-27 by Xiangwei Wei.
-
-
-## 1. Delete Old
-
-| Latest Changes | Related Committers |
-| ---------------------------------- | ------------------ |
-| Remove TSBatchExecuteStatementResp | Tian Jiang |
-
-
-## 2. Add New
-
-| Latest Changes | Related Committers |
-| ------------------------------------------------------------ | ---------------------- |
-| set the input/output as TFramedTransport | Tian Jiang |
-| Add timeout(optional) in TSFetchResultsReq and TSExecuteStatementReq | Xiangwei Wei |
-
-
-## 3. Update
-
-| Latest Changes | Related Committers |
-| ------------------------------------------------------------ | ---------------------- |
-| Add sub-status in TSStatus | Tian Jiang |
-| Change the result of executeBatchStatement as TSStatus | Tian Jiang |
-| Change TSDeleteDataReq, delete timestamp and add startTime and endTime | Wei Shao |
-| Add zoneId in TSOpenSessionReq | Xiangwei Wei |
-
-
-# 0.9.x (version-1) -> 0.10.x (version-2)
-
-Last Updated on 2020-5-25 by Kaifeng Xue.
-
-
-## 1. Delete Old
-
-| Latest Changes | Related Committers |
-| ---------------------------------- | ------------------ |
-| Remove TS_SessionHandle,TSHandleIdentifier | Tian Jiang |
-| Remove TSStatus,TSExecuteInsertRowInBatchResp | Jialin Qiao|
-
-
-## 2. Add New
-
-| Latest Changes | Related Committers |
-| ------------------------------------------------------------ | ---------------------------------- |
-| Add parameter sessionId in getTimeZone, getProperties, setStorageGroup, createTimeseries... | Tian Jiang|
-| Add struct TSQueryNonAlignDataSet | Haonan Hou|
-| Add struct TSInsertTabletsReq | Jialin Qiao|
-| Add method insertTablets | Jialin Qiao|
-| Add method testInsertTablets | Xiangdong Huang |
-| add new field `inferType` in TSInsertRecordReq | Jialin Qiao |
-
-## 3. Update
-
-| Latest Changes | Related Committers |
-| ------------------------------------------------------------ | ---------------------- |
-| Replace TS_SessionHandles with SessionIds, TSOperationHandle with queryIds | Tian Jiang |
-| Add optional TSQueryNonAlignDataSet in TSExecuteStatementResp, TSFetchResultsResp and required bool isAlign in TSFetchResultsReq | Haonan Hou |
-| Rename TSStatusType to TSStatus | Jialin Qiao |
-| Remove sessionId in TSExecuteBatchStatementResp | Jialin Qiao |
-| Rename insertRows to insertReords, insert to insertRecord, insertBatch to insertTablet | Jialin Qiao |
-| Use TsDataType and binary rather than string in TSInsertInBatchReq and TSInsertReq | Kaifeng Xue |
-
-
-
-# 0.8.x -> 0.9.x (version-1)
-
-Last Updated on 2019-10-27 by Lei Rui.
-
-
-## 1. Delete Old
-
-| Latest Changes | Related Committers |
-| ---------------------------------- | ------------------ |
-| Delete struct TSSetStorageGroupReq | Jialin Qiao |
-| Remove struct TSDataValue | Lei Rui |
-| Remove struct TSRowRecord | Lei Rui |
-| Remove optional string version in TSFetchMetadataResp | Genius_pig |
-| Remove optional set<string> childPaths, nodesList, storageGroups, devices in TSFetchMetadataResp | Genius_pig |
-| Remove optional map<string, string> nodeTimeseriesNum in TSFetchMetadataResp | Genius_pig |
-| Remove optional list<list<string>> timeseriesList in TSFetchMetadataResp | Genius_pig |
-| Remove optinoal optional i32 timeseriesNum in TSFetchMetadataResp | Genius_pig |
-| Remove optional i32 nodeLevel in TSFetchMetadataReq | Genius_pig |
-
-
-## 2. Add New
-
-| Latest Changes | Related Committers |
-| ------------------------------------------------------------ | ---------------------------------- |
-| Add struct TSBatchInsertionReq | qiaojialin |
-| Add method TSExecuteBatchStatementResp insertBatch(1:TSBatchInsertionReq req) | qiaojialin |
-| Add Struct TSStatusType | Zesong Sun |
-| Add TSCreateTimeseriesReq | Zesong Sun |
-| Add method TSStatus setStorageGroup(1:string storageGroup) | Zesong Sun, Jialin Qiao |
-| Add method TSStatus createTimeseries(1:TSCreateTimeseriesReq req) | Zesong Sun |
-| Add struct TSInsertReq | qiaojialin |
-| Add method TSRPCResp insertRow(1:TSInsertReq req) | qiaojialin |
-| Add struct TSDeleteDataReq | Jack Tsai, qiaojialin |
-| Add method TSStatus deleteData(1:TSDeleteDataReq req) | Jack Tsai, Jialin Qiao, qiaojialin |
-| Add method TSStatus deleteTimeseries(1:list\<string> path) | qiaojialin |
-| Add method TSStatus deleteStorageGroups(1:list\<string> storageGroup) | Yi Tao |
-| Add Struct TSExecuteInsertRowInBatchResp | Kaifeng Xue |
-| Add method insertRowInBatch(1:TSInsertInBatchReq req); | Kaifeng Xue |
-| Add method testInsertRowInBatch(1:TSInsertInBatchReq req); | Kaifeng Xue |
-| Add method testInsertRow(1:TSInsertReq req); | Kaifeng Xue |
-| Add method testInsertBatch(1:TSBatchInsertionReq req); | Kaifeng Xue |
-| Add struct TSCreateMultiTimeseriesReq | qiaojialin |
-| Add method createMultiTimeseries(1:TSCreateMultiTimeseriesReq req); | qiaojialin |
-
-
-## 3. Update
-
-| Latest Changes | Related Committers |
-| ------------------------------------------------------------ | ---------------------- |
-| Add required string timestampPrecision in ServerProperties | 1160300922 |
-| Add optional list\<string\> dataTypeList in TSExecuteStatementResp | suyue |
-| Update TSStatus to use TSStatusType, instead of using ~~TS_StatusCode, errorCode and errorMessage~~ | Zesong Sun |
-| Rename item in enum TSProtocolVersion from ~~TSFILE_SERVICE_PROTOCOL_V1~~ to IOTDB_SERVICE_PROTOCOL_V1 | qiaojialin |
-| Rename method name from ~~TSExecuteStatementResp executeInsertion(1:TSInsertionReq req)~~ to TSExecuteStatementResp insert(1:TSInsertionReq req) | qiaojialin |
-| Add required i32 compressor in TSCreateTimeseriesReq | Jialin Qiao |
-| Add optional list\<string> nodesList, optional map\<string, string> nodeTimeseriesNum in TSFetchMetadataResp | jack870131 |
-| Add optional i32 nodeLevel in TSFetchMetadataReq | jack870131, Zesong Sun |
-| Change the following methods' returned type to be TSStatus: <br />TSStatus closeSession(1:TSCloseSessionReq req), <br />TSStatus cancelOperation(1:TSCancelOperationReq req), <br />TSStatus closeOperation(1:TSCloseOperationReq req), <br />TSStatus setTimeZone(1:TSSetTimeZoneReq req), <br />TSStatus setStorageGroup(1:string storageGroup), <br />TSStatus createTimeseries(1:TSCreateTimeseriesReq req), <br />TSStatus insertRow(1:TSInsertReq req), <br />TSStatus deleteData(1:TSDeleteDataReq [...]
-| Change from ~~required string path~~ to required list\<string> paths in TSDeleteDataReq | qiaojialin |
-| Add optional set\<string> devices in TSFetchMetadataResp | Zesong Sun |
-| Rename some fields in TSFetchMetadataResp: ~~ColumnsList~~ to columnsList, ~~showTimeseriesList~~ to timeseriesList, ~~showStorageGroups~~ to storageGroups | Zesong Sun |
-| Change struct TSQueryDataSet to eliminate row-wise rpc writing | Lei Rui |
-| Add optional i32 timeseriesNum in TSFetchMetadataResp | Jack Tsai |
-| Add required i64 queryId in TSHandleIdentifier | Yuan Tian |
-| Add optional set\<string> childPaths in TSFetchMetadataResp | Haonan Hou |
-| Add optional string version in TSFetchMetadataResp | Genius_pig |
-| Add required i64 statementId in TSExecuteStatementReq | Yuan Tian |
-| Add required binary time, required list<binary> valueList, required list<binary> bitmapList and remove required binary values, required i32 rowCount in TSQueryDataSet| Yuan Tian |
-| Add optional i32 fetchSize in TSExecuteStatementReq,<br />Add optional TSQueryDataSet in TSExecuteStatementResp| liutaohua |
-| Add optional map<string, string> props, optional map<string, string> tags, optional map<string, string> attributes and optional string aliasPath in TSCreateTimeseriesReq | Yuan Tian |
diff --git a/thrift-sync/src/main/thrift/transport.thrift b/thrift-sync/src/main/thrift/transport.thrift
deleted file mode 100644
index 8218a7813a..0000000000
--- a/thrift-sync/src/main/thrift/transport.thrift
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-namespace java org.apache.iotdb.service.transport.thrift
-namespace py iotdb.thrift.transport
-
-struct TransportStatus{
- 1:required i32 code
- 2:required string msg
-}
-
-// The sender and receiver need to check some info to confirm validity
-struct IdentityInfo{
- // Check whether the ip of sender is in the white list of receiver.
- 1:required string address
-
- // Sender needs to tell receiver its identity.
- 2:required string pipeName
- 3:required i64 createTime
-
- // The version of sender and receiver need to be the same.
- 4:required string version
-
-}
-
-enum Type {
- TSFILE,
- DELETION,
- PHYSICALPLAN,
- FILE
-}
-
-struct MetaInfo{
- // The type of the pipeData in sending.
- 1:required Type type
-
- // The name of the file in sending.
- 2:required string fileName
-
- // The start index of the file slice in sending.
- 3:required i64 startIndex
-}
-
-service TransportService{
- TransportStatus handshake(IdentityInfo info);
- TransportStatus transportData(1:MetaInfo metaInfo, 2:binary buff, 3:binary digest);
- TransportStatus checkFileDigest(1:MetaInfo metaInfo, 2:binary digest);
-}
diff --git a/thrift/src/main/thrift/client.thrift b/thrift/src/main/thrift/client.thrift
index f2ecbeedea..014bda77dd 100644
--- a/thrift/src/main/thrift/client.thrift
+++ b/thrift/src/main/thrift/client.thrift
@@ -409,6 +409,33 @@ struct TSDropSchemaTemplateReq {
2: required string templateName
}
+// The sender and receiver need to check some info to confirm validity
+struct TSyncIdentityInfo{
+ // Check whether the sender's IP address is in the receiver's whitelist.
+ 1:required string address
+ // Sender needs to tell receiver its identity.
+ 2:required string pipeName
+ 3:required i64 createTime
+ // The versions of the sender and the receiver must be the same.
+ 4:required string version
+}
+
+enum TSyncTransportType {
+ TSFILE,
+ DELETION,
+ PHYSICALPLAN,
+ FILE
+}
+
+struct TSyncTransportMetaInfo{
+ // The type of the pipeData being sent.
+ 1:required TSyncTransportType type
+ // The name of the file being sent.
+ 2:required string fileName
+ // The start index of the file slice being sent.
+ 3:required i64 startIndex
+}
+
service IClientRPCService {
TSOpenSessionResp openSession(1:TSOpenSessionReq req);
@@ -499,4 +526,10 @@ service IClientRPCService {
common.TSStatus unsetSchemaTemplate(1:TSUnsetSchemaTemplateReq req);
common.TSStatus dropSchemaTemplate(1:TSDropSchemaTemplateReq req);
+
+ common.TSStatus handshake(TSyncIdentityInfo info);
+
+ common.TSStatus transportData(1:TSyncTransportMetaInfo metaInfo, 2:binary buff, 3:binary digest);
+
+ common.TSStatus checkFileDigest(1:TSyncTransportMetaInfo metaInfo, 2:binary digest);
}
\ No newline at end of file