You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/10/24 01:09:50 UTC
[iotdb] branch master updated: [IOTDB-4641] New Standalone Sync Receiver TsFile Loader Implement (#7610)
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new ca7b2b8311 [IOTDB-4641] New Standalone Sync Receiver TsFile Loader Implement (#7610)
ca7b2b8311 is described below
commit ca7b2b8311d57842e3bb5520658beeb2dc93613d
Author: yschengzi <87...@users.noreply.github.com>
AuthorDate: Mon Oct 24 09:09:43 2022 +0800
[IOTDB-4641] New Standalone Sync Receiver TsFile Loader Implement (#7610)
---
.../apache/iotdb/db/it/sync/SyncTransportTest.java | 323 +++++++++++++++++++
.../plan/statement/crud/LoadTsFileStatement.java | 4 +
.../db/sync/receiver/load/DeletionLoader.java | 52 ++-
.../iotdb/db/sync/receiver/load/ILoader.java | 20 ++
.../iotdb/db/sync/receiver/load/TsFileLoader.java | 58 +++-
.../iotdb/db/sync/transport/SyncTransportTest.java | 357 ---------------------
6 files changed, 446 insertions(+), 368 deletions(-)
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/sync/SyncTransportTest.java b/integration-test/src/test/java/org/apache/iotdb/db/it/sync/SyncTransportTest.java
new file mode 100644
index 0000000000..64a091a911
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/sync/SyncTransportTest.java
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.iotdb.db.it.sync;
+
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.category.LocalStandaloneIT;
+
+import org.junit.Ignore;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+@Ignore
+@RunWith(IoTDBTestRunner.class)
+@Category({LocalStandaloneIT.class, ClusterIT.class})
+public class SyncTransportTest {
+
+ // private static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+ // /** create tsfile and move to tmpDir for sync test */
+ // File tmpDir = new File("target/synctest");
+ //
+ // String pipeName1 = "pipe1";
+ // String remoteIp1;
+ // long createdTime1 = System.currentTimeMillis();
+ // File fileDir;
+ //
+ // File tsfile;
+ // File resourceFile;
+ // File modsFile;
+ //
+ // @Before
+ // public void setUp() throws Exception {
+ // EnvironmentUtils.envSetUp();
+ // remoteIp1 = "127.0.0.1";
+ // fileDir = new File(SyncPathUtil.getReceiverFileDataDir(pipeName1, remoteIp1, createdTime1));
+ // prepareData();
+ // EnvironmentUtils.shutdownDaemon();
+ // File srcDir =
+ // new File(
+ // IoTDBDescriptor.getInstance().getConfig().getDataDirs()[0]
+ // + File.separator
+ // + "sequence"
+ // + File.separator
+ // + "root.vehicle"
+ // + File.separator
+ // + "0"
+ // + File.separator
+ // + "0");
+ // if (tmpDir.exists()) {
+ // FileUtils.deleteDirectory(tmpDir);
+ // }
+ // FileUtils.moveDirectory(srcDir, tmpDir);
+ // tsfile = null;
+ // resourceFile = null;
+ // modsFile = null;
+ // File[] fileList = tmpDir.listFiles();
+ // for (File f : fileList) {
+ // if (f.getName().endsWith(".tsfile")) {
+ // tsfile = f;
+ // } else if (f.getName().endsWith(".mods")) {
+ // modsFile = f;
+ // } else if (f.getName().endsWith(".resource")) {
+ // resourceFile = f;
+ // }
+ // }
+ // EnvironmentUtils.cleanEnv();
+ // EnvironmentUtils.envSetUp();
+ // }
+ //
+ // @After
+ // public void tearDown() throws Exception {
+ // FileUtils.deleteDirectory(tmpDir);
+ // EnvironmentUtils.cleanEnv();
+ // }
+ //
+ // @Test
+ // public void testTransportFile() throws Exception {
+ // TSyncIdentityInfo identityInfo =
+ // new TSyncIdentityInfo("127.0.0.1", pipeName1, createdTime1, config.getIoTDBVersion(),
+ // "");
+ // try (TTransport transport =
+ // RpcTransportFactory.INSTANCE.getTransport(
+ // new TSocket(
+ // TConfigurationConst.defaultTConfiguration,
+ // "127.0.0.1",
+ // 6667,
+ // SyncConstant.SOCKET_TIMEOUT_MILLISECONDS,
+ // SyncConstant.CONNECT_TIMEOUT_MILLISECONDS))) {
+ // TProtocol protocol;
+ // if (config.isRpcThriftCompressionEnable()) {
+ // protocol = new TCompactProtocol(transport);
+ // } else {
+ // protocol = new TBinaryProtocol(transport);
+ // }
+ // IClientRPCService.Client serviceClient = new IClientRPCService.Client(protocol);
+ // // Open the underlying socket if it is not already open.
+ // if (!transport.isOpen()) {
+ // transport.open();
+ // }
+ // byte[] buffer = new byte[10];
+ // try (RandomAccessFile randomAccessFile = new RandomAccessFile(tsfile, "rw")) {
+ // // no handshake, response TException
+ // try {
+ // serviceClient.sendFile(
+ // new TSyncTransportMetaInfo(tsfile.getName(), 0), ByteBuffer.wrap(buffer));
+ // Assert.fail();
+ // } catch (TException e) {
+ // // do nothing
+ // }
+ // serviceClient.handshake(identityInfo);
+ // // response REBASE:0
+ // randomAccessFile.read(buffer, 0, 10);
+ // TSStatus tsStatus1 =
+ // serviceClient.sendFile(
+ // new TSyncTransportMetaInfo(tsfile.getName(), 1), ByteBuffer.wrap(buffer));
+ // Assert.assertEquals(tsStatus1.getCode(), TSStatusCode.SYNC_FILE_REBASE.getStatusCode());
+ // Assert.assertEquals(tsStatus1.getMessage(), "0");
+ // // response SUCCESS
+ // TSStatus tsStatus2 =
+ // serviceClient.sendFile(
+ // new TSyncTransportMetaInfo(tsfile.getName(), 0), ByteBuffer.wrap(buffer));
+ // Assert.assertEquals(tsStatus2.getCode(), TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ // // response REBASE:10
+ // TSStatus tsStatus3 =
+ // serviceClient.sendFile(
+ // new TSyncTransportMetaInfo(tsfile.getName(), 0), ByteBuffer.wrap(buffer));
+ // Assert.assertEquals(tsStatus3.getCode(), TSStatusCode.SYNC_FILE_REBASE.getStatusCode());
+ // Assert.assertEquals(tsStatus3.getMessage(), "10");
+ // TSStatus tsStatus4 =
+ // serviceClient.sendFile(
+ // new TSyncTransportMetaInfo(tsfile.getName(), 100), ByteBuffer.wrap(buffer));
+ // Assert.assertEquals(tsStatus4.getCode(), TSStatusCode.SYNC_FILE_REBASE.getStatusCode());
+ // Assert.assertEquals(tsStatus4.getMessage(), "10");
+ // // response SUCCESS
+ // byte[] remainBuffer = new byte[(int) (randomAccessFile.length() - 10)];
+ // randomAccessFile.read(remainBuffer, 0, (int) (randomAccessFile.length() - 10));
+ // TSStatus tsStatus5 =
+ // serviceClient.sendFile(
+ // new TSyncTransportMetaInfo(tsfile.getName(), 10),
+ // ByteBuffer.wrap(remainBuffer));
+ // Assert.assertEquals(tsStatus5.getCode(), TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ // }
+ // }
+ // // check completeness of file
+ // File receiveFile =
+ // new File(
+ // SyncPathUtil.getFileDataDirPath(identityInfo),
+ // tsfile.getName() + SyncConstant.PATCH_SUFFIX);
+ // Assert.assertTrue(receiveFile.exists());
+ //
+ // try (RandomAccessFile originFileRAF = new RandomAccessFile(tsfile, "r");
+ // RandomAccessFile receiveFileRAF = new RandomAccessFile(receiveFile, "r")) {
+ // Assert.assertEquals(originFileRAF.length(), receiveFileRAF.length());
+ // byte[] buffer1 = new byte[(int) originFileRAF.length()];
+ // byte[] buffer2 = new byte[(int) receiveFile.length()];
+ // originFileRAF.read(buffer1);
+ // receiveFileRAF.read(buffer2);
+ // Assert.assertArrayEquals(buffer1, buffer2);
+ // }
+ // }
+ //
+ // @Test
+ // public void testTransportPipeData() throws Exception {
+ // try (TTransport transport =
+ // RpcTransportFactory.INSTANCE.getTransport(
+ // new TSocket(
+ // TConfigurationConst.defaultTConfiguration,
+ // "127.0.0.1",
+ // 6667,
+ // SyncConstant.SOCKET_TIMEOUT_MILLISECONDS,
+ // SyncConstant.CONNECT_TIMEOUT_MILLISECONDS))) {
+ // TProtocol protocol;
+ // if (config.isRpcThriftCompressionEnable()) {
+ // protocol = new TCompactProtocol(transport);
+ // } else {
+ // protocol = new TBinaryProtocol(transport);
+ // }
+ // IClientRPCService.Client serviceClient = new IClientRPCService.Client(protocol);
+ // // Open the underlying socket if it is not already open.
+ // if (!transport.isOpen()) {
+ // transport.open();
+ // }
+ // PipeData pipeData =
+ // new SchemaPipeData(new SetStorageGroupPlan(new PartialPath("root.sg1")), 0);
+ // byte[] buffer = pipeData.serialize();
+ // ByteBuffer buffToSend = ByteBuffer.wrap(buffer);
+ // try {
+ // TSStatus tsStatus = serviceClient.sendPipeData(buffToSend);
+ // Assert.fail();
+ // } catch (TException e) {
+ // // do nothing
+ // }
+ // serviceClient.handshake(
+ // new TSyncIdentityInfo(
+ // "127.0.0.1", pipeName1, createdTime1, config.getIoTDBVersion(), "root.sg1"));
+ // TSStatus tsStatus = serviceClient.sendPipeData(buffToSend);
+ // Assert.assertEquals(tsStatus.getCode(), TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ // }
+ // }
+ //
+ // @Test
+ // public void testSyncClient() throws Exception {
+ // // 1. prepare fake file
+ // Assert.assertNotNull(tsfile);
+ // Assert.assertNotNull(modsFile);
+ // Assert.assertNotNull(resourceFile);
+ //
+ // // 2. prepare pipelog and pipeDataQueue
+ // int serialNum = 0;
+ // List<PipeData> pipeDataList = new ArrayList<>();
+ // pipeDataList.add(
+ // new SchemaPipeData(new SetStorageGroupPlan(new PartialPath("root.vehicle")),
+ // serialNum++));
+ // pipeDataList.add(
+ // new SchemaPipeData(
+ // new CreateTimeSeriesPlan(
+ // new PartialPath("root.vehicle.d0.s0"),
+ // new MeasurementSchema("s0", TSDataType.INT32, TSEncoding.RLE)),
+ // serialNum++));
+ // TsFilePipeData tsFilePipeData = new TsFilePipeData(tsfile.getPath(), serialNum++);
+ // pipeDataList.add(tsFilePipeData);
+ // Deletion deletion = new Deletion(new PartialPath("root.vehicle.**"), 0, 33, 38);
+ // pipeDataList.add(new DeletionPipeData(deletion, serialNum++));
+ //
+ // // 3. start client
+ // Pipe pipe = new TsFilePipe(createdTime1, pipeName1, new IoTDBPipeSink("sink"), 0, false);
+ // IoTDBSyncClient client =
+ // new IoTDBSyncClient(
+ // pipe,
+ // "127.0.0.1",
+ // IoTDBDescriptor.getInstance().getConfig().getRpcPort(),
+ // "127.0.0.1",
+ // "root.vehicle");
+ // client.handshake();
+ // for (PipeData pipeData : pipeDataList) {
+ // client.send(pipeData);
+ // }
+ //
+ // // 4. check result
+ // checkResult(
+ // "select ** from root.vehicle",
+ // new String[] {"Time", "root.vehicle.d0.s0"},
+ // new String[] {"2,2"});
+ // }
+ //
+ // private void prepareData() throws Exception {
+ // Session session =
+ // new Session.Builder()
+ // .host("127.0.0.1")
+ // .port(6667)
+ // .username("root")
+ // .password("root")
+ // .version(Version.V_0_13)
+ // .build();
+ // try {
+ // session.open(false);
+ //
+ // // set session fetchSize
+ // session.setFetchSize(10000);
+ // session.setStorageGroup("root.vehicle");
+ //
+ // List<String> measurements = Collections.singletonList("s0");
+ // List<TSDataType> types = Collections.singletonList(TSDataType.INT32);
+ // session.insertRecord("root.vehicle.d0", 1, measurements, types,
+ // Collections.singletonList(1));
+ // session.insertRecord("root.vehicle.d0", 2, measurements, types,
+ // Collections.singletonList(2));
+ // session.insertRecord(
+ // "root.vehicle.d0", 35, measurements, types, Collections.singletonList(35));
+ // session.executeNonQueryStatement("flush");
+ // session.executeNonQueryStatement("delete from root.vehicle.d0.s0 where time<2");
+ // } finally {
+ // session.close();
+ // }
+ // }
+ //
+ // private void checkResult(String sql, String[] columnNames, String[] retArray) throws Exception
+ // {
+ // Session session =
+ // new Session.Builder()
+ // .host("127.0.0.1")
+ // .port(6667)
+ // .username("root")
+ // .password("root")
+ // .version(Version.V_0_13)
+ // .build();
+ // try {
+ // session.open(false);
+ // // set session fetchSize
+ // session.setFetchSize(10000);
+ // try (SessionDataSet dataSet = session.executeQueryStatement(sql)) {
+ // Assert.assertArrayEquals(columnNames, dataSet.getColumnNames().toArray(new String[0]));
+ // List<String> actualRetArray = new ArrayList<>();
+ // while (dataSet.hasNext()) {
+ // RowRecord rowRecord = dataSet.next();
+ // StringBuilder rowString = new StringBuilder(rowRecord.getTimestamp() + ",");
+ // rowRecord.getFields().forEach(i -> rowString.append(i.getStringValue()));
+ // actualRetArray.add(rowString.toString());
+ // }
+ // Assert.assertArrayEquals(retArray, actualRetArray.toArray(new String[0]));
+ // }
+ // } finally {
+ // session.close();
+ // }
+ // }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/LoadTsFileStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/LoadTsFileStatement.java
index 0b076b00be..e0d492dd03 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/LoadTsFileStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/LoadTsFileStatement.java
@@ -103,6 +103,10 @@ public class LoadTsFileStatement extends Statement {
this.verifySchema = verifySchema;
}
+ public void setAutoCreateSchema(boolean autoCreateSchema) {
+ this.autoCreateSchema = autoCreateSchema;
+ }
+
public boolean isVerifySchema() {
return verifySchema;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/DeletionLoader.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/DeletionLoader.java
index f8e4f22e4d..2be3303e59 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/DeletionLoader.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/DeletionLoader.java
@@ -20,11 +20,26 @@ package org.apache.iotdb.db.sync.receiver.load;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.exception.sync.PipeDataLoadException;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.modification.Deletion;
+import org.apache.iotdb.db.exception.LoadFileException;
+import org.apache.iotdb.db.mpp.plan.Coordinator;
+import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
+import org.apache.iotdb.db.mpp.plan.statement.Statement;
+import org.apache.iotdb.db.mpp.plan.statement.crud.DeleteDataStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.DeleteTimeSeriesStatement;
+import org.apache.iotdb.db.query.control.SessionManager;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
/** This loader is used to load deletion plan. */
public class DeletionLoader implements ILoader {
+ private static final Logger logger = LoggerFactory.getLogger(DeletionLoader.class);
private Deletion deletion;
@@ -38,10 +53,43 @@ public class DeletionLoader implements ILoader {
throw new PipeDataLoadException("storage engine readonly");
}
try {
- StorageEngine.getInstance()
- .delete(deletion.getPath(), deletion.getStartTime(), deletion.getEndTime(), 0, null);
+ if (!config.isMppMode()) {
+ StorageEngine.getInstance()
+ .delete(deletion.getPath(), deletion.getStartTime(), deletion.getEndTime(), 0, null);
+ return;
+ }
+
+ Statement statement = generateStatement();
+ long queryId = SessionManager.getInstance().requestQueryId(false);
+ ExecutionResult result =
+ Coordinator.getInstance()
+ .execute(
+ statement,
+ queryId,
+ null,
+ "",
+ PARTITION_FETCHER,
+ SCHEMA_FETCHER,
+ IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold());
+ if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ logger.error(String.format("Delete %s error, statement: %s.", deletion, statement));
+ logger.error(String.format("Delete result status : %s.", result.status));
+ throw new LoadFileException(
+ String.format("Can not execute delete statement: %s", statement));
+ }
} catch (Exception e) {
throw new PipeDataLoadException(e.getMessage());
}
}
+
+ private Statement generateStatement() {
+ if (deletion.getStartTime() == Long.MIN_VALUE && deletion.getEndTime() == Long.MAX_VALUE) {
+ return new DeleteTimeSeriesStatement(Collections.singletonList(deletion.getPath()));
+ }
+ DeleteDataStatement statement = new DeleteDataStatement();
+ statement.setPathList(Collections.singletonList(deletion.getPath()));
+ statement.setDeleteStartTime(deletion.getStartTime());
+ statement.setDeleteEndTime(deletion.getEndTime());
+ return statement;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/ILoader.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/ILoader.java
index e13d263ce6..36ba0da5ea 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/ILoader.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/ILoader.java
@@ -19,11 +19,31 @@
package org.apache.iotdb.db.sync.receiver.load;
import org.apache.iotdb.commons.exception.sync.PipeDataLoadException;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.ClusterSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.StandalonePartitionFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.StandaloneSchemaFetcher;
/**
* This interface is used to load files, including tsFile, syncTask, schema, modsFile and
* deletePlan.
*/
public interface ILoader {
+
+ IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+ IPartitionFetcher PARTITION_FETCHER =
+ config.isClusterMode()
+ ? ClusterPartitionFetcher.getInstance()
+ : StandalonePartitionFetcher.getInstance();
+
+ ISchemaFetcher SCHEMA_FETCHER =
+ config.isClusterMode()
+ ? ClusterSchemaFetcher.getInstance()
+ : StandaloneSchemaFetcher.getInstance();
+
void load() throws PipeDataLoadException;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/TsFileLoader.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/TsFileLoader.java
index f5e73abb9d..14bb3a608a 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/TsFileLoader.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/TsFileLoader.java
@@ -18,13 +18,21 @@
*/
package org.apache.iotdb.db.sync.receiver.load;
+import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.sync.PipeDataLoadException;
+import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.LoadFileException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.mpp.plan.Coordinator;
+import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
+import org.apache.iotdb.db.mpp.plan.statement.crud.LoadTsFileStatement;
import org.apache.iotdb.db.qp.executor.PlanExecutor;
import org.apache.iotdb.db.qp.logical.Operator;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.sys.OperateFilePlan;
+import org.apache.iotdb.db.query.control.SessionManager;
+import org.apache.iotdb.rpc.TSStatusCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,17 +64,49 @@ public class TsFileLoader implements ILoader {
@Override
public void load() throws PipeDataLoadException {
try {
- PhysicalPlan plan =
- new OperateFilePlan(
- tsFile,
- Operator.OperatorType.LOAD_FILES,
- true,
- IoTDBDescriptor.getInstance().getConfig().getDefaultStorageGroupLevel(),
- true,
- true);
- planExecutor.processNonQuery(plan);
+ if (!config.isMppMode()) {
+ PhysicalPlan plan =
+ new OperateFilePlan(
+ tsFile,
+ Operator.OperatorType.LOAD_FILES,
+ true,
+ IoTDBDescriptor.getInstance().getConfig().getDefaultStorageGroupLevel(),
+ true,
+ true);
+ planExecutor.processNonQuery(plan);
+ return;
+ }
+
+ LoadTsFileStatement statement = new LoadTsFileStatement(tsFile.getAbsolutePath());
+ statement.setDeleteAfterLoad(true);
+ statement.setSgLevel(parseSgLevel());
+ statement.setVerifySchema(true);
+ statement.setAutoCreateSchema(true);
+
+ long queryId = SessionManager.getInstance().requestQueryId(false);
+ ExecutionResult result =
+ Coordinator.getInstance()
+ .execute(
+ statement,
+ queryId,
+ null,
+ "",
+ PARTITION_FETCHER,
+ SCHEMA_FETCHER,
+ IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold());
+ if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ logger.error(
+ String.format("Load TsFile %s error, statement: %s.", tsFile.getPath(), statement));
+ logger.error(String.format("Load TsFile result status : %s.", result.status));
+ throw new LoadFileException(
+ String.format("Can not execute load TsFile statement: %s", statement));
+ }
} catch (Exception e) {
throw new PipeDataLoadException(e.getMessage());
}
}
+
+ private int parseSgLevel() throws IllegalPathException {
+ return new PartialPath(storageGroup).getNodeLength() - 1;
+ }
}
diff --git a/server/src/test/java/org/apache/iotdb/db/sync/transport/SyncTransportTest.java b/server/src/test/java/org/apache/iotdb/db/sync/transport/SyncTransportTest.java
deleted file mode 100644
index 1c4ac95038..0000000000
--- a/server/src/test/java/org/apache/iotdb/db/sync/transport/SyncTransportTest.java
+++ /dev/null
@@ -1,357 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.iotdb.db.sync.transport;
-
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.commons.sync.pipesink.IoTDBPipeSink;
-import org.apache.iotdb.commons.sync.utils.SyncConstant;
-import org.apache.iotdb.commons.sync.utils.SyncPathUtil;
-import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.engine.modification.Deletion;
-import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
-import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
-import org.apache.iotdb.db.sync.pipedata.DeletionPipeData;
-import org.apache.iotdb.db.sync.pipedata.PipeData;
-import org.apache.iotdb.db.sync.pipedata.SchemaPipeData;
-import org.apache.iotdb.db.sync.pipedata.TsFilePipeData;
-import org.apache.iotdb.db.sync.sender.pipe.Pipe;
-import org.apache.iotdb.db.sync.sender.pipe.TsFilePipe;
-import org.apache.iotdb.db.sync.transport.client.IoTDBSyncClient;
-import org.apache.iotdb.db.utils.EnvironmentUtils;
-import org.apache.iotdb.rpc.RpcTransportFactory;
-import org.apache.iotdb.rpc.TConfigurationConst;
-import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.iotdb.service.rpc.thrift.IClientRPCService;
-import org.apache.iotdb.service.rpc.thrift.TSyncIdentityInfo;
-import org.apache.iotdb.service.rpc.thrift.TSyncTransportMetaInfo;
-import org.apache.iotdb.session.Session;
-import org.apache.iotdb.session.SessionDataSet;
-import org.apache.iotdb.session.util.Version;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.iotdb.tsfile.read.common.RowRecord;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.protocol.TCompactProtocol;
-import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.transport.TSocket;
-import org.apache.thrift.transport.TTransport;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.File;
-import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-public class SyncTransportTest {
-
- private static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
- /** create tsfile and move to tmpDir for sync test */
- File tmpDir = new File("target/synctest");
-
- String pipeName1 = "pipe1";
- String remoteIp1;
- long createdTime1 = System.currentTimeMillis();
- File fileDir;
-
- File tsfile;
- File resourceFile;
- File modsFile;
-
- @Before
- public void setUp() throws Exception {
- EnvironmentUtils.envSetUp();
- remoteIp1 = "127.0.0.1";
- fileDir = new File(SyncPathUtil.getReceiverFileDataDir(pipeName1, remoteIp1, createdTime1));
- prepareData();
- EnvironmentUtils.shutdownDaemon();
- File srcDir =
- new File(
- IoTDBDescriptor.getInstance().getConfig().getDataDirs()[0]
- + File.separator
- + "sequence"
- + File.separator
- + "root.vehicle"
- + File.separator
- + "0"
- + File.separator
- + "0");
- if (tmpDir.exists()) {
- FileUtils.deleteDirectory(tmpDir);
- }
- FileUtils.moveDirectory(srcDir, tmpDir);
- tsfile = null;
- resourceFile = null;
- modsFile = null;
- File[] fileList = tmpDir.listFiles();
- for (File f : fileList) {
- if (f.getName().endsWith(".tsfile")) {
- tsfile = f;
- } else if (f.getName().endsWith(".mods")) {
- modsFile = f;
- } else if (f.getName().endsWith(".resource")) {
- resourceFile = f;
- }
- }
- EnvironmentUtils.cleanEnv();
- EnvironmentUtils.envSetUp();
- }
-
- @After
- public void tearDown() throws Exception {
- FileUtils.deleteDirectory(tmpDir);
- EnvironmentUtils.cleanEnv();
- }
-
- @Test
- public void testTransportFile() throws Exception {
- TSyncIdentityInfo identityInfo =
- new TSyncIdentityInfo("127.0.0.1", pipeName1, createdTime1, config.getIoTDBVersion(), "");
- try (TTransport transport =
- RpcTransportFactory.INSTANCE.getTransport(
- new TSocket(
- TConfigurationConst.defaultTConfiguration,
- "127.0.0.1",
- 6667,
- SyncConstant.SOCKET_TIMEOUT_MILLISECONDS,
- SyncConstant.CONNECT_TIMEOUT_MILLISECONDS))) {
- TProtocol protocol;
- if (config.isRpcThriftCompressionEnable()) {
- protocol = new TCompactProtocol(transport);
- } else {
- protocol = new TBinaryProtocol(transport);
- }
- IClientRPCService.Client serviceClient = new IClientRPCService.Client(protocol);
- // Underlay socket open.
- if (!transport.isOpen()) {
- transport.open();
- }
- byte[] buffer = new byte[10];
- try (RandomAccessFile randomAccessFile = new RandomAccessFile(tsfile, "rw")) {
- // no handshake, response TException
- try {
- serviceClient.sendFile(
- new TSyncTransportMetaInfo(tsfile.getName(), 0), ByteBuffer.wrap(buffer));
- Assert.fail();
- } catch (TException e) {
- // do nothing
- }
- serviceClient.handshake(identityInfo);
- // response REBASE:0
- randomAccessFile.read(buffer, 0, 10);
- TSStatus tsStatus1 =
- serviceClient.sendFile(
- new TSyncTransportMetaInfo(tsfile.getName(), 1), ByteBuffer.wrap(buffer));
- Assert.assertEquals(tsStatus1.getCode(), TSStatusCode.SYNC_FILE_REBASE.getStatusCode());
- Assert.assertEquals(tsStatus1.getMessage(), "0");
- // response SUCCESS
- TSStatus tsStatus2 =
- serviceClient.sendFile(
- new TSyncTransportMetaInfo(tsfile.getName(), 0), ByteBuffer.wrap(buffer));
- Assert.assertEquals(tsStatus2.getCode(), TSStatusCode.SUCCESS_STATUS.getStatusCode());
- // response response REBASE:10
- TSStatus tsStatus3 =
- serviceClient.sendFile(
- new TSyncTransportMetaInfo(tsfile.getName(), 0), ByteBuffer.wrap(buffer));
- Assert.assertEquals(tsStatus3.getCode(), TSStatusCode.SYNC_FILE_REBASE.getStatusCode());
- Assert.assertEquals(tsStatus3.getMessage(), "10");
- TSStatus tsStatus4 =
- serviceClient.sendFile(
- new TSyncTransportMetaInfo(tsfile.getName(), 100), ByteBuffer.wrap(buffer));
- Assert.assertEquals(tsStatus4.getCode(), TSStatusCode.SYNC_FILE_REBASE.getStatusCode());
- Assert.assertEquals(tsStatus4.getMessage(), "10");
- // response SUCCESS
- byte[] remainBuffer = new byte[(int) (randomAccessFile.length() - 10)];
- randomAccessFile.read(remainBuffer, 0, (int) (randomAccessFile.length() - 10));
- TSStatus tsStatus5 =
- serviceClient.sendFile(
- new TSyncTransportMetaInfo(tsfile.getName(), 10), ByteBuffer.wrap(remainBuffer));
- Assert.assertEquals(tsStatus5.getCode(), TSStatusCode.SUCCESS_STATUS.getStatusCode());
- }
- }
- // check completeness of file
- File receiveFile =
- new File(
- SyncPathUtil.getFileDataDirPath(identityInfo),
- tsfile.getName() + SyncConstant.PATCH_SUFFIX);
- Assert.assertTrue(receiveFile.exists());
-
- try (RandomAccessFile originFileRAF = new RandomAccessFile(tsfile, "r");
- RandomAccessFile receiveFileRAF = new RandomAccessFile(receiveFile, "r")) {
- Assert.assertEquals(originFileRAF.length(), receiveFileRAF.length());
- byte[] buffer1 = new byte[(int) originFileRAF.length()];
- byte[] buffer2 = new byte[(int) receiveFile.length()];
- originFileRAF.read(buffer1);
- receiveFileRAF.read(buffer2);
- Assert.assertArrayEquals(buffer1, buffer2);
- }
- }
-
- @Test
- public void testTransportPipeData() throws Exception {
- try (TTransport transport =
- RpcTransportFactory.INSTANCE.getTransport(
- new TSocket(
- TConfigurationConst.defaultTConfiguration,
- "127.0.0.1",
- 6667,
- SyncConstant.SOCKET_TIMEOUT_MILLISECONDS,
- SyncConstant.CONNECT_TIMEOUT_MILLISECONDS))) {
- TProtocol protocol;
- if (config.isRpcThriftCompressionEnable()) {
- protocol = new TCompactProtocol(transport);
- } else {
- protocol = new TBinaryProtocol(transport);
- }
- IClientRPCService.Client serviceClient = new IClientRPCService.Client(protocol);
- // Underlay socket open.
- if (!transport.isOpen()) {
- transport.open();
- }
- PipeData pipeData =
- new SchemaPipeData(new SetStorageGroupPlan(new PartialPath("root.sg1")), 0);
- byte[] buffer = pipeData.serialize();
- ByteBuffer buffToSend = ByteBuffer.wrap(buffer);
- try {
- TSStatus tsStatus = serviceClient.sendPipeData(buffToSend);
- Assert.fail();
- } catch (TException e) {
- // do nothing
- }
- serviceClient.handshake(
- new TSyncIdentityInfo(
- "127.0.0.1", pipeName1, createdTime1, config.getIoTDBVersion(), "root.sg1"));
- TSStatus tsStatus = serviceClient.sendPipeData(buffToSend);
- Assert.assertEquals(tsStatus.getCode(), TSStatusCode.SUCCESS_STATUS.getStatusCode());
- }
- }
-
- @Test
- public void testSyncClient() throws Exception {
- // 1. prepare fake file
- Assert.assertNotNull(tsfile);
- Assert.assertNotNull(modsFile);
- Assert.assertNotNull(resourceFile);
-
- // 2. prepare pipelog and pipeDataQueue
- int serialNum = 0;
- List<PipeData> pipeDataList = new ArrayList<>();
- pipeDataList.add(
- new SchemaPipeData(new SetStorageGroupPlan(new PartialPath("root.vehicle")), serialNum++));
- pipeDataList.add(
- new SchemaPipeData(
- new CreateTimeSeriesPlan(
- new PartialPath("root.vehicle.d0.s0"),
- new MeasurementSchema("s0", TSDataType.INT32, TSEncoding.RLE)),
- serialNum++));
- TsFilePipeData tsFilePipeData = new TsFilePipeData(tsfile.getPath(), serialNum++);
- pipeDataList.add(tsFilePipeData);
- Deletion deletion = new Deletion(new PartialPath("root.vehicle.**"), 0, 33, 38);
- pipeDataList.add(new DeletionPipeData(deletion, serialNum++));
-
- // 3. start client
- Pipe pipe = new TsFilePipe(createdTime1, pipeName1, new IoTDBPipeSink("sink"), 0, false);
- IoTDBSyncClient client =
- new IoTDBSyncClient(
- pipe,
- "127.0.0.1",
- IoTDBDescriptor.getInstance().getConfig().getRpcPort(),
- "127.0.0.1",
- "root.vehicle");
- client.handshake();
- for (PipeData pipeData : pipeDataList) {
- client.send(pipeData);
- }
-
- // 4. check result
- checkResult(
- "select ** from root.vehicle",
- new String[] {"Time", "root.vehicle.d0.s0"},
- new String[] {"2,2"});
- }
-
- private void prepareData() throws Exception {
- Session session =
- new Session.Builder()
- .host("127.0.0.1")
- .port(6667)
- .username("root")
- .password("root")
- .version(Version.V_0_13)
- .build();
- try {
- session.open(false);
-
- // set session fetchSize
- session.setFetchSize(10000);
- session.setStorageGroup("root.vehicle");
-
- List<String> measurements = Collections.singletonList("s0");
- List<TSDataType> types = Collections.singletonList(TSDataType.INT32);
- session.insertRecord("root.vehicle.d0", 1, measurements, types, Collections.singletonList(1));
- session.insertRecord("root.vehicle.d0", 2, measurements, types, Collections.singletonList(2));
- session.insertRecord(
- "root.vehicle.d0", 35, measurements, types, Collections.singletonList(35));
- session.executeNonQueryStatement("flush");
- session.executeNonQueryStatement("delete from root.vehicle.d0.s0 where time<2");
- } finally {
- session.close();
- }
- }
-
- private void checkResult(String sql, String[] columnNames, String[] retArray) throws Exception {
- Session session =
- new Session.Builder()
- .host("127.0.0.1")
- .port(6667)
- .username("root")
- .password("root")
- .version(Version.V_0_13)
- .build();
- try {
- session.open(false);
- // set session fetchSize
- session.setFetchSize(10000);
- try (SessionDataSet dataSet = session.executeQueryStatement(sql)) {
- Assert.assertArrayEquals(columnNames, dataSet.getColumnNames().toArray(new String[0]));
- List<String> actualRetArray = new ArrayList<>();
- while (dataSet.hasNext()) {
- RowRecord rowRecord = dataSet.next();
- StringBuilder rowString = new StringBuilder(rowRecord.getTimestamp() + ",");
- rowRecord.getFields().forEach(i -> rowString.append(i.getStringValue()));
- actualRetArray.add(rowString.toString());
- }
- Assert.assertArrayEquals(retArray, actualRetArray.toArray(new String[0]));
- }
- } finally {
- session.close();
- }
- }
-}