Posted to commits@parquet.apache.org by bl...@apache.org on 2016/08/16 17:12:16 UTC
parquet-mr git commit: PARQUET-400: Replace CompatibilityUtil with SeekableInputStream.
Repository: parquet-mr
Updated Branches:
refs/heads/master c8d78b21b -> 898f3d0f6
PARQUET-400: Replace CompatibilityUtil with SeekableInputStream.
This fixes PARQUET-400 by replacing `CompatibilityUtil` with `SeekableInputStream`, which is implemented for both hadoop-1 and hadoop-2. The benefit of this approach is that `SeekableInputStream` can also be implemented for non-Hadoop file systems in the future.
This also changes the default Hadoop version to Hadoop 2. The library is still compatible with Hadoop 1.x, but this makes building Hadoop 2 classes, like `H2SeekableInputStream`, much easier and removes the need to compile against multiple Hadoop versions.
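For illustration, a minimal sketch of how a caller picks up the new API after this change (the class name `WrapExample`, the path argument, and the 8-byte read are hypothetical; `HadoopStreams.wrap` and the `SeekableInputStream` methods are from this commit):

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.hadoop.util.HadoopStreams;
    import org.apache.parquet.io.SeekableInputStream;

    public class WrapExample {
      public static void main(String[] args) throws IOException {
        Path path = new Path(args[0]); // any file on a Hadoop FileSystem
        FileSystem fs = path.getFileSystem(new Configuration());
        // wrap() picks the Hadoop 2 ByteBuffer-based stream when the wrapped
        // stream supports it, and falls back to byte-array reads otherwise.
        SeekableInputStream in = HadoopStreams.wrap(fs.open(path));
        try {
          ByteBuffer buf = ByteBuffer.allocate(8);
          in.seek(0);
          in.readFully(buf); // blocks until 8 bytes are read, or throws EOFException
        } finally {
          in.close();
        }
      }
    }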
Author: Ryan Blue <bl...@apache.org>
Closes #349 from rdblue/PARQUET-400-byte-buffers and squashes the following commits:
1bcb8a8 [Ryan Blue] PARQUET-400: Fix review nits.
823ca00 [Ryan Blue] PARQUET-400: Add tests for Hadoop 2 readFully.
02d3709 [Ryan Blue] PARQUET-400: Remove unused property.
b543013 [Ryan Blue] PARQUET-400: Fix logger for HadoopStreams.
2cb6934 [Ryan Blue] PARQUET-400: Remove H2SeekableInputStream tests.
abaa695 [Ryan Blue] PARQUET-400: Fix review items.
5dc50a5 [Ryan Blue] PARQUET-400: Add tests for H1SeekableInputStream methods.
730a9e2 [Ryan Blue] PARQUET-400: Move SeekableInputStream to io package.
506a556 [Ryan Blue] PARQUET-400: Remove Hadoop dependencies from SeekableInputStream.
c80580c [Ryan Blue] PARQUET-400: Handle UnsupportedOperationException from read(ByteBuffer).
ba08b3f [Ryan Blue] PARQUET-400: Replace CompatibilityUtil with SeekableInputStream.
Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/898f3d0f
Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/898f3d0f
Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/898f3d0f
Branch: refs/heads/master
Commit: 898f3d0f652f313473c67fef32e22d94d8294d4f
Parents: c8d78b2
Author: Ryan Blue <bl...@apache.org>
Authored: Tue Aug 16 10:12:00 2016 -0700
Committer: Ryan Blue <bl...@apache.org>
Committed: Tue Aug 16 10:12:00 2016 -0700
----------------------------------------------------------------------
.travis.yml | 4 +-
.../apache/parquet/io/SeekableInputStream.java | 106 +++
.../parquet/hadoop/ParquetFileReader.java | 31 +-
.../parquet/hadoop/ParquetFileWriter.java | 17 +-
.../parquet/hadoop/util/CompatibilityUtil.java | 114 ---
.../hadoop/util/H1SeekableInputStream.java | 154 ++++
.../hadoop/util/H2SeekableInputStream.java | 107 +++
.../parquet/hadoop/util/HadoopStreams.java | 100 +++
.../parquet/hadoop/util/MockInputStream.java | 87 +++
.../hadoop/util/TestHadoop1ByteBufferReads.java | 761 +++++++++++++++++++
.../hadoop/util/TestHadoop2ByteBufferReads.java | 405 ++++++++++
pom.xml | 14 +-
12 files changed, 1761 insertions(+), 139 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/898f3d0f/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index 890a372..ff9b356 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -24,8 +24,8 @@ before_install:
- cd ..
env:
- - HADOOP_PROFILE=default TEST_CODECS=uncompressed
- - HADOOP_PROFILE=hadoop-2 TEST_CODECS=gzip,snappy
+ - HADOOP_PROFILE=hadoop-1 TEST_CODECS=uncompressed
+ - HADOOP_PROFILE=default TEST_CODECS=gzip,snappy
install: mvn install --batch-mode -DskipTests=true -Dmaven.javadoc.skip=true -Dsource.skip=true > mvn_install.log || mvn install --batch-mode -DskipTests=true -Dmaven.javadoc.skip=true -Dsource.skip=true > mvn_install.log || (cat mvn_install.log && false)
script: mvn test -P $HADOOP_PROFILE
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/898f3d0f/parquet-common/src/main/java/org/apache/parquet/io/SeekableInputStream.java
----------------------------------------------------------------------
diff --git a/parquet-common/src/main/java/org/apache/parquet/io/SeekableInputStream.java b/parquet-common/src/main/java/org/apache/parquet/io/SeekableInputStream.java
new file mode 100644
index 0000000..7247817
--- /dev/null
+++ b/parquet-common/src/main/java/org/apache/parquet/io/SeekableInputStream.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.io;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * {@code SeekableInputStream} is an abstract class with the methods needed
+ * by Parquet to read data from a file or a Hadoop data stream.
+ */
+public abstract class SeekableInputStream extends InputStream {
+
+ /**
+ * Return the current position in the InputStream.
+ *
+ * @return current position in bytes from the start of the stream
+ * @throws IOException If the underlying stream throws IOException
+ */
+ public abstract long getPos() throws IOException;
+
+ /**
+ * Seek to a new position in the InputStream.
+ *
+ * @param newPos the new position to seek to
+ * @throws IOException If the underlying stream throws IOException
+ */
+ public abstract void seek(long newPos) throws IOException;
+
+ /**
+ * Read a byte array of data, filling it completely from position 0 to the
+ * end of the array.
+ * <p>
+ * This method is equivalent to {@code read(bytes, 0, bytes.length)}.
+ * <p>
+ * This method will block until {@code bytes.length} bytes are available to
+ * copy into the array, or will throw {@link EOFException} if the stream
+ * ends before the array is full.
+ *
+ * @param bytes a byte array to fill with data from the stream
+ * @throws IOException If the underlying stream throws IOException
+ * @throws EOFException If the stream has fewer bytes left than are needed to
+ * fill the array, {@code bytes.length}
+ */
+ public abstract void readFully(byte[] bytes) throws IOException;
+
+ /**
+ * Read {@code len} bytes of data into an array, at position {@code start}.
+ * <p>
+ * This method will block until {@code len} bytes are available to copy into
+ * the array, or will throw {@link EOFException} if the stream ends before
+ * all {@code len} bytes have been read.
+ *
+ * @param bytes a byte array to fill with data from the stream
+ * @param start the starting position in the array to fill
+ * @param len the number of bytes to read into the array
+ * @throws IOException If the underlying stream throws IOException
+ * @throws EOFException If the stream has fewer than {@code len} bytes left
+ */
+ public abstract void readFully(byte[] bytes, int start, int len) throws IOException;
+
+ /**
+ * Read up to {@code buf.remaining()} bytes of data into a {@link ByteBuffer}.
+ * <p>
+ * This method will copy available bytes into the buffer, reading at most
+ * {@code buf.remaining()} bytes. The number of bytes actually copied is
+ * returned by the method, or -1 is returned to signal that the end of the
+ * underlying stream has been reached.
+ *
+ * @param buf a byte buffer to fill with data from the stream
+ * @return the number of bytes read or -1 if the stream ended
+ * @throws IOException If the underlying stream throws IOException
+ */
+ public abstract int read(ByteBuffer buf) throws IOException;
+
+ /**
+ * Read {@code buf.remaining()} bytes of data into a {@link ByteBuffer}.
+ * <p>
+ * This method will block until {@code buf.remaining()} bytes are available
+ * to copy into the buffer, or will throw {@link EOFException} if the stream
+ * ends before the buffer is full.
+ *
+ * @param buf a byte buffer to fill with data from the stream
+ * @throws IOException If the underlying stream throws IOException
+ * @throws EOFException If the stream has fewer bytes left than are needed to
+ * fill the buffer, {@code buf.remaining()}
+ */
+ public abstract void readFully(ByteBuffer buf) throws IOException;
+
+}
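As an aside (a sketch that is not part of the commit): because read(ByteBuffer) may return short counts, any readFully(ByteBuffer) implementation has to loop until the buffer is filled, roughly like this (class and method names are hypothetical):

    import java.io.EOFException;
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import org.apache.parquet.io.SeekableInputStream;

    class ReadFullyLoopSketch {
      // Loops because a single read(ByteBuffer) may copy fewer than
      // buf.remaining() bytes; a return of -1 signals end of stream.
      static void readFullyByLooping(SeekableInputStream in, ByteBuffer buf) throws IOException {
        while (buf.hasRemaining()) {
          if (in.read(buf) < 0) {
            throw new EOFException("Stream ended with " + buf.remaining() + " bytes left");
          }
        }
      }
    }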
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/898f3d0f/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index 83542d5..59a7e46 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -54,7 +54,6 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -66,7 +65,6 @@ import org.apache.parquet.column.Encoding;
import org.apache.parquet.column.page.DictionaryPageReadStore;
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.filter2.compat.RowGroupFilter;
-import org.apache.parquet.hadoop.util.CompatibilityUtil;
import org.apache.parquet.Log;
import org.apache.parquet.bytes.BytesInput;
@@ -91,6 +89,8 @@ import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.metadata.FileMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.HiddenFileFilter;
+import org.apache.parquet.hadoop.util.HadoopStreams;
+import org.apache.parquet.io.SeekableInputStream;
import org.apache.parquet.hadoop.util.counters.BenchmarkCounter;
import org.apache.parquet.io.ParquetDecodingException;
@@ -432,7 +432,7 @@ public class ParquetFileReader implements Closeable {
*/
public static final ParquetMetadata readFooter(Configuration configuration, FileStatus file, MetadataFilter filter) throws IOException {
FileSystem fileSystem = file.getPath().getFileSystem(configuration);
- FSDataInputStream in = fileSystem.open(file.getPath());
+ SeekableInputStream in = HadoopStreams.wrap(fileSystem.open(file.getPath()));
try {
return readFooter(file.getLen(), file.getPath().toString(), in, filter);
} finally {
@@ -449,7 +449,7 @@ public class ParquetFileReader implements Closeable {
* @return the metadata blocks in the footer
* @throws IOException if an error occurs while reading the file
*/
- public static final ParquetMetadata readFooter(long fileLen, String filePath, FSDataInputStream f, MetadataFilter filter) throws IOException {
+ public static final ParquetMetadata readFooter(long fileLen, String filePath, SeekableInputStream f, MetadataFilter filter) throws IOException {
if (Log.DEBUG) {
LOG.debug("File length " + fileLen);
}
@@ -493,7 +493,7 @@ public class ParquetFileReader implements Closeable {
}
private final CodecFactory codecFactory;
- private final FSDataInputStream f;
+ private final SeekableInputStream f;
private final FileStatus fileStatus;
private final Map<ColumnPath, ColumnDescriptor> paths = new HashMap<ColumnPath, ColumnDescriptor>();
private final FileMetaData fileMetaData; // may be null
@@ -531,7 +531,7 @@ public class ParquetFileReader implements Closeable {
this.conf = configuration;
this.fileMetaData = fileMetaData;
FileSystem fs = filePath.getFileSystem(configuration);
- this.f = fs.open(filePath);
+ this.f = HadoopStreams.wrap(fs.open(filePath));
this.fileStatus = fs.getFileStatus(filePath);
this.blocks = blocks;
for (ColumnDescriptor col : columns) {
@@ -562,7 +562,7 @@ public class ParquetFileReader implements Closeable {
this.conf = conf;
FileSystem fs = file.getFileSystem(conf);
this.fileStatus = fs.getFileStatus(file);
- this.f = fs.open(file);
+ this.f = HadoopStreams.wrap(fs.open(file));
this.footer = readFooter(fileStatus.getLen(), fileStatus.getPath().toString(), f, filter);
this.fileMetaData = footer.getFileMetaData();
this.blocks = footer.getBlocks();
@@ -585,7 +585,7 @@ public class ParquetFileReader implements Closeable {
this.conf = conf;
FileSystem fs = file.getFileSystem(conf);
this.fileStatus = fs.getFileStatus(file);
- this.f = fs.open(file);
+ this.f = HadoopStreams.wrap(fs.open(file));
this.footer = footer;
this.fileMetaData = footer.getFileMetaData();
this.blocks = footer.getBlocks();
@@ -772,7 +772,7 @@ public class ParquetFileReader implements Closeable {
}
private static DictionaryPage readCompressedDictionary(
- PageHeader pageHeader, FSDataInputStream fin) throws IOException {
+ PageHeader pageHeader, SeekableInputStream fin) throws IOException {
DictionaryPageHeader dictHeader = pageHeader.getDictionary_page_header();
int uncompressedPageSize = pageHeader.getUncompressed_page_size();
@@ -940,7 +940,7 @@ public class ParquetFileReader implements Closeable {
*/
private class WorkaroundChunk extends Chunk {
- private final FSDataInputStream f;
+ private final SeekableInputStream f;
/**
* @param descriptor the descriptor of the chunk
@@ -948,7 +948,7 @@ public class ParquetFileReader implements Closeable {
* @param offset where the chunk starts in data
* @param f the file stream positioned at the end of this chunk
*/
- private WorkaroundChunk(ChunkDescriptor descriptor, ByteBuffer byteBuf, int offset, FSDataInputStream f) {
+ private WorkaroundChunk(ChunkDescriptor descriptor, ByteBuffer byteBuf, int offset, SeekableInputStream f) {
super(descriptor, byteBuf, offset);
this.f = f;
}
@@ -964,7 +964,7 @@ public class ParquetFileReader implements Closeable {
// to allow reading older files (using dictionary) we need this.
// usually 13 to 19 bytes are missing
// if the last page is smaller than this, the page header itself is truncated in the buffer.
- this.byteBuf.rewind(); // resetting the buffer to the position before we got the error
+ this.byteBuf.position(initialPos); // resetting the buffer to the position before we got the error
LOG.info("completing the column chunk to read the page header");
pageHeader = Util.readPageHeader(new SequenceInputStream(this, f)); // trying again from the buffer + remainder of the stream.
}
@@ -1050,11 +1050,14 @@ public class ParquetFileReader implements Closeable {
* @return the chunks
* @throws IOException
*/
- public List<Chunk> readAll(FSDataInputStream f) throws IOException {
+ public List<Chunk> readAll(SeekableInputStream f) throws IOException {
List<Chunk> result = new ArrayList<Chunk>(chunks.size());
f.seek(offset);
+
+ // Allocate the ByteBuffer via the allocator (heap or direct) and fill it from the stream.
ByteBuffer chunksByteBuffer = allocator.allocate(length);
- CompatibilityUtil.getBuf(f, chunksByteBuffer, length);
+ f.readFully(chunksByteBuffer);
+
// report in a counter the data we just scanned
BenchmarkCounter.incrementBytesRead(length);
int currentChunkOffset = 0;
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/898f3d0f/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
index 523d01f..f0fa7f5 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
@@ -61,6 +61,8 @@ import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.hadoop.metadata.FileMetaData;
import org.apache.parquet.hadoop.metadata.GlobalMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.HadoopStreams;
+import org.apache.parquet.io.SeekableInputStream;
import org.apache.parquet.io.ParquetEncodingException;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
@@ -495,6 +497,12 @@ public class ParquetFileWriter {
public void appendRowGroups(FSDataInputStream file,
List<BlockMetaData> rowGroups,
boolean dropColumns) throws IOException {
+ appendRowGroups(HadoopStreams.wrap(file), rowGroups, dropColumns);
+ }
+
+ public void appendRowGroups(SeekableInputStream file,
+ List<BlockMetaData> rowGroups,
+ boolean dropColumns) throws IOException {
for (BlockMetaData block : rowGroups) {
appendRowGroup(file, block, dropColumns);
}
@@ -502,6 +510,11 @@ public class ParquetFileWriter {
public void appendRowGroup(FSDataInputStream from, BlockMetaData rowGroup,
boolean dropColumns) throws IOException {
+ appendRowGroup(HadoopStreams.wrap(from), rowGroup, dropColumns); // wrap, not recurse
+ }
+
+ public void appendRowGroup(SeekableInputStream from, BlockMetaData rowGroup,
+ boolean dropColumns) throws IOException {
startBlock(rowGroup.getRowCount());
Map<String, ColumnChunkMetaData> columnsToCopy =
@@ -596,8 +609,8 @@ public class ParquetFileWriter {
* @param length the number of bytes to copy
* @throws IOException
*/
- private static void copy(FSDataInputStream from, FSDataOutputStream to,
- long start, long length) throws IOException{
+ private static void copy(SeekableInputStream from, FSDataOutputStream to,
+ long start, long length) throws IOException{
if (DEBUG) LOG.debug(
"Copying " + length + " bytes at " + start + " to " + to.getPos());
from.seek(start);
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/898f3d0f/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/CompatibilityUtil.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/CompatibilityUtil.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/CompatibilityUtil.java
deleted file mode 100644
index bacf222..0000000
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/CompatibilityUtil.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.parquet.hadoop.util;
-
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.parquet.ShouldNeverHappenException;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-
-public class CompatibilityUtil {
-
- // Will be set to true if the implementation of FSDataInputSteam supports
- // the 2.x APIs, in particular reading using a provided ByteBuffer
- private static boolean useV21;
- public static final V21FileAPI fileAPI;
-
- private static class V21FileAPI {
- private final Method PROVIDE_BUF_READ_METHOD;
- private final Class<?> FSDataInputStreamCls;
-
- private V21FileAPI() throws ReflectiveOperationException {
- final String PACKAGE = "org.apache.hadoop";
- FSDataInputStreamCls = Class.forName(PACKAGE + ".fs.FSDataInputStream");
- PROVIDE_BUF_READ_METHOD = FSDataInputStreamCls.getMethod("read", ByteBuffer.class);
- }
- }
-
- static {
- // Test to see if a class from the Hadoop 2.x API is available
- boolean v21 = true;
- try {
- Class.forName("org.apache.hadoop.io.compress.DirectDecompressor");
- } catch (ClassNotFoundException cnfe) {
- v21 = false;
- }
-
- useV21 = v21;
- try {
- if (v21) {
- fileAPI = new V21FileAPI();
- } else {
- fileAPI = null;
- }
-
- } catch (ReflectiveOperationException e) {
- throw new IllegalArgumentException("Error finding appropriate interfaces using reflection.", e);
- }
- }
-
- private static Object invoke(Method method, String errorMsg, Object instance, Object... args) {
- try {
- return method.invoke(instance, args);
- } catch (IllegalAccessException e) {
- throw new IllegalArgumentException(errorMsg, e);
- } catch (InvocationTargetException e) {
- throw new IllegalArgumentException(errorMsg, e);
- }
- }
-
- public static int getBuf(FSDataInputStream f, ByteBuffer readBuf, int maxSize) throws IOException {
- int res;
- if (useV21) {
- try {
- res = (Integer) fileAPI.PROVIDE_BUF_READ_METHOD.invoke(f, readBuf);
- } catch (InvocationTargetException e) {
- if (e.getCause() instanceof UnsupportedOperationException) {
- // the FSDataInputStream docs say specifically that implementations
- // can choose to throw UnsupportedOperationException, so this should
- // be a reasonable check to make to see if the interface is
- // present but not implemented and we should be falling back
- useV21 = false;
- return getBuf(f, readBuf, maxSize);
- } else if (e.getCause() instanceof IOException) {
- throw (IOException) e.getCause();
- } else {
- // To handle any cases where a Runtime exception occurs and provide
- // some additional context information. A stacktrace would just give
- // a line number, this at least tells them we were using the version
- // of the read method designed for using a ByteBuffer.
- throw new IOException("Error reading out of an FSDataInputStream " +
- "using the Hadoop 2 ByteBuffer based read method.", e.getCause());
- }
- } catch (IllegalAccessException e) {
- // This method is public because it is defined in an interface,
- // there should be no problems accessing it
- throw new ShouldNeverHappenException(e);
- }
- } else {
- byte[] buf = new byte[maxSize];
- res = f.read(buf);
- readBuf.put(buf, 0, res);
- }
- return res;
- }
-}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/898f3d0f/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H1SeekableInputStream.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H1SeekableInputStream.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H1SeekableInputStream.java
new file mode 100644
index 0000000..4a03b1a
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H1SeekableInputStream.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.hadoop.util;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.parquet.io.SeekableInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * SeekableInputStream implementation that implements read(ByteBuffer) for
+ * Hadoop 1 FSDataInputStream.
+ */
+class H1SeekableInputStream extends SeekableInputStream {
+
+ private final int COPY_BUFFER_SIZE = 8192;
+ private final byte[] temp = new byte[COPY_BUFFER_SIZE];
+
+ private final FSDataInputStream stream;
+
+ public H1SeekableInputStream(FSDataInputStream stream) {
+ this.stream = stream;
+ }
+
+ @Override
+ public void close() throws IOException {
+ stream.close();
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return stream.getPos();
+ }
+
+ @Override
+ public void seek(long newPos) throws IOException {
+ stream.seek(newPos);
+ }
+
+ @Override
+ public int read() throws IOException {
+ return stream.read();
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ return stream.read(b, off, len);
+ }
+
+ @Override
+ public void readFully(byte[] bytes) throws IOException {
+ stream.readFully(bytes, 0, bytes.length);
+ }
+
+ @Override
+ public void readFully(byte[] bytes, int start, int len) throws IOException {
+ stream.readFully(bytes, start, len);
+ }
+
+ @Override
+ public int read(ByteBuffer buf) throws IOException {
+ if (buf.hasArray()) {
+ return readHeapBuffer(stream, buf);
+ } else {
+ return readDirectBuffer(stream, buf, temp);
+ }
+ }
+
+ @Override
+ public void readFully(ByteBuffer buf) throws IOException {
+ if (buf.hasArray()) {
+ readFullyHeapBuffer(stream, buf);
+ } else {
+ readFullyDirectBuffer(stream, buf, temp);
+ }
+ }
+
+ // Visible for testing
+ static int readHeapBuffer(FSDataInputStream f, ByteBuffer buf) throws IOException {
+ int bytesRead = f.read(buf.array(), buf.arrayOffset() + buf.position(), buf.remaining());
+ if (bytesRead < 0) {
+ // if this resulted in EOF, don't update position
+ return bytesRead;
+ } else {
+ buf.position(buf.position() + bytesRead);
+ return bytesRead;
+ }
+ }
+
+ // Visible for testing
+ static void readFullyHeapBuffer(FSDataInputStream f, ByteBuffer buf) throws IOException {
+ f.readFully(buf.array(), buf.arrayOffset() + buf.position(), buf.remaining());
+ buf.position(buf.limit());
+ }
+
+ // Visible for testing
+ static int readDirectBuffer(FSDataInputStream f, ByteBuffer buf, byte[] temp) throws IOException {
+ // copy all the bytes that return immediately, stopping at the first
+ // read that doesn't return a full buffer.
+ int nextReadLength = Math.min(buf.remaining(), temp.length);
+ int totalBytesRead = 0;
+ int bytesRead;
+
+ while ((bytesRead = f.read(temp, 0, nextReadLength)) == temp.length) {
+ buf.put(temp);
+ totalBytesRead += bytesRead;
+ nextReadLength = Math.min(buf.remaining(), temp.length);
+ }
+
+ if (bytesRead < 0) {
+ // return -1 if nothing was read
+ return totalBytesRead == 0 ? -1 : totalBytesRead;
+ } else {
+ // copy the last partial buffer
+ buf.put(temp, 0, bytesRead);
+ totalBytesRead += bytesRead;
+ return totalBytesRead;
+ }
+ }
+
+ // Visible for testing
+ static void readFullyDirectBuffer(FSDataInputStream f, ByteBuffer buf, byte[] temp) throws IOException {
+ int nextReadLength = Math.min(buf.remaining(), temp.length);
+ int bytesRead = 0;
+
+ while (nextReadLength > 0 && (bytesRead = f.read(temp, 0, nextReadLength)) >= 0) {
+ buf.put(temp, 0, bytesRead);
+ nextReadLength = Math.min(buf.remaining(), temp.length);
+ }
+
+ if (bytesRead < 0 && buf.remaining() > 0) {
+ throw new EOFException(
+ "Reached the end of stream. Still have: " + buf.remaining() + " bytes left");
+ }
+ }
+}
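To make the direct-buffer path above concrete, here is a hypothetical harness (only `H1SeekableInputStream` and its static helper are from this commit): a direct ByteBuffer has no backing array, so bytes are staged through the `temp` heap array, and a tiny scratch array just forces more loop iterations.

    package org.apache.parquet.hadoop.util;

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import org.apache.hadoop.fs.FSDataInputStream;

    class DirectReadSketch {
      static int drain(FSDataInputStream f) throws IOException {
        ByteBuffer direct = ByteBuffer.allocateDirect(10); // no backing array
        byte[] temp = new byte[2]; // small scratch buffer forces several loop iterations
        return H1SeekableInputStream.readDirectBuffer(f, direct, temp);
      }
    }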
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/898f3d0f/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H2SeekableInputStream.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H2SeekableInputStream.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H2SeekableInputStream.java
new file mode 100644
index 0000000..a706546
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H2SeekableInputStream.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.hadoop.util;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.parquet.io.SeekableInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * SeekableInputStream implementation for FSDataInputStream that implements
+ * ByteBufferReadable in Hadoop 2.
+ */
+class H2SeekableInputStream extends SeekableInputStream {
+
+ // Visible for testing
+ interface Reader {
+ int read(ByteBuffer buf) throws IOException;
+ }
+
+ private final FSDataInputStream stream;
+ private final Reader reader;
+
+ public H2SeekableInputStream(FSDataInputStream stream) {
+ this.stream = stream;
+ this.reader = new H2Reader();
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return stream.getPos();
+ }
+
+ @Override
+ public void seek(long newPos) throws IOException {
+ stream.seek(newPos);
+ }
+
+ @Override
+ public int read() throws IOException {
+ return stream.read();
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ return stream.read(b, off, len);
+ }
+
+ @Override
+ public void readFully(byte[] bytes) throws IOException {
+ stream.readFully(bytes, 0, bytes.length);
+ }
+
+ @Override
+ public void readFully(byte[] bytes, int start, int len) throws IOException {
+ stream.readFully(bytes, start, len);
+ }
+
+ @Override
+ public int read(ByteBuffer buf) throws IOException {
+ return stream.read(buf);
+ }
+
+ @Override
+ public void readFully(ByteBuffer buf) throws IOException {
+ readFully(reader, buf);
+ }
+
+ private class H2Reader implements Reader {
+ @Override
+ public int read(ByteBuffer buf) throws IOException {
+ return stream.read(buf);
+ }
+ }
+
+ public static void readFully(Reader reader, ByteBuffer buf) throws IOException {
+ // unfortunately the Hadoop APIs seem to not have a 'readFully' equivalent for the byteBuffer read
+ // calls. The read(ByteBuffer) call might read fewer than buf.remaining() bytes. Thus we
+ // have to loop to ensure we read them.
+ while (buf.hasRemaining()) {
+ int readCount = reader.read(buf);
+ if (readCount == -1) {
+ // this is probably a bug in the ParquetReader. We shouldn't have called readFully with a buffer
+ // that has more remaining than the amount of data in the stream.
+ throw new EOFException("Reached the end of stream. Still have: " + buf.remaining() + " bytes left");
+ }
+ }
+ }
+}
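A short sketch of why that loop is needed (the class `ShortReadSketch` is hypothetical; `Reader` and the static `readFully` are from this commit): a Reader that returns at most two bytes per call still fills a five-byte buffer, in three iterations.

    package org.apache.parquet.hadoop.util;

    import java.io.IOException;
    import java.nio.ByteBuffer;

    class ShortReadSketch {
      public static void main(String[] args) throws IOException {
        final ByteBuffer source = ByteBuffer.wrap(new byte[] { 1, 2, 3, 4, 5 });
        // A Reader that returns at most two bytes per call, to show why
        // readFully must loop until the destination buffer is filled.
        H2SeekableInputStream.Reader twoAtATime = new H2SeekableInputStream.Reader() {
          @Override
          public int read(ByteBuffer buf) {
            if (!source.hasRemaining()) {
              return -1;
            }
            int n = Math.min(2, Math.min(buf.remaining(), source.remaining()));
            for (int i = 0; i < n; i += 1) {
              buf.put(source.get());
            }
            return n;
          }
        };

        ByteBuffer dest = ByteBuffer.allocate(5);
        H2SeekableInputStream.readFully(twoAtATime, dest); // loops: 2 + 2 + 1 bytes
      }
    }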
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/898f3d0f/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java
new file mode 100644
index 0000000..7c321cd
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.hadoop.util;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.parquet.Log;
+import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.io.SeekableInputStream;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+
+/**
+ * Convenience methods to get Parquet abstractions for Hadoop data streams.
+ */
+public class HadoopStreams {
+
+ private static final Log LOG = Log.getLog(HadoopStreams.class);
+
+ private static final Class<?> byteBufferReadableClass = getReadableClass();
+ static final Constructor<SeekableInputStream> h2SeekableConstructor = getH2SeekableConstructor();
+
+ /**
+ * Wraps a {@link FSDataInputStream} in a {@link SeekableInputStream}
+ * implementation for Parquet readers.
+ *
+ * @param stream a Hadoop FSDataInputStream
+ * @return a SeekableInputStream
+ */
+ public static SeekableInputStream wrap(FSDataInputStream stream) {
+ if (byteBufferReadableClass != null && h2SeekableConstructor != null &&
+ byteBufferReadableClass.isInstance(stream.getWrappedStream())) {
+ try {
+ return h2SeekableConstructor.newInstance(stream);
+ } catch (InstantiationException e) {
+ LOG.warn("Could not instantiate H2SeekableInputStream, falling back to byte array reads", e);
+ return new H1SeekableInputStream(stream);
+ } catch (IllegalAccessException e) {
+ LOG.warn("Could not instantiate H2SeekableInputStream, falling back to byte array reads", e);
+ return new H1SeekableInputStream(stream);
+ } catch (InvocationTargetException e) {
+ throw new ParquetDecodingException(
+ "Could not instantiate H2SeekableInputStream", e.getTargetException());
+ }
+ } else {
+ return new H1SeekableInputStream(stream);
+ }
+ }
+
+ private static Class<?> getReadableClass() {
+ try {
+ return Class.forName("org.apache.hadoop.fs.ByteBufferReadable");
+ } catch (ClassNotFoundException e) {
+ return null;
+ } catch (NoClassDefFoundError e) {
+ return null;
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private static Class<SeekableInputStream> getH2SeekableClass() {
+ try {
+ return (Class<SeekableInputStream>) Class.forName(
+ "org.apache.parquet.hadoop.util.H2SeekableInputStream");
+ } catch (ClassNotFoundException e) {
+ return null;
+ } catch (NoClassDefFoundError e) {
+ return null;
+ }
+ }
+
+ private static Constructor<SeekableInputStream> getH2SeekableConstructor() {
+ Class<SeekableInputStream> h2SeekableClass = getH2SeekableClass();
+ if (h2SeekableClass != null) {
+ try {
+ return h2SeekableClass.getConstructor(FSDataInputStream.class);
+ } catch (NoSuchMethodException e) {
+ return null;
+ }
+ }
+ return null;
+ }
+
+}
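For instance (a hypothetical check in the same package, reusing the `MockInputStream` test helper added later in this commit), a stream whose wrapped stream does not implement `ByteBufferReadable` falls back to the Hadoop 1 implementation:

    package org.apache.parquet.hadoop.util;

    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.parquet.io.SeekableInputStream;

    class WrapFallbackSketch {
      public static void main(String[] args) throws Exception {
        // MockInputStream implements Seekable and PositionedReadable but not
        // ByteBufferReadable, so wrap() chooses H1SeekableInputStream.
        FSDataInputStream stream = new FSDataInputStream(new MockInputStream());
        SeekableInputStream wrapped = HadoopStreams.wrap(stream);
        System.out.println(wrapped.getClass().getSimpleName()); // H1SeekableInputStream
        wrapped.close();
      }
    }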
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/898f3d0f/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/MockInputStream.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/MockInputStream.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/MockInputStream.java
new file mode 100644
index 0000000..a112288
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/MockInputStream.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.hadoop.util;
+
+import org.apache.hadoop.fs.PositionedReadable;
+import org.apache.hadoop.fs.Seekable;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+
+class MockInputStream extends ByteArrayInputStream
+ implements Seekable, PositionedReadable {
+ static final byte[] TEST_ARRAY = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
+
+ private int[] lengths;
+ private int current = 0;
+ MockInputStream(int... actualReadLengths) {
+ super(TEST_ARRAY);
+ this.lengths = actualReadLengths;
+ }
+
+ @Override
+ public synchronized int read(byte[] b, int off, int len) {
+ if (current < lengths.length) {
+ if (len <= lengths[current]) {
+ // when len == lengths[current], the next read will be 0 bytes
+ int bytesRead = super.read(b, off, len);
+ lengths[current] -= bytesRead;
+ return bytesRead;
+ } else {
+ int bytesRead = super.read(b, off, lengths[current]);
+ current += 1;
+ return bytesRead;
+ }
+ } else {
+ return super.read(b, off, len);
+ }
+ }
+
+ @Override
+ public int read(long position, byte[] buffer, int offset, int length) throws IOException {
+ seek(position);
+ return read(buffer, offset, length);
+ }
+
+ @Override
+ public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
+ throw new UnsupportedOperationException("Not actually supported.");
+ }
+
+ @Override
+ public void readFully(long position, byte[] buffer) throws IOException {
+ throw new UnsupportedOperationException("Not actually supported.");
+ }
+
+ @Override
+ public void seek(long pos) throws IOException {
+ this.pos = (int) pos;
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return this.pos;
+ }
+
+ @Override
+ public boolean seekToNewSource(long targetPos) throws IOException {
+ seek(targetPos);
+ return true;
+ }
+}
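The constructor arguments control how many bytes each successive read may return. A hypothetical trace (in the same package, since the class is package-private): with lengths (2, 3), the first read is capped at 2 bytes, the second at 3, and later reads are unrestricted.

    package org.apache.parquet.hadoop.util;

    class MockReadSketch {
      public static void main(String[] args) {
        MockInputStream in = new MockInputStream(2, 3);
        byte[] b = new byte[10];
        System.out.println(in.read(b, 0, 10)); // 2: capped by the first length
        System.out.println(in.read(b, 2, 8));  // 3: capped by the second length
        System.out.println(in.read(b, 5, 5));  // 5: the remaining bytes, no cap left
      }
    }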
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/898f3d0f/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop1ByteBufferReads.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop1ByteBufferReads.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop1ByteBufferReads.java
new file mode 100644
index 0000000..9e4e2a9
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop1ByteBufferReads.java
@@ -0,0 +1,761 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.hadoop.util;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.parquet.hadoop.TestUtils;
+import org.junit.Assert;
+import org.junit.Test;
+import java.io.EOFException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.Callable;
+
+import static org.apache.parquet.hadoop.util.MockInputStream.TEST_ARRAY;
+
+public class TestHadoop1ByteBufferReads {
+
+ private static final ThreadLocal<byte[]> TEMP = new ThreadLocal<byte[]>() {
+ @Override
+ protected byte[] initialValue() {
+ return new byte[8192];
+ }
+ };
+
+ @Test
+ public void testHeapRead() throws Exception {
+ ByteBuffer readBuffer = ByteBuffer.allocate(20);
+
+ FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream());
+
+ int len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer);
+ Assert.assertEquals(10, len);
+ Assert.assertEquals(10, readBuffer.position());
+ Assert.assertEquals(20, readBuffer.limit());
+
+ len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer);
+ Assert.assertEquals(-1, len);
+
+ readBuffer.flip();
+ Assert.assertEquals("Buffer contents should match",
+ ByteBuffer.wrap(TEST_ARRAY), readBuffer);
+ }
+
+ @Test
+ public void testHeapSmallBuffer() throws Exception {
+ ByteBuffer readBuffer = ByteBuffer.allocate(5);
+
+ FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream());
+
+ int len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer);
+ Assert.assertEquals(5, len);
+ Assert.assertEquals(5, readBuffer.position());
+ Assert.assertEquals(5, readBuffer.limit());
+
+ len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer);
+ Assert.assertEquals(0, len);
+
+ readBuffer.flip();
+ Assert.assertEquals("Buffer contents should match",
+ ByteBuffer.wrap(TEST_ARRAY, 0, 5), readBuffer);
+ }
+
+ @Test
+ public void testHeapSmallReads() throws Exception {
+ ByteBuffer readBuffer = ByteBuffer.allocate(10);
+
+ FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));
+
+ int len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer);
+ Assert.assertEquals(2, len);
+ Assert.assertEquals(2, readBuffer.position());
+ Assert.assertEquals(10, readBuffer.limit());
+
+ len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer);
+ Assert.assertEquals(3, len);
+ Assert.assertEquals(5, readBuffer.position());
+ Assert.assertEquals(10, readBuffer.limit());
+
+ len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer);
+ Assert.assertEquals(3, len);
+ Assert.assertEquals(8, readBuffer.position());
+ Assert.assertEquals(10, readBuffer.limit());
+
+ len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer);
+ Assert.assertEquals(2, len);
+ Assert.assertEquals(10, readBuffer.position());
+ Assert.assertEquals(10, readBuffer.limit());
+
+ readBuffer.flip();
+ Assert.assertEquals("Buffer contents should match",
+ ByteBuffer.wrap(TEST_ARRAY), readBuffer);
+ }
+
+ @Test
+ public void testHeapPosition() throws Exception {
+ ByteBuffer readBuffer = ByteBuffer.allocate(20);
+ readBuffer.position(10);
+ readBuffer.mark();
+
+ FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(8));
+
+ int len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer);
+ Assert.assertEquals(8, len);
+ Assert.assertEquals(18, readBuffer.position());
+ Assert.assertEquals(20, readBuffer.limit());
+
+ len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer);
+ Assert.assertEquals(2, len);
+ Assert.assertEquals(20, readBuffer.position());
+ Assert.assertEquals(20, readBuffer.limit());
+
+ len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer);
+ Assert.assertEquals(-1, len);
+
+ readBuffer.reset();
+ Assert.assertEquals("Buffer contents should match",
+ ByteBuffer.wrap(TEST_ARRAY), readBuffer);
+ }
+
+ @Test
+ public void testHeapLimit() throws Exception {
+ ByteBuffer readBuffer = ByteBuffer.allocate(20);
+ readBuffer.limit(8);
+
+ FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(7));
+
+ int len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer);
+ Assert.assertEquals(7, len);
+ Assert.assertEquals(7, readBuffer.position());
+ Assert.assertEquals(8, readBuffer.limit());
+
+ len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer);
+ Assert.assertEquals(1, len);
+ Assert.assertEquals(8, readBuffer.position());
+ Assert.assertEquals(8, readBuffer.limit());
+
+ len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer);
+ Assert.assertEquals(0, len);
+
+ readBuffer.flip();
+ Assert.assertEquals("Buffer contents should match",
+ ByteBuffer.wrap(TEST_ARRAY, 0, 8), readBuffer);
+ }
+
+ @Test
+ public void testHeapPositionAndLimit() throws Exception {
+ ByteBuffer readBuffer = ByteBuffer.allocate(20);
+ readBuffer.position(5);
+ readBuffer.limit(13);
+ readBuffer.mark();
+
+ FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(7));
+
+ int len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer);
+ Assert.assertEquals(7, len);
+ Assert.assertEquals(12, readBuffer.position());
+ Assert.assertEquals(13, readBuffer.limit());
+
+ len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer);
+ Assert.assertEquals(1, len);
+ Assert.assertEquals(13, readBuffer.position());
+ Assert.assertEquals(13, readBuffer.limit());
+
+ len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer);
+ Assert.assertEquals(0, len);
+
+ readBuffer.reset();
+ Assert.assertEquals("Buffer contents should match",
+ ByteBuffer.wrap(TEST_ARRAY, 0, 8), readBuffer);
+ }
+
+ @Test
+ public void testDirectRead() throws Exception {
+ ByteBuffer readBuffer = ByteBuffer.allocateDirect(20);
+
+ FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream());
+
+ int len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
+ Assert.assertEquals(10, len);
+ Assert.assertEquals(10, readBuffer.position());
+ Assert.assertEquals(20, readBuffer.limit());
+
+ len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
+ Assert.assertEquals(-1, len);
+
+ readBuffer.flip();
+ Assert.assertEquals("Buffer contents should match",
+ ByteBuffer.wrap(TEST_ARRAY), readBuffer);
+ }
+
+ @Test
+ public void testDirectSmallBuffer() throws Exception {
+ ByteBuffer readBuffer = ByteBuffer.allocateDirect(5);
+
+ FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream());
+
+ int len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
+ Assert.assertEquals(5, len);
+ Assert.assertEquals(5, readBuffer.position());
+ Assert.assertEquals(5, readBuffer.limit());
+
+ len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
+ Assert.assertEquals(0, len);
+
+ readBuffer.flip();
+ Assert.assertEquals("Buffer contents should match",
+ ByteBuffer.wrap(TEST_ARRAY, 0, 5), readBuffer);
+ }
+
+ @Test
+ public void testDirectSmallReads() throws Exception {
+ ByteBuffer readBuffer = ByteBuffer.allocateDirect(10);
+
+ FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));
+
+ int len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
+ Assert.assertEquals(2, len);
+ Assert.assertEquals(2, readBuffer.position());
+ Assert.assertEquals(10, readBuffer.limit());
+
+ len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
+ Assert.assertEquals(3, len);
+ Assert.assertEquals(5, readBuffer.position());
+ Assert.assertEquals(10, readBuffer.limit());
+
+ len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
+ Assert.assertEquals(3, len);
+ Assert.assertEquals(8, readBuffer.position());
+ Assert.assertEquals(10, readBuffer.limit());
+
+ len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
+ Assert.assertEquals(2, len);
+ Assert.assertEquals(10, readBuffer.position());
+ Assert.assertEquals(10, readBuffer.limit());
+
+ readBuffer.flip();
+ Assert.assertEquals("Buffer contents should match",
+ ByteBuffer.wrap(TEST_ARRAY), readBuffer);
+ }
+
+ @Test
+ public void testDirectPosition() throws Exception {
+ ByteBuffer readBuffer = ByteBuffer.allocateDirect(20);
+ readBuffer.position(10);
+ readBuffer.mark();
+
+ FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(8));
+
+ int len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
+ Assert.assertEquals(8, len);
+ Assert.assertEquals(18, readBuffer.position());
+ Assert.assertEquals(20, readBuffer.limit());
+
+ len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
+ Assert.assertEquals(2, len);
+ Assert.assertEquals(20, readBuffer.position());
+ Assert.assertEquals(20, readBuffer.limit());
+
+ len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
+ Assert.assertEquals(-1, len);
+
+ readBuffer.reset();
+ Assert.assertEquals("Buffer contents should match",
+ ByteBuffer.wrap(TEST_ARRAY), readBuffer);
+ }
+
+ @Test
+ public void testDirectLimit() throws Exception {
+ ByteBuffer readBuffer = ByteBuffer.allocateDirect(20);
+ readBuffer.limit(8);
+
+ FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(7));
+
+ int len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
+ Assert.assertEquals(7, len);
+ Assert.assertEquals(7, readBuffer.position());
+ Assert.assertEquals(8, readBuffer.limit());
+
+ len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
+ Assert.assertEquals(1, len);
+ Assert.assertEquals(8, readBuffer.position());
+ Assert.assertEquals(8, readBuffer.limit());
+
+ len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
+ Assert.assertEquals(0, len);
+
+ readBuffer.flip();
+ Assert.assertEquals("Buffer contents should match",
+ ByteBuffer.wrap(TEST_ARRAY, 0, 8), readBuffer);
+ }
+
+ @Test
+ public void testDirectPositionAndLimit() throws Exception {
+ ByteBuffer readBuffer = ByteBuffer.allocateDirect(20);
+ readBuffer.position(5);
+ readBuffer.limit(13);
+ readBuffer.mark();
+
+ FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(7));
+
+ int len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
+ Assert.assertEquals(7, len);
+ Assert.assertEquals(12, readBuffer.position());
+ Assert.assertEquals(13, readBuffer.limit());
+
+ len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
+ Assert.assertEquals(1, len);
+ Assert.assertEquals(13, readBuffer.position());
+ Assert.assertEquals(13, readBuffer.limit());
+
+ len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
+ Assert.assertEquals(0, len);
+
+ readBuffer.reset();
+ Assert.assertEquals("Buffer contents should match",
+ ByteBuffer.wrap(TEST_ARRAY, 0, 8), readBuffer);
+ }
+
+ @Test
+ public void testDirectSmallTempBufferSmallReads() throws Exception {
+ byte[] temp = new byte[2]; // this will cause readDirectBuffer to loop
+
+ ByteBuffer readBuffer = ByteBuffer.allocateDirect(10);
+
+ FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));
+
+ int len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, temp);
+ Assert.assertEquals(2, len);
+ Assert.assertEquals(2, readBuffer.position());
+ Assert.assertEquals(10, readBuffer.limit());
+
+ len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, temp);
+ Assert.assertEquals(3, len);
+ Assert.assertEquals(5, readBuffer.position());
+ Assert.assertEquals(10, readBuffer.limit());
+
+ len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, temp);
+ Assert.assertEquals(3, len);
+ Assert.assertEquals(8, readBuffer.position());
+ Assert.assertEquals(10, readBuffer.limit());
+
+ len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, temp);
+ Assert.assertEquals(2, len);
+ Assert.assertEquals(10, readBuffer.position());
+ Assert.assertEquals(10, readBuffer.limit());
+
+ len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, temp);
+ Assert.assertEquals(-1, len);
+
+ readBuffer.flip();
+ Assert.assertEquals("Buffer contents should match",
+ ByteBuffer.wrap(TEST_ARRAY), readBuffer);
+ }
+
+ @Test
+ public void testDirectSmallTempBufferWithPositionAndLimit() throws Exception {
+ byte[] temp = new byte[2]; // this will cause readDirectBuffer to loop
+
+ ByteBuffer readBuffer = ByteBuffer.allocateDirect(20);
+ readBuffer.position(5);
+ readBuffer.limit(13);
+ readBuffer.mark();
+
+ FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(7));
+
+ int len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, temp);
+ Assert.assertEquals(7, len);
+ Assert.assertEquals(12, readBuffer.position());
+ Assert.assertEquals(13, readBuffer.limit());
+
+ len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, temp);
+ Assert.assertEquals(1, len);
+ Assert.assertEquals(13, readBuffer.position());
+ Assert.assertEquals(13, readBuffer.limit());
+
+ len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, temp);
+ Assert.assertEquals(0, len);
+
+ readBuffer.reset();
+ Assert.assertEquals("Buffer contents should match",
+ ByteBuffer.wrap(TEST_ARRAY, 0, 8), readBuffer);
+ }
+
+ @Test
+ public void testHeapReadFullySmallBuffer() throws Exception {
+ ByteBuffer readBuffer = ByteBuffer.allocate(8);
+
+ FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream());
+
+ H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
+ Assert.assertEquals(8, readBuffer.position());
+ Assert.assertEquals(8, readBuffer.limit());
+
+ H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
+ Assert.assertEquals(8, readBuffer.position());
+ Assert.assertEquals(8, readBuffer.limit());
+
+ readBuffer.flip();
+ Assert.assertEquals("Buffer contents should match",
+ ByteBuffer.wrap(TEST_ARRAY, 0, 8), readBuffer);
+ }
+
+ @Test
+ public void testHeapReadFullyLargeBuffer() throws Exception {
+ final ByteBuffer readBuffer = ByteBuffer.allocate(20);
+
+ final FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream());
+
+ TestUtils.assertThrows("Should throw EOFException",
+ EOFException.class, new Callable() {
+ @Override
+ public Object call() throws Exception {
+ H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
+ return null;
+ }
+ });
+
+ Assert.assertEquals(0, readBuffer.position());
+ Assert.assertEquals(20, readBuffer.limit());
+ }
+
+ @Test
+ public void testHeapReadFullyJustRight() throws Exception {
+ final ByteBuffer readBuffer = ByteBuffer.allocate(10);
+
+ final FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream());
+
+ // reads all of the bytes available without EOFException
+ H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
+ Assert.assertEquals(10, readBuffer.position());
+ Assert.assertEquals(10, readBuffer.limit());
+
+ // trying to read 0 more bytes doesn't result in EOFException
+ H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
+ Assert.assertEquals(10, readBuffer.position());
+ Assert.assertEquals(10, readBuffer.limit());
+
+ readBuffer.flip();
+ Assert.assertEquals("Buffer contents should match",
+ ByteBuffer.wrap(TEST_ARRAY), readBuffer);
+ }
+
+ @Test
+ public void testHeapReadFullySmallReads() throws Exception {
+ final ByteBuffer readBuffer = ByteBuffer.allocate(10);
+
+ final FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));
+
+ H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
+ Assert.assertEquals(10, readBuffer.position());
+ Assert.assertEquals(10, readBuffer.limit());
+
+ H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
+ Assert.assertEquals(10, readBuffer.position());
+ Assert.assertEquals(10, readBuffer.limit());
+
+ readBuffer.flip();
+ Assert.assertEquals("Buffer contents should match",
+ ByteBuffer.wrap(TEST_ARRAY), readBuffer);
+ }
+
+ @Test
+ public void testHeapReadFullyPosition() throws Exception {
+ final ByteBuffer readBuffer = ByteBuffer.allocate(10);
+ readBuffer.position(3);
+ readBuffer.mark();
+
+ final FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));
+
+ H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
+ Assert.assertEquals(10, readBuffer.position());
+ Assert.assertEquals(10, readBuffer.limit());
+
+ H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
+ Assert.assertEquals(10, readBuffer.position());
+ Assert.assertEquals(10, readBuffer.limit());
+
+ readBuffer.reset();
+ Assert.assertEquals("Buffer contents should match",
+ ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer);
+ }
+
+ @Test
+ public void testHeapReadFullyLimit() throws Exception {
+ final ByteBuffer readBuffer = ByteBuffer.allocate(10);
+ readBuffer.limit(7);
+
+ final FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));
+
+ H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
+ Assert.assertEquals(7, readBuffer.position());
+ Assert.assertEquals(7, readBuffer.limit());
+
+ H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
+ Assert.assertEquals(7, readBuffer.position());
+ Assert.assertEquals(7, readBuffer.limit());
+
+ readBuffer.flip();
+ Assert.assertEquals("Buffer contents should match",
+ ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer);
+
+ readBuffer.position(7);
+ readBuffer.limit(10);
+ H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
+ Assert.assertEquals(10, readBuffer.position());
+ Assert.assertEquals(10, readBuffer.limit());
+
+ readBuffer.flip();
+ Assert.assertEquals("Buffer contents should match",
+ ByteBuffer.wrap(TEST_ARRAY), readBuffer);
+ }
+
+ @Test
+ public void testHeapReadFullyPositionAndLimit() throws Exception {
+ final ByteBuffer readBuffer = ByteBuffer.allocate(10);
+ readBuffer.position(3);
+ readBuffer.limit(7);
+ readBuffer.mark();
+
+ final FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));
+
+ H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
+ Assert.assertEquals(7, readBuffer.position());
+ Assert.assertEquals(7, readBuffer.limit());
+
+ H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
+ Assert.assertEquals(7, readBuffer.position());
+ Assert.assertEquals(7, readBuffer.limit());
+
+ readBuffer.reset();
+ Assert.assertEquals("Buffer contents should match",
+ ByteBuffer.wrap(TEST_ARRAY, 0, 4), readBuffer);
+
+ readBuffer.position(7);
+ readBuffer.limit(10);
+ H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
+ Assert.assertEquals(10, readBuffer.position());
+ Assert.assertEquals(10, readBuffer.limit());
+
+ readBuffer.reset();
+ Assert.assertEquals("Buffer contents should match",
+ ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer);
+ }
+
+ @Test
+ public void testDirectReadFullySmallBuffer() throws Exception {
+ ByteBuffer readBuffer = ByteBuffer.allocateDirect(8);
+
+ FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream());
+
+ H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, TEMP.get());
+ Assert.assertEquals(8, readBuffer.position());
+ Assert.assertEquals(8, readBuffer.limit());
+
+ H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, TEMP.get());
+ Assert.assertEquals(8, readBuffer.position());
+ Assert.assertEquals(8, readBuffer.limit());
+
+ readBuffer.flip();
+ Assert.assertEquals("Buffer contents should match",
+ ByteBuffer.wrap(TEST_ARRAY, 0, 8), readBuffer);
+ }
+
+ @Test
+ public void testDirectReadFullyLargeBuffer() throws Exception {
+ final ByteBuffer readBuffer = ByteBuffer.allocateDirect(20);
+
+ final FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream());
+
+ TestUtils.assertThrows("Should throw EOFException",
+ EOFException.class, new Callable<Object>() {
+ @Override
+ public Object call() throws Exception {
+ H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, TEMP.get());
+ return null;
+ }
+ });
+
+ // NOTE: This behavior differs from readFullyHeapBuffer: the direct path
+ // issues several smaller reads that consume input up to EOF, so the bytes
+ // already copied into the buffer are valid and the position reflects them.
+ // The heap path can't match this without using read instead of readFully
+ // on the underlying FSDataInputStream. (See the sketch after this file's
+ // diff.)
+ Assert.assertEquals(10, readBuffer.position());
+ Assert.assertEquals(20, readBuffer.limit());
+ }
+
+ @Test
+ public void testDirectReadFullyJustRight() throws Exception {
+ final ByteBuffer readBuffer = ByteBuffer.allocateDirect(10);
+
+ final FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream());
+
+ // reads all of the bytes available without EOFException
+ H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, TEMP.get());
+ Assert.assertEquals(10, readBuffer.position());
+ Assert.assertEquals(10, readBuffer.limit());
+
+ // trying to read 0 more bytes doesn't result in EOFException
+ H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, TEMP.get());
+ Assert.assertEquals(10, readBuffer.position());
+ Assert.assertEquals(10, readBuffer.limit());
+
+ readBuffer.flip();
+ Assert.assertEquals("Buffer contents should match",
+ ByteBuffer.wrap(TEST_ARRAY), readBuffer);
+ }
+
+ @Test
+ public void testDirectReadFullySmallReads() throws Exception {
+ final ByteBuffer readBuffer = ByteBuffer.allocateDirect(10);
+
+ final FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));
+
+ H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, TEMP.get());
+ Assert.assertEquals(10, readBuffer.position());
+ Assert.assertEquals(10, readBuffer.limit());
+
+ H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, TEMP.get());
+ Assert.assertEquals(10, readBuffer.position());
+ Assert.assertEquals(10, readBuffer.limit());
+
+ readBuffer.flip();
+ Assert.assertEquals("Buffer contents should match",
+ ByteBuffer.wrap(TEST_ARRAY), readBuffer);
+ }
+
+ @Test
+ public void testDirectReadFullyPosition() throws Exception {
+ final ByteBuffer readBuffer = ByteBuffer.allocateDirect(10);
+ readBuffer.position(3);
+ readBuffer.mark();
+
+ final FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));
+
+ H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, TEMP.get());
+ Assert.assertEquals(10, readBuffer.position());
+ Assert.assertEquals(10, readBuffer.limit());
+
+ H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, TEMP.get());
+ Assert.assertEquals(10, readBuffer.position());
+ Assert.assertEquals(10, readBuffer.limit());
+
+ readBuffer.reset();
+ Assert.assertEquals("Buffer contents should match",
+ ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer);
+ }
+
+ @Test
+ public void testDirectReadFullyLimit() throws Exception {
+ final ByteBuffer readBuffer = ByteBuffer.allocateDirect(10);
+ readBuffer.limit(7);
+
+ final FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));
+
+ H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, TEMP.get());
+ Assert.assertEquals(7, readBuffer.position());
+ Assert.assertEquals(7, readBuffer.limit());
+
+ H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, TEMP.get());
+ Assert.assertEquals(7, readBuffer.position());
+ Assert.assertEquals(7, readBuffer.limit());
+
+ readBuffer.flip();
+ Assert.assertEquals("Buffer contents should match",
+ ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer);
+
+ readBuffer.position(7);
+ readBuffer.limit(10);
+ H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, TEMP.get());
+ Assert.assertEquals(10, readBuffer.position());
+ Assert.assertEquals(10, readBuffer.limit());
+
+ readBuffer.flip();
+ Assert.assertEquals("Buffer contents should match",
+ ByteBuffer.wrap(TEST_ARRAY), readBuffer);
+ }
+
+ @Test
+ public void testDirectReadFullyPositionAndLimit() throws Exception {
+ final ByteBuffer readBuffer = ByteBuffer.allocateDirect(10);
+ readBuffer.position(3);
+ readBuffer.limit(7);
+ readBuffer.mark();
+
+ final FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));
+
+ H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, TEMP.get());
+ Assert.assertEquals(7, readBuffer.position());
+ Assert.assertEquals(7, readBuffer.limit());
+
+ H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, TEMP.get());
+ Assert.assertEquals(7, readBuffer.position());
+ Assert.assertEquals(7, readBuffer.limit());
+
+ readBuffer.reset();
+ Assert.assertEquals("Buffer contents should match",
+ ByteBuffer.wrap(TEST_ARRAY, 0, 4), readBuffer);
+
+ readBuffer.position(7);
+ readBuffer.limit(10);
+ H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, TEMP.get());
+ Assert.assertEquals(10, readBuffer.position());
+ Assert.assertEquals(10, readBuffer.limit());
+
+ readBuffer.reset();
+ Assert.assertEquals("Buffer contents should match",
+ ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer);
+ }
+
+ @Test
+ public void testDirectReadFullySmallTempBufferWithPositionAndLimit() throws Exception {
+ byte[] temp = new byte[2]; // this will cause readFully to loop
+
+ final ByteBuffer readBuffer = ByteBuffer.allocateDirect(10);
+ readBuffer.position(3);
+ readBuffer.limit(7);
+ readBuffer.mark();
+
+ final FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));
+
+ H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, temp);
+ Assert.assertEquals(7, readBuffer.position());
+ Assert.assertEquals(7, readBuffer.limit());
+
+ H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, temp);
+ Assert.assertEquals(7, readBuffer.position());
+ Assert.assertEquals(7, readBuffer.limit());
+
+ readBuffer.reset();
+ Assert.assertEquals("Buffer contents should match",
+ ByteBuffer.wrap(TEST_ARRAY, 0, 4), readBuffer);
+
+ readBuffer.position(7);
+ readBuffer.limit(10);
+ H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, temp);
+ Assert.assertEquals(10, readBuffer.position());
+ Assert.assertEquals(10, readBuffer.limit());
+
+ readBuffer.reset();
+ Assert.assertEquals("Buffer contents should match",
+ ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer);
+ }
+}
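
For reference, a minimal sketch of the two H1SeekableInputStream paths these
tests exercise, reconstructed from the assertions above rather than copied
from the committed implementation (the method names and signatures follow the
test calls; the EOF message and class name are illustrative):

import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.fs.FSDataInputStream;

class H1ReadFullySketch {
  // Heap path: a single readFully on the backing array. On a premature
  // EOF, DataInputStream.readFully throws before the buffer position is
  // advanced, which is why testHeapReadFullyLargeBuffer still sees
  // position() == 0.
  static void readFullyHeapBuffer(FSDataInputStream f, ByteBuffer buf)
      throws IOException {
    f.readFully(buf.array(), buf.arrayOffset() + buf.position(), buf.remaining());
    buf.position(buf.limit());
  }

  // Direct path: copy through a caller-supplied temp array in a loop.
  // Bytes read before EOF stay in the buffer, which is why
  // testDirectReadFullyLargeBuffer sees position() == 10.
  static void readFullyDirectBuffer(FSDataInputStream f, ByteBuffer buf, byte[] temp)
      throws IOException {
    while (buf.remaining() > 0) {
      int bytesRead = f.read(temp, 0, Math.min(temp.length, buf.remaining()));
      if (bytesRead < 0) {
        throw new EOFException("Reached the end of stream with " +
            buf.remaining() + " bytes left to read");
      }
      buf.put(temp, 0, bytesRead);
    }
  }
}
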
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/898f3d0f/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop2ByteBufferReads.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop2ByteBufferReads.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop2ByteBufferReads.java
new file mode 100644
index 0000000..86b903c
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop2ByteBufferReads.java
@@ -0,0 +1,405 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.hadoop.util;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.parquet.hadoop.TestUtils;
+import org.junit.Assert;
+import org.junit.Test;
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.Callable;
+
+import static org.apache.parquet.hadoop.util.MockInputStream.TEST_ARRAY;
+
+public class TestHadoop2ByteBufferReads {
+
+ /**
+ * Mimics the read(ByteBuffer) support that Hadoop 2 streams provide.
+ */
+ private static class MockBufferReader implements H2SeekableInputStream.Reader {
+ private final FSDataInputStream stream;
+
+ public MockBufferReader(FSDataInputStream stream) {
+ this.stream = stream;
+ }
+
+ @Override
+ public int read(ByteBuffer buf) throws IOException {
+ // this is inefficient, but simple for correctness tests of
+ // readFully(ByteBuffer)
+ byte[] temp = new byte[buf.remaining()];
+ int bytesRead = stream.read(temp, 0, temp.length);
+ if (bytesRead > 0) {
+ buf.put(temp, 0, bytesRead);
+ }
+ return bytesRead;
+ }
+ }
+
+ @Test
+ public void testHeapReadFullySmallBuffer() throws Exception {
+ ByteBuffer readBuffer = ByteBuffer.allocate(8);
+
+ FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream());
+ MockBufferReader reader = new MockBufferReader(hadoopStream);
+
+ H2SeekableInputStream.readFully(reader, readBuffer);
+ Assert.assertEquals(8, readBuffer.position());
+ Assert.assertEquals(8, readBuffer.limit());
+
+ H2SeekableInputStream.readFully(reader, readBuffer);
+ Assert.assertEquals(8, readBuffer.position());
+ Assert.assertEquals(8, readBuffer.limit());
+
+ readBuffer.flip();
+ Assert.assertEquals("Buffer contents should match",
+ ByteBuffer.wrap(TEST_ARRAY, 0, 8), readBuffer);
+ }
+
+ @Test
+ public void testHeapReadFullyLargeBuffer() throws Exception {
+ final ByteBuffer readBuffer = ByteBuffer.allocate(20);
+
+ FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream());
+ final MockBufferReader reader = new MockBufferReader(hadoopStream);
+
+ TestUtils.assertThrows("Should throw EOFException",
+ EOFException.class, new Callable<Object>() {
+ @Override
+ public Object call() throws Exception {
+ H2SeekableInputStream.readFully(reader, readBuffer);
+ return null;
+ }
+ });
+
+ // NOTE: This behavior differs from the Hadoop-1 heap path: readFully here
+ // loops over several read(ByteBuffer) calls that consume input up to EOF,
+ // so the bytes already copied into the buffer are valid and the position
+ // reflects them. The Hadoop-1 heap path can't match this without using
+ // read instead of readFully on the underlying FSDataInputStream. (See the
+ // sketch after this file's diff.)
+ Assert.assertEquals(10, readBuffer.position());
+ Assert.assertEquals(20, readBuffer.limit());
+ }
+
+ @Test
+ public void testHeapReadFullyJustRight() throws Exception {
+ ByteBuffer readBuffer = ByteBuffer.allocate(10);
+
+ FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream());
+ MockBufferReader reader = new MockBufferReader(hadoopStream);
+
+ // reads all of the bytes available without EOFException
+ H2SeekableInputStream.readFully(reader, readBuffer);
+ Assert.assertEquals(10, readBuffer.position());
+ Assert.assertEquals(10, readBuffer.limit());
+
+ // trying to read 0 more bytes doesn't result in EOFException
+ H2SeekableInputStream.readFully(reader, readBuffer);
+ Assert.assertEquals(10, readBuffer.position());
+ Assert.assertEquals(10, readBuffer.limit());
+
+ readBuffer.flip();
+ Assert.assertEquals("Buffer contents should match",
+ ByteBuffer.wrap(TEST_ARRAY), readBuffer);
+ }
+
+ @Test
+ public void testHeapReadFullySmallReads() throws Exception {
+ ByteBuffer readBuffer = ByteBuffer.allocate(10);
+
+ FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));
+ MockBufferReader reader = new MockBufferReader(hadoopStream);
+
+ H2SeekableInputStream.readFully(reader, readBuffer);
+ Assert.assertEquals(10, readBuffer.position());
+ Assert.assertEquals(10, readBuffer.limit());
+
+ H2SeekableInputStream.readFully(reader, readBuffer);
+ Assert.assertEquals(10, readBuffer.position());
+ Assert.assertEquals(10, readBuffer.limit());
+
+ readBuffer.flip();
+ Assert.assertEquals("Buffer contents should match",
+ ByteBuffer.wrap(TEST_ARRAY), readBuffer);
+ }
+
+ @Test
+ public void testHeapReadFullyPosition() throws Exception {
+ ByteBuffer readBuffer = ByteBuffer.allocate(10);
+ readBuffer.position(3);
+ readBuffer.mark();
+
+ FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));
+ MockBufferReader reader = new MockBufferReader(hadoopStream);
+
+ H2SeekableInputStream.readFully(reader, readBuffer);
+ Assert.assertEquals(10, readBuffer.position());
+ Assert.assertEquals(10, readBuffer.limit());
+
+ H2SeekableInputStream.readFully(reader, readBuffer);
+ Assert.assertEquals(10, readBuffer.position());
+ Assert.assertEquals(10, readBuffer.limit());
+
+ readBuffer.reset();
+ Assert.assertEquals("Buffer contents should match",
+ ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer);
+ }
+
+ @Test
+ public void testHeapReadFullyLimit() throws Exception {
+ ByteBuffer readBuffer = ByteBuffer.allocate(10);
+ readBuffer.limit(7);
+
+ FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));
+ MockBufferReader reader = new MockBufferReader(hadoopStream);
+
+ H2SeekableInputStream.readFully(reader, readBuffer);
+ Assert.assertEquals(7, readBuffer.position());
+ Assert.assertEquals(7, readBuffer.limit());
+
+ H2SeekableInputStream.readFully(reader, readBuffer);
+ Assert.assertEquals(7, readBuffer.position());
+ Assert.assertEquals(7, readBuffer.limit());
+
+ readBuffer.flip();
+ Assert.assertEquals("Buffer contents should match",
+ ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer);
+
+ readBuffer.position(7);
+ readBuffer.limit(10);
+ H2SeekableInputStream.readFully(reader, readBuffer);
+ Assert.assertEquals(10, readBuffer.position());
+ Assert.assertEquals(10, readBuffer.limit());
+
+ readBuffer.flip();
+ Assert.assertEquals("Buffer contents should match",
+ ByteBuffer.wrap(TEST_ARRAY), readBuffer);
+ }
+
+ @Test
+ public void testHeapReadFullyPositionAndLimit() throws Exception {
+ ByteBuffer readBuffer = ByteBuffer.allocate(10);
+ readBuffer.position(3);
+ readBuffer.limit(7);
+ readBuffer.mark();
+
+ FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));
+ MockBufferReader reader = new MockBufferReader(hadoopStream);
+
+ H2SeekableInputStream.readFully(reader, readBuffer);
+ Assert.assertEquals(7, readBuffer.position());
+ Assert.assertEquals(7, readBuffer.limit());
+
+ H2SeekableInputStream.readFully(reader, readBuffer);
+ Assert.assertEquals(7, readBuffer.position());
+ Assert.assertEquals(7, readBuffer.limit());
+
+ readBuffer.reset();
+ Assert.assertEquals("Buffer contents should match",
+ ByteBuffer.wrap(TEST_ARRAY, 0, 4), readBuffer);
+
+ readBuffer.position(7);
+ readBuffer.limit(10);
+ H2SeekableInputStream.readFully(reader, readBuffer);
+ Assert.assertEquals(10, readBuffer.position());
+ Assert.assertEquals(10, readBuffer.limit());
+
+ readBuffer.reset();
+ Assert.assertEquals("Buffer contents should match",
+ ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer);
+ }
+
+ @Test
+ public void testDirectReadFullySmallBuffer() throws Exception {
+ ByteBuffer readBuffer = ByteBuffer.allocateDirect(8);
+
+ FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream());
+ MockBufferReader reader = new MockBufferReader(hadoopStream);
+
+ H2SeekableInputStream.readFully(reader, readBuffer);
+ Assert.assertEquals(8, readBuffer.position());
+ Assert.assertEquals(8, readBuffer.limit());
+
+ H2SeekableInputStream.readFully(reader, readBuffer);
+ Assert.assertEquals(8, readBuffer.position());
+ Assert.assertEquals(8, readBuffer.limit());
+
+ readBuffer.flip();
+ Assert.assertEquals("Buffer contents should match",
+ ByteBuffer.wrap(TEST_ARRAY, 0, 8), readBuffer);
+ }
+
+ @Test
+ public void testDirectReadFullyLargeBuffer() throws Exception {
+ final ByteBuffer readBuffer = ByteBuffer.allocateDirect(20);
+
+ FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream());
+ final MockBufferReader reader = new MockBufferReader(hadoopStream);
+
+ TestUtils.assertThrows("Should throw EOFException",
+ EOFException.class, new Callable<Object>() {
+ @Override
+ public Object call() throws Exception {
+ H2SeekableInputStream.readFully(reader, readBuffer);
+ return null;
+ }
+ });
+
+ // NOTE: As in testHeapReadFullyLargeBuffer above, readFully loops over
+ // several read(ByteBuffer) calls that consume input up to EOF, so the
+ // bytes already copied into the buffer are valid and the position
+ // reflects them.
+ Assert.assertEquals(10, readBuffer.position());
+ Assert.assertEquals(20, readBuffer.limit());
+ }
+
+ @Test
+ public void testDirectReadFullyJustRight() throws Exception {
+ ByteBuffer readBuffer = ByteBuffer.allocateDirect(10);
+
+ FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream());
+ MockBufferReader reader = new MockBufferReader(hadoopStream);
+
+ // reads all of the bytes available without EOFException
+ H2SeekableInputStream.readFully(reader, readBuffer);
+ Assert.assertEquals(10, readBuffer.position());
+ Assert.assertEquals(10, readBuffer.limit());
+
+ // trying to read 0 more bytes doesn't result in EOFException
+ H2SeekableInputStream.readFully(reader, readBuffer);
+ Assert.assertEquals(10, readBuffer.position());
+ Assert.assertEquals(10, readBuffer.limit());
+
+ readBuffer.flip();
+ Assert.assertEquals("Buffer contents should match",
+ ByteBuffer.wrap(TEST_ARRAY), readBuffer);
+ }
+
+ @Test
+ public void testDirectReadFullySmallReads() throws Exception {
+ ByteBuffer readBuffer = ByteBuffer.allocateDirect(10);
+
+ FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));
+ MockBufferReader reader = new MockBufferReader(hadoopStream);
+
+ H2SeekableInputStream.readFully(reader, readBuffer);
+ Assert.assertEquals(10, readBuffer.position());
+ Assert.assertEquals(10, readBuffer.limit());
+
+ H2SeekableInputStream.readFully(reader, readBuffer);
+ Assert.assertEquals(10, readBuffer.position());
+ Assert.assertEquals(10, readBuffer.limit());
+
+ readBuffer.flip();
+ Assert.assertEquals("Buffer contents should match",
+ ByteBuffer.wrap(TEST_ARRAY), readBuffer);
+ }
+
+ @Test
+ public void testDirectReadFullyPosition() throws Exception {
+ ByteBuffer readBuffer = ByteBuffer.allocateDirect(10);
+ readBuffer.position(3);
+ readBuffer.mark();
+
+ FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));
+ MockBufferReader reader = new MockBufferReader(hadoopStream);
+
+ H2SeekableInputStream.readFully(reader, readBuffer);
+ Assert.assertEquals(10, readBuffer.position());
+ Assert.assertEquals(10, readBuffer.limit());
+
+ H2SeekableInputStream.readFully(reader, readBuffer);
+ Assert.assertEquals(10, readBuffer.position());
+ Assert.assertEquals(10, readBuffer.limit());
+
+ readBuffer.reset();
+ Assert.assertEquals("Buffer contents should match",
+ ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer);
+ }
+
+ @Test
+ public void testDirectReadFullyLimit() throws Exception {
+ ByteBuffer readBuffer = ByteBuffer.allocateDirect(10);
+ readBuffer.limit(7);
+
+ FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));
+ H2SeekableInputStream.Reader reader = new MockBufferReader(hadoopStream);
+
+ H2SeekableInputStream.readFully(reader, readBuffer);
+ Assert.assertEquals(7, readBuffer.position());
+ Assert.assertEquals(7, readBuffer.limit());
+
+ H2SeekableInputStream.readFully(reader, readBuffer);
+ Assert.assertEquals(7, readBuffer.position());
+ Assert.assertEquals(7, readBuffer.limit());
+
+ readBuffer.flip();
+ Assert.assertEquals("Buffer contents should match",
+ ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer);
+
+ readBuffer.position(7);
+ readBuffer.limit(10);
+ H2SeekableInputStream.readFully(reader, readBuffer);
+ Assert.assertEquals(10, readBuffer.position());
+ Assert.assertEquals(10, readBuffer.limit());
+
+ readBuffer.flip();
+ Assert.assertEquals("Buffer contents should match",
+ ByteBuffer.wrap(TEST_ARRAY), readBuffer);
+ }
+
+ @Test
+ public void testDirectReadFullyPositionAndLimit() throws Exception {
+ ByteBuffer readBuffer = ByteBuffer.allocateDirect(10);
+ readBuffer.position(3);
+ readBuffer.limit(7);
+ readBuffer.mark();
+
+ FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));
+ MockBufferReader reader = new MockBufferReader(hadoopStream);
+
+ H2SeekableInputStream.readFully(reader, readBuffer);
+ Assert.assertEquals(7, readBuffer.position());
+ Assert.assertEquals(7, readBuffer.limit());
+
+ H2SeekableInputStream.readFully(reader, readBuffer);
+ Assert.assertEquals(7, readBuffer.position());
+ Assert.assertEquals(7, readBuffer.limit());
+
+ readBuffer.reset();
+ Assert.assertEquals("Buffer contents should match",
+ ByteBuffer.wrap(TEST_ARRAY, 0, 4), readBuffer);
+
+ readBuffer.position(7);
+ readBuffer.limit(10);
+ H2SeekableInputStream.readFully(reader, readBuffer);
+ Assert.assertEquals(10, readBuffer.position());
+ Assert.assertEquals(10, readBuffer.limit());
+
+ readBuffer.reset();
+ Assert.assertEquals("Buffer contents should match",
+ ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer);
+ }
+}
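
For reference, the readFully under test loops over the Reader abstraction,
which is why the heap and direct cases behave identically here. A minimal
sketch consistent with the assertions above, not the committed code (the
Reader interface is taken from MockBufferReader; the EOF message and class
name are illustrative):

import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;

class H2ReadFullySketch {
  // Keep calling read(ByteBuffer) until the buffer fills. Hitting EOF
  // with bytes still remaining raises EOFException, but everything read
  // so far stays in the buffer (position() == 10 in the large-buffer
  // tests above).
  static void readFully(H2SeekableInputStream.Reader reader, ByteBuffer buf)
      throws IOException {
    while (buf.remaining() > 0) {
      int bytesRead = reader.read(buf);
      if (bytesRead < 0) {
        throw new EOFException("Reached the end of stream with " +
            buf.remaining() + " bytes left to read");
      }
    }
  }
}
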
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/898f3d0f/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 2d6d7d2..ca34309 100644
--- a/pom.xml
+++ b/pom.xml
@@ -69,7 +69,8 @@
<jackson.version>1.9.11</jackson.version>
<jackson.package>org.codehaus.jackson</jackson.package>
<shade.prefix>shaded.parquet</shade.prefix>
- <hadoop.version>1.1.0</hadoop.version>
+ <hadoop.version>2.3.0</hadoop.version>
+ <hadoop1.version>1.1.0</hadoop1.version>
<cascading.version>2.5.3</cascading.version>
<cascading3.version>3.0.3</cascading3.version>
<parquet.format.version>2.3.1</parquet.format.version>
@@ -80,7 +81,7 @@
<scala.binary.version>2.10</scala.binary.version>
<scala.maven.test.skip>false</scala.maven.test.skip>
<pig.version>0.14.0</pig.version>
- <pig.classifier/>
+ <pig.classifier>h2</pig.classifier>
<thrift.version>0.7.0</thrift.version>
<fastutil.version>6.5.7</fastutil.version>
<semver.api.version>0.9.33</semver.api.version>
@@ -509,19 +510,18 @@
</profile>
<profile>
- <id>hadoop-2</id>
+ <id>hadoop-1</id>
<activation>
<property>
<name>hadoop.profile</name>
- <value>hadoop2</value>
+ <value>hadoop1</value>
</property>
</activation>
<properties>
<!-- test hadoop-1 with the same jars that were produced for default profile -->
<maven.main.skip>true</maven.main.skip>
- <hadoop.version>2.3.0</hadoop.version>
- <pig.version>0.14.0</pig.version>
- <pig.classifier>h2</pig.classifier>
+ <hadoop.version>${hadoop1.version}</hadoop.version>
+ <pig.classifier/>
</properties>
</profile>
<profile>