Posted to common-commits@hadoop.apache.org by aa...@apache.org on 2021/12/28 12:49:32 UTC

[hadoop] branch branch-3.3 updated: HDFS-14099. Unknown frame descriptor when decompressing multiple frames (#3836)

This is an automated email from the ASF dual-hosted git repository.

aajisaka pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 05b43f2  HDFS-14099. Unknown frame descriptor when decompressing multiple frames (#3836)
05b43f2 is described below

commit 05b43f205758a39ac5af25a9c7b699704e3b99d2
Author: Ashutosh Gupta <as...@st.niituniversity.in>
AuthorDate: Tue Dec 28 18:14:38 2021 +0530

    HDFS-14099. Unknown frame descriptor when decompressing multiple frames (#3836)
    
    Co-authored-by: xuzq <xu...@kuaishou.com>
    Co-authored-by: Ashutosh Gupta <as...@amazon.com>
    Signed-off-by: Akira Ajisaka <aa...@apache.org>
    (cherry picked from commit caab29ec889a0771191b58714c306439b2415d91)
---
 .../io/compress/zstd/ZStandardDecompressor.java    | 13 +++++
 .../zstd/TestZStandardCompressorDecompressor.java  | 59 ++++++++++++++++++++++
 2 files changed, 72 insertions(+)
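
A zstd stream may contain several concatenated frames, which is exactly what CompressorStream produces when finish() and resetState() are called between writes; before this change, ZStandardDecompressor could treat the end of the first frame as the end of the whole stream and then fail on the next frame with "Unknown frame descriptor". As a minimal sketch (not part of the patch), such a file can be read back through the codec factory, assuming the native zstd library is loaded and a local multi-frame.zst exists; the file name and buffer size are illustrative:

import java.io.InputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

public class ReadMultiFrameZstd {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.getLocal(conf);
    Path path = new Path("multi-frame.zst");  // illustrative input file
    // Resolve the codec from the file suffix (.zst maps to ZStandardCodec).
    CompressionCodec codec = new CompressionCodecFactory(conf).getCodec(path);
    try (InputStream in = codec.createInputStream(fs.open(path))) {
      // With this fix the stream keeps decoding past the first frame
      // instead of stopping early or hitting "Unknown frame descriptor".
      IOUtils.copyBytes(in, System.out, 4096, false);
    }
  }
}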

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardDecompressor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardDecompressor.java
index bc9d29c..adf2fe6 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardDecompressor.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardDecompressor.java
@@ -113,6 +113,12 @@ public class ZStandardDecompressor implements Decompressor {
     compressedDirectBuf.put(
         userBuf, userBufOff, bytesInCompressedBuffer);
 
+    // Set finished back to false when compressedDirectBuf still
+    // contains bytes to be decompressed.
+    if (compressedDirectBuf.position() > 0 && finished) {
+      finished = false;
+    }
+
     userBufOff += bytesInCompressedBuffer;
     userBufferBytesToConsume -= bytesInCompressedBuffer;
   }
@@ -186,6 +192,13 @@ public class ZStandardDecompressor implements Decompressor {
         0,
         directBufferSize
     );
+
+    // Set finished back to false when compressedDirectBuf still
+    // contains bytes to be decompressed.
+    if (remaining > 0 && finished) {
+      finished = false;
+    }
+
     uncompressedDirectBuf.limit(n);
 
     // Get at most 'len' bytes
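
The two guards added above matter most to callers that drive the raw Decompressor API directly, where finished() is the loop condition. A minimal sketch of such a loop (not part of the patch), assuming the native zstd library is loaded and that the input array holds one complete, possibly multi-frame, zstd payload; the helper class and buffer size are illustrative:

import java.io.ByteArrayOutputStream;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.zstd.ZStandardDecompressor;

public class RawDecompressLoop {
  static byte[] decompressAll(byte[] compressed) throws Exception {
    Decompressor d = new ZStandardDecompressor(64 * 1024);
    d.setInput(compressed, 0, compressed.length);
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    byte[] buf = new byte[64 * 1024];
    // Without the guards above, finished() could flip to true at the end
    // of the first frame even though compressed bytes were still buffered,
    // ending this loop before the remaining frames were decoded.
    while (!d.finished()) {
      int n = d.decompress(buf, 0, buf.length);
      if (n > 0) {
        out.write(buf, 0, n);
      } else if (d.needsInput()) {
        break;  // all supplied input has been consumed
      }
    }
    d.end();
    return out.toByteArray();
  }
}
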
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/zstd/TestZStandardCompressorDecompressor.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/zstd/TestZStandardCompressorDecompressor.java
index dcfb7e9..653225b 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/zstd/TestZStandardCompressorDecompressor.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/zstd/TestZStandardCompressorDecompressor.java
@@ -234,6 +234,65 @@ public class TestZStandardCompressorDecompressor {
     }
   }
 
+  /**
+   * Verify decompressor logic when finish() is called repeatedly during compression.
+   */
+  @Test
+  public void testCompressorDecompressorWithFinish() throws Exception {
+    DataOutputStream deflateOut = null;
+    DataInputStream inflateIn = null;
+    int byteSize = 1024 * 100;
+    byte[] bytes = generate(byteSize);
+    int firstLength = 1024 * 30;
+
+    int bufferSize = IO_FILE_BUFFER_SIZE_DEFAULT;
+    try {
+      DataOutputBuffer compressedDataBuffer = new DataOutputBuffer();
+      CompressionOutputStream deflateFilter =
+              new CompressorStream(compressedDataBuffer, new ZStandardCompressor(),
+                      bufferSize);
+
+      deflateOut =
+              new DataOutputStream(new BufferedOutputStream(deflateFilter));
+
+      // Write some data and finish.
+      deflateOut.write(bytes, 0, firstLength);
+      deflateFilter.finish();
+      deflateOut.flush();
+
+      // Reset state, then write more data and finish.
+      deflateFilter.resetState();
+      deflateOut.write(bytes, firstLength, firstLength);
+      deflateFilter.finish();
+      deflateOut.flush();
+
+      // Reset state, then write more data and finish.
+      deflateFilter.resetState();
+      deflateOut.write(bytes, firstLength * 2, byteSize - firstLength * 2);
+      deflateFilter.finish();
+      deflateOut.flush();
+
+      DataInputBuffer deCompressedDataBuffer = new DataInputBuffer();
+      deCompressedDataBuffer.reset(compressedDataBuffer.getData(), 0,
+              compressedDataBuffer.getLength());
+
+      CompressionInputStream inflateFilter =
+              new DecompressorStream(deCompressedDataBuffer,
+                      new ZStandardDecompressor(bufferSize), bufferSize);
+
+      inflateIn = new DataInputStream(new BufferedInputStream(inflateFilter));
+
+      byte[] result = new byte[byteSize];
+      inflateIn.read(result);
+      assertArrayEquals(
+              "original array not equals compress/decompressed array", bytes,
+              result);
+    } finally {
+      IOUtils.closeStream(deflateOut);
+      IOUtils.closeStream(inflateIn);
+    }
+  }
+
   @Test
   public void testZStandardCompressDecompressInMultiThreads() throws Exception {
     MultithreadedTestUtil.TestContext ctx =

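The write side of the new test can also be reproduced with the public codec API. The following sketch (not part of the patch), again assuming native zstd support and with an illustrative file name, payload size, and split point, writes two frames into a single file by calling finish() and resetState() between writes:

import java.io.OutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.ZStandardCodec;

public class WriteMultiFrameZstd {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.getLocal(conf);
    ZStandardCodec codec = new ZStandardCodec();
    codec.setConf(conf);
    byte[] data = new byte[100 * 1024];  // illustrative payload
    int split = 30 * 1024;               // illustrative split point
    try (OutputStream raw = fs.create(new Path("multi-frame.zst"));
         CompressionOutputStream out = codec.createOutputStream(raw)) {
      out.write(data, 0, split);
      out.finish();       // closes the first zstd frame
      out.resetState();   // starts a second frame on the same stream
      out.write(data, split, data.length - split);
      out.finish();       // closes the second frame
    }
  }
}

Reading the resulting multi-frame.zst back, for example with the reader sketch shown earlier, should now yield all 100 KB, whereas before this fix decompression could stop after the first 30 KB frame or fail with "Unknown frame descriptor".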