You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2019/06/25 11:47:56 UTC

[incubator-iotdb] branch feature_async_close_tsfile updated: refactor serialization of PhysicalPlans

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/feature_async_close_tsfile by this push:
     new 64bcce8  refactor serialization of PhysicalPlans
     new 87f7b35  Merge branch 'feature_async_close_tsfile' of github.com:apache/incubator-iotdb into feature_async_close_tsfile
64bcce8 is described below

commit 64bcce87e15288d1b4aed8a62efe58c9b9f4e8cf
Author: 江天 <jt...@163.com>
AuthorDate: Tue Jun 25 19:45:20 2019 +0800

    refactor serialization of PhysicalPlans
---
 .../apache/iotdb/db/qp/physical/PhysicalPlan.java  | 59 +++++++++++++++++++-
 .../iotdb/db/qp/physical/crud/AggregationPlan.java |  1 -
 .../iotdb/db/qp/physical/crud/DeletePlan.java      | 18 +++++-
 .../iotdb/db/qp/physical/crud/FillQueryPlan.java   |  1 -
 .../iotdb/db/qp/physical/crud/GroupByPlan.java     |  1 -
 .../iotdb/db/qp/physical/crud/InsertPlan.java      | 45 ++++++++++++++-
 .../iotdb/db/qp/physical/crud/QueryPlan.java       |  1 -
 .../iotdb/db/qp/physical/crud/UpdatePlan.java      |  1 -
 .../iotdb/db/qp/physical/sys/AuthorPlan.java       |  1 -
 .../iotdb/db/qp/physical/sys/LoadDataPlan.java     |  1 -
 .../iotdb/db/qp/physical/sys/MetadataPlan.java     |  1 -
 .../iotdb/db/qp/physical/sys/PropertyPlan.java     |  1 -
 .../iotdb/db/writelog/io/BatchLogReader.java       | 64 ++++++++++++++++++++++
 .../apache/iotdb/db/writelog/io/ILogWriter.java    |  7 ++-
 .../org/apache/iotdb/db/writelog/io/LogWriter.java | 40 ++++++++------
 .../iotdb/db/writelog/io/SingleFileLogReader.java  | 34 +++++++-----
 .../db/writelog/node/ExclusiveWriteLogNode.java    | 38 +++++++++----
 .../org/apache/iotdb/db/tools/WalCheckerTest.java  | 15 ++---
 .../iotdb/db/writelog/WriteLogNodeManagerTest.java |  3 -
 .../apache/iotdb/db/writelog/WriteLogNodeTest.java | 20 +------
 .../iotdb/db/writelog/io/LogWriterReaderTest.java  | 21 +++----
 .../db/writelog/io/MultiFileLogReaderTest.java     | 11 ++--
 .../iotdb/tsfile/write/writer/TsFileIOWriter.java  |  1 +
 23 files changed, 281 insertions(+), 104 deletions(-)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
index d83491e..de17c57 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
@@ -19,18 +19,24 @@
 package org.apache.iotdb.db.qp.physical;
 
 import java.io.Serializable;
+import java.nio.ByteBuffer;
 import java.util.List;
 import org.apache.iotdb.db.qp.logical.Operator;
+import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
+import org.apache.iotdb.db.qp.physical.transfer.SystemLogOperator;
 import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 /**
  * This class is a abstract class for all type of PhysicalPlan.
  */
