You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/04/27 06:10:50 UTC
[iotdb] branch master updated: [IOTDB-2938]Some improvements and bug fix for New Sync (#5563)
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 6946f3031a [IOTDB-2938]Some improvements and bug fix for New Sync (#5563)
6946f3031a is described below
commit 6946f3031af64a59845aa17358b3b7ae42799af4
Author: yschengzi <87...@users.noreply.github.com>
AuthorDate: Wed Apr 27 14:10:45 2022 +0800
[IOTDB-2938]Some improvements and bug fix for New Sync (#5563)
* 1.refactor sender IT and add delete and restart cases, 2.add close in TsFilePipe, 3.fix delete storage group
* spotless
* 5.change pipe data to pipe log for sender in SyncPathUtil
* 5. change sender dir structure
* 6. add MsgManager
* add dir structure in SyncPathUtil and remove create Msg in TransportClient
* fix restart MsgManager and add Connect timeout and Socket timeout for transport in TransportClient
* spotless
* delete System.out.println in SenderIT
---
.../sync/IoTDBSyncReceiverCollectorIT.java | 12 +-
.../db/integration/sync/IoTDBSyncSenderIT.java | 327 +++++++++++++++++++--
.../db/integration/sync/TransportClientMock.java | 12 +-
.../db/integration/sync/TransportHandlerMock.java | 61 ++++
.../iotdb/db/localconfignode/LocalConfigNode.java | 13 +-
.../iotdb/db/metadata/LocalSchemaProcessor.java | 6 +-
.../db/qp/physical/sys/DeleteTimeSeriesPlan.java | 12 +
.../java/org/apache/iotdb/db/service/IoTDB.java | 2 +-
.../apache/iotdb/db/sync/conf/SyncConstant.java | 54 ++--
.../apache/iotdb/db/sync/conf/SyncPathUtil.java | 82 ++++--
.../iotdb/db/sync/pipedata/DeletionPipeData.java | 2 +-
.../sync/pipedata/queue/BufferedPipeDataQueue.java | 31 +-
.../db/sync/pipedata/queue/PipeDataQueue.java | 2 +
.../db/sync/receiver/collector/Collector.java | 4 +-
.../db/sync/receiver/manager/ReceiverManager.java | 4 +-
.../db/sync/sender/manager/SchemaSyncManager.java | 18 +-
.../iotdb/db/sync/sender/pipe/IoTDBPipeSink.java | 18 ++
.../org/apache/iotdb/db/sync/sender/pipe/Pipe.java | 10 +
.../iotdb/db/sync/sender/pipe/TsFilePipe.java | 61 +++-
.../db/sync/sender/recovery/SenderLogAnalyzer.java | 25 +-
.../db/sync/sender/recovery/SenderLogger.java | 12 +-
.../db/sync/sender/recovery/TsFilePipeLogger.java | 2 +-
.../iotdb/db/sync/sender/service/MsgManager.java | 114 +++++++
.../db/sync/sender/service/SenderService.java | 81 ++---
.../db/sync/sender/service/TransportHandler.java | 47 ++-
.../db/sync/transport/client/TransportClient.java | 35 ++-
.../sync/pipedata/BufferedPipeDataQueueTest.java | 26 +-
.../receiver/recovery/ReceiverLogAnalyzerTest.java | 4 +-
28 files changed, 829 insertions(+), 248 deletions(-)
diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverCollectorIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverCollectorIT.java
index 67df8a3012..11b99be8c7 100644
--- a/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverCollectorIT.java
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverCollectorIT.java
@@ -131,7 +131,7 @@ public class IoTDBSyncReceiverCollectorIT {
outputStream.writeLong(-1);
outputStream.close();
int serialNum = 0;
- File pipeLog1 = new File(pipeLogDir1.getPath(), SyncConstant.getPipeLogName(serialNum));
+ File pipeLog1 = new File(pipeLogDir1.getPath(), SyncPathUtil.getPipeLogName(serialNum));
DataOutputStream pipeLogOutput = new DataOutputStream(new FileOutputStream(pipeLog1, false));
List<PhysicalPlan> planList = new ArrayList<>();
planList.add(new SetStorageGroupPlan(new PartialPath("root.vehicle")));
@@ -182,7 +182,7 @@ public class IoTDBSyncReceiverCollectorIT {
pipeData.serialize(pipeLogOutput);
}
pipeLogOutput.close();
- File pipeLog2 = new File(pipeLogDir1.getPath(), SyncConstant.getPipeLogName(serialNum));
+ File pipeLog2 = new File(pipeLogDir1.getPath(), SyncPathUtil.getPipeLogName(serialNum));
pipeLogOutput = new DataOutputStream(new FileOutputStream(pipeLog2, false));
List<File> tsFiles = SyncTestUtil.getTsFilePaths(tmpDir);
for (File f : tsFiles) {
@@ -303,7 +303,7 @@ public class IoTDBSyncReceiverCollectorIT {
outputStream.writeLong(-1);
outputStream.close();
int serialNum1 = 0;
- File pipeLog1 = new File(pipeLogDir1.getPath(), SyncConstant.getPipeLogName(serialNum1));
+ File pipeLog1 = new File(pipeLogDir1.getPath(), SyncPathUtil.getPipeLogName(serialNum1));
DataOutputStream pipeLogOutput = new DataOutputStream(new FileOutputStream(pipeLog1, false));
List<PhysicalPlan> planList = new ArrayList<>();
planList.add(new SetStorageGroupPlan(new PartialPath("root.vehicle")));
@@ -328,7 +328,7 @@ public class IoTDBSyncReceiverCollectorIT {
pipeData.serialize(pipeLogOutput);
}
pipeLogOutput.close();
- File pipeLog2 = new File(pipeLogDir1.getPath(), SyncConstant.getPipeLogName(serialNum1));
+ File pipeLog2 = new File(pipeLogDir1.getPath(), SyncPathUtil.getPipeLogName(serialNum1));
pipeLogOutput = new DataOutputStream(new FileOutputStream(pipeLog2, false));
List<File> tsFiles =
SyncTestUtil.getTsFilePaths(new File(tmpDir, "sequence" + File.separator + "root.vehicle"));
@@ -355,7 +355,7 @@ public class IoTDBSyncReceiverCollectorIT {
new FileOutputStream(new File(pipeLogDir2, SyncConstant.COMMIT_LOG_NAME), true));
outputStream.writeLong(-1);
outputStream.close();
- pipeLog1 = new File(pipeLogDir2.getPath(), SyncConstant.getPipeLogName(serialNum2));
+ pipeLog1 = new File(pipeLogDir2.getPath(), SyncPathUtil.getPipeLogName(serialNum2));
pipeLogOutput = new DataOutputStream(new FileOutputStream(pipeLog1, false));
pipeData =
new SchemaPipeData(new SetStorageGroupPlan(new PartialPath("root.sg1")), serialNum2++);
@@ -389,7 +389,7 @@ public class IoTDBSyncReceiverCollectorIT {
serialNum2++);
pipeData.serialize(pipeLogOutput);
pipeLogOutput.close();
- pipeLog2 = new File(pipeLogDir2.getPath(), SyncConstant.getPipeLogName(serialNum2));
+ pipeLog2 = new File(pipeLogDir2.getPath(), SyncPathUtil.getPipeLogName(serialNum2));
pipeLogOutput = new DataOutputStream(new FileOutputStream(pipeLog2, false));
tsFiles =
SyncTestUtil.getTsFilePaths(new File(tmpDir, "sequence" + File.separator + "root.sg1"));
diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncSenderIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncSenderIT.java
index c9fa74c7f7..cacf6c6f57 100644
--- a/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncSenderIT.java
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncSenderIT.java
@@ -19,11 +19,15 @@
package org.apache.iotdb.db.integration.sync;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.modification.Deletion;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.qp.physical.sys.ShowPipeSinkTypePlan;
+import org.apache.iotdb.db.sync.pipedata.DeletionPipeData;
import org.apache.iotdb.db.sync.pipedata.PipeData;
import org.apache.iotdb.db.sync.pipedata.SchemaPipeData;
import org.apache.iotdb.db.sync.pipedata.TsFilePipeData;
+import org.apache.iotdb.db.sync.sender.pipe.IoTDBPipeSink;
import org.apache.iotdb.db.sync.sender.pipe.TsFilePipe;
-import org.apache.iotdb.db.sync.sender.service.SenderService;
import org.apache.iotdb.db.sync.sender.service.TransportHandler;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.itbase.category.LocalStandaloneTest;
@@ -38,7 +42,12 @@ import org.junit.experimental.categories.Category;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
@Category({LocalStandaloneTest.class})
public class IoTDBSyncSenderIT {
@@ -49,8 +58,16 @@ public class IoTDBSyncSenderIT {
private static final String pipeSinkName = "test_pipesink";
private static final String pipeName = "test_pipe";
- private TsFilePipe pipe;
- private TransportClientMock mock;
+ private TransportHandlerMock handler;
+ private TransportClientMock transportClient;
+
+ private final Map<String, List<PipeData>> resultMap = new HashMap<>();
+ private static final TsFilePipeData simpleTsFilePipeData =
+ new TsFilePipeData("path", "tsfile", 0L);
+ private static final SchemaPipeData simpleSchemaPipeData =
+ new SchemaPipeData(new ShowPipeSinkTypePlan(), 0L);
+ private static final DeletionPipeData simpleDeletionPipeData =
+ new DeletionPipeData(new Deletion(new PartialPath(), 0L, 0L), 0L);
@Before
public void setUp() throws Exception {
@@ -65,6 +82,12 @@ public class IoTDBSyncSenderIT {
IoTDBDescriptor.getInstance().getConfig().setEnableUnseqSpaceCompaction(false);
IoTDBDescriptor.getInstance().getConfig().setEnableCrossSpaceCompaction(false);
Class.forName(Config.JDBC_DRIVER_NAME);
+
+ IoTDBPipeSink pipeSink = new IoTDBPipeSink(pipeSinkName);
+ TsFilePipe pipe = new TsFilePipe(0L, pipeName, pipeSink, 0L, true);
+ transportClient = new TransportClientMock(pipe, pipeSink.getIp(), pipeSink.getPort());
+ handler = new TransportHandlerMock(pipe, pipeSink, transportClient);
+ TransportHandler.setDebugTransportHandler(handler);
}
@After
@@ -80,7 +103,7 @@ public class IoTDBSyncSenderIT {
EnvironmentUtils.cleanEnv();
}
- private void prepareSchema() throws Exception {
+ private void prepareSchema() throws Exception { // 8 schema plans
try (Connection connection =
DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
Statement statement = connection.createStatement()) {
@@ -93,6 +116,18 @@ public class IoTDBSyncSenderIT {
statement.execute("create timeseries root.sg2.d1.s0 with datatype=int32, encoding=PLAIN");
statement.execute("create timeseries root.sg2.d2.s1 with datatype=boolean, encoding=PLAIN");
}
+
+ List<PipeData> resultList = new ArrayList<>();
+ for (int i = 0; i < 4; i++) {
+ resultList.add(simpleSchemaPipeData);
+ }
+ resultMap.put("schemaWithDel3InHistory", resultList); // del3 in history
+
+ resultList = new ArrayList<>();
+ for (int i = 0; i < 8; i++) {
+ resultList.add(simpleSchemaPipeData);
+ }
+ resultMap.put("schema", resultList); // del3 not in history
}
private void prepareIns1() throws Exception { // add one seq tsfile in sg1
@@ -107,6 +142,8 @@ public class IoTDBSyncSenderIT {
statement.execute("insert into root.sg1.d2(timestamp, s4) values(1, 1)");
statement.execute("flush");
}
+
+ resultMap.put("ins1", Collections.singletonList(simpleTsFilePipeData));
}
private void prepareIns2() throws Exception { // add one seq tsfile in sg1
@@ -120,6 +157,8 @@ public class IoTDBSyncSenderIT {
statement.execute("insert into root.sg1.d2(timestamp, s4) values(200, 100)");
statement.execute("flush");
}
+
+ resultMap.put("ins2", Collections.singletonList(simpleTsFilePipeData));
}
private void prepareIns3()
@@ -137,27 +176,72 @@ public class IoTDBSyncSenderIT {
"insert into root.sg1.d1(timestamp, s1, s2, s3) values(200, 100, 16.65, 'h')");
statement.execute("flush");
}
+
+ resultMap.put(
+ "ins3",
+ Arrays.asList(
+ simpleTsFilePipeData, simpleTsFilePipeData, simpleTsFilePipeData)); // del3 not in history
+ resultMap.put(
+ "ins3WithDel3InHistory",
+ Arrays.asList(simpleTsFilePipeData, simpleTsFilePipeData)); // del3 in history
+ }
+
+ private void prepareDel1() throws Exception { // after ins1, add 2 deletions
+ try (Connection connection =
+ DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+ statement.execute("delete from root.sg1.d1.s1 where time == 3");
+ statement.execute("delete from root.sg1.d1.s2 where time >= 1 and time <= 2");
+ }
+
+ resultMap.put("del1", Arrays.asList(simpleDeletionPipeData, simpleDeletionPipeData));
+ }
+
+ private void prepareDel2() throws Exception { // after ins2, add 3 deletions
+ try (Connection connection =
+ DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+ statement.execute("delete from root.sg1.d1.s3 where time <= 65");
+ }
+
+ resultMap.put(
+ "del2",
+ Arrays.asList(simpleDeletionPipeData, simpleDeletionPipeData, simpleDeletionPipeData));
+ resultMap.put("del2WithoutIns3", Arrays.asList(simpleDeletionPipeData, simpleDeletionPipeData));
+ }
+
+ private void prepareDel3() throws Exception { // after ins3, add 5 deletions, 2 schemas
+ try (Connection connection =
+ DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+ statement.execute("delete from root.sg1.d1.* where time <= 2");
+ statement.execute("delete timeseries root.sg1.d2.*");
+ statement.execute("delete storage group root.sg2");
+ }
+
+ List<PipeData> resultList = new ArrayList<>();
+ for (int i = 0; i < 5; i++) {
+ resultList.add(simpleDeletionPipeData);
+ }
+ for (int i = 0; i < 2; i++) {
+ resultList.add(simpleSchemaPipeData);
+ }
+ resultMap.put("del3", resultList);
}
private void preparePipeAndSetMock() throws Exception {
try (Connection connection =
DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
Statement statement = connection.createStatement()) {
- statement.execute("start pipeserver");
statement.execute("create pipesink " + pipeSinkName + " as iotdb");
statement.execute("create pipe " + pipeName + " to " + pipeSinkName);
- pipe = (TsFilePipe) SenderService.getInstance().getRunningPipe();
- mock =
- new TransportClientMock(SenderService.getInstance().getRunningPipe(), "127.0.0.1", 2333);
- TransportHandler handler =
- new TransportHandler(
- mock, pipeName, SenderService.getInstance().getRunningPipe().getCreateTime());
- SenderService.getInstance().setTransportHandler(handler);
- Thread.sleep(1000L);
- statement.execute("stop pipeserver");
}
}
+ private void restart() throws Exception {
+ EnvironmentUtils.restartDaemon();
+ }
+
private void startPipe() throws Exception {
try (Connection connection =
DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
@@ -182,8 +266,8 @@ public class IoTDBSyncSenderIT {
}
}
- private void checkResult(List<PipeData> list) { // check ins1, ins2, ins3
- Assert.assertEquals(list.size(), 13);
+ private void checkInsOnlyResult(List<PipeData> list) { // check ins1, ins2, ins3
+ Assert.assertEquals(13, list.size());
for (int i = 0; i < 8; i++) {
Assert.assertTrue(list.get(i) instanceof SchemaPipeData);
}
@@ -192,18 +276,33 @@ public class IoTDBSyncSenderIT {
}
}
+ private void checkResult(List<String> resultString, List<PipeData> list) {
+ int totalNumber = 0;
+ for (String string : resultString) {
+ totalNumber += resultMap.get(string).size();
+ }
+ Assert.assertEquals(totalNumber, list.size());
+ int cnt = 0;
+ for (String string : resultString) {
+ for (PipeData pipeData : resultMap.get(string)) {
+ Assert.assertEquals(pipeData.getType(), list.get(cnt++).getType());
+ }
+ }
+ }
+
@Test
public void testHistoryInsert() {
try {
- prepareSchema();
+ prepareSchema(); // history
prepareIns1();
prepareIns2();
prepareIns3();
- preparePipeAndSetMock();
+ preparePipeAndSetMock(); // realtime
startPipe();
- Thread.sleep(1000L);
- checkResult(mock.getPipeDataList());
+
+ Thread.sleep(1000L); // check
+ checkInsOnlyResult(transportClient.getPipeDataList());
} catch (Exception e) {
e.printStackTrace();
Assert.fail();
@@ -221,16 +320,17 @@ public class IoTDBSyncSenderIT {
@Test
public void testHistoryAndRealTimeInsert() {
try {
- prepareSchema();
+ prepareSchema(); // history
prepareIns1();
prepareIns2();
- preparePipeAndSetMock();
+ preparePipeAndSetMock(); // realtime
startPipe();
Thread.sleep(1000L);
prepareIns3();
- Thread.sleep(1000L);
- checkResult(mock.getPipeDataList());
+
+ Thread.sleep(1000L); // check
+ checkInsOnlyResult(transportClient.getPipeDataList());
} catch (Exception e) {
e.printStackTrace();
Assert.fail();
@@ -248,17 +348,18 @@ public class IoTDBSyncSenderIT {
@Test
public void testStopAndStartInsert() {
try {
- prepareSchema();
+ prepareSchema(); // history
prepareIns1();
- preparePipeAndSetMock();
+ preparePipeAndSetMock(); // realtime
startPipe();
prepareIns2();
stopPipe();
prepareIns3();
startPipe();
- Thread.sleep(1000L);
- checkResult(mock.getPipeDataList());
+
+ Thread.sleep(1000L); // check
+ checkInsOnlyResult(transportClient.getPipeDataList());
} catch (Exception e) {
e.printStackTrace();
Assert.fail();
@@ -274,20 +375,182 @@ public class IoTDBSyncSenderIT {
}
@Test
- public void testRealTimeSchemaAndStopInsert() {
+ public void testRealTimeAndStopInsert() {
try {
- preparePipeAndSetMock();
+ preparePipeAndSetMock(); // realtime
+ startPipe();
prepareSchema();
+ prepareIns1();
+ stopPipe();
+ prepareIns2();
+ startPipe();
+ prepareIns3();
+ stopPipe();
+
+ Thread.sleep(1000L); // check
+ checkInsOnlyResult(transportClient.getPipeDataList());
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail();
+ } finally {
+ try {
+ dropPipe();
+ Thread.sleep(1000L);
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail();
+ }
+ }
+ }
+
+ @Test
+ public void testHistoryDel() {
+ try {
+ prepareSchema(); // history
+ prepareIns1();
+ prepareIns2();
+ prepareIns3();
+ prepareDel1();
+ prepareDel2();
+ prepareDel3();
+
+ preparePipeAndSetMock(); // realtime
+ startPipe();
+
+ Thread.sleep(1000L); // check
+ checkResult(
+ Arrays.asList("schemaWithDel3InHistory", "ins1", "ins2", "ins3WithDel3InHistory"),
+ transportClient.getPipeDataList());
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail();
+ } finally {
+ try {
+ dropPipe();
+ Thread.sleep(1000L);
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail();
+ }
+ }
+ }
+
+ @Test
+ public void testRealtimeDel() {
+ try {
+ prepareSchema(); // history
+ prepareIns1();
+
+ preparePipeAndSetMock(); // realtime
+ startPipe();
+ prepareIns2();
+ prepareDel1();
+ stopPipe();
+ prepareIns3();
startPipe();
+ prepareDel2();
+ prepareDel3();
+ stopPipe();
+
+ Thread.sleep(1000L); // check
+ checkResult(
+ Arrays.asList("schema", "ins1", "ins2", "del1", "ins3", "del2", "del3"),
+ transportClient.getPipeDataList());
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail();
+ } finally {
+ try {
+ dropPipe();
+ Thread.sleep(1000L);
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail();
+ }
+ }
+ }
+
+ @Test
+ public void testRestartWhileRunning() {
+ try {
+ prepareSchema(); // history
prepareIns1();
+ prepareIns2();
+
+ preparePipeAndSetMock(); // realtime
+ startPipe();
+ restart();
+ prepareIns3();
+
+ Thread.sleep(1000L); // check
+ checkInsOnlyResult(transportClient.getPipeDataList());
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail();
+ } finally {
+ try {
+ dropPipe();
+ Thread.sleep(1000L);
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail();
+ }
+ }
+ }
+
+ @Test
+ public void testRestartWhileStopping() {
+ try {
+ prepareSchema(); // history
+ prepareIns1();
+
+ preparePipeAndSetMock(); // realtime
+ startPipe();
+ prepareIns2();
stopPipe();
+ restart();
+ prepareIns3();
+ startPipe();
+
+ Thread.sleep(1000L); // check
+ checkInsOnlyResult(transportClient.getPipeDataList());
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail();
+ } finally {
+ try {
+ dropPipe();
+ Thread.sleep(1000L);
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail();
+ }
+ }
+ }
+
+ @Test
+ public void testRestartWithDel() {
+ try {
+ prepareSchema(); // history
+ prepareIns1();
+ prepareDel1();
+
+ preparePipeAndSetMock(); // realtime
+ startPipe();
prepareIns2();
+ stopPipe();
+ prepareDel2();
+ restart();
startPipe();
prepareIns3();
stopPipe();
+ prepareDel3();
+ startPipe();
- Thread.sleep(1000L);
- checkResult(mock.getPipeDataList());
+ Thread.sleep(1000L); // check
+ checkResult(
+ Arrays.asList("schema", "ins1", "ins2", "del2WithoutIns3", "ins3", "del3"),
+ transportClient.getPipeDataList());
} catch (Exception e) {
e.printStackTrace();
Assert.fail();
diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/sync/TransportClientMock.java b/integration/src/test/java/org/apache/iotdb/db/integration/sync/TransportClientMock.java
index 702a72f6f9..1614e2e142 100644
--- a/integration/src/test/java/org/apache/iotdb/db/integration/sync/TransportClientMock.java
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/sync/TransportClientMock.java
@@ -30,9 +30,9 @@ import java.util.ArrayList;
import java.util.List;
public class TransportClientMock implements ITransportClient {
- private final Pipe pipe;
- private final String ipAddress;
- private final int port;
+ private Pipe pipe;
+ private String ipAddress;
+ private int port;
private List<PipeData> pipeDataList;
@@ -49,6 +49,12 @@ public class TransportClientMock implements ITransportClient {
return new SyncResponse(ResponseType.INFO, "");
}
+ public void resetInfo(Pipe pipe, String ipAddress, int port) {
+ this.pipe = pipe;
+ this.ipAddress = ipAddress;
+ this.port = port;
+ }
+
@Override
public void run() {
try {
diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/sync/TransportHandlerMock.java b/integration/src/test/java/org/apache/iotdb/db/integration/sync/TransportHandlerMock.java
new file mode 100644
index 0000000000..be7ea71c85
--- /dev/null
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/sync/TransportHandlerMock.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.integration.sync;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.db.sync.sender.pipe.IoTDBPipeSink;
+import org.apache.iotdb.db.sync.sender.pipe.Pipe;
+import org.apache.iotdb.db.sync.sender.service.TransportHandler;
+
+public class TransportHandlerMock extends TransportHandler {
+ private Pipe pipe;
+
+ public TransportHandlerMock(
+ Pipe pipe, IoTDBPipeSink pipeSink, TransportClientMock transportClientMock) {
+ super(pipe, pipeSink);
+ this.transportClient = transportClientMock;
+ }
+
+ @Override
+ protected void resetTransportClient(Pipe pipe) {
+ try {
+ super.close();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ super.resetTransportClient(pipe);
+ IoTDBPipeSink pipeSink = (IoTDBPipeSink) pipe.getPipeSink();
+ ((TransportClientMock) this.transportClient)
+ .resetInfo(pipe, pipeSink.getIp(), pipeSink.getPort());
+ this.pipe = pipe;
+
+ this.transportExecutorService =
+ IoTDBThreadPoolFactory.newSingleThreadExecutor(
+ ThreadName.SYNC_SENDER_PIPE.getName() + "-" + pipe.getName());
+ this.heartbeatExecutorService =
+ IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+ ThreadName.SYNC_SENDER_HEARTBEAT.getName() + "-" + pipe.getName());
+ }
+
+ public TransportClientMock getTransportClientMock() {
+ return (TransportClientMock) transportClient;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalConfigNode.java b/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalConfigNode.java
index 2ea72f4711..07630b54b5 100644
--- a/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalConfigNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalConfigNode.java
@@ -48,7 +48,7 @@ import org.apache.iotdb.db.metadata.utils.MetaUtils;
import org.apache.iotdb.db.qp.physical.sys.ActivateTemplatePlan;
import org.apache.iotdb.db.qp.physical.sys.AppendTemplatePlan;
import org.apache.iotdb.db.qp.physical.sys.CreateTemplatePlan;
-import org.apache.iotdb.db.qp.physical.sys.DeleteStorageGroupPlan;
+import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.DropTemplatePlan;
import org.apache.iotdb.db.qp.physical.sys.PruneTemplatePlan;
import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
@@ -65,7 +65,6 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -74,6 +73,8 @@ import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import static org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
+
/**
* This class simulates the behaviour of configNode to manage the configs locally. The schema
* configs include storage group, schema region and template. The data config is dataRegion.
@@ -230,6 +231,11 @@ public class LocalConfigNode {
}
public void deleteStorageGroup(PartialPath storageGroup) throws MetadataException {
+
+ DeleteTimeSeriesPlan deleteTimeSeriesPlan =
+ SchemaSyncManager.getInstance()
+ .splitDeleteTimeseriesPlanByDevice(storageGroup.concatNode(MULTI_LEVEL_PATH_WILDCARD));
+
deleteSchemaRegionsInStorageGroup(
storageGroup, schemaPartitionTable.getSchemaRegionIdsByStorageGroup(storageGroup));
@@ -237,8 +243,7 @@ public class LocalConfigNode {
templateManager.unmarkStorageGroup(template, storageGroup.getFullPath());
}
if (SchemaSyncManager.getInstance().isEnableSync()) {
- SchemaSyncManager.getInstance()
- .syncMetadataPlan(new DeleteStorageGroupPlan(Collections.singletonList(storageGroup)));
+ SchemaSyncManager.getInstance().syncMetadataPlan(deleteTimeSeriesPlan);
}
if (!config.isEnableMemControl()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/LocalSchemaProcessor.java b/server/src/main/java/org/apache/iotdb/db/metadata/LocalSchemaProcessor.java
index 5a3622b1d3..a34b71633e 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/LocalSchemaProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/LocalSchemaProcessor.java
@@ -720,7 +720,11 @@ public class LocalSchemaProcessor {
* @return A HashSet instance which stores devices paths.
*/
public Set<PartialPath> getBelongedDevices(PartialPath timeseries) throws MetadataException {
- return getBelongedSchemaRegion(timeseries).getBelongedDevices(timeseries);
+ Set<PartialPath> result = new TreeSet<>();
+ for (ISchemaRegion schemaRegion : getInvolvedSchemaRegions(timeseries, false)) {
+ result.addAll(schemaRegion.getBelongedDevices(timeseries));
+ }
+ return result;
}
/**
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DeleteTimeSeriesPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DeleteTimeSeriesPlan.java
index c2e65729ed..b80af3b504 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DeleteTimeSeriesPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DeleteTimeSeriesPlan.java
@@ -118,4 +118,16 @@ public class DeleteTimeSeriesPlan extends PhysicalPlan {
public TSStatus[] getFailingStatus() {
return StatusUtils.getFailingStatus(results, deletePathList.size());
}
+
+ @Override
+ public String toString() {
+ return "DeleteTimeSeriesPlan{"
+ + "deletePathList="
+ + deletePathList
+ + ", results="
+ + results
+ + ", partitionFilter="
+ + partitionFilter
+ + '}';
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
index f9e1228893..ae8b8d0b1e 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
@@ -139,6 +139,7 @@ public class IoTDB implements IoTDBMBean {
registerManager.register(CacheHitRatioMonitor.getInstance());
registerManager.register(CompactionTaskManager.getInstance());
JMXService.registerMBean(getInstance(), mbeanName);
+ registerManager.register(SenderService.getInstance());
registerManager.register(WALManager.getInstance());
// in mpp mode we need to start some other services
@@ -188,7 +189,6 @@ public class IoTDB implements IoTDBMBean {
}
}
- registerManager.register(SenderService.getInstance());
registerManager.register(UpgradeSevice.getINSTANCE());
// in mpp mode we temporarily don't start settle service because it uses StorageEngine directly
// in itself, but currently we need to use StorageEngineV2 instead of StorageEngine in mpp mode.
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/conf/SyncConstant.java b/server/src/main/java/org/apache/iotdb/db/sync/conf/SyncConstant.java
index a0c63590af..6f24cbdd05 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/conf/SyncConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/conf/SyncConstant.java
@@ -22,41 +22,45 @@ import org.apache.iotdb.rpc.RpcUtils;
public class SyncConstant {
/** common */
- public static final String PIPE_LOG_DIR_NAME = "pipe-log";
+ public static final String UNKNOWN_IP = "UNKNOWN IP";
- public static final String SYNC_DIR_NAME_SEPARATOR = "_";
+ public static final String SYNC_SYS_DIR = "sys";
+ public static final String FILE_DATA_DIR_NAME = "file-data";
+
+ // pipe log: serialNumber + SEPARATOR + SUFFIX
+ public static final String PIPE_LOG_DIR_NAME = "pipe-log";
+ public static final String PIPE_LOG_NAME_SEPARATOR = "_";
+ public static final String PIPE_LOG_NAME_SUFFIX = PIPE_LOG_NAME_SEPARATOR + "pipe.log";
+ public static final String COMMIT_LOG_NAME = "commit.log";
+ public static final Long DEFAULT_PIPE_LOG_SIZE_IN_BYTE = 10485760L;
/** sender */
- public static final String DEFAULT_PIPE_SINK_IP = "127.0.0.1";
- public static final int DEFAULT_PIPE_SINK_PORT = 6670;
+ // dir structure
+ public static final String SENDER_DIR_NAME = "sender";
- public static final String SENDER_PIPE_DIR_NAME = "sender";
- public static final String FINISH_COLLECT_LOCK_NAME = "finishCollect.lock";
public static final String HISTORY_PIPE_LOG_DIR_NAME = "history-" + PIPE_LOG_DIR_NAME;
- public static final Long DEFAULT_HEARTBEAT_DELAY_SECONDS = 10 * 60L;
- public static final Long DEFAULT_WAITING_FOR_TSFILE_CLOSE_MILLISECONDS = 500L;
- public static final Long DEFAULT_WAITING_FOR_TSFILE_RETRY_NUMBER = 10L;
- public static final Long DEFAULT_WAITING_FOR_STOP_MILLISECONDS = 100L;
+ public static final String FINISH_COLLECT_LOCK_NAME = "finishCollect.lock";
public static final String MODS_OFFSET_FILE_SUFFIX = ".offset";
- public static final String PIPE_LOG_NAME_SUFFIX = "_pipe.log";
- public static final Long DEFAULT_PIPE_LOG_SIZE_IN_BYTE = 10485760L;
- public static final String COMMIT_LOG_NAME = "commit.log";
-
- public static final String UNKNOWN_IP = "UNKNOWN IP";
+ // recover
public static final String SENDER_LOG_NAME = "senderService.log";
public static final String PLAN_SERIALIZE_SPLIT_CHARACTER = ",";
- public static final String SENDER_LOG_SPLIT_CHARACTER = " ";
- public static final int MESSAGE_LENGTH_LIMIT = 200;
+ public static final String SENDER_LOG_SPLIT_CHARACTER = "#";
- public static String getPipeLogName(long serialNumber) {
- return serialNumber + PIPE_LOG_NAME_SUFFIX;
- }
+ // data config
+ public static final String DEFAULT_PIPE_SINK_IP = "127.0.0.1";
+ public static final int DEFAULT_PIPE_SINK_PORT = 6670;
- public static Long getSerialNumberFromPipeLogName(String pipeLogName) {
- return Long.parseLong(pipeLogName.split("_")[0]);
- }
+ public static final Long HEARTBEAT_DELAY_SECONDS = 10 * 60L;
+ public static final int CONNECT_TIMEOUT_MILLISECONDS = 1_000;
+ public static final int SOCKET_TIMEOUT_MILLISECONDS = 100_000;
+
+ public static final Long DEFAULT_WAITING_FOR_TSFILE_CLOSE_MILLISECONDS = 500L;
+ public static final Long DEFAULT_WAITING_FOR_TSFILE_RETRY_NUMBER = 10L;
+ public static final Long DEFAULT_WAITING_FOR_STOP_MILLISECONDS = 1000L;
+
+ public static final int MESSAGE_NUMBER_LIMIT = 1; // do not support multi lines now
/** transport */
@@ -65,11 +69,9 @@ public class SyncConstant {
Math.min(64 * 1024 * 1024, RpcUtils.THRIFT_FRAME_MAX_SIZE);
/** receiver */
- public static final String SYNC_SYS_DIR = "sys";
+ public static final String RECEIVER_DIR_NAME = "receiver";
- public static final String RECEIVER_DIR = "receiver";
public static final String RECEIVER_LOG_NAME = "receiverService.log";
public static final String RECEIVER_MSG_LOG_NAME = "receiverMessage.log";
- public static final String FILE_DATA_DIR_NAME = "file-data";
public static final String IP_SEPARATOR = "\\.";
}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/conf/SyncPathUtil.java b/server/src/main/java/org/apache/iotdb/db/sync/conf/SyncPathUtil.java
index fc38852597..2161b27e1d 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/conf/SyncPathUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/conf/SyncPathUtil.java
@@ -32,33 +32,64 @@ public class SyncPathUtil {
// forbidding instantiation
}
+ // sync data structure
+ // data/sync
+ // |----sender dir
+ // |      |----sender pipe dir
+ // |             |----history pipe log dir
+ // |             |----realtime pipe log dir
+ // |             |----file data dir
+ // |----receiver dir
+ //        |----receiver pipe dir
+ //               |----receiver pipe log dir
+ //               |----file data dir
+
/** sender */
- public static String getSenderPipeDir(String pipeName, long createTime) {
+ public static String getSenderDir() {
return IoTDBDescriptor.getInstance().getConfig().getSyncDir()
+ File.separator
- + SyncConstant.SENDER_PIPE_DIR_NAME
- + String.format("-%s-%d", pipeName, createTime);
+ + SyncConstant.SENDER_DIR_NAME;
+ }
+
+ public static String getSenderPipeDir(String pipeName, long createTime) {
+ return getSenderDir() + File.separator + getSenderPipeDirName(pipeName, createTime);
+ }
+
+ public static String getSenderPipeDirName(String pipeName, long createTime) {
+ return String.format("%s-%d", pipeName, createTime);
}
- public static String getSenderHistoryPipeDataDir(String pipeName, long createTime) {
+ public static String getSenderHistoryPipeLogDir(String pipeName, long createTime) {
return getSenderPipeDir(pipeName, createTime)
+ File.separator
+ SyncConstant.HISTORY_PIPE_LOG_DIR_NAME;
}
- public static String getSenderRealTimePipeDataDir(String pipeName, long createTime) {
+ public static String getSenderRealTimePipeLogDir(String pipeName, long createTime) {
return getSenderPipeDir(pipeName, createTime) + File.separator + SyncConstant.PIPE_LOG_DIR_NAME;
}
+ public static String getSenderFileDataDir(String pipeName, long createTime) {
+ return getSenderPipeDir(pipeName, createTime)
+ + File.separator
+ + SyncConstant.FILE_DATA_DIR_NAME;
+ }
+
/** receiver */
- public static String getFileDataDirPath(IdentityInfo identityInfo) {
- return SyncPathUtil.getReceiverFileDataDir(
- identityInfo.getPipeName(), identityInfo.getAddress(), identityInfo.getCreateTime());
+ public static String getReceiverDir() {
+ return IoTDBDescriptor.getInstance().getConfig().getSyncDir()
+ + File.separator
+ + SyncConstant.RECEIVER_DIR_NAME;
}
- public static String getPipeLogDirPath(IdentityInfo identityInfo) {
- return SyncPathUtil.getReceiverPipeLogDir(
- identityInfo.getPipeName(), identityInfo.getAddress(), identityInfo.getCreateTime());
+ public static String getReceiverPipeDir(String pipeName, String remoteIp, long createTime) {
+ return getReceiverDir()
+ + File.separator
+ + getReceiverPipeDirName(pipeName, remoteIp, createTime);
+ }
+
+ public static String getReceiverPipeDirName(String pipeName, String remoteIp, long createTime) {
+ return String.format("%s-%d-%s", pipeName, createTime, remoteIp);
}
public static String getReceiverPipeLogDir(String pipeName, String remoteIp, long createTime) {
@@ -73,30 +104,31 @@ public class SyncPathUtil {
+ SyncConstant.FILE_DATA_DIR_NAME;
}
- public static String getReceiverPipeDir(String pipeName, String remoteIp, long createTime) {
- return getReceiverDir()
- + File.separator
- + getReceiverPipeFolderName(pipeName, remoteIp, createTime);
- }
-
- public static String getReceiverPipeFolderName(
- String pipeName, String remoteIp, long createTime) {
- return String.format("%s-%d-%s", pipeName, createTime, remoteIp);
+ public static String getFileDataDirPath(IdentityInfo identityInfo) {
+ return SyncPathUtil.getReceiverFileDataDir(
+ identityInfo.getPipeName(), identityInfo.getAddress(), identityInfo.getCreateTime());
}
- public static String getReceiverDir() {
- return IoTDBDescriptor.getInstance().getConfig().getSyncDir()
- + File.separator
- + SyncConstant.RECEIVER_DIR;
+ public static String getPipeLogDirPath(IdentityInfo identityInfo) {
+ return SyncPathUtil.getReceiverPipeLogDir(
+ identityInfo.getPipeName(), identityInfo.getAddress(), identityInfo.getCreateTime());
}
+ /** common */
public static String getSysDir() {
return IoTDBDescriptor.getInstance().getConfig().getSyncDir()
+ File.separator
+ SyncConstant.SYNC_SYS_DIR;
}
- /** common */
+ public static String getPipeLogName(long serialNumber) {
+ return serialNumber + SyncConstant.PIPE_LOG_NAME_SUFFIX;
+ }
+
+ public static Long getSerialNumberFromPipeLogName(String pipeLogName) {
+ return Long.parseLong(pipeLogName.split(SyncConstant.PIPE_LOG_NAME_SEPARATOR)[0]);
+ }
+
public static boolean createFile(File file) throws IOException {
if (!file.getParentFile().exists()) {
file.getParentFile().mkdirs();
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/pipedata/DeletionPipeData.java b/server/src/main/java/org/apache/iotdb/db/sync/pipedata/DeletionPipeData.java
index a8568bd72e..9a89f4dd57 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/pipedata/DeletionPipeData.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/pipedata/DeletionPipeData.java
@@ -66,7 +66,7 @@ public class DeletionPipeData extends PipeData {
@Override
public String toString() {
- return "DeletionData{" + "deletion=" + deletion + ", serialNumber=" + serialNumber + '}';
+ return "DeletionData{" + "serialNumber=" + serialNumber + ", deletion=" + deletion + '}';
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/pipedata/queue/BufferedPipeDataQueue.java b/server/src/main/java/org/apache/iotdb/db/sync/pipedata/queue/BufferedPipeDataQueue.java
index 88dcba6bd0..212a922f16 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/pipedata/queue/BufferedPipeDataQueue.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/pipedata/queue/BufferedPipeDataQueue.java
@@ -98,7 +98,7 @@ public class BufferedPipeDataQueue implements PipeDataQueue {
for (File file : logDir.listFiles())
if (file.getName().endsWith(SyncConstant.PIPE_LOG_NAME_SUFFIX) && file.length() > 0) {
- startNumbers.add(SyncConstant.getSerialNumberFromPipeLogName(file.getName()));
+ startNumbers.add(SyncPathUtil.getSerialNumberFromPipeLogName(file.getName()));
}
if (startNumbers.size() != 0) {
Collections.sort(startNumbers);
@@ -114,7 +114,7 @@ public class BufferedPipeDataQueue implements PipeDataQueue {
}
File writingPipeLog =
- new File(pipeLogDir, SyncConstant.getPipeLogName(pipeLogStartNumber.peekLast()));
+ new File(pipeLogDir, SyncPathUtil.getPipeLogName(pipeLogStartNumber.peekLast()));
try {
List<PipeData> recoverPipeData = parsePipeLog(writingPipeLog);
int recoverPipeDataSize = recoverPipeData.size();
@@ -157,7 +157,7 @@ public class BufferedPipeDataQueue implements PipeDataQueue {
}
File readingPipeLog =
- new File(pipeLogDir, SyncConstant.getPipeLogName(pipeLogStartNumber.peek()));
+ new File(pipeLogDir, SyncPathUtil.getPipeLogName(pipeLogStartNumber.peek()));
try {
List<PipeData> recoverPipeData = parsePipeLog(readingPipeLog);
int recoverPipeDataSize = recoverPipeData.size();
@@ -214,7 +214,7 @@ public class BufferedPipeDataQueue implements PipeDataQueue {
if (outputStream != null) {
outputStream.close();
}
- File newPipeLog = new File(pipeLogDir, SyncConstant.getPipeLogName(startSerialNumber));
+ File newPipeLog = new File(pipeLogDir, SyncPathUtil.getPipeLogName(startSerialNumber));
SyncPathUtil.createFile(newPipeLog);
outputStream = new DataOutputStream(new FileOutputStream(newPipeLog));
@@ -250,7 +250,7 @@ public class BufferedPipeDataQueue implements PipeDataQueue {
outputDeque = inputDeque;
} else {
List<PipeData> parsePipeData =
- parsePipeLog(new File(pipeLogDir, SyncConstant.getPipeLogName(serialNumber)));
+ parsePipeLog(new File(pipeLogDir, SyncPathUtil.getPipeLogName(serialNumber)));
int parsePipeDataSize = parsePipeData.size();
outputDeque = new LinkedBlockingDeque<>();
for (int i = 0; i < parsePipeDataSize; i++) {
@@ -354,7 +354,7 @@ public class BufferedPipeDataQueue implements PipeDataQueue {
if (!pipeLogStartNumber.isEmpty() && pipeLogStartNumber.peek() <= commitSerialNumber) {
try {
Files.deleteIfExists(
- new File(pipeLogDir, SyncConstant.getPipeLogName(nowPipeLogStartNumber)).toPath());
+ new File(pipeLogDir, SyncPathUtil.getPipeLogName(nowPipeLogStartNumber)).toPath());
} catch (IOException e) {
logger.warn(
String.format("Delete %s-pipe.log error, because %s.", nowPipeLogStartNumber, e));
@@ -401,7 +401,7 @@ public class BufferedPipeDataQueue implements PipeDataQueue {
}
@Override
- public void clear() {
+ public void close() {
try {
if (outputStream != null) {
outputStream.close();
@@ -415,12 +415,18 @@ public class BufferedPipeDataQueue implements PipeDataQueue {
inputDeque = null;
pipeLogStartNumber = null;
outputDeque = null;
- File logDir = new File(pipeLogDir);
- if (logDir.exists()) {
- FileUtils.deleteDirectory(logDir);
- }
} catch (IOException e) {
- logger.warn(String.format("Clear pipe log dir %s error, because %s.", pipeLogDir, e));
+ logger.warn(String.format("Close pipe log dir %s error.", pipeLogDir), e);
+ }
+ }
+
+ @Override
+ public void clear() {
+ close();
+
+ File logDir = new File(pipeLogDir);
+ if (logDir.exists()) {
+ FileUtils.deleteDirectory(logDir);
}
}
@@ -431,7 +437,6 @@ public class BufferedPipeDataQueue implements PipeDataQueue {
pipeData.add(PipeData.deserialize(inputStream));
}
} catch (EOFException e) {
- logger.info(String.format("Finish parsing pipeLog %s.", file.getPath()));
} catch (IllegalPathException e) {
logger.error(String.format("Parsing pipeLog %s error, because %s", file.getPath(), e));
throw new IOException(e);
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/pipedata/queue/PipeDataQueue.java b/server/src/main/java/org/apache/iotdb/db/sync/pipedata/queue/PipeDataQueue.java
index 576a06a4c4..1fb1054b19 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/pipedata/queue/PipeDataQueue.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/pipedata/queue/PipeDataQueue.java
@@ -33,5 +33,7 @@ public interface PipeDataQueue {
boolean isEmpty();
+ void close();
+
void clear();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/collector/Collector.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/collector/Collector.java
index 99829641d7..8ea32f5a58 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/collector/Collector.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/collector/Collector.java
@@ -84,7 +84,7 @@ public class Collector {
}
public void startPipe(String pipeName, String remoteIp, long createTime) {
- String dir = SyncPathUtil.getReceiverPipeFolderName(pipeName, remoteIp, createTime);
+ String dir = SyncPathUtil.getReceiverPipeDirName(pipeName, remoteIp, createTime);
synchronized (dir.intern()) {
if (!taskFutures.containsKey(dir)) {
ScanTask task = new ScanTask(pipeName, remoteIp, createTime);
@@ -94,7 +94,7 @@ public class Collector {
}
public void stopPipe(String pipeName, String remoteIp, long createTime) {
- String dir = SyncPathUtil.getReceiverPipeFolderName(pipeName, remoteIp, createTime);
+ String dir = SyncPathUtil.getReceiverPipeDirName(pipeName, remoteIp, createTime);
logger.info("try stop task key={}", dir);
synchronized (dir.intern()) {
if (taskFutures.containsKey(dir)) {
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/manager/ReceiverManager.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/manager/ReceiverManager.java
index 0ad998ea62..ba0c0f5296 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/manager/ReceiverManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/manager/ReceiverManager.java
@@ -135,7 +135,7 @@ public class ReceiverManager {
*/
public synchronized void writePipeMessage(
String pipeName, String remoteIp, long createTime, PipeMessage message) {
- String pipeIdentifier = SyncPathUtil.getReceiverPipeFolderName(pipeName, remoteIp, createTime);
+ String pipeIdentifier = SyncPathUtil.getReceiverPipeDirName(pipeName, remoteIp, createTime);
try {
log.writePipeMsg(pipeIdentifier, message);
} catch (IOException e) {
@@ -161,7 +161,7 @@ public class ReceiverManager {
public synchronized List<PipeMessage> getPipeMessages(
String pipeName, String remoteIp, long createTime, boolean consume) {
List<PipeMessage> pipeMessageList = new ArrayList<>();
- String pipeIdentifier = SyncPathUtil.getReceiverPipeFolderName(pipeName, remoteIp, createTime);
+ String pipeIdentifier = SyncPathUtil.getReceiverPipeDirName(pipeName, remoteIp, createTime);
if (consume) {
try {
log.comsumePipeMsg(pipeIdentifier);
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/manager/SchemaSyncManager.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/manager/SchemaSyncManager.java
index e2bfb99c1e..6e3686df17 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/manager/SchemaSyncManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/manager/SchemaSyncManager.java
@@ -40,7 +40,6 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Set;
-import static org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
import static org.apache.iotdb.db.metadata.MetadataConstant.ALL_RESULT_NODES;
/**
@@ -80,20 +79,7 @@ public class SchemaSyncManager {
}
public void syncMetadataPlan(PhysicalPlan plan) {
- try {
- switch (plan.getOperatorType()) {
- case DELETE_STORAGE_GROUP:
- syncPipe.collectRealTimeMetaData(
- splitDeleteTimeseriesPlanByDevice(
- plan.getPaths().get(0).concatNode(MULTI_LEVEL_PATH_WILDCARD)));
- break;
- default:
- syncPipe.collectRealTimeMetaData(plan);
- break;
- }
- } catch (MetadataException e) {
-
- }
+ syncPipe.collectRealTimeMetaData(plan);
}
public void clear() {
@@ -144,7 +130,7 @@ public class SchemaSyncManager {
return result;
}
- private DeleteTimeSeriesPlan splitDeleteTimeseriesPlanByDevice(PartialPath pathPattern)
+ public DeleteTimeSeriesPlan splitDeleteTimeseriesPlanByDevice(PartialPath pathPattern)
throws MetadataException {
return new DeleteTimeSeriesPlan(splitPathPatternByDevice(pathPattern));
}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/IoTDBPipeSink.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/IoTDBPipeSink.java
index 3c5b619a63..4ba9194838 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/IoTDBPipeSink.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/IoTDBPipeSink.java
@@ -23,6 +23,8 @@ import org.apache.iotdb.db.exception.sync.PipeSinkException;
import org.apache.iotdb.db.sync.conf.SyncConstant;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import java.util.Objects;
+
public class IoTDBPipeSink implements PipeSink {
private final PipeSink.Type type;
@@ -94,4 +96,20 @@ public class IoTDBPipeSink implements PipeSink {
+ port
+ '}';
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ IoTDBPipeSink pipeSink = (IoTDBPipeSink) o;
+ return port == pipeSink.port
+ && type == pipeSink.type
+ && Objects.equals(name, pipeSink.name)
+ && Objects.equals(ip, pipeSink.ip);
+ }
+
+ /** Hashes the same four fields that equals() compares: type, name, ip and port. */
+ @Override
+ public int hashCode() {
+ return Objects.hash(type, name, ip, port);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/Pipe.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/Pipe.java
index 82d1ccce9d..94aae7b40b 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/Pipe.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/Pipe.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.sync.sender.pipe;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.exception.sync.PipeException;
import org.apache.iotdb.db.sync.pipedata.PipeData;
+import org.apache.iotdb.db.sync.sender.service.SenderService;
import org.apache.iotdb.db.sync.transport.client.ITransportClient;
/**
@@ -52,6 +53,15 @@ public interface Pipe {
*/
void drop() throws PipeException;
+ /**
+ * Close this pipe: stop collecting data from IoTDB without deleting the information about this
+ * pipe on disk and without changing the status of this pipe. Used by {@linkplain
+ * SenderService#shutdown(long)}.
+ *
+ * @throws PipeException if an internal error happens (such as an IOException on disk).
+ */
+ void close() throws PipeException;
+
/**
* Get the name of this pipe.
*
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/TsFilePipe.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/TsFilePipe.java
index 286847916e..3faefbfbdb 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/TsFilePipe.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/TsFilePipe.java
@@ -41,6 +41,7 @@ import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Objects;
import java.util.concurrent.locks.ReentrantLock;
public class TsFilePipe implements Pipe {
@@ -73,9 +74,9 @@ public class TsFilePipe implements Pipe {
this.syncDelOp = syncDelOp;
this.historyQueue =
- new BufferedPipeDataQueue(SyncPathUtil.getSenderHistoryPipeDataDir(name, createTime));
+ new BufferedPipeDataQueue(SyncPathUtil.getSenderHistoryPipeLogDir(name, createTime));
this.realTimeQueue =
- new BufferedPipeDataQueue(SyncPathUtil.getSenderRealTimePipeDataDir(name, createTime));
+ new BufferedPipeDataQueue(SyncPathUtil.getSenderRealTimePipeLogDir(name, createTime));
this.pipeLog = new TsFilePipeLogger(this);
this.collectRealTimeDataLock = new ReentrantLock();
@@ -298,7 +299,7 @@ public class TsFilePipe implements Pipe {
status = PipeStatus.DROP;
}
- private void clear() throws PipeException {
+ private void clear() {
deregisterMetadata();
deregisterTsFile();
isCollectingRealTimeData = false;
@@ -312,6 +313,20 @@ public class TsFilePipe implements Pipe {
}
}
+ /**
+ * Closes this pipe without modifying {@code status}: deregisters metadata and TsFile collection,
+ * clears the real-time collecting flag and closes both pipe data queues. Does nothing when the
+ * pipe is already in DROP status.
+ */
+ @Override
+ public void close() throws PipeException {
+ if (status == PipeStatus.DROP) {
+ return;
+ }
+
+ deregisterMetadata();
+ deregisterTsFile();
+ isCollectingRealTimeData = false;
+
+ // close() rather than clear(): the queues are released but their pipe log dirs are not deleted
+ historyQueue.close();
+ realTimeQueue.close();
+ }
+
@Override
public String getName() {
return name;
@@ -331,4 +346,44 @@ public class TsFilePipe implements Pipe {
public synchronized PipeStatus getStatus() {
return status;
}
+
+ @Override
+ public String toString() {
+ return "TsFilePipe{"
+ + "createTime="
+ + createTime
+ + ", name='"
+ + name
+ + '\''
+ + ", pipeSink="
+ + pipeSink
+ + ", dataStartTime="
+ + dataStartTime
+ + ", syncDelOp="
+ + syncDelOp
+ + ", pipeLog="
+ + pipeLog
+ + ", isCollectingRealTimeData="
+ + isCollectingRealTimeData
+ + ", maxSerialNumber="
+ + maxSerialNumber
+ + ", status="
+ + status
+ + '}';
+ }
+
+  /** Two pipes are equal when they have the same class, create time, name and pipe sink. */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    TsFilePipe other = (TsFilePipe) o;
+    if (createTime != other.createTime) {
+      return false;
+    }
+    return Objects.equals(name, other.name) && Objects.equals(pipeSink, other.pipeSink);
+  }
+
+ /** Hashes the same three fields that equals() compares: createTime, name and pipeSink. */
+ @Override
+ public int hashCode() {
+ return Objects.hash(createTime, name, pipeSink);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/recovery/SenderLogAnalyzer.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/recovery/SenderLogAnalyzer.java
index bb95056a6e..c5124d4dfb 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/recovery/SenderLogAnalyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/recovery/SenderLogAnalyzer.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.sync.conf.SyncConstant;
import org.apache.iotdb.db.sync.conf.SyncPathUtil;
import org.apache.iotdb.db.sync.sender.pipe.Pipe;
import org.apache.iotdb.db.sync.sender.pipe.PipeSink;
+import org.apache.iotdb.db.sync.sender.service.MsgManager;
import org.apache.iotdb.db.sync.sender.service.SenderService;
import java.io.BufferedReader;
@@ -46,7 +47,7 @@ public class SenderLogAnalyzer {
private Pipe runningPipe;
private Pipe.PipeStatus runningPipeStatus;
- private String runningMsg;
+ private MsgManager msgManager;
public SenderLogAnalyzer() throws IOException {
senderLog = new File(SyncPathUtil.getSysDir(), SyncConstant.SENDER_LOG_NAME);
@@ -56,7 +57,7 @@ public class SenderLogAnalyzer {
this.pipeSinks = new HashMap<>();
this.pipes = new ArrayList<>();
- this.runningMsg = "";
+ this.msgManager = new MsgManager();
}
public void recover() throws IOException {
@@ -97,19 +98,20 @@ public class SenderLogAnalyzer {
Long.parseLong(parseStrings[1]));
pipes.add(runningPipe);
runningPipeStatus = runningPipe.getStatus();
- runningMsg = "";
+ msgManager.addPipe(runningPipe);
break;
case STOP_PIPE: // ignore status check
runningPipeStatus = Pipe.PipeStatus.STOP;
- appendMsg(parseStrings);
+ msgManager.recoverMsg(parseStrings);
break;
case START_PIPE:
runningPipeStatus = Pipe.PipeStatus.RUNNING;
- appendMsg(parseStrings);
+ msgManager.recoverMsg(parseStrings);
break;
case DROP_PIPE:
runningPipeStatus = Pipe.PipeStatus.DROP;
runningPipe.drop();
+ msgManager.removeAllPipe();
break;
default:
throw new UnsupportedOperationException(
@@ -145,15 +147,6 @@ public class SenderLogAnalyzer {
br.close();
}
- private void appendMsg(String[] parseStrings) {
- if (parseStrings.length == 3) {
- if (runningMsg.length() > 0) {
- runningMsg += System.lineSeparator();
- }
- runningMsg += parseStrings[2];
- }
- }
-
public Map<String, PipeSink> getRecoveryAllPipeSinks() {
return pipeSinks;
}
@@ -166,7 +159,7 @@ public class SenderLogAnalyzer {
return runningPipe;
}
- public String getRecoveryRunningMsg() {
- return runningMsg;
+ public MsgManager getMsgManager() {
+ return msgManager;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/recovery/SenderLogger.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/recovery/SenderLogger.java
index af522c889a..02f6628f79 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/recovery/SenderLogger.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/recovery/SenderLogger.java
@@ -60,7 +60,7 @@ public class SenderLogger {
}
}
- public void addPipeSink(CreatePipeSinkPlan plan) {
+ public synchronized void addPipeSink(CreatePipeSinkPlan plan) {
getBufferedWriter();
try {
bw.write(Operator.OperatorType.CREATE_PIPESINK.name());
@@ -73,7 +73,7 @@ public class SenderLogger {
}
}
- public void dropPipeSink(String pipeSinkName) {
+ public synchronized void dropPipeSink(String pipeSinkName) {
getBufferedWriter();
try {
bw.write(Operator.OperatorType.DROP_PIPESINK.name());
@@ -86,7 +86,7 @@ public class SenderLogger {
}
}
- public void addPipe(CreatePipePlan plan, long pipeCreateTime) {
+ public synchronized void addPipe(CreatePipePlan plan, long pipeCreateTime) {
getBufferedWriter();
try {
bw.write(Operator.OperatorType.CREATE_PIPE.name());
@@ -101,7 +101,7 @@ public class SenderLogger {
}
}
- public void operatePipe(String pipeName, Operator.OperatorType type) {
+ public synchronized void operatePipe(String pipeName, Operator.OperatorType type) {
getBufferedWriter();
try {
bw.write(type.name());
@@ -114,7 +114,7 @@ public class SenderLogger {
}
}
- public void recordMsg(String pipeName, Operator.OperatorType type, String msg) {
+ public synchronized void recordMsg(String pipeName, Operator.OperatorType type, String msg) {
getBufferedWriter();
try {
bw.write(type.name());
@@ -129,7 +129,7 @@ public class SenderLogger {
}
}
- public void close() {
+ public synchronized void close() {
try {
if (bw != null) {
bw.close();
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/recovery/TsFilePipeLogger.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/recovery/TsFilePipeLogger.java
index e6e599005e..40ea57e5bc 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/recovery/TsFilePipeLogger.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/recovery/TsFilePipeLogger.java
@@ -46,7 +46,7 @@ public class TsFilePipeLogger {
public TsFilePipeLogger(TsFilePipe tsFilePipe) {
pipeDir = SyncPathUtil.getSenderPipeDir(tsFilePipe.getName(), tsFilePipe.getCreateTime());
- tsFileDir = new File(pipeDir, SyncConstant.FILE_DATA_DIR_NAME).getPath();
+ tsFileDir = SyncPathUtil.getSenderFileDataDir(tsFilePipe.getName(), tsFilePipe.getCreateTime());
}
/** make hard link for tsfile * */
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/service/MsgManager.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/service/MsgManager.java
new file mode 100644
index 0000000000..4853dec5d1
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/service/MsgManager.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.iotdb.db.sync.sender.service;
+
+import org.apache.iotdb.db.qp.logical.Operator;
+import org.apache.iotdb.db.sync.conf.SyncConstant;
+import org.apache.iotdb.db.sync.conf.SyncPathUtil;
+import org.apache.iotdb.db.sync.sender.pipe.Pipe;
+import org.apache.iotdb.db.sync.sender.recovery.SenderLogger;
+import org.apache.iotdb.service.transport.thrift.ResponseType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayDeque;
+import java.util.Queue;
+
+/**
+ * Keeps the most recent messages recorded for the single running pipe of the sender.
+ *
+ * <p>When constructed with a {@link SenderLogger}, every recorded message is also written through
+ * that logger. When constructed without one (as during recovery), messages are only kept in
+ * memory.
+ */
+public class MsgManager {
+  private static final Logger logger = LoggerFactory.getLogger(MsgManager.class);
+
+  /** Logger used to persist recorded messages; {@code null} when no logger was supplied. */
+  private SenderLogger senderLogger;
+
+  /** Pipe whose messages are currently tracked; {@code null} when no pipe is registered. */
+  private Pipe runningPipe;
+
+  /** Messages of {@link #runningPipe}, oldest first; {@code null} when no pipe is registered. */
+  private Queue<String> messages;
+
+  /** Creates a manager that keeps messages in memory only (no {@link SenderLogger}). */
+  public MsgManager() {}
+
+  /**
+   * Creates a manager that also writes every recorded message through the given logger.
+   *
+   * @param senderLogger logger receiving each recorded message
+   */
+  public MsgManager(SenderLogger senderLogger) {
+    this.senderLogger = senderLogger;
+  }
+
+  /**
+   * Registers {@code pipe} as the running pipe and starts an empty message queue for it.
+   *
+   * @param pipe the pipe whose messages are tracked from now on
+   */
+  public void addPipe(Pipe pipe) {
+    this.runningPipe = pipe;
+    messages = new ArrayDeque<>();
+  }
+
+  /** Forgets the running pipe together with all of its messages. */
+  public void removeAllPipe() {
+    runningPipe = null;
+    messages = null;
+  }
+
+  /**
+   * Records a message for the running pipe and, when a {@link SenderLogger} is present, writes it
+   * through that logger. The message is dropped (with a warning) when there is no running pipe or
+   * when {@code pipe} is not the running pipe.
+   *
+   * @param pipe the pipe the message belongs to
+   * @param operatorType operator type passed to the sender logger together with the message
+   * @param type response type, used as the {@code [TYPE]} prefix of the stored message
+   * @param inputMsg raw message text
+   */
+  public synchronized void recordMsg(
+      Pipe pipe, Operator.OperatorType operatorType, ResponseType type, String inputMsg) {
+    if (runningPipe == null) {
+      logger.warn(
+          String.format("No running Pipe for recording msg [%s] %s.", type.name(), inputMsg));
+      return;
+    }
+    if (!pipe.equals(runningPipe)) {
+      logger.warn(
+          String.format(
+              "Input Pipe %s is not equal running Pipe %s, ignore it.",
+              pipe.getName(), runningPipe.getName()));
+      // Actually ignore it: without this return the message was stored for the running pipe and
+      // logged under the other pipe's name despite the warning above.
+      return;
+    }
+
+    String msg = String.format("[%s] ", type.name()) + SyncPathUtil.createMsg(inputMsg);
+    // One message beyond the limit is retained so getPipeMsg can tell that more messages exist.
+    if (messages.size() > SyncConstant.MESSAGE_NUMBER_LIMIT) {
+      messages.poll();
+    }
+    messages.offer(msg);
+    if (senderLogger != null) { // not in recover
+      senderLogger.recordMsg(pipe.getName(), operatorType, msg);
+    }
+  }
+
+  /**
+   * Returns the stored messages of {@code pipe}.
+   *
+   * @param pipe the pipe to query
+   * @return at most {@code MESSAGE_NUMBER_LIMIT} messages (oldest first) concatenated without a
+   *     separator, followed by {@code " ..."} when more are stored; an empty string when there is
+   *     no running pipe, {@code pipe} is not the running pipe, or nothing has been recorded
+   */
+  public synchronized String getPipeMsg(Pipe pipe) {
+    if (runningPipe == null || !pipe.equals(runningPipe) || messages.isEmpty()) {
+      return "";
+    }
+
+    StringBuilder builder = new StringBuilder();
+    int appended = 0;
+    // Read-only iteration (head to tail) instead of rotating the queue with poll/offer.
+    for (String msg : messages) {
+      if (appended >= SyncConstant.MESSAGE_NUMBER_LIMIT) {
+        break;
+      }
+      // Multi-line output is not supported yet, so no line separator is appended.
+      builder.append(msg);
+      appended++;
+    }
+    if (messages.size() > SyncConstant.MESSAGE_NUMBER_LIMIT) {
+      builder.append(" ...");
+    }
+    return builder.toString();
+  }
+
+  /**
+   * Restores a message from a parsed sender-log line. Only lines with exactly three fields carry a
+   * message (the third field); other lines are ignored.
+   *
+   * @param parseStrings the fields of one sender-log line
+   */
+  public void recoverMsg(String[] parseStrings) {
+    if (parseStrings.length != 3) {
+      return;
+    }
+    if (messages == null) {
+      // No pipe has been registered (or it was removed); avoid a NullPointerException here.
+      logger.warn(String.format("No running Pipe for recovering msg %s.", parseStrings[2]));
+      return;
+    }
+    if (messages.size() > SyncConstant.MESSAGE_NUMBER_LIMIT) {
+      messages.poll();
+    }
+    messages.offer(parseStrings[2]);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/service/SenderService.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/service/SenderService.java
index b4861053ef..116fb65bcb 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/service/SenderService.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/service/SenderService.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.commons.exception.ShutdownException;
import org.apache.iotdb.commons.exception.StartupException;
import org.apache.iotdb.commons.service.IService;
import org.apache.iotdb.commons.service.ServiceType;
-import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.exception.SyncConnectionException;
import org.apache.iotdb.db.exception.sync.PipeException;
import org.apache.iotdb.db.exception.sync.PipeSinkException;
@@ -39,8 +38,6 @@ import org.apache.iotdb.db.sync.sender.pipe.PipeSink;
import org.apache.iotdb.db.sync.sender.pipe.TsFilePipe;
import org.apache.iotdb.db.sync.sender.recovery.SenderLogAnalyzer;
import org.apache.iotdb.db.sync.sender.recovery.SenderLogger;
-import org.apache.iotdb.db.sync.transport.client.ITransportClient;
-import org.apache.iotdb.db.sync.transport.client.TransportClient;
import org.apache.iotdb.service.transport.thrift.RequestType;
import org.apache.iotdb.service.transport.thrift.SyncResponse;
import org.apache.iotdb.tsfile.utils.Pair;
@@ -64,7 +61,7 @@ public class SenderService implements IService {
private List<Pipe> pipes;
private Pipe runningPipe;
- private String runningMsg;
+ private MsgManager msgManager;
private TransportHandler transportHandler;
@@ -167,13 +164,8 @@ public class SenderService implements IService {
PipeSink runningPipeSink = getPipeSink(plan.getPipeSinkName());
runningPipe = parseCreatePipePlan(plan, runningPipeSink, currentTime);
try {
- ITransportClient transportClient =
- new TransportClient(
- runningPipe,
- ((IoTDBPipeSink) runningPipeSink).getIp(),
- ((IoTDBPipeSink) runningPipeSink).getPort());
transportHandler =
- new TransportHandler(transportClient, runningPipe.getName(), runningPipe.getCreateTime());
+ TransportHandler.getNewTransportHandler(runningPipe, (IoTDBPipeSink) runningPipeSink);
sendMsg(RequestType.CREATE);
} catch (ClassCastException e) {
logger.error(
@@ -191,7 +183,7 @@ public class SenderService implements IService {
throw e;
}
- runningMsg = "";
+ msgManager.addPipe(runningPipe);
pipes.add(runningPipe);
senderLogger.addPipe(plan, currentTime);
}
@@ -243,6 +235,7 @@ public class SenderService implements IService {
}
runningPipe.drop();
+ msgManager.removeAllPipe();
sendMsg(RequestType.DROP);
senderLogger.operatePipe(pipeName, Operator.OperatorType.DROP_PIPE);
} catch (InterruptedException e) {
@@ -258,7 +251,7 @@ public class SenderService implements IService {
}
public synchronized String getPipeMsg(Pipe pipe) {
- return pipe == runningPipe ? runningMsg : "";
+ return msgManager.getPipeMsg(pipe);
}
private void checkRunningPipeExistAndName(String pipeName) throws PipeException {
@@ -291,7 +284,7 @@ public class SenderService implements IService {
public synchronized void receiveMsg(SyncResponse response) {
if (runningPipe == null || runningPipe.getStatus() == Pipe.PipeStatus.DROP) {
- logger.warn(String.format("No running pipe for receiving msg %s.", response));
+ logger.info(String.format("No running pipe for receiving msg %s.", response));
return;
}
switch (response.type) {
@@ -302,21 +295,18 @@ public class SenderService implements IService {
try {
stopPipe(runningPipe.getName());
} catch (PipeException e) {
- logger.error(
+ logger.warn(
String.format(
"Stop pipe %s when meeting error in sender service.", runningPipe.getName()),
e);
}
case WARN:
- if (runningMsg.length() > 0) {
- runningMsg += System.lineSeparator();
- }
- runningMsg += (response.type.name() + " " + response.msg);
- senderLogger.recordMsg(
- runningPipe.getName(),
+ msgManager.recordMsg(
+ runningPipe,
runningPipe.getStatus() == Pipe.PipeStatus.RUNNING
? Operator.OperatorType.START_PIPE
: Operator.OperatorType.STOP_PIPE,
+ response.type,
response.msg);
break;
}
@@ -328,7 +318,7 @@ public class SenderService implements IService {
this.pipeSinks = new HashMap<>();
this.pipes = new ArrayList<>();
this.senderLogger = new SenderLogger();
- this.runningMsg = "";
+ this.msgManager = new MsgManager(senderLogger);
File senderLog = new File(SyncPathUtil.getSysDir(), SyncConstant.SENDER_LOG_NAME);
if (senderLog.exists()) {
@@ -346,33 +336,35 @@ public class SenderService implements IService {
if (runningPipe != null && !Pipe.PipeStatus.DROP.equals(runningPipe.getStatus())) {
try {
runningPipe.stop();
+ transportHandler.stop();
} catch (PipeException e) {
logger.warn(
- String.format(
- "Stop pipe %s error when stop Sender Service, because %s.",
- runningPipe.getName(), e));
+ String.format("Stop pipe %s error when stop Sender Service.", runningPipe.getName()),
+ e);
}
}
}
@Override
public void shutdown(long milliseconds) throws ShutdownException {
+ pipeSinks = null;
+ pipes = null;
+ msgManager = null;
+ senderLogger.close();
+
if (runningPipe != null && !Pipe.PipeStatus.DROP.equals(runningPipe.getStatus())) {
try {
runningPipe.stop();
- } catch (PipeException e) {
+ transportHandler.close();
+ runningPipe.close();
+ } catch (PipeException | InterruptedException e) {
logger.warn(
String.format(
- "Stop pipe %s error when shutdown Sender Service, because %s.",
- runningPipe.getName(), e));
+ "Stop pipe %s error when shutdown Sender Service.", runningPipe.getName()),
+ e);
throw new ShutdownException(e);
}
}
-
- pipeSinks = null;
- pipes = null;
- runningMsg = null;
- senderLogger.close();
}
@Override
@@ -380,36 +372,21 @@ public class SenderService implements IService {
return ServiceType.SENDER_SERVICE;
}
- private void recover() throws IOException, InterruptedException {
+ private void recover() throws IOException {
SenderLogAnalyzer analyzer = new SenderLogAnalyzer();
analyzer.recover();
this.pipeSinks = analyzer.getRecoveryAllPipeSinks();
this.pipes = analyzer.getRecoveryAllPipes();
this.runningPipe = analyzer.getRecoveryRunningPipe();
- this.runningMsg = analyzer.getRecoveryRunningMsg();
+ this.msgManager = analyzer.getMsgManager();
- if (runningPipe != null) {
- IoTDBPipeSink pipeSink = (IoTDBPipeSink) runningPipe.getPipeSink();
- ITransportClient transportClient =
- new TransportClient(runningPipe, pipeSink.getIp(), pipeSink.getPort());
+ if (runningPipe != null && !Pipe.PipeStatus.DROP.equals(runningPipe.getStatus())) {
this.transportHandler =
- new TransportHandler(transportClient, runningPipe.getName(), runningPipe.getCreateTime());
+ TransportHandler.getNewTransportHandler(
+ runningPipe, (IoTDBPipeSink) runningPipe.getPipeSink());
if (Pipe.PipeStatus.RUNNING.equals(runningPipe.getStatus())) {
transportHandler.start();
- } else if (Pipe.PipeStatus.DROP.equals(runningPipe.getStatus())) {
- transportHandler.close();
}
}
}
-
- /** test */
- @TestOnly
- public Pipe getRunningPipe() {
- return runningPipe;
- }
-
- @TestOnly
- public void setTransportHandler(TransportHandler handler) {
- this.transportHandler = handler;
- }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/service/TransportHandler.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/service/TransportHandler.java
index ed71629c7c..f510dd52da 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/service/TransportHandler.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/service/TransportHandler.java
@@ -21,9 +21,13 @@ package org.apache.iotdb.db.sync.sender.service;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.exception.SyncConnectionException;
import org.apache.iotdb.db.sync.conf.SyncConstant;
+import org.apache.iotdb.db.sync.sender.pipe.IoTDBPipeSink;
+import org.apache.iotdb.db.sync.sender.pipe.Pipe;
import org.apache.iotdb.db.sync.transport.client.ITransportClient;
+import org.apache.iotdb.db.sync.transport.client.TransportClient;
import org.apache.iotdb.service.transport.thrift.RequestType;
import org.apache.iotdb.service.transport.thrift.SyncRequest;
import org.apache.iotdb.service.transport.thrift.SyncResponse;
@@ -40,22 +44,23 @@ import java.util.concurrent.TimeUnit;
public class TransportHandler {
private static final Logger logger = LoggerFactory.getLogger(TransportHandler.class);
+ private static TransportHandler DEBUG_TRANSPORT_HANDLER = null; // test only
- private final String pipeName;
+ private String pipeName;
+ private long createTime;
private final String localIp;
- private final long createTime;
- private final ITransportClient transportClient;
+ protected ITransportClient transportClient;
- private final ExecutorService transportExecutorService;
+ protected ExecutorService transportExecutorService;
private Future transportFuture;
- private final ScheduledExecutorService heartbeatExecutorService;
+ protected ScheduledExecutorService heartbeatExecutorService;
private Future heartbeatFuture;
- public TransportHandler(ITransportClient transportClient, String pipeName, long createTime) {
- this.pipeName = pipeName;
- this.createTime = createTime;
- this.transportClient = transportClient;
+ public TransportHandler(Pipe pipe, IoTDBPipeSink pipeSink) {
+ this.pipeName = pipe.getName();
+ this.createTime = pipe.getCreateTime();
+ this.transportClient = new TransportClient(pipe, pipeSink.getIp(), pipeSink.getPort());
this.transportExecutorService =
IoTDBThreadPoolFactory.newSingleThreadExecutor(
@@ -80,8 +85,8 @@ public class TransportHandler {
heartbeatFuture =
heartbeatExecutorService.scheduleWithFixedDelay(
this::sendHeartbeat,
- SyncConstant.DEFAULT_HEARTBEAT_DELAY_SECONDS,
- SyncConstant.DEFAULT_HEARTBEAT_DELAY_SECONDS,
+ SyncConstant.HEARTBEAT_DELAY_SECONDS,
+ SyncConstant.HEARTBEAT_DELAY_SECONDS,
TimeUnit.SECONDS);
}
@@ -124,4 +129,24 @@ public class TransportHandler {
pipeName, e));
}
}
+
+ public static TransportHandler getNewTransportHandler(Pipe pipe, IoTDBPipeSink pipeSink) {
+ if (DEBUG_TRANSPORT_HANDLER == null) {
+ return new TransportHandler(pipe, pipeSink);
+ }
+ DEBUG_TRANSPORT_HANDLER.resetTransportClient(pipe); // test only
+ return DEBUG_TRANSPORT_HANDLER;
+ }
+
+ /** test */
+ @TestOnly
+ public static void setDebugTransportHandler(TransportHandler transportHandler) {
+ DEBUG_TRANSPORT_HANDLER = transportHandler;
+ }
+
+ @TestOnly
+ protected void resetTransportClient(Pipe pipe) {
+ this.pipeName = pipe.getName();
+ this.createTime = pipe.getCreateTime();
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/transport/client/TransportClient.java b/server/src/main/java/org/apache/iotdb/db/sync/transport/client/TransportClient.java
index e75ca777b2..c495a7b256 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/transport/client/TransportClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/transport/client/TransportClient.java
@@ -23,13 +23,13 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.SyncConnectionException;
import org.apache.iotdb.db.sync.conf.SyncConstant;
-import org.apache.iotdb.db.sync.conf.SyncPathUtil;
import org.apache.iotdb.db.sync.pipedata.PipeData;
import org.apache.iotdb.db.sync.pipedata.TsFilePipeData;
import org.apache.iotdb.db.sync.sender.pipe.Pipe;
import org.apache.iotdb.db.sync.sender.service.SenderService;
import org.apache.iotdb.db.sync.transport.conf.TransportConstant;
import org.apache.iotdb.rpc.RpcTransportFactory;
+import org.apache.iotdb.rpc.TConfigurationConst;
import org.apache.iotdb.service.transport.thrift.IdentityInfo;
import org.apache.iotdb.service.transport.thrift.MetaInfo;
import org.apache.iotdb.service.transport.thrift.RequestType;
@@ -44,6 +44,7 @@ import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -72,8 +73,6 @@ public class TransportClient implements ITransportClient {
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
- private static final int TIMEOUT_MS = 2000_000;
-
private static final int TRANSFER_BUFFER_SIZE_IN_BYTES = 1 * 1024 * 1024;
private TTransport transport = null;
@@ -126,7 +125,14 @@ public class TransportClient implements ITransportClient {
}
try {
- transport = RpcTransportFactory.INSTANCE.getTransport(ipAddress, port, TIMEOUT_MS);
+ transport =
+ RpcTransportFactory.INSTANCE.getTransport(
+ new TSocket(
+ TConfigurationConst.defaultTConfiguration,
+ ipAddress,
+ port,
+ SyncConstant.SOCKET_TIMEOUT_MILLISECONDS,
+ SyncConstant.CONNECT_TIMEOUT_MILLISECONDS));
TProtocol protocol;
if (config.isRpcThriftCompressionEnable()) {
protocol = new TCompactProtocol(transport);
@@ -461,14 +467,14 @@ public class TransportClient implements ITransportClient {
while (!Thread.currentThread().isInterrupted()) {
PipeData pipeData = pipe.take();
if (!senderTransport(pipeData)) {
- logger.warn(String.format("Can not transfer pipedata %s, skip it.", pipeData));
+ logger.error(String.format("Can not transfer pipedata %s, skip it.", pipeData));
// can do something.
SenderService.getInstance()
.receiveMsg(
new SyncResponse(
ResponseType.WARN,
- SyncPathUtil.createMsg(
- String.format("Transfer piepdata %s error, skip it.", pipeData))));
+ String.format(
+ "Transfer piepdata %s error, skip it.", pipeData.getSerialNumber())));
continue;
}
pipe.commit();
@@ -482,10 +488,9 @@ public class TransportClient implements ITransportClient {
.receiveMsg(
new SyncResponse(
ResponseType.ERROR,
- SyncPathUtil.createMsg(
- String.format(
- "Can not connect to %s:%d, because %s, please check receiver and internet.",
- ipAddress, port, e.getMessage()))));
+ String.format(
+ "Can not connect to %s:%d, please check receiver and Internet.",
+ ipAddress, port)));
} finally {
close();
}
@@ -504,7 +509,13 @@ public class TransportClient implements ITransportClient {
}
try (TTransport heartbeatTransport =
- RpcTransportFactory.INSTANCE.getTransport(ipAddress, port, TIMEOUT_MS)) {
+ RpcTransportFactory.INSTANCE.getTransport(
+ new TSocket(
+ TConfigurationConst.defaultTConfiguration,
+ ipAddress,
+ port,
+ SyncConstant.SOCKET_TIMEOUT_MILLISECONDS,
+ SyncConstant.CONNECT_TIMEOUT_MILLISECONDS))) {
TProtocol protocol;
if (config.isRpcThriftCompressionEnable()) {
protocol = new TCompactProtocol(heartbeatTransport);
diff --git a/server/src/test/java/org/apache/iotdb/db/sync/pipedata/BufferedPipeDataQueueTest.java b/server/src/test/java/org/apache/iotdb/db/sync/pipedata/BufferedPipeDataQueueTest.java
index 4a9b17eafe..c051e85e77 100644
--- a/server/src/test/java/org/apache/iotdb/db/sync/pipedata/BufferedPipeDataQueueTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/sync/pipedata/BufferedPipeDataQueueTest.java
@@ -73,7 +73,7 @@ public class BufferedPipeDataQueueTest {
DataOutputStream pipeLogOutput1 =
new DataOutputStream(
new FileOutputStream(
- new File(pipeLogDir.getPath(), SyncConstant.getPipeLogName(0)), false));
+ new File(pipeLogDir.getPath(), SyncPathUtil.getPipeLogName(0)), false));
for (int i = 0; i < 4; i++) {
new TsFilePipeData("", i).serialize(pipeLogOutput1);
}
@@ -82,7 +82,7 @@ public class BufferedPipeDataQueueTest {
DataOutputStream pipeLogOutput2 =
new DataOutputStream(
new FileOutputStream(
- new File(pipeLogDir.getPath(), SyncConstant.getPipeLogName(4)), false));
+ new File(pipeLogDir.getPath(), SyncPathUtil.getPipeLogName(4)), false));
for (int i = 4; i < 11; i++) {
new TsFilePipeData("", i).serialize(pipeLogOutput2);
}
@@ -91,7 +91,7 @@ public class BufferedPipeDataQueueTest {
DataOutputStream pipeLogOutput3 =
new DataOutputStream(
new FileOutputStream(
- new File(pipeLogDir.getPath(), SyncConstant.getPipeLogName(11)), false));
+ new File(pipeLogDir.getPath(), SyncPathUtil.getPipeLogName(11)), false));
pipeLogOutput3.close();
// recovery
BufferedPipeDataQueue pipeDataQueue = new BufferedPipeDataQueue(pipeLogDir.getPath());
@@ -210,7 +210,7 @@ public class BufferedPipeDataQueueTest {
DataOutputStream pipeLogOutput1 =
new DataOutputStream(
new FileOutputStream(
- new File(pipeLogDir.getPath(), SyncConstant.getPipeLogName(0)), false));
+ new File(pipeLogDir.getPath(), SyncPathUtil.getPipeLogName(0)), false));
for (int i = 0; i < 4; i++) {
PipeData pipeData = new TsFilePipeData("fake" + i, i);
pipeDataList.add(pipeData);
@@ -221,7 +221,7 @@ public class BufferedPipeDataQueueTest {
DataOutputStream pipeLogOutput2 =
new DataOutputStream(
new FileOutputStream(
- new File(pipeLogDir.getPath(), SyncConstant.getPipeLogName(4)), false));
+ new File(pipeLogDir.getPath(), SyncPathUtil.getPipeLogName(4)), false));
for (int i = 4; i < 8; i++) {
PipeData pipeData =
new DeletionPipeData(new Deletion(new PartialPath("fake" + i), 0, 99), i);
@@ -239,7 +239,7 @@ public class BufferedPipeDataQueueTest {
DataOutputStream pipeLogOutput3 =
new DataOutputStream(
new FileOutputStream(
- new File(pipeLogDir.getPath(), SyncConstant.getPipeLogName(11)), false));
+ new File(pipeLogDir.getPath(), SyncPathUtil.getPipeLogName(11)), false));
pipeLogOutput3.close();
// recovery
BufferedPipeDataQueue pipeDataQueue = new BufferedPipeDataQueue(pipeLogDir.getPath());
@@ -300,7 +300,7 @@ public class BufferedPipeDataQueueTest {
DataOutputStream pipeLogOutput1 =
new DataOutputStream(
new FileOutputStream(
- new File(pipeLogDir.getPath(), SyncConstant.getPipeLogName(0)), false));
+ new File(pipeLogDir.getPath(), SyncPathUtil.getPipeLogName(0)), false));
for (int i = 0; i < 4; i++) {
PipeData pipeData = new TsFilePipeData("fake" + i, i);
pipeDataList.add(pipeData);
@@ -311,7 +311,7 @@ public class BufferedPipeDataQueueTest {
DataOutputStream pipeLogOutput2 =
new DataOutputStream(
new FileOutputStream(
- new File(pipeLogDir.getPath(), SyncConstant.getPipeLogName(4)), false));
+ new File(pipeLogDir.getPath(), SyncPathUtil.getPipeLogName(4)), false));
for (int i = 4; i < 8; i++) {
PipeData pipeData =
new DeletionPipeData(new Deletion(new PartialPath("fake" + i), 0, 99), i);
@@ -329,7 +329,7 @@ public class BufferedPipeDataQueueTest {
DataOutputStream pipeLogOutput3 =
new DataOutputStream(
new FileOutputStream(
- new File(pipeLogDir.getPath(), SyncConstant.getPipeLogName(11)), false));
+ new File(pipeLogDir.getPath(), SyncPathUtil.getPipeLogName(11)), false));
pipeLogOutput3.close();
// recovery
BufferedPipeDataQueue pipeDataQueue = new BufferedPipeDataQueue(pipeLogDir.getPath());
@@ -387,7 +387,7 @@ public class BufferedPipeDataQueueTest {
DataOutputStream pipeLogOutput1 =
new DataOutputStream(
new FileOutputStream(
- new File(pipeLogDir.getPath(), SyncConstant.getPipeLogName(0)), false));
+ new File(pipeLogDir.getPath(), SyncPathUtil.getPipeLogName(0)), false));
for (int i = 0; i < 4; i++) {
PipeData pipeData = new TsFilePipeData("fake" + i, i);
pipeDataList.add(pipeData);
@@ -398,7 +398,7 @@ public class BufferedPipeDataQueueTest {
DataOutputStream pipeLogOutput2 =
new DataOutputStream(
new FileOutputStream(
- new File(pipeLogDir.getPath(), SyncConstant.getPipeLogName(4)), false));
+ new File(pipeLogDir.getPath(), SyncPathUtil.getPipeLogName(4)), false));
for (int i = 4; i < 8; i++) {
PipeData pipeData =
new DeletionPipeData(new Deletion(new PartialPath("fake" + i), 0, 99), i);
@@ -468,7 +468,7 @@ public class BufferedPipeDataQueueTest {
DataOutputStream pipeLogOutput1 =
new DataOutputStream(
new FileOutputStream(
- new File(pipeLogDir.getPath(), SyncConstant.getPipeLogName(0)), false));
+ new File(pipeLogDir.getPath(), SyncPathUtil.getPipeLogName(0)), false));
for (int i = 0; i < 4; i++) {
PipeData pipeData = new TsFilePipeData("fake" + i, i);
pipeDataList.add(pipeData);
@@ -479,7 +479,7 @@ public class BufferedPipeDataQueueTest {
DataOutputStream pipeLogOutput2 =
new DataOutputStream(
new FileOutputStream(
- new File(pipeLogDir.getPath(), SyncConstant.getPipeLogName(4)), false));
+ new File(pipeLogDir.getPath(), SyncPathUtil.getPipeLogName(4)), false));
for (int i = 4; i < 8; i++) {
PipeData pipeData =
new DeletionPipeData(new Deletion(new PartialPath("fake" + i), 0, 99), i);
diff --git a/server/src/test/java/org/apache/iotdb/db/sync/receiver/recovery/ReceiverLogAnalyzerTest.java b/server/src/test/java/org/apache/iotdb/db/sync/receiver/recovery/ReceiverLogAnalyzerTest.java
index ac41da744e..ae47d4d8d7 100644
--- a/server/src/test/java/org/apache/iotdb/db/sync/receiver/recovery/ReceiverLogAnalyzerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/sync/receiver/recovery/ReceiverLogAnalyzerTest.java
@@ -87,8 +87,8 @@ public class ReceiverLogAnalyzerTest {
@Test
public void testMessageLog() {
- String pipeIdentifier1 = SyncPathUtil.getReceiverPipeFolderName(pipe1, ip1, createdTime1);
- String pipeIdentifier2 = SyncPathUtil.getReceiverPipeFolderName(pipe2, ip2, createdTime2);
+ String pipeIdentifier1 = SyncPathUtil.getReceiverPipeDirName(pipe1, ip1, createdTime1);
+ String pipeIdentifier2 = SyncPathUtil.getReceiverPipeDirName(pipe2, ip2, createdTime2);
try {
ReceiverLog log = new ReceiverLog();
PipeMessage info = new PipeMessage(PipeMessage.MsgType.INFO, "info");