You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@orc.apache.org by om...@apache.org on 2019/10/01 21:39:28 UTC

[orc] branch branch-1.6 updated: ORC-555: Fix IllegalArgumentException on large compressed footers.

This is an automated email from the ASF dual-hosted git repository.

omalley pushed a commit to branch branch-1.6
in repository https://gitbox.apache.org/repos/asf/orc.git


The following commit(s) were added to refs/heads/branch-1.6 by this push:
     new 293fd37  ORC-555: Fix IllegalArgumentException on large compressed footers.
293fd37 is described below

commit 293fd373780acee4b06307795dce72a430be0ba4
Author: Owen O'Malley <om...@apache.org>
AuthorDate: Mon Sep 30 19:30:30 2019 -0700

    ORC-555: Fix IllegalArgumentException on large compressed footers.
    
    Fixes #432
    
    Signed-off-by: Owen O'Malley <om...@apache.org>
---
 .../src/java/org/apache/orc/impl/InStream.java     |  69 ++++++------
 .../src/test/org/apache/orc/impl/TestInStream.java | 118 +++++++++++++++++++++
 2 files changed, 156 insertions(+), 31 deletions(-)

diff --git a/java/core/src/java/org/apache/orc/impl/InStream.java b/java/core/src/java/org/apache/orc/impl/InStream.java
index 991f725..5508e76 100644
--- a/java/core/src/java/org/apache/orc/impl/InStream.java
+++ b/java/core/src/java/org/apache/orc/impl/InStream.java
@@ -432,6 +432,11 @@ public abstract class InStream extends InputStream {
      */
     void reset(DiskRangeList input) {
       bytes = input;
+      while (input != null &&
+                 (input.getEnd() <= offset ||
+                      input.getOffset() > offset + length)) {
+        input = input.next;
+      }
       if (input == null || input.getOffset() <= offset) {
         position = 0;
       } else {
@@ -456,43 +461,45 @@ public abstract class InStream extends InputStream {
       }
     }
 
-    private void readHeader() throws IOException {
-      if (compressed == null || compressed.remaining() <= 0) {
+    private int readHeaderByte() {
+      while (currentRange != null &&
+                 (compressed == null || compressed.remaining() <= 0)) {
         setCurrent(currentRange.next, false);
       }
-      if (compressed.remaining() > OutStream.HEADER_SIZE) {
-        int b0 = compressed.get() & 0xff;
-        int b1 = compressed.get() & 0xff;
-        int b2 = compressed.get() & 0xff;
-        boolean isOriginal = (b0 & 0x01) == 1;
-        int chunkLength = (b2 << 15) | (b1 << 7) | (b0 >> 1);
+      if (compressed != null && compressed.remaining() > 0) {
+        position += 1;
+        return compressed.get() & 0xff;
+      } else {
+        throw new IllegalStateException("Can't read header at " + this);
+      }
+    }
 
-        if (chunkLength > bufferSize) {
-          throw new IllegalArgumentException("Buffer size too small. size = " +
-              bufferSize + " needed = " + chunkLength + " in " + name);
-        }
-        // read 3 bytes, which should be equal to OutStream.HEADER_SIZE always
-        assert OutStream.HEADER_SIZE == 3 : "The Orc HEADER_SIZE must be the same in OutStream and InStream";
-        position += OutStream.HEADER_SIZE;
+    private void readHeader() throws IOException {
+      int b0 = readHeaderByte();
+      int b1 = readHeaderByte();
+      int b2 = readHeaderByte();
+      boolean isOriginal = (b0 & 0x01) == 1;
+      int chunkLength = (b2 << 15) | (b1 << 7) | (b0 >> 1);
 
-        ByteBuffer slice = this.slice(chunkLength);
+      if (chunkLength > bufferSize) {
+        throw new IllegalArgumentException("Buffer size too small. size = " +
+                                               bufferSize + " needed = " + chunkLength + " in " + name);
+      }
+      ByteBuffer slice = this.slice(chunkLength);
 
-        if (isOriginal) {
-          uncompressed = slice;
-          isUncompressedOriginal = true;
-        } else {
-          if (isUncompressedOriginal) {
-            allocateForUncompressed(bufferSize, slice.isDirect());
-            isUncompressedOriginal = false;
-          } else if (uncompressed == null) {
-            allocateForUncompressed(bufferSize, slice.isDirect());
-          } else {
-            uncompressed.clear();
-          }
-          codec.decompress(slice, uncompressed);
-         }
+      if (isOriginal) {
+        uncompressed = slice;
+        isUncompressedOriginal = true;
       } else {
-        throw new IllegalStateException("Can't read header at " + this);
+        if (isUncompressedOriginal) {
+          allocateForUncompressed(bufferSize, slice.isDirect());
+          isUncompressedOriginal = false;
+        } else if (uncompressed == null) {
+          allocateForUncompressed(bufferSize, slice.isDirect());
+        } else {
+          uncompressed.clear();
+        }
+        codec.decompress(slice, uncompressed);
       }
     }
 
diff --git a/java/core/src/test/org/apache/orc/impl/TestInStream.java b/java/core/src/test/org/apache/orc/impl/TestInStream.java
index 400577b..f09169d 100644
--- a/java/core/src/test/org/apache/orc/impl/TestInStream.java
+++ b/java/core/src/test/org/apache/orc/impl/TestInStream.java
@@ -25,6 +25,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.security.Key;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
 import org.apache.hadoop.hive.common.io.DiskRangeList;
@@ -661,4 +662,121 @@ public class TestInStream {
     });
     assertEquals(0, stream.available());
   }
+
+  private static byte[] input(int... data) {
+    byte[] result = new byte[data.length];
+    for(int i = 0; i < data.length; ++i) {
+      result[i] = (byte) data[i];
+    }
+    return result;
+  }
+
+  // a zlib stream of 16 sequences of [0..255]
+  private static final byte[] compressed = input(
+       106,    2,    0,   99,   96,  100,   98,  102,   97,  101,   99,  -25,
+       -32,  -28,  -30,  -26,  -31,  -27,  -29,   23,   16,   20,   18,   22,
+        17,   21,   19, -105, -112, -108, -110, -106, -111, -107, -109,   87,
+        80,   84,   82,   86,   81,   85,   83,  -41,  -48,  -44,  -46,  -42,
+       -47,  -43,  -45,   55,   48,   52,   50,   54,   49,   53,   51,  -73,
+       -80,  -76,  -78,  -74,  -79,  -75,  -77,  119,  112,  116,  114,  118,
+       113,  117,  115,   -9,  -16,  -12,  -14,  -10,  -15,  -11,  -13,   15,
+         8,   12,   10,   14,    9,   13,   11, -113, -120, -116, -118, -114,
+      -119, -115, -117,   79,   72,   76,   74,   78,   73,   77,   75,  -49,
+       -56,  -52,  -54,  -50,  -55,  -51,  -53,   47,   40,   44,   42,   46,
+        41,   45,   43,  -81,  -88,  -84,  -86,  -82,  -87,  -83,  -85,  111,
+       104,  108,  106,  110,  105,  109,  107,  -17,  -24,  -20,  -22,  -18,
+       -23,  -19,  -21,  -97,   48,  113,  -46,  -28,   41,   83,  -89,   77,
+       -97,   49,  115,  -42,  -20,   57,  115,  -25,  -51,   95,  -80,  112,
+       -47,  -30,   37,   75, -105,   45,   95,  -79,  114,  -43,  -22,   53,
+       107,  -41,  -83,  -33,  -80,  113,  -45,  -26,   45,   91,  -73,  109,
+       -33,  -79,  115,  -41,  -18,   61,  123,   -9,  -19,   63,  112,  -16,
+       -48,  -31,   35,   71, -113,   29,   63,  113,  -14,  -44,  -23,   51,
+       103,  -49,  -99,  -65,  112,  -15,  -46,  -27,   43,   87,  -81,   93,
+       -65,  113,  -13,  -42,  -19,   59,  119,  -17,  -35,  127,  -16,  -16,
+       -47,  -29,   39,   79,  -97,   61,  127,  -15,  -14,  -43,  -21,   55,
+       111,  -33,  -67,   -1,  -16,  -15,  -45,  -25,   47,   95,  -65,  125,
+        -1,  -15,  -13,  -41,  -17,   63,  127,   -1,   -3,  103,   24,  -11,
+        -1,  -88,   -1,   71,   -3,   63,  -22,   -1,   81,   -1, -113,   -6,
+       127,  -44,   -1,  -93,   -2,   31,  -11,   -1,  -88,   -1,   71,   -3,
+        63,  -22,   -1,   81,   -1, -113,   -6,  127,    4,   -8,   31,    0);
+
+  @Test
+  public void testMultiRangeCompressed() throws IOException {
+    // Set up an initial buffer of PREVIOUS_LENGTH followed by our stream
+    // at START.
+    final long START = 1_000_000_000;
+    final int PREVIOUS_LENGTH = 3000;
+    BufferChunkList list = new BufferChunkList();
+    byte[] previous = new byte[PREVIOUS_LENGTH];
+    Arrays.fill(previous, (byte) -1);
+    list.add(new BufferChunk(ByteBuffer.wrap(previous), START - PREVIOUS_LENGTH));
+    list.add(new BufferChunk(ByteBuffer.wrap(compressed), START));
+    InStream.StreamOptions options =
+        InStream.options().withCodec(new ZlibCodec()).withBufferSize(4096);
+    InStream inStream = InStream.create("test", list.get(), START, 4096, options);
+    byte[] inBuffer = new byte[4096];
+    assertEquals(4096, inStream.read(inBuffer));
+    for(int i=0; i < inBuffer.length; ++i) {
+      assertEquals("position " + i, (byte)i, inBuffer[i]);
+    }
+  }
+
+  @Test
+  public void testExtraFrontCompressed() throws IOException {
+    // Set up a stream that starts at START, which is divided in to regions
+    // of CHUNK_LENGTH. There are two EXTRA_FRONT byte buffers in front of the
+    // stream.
+    final long START = 1_000_000_000;
+    final int EXTRA_FRONT = 3_000;
+    final int CHUNK_LENGTH = 100;
+
+    BufferChunkList list = new BufferChunkList();
+    list.add(new BufferChunk(ByteBuffer.allocate(EXTRA_FRONT),
+        START - 2 * EXTRA_FRONT));
+    byte[] extraFront = new byte[EXTRA_FRONT + CHUNK_LENGTH];
+    Arrays.fill(extraFront, (byte) -1);
+    System.arraycopy(compressed, 0, extraFront, EXTRA_FRONT, CHUNK_LENGTH);
+    list.add(new BufferChunk(ByteBuffer.wrap(extraFront), START - EXTRA_FRONT));
+    int posn = CHUNK_LENGTH;
+    while (posn < compressed.length) {
+      list.add(new BufferChunk(
+          ByteBuffer.wrap(compressed, posn,
+              Math.min(CHUNK_LENGTH, compressed.length - posn)),
+          START + posn));
+      posn += CHUNK_LENGTH;
+    }
+
+    // now set up the stream to read it
+    InStream.StreamOptions options =
+        InStream.options().withCodec(new ZlibCodec()).withBufferSize(4096);
+    InStream inStream = InStream.create("test", list.get(), START, 4096, options);
+
+    // ensure the data is correct
+    byte[] inBuffer = new byte[4096];
+    assertEquals(4096, inStream.read(inBuffer));
+    for(int i=0; i < inBuffer.length; ++i) {
+      assertEquals("position " + i, (byte)i, inBuffer[i]);
+    }
+  }
+
+  @Test
+  public void testMultiRangeCompressHeader() throws IOException {
+    // Set up a buffer where the first 5 bytes are each a chunk and then the
+    // rest of the stream follows.
+    final long START = 1_000_000_000;
+    BufferChunkList list = new BufferChunkList();
+    for(int i=0; i < 5; ++i) {
+      list.add(new BufferChunk(ByteBuffer.wrap(compressed, i, 1), START + i));
+    }
+    list.add(new BufferChunk(
+        ByteBuffer.wrap(compressed, 5, compressed.length - 5), START + 5));
+    InStream.StreamOptions options =
+        InStream.options().withCodec(new ZlibCodec()).withBufferSize(4096);
+    InStream inStream = InStream.create("test", list.get(), START, 4096, options);
+    byte[] inBuffer = new byte[4096];
+    assertEquals(4096, inStream.read(inBuffer));
+    for(int i=0; i < inBuffer.length; ++i) {
+      assertEquals("position " + i, (byte)i, inBuffer[i]);
+    }
+  }
 }