You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@orc.apache.org by om...@apache.org on 2021/03/23 21:23:03 UTC
[orc] branch master updated: ORC-758: Avoid seeking and decompressing of compressed stream, if the required seek is already satisfied by the current decompressed block
This is an automated email from the ASF dual-hosted git repository.
omalley pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/orc.git
The following commit(s) were added to refs/heads/master by this push:
new d68d6db ORC-758: Avoid seeking and decompressing of compressed stream, if the required seek is already satisfied by the current decompressed block
d68d6db is described below
commit d68d6db859bd930f169f7979b21af47f3405e7c2
Author: Pavan Lanka <pl...@apple.com>
AuthorDate: Thu Mar 11 17:33:33 2021 -0800
ORC-758: Avoid seeking and decompressing of compressed stream, if the required seek is
already satisfied by the current decompressed block
Fixes #652
Signed-off-by: Owen O'Malley <oo...@linkedin.com>
---
.../src/java/org/apache/orc/impl/InStream.java | 50 ++++++---
.../src/test/org/apache/orc/impl/TestInStream.java | 113 +++++++++++++++++++++
2 files changed, 151 insertions(+), 12 deletions(-)
diff --git a/java/core/src/java/org/apache/orc/impl/InStream.java b/java/core/src/java/org/apache/orc/impl/InStream.java
index a374dc2..53c2afa 100644
--- a/java/core/src/java/org/apache/orc/impl/InStream.java
+++ b/java/core/src/java/org/apache/orc/impl/InStream.java
@@ -397,13 +397,14 @@ public abstract class InStream extends InputStream {
}
}
- private static class CompressedStream extends InStream {
+ public static class CompressedStream extends InStream {
private final int bufferSize;
private ByteBuffer uncompressed;
private final CompressionCodec codec;
protected ByteBuffer compressed;
protected DiskRangeList currentRange;
private boolean isUncompressedOriginal;
+ protected long currentCompressedStart = -1;
/**
* Create the stream without resetting the input stream.
@@ -471,6 +472,7 @@ public abstract class InStream extends InputStream {
}
private void readHeader() throws IOException {
+ currentCompressedStart = this.position;
int b0 = readHeaderByte();
int b1 = readHeaderByte();
int b2 = readHeaderByte();
@@ -488,11 +490,14 @@ public abstract class InStream extends InputStream {
isUncompressedOriginal = true;
} else {
if (isUncompressedOriginal) {
+ // Since the previous chunk was uncompressed, allocate the buffer and set original false
allocateForUncompressed(bufferSize, slice.isDirect());
isUncompressedOriginal = false;
} else if (uncompressed == null) {
+ // If the buffer was not allocated then allocate the same
allocateForUncompressed(bufferSize, slice.isDirect());
} else {
+ // Since the buffer is already allocated just clear the same
uncompressed.clear();
}
codec.decompress(slice, uncompressed);
@@ -551,15 +556,25 @@ public abstract class InStream extends InputStream {
@Override
public void seek(PositionProvider index) throws IOException {
- seek(index.getNext());
+ boolean seeked = seek(index.getNext());
long uncompressedBytes = index.getNext();
- if (uncompressedBytes != 0) {
- readHeader();
- uncompressed.position(uncompressed.position() +
- (int) uncompressedBytes);
- } else if (uncompressed != null) {
- // mark the uncompressed buffer as done
- uncompressed.position(uncompressed.limit());
+ if (!seeked) {
+ if (uncompressed != null) {
+ // Only reposition uncompressed
+ uncompressed.position((int) uncompressedBytes);
+ }
+ // uncompressed == null should not happen as !seeked would mean that a previous
+ // readHeader has taken place
+ } else {
+ if (uncompressedBytes != 0) {
+ // Decompress compressed as a seek has taken place and position uncompressed
+ readHeader();
+ uncompressed.position(uncompressed.position() +
+ (int) uncompressedBytes);
+ } else if (uncompressed != null) {
+ // mark the uncompressed buffer as done
+ uncompressed.position(uncompressed.limit());
+ }
}
}
@@ -621,9 +636,20 @@ public abstract class InStream extends InputStream {
chunkLength + " bytes");
}
- void seek(long desired) throws IOException {
+ /**
+ * Seek to the desired chunk based on the input position.
+ *
+ * @param desired position in the compressed stream
+ * @return Indicates whether a seek was performed or not
+ * @throws IOException when seeking outside the stream bounds
+ */
+ boolean seek(long desired) throws IOException {
if (desired == 0 && bytes == null) {
- return;
+ return false;
+ }
+ if (desired == currentCompressedStart) {
+ // Header already at the required position
+ return false;
}
long posn = desired + offset;
for (DiskRangeList range = bytes; range != null; range = range.next) {
@@ -632,7 +658,7 @@ public abstract class InStream extends InputStream {
posn < range.getEnd())) {
position = desired;
setCurrent(range, true);
- return;
+ return true;
}
}
throw new IOException("Seek outside of data in " + this + " to " + desired);
diff --git a/java/core/src/test/org/apache/orc/impl/TestInStream.java b/java/core/src/test/org/apache/orc/impl/TestInStream.java
index b5accb6..e6b05b5 100644
--- a/java/core/src/test/org/apache/orc/impl/TestInStream.java
+++ b/java/core/src/test/org/apache/orc/impl/TestInStream.java
@@ -19,6 +19,11 @@
package org.apache.orc.impl;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.DataInputStream;
@@ -441,6 +446,114 @@ public class TestInStream {
}
}
+ private long seekPosition(long prevPos,
+ PositionCollector[] positions,
+ int posIdx,
+ InStream in,
+ boolean needsSeek)
+ throws IOException {
+ if (needsSeek) {
+ assertNotEquals(prevPos, positions[posIdx].getNext());
+ } else {
+ assertEquals(prevPos, positions[posIdx].getNext());
+ }
+ positions[posIdx].reset();
+ ByteBuffer c = ((InStream.CompressedStream) in).compressed;
+ in.seek(positions[posIdx]);
+ assertEquals(posIdx & 0xff, in.read());
+ if (needsSeek) {
+ assertNotSame(c, ((InStream.CompressedStream) in).compressed);
+ } else {
+ assertSame(c, ((InStream.CompressedStream) in).compressed);
+ }
+ positions[posIdx].reset();
+ return positions[posIdx].getNext();
+ }
+
+ @Test
+ public void testCompressedSeeks() throws Exception {
+ // We test two scenarios one where the stream is perfectly aligned with the DiskRange and the
+ // other where it requires an offset
+ for (int offset : new int[]{0, 10}) {
+ int compValues = 1024;
+ int origValues = 100;
+ PositionCollector[] positions = new PositionCollector[compValues + origValues];
+ byte[] compBytes = getCompressed(positions);
+ assertEquals(961, compBytes.length);
+ // Add an original chunk at the end
+ byte[] bytes = new byte[compBytes.length + 3 + origValues + offset];
+ System.arraycopy(compBytes, 0, bytes, offset, compBytes.length);
+ int startPos = offset + compBytes.length;
+ // Write original header
+ bytes[startPos] = (byte) ((origValues << 1) + 1);
+ bytes[startPos + 1] = (byte) (origValues >> 7);
+ bytes[startPos + 2] = (byte) (origValues >> 15);
+ for (int i = 0; i < 100; i++) {
+ positions[compValues + i] = new PositionCollector();
+ positions[compValues + i].addPosition(compBytes.length);
+ positions[compValues + i].addPosition(i);
+ bytes[startPos + 3 + i] = (byte) (compValues + i);
+ }
+ InStream in = InStream.create("test", new BufferChunk(ByteBuffer.wrap(bytes), 0), offset,
+ compBytes.length + 3 + origValues,
+ InStream.options()
+ .withCodec(new ZlibCodec())
+ .withBufferSize(300));
+ assertEquals("compressed stream test position: 0 length: 1064 range: 0" +
+ String.format(" offset: %d limit: %d range 0 = 0 to %d",
+ offset,
+ bytes.length,
+ bytes.length),
+ in.toString());
+
+ // Position to the last
+ long currPos = positions[positions.length - 1].getNext();
+ positions[positions.length - 1].reset();
+ in.seek(positions[positions.length - 1]);
+
+ // Seek to the first should reposition compressed
+ currPos = seekPosition(currPos, positions, 0, in, true);
+ // Seek to next position should not require a seek
+ currPos = seekPosition(currPos, positions, 1, in, false);
+
+ // Seek to 301 which should require a seek
+ currPos = seekPosition(currPos, positions, 301, in, true);
+ // Seek to next position should not require a seek
+ seekPosition(currPos, positions, 302, in, false);
+
+ // Seek to 601 which should require a seek
+ currPos = seekPosition(currPos, positions, 601, in, true);
+ // Seek to next position should not require a seek
+ seekPosition(currPos, positions, 602, in, false);
+
+ // Seek to 1024 which should seek to original
+ currPos = seekPosition(currPos, positions, 1024, in, true);
+ // Seek to next position should not require a seek
+ seekPosition(currPos, positions, 1025, in, false);
+ seekPosition(currPos, positions, 1026, in, false);
+ }
+ }
+
+ @Test
+ public void testInvalidSeek() throws Exception {
+ PositionCollector[] positions = new PositionCollector[1024];
+ byte[] bytes = getCompressed(positions);
+
+ assertEquals(961, bytes.length);
+ InStream in = InStream.create("test", new BufferChunk(ByteBuffer.wrap(bytes), 0), 0,
+ bytes.length, InStream.options().withCodec(new ZlibCodec()).withBufferSize(300));
+ assertEquals("compressed stream test position: 0 length: 961 range: 0" +
+ " offset: 0 limit: 961 range 0 = 0 to 961",
+ in.toString());
+
+ PositionCollector invalidPosition = new PositionCollector();
+ invalidPosition.addPosition(-1);
+ invalidPosition.addPosition(0);
+ in.seek(invalidPosition);
+ assertEquals(0, in.read());
+ assertEquals(1, in.read());
+ }
+
@Test
public void testCompressedPartial() throws Exception {
PositionCollector[] positions = new PositionCollector[1024];