You are viewing a plain-text version of this content. The canonical link to the original message was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@parquet.apache.org by ga...@apache.org on 2018/05/31 14:38:47 UTC

[parquet-mr] branch master updated: PARQUET-1304: Release 1.10 contains breaking changes for Hive (#485)

This is an automated email from the ASF dual-hosted git repository.

gabor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/master by this push:
     new 345e2d5  PARQUET-1304: Release 1.10 contains breaking changes for Hive (#485)
345e2d5 is described below

commit 345e2d541128471641e76aaa44dd5046f199197d
Author: Gabor Szadovszky <ga...@apache.org>
AuthorDate: Thu May 31 16:38:43 2018 +0200

    PARQUET-1304: Release 1.10 contains breaking changes for Hive (#485)
---
 .../apache/parquet/column/values/ValuesReader.java |  70 +++++++++++++++
 .../values/bitpacking/BitPackingValuesReader.java  |   1 +
 .../bitpacking/ByteBitPackingValuesReader.java     |   1 +
 .../delta/DeltaBinaryPackingValuesReader.java      |   2 +
 .../values/plain/BooleanPlainValuesReader.java     |   6 ++
 .../rle/RunLengthBitPackingHybridValuesReader.java |   3 +
 .../column/values/rle/ZeroIntegerValuesReader.java |   3 +-
 ...ltaBinaryPackingValuesWriterForIntegerTest.java |   8 ++
 .../DeltaBinaryPackingValuesWriterForLongTest.java |   8 ++
 .../column/values/dictionary/TestDictionary.java   |   5 ++
 .../parquet/bytes/ByteBufferInputStream.java       | 100 +++++++++++++++++++--
 .../java/org/apache/parquet/bytes/BytesInput.java  |  16 ++++
 .../parquet/bytes/MultiBufferInputStream.java      |   2 +-
 .../parquet/bytes/TestByteBufferInputStreams.java  |  14 +++
 ...m.java => TestDeprecatedBufferInputStream.java} |  94 +++++++++++--------
 .../parquet/bytes/TestSingleBufferInputStream.java |   2 +-
 16 files changed, 289 insertions(+), 46 deletions(-)

diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/ValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/ValuesReader.java
index 5732660..1154bc4 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/ValuesReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/ValuesReader.java
@@ -19,8 +19,10 @@
 package org.apache.parquet.column.values;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 import org.apache.parquet.bytes.ByteBufferInputStream;
+import org.apache.parquet.io.ParquetDecodingException;
 import org.apache.parquet.io.api.Binary;
 
 /**
@@ -34,6 +36,54 @@ import org.apache.parquet.io.api.Binary;
  */
 public abstract class ValuesReader {
 
+  // To be used to maintain the deprecated behavior of getNextOffset(); -1 means undefined
+  private int actualOffset = -1;
+  private int nextOffset;
+
+  /**
+   * Called to initialize the column reader from a part of a page.
+   *
+   * The underlying implementation knows how much data to read, so a length
+   * is not provided.
+   *
+   * Each page may contain several sections:
+   * <ul>
+   *  <li> repetition levels column
+   *  <li> definition levels column
+   *  <li> data column
+   * </ul>
+   *
+   * This function is called with 'offset' pointing to the beginning of one of these sections,
+   * and should return the offset to the section following it.
+   *
+   * @param valueCount count of values in this page
+   * @param page the array to read from containing the page data (repetition levels, definition levels, data)
+   * @param offset where to start reading from in the page
+   *
+   * @throws IOException if thrown by {@link #initFromPage(int, ByteBufferInputStream)}
+   * @deprecated Will be removed in 2.0.0
+   */
+  @Deprecated
+  public void initFromPage(int valueCount, ByteBuffer page, int offset) throws IOException {
+    actualOffset = offset;
+    ByteBuffer pageWithOffset = page.duplicate();
+    pageWithOffset.position(offset);
+    initFromPage(valueCount, ByteBufferInputStream.wrap(pageWithOffset));
+    actualOffset = -1;
+  }
+
+  /**
+   * Same functionality as the method of the same name that takes a ByteBuffer instead of a byte[].
+   *
+   * This method is only provided for backward compatibility and will be removed in a future release.
+   * Please update any code using it as soon as possible.
+   * @see #initFromPage(int, ByteBuffer, int)
+   */
+  @Deprecated
+  public void initFromPage(int valueCount, byte[] page, int offset) throws IOException {
+    this.initFromPage(valueCount, ByteBuffer.wrap(page), offset);
+  }
+
   /**
    * Called to initialize the column reader from a part of a page.
    *
@@ -56,6 +106,26 @@ public abstract class ValuesReader {
   public abstract void initFromPage(int valueCount, ByteBufferInputStream in) throws IOException;
 
   /**
+   * Returns the offset of the section following the one read by the last deprecated {@code initFromPage(int, ByteBuffer, int)} call.
+   * @return offset of the next section
+   * @deprecated Will be removed in 2.0.0
+   */
+  @Deprecated
+  public int getNextOffset() {
+    if (nextOffset == -1) {
+      throw new ParquetDecodingException("Unsupported: cannot get offset of the next section.");
+    } else {
+      return nextOffset;
+    }
+  }
+
+  // To be used to maintain the deprecated behavior of getNextOffset();
+  // bytesRead is the number of bytes read in the last initFromPage call
+  protected void updateNextOffset(int bytesRead) {
+    nextOffset = actualOffset == -1 ? -1 : actualOffset + bytesRead;
+  }
+
+  /**
    * usable when the encoding is dictionary based
    * @return the id of the next value from the page
    */
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/BitPackingValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/BitPackingValuesReader.java
index 78d1b72..1a05a2b 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/BitPackingValuesReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/BitPackingValuesReader.java
@@ -73,6 +73,7 @@ public class BitPackingValuesReader extends ValuesReader {
 
     this.in = stream.sliceStream(length);
     this.bitPackingReader = createBitPackingReader(bitsPerValue, this.in, valueCount);
+    updateNextOffset(length);
   }
 
   @Override
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/ByteBitPackingValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/ByteBitPackingValuesReader.java
index 0445d25..1fa62d4 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/ByteBitPackingValuesReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/ByteBitPackingValuesReader.java
@@ -80,6 +80,7 @@ public class ByteBitPackingValuesReader extends ValuesReader {
     length = Math.min(length, stream.available());
     this.in = stream.sliceStream(length);
     this.decodedPosition = VALUES_AT_A_TIME - 1;
+    updateNextOffset(length);
   }
 
   @Override
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesReader.java
index dceaa52..c8a80fd 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesReader.java
@@ -59,6 +59,7 @@ public class DeltaBinaryPackingValuesReader extends ValuesReader {
   @Override
   public void initFromPage(int valueCount, ByteBufferInputStream stream) throws IOException {
     this.in = stream;
+    long startPos = in.position();
     this.config = DeltaBinaryPackingConfig.readConfig(in);
     this.totalValueCount = BytesUtils.readUnsignedVarInt(in);
     allocateValuesBuffer();
@@ -70,6 +71,7 @@ public class DeltaBinaryPackingValuesReader extends ValuesReader {
     while (valuesBuffered < totalValueCount) { //values Buffered could be more than totalValueCount, since we flush on a mini block basis
       loadNewBlockToBuffer();
     }
+    updateNextOffset((int) (in.position() - startPos));
   }
 
   /**
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesReader.java
index 9dc5629..001a3a1 100755
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesReader.java
@@ -64,4 +64,10 @@ public class BooleanPlainValuesReader extends ValuesReader {
     LOG.debug("init from page at offset {} for length {}", stream.position(), stream.available());
     this.in.initFromPage(valueCount, stream);
   }
+
+  @Deprecated
+  @Override
+  public int getNextOffset() {
+    return in.getNextOffset();
+  }
 }
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesReader.java
index 821ac62..d9b4d99 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesReader.java
@@ -42,6 +42,9 @@ public class RunLengthBitPackingHybridValuesReader extends ValuesReader {
     int length = BytesUtils.readIntLittleEndian(stream);
     this.decoder = new RunLengthBitPackingHybridDecoder(
         bitWidth, stream.sliceStream(length));
+
+    // 4 is for the length which is stored as 4 bytes little endian
+    updateNextOffset(length + 4);
   }
 
   @Override
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/ZeroIntegerValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/ZeroIntegerValuesReader.java
index fe00de9..beeb0ad 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/ZeroIntegerValuesReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/ZeroIntegerValuesReader.java
@@ -19,8 +19,6 @@
 package org.apache.parquet.column.values.rle;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
-
 import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.apache.parquet.column.values.ValuesReader;
 
@@ -37,6 +35,7 @@ public class ZeroIntegerValuesReader extends ValuesReader {
 
   @Override
   public void initFromPage(int valueCount, ByteBufferInputStream stream) throws IOException {
+    updateNextOffset(0);
   }
 
   @Override
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterForIntegerTest.java b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterForIntegerTest.java
index ff4a308..df99e3c 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterForIntegerTest.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterForIntegerTest.java
@@ -169,6 +169,14 @@ public class DeltaBinaryPackingValuesWriterForIntegerTest {
     for (int i : data) {
       assertEquals(i, reader.readInteger());
     }
+
+    // Testing the deprecated behavior of using byte arrays directly
+    reader = new DeltaBinaryPackingValuesReader();
+    reader.initFromPage(100, pageContent, contentOffsetInPage);
+    assertEquals(valueContent.length + contentOffsetInPage, reader.getNextOffset());
+    for (int i : data) {
+      assertEquals(i, reader.readInteger());
+    }
   }
 
   @Test
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterForLongTest.java b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterForLongTest.java
index 795a591..65ac819 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterForLongTest.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterForLongTest.java
@@ -169,6 +169,14 @@ public class DeltaBinaryPackingValuesWriterForLongTest {
     for (long i : data) {
       assertEquals(i, reader.readLong());
     }
+
+    // Testing the deprecated behavior of using byte arrays directly
+    reader = new DeltaBinaryPackingValuesReader();
+    reader.initFromPage(100, pageContent, contentOffsetInPage);
+    assertEquals(valueContent.length + contentOffsetInPage, reader.getNextOffset());
+    for (long i : data) {
+      assertEquals(i, reader.readLong());
+    }
   }
 
   @Test
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/dictionary/TestDictionary.java b/parquet-column/src/test/java/org/apache/parquet/column/values/dictionary/TestDictionary.java
index cf66982..ef2b721 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/values/dictionary/TestDictionary.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/dictionary/TestDictionary.java
@@ -480,6 +480,11 @@ public class TestDictionary {
     ByteBufferInputStream stream = ByteBufferInputStream.wrap(bytes);
     stream.skipFully(stream.available());
     reader.initFromPage(100, stream);
+
+    // Testing the deprecated behavior of using byte arrays directly
+    reader = initDicReader(cw, INT32);
+    int offset = bytes.remaining();
+    reader.initFromPage(100,  bytes, offset);
   }
 
   private DictionaryValuesReader initDicReader(ValuesWriter cw, PrimitiveTypeName type)
diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/ByteBufferInputStream.java b/parquet-common/src/main/java/org/apache/parquet/bytes/ByteBufferInputStream.java
index fc92b6b..495cca2 100644
--- a/parquet-common/src/main/java/org/apache/parquet/bytes/ByteBufferInputStream.java
+++ b/parquet-common/src/main/java/org/apache/parquet/bytes/ByteBufferInputStream.java
@@ -26,7 +26,12 @@ import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.List;
 
-public abstract class ByteBufferInputStream extends InputStream {
+import org.apache.parquet.ShouldNeverHappenException;
+
+public class ByteBufferInputStream extends InputStream {
+
+  // Used to maintain the deprecated behavior of instantiating ByteBufferInputStream directly
+  private final ByteBufferInputStream delegate;
 
   public static ByteBufferInputStream wrap(ByteBuffer... buffers) {
     if (buffers.length == 1) {
@@ -44,7 +49,54 @@ public abstract class ByteBufferInputStream extends InputStream {
     }
   }
 
-  public abstract long position();
+  ByteBufferInputStream() {
+    delegate = null;
+  }
+
+  /**
+   * @param buffer
+   *          the buffer to be wrapped in this input stream
+   * @deprecated Will be removed in 2.0.0; Use {@link #wrap(ByteBuffer...)} instead
+   */
+  @Deprecated
+  public ByteBufferInputStream(ByteBuffer buffer) {
+    delegate = wrap(buffer);
+  }
+
+  /**
+   * @param buffer
+   *          the buffer to be wrapped in this input stream
+   * @param offset
+   *          the offset of the data in the buffer
+   * @param count
+   *          the number of bytes to be read from the buffer
+   * @deprecated Will be removed in 2.0.0; Use {@link #wrap(ByteBuffer...)} instead
+   */
+  @Deprecated
+  public ByteBufferInputStream(ByteBuffer buffer, int offset, int count) {
+    ByteBuffer temp = buffer.duplicate();
+    temp.position(offset);
+    ByteBuffer byteBuf = temp.slice();
+    byteBuf.limit(count);
+    delegate = wrap(byteBuf);
+  }
+
+  /**
+   * @return the slice of the byte buffer inside this stream
+   * @deprecated Will be removed in 2.0.0; Use {@link #slice(int)} instead
+   */
+  @Deprecated
+  public ByteBuffer toByteBuffer() {
+    try {
+      return slice(available());
+    } catch (EOFException e) {
+      throw new ShouldNeverHappenException(e);
+    }
+  }
+
+  public long position() {
+    return delegate.position();
+  }
 
   public void skipFully(long n) throws IOException {
     long skipped = skip(n);
@@ -54,19 +106,55 @@ public abstract class ByteBufferInputStream extends InputStream {
     }
   }
 
-  public abstract int read(ByteBuffer out);
+  public int read(ByteBuffer out) {
+    return delegate.read(out);
+  }
 
-  public abstract ByteBuffer slice(int length) throws EOFException;
+  public ByteBuffer slice(int length) throws EOFException {
+    return delegate.slice(length);
+  }
 
-  public abstract List<ByteBuffer> sliceBuffers(long length) throws EOFException;
+  public List<ByteBuffer> sliceBuffers(long length) throws EOFException {
+    return delegate.sliceBuffers(length);
+  }
 
   public ByteBufferInputStream sliceStream(long length) throws EOFException {
     return ByteBufferInputStream.wrap(sliceBuffers(length));
   }
 
-  public abstract List<ByteBuffer> remainingBuffers();
+  public List<ByteBuffer> remainingBuffers() {
+    return delegate.remainingBuffers();
+  }
 
   public ByteBufferInputStream remainingStream() {
     return ByteBufferInputStream.wrap(remainingBuffers());
   }
+
+  public int read() throws IOException {
+    return delegate.read();
+  }
+
+  public int read(byte[] b, int off, int len) throws IOException {
+    return delegate.read(b, off, len);
+  }
+
+  public long skip(long n) {
+    return delegate.skip(n);
+  }
+
+  public int available() {
+    return delegate.available();
+  }
+
+  public void mark(int readlimit) {
+    delegate.mark(readlimit);
+  }
+
+  public void reset() throws IOException {
+    delegate.reset();
+  }
+
+  public boolean markSupported() {
+    return delegate.markSupported();
+  }
 }
diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/BytesInput.java b/parquet-common/src/main/java/org/apache/parquet/bytes/BytesInput.java
index b18aae3..807fd06 100644
--- a/parquet-common/src/main/java/org/apache/parquet/bytes/BytesInput.java
+++ b/parquet-common/src/main/java/org/apache/parquet/bytes/BytesInput.java
@@ -73,6 +73,22 @@ abstract public class BytesInput {
   }
 
   /**
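+   * @param buffer the buffer to read the bytes from
+   * @param offset the position in the buffer at which the bytes start
+   * @param length the number of bytes to read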
+   * @return a BytesInput that will read the given bytes from the ByteBuffer
+   * @deprecated Will be removed in 2.0.0
+   */
+  @Deprecated
+  public static BytesInput from(ByteBuffer buffer, int offset, int length) {
+    ByteBuffer tmp = buffer.duplicate();
+    tmp.position(offset);
+    ByteBuffer slice = tmp.slice();
+    slice.limit(length);
+    return new ByteBufferBytesInput(slice);
+  }
+
+  /**
    * @param buffers an array of byte buffers
    * @return a BytesInput that will read the given bytes from the ByteBuffers
    */
diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/MultiBufferInputStream.java b/parquet-common/src/main/java/org/apache/parquet/bytes/MultiBufferInputStream.java
index 20a142b..34fa250 100644
--- a/parquet-common/src/main/java/org/apache/parquet/bytes/MultiBufferInputStream.java
+++ b/parquet-common/src/main/java/org/apache/parquet/bytes/MultiBufferInputStream.java
@@ -64,7 +64,7 @@ class MultiBufferInputStream extends ByteBufferInputStream {
   }
 
   @Override
-  public long skip(long n) throws IOException {
+  public long skip(long n) {
     if (n <= 0) {
       return 0;
     }
diff --git a/parquet-common/src/test/java/org/apache/parquet/bytes/TestByteBufferInputStreams.java b/parquet-common/src/test/java/org/apache/parquet/bytes/TestByteBufferInputStreams.java
index 7bed2a9..6151c0a 100644
--- a/parquet-common/src/test/java/org/apache/parquet/bytes/TestByteBufferInputStreams.java
+++ b/parquet-common/src/test/java/org/apache/parquet/bytes/TestByteBufferInputStreams.java
@@ -21,6 +21,9 @@ package org.apache.parquet.bytes;
 
 import org.junit.Assert;
 import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
 import java.io.EOFException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -30,6 +33,7 @@ import java.util.List;
 import java.util.concurrent.Callable;
 
 public abstract class TestByteBufferInputStreams {
+  static final int DATA_LENGTH = 35;
 
   protected abstract ByteBufferInputStream newStream();
   protected abstract void checkOriginalData();
@@ -573,6 +577,16 @@ public abstract class TestByteBufferInputStreams {
         });
   }
 
+  @Test
+  public void testToByteBuffer() {
+    final ByteBufferInputStream stream = newStream();
+
+    ByteBuffer buffer = stream.toByteBuffer();
+    for (int i = 0; i < DATA_LENGTH; ++i) {
+      assertEquals(i, buffer.get());
+    }
+  }
+
   /**
    * A convenience method to avoid a large number of @Test(expected=...) tests
    * @param message A String message to describe this assertion
diff --git a/parquet-common/src/test/java/org/apache/parquet/bytes/TestSingleBufferInputStream.java b/parquet-common/src/test/java/org/apache/parquet/bytes/TestDeprecatedBufferInputStream.java
similarity index 52%
copy from parquet-common/src/test/java/org/apache/parquet/bytes/TestSingleBufferInputStream.java
copy to parquet-common/src/test/java/org/apache/parquet/bytes/TestDeprecatedBufferInputStream.java
index 9db23be..ca04f79 100644
--- a/parquet-common/src/test/java/org/apache/parquet/bytes/TestSingleBufferInputStream.java
+++ b/parquet-common/src/test/java/org/apache/parquet/bytes/TestDeprecatedBufferInputStream.java
@@ -1,46 +1,73 @@
 /*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing,
- *  software distributed under the License is distributed on an
- *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  KIND, either express or implied.  See the License for the
- *  specific language governing permissions and limitations
- *  under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
-
 package org.apache.parquet.bytes;
 
-import org.junit.Assert;
-import org.junit.Test;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
-public class TestSingleBufferInputStream extends TestByteBufferInputStreams {
-  private static final ByteBuffer DATA = ByteBuffer.wrap(new byte[] {
-          0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19,
-          20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34 });
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Tests the deprecated behavior of instantiating ByteBufferInputStream directly
+ */
+@RunWith(Parameterized.class)
+public class TestDeprecatedBufferInputStream extends TestByteBufferInputStreams {
+  @Parameter(0)
+  public ByteBuffer data;
+  @Parameter(1)
+  public Integer offset;
+
+  @Parameters
+  public static List<Object[]> parameters() {
+    return Arrays.asList(
+        new Object[] { TestSingleBufferInputStream.DATA, null },
+        new Object[] { TestSingleBufferInputStream.DATA, 0 },
+        new Object[] { ByteBuffer.wrap(new byte[] { -1, -1, -1, -1, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14,
+            15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34 }), 4 },
+        new Object[] { ByteBuffer.wrap(new byte[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18,
+            19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, -1, -1, -1 }), 0 },
+        new Object[] { ByteBuffer.wrap(new byte[] { -1, -1, -1, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15,
+            16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, -1, -1 }), 3 });
+  }
 
   @Override
   protected ByteBufferInputStream newStream() {
-    return new SingleBufferInputStream(DATA);
+    if (offset == null) {
+      return new ByteBufferInputStream(data);
+    } else {
+      return new ByteBufferInputStream(data, offset, DATA_LENGTH);
+    }
   }
 
   @Override
   protected void checkOriginalData() {
-      Assert.assertEquals("Position should not change", 0, DATA.position());
-      Assert.assertEquals("Limit should not change",
-          DATA.array().length, DATA.limit());
+    Assert.assertEquals("Position should not change", 0, data.position());
+    Assert.assertEquals("Limit should not change",
+        data.array().length, data.limit());
   }
 
   @Test
@@ -62,22 +89,20 @@ public class TestSingleBufferInputStream extends TestByteBufferInputStreams {
 
     ByteBuffer one = buffers.get(0);
     Assert.assertSame("Should use the same backing array",
-        one.array(), DATA.array());
+        one.array(), data.array());
     Assert.assertEquals(8, one.remaining());
     Assert.assertEquals(0, one.position());
     Assert.assertEquals(8, one.limit());
-    Assert.assertEquals(35, one.capacity());
     for (; i < 8; i += 1) {
       Assert.assertEquals("Should produce correct values", i, one.get());
     }
 
     ByteBuffer two = buffers.get(1);
     Assert.assertSame("Should use the same backing array",
-        two.array(), DATA.array());
+        two.array(), data.array());
     Assert.assertEquals(8, two.remaining());
     Assert.assertEquals(8, two.position());
     Assert.assertEquals(16, two.limit());
-    Assert.assertEquals(35, two.capacity());
     for (; i < 16; i += 1) {
       Assert.assertEquals("Should produce correct values", i, two.get());
     }
@@ -85,11 +110,10 @@ public class TestSingleBufferInputStream extends TestByteBufferInputStreams {
     // three is a copy of part of the 4th buffer
     ByteBuffer three = buffers.get(2);
     Assert.assertSame("Should use the same backing array",
-        three.array(), DATA.array());
+        three.array(), data.array());
     Assert.assertEquals(8, three.remaining());
     Assert.assertEquals(16, three.position());
     Assert.assertEquals(24, three.limit());
-    Assert.assertEquals(35, three.capacity());
     for (; i < 24; i += 1) {
       Assert.assertEquals("Should produce correct values", i, three.get());
     }
@@ -97,11 +121,10 @@ public class TestSingleBufferInputStream extends TestByteBufferInputStreams {
     // four should be a copy of the next 8 bytes
     ByteBuffer four = buffers.get(3);
     Assert.assertSame("Should use the same backing array",
-        four.array(), DATA.array());
+        four.array(), data.array());
     Assert.assertEquals(8, four.remaining());
     Assert.assertEquals(24, four.position());
     Assert.assertEquals(32, four.limit());
-    Assert.assertEquals(35, four.capacity());
     for (; i < 32; i += 1) {
       Assert.assertEquals("Should produce correct values", i, four.get());
     }
@@ -109,11 +132,10 @@ public class TestSingleBufferInputStream extends TestByteBufferInputStreams {
     // five should be a copy of the next 8 bytes
     ByteBuffer five = buffers.get(4);
     Assert.assertSame("Should use the same backing array",
-        five.array(), DATA.array());
+        five.array(), data.array());
     Assert.assertEquals(3, five.remaining());
     Assert.assertEquals(32, five.position());
     Assert.assertEquals(35, five.limit());
-    Assert.assertEquals(35, five.capacity());
     for (; i < 35; i += 1) {
       Assert.assertEquals("Should produce correct values", i, five.get());
     }
@@ -125,6 +147,6 @@ public class TestSingleBufferInputStream extends TestByteBufferInputStreams {
 
     List<ByteBuffer> buffers = stream.sliceBuffers(stream.available());
     Assert.assertEquals("Should return duplicates of all non-empty buffers",
-        Collections.singletonList(DATA), buffers);
+        Collections.singletonList(TestSingleBufferInputStream.DATA), buffers);
   }
 }
diff --git a/parquet-common/src/test/java/org/apache/parquet/bytes/TestSingleBufferInputStream.java b/parquet-common/src/test/java/org/apache/parquet/bytes/TestSingleBufferInputStream.java
index 9db23be..7ae8c3c 100644
--- a/parquet-common/src/test/java/org/apache/parquet/bytes/TestSingleBufferInputStream.java
+++ b/parquet-common/src/test/java/org/apache/parquet/bytes/TestSingleBufferInputStream.java
@@ -27,7 +27,7 @@ import java.util.Collections;
 import java.util.List;
 
 public class TestSingleBufferInputStream extends TestByteBufferInputStreams {
-  private static final ByteBuffer DATA = ByteBuffer.wrap(new byte[] {
+  static final ByteBuffer DATA = ByteBuffer.wrap(new byte[] {
           0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19,
           20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34 });
 

-- 
To stop receiving notification emails like this one, please contact
gabor@apache.org.