You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/12/09 02:23:04 UTC

[iotdb] branch master updated: [IOTDB-5158] Fix InputStream may skip over some smaller number of bytes (#8388)

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new fd0bcea3b9 [IOTDB-5158] Fix InputStream may skip over some smaller number of bytes  (#8388)
fd0bcea3b9 is described below

commit fd0bcea3b947fbd065a9963c4f05b1148ffebb40
Author: Chen YZ <43...@users.noreply.github.com>
AuthorDate: Fri Dec 9 10:22:57 2022 +0800

    [IOTDB-5158] Fix InputStream may skip over some smaller number of bytes  (#8388)
---
 .../storagegroup/timeindex/DeviceTimeIndex.java    |  4 ++--
 .../iotdb/tsfile/file/header/ChunkHeader.java      |  2 +-
 .../iotdb/tsfile/utils/ReadWriteIOUtils.java       | 24 +++++++++++++++++++++
 .../iotdb/tsfile/utils/ReadWriteIOUtilsTest.java   | 25 ++++++++++++++++++++++
 4 files changed, 52 insertions(+), 3 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/DeviceTimeIndex.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/DeviceTimeIndex.java
index 066bbc17e2..3a745f532d 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/DeviceTimeIndex.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/DeviceTimeIndex.java
@@ -163,11 +163,11 @@ public class DeviceTimeIndex implements ITimeIndex {
    */
   public static Set<String> getDevices(InputStream inputStream) throws IOException {
     int deviceNum = ReadWriteIOUtils.readInt(inputStream);
-    inputStream.skip(2L * deviceNum * ReadWriteIOUtils.LONG_LEN);
+    ReadWriteIOUtils.skip(inputStream, 2L * deviceNum * ReadWriteIOUtils.LONG_LEN);
     Set<String> devices = new HashSet<>();
     for (int i = 0; i < deviceNum; i++) {
       String path = ReadWriteIOUtils.readString(inputStream).intern();
-      inputStream.skip(ReadWriteIOUtils.INT_LEN);
+      ReadWriteIOUtils.skip(inputStream, ReadWriteIOUtils.INT_LEN);
       devices.add(path);
     }
     return devices;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/ChunkHeader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/ChunkHeader.java
index 151e170495..02763f38cd 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/ChunkHeader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/ChunkHeader.java
@@ -206,7 +206,7 @@ public class ChunkHeader {
   public static Pair<CompressionType, TSEncoding> deserializeCompressionTypeAndEncoding(
       InputStream inputStream) throws IOException {
     ReadWriteForEncodingUtils.readUnsignedVarInt(inputStream);
-    inputStream.skip(Byte.BYTES); // skip Data type
+    ReadWriteIOUtils.skip(inputStream, Byte.BYTES); // skip Data type
     CompressionType type = ReadWriteIOUtils.readCompressionType(inputStream);
     TSEncoding encoding = ReadWriteIOUtils.readEncoding(inputStream);
     return new Pair<>(type, encoding);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java
index 20f9e66b75..f08a1fdb4a 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java
@@ -1059,4 +1059,28 @@ public class ReadWriteIOUtils {
     clone.flip();
     return clone;
   }
+
+  /**
+   * The skip method of will return only if skipping n bytes or reaching end of file.
+   *
+   * @param inputStream the inputSteam to be skipped.
+   * @param n the number of bytes to be skipped.
+   * @throws IOException if the stream does not support seek, or if some other I/O error occurs.
+   */
+  public static void skip(InputStream inputStream, long n) throws IOException {
+    while (n > 0) {
+      long skipN = inputStream.skip(n);
+      if (skipN > 0) {
+        n -= skipN;
+      } else {
+        // read one byte to decide should we retry
+        if (inputStream.read() == -1) {
+          // EOF
+          break;
+        } else {
+          n--;
+        }
+      }
+    }
+  }
 }
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtilsTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtilsTest.java
index 7662508f0a..14f22f752f 100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtilsTest.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtilsTest.java
@@ -21,6 +21,8 @@ package org.apache.iotdb.tsfile.utils;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.BufferedInputStream;
+import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -163,4 +165,27 @@ public class ReadWriteIOUtilsTest {
     result = ReadWriteIOUtils.readMap(ByteBuffer.wrap(byteArrayOutputStream.toByteArray()));
     Assert.assertNull(result);
   }
+
+  @Test
+  public void skipInputStreamTest() {
+    int expectedSkipNum = 9000;
+    int totalNum = 10000;
+    try (BufferedInputStream inputStream =
+        new BufferedInputStream(new ByteArrayInputStream(new byte[totalNum]))) {
+      ReadWriteIOUtils.readByte(inputStream);
+      long skipN = inputStream.skip(expectedSkipNum);
+      Assert.assertNotEquals(expectedSkipNum, skipN);
+      Assert.assertNotEquals(totalNum - expectedSkipNum - 1, inputStream.available());
+    } catch (IOException e) {
+      Assert.fail(e.getMessage());
+    }
+    try (BufferedInputStream inputStream =
+        new BufferedInputStream(new ByteArrayInputStream(new byte[totalNum]))) {
+      ReadWriteIOUtils.readByte(inputStream);
+      ReadWriteIOUtils.skip(inputStream, expectedSkipNum);
+      Assert.assertEquals(totalNum - expectedSkipNum - 1, inputStream.available());
+    } catch (IOException e) {
+      Assert.fail(e.getMessage());
+    }
+  }
 }