You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/11/11 03:39:59 UTC
[iotdb] branch master updated: [IOTDB-4908] Clear deprecated code related to sync in old standalone (#7965)
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 600e376bce [IOTDB-4908] Clear deprecated code related to sync in old standalone (#7965)
600e376bce is described below
commit 600e376bce5faf70bf379d44936fd45d33e08fe6
Author: Chen YZ <43...@users.noreply.github.com>
AuthorDate: Fri Nov 11 11:39:52 2022 +0800
[IOTDB-4908] Clear deprecated code related to sync in old standalone (#7965)
---
.../db/integration/sync/IoTDBSyncReceiverIT.java | 59 -----------
.../sync/IoTDBSyncReceiverLoaderIT.java | 76 +-------------
.../db/integration/sync/IoTDBSyncSenderIT.java | 54 ++--------
.../apache/iotdb/commons/conf/IoTDBConstant.java | 15 ---
.../iotdb/db/mpp/plan/parser/ASTVisitor.java | 3 +-
.../sys/sync/CreatePipeSinkStatement.java | 7 +-
.../apache/iotdb/db/qp/executor/PlanExecutor.java | 77 ---------------
.../db/qp/logical/sys/CreatePipeOperator.java | 12 +--
.../db/qp/logical/sys/CreatePipeSinkOperator.java | 11 +--
.../iotdb/db/qp/logical/sys/DropPipeOperator.java | 4 +-
.../db/qp/logical/sys/DropPipeSinkOperator.java | 4 +-
.../iotdb/db/qp/logical/sys/ShowPipeOperator.java | 4 +-
.../db/qp/logical/sys/ShowPipeSinkOperator.java | 4 +-
.../qp/logical/sys/ShowPipeSinkTypeOperator.java | 4 +-
.../iotdb/db/qp/logical/sys/StartPipeOperator.java | 4 +-
.../iotdb/db/qp/logical/sys/StopPipeOperator.java | 4 +-
.../iotdb/db/qp/physical/sys/CreatePipePlan.java | 110 ---------------------
.../db/qp/physical/sys/CreatePipeSinkPlan.java | 98 ------------------
.../iotdb/db/qp/physical/sys/DropPipeSinkPlan.java | 45 ---------
.../iotdb/db/qp/physical/sys/OperatePipePlan.java | 45 ---------
.../iotdb/db/qp/physical/sys/ShowPipePlan.java | 33 -------
.../iotdb/db/qp/physical/sys/ShowPipeSinkPlan.java | 33 -------
.../db/qp/physical/sys/ShowPipeSinkTypePlan.java | 26 -----
.../apache/iotdb/db/qp/physical/sys/ShowPlan.java | 3 -
.../java/org/apache/iotdb/db/sync/SyncService.java | 77 ---------------
.../db/sync/common/ClusterSyncInfoFetcher.java | 6 --
.../iotdb/db/sync/common/ISyncInfoFetcher.java | 3 -
.../apache/iotdb/db/sync/common/LocalSyncInfo.java | 10 --
.../iotdb/db/sync/common/LocalSyncInfoFetcher.java | 10 --
.../apache/iotdb/db/sync/pipedata/PipeData.java | 4 -
.../iotdb/db/sync/pipedata/SchemaPipeData.java | 100 -------------------
.../iotdb/db/sync/receiver/load/SchemaLoader.java | 71 -------------
.../iotdb/db/sync/receiver/load/TsFileLoader.java | 12 ---
.../apache/iotdb/db/utils/sync/SyncPipeUtil.java | 37 -------
.../db/sync/persistence/LocalSyncInfoTest.java | 16 ++-
35 files changed, 45 insertions(+), 1036 deletions(-)
diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverIT.java
index e07f0e2a86..876dc775ff 100644
--- a/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverIT.java
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverIT.java
@@ -22,23 +22,14 @@ import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.sync.pipesink.IoTDBPipeSink;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.modification.Deletion;
-import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
-import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
-import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
import org.apache.iotdb.db.sync.pipedata.DeletionPipeData;
import org.apache.iotdb.db.sync.pipedata.PipeData;
-import org.apache.iotdb.db.sync.pipedata.SchemaPipeData;
import org.apache.iotdb.db.sync.pipedata.TsFilePipeData;
import org.apache.iotdb.db.sync.sender.pipe.Pipe;
import org.apache.iotdb.db.sync.sender.pipe.TsFilePipe;
import org.apache.iotdb.db.sync.transport.client.IoTDBSyncClient;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.itbase.category.LocalStandaloneTest;
-import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.commons.io.FileUtils;
import org.junit.After;
@@ -50,8 +41,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
-import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
@Category({LocalStandaloneTest.class})
@@ -121,54 +110,6 @@ public class IoTDBSyncReceiverIT {
// 2. send pipe data
int serialNum = 0;
- List<PhysicalPlan> planList = new ArrayList<>();
- planList.add(new SetStorageGroupPlan(new PartialPath("root.vehicle")));
- planList.add(
- new CreateTimeSeriesPlan(
- new PartialPath("root.vehicle.d0.s0"),
- new MeasurementSchema("s0", TSDataType.INT32, TSEncoding.RLE)));
- planList.add(
- new CreateTimeSeriesPlan(
- new PartialPath("root.vehicle.d0.s1"),
- new MeasurementSchema("s1", TSDataType.TEXT, TSEncoding.PLAIN)));
- planList.add(
- new CreateTimeSeriesPlan(
- new PartialPath("root.vehicle.d1.s2"),
- new MeasurementSchema("s2", TSDataType.FLOAT, TSEncoding.RLE)));
- planList.add(
- new CreateTimeSeriesPlan(
- new PartialPath("root.vehicle.d1.s3"),
- new MeasurementSchema("s3", TSDataType.BOOLEAN, TSEncoding.PLAIN)));
- planList.add(new SetStorageGroupPlan(new PartialPath("root.sg1")));
- planList.add(
- new CreateAlignedTimeSeriesPlan(
- new PartialPath("root.sg1.d1"),
- Arrays.asList("s1", "s2", "s3", "s4", "s5"),
- Arrays.asList(
- TSDataType.FLOAT,
- TSDataType.INT32,
- TSDataType.INT64,
- TSDataType.BOOLEAN,
- TSDataType.TEXT),
- Arrays.asList(
- TSEncoding.RLE,
- TSEncoding.GORILLA,
- TSEncoding.RLE,
- TSEncoding.RLE,
- TSEncoding.PLAIN),
- Arrays.asList(
- CompressionType.SNAPPY,
- CompressionType.SNAPPY,
- CompressionType.SNAPPY,
- CompressionType.SNAPPY,
- CompressionType.SNAPPY),
- null,
- null,
- null));
- planList.add(new SetStorageGroupPlan(new PartialPath("root.sg1")));
- for (PhysicalPlan plan : planList) {
- client.send(new SchemaPipeData(plan, serialNum++));
- }
List<File> tsFiles = SyncTestUtil.getTsFilePaths(tmpDir);
SyncTestUtil.renameTsFiles(tsFiles);
for (File f : tsFiles) {
diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverLoaderIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverLoaderIT.java
index 69a4e0763a..b2bf607184 100644
--- a/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverLoaderIT.java
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverLoaderIT.java
@@ -21,20 +21,11 @@ package org.apache.iotdb.db.integration.sync;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.modification.Deletion;
-import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
-import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
-import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
import org.apache.iotdb.db.sync.receiver.load.DeletionLoader;
import org.apache.iotdb.db.sync.receiver.load.ILoader;
-import org.apache.iotdb.db.sync.receiver.load.SchemaLoader;
import org.apache.iotdb.db.sync.receiver.load.TsFileLoader;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.itbase.category.LocalStandaloneTest;
-import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.commons.io.FileUtils;
import org.junit.After;
@@ -46,8 +37,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
-import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
@Category({LocalStandaloneTest.class})
@@ -96,62 +85,7 @@ public class IoTDBSyncReceiverLoaderIT {
EnvironmentUtils.cleanEnv();
EnvironmentUtils.envSetUp();
- // 2. test for SchemaLoader
- List<PhysicalPlan> planList = new ArrayList<>();
- planList.add(new SetStorageGroupPlan(new PartialPath("root.vehicle")));
- planList.add(
- new CreateTimeSeriesPlan(
- new PartialPath("root.vehicle.d0.s0"),
- new MeasurementSchema("s0", TSDataType.INT32, TSEncoding.RLE)));
- planList.add(
- new CreateTimeSeriesPlan(
- new PartialPath("root.vehicle.d0.s1"),
- new MeasurementSchema("s1", TSDataType.TEXT, TSEncoding.PLAIN)));
- planList.add(
- new CreateTimeSeriesPlan(
- new PartialPath("root.vehicle.d1.s2"),
- new MeasurementSchema("s2", TSDataType.FLOAT, TSEncoding.RLE)));
- planList.add(
- new CreateTimeSeriesPlan(
- new PartialPath("root.vehicle.d1.s3"),
- new MeasurementSchema("s3", TSDataType.BOOLEAN, TSEncoding.PLAIN)));
- planList.add(new SetStorageGroupPlan(new PartialPath("root.sg1")));
- planList.add(
- new CreateAlignedTimeSeriesPlan(
- new PartialPath("root.sg1.d1"),
- Arrays.asList("s1", "s2", "s3", "s4", "s5"),
- Arrays.asList(
- TSDataType.FLOAT,
- TSDataType.INT32,
- TSDataType.INT64,
- TSDataType.BOOLEAN,
- TSDataType.TEXT),
- Arrays.asList(
- TSEncoding.RLE,
- TSEncoding.GORILLA,
- TSEncoding.RLE,
- TSEncoding.RLE,
- TSEncoding.PLAIN),
- Arrays.asList(
- CompressionType.SNAPPY,
- CompressionType.SNAPPY,
- CompressionType.SNAPPY,
- CompressionType.SNAPPY,
- CompressionType.SNAPPY),
- null,
- null,
- null));
- for (PhysicalPlan plan : planList) {
- ILoader planLoader = new SchemaLoader(plan);
- try {
- planLoader.load();
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail();
- }
- }
-
- // 3. test for TsFileLoader
+ // 2. test for TsFileLoader
List<File> tsFiles = SyncTestUtil.getTsFilePaths(tmpDir);
for (File tsfile : tsFiles) {
ILoader tsFileLoader = new TsFileLoader(tsfile, "root.vehicle");
@@ -163,7 +97,7 @@ public class IoTDBSyncReceiverLoaderIT {
}
}
- // 4. test for DeletionPlanLoader
+ // 3. test for DeletionLoader
Deletion deletion = new Deletion(new PartialPath("root.vehicle.**"), 0, 33, 38);
ILoader deletionLoader = new DeletionLoader(deletion);
try {
@@ -173,8 +107,8 @@ public class IoTDBSyncReceiverLoaderIT {
Assert.fail();
}
- // 5. check result after loading
- // 5.1 check normal timeseries
+ // 4. check result after loading
+ // 4.1 check normal timeseries
String sql1 = "select * from root.vehicle.*";
String[] retArray1 =
new String[] {
@@ -190,7 +124,7 @@ public class IoTDBSyncReceiverLoaderIT {
"root.vehicle.d0.s0", "root.vehicle.d0.s1", "root.vehicle.d1.s3", "root.vehicle.d1.s2"
};
SyncTestUtil.checkResult(sql1, columnNames1, retArray1);
- // 5.2 check aligned timeseries
+ // 4.2 check aligned timeseries
String sql2 = "select * from root.sg1.d1";
String[] retArray2 =
new String[] {
diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncSenderIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncSenderIT.java
index a0de7ddd86..772b4ed4e9 100644
--- a/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncSenderIT.java
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncSenderIT.java
@@ -22,12 +22,10 @@ import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.sync.pipesink.IoTDBPipeSink;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.modification.Deletion;
-import org.apache.iotdb.db.qp.physical.sys.ShowPipeSinkTypePlan;
import org.apache.iotdb.db.sync.SyncService;
import org.apache.iotdb.db.sync.common.LocalSyncInfoFetcher;
import org.apache.iotdb.db.sync.pipedata.DeletionPipeData;
import org.apache.iotdb.db.sync.pipedata.PipeData;
-import org.apache.iotdb.db.sync.pipedata.SchemaPipeData;
import org.apache.iotdb.db.sync.pipedata.TsFilePipeData;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.db.wal.recover.WALRecoverManager;
@@ -68,8 +66,6 @@ public class IoTDBSyncSenderIT {
private final Map<String, List<PipeData>> resultMap = new HashMap<>();
private static final TsFilePipeData simpleTsFilePipeData =
new TsFilePipeData("path", "tsfile", 0L);
- private static final SchemaPipeData simpleSchemaPipeData =
- new SchemaPipeData(new ShowPipeSinkTypePlan(), 0L);
private static final DeletionPipeData simpleDeletionPipeData =
new DeletionPipeData(new Deletion(new PartialPath(), 0L, 0L), 0L);
@@ -105,33 +101,6 @@ public class IoTDBSyncSenderIT {
EnvironmentUtils.cleanEnv();
}
- private void prepareSchema() throws Exception { // 8 schema plans
- try (Connection connection =
- DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
- Statement statement = connection.createStatement()) {
- statement.execute("set storage group to root.sg1");
- statement.execute("set storage group to root.sg2");
- statement.execute("create timeseries root.sg1.d1.s1 with datatype=int32, encoding=PLAIN");
- statement.execute("create timeseries root.sg1.d1.s2 with datatype=float, encoding=RLE");
- statement.execute("create timeseries root.sg1.d1.s3 with datatype=TEXT, encoding=PLAIN");
- statement.execute("create timeseries root.sg1.d2.s4 with datatype=int64, encoding=PLAIN");
- statement.execute("create timeseries root.sg2.d1.s0 with datatype=int32, encoding=PLAIN");
- statement.execute("create timeseries root.sg2.d2.s1 with datatype=boolean, encoding=PLAIN");
- }
-
- List<PipeData> resultList = new ArrayList<>();
- for (int i = 0; i < 4; i++) {
- resultList.add(simpleSchemaPipeData);
- }
- resultMap.put("schemaWithDel3InHistory", resultList); // del3 in history
-
- resultList = new ArrayList<>();
- for (int i = 0; i < 8; i++) {
- resultList.add(simpleSchemaPipeData);
- }
- resultMap.put("schema", resultList); // del3 do not in history
- }
-
private void prepareIns1() throws Exception { // add one seq tsfile in sg1
try (Connection connection =
DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
@@ -238,9 +207,6 @@ public class IoTDBSyncSenderIT {
for (int i = 0; i < 3; i++) {
resultList.add(simpleDeletionPipeData);
}
- for (int i = 0; i < 2; i++) {
- resultList.add(simpleSchemaPipeData);
- }
resultMap.put("del3", resultList);
}
@@ -288,9 +254,6 @@ public class IoTDBSyncSenderIT {
private void checkInsOnlyResult(List<PipeData> list) { // check ins1, ins2, ins3
Assert.assertEquals(13, list.size());
- for (int i = 0; i < 8; i++) {
- Assert.assertTrue(list.get(i) instanceof SchemaPipeData);
- }
for (int i = 9; i < list.size(); i++) {
Assert.assertTrue(list.get(i) instanceof TsFilePipeData);
}
@@ -313,7 +276,7 @@ public class IoTDBSyncSenderIT {
@Test
public void testHistoryInsert() {
try {
- prepareSchema(); // history
+ // history
prepareIns1();
prepareIns2();
prepareIns3();
@@ -340,7 +303,7 @@ public class IoTDBSyncSenderIT {
@Test
public void testHistoryAndRealTimeInsert() {
try {
- prepareSchema(); // history
+ // history
prepareIns1();
prepareIns2();
@@ -368,7 +331,7 @@ public class IoTDBSyncSenderIT {
@Test
public void testStopAndStartInsert() {
try {
- prepareSchema(); // history
+ // history
prepareIns1();
preparePipeAndSetMock(); // realtime
@@ -399,7 +362,6 @@ public class IoTDBSyncSenderIT {
try {
preparePipeAndSetMock(); // realtime
startPipe();
- prepareSchema();
prepareIns1();
stopPipe();
prepareIns2();
@@ -426,7 +388,6 @@ public class IoTDBSyncSenderIT {
@Test
public void testHistoryDel() {
try {
- prepareSchema(); // history
prepareIns1();
prepareIns2();
prepareIns3();
@@ -459,7 +420,6 @@ public class IoTDBSyncSenderIT {
@Ignore
public void testRealtimeDel() {
try {
- prepareSchema(); // history
prepareIns1();
preparePipeAndSetMock(); // realtime
@@ -494,7 +454,7 @@ public class IoTDBSyncSenderIT {
@Test
public void testRestartWhileRunning() {
try {
- prepareSchema(); // history
+ // history
prepareIns1();
prepareIns2();
@@ -522,7 +482,7 @@ public class IoTDBSyncSenderIT {
@Test
public void testRestartWhileStopping() {
try {
- prepareSchema(); // history
+ // history
prepareIns1();
preparePipeAndSetMock(); // realtime
@@ -552,7 +512,7 @@ public class IoTDBSyncSenderIT {
@Test
public void testRestartWithDel() {
try {
- prepareSchema(); // history
+ // history
prepareIns1();
prepareDel1();
@@ -589,7 +549,7 @@ public class IoTDBSyncSenderIT {
@Test
public void testRestartWithUnsealedTsFile() {
try {
- prepareSchema(); // history
+ // history
prepareIns1();
prepareIns2();
prepareDel1();
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java b/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
index 9245336015..aa71c1acb4 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
@@ -179,24 +179,9 @@ public class IoTDBConstant {
public static final String COLUMN_TRIGGER_STATUS_STARTED = "started";
public static final String COLUMN_TRIGGER_STATUS_STOPPED = "stopped";
- // sync module
- // TODO(sync): delete this in new-standalone version
- public static final String COLUMN_PIPESERVER_STATUS = "enable";
- public static final String COLUMN_PIPESINK_NAME = "name";
- public static final String COLUMN_PIPESINK_TYPE = "type";
- public static final String COLUMN_PIPESINK_ATTRIBUTES = "attributes";
- public static final String COLUMN_PIPE_NAME = "name";
- public static final String COLUMN_PIPE_CREATE_TIME = "create time";
- public static final String COLUMN_PIPE_ROLE = "role";
- public static final String COLUMN_PIPE_REMOTE = "remote";
- public static final String COLUMN_PIPE_STATUS = "status";
- public static final String COLUMN_PIPE_MSG = "message";
-
public static final String ONE_LEVEL_PATH_WILDCARD = "*";
public static final String MULTI_LEVEL_PATH_WILDCARD = "**";
public static final String TIME = "time";
- public static final String SYNC_SENDER_ROLE = "sender";
- public static final String SYNC_RECEIVER_ROLE = "receiver";
// sdt parameters
public static final String LOSS = "loss";
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
index 46eb80eadc..1ce8194f91 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
@@ -3200,8 +3200,7 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> {
@Override
public Statement visitCreatePipeSink(IoTDBSqlParser.CreatePipeSinkContext ctx) {
- CreatePipeSinkStatement createPipeSinkStatement =
- new CreatePipeSinkStatement(StatementType.CREATE_PIPESINK);
+ CreatePipeSinkStatement createPipeSinkStatement = new CreatePipeSinkStatement();
if (ctx.pipeSinkName != null) {
createPipeSinkStatement.setPipeSinkName(parseIdentifier(ctx.pipeSinkName.getText()));
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/sync/CreatePipeSinkStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/sync/CreatePipeSinkStatement.java
index 1ce63010f0..2659505985 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/sync/CreatePipeSinkStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/sync/CreatePipeSinkStatement.java
@@ -41,8 +41,8 @@ public class CreatePipeSinkStatement extends Statement implements IConfigStateme
private Map<String, String> attributes;
- public CreatePipeSinkStatement(StatementType createPipeSinkStatement) {
- this.statementType = createPipeSinkStatement;
+ public CreatePipeSinkStatement() {
+ this.statementType = StatementType.CREATE_PIPESINK;
}
public String getPipeSinkName() {
@@ -90,8 +90,7 @@ public class CreatePipeSinkStatement extends Statement implements IConfigStateme
throw new IOException(
"Parsing CreatePipeSinkStatement error. Attributes is less than expected.");
}
- CreatePipeSinkStatement createPipeSinkStatement =
- new CreatePipeSinkStatement(StatementType.CREATE_PIPESINK);
+ CreatePipeSinkStatement createPipeSinkStatement = new CreatePipeSinkStatement();
createPipeSinkStatement.setPipeSinkName(split[0]);
createPipeSinkStatement.setPipeSinkType(split[1]);
int size = (Integer.parseInt(split[2]) << 1);
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index fa78332e35..7488f0d5f2 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -29,7 +29,6 @@ import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.commons.sync.pipesink.PipeSink;
import org.apache.iotdb.commons.udf.UDFInformation;
import org.apache.iotdb.commons.udf.builtin.BuiltinAggregationFunction;
import org.apache.iotdb.commons.udf.service.UDFManagementService;
@@ -67,8 +66,6 @@ import org.apache.iotdb.db.qp.physical.sys.ShowDevicesPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowNodesInTemplatePlan;
import org.apache.iotdb.db.qp.physical.sys.ShowPathsSetTemplatePlan;
import org.apache.iotdb.db.qp.physical.sys.ShowPathsUsingTemplatePlan;
-import org.apache.iotdb.db.qp.physical.sys.ShowPipePlan;
-import org.apache.iotdb.db.qp.physical.sys.ShowPipeSinkPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowStorageGroupPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowTTLPlan;
@@ -85,7 +82,6 @@ import org.apache.iotdb.db.query.dataset.SingleDataSet;
import org.apache.iotdb.db.query.executor.IQueryRouter;
import org.apache.iotdb.db.query.executor.QueryRouter;
import org.apache.iotdb.db.service.IoTDB;
-import org.apache.iotdb.db.sync.SyncService;
import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -106,7 +102,6 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
-import java.util.Comparator;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
@@ -126,15 +121,6 @@ import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_FUNCTION_CLASS;
import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_FUNCTION_NAME;
import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_FUNCTION_TYPE;
import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_ITEM;
-import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_PIPESINK_ATTRIBUTES;
-import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_PIPESINK_NAME;
-import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_PIPESINK_TYPE;
-import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_PIPE_CREATE_TIME;
-import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_PIPE_MSG;
-import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_PIPE_NAME;
-import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_PIPE_REMOTE;
-import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_PIPE_ROLE;
-import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_PIPE_STATUS;
import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_PRIVILEGE;
import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_ROLE;
import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_SCHEMA_TEMPLATE;
@@ -295,12 +281,6 @@ public class PlanExecutor implements IPlanExecutor {
return processShowPathsSetSchemaTemplate((ShowPathsSetTemplatePlan) showPlan);
case PATHS_USING_SCHEMA_TEMPLATE:
return processShowPathsUsingSchemaTemplate((ShowPathsUsingTemplatePlan) showPlan);
- case PIPESINK:
- return processShowPipeSink((ShowPipeSinkPlan) showPlan);
- case PIPESINKTYPE:
- return processShowPipeSinkType();
- case PIPE:
- return processShowPipes((ShowPipePlan) showPlan);
default:
throw new QueryProcessException(String.format("Unrecognized show plan %s", showPlan));
}
@@ -767,63 +747,6 @@ public class PlanExecutor implements IPlanExecutor {
listDataSet.putRecord(rowRecord);
}
- private QueryDataSet processShowPipeSink(ShowPipeSinkPlan plan) {
- ListDataSet listDataSet =
- new ListDataSet(
- Arrays.asList(
- new PartialPath(COLUMN_PIPESINK_NAME, false),
- new PartialPath(COLUMN_PIPESINK_TYPE, false),
- new PartialPath(COLUMN_PIPESINK_ATTRIBUTES, false)),
- Arrays.asList(TSDataType.TEXT, TSDataType.TEXT, TSDataType.TEXT));
- boolean showAll = "".equals(plan.getPipeSinkName());
- for (PipeSink pipeSink : SyncService.getInstance().getAllPipeSink()) {
- if (showAll || plan.getPipeSinkName().equals(pipeSink.getPipeSinkName())) {
- RowRecord record = new RowRecord(0);
- record.addField(Binary.valueOf(pipeSink.getPipeSinkName()), TSDataType.TEXT);
- record.addField(Binary.valueOf(pipeSink.getType().name()), TSDataType.TEXT);
- record.addField(Binary.valueOf(pipeSink.showAllAttributes()), TSDataType.TEXT);
- listDataSet.putRecord(record);
- }
- }
- return listDataSet;
- }
-
- private QueryDataSet processShowPipeSinkType() {
- ListDataSet listDataSet =
- new ListDataSet(
- Arrays.asList(new PartialPath(COLUMN_PIPESINK_TYPE, false)),
- Arrays.asList(TSDataType.TEXT));
- for (PipeSink.PipeSinkType pipeSinkType : PipeSink.PipeSinkType.values()) {
- RowRecord record = new RowRecord(0);
- record.addField(Binary.valueOf(pipeSinkType.name()), TSDataType.TEXT);
- listDataSet.putRecord(record);
- }
- return listDataSet;
- }
-
- private QueryDataSet processShowPipes(ShowPipePlan plan) {
- ListDataSet listDataSet =
- new ListDataSet(
- Arrays.asList(
- new PartialPath(COLUMN_PIPE_CREATE_TIME, false),
- new PartialPath(COLUMN_PIPE_NAME, false),
- new PartialPath(COLUMN_PIPE_ROLE, false),
- new PartialPath(COLUMN_PIPE_REMOTE, false),
- new PartialPath(COLUMN_PIPE_STATUS, false),
- new PartialPath(COLUMN_PIPE_MSG, false)),
- Arrays.asList(
- TSDataType.TEXT,
- TSDataType.TEXT,
- TSDataType.TEXT,
- TSDataType.TEXT,
- TSDataType.TEXT,
- TSDataType.TEXT));
- SyncService.getInstance().showPipe(plan, listDataSet);
- // sort by create time
- listDataSet.sort(Comparator.comparing(o -> o.getFields().get(0).getStringValue()));
- return listDataSet;
- }
-
// high Cognitive Complexity
protected QueryDataSet processAuthorQuery(AuthorPlan plan) throws QueryProcessException {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/CreatePipeOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/CreatePipeOperator.java
index 44dc6ee3d7..911f8e5057 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/CreatePipeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/CreatePipeOperator.java
@@ -23,13 +23,12 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.qp.constant.SQLConstant;
import org.apache.iotdb.db.qp.logical.Operator;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.qp.physical.sys.CreatePipePlan;
import org.apache.iotdb.db.qp.strategy.PhysicalGenerator;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.Map;
+// TODO: remove this
public class CreatePipeOperator extends Operator {
private String pipeName;
private String pipeSinkName;
@@ -57,13 +56,6 @@ public class CreatePipeOperator extends Operator {
@Override
public PhysicalPlan generatePhysicalPlan(PhysicalGenerator generator)
throws QueryProcessException {
- CreatePipePlan plan = new CreatePipePlan(pipeName, pipeSinkName);
- plan.setDataStartTimestamp(startTime);
- Iterator<Map.Entry<String, String>> iterator = pipeAttributes.entrySet().iterator();
- while (iterator.hasNext()) {
- Map.Entry<String, String> entry = iterator.next();
- plan.addPipeAttribute(entry.getKey(), entry.getValue());
- }
- return plan;
+ return null;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/CreatePipeSinkOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/CreatePipeSinkOperator.java
index d0de62f325..bdf90daabf 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/CreatePipeSinkOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/CreatePipeSinkOperator.java
@@ -23,13 +23,12 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.qp.constant.SQLConstant;
import org.apache.iotdb.db.qp.logical.Operator;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.qp.physical.sys.CreatePipeSinkPlan;
import org.apache.iotdb.db.qp.strategy.PhysicalGenerator;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.Map;
+// TODO: remove this
public class CreatePipeSinkOperator extends Operator {
private String pipeSinkName;
private String pipeSinkType;
@@ -50,12 +49,6 @@ public class CreatePipeSinkOperator extends Operator {
@Override
public PhysicalPlan generatePhysicalPlan(PhysicalGenerator generator)
throws QueryProcessException {
- CreatePipeSinkPlan plan = new CreatePipeSinkPlan(pipeSinkName, pipeSinkType);
- Iterator<Map.Entry<String, String>> iterator = pipeSinkAttributes.entrySet().iterator();
- while (iterator.hasNext()) {
- Map.Entry<String, String> entry = iterator.next();
- plan.addPipeSinkAttribute(entry.getKey(), entry.getValue());
- }
- return plan;
+ return null;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/DropPipeOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/DropPipeOperator.java
index ea98dc017a..7cc8e5b4e0 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/DropPipeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/DropPipeOperator.java
@@ -23,9 +23,9 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.qp.constant.SQLConstant;
import org.apache.iotdb.db.qp.logical.Operator;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.qp.physical.sys.OperatePipePlan;
import org.apache.iotdb.db.qp.strategy.PhysicalGenerator;
+// TODO: remove this
public class DropPipeOperator extends Operator {
private String pipeName;
@@ -39,6 +39,6 @@ public class DropPipeOperator extends Operator {
@Override
public PhysicalPlan generatePhysicalPlan(PhysicalGenerator generator)
throws QueryProcessException {
- return new OperatePipePlan(pipeName, operatorType);
+ return null;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/DropPipeSinkOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/DropPipeSinkOperator.java
index 9341e06b56..6ea8e0bed1 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/DropPipeSinkOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/DropPipeSinkOperator.java
@@ -23,9 +23,9 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.qp.constant.SQLConstant;
import org.apache.iotdb.db.qp.logical.Operator;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.qp.physical.sys.DropPipeSinkPlan;
import org.apache.iotdb.db.qp.strategy.PhysicalGenerator;
+// TODO: remove this
public class DropPipeSinkOperator extends Operator {
private String pipeSinkName;
@@ -38,6 +38,6 @@ public class DropPipeSinkOperator extends Operator {
@Override
public PhysicalPlan generatePhysicalPlan(PhysicalGenerator generator)
throws QueryProcessException {
- return new DropPipeSinkPlan(pipeSinkName);
+ return null;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/ShowPipeOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/ShowPipeOperator.java
index b6070eea7a..5f77a1b3aa 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/ShowPipeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/ShowPipeOperator.java
@@ -22,9 +22,9 @@ package org.apache.iotdb.db.qp.logical.sys;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.qp.constant.SQLConstant;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.qp.physical.sys.ShowPipePlan;
import org.apache.iotdb.db.qp.strategy.PhysicalGenerator;
+// TODO: remove this
public class ShowPipeOperator extends ShowOperator {
private String pipeName;
@@ -40,6 +40,6 @@ public class ShowPipeOperator extends ShowOperator {
@Override
public PhysicalPlan generatePhysicalPlan(PhysicalGenerator generator)
throws QueryProcessException {
- return new ShowPipePlan(pipeName);
+ return null;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/ShowPipeSinkOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/ShowPipeSinkOperator.java
index 7a528c1c5a..4554e38754 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/ShowPipeSinkOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/ShowPipeSinkOperator.java
@@ -22,9 +22,9 @@ package org.apache.iotdb.db.qp.logical.sys;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.qp.constant.SQLConstant;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.qp.physical.sys.ShowPipeSinkPlan;
import org.apache.iotdb.db.qp.strategy.PhysicalGenerator;
+// TODO: remove this
public class ShowPipeSinkOperator extends ShowOperator {
private String pipeSinkName;
@@ -40,6 +40,6 @@ public class ShowPipeSinkOperator extends ShowOperator {
@Override
public PhysicalPlan generatePhysicalPlan(PhysicalGenerator generator)
throws QueryProcessException {
- return new ShowPipeSinkPlan(pipeSinkName);
+ return null;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/ShowPipeSinkTypeOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/ShowPipeSinkTypeOperator.java
index 0ff83f1d52..429a96dece 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/ShowPipeSinkTypeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/ShowPipeSinkTypeOperator.java
@@ -22,9 +22,9 @@ package org.apache.iotdb.db.qp.logical.sys;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.qp.constant.SQLConstant;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.qp.physical.sys.ShowPipeSinkTypePlan;
import org.apache.iotdb.db.qp.strategy.PhysicalGenerator;
+// TODO: remove this
public class ShowPipeSinkTypeOperator extends ShowOperator {
public ShowPipeSinkTypeOperator() {
super(SQLConstant.TOK_SHOW_PIPESINKTYPE, OperatorType.SHOW_PIPESINKTYPE);
@@ -33,6 +33,6 @@ public class ShowPipeSinkTypeOperator extends ShowOperator {
@Override
public PhysicalPlan generatePhysicalPlan(PhysicalGenerator generator)
throws QueryProcessException {
- return new ShowPipeSinkTypePlan();
+ return null;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/StartPipeOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/StartPipeOperator.java
index d471122d41..d4c0ceca63 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/StartPipeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/StartPipeOperator.java
@@ -23,9 +23,9 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.qp.constant.SQLConstant;
import org.apache.iotdb.db.qp.logical.Operator;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.qp.physical.sys.OperatePipePlan;
import org.apache.iotdb.db.qp.strategy.PhysicalGenerator;
+// TODO: remove this
public class StartPipeOperator extends Operator {
private String pipeName;
@@ -39,6 +39,6 @@ public class StartPipeOperator extends Operator {
@Override
public PhysicalPlan generatePhysicalPlan(PhysicalGenerator generator)
throws QueryProcessException {
- return new OperatePipePlan(pipeName, operatorType);
+ return null;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/StopPipeOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/StopPipeOperator.java
index c76500d46c..0a2a33a645 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/StopPipeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/StopPipeOperator.java
@@ -23,9 +23,9 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.qp.constant.SQLConstant;
import org.apache.iotdb.db.qp.logical.Operator;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.qp.physical.sys.OperatePipePlan;
import org.apache.iotdb.db.qp.strategy.PhysicalGenerator;
+// TODO: remove this
public class StopPipeOperator extends Operator {
private String pipeName;
@@ -39,6 +39,6 @@ public class StopPipeOperator extends Operator {
@Override
public PhysicalPlan generatePhysicalPlan(PhysicalGenerator generator)
throws QueryProcessException {
- return new OperatePipePlan(pipeName, operatorType);
+ return null;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreatePipePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreatePipePlan.java
deleted file mode 100644
index aac5972d96..0000000000
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreatePipePlan.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.iotdb.db.qp.physical.sys;
-
-import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.commons.sync.utils.SyncConstant;
-import org.apache.iotdb.db.qp.logical.Operator;
-import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.tsfile.utils.Pair;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-public class CreatePipePlan extends PhysicalPlan {
- private String pipeName;
- private String pipeSinkName;
- private long dataStartTimestamp;
- private List<Pair<String, String>> pipeAttributes;
-
- public CreatePipePlan(String pipeName, String pipeSinkName) {
- super(Operator.OperatorType.CREATE_PIPE);
- this.pipeName = pipeName;
- this.pipeSinkName = pipeSinkName;
- dataStartTimestamp = 0;
- pipeAttributes = new ArrayList<>();
- }
-
- public void setDataStartTimestamp(long dataStartTimestamp) {
- this.dataStartTimestamp = dataStartTimestamp;
- }
-
- public void addPipeAttribute(String attr, String value) {
- pipeAttributes.add(new Pair<>(attr, value));
- }
-
- public String getPipeName() {
- return pipeName;
- }
-
- public String getPipeSinkName() {
- return pipeSinkName;
- }
-
- public long getDataStartTimestamp() {
- return dataStartTimestamp;
- }
-
- public List<Pair<String, String>> getPipeAttributes() {
- return pipeAttributes;
- }
-
- @Override
- public List<? extends PartialPath> getPaths() {
- return Collections.emptyList();
- }
-
- public static CreatePipePlan parseString(String parsedString) throws IOException {
- String[] attributes = parsedString.split(SyncConstant.PLAN_SERIALIZE_SPLIT_CHARACTER);
- if (attributes.length < 4) {
- throw new IOException("Parsing CreatePipePlan error. Attributes is less than expected.");
- }
- CreatePipePlan plan = new CreatePipePlan(attributes[0], attributes[1]);
- plan.setDataStartTimestamp(Long.parseLong(attributes[2]));
- int size = (Integer.parseInt(attributes[3]) << 1);
- if (attributes.length != (size + 4)) {
- throw new IOException("Parsing CreatePipePlan error. Attributes number is wrong.");
- }
- for (int i = 0; i < size; i += 2) {
- plan.addPipeAttribute(attributes[i + 4], attributes[i + 5]);
- }
- return plan;
- }
-
- @Override
- public String toString() {
- StringBuilder builder = new StringBuilder();
- builder.append(pipeName).append(SyncConstant.PLAN_SERIALIZE_SPLIT_CHARACTER);
- builder.append(pipeSinkName).append(SyncConstant.PLAN_SERIALIZE_SPLIT_CHARACTER);
- builder.append(dataStartTimestamp).append(SyncConstant.PLAN_SERIALIZE_SPLIT_CHARACTER);
- builder.append(pipeAttributes.size()).append(SyncConstant.PLAN_SERIALIZE_SPLIT_CHARACTER);
- for (int i = 0; i < pipeAttributes.size(); i++) {
- builder
- .append(pipeAttributes.get(i).left)
- .append(SyncConstant.PLAN_SERIALIZE_SPLIT_CHARACTER);
- builder
- .append(pipeAttributes.get(i).right)
- .append(SyncConstant.PLAN_SERIALIZE_SPLIT_CHARACTER);
- }
- return builder.toString();
- }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreatePipeSinkPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreatePipeSinkPlan.java
deleted file mode 100644
index 61b0b36e65..0000000000
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreatePipeSinkPlan.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.iotdb.db.qp.physical.sys;
-
-import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.commons.sync.utils.SyncConstant;
-import org.apache.iotdb.db.qp.logical.Operator;
-import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.tsfile.utils.Pair;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-public class CreatePipeSinkPlan extends PhysicalPlan {
- private String pipeSinkName;
- private String pipeSinkType;
- private List<Pair<String, String>> pipeSinkAttributes;
-
- public CreatePipeSinkPlan(String pipeSinkName, String pipeSinkType) {
- super(Operator.OperatorType.CREATE_PIPESINK);
- this.pipeSinkName = pipeSinkName;
- this.pipeSinkType = pipeSinkType;
- pipeSinkAttributes = new ArrayList<>();
- }
-
- public void addPipeSinkAttribute(String attr, String value) {
- pipeSinkAttributes.add(new Pair<>(attr, value));
- }
-
- public String getPipeSinkName() {
- return pipeSinkName;
- }
-
- public String getPipeSinkType() {
- return pipeSinkType;
- }
-
- public List<Pair<String, String>> getPipeSinkAttributes() {
- return pipeSinkAttributes;
- }
-
- @Override
- public List<? extends PartialPath> getPaths() {
- return Collections.emptyList();
- }
-
- public static CreatePipeSinkPlan parseString(String parsedString) throws IOException {
- String[] attributes = parsedString.split(SyncConstant.PLAN_SERIALIZE_SPLIT_CHARACTER);
- if (attributes.length < 3) {
- throw new IOException("Parsing CreatePipeSinkPlan error. Attributes is less than expected.");
- }
- CreatePipeSinkPlan plan = new CreatePipeSinkPlan(attributes[0], attributes[1]);
- int size = (Integer.parseInt(attributes[2]) << 1);
- if (attributes.length != (size + 3)) {
- throw new IOException("Parsing CreatePipeSinkPlan error. Attributes number is wrong.");
- }
- for (int i = 0; i < size; i += 2) {
- plan.addPipeSinkAttribute(attributes[i + 3], attributes[i + 4]);
- }
- return plan;
- }
-
- @Override
- public String toString() {
- StringBuilder builder = new StringBuilder();
- builder.append(pipeSinkName).append(SyncConstant.PLAN_SERIALIZE_SPLIT_CHARACTER);
- builder.append(pipeSinkType).append(SyncConstant.PLAN_SERIALIZE_SPLIT_CHARACTER);
- builder.append(pipeSinkAttributes.size()).append(SyncConstant.PLAN_SERIALIZE_SPLIT_CHARACTER);
- for (int i = 0; i < pipeSinkAttributes.size(); i++) {
- builder
- .append(pipeSinkAttributes.get(i).left)
- .append(SyncConstant.PLAN_SERIALIZE_SPLIT_CHARACTER);
- builder
- .append(pipeSinkAttributes.get(i).right)
- .append(SyncConstant.PLAN_SERIALIZE_SPLIT_CHARACTER);
- }
- return builder.toString();
- }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DropPipeSinkPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DropPipeSinkPlan.java
deleted file mode 100644
index 4da8de3c9e..0000000000
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DropPipeSinkPlan.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.iotdb.db.qp.physical.sys;
-
-import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.db.qp.logical.Operator;
-import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-
-import java.util.Collections;
-import java.util.List;
-
-public class DropPipeSinkPlan extends PhysicalPlan {
- private String pipeSinkName;
-
- public DropPipeSinkPlan(String pipeSinkName) {
- super(Operator.OperatorType.DROP_PIPESINK);
- this.pipeSinkName = pipeSinkName;
- }
-
- public String getPipeSinkName() {
- return pipeSinkName;
- }
-
- @Override
- public List<? extends PartialPath> getPaths() {
- return Collections.emptyList();
- }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/OperatePipePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/OperatePipePlan.java
deleted file mode 100644
index 6af563c5c3..0000000000
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/OperatePipePlan.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.iotdb.db.qp.physical.sys;
-
-import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.db.qp.logical.Operator;
-import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-
-import java.util.Collections;
-import java.util.List;
-
-public class OperatePipePlan extends PhysicalPlan {
- private String pipeName;
-
- public OperatePipePlan(String pipeName, Operator.OperatorType type) {
- super(type);
- this.pipeName = pipeName;
- }
-
- public String getPipeName() {
- return pipeName;
- }
-
- @Override
- public List<? extends PartialPath> getPaths() {
- return Collections.emptyList();
- }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPipePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPipePlan.java
deleted file mode 100644
index bbdbdf2043..0000000000
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPipePlan.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.iotdb.db.qp.physical.sys;
-
-public class ShowPipePlan extends ShowPlan {
- private String pipeName;
-
- public ShowPipePlan(String pipeName) {
- super(ShowContentType.PIPE);
- this.pipeName = pipeName;
- }
-
- public String getPipeName() {
- return pipeName;
- }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPipeSinkPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPipeSinkPlan.java
deleted file mode 100644
index 038a681349..0000000000
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPipeSinkPlan.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.iotdb.db.qp.physical.sys;
-
-public class ShowPipeSinkPlan extends ShowPlan {
- private String pipeSinkName;
-
- public ShowPipeSinkPlan(String pipeSinkName) {
- super(ShowContentType.PIPESINK);
- this.pipeSinkName = pipeSinkName;
- }
-
- public String getPipeSinkName() {
- return pipeSinkName;
- }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPipeSinkTypePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPipeSinkTypePlan.java
deleted file mode 100644
index fe29622c66..0000000000
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPipeSinkTypePlan.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.iotdb.db.qp.physical.sys;
-
-public class ShowPipeSinkTypePlan extends ShowPlan {
- public ShowPipeSinkTypePlan() {
- super(ShowContentType.PIPESINKTYPE);
- }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPlan.java
index 0398a2f014..86f3e95112 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPlan.java
@@ -127,8 +127,5 @@ public class ShowPlan extends PhysicalPlan {
NODES_IN_SCHEMA_TEMPLATE,
PATHS_SET_SCHEMA_TEMPLATE,
PATHS_USING_SCHEMA_TEMPLATE,
- PIPESINK,
- PIPESINKTYPE,
- PIPE,
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/SyncService.java b/server/src/main/java/org/apache/iotdb/db/sync/SyncService.java
index 7cce9048df..1051723306 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/SyncService.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/SyncService.java
@@ -20,7 +20,6 @@
package org.apache.iotdb.db.sync;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.ShutdownException;
import org.apache.iotdb.commons.exception.StartupException;
import org.apache.iotdb.commons.exception.sync.PipeException;
@@ -39,16 +38,12 @@ import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
-import org.apache.iotdb.db.qp.physical.sys.CreatePipeSinkPlan;
-import org.apache.iotdb.db.qp.physical.sys.ShowPipePlan;
import org.apache.iotdb.db.qp.utils.DateTimeUtils;
-import org.apache.iotdb.db.query.dataset.ListDataSet;
import org.apache.iotdb.db.sync.common.ClusterSyncInfoFetcher;
import org.apache.iotdb.db.sync.common.ISyncInfoFetcher;
import org.apache.iotdb.db.sync.common.LocalSyncInfoFetcher;
import org.apache.iotdb.db.sync.externalpipe.ExtPipePluginManager;
import org.apache.iotdb.db.sync.externalpipe.ExtPipePluginRegister;
-import org.apache.iotdb.db.sync.externalpipe.ExternalPipeStatus;
import org.apache.iotdb.db.sync.sender.manager.ISyncManager;
import org.apache.iotdb.db.sync.sender.pipe.ExternalPipeSink;
import org.apache.iotdb.db.sync.sender.pipe.Pipe;
@@ -60,9 +55,6 @@ import org.apache.iotdb.pipe.external.api.IExternalPipeSinkWriterFactory;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TSyncIdentityInfo;
import org.apache.iotdb.service.rpc.thrift.TSyncTransportMetaInfo;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.common.RowRecord;
-import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.commons.lang3.StringUtils;
import org.apache.thrift.TException;
@@ -140,14 +132,6 @@ public class SyncService implements IService {
return syncInfoFetcher.getPipeSink(name);
}
- // TODO(sync): delete this in new-standalone version
- public void addPipeSink(CreatePipeSinkPlan plan) throws PipeSinkException {
- TSStatus status = syncInfoFetcher.addPipeSink(plan);
- if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- throw new PipeSinkException(status.message);
- }
- }
-
public void addPipeSink(CreatePipeSinkStatement createPipeSinkStatement)
throws PipeSinkException {
logger.info("Add PIPESINK {}", createPipeSinkStatement);
@@ -425,67 +409,6 @@ public class SyncService implements IService {
return list;
}
- // TODO(sync): delete this in new-standalone version
- public void showPipe(ShowPipePlan plan, ListDataSet listDataSet) {
- boolean showAll = "".equals(plan.getPipeName());
- // show pipe in sender
- for (PipeInfo pipe : SyncService.getInstance().getAllPipeInfos()) {
- if (showAll || plan.getPipeName().equals(pipe.getPipeName())) {
- try {
- RowRecord record = new RowRecord(0);
- record.addField(
- Binary.valueOf(DateTimeUtils.convertLongToDate(pipe.getCreateTime())),
- TSDataType.TEXT);
- record.addField(Binary.valueOf(pipe.getPipeName()), TSDataType.TEXT);
- record.addField(Binary.valueOf(IoTDBConstant.SYNC_SENDER_ROLE), TSDataType.TEXT);
- record.addField(Binary.valueOf(pipe.getPipeSinkName()), TSDataType.TEXT);
- record.addField(Binary.valueOf(pipe.getStatus().name()), TSDataType.TEXT);
- PipeSink pipeSink = syncInfoFetcher.getPipeSink(pipe.getPipeSinkName());
- if (pipeSink.getType() == PipeSink.PipeSinkType.ExternalPipe) { // for external pipe
- ExtPipePluginManager extPipePluginManager =
- SyncService.getInstance().getExternalPipeManager(pipe.getPipeName());
-
- if (extPipePluginManager != null) {
- String extPipeType = ((ExternalPipeSink) pipeSink).getExtPipeSinkTypeName();
- ExternalPipeStatus externalPipeStatus =
- extPipePluginManager.getExternalPipeStatus(extPipeType);
-
- // TODO(ext-pipe): Adapting to the new syntax of SHOW PIPE
- if (externalPipeStatus != null) {
- record.addField(
- Binary.valueOf(
- externalPipeStatus.getWriterInvocationFailures().toString()
- + ";"
- + externalPipeStatus.getWriterStatuses().toString()),
- TSDataType.TEXT);
- }
- }
- } else {
- record.addField(Binary.valueOf(pipe.getMessageType().name()), TSDataType.TEXT);
- }
- listDataSet.putRecord(record);
- } catch (Exception e) {
- logger.error("failed to show pipe [{}] because {}", pipe.getPipeName(), e.getMessage());
- }
- }
- }
- // show pipe in receiver
- List<TSyncIdentityInfo> identityInfoList = receiverManager.getAllTSyncIdentityInfos();
- for (TSyncIdentityInfo identityInfo : identityInfoList) {
- // TODO(sync): Removing duplicate rows
- RowRecord record = new RowRecord(0);
- record.addField(
- Binary.valueOf(DateTimeUtils.convertLongToDate(identityInfo.getCreateTime())),
- TSDataType.TEXT);
- record.addField(Binary.valueOf(identityInfo.getPipeName()), TSDataType.TEXT);
- record.addField(Binary.valueOf(IoTDBConstant.SYNC_RECEIVER_ROLE), TSDataType.TEXT);
- record.addField(Binary.valueOf(identityInfo.getAddress()), TSDataType.TEXT);
- record.addField(Binary.valueOf(PipeStatus.RUNNING.name()), TSDataType.TEXT);
- record.addField(Binary.valueOf(PipeMessage.PipeMessageType.NORMAL.name()), TSDataType.TEXT);
- listDataSet.putRecord(record);
- }
- }
-
// endregion
// region Interfaces and Implementation of External-Pipe
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/common/ClusterSyncInfoFetcher.java b/server/src/main/java/org/apache/iotdb/db/sync/common/ClusterSyncInfoFetcher.java
index 8021d9815c..33254a4547 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/common/ClusterSyncInfoFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/common/ClusterSyncInfoFetcher.java
@@ -32,7 +32,6 @@ import org.apache.iotdb.db.client.ConfigNodeClient;
import org.apache.iotdb.db.client.ConfigNodeInfo;
import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
-import org.apache.iotdb.db.qp.physical.sys.CreatePipeSinkPlan;
import org.apache.iotdb.db.utils.sync.SyncPipeUtil;
import org.apache.iotdb.rpc.RpcUtils;
@@ -55,11 +54,6 @@ public class ClusterSyncInfoFetcher implements ISyncInfoFetcher {
// region Interfaces of PipeSink
- @Override
- public TSStatus addPipeSink(CreatePipeSinkPlan plan) {
- return RpcUtils.SUCCESS_STATUS;
- }
-
@Override
public TSStatus addPipeSink(CreatePipeSinkStatement createPipeSinkStatement) {
return RpcUtils.SUCCESS_STATUS;
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/common/ISyncInfoFetcher.java b/server/src/main/java/org/apache/iotdb/db/sync/common/ISyncInfoFetcher.java
index 30839df25a..89f04c4e0f 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/common/ISyncInfoFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/common/ISyncInfoFetcher.java
@@ -24,15 +24,12 @@ import org.apache.iotdb.commons.sync.pipe.PipeInfo;
import org.apache.iotdb.commons.sync.pipe.PipeMessage;
import org.apache.iotdb.commons.sync.pipesink.PipeSink;
import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
-import org.apache.iotdb.db.qp.physical.sys.CreatePipeSinkPlan;
import java.util.List;
public interface ISyncInfoFetcher {
// region Interfaces of PipeSink
- // TODO(sync): delete this in new-standalone version
- TSStatus addPipeSink(CreatePipeSinkPlan plan);
TSStatus addPipeSink(CreatePipeSinkStatement createPipeSinkStatement);
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/common/LocalSyncInfo.java b/server/src/main/java/org/apache/iotdb/db/sync/common/LocalSyncInfo.java
index fd331d572d..c30d5cc852 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/common/LocalSyncInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/common/LocalSyncInfo.java
@@ -30,7 +30,6 @@ import org.apache.iotdb.commons.sync.pipe.SyncOperation;
import org.apache.iotdb.commons.sync.pipesink.PipeSink;
import org.apache.iotdb.commons.sync.utils.SyncPathUtil;
import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
-import org.apache.iotdb.db.qp.physical.sys.CreatePipeSinkPlan;
import org.apache.iotdb.db.utils.sync.SyncPipeUtil;
import org.slf4j.Logger;
@@ -68,15 +67,6 @@ public class LocalSyncInfo {
// region Implement of PipeSink
- // TODO: delete this in new-standalone version
- public void addPipeSink(CreatePipeSinkPlan plan) throws PipeSinkException, IOException {
- syncMetadata.checkAddPipeSink(plan.getPipeSinkName());
- PipeSink pipeSink = SyncPipeUtil.parseCreatePipeSinkPlan(plan);
- // should guarantee the adding pipesink is not exist.
- syncMetadata.addPipeSink(pipeSink);
- syncLogWriter.addPipeSink(pipeSink);
- }
-
public void addPipeSink(CreatePipeSinkStatement createPipeSinkStatement)
throws PipeSinkException, IOException {
syncMetadata.checkAddPipeSink(createPipeSinkStatement.getPipeSinkName());
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/common/LocalSyncInfoFetcher.java b/server/src/main/java/org/apache/iotdb/db/sync/common/LocalSyncInfoFetcher.java
index ce50511013..923828ceda 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/common/LocalSyncInfoFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/common/LocalSyncInfoFetcher.java
@@ -27,7 +27,6 @@ import org.apache.iotdb.commons.sync.pipe.SyncOperation;
import org.apache.iotdb.commons.sync.pipesink.PipeSink;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
-import org.apache.iotdb.db.qp.physical.sys.CreatePipeSinkPlan;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -47,15 +46,6 @@ public class LocalSyncInfoFetcher implements ISyncInfoFetcher {
}
// region Implement of PipeSink
- @Override
- public TSStatus addPipeSink(CreatePipeSinkPlan plan) {
- try {
- localSyncInfo.addPipeSink(plan);
- return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
- } catch (PipeSinkException | IOException e) {
- return RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage());
- }
- }
@Override
public TSStatus addPipeSink(CreatePipeSinkStatement createPipeSinkStatement) {
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/pipedata/PipeData.java b/server/src/main/java/org/apache/iotdb/db/sync/pipedata/PipeData.java
index 7d9adf1249..18d41d2f35 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/pipedata/PipeData.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/pipedata/PipeData.java
@@ -82,9 +82,6 @@ public abstract class PipeData {
case DELETION:
pipeData = new DeletionPipeData();
break;
- case SCHEMA:
- pipeData = new SchemaPipeData();
- break;
default:
logger.error("Deserialize PipeData error because Unknown type {}.", type);
throw new UnsupportedOperationException(
@@ -103,6 +100,5 @@ public abstract class PipeData {
public enum PipeDataType {
TSFILE,
DELETION,
- SCHEMA
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/pipedata/SchemaPipeData.java b/server/src/main/java/org/apache/iotdb/db/sync/pipedata/SchemaPipeData.java
deleted file mode 100644
index ee1d342f24..0000000000
--- a/server/src/main/java/org/apache/iotdb/db/sync/pipedata/SchemaPipeData.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.iotdb.db.sync.pipedata;
-
-import org.apache.iotdb.commons.exception.IllegalPathException;
-import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.sync.receiver.load.ILoader;
-import org.apache.iotdb.db.sync.receiver.load.SchemaLoader;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Objects;
-
-public class SchemaPipeData extends PipeData {
- private static final int SERIALIZE_BUFFER_SIZE = 1024;
-
- private PhysicalPlan plan;
-
- public SchemaPipeData() {
- super();
- }
-
- public SchemaPipeData(PhysicalPlan plan, long serialNumber) {
- super(serialNumber);
- this.plan = plan;
- }
-
- @Override
- public PipeDataType getType() {
- return PipeDataType.SCHEMA;
- }
-
- @Override
- public long serialize(DataOutputStream stream) throws IOException {
- long serializeSize = super.serialize(stream);
- byte[] bytes = getBytes();
- stream.writeInt(bytes.length);
- stream.write(bytes);
- serializeSize += (Integer.BYTES + bytes.length);
- return serializeSize;
- }
-
- private byte[] getBytes() {
- ByteBuffer buffer = ByteBuffer.allocate(SERIALIZE_BUFFER_SIZE);
- plan.serialize(buffer);
- byte[] bytes = new byte[buffer.position()];
- buffer.flip();
- buffer.get(bytes);
- return bytes;
- }
-
- public void deserialize(DataInputStream stream) throws IOException, IllegalPathException {
- super.deserialize(stream);
- byte[] bytes = new byte[stream.readInt()];
- stream.read(bytes);
- plan = PhysicalPlan.Factory.create(ByteBuffer.wrap(bytes));
- }
-
- @Override
- public ILoader createLoader() {
- return new SchemaLoader(plan);
- }
-
- @Override
- public String toString() {
- return "SchemaPipeData{" + "serialNumber=" + serialNumber + ", plan=" + plan + '}';
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- SchemaPipeData that = (SchemaPipeData) o;
- return Objects.equals(plan, that.plan) && Objects.equals(serialNumber, that.serialNumber);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(plan, serialNumber);
- }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/SchemaLoader.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/SchemaLoader.java
deleted file mode 100644
index 34cb369780..0000000000
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/SchemaLoader.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.sync.receiver.load;
-
-import org.apache.iotdb.commons.exception.sync.PipeDataLoadException;
-import org.apache.iotdb.db.exception.StorageEngineException;
-import org.apache.iotdb.db.exception.metadata.StorageGroupAlreadySetException;
-import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
-import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This loader is used to PhysicalPlan. Support four types of physical plans: CREATE_TIMESERIES |
- * CREATE_ALIGNED_TIMESERIES | DELETE_TIMESERIES | SET_STORAGE_GROUP
- */
-public class SchemaLoader implements ILoader {
- private static final Logger logger = LoggerFactory.getLogger(SchemaLoader.class);
- private static PlanExecutor planExecutor;
-
- static {
- try {
- planExecutor = new PlanExecutor();
- } catch (QueryProcessException e) {
- logger.error(e.getMessage());
- }
- }
-
- private final PhysicalPlan plan;
-
- public SchemaLoader(PhysicalPlan plan) {
- this.plan = plan;
- }
-
- @Override
- public void load() throws PipeDataLoadException {
- try {
- planExecutor.processNonQuery(plan);
- } catch (QueryProcessException e) {
- if (e.getCause() instanceof StorageGroupAlreadySetException) {
- logger.warn(
- "Sync receiver try to set storage group "
- + ((StorageGroupAlreadySetException) e.getCause()).getStorageGroupPath()
- + " that has already been set");
- } else {
- throw new PipeDataLoadException(e.getMessage());
- }
- } catch (StorageEngineException | StorageGroupNotSetException e) {
- throw new PipeDataLoadException(e.getMessage());
- }
- }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/TsFileLoader.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/TsFileLoader.java
index f377bb1f5a..37d7b994f2 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/TsFileLoader.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/TsFileLoader.java
@@ -23,11 +23,9 @@ import org.apache.iotdb.commons.exception.sync.PipeDataLoadException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.LoadFileException;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.mpp.plan.Coordinator;
import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
import org.apache.iotdb.db.mpp.plan.statement.crud.LoadTsFileStatement;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
import org.apache.iotdb.db.query.control.SessionManager;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -39,18 +37,8 @@ import java.io.File;
/** This loader is used to load tsFiles. If .mods file exists, it will be loaded as well. */
public class TsFileLoader implements ILoader {
private static final Logger logger = LoggerFactory.getLogger(TsFileLoader.class);
- private static PlanExecutor planExecutor;
-
- static {
- try {
- planExecutor = new PlanExecutor();
- } catch (QueryProcessException e) {
- logger.error(e.getMessage());
- }
- }
private final File tsFile;
- // TODO(sync): use storage group to support auto create schema
private final String storageGroup;
public TsFileLoader(File tsFile, String storageGroup) {
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/sync/SyncPipeUtil.java b/server/src/main/java/org/apache/iotdb/db/utils/sync/SyncPipeUtil.java
index 73669ebbcd..d87e07bab5 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/sync/SyncPipeUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/sync/SyncPipeUtil.java
@@ -28,30 +28,14 @@ import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TPipeSinkInfo;
import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeStatement;
-import org.apache.iotdb.db.qp.physical.sys.CreatePipePlan;
-import org.apache.iotdb.db.qp.physical.sys.CreatePipeSinkPlan;
import org.apache.iotdb.db.sync.sender.pipe.Pipe;
import org.apache.iotdb.db.sync.sender.pipe.PipeSinkFactory;
import org.apache.iotdb.db.sync.sender.pipe.TsFilePipe;
-import org.apache.iotdb.tsfile.utils.Pair;
import java.util.Map;
public class SyncPipeUtil {
- // TODO(sync): delete this in new-standalone version
- public static PipeSink parseCreatePipeSinkPlan(CreatePipeSinkPlan plan) throws PipeSinkException {
- PipeSink pipeSink;
- try {
- pipeSink = PipeSinkFactory.createPipeSink(plan.getPipeSinkType(), plan.getPipeSinkName());
- } catch (UnsupportedOperationException e) {
- throw new PipeSinkException(e.getMessage());
- }
-
- pipeSink.setAttribute(plan.getPipeSinkAttributes());
- return pipeSink;
- }
-
public static PipeSink parseCreatePipeSinkStatement(
CreatePipeSinkStatement createPipeSinkStatement) throws PipeSinkException {
PipeSink pipeSink;
@@ -67,27 +51,6 @@ public class SyncPipeUtil {
return pipeSink;
}
- // TODO(sync): delete this in new-standalone version
- public static PipeInfo parseCreatePipePlanAsPipeInfo(CreatePipePlan plan, long pipeCreateTime)
- throws PipeException {
- boolean syncDelOp = false;
- for (Pair<String, String> pair : plan.getPipeAttributes()) {
- pair.left = pair.left.toLowerCase();
- if ("syncdelop".equals(pair.left)) {
- syncDelOp = Boolean.parseBoolean(pair.right);
- } else {
- throw new PipeException(String.format("Can not recognition attribute %s", pair.left));
- }
- }
-
- return new TsFilePipeInfo(
- plan.getPipeName(),
- plan.getPipeSinkName(),
- pipeCreateTime,
- plan.getDataStartTimestamp(),
- syncDelOp);
- }
-
public static PipeInfo parseCreatePipeStatementAsPipeInfo(
CreatePipeStatement createPipeStatement, long pipeCreateTime) throws PipeException {
boolean syncDelOp = false;
diff --git a/server/src/test/java/org/apache/iotdb/db/sync/persistence/LocalSyncInfoTest.java b/server/src/test/java/org/apache/iotdb/db/sync/persistence/LocalSyncInfoTest.java
index 032db8479b..92350d049e 100644
--- a/server/src/test/java/org/apache/iotdb/db/sync/persistence/LocalSyncInfoTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/sync/persistence/LocalSyncInfoTest.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.commons.sync.pipe.PipeMessage;
import org.apache.iotdb.commons.sync.pipe.SyncOperation;
import org.apache.iotdb.commons.sync.pipe.TsFilePipeInfo;
import org.apache.iotdb.db.exception.StorageEngineException;
-import org.apache.iotdb.db.qp.physical.sys.CreatePipeSinkPlan;
+import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
import org.apache.iotdb.db.sync.common.LocalSyncInfo;
import org.apache.iotdb.db.utils.EnvironmentUtils;
@@ -35,6 +35,8 @@ import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
public class LocalSyncInfoTest {
private static final String pipe1 = "pipe1";
@@ -56,9 +58,13 @@ public class LocalSyncInfoTest {
public void testOperatePipe() throws Exception {
LocalSyncInfo localSyncInfo = new LocalSyncInfo();
try {
- CreatePipeSinkPlan createPipeSinkPlan = new CreatePipeSinkPlan("demo", "iotdb");
- createPipeSinkPlan.addPipeSinkAttribute("ip", "127.0.0.1");
- createPipeSinkPlan.addPipeSinkAttribute("port", "6670");
+ CreatePipeSinkStatement createPipeSinkStatement = new CreatePipeSinkStatement();
+ createPipeSinkStatement.setPipeSinkName("demo");
+ createPipeSinkStatement.setPipeSinkType("IoTDB");
+ Map<String, String> attributes = new HashMap<>();
+ attributes.put("ip", "127.0.0.1");
+ attributes.put("port", "6667");
+ createPipeSinkStatement.setAttributes(attributes);
try {
localSyncInfo.addPipe(new TsFilePipeInfo(pipe1, "demo", createdTime1, 0, true));
Assert.fail();
@@ -66,7 +72,7 @@ public class LocalSyncInfoTest {
Assert.assertTrue(e instanceof PipeSinkNotExistException);
// throw exception because can not find pipeSink
}
- localSyncInfo.addPipeSink(createPipeSinkPlan);
+ localSyncInfo.addPipeSink(createPipeSinkStatement);
localSyncInfo.addPipe(new TsFilePipeInfo(pipe1, "demo", createdTime1, 0, true));
localSyncInfo.addPipe(new TsFilePipeInfo(pipe2, "demo", createdTime2, 0, true));
try {