You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2019/06/25 11:47:56 UTC
[incubator-iotdb] branch feature_async_close_tsfile updated:
refactor serialization of PhysicalPlans
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/feature_async_close_tsfile by this push:
new 64bcce8 refactor serialization of PhysicalPlans
new 87f7b35 Merge branch 'feature_async_close_tsfile' of github.com:apache/incubator-iotdb into feature_async_close_tsfile
64bcce8 is described below
commit 64bcce87e15288d1b4aed8a62efe58c9b9f4e8cf
Author: 江天 <jt...@163.com>
AuthorDate: Tue Jun 25 19:45:20 2019 +0800
refactor serialization of PhysicalPlans
---
.../apache/iotdb/db/qp/physical/PhysicalPlan.java | 59 +++++++++++++++++++-
.../iotdb/db/qp/physical/crud/AggregationPlan.java | 1 -
.../iotdb/db/qp/physical/crud/DeletePlan.java | 18 +++++-
.../iotdb/db/qp/physical/crud/FillQueryPlan.java | 1 -
.../iotdb/db/qp/physical/crud/GroupByPlan.java | 1 -
.../iotdb/db/qp/physical/crud/InsertPlan.java | 45 ++++++++++++++-
.../iotdb/db/qp/physical/crud/QueryPlan.java | 1 -
.../iotdb/db/qp/physical/crud/UpdatePlan.java | 1 -
.../iotdb/db/qp/physical/sys/AuthorPlan.java | 1 -
.../iotdb/db/qp/physical/sys/LoadDataPlan.java | 1 -
.../iotdb/db/qp/physical/sys/MetadataPlan.java | 1 -
.../iotdb/db/qp/physical/sys/PropertyPlan.java | 1 -
.../iotdb/db/writelog/io/BatchLogReader.java | 64 ++++++++++++++++++++++
.../apache/iotdb/db/writelog/io/ILogWriter.java | 7 ++-
.../org/apache/iotdb/db/writelog/io/LogWriter.java | 40 ++++++++------
.../iotdb/db/writelog/io/SingleFileLogReader.java | 34 +++++++-----
.../db/writelog/node/ExclusiveWriteLogNode.java | 38 +++++++++----
.../org/apache/iotdb/db/tools/WalCheckerTest.java | 15 ++---
.../iotdb/db/writelog/WriteLogNodeManagerTest.java | 3 -
.../apache/iotdb/db/writelog/WriteLogNodeTest.java | 20 +------
.../iotdb/db/writelog/io/LogWriterReaderTest.java | 21 +++----
.../db/writelog/io/MultiFileLogReaderTest.java | 11 ++--
.../iotdb/tsfile/write/writer/TsFileIOWriter.java | 1 +
23 files changed, 281 insertions(+), 104 deletions(-)
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
index d83491e..de17c57 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
@@ -19,18 +19,24 @@
package org.apache.iotdb.db.qp.physical;
import java.io.Serializable;
+import java.nio.ByteBuffer;
import java.util.List;
import org.apache.iotdb.db.qp.logical.Operator;
+import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
+import org.apache.iotdb.db.qp.physical.transfer.SystemLogOperator;
import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
/**
* This class is a abstract class for all type of PhysicalPlan.
*/
-public abstract class PhysicalPlan implements Serializable {
+public abstract class PhysicalPlan {
- private static final long serialVersionUID = -6274856391535568352L;
private boolean isQuery;
private Operator.OperatorType operatorType;
+ private static final int NULL_VALUE_LEN = -1;
+
/**
* plans in a Storage Group are executed serially, this id is to guarantee in recovery stage plans
* in WAL can be redone in the same order
@@ -92,4 +98,53 @@ public abstract class PhysicalPlan implements Serializable {
public void setPlanId(long planId) {
this.planId = planId;
}
+
+ public void serializeTo(ByteBuffer buffer) {
+ throw new UnsupportedOperationException("serialize of unimplemented");
+ }
+
+ public void deserializeFrom(ByteBuffer buffer) {
+ throw new UnsupportedOperationException("serialize of unimplemented");
+ }
+
+ protected void putString(ByteBuffer buffer, String value) {
+ if (value == null) {
+ buffer.putInt(NULL_VALUE_LEN);
+ } else {
+ ReadWriteIOUtils.write(value, buffer);
+ }
+ }
+
+ protected String readString(ByteBuffer buffer) {
+ int valueLen = buffer.getInt();
+ if (valueLen == NULL_VALUE_LEN) {
+ return null;
+ }
+ return ReadWriteIOUtils.readStringWithoutLength(buffer, valueLen);
+ }
+
+ public static class Factory {
+
+ private Factory() {
+ // hidden initializer
+ }
+
+ public static PhysicalPlan create(ByteBuffer buffer) {
+ byte type = buffer.get();
+ PhysicalPlan plan;
+ switch (type) {
+ case SystemLogOperator.INSERT:
+ plan = new InsertPlan();
+ plan.deserializeFrom(buffer);
+ break;
+ case SystemLogOperator.DELETE:
+ plan = new DeletePlan();
+ plan.deserializeFrom(buffer);
+ break;
+ default:
+ throw new UnsupportedOperationException("unrecognized log type " + type);
+ }
+ return plan;
+ }
+ }
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java
index 6e5f430..43a5e28 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.db.qp.logical.Operator;
public class AggregationPlan extends QueryPlan {
- private static final long serialVersionUID = -2049810573809076643L;
private List<String> aggregations = new ArrayList<>();
public AggregationPlan() {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/DeletePlan.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/DeletePlan.java
index ae183bf..4f518d9 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/DeletePlan.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/DeletePlan.java
@@ -18,16 +18,18 @@
*/
package org.apache.iotdb.db.qp.physical.crud;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import org.apache.iotdb.db.qp.logical.Operator;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.transfer.SystemLogOperator;
import org.apache.iotdb.tsfile.read.common.Path;
public class DeletePlan extends PhysicalPlan {
- private static final long serialVersionUID = -6532570247476907037L;
private long deleteTime;
private List<Path> paths = new ArrayList<>();
@@ -92,4 +94,18 @@ public class DeletePlan extends PhysicalPlan {
return deleteTime == that.deleteTime && Objects.equals(paths, that.paths);
}
+ @Override
+ public void serializeTo(ByteBuffer buffer) {
+ int type = SystemLogOperator.DELETE;
+ buffer.put((byte) type);
+ buffer.putLong(deleteTime);
+ putString(buffer, paths.get(0).getFullPath());
+ }
+
+ @Override
+ public void deserializeFrom(ByteBuffer buffer) {
+ this.deleteTime = buffer.getLong();
+ this.paths = new ArrayList();
+ this.paths.add(new Path(readString(buffer)));
+ }
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/FillQueryPlan.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/FillQueryPlan.java
index 4fd666d..7f0399f 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/FillQueryPlan.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/FillQueryPlan.java
@@ -25,7 +25,6 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
public class FillQueryPlan extends QueryPlan {
- private static final long serialVersionUID = -2091710518816582444L;
private long queryTime;
private Map<TSDataType, IFill> fillType;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/GroupByPlan.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/GroupByPlan.java
index d016c36..f4087eb 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/GroupByPlan.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/GroupByPlan.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.tsfile.utils.Pair;
public class GroupByPlan extends AggregationPlan {
- private static final long serialVersionUID = 8769258112457178898L;
private long unit;
private long origin;
private List<Pair<Long, Long>> intervals; // show intervals
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
index 806b693..b017e3c 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.qp.physical.crud;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -26,13 +27,13 @@ import java.util.Objects;
import org.apache.iotdb.db.qp.logical.Operator;
import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.transfer.SystemLogOperator;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.write.record.TSRecord;
public class InsertPlan extends PhysicalPlan {
- private static final long serialVersionUID = 6102845312368561515L;
private String deviceId;
private String[] measurements;
private TSDataType[] dataTypes;
@@ -43,6 +44,10 @@ public class InsertPlan extends PhysicalPlan {
// 1 : BufferWrite Insert 2 : Overflow Insert
private int insertType;
+ public InsertPlan() {
+ super(false, OperatorType.INSERT);
+ }
+
public InsertPlan(String deviceId, long insertTime, String measurement, String insertValue) {
super(false, OperatorType.INSERT);
this.time = insertTime;
@@ -156,4 +161,42 @@ public class InsertPlan extends PhysicalPlan {
&& Arrays.equals(values, that.values);
}
+ @Override
+ public void serializeTo(ByteBuffer buffer) {
+ int type = SystemLogOperator.INSERT;
+ buffer.put((byte) type);
+ buffer.put((byte) insertType);
+ buffer.putLong(time);
+
+ putString(buffer, deviceId);
+
+ buffer.putInt(measurements.length);
+ for (String m : measurements) {
+ putString(buffer, m);
+ }
+
+ buffer.putInt(values.length);
+ for (String m : values) {
+ putString(buffer, m);
+ }
+ }
+
+ @Override
+ public void deserializeFrom(ByteBuffer buffer) {
+ this.insertType = buffer.get();
+ this.time = buffer.getLong();
+ this.deviceId = readString(buffer);
+
+ int measurementSize = buffer.getInt();
+ this.measurements = new String[measurementSize];
+ for (int i = 0; i < measurementSize; i++) {
+ measurements[i] = readString(buffer);
+ }
+
+ int valueSize = buffer.getInt();
+ this.values = new String[valueSize];
+ for (int i = 0; i < valueSize; i++) {
+ values[i] = readString(buffer);
+ }
+ }
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
index 6bf7c0e..1a4377c 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
@@ -28,7 +28,6 @@ import org.apache.iotdb.tsfile.read.expression.IExpression;
public class QueryPlan extends PhysicalPlan {
- private static final long serialVersionUID = -5865840981549195660L;
private List<Path> paths = null;
private IExpression expression = null;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/UpdatePlan.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/UpdatePlan.java
index 60768d6..5da9c2a 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/UpdatePlan.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/UpdatePlan.java
@@ -31,7 +31,6 @@ import org.apache.iotdb.tsfile.utils.StringContainer;
public class UpdatePlan extends PhysicalPlan {
- private static final long serialVersionUID = 8952248212926920033L;
private List<Pair<Long, Long>> intervals = new ArrayList<>();
private String value;
private Path path;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/AuthorPlan.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/AuthorPlan.java
index 339f1e0..8cb2116 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/AuthorPlan.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/AuthorPlan.java
@@ -32,7 +32,6 @@ import org.apache.iotdb.tsfile.read.common.Path;
public class AuthorPlan extends PhysicalPlan {
- private static final long serialVersionUID = 6501894026593590182L;
private final AuthorOperator.AuthorType authorType;
private String userName;
private String roleName;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/LoadDataPlan.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/LoadDataPlan.java
index f9b4553..7e2eb7a 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/LoadDataPlan.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/LoadDataPlan.java
@@ -27,7 +27,6 @@ import org.apache.iotdb.tsfile.read.common.Path;
public class LoadDataPlan extends PhysicalPlan {
- private static final long serialVersionUID = -6631296704227106470L;
private final String inputFilePath;
private final String measureType;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/MetadataPlan.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/MetadataPlan.java
index e3b71f5..93a98a3 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/MetadataPlan.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/MetadataPlan.java
@@ -32,7 +32,6 @@ import org.apache.iotdb.tsfile.read.common.Path;
public class MetadataPlan extends PhysicalPlan {
- private static final long serialVersionUID = -3717406842093744475L;
private final MetadataOperator.NamespaceType namespaceType;
private Path path;
private TSDataType dataType;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/PropertyPlan.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/PropertyPlan.java
index 94ec623..2f2c591 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/PropertyPlan.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/PropertyPlan.java
@@ -31,7 +31,6 @@ import org.apache.iotdb.tsfile.read.common.Path;
*/
public class PropertyPlan extends PhysicalPlan {
- private static final long serialVersionUID = -1462399624512066104L;
private final PropertyOperator.PropertyType propertyType;
private Path propertyPath;
private Path metadataPath;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/io/BatchLogReader.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/io/BatchLogReader.java
new file mode 100644
index 0000000..5ffa643
--- /dev/null
+++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/io/BatchLogReader.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.writelog.io;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+
+/**
+ * BatchLogReader reads logs from a binary batch of logs held in a ByteBuffer. The
+ * ByteBuffer must be readable.
+ */
+public class BatchLogReader implements ILogReader{
+
+ Iterator<PhysicalPlan> planIterator;
+
+ public BatchLogReader(ByteBuffer buffer) {
+ List<PhysicalPlan> logs = readLogs(buffer);
+ this.planIterator = logs.iterator();
+ }
+
+ private List<PhysicalPlan> readLogs(ByteBuffer buffer) {
+ List<PhysicalPlan> plans = new ArrayList<>();
+ while (buffer.position() != buffer.limit()) {
+ plans.add(PhysicalPlan.Factory.create(buffer));
+ }
+ return plans;
+ }
+
+
+ @Override
+ public void close() {
+ }
+
+ @Override
+ public boolean hasNext() throws IOException {
+ return planIterator.hasNext();
+ }
+
+ @Override
+ public PhysicalPlan next() throws IOException {
+ return planIterator.next();
+ }
+}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/io/ILogWriter.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/io/ILogWriter.java
index fc1c33d..8fb24f1 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/io/ILogWriter.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/io/ILogWriter.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.writelog.io;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.List;
/**
@@ -32,11 +33,11 @@ public interface ILogWriter {
* NOTICE: the logs may be cached in the OS/FileSystem, if the OS/FileSystem you are using do
* not guarantee strong persistency and you want the logs to be persisted immediately, please
* call force() after calling this method.
- * @param logCache WAL logs that have been converted to bytes, each element in this list
- * represents one log.
+ * NOTICE: do not flip the buffer before calling this method; the writer flips it itself.
+ * @param logBuffer WAL logs that have been converted to bytes
* @throws IOException
*/
- void write(List<byte[]> logCache) throws IOException;
+ void write(ByteBuffer logBuffer) throws IOException;
/**
* force the OS/FileSystem to flush its cache to make sure logs are persisted.
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/io/LogWriter.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/io/LogWriter.java
index 5abc925..903d0a4 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/io/LogWriter.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/io/LogWriter.java
@@ -23,7 +23,6 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
-import java.util.List;
import java.util.zip.CRC32;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -39,6 +38,8 @@ public class LogWriter implements ILogWriter {
private FileChannel channel;
private CRC32 checkSummer = new CRC32();
private IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+ private ByteBuffer lengthBuffer = ByteBuffer.allocate(4);
+ private ByteBuffer checkSumBuffer = ByteBuffer.allocate(8);
public LogWriter(String logFilePath) {
logFile = new File(logFilePath);
@@ -49,25 +50,32 @@ public class LogWriter implements ILogWriter {
}
@Override
- public void write(List<byte[]> logCache) throws IOException {
+ public void write(ByteBuffer logBuffer) throws IOException {
if (channel == null) {
fileOutputStream = new FileOutputStream(logFile, true);
channel = fileOutputStream.getChannel();
}
- int totalSize = 0;
- for (byte[] bytes : logCache) {
- totalSize += 4 + 8 + bytes.length;
- }
- ByteBuffer buffer = ByteBuffer.allocate(totalSize);
- for (byte[] bytes : logCache) {
- buffer.putInt(bytes.length);
- checkSummer.reset();
- checkSummer.update(bytes);
- buffer.putLong(checkSummer.getValue());
- buffer.put(bytes);
- }
- buffer.flip();
- channel.write(buffer);
+ logBuffer.flip();
+ int logSize = logBuffer.limit();
+ // written as: 4-byte length, then the log bytes, then an 8-byte check sum
+
+ checkSummer.reset();
+ checkSummer.update(logBuffer);
+ long checkSum = checkSummer.getValue();
+
+ logBuffer.flip();
+
+ lengthBuffer.clear();
+ checkSumBuffer.clear();
+ lengthBuffer.putInt(logSize);
+ checkSumBuffer.putLong(checkSum);
+ lengthBuffer.flip();
+ checkSumBuffer.flip();
+
+ channel.write(lengthBuffer);
+ channel.write(logBuffer);
+ channel.write(checkSumBuffer);
+
if (config.getForceWalPeriodInMs() == 0) {
channel.force(true);
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/io/SingleFileLogReader.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/io/SingleFileLogReader.java
index 445c573..bacb8f5 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/io/SingleFileLogReader.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/io/SingleFileLogReader.java
@@ -24,11 +24,10 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.NoSuchElementException;
import java.util.zip.CRC32;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.qp.physical.transfer.PhysicalPlanLogTransfer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,26 +38,26 @@ import org.slf4j.LoggerFactory;
public class SingleFileLogReader implements ILogReader {
private static final Logger logger = LoggerFactory.getLogger(SingleFileLogReader.class);
- private int bufferSize = IoTDBDescriptor.getInstance().getConfig().getMaxLogEntrySize();
public static final int LEAST_LOG_SIZE = 12; // size + checksum
private DataInputStream logStream;
private String filepath;
- private byte[] buffer = new byte[bufferSize];
+ private byte[] buffer;
private CRC32 checkSummer = new CRC32();
- private PhysicalPlan planBuffer = null;
// used to indicate the position of the broken log
private int idx;
+ private BatchLogReader batchLogReader;
+
public SingleFileLogReader(File logFile) throws FileNotFoundException {
open(logFile);
}
@Override
public boolean hasNext() throws IOException{
- if (planBuffer != null) {
+ if (batchLogReader != null && batchLogReader.hasNext()) {
return true;
}
@@ -67,19 +66,26 @@ public class SingleFileLogReader implements ILogReader {
}
int logSize = logStream.readInt();
- if (logSize > bufferSize) {
- bufferSize = logSize;
- buffer = new byte[bufferSize];
+ if (logSize <= 0) {
+ return false;
+ }
+ buffer = new byte[logSize];
+
+ int readLen = logStream.read(buffer, 0, logSize);
+ if (readLen < logSize) {
+ throw new IOException("Reach eof");
}
+
final long checkSum = logStream.readLong();
- logStream.read(buffer, 0, logSize);
checkSummer.reset();
checkSummer.update(buffer, 0, logSize);
if (checkSummer.getValue() != checkSum) {
- throw new IOException(String.format("The check sum of the No.%d log is incorrect! In file: "
+ throw new IOException(String.format("The check sum of the No.%d log batch is incorrect! In "
+ + "file: "
+ "%d Calculated: %d.", idx, checkSum, checkSummer.getValue()));
}
- planBuffer = PhysicalPlanLogTransfer.logToPlan(buffer);
+
+ batchLogReader = new BatchLogReader(ByteBuffer.wrap(buffer));
return true;
}
@@ -89,10 +95,8 @@ public class SingleFileLogReader implements ILogReader {
throw new NoSuchElementException();
}
- PhysicalPlan ret = planBuffer;
- planBuffer = null;
idx ++;
- return ret;
+ return batchLogReader.next();
}
@Override
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java
index 23cb319..4e1b221 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java
@@ -14,8 +14,11 @@
*/
package org.apache.iotdb.db.writelog.node;
+import com.sun.xml.internal.ws.policy.privateutil.PolicyUtils.IO;
import java.io.File;
import java.io.IOException;
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
@@ -42,6 +45,7 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
public static final String WAL_FILE_NAME = "wal";
private static final Logger logger = LoggerFactory.getLogger(ExclusiveWriteLogNode.class);
+ private static int logBufferSize = 64*1024*1024;
private String identifier;
@@ -51,13 +55,15 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
private IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
- private List<byte[]> logCache = new ArrayList<>(config.getFlushWalThreshold());
+ private ByteBuffer logBuffer = ByteBuffer.allocate(logBufferSize);
private ReadWriteLock lock = new ReentrantReadWriteLock();
private long fileId = 0;
private long lastFlushedId = 0;
+ private int bufferedLogNum = 0;
+
/**
* constructor of ExclusiveWriteLogNode.
*
@@ -75,16 +81,26 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
lock.writeLock().lock();
try {
long start = System.currentTimeMillis();
- byte[] logBytes = PhysicalPlanLogTransfer.planToLog(plan);
- logCache.add(logBytes);
+ logBuffer.mark();
+ try {
+ plan.serializeTo(logBuffer);
+ } catch (BufferOverflowException e) {
+ logBuffer.reset();
+ sync();
+ plan.serializeTo(logBuffer);
+ }
+
+ bufferedLogNum ++;
- if (logCache.size() >= config.getFlushWalThreshold()) {
+ if (bufferedLogNum >= config.getFlushWalThreshold()) {
sync();
}
long elapse = System.currentTimeMillis() - start;
if (elapse > 1000) {
logger.info("WAL insert cost {} ms", elapse);
}
+ } catch (BufferOverflowException e) {
+ throw new IOException("Log cannot fit into buffer", e);
} finally {
lock.writeLock().unlock();
}
@@ -167,7 +183,7 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
lock.writeLock().lock();
try {
long start = System.currentTimeMillis();
- logCache.clear();
+ logBuffer.clear();
close();
FileUtils.deleteDirectory(new File(logDirectory));
long elapse = System.currentTimeMillis() - start;
@@ -204,7 +220,7 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
lock.writeLock().lock();
try {
long start = System.currentTimeMillis();
- logger.debug("Log node {} starts force, {} logs to be forced", identifier, logCache.size());
+ logger.debug("Log node {} starts force, {} logs to be forced", identifier, bufferedLogNum);
try {
if (currentFileWriter != null) {
currentFileWriter.force();
@@ -226,16 +242,18 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
lock.writeLock().lock();
try {
long start = System.currentTimeMillis();
- logger.debug("Log node {} starts sync, {} logs to be synced", identifier, logCache.size());
- if (logCache.isEmpty()) {
+ logger.debug("Log node {} starts sync, {} logs to be synced", identifier, bufferedLogNum);
+ if (bufferedLogNum == 0) {
return;
}
try {
- getCurrentFileWriter().write(logCache);
+ getCurrentFileWriter().write(logBuffer);
} catch (IOException e) {
logger.error("Log node {} sync failed", identifier, e);
}
- logCache.clear();
+ logBuffer.clear();
+ bufferedLogNum = 0;
+
logger.debug("Log node {} ends sync.", identifier);
long elapse = System.currentTimeMillis() - start;
if (elapse > 1000) {
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/tools/WalCheckerTest.java b/iotdb/src/test/java/org/apache/iotdb/db/tools/WalCheckerTest.java
index 16862cb..586eb79 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/tools/WalCheckerTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/tools/WalCheckerTest.java
@@ -26,6 +26,7 @@ import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.io.FileUtils;
@@ -74,14 +75,14 @@ public class WalCheckerTest {
LogWriter logWriter = new LogWriter(subDir.getPath() + File.separator
+ WAL_FILE_NAME);
- List<byte[]> binaryPlans = new ArrayList<>();
+ ByteBuffer binaryPlans = ByteBuffer.allocate(64*1024);
String deviceId = "device1";
String[] measurements = new String[]{"s1", "s2", "s3"};
String[] values = new String[]{"5", "6", "7"};
for (int j = 0; j < 10; j++) {
- binaryPlans.add(PhysicalPlanLogTransfer
- .planToLog(new InsertPlan(deviceId, j, measurements, values)));
+ new InsertPlan(deviceId, j, measurements, values).serializeTo(binaryPlans);
}
+ binaryPlans.flip();
logWriter.write(binaryPlans);
logWriter.force();
@@ -107,17 +108,17 @@ public class WalCheckerTest {
LogWriter logWriter = new LogWriter(subDir.getPath() + File.separator
+ WAL_FILE_NAME);
- List<byte[]> binaryPlans = new ArrayList<>();
+ ByteBuffer binaryPlans = ByteBuffer.allocate(64*1024);
String deviceId = "device1";
String[] measurements = new String[]{"s1", "s2", "s3"};
String[] values = new String[]{"5", "6", "7"};
for (int j = 0; j < 10; j++) {
- binaryPlans.add(PhysicalPlanLogTransfer
- .planToLog(new InsertPlan(deviceId, j, measurements, values)));
+ new InsertPlan(deviceId, j, measurements, values).serializeTo(binaryPlans);
}
if (i > 2) {
- binaryPlans.add("not a wal".getBytes());
+ binaryPlans.put("not a wal".getBytes());
}
+ binaryPlans.flip();
logWriter.write(binaryPlans);
logWriter.force();
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeManagerTest.java b/iotdb/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeManagerTest.java
index 1aa083e..22b9469 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeManagerTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeManagerTest.java
@@ -29,7 +29,6 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
-import org.apache.iotdb.db.qp.physical.crud.UpdatePlan;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
import org.apache.iotdb.db.writelog.manager.WriteLogNodeManager;
@@ -88,11 +87,9 @@ public class WriteLogNodeManagerTest {
InsertPlan bwInsertPlan = new InsertPlan(1, "logTestDevice", 100,
new String[]{"s1", "s2", "s3", "s4"},
new String[]{"1.0", "15", "str", "false"});
- UpdatePlan updatePlan = new UpdatePlan(0, 100, "2.0", new Path("root.logTestDevice.s1"));
DeletePlan deletePlan = new DeletePlan(50, new Path("root.logTestDevice.s1"));
logNode.write(bwInsertPlan);
- logNode.write(updatePlan);
logNode.write(deletePlan);
File walFile = new File(logNode.getLogDirectory() + File.separator + "wal1");
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java b/iotdb/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java
index b01988a..eacc136 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java
@@ -68,11 +68,9 @@ public class WriteLogNodeTest {
InsertPlan bwInsertPlan = new InsertPlan(1, identifier, 100,
new String[]{"s1", "s2", "s3", "s4"},
new String[]{"1.0", "15", "str", "false"});
- UpdatePlan updatePlan = new UpdatePlan(0, 100, "2.0", new Path(identifier + ".s1"));
DeletePlan deletePlan = new DeletePlan(50, new Path(identifier + ".s1"));
logNode.write(bwInsertPlan);
- logNode.write(updatePlan);
logNode.write(deletePlan);
logNode.close();
@@ -83,7 +81,6 @@ public class WriteLogNodeTest {
ILogReader reader = logNode.getLogReader();
assertEquals(bwInsertPlan, reader.next());
- assertEquals(updatePlan, reader.next());
assertEquals(deletePlan, reader.next());
logNode.delete();
@@ -100,24 +97,15 @@ public class WriteLogNodeTest {
InsertPlan bwInsertPlan = new InsertPlan(1, identifier, 100,
new String[]{"s1", "s2", "s3", "s4"},
new String[]{"1.0", "15", "str", "false"});
- UpdatePlan updatePlan = new UpdatePlan(0, 100, "2.0", new Path(identifier + ".s1"));
DeletePlan deletePlan = new DeletePlan(50, new Path(identifier + ".s1"));
logNode.write(bwInsertPlan);
logNode.notifyStartFlush();
- logNode.write(updatePlan);
- logNode.notifyStartFlush();
logNode.write(deletePlan);
logNode.notifyStartFlush();
ILogReader logReader = logNode.getLogReader();
assertEquals(bwInsertPlan, logReader.next());
- assertEquals(updatePlan, logReader.next());
- assertEquals(deletePlan, logReader.next());
-
- logNode.notifyEndFlush();
- logReader = logNode.getLogReader();
- assertEquals(updatePlan, logReader.next());
assertEquals(deletePlan, logReader.next());
logNode.notifyEndFlush();
@@ -135,18 +123,16 @@ public class WriteLogNodeTest {
public void testSyncThreshold() throws IOException {
// this test checks that if more logs than threshold are written, a sync will be triggered.
int flushWalThreshold = config.getFlushWalThreshold();
- config.setFlushWalThreshold(3);
+ config.setFlushWalThreshold(2);
WriteLogNode logNode = new ExclusiveWriteLogNode("root.logTestDevice");
InsertPlan bwInsertPlan = new InsertPlan(1, "root.logTestDevice", 100,
new String[]{"s1", "s2", "s3", "s4"},
new String[]{"1.0", "15", "str", "false"});
- UpdatePlan updatePlan = new UpdatePlan(0, 100, "2.0", new Path("root.logTestDevice.s1"));
DeletePlan deletePlan = new DeletePlan(50, new Path("root.logTestDevice.s1"));
logNode.write(bwInsertPlan);
- logNode.write(updatePlan);
File walFile = new File(
config.getWalFolder() + File.separator + "root.logTestDevice" + File.separator + "wal1");
@@ -169,11 +155,9 @@ public class WriteLogNodeTest {
InsertPlan bwInsertPlan = new InsertPlan(1, "logTestDevice", 100,
new String[]{"s1", "s2", "s3", "s4"},
new String[]{"1.0", "15", "str", "false"});
- UpdatePlan updatePlan = new UpdatePlan(0, 100, "2.0", new Path("root.logTestDevice.s1"));
DeletePlan deletePlan = new DeletePlan(50, new Path("root.logTestDevice.s1"));
logNode.write(bwInsertPlan);
- logNode.write(updatePlan);
logNode.write(deletePlan);
logNode.forceSync();
@@ -194,7 +178,7 @@ public class WriteLogNodeTest {
InsertPlan bwInsertPlan = new InsertPlan(1, "root.logTestDevice.oversize", 100,
new String[]{"s1", "s2", "s3", "s4"},
- new String[]{"1.0", "15", new String(new char[4 * 1024 * 1024]), "false"});
+ new String[]{"1.0", "15", new String(new char[65 * 1024 * 1024]), "false"});
boolean caught = false;
try {
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/writelog/io/LogWriterReaderTest.java b/iotdb/src/test/java/org/apache/iotdb/db/writelog/io/LogWriterReaderTest.java
index 0febe3b..c97cb13 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/writelog/io/LogWriterReaderTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/writelog/io/LogWriterReaderTest.java
@@ -19,17 +19,17 @@
package org.apache.iotdb.db.writelog.io;
-import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
import java.io.File;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.qp.physical.crud.UpdatePlan;
-import org.apache.iotdb.db.qp.physical.transfer.PhysicalPlanLogTransfer;
import org.apache.iotdb.tsfile.read.common.Path;
import org.junit.Before;
import org.junit.Test;
@@ -37,7 +37,7 @@ import org.junit.Test;
public class LogWriterReaderTest {
private static String filePath = "logtest.test";
- List<byte[]> logs = new ArrayList<>();
+ ByteBuffer logsBuffer = ByteBuffer.allocate(64*1024);
List<PhysicalPlan> plans = new ArrayList<>();
@Before
@@ -49,32 +49,29 @@ public class LogWriterReaderTest {
new String[]{"1", "2"});
InsertPlan insertPlan2 = new InsertPlan(2, "d1", 10L, new String[]{"s1", "s2"},
new String[]{"1", "2"});
- UpdatePlan updatePlan = new UpdatePlan(8L, 11L, "3", new Path("root.d1.s1"));
DeletePlan deletePlan = new DeletePlan(10L, new Path("root.d1.s1"));
plans.add(insertPlan1);
plans.add(insertPlan2);
- plans.add(updatePlan);
plans.add(deletePlan);
for (PhysicalPlan plan : plans) {
- logs.add(PhysicalPlanLogTransfer.planToLog(plan));
-
+ plan.serializeTo(logsBuffer);
}
}
@Test
public void testWriteAndRead() throws IOException {
LogWriter writer = new LogWriter(filePath);
- writer.write(logs);
+ writer.write(logsBuffer);
try {
writer.force();
writer.close();
SingleFileLogReader reader = new SingleFileLogReader(new File(filePath));
- List<byte[]> res = new ArrayList<>();
+ List<PhysicalPlan> res = new ArrayList<>();
while (reader.hasNext()) {
- res.add(PhysicalPlanLogTransfer.planToLog(reader.next()));
+ res.add(reader.next());
}
- for (int i = 0; i < logs.size(); i++) {
- assertArrayEquals(logs.get(i), res.get(i));
+ for (int i = 0; i < plans.size(); i++) {
+ assertEquals(plans.get(i), res.get(i));
}
reader.close();
} finally {
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/writelog/io/MultiFileLogReaderTest.java b/iotdb/src/test/java/org/apache/iotdb/db/writelog/io/MultiFileLogReaderTest.java
index 1729cfa..0530aa4 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/writelog/io/MultiFileLogReaderTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/writelog/io/MultiFileLogReaderTest.java
@@ -23,12 +23,10 @@ import static org.junit.Assert.assertEquals;
import java.io.File;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
+import java.nio.ByteBuffer;
import org.apache.commons.io.FileUtils;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
-import org.apache.iotdb.db.qp.physical.transfer.PhysicalPlanLogTransfer;
import org.apache.iotdb.tsfile.read.common.Path;
import org.junit.After;
import org.junit.Before;
@@ -51,13 +49,12 @@ public class MultiFileLogReaderTest {
fileLogs[i][j] = new DeletePlan(i * logsPerFile + j, new Path("path" + j));
}
- List<byte[]> logCache = new ArrayList<>();
+ ByteBuffer buffer = ByteBuffer.allocate(64*1024);
for (PhysicalPlan plan : fileLogs[i]) {
- logCache.add(PhysicalPlanLogTransfer.planToLog(plan));
+ plan.serializeTo(buffer);
}
-
ILogWriter writer = new LogWriter(logFiles[i]);
- writer.write(logCache);
+ writer.write(buffer);
writer.force();
writer.close();
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
index e99b021..aa5961f 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
@@ -88,6 +88,7 @@ public class TsFileIOWriter {
*/
public TsFileIOWriter(File file) throws IOException {
this.out = new DefaultTsFileOutput(file);
+ startFile();
}
/**