You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by aa...@apache.org on 2021/12/28 12:49:32 UTC
[hadoop] branch branch-3.3 updated: HDFS-14099. Unknown frame descriptor when decompressing multiple frames (#3836)
This is an automated email from the ASF dual-hosted git repository.
aajisaka pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new 05b43f2 HDFS-14099. Unknown frame descriptor when decompressing multiple frames (#3836)
05b43f2 is described below
commit 05b43f205758a39ac5af25a9c7b699704e3b99d2
Author: Ashutosh Gupta <as...@st.niituniversity.in>
AuthorDate: Tue Dec 28 18:14:38 2021 +0530
HDFS-14099. Unknown frame descriptor when decompressing multiple frames (#3836)
Co-authored-by: xuzq <xu...@kuaishou.com>
Co-authored-by: Ashutosh Gupta <as...@amazon.com>
Signed-off-by: Akira Ajisaka <aa...@apache.org>
(cherry picked from commit caab29ec889a0771191b58714c306439b2415d91)
---
.../io/compress/zstd/ZStandardDecompressor.java | 13 +++++
.../zstd/TestZStandardCompressorDecompressor.java | 59 ++++++++++++++++++++++
2 files changed, 72 insertions(+)
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardDecompressor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardDecompressor.java
index bc9d29c..adf2fe6 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardDecompressor.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardDecompressor.java
@@ -113,6 +113,12 @@ public class ZStandardDecompressor implements Decompressor {
compressedDirectBuf.put(
userBuf, userBufOff, bytesInCompressedBuffer);
+ // Set the finished to false when compressedDirectBuf still
+ // contains some bytes.
+ if (compressedDirectBuf.position() > 0 && finished) {
+ finished = false;
+ }
+
userBufOff += bytesInCompressedBuffer;
userBufferBytesToConsume -= bytesInCompressedBuffer;
}
@@ -186,6 +192,13 @@ public class ZStandardDecompressor implements Decompressor {
0,
directBufferSize
);
+
+ // Set the finished to false when compressedDirectBuf still
+ // contains some bytes.
+ if (remaining > 0 && finished) {
+ finished = false;
+ }
+
uncompressedDirectBuf.limit(n);
// Get at most 'len' bytes
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/zstd/TestZStandardCompressorDecompressor.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/zstd/TestZStandardCompressorDecompressor.java
index dcfb7e9..653225b 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/zstd/TestZStandardCompressorDecompressor.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/zstd/TestZStandardCompressorDecompressor.java
@@ -234,6 +234,65 @@ public class TestZStandardCompressorDecompressor {
}
}
+ /**
+ * Verify that the decompressor can read back a stream that was produced
+ * by finishing and resetting the compressor several times, i.e. the
+ * compressed input is the concatenation of several independently
+ * finished segments (the "multiple frames" case from HDFS-14099).
+ *
+ * @throws Exception if compression or decompression fails
+ */
+ @Test
+ public void testCompressorDecompressorWithFinish() throws Exception {
+ DataOutputStream deflateOut = null;
+ DataInputStream inflateIn = null;
+ int byteSize = 1024 * 100;
+ byte[] bytes = generate(byteSize);
+ // The payload is written in three parts: two of firstLength bytes and
+ // one holding the remainder.
+ int firstLength = 1024 * 30;
+
+ int bufferSize = IO_FILE_BUFFER_SIZE_DEFAULT;
+ try {
+ DataOutputBuffer compressedDataBuffer = new DataOutputBuffer();
+ CompressionOutputStream deflateFilter =
+ new CompressorStream(compressedDataBuffer, new ZStandardCompressor(),
+ bufferSize);
+
+ deflateOut =
+ new DataOutputStream(new BufferedOutputStream(deflateFilter));
+
+ // Write some data and finish.
+ deflateOut.write(bytes, 0, firstLength);
+ deflateFilter.finish();
+ deflateOut.flush();
+
+ // ResetState then write some data and finish.
+ deflateFilter.resetState();
+ deflateOut.write(bytes, firstLength, firstLength);
+ deflateFilter.finish();
+ deflateOut.flush();
+
+ // ResetState then write some data and finish.
+ deflateFilter.resetState();
+ deflateOut.write(bytes, firstLength * 2, byteSize - firstLength * 2);
+ deflateFilter.finish();
+ deflateOut.flush();
+
+ // Feed everything that was compressed above to a single decompressor.
+ DataInputBuffer deCompressedDataBuffer = new DataInputBuffer();
+ deCompressedDataBuffer.reset(compressedDataBuffer.getData(), 0,
+ compressedDataBuffer.getLength());
+
+ CompressionInputStream inflateFilter =
+ new DecompressorStream(deCompressedDataBuffer,
+ new ZStandardDecompressor(bufferSize), bufferSize);
+
+ inflateIn = new DataInputStream(new BufferedInputStream(inflateFilter));
+
+ byte[] result = new byte[byteSize];
+ // read(byte[]) may legally return fewer bytes than requested and its
+ // return value was ignored; readFully fills the whole array or throws
+ // EOFException, so a truncated stream fails loudly instead of leaving
+ // a zero-filled tail in the buffer.
+ inflateIn.readFully(result);
+ assertArrayEquals(
+ "original array not equals compress/decompressed array", bytes,
+ result);
+ } finally {
+ IOUtils.closeStream(deflateOut);
+ IOUtils.closeStream(inflateIn);
+ }
+ }
+
@Test
public void testZStandardCompressDecompressInMultiThreads() throws Exception {
MultithreadedTestUtil.TestContext ctx =
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org