You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/01/25 03:20:21 UTC
[iotdb] branch new_sync updated: [To new_sync][IOTDB-2272] implement customized sync process: receiver (#4786)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch new_sync
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/new_sync by this push:
new 5e62a81 [To new_sync][IOTDB-2272] implement customized sync process: receiver (#4786)
5e62a81 is described below
commit 5e62a81e21bc61fc9428fb68c1b97115ca40ea0f
Author: Chen YZ <43...@users.noreply.github.com>
AuthorDate: Tue Jan 25 11:19:42 2022 +0800
[To new_sync][IOTDB-2272] implement customized sync process: receiver (#4786)
---
.../org/apache/iotdb/db/qp/sql/IoTDBSqlLexer.g4 | 4 +
.../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 | 16 +-
.../sync/IoTDBSyncReceiverLoaderIT.java | 306 +++++++++++++++++++++
.../iotdb/db/integration/sync/WriteUtil.java | 102 +++++++
.../resources/conf/iotdb-engine.properties | 6 +
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 11 +
.../org/apache/iotdb/db/conf/IoTDBConstant.java | 2 +
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 5 +
.../apache/iotdb/db/newsync/conf/SyncConstant.java | 23 ++
.../iotdb/db/newsync/receiver/ReceiverService.java | 177 ++++++++++++
.../db/newsync/receiver/collector/Collector.java | 22 ++
.../db/newsync/receiver/load/DeletionLoader.java | 44 +++
.../iotdb/db/newsync/receiver/load/ILoader.java | 36 +++
.../db/newsync/receiver/load/SchemaLoader.java | 54 ++++
.../db/newsync/receiver/load/TsFileLoader.java | 67 +++++
.../db/newsync/receiver/manager/PipeInfo.java | 83 ++++++
.../db/newsync/receiver/manager/PipeStatus.java | 25 ++
.../newsync/receiver/manager/ReceiverManager.java | 132 +++++++++
.../db/newsync/receiver/recovery/ReceiverLog.java | 85 ++++++
.../receiver/recovery/ReceiverLogAnalyzer.java | 106 +++++++
.../apache/iotdb/db/qp/constant/SQLConstant.java | 6 +
.../apache/iotdb/db/qp/executor/PlanExecutor.java | 20 ++
.../org/apache/iotdb/db/qp/logical/Operator.java | 6 +-
.../db/qp/logical/sys/ShowPipeServerOperator.java | 51 ++++
.../db/qp/logical/sys/StartPipeServerOperator.java | 38 +++
.../db/qp/logical/sys/StopPipeServerOperator.java | 38 +++
.../apache/iotdb/db/qp/physical/PhysicalPlan.java | 10 +
.../db/qp/physical/sys/ShowPipeServerPlan.java | 41 +++
.../apache/iotdb/db/qp/physical/sys/ShowPlan.java | 3 +-
.../db/qp/physical/sys/StartPipeServerPlan.java | 56 ++++
.../db/qp/physical/sys/StopPipeServerPlan.java | 56 ++++
.../apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java | 19 ++
.../java/org/apache/iotdb/db/service/IoTDB.java | 2 +
.../org/apache/iotdb/db/service/ServiceType.java | 1 +
.../receiver/manager/ReceiverManagerTest.java | 81 ++++++
.../receiver/recovery/ReceiverLogAnalyzerTest.java | 82 ++++++
.../iotdb/db/qp/physical/PhysicalPlanTest.java | 34 +++
37 files changed, 1845 insertions(+), 5 deletions(-)
diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlLexer.g4 b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlLexer.g4
index 6b7398d..f406677 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlLexer.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlLexer.g4
@@ -362,6 +362,10 @@ PIPE
: P I P E
;
+PIPESERVER // keyword token consumed by the START/STOP/SHOW PIPESERVER parser rules
+ : P I P E S E R V E R
+ ;
+
PIPESINK
: P I P E S I N K
;
diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
index 01982b7..24dbc79 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
@@ -63,7 +63,8 @@ utilityStatement
| loadConfiguration | loadTimeseries | loadFile | removeFile | unloadFile;
syncStatement
- : createPipeSink | showPipeSinkType | showPipeSink | dropPipeSink
+ : startPipeServer | stopPipeServer | showPipeServer
+ | createPipeSink | showPipeSinkType | showPipeSink | dropPipeSink
| createPipe | showPipe | stopPipe | startPipe | dropPipe;
/**
@@ -706,6 +707,19 @@ syncAttributeClauses
: propertyClause (COMMA propertyClause)*
;
+// sync receiver: start, stop and show the pipe server (SHOW accepts an optional pipe name)
+startPipeServer
+ : START PIPESERVER
+ ;
+
+stopPipeServer
+ : STOP PIPESERVER
+ ;
+
+showPipeServer
+ : SHOW PIPESERVER (pipeName=ID)?
+ ;
+
/**
* 7. Common Clauses
*/
diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverLoaderIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverLoaderIT.java
new file mode 100644
index 0000000..3d96ccb
--- /dev/null
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverLoaderIT.java
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.integration.sync;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.modification.Deletion;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.newsync.receiver.load.DeletionLoader;
+import org.apache.iotdb.db.newsync.receiver.load.ILoader;
+import org.apache.iotdb.db.newsync.receiver.load.SchemaLoader;
+import org.apache.iotdb.db.newsync.receiver.load.TsFileLoader;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.itbase.category.LocalStandaloneTest;
+import org.apache.iotdb.jdbc.Config;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.utils.FilePathUtils;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.sql.*;
+import java.util.*;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+@Category({LocalStandaloneTest.class})
+public class IoTDBSyncReceiverLoaderIT {
+ private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBSyncReceiverLoaderIT.class);
+ protected static boolean enableSeqSpaceCompaction;
+ protected static boolean enableUnseqSpaceCompaction;
+ protected static boolean enableCrossSpaceCompaction;
+ /** create tsfile and move to tmpDir for sync test */
+ File tmpDir = new File("target/synctest");
+
+  @Before
+  public void setUp() throws Exception {
+    EnvironmentUtils.envSetUp();
+    // remember the compaction switches so tearDown() can restore them, then turn them all off
+    org.apache.iotdb.db.conf.IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+    enableSeqSpaceCompaction = config.isEnableSeqSpaceCompaction();
+    enableUnseqSpaceCompaction = config.isEnableUnseqSpaceCompaction();
+    enableCrossSpaceCompaction = config.isEnableCrossSpaceCompaction();
+    config.setEnableSeqSpaceCompaction(false);
+    config.setEnableUnseqSpaceCompaction(false);
+    config.setEnableCrossSpaceCompaction(false);
+    // write the fixture data, shut the daemon down and move its first data dir to tmpDir
+    WriteUtil.insertData();
+    EnvironmentUtils.shutdownDaemon();
+    String firstDataDir = config.getDataDirs()[0];
+    FileUtils.moveDirectory(new File(firstDataDir), tmpDir);
+  }
+
+  /** Restores the compaction switches saved by setUp() and cleans up after the test. */
+  @After
+  public void tearDown() throws Exception {
+    // put back the values that setUp() read before it disabled compaction
+    org.apache.iotdb.db.conf.IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+    config.setEnableSeqSpaceCompaction(enableSeqSpaceCompaction);
+    config.setEnableUnseqSpaceCompaction(enableUnseqSpaceCompaction);
+    config.setEnableCrossSpaceCompaction(enableCrossSpaceCompaction);
+    // delete the directory holding the moved data first, then clean the environment
+    FileUtils.deleteDirectory(tmpDir);
+    EnvironmentUtils.cleanEnv();
+  }
+
+ @Test
+ public void test() throws Exception {
+ // End-to-end check of the three loaders. 1. restart IoTDB (setUp() already moved the data to tmpDir)
+ EnvironmentUtils.cleanEnv();
+ EnvironmentUtils.envSetUp();
+
+ // 2. test for SchemaLoader: load plans for the same storage groups and timeseries paths WriteUtil creates
+ List<PhysicalPlan> planList = new ArrayList<>();
+ planList.add(new SetStorageGroupPlan(new PartialPath("root.vehicle")));
+ planList.add(
+ new CreateTimeSeriesPlan(
+ new PartialPath("root.vehicle.d0.s0"),
+ new MeasurementSchema("s0", TSDataType.INT32, TSEncoding.RLE)));
+ planList.add(
+ new CreateTimeSeriesPlan(
+ new PartialPath("root.vehicle.d0.s1"),
+ new MeasurementSchema("s1", TSDataType.TEXT, TSEncoding.PLAIN)));
+ planList.add(
+ new CreateTimeSeriesPlan(
+ new PartialPath("root.vehicle.d1.s2"),
+ new MeasurementSchema("s2", TSDataType.FLOAT, TSEncoding.RLE)));
+ planList.add(
+ new CreateTimeSeriesPlan(
+ new PartialPath("root.vehicle.d1.s3"),
+ new MeasurementSchema("s3", TSDataType.BOOLEAN, TSEncoding.PLAIN)));
+ planList.add(new SetStorageGroupPlan(new PartialPath("root.sg1")));
+ planList.add(
+ new CreateAlignedTimeSeriesPlan(
+ new PartialPath("root.sg1.d1"),
+ Arrays.asList("s1", "s2", "s3", "s4", "s5"),
+ Arrays.asList(
+ TSDataType.FLOAT,
+ TSDataType.INT32,
+ TSDataType.INT64,
+ TSDataType.BOOLEAN,
+ TSDataType.TEXT),
+ Arrays.asList(
+ TSEncoding.RLE,
+ TSEncoding.GORILLA,
+ TSEncoding.RLE,
+ TSEncoding.RLE,
+ TSEncoding.PLAIN),
+ Arrays.asList(
+ CompressionType.SNAPPY,
+ CompressionType.SNAPPY,
+ CompressionType.SNAPPY,
+ CompressionType.SNAPPY,
+ CompressionType.SNAPPY),
+ null));
+ for (PhysicalPlan plan : planList) {
+ ILoader planLoader = new SchemaLoader(plan);
+ try {
+ planLoader.load();
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail();
+ }
+ }
+
+ // 3. test for TsFileLoader: load every TsFile found under tmpDir, in the order getTsFilePaths returns
+ List<File> tsFiles = getTsFilePaths(tmpDir);
+ for (File tsfile : tsFiles) {
+ ILoader tsFileLoader = new TsFileLoader(tsfile);
+ try {
+ tsFileLoader.load();
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail();
+ }
+ }
+
+ // 4. test for DeletionLoader (numeric Deletion args are presumably fileOffset, startTime, endTime - TODO confirm)
+ Deletion deletion = new Deletion(new PartialPath("root.vehicle.**"), 0, 33, 38);
+ ILoader deletionLoader = new DeletionLoader(deletion);
+ try {
+ deletionLoader.load();
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail();
+ }
+
+ // 5. check result after loading: query through JDBC and compare with the expected rows
+ // 5.1 check normal timeseries (each expected row is "time" followed by the columnNames1 values, in that order)
+ String sql1 = "select * from root.vehicle.*";
+ String[] retArray1 =
+ new String[] {
+ "6,120,null,null,null",
+ "9,null,123,null,null",
+ "16,128,null,null,16.0",
+ "18,189,198,true,18.0",
+ "20,null,null,false,null",
+ "29,null,null,true,1205.0",
+ "99,null,1234,null,null"
+ };
+ String[] columnNames1 = {
+ "root.vehicle.d0.s0", "root.vehicle.d0.s1", "root.vehicle.d1.s3", "root.vehicle.d1.s2"
+ };
+ checkResult(sql1, columnNames1, retArray1);
+ // 5.2 check aligned timeseries (rows are "time" followed by s1..s5)
+ String sql2 = "select * from root.sg1.d1";
+ String[] retArray2 =
+ new String[] {
+ "1,1.0,1,null,true,aligned_test1",
+ "2,2.0,2,null,null,aligned_test2",
+ "3,3.0,null,null,false,aligned_test3",
+ "4,4.0,4,null,true,aligned_test4",
+ "5,130000.0,130000,130000,false,aligned_unseq_test1",
+ "6,6.0,6,6,true,null",
+ "7,7.0,7,7,false,aligned_test7",
+ "8,8.0,8,8,null,aligned_test8",
+ "9,9.0,9,9,false,aligned_test9",
+ };
+ String[] columnNames2 = {
+ "root.sg1.d1.s1", "root.sg1.d1.s2", "root.sg1.d1.s3", "root.sg1.d1.s4", "root.sg1.d1.s5",
+ };
+ checkResult(sql2, columnNames2, retArray2);
+ }
+
+  /** Runs sql over JDBC and compares each row ("first column,columnNames values") to retArray. */
+  private void checkResult(String sql, String[] columnNames, String[] retArray)
+      throws ClassNotFoundException {
+    Class.forName(Config.JDBC_DRIVER_NAME);
+    try (Connection connection =
+            DriverManager.getConnection(
+                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+      Assert.assertTrue(statement.execute(sql));
+      ResultSet rows = statement.getResultSet();
+      ResultSetMetaData metaData = rows.getMetaData();
+      // column name -> 1-based JDBC column index
+      Map<String, Integer> indexByName = new HashMap<>();
+      for (int col = 1; col <= metaData.getColumnCount(); col++) {
+        indexByName.put(metaData.getColumnName(col), col);
+      }
+      assertEquals(columnNames.length + 1, metaData.getColumnCount());
+      int rowCount = 0;
+      for (; rows.next(); rowCount++) {
+        StringBuilder line = new StringBuilder();
+        line.append(rows.getString(1));
+        for (String columnName : columnNames) {
+          int index = indexByName.get(columnName);
+          line.append(",").append(rows.getString(index));
+        }
+        assertEquals(retArray[rowCount], line.toString());
+      }
+      assertEquals(retArray.length, rowCount);
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  /**
+   * Scans parentDir recursively and returns the TsFiles found by scanDir, sorted by load sequence:
+   * first by the name of the directory three levels above the file (the "diffSg" key, presumably
+   * the storage group directory), then by ascending TsFile version.
+   *
+   * @param parentDir folder to scan
+   * @return the sorted TsFiles
+   */
+  private List<File> getTsFilePaths(File parentDir) {
+    List<File> res = new ArrayList<>();
+    if (!parentDir.exists()) {
+      Assert.fail();
+      return res;
+    }
+    scanDir(res, parentDir);
+    Collections.sort(
+        res,
+        new Comparator<File>() {
+          @Override
+          public int compare(File f1, File f2) {
+            File dir1 = f1.getParentFile().getParentFile().getParentFile();
+            File dir2 = f2.getParentFile().getParentFile().getParentFile();
+            int diffSg = dir1.getName().compareTo(dir2.getName());
+            if (diffSg != 0) {
+              return diffSg;
+            }
+            // Long.compare cannot overflow, unlike casting the difference of two versions to int
+            return Long.compare(
+                FilePathUtils.splitAndGetTsFileVersion(f1.getName()),
+                FilePathUtils.splitAndGetTsFileVersion(f2.getName()));
+          }
+        });
+    return res;
+  }
+
+  /** Recursively adds to tsFiles the TsFile behind each .resource file, then deletes that file. */
+  private void scanDir(List<File> tsFiles, File parentDir) {
+    if (!parentDir.exists()) {
+      Assert.fail();
+      return;
+    }
+    for (File entry : parentDir.listFiles()) {
+      if (entry.isDirectory()) {
+        scanDir(tsFiles, entry);
+      } else if (entry.getName().endsWith(".resource")) {
+        // only add tsfile that has been flushed; dropping the 9-char ".resource" gives its path
+        String resourcePath = entry.getAbsolutePath();
+        tsFiles.add(new File(resourcePath.substring(0, resourcePath.length() - 9)));
+        try {
+          FileUtils.delete(entry);
+        } catch (IOException e) {
+          e.printStackTrace();
+        }
+      }
+    }
+  }
+}
diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/sync/WriteUtil.java b/integration/src/test/java/org/apache/iotdb/db/integration/sync/WriteUtil.java
new file mode 100644
index 0000000..9d4da20
--- /dev/null
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/sync/WriteUtil.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.integration.sync;
+
+import org.apache.iotdb.jdbc.Config;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.Statement;
+
+public class WriteUtil {
+
+ private static final String[] sqls =
+ new String[] {
+ "SET STORAGE GROUP TO root.vehicle",
+ "CREATE TIMESERIES root.vehicle.d0.s0 WITH DATATYPE=INT32, ENCODING=RLE",
+ "CREATE TIMESERIES root.vehicle.d0.s1 WITH DATATYPE=TEXT, ENCODING=PLAIN",
+ "CREATE TIMESERIES root.vehicle.d1.s2 WITH DATATYPE=FLOAT, ENCODING=RLE",
+ "CREATE TIMESERIES root.vehicle.d1.s3 WITH DATATYPE=BOOLEAN, ENCODING=PLAIN",
+ "SET STORAGE GROUP TO root.sg1",
+ "create aligned timeseries root.sg1.d1(s1 FLOAT encoding=RLE, s2 INT32 encoding=Gorilla compression=SNAPPY, s3 INT64, s4 BOOLEAN, s5 TEXT)",
+ "insert into root.sg1.d1(time, s1, s2, s3, s4, s5) aligned values(1, 1.0, 1, 1, TRUE, 'aligned_test1')",
+ "insert into root.sg1.d1(time, s1, s2, s3, s5) aligned values(2, 2.0, 2, 2, 'aligned_test2')",
+ "insert into root.sg1.d1(time, s1, s3, s4, s5) aligned values(3, 3.0, 3, FALSE, 'aligned_test3')",
+ "insert into root.sg1.d1(time, s1, s2, s4, s5) aligned values(4, 4.0, 4, TRUE, 'aligned_test4')",
+ "insert into root.sg1.d1(time, s1, s2, s4, s5) aligned values(5, 5.0, 5, TRUE, 'aligned_test5')",
+ "insert into root.sg1.d1(time, s1, s2, s3, s4) aligned values(6, 6.0, 6, 6, TRUE)",
+ "insert into root.sg1.d1(time, s1, s2, s3, s4, s5) aligned values(7, 7.0, 7, 7, FALSE, 'aligned_test7')",
+ "insert into root.sg1.d1(time, s1, s2, s3, s5) aligned values(8, 8.0, 8, 8, 'aligned_test8')",
+ "insert into root.sg1.d1(time, s1, s2, s3, s4, s5) aligned values(9, 9.0, 9, 9, FALSE, 'aligned_test9')",
+ "insert into root.vehicle.d0(timestamp,s0) values(10,10)",
+ "insert into root.vehicle.d0(timestamp,s0,s1) values(12,12,'12')",
+ "insert into root.vehicle.d0(timestamp,s1) values(14,'14')",
+ "insert into root.vehicle.d1(timestamp,s2) values(16,16.0)",
+ "insert into root.vehicle.d1(timestamp,s2,s3) values(18,18.0,true)",
+ "insert into root.vehicle.d1(timestamp,s3) values(20,false)",
+ "flush",
+ "insert into root.vehicle.d0(timestamp,s0) values(6,120)",
+ "insert into root.vehicle.d0(timestamp,s0,s1) values(38,121,'122')",
+ "insert into root.vehicle.d0(timestamp,s1) values(9,'123')",
+ "insert into root.vehicle.d0(timestamp,s0) values(16,128)",
+ "insert into root.sg1.d1(time, s1, s2, s3, s4, s5) aligned values(5, 130000.0, 130000, 130000, FALSE, 'aligned_unseq_test1')",
+ "flush",
+ "insert into root.vehicle.d0(timestamp,s0,s1) values(18,189,'198')",
+ "insert into root.vehicle.d0(timestamp,s1) values(99,'1234')",
+ "insert into root.vehicle.d1(timestamp,s2) values(14,1024.0)",
+ "insert into root.vehicle.d1(timestamp,s2,s3) values(29,1205.0,true)",
+ "insert into root.vehicle.d1(timestamp,s3) values(33,true)",
+ "delete from root.sg1.d1.s3 where time<=3",
+ "flush",
+ "delete from root.vehicle.** where time >= 10 and time<=14",
+ "flush",
+ // no flush data
+ "insert into root.sg1.d1(time, s1, s3, s4) aligned values(23, 230000.0, 230000, FALSE)",
+ "insert into root.sg1.d1(time, s2, s5) aligned values(31, 31, 'aligned_test31')",
+ "insert into root.sg1.d1(time, s2, s5) aligned values(32, 32, 'aligned_test32')",
+ "insert into root.sg1.d1(time, s2, s5) aligned values(33, 33, 'aligned_test33')",
+ "insert into root.sg1.d1(time, s2, s5) aligned values(34, 34, 'aligned_test34')",
+ "insert into root.sg1.d1(time, s2, s5) aligned values(35, 35, 'aligned_test35')",
+ "insert into root.sg1.d1(time, s2, s5) aligned values(36, 36, 'aligned_test36')",
+ "insert into root.sg1.d1(time, s2, s5) aligned values(37, 37, 'aligned_test37')",
+ "insert into root.sg1.d1(time, s2, s5) aligned values(38, 38, 'aligned_test38')",
+ "insert into root.sg1.d1(time, s2, s5) aligned values(39, 39, 'aligned_test39')",
+ "insert into root.sg1.d1(time, s2, s5) aligned values(40, 40, 'aligned_test40')",
+ };
+
+  /** Runs all sqls over JDBC in order; a failure is rethrown instead of only being printed. */
+  public static void insertData() throws ClassNotFoundException {
+    Class.forName(Config.JDBC_DRIVER_NAME);
+    try (Connection connection =
+            DriverManager.getConnection(
+                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+      // create aligned and non-aligned time series
+      for (String sql : sqls) {
+        statement.execute(sql);
+      }
+    } catch (Exception e) {
+      throw new IllegalStateException("Failed to write sync test data", e);
+    }
+  }
+
+  public static void main(String[] args) throws ClassNotFoundException {
+    insertData();
+  }
+}
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index b1a9bf4..d76605e 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -568,6 +568,12 @@ timestamp_precision=ms
# Datatype: int
# external_sort_threshold=1000
+####################
+### PIPE Server Configuration
+####################
+# Port the PIPE server listens on
+# Datatype: int
+# local_pipe_server_port=5555
####################
### Sync Server Configuration
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index befc91d..d43afa4 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -469,6 +469,9 @@ public class IoTDBConfig {
/** If this IoTDB instance is a receiver of sync, set the server port. */
private int syncServerPort = 5555;
+ /** If this IoTDB instance runs the PIPE server (sync receiver), the port it listens on. */
+ private int pipeServerPort = 5555;
+
/**
* Set the language version when loading file including error information, default value is "EN"
*/
@@ -1283,6 +1286,14 @@ public class IoTDBConfig {
this.syncServerPort = syncServerPort;
}
+ public int getPipeServerPort() {
+ return pipeServerPort;
+ }
+
+ public void setPipeServerPort(int pipeServerPort) {
+ this.pipeServerPort = pipeServerPort;
+ }
+
String getLanguageVersion() {
return languageVersion;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
index b449ef3..f0f939c 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
@@ -131,6 +131,8 @@ public class IoTDBConstant {
public static final String COLUMN_PIPE2PIPESINK_NAME = "pipeSink";
public static final String COLUMN_PIPE_STATUS = "status";
+ // sync receiver
+ public static final String COLUMN_PIPE_REMOTE_IP = "remote ip";
public static final String ONE_LEVEL_PATH_WILDCARD = "*";
public static final String MULTI_LEVEL_PATH_WILDCARD = "**";
public static final String TIME = "time";
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index f46671b..2c3a80b 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -379,6 +379,11 @@ public class IoTDBDescriptor {
properties
.getProperty("sync_server_port", Integer.toString(conf.getSyncServerPort()))
.trim()));
+      conf.setPipeServerPort(
+          Integer.parseInt(
+              properties
+                  .getProperty("local_pipe_server_port", Integer.toString(conf.getPipeServerPort()))
+                  .trim()));
conf.setIpWhiteList(properties.getProperty("ip_white_list", conf.getIpWhiteList()));
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/conf/SyncConstant.java b/server/src/main/java/org/apache/iotdb/db/newsync/conf/SyncConstant.java
new file mode 100644
index 0000000..9a6d0d5
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/conf/SyncConstant.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.newsync.conf;
+
+public class SyncConstant { // constants of the newsync package
+ public static final String RECEIVER_LOG_NAME = "receiverService.log"; // presumably the receiver log file name - TODO confirm against ReceiverLog
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/ReceiverService.java b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/ReceiverService.java
new file mode 100644
index 0000000..e85bc7c
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/ReceiverService.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.newsync.receiver;
+
+import org.apache.iotdb.db.exception.StartupException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.newsync.receiver.collector.Collector;
+import org.apache.iotdb.db.newsync.receiver.manager.PipeInfo;
+import org.apache.iotdb.db.newsync.receiver.manager.ReceiverManager;
+import org.apache.iotdb.db.qp.physical.sys.ShowPipeServerPlan;
+import org.apache.iotdb.db.qp.utils.DatetimeUtils;
+import org.apache.iotdb.db.query.dataset.ListDataSet;
+import org.apache.iotdb.db.service.IService;
+import org.apache.iotdb.db.service.ServiceType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.Field;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.iotdb.db.conf.IoTDBConstant.*;
+
+public class ReceiverService implements IService {
+ private static final Logger logger = LoggerFactory.getLogger(ReceiverService.class);
+ private static final ReceiverManager receiverManager = ReceiverManager.getInstance();
+ private Collector collector;
+
+  /** Starts the receiver service; returns false, after logging the cause, on an IOException. */
+  public boolean startPipeServer() {
+    try {
+      receiverManager.startServer();
+      // TODO: start socket and collector
+    } catch (IOException e) {
+      logger.error("Failed to start pipe server", e);
+      return false;
+    }
+    return true;
+  }
+
+  /** Stops the receiver service; returns false, after logging the cause, on an IOException. */
+  public boolean stopPipeServer() {
+    try {
+      receiverManager.stopServer();
+      // TODO: stop socket and collector
+    } catch (IOException e) {
+      logger.error("Failed to stop pipe server", e);
+      return false;
+    }
+    return true;
+  }
+
+ /** create a new pipe named pipeName for remoteIp; delegates to ReceiverManager#createPipe */
+ public void createPipe(String pipeName, String remoteIp, long startTime) throws IOException {
+ receiverManager.createPipe(pipeName, remoteIp, startTime);
+ }
+
+ /** start an existing pipe named pipeName from remoteIp; delegates to ReceiverManager#startPipe */
+ public void startPipe(String pipeName, String remoteIp) throws IOException {
+ receiverManager.startPipe(pipeName, remoteIp);
+ }
+
+ /** stop an existing pipe named pipeName from remoteIp; delegates to ReceiverManager#stopPipe */
+ public void stopPipe(String pipeName, String remoteIp) throws IOException {
+ receiverManager.stopPipe(pipeName, remoteIp);
+ }
+
+ /** drop an existing pipe named pipeName from remoteIp; delegates to ReceiverManager#dropPipe */
+ public void dropPipe(String pipeName, String remoteIp) throws IOException {
+ receiverManager.dropPipe(pipeName, remoteIp);
+ }
+
+  /**
+   * Serves the SHOW PIPESERVER statement.
+   *
+   * @return QueryDataSet with four TEXT columns: pipe name, remote ip, status and create time
+   */
+  public QueryDataSet showPipe(ShowPipeServerPlan plan) {
+    // an empty (or null) pipe name lists every pipe, otherwise only the pipes with that name
+    String pipeName = plan.getPipeName();
+    List<PipeInfo> pipeInfos =
+        StringUtils.isEmpty(pipeName)
+            ? receiverManager.getAllPipeInfos()
+            : receiverManager.getPipeInfos(pipeName);
+    ListDataSet dataSet =
+        new ListDataSet(
+            Arrays.asList(
+                new PartialPath(COLUMN_PIPE_NAME, false),
+                new PartialPath(COLUMN_PIPE_REMOTE_IP, false),
+                new PartialPath(COLUMN_PIPE_STATUS, false),
+                new PartialPath(COLUMN_CREATED_TIME, false)),
+            Arrays.asList(TSDataType.TEXT, TSDataType.TEXT, TSDataType.TEXT, TSDataType.TEXT));
+    for (PipeInfo pipeInfo : pipeInfos) {
+      putPipeRecord(dataSet, pipeInfo);
+    }
+    return dataSet;
+  }
+
+  /** Appends one row (pipe name, remote ip, status, create time) for pipeInfo to dataSet. */
+  private void putPipeRecord(ListDataSet dataSet, PipeInfo pipeInfo) {
+    // cell values in the same order as the column headers built by showPipe()
+    String[] cells = {
+      pipeInfo.getPipeName(),
+      pipeInfo.getRemoteIp(),
+      pipeInfo.getStatus().name(),
+      DatetimeUtils.convertLongToDate(pipeInfo.getCreateTime())
+    };
+    RowRecord rowRecord = new RowRecord(0);
+    for (String cell : cells) {
+      Field field = new Field(TSDataType.TEXT);
+      field.setBinaryV(new Binary(cell));
+      rowRecord.addField(field);
+    }
+    dataSet.putRecord(rowRecord);
+  }
+
+ private ReceiverService() { // private: instances are only handed out by getInstance()
+ collector = new Collector();
+ }
+
+ public static ReceiverService getInstance() { // returns the single instance held by ReceiverServiceHolder
+ return ReceiverServiceHolder.INSTANCE;
+ }
+
+ /** IService: initializes the receiver manager, then starts the pipe server if the manager reports it enabled. */
+ @Override
+ public void start() throws StartupException {
+ receiverManager.init();
+ if (receiverManager.isPipeServerEnable()) {
+ startPipeServer(); // NOTE(review): the boolean result is ignored, so a failed start is only logged
+ }
+ }
+
+  @Override
+  public void stop() {
+    stopPipeServer();
+    try {
+      receiverManager.close();
+    } catch (IOException e) {
+      logger.error("Failed to close receiver manager", e);
+    }
+  }
+
+ @Override
+ public ServiceType getID() {
+ return ServiceType.RECEIVER_SERVICE;
+ }
+
+ private static class ReceiverServiceHolder { // holds the singleton that getInstance() returns
+ private static final ReceiverService INSTANCE = new ReceiverService();
+
+ private ReceiverServiceHolder() {}
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/collector/Collector.java b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/collector/Collector.java
new file mode 100644
index 0000000..3c44ed0
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/collector/Collector.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.iotdb.db.newsync.receiver.collector;
+
+ /**
+ * Currently an empty class without any members.
+ *
+ * <p>NOTE(review): presumably a placeholder for the receiver-side collector - confirm.
+ */
+ public class Collector {}
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/load/DeletionLoader.java b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/load/DeletionLoader.java
new file mode 100644
index 0000000..b12c0cb
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/load/DeletionLoader.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.newsync.receiver.load;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.modification.Deletion;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.StorageEngineReadonlyException;
+
+/** This loader is used to load deletion plan. */
+public class DeletionLoader implements ILoader {
+
+ private Deletion deletion;
+
+ public DeletionLoader(Deletion deletion) {
+ this.deletion = deletion;
+ }
+
+ @Override
+ public void load() throws StorageEngineException {
+ if (IoTDBDescriptor.getInstance().getConfig().isReadOnly()) {
+ throw new StorageEngineReadonlyException();
+ }
+ StorageEngine.getInstance()
+ .delete(deletion.getPath(), deletion.getStartTime(), deletion.getEndTime(), 0, null);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/load/ILoader.java b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/load/ILoader.java
new file mode 100644
index 0000000..05be693
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/load/ILoader.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.newsync.receiver.load;
+
+import org.apache.iotdb.db.exception.LoadFileException;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+
+import java.io.IOException;
+
+/**
+ * This interface is used to load files, including tsFile, syncTask, schema, modsFile and
+ * deletePlan.
+ */
+public interface ILoader {
+ void load()
+ throws StorageEngineException, IOException, MetadataException, WriteProcessException,
+ LoadFileException;
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/load/SchemaLoader.java b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/load/SchemaLoader.java
new file mode 100644
index 0000000..1547b5a
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/load/SchemaLoader.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.newsync.receiver.load;
+
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.metadata.StorageGroupAlreadySetException;
+import org.apache.iotdb.db.metadata.MManager;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * This loader is used to PhysicalPlan. Support four types of physical plans: CREATE_TIMESERIES |
+ * CREATE_ALIGNED_TIMESERIES | DELETE_TIMESERIES | SET_STORAGE_GROUP
+ */
+public class SchemaLoader implements ILoader {
+ private static final Logger logger = LoggerFactory.getLogger(SchemaLoader.class);
+
+ private PhysicalPlan plan;
+
+ public SchemaLoader(PhysicalPlan plan) {
+ this.plan = plan;
+ }
+
+ @Override
+ public void load() throws IOException, MetadataException {
+ try {
+ MManager.getInstance().operation(plan);
+ } catch (StorageGroupAlreadySetException e) {
+ logger.warn(
+ "Sync receiver try to set storage group {} that has already been set",
+ e.getStorageGroupPath());
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/load/TsFileLoader.java b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/load/TsFileLoader.java
new file mode 100644
index 0000000..1ee57d2
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/load/TsFileLoader.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.newsync.receiver.load;
+
+import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.exception.LoadFileException;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.tools.TsFileRewriteTool;
+import org.apache.iotdb.db.utils.FileLoaderUtils;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+ /**
+  * This loader is used to load tsFiles. A file that spans several time partitions is first
+  * rewritten into one resource per partition, after which its mods file is removed; the
+  * resulting resources (or the original file, if no split was produced) are then handed to the
+  * storage engine.
+  */
+ public class TsFileLoader implements ILoader {
+
+   private final File tsFile;
+
+   /**
+    * @param tsFile the TsFile to load when {@link #load()} is called
+    */
+   public TsFileLoader(File tsFile) {
+     this.tsFile = tsFile;
+   }
+
+   /**
+    * Checks the TsFile, splits it by time partition if necessary and loads every resulting
+    * resource into the storage engine.
+    *
+    * @throws IOException if reading, rewriting or removing a file fails
+    * @throws MetadataException if a schema problem is reported while processing the file
+    * @throws WriteProcessException if rewriting the file fails
+    * @throws StorageEngineException if the storage engine reports a failure
+    * @throws LoadFileException if the storage engine refuses to load a resource
+    */
+   @Override
+   public void load()
+       throws IOException, MetadataException, WriteProcessException, StorageEngineException,
+           LoadFileException {
+     TsFileResource tsFileResource = new TsFileResource(tsFile);
+     tsFileResource.setClosed(true);
+     FileLoaderUtils.checkTsFileResource(tsFileResource);
+
+     List<TsFileResource> splitResources = new ArrayList<>();
+     if (tsFileResource.isSpanMultiTimePartitions()) {
+       TsFileRewriteTool.rewriteTsFile(tsFileResource, splitResources);
+       tsFileResource.writeLock();
+       try {
+         tsFileResource.removeModFile();
+       } finally {
+         // Always release the lock, even if removing the mods file fails.
+         tsFileResource.writeUnlock();
+       }
+     }
+
+     // No split was produced: load the original resource as a whole.
+     if (splitResources.isEmpty()) {
+       splitResources.add(tsFileResource);
+     }
+
+     for (TsFileResource resource : splitResources) {
+       StorageEngine.getInstance().loadNewTsFile(resource);
+     }
+   }
+ }
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/manager/PipeInfo.java b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/manager/PipeInfo.java
new file mode 100644
index 0000000..535d662
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/manager/PipeInfo.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.newsync.receiver.manager;
+
+import java.util.Objects;
+
+public class PipeInfo {
+ private String pipeName;
+ private PipeStatus status;
+ private String remoteIp;
+ private long createTime;
+
+ public PipeInfo(String pipeName, String remoteIp, PipeStatus status, long createTime) {
+ this.pipeName = pipeName;
+ this.remoteIp = remoteIp;
+ this.status = status;
+ this.createTime = createTime;
+ }
+
+ public String getPipeName() {
+ return pipeName;
+ }
+
+ public void setPipeName(String pipeName) {
+ this.pipeName = pipeName;
+ }
+
+ public PipeStatus getStatus() {
+ return status;
+ }
+
+ public void setStatus(PipeStatus status) {
+ this.status = status;
+ }
+
+ public long getCreateTime() {
+ return createTime;
+ }
+
+ public void setCreateTime(long createTime) {
+ this.createTime = createTime;
+ }
+
+ public String getRemoteIp() {
+ return remoteIp;
+ }
+
+ public void setRemoteIp(String remoteIp) {
+ this.remoteIp = remoteIp;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ PipeInfo pipeInfo = (PipeInfo) o;
+ return createTime == pipeInfo.createTime
+ && Objects.equals(pipeName, pipeInfo.pipeName)
+ && status == pipeInfo.status
+ && Objects.equals(remoteIp, pipeInfo.remoteIp);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(pipeName, status, remoteIp, createTime);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/manager/PipeStatus.java b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/manager/PipeStatus.java
new file mode 100644
index 0000000..b38df8d
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/manager/PipeStatus.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.newsync.receiver.manager;
+
+ /** Status of a pipe as tracked by the sync receiver. */
+ public enum PipeStatus {
+ // set by the receiver manager when a pipe is created or started
+ RUNNING,
+ // set by the receiver manager when a pipe is stopped
+ PAUSE,
+ // set by the receiver manager when a pipe is dropped
+ DROP
+ }
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/manager/ReceiverManager.java b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/manager/ReceiverManager.java
new file mode 100644
index 0000000..4dd33e3
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/manager/ReceiverManager.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.newsync.receiver.manager;
+
+import org.apache.iotdb.db.exception.StartupException;
+import org.apache.iotdb.db.newsync.receiver.recovery.ReceiverLog;
+import org.apache.iotdb.db.newsync.receiver.recovery.ReceiverLogAnalyzer;
+import org.apache.iotdb.db.service.ServiceType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.*;
+
+public class ReceiverManager {
+
+ private static final Logger logger = LoggerFactory.getLogger(ReceiverManager.class);
+
+ private boolean pipeServerEnable;
+ // <pipeName, <remoteIp, pipeInfo>>
+ private Map<String, Map<String, PipeInfo>> pipeInfoMap;
+ private ReceiverLog log;
+
+ public void init() throws StartupException {
+ try {
+ log = new ReceiverLog();
+ } catch (IOException e) {
+ logger.error(e.getMessage());
+ throw new StartupException(
+ ServiceType.RECEIVER_SERVICE.getName(), "cannot create receiver log");
+ }
+ ReceiverLogAnalyzer.scan();
+ pipeInfoMap = ReceiverLogAnalyzer.getPipeInfoMap();
+ pipeServerEnable = ReceiverLogAnalyzer.isPipeServerEnable();
+ }
+
+ public void close() throws IOException {
+ log.close();
+ }
+
+ public void startServer() throws IOException {
+ log.startPipeServer();
+ pipeServerEnable = true;
+ }
+
+ public void stopServer() throws IOException {
+ log.stopPipeServer();
+ pipeServerEnable = false;
+ }
+
+ public void createPipe(String pipeName, String remoteIp, long startTime) throws IOException {
+ if (log != null) {
+ log.createPipe(pipeName, remoteIp, startTime);
+ }
+ if (!pipeInfoMap.containsKey(pipeName)) {
+ pipeInfoMap.put(pipeName, new HashMap<>());
+ }
+ pipeInfoMap
+ .get(pipeName)
+ .put(remoteIp, new PipeInfo(pipeName, remoteIp, PipeStatus.RUNNING, startTime));
+ }
+
+ public void startPipe(String pipeName, String remoteIp) throws IOException {
+ if (log != null) {
+ log.startPipe(pipeName, remoteIp);
+ }
+ pipeInfoMap.get(pipeName).get(remoteIp).setStatus(PipeStatus.RUNNING);
+ }
+
+ public void stopPipe(String pipeName, String remoteIp) throws IOException {
+ if (log != null) {
+ log.stopPipe(pipeName, remoteIp);
+ }
+ pipeInfoMap.get(pipeName).get(remoteIp).setStatus(PipeStatus.PAUSE);
+ }
+
+ public void dropPipe(String pipeName, String remoteIp) throws IOException {
+ if (log != null) {
+ log.dropPipe(pipeName, remoteIp);
+ }
+ pipeInfoMap.get(pipeName).get(remoteIp).setStatus(PipeStatus.DROP);
+ }
+
+ public List<PipeInfo> getPipeInfos(String pipeName) {
+ return new ArrayList<>(pipeInfoMap.get(pipeName).values());
+ }
+
+ public List<PipeInfo> getAllPipeInfos() {
+ List<PipeInfo> res = new ArrayList<>();
+ for (String pipeName : pipeInfoMap.keySet()) {
+ res.addAll(pipeInfoMap.get(pipeName).values());
+ }
+ return res;
+ }
+
+ public boolean isPipeServerEnable() {
+ return pipeServerEnable;
+ }
+
+ public void setPipeServerEnable(boolean pipeServerEnable) {
+ this.pipeServerEnable = pipeServerEnable;
+ }
+
+ public static ReceiverManager getInstance() {
+ return ReceiverMonitorHolder.INSTANCE;
+ }
+
+ private ReceiverManager() {}
+
+ private static class ReceiverMonitorHolder {
+ private static final ReceiverManager INSTANCE = new ReceiverManager();
+
+ private ReceiverMonitorHolder() {}
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/recovery/ReceiverLog.java b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/recovery/ReceiverLog.java
new file mode 100644
index 0000000..55427e1
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/recovery/ReceiverLog.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.newsync.receiver.recovery;
+
+import org.apache.iotdb.db.newsync.conf.SyncConstant;
+import org.apache.iotdb.db.newsync.receiver.manager.PipeStatus;
+import org.apache.iotdb.db.newsync.sender.conf.SenderConf;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+
+ /**
+  * Text log of receiver state changes. Every change is written as one line and flushed
+  * immediately: {@code on} / {@code off} for the pipe server, {@code pipeName,remoteIp,status}
+  * for a status change of a pipe and {@code pipeName,remoteIp,status,time} for its creation.
+  */
+ public class ReceiverLog {
+   private BufferedWriter bw;
+
+   /**
+    * Opens the log file in append mode, creating missing parent directories first.
+    *
+    * @throws IOException if the file cannot be opened for writing
+    */
+   public ReceiverLog() throws IOException {
+     File logFile = new File(SenderConf.sysDir, SyncConstant.RECEIVER_LOG_NAME);
+     File parentDir = logFile.getParentFile();
+     if (!parentDir.exists()) {
+       parentDir.mkdirs();
+     }
+     bw = new BufferedWriter(new FileWriter(logFile, true));
+   }
+
+   public void startPipeServer() throws IOException {
+     appendLine("on");
+   }
+
+   public void stopPipeServer() throws IOException {
+     appendLine("off");
+   }
+
+   public void createPipe(String pipeName, String remoteIp, long time) throws IOException {
+     // Only the creation record carries a fourth column with the time.
+     appendLine(String.format("%s,%s,%s,%d", pipeName, remoteIp, PipeStatus.RUNNING, time));
+   }
+
+   public void startPipe(String pipeName, String remoteIp) throws IOException {
+     appendStatus(pipeName, remoteIp, PipeStatus.RUNNING);
+   }
+
+   public void stopPipe(String pipeName, String remoteIp) throws IOException {
+     appendStatus(pipeName, remoteIp, PipeStatus.PAUSE);
+   }
+
+   public void dropPipe(String pipeName, String remoteIp) throws IOException {
+     appendStatus(pipeName, remoteIp, PipeStatus.DROP);
+   }
+
+   /** Writes a three-column status record for the given pipe. */
+   private void appendStatus(String pipeName, String remoteIp, PipeStatus status)
+       throws IOException {
+     appendLine(String.format("%s,%s,%s", pipeName, remoteIp, status));
+   }
+
+   /** Writes {@code content} followed by a line separator and flushes the writer. */
+   private void appendLine(String content) throws IOException {
+     bw.write(content);
+     bw.newLine();
+     bw.flush();
+   }
+
+   public void close() throws IOException {
+     bw.close();
+   }
+ }
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/recovery/ReceiverLogAnalyzer.java b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/recovery/ReceiverLogAnalyzer.java
new file mode 100644
index 0000000..50e6724
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/recovery/ReceiverLogAnalyzer.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.newsync.receiver.recovery;
+
+import org.apache.iotdb.db.exception.StartupException;
+import org.apache.iotdb.db.newsync.conf.SyncConstant;
+import org.apache.iotdb.db.newsync.receiver.manager.PipeInfo;
+import org.apache.iotdb.db.newsync.receiver.manager.PipeStatus;
+import org.apache.iotdb.db.newsync.sender.conf.SenderConf;
+import org.apache.iotdb.db.service.ServiceType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.util.HashMap;
+import java.util.Map;
+
+public class ReceiverLogAnalyzer {
+ private static final Logger logger = LoggerFactory.getLogger(ReceiverLogAnalyzer.class);
+ private static boolean pipeServerEnable =
+ false; // record recovery result of receiver server status
+ private static Map<String, Map<String, PipeInfo>> pipeInfoMap =
+ new HashMap<>(); // record recovery result of receiver server status
+
+ public static void scan() throws StartupException {
+ logger.info("Start to recover all sync state for sync receiver.");
+ pipeInfoMap = new HashMap<>();
+ pipeServerEnable = false;
+ File logFile = new File(SenderConf.sysDir, SyncConstant.RECEIVER_LOG_NAME);
+ try (BufferedReader loadReader = new BufferedReader(new FileReader(logFile))) {
+ String line;
+ int lineNum = 0;
+ while ((line = loadReader.readLine()) != null) {
+ lineNum++;
+ try {
+ analyzeLog(line);
+ } catch (Exception e) {
+ logger.error("Receiver log recovery error: log file parse error at line " + lineNum);
+ logger.error(e.getMessage());
+ throw new StartupException(
+ ServiceType.RECEIVER_SERVICE.getName(), "log file recover error at line " + lineNum);
+ }
+ }
+ } catch (IOException e) {
+ logger.info("Receiver log file not found");
+ }
+ }
+
+ public static boolean isPipeServerEnable() {
+ return pipeServerEnable;
+ }
+
+ public static Map<String, Map<String, PipeInfo>> getPipeInfoMap() {
+ return pipeInfoMap;
+ }
+
+ /**
+ * parse log line and load result
+ *
+ * @param logLine log line
+ */
+ private static void analyzeLog(String logLine) {
+ if (logLine.equals("on")) {
+ pipeServerEnable = true;
+ } else if (logLine.equals("off")) {
+ pipeServerEnable = false;
+ } else {
+ String[] items = logLine.split(",");
+ String pipeName = items[0];
+ String remoteIp = items[1];
+ PipeStatus status = PipeStatus.valueOf(items[2]);
+ if (status == PipeStatus.RUNNING) {
+ if (!pipeInfoMap.containsKey(pipeName)) {
+ pipeInfoMap.put(pipeName, new HashMap<>());
+ }
+ if (items.length == 4) {
+ // create
+ pipeInfoMap
+ .get(pipeName)
+ .put(remoteIp, new PipeInfo(pipeName, remoteIp, status, Long.parseLong(items[3])));
+ } else {
+ pipeInfoMap.get(pipeName).get(remoteIp).setStatus(status);
+ }
+ } else {
+ pipeInfoMap.get(pipeName).get(remoteIp).setStatus(status);
+ }
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java b/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java
index 19c8715..9e3026e 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java
@@ -201,6 +201,9 @@ public class SQLConstant {
public static final int TOK_STOP_PIPE = 206;
public static final int TOK_START_PIPE = 207;
public static final int TOK_DROP_PIPE = 208;
+ public static final int TOK_SHOW_PIPE_SERVER = 209;
+ public static final int TOK_PIPE_SERVER_START = 210;
+ public static final int TOK_PIPE_SERVER_STOP = 211;
public static final Map<Integer, String> tokenNames = new HashMap<>();
@@ -291,6 +294,9 @@ public class SQLConstant {
tokenNames.put(TOK_STOP_PIPE, "TOK_STOP_PIPE");
tokenNames.put(TOK_START_PIPE, "TOK_START_PIPE");
tokenNames.put(TOK_DROP_PIPE, "TOK_DROP_PIPE");
+ tokenNames.put(TOK_SHOW_PIPE_SERVER, "TOK_SHOW_PIPE_SERVER");
+ tokenNames.put(TOK_PIPE_SERVER_START, "TOK_PIPE_SERVER_START");
+ tokenNames.put(TOK_PIPE_SERVER_STOP, "TOK_PIPE_SERVER_STOP");
}
public static boolean isReservedPath(PartialPath pathStr) {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index f4b3c75..2946cd9 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -58,6 +58,7 @@ import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode;
import org.apache.iotdb.db.metadata.path.MeasurementPath;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.metadata.utils.MetaUtils;
+import org.apache.iotdb.db.newsync.receiver.ReceiverService;
import org.apache.iotdb.db.newsync.sender.pipe.Pipe;
import org.apache.iotdb.db.newsync.sender.pipe.PipeSink;
import org.apache.iotdb.db.newsync.sender.service.SenderService;
@@ -124,6 +125,7 @@ import org.apache.iotdb.db.qp.physical.sys.ShowDevicesPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowFunctionsPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowLockInfoPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowPipePlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowPipeServerPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowPipeSinkPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowStorageGroupPlan;
@@ -419,6 +421,10 @@ public class PlanExecutor implements IPlanExecutor {
case DROP_PIPESINK:
dropPipeSink((DropPipeSinkPlan) plan);
return true;
+ case START_PIPE_SERVER:
+ return operateStartPipeServer();
+ case STOP_PIPE_SERVER:
+ return operateStopPipeServer();
case CREATE_PIPE:
createPipe((CreatePipePlan) plan);
return true;
@@ -433,6 +439,14 @@ public class PlanExecutor implements IPlanExecutor {
}
}
+ /**
+ * Handles STOP_PIPE_SERVER by delegating to {@link ReceiverService#stopPipeServer()}.
+ *
+ * @return the value returned by the receiver service
+ */
+ private boolean operateStopPipeServer() {
+ return ReceiverService.getInstance().stopPipeServer();
+ }
+
+ /**
+ * Handles START_PIPE_SERVER by delegating to {@link ReceiverService#startPipeServer()}.
+ *
+ * @return the value returned by the receiver service
+ */
+ private boolean operateStartPipeServer() {
+ return ReceiverService.getInstance().startPipeServer();
+ }
+
private boolean createTemplate(CreateTemplatePlan createTemplatePlan)
throws QueryProcessException {
try {
@@ -718,6 +732,8 @@ public class PlanExecutor implements IPlanExecutor {
return processShowPipeSink((ShowPipeSinkPlan) showPlan);
case PIPESINKTYPE:
return processShowPipeSinkType();
+ case PIPESERVER:
+ return processShowPipeServer((ShowPipeServerPlan) showPlan);
case PIPE:
return processShowPipes((ShowPipePlan) showPlan);
default:
@@ -725,6 +741,10 @@ public class PlanExecutor implements IPlanExecutor {
}
}
+ /**
+ * Handles the PIPESERVER show plan by delegating to the receiver service.
+ *
+ * @param plan the show plan to evaluate
+ * @return the data set produced by {@code ReceiverService.showPipe}
+ */
+ private QueryDataSet processShowPipeServer(ShowPipeServerPlan plan) {
+ return ReceiverService.getInstance().showPipe(plan);
+ }
+
private QueryDataSet processCountNodes(CountPlan countPlan) throws MetadataException {
int num =
getNodesNumInGivenLevel(
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
index 522d3f8..2a82e6e 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
@@ -193,9 +193,7 @@ public abstract class Operator {
PRUNE_TEMPLATE,
APPEND_TEMPLATE,
DROP_TEMPLATE,
-
SHOW_QUERY_RESOURCE,
-
CREATE_PIPESINK,
DROP_PIPESINK,
SHOW_PIPESINK,
@@ -204,6 +202,8 @@ public abstract class Operator {
SHOW_PIPE,
STOP_PIPE,
START_PIPE,
- DROP_PIPE
+ DROP_PIPE,
+ START_PIPE_SERVER,
+ STOP_PIPE_SERVER,
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/ShowPipeServerOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/ShowPipeServerOperator.java
new file mode 100644
index 0000000..29801d1
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/ShowPipeServerOperator.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.iotdb.db.qp.logical.sys;
+
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowPipeServerPlan;
+import org.apache.iotdb.db.qp.strategy.PhysicalGenerator;
+
+import org.apache.commons.lang3.StringUtils;
+
+ /**
+  * Logical operator of the show-pipe-server statement. It optionally carries a pipe name that
+  * is passed on to the generated {@link ShowPipeServerPlan}.
+  */
+ public class ShowPipeServerOperator extends ShowOperator {
+
+   private String pipeName;
+
+   /**
+    * Creates an operator without a pipe name.
+    *
+    * @param tokenIntType token type passed to {@link ShowOperator}
+    */
+   public ShowPipeServerOperator(int tokenIntType) {
+     super(tokenIntType);
+   }
+
+   /**
+    * Creates an operator for the given pipe name.
+    *
+    * @param pipeName pipe name to pass to the plan; may be null or empty
+    * @param tokenIntType token type passed to {@link ShowOperator}
+    */
+   public ShowPipeServerOperator(String pipeName, int tokenIntType) {
+     this(tokenIntType);
+     this.pipeName = pipeName;
+   }
+
+   /**
+    * @return a plan without arguments when the pipe name is null or empty, otherwise a plan
+    *     carrying the pipe name
+    */
+   @Override
+   public PhysicalPlan generatePhysicalPlan(PhysicalGenerator generator)
+       throws QueryProcessException {
+     return StringUtils.isEmpty(pipeName)
+         ? new ShowPipeServerPlan()
+         : new ShowPipeServerPlan(pipeName);
+   }
+ }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/StartPipeServerOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/StartPipeServerOperator.java
new file mode 100644
index 0000000..e23833a
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/StartPipeServerOperator.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.qp.logical.sys;
+
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.qp.logical.Operator;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.sys.StartPipeServerPlan;
+import org.apache.iotdb.db.qp.strategy.PhysicalGenerator;
+
+/** Logical operator that is translated into a {@link StartPipeServerPlan}. */
+public class StartPipeServerOperator extends Operator {
+
+  /**
+   * @param tokenIntType token type forwarded to the {@link Operator} constructor
+   */
+  public StartPipeServerOperator(int tokenIntType) {
+    super(tokenIntType);
+    this.operatorType = OperatorType.START_PIPE_SERVER;
+  }
+
+  /**
+   * Builds the physical plan for this operator.
+   *
+   * @param generator not used by this operator
+   * @return a new {@link StartPipeServerPlan}
+   */
+  @Override
+  public PhysicalPlan generatePhysicalPlan(PhysicalGenerator generator)
+      throws QueryProcessException {
+    return new StartPipeServerPlan();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/StopPipeServerOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/StopPipeServerOperator.java
new file mode 100644
index 0000000..f5bcdb2
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/StopPipeServerOperator.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.qp.logical.sys;
+
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.qp.logical.Operator;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.sys.StopPipeServerPlan;
+import org.apache.iotdb.db.qp.strategy.PhysicalGenerator;
+
+/** Logical operator that is translated into a {@link StopPipeServerPlan}. */
+public class StopPipeServerOperator extends Operator {
+
+  /**
+   * @param tokenIntType token type forwarded to the {@link Operator} constructor
+   */
+  public StopPipeServerOperator(int tokenIntType) {
+    super(tokenIntType);
+    // Must be STOP (not START): this operator represents the stop statement and the plan it
+    // generates is constructed with OperatorType.STOP_PIPE_SERVER as well.
+    operatorType = OperatorType.STOP_PIPE_SERVER;
+  }
+
+  /**
+   * Builds the physical plan for this operator.
+   *
+   * @param generator not used by this operator
+   * @return a new {@link StopPipeServerPlan}
+   */
+  @Override
+  public PhysicalPlan generatePhysicalPlan(PhysicalGenerator generator)
+      throws QueryProcessException {
+    return new StopPipeServerPlan();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
index 91cc485..0b59db5 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
@@ -68,7 +68,9 @@ import org.apache.iotdb.db.qp.physical.sys.SetTTLPlan;
import org.apache.iotdb.db.qp.physical.sys.SetTemplatePlan;
import org.apache.iotdb.db.qp.physical.sys.ShowDevicesPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.StartPipeServerPlan;
import org.apache.iotdb.db.qp.physical.sys.StartTriggerPlan;
+import org.apache.iotdb.db.qp.physical.sys.StopPipeServerPlan;
import org.apache.iotdb.db.qp.physical.sys.StopTriggerPlan;
import org.apache.iotdb.db.qp.physical.sys.StorageGroupMNodePlan;
import org.apache.iotdb.db.qp.physical.sys.UnsetTemplatePlan;
@@ -483,6 +485,12 @@ public abstract class PhysicalPlan {
case SET_SYSTEM_MODE:
plan = new SetSystemModePlan();
break;
+ case START_PIPE_SERVER:
+ plan = new StartPipeServerPlan();
+ break;
+ case STOP_PIPE_SERVER:
+ plan = new StopPipeServerPlan();
+ break;
default:
throw new IOException("unrecognized log type " + type);
}
@@ -553,6 +561,8 @@ public abstract class PhysicalPlan {
UNSET_TEMPLATE,
APPEND_TEMPLATE,
PRUNE_TEMPLATE,
+ START_PIPE_SERVER,
+ STOP_PIPE_SERVER,
DROP_TEMPLATE
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPipeServerPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPipeServerPlan.java
new file mode 100644
index 0000000..955bdf5
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPipeServerPlan.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.qp.physical.sys;
+
+/** Show plan with content type {@code PIPESERVER}, optionally carrying a pipe name. */
+public class ShowPipeServerPlan extends ShowPlan {
+
+  /** Pipe name attached to this plan; null unless set via constructor or setter. */
+  private String pipeName;
+
+  /** Creates a plan without a pipe name. */
+  public ShowPipeServerPlan() {
+    super(ShowContentType.PIPESERVER);
+  }
+
+  /**
+   * Creates a plan for the given pipe name.
+   *
+   * @param pipeName pipe name stored in this plan
+   */
+  public ShowPipeServerPlan(String pipeName) {
+    this();
+    this.pipeName = pipeName;
+  }
+
+  /** @return the pipe name stored in this plan, possibly null */
+  public String getPipeName() {
+    return this.pipeName;
+  }
+
+  /** @param pipeName new pipe name to store in this plan */
+  public void setPipeName(String pipeName) {
+    this.pipeName = pipeName;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPlan.java
index 3d3c00f..961479d 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPlan.java
@@ -120,6 +120,7 @@ public class ShowPlan extends PhysicalPlan {
QUERY_RESOURCE,
PIPESINK,
PIPESINKTYPE,
- PIPE
+ PIPE,
+ PIPESERVER
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/StartPipeServerPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/StartPipeServerPlan.java
new file mode 100644
index 0000000..a08e5b6
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/StartPipeServerPlan.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.qp.physical.sys;
+
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.qp.logical.Operator;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+
+public class StartPipeServerPlan extends PhysicalPlan {
+
+ public StartPipeServerPlan() {
+ super(Operator.OperatorType.START_PIPE_SERVER);
+ canBeSplit = false;
+ }
+
+ @Override
+ public List<? extends PartialPath> getPaths() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public void serialize(DataOutputStream stream) throws IOException {
+ stream.writeByte((byte) PhysicalPlanType.START_PIPE_SERVER.ordinal());
+ }
+
+ @Override
+ public void serializeImpl(ByteBuffer buffer) {
+ buffer.put((byte) PhysicalPlanType.START_PIPE_SERVER.ordinal());
+ }
+
+ @Override
+ public void deserialize(ByteBuffer buffer) throws IllegalPathException {}
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/StopPipeServerPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/StopPipeServerPlan.java
new file mode 100644
index 0000000..45f9270
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/StopPipeServerPlan.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.qp.physical.sys;
+
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.qp.logical.Operator;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+
+/** Physical plan for stopping the pipe server; it has no paths and no payload. */
+public class StopPipeServerPlan extends PhysicalPlan {
+
+  public StopPipeServerPlan() {
+    super(Operator.OperatorType.STOP_PIPE_SERVER);
+    canBeSplit = false;
+  }
+
+  /** @return always an empty list, since this plan touches no paths */
+  @Override
+  public List<? extends PartialPath> getPaths() {
+    return Collections.emptyList();
+  }
+
+  /**
+   * Writes only the plan type byte to the stream.
+   *
+   * @param stream destination of the serialized plan
+   * @throws IOException if writing to the stream fails
+   */
+  @Override
+  public void serialize(DataOutputStream stream) throws IOException {
+    // The type byte must be STOP_PIPE_SERVER; writing START_PIPE_SERVER here would make a
+    // serialized stop plan indistinguishable from a start plan when it is read back.
+    stream.writeByte((byte) PhysicalPlanType.STOP_PIPE_SERVER.ordinal());
+  }
+
+  /**
+   * Writes only the plan type byte to the buffer.
+   *
+   * @param buffer destination of the serialized plan
+   */
+  @Override
+  public void serializeImpl(ByteBuffer buffer) {
+    // Same type byte as serialize(DataOutputStream) so both encodings agree.
+    buffer.put((byte) PhysicalPlanType.STOP_PIPE_SERVER.ordinal());
+  }
+
+  /** Nothing to read: the plan has no fields beyond its type. */
+  @Override
+  public void deserialize(ByteBuffer buffer) throws IllegalPathException {}
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java b/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java
index 273b774..fc923cb 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java
@@ -2282,6 +2282,25 @@ public class IoTDBSqlVisitor extends IoTDBSqlParserBaseVisitor<Operator> {
}
}
+  /** Maps a parsed start-pipe-server statement to a {@link StartPipeServerOperator}. */
+  @Override
+  public Operator visitStartPipeServer(IoTDBSqlParser.StartPipeServerContext ctx) {
+    final int tokenType = SQLConstant.TOK_PIPE_SERVER_START;
+    return new StartPipeServerOperator(tokenType);
+  }
+
+  /** Maps a parsed stop-pipe-server statement to a {@link StopPipeServerOperator}. */
+  @Override
+  public Operator visitStopPipeServer(IoTDBSqlParser.StopPipeServerContext ctx) {
+    final int tokenType = SQLConstant.TOK_PIPE_SERVER_STOP;
+    return new StopPipeServerOperator(tokenType);
+  }
+
+  /**
+   * Maps a parsed show-pipe-server statement to a {@link ShowPipeServerOperator}, passing the pipe
+   * name along when the statement contains one.
+   */
+  @Override
+  public Operator visitShowPipeServer(IoTDBSqlParser.ShowPipeServerContext ctx) {
+    // No pipe name in the statement: build the operator without one.
+    if (ctx.pipeName == null) {
+      return new ShowPipeServerOperator(SQLConstant.TOK_SHOW_PIPE_SERVER);
+    }
+    return new ShowPipeServerOperator(ctx.pipeName.getText(), SQLConstant.TOK_SHOW_PIPE_SERVER);
+  }
+
/** 7. Common Clauses */
// IoTDB Objects
diff --git a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
index 7ae98e3..80c236f 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
@@ -34,6 +34,7 @@ import org.apache.iotdb.db.exception.ConfigurationException;
import org.apache.iotdb.db.exception.StartupException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.MManager;
+import org.apache.iotdb.db.newsync.receiver.ReceiverService;
import org.apache.iotdb.db.newsync.sender.service.SenderService;
import org.apache.iotdb.db.protocol.influxdb.meta.InfluxDBMetaManager;
import org.apache.iotdb.db.protocol.rest.RestService;
@@ -134,6 +135,7 @@ public class IoTDB implements IoTDBMBean {
registerManager.register(TemporaryQueryDataFileService.getInstance());
registerManager.register(UDFClassLoaderManager.getInstance());
registerManager.register(UDFRegistrationService.getInstance());
+ registerManager.register(ReceiverService.getInstance());
registerManager.register(MetricsService.getInstance());
// in cluster mode, RPC service is not enabled.
diff --git a/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java b/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
index eb82e29..bf8946e 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
@@ -39,6 +39,7 @@ public enum ServiceType {
UPGRADE_SERVICE("UPGRADE DataService", ""),
SETTLE_SERVICE("SETTLE DataService", ""),
SENDER_SERVICE("Sync Sender service", ""),
+ RECEIVER_SERVICE("Sync Receiver service", ""),
MERGE_SERVICE("Merge Manager", "Merge Manager"),
COMPACTION_SERVICE("Compaction Manager", "Compaction Manager"),
PERFORMANCE_STATISTIC_SERVICE("PERFORMANCE_STATISTIC_SERVICE", "PERFORMANCE_STATISTIC_SERVICE"),
diff --git a/server/src/test/java/org/apache/iotdb/db/newsync/receiver/manager/ReceiverManagerTest.java b/server/src/test/java/org/apache/iotdb/db/newsync/receiver/manager/ReceiverManagerTest.java
new file mode 100644
index 0000000..081e01c
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/newsync/receiver/manager/ReceiverManagerTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.newsync.receiver.manager;
+
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+
+/** Checks the pipe infos recorded by {@link ReceiverManager} after a series of pipe operations. */
+public class ReceiverManagerTest {
+  private static final String pipe1 = "pipe1";
+  private static final String pipe2 = "pipe2";
+  private static final String ip1 = "192.168.1.11";
+  private static final String ip2 = "192.168.2.22";
+
+  @Before
+  public void setUp() throws Exception {
+    EnvironmentUtils.envSetUp();
+  }
+
+  @After
+  public void tearDown() throws IOException, StorageEngineException {
+    EnvironmentUtils.cleanEnv();
+  }
+
+  /**
+   * Creates three pipes, stops two, drops one and restarts one, then verifies the recorded infos.
+   *
+   * @throws Exception if any manager call fails; the exception is propagated so the test runner
+   *     reports the original cause and stack trace instead of a message-less failure
+   */
+  @Test
+  public void test() throws Exception {
+    ReceiverManager manager = ReceiverManager.getInstance();
+    manager.startServer();
+    manager.createPipe(pipe1, ip1, 1);
+    manager.createPipe(pipe2, ip2, 2);
+    manager.createPipe(pipe1, ip2, 3);
+    manager.stopPipe(pipe1, ip1);
+    manager.stopPipe(pipe2, ip2);
+    manager.dropPipe(pipe1, ip2);
+    manager.startPipe(pipe1, ip1);
+
+    List<PipeInfo> allPipeInfos = manager.getAllPipeInfos();
+    Assert.assertEquals(3, allPipeInfos.size());
+    List<PipeInfo> pipeInfos1 = manager.getPipeInfos(pipe1);
+    List<PipeInfo> pipeInfos2 = manager.getPipeInfos(pipe2);
+    Assert.assertEquals(2, pipeInfos1.size());
+    Assert.assertEquals(1, pipeInfos2.size());
+    for (PipeInfo pipeInfo : pipeInfos2) {
+      Assert.assertEquals(new PipeInfo(pipe2, ip2, PipeStatus.PAUSE, 2), pipeInfo);
+    }
+    for (PipeInfo pipeInfo : pipeInfos1) {
+      // pipe1 exists for both IPs: the ip1 one was stopped then restarted, the ip2 one dropped.
+      if (pipeInfo.getRemoteIp().equals(ip1)) {
+        Assert.assertEquals(new PipeInfo(pipe1, ip1, PipeStatus.RUNNING, 1), pipeInfo);
+      } else {
+        Assert.assertEquals(new PipeInfo(pipe1, ip2, PipeStatus.DROP, 3), pipeInfo);
+      }
+    }
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/newsync/receiver/recovery/ReceiverLogAnalyzerTest.java b/server/src/test/java/org/apache/iotdb/db/newsync/receiver/recovery/ReceiverLogAnalyzerTest.java
new file mode 100644
index 0000000..536792a
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/newsync/receiver/recovery/ReceiverLogAnalyzerTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.newsync.receiver.recovery;
+
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.newsync.receiver.manager.PipeInfo;
+import org.apache.iotdb.db.newsync.receiver.manager.PipeStatus;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Map;
+
+/** This test is for ReceiverLog and ReceiverLogAnalyzer. */
+public class ReceiverLogAnalyzerTest {
+
+  private static final String pipe1 = "pipe1";
+  private static final String pipe2 = "pipe2";
+  private static final String ip1 = "192.168.1.11";
+  private static final String ip2 = "192.168.2.22";
+
+  @Before
+  public void setUp() throws Exception {
+    EnvironmentUtils.envSetUp();
+  }
+
+  @After
+  public void tearDown() throws IOException, StorageEngineException {
+    EnvironmentUtils.cleanEnv();
+  }
+
+  /**
+   * Writes a sequence of server and pipe operations to the log, scans it with the analyzer and
+   * verifies the recovered server state and pipe infos.
+   *
+   * @throws Exception if logging or scanning fails; the exception is propagated so the test runner
+   *     reports the original cause and stack trace instead of a message-less failure
+   */
+  @Test
+  public void test() throws Exception {
+    ReceiverLog log = new ReceiverLog();
+    try {
+      log.startPipeServer();
+      log.createPipe(pipe1, ip1, 1);
+      log.createPipe(pipe2, ip2, 2);
+      log.createPipe(pipe1, ip2, 3);
+      log.stopPipe(pipe1, ip1);
+      log.stopPipeServer();
+      log.startPipeServer();
+      log.stopPipe(pipe2, ip2);
+      log.dropPipe(pipe1, ip2);
+      log.startPipe(pipe1, ip1);
+
+      ReceiverLogAnalyzer.scan();
+      Map<String, Map<String, PipeInfo>> map = ReceiverLogAnalyzer.getPipeInfoMap();
+      // The last server record written above is a start, so the server must be enabled.
+      Assert.assertTrue(ReceiverLogAnalyzer.isPipeServerEnable());
+      Assert.assertNotNull(map);
+      Assert.assertEquals(2, map.get(pipe1).size());
+      Assert.assertEquals(1, map.get(pipe2).size());
+      Assert.assertEquals(new PipeInfo(pipe2, ip2, PipeStatus.PAUSE, 2), map.get(pipe2).get(ip2));
+      Assert.assertEquals(
+          new PipeInfo(pipe1, ip1, PipeStatus.RUNNING, 1), map.get(pipe1).get(ip1));
+      Assert.assertEquals(new PipeInfo(pipe1, ip2, PipeStatus.DROP, 3), map.get(pipe1).get(ip2));
+    } finally {
+      // Close the log even when a call or assertion above fails.
+      log.close();
+    }
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/qp/physical/PhysicalPlanTest.java b/server/src/test/java/org/apache/iotdb/db/qp/physical/PhysicalPlanTest.java
index ea34f3b..af6a033 100644
--- a/server/src/test/java/org/apache/iotdb/db/qp/physical/PhysicalPlanTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/qp/physical/PhysicalPlanTest.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.exception.runtime.SQLParserException;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.qp.Planner;
+import org.apache.iotdb.db.qp.logical.Operator;
import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
@@ -53,9 +54,12 @@ import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowContinuousQueriesPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowDevicesPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowFunctionsPlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowPipeServerPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowTriggersPlan;
+import org.apache.iotdb.db.qp.physical.sys.StartPipeServerPlan;
import org.apache.iotdb.db.qp.physical.sys.StartTriggerPlan;
+import org.apache.iotdb.db.qp.physical.sys.StopPipeServerPlan;
import org.apache.iotdb.db.qp.physical.sys.StopTriggerPlan;
import org.apache.iotdb.db.query.executor.fill.PreviousFill;
import org.apache.iotdb.db.query.udf.service.UDFRegistrationService;
@@ -1210,6 +1214,36 @@ public class PhysicalPlanTest {
}
@Test
+  public void testShowPipeServer() throws QueryProcessException {
+    // With a pipe name: the plan must carry exactly the name given in the statement.
+    String sql1 = "SHOW PIPESERVER abc";
+    ShowPipeServerPlan plan1 = (ShowPipeServerPlan) processor.parseSQLToPhysicalPlan(sql1);
+    Assert.assertTrue(plan1.isQuery());
+    Assert.assertEquals(ShowPlan.ShowContentType.PIPESERVER, plan1.getShowContentType());
+    Assert.assertEquals("abc", plan1.getPipeName());
+    // Without a pipe name: the plan must not carry one.
+    String sql2 = "SHOW PIPESERVER";
+    ShowPipeServerPlan plan2 = (ShowPipeServerPlan) processor.parseSQLToPhysicalPlan(sql2);
+    Assert.assertTrue(plan2.isQuery());
+    Assert.assertEquals(ShowPlan.ShowContentType.PIPESERVER, plan2.getShowContentType());
+    Assert.assertNull(plan2.getPipeName());
+  }
+
+  /** Parses the start statement and checks the resulting plan's kind and operator type. */
+  @Test
+  public void testStartPipeServer() throws QueryProcessException {
+    StartPipeServerPlan startPlan =
+        (StartPipeServerPlan) processor.parseSQLToPhysicalPlan("START PIPESERVER");
+    Assert.assertFalse(startPlan.isQuery());
+    Assert.assertEquals(Operator.OperatorType.START_PIPE_SERVER, startPlan.getOperatorType());
+  }
+
+  /** Parses the stop statement and checks the resulting plan's kind and operator type. */
+  @Test
+  public void testStopPipeServer() throws QueryProcessException {
+    StopPipeServerPlan stopPlan =
+        (StopPipeServerPlan) processor.parseSQLToPhysicalPlan("STOP PIPESERVER");
+    Assert.assertFalse(stopPlan.isQuery());
+    Assert.assertEquals(OperatorType.STOP_PIPE_SERVER, stopPlan.getOperatorType());
+  }
+
+ @Test
public void testCreateCQ1() throws QueryProcessException {
String sql =
"CREATE CONTINUOUS QUERY cq1 BEGIN SELECT max_value(temperature) INTO temperature_max FROM root.ln.*.*.* GROUP BY time(10s) END";