You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/02/21 08:42:52 UTC
[iotdb] branch new_sync updated: [To new_sync][IOTDB-2272] implement customized sync process: collector (#4989)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch new_sync
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/new_sync by this push:
new ac9901c [To new_sync][IOTDB-2272] implement customized sync process: collector (#4989)
ac9901c is described below
commit ac9901c2c97a6596e3224494ff8ee4c0b05ef18d
Author: Chen YZ <43...@users.noreply.github.com>
AuthorDate: Mon Feb 21 16:42:12 2022 +0800
[To new_sync][IOTDB-2272] implement customized sync process: collector (#4989)
---
.../sync/IoTDBSyncReceiverCollectorIT.java | 485 +++++++++++++++++++++
.../sync/IoTDBSyncReceiverLoaderIT.java | 105 +----
.../sync/{WriteUtil.java => SyncTestUtil.java} | 107 ++++-
.../org/apache/iotdb/db/concurrent/ThreadName.java | 1 +
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 13 +
.../db/newsync/pipedata/DeletionPipeData.java | 36 +-
.../apache/iotdb/db/newsync/pipedata/PipeData.java | 33 +-
.../iotdb/db/newsync/pipedata/SchemaPipeData.java | 39 +-
.../iotdb/db/newsync/pipedata/TsFilePipeData.java | 35 +-
.../iotdb/db/newsync/receiver/ReceiverService.java | 44 +-
.../db/newsync/receiver/collector/Collector.java | 188 +++++++-
.../newsync/receiver/manager/ReceiverManager.java | 29 +-
.../db/newsync/receiver/recovery/ReceiverLog.java | 20 +-
.../receiver/recovery/ReceiverLogAnalyzer.java | 6 +-
.../sender/recovery/TsFilePipeLogAnalyzer.java | 2 +
.../utils/BufferedPipeDataBlockingQueue.java | 79 ++++
.../db/newsync/{conf => utils}/SyncConstant.java | 7 +-
.../iotdb/db/newsync/utils/SyncPathUtil.java | 56 +++
.../org/apache/iotdb/db/qp/logical/Operator.java | 2 -
.../apache/iotdb/db/utils/EnvironmentUtils.java | 2 +
.../iotdb/db/newsync/pipedata/PipeDataTest.java | 82 ++++
21 files changed, 1182 insertions(+), 189 deletions(-)
diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverCollectorIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverCollectorIT.java
new file mode 100644
index 0000000..1b13d13
--- /dev/null
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverCollectorIT.java
@@ -0,0 +1,485 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.integration.sync;
+
+import org.apache.iotdb.db.concurrent.ThreadName;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.modification.Deletion;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.newsync.pipedata.DeletionPipeData;
+import org.apache.iotdb.db.newsync.pipedata.PipeData;
+import org.apache.iotdb.db.newsync.pipedata.SchemaPipeData;
+import org.apache.iotdb.db.newsync.pipedata.TsFilePipeData;
+import org.apache.iotdb.db.newsync.receiver.collector.Collector;
+import org.apache.iotdb.db.newsync.utils.BufferedPipeDataBlockingQueue;
+import org.apache.iotdb.db.newsync.utils.SyncConstant;
+import org.apache.iotdb.db.newsync.utils.SyncPathUtil;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.itbase.category.LocalStandaloneTest;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+@Category({LocalStandaloneTest.class})
+public class IoTDBSyncReceiverCollectorIT {
+ private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBSyncReceiverLoaderIT.class);
+ protected static boolean enableSeqSpaceCompaction;
+ protected static boolean enableUnseqSpaceCompaction;
+ protected static boolean enableCrossSpaceCompaction;
+ /** create tsfile and move to tmpDir for sync test */
+ File tmpDir = new File("target/synctest");
+
+ String pipeName1 = "pipe1";
+ String remoteIp1 = "192.168.0.11";
+ long createdTime1 = System.currentTimeMillis();
+ File pipeLogDir1 =
+ new File(SyncPathUtil.getReceiverPipeLogDir(pipeName1, remoteIp1, createdTime1));
+ String pipeName2 = "pipe2";
+ String remoteIp2 = "192.168.0.22";
+ long createdTime2 = System.currentTimeMillis();
+ File pipeLogDir2 =
+ new File(SyncPathUtil.getReceiverPipeLogDir(pipeName2, remoteIp2, createdTime2));
+
+ @Before
+ public void setUp() throws Exception {
+ EnvironmentUtils.envSetUp();
+ enableSeqSpaceCompaction =
+ IoTDBDescriptor.getInstance().getConfig().isEnableSeqSpaceCompaction();
+ enableUnseqSpaceCompaction =
+ IoTDBDescriptor.getInstance().getConfig().isEnableUnseqSpaceCompaction();
+ enableCrossSpaceCompaction =
+ IoTDBDescriptor.getInstance().getConfig().isEnableCrossSpaceCompaction();
+ IoTDBDescriptor.getInstance().getConfig().setEnableSeqSpaceCompaction(false);
+ IoTDBDescriptor.getInstance().getConfig().setEnableUnseqSpaceCompaction(false);
+ IoTDBDescriptor.getInstance().getConfig().setEnableCrossSpaceCompaction(false);
+ SyncTestUtil.insertData();
+ EnvironmentUtils.shutdownDaemon();
+ File srcDir = new File(IoTDBDescriptor.getInstance().getConfig().getDataDirs()[0]);
+ FileUtils.moveDirectory(srcDir, tmpDir);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ IoTDBDescriptor.getInstance().getConfig().setEnableSeqSpaceCompaction(enableSeqSpaceCompaction);
+ IoTDBDescriptor.getInstance()
+ .getConfig()
+ .setEnableUnseqSpaceCompaction(enableUnseqSpaceCompaction);
+ IoTDBDescriptor.getInstance()
+ .getConfig()
+ .setEnableCrossSpaceCompaction(enableCrossSpaceCompaction);
+ FileUtils.deleteDirectory(tmpDir);
+ FileUtils.deleteDirectory(pipeLogDir1);
+ FileUtils.deleteDirectory(pipeLogDir2);
+ EnvironmentUtils.cleanEnv();
+ }
+
+ @Test
+ public void testOnePipe() throws Exception {
+ // 1. restart IoTDB
+ EnvironmentUtils.cleanEnv();
+ EnvironmentUtils.envSetUp();
+
+ // 2. prepare pipelog and .collector to test fail recovery
+ if (!pipeLogDir1.exists()) {
+ pipeLogDir1.mkdirs();
+ }
+ File pipeLog1 = new File(pipeLogDir1.getPath(), String.valueOf(System.currentTimeMillis()));
+ File collectFile1 = new File(pipeLog1.getPath() + SyncConstant.COLLECTOR_SUFFIX);
+ DataOutputStream pipeLogOutput = new DataOutputStream(new FileOutputStream(pipeLog1, false));
+ DataOutputStream collectOutput =
+ new DataOutputStream(new FileOutputStream(collectFile1, false));
+ int serialNum = 0;
+ for (int i = 0; i < 5; i++) {
+ // skip first 5 pipeData
+ PipeData pipeData = new TsFilePipeData("", serialNum++);
+ pipeData.serialize(pipeLogOutput);
+ collectOutput.writeInt(i);
+ }
+ List<PhysicalPlan> planList = new ArrayList<>();
+ planList.add(new SetStorageGroupPlan(new PartialPath("root.vehicle")));
+ planList.add(
+ new CreateTimeSeriesPlan(
+ new PartialPath("root.vehicle.d0.s0"),
+ new MeasurementSchema("s0", TSDataType.INT32, TSEncoding.RLE)));
+ planList.add(
+ new CreateTimeSeriesPlan(
+ new PartialPath("root.vehicle.d0.s1"),
+ new MeasurementSchema("s1", TSDataType.TEXT, TSEncoding.PLAIN)));
+ planList.add(
+ new CreateTimeSeriesPlan(
+ new PartialPath("root.vehicle.d1.s2"),
+ new MeasurementSchema("s2", TSDataType.FLOAT, TSEncoding.RLE)));
+ planList.add(
+ new CreateTimeSeriesPlan(
+ new PartialPath("root.vehicle.d1.s3"),
+ new MeasurementSchema("s3", TSDataType.BOOLEAN, TSEncoding.PLAIN)));
+ planList.add(new SetStorageGroupPlan(new PartialPath("root.sg1")));
+ planList.add(
+ new CreateAlignedTimeSeriesPlan(
+ new PartialPath("root.sg1.d1"),
+ Arrays.asList("s1", "s2", "s3", "s4", "s5"),
+ Arrays.asList(
+ TSDataType.FLOAT,
+ TSDataType.INT32,
+ TSDataType.INT64,
+ TSDataType.BOOLEAN,
+ TSDataType.TEXT),
+ Arrays.asList(
+ TSEncoding.RLE,
+ TSEncoding.GORILLA,
+ TSEncoding.RLE,
+ TSEncoding.RLE,
+ TSEncoding.PLAIN),
+ Arrays.asList(
+ CompressionType.SNAPPY,
+ CompressionType.SNAPPY,
+ CompressionType.SNAPPY,
+ CompressionType.SNAPPY,
+ CompressionType.SNAPPY),
+ null));
+ for (PhysicalPlan plan : planList) {
+ PipeData pipeData = new SchemaPipeData(plan, serialNum++);
+ pipeData.serialize(pipeLogOutput);
+ }
+ pipeLogOutput.close();
+ collectOutput.close();
+ File pipeLog2 = new File(pipeLogDir1.getPath(), String.valueOf(System.currentTimeMillis() + 1));
+ pipeLogOutput = new DataOutputStream(new FileOutputStream(pipeLog2, false));
+ List<File> tsFiles = SyncTestUtil.getTsFilePaths(tmpDir);
+ for (File f : tsFiles) {
+ PipeData pipeData = new TsFilePipeData(f.getPath(), serialNum++);
+ pipeData.serialize(pipeLogOutput);
+ }
+ Deletion deletion = new Deletion(new PartialPath("root.vehicle.**"), 0, 33, 38);
+ PipeData pipeData = new DeletionPipeData(deletion, serialNum++);
+ pipeData.serialize(pipeLogOutput);
+ pipeLogOutput.close();
+
+ // 3. create and start collector
+ Collector collector = new Collector();
+ collector.startCollect();
+
+ // 4. start collect pipe
+ collector.startPipe(pipeName1, remoteIp1, createdTime1);
+
+ // 5. if all pipeData has been loaded into IoTDB, check result
+ CountDownLatch latch = new CountDownLatch(1);
+ ExecutorService es1 = Executors.newSingleThreadExecutor();
+ es1.execute(
+ () -> {
+ while (true) {
+ File[] files = pipeLogDir1.listFiles();
+ if (files.length == 0) {
+ break;
+ }
+ }
+ latch.countDown();
+ });
+ es1.shutdown();
+ try {
+ latch.await(30, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ String sql1 = "select * from root.vehicle.*";
+ String[] retArray1 =
+ new String[] {
+ "6,120,null,null,null",
+ "9,null,123,null,null",
+ "16,128,null,null,16.0",
+ "18,189,198,true,18.0",
+ "20,null,null,false,null",
+ "29,null,null,true,1205.0",
+ "99,null,1234,null,null"
+ };
+ String[] columnNames1 = {
+ "root.vehicle.d0.s0", "root.vehicle.d0.s1", "root.vehicle.d1.s3", "root.vehicle.d1.s2"
+ };
+ SyncTestUtil.checkResult(sql1, columnNames1, retArray1);
+ // 5.2 check aligned timeseries
+ String sql2 = "select * from root.sg1.d1";
+ String[] retArray2 =
+ new String[] {
+ "1,1.0,1,null,true,aligned_test1",
+ "2,2.0,2,null,null,aligned_test2",
+ "3,3.0,null,null,false,aligned_test3",
+ "4,4.0,4,null,true,aligned_test4",
+ "5,130000.0,130000,130000,false,aligned_unseq_test1",
+ "6,6.0,6,6,true,null",
+ "7,7.0,7,7,false,aligned_test7",
+ "8,8.0,8,8,null,aligned_test8",
+ "9,9.0,9,9,false,aligned_test9",
+ };
+ String[] columnNames2 = {
+ "root.sg1.d1.s1", "root.sg1.d1.s2", "root.sg1.d1.s3", "root.sg1.d1.s4", "root.sg1.d1.s5",
+ };
+ SyncTestUtil.checkResult(sql2, columnNames2, retArray2);
+
+ // 6. stop pipe, check interrupt thread
+ collector.stopPipe(pipeName1, remoteIp1, createdTime1);
+ Thread.sleep(1000);
+ Deletion deletion1 = new Deletion(new PartialPath("root.vehicle.**"), 0, 0, 99);
+ PipeData pipeData1 = new DeletionPipeData(deletion1, serialNum++);
+ BufferedPipeDataBlockingQueue.getMock().offer(pipeData1);
+ Thread.sleep(1000);
+ SyncTestUtil.checkResult(sql1, columnNames1, retArray1);
+
+ // 7. stop collector, check release thread pool
+ collector.stopCollect();
+ Thread.sleep(1000);
+ for (Thread t : Thread.getAllStackTraces().keySet()) {
+ if (t.getName().contains(ThreadName.SYNC_RECEIVER_COLLECTOR.getName())) {
+ Assert.fail();
+ }
+ }
+ }
+
+ @Test
+ public void testMultiplePipe() throws Exception {
+ // 1. restart IoTDB
+ EnvironmentUtils.cleanEnv();
+ EnvironmentUtils.envSetUp();
+
+ // 2. prepare pipelog and .collector to test fail recovery
+ if (!pipeLogDir1.exists()) {
+ pipeLogDir1.mkdirs();
+ }
+ if (!pipeLogDir2.exists()) {
+ pipeLogDir2.mkdirs();
+ }
+ // 2.1 prepare for pipe1
+ File pipeLog1 = new File(pipeLogDir1.getPath(), String.valueOf(System.currentTimeMillis()));
+ File collectFile1 = new File(pipeLog1.getPath() + SyncConstant.COLLECTOR_SUFFIX);
+ DataOutputStream pipeLogOutput = new DataOutputStream(new FileOutputStream(pipeLog1, false));
+ DataOutputStream collectOutput =
+ new DataOutputStream(new FileOutputStream(collectFile1, false));
+ int serialNum = 0;
+ for (int i = 0; i < 5; i++) {
+ // skip first 5 pipeData
+ PipeData pipeData = new TsFilePipeData("", serialNum++);
+ pipeData.serialize(pipeLogOutput);
+ collectOutput.writeInt(i);
+ }
+ List<PhysicalPlan> planList = new ArrayList<>();
+ planList.add(new SetStorageGroupPlan(new PartialPath("root.vehicle")));
+ planList.add(
+ new CreateTimeSeriesPlan(
+ new PartialPath("root.vehicle.d0.s0"),
+ new MeasurementSchema("s0", TSDataType.INT32, TSEncoding.RLE)));
+ planList.add(
+ new CreateTimeSeriesPlan(
+ new PartialPath("root.vehicle.d0.s1"),
+ new MeasurementSchema("s1", TSDataType.TEXT, TSEncoding.PLAIN)));
+ planList.add(
+ new CreateTimeSeriesPlan(
+ new PartialPath("root.vehicle.d1.s2"),
+ new MeasurementSchema("s2", TSDataType.FLOAT, TSEncoding.RLE)));
+ planList.add(
+ new CreateTimeSeriesPlan(
+ new PartialPath("root.vehicle.d1.s3"),
+ new MeasurementSchema("s3", TSDataType.BOOLEAN, TSEncoding.PLAIN)));
+ for (PhysicalPlan plan : planList) {
+ PipeData pipeData = new SchemaPipeData(plan, serialNum++);
+ pipeData.serialize(pipeLogOutput);
+ }
+ pipeLogOutput.close();
+ collectOutput.close();
+ File pipeLog2 = new File(pipeLogDir1.getPath(), String.valueOf(System.currentTimeMillis() + 1));
+ pipeLogOutput = new DataOutputStream(new FileOutputStream(pipeLog2, false));
+ List<File> tsFiles =
+ SyncTestUtil.getTsFilePaths(new File(tmpDir, "sequence" + File.separator + "root.vehicle"));
+ for (File f : tsFiles) {
+ PipeData pipeData = new TsFilePipeData(f.getPath(), serialNum++);
+ pipeData.serialize(pipeLogOutput);
+ }
+ tsFiles =
+ SyncTestUtil.getTsFilePaths(
+ new File(tmpDir, "unsequence" + File.separator + "root.vehicle"));
+ for (File f : tsFiles) {
+ PipeData pipeData = new TsFilePipeData(f.getPath(), serialNum++);
+ pipeData.serialize(pipeLogOutput);
+ }
+ Deletion deletion = new Deletion(new PartialPath("root.vehicle.**"), 0, 33, 38);
+ PipeData pipeData = new DeletionPipeData(deletion, serialNum++);
+ pipeData.serialize(pipeLogOutput);
+ pipeLogOutput.close();
+
+ // 2.2 prepare for pipe2
+ serialNum = 0;
+ pipeLog1 = new File(pipeLogDir2.getPath(), String.valueOf(System.currentTimeMillis()));
+ pipeLogOutput = new DataOutputStream(new FileOutputStream(pipeLog1, false));
+ pipeData =
+ new SchemaPipeData(
+ new CreateAlignedTimeSeriesPlan(
+ new PartialPath("root.sg1.d1"),
+ Arrays.asList("s1", "s2", "s3", "s4", "s5"),
+ Arrays.asList(
+ TSDataType.FLOAT,
+ TSDataType.INT32,
+ TSDataType.INT64,
+ TSDataType.BOOLEAN,
+ TSDataType.TEXT),
+ Arrays.asList(
+ TSEncoding.RLE,
+ TSEncoding.GORILLA,
+ TSEncoding.RLE,
+ TSEncoding.RLE,
+ TSEncoding.PLAIN),
+ Arrays.asList(
+ CompressionType.SNAPPY,
+ CompressionType.SNAPPY,
+ CompressionType.SNAPPY,
+ CompressionType.SNAPPY,
+ CompressionType.SNAPPY),
+ null),
+ serialNum++);
+ pipeData.serialize(pipeLogOutput);
+ pipeData =
+ new SchemaPipeData(new SetStorageGroupPlan(new PartialPath("root.sg1")), serialNum++);
+ pipeData.serialize(pipeLogOutput);
+ pipeLogOutput.close();
+ pipeLog2 = new File(pipeLogDir2.getPath(), String.valueOf(System.currentTimeMillis() + 1));
+ pipeLogOutput = new DataOutputStream(new FileOutputStream(pipeLog2, false));
+ tsFiles =
+ SyncTestUtil.getTsFilePaths(new File(tmpDir, "sequence" + File.separator + "root.sg1"));
+ for (File f : tsFiles) {
+ pipeData = new TsFilePipeData(f.getPath(), serialNum++);
+ pipeData.serialize(pipeLogOutput);
+ }
+ tsFiles =
+ SyncTestUtil.getTsFilePaths(new File(tmpDir, "unsequence" + File.separator + "root.sg1"));
+ for (File f : tsFiles) {
+ pipeData = new TsFilePipeData(f.getPath(), serialNum++);
+ pipeData.serialize(pipeLogOutput);
+ }
+ pipeLogOutput.close();
+
+ // 3. create and start collector
+ Collector collector = new Collector();
+ collector.startCollect();
+
+ // 4. start collect pipe
+ collector.startPipe(pipeName1, remoteIp1, createdTime1);
+ collector.startPipe(pipeName2, remoteIp2, createdTime2);
+
+ // 5. if all pipeData has been loaded into IoTDB, check result
+ CountDownLatch latch = new CountDownLatch(2);
+ ExecutorService es1 = Executors.newSingleThreadExecutor();
+ es1.execute(
+ () -> {
+ while (true) {
+ File[] files = pipeLogDir1.listFiles();
+ if (files.length == 0) {
+ break;
+ }
+ }
+ latch.countDown();
+ while (true) {
+ File[] files = pipeLogDir2.listFiles();
+ if (files.length == 0) {
+ break;
+ }
+ }
+ latch.countDown();
+ });
+ es1.shutdown();
+ try {
+ latch.await(30, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ String sql1 = "select * from root.vehicle.*";
+ String[] retArray1 =
+ new String[] {
+ "6,120,null,null,null",
+ "9,null,123,null,null",
+ "16,128,null,null,16.0",
+ "18,189,198,true,18.0",
+ "20,null,null,false,null",
+ "29,null,null,true,1205.0",
+ "99,null,1234,null,null"
+ };
+ String[] columnNames1 = {
+ "root.vehicle.d0.s0", "root.vehicle.d0.s1", "root.vehicle.d1.s3", "root.vehicle.d1.s2"
+ };
+ SyncTestUtil.checkResult(sql1, columnNames1, retArray1);
+ // 5.2 check aligned timeseries
+ String sql2 = "select * from root.sg1.d1";
+ String[] retArray2 =
+ new String[] {
+ "1,1.0,1,null,true,aligned_test1",
+ "2,2.0,2,null,null,aligned_test2",
+ "3,3.0,null,null,false,aligned_test3",
+ "4,4.0,4,null,true,aligned_test4",
+ "5,130000.0,130000,130000,false,aligned_unseq_test1",
+ "6,6.0,6,6,true,null",
+ "7,7.0,7,7,false,aligned_test7",
+ "8,8.0,8,8,null,aligned_test8",
+ "9,9.0,9,9,false,aligned_test9",
+ };
+ String[] columnNames2 = {
+ "root.sg1.d1.s1", "root.sg1.d1.s2", "root.sg1.d1.s3", "root.sg1.d1.s4", "root.sg1.d1.s5",
+ };
+ SyncTestUtil.checkResult(sql2, columnNames2, retArray2);
+
+ // 6. stop pipe, check interrupt thread
+ collector.stopPipe(pipeName1, remoteIp1, createdTime1);
+ collector.stopPipe(pipeName2, remoteIp2, createdTime2);
+ Thread.sleep(1000);
+ Deletion deletion1 = new Deletion(new PartialPath("root.vehicle.**"), 0, 0, 99);
+ PipeData pipeData1 = new DeletionPipeData(deletion1, serialNum++);
+ BufferedPipeDataBlockingQueue.getMock().offer(pipeData1);
+ Thread.sleep(1000);
+ SyncTestUtil.checkResult(sql1, columnNames1, retArray1);
+
+ // 7. stop collector, check release thread pool
+ collector.stopCollect();
+ Thread.sleep(1000);
+ for (Thread t : Thread.getAllStackTraces().keySet()) {
+ if (t.getName().contains(ThreadName.SYNC_RECEIVER_COLLECTOR.getName())) {
+ Assert.fail();
+ }
+ }
+ }
+}
diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverLoaderIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverLoaderIT.java
index 3d96ccb..8b98f92 100644
--- a/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverLoaderIT.java
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverLoaderIT.java
@@ -31,11 +31,9 @@ import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.itbase.category.LocalStandaloneTest;
-import org.apache.iotdb.jdbc.Config;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.iotdb.tsfile.utils.FilePathUtils;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.commons.io.FileUtils;
@@ -48,13 +46,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
-import java.io.IOException;
import java.sql.*;
import java.util.*;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
@Category({LocalStandaloneTest.class})
public class IoTDBSyncReceiverLoaderIT {
private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBSyncReceiverLoaderIT.class);
@@ -76,7 +70,7 @@ public class IoTDBSyncReceiverLoaderIT {
IoTDBDescriptor.getInstance().getConfig().setEnableSeqSpaceCompaction(false);
IoTDBDescriptor.getInstance().getConfig().setEnableUnseqSpaceCompaction(false);
IoTDBDescriptor.getInstance().getConfig().setEnableCrossSpaceCompaction(false);
- WriteUtil.insertData();
+ SyncTestUtil.insertData();
EnvironmentUtils.shutdownDaemon();
File srcDir = new File(IoTDBDescriptor.getInstance().getConfig().getDataDirs()[0]);
FileUtils.moveDirectory(srcDir, tmpDir);
@@ -155,7 +149,7 @@ public class IoTDBSyncReceiverLoaderIT {
}
// 3. test for TsFileLoader
- List<File> tsFiles = getTsFilePaths(tmpDir);
+ List<File> tsFiles = SyncTestUtil.getTsFilePaths(tmpDir);
for (File tsfile : tsFiles) {
ILoader tsFileLoader = new TsFileLoader(tsfile);
try {
@@ -192,7 +186,7 @@ public class IoTDBSyncReceiverLoaderIT {
String[] columnNames1 = {
"root.vehicle.d0.s0", "root.vehicle.d0.s1", "root.vehicle.d1.s3", "root.vehicle.d1.s2"
};
- checkResult(sql1, columnNames1, retArray1);
+ SyncTestUtil.checkResult(sql1, columnNames1, retArray1);
// 5.2 check aligned timeseries
String sql2 = "select * from root.sg1.d1";
String[] retArray2 =
@@ -210,97 +204,6 @@ public class IoTDBSyncReceiverLoaderIT {
String[] columnNames2 = {
"root.sg1.d1.s1", "root.sg1.d1.s2", "root.sg1.d1.s3", "root.sg1.d1.s4", "root.sg1.d1.s5",
};
- checkResult(sql2, columnNames2, retArray2);
- }
-
- private void checkResult(String sql, String[] columnNames, String[] retArray)
- throws ClassNotFoundException {
- Class.forName(Config.JDBC_DRIVER_NAME);
- try (Connection connection =
- DriverManager.getConnection(
- Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
- Statement statement = connection.createStatement()) {
- boolean hasResultSet = statement.execute(sql);
- Assert.assertTrue(hasResultSet);
- ResultSet resultSet = statement.getResultSet();
- ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
- Map<String, Integer> map = new HashMap<>();
- for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
- map.put(resultSetMetaData.getColumnName(i), i);
- }
- assertEquals(columnNames.length + 1, resultSetMetaData.getColumnCount());
- int cnt = 0;
- while (resultSet.next()) {
- StringBuilder builder = new StringBuilder();
- builder.append(resultSet.getString(1));
- for (String columnName : columnNames) {
- int index = map.get(columnName);
- builder.append(",").append(resultSet.getString(index));
- }
- assertEquals(retArray[cnt], builder.toString());
- cnt++;
- }
- assertEquals(retArray.length, cnt);
- } catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- /**
- * scan parentDir and return all TsFile sorted by load sequence
- *
- * @param parentDir folder to scan
- */
- private List<File> getTsFilePaths(File parentDir) {
- List<File> res = new ArrayList<>();
- if (!parentDir.exists()) {
- Assert.fail();
- return res;
- }
- scanDir(res, parentDir);
- Collections.sort(
- res,
- new Comparator<File>() {
- @Override
- public int compare(File f1, File f2) {
- int diffSg =
- f1.getParentFile()
- .getParentFile()
- .getParentFile()
- .getName()
- .compareTo(f2.getParentFile().getParentFile().getParentFile().getName());
- if (diffSg != 0) {
- return diffSg;
- } else {
- return (int)
- (FilePathUtils.splitAndGetTsFileVersion(f1.getName())
- - FilePathUtils.splitAndGetTsFileVersion(f2.getName()));
- }
- }
- });
- return res;
- }
-
- private void scanDir(List<File> tsFiles, File parentDir) {
- if (!parentDir.exists()) {
- Assert.fail();
- return;
- }
- File fa[] = parentDir.listFiles();
- for (int i = 0; i < fa.length; i++) {
- File fs = fa[i];
- if (fs.isDirectory()) {
- scanDir(tsFiles, fs);
- } else if (fs.getName().endsWith(".resource")) {
- // only add tsfile that has been flushed
- tsFiles.add(new File(fs.getAbsolutePath().substring(0, fs.getAbsolutePath().length() - 9)));
- try {
- FileUtils.delete(fs);
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
+ SyncTestUtil.checkResult(sql2, columnNames2, retArray2);
}
}
diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/sync/WriteUtil.java b/integration/src/test/java/org/apache/iotdb/db/integration/sync/SyncTestUtil.java
similarity index 61%
rename from integration/src/test/java/org/apache/iotdb/db/integration/sync/WriteUtil.java
rename to integration/src/test/java/org/apache/iotdb/db/integration/sync/SyncTestUtil.java
index 9d4da20..4498c11 100644
--- a/integration/src/test/java/org/apache/iotdb/db/integration/sync/WriteUtil.java
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/sync/SyncTestUtil.java
@@ -19,12 +19,20 @@
package org.apache.iotdb.db.integration.sync;
import org.apache.iotdb.jdbc.Config;
+import org.apache.iotdb.tsfile.utils.FilePathUtils;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.Statement;
+import org.apache.commons.io.FileUtils;
+import org.junit.Assert;
-public class WriteUtil {
+import java.io.File;
+import java.io.IOException;
+import java.sql.*;
+import java.util.*;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class SyncTestUtil {
private static final String[] sqls =
new String[] {
@@ -96,7 +104,94 @@ public class WriteUtil {
}
}
- public static void main(String[] args) throws ClassNotFoundException {
- insertData();
+ /**
+ * scan parentDir and return all TsFile sorted by load sequence
+ *
+ * @param parentDir folder to scan
+ */
+ public static List<File> getTsFilePaths(File parentDir) {
+ List<File> res = new ArrayList<>();
+ if (!parentDir.exists()) {
+ Assert.fail();
+ return res;
+ }
+ scanDir(res, parentDir);
+ Collections.sort(
+ res,
+ new Comparator<File>() {
+ @Override
+ public int compare(File f1, File f2) {
+ int diffSg =
+ f1.getParentFile()
+ .getParentFile()
+ .getParentFile()
+ .getName()
+ .compareTo(f2.getParentFile().getParentFile().getParentFile().getName());
+ if (diffSg != 0) {
+ return diffSg;
+ } else {
+ return (int)
+ (FilePathUtils.splitAndGetTsFileVersion(f1.getName())
+ - FilePathUtils.splitAndGetTsFileVersion(f2.getName()));
+ }
+ }
+ });
+ return res;
+ }
+
+ private static void scanDir(List<File> tsFiles, File parentDir) {
+ if (!parentDir.exists()) {
+ Assert.fail();
+ return;
+ }
+ File fa[] = parentDir.listFiles();
+ for (int i = 0; i < fa.length; i++) {
+ File fs = fa[i];
+ if (fs.isDirectory()) {
+ scanDir(tsFiles, fs);
+ } else if (fs.getName().endsWith(".resource")) {
+ // only add tsfile that has been flushed
+ tsFiles.add(new File(fs.getAbsolutePath().substring(0, fs.getAbsolutePath().length() - 9)));
+ try {
+ FileUtils.delete(fs);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+
+ public static void checkResult(String sql, String[] columnNames, String[] retArray)
+ throws ClassNotFoundException {
+ Class.forName(Config.JDBC_DRIVER_NAME);
+ try (Connection connection =
+ DriverManager.getConnection(
+ Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+ boolean hasResultSet = statement.execute(sql);
+ Assert.assertTrue(hasResultSet);
+ ResultSet resultSet = statement.getResultSet();
+ ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+ Map<String, Integer> map = new HashMap<>();
+ for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+ map.put(resultSetMetaData.getColumnName(i), i);
+ }
+ assertEquals(columnNames.length + 1, resultSetMetaData.getColumnCount());
+ int cnt = 0;
+ while (resultSet.next()) {
+ StringBuilder builder = new StringBuilder();
+ builder.append(resultSet.getString(1));
+ for (String columnName : columnNames) {
+ int index = map.get(columnName);
+ builder.append(",").append(resultSet.getString(index));
+ }
+ assertEquals(retArray[cnt], builder.toString());
+ cnt++;
+ }
+ assertEquals(retArray.length, cnt);
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java b/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
index 4028571..c407012 100644
--- a/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
+++ b/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
@@ -57,6 +57,7 @@ public enum ThreadName {
TIMED_CLOSE_TSFILE("Timed-Close-TsFile"),
SETTLE_SERVICE("Settle"),
PIPE_SERVICE("Pipe"),
+ SYNC_RECEIVER_COLLECTOR("Sync-Collector"),
CONTINUOUS_QUERY_SERVICE("ContinuousQueryTaskPoolManager"),
CLUSTER_INFO_SERVICE("ClusterInfoClient"),
CLUSTER_RPC_SERVICE("ClusterRPC"),
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index d43afa4..b0307ac 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -221,6 +221,10 @@ public class IoTDBConfig {
+ File.separator
+ IoTDBConstant.SYNC_FOLDER_NAME;
+ /** Sync directory, including the lock file, uuid file, device owner map */
+ // TODO: delete old syncDir and rename newSyncDir to syncDir
+ private String newSyncDir = DEFAULT_BASE_DIR + File.separator + IoTDBConstant.SYNC_FOLDER_NAME;
+
/** Performance tracing directory, stores performance tracing files */
private String tracingDir = DEFAULT_BASE_DIR + File.separator + IoTDBConstant.TRACING_FOLDER_NAME;
@@ -928,6 +932,7 @@ public class IoTDBConfig {
systemDir = addHomeDir(systemDir);
schemaDir = addHomeDir(schemaDir);
syncDir = addHomeDir(syncDir);
+ newSyncDir = addHomeDir(newSyncDir);
tracingDir = addHomeDir(tracingDir);
walDir = addHomeDir(walDir);
indexRootFolder = addHomeDir(indexRootFolder);
@@ -1110,6 +1115,14 @@ public class IoTDBConfig {
this.syncDir = syncDir;
}
+ public String getNewSyncDir() {
+ return newSyncDir;
+ }
+
+ void setNewSyncDir(String syncDir) {
+ this.newSyncDir = syncDir;
+ }
+
public String getTracingDir() {
return tracingDir;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/pipedata/DeletionPipeData.java b/server/src/main/java/org/apache/iotdb/db/newsync/pipedata/DeletionPipeData.java
index 99d5638..b4e91c4 100644
--- a/server/src/main/java/org/apache/iotdb/db/newsync/pipedata/DeletionPipeData.java
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/pipedata/DeletionPipeData.java
@@ -21,6 +21,8 @@ package org.apache.iotdb.db.newsync.pipedata;
import org.apache.iotdb.db.engine.modification.Deletion;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.newsync.receiver.load.DeletionLoader;
+import org.apache.iotdb.db.newsync.receiver.load.ILoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -29,16 +31,13 @@ import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.util.Objects;
public class DeletionPipeData extends PipeData {
private static final Logger logger = LoggerFactory.getLogger(DeletionPipeData.class);
private Deletion deletion;
- public DeletionPipeData(long serialNumber) {
- super(serialNumber);
- }
-
public DeletionPipeData(Deletion deletion, long serialNumber) {
super(serialNumber);
this.deletion = deletion;
@@ -50,13 +49,20 @@ public class DeletionPipeData extends PipeData {
}
@Override
- public long serializeImpl(DataOutputStream stream) throws IOException {
- return deletion.serializeWithoutFileOffset(stream);
+ public long serialize(DataOutputStream stream) throws IOException {
+ return super.serialize(stream) + deletion.serializeWithoutFileOffset(stream);
+ }
+
+ public static DeletionPipeData deserialize(DataInputStream stream)
+ throws IOException, IllegalPathException {
+ long serialNumber = stream.readLong();
+ Deletion deletion = Deletion.deserializeWithoutFileOffset(stream);
+ return new DeletionPipeData(deletion, serialNumber);
}
@Override
- public void deserializeImpl(DataInputStream stream) throws IOException, IllegalPathException {
- this.deletion = Deletion.deserializeWithoutFileOffset(stream);
+ public ILoader createLoader() {
+ return new DeletionLoader(deletion);
}
@Override
@@ -79,4 +85,18 @@ public class DeletionPipeData extends PipeData {
public String toString() {
return "DeletionData{" + "deletion=" + deletion + ", serialNumber=" + serialNumber + '}';
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ DeletionPipeData that = (DeletionPipeData) o;
+ return Objects.equals(deletion, that.deletion)
+ && Objects.equals(serialNumber, that.serialNumber);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(deletion, serialNumber);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/pipedata/PipeData.java b/server/src/main/java/org/apache/iotdb/db/newsync/pipedata/PipeData.java
index 0575d5c..2e16115 100644
--- a/server/src/main/java/org/apache/iotdb/db/newsync/pipedata/PipeData.java
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/pipedata/PipeData.java
@@ -20,12 +20,17 @@
package org.apache.iotdb.db.newsync.pipedata;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.newsync.receiver.load.ILoader;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
public abstract class PipeData {
+ private static final Logger logger = LoggerFactory.getLogger(PipeData.class);
protected final long serialNumber;
@@ -53,39 +58,31 @@ public abstract class PipeData {
public long serialize(DataOutputStream stream) throws IOException {
long serializeSize = 0;
- stream.writeLong(serialNumber);
- serializeSize += Long.BYTES;
stream.writeByte((byte) getType().ordinal());
serializeSize += Byte.BYTES;
- serializeSize += serializeImpl(stream);
+ stream.writeLong(serialNumber);
+ serializeSize += Long.BYTES;
return serializeSize;
}
- public abstract long serializeImpl(DataOutputStream stream) throws IOException;
-
public static PipeData deserialize(DataInputStream stream)
throws IOException, IllegalPathException {
- long serialNumber = stream.readLong();
Type type = Type.values()[stream.readByte()];
-
- PipeData pipeData = null;
switch (type) {
case TSFILE:
- pipeData = new TsFilePipeData(serialNumber);
- break;
+ return TsFilePipeData.deserialize(stream);
case DELETION:
- pipeData = new DeletionPipeData(serialNumber);
- break;
+ return DeletionPipeData.deserialize(stream);
case PHYSICALPLAN:
- pipeData = new SchemaPipeData(serialNumber);
- break;
+ return SchemaPipeData.deserialize(stream);
+ default:
+ logger.error("Deserialize PipeData error because Unknown type {}.", type);
+ throw new UnsupportedOperationException(
+ "Deserialize PipeData error because Unknown type " + type);
}
- pipeData.deserializeImpl(stream);
- return pipeData;
}
- public abstract void deserializeImpl(DataInputStream stream)
- throws IOException, IllegalPathException;
+ public abstract ILoader createLoader();
public abstract void sendToTransport();
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/pipedata/SchemaPipeData.java b/server/src/main/java/org/apache/iotdb/db/newsync/pipedata/SchemaPipeData.java
index 28f068a..809a0e2 100644
--- a/server/src/main/java/org/apache/iotdb/db/newsync/pipedata/SchemaPipeData.java
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/pipedata/SchemaPipeData.java
@@ -20,22 +20,21 @@
package org.apache.iotdb.db.newsync.pipedata;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.newsync.receiver.load.ILoader;
+import org.apache.iotdb.db.newsync.receiver.load.SchemaLoader;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.Objects;
public class SchemaPipeData extends PipeData {
private static final int SERIALIZE_BUFFER_SIZE = 1024;
private PhysicalPlan plan;
- public SchemaPipeData(long serialNumber) {
- super(serialNumber);
- }
-
public SchemaPipeData(PhysicalPlan plan, long serialNumber) {
super(serialNumber);
this.plan = plan;
@@ -47,11 +46,13 @@ public class SchemaPipeData extends PipeData {
}
@Override
- public long serializeImpl(DataOutputStream stream) throws IOException {
+ public long serialize(DataOutputStream stream) throws IOException {
+ long serializeSize = super.serialize(stream);
byte[] bytes = getBytes();
stream.writeInt(bytes.length);
stream.write(bytes);
- return Integer.BYTES + bytes.length;
+ serializeSize += (Integer.BYTES + bytes.length);
+ return serializeSize;
}
private byte[] getBytes() {
@@ -63,11 +64,18 @@ public class SchemaPipeData extends PipeData {
return bytes;
}
- @Override
- public void deserializeImpl(DataInputStream stream) throws IOException, IllegalPathException {
+ public static SchemaPipeData deserialize(DataInputStream stream)
+ throws IOException, IllegalPathException {
+ long serialNumber = stream.readLong();
byte[] bytes = new byte[stream.readInt()];
stream.read(bytes);
- this.plan = PhysicalPlan.Factory.create(ByteBuffer.wrap(bytes));
+ PhysicalPlan plan = PhysicalPlan.Factory.create(ByteBuffer.wrap(bytes));
+ return new SchemaPipeData(plan, serialNumber);
+ }
+
+ @Override
+ public ILoader createLoader() {
+ return new SchemaLoader(plan);
}
@Override
@@ -80,4 +88,17 @@ public class SchemaPipeData extends PipeData {
public String toString() {
return "SchemaPipeData{" + "serialNumber=" + serialNumber + ", plan=" + plan + '}';
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ SchemaPipeData that = (SchemaPipeData) o;
+ return Objects.equals(plan, that.plan) && Objects.equals(serialNumber, that.serialNumber);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(plan, serialNumber);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/pipedata/TsFilePipeData.java b/server/src/main/java/org/apache/iotdb/db/newsync/pipedata/TsFilePipeData.java
index 503486a..3ff101f 100644
--- a/server/src/main/java/org/apache/iotdb/db/newsync/pipedata/TsFilePipeData.java
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/pipedata/TsFilePipeData.java
@@ -21,6 +21,8 @@ package org.apache.iotdb.db.newsync.pipedata;
import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.newsync.receiver.load.ILoader;
+import org.apache.iotdb.db.newsync.receiver.load.TsFileLoader;
import org.apache.iotdb.db.newsync.sender.conf.SenderConf;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -34,16 +36,13 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Objects;
public class TsFilePipeData extends PipeData {
private static final Logger logger = LoggerFactory.getLogger(TsFilePipeData.class);
private String tsFilePath;
- public TsFilePipeData(long serialNumber) {
- super(serialNumber);
- }
-
public TsFilePipeData(String tsFilePath, long serialNumber) {
super(serialNumber);
this.tsFilePath = tsFilePath;
@@ -55,13 +54,19 @@ public class TsFilePipeData extends PipeData {
}
@Override
- public long serializeImpl(DataOutputStream stream) throws IOException {
- return ReadWriteIOUtils.write(tsFilePath, stream);
+ public long serialize(DataOutputStream stream) throws IOException {
+ return super.serialize(stream) + ReadWriteIOUtils.write(tsFilePath, stream);
+ }
+
+ public static TsFilePipeData deserialize(DataInputStream stream) throws IOException {
+ long serialNumber = stream.readLong();
+ String tsFilePath = ReadWriteIOUtils.readString(stream);
+ return new TsFilePipeData(tsFilePath, serialNumber);
}
@Override
- public void deserializeImpl(DataInputStream stream) throws IOException {
- this.tsFilePath = ReadWriteIOUtils.readString(stream);
+ public ILoader createLoader() {
+ return new TsFileLoader(new File(tsFilePath));
}
@Override
@@ -125,4 +130,18 @@ public class TsFilePipeData extends PipeData {
+ '\''
+ '}';
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ TsFilePipeData pipeData = (TsFilePipeData) o;
+ return Objects.equals(tsFilePath, pipeData.tsFilePath)
+ && Objects.equals(serialNumber, pipeData.serialNumber);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(tsFilePath, serialNumber);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/ReceiverService.java b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/ReceiverService.java
index e85bc7c..3dcca71 100644
--- a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/ReceiverService.java
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/ReceiverService.java
@@ -22,7 +22,9 @@ import org.apache.iotdb.db.exception.StartupException;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.newsync.receiver.collector.Collector;
import org.apache.iotdb.db.newsync.receiver.manager.PipeInfo;
+import org.apache.iotdb.db.newsync.receiver.manager.PipeStatus;
import org.apache.iotdb.db.newsync.receiver.manager.ReceiverManager;
+import org.apache.iotdb.db.newsync.utils.SyncPathUtil;
import org.apache.iotdb.db.qp.physical.sys.ShowPipeServerPlan;
import org.apache.iotdb.db.qp.utils.DatetimeUtils;
import org.apache.iotdb.db.query.dataset.ListDataSet;
@@ -34,10 +36,12 @@ import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
@@ -53,7 +57,16 @@ public class ReceiverService implements IService {
public boolean startPipeServer() {
try {
receiverManager.startServer();
- // TODO: start socket and collector
+ collector.startCollect();
+ // recover started pipe
+ List<PipeInfo> pipeInfos = receiverManager.getAllPipeInfos();
+ for (PipeInfo pipeInfo : pipeInfos) {
+ if (pipeInfo.getStatus().equals(PipeStatus.RUNNING)) {
+ collector.startPipe(
+ pipeInfo.getPipeName(), pipeInfo.getRemoteIp(), pipeInfo.getCreateTime());
+ }
+ }
+ // TODO: start socket
} catch (IOException e) {
logger.error(e.getMessage());
return false;
@@ -65,6 +78,7 @@ public class ReceiverService implements IService {
public boolean stopPipeServer() {
try {
receiverManager.stopServer();
+ collector.stopCollect();
// TODO: stop socket and collector
} catch (IOException e) {
logger.error(e.getMessage());
@@ -74,23 +88,41 @@ public class ReceiverService implements IService {
}
/** create and start a new pipe named pipeName */
- public void createPipe(String pipeName, String remoteIp, long startTime) throws IOException {
- receiverManager.createPipe(pipeName, remoteIp, startTime);
+ public void createPipe(String pipeName, String remoteIp, long createTime) throws IOException {
+ createDir(pipeName, remoteIp, createTime);
+ receiverManager.createPipe(pipeName, remoteIp, createTime);
+ collector.startPipe(pipeName, remoteIp, createTime);
}
/** start an existed pipe named pipeName */
- public void startPipe(String pipeName, String remoteIp) throws IOException {
+ public void startPipe(String pipeName, String remoteIp, long createTime) throws IOException {
receiverManager.startPipe(pipeName, remoteIp);
+ collector.startPipe(pipeName, remoteIp, createTime);
}
/** stop an existed pipe named pipeName */
- public void stopPipe(String pipeName, String remoteIp) throws IOException {
+ public void stopPipe(String pipeName, String remoteIp, long createTime) throws IOException {
receiverManager.stopPipe(pipeName, remoteIp);
+ collector.stopPipe(pipeName, remoteIp, createTime);
}
/** drop an existed pipe named pipeName */
- public void dropPipe(String pipeName, String remoteIp) throws IOException {
+ public void dropPipe(String pipeName, String remoteIp, long createTime) throws IOException {
receiverManager.dropPipe(pipeName, remoteIp);
+ collector.stopPipe(pipeName, remoteIp, createTime);
+ File dir = new File(SyncPathUtil.getReceiverPipeDir(pipeName, remoteIp, createTime));
+ FileUtils.deleteDirectory(dir);
+ }
+
+ private void createDir(String pipeName, String remoteIp, long createTime) {
+ File f = new File(SyncPathUtil.getReceiverFileDataDir(pipeName, remoteIp, createTime));
+ if (!f.exists()) {
+ f.mkdirs();
+ }
+ f = new File(SyncPathUtil.getReceiverPipeLogDir(pipeName, remoteIp, createTime));
+ if (!f.exists()) {
+ f.mkdirs();
+ }
}
/**
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/collector/Collector.java b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/collector/Collector.java
index 3c44ed0..9321f5c 100644
--- a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/collector/Collector.java
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/collector/Collector.java
@@ -19,4 +19,190 @@
*/
package org.apache.iotdb.db.newsync.receiver.collector;
-public class Collector {}
+import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.concurrent.ThreadName;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.newsync.pipedata.PipeData;
+import org.apache.iotdb.db.newsync.utils.BufferedPipeDataBlockingQueue;
+import org.apache.iotdb.db.newsync.utils.SyncConstant;
+import org.apache.iotdb.db.newsync.utils.SyncPathUtil;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+/** scan sync receiver folder and load pipeData into IoTDB */
+public class Collector {
+
+ private static final Logger logger = LoggerFactory.getLogger(Collector.class);
+ private static final int WAIT_TIMEOUT = 2000;
+ private ExecutorService executorService;
+ private Map<String, Future> taskFutures;
+
+ public Collector() {
+ taskFutures = new ConcurrentHashMap<>();
+ }
+
+ public void startCollect() {
+ this.executorService =
+ IoTDBThreadPoolFactory.newCachedThreadPool(ThreadName.SYNC_RECEIVER_COLLECTOR.getName());
+ }
+
+ public void stopCollect() {
+ for (Future f : taskFutures.values()) {
+ f.cancel(true);
+ }
+ if (executorService != null) {
+ executorService.shutdownNow();
+ int totalWaitTime = WAIT_TIMEOUT;
+ while (!executorService.isTerminated()) {
+ try {
+ if (!executorService.awaitTermination(WAIT_TIMEOUT, TimeUnit.MILLISECONDS)) {
+ logger.info(
+ "{} thread pool doesn't exit after {}ms.",
+ ThreadName.SYNC_RECEIVER_COLLECTOR.getName(),
+ totalWaitTime);
+ }
+ totalWaitTime += WAIT_TIMEOUT;
+ } catch (InterruptedException e) {
+ logger.error(
+ "Interrupted while waiting {} thread pool to exit. ",
+ ThreadName.SYNC_RECEIVER_COLLECTOR.getName());
+ Thread.currentThread().interrupt();
+ }
+ }
+ executorService = null;
+ }
+ }
+
+ public void startPipe(String pipeName, String remoteIp, long createTime) {
+ String dir = SyncPathUtil.getReceiverPipeLogDir(pipeName, remoteIp, createTime);
+ ScanTask task = new ScanTask(dir);
+ taskFutures.put(dir, executorService.submit(task));
+ }
+
+ public void stopPipe(String pipeName, String remoteIp, long createTime) {
+ String dir = SyncPathUtil.getReceiverPipeLogDir(pipeName, remoteIp, createTime);
+ taskFutures.get(dir).cancel(true);
+ taskFutures.remove(dir);
+ }
+
+ private static BufferedPipeDataBlockingQueue parsePipeLogToBlockingQueue(File file)
+ throws IOException {
+ BufferedPipeDataBlockingQueue blockingQueue =
+ new BufferedPipeDataBlockingQueue(file.getAbsolutePath(), 100);
+ DataInputStream inputStream = new DataInputStream(new FileInputStream(file));
+ try {
+ while (true) {
+ blockingQueue.offer(PipeData.deserialize(inputStream));
+ }
+ } catch (EOFException e) {
+ logger.info(String.format("Finish parsing pipeLog %s.", file.getPath()));
+ } catch (IllegalPathException e) {
+ logger.error(String.format("Parsing pipeLog %s error, because %s", file.getPath(), e));
+ throw new IOException(e);
+ } finally {
+ inputStream.close();
+ }
+ blockingQueue.end();
+ return blockingQueue;
+ }
+
+ private class ScanTask implements Runnable {
+ private final String scanPath;
+
+ private ScanTask(String dirPath) {
+ scanPath = dirPath;
+ }
+
+ @Override
+ public void run() {
+ try {
+ while (!Thread.interrupted()) {
+ File dir = new File(scanPath);
+ if (dir.exists() && dir.isDirectory()) {
+ BufferedPipeDataBlockingQueue pipeDataQueue;
+ File[] files = dir.listFiles((d, s) -> !s.endsWith(SyncConstant.COLLECTOR_SUFFIX));
+ int nextIndex = 0;
+
+ if (files.length > 0) {
+ // read from disk
+ // TODO: Assuming that the file name is incremented by number
+ Arrays.sort(files, Comparator.comparingLong(o -> Long.parseLong(o.getName())));
+ try {
+ pipeDataQueue = parsePipeLogToBlockingQueue(files[0]);
+ } catch (IOException e) {
+ logger.error("Parse pipe data log {} error.", files[0].getPath());
+ // TODO: stop
+ return;
+ }
+ } else {
+ // read from buffer
+ // TODO: get buffer from transport, this is mock implement
+ pipeDataQueue = BufferedPipeDataBlockingQueue.getMock();
+ }
+
+ File recordFile = new File(pipeDataQueue.getFileName() + SyncConstant.COLLECTOR_SUFFIX);
+ if (recordFile.exists()) {
+ RandomAccessFile raf = new RandomAccessFile(recordFile, "r");
+ if (raf.length() > Integer.BYTES) {
+ raf.seek(raf.length() - Integer.BYTES);
+ nextIndex = raf.readInt() + 1;
+ }
+ raf.close();
+ }
+ DataOutputStream outputStream =
+ new DataOutputStream(new FileOutputStream(recordFile, true));
+ while (!pipeDataQueue.isEnd()) {
+ PipeData pipeData = null;
+ try {
+ pipeData = pipeDataQueue.take();
+ } catch (InterruptedException e) {
+ outputStream.close();
+ Thread.currentThread().interrupt();
+ }
+ int currentIndex = pipeDataQueue.getAndIncreaseIndex();
+ if (currentIndex < nextIndex) {
+ continue;
+ }
+ try {
+ logger.info(
+ "Start load pipeData with serialize number {} and type {}",
+ pipeData.getSerialNumber(),
+ pipeData.getType());
+ pipeData.createLoader().load();
+ outputStream.writeInt(currentIndex);
+ } catch (Exception e) {
+ // TODO: how to response error message to sender?
+ // TODO: should drop this pipe?
+ logger.error(
+ "Cannot load pipeData with serialize number {} and type {}, because {}",
+ pipeData.getSerialNumber(),
+ pipeData.getType(),
+ e.getMessage());
+ break;
+ }
+ }
+ outputStream.close();
+ // if all success loaded, remove pipelog and record file
+ File pipeLog = new File(pipeDataQueue.getFileName());
+ if (pipeLog.exists()) {
+ Files.deleteIfExists(pipeLog.toPath());
+ Files.deleteIfExists(Paths.get(files[0].getPath() + SyncConstant.COLLECTOR_SUFFIX));
+ }
+ }
+ }
+ } catch (IOException e) {
+ logger.error(e.getMessage());
+ }
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/manager/ReceiverManager.java b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/manager/ReceiverManager.java
index 4dd33e3..dde1aac 100644
--- a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/manager/ReceiverManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/manager/ReceiverManager.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.newsync.receiver.manager;
import org.apache.iotdb.db.exception.StartupException;
import org.apache.iotdb.db.newsync.receiver.recovery.ReceiverLog;
import org.apache.iotdb.db.newsync.receiver.recovery.ReceiverLogAnalyzer;
-import org.apache.iotdb.db.service.ServiceType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,16 +38,10 @@ public class ReceiverManager {
private ReceiverLog log;
public void init() throws StartupException {
- try {
- log = new ReceiverLog();
- } catch (IOException e) {
- logger.error(e.getMessage());
- throw new StartupException(
- ServiceType.RECEIVER_SERVICE.getName(), "cannot create receiver log");
- }
ReceiverLogAnalyzer.scan();
pipeInfoMap = ReceiverLogAnalyzer.getPipeInfoMap();
pipeServerEnable = ReceiverLogAnalyzer.isPipeServerEnable();
+ log = new ReceiverLog();
}
public void close() throws IOException {
@@ -65,36 +58,28 @@ public class ReceiverManager {
pipeServerEnable = false;
}
- public void createPipe(String pipeName, String remoteIp, long startTime) throws IOException {
- if (log != null) {
- log.createPipe(pipeName, remoteIp, startTime);
- }
+ public void createPipe(String pipeName, String remoteIp, long createTime) throws IOException {
+ log.createPipe(pipeName, remoteIp, createTime);
if (!pipeInfoMap.containsKey(pipeName)) {
pipeInfoMap.put(pipeName, new HashMap<>());
}
pipeInfoMap
.get(pipeName)
- .put(remoteIp, new PipeInfo(pipeName, remoteIp, PipeStatus.RUNNING, startTime));
+ .put(remoteIp, new PipeInfo(pipeName, remoteIp, PipeStatus.RUNNING, createTime));
}
public void startPipe(String pipeName, String remoteIp) throws IOException {
- if (log != null) {
- log.startPipe(pipeName, remoteIp);
- }
+ log.startPipe(pipeName, remoteIp);
pipeInfoMap.get(pipeName).get(remoteIp).setStatus(PipeStatus.RUNNING);
}
public void stopPipe(String pipeName, String remoteIp) throws IOException {
- if (log != null) {
- log.stopPipe(pipeName, remoteIp);
- }
+ log.stopPipe(pipeName, remoteIp);
pipeInfoMap.get(pipeName).get(remoteIp).setStatus(PipeStatus.PAUSE);
}
public void dropPipe(String pipeName, String remoteIp) throws IOException {
- if (log != null) {
- log.dropPipe(pipeName, remoteIp);
- }
+ log.dropPipe(pipeName, remoteIp);
pipeInfoMap.get(pipeName).get(remoteIp).setStatus(PipeStatus.DROP);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/recovery/ReceiverLog.java b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/recovery/ReceiverLog.java
index 55427e1..c0dc7b4 100644
--- a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/recovery/ReceiverLog.java
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/recovery/ReceiverLog.java
@@ -18,9 +18,9 @@
*/
package org.apache.iotdb.db.newsync.receiver.recovery;
-import org.apache.iotdb.db.newsync.conf.SyncConstant;
import org.apache.iotdb.db.newsync.receiver.manager.PipeStatus;
-import org.apache.iotdb.db.newsync.sender.conf.SenderConf;
+import org.apache.iotdb.db.newsync.utils.SyncConstant;
+import org.apache.iotdb.db.newsync.utils.SyncPathUtil;
import java.io.BufferedWriter;
import java.io.File;
@@ -30,8 +30,8 @@ import java.io.IOException;
public class ReceiverLog {
private BufferedWriter bw;
- public ReceiverLog() throws IOException {
- File logFile = new File(SenderConf.sysDir, SyncConstant.RECEIVER_LOG_NAME);
+ public void init() throws IOException {
+ File logFile = new File(SyncPathUtil.getSysDir(), SyncConstant.RECEIVER_LOG_NAME);
if (!logFile.getParentFile().exists()) {
logFile.getParentFile().mkdirs();
}
@@ -39,12 +39,18 @@ public class ReceiverLog {
}
public void startPipeServer() throws IOException {
+ if (bw == null) {
+ init();
+ }
bw.write("on");
bw.newLine();
bw.flush();
}
public void stopPipeServer() throws IOException {
+ if (bw == null) {
+ init();
+ }
bw.write("off");
bw.newLine();
bw.flush();
@@ -68,12 +74,18 @@ public class ReceiverLog {
private void writeLog(String pipeName, String remoteIp, PipeStatus status, long time)
throws IOException {
+ if (bw == null) {
+ init();
+ }
bw.write(String.format("%s,%s,%s,%d", pipeName, remoteIp, status, time));
bw.newLine();
bw.flush();
}
private void writeLog(String pipeName, String remoteIp, PipeStatus status) throws IOException {
+ if (bw == null) {
+ init();
+ }
bw.write(String.format("%s,%s,%s", pipeName, remoteIp, status));
bw.newLine();
bw.flush();
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/recovery/ReceiverLogAnalyzer.java b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/recovery/ReceiverLogAnalyzer.java
index 50e6724..5ffe24c 100644
--- a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/recovery/ReceiverLogAnalyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/recovery/ReceiverLogAnalyzer.java
@@ -19,10 +19,10 @@
package org.apache.iotdb.db.newsync.receiver.recovery;
import org.apache.iotdb.db.exception.StartupException;
-import org.apache.iotdb.db.newsync.conf.SyncConstant;
import org.apache.iotdb.db.newsync.receiver.manager.PipeInfo;
import org.apache.iotdb.db.newsync.receiver.manager.PipeStatus;
-import org.apache.iotdb.db.newsync.sender.conf.SenderConf;
+import org.apache.iotdb.db.newsync.utils.SyncConstant;
+import org.apache.iotdb.db.newsync.utils.SyncPathUtil;
import org.apache.iotdb.db.service.ServiceType;
import org.slf4j.Logger;
@@ -43,7 +43,7 @@ public class ReceiverLogAnalyzer {
logger.info("Start to recover all sync state for sync receiver.");
pipeInfoMap = new HashMap<>();
pipeServerEnable = false;
- File logFile = new File(SenderConf.sysDir, SyncConstant.RECEIVER_LOG_NAME);
+ File logFile = new File(SyncPathUtil.getSysDir(), SyncConstant.RECEIVER_LOG_NAME);
try (BufferedReader loadReader = new BufferedReader(new FileReader(logFile))) {
String line;
int lineNum = 0;
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/sender/recovery/TsFilePipeLogAnalyzer.java b/server/src/main/java/org/apache/iotdb/db/newsync/sender/recovery/TsFilePipeLogAnalyzer.java
index d24598f..a182023 100644
--- a/server/src/main/java/org/apache/iotdb/db/newsync/sender/recovery/TsFilePipeLogAnalyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/sender/recovery/TsFilePipeLogAnalyzer.java
@@ -173,6 +173,8 @@ public class TsFilePipeLogAnalyzer {
} catch (IllegalPathException e) {
logger.error(String.format("Parsing pipeLog %s error, because %s", file.getPath(), e));
throw new IOException(e);
+ } finally {
+ inputStream.close();
}
return pipeData;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/utils/BufferedPipeDataBlockingQueue.java b/server/src/main/java/org/apache/iotdb/db/newsync/utils/BufferedPipeDataBlockingQueue.java
new file mode 100644
index 0000000..537bc4e
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/utils/BufferedPipeDataBlockingQueue.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.iotdb.db.newsync.utils;
+
+import org.apache.iotdb.db.newsync.pipedata.PipeData;
+
+import java.io.File;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+
+public class BufferedPipeDataBlockingQueue {
+ // TODO: delete mock if unnecessary
+ private static BufferedPipeDataBlockingQueue mock;
+ private String fileName;
+ private BlockingQueue<PipeData> pipeDataQueue;
+ private int currentIndex;
+ private boolean end;
+
+ public BufferedPipeDataBlockingQueue(String fileName, int blockingQueueCapacity) {
+ this.fileName = fileName;
+ this.currentIndex = 0;
+ this.end = false;
+ pipeDataQueue = new ArrayBlockingQueue<>(blockingQueueCapacity);
+ }
+
+ public String getFileName() {
+ return fileName;
+ }
+
+ public int getAndIncreaseIndex() {
+ return currentIndex++;
+ }
+
+ public boolean offer(PipeData pipeData) {
+ return pipeDataQueue.offer(pipeData);
+ }
+
+ public PipeData take() throws InterruptedException {
+ return pipeDataQueue.take();
+ }
+
+ public void end() {
+ this.end = true;
+ }
+
+ public boolean isEnd() {
+ return end && pipeDataQueue.isEmpty();
+ }
+
+ // TODO: delete mock if unnecessary
+ public static BufferedPipeDataBlockingQueue getMock() {
+ if (mock == null) {
+ String mockPath = SyncPathUtil.getReceiverPipeLogDir("mock", "127.0.0.1", 0);
+ File file = new File(mockPath);
+ if (!file.exists()) {
+ file.mkdirs();
+ }
+ mock = new BufferedPipeDataBlockingQueue(mockPath + File.separator + "100", 100);
+ }
+ return mock;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/conf/SyncConstant.java b/server/src/main/java/org/apache/iotdb/db/newsync/utils/SyncConstant.java
similarity index 73%
rename from server/src/main/java/org/apache/iotdb/db/newsync/conf/SyncConstant.java
rename to server/src/main/java/org/apache/iotdb/db/newsync/utils/SyncConstant.java
index 9a6d0d5..ecf0bd3 100644
--- a/server/src/main/java/org/apache/iotdb/db/newsync/conf/SyncConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/utils/SyncConstant.java
@@ -16,8 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.newsync.conf;
+package org.apache.iotdb.db.newsync.utils;
public class SyncConstant {
+ public static final String SYNC_SYS_DIR = "sys";
+ public static final String RECEIVER_DIR = "receiver";
public static final String RECEIVER_LOG_NAME = "receiverService.log";
+ public static final String PIPELOG_DIR_NAME = "pipe-log";
+ public static final String FILEDATA_DIR_NAME = "file-data";
+ public static final String COLLECTOR_SUFFIX = ".collector";
}
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/utils/SyncPathUtil.java b/server/src/main/java/org/apache/iotdb/db/newsync/utils/SyncPathUtil.java
new file mode 100644
index 0000000..f57a080
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/utils/SyncPathUtil.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.newsync.utils;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+
+import java.io.File;
+
+/** Util for path generation in sync module */
+public class SyncPathUtil {
+ public static String getReceiverPipeLogDir(String pipeName, String remoteIp, long createTime) {
+ return getReceiverPipeDir(pipeName, remoteIp, createTime)
+ + File.separator
+ + SyncConstant.PIPELOG_DIR_NAME;
+ }
+
+ public static String getReceiverFileDataDir(String pipeName, String remoteIp, long createTime) {
+ return getReceiverPipeDir(pipeName, remoteIp, createTime)
+ + File.separator
+ + SyncConstant.FILEDATA_DIR_NAME;
+ }
+
+ public static String getReceiverPipeDir(String pipeName, String remoteIp, long createTime) {
+ return getReceiverDir()
+ + File.separator
+ + String.format("%s-%d-%s", pipeName, createTime, remoteIp);
+ }
+
+ public static String getReceiverDir() {
+ return IoTDBDescriptor.getInstance().getConfig().getNewSyncDir()
+ + File.separator
+ + SyncConstant.RECEIVER_DIR;
+ }
+
+ public static String getSysDir() {
+ return IoTDBDescriptor.getInstance().getConfig().getNewSyncDir()
+ + File.separator
+ + SyncConstant.SYNC_SYS_DIR;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
index 4aa910e..2a82e6e 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
@@ -193,9 +193,7 @@ public abstract class Operator {
PRUNE_TEMPLATE,
APPEND_TEMPLATE,
DROP_TEMPLATE,
-
SHOW_QUERY_RESOURCE,
-
CREATE_PIPESINK,
DROP_PIPESINK,
SHOW_PIPESINK,
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/EnvironmentUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
index 0a35a93..fd6eb0b 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
@@ -249,6 +249,8 @@ public class EnvironmentUtils {
cleanDir(config.getUdfDir());
// delete tlog
cleanDir(config.getTriggerDir());
+ // delete sync dir
+ cleanDir(config.getNewSyncDir());
// delete data files
for (String dataDir : config.getDataDirs()) {
cleanDir(dataDir);
diff --git a/server/src/test/java/org/apache/iotdb/db/newsync/pipedata/PipeDataTest.java b/server/src/test/java/org/apache/iotdb/db/newsync/pipedata/PipeDataTest.java
new file mode 100644
index 0000000..ed9d79f
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/newsync/pipedata/PipeDataTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.newsync.pipedata;
+
+import org.apache.iotdb.db.engine.modification.Deletion;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+
+public class PipeDataTest {
+ private static final Logger logger = LoggerFactory.getLogger(PipeDataTest.class);
+ private static final String pipeLogPath = "target/pipelog";
+
+ @Before
+ public void setUp() throws Exception {
+ EnvironmentUtils.envSetUp();
+ }
+
+ @After
+ public void tearDown() throws IOException, StorageEngineException {
+ EnvironmentUtils.cleanEnv();
+ Files.deleteIfExists(Paths.get(pipeLogPath));
+ }
+
+ @Test
+ public void testSerializeAndDeserialize() {
+ try {
+ File f1 = new File(pipeLogPath);
+ File f2 = new File(pipeLogPath);
+ PipeData pipeData1 = new TsFilePipeData("1", 1);
+ Deletion deletion = new Deletion(new PartialPath("root.sg1.d1.s1"), 0, 1, 5);
+ PipeData pipeData2 = new DeletionPipeData(deletion, 3);
+ PhysicalPlan plan = new SetStorageGroupPlan(new PartialPath("root.sg1"));
+ PipeData pipeData3 = new SchemaPipeData(plan, 2);
+ DataOutputStream outputStream = new DataOutputStream(new FileOutputStream(f2));
+ pipeData1.serialize(outputStream);
+ outputStream.flush();
+ DataInputStream inputStream = new DataInputStream(new FileInputStream(f1));
+ Assert.assertEquals(pipeData1, PipeData.deserialize(inputStream));
+ pipeData2.serialize(outputStream);
+ outputStream.flush();
+ Assert.assertEquals(pipeData2, PipeData.deserialize(inputStream));
+ pipeData3.serialize(outputStream);
+ outputStream.flush();
+ Assert.assertEquals(pipeData3, PipeData.deserialize(inputStream));
+ inputStream.close();
+ outputStream.close();
+ } catch (Exception e) {
+ logger.error(e.getMessage());
+ Assert.fail();
+ }
+ }
+}