You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2021/11/02 14:27:47 UTC

[iotdb] branch master updated: [IOTDB-1609] Print the actual size of plan when it exceeds the wal_buffer_size (#4279)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 25a98ee  [IOTDB-1609] Print the actual size of plan when it exceeds the wal_buffer_size (#4279)
25a98ee is described below

commit 25a98ee165131047cda93dc92203db2ab9aecbc8
Author: Chen YZ <43...@users.noreply.github.com>
AuthorDate: Tue Nov 2 22:27:22 2021 +0800

    [IOTDB-1609] Print the actual size of plan when it exceeds the wal_buffer_size (#4279)
---
 .../apache/iotdb/db/qp/physical/PhysicalPlan.java  | 17 +++++++
 .../iotdb/db/qp/utils/EmptyOutputStream.java       | 34 ++++++++++++++
 .../db/writelog/node/ExclusiveWriteLogNode.java    | 10 +++--
 .../apache/iotdb/db/writelog/WriteLogNodeTest.java | 52 ++++++++++++++++++++++
 4 files changed, 109 insertions(+), 4 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
index e0371ef..6a3ab45 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
@@ -68,6 +68,7 @@ import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.StartTriggerPlan;
 import org.apache.iotdb.db.qp.physical.sys.StopTriggerPlan;
 import org.apache.iotdb.db.qp.physical.sys.StorageGroupMNodePlan;
+import org.apache.iotdb.db.qp.utils.EmptyOutputStream;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import java.io.DataOutputStream;
@@ -152,6 +153,22 @@ public abstract class PhysicalPlan {
   }
 
   /**
+   * Calculate size after serialization.
+   *
+   * @return size
+   * @throws IOException
+   */
+  public int getSerializedSize() throws IOException {
+    try {
+      DataOutputStream dataOutputStream = new DataOutputStream(new EmptyOutputStream());
+      serialize(dataOutputStream);
+      return dataOutputStream.size();
+    } catch (UnsupportedOperationException e) {
+      throw e;
+    }
+  }
+
+  /**
    * Serialize the plan into the given buffer. All necessary fields will be serialized.
    *
    * @param stream
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/utils/EmptyOutputStream.java b/server/src/main/java/org/apache/iotdb/db/qp/utils/EmptyOutputStream.java
new file mode 100644
index 0000000..ee8fde8
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/qp/utils/EmptyOutputStream.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.qp.utils;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/** Empty OutputStream To count serialize size without serialization */
+public class EmptyOutputStream extends OutputStream {
+
+  @Override
+  public void write(int b) throws IOException {}
+
+  @Override
+  public void write(byte b[], int off, int len) throws IOException {}
+
+  public void write(byte b[]) throws IOException {}
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java b/server/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java
index 7802255..ff55240 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java
@@ -33,9 +33,7 @@ import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
+import java.io.*;
 import java.nio.BufferOverflowException;
 import java.nio.ByteBuffer;
 import java.nio.channels.ClosedChannelException;
@@ -116,7 +114,11 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
       // if the size of a single plan bigger than logBufferWorking
       // we need to clear the buffer to drop something wrong that has written.
       logBufferWorking.clear();
-      throw new IOException("Log cannot fit into the buffer, please increase wal_buffer_size", e);
+      int neededSize = plan.getSerializedSize();
+      throw new IOException(
+          "Log cannot fit into the buffer, please increase wal_buffer_size to more than "
+              + neededSize * 2,
+          e);
     } finally {
       lock.unlock();
     }
diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java
index fd7cd0d..8498c5a 100644
--- a/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java
@@ -43,6 +43,8 @@ import java.nio.ByteBuffer;
 import java.nio.MappedByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import static junit.framework.TestCase.assertEquals;
 import static junit.framework.TestCase.assertFalse;
@@ -298,6 +300,7 @@ public class WriteLogNodeTest {
     WriteLogNode logNode = new ExclusiveWriteLogNode("root.logTestDevice.oversize");
     logNode.initBuffer(byteBuffers);
 
+    int neededSize = 0;
     InsertRowPlan bwInsertPlan =
         new InsertRowPlan(
             new PartialPath("root.logTestDevice.oversize"),
@@ -313,6 +316,11 @@ public class WriteLogNodeTest {
       logNode.write(bwInsertPlan);
     } catch (IOException e) {
       caught = true;
+      Pattern r = Pattern.compile("\\d+");
+      Matcher m = r.matcher(e.getMessage());
+      if (m.find()) {
+        neededSize = Integer.valueOf(m.group());
+      }
     }
     assertTrue(caught);
 
@@ -346,5 +354,49 @@ public class WriteLogNodeTest {
     for (ByteBuffer byteBuffer : array) {
       MmapUtil.clean((MappedByteBuffer) byteBuffer);
     }
+
+    // try to set wal_buffer_size according to error message
+    IoTDBDescriptor.getInstance().getConfig().setWalBufferSize(neededSize);
+    WriteLogNode logNode2 = new ExclusiveWriteLogNode("root.logTestDevice.oversize");
+    logNode2.initBuffer(byteBuffers);
+    ByteBuffer[] byteBuffers2 = new ByteBuffer[2];
+    byteBuffers2[0] =
+        ByteBuffer.allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+    byteBuffers2[1] =
+        ByteBuffer.allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+    logNode2.initBuffer(byteBuffers2);
+    caught = false;
+    try {
+      logNode2.write(bwInsertPlan);
+    } catch (IOException e) {
+      caught = true;
+    }
+    assertFalse(caught);
+    array = logNode2.delete();
+    for (ByteBuffer byteBuffer : array) {
+      MmapUtil.clean((MappedByteBuffer) byteBuffer);
+    }
+
+    // try to set wal_buffer_size less than error message
+    IoTDBDescriptor.getInstance().getConfig().setWalBufferSize(neededSize - 1);
+    WriteLogNode logNode3 = new ExclusiveWriteLogNode("root.logTestDevice.oversize");
+    logNode3.initBuffer(byteBuffers);
+    ByteBuffer[] byteBuffers3 = new ByteBuffer[2];
+    byteBuffers3[0] =
+        ByteBuffer.allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+    byteBuffers3[1] =
+        ByteBuffer.allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+    logNode3.initBuffer(byteBuffers3);
+    caught = false;
+    try {
+      logNode3.write(bwInsertPlan);
+    } catch (IOException e) {
+      caught = true;
+    }
+    assertTrue(caught);
+    array = logNode3.delete();
+    for (ByteBuffer byteBuffer : array) {
+      MmapUtil.clean((MappedByteBuffer) byteBuffer);
+    }
   }
 }