-public abstract class PhysicalPlan implements Serializable {
+public abstract class PhysicalPlan {
 
-  private static final long serialVersionUID = -6274856391535568352L;
   private boolean isQuery;
   private Operator.OperatorType operatorType;
+  private static final int NULL_VALUE_LEN = -1;
+
   /**
    * plans in a Storage Group are executed serially, this id is to guarantee in recovery stage plans
    * in WAL can be redone in the same order
@@ -92,4 +98,53 @@ public abstract class PhysicalPlan implements Serializable {
   public void setPlanId(long planId) {
     this.planId = planId;
   }
+
+  public void serializeTo(ByteBuffer buffer) { // default; plans with a binary form override this
+    throw new UnsupportedOperationException("serialize of unimplemented");
+  }
+
+  public void deserializeFrom(ByteBuffer buffer) { // default; overridden by serializable plans
+    throw new UnsupportedOperationException("deserialize is unimplemented");
+  }
+
+  protected void putString(ByteBuffer buffer, String value) {
+    if (value != null) {
+      ReadWriteIOUtils.write(value, buffer);
+      return;
+    }
+    buffer.putInt(NULL_VALUE_LEN); // null is encoded as a bare length of NULL_VALUE_LEN
+  }
+
+  protected String readString(ByteBuffer buffer) {
+    // the length prefix doubles as the null marker written by putString
+    int length = buffer.getInt();
+    return length == NULL_VALUE_LEN
+        ? null
+        : ReadWriteIOUtils.readStringWithoutLength(buffer, length);
+  }
+
+  public static class Factory {
+
+    private Factory() {
+      // static factory holder; not meant to be instantiated
+    }
+
+    // reads the leading type byte, then lets the matching plan decode the bytes after it
+    public static PhysicalPlan create(ByteBuffer buffer) {
+      byte logType = buffer.get();
+      PhysicalPlan result;
+      switch (logType) {
+        case SystemLogOperator.INSERT:
+          result = new InsertPlan();
+          break;
+        case SystemLogOperator.DELETE:
+          result = new DeletePlan();
+          break;
+        default:
+          throw new UnsupportedOperationException("unrecognized log type " + logType);
+      }
+      result.deserializeFrom(buffer);
+      return result;
+    }
+  }
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java
index 6e5f430..43a5e28 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.db.qp.logical.Operator;
 
 public class AggregationPlan extends QueryPlan {
 
-  private static final long serialVersionUID = -2049810573809076643L;
   private List<String> aggregations = new ArrayList<>();
 
   public AggregationPlan() {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/DeletePlan.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/DeletePlan.java
index ae183bf..4f518d9 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/DeletePlan.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/DeletePlan.java
@@ -18,16 +18,18 @@
  */
 package org.apache.iotdb.db.qp.physical.crud;
 
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Objects;
 import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.transfer.SystemLogOperator;
 import org.apache.iotdb.tsfile.read.common.Path;
 
 public class DeletePlan extends PhysicalPlan {
 
-  private static final long serialVersionUID = -6532570247476907037L;
   private long deleteTime;
   private List<Path> paths = new ArrayList<>();
 
@@ -92,4 +94,18 @@ public class DeletePlan extends PhysicalPlan {
     return deleteTime == that.deleteTime && Objects.equals(paths, that.paths);
   }
 
+  @Override
+  public void serializeTo(ByteBuffer buffer) {
+    buffer.put((byte) SystemLogOperator.DELETE);
+    buffer.putLong(deleteTime);
+    // NOTE(review): only the first path is written; any further paths are not serialized
+    putString(buffer, paths.get(0).getFullPath());
+  }
+
+  @Override
+  public void deserializeFrom(ByteBuffer buffer) {
+    // fields are read in the order serializeTo writes them after the leading type byte
+    this.deleteTime = buffer.getLong();
+    this.paths = new ArrayList<>(Arrays.asList(new Path(readString(buffer))));
+  }
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/FillQueryPlan.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/FillQueryPlan.java
index 4fd666d..7f0399f 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/FillQueryPlan.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/FillQueryPlan.java
@@ -25,7 +25,6 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 public class FillQueryPlan extends QueryPlan {
 
-  private static final long serialVersionUID = -2091710518816582444L;
   private long queryTime;
   private Map<TSDataType, IFill> fillType;
 
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/GroupByPlan.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/GroupByPlan.java
index d016c36..f4087eb 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/GroupByPlan.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/GroupByPlan.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.tsfile.utils.Pair;
 
 public class GroupByPlan extends AggregationPlan {
 
-  private static final long serialVersionUID = 8769258112457178898L;
   private long unit;
   private long origin;
   private List<Pair<Long, Long>> intervals; // show intervals
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
index 806b693..b017e3c 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.qp.physical.crud;
 
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -26,13 +27,13 @@ import java.util.Objects;
 import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.transfer.SystemLogOperator;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.write.record.TSRecord;
 
 public class InsertPlan extends PhysicalPlan {
 
-  private static final long serialVersionUID = 6102845312368561515L;
   private String deviceId;
   private String[] measurements;
   private TSDataType[] dataTypes;
@@ -43,6 +44,10 @@ public class InsertPlan extends PhysicalPlan {
   // 1 : BufferWrite Insert 2 : Overflow Insert
   private int insertType;
 
+  public InsertPlan() { // passes only the fixed super arguments; data fields stay unset
+    super(false, OperatorType.INSERT);
+  }
+
   public InsertPlan(String deviceId, long insertTime, String measurement, String insertValue) {
     super(false, OperatorType.INSERT);
     this.time = insertTime;
@@ -156,4 +161,42 @@ public class InsertPlan extends PhysicalPlan {
         && Arrays.equals(values, that.values);
   }
 
+  @Override
+  public void serializeTo(ByteBuffer buffer) {
+    // header: log type tag, insert type and timestamp, then the device id
+    buffer.put((byte) SystemLogOperator.INSERT);
+    buffer.put((byte) insertType);
+    buffer.putLong(time);
+    putString(buffer, deviceId);
+
+    // measurement names, prefixed with their count
+    buffer.putInt(measurements.length);
+    for (String measurement : measurements) {
+      putString(buffer, measurement);
+    }
+    // values as strings, prefixed with their count
+    buffer.putInt(values.length);
+    for (String value : values) {
+      putString(buffer, value);
+    }
+  }
+
+  @Override
+  public void deserializeFrom(ByteBuffer buffer) {
+    this.insertType = buffer.get();
+    this.time = buffer.getLong();
+    this.deviceId = readString(buffer);
+    // measurement names: a count, then that many strings
+    int measurementCount = buffer.getInt();
+    this.measurements = new String[measurementCount];
+    for (int idx = 0; idx < measurementCount; idx++) {
+      measurements[idx] = readString(buffer);
+    }
+    // values: same layout as the measurement names
+    int valueCount = buffer.getInt();
+    this.values = new String[valueCount];
+    for (int idx = 0; idx < valueCount; idx++) {
+      values[idx] = readString(buffer);
+    }
+  }
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
index 6bf7c0e..1a4377c 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
@@ -28,7 +28,6 @@ import org.apache.iotdb.tsfile.read.expression.IExpression;
 
 public class QueryPlan extends PhysicalPlan {
 
-  private static final long serialVersionUID = -5865840981549195660L;
   private List<Path> paths = null;
   private IExpression expression = null;
 
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/UpdatePlan.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/UpdatePlan.java
index 60768d6..5da9c2a 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/UpdatePlan.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/crud/UpdatePlan.java
@@ -31,7 +31,6 @@ import org.apache.iotdb.tsfile.utils.StringContainer;
 
 public class UpdatePlan extends PhysicalPlan {
 
-  private static final long serialVersionUID = 8952248212926920033L;
   private List<Pair<Long, Long>> intervals = new ArrayList<>();
   private String value;
   private Path path;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/AuthorPlan.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/AuthorPlan.java
index 339f1e0..8cb2116 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/AuthorPlan.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/AuthorPlan.java
@@ -32,7 +32,6 @@ import org.apache.iotdb.tsfile.read.common.Path;
 
 public class AuthorPlan extends PhysicalPlan {
 
-  private static final long serialVersionUID = 6501894026593590182L;
   private final AuthorOperator.AuthorType authorType;
   private String userName;
   private String roleName;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/LoadDataPlan.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/LoadDataPlan.java
index f9b4553..7e2eb7a 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/LoadDataPlan.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/LoadDataPlan.java
@@ -27,7 +27,6 @@ import org.apache.iotdb.tsfile.read.common.Path;
 
 public class LoadDataPlan extends PhysicalPlan {
 
-  private static final long serialVersionUID = -6631296704227106470L;
   private final String inputFilePath;
   private final String measureType;
 
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/MetadataPlan.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/MetadataPlan.java
index e3b71f5..93a98a3 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/MetadataPlan.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/MetadataPlan.java
@@ -32,7 +32,6 @@ import org.apache.iotdb.tsfile.read.common.Path;
 
 public class MetadataPlan extends PhysicalPlan {
 
-  private static final long serialVersionUID = -3717406842093744475L;
   private final MetadataOperator.NamespaceType namespaceType;
   private Path path;
   private TSDataType dataType;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/PropertyPlan.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/PropertyPlan.java
index 94ec623..2f2c591 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/PropertyPlan.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/PropertyPlan.java
@@ -31,7 +31,6 @@ import org.apache.iotdb.tsfile.read.common.Path;
  */
 public class PropertyPlan extends PhysicalPlan {
 
-  private static final long serialVersionUID = -1462399624512066104L;
   private final PropertyOperator.PropertyType propertyType;
   private Path propertyPath;
   private Path metadataPath;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/io/BatchLogReader.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/io/BatchLogReader.java
new file mode 100644
index 0000000..5ffa643
--- /dev/null
+++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/io/BatchLogReader.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.writelog.io;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+
+/**
+ * BatchLogReader reads logs from a binary batch of logs held in a ByteBuffer. The ByteBuffer
+ * must be ready for reading: every byte between its position and its limit is decoded.
+ */
+public class BatchLogReader implements ILogReader {
+
+  Iterator<PhysicalPlan> planIterator;
+
+  // decodes every plan in the buffer up front and keeps only an iterator over them
+  public BatchLogReader(ByteBuffer buffer) {
+    this.planIterator = readLogs(buffer).iterator();
+  }
+
+  private List<PhysicalPlan> readLogs(ByteBuffer buffer) {
+    List<PhysicalPlan> decoded = new ArrayList<>();
+    while (buffer.hasRemaining()) {
+      decoded.add(PhysicalPlan.Factory.create(buffer));
+    }
+    return decoded;
+  }
+
+  @Override
+  public void close() {
+    // nothing to release: the plans were already decoded into memory
+  }
+
+  @Override
+  public boolean hasNext() throws IOException {
+    return planIterator.hasNext();
+  }
+
+  @Override
+  public PhysicalPlan next() throws IOException {
+    return planIterator.next();
+  }
+}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/io/ILogWriter.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/io/ILogWriter.java
index fc1c33d..8fb24f1 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/io/ILogWriter.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/io/ILogWriter.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.db.writelog.io;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.List;
 
 /**
@@ -32,11 +33,11 @@ public interface ILogWriter {
    * NOTICE: the logs may be cached in the OS/FileSystem, if the OS/FileSystem you are using do
    * not guarantee strong persistency and you want the logs to be persisted immediately, please
    * call force() after calling this method.
-   * @param logCache WAL logs that have been converted to bytes, each element in this list
-   * represents one log.
+   * Notice: do not flip the buffer before calling this method
+   * @param logBuffer WAL logs that have been converted to bytes
    * @throws IOException
    */
-  void write(List<byte[]> logCache) throws IOException;
+  void write(ByteBuffer logBuffer) throws IOException;
 
   /**
    * force the OS/FileSystem to flush its cache to make sure logs are persisted.
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/io/LogWriter.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/io/LogWriter.java
index 5abc925..903d0a4 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/io/LogWriter.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/io/LogWriter.java
@@ -23,7 +23,6 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
-import java.util.List;
 import java.util.zip.CRC32;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -39,6 +38,8 @@ public class LogWriter implements ILogWriter {
   private FileChannel channel;
   private CRC32 checkSummer = new CRC32();
   private IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+  private ByteBuffer lengthBuffer = ByteBuffer.allocate(4);
+  private ByteBuffer checkSumBuffer = ByteBuffer.allocate(8);
 
   public LogWriter(String logFilePath) {
     logFile = new File(logFilePath);
@@ -49,25 +50,32 @@ public class LogWriter implements ILogWriter {
   }
 
   @Override
-  public void write(List<byte[]> logCache) throws IOException {
+  public void write(ByteBuffer logBuffer) throws IOException {
     if (channel == null) {
       fileOutputStream = new FileOutputStream(logFile, true);
       channel = fileOutputStream.getChannel();
     }
-    int totalSize = 0;
-    for (byte[] bytes : logCache) {
-      totalSize += 4 + 8 + bytes.length;
-    }
-    ByteBuffer buffer = ByteBuffer.allocate(totalSize);
-    for (byte[] bytes : logCache) {
-      buffer.putInt(bytes.length);
-      checkSummer.reset();
-      checkSummer.update(bytes);
-      buffer.putLong(checkSummer.getValue());
-      buffer.put(bytes);
-    }
-    buffer.flip();
-    channel.write(buffer);
+    logBuffer.flip();
+    int logSize = logBuffer.limit();
+    // 4 bytes length and 8 bytes check sum
+
+    checkSummer.reset();
+    checkSummer.update(logBuffer);
+    long checkSum = checkSummer.getValue();
+
+    logBuffer.flip();
+
+    lengthBuffer.clear();
+    checkSumBuffer.clear();
+    lengthBuffer.putInt(logSize);
+    checkSumBuffer.putLong(checkSum);
+    lengthBuffer.flip();
+    checkSumBuffer.flip();
+
+    channel.write(lengthBuffer);
+    channel.write(logBuffer);
+    channel.write(checkSumBuffer);
+
     if (config.getForceWalPeriodInMs() == 0) {
       channel.force(true);
     }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/io/SingleFileLogReader.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/io/SingleFileLogReader.java
index 445c573..bacb8f5 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/io/SingleFileLogReader.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/io/SingleFileLogReader.java
@@ -24,11 +24,10 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.NoSuchElementException;
 import java.util.zip.CRC32;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.qp.physical.transfer.PhysicalPlanLogTransfer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,26 +38,26 @@ import org.slf4j.LoggerFactory;
 public class SingleFileLogReader implements ILogReader {
 
   private static final Logger logger = LoggerFactory.getLogger(SingleFileLogReader.class);
-  private int bufferSize = IoTDBDescriptor.getInstance().getConfig().getMaxLogEntrySize();
   public static final int LEAST_LOG_SIZE = 12; // size + checksum
 
   private DataInputStream logStream;
   private String filepath;
 
-  private byte[] buffer = new byte[bufferSize];
+  private byte[] buffer;
   private CRC32 checkSummer = new CRC32();
-  private PhysicalPlan planBuffer = null;
 
   // used to indicate the position of the broken log
   private int idx;
 
+  private BatchLogReader batchLogReader;
+
   public SingleFileLogReader(File logFile) throws FileNotFoundException {
     open(logFile);
   }
 
   @Override
   public boolean hasNext() throws IOException{
-    if (planBuffer != null) {
+    if (batchLogReader != null && batchLogReader.hasNext()) {
       return true;
     }
 
@@ -67,19 +66,26 @@ public class SingleFileLogReader implements ILogReader {
     }
 
     int logSize = logStream.readInt();
-    if (logSize > bufferSize) {
-      bufferSize = logSize;
-      buffer = new byte[bufferSize];
+    if (logSize <= 0) {
+      return false;
+    }
+    buffer = new byte[logSize];
+
+    int readLen = logStream.read(buffer, 0, logSize);
+    if (readLen < logSize) {
+      throw new IOException("Reach eof");
     }
+
     final long checkSum = logStream.readLong();
-    logStream.read(buffer, 0, logSize);
     checkSummer.reset();
     checkSummer.update(buffer, 0, logSize);
     if (checkSummer.getValue() != checkSum) {
-      throw new IOException(String.format("The check sum of the No.%d log is incorrect! In file: "
+      throw new IOException(String.format("The check sum of the No.%d log batch is incorrect! In "
+          + "file: "
           + "%d Calculated: %d.", idx, checkSum, checkSummer.getValue()));
     }
-    planBuffer = PhysicalPlanLogTransfer.logToPlan(buffer);
+
+    batchLogReader = new BatchLogReader(ByteBuffer.wrap(buffer));
     return true;
   }
 
@@ -89,10 +95,8 @@ public class SingleFileLogReader implements ILogReader {
       throw new NoSuchElementException();
     }
 
-    PhysicalPlan ret = planBuffer;
-    planBuffer = null;
     idx ++;
-    return ret;
+    return batchLogReader.next();
   }
 
   @Override
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java
index 23cb319..4e1b221 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java
@@ -14,8 +14,11 @@
  */
 package org.apache.iotdb.db.writelog.node;
 
+import com.sun.xml.internal.ws.policy.privateutil.PolicyUtils.IO;
 import java.io.File;
 import java.io.IOException;
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
@@ -42,6 +45,7 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
 
   public static final String WAL_FILE_NAME = "wal";
   private static final Logger logger = LoggerFactory.getLogger(ExclusiveWriteLogNode.class);
+  private static int logBufferSize = 64*1024*1024;
 
   private String identifier;
 
@@ -51,13 +55,15 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
 
   private IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
 
-  private List<byte[]> logCache = new ArrayList<>(config.getFlushWalThreshold());
+  private ByteBuffer logBuffer = ByteBuffer.allocate(logBufferSize);
 
   private ReadWriteLock lock = new ReentrantReadWriteLock();
 
   private long fileId = 0;
   private long lastFlushedId = 0;
 
+  private int bufferedLogNum = 0;
+
   /**
    * constructor of ExclusiveWriteLogNode.
    *
@@ -75,16 +81,26 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
     lock.writeLock().lock();
     try {
       long start = System.currentTimeMillis();
-      byte[] logBytes = PhysicalPlanLogTransfer.planToLog(plan);
-      logCache.add(logBytes);
+      logBuffer.mark();
+      try {
+        plan.serializeTo(logBuffer);
+      } catch (BufferOverflowException e) {
+        logBuffer.reset();
+        sync();
+        plan.serializeTo(logBuffer);
+      }
+
+      bufferedLogNum ++;
 
-      if (logCache.size() >= config.getFlushWalThreshold()) {
+      if (bufferedLogNum >= config.getFlushWalThreshold()) {
         sync();
       }
       long elapse = System.currentTimeMillis() - start;
       if (elapse > 1000) {
         logger.info("WAL insert cost {} ms", elapse);
       }
+    } catch (BufferOverflowException e) {
+      throw new IOException("Log cannot fit into buffer", e);
     } finally {
       lock.writeLock().unlock();
     }
@@ -167,7 +183,7 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
     lock.writeLock().lock();
     try {
       long start = System.currentTimeMillis();
-      logCache.clear();
+      logBuffer.clear();
       close();
       FileUtils.deleteDirectory(new File(logDirectory));
       long elapse = System.currentTimeMillis() - start;
@@ -204,7 +220,7 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
     lock.writeLock().lock();
     try {
       long start = System.currentTimeMillis();
-      logger.debug("Log node {} starts force, {} logs to be forced", identifier, logCache.size());
+      logger.debug("Log node {} starts force, {} logs to be forced", identifier, bufferedLogNum);
       try {
         if (currentFileWriter != null) {
           currentFileWriter.force();
@@ -226,16 +242,18 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
     lock.writeLock().lock();
     try {
       long start = System.currentTimeMillis();
-      logger.debug("Log node {} starts sync, {} logs to be synced", identifier, logCache.size());
-      if (logCache.isEmpty()) {
+      logger.debug("Log node {} starts sync, {} logs to be synced", identifier, bufferedLogNum);
+      if (bufferedLogNum == 0) {
         return;
       }
       try {
-        getCurrentFileWriter().write(logCache);
+        getCurrentFileWriter().write(logBuffer);
       } catch (IOException e) {
         logger.error("Log node {} sync failed", identifier, e);
       }
-      logCache.clear();
+      logBuffer.clear();
+      bufferedLogNum = 0;
+
       logger.debug("Log node {} ends sync.", identifier);
       long elapse = System.currentTimeMillis() - start;
       if (elapse > 1000) {
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/tools/WalCheckerTest.java b/iotdb/src/test/java/org/apache/iotdb/db/tools/WalCheckerTest.java
index 16862cb..586eb79 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/tools/WalCheckerTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/tools/WalCheckerTest.java
@@ -26,6 +26,7 @@ import static org.junit.Assert.assertTrue;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.commons.io.FileUtils;
@@ -74,14 +75,14 @@ public class WalCheckerTest {
         LogWriter logWriter = new LogWriter(subDir.getPath() + File.separator
             + WAL_FILE_NAME);
 
-        List<byte[]> binaryPlans = new ArrayList<>();
+        ByteBuffer binaryPlans = ByteBuffer.allocate(64*1024);
         String deviceId = "device1";
         String[] measurements = new String[]{"s1", "s2", "s3"};
         String[] values = new String[]{"5", "6", "7"};
         for (int j = 0; j < 10; j++) {
-          binaryPlans.add(PhysicalPlanLogTransfer
-              .planToLog(new InsertPlan(deviceId, j, measurements, values)));
+          new InsertPlan(deviceId, j, measurements, values).serializeTo(binaryPlans);
         }
+        binaryPlans.flip();
         logWriter.write(binaryPlans);
         logWriter.force();
 
@@ -107,17 +108,17 @@ public class WalCheckerTest {
         LogWriter logWriter = new LogWriter(subDir.getPath() + File.separator
             + WAL_FILE_NAME);
 
-        List<byte[]> binaryPlans = new ArrayList<>();
+        ByteBuffer binaryPlans = ByteBuffer.allocate(64*1024);
         String deviceId = "device1";
         String[] measurements = new String[]{"s1", "s2", "s3"};
         String[] values = new String[]{"5", "6", "7"};
         for (int j = 0; j < 10; j++) {
-          binaryPlans.add(PhysicalPlanLogTransfer
-              .planToLog(new InsertPlan(deviceId, j, measurements, values)));
+          new InsertPlan(deviceId, j, measurements, values).serializeTo(binaryPlans);
         }
         if (i > 2) {
-          binaryPlans.add("not a wal".getBytes());
+          binaryPlans.put("not a wal".getBytes());
         }
+        binaryPlans.flip();
         logWriter.write(binaryPlans);
         logWriter.force();
 
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeManagerTest.java b/iotdb/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeManagerTest.java
index 1aa083e..22b9469 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeManagerTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeManagerTest.java
@@ -29,7 +29,6 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
-import org.apache.iotdb.db.qp.physical.crud.UpdatePlan;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
 import org.apache.iotdb.db.writelog.manager.WriteLogNodeManager;
@@ -88,11 +87,9 @@ public class WriteLogNodeManagerTest {
     InsertPlan bwInsertPlan = new InsertPlan(1, "logTestDevice", 100,
         new String[]{"s1", "s2", "s3", "s4"},
         new String[]{"1.0", "15", "str", "false"});
-    UpdatePlan updatePlan = new UpdatePlan(0, 100, "2.0", new Path("root.logTestDevice.s1"));
     DeletePlan deletePlan = new DeletePlan(50, new Path("root.logTestDevice.s1"));
 
     logNode.write(bwInsertPlan);
-    logNode.write(updatePlan);
     logNode.write(deletePlan);
 
     File walFile = new File(logNode.getLogDirectory() + File.separator + "wal1");
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java b/iotdb/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java
index b01988a..eacc136 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java
@@ -68,11 +68,9 @@ public class WriteLogNodeTest {
     InsertPlan bwInsertPlan = new InsertPlan(1, identifier, 100,
         new String[]{"s1", "s2", "s3", "s4"},
         new String[]{"1.0", "15", "str", "false"});
-    UpdatePlan updatePlan = new UpdatePlan(0, 100, "2.0", new Path(identifier + ".s1"));
     DeletePlan deletePlan = new DeletePlan(50, new Path(identifier + ".s1"));
 
     logNode.write(bwInsertPlan);
-    logNode.write(updatePlan);
     logNode.write(deletePlan);
 
     logNode.close();
@@ -83,7 +81,6 @@ public class WriteLogNodeTest {
 
     ILogReader reader = logNode.getLogReader();
     assertEquals(bwInsertPlan, reader.next());
-    assertEquals(updatePlan, reader.next());
     assertEquals(deletePlan, reader.next());
 
     logNode.delete();
@@ -100,24 +97,15 @@ public class WriteLogNodeTest {
     InsertPlan bwInsertPlan = new InsertPlan(1, identifier, 100,
         new String[]{"s1", "s2", "s3", "s4"},
         new String[]{"1.0", "15", "str", "false"});
-    UpdatePlan updatePlan = new UpdatePlan(0, 100, "2.0", new Path(identifier + ".s1"));
     DeletePlan deletePlan = new DeletePlan(50, new Path(identifier + ".s1"));
 
     logNode.write(bwInsertPlan);
     logNode.notifyStartFlush();
-    logNode.write(updatePlan);
-    logNode.notifyStartFlush();
     logNode.write(deletePlan);
     logNode.notifyStartFlush();
 
     ILogReader logReader = logNode.getLogReader();
     assertEquals(bwInsertPlan, logReader.next());
-    assertEquals(updatePlan, logReader.next());
-    assertEquals(deletePlan, logReader.next());
-
-    logNode.notifyEndFlush();
-    logReader = logNode.getLogReader();
-    assertEquals(updatePlan, logReader.next());
     assertEquals(deletePlan, logReader.next());
 
     logNode.notifyEndFlush();
@@ -135,18 +123,16 @@ public class WriteLogNodeTest {
   public void testSyncThreshold() throws IOException {
     // this test checks that if more logs than threshold are written, a sync will be triggered.
     int flushWalThreshold = config.getFlushWalThreshold();
-    config.setFlushWalThreshold(3);
+    config.setFlushWalThreshold(2);
 
     WriteLogNode logNode = new ExclusiveWriteLogNode("root.logTestDevice");
 
     InsertPlan bwInsertPlan = new InsertPlan(1, "root.logTestDevice", 100,
         new String[]{"s1", "s2", "s3", "s4"},
         new String[]{"1.0", "15", "str", "false"});
-    UpdatePlan updatePlan = new UpdatePlan(0, 100, "2.0", new Path("root.logTestDevice.s1"));
     DeletePlan deletePlan = new DeletePlan(50, new Path("root.logTestDevice.s1"));
 
     logNode.write(bwInsertPlan);
-    logNode.write(updatePlan);
 
     File walFile = new File(
         config.getWalFolder() + File.separator + "root.logTestDevice" + File.separator + "wal1");
@@ -169,11 +155,9 @@ public class WriteLogNodeTest {
     InsertPlan bwInsertPlan = new InsertPlan(1, "logTestDevice", 100,
         new String[]{"s1", "s2", "s3", "s4"},
         new String[]{"1.0", "15", "str", "false"});
-    UpdatePlan updatePlan = new UpdatePlan(0, 100, "2.0", new Path("root.logTestDevice.s1"));
     DeletePlan deletePlan = new DeletePlan(50, new Path("root.logTestDevice.s1"));
 
     logNode.write(bwInsertPlan);
-    logNode.write(updatePlan);
     logNode.write(deletePlan);
 
     logNode.forceSync();
@@ -194,7 +178,7 @@ public class WriteLogNodeTest {
 
     InsertPlan bwInsertPlan = new InsertPlan(1, "root.logTestDevice.oversize", 100,
         new String[]{"s1", "s2", "s3", "s4"},
-        new String[]{"1.0", "15", new String(new char[4 * 1024 * 1024]), "false"});
+        new String[]{"1.0", "15", new String(new char[65 * 1024 * 1024]), "false"});
 
     boolean caught = false;
     try {
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/writelog/io/LogWriterReaderTest.java b/iotdb/src/test/java/org/apache/iotdb/db/writelog/io/LogWriterReaderTest.java
index 0febe3b..c97cb13 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/writelog/io/LogWriterReaderTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/writelog/io/LogWriterReaderTest.java
@@ -19,17 +19,17 @@
 
 package org.apache.iotdb.db.writelog.io;
 
-import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.qp.physical.crud.UpdatePlan;
-import org.apache.iotdb.db.qp.physical.transfer.PhysicalPlanLogTransfer;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.junit.Before;
 import org.junit.Test;
@@ -37,7 +37,7 @@ import org.junit.Test;
 public class LogWriterReaderTest {
 
   private static String filePath = "logtest.test";
-  List<byte[]> logs = new ArrayList<>();
+  ByteBuffer logsBuffer = ByteBuffer.allocate(64*1024);
   List<PhysicalPlan> plans = new ArrayList<>();
 
   @Before
@@ -49,32 +49,29 @@ public class LogWriterReaderTest {
         new String[]{"1", "2"});
     InsertPlan insertPlan2 = new InsertPlan(2, "d1", 10L, new String[]{"s1", "s2"},
         new String[]{"1", "2"});
-    UpdatePlan updatePlan = new UpdatePlan(8L, 11L, "3", new Path("root.d1.s1"));
     DeletePlan deletePlan = new DeletePlan(10L, new Path("root.d1.s1"));
     plans.add(insertPlan1);
     plans.add(insertPlan2);
-    plans.add(updatePlan);
     plans.add(deletePlan);
     for (PhysicalPlan plan : plans) {
-      logs.add(PhysicalPlanLogTransfer.planToLog(plan));
-
+      plan.serializeTo(logsBuffer);
     }
   }
 
   @Test
   public void testWriteAndRead() throws IOException {
     LogWriter writer = new LogWriter(filePath);
-    writer.write(logs);
+    writer.write(logsBuffer);
     try {
       writer.force();
       writer.close();
       SingleFileLogReader reader = new SingleFileLogReader(new File(filePath));
-      List<byte[]> res = new ArrayList<>();
+      List<PhysicalPlan> res = new ArrayList<>();
       while (reader.hasNext()) {
-        res.add(PhysicalPlanLogTransfer.planToLog(reader.next()));
+        res.add(reader.next());
       }
-      for (int i = 0; i < logs.size(); i++) {
-        assertArrayEquals(logs.get(i), res.get(i));
+      for (int i = 0; i < plans.size(); i++) {
+        assertEquals(plans.get(i), res.get(i));
       }
       reader.close();
     } finally {
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/writelog/io/MultiFileLogReaderTest.java b/iotdb/src/test/java/org/apache/iotdb/db/writelog/io/MultiFileLogReaderTest.java
index 1729cfa..0530aa4 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/writelog/io/MultiFileLogReaderTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/writelog/io/MultiFileLogReaderTest.java
@@ -23,12 +23,10 @@ import static org.junit.Assert.assertEquals;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
+import java.nio.ByteBuffer;
 import org.apache.commons.io.FileUtils;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
-import org.apache.iotdb.db.qp.physical.transfer.PhysicalPlanLogTransfer;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.junit.After;
 import org.junit.Before;
@@ -51,13 +49,12 @@ public class MultiFileLogReaderTest {
         fileLogs[i][j] = new DeletePlan(i * logsPerFile + j, new Path("path" + j));
       }
 
-      List<byte[]> logCache = new ArrayList<>();
+      ByteBuffer buffer = ByteBuffer.allocate(64*1024);
       for (PhysicalPlan plan : fileLogs[i]) {
-        logCache.add(PhysicalPlanLogTransfer.planToLog(plan));
+        plan.serializeTo(buffer);
       }
-
       ILogWriter writer = new LogWriter(logFiles[i]);
-      writer.write(logCache);
+      writer.write(buffer);
       writer.force();
       writer.close();
     }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
index e99b021..aa5961f 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
@@ -88,6 +88,7 @@ public class TsFileIOWriter {
    */
   public TsFileIOWriter(File file) throws IOException {
     this.out = new DefaultTsFileOutput(file);
+    startFile();
   }
 
   /**