You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/03/02 14:02:18 UTC
[iotdb] branch new_sync updated: [To new_sync][IOTDB-1907] implement customized sync process: pipeDataQueue (#5123)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch new_sync
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/new_sync by this push:
new b3bff7c [To new_sync][IOTDB-1907] implement customized sync process: pipeDataQueue (#5123)
b3bff7c is described below
commit b3bff7cd8c3fcd29c6fc7ddba0b8dab3cc5b3778
Author: yschengzi <87...@users.noreply.github.com>
AuthorDate: Wed Mar 2 22:01:22 2022 +0800
[To new_sync][IOTDB-1907] implement customized sync process: pipeDataQueue (#5123)
---
.../sync/IoTDBSyncReceiverCollectorIT.java | 6 +-
.../BufferedPipeDataBlockingQueue.java | 2 +-
.../apache/iotdb/db/newsync/conf/SyncConstant.java | 58 +++
.../db/newsync/{utils => conf}/SyncPathUtil.java | 15 +-
.../apache/iotdb/db/newsync/pipedata/PipeData.java | 28 +-
.../iotdb/db/newsync/pipedata/TsFilePipeData.java | 50 +-
.../pipedata/queue/BufferedPipeDataQueue.java | 428 ++++++++++++++++
.../queue/PipeDataQueue.java} | 25 +-
.../iotdb/db/newsync/receiver/ReceiverService.java | 2 +-
.../db/newsync/receiver/collector/Collector.java | 6 +-
.../db/newsync/receiver/recovery/ReceiverLog.java | 4 +-
.../receiver/recovery/ReceiverLogAnalyzer.java | 4 +-
.../db/newsync/sender/conf/.SenderConf.java.swp | Bin 12288 -> 0 bytes
.../iotdb/db/newsync/sender/conf/SenderConf.java | 63 ---
.../db/newsync/sender/pipe/IoTDBPipeSink.java | 6 +-
.../iotdb/db/newsync/sender/pipe/TsFilePipe.java | 19 +-
.../newsync/sender/recovery/SenderLogAnalyzer.java | 7 +-
.../db/newsync/sender/recovery/SenderLogger.java | 11 +-
.../sender/recovery/TsFilePipeLogAnalyzer.java | 20 +-
.../{TsFilePipeLog.java => TsFilePipeLogger.java} | 75 ++-
.../db/newsync/sender/service/SenderService.java | 5 +-
.../org/apache/iotdb/db/qp/logical/Operator.java | 2 +
.../iotdb/db/qp/physical/sys/CreatePipePlan.java | 20 +-
.../db/qp/physical/sys/CreatePipeSinkPlan.java | 18 +-
.../pipedata/BufferedPipeDataQueueTest.java | 542 +++++++++++++++++++++
.../iotdb/db/newsync/pipedata/PipeDataTest.java | 4 +
26 files changed, 1210 insertions(+), 210 deletions(-)
diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverCollectorIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverCollectorIT.java
index 1b13d13..64af3c7 100644
--- a/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverCollectorIT.java
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverCollectorIT.java
@@ -22,14 +22,14 @@ import org.apache.iotdb.db.concurrent.ThreadName;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.modification.Deletion;
import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.newsync.conf.BufferedPipeDataBlockingQueue;
+import org.apache.iotdb.db.newsync.conf.SyncConstant;
+import org.apache.iotdb.db.newsync.conf.SyncPathUtil;
import org.apache.iotdb.db.newsync.pipedata.DeletionPipeData;
import org.apache.iotdb.db.newsync.pipedata.PipeData;
import org.apache.iotdb.db.newsync.pipedata.SchemaPipeData;
import org.apache.iotdb.db.newsync.pipedata.TsFilePipeData;
import org.apache.iotdb.db.newsync.receiver.collector.Collector;
-import org.apache.iotdb.db.newsync.utils.BufferedPipeDataBlockingQueue;
-import org.apache.iotdb.db.newsync.utils.SyncConstant;
-import org.apache.iotdb.db.newsync.utils.SyncPathUtil;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/utils/BufferedPipeDataBlockingQueue.java b/server/src/main/java/org/apache/iotdb/db/newsync/conf/BufferedPipeDataBlockingQueue.java
similarity index 98%
rename from server/src/main/java/org/apache/iotdb/db/newsync/utils/BufferedPipeDataBlockingQueue.java
rename to server/src/main/java/org/apache/iotdb/db/newsync/conf/BufferedPipeDataBlockingQueue.java
index 537bc4e..dae6932 100644
--- a/server/src/main/java/org/apache/iotdb/db/newsync/utils/BufferedPipeDataBlockingQueue.java
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/conf/BufferedPipeDataBlockingQueue.java
@@ -17,7 +17,7 @@
* under the License.
*
*/
-package org.apache.iotdb.db.newsync.utils;
+package org.apache.iotdb.db.newsync.conf;
import org.apache.iotdb.db.newsync.pipedata.PipeData;
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/conf/SyncConstant.java b/server/src/main/java/org/apache/iotdb/db/newsync/conf/SyncConstant.java
new file mode 100644
index 0000000..730c382
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/conf/SyncConstant.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.newsync.conf;
+
+/** Constants and file-name helpers shared by the sender and receiver sides of the sync module. */
+public class SyncConstant {
+  // ---- sender ----
+  public static final String DEFAULT_PIPE_SINK_IP = "127.0.0.1";
+
+  public static final int DEFAULT_PIPE_SINK_PORT = 6670;
+
+  public static final String SENDER_PIPE_DIR_NAME = "sender";
+  public static final String FINISH_COLLECT_LOCK_NAME = "finishCollect.lock";
+  // Sleep between two checks in TsFilePipeData#waitForTsFileClose.
+  public static final Long DEFAULT_WAITING_FOR_TSFILE_CLOSE_MILLISECONDS = 500L;
+  // Maximum number of checks performed by TsFilePipeData#waitForTsFileClose.
+  public static final Long DEFAULT_WAITING_FOR_TSFILE_RETRY_NUMBER = 10L;
+  public static final Long DEFAULT_WAITING_FOR_DEREGISTER_MILLISECONDS = 100L;
+  public static final String MODS_OFFSET_FILE_SUFFIX = ".offset";
+  public static final String PIPE_LOG_NAME_SUFFIX = "-pipe.log";
+  // 10 * 1024 * 1024 bytes; BufferedPipeDataQueue uses it as the roll-over threshold for both the
+  // pipe log and the commit log.
+  public static final Long DEFAULT_PIPE_LOG_SIZE_IN_BYTE = 10485760L;
+  public static final String HISTORY_PIPE_LOG_NAME = PIPE_LOG_NAME_SUFFIX + ".history";
+  public static final String COMMIT_LOG_NAME = "commit.log";
+
+  public static final String SENDER_LOG_NAME = "senderService.log";
+  public static final String PLAN_SERIALIZE_SPLIT_CHARACTER = ",";
+  public static final String SENDER_LOG_SPLIT_CHARACTER = " ";
+
+  /**
+   * Builds the pipe log file name for a log whose first record has the given serial number.
+   *
+   * @param serialNumber serial number used as the file-name prefix
+   * @return {@code serialNumber} followed by {@link #PIPE_LOG_NAME_SUFFIX}
+   */
+  public static String getPipeLogName(long serialNumber) {
+    return serialNumber + PIPE_LOG_NAME_SUFFIX;
+  }
+
+  /**
+   * Extracts the serial number from a pipe log file name.
+   *
+   * <p>Only the text before the first {@code '-'} is parsed, so a name that starts with {@code '-'}
+   * or has a non-numeric prefix makes {@link Long#parseLong(String)} throw {@link
+   * NumberFormatException}.
+   *
+   * @param pipeLogName file name such as the ones produced by {@link #getPipeLogName(long)}
+   * @return the numeric prefix of the name
+   */
+  public static Long getSerialNumberFromPipeLogName(String pipeLogName) {
+    return Long.parseLong(pipeLogName.split("-")[0]);
+  }
+
+  // ---- receiver ----
+  public static final String SYNC_SYS_DIR = "sys";
+
+  public static final String RECEIVER_DIR = "receiver";
+  public static final String RECEIVER_LOG_NAME = "receiverService.log";
+  public static final String PIPE_LOG_DIR_NAME = "pipe-log";
+  public static final String FILE_DATA_DIR_NAME = "file-data";
+  public static final String COLLECTOR_SUFFIX = ".collector";
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/utils/SyncPathUtil.java b/server/src/main/java/org/apache/iotdb/db/newsync/conf/SyncPathUtil.java
similarity index 81%
rename from server/src/main/java/org/apache/iotdb/db/newsync/utils/SyncPathUtil.java
rename to server/src/main/java/org/apache/iotdb/db/newsync/conf/SyncPathUtil.java
index f57a080..9d20799 100644
--- a/server/src/main/java/org/apache/iotdb/db/newsync/utils/SyncPathUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/conf/SyncPathUtil.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.newsync.utils;
+package org.apache.iotdb.db.newsync.conf;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -24,16 +24,25 @@ import java.io.File;
/** Util for path generation in sync module */
public class SyncPathUtil {
+  // ---- sender ----
+  /**
+   * Builds the directory path used by a sender pipe.
+   *
+   * <p>The result is the configured new-sync directory, a file separator, and then {@link
+   * SyncConstant#SENDER_PIPE_DIR_NAME} suffixed with {@code -<pipeName>-<createTime>}.
+   *
+   * @param pipeName name of the pipe
+   * @param createTime creation time of the pipe, formatted as a decimal number
+   * @return the path of the sender pipe directory (the directory itself is not created here)
+   */
+  public static String getSenderPipeDir(String pipeName, long createTime) {
+    return IoTDBDescriptor.getInstance().getConfig().getNewSyncDir()
+        + File.separator
+        + SyncConstant.SENDER_PIPE_DIR_NAME
+        + String.format("-%s-%d", pipeName, createTime);
+  }
+
+ /** receiver */
public static String getReceiverPipeLogDir(String pipeName, String remoteIp, long createTime) {
return getReceiverPipeDir(pipeName, remoteIp, createTime)
+ File.separator
- + SyncConstant.PIPELOG_DIR_NAME;
+ + SyncConstant.PIPE_LOG_DIR_NAME;
}
public static String getReceiverFileDataDir(String pipeName, String remoteIp, long createTime) {
return getReceiverPipeDir(pipeName, remoteIp, createTime)
+ File.separator
- + SyncConstant.FILEDATA_DIR_NAME;
+ + SyncConstant.FILE_DATA_DIR_NAME;
}
public static String getReceiverPipeDir(String pipeName, String remoteIp, long createTime) {
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/pipedata/PipeData.java b/server/src/main/java/org/apache/iotdb/db/newsync/pipedata/PipeData.java
index 2e16115..c8d9389 100644
--- a/server/src/main/java/org/apache/iotdb/db/newsync/pipedata/PipeData.java
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/pipedata/PipeData.java
@@ -25,6 +25,8 @@ import org.apache.iotdb.db.newsync.receiver.load.ILoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
@@ -32,7 +34,7 @@ import java.io.IOException;
public abstract class PipeData {
private static final Logger logger = LoggerFactory.getLogger(PipeData.class);
- protected final long serialNumber;
+ protected long serialNumber;
public PipeData(long serialNumber) {
this.serialNumber = serialNumber;
@@ -42,17 +44,9 @@ public abstract class PipeData {
return serialNumber;
}
- // abstract public Loader.Type getLoaderType() {
- // if (tsFilePath != null) {
- // return Loader.Type.TsFile;
- // } else if (deletion != null) {
- // return Loader.Type.Deletion;
- // } else if (plan != null) {
- // return Loader.Type.PhysicalPlan;
- // }
- // logger.error("Wrong type for transport type.");
- // return null;
- // }
+  /**
+   * Replaces the serial number of this pipe data.
+   *
+   * @param serialNumber the new serial number
+   */
+  public void setSerialNumber(long serialNumber) {
+    this.serialNumber = serialNumber;
+  }
public abstract Type getType();
@@ -65,6 +59,12 @@ public abstract class PipeData {
return serializeSize;
}
+  /**
+   * Serializes this pipe data into a fresh byte array by delegating to the stream-based {@code
+   * serialize} overload with an in-memory buffer.
+   *
+   * @return the serialized bytes
+   * @throws IOException if the stream-based serialization fails
+   */
+  public byte[] serialize() throws IOException {
+    ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
+    serialize(new DataOutputStream(byteStream));
+    return byteStream.toByteArray();
+  }
+
public static PipeData deserialize(DataInputStream stream)
throws IOException, IllegalPathException {
Type type = Type.values()[stream.readByte()];
@@ -82,6 +82,10 @@ public abstract class PipeData {
}
}
+  /**
+   * Deserializes one pipe data from a byte array by wrapping it in a {@link DataInputStream} and
+   * delegating to {@link #deserialize(DataInputStream)}.
+   *
+   * @param bytes serialized pipe data
+   * @return the deserialized pipe data
+   * @throws IllegalPathException propagated from the stream-based overload
+   * @throws IOException propagated from the stream-based overload
+   */
+  public static PipeData deserialize(byte[] bytes) throws IllegalPathException, IOException {
+    return deserialize(new DataInputStream(new ByteArrayInputStream(bytes)));
+  }
+
public abstract ILoader createLoader();
public abstract void sendToTransport();
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/pipedata/TsFilePipeData.java b/server/src/main/java/org/apache/iotdb/db/newsync/pipedata/TsFilePipeData.java
index 3ff101f..aec255d 100644
--- a/server/src/main/java/org/apache/iotdb/db/newsync/pipedata/TsFilePipeData.java
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/pipedata/TsFilePipeData.java
@@ -21,9 +21,9 @@ package org.apache.iotdb.db.newsync.pipedata;
import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.newsync.conf.SyncConstant;
import org.apache.iotdb.db.newsync.receiver.load.ILoader;
import org.apache.iotdb.db.newsync.receiver.load.TsFileLoader;
-import org.apache.iotdb.db.newsync.sender.conf.SenderConf;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.slf4j.Logger;
@@ -48,6 +48,10 @@ public class TsFilePipeData extends PipeData {
this.tsFilePath = tsFilePath;
}
+  /**
+   * Replaces the path of the tsfile this pipe data refers to.
+   *
+   * @param tsFilePath the new tsfile path
+   */
+  public void setTsFilePath(String tsFilePath) {
+    this.tsFilePath = tsFilePath;
+  }
+
@Override
public Type getType() {
return Type.TSFILE;
@@ -77,20 +81,39 @@ public class TsFilePipeData extends PipeData {
}
}
+  /**
+   * Collects the files that belong to this tsfile.
+   *
+   * <p>The tsfile itself is mandatory. Its companion files, named by appending {@link
+   * TsFileResource#RESOURCE_SUFFIX} and {@link ModificationFile#FILE_SUFFIX} to the absolute tsfile
+   * path, are included only when they exist.
+   *
+   * @return the tsfile first, followed by the resource file and the modification file if present
+   * @throws FileNotFoundException if the tsfile does not exist
+   */
+  public List<File> getTsFiles() throws FileNotFoundException {
+    File tsFile = new File(tsFilePath).getAbsoluteFile();
+    File resource = new File(tsFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX);
+    File mods = new File(tsFile.getAbsolutePath() + ModificationFile.FILE_SUFFIX);
+
+    List<File> files = new ArrayList<>();
+    if (!tsFile.exists()) {
+      throw new FileNotFoundException(String.format("Can not find %s.", tsFile.getAbsolutePath()));
+    }
+    files.add(tsFile);
+    if (resource.exists()) {
+      files.add(resource);
+    }
+    if (mods.exists()) {
+      files.add(mods);
+    }
+    return files;
+  }
+
private boolean waitForTsFileClose() {
- for (int i = 0; i < SenderConf.defaultWaitingForTsFileRetryNumber; i++) {
+ for (int i = 0; i < SyncConstant.DEFAULT_WAITING_FOR_TSFILE_RETRY_NUMBER; i++) {
if (isTsFileClosed()) {
return true;
}
try {
- Thread.sleep(SenderConf.defaultWaitingForTsFileCloseMilliseconds);
+ Thread.sleep(SyncConstant.DEFAULT_WAITING_FOR_TSFILE_CLOSE_MILLISECONDS);
} catch (InterruptedException e) {
logger.warn(String.format("Be Interrupted when waiting for tsfile %s closed", tsFilePath));
}
logger.info(
String.format(
"Waiting for tsfile %s close, retry %d / %d.",
- tsFilePath, (i + 1), SenderConf.defaultWaitingForTsFileRetryNumber));
+ tsFilePath, (i + 1), SyncConstant.DEFAULT_WAITING_FOR_TSFILE_RETRY_NUMBER));
}
return false;
}
@@ -101,25 +124,6 @@ public class TsFilePipeData extends PipeData {
return resource.exists();
}
- public List<File> getTsFiles() throws FileNotFoundException {
- File tsFile = new File(tsFilePath).getAbsoluteFile();
- File resource = new File(tsFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX);
- File mods = new File(tsFile.getAbsolutePath() + ModificationFile.FILE_SUFFIX);
-
- List<File> files = new ArrayList<>();
- if (!tsFile.exists()) {
- throw new FileNotFoundException(String.format("Can not find %s.", tsFile.getAbsolutePath()));
- }
- files.add(tsFile);
- if (resource.exists()) {
- files.add(resource);
- }
- if (mods.exists()) {
- files.add(mods);
- }
- return files;
- }
-
@Override
public String toString() {
return "TsFilePipeData{"
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/pipedata/queue/BufferedPipeDataQueue.java b/server/src/main/java/org/apache/iotdb/db/newsync/pipedata/queue/BufferedPipeDataQueue.java
new file mode 100644
index 0000000..62020fd
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/pipedata/queue/BufferedPipeDataQueue.java
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.newsync.pipedata.queue;
+
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.newsync.conf.SyncConstant;
+import org.apache.iotdb.db.newsync.pipedata.PipeData;
+import org.apache.iotdb.db.newsync.pipedata.TsFilePipeData;
+import org.apache.iotdb.db.utils.FileUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.LinkedBlockingDeque;
+
+public class BufferedPipeDataQueue implements PipeDataQueue {
+ private static final Logger logger = LoggerFactory.getLogger(BufferedPipeDataQueue.class);
+
+ private final String pipeLogDir;
+
+ /** input */
+ private long lastMaxSerialNumber;
+
+ private BlockingDeque<PipeData> inputDeque;
+
+ private BlockingDeque<Long> pipeLogStartNumber;
+ private DataOutputStream outputStream;
+ private long currentPipeLogSize;
+
+ /** output */
+ private final Object waitLock = new Object();
+
+ private BlockingDeque<PipeData> outputDeque;
+
+ private long pullSerialNumber;
+ private long commitSerialNumber;
+ private DataOutputStream commitLogWriter;
+ private long currentCommitLogSize;
+
+  /**
+   * Creates a queue backed by the pipe logs under {@code pipeLogDir} and immediately runs {@code
+   * recover()} to restore state from logs that already exist.
+   *
+   * <p>{@code inputDeque} and {@code outputStream} are not created here; they are first assigned
+   * in {@code moveToNextPipeLog}.
+   *
+   * @param pipeLogDir directory holding the pipe log files and the commit log
+   */
+  public BufferedPipeDataQueue(String pipeLogDir) {
+    this.pipeLogDir = pipeLogDir;
+
+    this.lastMaxSerialNumber = 0;
+    this.pipeLogStartNumber = new LinkedBlockingDeque<>();
+
+    this.outputDeque = new LinkedBlockingDeque<>();
+    // Long.MIN_VALUE marks "no commit position known yet"; see moveToNextPipeLog.
+    this.commitSerialNumber = Long.MIN_VALUE;
+
+    recover();
+  }
+
+  // ---- recover ----
+  /**
+   * Restores the queue state from disk when the pipe log directory already exists; a missing
+   * directory means there is nothing to recover. The four steps run in a fixed order because the
+   * later ones read the start numbers collected by the first.
+   */
+  private void recover() {
+    File logDir = new File(pipeLogDir);
+    if (logDir.exists()) {
+      recoverPipeLogStartNumber();
+      recoverLastMaxSerialNumber();
+      recoverCommitSerialNumber();
+      recoverOutputDeque();
+    }
+  }
+
+ private void recoverPipeLogStartNumber() {
+ File logDir = new File(pipeLogDir);
+ List<Long> startNumbers = new ArrayList<>();
+
+ for (File file : logDir.listFiles())
+ if (file.getName().endsWith(SyncConstant.PIPE_LOG_NAME_SUFFIX)) {
+ startNumbers.add(SyncConstant.getSerialNumberFromPipeLogName(file.getName()));
+ }
+ if (startNumbers.size() != 0) {
+ Collections.sort(startNumbers);
+ for (Long startTime : startNumbers) {
+ pipeLogStartNumber.offer(startTime);
+ }
+ }
+ }
+
+  /**
+   * Restores {@code lastMaxSerialNumber} from the pipe log with the largest start number.
+   *
+   * <p>The value becomes the serial number of the last record in that log, or the log's start
+   * number minus one when the log holds no record. If the log cannot be parsed the error is logged
+   * and {@code lastMaxSerialNumber} keeps its previous value.
+   */
+  private void recoverLastMaxSerialNumber() {
+    if (pipeLogStartNumber.isEmpty()) {
+      return;
+    }
+
+    File writingPipeLog =
+        new File(pipeLogDir, SyncConstant.getPipeLogName(pipeLogStartNumber.peekLast()));
+    try {
+      List<PipeData> recoverPipeData = parsePipeLog(writingPipeLog);
+      int recoverPipeDataSize = recoverPipeData.size();
+      lastMaxSerialNumber =
+          recoverPipeDataSize == 0
+              ? pipeLogStartNumber.peekLast() - 1
+              : recoverPipeData.get(recoverPipeDataSize - 1).getSerialNumber();
+    } catch (IOException e) {
+      logger.error(
+          String.format(
+              "Can not recover inputQueue from %s, because %s.", writingPipeLog.getPath(), e));
+    }
+  }
+
+  /**
+   * Restores {@code commitSerialNumber}.
+   *
+   * <p>Without a commit log, the value is set to the oldest pipe log's start number minus one (or
+   * left untouched when no pipe log exists). With a commit log, the last {@link Long#BYTES} bytes
+   * of the file are read as the committed serial number; a shorter file leaves the value untouched.
+   */
+  private void recoverCommitSerialNumber() {
+    File commitLog = new File(pipeLogDir, SyncConstant.COMMIT_LOG_NAME);
+    if (!commitLog.exists()) {
+      if (!pipeLogStartNumber.isEmpty()) {
+        commitSerialNumber = pipeLogStartNumber.peek() - 1;
+      }
+      return;
+    }
+
+    try (RandomAccessFile raf = new RandomAccessFile(commitLog, "r")) {
+      if (raf.length() >= Long.BYTES) {
+        // The commit log is a sequence of longs; only the most recent (last) one matters.
+        raf.seek(raf.length() - Long.BYTES);
+        commitSerialNumber = raf.readLong();
+      }
+    } catch (IOException e) {
+      logger.error(
+          String.format(
+              "deserialize remove serial number error, remove serial number has been set to %d, because %s",
+              commitSerialNumber, e));
+    }
+  }
+
+  /**
+   * Refills {@code outputDeque} with the uncommitted records of the oldest pipe log.
+   *
+   * <p>The log is walked from its last record backwards and each record is pushed to the front of
+   * the deque, so the deque ends up in log order. The walk stops at the first record whose serial
+   * number is not greater than {@code commitSerialNumber}.
+   */
+  private void recoverOutputDeque() {
+    if (pipeLogStartNumber.isEmpty()) {
+      return;
+    }
+
+    File readingPipeLog =
+        new File(pipeLogDir, SyncConstant.getPipeLogName(pipeLogStartNumber.peek()));
+    try {
+      List<PipeData> recoverPipeData = parsePipeLog(readingPipeLog);
+      int recoverPipeDataSize = recoverPipeData.size();
+      for (int i = recoverPipeDataSize - 1; i >= 0; --i) {
+        PipeData pipeData = recoverPipeData.get(i);
+        if (pipeData.getSerialNumber() <= commitSerialNumber) {
+          break;
+        }
+        outputDeque.addFirst(pipeData);
+      }
+    } catch (IOException e) {
+      logger.error(
+          String.format(
+              "Recover output deque from pipe log %s error, because %s.",
+              readingPipeLog.getPath(), e));
+    }
+  }
+
+  /**
+   * @return the value of {@code lastMaxSerialNumber}; in this class it is only assigned in the
+   *     constructor and during recovery
+   */
+  public long getLastMaxSerialNumber() {
+    return lastMaxSerialNumber;
+  }
+
+  /**
+   * @return the serial number of the last committed pipe data, or {@link Long#MIN_VALUE} while no
+   *     commit position has been established
+   */
+  public long getCommitSerialNumber() {
+    return commitSerialNumber;
+  }
+
+  // ---- input ----
+  /**
+   * Appends one pipe data to the in-memory input deque and to the current pipe log.
+   *
+   * <p>A new pipe log is started first when none is open yet or the current one has grown beyond
+   * {@link SyncConstant#DEFAULT_PIPE_LOG_SIZE_IN_BYTE}. Threads blocked in {@link #take()} are
+   * notified once the data is in the deque.
+   *
+   * @param pipeData the data to enqueue
+   * @return {@code true} if the data was enqueued and written to disk; {@code false} if no pipe
+   *     log could be opened, the deque rejected the data, or the write failed
+   */
+  @Override
+  public boolean offer(PipeData pipeData) {
+    if (outputStream == null || currentPipeLogSize > SyncConstant.DEFAULT_PIPE_LOG_SIZE_IN_BYTE) {
+      try {
+        moveToNextPipeLog(pipeData.getSerialNumber());
+      } catch (IOException e) {
+        logger.warn(String.format("Move to next pipe log %s error, because %s.", pipeData, e));
+      }
+    }
+    // If the very first pipe log could not be created there is neither a deque nor a stream to
+    // use; report failure instead of running into a NullPointerException below.
+    if (inputDeque == null || outputStream == null) {
+      return false;
+    }
+    if (!inputDeque.offer(pipeData)) {
+      return false;
+    }
+    synchronized (waitLock) {
+      waitLock.notifyAll();
+    }
+
+    try {
+      writeToDisk(pipeData);
+    } catch (IOException e) {
+      logger.error(String.format("Record pipe data %s error, because %s.", pipeData, e));
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Closes the current pipe log (if any) and starts a new one named after {@code
+   * startSerialNumber}.
+   *
+   * <p>Besides opening the new output stream this registers the start number, resets the size
+   * counter, replaces {@code inputDeque} with a fresh empty deque and, if no commit position is
+   * known yet ({@code commitSerialNumber == Long.MIN_VALUE}), sets it to {@code startSerialNumber
+   * - 1}.
+   *
+   * @param startSerialNumber serial number of the first pipe data that will go into the new log
+   * @throws IOException if closing the old log or creating/opening the new one fails
+   */
+  private synchronized void moveToNextPipeLog(long startSerialNumber) throws IOException {
+    if (outputStream != null) {
+      outputStream.close();
+    }
+    File newPipeLog = new File(pipeLogDir, SyncConstant.getPipeLogName(startSerialNumber));
+    createFile(newPipeLog);
+
+    // FileOutputStream(File) starts writing at the beginning of the file.
+    outputStream = new DataOutputStream(new FileOutputStream(newPipeLog));
+    pipeLogStartNumber.offer(startSerialNumber);
+    currentPipeLogSize = 0;
+
+    inputDeque = new LinkedBlockingDeque<>();
+    if (commitSerialNumber == Long.MIN_VALUE) {
+      commitSerialNumber = startSerialNumber - 1;
+    }
+  }
+
+  /**
+   * Serializes {@code pipeData} to the current pipe log, adds the value returned by {@code
+   * serialize} to {@code currentPipeLogSize} and flushes the stream.
+   *
+   * @param pipeData the data to persist
+   * @throws IOException if serialization or the flush fails
+   */
+  private void writeToDisk(PipeData pipeData) throws IOException {
+    // skip trick
+
+    currentPipeLogSize += pipeData.serialize(outputStream);
+    outputStream.flush();
+  }
+
+  // ---- output ----
+  /**
+   * Removes and returns the next pipe data after {@code lastSerialNumber}, or {@code null} when
+   * none is available.
+   *
+   * <p>While {@code outputDeque} has elements its head is returned directly. When it is empty and
+   * is not the same object as {@code inputDeque}, the deque is re-pointed according to {@code
+   * lastSerialNumber + 1}:
+   *
+   * <ul>
+   *   <li>no pipe log known, or the number is beyond the newest log's start number: {@code null};
+   *   <li>the number equals the newest log's start number and {@code inputDeque} exists: {@code
+   *       outputDeque} becomes the very same object as {@code inputDeque};
+   *   <li>otherwise the pipe log file named after that number is parsed into a new deque.
+   * </ul>
+   *
+   * <p>When {@code outputDeque} already is {@code inputDeque} and is empty, {@code null} is
+   * returned.
+   *
+   * @param lastSerialNumber serial number preceding the one wanted
+   * @return the polled pipe data, or {@code null}
+   * @throws IOException if a pipe log has to be parsed and parsing fails
+   */
+  private synchronized PipeData pullOnePipeData(long lastSerialNumber) throws IOException {
+    long serialNumber = lastSerialNumber + 1;
+    if (!outputDeque.isEmpty()) {
+      return outputDeque.poll();
+    } else if (outputDeque != inputDeque) {
+      if (pipeLogStartNumber.isEmpty()) {
+        return null;
+      }
+
+      if (serialNumber > pipeLogStartNumber.peekLast()) {
+        return null;
+      } else if (serialNumber == pipeLogStartNumber.peekLast() && inputDeque != null) {
+        // Reference assignment on purpose: from now on reads see what offer() appends.
+        outputDeque = inputDeque;
+      } else {
+        List<PipeData> parsePipeData =
+            parsePipeLog(new File(pipeLogDir, SyncConstant.getPipeLogName(serialNumber)));
+        int parsePipeDataSize = parsePipeData.size();
+        outputDeque = new LinkedBlockingDeque<>();
+        for (int i = 0; i < parsePipeDataSize; i++) {
+          outputDeque.offer(parsePipeData.get(i));
+        }
+      }
+      return outputDeque.poll();
+    }
+    return null;
+  }
+
+  /**
+   * Returns the uncommitted pipe data up to {@code serialNumber} without consuming it.
+   *
+   * <p>Starting from {@code commitSerialNumber}, data is pulled one by one until the serial number
+   * of the last pulled element reaches {@code serialNumber}, nothing more is available, or an
+   * {@link IOException} occurs (which is logged). All pulled elements are then pushed back to the
+   * front of {@code outputDeque} in their original order; {@code pullSerialNumber} remembers the
+   * last one so that {@link #commit()} knows how far to go.
+   *
+   * @param serialNumber upper bound for the pull
+   * @return the pulled pipe data in ascending pull order, possibly empty
+   */
+  @Override
+  public List<PipeData> pull(long serialNumber) {
+    List<PipeData> resPipeData = new ArrayList<>();
+
+    pullSerialNumber = commitSerialNumber;
+    while (pullSerialNumber < serialNumber) {
+      try {
+        PipeData pullPipeData = pullOnePipeData(pullSerialNumber);
+        if (pullPipeData != null) {
+          resPipeData.add(pullPipeData);
+          pullSerialNumber = pullPipeData.getSerialNumber();
+        } else {
+          break;
+        }
+      } catch (IOException e) {
+        logger.error(
+            String.format(
+                "Pull pipe data serial number %s error, because %s.", pullSerialNumber + 1, e));
+        break;
+      }
+    }
+
+    // Re-insert back to front so the deque keeps the original order.
+    for (int i = resPipeData.size() - 1; i >= 0; --i) {
+      outputDeque.addFirst(resPipeData.get(i));
+    }
+    return resPipeData;
+  }
+
+  /**
+   * Blocks until the next uncommitted pipe data is available and returns it without consuming it:
+   * the element is pushed back to the front of {@code outputDeque} and stays there until {@link
+   * #commit()}.
+   *
+   * @return the next pipe data, never {@code null}
+   * @throws InterruptedException if the calling thread is interrupted while waiting
+   * @throws IllegalStateException if the pipe log holding the next data cannot be read
+   */
+  @Override
+  public PipeData take() throws InterruptedException {
+    PipeData pipeData;
+    try {
+      synchronized (waitLock) {
+        pipeData = pullOnePipeData(commitSerialNumber);
+        // Object#wait may return spuriously, or another taker may have consumed the wakeup, so
+        // the condition is re-checked in a loop. offer() already wakes every waiter with
+        // notifyAll(), hence no notification is forwarded from here.
+        while (pipeData == null) {
+          waitLock.wait();
+          pipeData = pullOnePipeData(commitSerialNumber);
+        }
+      }
+    } catch (IOException e) {
+      logger.error(
+          String.format(
+              "Blocking pull pipe data number %s error, because %s", commitSerialNumber + 1, e));
+      // Previously this fell through and failed with a NullPointerException on addFirst(null).
+      throw new IllegalStateException(
+          String.format("Can not read pipe data number %s", commitSerialNumber + 1), e);
+    }
+    outputDeque.addFirst(pipeData);
+    pullSerialNumber = pipeData.getSerialNumber();
+    return pipeData;
+  }
+
+  /**
+   * Commits everything handed out by the last {@link #pull(long)} or {@link #take()}: drops the
+   * pulled pipe data, deletes pipe logs that are no longer needed, and appends the new commit
+   * position to the commit log.
+   */
+  @Override
+  public void commit() {
+    deletePipeData();
+    deletePipeLog();
+    serializeCommitSerialNumber();
+  }
+
+  /**
+   * Advances {@code commitSerialNumber} up to {@code pullSerialNumber}, consuming one pipe data
+   * per step. For tsfile pipe data the files returned by {@code getTsFiles()} are deleted as well.
+   *
+   * <p>A step for which no pipe data can be obtained is logged and skipped, so the commit position
+   * still reaches {@code pullSerialNumber}.
+   */
+  private void deletePipeData() {
+    while (commitSerialNumber < pullSerialNumber) {
+      commitSerialNumber += 1;
+      try {
+        PipeData commitData = pullOnePipeData(commitSerialNumber);
+        if (commitData == null) {
+          // pullOnePipeData returns null when nothing is buffered; nothing to delete then.
+          logger.warn(
+              String.format(
+                  "Can not find pipe data to commit, serial number %s.", commitSerialNumber));
+          continue;
+        }
+        if (PipeData.Type.TSFILE.equals(commitData.getType())) {
+          List<File> tsFiles = ((TsFilePipeData) commitData).getTsFiles();
+          for (File file : tsFiles) {
+            Files.deleteIfExists(file.toPath());
+          }
+        }
+      } catch (IOException e) {
+        logger.error(
+            String.format(
+                "Commit pipe data serial number %s error, because %s.", commitSerialNumber, e));
+      }
+    }
+  }
+
+  /**
+   * Deletes pipe logs that lie entirely before the commit position.
+   *
+   * <p>Only runs when at least two logs are tracked. The oldest start number is removed from
+   * {@code pipeLogStartNumber} repeatedly; its file is deleted as long as the start number of the
+   * following log is not greater than {@code commitSerialNumber}. The first start number that
+   * fails this test is put back at the head of the deque.
+   */
+  private void deletePipeLog() {
+    if (pipeLogStartNumber.size() >= 2) {
+      long nowPipeLogStartNumber;
+      while (true) {
+        nowPipeLogStartNumber = pipeLogStartNumber.poll();
+        if (!pipeLogStartNumber.isEmpty() && pipeLogStartNumber.peek() <= commitSerialNumber) {
+          try {
+            Files.deleteIfExists(
+                new File(pipeLogDir, SyncConstant.getPipeLogName(nowPipeLogStartNumber)).toPath());
+          } catch (IOException e) {
+            logger.warn(
+                String.format("Delete %s-pipe.log error, because %s.", nowPipeLogStartNumber, e));
+          }
+        } else {
+          break;
+        }
+      }
+      // The log that is still needed was polled above; restore it as the oldest entry.
+      pipeLogStartNumber.addFirst(nowPipeLogStartNumber);
+    }
+  }
+
+  /**
+   * Appends {@code commitSerialNumber} to the commit log.
+   *
+   * <p>The writer is opened lazily with {@code new FileOutputStream(File)}, which starts writing
+   * at the beginning of the file. Once {@link SyncConstant#DEFAULT_PIPE_LOG_SIZE_IN_BYTE} bytes
+   * have been written through the current writer it is closed, so the next call reopens the file
+   * from the start. I/O failures are logged and not propagated.
+   */
+  private void serializeCommitSerialNumber() {
+    try {
+      if (commitLogWriter == null) {
+        commitLogWriter =
+            new DataOutputStream(
+                new FileOutputStream(new File(pipeLogDir, SyncConstant.COMMIT_LOG_NAME)));
+        currentCommitLogSize = 0;
+      }
+      commitLogWriter.writeLong(commitSerialNumber);
+      commitLogWriter.flush();
+      currentCommitLogSize += Long.BYTES;
+      if (currentCommitLogSize >= SyncConstant.DEFAULT_PIPE_LOG_SIZE_IN_BYTE) {
+        commitLogWriter.close();
+        commitLogWriter = null;
+      }
+    } catch (IOException e) {
+      logger.error(
+          String.format(
+              "Serialize commit serial number %s error, because %s.", commitSerialNumber, e));
+    }
+  }
+
+  // ---- common ----
+  /**
+   * Closes both log writers, drops the in-memory deques and deletes the whole pipe log directory.
+   *
+   * <p>The deque fields are set to {@code null}, not to empty deques. If an {@link IOException}
+   * occurs, the remaining steps are skipped and a warning is logged.
+   */
+  @Override
+  public void clear() {
+    try {
+      if (outputStream != null) {
+        outputStream.close();
+        outputStream = null;
+      }
+      if (commitLogWriter != null) {
+        commitLogWriter.close();
+        commitLogWriter = null;
+      }
+
+      inputDeque = null;
+      pipeLogStartNumber = null;
+      outputDeque = null;
+      File logDir = new File(pipeLogDir);
+      if (logDir.exists()) {
+        FileUtils.deleteDirectory(logDir);
+      }
+    } catch (IOException e) {
+      logger.warn(String.format("Clear pipe log dir %s error, because %s.", pipeLogDir, e));
+    }
+  }
+
+  /**
+   * Creates {@code file}, creating its parent directories first when they are missing.
+   *
+   * @param file the file to create
+   * @return the result of {@link File#createNewFile()}: {@code true} if the file was created,
+   *     {@code false} if it already existed
+   * @throws IOException if the file cannot be created
+   */
+  private boolean createFile(File file) throws IOException {
+    File parentDir = file.getParentFile();
+    if (!parentDir.exists()) {
+      parentDir.mkdirs();
+    }
+    return file.createNewFile();
+  }
+
+  /**
+   * Reads every pipe data record stored in a pipe log file.
+   *
+   * <p>Records are deserialized until the stream signals {@link EOFException}, which is treated as
+   * the regular end of the log; the records read up to that point are returned.
+   *
+   * @param file the pipe log to parse
+   * @return the records in file order, possibly empty
+   * @throws IOException if the file cannot be read, or wrapping an {@link IllegalPathException}
+   *     raised while deserializing a record
+   */
+  public static List<PipeData> parsePipeLog(File file) throws IOException {
+    List<PipeData> pipeData = new ArrayList<>();
+    try (DataInputStream inputStream = new DataInputStream(new FileInputStream(file))) {
+      while (true) {
+        pipeData.add(PipeData.deserialize(inputStream));
+      }
+    } catch (EOFException e) {
+      logger.info(String.format("Finish parsing pipeLog %s.", file.getPath()));
+    } catch (IllegalPathException e) {
+      logger.error(String.format("Parsing pipeLog %s error, because %s", file.getPath(), e));
+      throw new IOException(e);
+    }
+    return pipeData;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/utils/SyncConstant.java b/server/src/main/java/org/apache/iotdb/db/newsync/pipedata/queue/PipeDataQueue.java
similarity index 61%
rename from server/src/main/java/org/apache/iotdb/db/newsync/utils/SyncConstant.java
rename to server/src/main/java/org/apache/iotdb/db/newsync/pipedata/queue/PipeDataQueue.java
index ecf0bd3..6adac03 100644
--- a/server/src/main/java/org/apache/iotdb/db/newsync/utils/SyncConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/pipedata/queue/PipeDataQueue.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
@@ -16,13 +16,20 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.newsync.utils;
+package org.apache.iotdb.db.newsync.pipedata.queue;
-public class SyncConstant {
- public static final String SYNC_SYS_DIR = "sys";
- public static final String RECEIVER_DIR = "receiver";
- public static final String RECEIVER_LOG_NAME = "receiverService.log";
- public static final String PIPELOG_DIR_NAME = "pipe-log";
- public static final String FILEDATA_DIR_NAME = "file-data";
- public static final String COLLECTOR_SUFFIX = ".collector";
+import org.apache.iotdb.db.newsync.pipedata.PipeData;
+
+import java.util.List;
+
+/**
+ * Queue of {@link PipeData} whose consumption is confirmed explicitly through {@link #commit()}.
+ *
+ * <p>The per-method notes below describe how {@code BufferedPipeDataQueue} implements the
+ * contract; other implementations should be checked against their own documentation.
+ */
+public interface PipeDataQueue {
+  /**
+   * Adds one pipe data to the queue.
+   *
+   * @param data the pipe data to add
+   * @return {@code true} if the data was accepted
+   */
+  boolean offer(PipeData data);
+
+  /**
+   * Returns uncommitted pipe data up to the given serial number. In {@code BufferedPipeDataQueue}
+   * the returned data stays in the queue until {@link #commit()}.
+   *
+   * @param serialNumber upper bound of the serial numbers to return
+   * @return the pulled pipe data
+   */
+  List<PipeData> pull(long serialNumber);
+
+  /**
+   * Waits for and returns the next pipe data. In {@code BufferedPipeDataQueue} the returned data
+   * stays in the queue until {@link #commit()}.
+   *
+   * @return the next pipe data
+   * @throws InterruptedException if the calling thread is interrupted while waiting
+   */
+  PipeData take() throws InterruptedException;
+
+  /** Confirms the data handed out by the preceding {@code pull} or {@code take}. */
+  void commit();
+
+  /** Discards the queue content. {@code BufferedPipeDataQueue} also deletes its log directory. */
+  void clear();
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/ReceiverService.java b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/ReceiverService.java
index 7729815..4ded585 100644
--- a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/ReceiverService.java
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/ReceiverService.java
@@ -20,11 +20,11 @@ package org.apache.iotdb.db.newsync.receiver;
import org.apache.iotdb.db.exception.StartupException;
import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.newsync.conf.SyncPathUtil;
import org.apache.iotdb.db.newsync.receiver.collector.Collector;
import org.apache.iotdb.db.newsync.receiver.manager.PipeInfo;
import org.apache.iotdb.db.newsync.receiver.manager.PipeStatus;
import org.apache.iotdb.db.newsync.receiver.manager.ReceiverManager;
-import org.apache.iotdb.db.newsync.utils.SyncPathUtil;
import org.apache.iotdb.db.qp.physical.sys.ShowPipeServerPlan;
import org.apache.iotdb.db.qp.utils.DatetimeUtils;
import org.apache.iotdb.db.query.dataset.ListDataSet;
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/collector/Collector.java b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/collector/Collector.java
index 9321f5c..c278b74 100644
--- a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/collector/Collector.java
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/collector/Collector.java
@@ -22,10 +22,10 @@ package org.apache.iotdb.db.newsync.receiver.collector;
import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.concurrent.ThreadName;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.newsync.conf.BufferedPipeDataBlockingQueue;
+import org.apache.iotdb.db.newsync.conf.SyncConstant;
+import org.apache.iotdb.db.newsync.conf.SyncPathUtil;
import org.apache.iotdb.db.newsync.pipedata.PipeData;
-import org.apache.iotdb.db.newsync.utils.BufferedPipeDataBlockingQueue;
-import org.apache.iotdb.db.newsync.utils.SyncConstant;
-import org.apache.iotdb.db.newsync.utils.SyncPathUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/recovery/ReceiverLog.java b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/recovery/ReceiverLog.java
index c0dc7b4..7222919 100644
--- a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/recovery/ReceiverLog.java
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/recovery/ReceiverLog.java
@@ -18,9 +18,9 @@
*/
package org.apache.iotdb.db.newsync.receiver.recovery;
+import org.apache.iotdb.db.newsync.conf.SyncConstant;
+import org.apache.iotdb.db.newsync.conf.SyncPathUtil;
import org.apache.iotdb.db.newsync.receiver.manager.PipeStatus;
-import org.apache.iotdb.db.newsync.utils.SyncConstant;
-import org.apache.iotdb.db.newsync.utils.SyncPathUtil;
import java.io.BufferedWriter;
import java.io.File;
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/recovery/ReceiverLogAnalyzer.java b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/recovery/ReceiverLogAnalyzer.java
index 5ffe24c..bbf55a3 100644
--- a/server/src/main/java/org/apache/iotdb/db/newsync/receiver/recovery/ReceiverLogAnalyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/receiver/recovery/ReceiverLogAnalyzer.java
@@ -19,10 +19,10 @@
package org.apache.iotdb.db.newsync.receiver.recovery;
import org.apache.iotdb.db.exception.StartupException;
+import org.apache.iotdb.db.newsync.conf.SyncConstant;
+import org.apache.iotdb.db.newsync.conf.SyncPathUtil;
import org.apache.iotdb.db.newsync.receiver.manager.PipeInfo;
import org.apache.iotdb.db.newsync.receiver.manager.PipeStatus;
-import org.apache.iotdb.db.newsync.utils.SyncConstant;
-import org.apache.iotdb.db.newsync.utils.SyncPathUtil;
import org.apache.iotdb.db.service.ServiceType;
import org.slf4j.Logger;
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/sender/conf/.SenderConf.java.swp b/server/src/main/java/org/apache/iotdb/db/newsync/sender/conf/.SenderConf.java.swp
deleted file mode 100644
index 18a3081..0000000
Binary files a/server/src/main/java/org/apache/iotdb/db/newsync/sender/conf/.SenderConf.java.swp and /dev/null differ
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/sender/conf/SenderConf.java b/server/src/main/java/org/apache/iotdb/db/newsync/sender/conf/SenderConf.java
deleted file mode 100644
index 813f259..0000000
--- a/server/src/main/java/org/apache/iotdb/db/newsync/sender/conf/SenderConf.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.iotdb.db.newsync.sender.conf;
-
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.newsync.sender.pipe.Pipe;
-
-import java.io.File;
-
-public class SenderConf {
- public static final String defaultPipeSinkIP = "127.0.0.1";
- public static final int defaultPipeSinkPort = 6670;
-
- public static final String syncBaseDir =
- IoTDBDescriptor.getInstance().getConfig().getNewSyncDir();
-
- public static final String pipeDir = syncBaseDir + File.separator + "sender_pipe ";
- public static final String pipeCollectFinishLockName = "finishCollect.lock";
- public static final Long defaultWaitingForTsFileCloseMilliseconds = 500L;
- public static final Long defaultWaitingForTsFileRetryNumber = 10L;
- public static final Long defaultWaitingForDeregisterMilliseconds = 100L;
- public static final String tsFileDirName = "TsFile_data";
- public static final String modsOffsetFileSuffix = ".offset";
- public static final String pipeLogDirName = "Pipe_log";
- public static final String historyPipeLogName = "pipe_data.log.history";
- public static final String realTimePipeLogNameSuffix = "-pipe_data.log";
- public static final Long defaultPipeLogSizeInByte = 10485760L;
- public static final String removeSerialNumberLogName = "remove_serial_number.log";
-
- public static final String sysDir = syncBaseDir + File.separator + "sys";
- public static final String senderLog = sysDir + File.separator + "senderService.log";
- public static final String planSplitCharacter = ",";
- public static final String senderLogSplitCharacter = " ";
-
- public static String getPipeDir(Pipe pipe) {
- return pipeDir + "_" + pipe.getName() + "_" + pipe.getCreateTime();
- }
-
- public static String getRealTimePipeLogName(long serialNumber) {
- return serialNumber + realTimePipeLogNameSuffix;
- }
-
- public static Long getSerialNumberFromPipeLogName(String pipeLogName) {
- return Long.parseLong(pipeLogName.split("-")[0]);
- }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/sender/pipe/IoTDBPipeSink.java b/server/src/main/java/org/apache/iotdb/db/newsync/sender/pipe/IoTDBPipeSink.java
index d4ea459..5180919 100644
--- a/server/src/main/java/org/apache/iotdb/db/newsync/sender/pipe/IoTDBPipeSink.java
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/sender/pipe/IoTDBPipeSink.java
@@ -20,7 +20,7 @@
package org.apache.iotdb.db.newsync.sender.pipe;
import org.apache.iotdb.db.exception.PipeSinkException;
-import org.apache.iotdb.db.newsync.sender.conf.SenderConf;
+import org.apache.iotdb.db.newsync.conf.SyncConstant;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
public class IoTDBPipeSink implements PipeSink {
@@ -31,8 +31,8 @@ public class IoTDBPipeSink implements PipeSink {
private int port;
public IoTDBPipeSink(String name) {
- ip = SenderConf.defaultPipeSinkIP;
- port = SenderConf.defaultPipeSinkPort;
+ ip = SyncConstant.DEFAULT_PIPE_SINK_IP;
+ port = SyncConstant.DEFAULT_PIPE_SINK_PORT;
this.name = name;
type = Type.IoTDB;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/sender/pipe/TsFilePipe.java b/server/src/main/java/org/apache/iotdb/db/newsync/sender/pipe/TsFilePipe.java
index 99e6d44..6fd1fb9 100644
--- a/server/src/main/java/org/apache/iotdb/db/newsync/sender/pipe/TsFilePipe.java
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/sender/pipe/TsFilePipe.java
@@ -27,13 +27,14 @@ import org.apache.iotdb.db.engine.storagegroup.virtualSg.StorageGroupManager;
import org.apache.iotdb.db.exception.PipeException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.newsync.conf.SyncConstant;
+import org.apache.iotdb.db.newsync.conf.SyncPathUtil;
import org.apache.iotdb.db.newsync.pipedata.DeletionPipeData;
import org.apache.iotdb.db.newsync.pipedata.PipeData;
import org.apache.iotdb.db.newsync.pipedata.SchemaPipeData;
import org.apache.iotdb.db.newsync.pipedata.TsFilePipeData;
-import org.apache.iotdb.db.newsync.sender.conf.SenderConf;
-import org.apache.iotdb.db.newsync.sender.recovery.TsFilePipeLog;
import org.apache.iotdb.db.newsync.sender.recovery.TsFilePipeLogAnalyzer;
+import org.apache.iotdb.db.newsync.sender.recovery.TsFilePipeLogger;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
import org.apache.iotdb.db.service.IoTDB;
@@ -64,7 +65,7 @@ public class TsFilePipe implements Pipe {
private final boolean syncDelOp;
private final ExecutorService singleExecutorService;
- private final TsFilePipeLog pipeLog;
+ private final TsFilePipeLogger pipeLog;
private final ReentrantLock collectRealTimeDataLock;
private BlockingDeque<PipeData> pipeDataDeque;
@@ -81,7 +82,7 @@ public class TsFilePipe implements Pipe {
this.dataStartTime = dataStartTime;
this.syncDelOp = syncDelOp;
- this.pipeLog = new TsFilePipeLog(this);
+ this.pipeLog = new TsFilePipeLogger(this);
this.singleExecutorService =
IoTDBThreadPoolFactory.newSingleThreadExecutor(
ThreadName.PIPE_SERVICE.getName() + "-" + name);
@@ -106,7 +107,7 @@ public class TsFilePipe implements Pipe {
public synchronized void start() throws PipeException {
if (status == PipeStatus.DROP) {
throw new PipeException(
- String.format("Can not start pipe %s, because the pipe is drop.", name));
+ String.format("Can not start pipe %s, because the pipe has been drop.", name));
} else if (status == PipeStatus.RUNNING) {
return;
}
@@ -128,7 +129,9 @@ public class TsFilePipe implements Pipe {
status = PipeStatus.RUNNING;
} catch (IOException e) {
logger.error(
- String.format("Clear pipe dir %s error, because %s.", SenderConf.getPipeDir(this), e));
+ String.format(
+ "Clear pipe dir %s error, because %s.",
+ SyncPathUtil.getSenderPipeDir(name, createTime), e));
throw new PipeException("Start error, can not clear pipe log.");
}
}
@@ -384,7 +387,7 @@ public class TsFilePipe implements Pipe {
while (!pipeDataDeque.isEmpty() && pipeDataDeque.peek().getSerialNumber() <= serialNumber) {
PipeData data = pipeDataDeque.poll();
try {
- pipeLog.removePipeData(pipeDataDeque.poll().getSerialNumber());
+ pipeLog.removePipeData(data);
} catch (IOException e) {
logger.warn(
String.format(
@@ -432,7 +435,7 @@ public class TsFilePipe implements Pipe {
singleExecutorService.shutdown();
try {
- Thread.sleep(SenderConf.defaultWaitingForDeregisterMilliseconds);
+ Thread.sleep(SyncConstant.DEFAULT_WAITING_FOR_DEREGISTER_MILLISECONDS);
} catch (InterruptedException e) {
logger.warn(
String.format(
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/sender/recovery/SenderLogAnalyzer.java b/server/src/main/java/org/apache/iotdb/db/newsync/sender/recovery/SenderLogAnalyzer.java
index 8f2364d..2504145 100644
--- a/server/src/main/java/org/apache/iotdb/db/newsync/sender/recovery/SenderLogAnalyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/sender/recovery/SenderLogAnalyzer.java
@@ -20,7 +20,8 @@
package org.apache.iotdb.db.newsync.sender.recovery;
import org.apache.iotdb.db.exception.PipeException;
-import org.apache.iotdb.db.newsync.sender.conf.SenderConf;
+import org.apache.iotdb.db.newsync.conf.SyncConstant;
+import org.apache.iotdb.db.newsync.conf.SyncPathUtil;
import org.apache.iotdb.db.newsync.sender.pipe.Pipe;
import org.apache.iotdb.db.newsync.sender.pipe.PipeSink;
import org.apache.iotdb.db.newsync.sender.service.SenderService;
@@ -48,7 +49,7 @@ public class SenderLogAnalyzer {
private Pipe.PipeStatus runningPipeStatus;
public SenderLogAnalyzer() throws IOException {
- senderLog = new File(SenderConf.senderLog);
+ senderLog = new File(SyncPathUtil.getSysDir(), SyncConstant.SENDER_LOG_NAME);
if (!senderLog.exists()) {
senderLog.createNewFile();
}
@@ -67,7 +68,7 @@ public class SenderLogAnalyzer {
try {
while ((readLine = br.readLine()) != null) {
lineNumber += 1;
- parseStrings = readLine.split(SenderConf.senderLogSplitCharacter);
+ parseStrings = readLine.split(SyncConstant.SENDER_LOG_SPLIT_CHARACTER);
Operator.OperatorType type = Operator.OperatorType.valueOf(parseStrings[0]);
switch (type) {
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/sender/recovery/SenderLogger.java b/server/src/main/java/org/apache/iotdb/db/newsync/sender/recovery/SenderLogger.java
index 7f18cda..9afba24 100644
--- a/server/src/main/java/org/apache/iotdb/db/newsync/sender/recovery/SenderLogger.java
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/sender/recovery/SenderLogger.java
@@ -19,7 +19,8 @@
*/
package org.apache.iotdb.db.newsync.sender.recovery;
-import org.apache.iotdb.db.newsync.sender.conf.SenderConf;
+import org.apache.iotdb.db.newsync.conf.SyncConstant;
+import org.apache.iotdb.db.newsync.conf.SyncPathUtil;
import org.apache.iotdb.db.qp.logical.Operator;
import org.apache.iotdb.db.qp.physical.sys.CreatePipePlan;
import org.apache.iotdb.db.qp.physical.sys.CreatePipeSinkPlan;
@@ -45,7 +46,7 @@ public class SenderLogger {
return;
}
- File senderLog = new File(SenderConf.senderLog);
+ File senderLog = new File(SyncPathUtil.getSysDir(), SyncConstant.SENDER_LOG_NAME);
if (!senderLog.exists()) {
if (!senderLog.getParentFile().exists()) {
senderLog.getParentFile().mkdirs();
@@ -76,7 +77,7 @@ public class SenderLogger {
getBufferedWriter();
try {
bw.write(Operator.OperatorType.DROP_PIPESINK.name());
- bw.write(SenderConf.senderLogSplitCharacter);
+ bw.write(SyncConstant.SENDER_LOG_SPLIT_CHARACTER);
bw.write(pipeSinkName);
bw.newLine();
bw.flush();
@@ -89,7 +90,7 @@ public class SenderLogger {
getBufferedWriter();
try {
bw.write(Operator.OperatorType.CREATE_PIPE.name());
- bw.write(SenderConf.senderLogSplitCharacter);
+ bw.write(SyncConstant.SENDER_LOG_SPLIT_CHARACTER);
bw.write(String.valueOf(pipeCreateTime));
bw.newLine();
bw.write(plan.toString());
@@ -104,7 +105,7 @@ public class SenderLogger {
getBufferedWriter();
try {
bw.write(type.name());
- bw.write(SenderConf.senderLogSplitCharacter);
+ bw.write(SyncConstant.SENDER_LOG_SPLIT_CHARACTER);
bw.write(pipeName);
bw.newLine();
bw.flush();
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/sender/recovery/TsFilePipeLogAnalyzer.java b/server/src/main/java/org/apache/iotdb/db/newsync/sender/recovery/TsFilePipeLogAnalyzer.java
index 13750f1..f0ac0ef 100644
--- a/server/src/main/java/org/apache/iotdb/db/newsync/sender/recovery/TsFilePipeLogAnalyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/sender/recovery/TsFilePipeLogAnalyzer.java
@@ -20,8 +20,9 @@
package org.apache.iotdb.db.newsync.sender.recovery;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.newsync.conf.SyncConstant;
+import org.apache.iotdb.db.newsync.conf.SyncPathUtil;
import org.apache.iotdb.db.newsync.pipedata.PipeData;
-import org.apache.iotdb.db.newsync.sender.conf.SenderConf;
import org.apache.iotdb.db.newsync.sender.pipe.TsFilePipe;
import org.slf4j.Logger;
@@ -51,8 +52,8 @@ public class TsFilePipeLogAnalyzer {
private long removeSerialNumber;
public TsFilePipeLogAnalyzer(TsFilePipe pipe) {
- pipeDir = SenderConf.getPipeDir(pipe);
- pipeLogDir = new File(pipeDir, SenderConf.pipeLogDirName).getPath();
+ pipeDir = SyncPathUtil.getSenderPipeDir(pipe.getName(), pipe.getCreateTime());
+ pipeLogDir = new File(pipeDir, SyncConstant.PIPE_LOG_DIR_NAME).getPath();
}
public BlockingDeque<PipeData> recover() {
@@ -71,7 +72,7 @@ public class TsFilePipeLogAnalyzer {
}
private void deserializeRemoveSerialNumber() {
- File removeSerialNumberLog = new File(pipeLogDir, SenderConf.removeSerialNumberLogName);
+ File removeSerialNumberLog = new File(pipeLogDir, SyncConstant.COMMIT_LOG_NAME);
if (!removeSerialNumberLog.exists()) {
return;
}
@@ -98,7 +99,7 @@ public class TsFilePipeLogAnalyzer {
}
private void recoverHistoryData() {
- File historyPipeLog = new File(pipeLogDir, SenderConf.historyPipeLogName);
+ File historyPipeLog = new File(pipeLogDir, SyncConstant.HISTORY_PIPE_LOG_NAME);
if (!historyPipeLog.exists()) {
return;
}
@@ -134,14 +135,13 @@ public class TsFilePipeLogAnalyzer {
List<Long> startNumbers = new ArrayList<>();
for (File file : pipeLogDir.listFiles())
- if (file.getName().endsWith(SenderConf.realTimePipeLogNameSuffix)) {
- startNumbers.add(SenderConf.getSerialNumberFromPipeLogName(file.getName()));
+ if (file.getName().endsWith(SyncConstant.PIPE_LOG_NAME_SUFFIX)) {
+ startNumbers.add(SyncConstant.getSerialNumberFromPipeLogName(file.getName()));
}
if (startNumbers.size() != 0) {
Collections.sort(startNumbers);
for (Long startNumber : startNumbers) {
- File realTimePipeLog =
- new File(this.pipeLogDir, SenderConf.getRealTimePipeLogName(startNumber));
+ File realTimePipeLog = new File(this.pipeLogDir, SyncConstant.getPipeLogName(startNumber));
try {
List<PipeData> realTimeData = parseFile(realTimePipeLog);
for (PipeData data : realTimeData)
@@ -158,7 +158,7 @@ public class TsFilePipeLogAnalyzer {
}
public boolean isCollectFinished() {
- return new File(pipeDir, SenderConf.pipeCollectFinishLockName).exists();
+ return new File(pipeDir, SyncConstant.FINISH_COLLECT_LOCK_NAME).exists();
}
public static List<PipeData> parseFile(File file) throws IOException {
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/sender/recovery/TsFilePipeLog.java b/server/src/main/java/org/apache/iotdb/db/newsync/sender/recovery/TsFilePipeLogger.java
similarity index 81%
rename from server/src/main/java/org/apache/iotdb/db/newsync/sender/recovery/TsFilePipeLog.java
rename to server/src/main/java/org/apache/iotdb/db/newsync/sender/recovery/TsFilePipeLogger.java
index 68a09d2..4169e8a 100644
--- a/server/src/main/java/org/apache/iotdb/db/newsync/sender/recovery/TsFilePipeLog.java
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/sender/recovery/TsFilePipeLogger.java
@@ -22,9 +22,10 @@ package org.apache.iotdb.db.newsync.sender.recovery;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.newsync.conf.SyncConstant;
+import org.apache.iotdb.db.newsync.conf.SyncPathUtil;
import org.apache.iotdb.db.newsync.pipedata.PipeData;
import org.apache.iotdb.db.newsync.pipedata.TsFilePipeData;
-import org.apache.iotdb.db.newsync.sender.conf.SenderConf;
import org.apache.iotdb.db.newsync.sender.pipe.TsFilePipe;
import org.apache.iotdb.db.utils.FileUtils;
@@ -47,8 +48,8 @@ import java.util.List;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
-public class TsFilePipeLog {
- private static final Logger logger = LoggerFactory.getLogger(TsFilePipeLog.class);
+public class TsFilePipeLogger {
+ private static final Logger logger = LoggerFactory.getLogger(TsFilePipeLogger.class);
private final String pipeDir;
private final String tsFileDir;
@@ -63,10 +64,10 @@ public class TsFilePipeLog {
private BufferedWriter removeSerialNumberWriter;
private long currentRemoveLogSize;
- public TsFilePipeLog(TsFilePipe tsFilePipe) {
- pipeDir = SenderConf.getPipeDir(tsFilePipe);
- tsFileDir = new File(pipeDir, SenderConf.tsFileDirName).getPath();
- pipeLogDir = new File(pipeDir, SenderConf.pipeLogDirName).getPath();
+ public TsFilePipeLogger(TsFilePipe tsFilePipe) {
+ pipeDir = SyncPathUtil.getSenderPipeDir(tsFilePipe.getName(), tsFilePipe.getCreateTime());
+ tsFileDir = new File(pipeDir, SyncConstant.FILE_DATA_DIR_NAME).getPath();
+ pipeLogDir = new File(pipeDir, SyncConstant.PIPE_LOG_DIR_NAME).getPath();
}
/** make hard link for tsfile * */
@@ -77,7 +78,7 @@ public class TsFilePipeLog {
File modsHardLink = createHardLink(mods);
if (modsOffset != 0L) {
serializeModsOffset(
- new File(modsHardLink.getPath() + SenderConf.modsOffsetFileSuffix), modsOffset);
+ new File(modsHardLink.getPath() + SyncConstant.MODS_OFFSET_FILE_SUFFIX), modsOffset);
}
} else if (modsOffset != 0L) {
logger.warn(
@@ -147,6 +148,7 @@ public class TsFilePipeLog {
public void addHistoryPipeData(PipeData pipeData) throws IOException {
getHistoryOutputStream();
pipeData.serialize(historyOutputStream);
+ historyOutputStream.flush();
}
private void getHistoryOutputStream() throws IOException {
@@ -157,7 +159,7 @@ public class TsFilePipeLog {
// recover history pipe log
File logDir = new File(pipeLogDir);
logDir.mkdirs();
- File historyPipeLog = new File(pipeLogDir, SenderConf.historyPipeLogName);
+ File historyPipeLog = new File(pipeLogDir, SyncConstant.HISTORY_PIPE_LOG_NAME);
createFile(historyPipeLog);
historyOutputStream = new DataOutputStream(new FileOutputStream(historyPipeLog, true));
}
@@ -165,6 +167,7 @@ public class TsFilePipeLog {
public synchronized void addRealTimePipeData(PipeData pipeData) throws IOException {
getRealTimeOutputStream(pipeData.getSerialNumber());
currentPipeLogSize += pipeData.serialize(realTimeOutputStream);
+ realTimeOutputStream.flush();
}
private void getRealTimeOutputStream(long serialNumber) throws IOException {
@@ -176,8 +179,7 @@ public class TsFilePipeLog {
if (!realTimePipeLogStartNumber.isEmpty()) {
File writingPipeLog =
new File(
- pipeLogDir,
- SenderConf.getRealTimePipeLogName(realTimePipeLogStartNumber.peekLast()));
+ pipeLogDir, SyncConstant.getPipeLogName(realTimePipeLogStartNumber.peekLast()));
realTimeOutputStream = new DataOutputStream(new FileOutputStream(writingPipeLog, true));
currentPipeLogSize = writingPipeLog.length();
} else {
@@ -185,7 +187,7 @@ public class TsFilePipeLog {
}
}
- if (currentPipeLogSize > SenderConf.defaultPipeLogSizeInByte) {
+ if (currentPipeLogSize > SyncConstant.DEFAULT_PIPE_LOG_SIZE_IN_BYTE) {
moveToNextPipeLog(serialNumber);
}
}
@@ -197,8 +199,8 @@ public class TsFilePipeLog {
logDir.mkdirs();
for (File file : logDir.listFiles())
- if (file.getName().endsWith(SenderConf.realTimePipeLogNameSuffix)) {
- startNumbers.add(SenderConf.getSerialNumberFromPipeLogName(file.getName()));
+ if (file.getName().endsWith(SyncConstant.PIPE_LOG_NAME_SUFFIX)) {
+ startNumbers.add(SyncConstant.getSerialNumberFromPipeLogName(file.getName()));
}
if (startNumbers.size() != 0) {
Collections.sort(startNumbers);
@@ -212,7 +214,7 @@ public class TsFilePipeLog {
if (realTimeOutputStream != null) {
realTimeOutputStream.close();
}
- File newPipeLog = new File(pipeLogDir, SenderConf.getRealTimePipeLogName(startSerialNumber));
+ File newPipeLog = new File(pipeLogDir, SyncConstant.getPipeLogName(startSerialNumber));
createFile(newPipeLog);
realTimeOutputStream = new DataOutputStream(new FileOutputStream(newPipeLog));
@@ -221,9 +223,19 @@ public class TsFilePipeLog {
}
/** remove pipe log data */
- public void removePipeData(long serialNumber) throws IOException {
+ public void removePipeData(PipeData pipeData) throws IOException {
+ long serialNumber = pipeData.getSerialNumber();
serializeRemoveSerialNumber(serialNumber);
+ // delete tsfile
+ if (PipeData.Type.TSFILE.equals(pipeData.getType())) {
+ List<File> tsFiles = ((TsFilePipeData) pipeData).getTsFiles();
+ for (File file : tsFiles) {
+ Files.deleteIfExists(file.toPath());
+ }
+ }
+
+ // delete pipe log
if (serialNumber >= 0) {
if (historyOutputStream != null) {
removeHistoryPipeLog();
@@ -250,7 +262,7 @@ public class TsFilePipeLog {
private void removeHistoryPipeLog() throws IOException {
historyOutputStream.close();
historyOutputStream = null;
- File historyPipeLog = new File(pipeLogDir, SenderConf.historyPipeLogName);
+ File historyPipeLog = new File(pipeLogDir, SyncConstant.HISTORY_PIPE_LOG_NAME);
try {
Files.delete(historyPipeLog.toPath());
} catch (NoSuchFileException e) {
@@ -260,8 +272,7 @@ public class TsFilePipeLog {
}
private void removeRealTimePipeLog(long serialNumber) throws IOException {
- File realTimePipeLog = new File(pipeLogDir, SenderConf.getRealTimePipeLogName(serialNumber));
- removeTsFile(realTimePipeLog);
+ File realTimePipeLog = new File(pipeLogDir, SyncConstant.getPipeLogName(serialNumber));
try {
Files.delete(realTimePipeLog.toPath());
} catch (NoSuchFileException e) {
@@ -270,37 +281,17 @@ public class TsFilePipeLog {
}
}
- private void removeTsFile(File realTimePipeLog) {
- try {
- List<PipeData> pipeData = TsFilePipeLogAnalyzer.parseFile(realTimePipeLog);
- List<File> tsFiles;
- for (PipeData data : pipeData)
- if (PipeData.Type.TSFILE.equals(data.getType())) {
- tsFiles = ((TsFilePipeData) data).getTsFiles();
- for (File file : tsFiles) {
- Files.deleteIfExists(file.toPath());
- }
- }
- } catch (IOException e) {
- logger.warn(
- String.format(
- "Can not parse pipe log %s, the tsfiles in this pipe log will not be deleted, because %s",
- realTimePipeLog.getPath(), e));
- }
- }
-
private void serializeRemoveSerialNumber(long serialNumber) throws IOException {
if (removeSerialNumberWriter == null) {
removeSerialNumberWriter =
- new BufferedWriter(
- new FileWriter(new File(pipeLogDir, SenderConf.removeSerialNumberLogName)));
+ new BufferedWriter(new FileWriter(new File(pipeLogDir, SyncConstant.COMMIT_LOG_NAME)));
currentRemoveLogSize = 0;
}
removeSerialNumberWriter.write(String.valueOf(serialNumber));
removeSerialNumberWriter.newLine();
removeSerialNumberWriter.flush();
currentRemoveLogSize += Long.BYTES;
- if (currentRemoveLogSize >= SenderConf.defaultPipeLogSizeInByte) {
+ if (currentRemoveLogSize >= SyncConstant.DEFAULT_PIPE_LOG_SIZE_IN_BYTE) {
removeSerialNumberWriter.close();
removeSerialNumberWriter = null;
}
@@ -308,7 +299,7 @@ public class TsFilePipeLog {
public void finishCollect() {
try {
- if (createFile(new File(pipeDir, SenderConf.pipeCollectFinishLockName))) {
+ if (createFile(new File(pipeDir, SyncConstant.FINISH_COLLECT_LOCK_NAME))) {
logger.info(String.format("Create finish collecting Lock file in %s.", pipeDir));
}
} catch (IOException e) {
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/sender/service/SenderService.java b/server/src/main/java/org/apache/iotdb/db/newsync/sender/service/SenderService.java
index 2caa321..fec7995 100644
--- a/server/src/main/java/org/apache/iotdb/db/newsync/sender/service/SenderService.java
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/sender/service/SenderService.java
@@ -22,7 +22,8 @@ package org.apache.iotdb.db.newsync.sender.service;
import org.apache.iotdb.db.exception.PipeException;
import org.apache.iotdb.db.exception.PipeSinkException;
import org.apache.iotdb.db.exception.StartupException;
-import org.apache.iotdb.db.newsync.sender.conf.SenderConf;
+import org.apache.iotdb.db.newsync.conf.SyncConstant;
+import org.apache.iotdb.db.newsync.conf.SyncPathUtil;
import org.apache.iotdb.db.newsync.sender.pipe.Pipe;
import org.apache.iotdb.db.newsync.sender.pipe.PipeSink;
import org.apache.iotdb.db.newsync.sender.pipe.TsFilePipe;
@@ -227,7 +228,7 @@ public class SenderService implements IService {
/** IService * */
@Override
public void start() throws StartupException {
- File senderLog = new File(SenderConf.senderLog);
+ File senderLog = new File(SyncPathUtil.getSysDir(), SyncConstant.SENDER_LOG_NAME);
if (senderLog.exists()) {
try {
recover();
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
index 2a82e6e..4aa910e 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
@@ -193,7 +193,9 @@ public abstract class Operator {
PRUNE_TEMPLATE,
APPEND_TEMPLATE,
DROP_TEMPLATE,
+
SHOW_QUERY_RESOURCE,
+
CREATE_PIPESINK,
DROP_PIPESINK,
SHOW_PIPESINK,
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreatePipePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreatePipePlan.java
index a6ca08b..97b4996 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreatePipePlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreatePipePlan.java
@@ -20,7 +20,7 @@
package org.apache.iotdb.db.qp.physical.sys;
import org.apache.iotdb.db.metadata.path.PartialPath;
-import org.apache.iotdb.db.newsync.sender.conf.SenderConf;
+import org.apache.iotdb.db.newsync.conf.SyncConstant;
import org.apache.iotdb.db.qp.logical.Operator;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.tsfile.utils.Pair;
@@ -74,7 +74,7 @@ public class CreatePipePlan extends PhysicalPlan {
}
public static CreatePipePlan parseString(String parsedString) throws IOException {
- String[] attributes = parsedString.split(SenderConf.planSplitCharacter);
+ String[] attributes = parsedString.split(SyncConstant.PLAN_SERIALIZE_SPLIT_CHARACTER);
if (attributes.length < 4) {
throw new IOException("Parsing CreatePipePlan error. Attributes is less than expected.");
}
@@ -93,13 +93,17 @@ public class CreatePipePlan extends PhysicalPlan {
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
- builder.append(pipeName).append(SenderConf.planSplitCharacter);
- builder.append(pipeSinkName).append(SenderConf.planSplitCharacter);
- builder.append(dataStartTimestamp).append(SenderConf.planSplitCharacter);
- builder.append(pipeAttributes.size()).append(SenderConf.planSplitCharacter);
+ builder.append(pipeName).append(SyncConstant.PLAN_SERIALIZE_SPLIT_CHARACTER);
+ builder.append(pipeSinkName).append(SyncConstant.PLAN_SERIALIZE_SPLIT_CHARACTER);
+ builder.append(dataStartTimestamp).append(SyncConstant.PLAN_SERIALIZE_SPLIT_CHARACTER);
+ builder.append(pipeAttributes.size()).append(SyncConstant.PLAN_SERIALIZE_SPLIT_CHARACTER);
for (int i = 0; i < pipeAttributes.size(); i++) {
- builder.append(pipeAttributes.get(i).left).append(SenderConf.planSplitCharacter);
- builder.append(pipeAttributes.get(i).right).append(SenderConf.planSplitCharacter);
+ builder
+ .append(pipeAttributes.get(i).left)
+ .append(SyncConstant.PLAN_SERIALIZE_SPLIT_CHARACTER);
+ builder
+ .append(pipeAttributes.get(i).right)
+ .append(SyncConstant.PLAN_SERIALIZE_SPLIT_CHARACTER);
}
return builder.toString();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreatePipeSinkPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreatePipeSinkPlan.java
index ca11f01..5321845 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreatePipeSinkPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreatePipeSinkPlan.java
@@ -20,7 +20,7 @@
package org.apache.iotdb.db.qp.physical.sys;
import org.apache.iotdb.db.metadata.path.PartialPath;
-import org.apache.iotdb.db.newsync.sender.conf.SenderConf;
+import org.apache.iotdb.db.newsync.conf.SyncConstant;
import org.apache.iotdb.db.qp.logical.Operator;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.tsfile.utils.Pair;
@@ -64,7 +64,7 @@ public class CreatePipeSinkPlan extends PhysicalPlan {
}
public static CreatePipeSinkPlan parseString(String parsedString) throws IOException {
- String[] attributes = parsedString.split(SenderConf.planSplitCharacter);
+ String[] attributes = parsedString.split(SyncConstant.PLAN_SERIALIZE_SPLIT_CHARACTER);
if (attributes.length < 3) {
throw new IOException("Parsing CreatePipeSinkPlan error. Attributes is less than expected.");
}
@@ -82,12 +82,16 @@ public class CreatePipeSinkPlan extends PhysicalPlan {
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
- builder.append(pipeSinkName).append(SenderConf.planSplitCharacter);
- builder.append(pipeSinkType).append(SenderConf.planSplitCharacter);
- builder.append(pipeSinkAttributes.size()).append(SenderConf.planSplitCharacter);
+ builder.append(pipeSinkName).append(SyncConstant.PLAN_SERIALIZE_SPLIT_CHARACTER);
+ builder.append(pipeSinkType).append(SyncConstant.PLAN_SERIALIZE_SPLIT_CHARACTER);
+ builder.append(pipeSinkAttributes.size()).append(SyncConstant.PLAN_SERIALIZE_SPLIT_CHARACTER);
for (int i = 0; i < pipeSinkAttributes.size(); i++) {
- builder.append(pipeSinkAttributes.get(i).left).append(SenderConf.planSplitCharacter);
- builder.append(pipeSinkAttributes.get(i).right).append(SenderConf.planSplitCharacter);
+ builder
+ .append(pipeSinkAttributes.get(i).left)
+ .append(SyncConstant.PLAN_SERIALIZE_SPLIT_CHARACTER);
+ builder
+ .append(pipeSinkAttributes.get(i).right)
+ .append(SyncConstant.PLAN_SERIALIZE_SPLIT_CHARACTER);
}
return builder.toString();
}
diff --git a/server/src/test/java/org/apache/iotdb/db/newsync/pipedata/BufferedPipeDataQueueTest.java b/server/src/test/java/org/apache/iotdb/db/newsync/pipedata/BufferedPipeDataQueueTest.java
new file mode 100644
index 0000000..33afae8
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/newsync/pipedata/BufferedPipeDataQueueTest.java
@@ -0,0 +1,542 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.newsync.pipedata;
+
+import org.apache.iotdb.db.engine.modification.Deletion;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.newsync.conf.SyncConstant;
+import org.apache.iotdb.db.newsync.conf.SyncPathUtil;
+import org.apache.iotdb.db.newsync.pipedata.queue.BufferedPipeDataQueue;
+import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.*;
+
+public class BufferedPipeDataQueueTest {
+ File pipeLogDir =
+ new File(
+ SyncPathUtil.getReceiverPipeLogDir("pipe", "192.168.0.11", System.currentTimeMillis()));
+
+ @Before
+ public void setUp() throws Exception {
+ EnvironmentUtils.envSetUp();
+ if (!pipeLogDir.exists()) {
+ pipeLogDir.mkdirs();
+ }
+ }
+
+ @After
+ public void tearDown() throws IOException, StorageEngineException {
+ FileUtils.deleteDirectory(pipeLogDir);
+ EnvironmentUtils.cleanEnv();
+ }
+
+ @Test
+ public void testRecoveryAndClear() {
+ try {
+ DataOutputStream outputStream =
+ new DataOutputStream(
+ new FileOutputStream(new File(pipeLogDir, SyncConstant.COMMIT_LOG_NAME), true));
+ outputStream.writeLong(1);
+ outputStream.close();
+ // pipelog1: 0~3
+ DataOutputStream pipeLogOutput1 =
+ new DataOutputStream(
+ new FileOutputStream(
+ new File(pipeLogDir.getPath(), SyncConstant.getPipeLogName(0)), false));
+ for (int i = 0; i < 4; i++) {
+ new TsFilePipeData(null, i).serialize(pipeLogOutput1);
+ }
+ pipeLogOutput1.close();
+ // pipelog2: 4~10
+ DataOutputStream pipeLogOutput2 =
+ new DataOutputStream(
+ new FileOutputStream(
+ new File(pipeLogDir.getPath(), SyncConstant.getPipeLogName(4)), false));
+ for (int i = 4; i < 11; i++) {
+ new TsFilePipeData(null, i).serialize(pipeLogOutput2);
+ }
+ pipeLogOutput2.close();
+ // pipelog3: 11 without pipedata
+ DataOutputStream pipeLogOutput3 =
+ new DataOutputStream(
+ new FileOutputStream(
+ new File(pipeLogDir.getPath(), SyncConstant.getPipeLogName(11)), false));
+ pipeLogOutput3.close();
+ // recovery
+ BufferedPipeDataQueue pipeDataQueue = new BufferedPipeDataQueue(pipeLogDir.getPath());
+ Assert.assertEquals(1, pipeDataQueue.getCommitSerialNumber());
+ Assert.assertEquals(10, pipeDataQueue.getLastMaxSerialNumber());
+ pipeDataQueue.clear();
+ Assert.assertFalse(pipeLogDir.exists());
+ } catch (Exception e) {
+ Assert.fail();
+ }
+ }
+
+ /** Try to take data from a new pipe. Expect take() to block indefinitely when no data is offered. */
+ // TODO: throws NPE
+ @Test
+ public void testTake() {
+ BufferedPipeDataQueue pipeDataQueue = new BufferedPipeDataQueue(pipeLogDir.getPath());
+ List<PipeData> pipeDatas = new ArrayList<>();
+ ExecutorService es1 = Executors.newSingleThreadExecutor();
+ es1.execute(
+ () -> {
+ try {
+ pipeDatas.add(pipeDataQueue.take());
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ });
+ try {
+ Thread.sleep(3000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ es1.shutdownNow();
+
+ Assert.assertEquals(0, pipeDatas.size());
+ }
+
+ /** Try to take data from a new pipe. Expect the taking thread to wake up after an offer. */
+ @Test
+ public void testTakeAndOffer() {
+ BufferedPipeDataQueue pipeDataQueue = new BufferedPipeDataQueue(pipeLogDir.getPath());
+ List<PipeData> pipeDatas = new ArrayList<>();
+ ExecutorService es1 = Executors.newSingleThreadExecutor();
+ es1.execute(
+ () -> {
+ try {
+ pipeDatas.add(pipeDataQueue.take());
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ });
+ pipeDataQueue.offer(new TsFilePipeData(null, 0));
+ try {
+ Thread.sleep(3000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ es1.shutdownNow();
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ Assert.fail();
+ }
+ Assert.assertEquals(1, pipeDatas.size());
+ pipeDataQueue.clear();
+ }
+
+ /** Try to offer data to a new pipe. */
+ @Test
+ public void testOfferNewPipe() {
+ BufferedPipeDataQueue pipeDataQueue = new BufferedPipeDataQueue(pipeLogDir.getPath());
+ PipeData pipeData = new TsFilePipeData("fakePath", 1);
+ pipeDataQueue.offer(pipeData);
+ List<PipeData> pipeDatas = new ArrayList<>();
+ ExecutorService es1 = Executors.newSingleThreadExecutor();
+ es1.execute(
+ () -> {
+ try {
+ pipeDatas.add(pipeDataQueue.take());
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ });
+ try {
+ Thread.sleep(3000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ es1.shutdownNow();
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ Assert.fail();
+ }
+ Assert.assertEquals(1, pipeDatas.size());
+ Assert.assertEquals(pipeData, pipeDatas.get(0));
+ pipeDataQueue.clear();
+ }
+
+ /**
+ * Step1: recover pipeDataQueue (with an empty latest pipelog) Step2: offer new pipeData Step3:
+ * check result
+ */
+ @Test
+ public void testOfferAfterRecoveryWithEmptyPipeLog() {
+ try {
+ DataOutputStream outputStream =
+ new DataOutputStream(
+ new FileOutputStream(new File(pipeLogDir, SyncConstant.COMMIT_LOG_NAME), true));
+ outputStream.writeLong(1);
+ outputStream.close();
+ List<PipeData> pipeDataList = new ArrayList<>();
+ // pipelog1: 0~3
+ DataOutputStream pipeLogOutput1 =
+ new DataOutputStream(
+ new FileOutputStream(
+ new File(pipeLogDir.getPath(), SyncConstant.getPipeLogName(0)), false));
+ for (int i = 0; i < 4; i++) {
+ PipeData pipeData = new TsFilePipeData("fake" + i, i);
+ pipeDataList.add(pipeData);
+ pipeData.serialize(pipeLogOutput1);
+ }
+ pipeLogOutput1.close();
+ // pipelog2: 4~10
+ DataOutputStream pipeLogOutput2 =
+ new DataOutputStream(
+ new FileOutputStream(
+ new File(pipeLogDir.getPath(), SyncConstant.getPipeLogName(4)), false));
+ for (int i = 4; i < 8; i++) {
+ PipeData pipeData =
+ new DeletionPipeData(new Deletion(new PartialPath("fake" + i), 0, 99), i);
+ pipeDataList.add(pipeData);
+ pipeData.serialize(pipeLogOutput2);
+ }
+ for (int i = 8; i < 11; i++) {
+ PipeData pipeData =
+ new SchemaPipeData(new SetStorageGroupPlan(new PartialPath("fake" + i)), i);
+ pipeDataList.add(pipeData);
+ pipeData.serialize(pipeLogOutput2);
+ }
+ pipeLogOutput2.close();
+ // pipelog3: 11 without pipedata
+ DataOutputStream pipeLogOutput3 =
+ new DataOutputStream(
+ new FileOutputStream(
+ new File(pipeLogDir.getPath(), SyncConstant.getPipeLogName(11)), false));
+ pipeLogOutput3.close();
+ // recovery
+ BufferedPipeDataQueue pipeDataQueue = new BufferedPipeDataQueue(pipeLogDir.getPath());
+ Assert.assertEquals(1, pipeDataQueue.getCommitSerialNumber());
+ Assert.assertEquals(10, pipeDataQueue.getLastMaxSerialNumber());
+ PipeData offerPipeData = new TsFilePipeData(null, 11);
+ pipeDataList.add(offerPipeData);
+ pipeDataQueue.offer(offerPipeData);
+
+ // take and check
+ List<PipeData> pipeDataTakeList = new ArrayList<>();
+ ExecutorService es1 = Executors.newSingleThreadExecutor();
+ es1.execute(
+ () -> {
+ while (true) {
+ try {
+ pipeDataTakeList.add(pipeDataQueue.take());
+ pipeDataQueue.commit();
+ } catch (InterruptedException e) {
+ break;
+ }
+ }
+ });
+ try {
+ Thread.sleep(3000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ es1.shutdownNow();
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ Assert.fail();
+ }
+ Assert.assertEquals(10, pipeDataTakeList.size());
+ for (int i = 0; i < 10; i++) {
+ Assert.assertEquals(pipeDataList.get(i + 2), pipeDataTakeList.get(i));
+ }
+ pipeDataQueue.clear();
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail();
+ }
+ }
+
+ /** Step1: recover pipeDataQueue (with an empty latest pipelog) Step2: check result */
+ @Test
+ public void testRecoveryWithEmptyPipeLog() {
+ try {
+ DataOutputStream outputStream =
+ new DataOutputStream(
+ new FileOutputStream(new File(pipeLogDir, SyncConstant.COMMIT_LOG_NAME), true));
+ outputStream.writeLong(1);
+ outputStream.close();
+ List<PipeData> pipeDataList = new ArrayList<>();
+ // pipelog1: 0~3
+ DataOutputStream pipeLogOutput1 =
+ new DataOutputStream(
+ new FileOutputStream(
+ new File(pipeLogDir.getPath(), SyncConstant.getPipeLogName(0)), false));
+ for (int i = 0; i < 4; i++) {
+ PipeData pipeData = new TsFilePipeData("fake" + i, i);
+ pipeDataList.add(pipeData);
+ pipeData.serialize(pipeLogOutput1);
+ }
+ pipeLogOutput1.close();
+ // pipelog2: 4~10
+ DataOutputStream pipeLogOutput2 =
+ new DataOutputStream(
+ new FileOutputStream(
+ new File(pipeLogDir.getPath(), SyncConstant.getPipeLogName(4)), false));
+ for (int i = 4; i < 8; i++) {
+ PipeData pipeData =
+ new DeletionPipeData(new Deletion(new PartialPath("fake" + i), 0, 99), i);
+ pipeDataList.add(pipeData);
+ pipeData.serialize(pipeLogOutput2);
+ }
+ for (int i = 8; i < 11; i++) {
+ PipeData pipeData =
+ new SchemaPipeData(new SetStorageGroupPlan(new PartialPath("fake" + i)), i);
+ pipeDataList.add(pipeData);
+ pipeData.serialize(pipeLogOutput2);
+ }
+ pipeLogOutput2.close();
+ // pipelog3: 11 without pipedata
+ DataOutputStream pipeLogOutput3 =
+ new DataOutputStream(
+ new FileOutputStream(
+ new File(pipeLogDir.getPath(), SyncConstant.getPipeLogName(11)), false));
+ pipeLogOutput3.close();
+ // recovery
+ BufferedPipeDataQueue pipeDataQueue = new BufferedPipeDataQueue(pipeLogDir.getPath());
+ Assert.assertEquals(1, pipeDataQueue.getCommitSerialNumber());
+ Assert.assertEquals(10, pipeDataQueue.getLastMaxSerialNumber());
+
+ // take and check
+ List<PipeData> pipeDataTakeList = new ArrayList<>();
+ ExecutorService es1 = Executors.newSingleThreadExecutor();
+ es1.execute(
+ () -> {
+ while (true) {
+ try {
+ pipeDataTakeList.add(pipeDataQueue.take());
+ pipeDataQueue.commit();
+ } catch (InterruptedException e) {
+ break;
+ }
+ }
+ });
+ try {
+ Thread.sleep(3000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ es1.shutdownNow();
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ Assert.fail();
+ }
+ Assert.assertEquals(9, pipeDataTakeList.size());
+ for (int i = 0; i < 9; i++) {
+ Assert.assertEquals(pipeDataList.get(i + 2), pipeDataTakeList.get(i));
+ }
+ pipeDataQueue.clear();
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail();
+ }
+ }
+
+ /** Step1: recover pipeDataQueue (without empty latest pipelog) Step2: check result */
+ @Test
+ public void testRecoveryWithoutEmptyPipeLog() {
+ try {
+ DataOutputStream outputStream =
+ new DataOutputStream(
+ new FileOutputStream(new File(pipeLogDir, SyncConstant.COMMIT_LOG_NAME), true));
+ outputStream.writeLong(1);
+ outputStream.close();
+ List<PipeData> pipeDataList = new ArrayList<>();
+ // pipelog1: 0~3
+ DataOutputStream pipeLogOutput1 =
+ new DataOutputStream(
+ new FileOutputStream(
+ new File(pipeLogDir.getPath(), SyncConstant.getPipeLogName(0)), false));
+ for (int i = 0; i < 4; i++) {
+ PipeData pipeData = new TsFilePipeData("fake" + i, i);
+ pipeDataList.add(pipeData);
+ pipeData.serialize(pipeLogOutput1);
+ }
+ pipeLogOutput1.close();
+ // pipelog2: 4~10
+ DataOutputStream pipeLogOutput2 =
+ new DataOutputStream(
+ new FileOutputStream(
+ new File(pipeLogDir.getPath(), SyncConstant.getPipeLogName(4)), false));
+ for (int i = 4; i < 8; i++) {
+ PipeData pipeData =
+ new DeletionPipeData(new Deletion(new PartialPath("fake" + i), 0, 99), i);
+ pipeDataList.add(pipeData);
+ pipeData.serialize(pipeLogOutput2);
+ }
+ for (int i = 8; i < 11; i++) {
+ PipeData pipeData =
+ new SchemaPipeData(new SetStorageGroupPlan(new PartialPath("fake" + i)), i);
+ pipeDataList.add(pipeData);
+ pipeData.serialize(pipeLogOutput2);
+ }
+ pipeLogOutput2.close();
+ ;
+ // recovery
+ BufferedPipeDataQueue pipeDataQueue = new BufferedPipeDataQueue(pipeLogDir.getPath());
+ Assert.assertEquals(1, pipeDataQueue.getCommitSerialNumber());
+ Assert.assertEquals(10, pipeDataQueue.getLastMaxSerialNumber());
+
+ // take and check
+ List<PipeData> pipeDataTakeList = new ArrayList<>();
+ ExecutorService es1 = Executors.newSingleThreadExecutor();
+ es1.execute(
+ () -> {
+ while (true) {
+ try {
+ pipeDataTakeList.add(pipeDataQueue.take());
+ pipeDataQueue.commit();
+ } catch (InterruptedException e) {
+ break;
+ }
+ }
+ });
+ try {
+ Thread.sleep(3000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ es1.shutdownNow();
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ Assert.fail();
+ }
+ Assert.assertEquals(9, pipeDataTakeList.size());
+ for (int i = 0; i < 9; i++) {
+ Assert.assertEquals(pipeDataList.get(i + 2), pipeDataTakeList.get(i));
+ }
+ pipeDataQueue.clear();
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail();
+ }
+ }
+
+ @Test
+ public void testOfferWhileTaking() {
+ try {
+ DataOutputStream outputStream =
+ new DataOutputStream(
+ new FileOutputStream(new File(pipeLogDir, SyncConstant.COMMIT_LOG_NAME), true));
+ outputStream.writeLong(1);
+ outputStream.close();
+ List<PipeData> pipeDataList = new ArrayList<>();
+ // pipelog1: 0~3
+ DataOutputStream pipeLogOutput1 =
+ new DataOutputStream(
+ new FileOutputStream(
+ new File(pipeLogDir.getPath(), SyncConstant.getPipeLogName(0)), false));
+ for (int i = 0; i < 4; i++) {
+ PipeData pipeData = new TsFilePipeData("fake" + i, i);
+ pipeDataList.add(pipeData);
+ pipeData.serialize(pipeLogOutput1);
+ }
+ pipeLogOutput1.close();
+ // pipelog2: 4~10
+ DataOutputStream pipeLogOutput2 =
+ new DataOutputStream(
+ new FileOutputStream(
+ new File(pipeLogDir.getPath(), SyncConstant.getPipeLogName(4)), false));
+ for (int i = 4; i < 8; i++) {
+ PipeData pipeData =
+ new DeletionPipeData(new Deletion(new PartialPath("fake" + i), 0, 99), i);
+ pipeDataList.add(pipeData);
+ pipeData.serialize(pipeLogOutput2);
+ }
+ for (int i = 8; i < 11; i++) {
+ PipeData pipeData =
+ new SchemaPipeData(new SetStorageGroupPlan(new PartialPath("fake" + i)), i);
+ pipeDataList.add(pipeData);
+ pipeData.serialize(pipeLogOutput2);
+ }
+ pipeLogOutput2.close();
+ ;
+ // recovery
+ BufferedPipeDataQueue pipeDataQueue = new BufferedPipeDataQueue(pipeLogDir.getPath());
+ Assert.assertEquals(1, pipeDataQueue.getCommitSerialNumber());
+ Assert.assertEquals(10, pipeDataQueue.getLastMaxSerialNumber());
+
+ // take
+ List<PipeData> pipeDataTakeList = new ArrayList<>();
+ ExecutorService es1 = Executors.newSingleThreadExecutor();
+ es1.execute(
+ () -> {
+ while (true) {
+ try {
+ pipeDataTakeList.add(pipeDataQueue.take());
+ pipeDataQueue.commit();
+ } catch (InterruptedException e) {
+ break;
+ }
+ }
+ });
+ // offer
+ for (int i = 11; i < 20; i++) {
+ pipeDataQueue.offer(
+ new DeletionPipeData(new Deletion(new PartialPath("fake" + i), 0, 0), i));
+ }
+ try {
+ Thread.sleep(3000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ es1.shutdownNow();
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ Assert.fail();
+ }
+ Assert.assertEquals(18, pipeDataTakeList.size());
+ for (int i = 0; i < 9; i++) {
+ Assert.assertEquals(pipeDataList.get(i + 2), pipeDataTakeList.get(i));
+ }
+ pipeDataQueue.clear();
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail();
+ }
+ }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/newsync/pipedata/PipeDataTest.java b/server/src/test/java/org/apache/iotdb/db/newsync/pipedata/PipeDataTest.java
index ed9d79f..ce65549 100644
--- a/server/src/test/java/org/apache/iotdb/db/newsync/pipedata/PipeDataTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/newsync/pipedata/PipeDataTest.java
@@ -74,6 +74,10 @@ public class PipeDataTest {
Assert.assertEquals(pipeData3, PipeData.deserialize(inputStream));
inputStream.close();
outputStream.close();
+
+ Assert.assertEquals(pipeData1, PipeData.deserialize(pipeData1.serialize()));
+ Assert.assertEquals(pipeData2, PipeData.deserialize(pipeData2.serialize()));
+ Assert.assertEquals(pipeData3, PipeData.deserialize(pipeData3.serialize()));
} catch (Exception e) {
logger.error(e.getMessage());
Assert.fail();