You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/01/26 01:54:02 UTC
[iotdb] branch new_sync updated: [To new_sync][IOTDB-1907] implement customized sync process: sender 2 (#4974)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch new_sync
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/new_sync by this push:
new 442e6fa [To new_sync][IOTDB-1907] implement customized sync process: sender 2 (#4974)
442e6fa is described below
commit 442e6fafbb5d6a752719b8e76975c338d5f3e47a
Author: yschengzi <87...@users.noreply.github.com>
AuthorDate: Wed Jan 26 09:53:21 2022 +0800
[To new_sync][IOTDB-1907] implement customized sync process: sender 2 (#4974)
---
.../org/apache/iotdb/db/qp/sql/IoTDBSqlLexer.g4 | 8 +
.../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 | 6 +-
.../db/integration/sync/IoTDBSyncSenderIT.java | 35 ++--
.../iotdb/db/engine/modification/Deletion.java | 43 ++---
.../db/newsync/pipedata/DeletionPipeData.java | 82 +++++++++
.../apache/iotdb/db/newsync/pipedata/PipeData.java | 97 ++++++++++
.../iotdb/db/newsync/pipedata/SchemaPipeData.java | 83 +++++++++
.../iotdb/db/newsync/pipedata/TsFilePipeData.java | 128 +++++++++++++
.../iotdb/db/newsync/sender/pipe/PipeSink.java | 3 +-
.../iotdb/db/newsync/sender/pipe/TsFilePipe.java | 135 ++++++++------
.../db/newsync/sender/pipe/TsFilePipeData.java | 198 ---------------------
.../newsync/sender/recovery/SenderLogAnalyzer.java | 3 +-
.../db/newsync/sender/recovery/TsFilePipeLog.java | 62 ++++---
.../sender/recovery/TsFilePipeLogAnalyzer.java | 32 ++--
.../db/newsync/sender/service/SenderService.java | 20 +--
.../org/apache/iotdb/db/qp/logical/Operator.java | 2 +
16 files changed, 578 insertions(+), 359 deletions(-)
diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlLexer.g4 b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlLexer.g4
index f406677..9fad63a 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlLexer.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlLexer.g4
@@ -362,6 +362,10 @@ PIPE
: P I P E
;
+PIPES
+ : P I P E S
+ ;
+
PIPESERVER
: P I P E S E R V E R
;
@@ -370,6 +374,10 @@ PIPESINK
: P I P E S I N K
;
+PIPESINKS
+ : P I P E S I N K S
+ ;
+
PIPESINKTYPE
: P I P E S I N K T Y P E
;
diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
index 24dbc79..7e4cd62 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
@@ -666,7 +666,7 @@ unloadFile
// pipesink statement
createPipeSink
- : CREATE PIPESINK pipeSinkType=ID (LR_BRACKET syncAttributeClauses RR_BRACKET)? AS pipeSinkName=ID
+ : CREATE PIPESINK pipeSinkName=ID AS pipeSinkType=ID (LR_BRACKET syncAttributeClauses RR_BRACKET)?
;
showPipeSinkType
@@ -674,7 +674,7 @@ showPipeSinkType
;
showPipeSink
- : SHOW PIPESINK (pipeSinkName=ID)?
+ : SHOW ((PIPESINK (pipeSinkName=ID)?) | PIPESINKS)
;
dropPipeSink
@@ -687,7 +687,7 @@ createPipe
;
showPipe
- : SHOW PIPE (pipeName=ID)?
+ : SHOW ((PIPE (pipeName=ID)?) | PIPES)
;
stopPipe
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/sender/pipe/PipeSink.java b/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncSenderIT.java
similarity index 56%
copy from server/src/main/java/org/apache/iotdb/db/newsync/sender/pipe/PipeSink.java
copy to integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncSenderIT.java
index 4b58c78..9a36b9b 100644
--- a/server/src/main/java/org/apache/iotdb/db/newsync/sender/pipe/PipeSink.java
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncSenderIT.java
@@ -15,32 +15,23 @@
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *
*/
-package org.apache.iotdb.db.newsync.sender.pipe;
-
-import org.apache.iotdb.db.exception.PipeSinkException;
-
-public interface PipeSink {
- void setAttribute(String attr, String value) throws PipeSinkException;
+package org.apache.iotdb.db.integration.sync;
- String getName();
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.itbase.category.LocalStandaloneTest;
- Type getType();
+import org.junit.Before;
+import org.junit.experimental.categories.Category;
- String showAllAttributes();
-
- enum Type {
- IoTDB;
- }
+import java.io.IOException;
- class PipeSinkFactory {
- public static PipeSink createPipeSink(String type, String name) {
- type = type.toLowerCase();
- if (Type.IoTDB.name().toLowerCase().equals(type)) {
- return new IoTDBPipeSink(name);
- }
- throw new UnsupportedOperationException("Not support for " + type + " pipeSink");
- }
+@Category({LocalStandaloneTest.class})
+public class IoTDBSyncSenderIT {
+ @Before
+ public void setUp() throws StorageEngineException, IOException {
+ EnvironmentUtils.cleanEnv();
+ EnvironmentUtils.envSetUp();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/modification/Deletion.java b/server/src/main/java/org/apache/iotdb/db/engine/modification/Deletion.java
index 10924ab..edb8d4d 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/modification/Deletion.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/modification/Deletion.java
@@ -23,7 +23,9 @@ import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
-import java.nio.ByteBuffer;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
import java.util.Objects;
/** Deletion is a delete operation on a timeseries. */
@@ -75,33 +77,22 @@ public class Deletion extends Modification {
this.endTime = timestamp;
}
- public void serializeWithoutFileOffset(ByteBuffer byteBuffer) {
- byteBuffer.mark();
- try {
- byteBuffer.putLong(startTime);
- byteBuffer.putLong(endTime);
- ReadWriteIOUtils.write(getPathString(), byteBuffer);
- } catch (Exception e) {
- byteBuffer.reset();
- throw e;
- }
+ public long serializeWithoutFileOffset(DataOutputStream stream) throws IOException {
+ long serializeSize = 0;
+ stream.writeLong(startTime);
+ serializeSize += Long.BYTES;
+ stream.writeLong(endTime);
+ serializeSize += Long.BYTES;
+ serializeSize += ReadWriteIOUtils.write(getPathString(), stream);
+ return serializeSize;
}
- public static Deletion deserializeWithoutFileOffset(ByteBuffer byteBuffer)
- throws IllegalPathException {
- byteBuffer.mark();
- Deletion deletion;
- try {
- long startTime = byteBuffer.getLong();
- long endTime = byteBuffer.getLong();
- deletion =
- new Deletion(
- new PartialPath(ReadWriteIOUtils.readString(byteBuffer)), 0, startTime, endTime);
- } catch (Exception e) {
- byteBuffer.reset();
- throw e;
- }
- return deletion;
+ public static Deletion deserializeWithoutFileOffset(DataInputStream stream)
+ throws IOException, IllegalPathException {
+ long startTime = stream.readLong();
+ long endTime = stream.readLong();
+ return new Deletion(
+ new PartialPath(ReadWriteIOUtils.readString(stream)), 0, startTime, endTime);
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/pipedata/DeletionPipeData.java b/server/src/main/java/org/apache/iotdb/db/newsync/pipedata/DeletionPipeData.java
new file mode 100644
index 0000000..99d5638
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/pipedata/DeletionPipeData.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.iotdb.db.newsync.pipedata;
+
+import org.apache.iotdb.db.engine.modification.Deletion;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+public class DeletionPipeData extends PipeData {
+ private static final Logger logger = LoggerFactory.getLogger(DeletionPipeData.class);
+
+ private Deletion deletion;
+
+ public DeletionPipeData(long serialNumber) {
+ super(serialNumber);
+ }
+
+ public DeletionPipeData(Deletion deletion, long serialNumber) {
+ super(serialNumber);
+ this.deletion = deletion;
+ }
+
+ @Override
+ public Type getType() {
+ return Type.DELETION;
+ }
+
+ @Override
+ public long serializeImpl(DataOutputStream stream) throws IOException {
+ return deletion.serializeWithoutFileOffset(stream);
+ }
+
+ @Override
+ public void deserializeImpl(DataInputStream stream) throws IOException, IllegalPathException {
+ this.deletion = Deletion.deserializeWithoutFileOffset(stream);
+ }
+
+ @Override
+ public void sendToTransport() {
+ ByteArrayOutputStream bytesStream = new ByteArrayOutputStream();
+ DataOutputStream stream = new DataOutputStream(bytesStream);
+ try {
+ serialize(stream);
+ // senderTransport(bytesStream.toArray, this);
+ System.out.println(this);
+ } catch (IOException e) {
+ logger.warn(
+ String.format(
+ "Serialize deletion pipeData %s error, can not send to transport, because %s.",
+ this, e));
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "DeletionData{" + "deletion=" + deletion + ", serialNumber=" + serialNumber + '}';
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/pipedata/PipeData.java b/server/src/main/java/org/apache/iotdb/db/newsync/pipedata/PipeData.java
new file mode 100644
index 0000000..0575d5c
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/pipedata/PipeData.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.iotdb.db.newsync.pipedata;
+
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+public abstract class PipeData {
+
+ protected final long serialNumber;
+
+ public PipeData(long serialNumber) {
+ this.serialNumber = serialNumber;
+ }
+
+ public long getSerialNumber() {
+ return serialNumber;
+ }
+
+ // abstract public Loader.Type getLoaderType() {
+ // if (tsFilePath != null) {
+ // return Loader.Type.TsFile;
+ // } else if (deletion != null) {
+ // return Loader.Type.Deletion;
+ // } else if (plan != null) {
+ // return Loader.Type.PhysicalPlan;
+ // }
+ // logger.error("Wrong type for transport type.");
+ // return null;
+ // }
+
+ public abstract Type getType();
+
+ public long serialize(DataOutputStream stream) throws IOException {
+ long serializeSize = 0;
+ stream.writeLong(serialNumber);
+ serializeSize += Long.BYTES;
+ stream.writeByte((byte) getType().ordinal());
+ serializeSize += Byte.BYTES;
+ serializeSize += serializeImpl(stream);
+ return serializeSize;
+ }
+
+ public abstract long serializeImpl(DataOutputStream stream) throws IOException;
+
+ public static PipeData deserialize(DataInputStream stream)
+ throws IOException, IllegalPathException {
+ long serialNumber = stream.readLong();
+ Type type = Type.values()[stream.readByte()];
+
+ PipeData pipeData = null;
+ switch (type) {
+ case TSFILE:
+ pipeData = new TsFilePipeData(serialNumber);
+ break;
+ case DELETION:
+ pipeData = new DeletionPipeData(serialNumber);
+ break;
+ case PHYSICALPLAN:
+ pipeData = new SchemaPipeData(serialNumber);
+ break;
+ }
+ pipeData.deserializeImpl(stream);
+ return pipeData;
+ }
+
+ public abstract void deserializeImpl(DataInputStream stream)
+ throws IOException, IllegalPathException;
+
+ public abstract void sendToTransport();
+
+ public enum Type {
+ TSFILE,
+ DELETION,
+ PHYSICALPLAN
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/pipedata/SchemaPipeData.java b/server/src/main/java/org/apache/iotdb/db/newsync/pipedata/SchemaPipeData.java
new file mode 100644
index 0000000..28f068a
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/pipedata/SchemaPipeData.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.iotdb.db.newsync.pipedata;
+
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class SchemaPipeData extends PipeData {
+ private static final int SERIALIZE_BUFFER_SIZE = 1024;
+
+ private PhysicalPlan plan;
+
+ public SchemaPipeData(long serialNumber) {
+ super(serialNumber);
+ }
+
+ public SchemaPipeData(PhysicalPlan plan, long serialNumber) {
+ super(serialNumber);
+ this.plan = plan;
+ }
+
+ @Override
+ public Type getType() {
+ return Type.PHYSICALPLAN;
+ }
+
+ @Override
+ public long serializeImpl(DataOutputStream stream) throws IOException {
+ byte[] bytes = getBytes();
+ stream.writeInt(bytes.length);
+ stream.write(bytes);
+ return Integer.BYTES + bytes.length;
+ }
+
+ private byte[] getBytes() {
+ ByteBuffer buffer = ByteBuffer.allocate(SERIALIZE_BUFFER_SIZE);
+ plan.serialize(buffer);
+ byte[] bytes = new byte[buffer.position()];
+ buffer.flip();
+ buffer.get(bytes);
+ return bytes;
+ }
+
+ @Override
+ public void deserializeImpl(DataInputStream stream) throws IOException, IllegalPathException {
+ byte[] bytes = new byte[stream.readInt()];
+ stream.read(bytes);
+ this.plan = PhysicalPlan.Factory.create(ByteBuffer.wrap(bytes));
+ }
+
+ @Override
+ public void sendToTransport() {
+ // senderTransport(getBytes(), this);
+ System.out.println(this);
+ }
+
+ @Override
+ public String toString() {
+ return "SchemaPipeData{" + "serialNumber=" + serialNumber + ", plan=" + plan + '}';
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/pipedata/TsFilePipeData.java b/server/src/main/java/org/apache/iotdb/db/newsync/pipedata/TsFilePipeData.java
new file mode 100644
index 0000000..503486a
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/pipedata/TsFilePipeData.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.iotdb.db.newsync.pipedata;
+
+import org.apache.iotdb.db.engine.modification.ModificationFile;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.newsync.sender.conf.SenderConf;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class TsFilePipeData extends PipeData {
+ private static final Logger logger = LoggerFactory.getLogger(TsFilePipeData.class);
+
+ private String tsFilePath;
+
+ public TsFilePipeData(long serialNumber) {
+ super(serialNumber);
+ }
+
+ public TsFilePipeData(String tsFilePath, long serialNumber) {
+ super(serialNumber);
+ this.tsFilePath = tsFilePath;
+ }
+
+ @Override
+ public Type getType() {
+ return Type.TSFILE;
+ }
+
+ @Override
+ public long serializeImpl(DataOutputStream stream) throws IOException {
+ return ReadWriteIOUtils.write(tsFilePath, stream);
+ }
+
+ @Override
+ public void deserializeImpl(DataInputStream stream) throws IOException {
+ this.tsFilePath = ReadWriteIOUtils.readString(stream);
+ }
+
+ @Override
+ public void sendToTransport() {
+ if (waitForTsFileClose()) {
+ // senderTransprot(getFiles(), this);
+ System.out.println(this);
+ }
+ }
+
+ private boolean waitForTsFileClose() {
+ for (int i = 0; i < SenderConf.defaultWaitingForTsFileRetryNumber; i++) {
+ if (isTsFileClosed()) {
+ return true;
+ }
+ try {
+ Thread.sleep(SenderConf.defaultWaitingForTsFileCloseMilliseconds);
+ } catch (InterruptedException e) {
+ logger.warn(String.format("Be Interrupted when waiting for tsfile %s closed", tsFilePath));
+ }
+ logger.info(
+ String.format(
+ "Waiting for tsfile %s close, retry %d / %d.",
+ tsFilePath, (i + 1), SenderConf.defaultWaitingForTsFileRetryNumber));
+ }
+ return false;
+ }
+
+ private boolean isTsFileClosed() {
+ File tsFile = new File(tsFilePath).getAbsoluteFile();
+ File resource = new File(tsFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX);
+ return resource.exists();
+ }
+
+ public List<File> getTsFiles() throws FileNotFoundException {
+ File tsFile = new File(tsFilePath).getAbsoluteFile();
+ File resource = new File(tsFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX);
+ File mods = new File(tsFile.getAbsolutePath() + ModificationFile.FILE_SUFFIX);
+
+ List<File> files = new ArrayList<>();
+ if (!tsFile.exists()) {
+ throw new FileNotFoundException(String.format("Can not find %s.", tsFile.getAbsolutePath()));
+ }
+ files.add(tsFile);
+ if (resource.exists()) {
+ files.add(resource);
+ }
+ if (mods.exists()) {
+ files.add(mods);
+ }
+ return files;
+ }
+
+ @Override
+ public String toString() {
+ return "TsFilePipeData{"
+ + "serialNumber="
+ + serialNumber
+ + ", tsFilePath='"
+ + tsFilePath
+ + '\''
+ + '}';
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/sender/pipe/PipeSink.java b/server/src/main/java/org/apache/iotdb/db/newsync/sender/pipe/PipeSink.java
index 4b58c78..50383c1 100644
--- a/server/src/main/java/org/apache/iotdb/db/newsync/sender/pipe/PipeSink.java
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/sender/pipe/PipeSink.java
@@ -40,7 +40,8 @@ public interface PipeSink {
if (Type.IoTDB.name().toLowerCase().equals(type)) {
return new IoTDBPipeSink(name);
}
- throw new UnsupportedOperationException("Not support for " + type + " pipeSink");
+ throw new UnsupportedOperationException(
+ String.format("Do not support pipeSink type %s", type));
}
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/sender/pipe/TsFilePipe.java b/server/src/main/java/org/apache/iotdb/db/newsync/sender/pipe/TsFilePipe.java
index d0929068..99e6d44 100644
--- a/server/src/main/java/org/apache/iotdb/db/newsync/sender/pipe/TsFilePipe.java
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/sender/pipe/TsFilePipe.java
@@ -27,6 +27,10 @@ import org.apache.iotdb.db.engine.storagegroup.virtualSg.StorageGroupManager;
import org.apache.iotdb.db.exception.PipeException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.newsync.pipedata.DeletionPipeData;
+import org.apache.iotdb.db.newsync.pipedata.PipeData;
+import org.apache.iotdb.db.newsync.pipedata.SchemaPipeData;
+import org.apache.iotdb.db.newsync.pipedata.TsFilePipeData;
import org.apache.iotdb.db.newsync.sender.conf.SenderConf;
import org.apache.iotdb.db.newsync.sender.recovery.TsFilePipeLog;
import org.apache.iotdb.db.newsync.sender.recovery.TsFilePipeLogAnalyzer;
@@ -55,22 +59,22 @@ public class TsFilePipe implements Pipe {
private final long createTime;
private final String name;
- private final IoTDBPipeSink pipeSink;
+ private final PipeSink pipeSink;
private final long dataStartTime;
private final boolean syncDelOp;
- private ExecutorService singleExecutorService;
- private TsFilePipeLog pipeLog;
+ private final ExecutorService singleExecutorService;
+ private final TsFilePipeLog pipeLog;
private final ReentrantLock collectRealTimeDataLock;
- private BlockingDeque<TsFilePipeData> pipeData;
+ private BlockingDeque<PipeData> pipeDataDeque;
private long maxSerialNumber;
private PipeStatus status;
private boolean isCollectingRealTimeData;
public TsFilePipe(
- long createTime, String name, IoTDBPipeSink pipeSink, long dataStartTime, boolean syncDelOp) {
+ long createTime, String name, PipeSink pipeSink, long dataStartTime, boolean syncDelOp) {
this.createTime = createTime;
this.name = name;
this.pipeSink = pipeSink;
@@ -91,10 +95,10 @@ public class TsFilePipe implements Pipe {
}
private void recover() {
- this.pipeData = new TsFilePipeLogAnalyzer(this).recover();
+ this.pipeDataDeque = new TsFilePipeLogAnalyzer(this).recover();
this.maxSerialNumber = 0L;
- if (pipeData.size() != 0) {
- this.maxSerialNumber = Math.max(maxSerialNumber, pipeData.getLast().getSerialNumber());
+ if (pipeDataDeque.size() != 0) {
+ this.maxSerialNumber = Math.max(maxSerialNumber, pipeDataDeque.getLast().getSerialNumber());
}
}
@@ -120,6 +124,7 @@ public class TsFilePipe implements Pipe {
}
singleExecutorService.submit(this::transport);
+ //
status = PipeStatus.RUNNING;
} catch (IOException e) {
logger.error(
@@ -138,13 +143,13 @@ public class TsFilePipe implements Pipe {
// get all history data
int historyMetadataSize = historyMetadata.size();
int historyTsFilesSize = historyTsFiles.size();
- List<TsFilePipeData> historyData = new ArrayList<>();
+ List<PipeData> historyData = new ArrayList<>();
for (int i = 0; i < historyMetadataSize; i++) {
- long serialNumber = 1 - historyTsFilesSize - i;
- historyData.add(new TsFilePipeData(historyMetadata.get(i), serialNumber));
+ long serialNumber = 1 - historyTsFilesSize - historyMetadataSize + i;
+ historyData.add(new SchemaPipeData(historyMetadata.get(i), serialNumber));
}
for (int i = 0; i < historyTsFilesSize; i++) {
- long serialNumber = 1 - i;
+ long serialNumber = 1 - historyTsFilesSize + i;
try {
File hardLink =
pipeLog.addHistoryTsFile(historyTsFiles.get(i).left, historyTsFiles.get(i).right);
@@ -160,11 +165,11 @@ public class TsFilePipe implements Pipe {
// add history data into blocking deque
int historyDataSize = historyData.size();
for (int i = 0; i < historyDataSize; i++) {
- pipeData.addFirst(historyData.get(historyDataSize - 1 - i));
+ pipeDataDeque.addFirst(historyData.get(historyDataSize - 1 - i));
}
// record history data
for (int i = 0; i < historyDataSize; i++) {
- TsFilePipeData data = historyData.get(i);
+ PipeData data = historyData.get(i);
try {
pipeLog.addHistoryPipeData(data);
} catch (IOException e) {
@@ -206,8 +211,8 @@ public class TsFilePipe implements Pipe {
collectRealTimeDataLock.lock();
try {
maxSerialNumber += 1L;
- TsFilePipeData metaData = new TsFilePipeData(plan, maxSerialNumber);
- pipeData.offer(metaData); // ensure can be transport
+ PipeData metaData = new SchemaPipeData(plan, maxSerialNumber);
+ collectRealTimePipeData(metaData); // ensure can be transport
pipeLog.addRealTimePipeData(metaData);
} catch (IOException e) {
logger.warn(
@@ -264,9 +269,9 @@ public class TsFilePipe implements Pipe {
deletion.getStartTime(),
deletion.getEndTime());
maxSerialNumber += 1L;
- TsFilePipeData deletionData = new TsFilePipeData(splitDeletion, maxSerialNumber);
+ PipeData deletionData = new DeletionPipeData(splitDeletion, maxSerialNumber);
pipeLog.addRealTimePipeData(deletionData);
- pipeData.offer(deletionData);
+ collectRealTimePipeData(deletionData);
}
} catch (MetadataException e) {
logger.warn(String.format("Collect deletion %s error, because %s.", deletion, e));
@@ -284,10 +289,10 @@ public class TsFilePipe implements Pipe {
collectRealTimeDataLock.lock();
try {
maxSerialNumber += 1L;
- TsFilePipeData tsFileData =
+ PipeData tsFileData =
new TsFilePipeData(pipeLog.addRealTimeTsFile(tsFile).getPath(), maxSerialNumber);
pipeLog.addRealTimePipeData(tsFileData);
- pipeData.offer(tsFileData);
+ collectRealTimePipeData(tsFileData);
} catch (IOException e) {
logger.warn(
String.format(
@@ -308,6 +313,13 @@ public class TsFilePipe implements Pipe {
}
}
+ private void collectRealTimePipeData(PipeData data) {
+ pipeDataDeque.offer(data);
+ synchronized (pipeDataDeque) {
+ pipeDataDeque.notifyAll();
+ }
+ }
+
/** transport data * */
private void transport() {
// handshake
@@ -318,14 +330,14 @@ public class TsFilePipe implements Pipe {
break;
}
- TsFilePipeData data;
+ PipeData data;
try {
- synchronized (pipeData) {
- if (pipeData.isEmpty()) {
- pipeData.wait();
- pipeData.notifyAll();
+ synchronized (pipeDataDeque) {
+ if (pipeDataDeque.isEmpty()) {
+ pipeDataDeque.wait();
+ pipeDataDeque.notifyAll();
}
- data = pipeData.poll();
+ data = pipeDataDeque.poll();
}
} catch (InterruptedException e) {
logger.warn(String.format("TsFile pipe %s has been interrupted.", name));
@@ -335,38 +347,51 @@ public class TsFilePipe implements Pipe {
if (data == null) {
continue;
}
- if (data.isTsFile()) {
- if (waitForTsFileClose(data)) {
- // senderTransport(data.getTsFiles, data.getLoaderType());
- }
- } else {
- // senderTransport(data.getBytes, data.getLoaderType());
- }
- // pipeLog.removePipeData(data.getSerialNumber);
+ // data.sendToTransport();
+ // pipeLog.removePipeData(data.getSerialNumber);
+ // data.sendToTransport();
+ // Thread.sleep(1000);
+ // pipeLog.removePipeData(data.getSerialNumber());
}
} catch (Exception e) {
logger.error(String.format("TsFile pipe %s stops transportng data, because %s.", name, e));
}
}
- private boolean waitForTsFileClose(TsFilePipeData data) {
- for (int i = 0; i < SenderConf.defaultWaitingForTsFileRetryNumber; i++) {
- if (data.isTsFileClosed()) {
- return true;
+ public List<PipeData> pull(long serialNumber) {
+ if (pipeDataDeque.isEmpty()) {
+ return null;
+ }
+ List<PipeData> pullPipeData = new ArrayList<>();
+ PipeData data = pipeDataDeque.poll();
+ while (data.getSerialNumber() <= serialNumber) {
+ pullPipeData.add(data);
+ if (pipeDataDeque.isEmpty()) {
+ break;
+ } else {
+ data = pipeDataDeque.poll();
}
+ }
+
+ int pullPipeDataSize = pullPipeData.size();
+ for (int i = 0; i < pullPipeDataSize; i++) {
+ pipeDataDeque.addFirst(pullPipeData.get(pullPipeDataSize - i - 1));
+ }
+ return pullPipeData;
+ }
+
+ public void commit(long serialNumber) {
+ while (!pipeDataDeque.isEmpty() && pipeDataDeque.peek().getSerialNumber() <= serialNumber) {
+ PipeData data = pipeDataDeque.poll();
try {
- Thread.sleep(SenderConf.defaultWaitingForTsFileCloseMilliseconds);
- } catch (InterruptedException e) {
+ pipeLog.removePipeData(pipeDataDeque.poll().getSerialNumber());
+ } catch (IOException e) {
logger.warn(
String.format(
- "Be Interrupted when waiting for tsfile %s closed", data.getTsFilePath()));
+ "Commit pipe data %s error, serial number is %s, because %s",
+ data, data.getSerialNumber(), e));
}
- logger.info(
- String.format(
- "Waiting for tsfile %s close, retry %d / %d.",
- data.getTsFilePath(), (i + 1), SenderConf.defaultWaitingForTsFileRetryNumber));
}
- return false;
}
@Override
@@ -375,13 +400,15 @@ public class TsFilePipe implements Pipe {
throw new PipeException(
String.format("Can not stop pipe %s, because the pipe is drop.", name));
}
- if (status == PipeStatus.STOP) {
- return;
- }
+ if (!isCollectingRealTimeData) {
+ registerMetadata();
+ registerTsFile();
+ isCollectingRealTimeData = true;
+ }
status = PipeStatus.STOP;
- synchronized (pipeData) {
- pipeData.notifyAll();
+ synchronized (pipeDataDeque) {
+ pipeDataDeque.notifyAll();
}
}
@@ -392,8 +419,8 @@ public class TsFilePipe implements Pipe {
}
status = PipeStatus.DROP;
- synchronized (pipeData) {
- pipeData.notifyAll();
+ synchronized (pipeDataDeque) {
+ pipeDataDeque.notifyAll();
}
clear();
}
@@ -418,7 +445,7 @@ public class TsFilePipe implements Pipe {
logger.warn(String.format("Clear pipe %s %d error, because %s.", name, createTime, e));
}
- pipeData = null;
+ pipeDataDeque = null;
isCollectingRealTimeData = false;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/sender/pipe/TsFilePipeData.java b/server/src/main/java/org/apache/iotdb/db/newsync/sender/pipe/TsFilePipeData.java
deleted file mode 100644
index 52e19a3..0000000
--- a/server/src/main/java/org/apache/iotdb/db/newsync/sender/pipe/TsFilePipeData.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.iotdb.db.newsync.sender.pipe;
-
-import org.apache.iotdb.db.engine.modification.Deletion;
-import org.apache.iotdb.db.engine.modification.ModificationFile;
-import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.db.exception.metadata.IllegalPathException;
-import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-
-public class TsFilePipeData {
- private static final Logger logger = LoggerFactory.getLogger(TsFilePipeData.class);
- private static final int SERIALIZE_BUFFER_SIZE = 256;
-
- private final long serialNumber;
- private String tsFilePath;
- private Deletion deletion;
- private PhysicalPlan plan;
-
- public TsFilePipeData(String tsFilePath, long serialNumber) {
- this.tsFilePath = tsFilePath;
- this.serialNumber = serialNumber;
- }
-
- public TsFilePipeData(Deletion deletion, long serialNumber) {
- this.deletion = deletion;
- this.serialNumber = serialNumber;
- }
-
- public TsFilePipeData(PhysicalPlan plan, long serialNumber) {
- this.plan = plan;
- this.serialNumber = serialNumber;
- }
-
- public boolean isTsFile() {
- return tsFilePath != null;
- }
-
- public boolean isTsFileClosed() {
- File tsFile = new File(tsFilePath).getAbsoluteFile();
- File resource = new File(tsFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX);
- return resource.exists();
- }
-
- public String getTsFilePath() {
- return tsFilePath == null ? "null" : tsFilePath;
- }
-
- public long getSerialNumber() {
- return serialNumber;
- }
-
- // public Loader.Type getLoaderType() {
- // if (tsFilePath != null) {
- // return Loader.Type.TsFile;
- // } else if (deletion != null) {
- // return Loader.Type.Deletion;
- // } else if (plan != null) {
- // return Loader.Type.PhysicalPlan;
- // }
- // logger.error("Wrong type for transport type.");
- // return null;
- // }
-
- public List<File> getTsFiles() throws FileNotFoundException {
- File tsFile = new File(tsFilePath).getAbsoluteFile();
- File resource = new File(tsFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX);
- File mods = new File(tsFile.getAbsolutePath() + ModificationFile.FILE_SUFFIX);
-
- List<File> files = new ArrayList<>();
- if (!tsFile.exists()) {
- throw new FileNotFoundException(String.format("Can not find %s.", tsFile.getAbsolutePath()));
- }
- files.add(tsFile);
- if (resource.exists()) {
- files.add(resource);
- }
- if (mods.exists()) {
- files.add(mods);
- }
- return files;
- }
-
- public byte[] getBytes() {
- ByteBuffer buffer = ByteBuffer.allocate(SERIALIZE_BUFFER_SIZE);
- if (deletion != null) {
- deletion.serializeWithoutFileOffset(buffer);
- } else if (plan != null) {
- plan.serialize(buffer);
- } else if (tsFilePath != null) {
- ReadWriteIOUtils.write(tsFilePath, buffer);
- } else {
- logger.error(String.format("Null tsfile pipe data with serialNumber %d.", serialNumber));
- }
-
- byte[] bytes = new byte[buffer.position()];
- buffer.flip();
- buffer.get(bytes);
- return bytes;
- }
-
- public long serialize(DataOutputStream stream) throws IOException {
- long serializeSize = 0;
-
- stream.writeLong(serialNumber);
- serializeSize += Long.BYTES;
- if (tsFilePath != null) {
- stream.writeByte((byte) Type.TSFILE.ordinal());
- } else if (deletion != null) {
- stream.writeByte((byte) Type.DELETION.ordinal());
- } else if (plan != null) {
- stream.writeByte((byte) Type.PHYSICALPLAN.ordinal());
- } else {
- logger.error(String.format("Null tsfile pipe data with serialNumber %d.", serialNumber));
- }
- serializeSize += Byte.BYTES;
-
- byte[] bytes = getBytes();
- stream.writeInt(bytes.length);
- stream.write(bytes);
- serializeSize += Integer.BYTES;
- serializeSize += bytes.length;
- return serializeSize;
- }
-
- public static TsFilePipeData deserialize(DataInputStream stream)
- throws IOException, IllegalPathException {
- long serialNumber = stream.readLong();
- Type type = Type.values()[stream.readByte()];
- byte[] bytes = new byte[stream.readInt()];
- stream.read(bytes);
-
- TsFilePipeData pipeData = null;
- if (type.equals(Type.TSFILE)) {
- pipeData = new TsFilePipeData(ReadWriteIOUtils.readString(stream), serialNumber);
- } else if (type.equals(Type.DELETION)) {
- pipeData =
- new TsFilePipeData(
- Deletion.deserializeWithoutFileOffset(ByteBuffer.wrap(bytes)), serialNumber);
- } else if (type.equals(Type.PHYSICALPLAN)) {
- pipeData =
- new TsFilePipeData(PhysicalPlan.Factory.create(ByteBuffer.wrap(bytes)), serialNumber);
- }
-
- return pipeData;
- }
-
- public enum Type {
- TSFILE,
- DELETION,
- PHYSICALPLAN
- }
-
- @Override
- public String toString() {
- StringBuilder builder = new StringBuilder("TsFilePipeData{");
- builder.append("serialNumber=" + serialNumber);
- if (tsFilePath != null) {
- builder.append(", tsfile=" + tsFilePath);
- } else if (deletion != null) {
- builder.append(", deletion=" + deletion);
- } else if (plan != null) {
- builder.append(", physicalplan=" + plan);
- }
- builder.append("}");
- return builder.toString();
- }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/sender/recovery/SenderLogAnalyzer.java b/server/src/main/java/org/apache/iotdb/db/newsync/sender/recovery/SenderLogAnalyzer.java
index 9e8582c..8f2364d 100644
--- a/server/src/main/java/org/apache/iotdb/db/newsync/sender/recovery/SenderLogAnalyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/sender/recovery/SenderLogAnalyzer.java
@@ -113,8 +113,7 @@ public class SenderLogAnalyzer {
}
} catch (Exception e) {
throw new IOException(
- String.format(
- "Recover error in line %d : %s, because %s", lineNumber, readLine, e.getMessage()));
+ String.format("Recover error in line %d : %s, because %s", lineNumber, readLine, e));
}
if (pipes.size() > 0) {
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/sender/recovery/TsFilePipeLog.java b/server/src/main/java/org/apache/iotdb/db/newsync/sender/recovery/TsFilePipeLog.java
index 60fc40d..68a09d2 100644
--- a/server/src/main/java/org/apache/iotdb/db/newsync/sender/recovery/TsFilePipeLog.java
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/sender/recovery/TsFilePipeLog.java
@@ -22,9 +22,10 @@ package org.apache.iotdb.db.newsync.sender.recovery;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.newsync.pipedata.PipeData;
+import org.apache.iotdb.db.newsync.pipedata.TsFilePipeData;
import org.apache.iotdb.db.newsync.sender.conf.SenderConf;
import org.apache.iotdb.db.newsync.sender.pipe.TsFilePipe;
-import org.apache.iotdb.db.newsync.sender.pipe.TsFilePipeData;
import org.apache.iotdb.db.utils.FileUtils;
import org.slf4j.Logger;
@@ -143,7 +144,7 @@ public class TsFilePipeLog {
}
/** add pipe log data */
- public void addHistoryPipeData(TsFilePipeData pipeData) throws IOException {
+ public void addHistoryPipeData(PipeData pipeData) throws IOException {
getHistoryOutputStream();
pipeData.serialize(historyOutputStream);
}
@@ -158,10 +159,10 @@ public class TsFilePipeLog {
logDir.mkdirs();
File historyPipeLog = new File(pipeLogDir, SenderConf.historyPipeLogName);
createFile(historyPipeLog);
- historyOutputStream = new DataOutputStream(new FileOutputStream(historyPipeLog));
+ historyOutputStream = new DataOutputStream(new FileOutputStream(historyPipeLog, true));
}
- public synchronized void addRealTimePipeData(TsFilePipeData pipeData) throws IOException {
+ public synchronized void addRealTimePipeData(PipeData pipeData) throws IOException {
getRealTimeOutputStream(pipeData.getSerialNumber());
currentPipeLogSize += pipeData.serialize(realTimeOutputStream);
}
@@ -169,25 +170,15 @@ public class TsFilePipeLog {
private void getRealTimeOutputStream(long serialNumber) throws IOException {
if (realTimeOutputStream == null) {
// recover real time pipe log
- realTimePipeLogStartNumber = new LinkedBlockingDeque<>();
- File logDir = new File(pipeLogDir);
- List<Long> startNumbers = new ArrayList<>();
-
- logDir.mkdirs();
- for (File file : logDir.listFiles())
- if (file.getName().endsWith(SenderConf.realTimePipeLogNameSuffix)) {
- startNumbers.add(SenderConf.getSerialNumberFromPipeLogName(file.getName()));
- }
- if (startNumbers.size() != 0) {
- Collections.sort(startNumbers);
- for (Long startTime : startNumbers) {
- realTimePipeLogStartNumber.offer(startTime);
- }
+ if (realTimePipeLogStartNumber == null) {
+ recoverRealTimePipeLogStartNumber();
+ }
+ if (!realTimePipeLogStartNumber.isEmpty()) {
File writingPipeLog =
new File(
pipeLogDir,
- SenderConf.getRealTimePipeLogName(startNumbers.get(startNumbers.size() - 1)));
- realTimeOutputStream = new DataOutputStream(new FileOutputStream(writingPipeLog));
+ SenderConf.getRealTimePipeLogName(realTimePipeLogStartNumber.peekLast()));
+ realTimeOutputStream = new DataOutputStream(new FileOutputStream(writingPipeLog, true));
currentPipeLogSize = writingPipeLog.length();
} else {
moveToNextPipeLog(serialNumber);
@@ -199,6 +190,24 @@ public class TsFilePipeLog {
}
}
+ private void recoverRealTimePipeLogStartNumber() {
+ realTimePipeLogStartNumber = new LinkedBlockingDeque<>();
+ File logDir = new File(pipeLogDir);
+ List<Long> startNumbers = new ArrayList<>();
+
+ logDir.mkdirs();
+ for (File file : logDir.listFiles())
+ if (file.getName().endsWith(SenderConf.realTimePipeLogNameSuffix)) {
+ startNumbers.add(SenderConf.getSerialNumberFromPipeLogName(file.getName()));
+ }
+ if (startNumbers.size() != 0) {
+ Collections.sort(startNumbers);
+ for (Long startTime : startNumbers) {
+ realTimePipeLogStartNumber.offer(startTime);
+ }
+ }
+ }
+
private void moveToNextPipeLog(long startSerialNumber) throws IOException {
if (realTimeOutputStream != null) {
realTimeOutputStream.close();
@@ -219,7 +228,10 @@ public class TsFilePipeLog {
if (historyOutputStream != null) {
removeHistoryPipeLog();
}
- if (realTimePipeLogStartNumber.size() < 2) {
+ if (realTimePipeLogStartNumber == null) {
+ recoverRealTimePipeLogStartNumber();
+ }
+ if (realTimePipeLogStartNumber.size() >= 2) {
long pipeLogStartNumber;
while (true) {
pipeLogStartNumber = realTimePipeLogStartNumber.poll();
@@ -260,11 +272,11 @@ public class TsFilePipeLog {
private void removeTsFile(File realTimePipeLog) {
try {
- List<TsFilePipeData> pipeData = TsFilePipeLogAnalyzer.parseFile(realTimePipeLog);
+ List<PipeData> pipeData = TsFilePipeLogAnalyzer.parseFile(realTimePipeLog);
List<File> tsFiles;
- for (TsFilePipeData data : pipeData)
- if (data.isTsFile()) {
- tsFiles = data.getTsFiles();
+ for (PipeData data : pipeData)
+ if (PipeData.Type.TSFILE.equals(data.getType())) {
+ tsFiles = ((TsFilePipeData) data).getTsFiles();
for (File file : tsFiles) {
Files.deleteIfExists(file.toPath());
}
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/sender/recovery/TsFilePipeLogAnalyzer.java b/server/src/main/java/org/apache/iotdb/db/newsync/sender/recovery/TsFilePipeLogAnalyzer.java
index cd83d49..d24598f 100644
--- a/server/src/main/java/org/apache/iotdb/db/newsync/sender/recovery/TsFilePipeLogAnalyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/sender/recovery/TsFilePipeLogAnalyzer.java
@@ -20,9 +20,9 @@
package org.apache.iotdb.db.newsync.sender.recovery;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.newsync.pipedata.PipeData;
import org.apache.iotdb.db.newsync.sender.conf.SenderConf;
import org.apache.iotdb.db.newsync.sender.pipe.TsFilePipe;
-import org.apache.iotdb.db.newsync.sender.pipe.TsFilePipeData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,7 +47,7 @@ public class TsFilePipeLogAnalyzer {
private final String pipeDir;
private final String pipeLogDir;
- private BlockingDeque<TsFilePipeData> pipeData;
+ private BlockingDeque<PipeData> pipeDataDeque;
private long removeSerialNumber;
public TsFilePipeLogAnalyzer(TsFilePipe pipe) {
@@ -55,19 +55,19 @@ public class TsFilePipeLogAnalyzer {
pipeLogDir = new File(pipeDir, SenderConf.pipeLogDirName).getPath();
}
- public BlockingDeque<TsFilePipeData> recover() {
- pipeData = new LinkedBlockingDeque<>();
+ public BlockingDeque<PipeData> recover() {
+ pipeDataDeque = new LinkedBlockingDeque<>();
removeSerialNumber = Long.MIN_VALUE;
if (!new File(pipeDir).exists()) {
- return pipeData;
+ return pipeDataDeque;
}
deserializeRemoveSerialNumber();
recoverHistoryData();
recoverRealTimeData();
- return pipeData;
+ return pipeDataDeque;
}
private void deserializeRemoveSerialNumber() {
@@ -105,15 +105,15 @@ public class TsFilePipeLogAnalyzer {
if (removeSerialNumber < 0) {
try {
- List<TsFilePipeData> historyPipeData = parseFile(historyPipeLog);
- for (TsFilePipeData data : historyPipeData)
+ List<PipeData> historyPipeData = parseFile(historyPipeLog);
+ for (PipeData data : historyPipeData)
if (data.getSerialNumber() > removeSerialNumber) {
- pipeData.offer(data);
+ pipeDataDeque.offer(data);
}
} catch (IOException e) {
logger.error(
String.format(
- "Can not parse history pipe log %s, because %s", historyPipeLog.getPath()));
+ "Can not parse history pipe log %s, because %s", historyPipeLog.getPath(), e));
}
} else {
try {
@@ -143,10 +143,10 @@ public class TsFilePipeLogAnalyzer {
File realTimePipeLog =
new File(this.pipeLogDir, SenderConf.getRealTimePipeLogName(startNumber));
try {
- List<TsFilePipeData> realTimeData = parseFile(realTimePipeLog);
- for (TsFilePipeData data : realTimeData)
+ List<PipeData> realTimeData = parseFile(realTimePipeLog);
+ for (PipeData data : realTimeData)
if (data.getSerialNumber() > removeSerialNumber) {
- pipeData.offer(data);
+ pipeDataDeque.offer(data);
}
} catch (IOException e) {
logger.error(
@@ -161,12 +161,12 @@ public class TsFilePipeLogAnalyzer {
return new File(pipeDir, SenderConf.pipeCollectFinishLockName).exists();
}
- public static List<TsFilePipeData> parseFile(File file) throws IOException {
- List<TsFilePipeData> pipeData = new ArrayList<>();
+ public static List<PipeData> parseFile(File file) throws IOException {
+ List<PipeData> pipeData = new ArrayList<>();
DataInputStream inputStream = new DataInputStream(new FileInputStream(file));
try {
while (true) {
- pipeData.add(TsFilePipeData.deserialize(inputStream));
+ pipeData.add(PipeData.deserialize(inputStream));
}
} catch (EOFException e) {
logger.info(String.format("Finish parsing pipeLog %s.", file.getPath()));
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/sender/service/SenderService.java b/server/src/main/java/org/apache/iotdb/db/newsync/sender/service/SenderService.java
index 498c714..8e1ad25 100644
--- a/server/src/main/java/org/apache/iotdb/db/newsync/sender/service/SenderService.java
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/sender/service/SenderService.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.db.exception.PipeException;
import org.apache.iotdb.db.exception.PipeSinkException;
import org.apache.iotdb.db.exception.StartupException;
import org.apache.iotdb.db.newsync.sender.conf.SenderConf;
-import org.apache.iotdb.db.newsync.sender.pipe.IoTDBPipeSink;
import org.apache.iotdb.db.newsync.sender.pipe.Pipe;
import org.apache.iotdb.db.newsync.sender.pipe.PipeSink;
import org.apache.iotdb.db.newsync.sender.pipe.TsFilePipe;
@@ -177,18 +176,15 @@ public class SenderService implements IService {
}
// get TsFilePipe
- PipeSink.Type pipeSinkType = pipeSink.getType();
- if (!pipeSinkType.equals(PipeSink.Type.IoTDB)) {
- throw new PipeException(
- String.format(
- "Wrong pipeSink type %s for create TsFilePipe.", pipeSinkType)); // internal error
- }
+ // PipeSink.Type pipeSinkType = pipeSink.getType();
+ // if (!pipeSinkType.equals(PipeSink.Type.IoTDB)) {
+ // throw new PipeException(
+ // String.format(
+ // "Wrong pipeSink type %s for create TsFilePipe.", pipeSinkType)); // internal
+ // error
+ // }
return new TsFilePipe(
- pipeCreateTime,
- plan.getPipeName(),
- (IoTDBPipeSink) pipeSink,
- plan.getDataStartTimestamp(),
- syncDelOp);
+ pipeCreateTime, plan.getPipeName(), pipeSink, plan.getDataStartTimestamp(), syncDelOp);
}
public void stopPipe(String pipeName) throws PipeException {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
index 2a82e6e..4aa910e 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
@@ -193,7 +193,9 @@ public abstract class Operator {
PRUNE_TEMPLATE,
APPEND_TEMPLATE,
DROP_TEMPLATE,
+
SHOW_QUERY_RESOURCE,
+
CREATE_PIPESINK,
DROP_PIPESINK,
SHOW_PIPESINK,