Posted to commits@parquet.apache.org by bl...@apache.org on 2017/12/13 19:27:59 UTC
[1/4] parquet-mr git commit: PARQUET-1142: Add alternatives to Hadoop classes in the API
Repository: parquet-mr
Updated Branches:
refs/heads/master 81f480149 -> 8bfd9b4d8
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
index 9512b93..bdde70e 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
@@ -28,6 +28,8 @@ import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.ParquetProperties.WriterVersion;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.util.HadoopOutputFile;
+import org.apache.parquet.io.OutputFile;
import org.apache.parquet.schema.MessageType;
/**
@@ -219,7 +221,8 @@ public class ParquetWriter<T> implements Closeable {
boolean validating,
WriterVersion writerVersion,
Configuration conf) throws IOException {
- this(file, mode, writeSupport, compressionCodecName, blockSize,
+ this(HadoopOutputFile.fromPath(file, conf),
+ mode, writeSupport, compressionCodecName, blockSize,
validating, conf, MAX_PADDING_SIZE_DEFAULT,
ParquetProperties.builder()
.withPageSize(pageSize)
@@ -257,11 +260,11 @@ public class ParquetWriter<T> implements Closeable {
}
ParquetWriter(
- Path file,
+ OutputFile file,
ParquetFileWriter.Mode mode,
WriteSupport<T> writeSupport,
CompressionCodecName compressionCodecName,
- int blockSize,
+ int rowGroupSize,
boolean validating,
Configuration conf,
int maxPaddingSize,
@@ -271,7 +274,7 @@ public class ParquetWriter<T> implements Closeable {
MessageType schema = writeContext.getSchema();
ParquetFileWriter fileWriter = new ParquetFileWriter(
- conf, schema, file, mode, blockSize, maxPaddingSize);
+ file, schema, mode, rowGroupSize, maxPaddingSize);
fileWriter.start();
this.codecFactory = new CodecFactory(conf, encodingProps.getPageSizeThreshold());
@@ -281,7 +284,7 @@ public class ParquetWriter<T> implements Closeable {
writeSupport,
schema,
writeContext.getExtraMetaData(),
- blockSize,
+ rowGroupSize,
compressor,
validating,
encodingProps);
@@ -324,7 +327,8 @@ public class ParquetWriter<T> implements Closeable {
* @param <SELF> The type of this builder that is returned by builder methods
*/
public abstract static class Builder<T, SELF extends Builder<T, SELF>> {
- private final Path file;
+ private OutputFile file = null;
+ private Path path = null;
private Configuration conf = new Configuration();
private ParquetFileWriter.Mode mode;
private CompressionCodecName codecName = DEFAULT_COMPRESSION_CODEC_NAME;
@@ -334,8 +338,12 @@ public class ParquetWriter<T> implements Closeable {
private ParquetProperties.Builder encodingPropsBuilder =
ParquetProperties.builder();
- protected Builder(Path file) {
- this.file = file;
+ protected Builder(Path path) {
+ this.path = path;
+ }
+
+ protected Builder(OutputFile path) {
+ this.file = path;
}
/**
@@ -485,15 +493,35 @@ public class ParquetWriter<T> implements Closeable {
}
/**
+ * Set a property that will be available to the read path. For writers that use a Hadoop
+ * configuration, this is the recommended way to add configuration values.
+ *
+ * @param property a String property name
+ * @param value a String property value
+ * @return this builder for method chaining.
+ */
+ public SELF config(String property, String value) {
+ conf.set(property, value);
+ return self();
+ }
+
+ /**
* Build a {@link ParquetWriter} with the accumulated configuration.
*
* @return a configured {@code ParquetWriter} instance.
* @throws IOException
*/
public ParquetWriter<T> build() throws IOException {
- return new ParquetWriter<T>(file, mode, getWriteSupport(conf), codecName,
- rowGroupSize, enableValidation, conf, maxPaddingSize,
- encodingPropsBuilder.build());
+ if (file != null) {
+ return new ParquetWriter<>(file,
+ mode, getWriteSupport(conf), codecName, rowGroupSize, enableValidation, conf,
+ maxPaddingSize, encodingPropsBuilder.build());
+ } else {
+ return new ParquetWriter<>(HadoopOutputFile.fromPath(path, conf),
+ mode, getWriteSupport(conf), codecName,
+ rowGroupSize, enableValidation, conf, maxPaddingSize,
+ encodingPropsBuilder.build());
+ }
}
}
}
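
Taken together, the builder changes above mean a ParquetWriter can now be constructed against either a Hadoop Path or the new OutputFile abstraction, and the new config(String, String) method feeds values into the Hadoop Configuration that backs the write. A minimal sketch using the in-tree example writer; the property name is illustrative only, not something this patch defines:

    import java.io.IOException;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.example.data.Group;
    import org.apache.parquet.hadoop.ParquetWriter;
    import org.apache.parquet.hadoop.example.ExampleParquetWriter;
    import org.apache.parquet.hadoop.metadata.CompressionCodecName;
    import org.apache.parquet.schema.MessageType;

    static ParquetWriter<Group> newWriter(Path path, MessageType schema) throws IOException {
      return ExampleParquetWriter.builder(path)        // still Path-based; OutputFile is used internally
          .withType(schema)
          .withCompressionCodec(CompressionCodecName.SNAPPY)
          .config("example.custom.property", "value")  // new in this patch; stored in the Hadoop conf
          .build();
    }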
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/UnmaterializableRecordCounter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/UnmaterializableRecordCounter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/UnmaterializableRecordCounter.java
index 4696319..a70a0d0 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/UnmaterializableRecordCounter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/UnmaterializableRecordCounter.java
@@ -20,10 +20,12 @@ package org.apache.parquet.hadoop;
import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.io.ParquetDecodingException;
import org.apache.parquet.io.api.RecordMaterializer.RecordMaterializationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Map;
// Essentially taken from:
// https://github.com/twitter/elephant-bird/blob/master/core/src/main/java/com/twitter/elephantbird/mapreduce/input/LzoRecordReader.java#L124
@@ -60,6 +62,10 @@ public class UnmaterializableRecordCounter {
);
}
+ public UnmaterializableRecordCounter(ParquetReadOptions options, long totalNumRecords) {
+ this(getFloat(options, BAD_RECORD_THRESHOLD_CONF_KEY, DEFAULT_THRESHOLD), totalNumRecords);
+ }
+
public UnmaterializableRecordCounter(double errorThreshold, long totalNumRecords) {
this.errorThreshold = errorThreshold;
this.totalNumRecords = totalNumRecords;
@@ -85,4 +91,13 @@ public class UnmaterializableRecordCounter {
throw new ParquetDecodingException(message, cause);
}
}
+
+ private static float getFloat(ParquetReadOptions options, String key, float defaultValue) {
+ String value = options.getProperty(key);
+ if (value != null) {
+ return Float.valueOf(value);
+ } else {
+ return defaultValue;
+ }
+ }
}
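
The new constructor reads the bad-record threshold from a ParquetReadOptions instead of a Hadoop Configuration. A sketch, assuming the ParquetReadOptions builder added elsewhere in this patch series exposes a set(key, value) method for plain string properties:

    import org.apache.parquet.ParquetReadOptions;
    import org.apache.parquet.hadoop.UnmaterializableRecordCounter;

    static UnmaterializableRecordCounter newCounter(long totalNumRecords) {
      // BAD_RECORD_THRESHOLD_CONF_KEY is the same key the Configuration-based
      // constructor reads; "0.01" tolerates 1% unmaterializable records.
      ParquetReadOptions options = ParquetReadOptions.builder()
          .set(UnmaterializableRecordCounter.BAD_RECORD_THRESHOLD_CONF_KEY, "0.01")
          .build();
      return new UnmaterializableRecordCounter(options, totalNumRecords);
    }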
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/CompressionCodecNotSupportedException.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/CompressionCodecNotSupportedException.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/CompressionCodecNotSupportedException.java
deleted file mode 100644
index 9657cc1..0000000
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/CompressionCodecNotSupportedException.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.parquet.hadoop.codec;
-
-/**
- * This exception will be thrown when the codec is not supported by parquet, meaning there is no
- * matching codec defined in {@link org.apache.parquet.hadoop.metadata.CompressionCodecName}
- */
-public class CompressionCodecNotSupportedException extends RuntimeException {
- private final Class codecClass;
-
- public CompressionCodecNotSupportedException(Class codecClass) {
- super("codec not supported: " + codecClass.getName());
- this.codecClass = codecClass;
- }
-
- public Class getCodecClass() {
- return codecClass;
- }
-}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/CompressionCodecName.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/CompressionCodecName.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/CompressionCodecName.java
deleted file mode 100644
index 153133e..0000000
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/CompressionCodecName.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.parquet.hadoop.metadata;
-
-import org.apache.parquet.format.CompressionCodec;
-import org.apache.parquet.hadoop.codec.CompressionCodecNotSupportedException;
-
-import java.util.Locale;
-
-public enum CompressionCodecName {
- UNCOMPRESSED(null, CompressionCodec.UNCOMPRESSED, ""),
- SNAPPY("org.apache.parquet.hadoop.codec.SnappyCodec", CompressionCodec.SNAPPY, ".snappy"),
- GZIP("org.apache.hadoop.io.compress.GzipCodec", CompressionCodec.GZIP, ".gz"),
- LZO("com.hadoop.compression.lzo.LzoCodec", CompressionCodec.LZO, ".lzo"),
- BROTLI("org.apache.hadoop.io.compress.BrotliCodec", CompressionCodec.BROTLI, ".br"),
- LZ4("org.apache.hadoop.io.compress.Lz4Codec", CompressionCodec.LZ4, ".lz4"),
- ZSTD("org.apache.hadoop.io.compress.ZStandardCodec", CompressionCodec.ZSTD, ".zstd");
-
- public static CompressionCodecName fromConf(String name) {
- if (name == null) {
- return UNCOMPRESSED;
- }
- return valueOf(name.toUpperCase(Locale.ENGLISH));
- }
-
- public static CompressionCodecName fromCompressionCodec(Class<?> clazz) {
- if (clazz == null) {
- return UNCOMPRESSED;
- }
- String name = clazz.getName();
- for (CompressionCodecName codec : CompressionCodecName.values()) {
- if (name.equals(codec.getHadoopCompressionCodecClassName())) {
- return codec;
- }
- }
- throw new CompressionCodecNotSupportedException(clazz);
- }
-
- public static CompressionCodecName fromParquet(CompressionCodec codec) {
- for (CompressionCodecName codecName : CompressionCodecName.values()) {
- if (codec.equals(codecName.parquetCompressionCodec)) {
- return codecName;
- }
- }
- throw new IllegalArgumentException("Unknown compression codec " + codec);
- }
-
- private final String hadoopCompressionCodecClass;
- private final CompressionCodec parquetCompressionCodec;
- private final String extension;
-
- private CompressionCodecName(String hadoopCompressionCodecClass, CompressionCodec parquetCompressionCodec, String extension) {
- this.hadoopCompressionCodecClass = hadoopCompressionCodecClass;
- this.parquetCompressionCodec = parquetCompressionCodec;
- this.extension = extension;
- }
-
- public String getHadoopCompressionCodecClassName() {
- return hadoopCompressionCodecClass;
- }
-
- public Class getHadoopCompressionCodecClass() {
- String codecClassName = getHadoopCompressionCodecClassName();
- if (codecClassName==null) {
- return null;
- }
- try {
- return Class.forName(codecClassName);
- } catch (ClassNotFoundException e) {
- return null;
- }
- }
-
- public CompressionCodec getParquetCompressionCodec() {
- return parquetCompressionCodec;
- }
-
- public String getExtension() {
- return extension;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H1SeekableInputStream.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H1SeekableInputStream.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H1SeekableInputStream.java
index 4a03b1a..876a1f3 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H1SeekableInputStream.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H1SeekableInputStream.java
@@ -20,32 +20,23 @@
package org.apache.parquet.hadoop.util;
import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.parquet.io.SeekableInputStream;
-import java.io.EOFException;
+import org.apache.parquet.io.DelegatingSeekableInputStream;
import java.io.IOException;
-import java.nio.ByteBuffer;
/**
* SeekableInputStream implementation that implements read(ByteBuffer) for
* Hadoop 1 FSDataInputStream.
*/
-class H1SeekableInputStream extends SeekableInputStream {
-
- private final int COPY_BUFFER_SIZE = 8192;
- private final byte[] temp = new byte[COPY_BUFFER_SIZE];
+class H1SeekableInputStream extends DelegatingSeekableInputStream {
private final FSDataInputStream stream;
public H1SeekableInputStream(FSDataInputStream stream) {
+ super(stream);
this.stream = stream;
}
@Override
- public void close() throws IOException {
- stream.close();
- }
-
- @Override
public long getPos() throws IOException {
return stream.getPos();
}
@@ -56,16 +47,6 @@ class H1SeekableInputStream extends SeekableInputStream {
}
@Override
- public int read() throws IOException {
- return stream.read();
- }
-
- @Override
- public int read(byte[] b, int off, int len) throws IOException {
- return stream.read(b, off, len);
- }
-
- @Override
public void readFully(byte[] bytes) throws IOException {
stream.readFully(bytes, 0, bytes.length);
}
@@ -75,80 +56,4 @@ class H1SeekableInputStream extends SeekableInputStream {
stream.readFully(bytes);
}
- @Override
- public int read(ByteBuffer buf) throws IOException {
- if (buf.hasArray()) {
- return readHeapBuffer(stream, buf);
- } else {
- return readDirectBuffer(stream, buf, temp);
- }
- }
-
- @Override
- public void readFully(ByteBuffer buf) throws IOException {
- if (buf.hasArray()) {
- readFullyHeapBuffer(stream, buf);
- } else {
- readFullyDirectBuffer(stream, buf, temp);
- }
- }
-
- // Visible for testing
- static int readHeapBuffer(FSDataInputStream f, ByteBuffer buf) throws IOException {
- int bytesRead = f.read(buf.array(), buf.arrayOffset() + buf.position(), buf.remaining());
- if (bytesRead < 0) {
- // if this resulted in EOF, don't update position
- return bytesRead;
- } else {
- buf.position(buf.position() + bytesRead);
- return bytesRead;
- }
- }
-
- // Visible for testing
- static void readFullyHeapBuffer(FSDataInputStream f, ByteBuffer buf) throws IOException {
- f.readFully(buf.array(), buf.arrayOffset() + buf.position(), buf.remaining());
- buf.position(buf.limit());
- }
-
- // Visible for testing
- static int readDirectBuffer(FSDataInputStream f, ByteBuffer buf, byte[] temp) throws IOException {
- // copy all the bytes that return immediately, stopping at the first
- // read that doesn't return a full buffer.
- int nextReadLength = Math.min(buf.remaining(), temp.length);
- int totalBytesRead = 0;
- int bytesRead;
-
- while ((bytesRead = f.read(temp, 0, nextReadLength)) == temp.length) {
- buf.put(temp);
- totalBytesRead += bytesRead;
- nextReadLength = Math.min(buf.remaining(), temp.length);
- }
-
- if (bytesRead < 0) {
- // return -1 if nothing was read
- return totalBytesRead == 0 ? -1 : totalBytesRead;
- } else {
- // copy the last partial buffer
- buf.put(temp, 0, bytesRead);
- totalBytesRead += bytesRead;
- return totalBytesRead;
- }
- }
-
- // Visible for testing
- static void readFullyDirectBuffer(FSDataInputStream f, ByteBuffer buf, byte[] temp) throws IOException {
- int nextReadLength = Math.min(buf.remaining(), temp.length);
- int bytesRead = 0;
-
- while (nextReadLength > 0 && (bytesRead = f.read(temp, 0, nextReadLength)) >= 0) {
- buf.put(temp, 0, bytesRead);
- nextReadLength = Math.min(buf.remaining(), temp.length);
- }
-
- if (bytesRead < 0 && buf.remaining() > 0) {
- throw new EOFException(
- "Reached the end of stream. Still have: " + buf.remaining() + " bytes left");
- }
- }
}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H2SeekableInputStream.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H2SeekableInputStream.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H2SeekableInputStream.java
index ec4567e..c68f6b6 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H2SeekableInputStream.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H2SeekableInputStream.java
@@ -20,7 +20,7 @@
package org.apache.parquet.hadoop.util;
import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.parquet.io.SeekableInputStream;
+import org.apache.parquet.io.DelegatingSeekableInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -29,7 +29,7 @@ import java.nio.ByteBuffer;
* SeekableInputStream implementation for FSDataInputStream that implements
* ByteBufferReadable in Hadoop 2.
*/
-class H2SeekableInputStream extends SeekableInputStream {
+class H2SeekableInputStream extends DelegatingSeekableInputStream {
// Visible for testing
interface Reader {
@@ -40,6 +40,7 @@ class H2SeekableInputStream extends SeekableInputStream {
private final Reader reader;
public H2SeekableInputStream(FSDataInputStream stream) {
+ super(stream);
this.stream = stream;
this.reader = new H2Reader();
}
@@ -60,21 +61,6 @@ class H2SeekableInputStream extends SeekableInputStream {
}
@Override
- public int read() throws IOException {
- return stream.read();
- }
-
- @Override
- public int read(byte[] b, int off, int len) throws IOException {
- return stream.read(b, off, len);
- }
-
- @Override
- public void readFully(byte[] bytes) throws IOException {
- stream.readFully(bytes, 0, bytes.length);
- }
-
- @Override
public void readFully(byte[] bytes, int start, int len) throws IOException {
stream.readFully(bytes);
}
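
Both Hadoop wrappers now extend DelegatingSeekableInputStream, which carries the heap- and direct-ByteBuffer read loops that previously lived in H1SeekableInputStream. Callers are unchanged: they still obtain a stream through HadoopStreams.wrap, which picks the Hadoop 2 implementation when the wrapped stream supports ByteBufferReadable and falls back to the Hadoop 1 implementation otherwise. Roughly:

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.hadoop.util.HadoopStreams;
    import org.apache.parquet.io.SeekableInputStream;

    static void readMagic(Path path, Configuration conf) throws IOException {
      FileSystem fs = path.getFileSystem(conf);
      try (SeekableInputStream in = HadoopStreams.wrap(fs.open(path))) {
        ByteBuffer header = ByteBuffer.allocate(4);
        in.readFully(header);   // ByteBuffer handling now inherited from DelegatingSeekableInputStream
      }
    }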
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopCodecs.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopCodecs.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopCodecs.java
new file mode 100644
index 0000000..a46c8db
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopCodecs.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.hadoop.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.bytes.ByteBufferAllocator;
+import org.apache.parquet.compression.CompressionCodecFactory;
+import org.apache.parquet.hadoop.CodecFactory;
+
+public class HadoopCodecs {
+ public static CompressionCodecFactory newFactory(int sizeHint) {
+ return new CodecFactory(new Configuration(), sizeHint);
+ }
+
+ public static CompressionCodecFactory newFactory(Configuration conf, int sizeHint) {
+ return new CodecFactory(conf, sizeHint);
+ }
+
+ public static CompressionCodecFactory newDirectFactory(Configuration conf, ByteBufferAllocator allocator, int sizeHint) {
+ return CodecFactory.createDirectCodecFactory(conf, allocator, sizeHint);
+ }
+}
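
HadoopCodecs is a small bridge so that code written against the Hadoop-free CompressionCodecFactory interface can still be backed by the existing Hadoop CodecFactory. A sketch; the getCompressor and release calls are assumed to mirror what CodecFactory already provides:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.parquet.compression.CompressionCodecFactory;
    import org.apache.parquet.hadoop.metadata.CompressionCodecName;
    import org.apache.parquet.hadoop.util.HadoopCodecs;

    // sizeHint is the expected page size, used to size compression buffers
    CompressionCodecFactory codecs = HadoopCodecs.newFactory(new Configuration(), 64 * 1024);
    try {
      codecs.getCompressor(CompressionCodecName.SNAPPY);   // assumed interface method
    } finally {
      codecs.release();                                     // returns pooled codecs
    }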
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopOutputFile.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopOutputFile.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopOutputFile.java
new file mode 100644
index 0000000..4740fd4
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopOutputFile.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.hadoop.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.io.OutputFile;
+import org.apache.parquet.io.PositionOutputStream;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+public class HadoopOutputFile implements OutputFile {
+ // need to supply a buffer size when setting block size. this is the default
+ // for hadoop 1 to present. copying it avoids loading DFSConfigKeys.
+ private static final int DFS_BUFFER_SIZE_DEFAULT = 4096;
+
+ private static final Set<String> BLOCK_FS_SCHEMES = new HashSet<String>();
+ static {
+ BLOCK_FS_SCHEMES.add("hdfs");
+ BLOCK_FS_SCHEMES.add("webhdfs");
+ BLOCK_FS_SCHEMES.add("viewfs");
+ }
+
+ // visible for testing
+ public static Set<String> getBlockFileSystems() {
+ return BLOCK_FS_SCHEMES;
+ }
+
+ private static boolean supportsBlockSize(FileSystem fs) {
+ return BLOCK_FS_SCHEMES.contains(fs.getUri().getScheme());
+ }
+
+ private final FileSystem fs;
+ private final Path path;
+ private final Configuration conf;
+
+ public static HadoopOutputFile fromPath(Path path, Configuration conf)
+ throws IOException {
+ FileSystem fs = path.getFileSystem(conf);
+ return new HadoopOutputFile(fs, fs.makeQualified(path), conf);
+ }
+
+ private HadoopOutputFile(FileSystem fs, Path path, Configuration conf) {
+ this.fs = fs;
+ this.path = path;
+ this.conf = conf;
+ }
+
+ public Configuration getConfiguration() {
+ return conf;
+ }
+
+ @Override
+ public PositionOutputStream create(long blockSizeHint) throws IOException {
+ return HadoopStreams.wrap(fs.create(path, false /* do not overwrite */,
+ DFS_BUFFER_SIZE_DEFAULT, fs.getDefaultReplication(path),
+ Math.max(fs.getDefaultBlockSize(path), blockSizeHint)));
+ }
+
+ @Override
+ public PositionOutputStream createOrOverwrite(long blockSizeHint) throws IOException {
+ return HadoopStreams.wrap(fs.create(path, true /* overwrite if exists */,
+ DFS_BUFFER_SIZE_DEFAULT, fs.getDefaultReplication(path),
+ Math.max(fs.getDefaultBlockSize(path), blockSizeHint)));
+ }
+
+ @Override
+ public boolean supportsBlockSize() {
+ return supportsBlockSize(fs);
+ }
+
+ @Override
+ public long defaultBlockSize() {
+ return fs.getDefaultBlockSize(path);
+ }
+
+ @Override
+ public String toString() {
+ return path.toString();
+ }
+}
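
HadoopOutputFile adapts a Hadoop Path/FileSystem pair to the new OutputFile interface, which is what ParquetFileWriter now consumes instead of the Hadoop types. Usage is roughly the following (the path and row group size are illustrative):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.hadoop.util.HadoopOutputFile;
    import org.apache.parquet.io.OutputFile;
    import org.apache.parquet.io.PositionOutputStream;

    static void createFile(Path path, Configuration conf) throws IOException {
      OutputFile file = HadoopOutputFile.fromPath(path, conf);
      long rowGroupSize = 128 * 1024 * 1024;                // passed through as the block size hint
      try (PositionOutputStream out = file.create(rowGroupSize)) {   // fails if the file exists
        out.write(new byte[] {'P', 'A', 'R', '1'});
      }
      // file.createOrOverwrite(rowGroupSize) replaces an existing file instead, and
      // file.supportsBlockSize() is true only for the hdfs/webhdfs/viewfs schemes above.
    }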
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopPositionOutputStream.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopPositionOutputStream.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopPositionOutputStream.java
new file mode 100644
index 0000000..4b194aa
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopPositionOutputStream.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.hadoop.util;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.parquet.io.PositionOutputStream;
+import java.io.IOException;
+
+public class HadoopPositionOutputStream extends PositionOutputStream {
+ private final FSDataOutputStream wrapped;
+
+ HadoopPositionOutputStream(FSDataOutputStream wrapped) {
+ this.wrapped = wrapped;
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return wrapped.getPos();
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ wrapped.write(b);
+ }
+
+ @Override
+ public void write(byte[] b) throws IOException {
+ wrapped.write(b);
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ wrapped.write(b, off, len);
+ }
+
+ public void sync() throws IOException {
+ wrapped.hsync();
+ }
+
+ @Override
+ public void flush() throws IOException {
+ wrapped.flush();
+ }
+
+ @Override
+ public void close() throws IOException {
+ wrapped.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java
index 8731bd6..c35e98f 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java
@@ -20,8 +20,11 @@
package org.apache.parquet.hadoop.util;
import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.parquet.Preconditions;
import org.apache.parquet.io.ParquetDecodingException;
import org.apache.parquet.io.SeekableInputStream;
+import org.apache.parquet.io.PositionOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,6 +49,7 @@ public class HadoopStreams {
* @return a SeekableInputStream
*/
public static SeekableInputStream wrap(FSDataInputStream stream) {
+ Preconditions.checkNotNull(stream, "Cannot wrap a null input stream");
if (byteBufferReadableClass != null && h2SeekableConstructor != null &&
byteBufferReadableClass.isInstance(stream.getWrappedStream())) {
try {
@@ -99,4 +103,15 @@ public class HadoopStreams {
return null;
}
+ /**
+ * Wraps a {@link FSDataOutputStream} in a {@link PositionOutputStream}
+ * implementation for Parquet writers.
+ *
+ * @param stream a Hadoop FSDataOutputStream
+ * @return a PositionOutputStream
+ */
+ public static PositionOutputStream wrap(FSDataOutputStream stream) {
+ Preconditions.checkNotNull(stream, "Cannot wrap a null output stream");
+ return new HadoopPositionOutputStream(stream);
+ }
}
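
With the new overload, HadoopStreams adapts Hadoop streams in both directions: FSDataInputStream to SeekableInputStream for reads, and FSDataOutputStream to PositionOutputStream for writes. A sketch of the output side; the writer relies on getPos() to record column chunk and footer offsets:

    import java.io.IOException;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.hadoop.util.HadoopStreams;
    import org.apache.parquet.io.PositionOutputStream;

    static long writeMagic(FileSystem fs, Path path) throws IOException {
      try (PositionOutputStream out = HadoopStreams.wrap(fs.create(path, true))) {
        out.write(new byte[] {'P', 'A', 'R', '1'});
        return out.getPos();   // 4; delegates to FSDataOutputStream.getPos()
      }
    }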
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInputOutputFormatWithPadding.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInputOutputFormatWithPadding.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInputOutputFormatWithPadding.java
index 0ac9c0f..8e3e6c7 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInputOutputFormatWithPadding.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInputOutputFormatWithPadding.java
@@ -35,6 +35,8 @@ import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.apache.parquet.hadoop.example.GroupWriteSupport;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.HadoopOutputFile;
+import org.apache.parquet.io.OutputFile;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Types;
@@ -105,7 +107,7 @@ public class TestInputOutputFormatWithPadding {
@Test
public void testBasicBehaviorWithPadding() throws Exception {
- ParquetFileWriter.BLOCK_FS_SCHEMES.add("file");
+ HadoopOutputFile.getBlockFileSystems().add("file");
File inputFile = temp.newFile();
FileOutputStream out = new FileOutputStream(inputFile);
@@ -186,7 +188,7 @@ public class TestInputOutputFormatWithPadding {
Assert.assertEquals("Should match written file content",
FILE_CONTENT, reconstructed);
- ParquetFileWriter.BLOCK_FS_SCHEMES.remove("file");
+ HadoopOutputFile.getBlockFileSystems().remove("file");
}
private void waitForJob(Job job) throws Exception {
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
index 1442e04..6915c86 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.parquet.Version;
import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel;
+import org.apache.parquet.hadoop.util.HadoopOutputFile;
import org.junit.Assume;
import org.junit.Rule;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/MockHadoopInputStream.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/MockHadoopInputStream.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/MockHadoopInputStream.java
new file mode 100644
index 0000000..b41b3c8
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/MockHadoopInputStream.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.hadoop.util;
+
+import org.apache.hadoop.fs.PositionedReadable;
+import org.apache.hadoop.fs.Seekable;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+
+class MockHadoopInputStream extends ByteArrayInputStream
+ implements Seekable, PositionedReadable {
+ static final byte[] TEST_ARRAY = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
+
+ private int[] lengths;
+ private int current = 0;
+ MockHadoopInputStream(int... actualReadLengths) {
+ super(TEST_ARRAY);
+ this.lengths = actualReadLengths;
+ }
+
+ @Override
+ public synchronized int read(byte[] b, int off, int len) {
+ if (current < lengths.length) {
+ if (len <= lengths[current]) {
+ // when len == lengths[current], the next read will be 0 bytes
+ int bytesRead = super.read(b, off, len);
+ lengths[current] -= bytesRead;
+ return bytesRead;
+ } else {
+ int bytesRead = super.read(b, off, lengths[current]);
+ current += 1;
+ return bytesRead;
+ }
+ } else {
+ return super.read(b, off, len);
+ }
+ }
+
+ @Override
+ public int read(long position, byte[] buffer, int offset, int length) throws IOException {
+ seek(position);
+ return read(buffer, offset, length);
+ }
+
+ @Override
+ public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
+ throw new UnsupportedOperationException("Not actually supported.");
+ }
+
+ @Override
+ public void readFully(long position, byte[] buffer) throws IOException {
+ throw new UnsupportedOperationException("Not actually supported.");
+ }
+
+ @Override
+ public void seek(long pos) throws IOException {
+ this.pos = (int) pos;
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return this.pos;
+ }
+
+ @Override
+ public boolean seekToNewSource(long targetPos) throws IOException {
+ seek(targetPos);
+ return true;
+ }
+}
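
MockHadoopInputStream (a rename of MockInputStream, removed below) lets tests dictate how many bytes each successive read may return, so short-read and end-of-stream paths can be exercised deterministically. Typical usage, following the byte-buffer read tests:

    package org.apache.parquet.hadoop.util;

    import org.apache.hadoop.fs.FSDataInputStream;
    import org.junit.Assert;
    import org.junit.Test;

    public class TestShortReads {
      @Test
      public void testCappedReadLengths() throws Exception {
        // successive reads return at most 2, 3, and 3 bytes; later reads are unrestricted
        FSDataInputStream stream = new FSDataInputStream(new MockHadoopInputStream(2, 3, 3));
        byte[] buf = new byte[10];
        Assert.assertEquals(2, stream.read(buf, 0, buf.length));
        Assert.assertEquals(3, stream.read(buf, 2, buf.length - 2));
      }
    }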
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/MockInputStream.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/MockInputStream.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/MockInputStream.java
deleted file mode 100644
index a112288..0000000
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/MockInputStream.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.parquet.hadoop.util;
-
-import org.apache.hadoop.fs.PositionedReadable;
-import org.apache.hadoop.fs.Seekable;
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-
-class MockInputStream extends ByteArrayInputStream
- implements Seekable, PositionedReadable {
- static final byte[] TEST_ARRAY = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
-
- private int[] lengths;
- private int current = 0;
- MockInputStream(int... actualReadLengths) {
- super(TEST_ARRAY);
- this.lengths = actualReadLengths;
- }
-
- @Override
- public synchronized int read(byte[] b, int off, int len) {
- if (current < lengths.length) {
- if (len <= lengths[current]) {
- // when len == lengths[current], the next read will by 0 bytes
- int bytesRead = super.read(b, off, len);
- lengths[current] -= bytesRead;
- return bytesRead;
- } else {
- int bytesRead = super.read(b, off, lengths[current]);
- current += 1;
- return bytesRead;
- }
- } else {
- return super.read(b, off, len);
- }
- }
-
- @Override
- public int read(long position, byte[] buffer, int offset, int length) throws IOException {
- seek(position);
- return read(buffer, offset, length);
- }
-
- @Override
- public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
- throw new UnsupportedOperationException("Not actually supported.");
- }
-
- @Override
- public void readFully(long position, byte[] buffer) throws IOException {
- throw new UnsupportedOperationException("Not actually supported.");
- }
-
- @Override
- public void seek(long pos) throws IOException {
- this.pos = (int) pos;
- }
-
- @Override
- public long getPos() throws IOException {
- return this.pos;
- }
-
- @Override
- public boolean seekToNewSource(long targetPos) throws IOException {
- seek(targetPos);
- return true;
- }
-}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop1ByteBufferReads.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop1ByteBufferReads.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop1ByteBufferReads.java
deleted file mode 100644
index 9e4e2a9..0000000
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop1ByteBufferReads.java
+++ /dev/null
@@ -1,761 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.parquet.hadoop.util;
-
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.parquet.hadoop.TestUtils;
-import org.junit.Assert;
-import org.junit.Test;
-import java.io.EOFException;
-import java.nio.ByteBuffer;
-import java.util.concurrent.Callable;
-
-import static org.apache.parquet.hadoop.util.MockInputStream.TEST_ARRAY;
-
-public class TestHadoop1ByteBufferReads {
-
- private static final ThreadLocal<byte[]> TEMP = new ThreadLocal<byte[]>() {
- @Override
- protected byte[] initialValue() {
- return new byte[8192];
- }
- };
-
- @Test
- public void testHeapRead() throws Exception {
- ByteBuffer readBuffer = ByteBuffer.allocate(20);
-
- FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream());
-
- int len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer);
- Assert.assertEquals(10, len);
- Assert.assertEquals(10, readBuffer.position());
- Assert.assertEquals(20, readBuffer.limit());
-
- len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer);
- Assert.assertEquals(-1, len);
-
- readBuffer.flip();
- Assert.assertEquals("Buffer contents should match",
- ByteBuffer.wrap(TEST_ARRAY), readBuffer);
- }
-
- @Test
- public void testHeapSmallBuffer() throws Exception {
- ByteBuffer readBuffer = ByteBuffer.allocate(5);
-
- FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream());
-
- int len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer);
- Assert.assertEquals(5, len);
- Assert.assertEquals(5, readBuffer.position());
- Assert.assertEquals(5, readBuffer.limit());
-
- len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer);
- Assert.assertEquals(0, len);
-
- readBuffer.flip();
- Assert.assertEquals("Buffer contents should match",
- ByteBuffer.wrap(TEST_ARRAY, 0, 5), readBuffer);
- }
-
- @Test
- public void testHeapSmallReads() throws Exception {
- ByteBuffer readBuffer = ByteBuffer.allocate(10);
-
- FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));
-
- int len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer);
- Assert.assertEquals(2, len);
- Assert.assertEquals(2, readBuffer.position());
- Assert.assertEquals(10, readBuffer.limit());
-
- len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer);
- Assert.assertEquals(3, len);
- Assert.assertEquals(5, readBuffer.position());
- Assert.assertEquals(10, readBuffer.limit());
-
- len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer);
- Assert.assertEquals(3, len);
- Assert.assertEquals(8, readBuffer.position());
- Assert.assertEquals(10, readBuffer.limit());
-
- len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer);
- Assert.assertEquals(2, len);
- Assert.assertEquals(10, readBuffer.position());
- Assert.assertEquals(10, readBuffer.limit());
-
- readBuffer.flip();
- Assert.assertEquals("Buffer contents should match",
- ByteBuffer.wrap(TEST_ARRAY), readBuffer);
- }
-
- @Test
- public void testHeapPosition() throws Exception {
- ByteBuffer readBuffer = ByteBuffer.allocate(20);
- readBuffer.position(10);
- readBuffer.mark();
-
- FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(8));
-
- int len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer);
- Assert.assertEquals(8, len);
- Assert.assertEquals(18, readBuffer.position());
- Assert.assertEquals(20, readBuffer.limit());
-
- len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer);
- Assert.assertEquals(2, len);
- Assert.assertEquals(20, readBuffer.position());
- Assert.assertEquals(20, readBuffer.limit());
-
- len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer);
- Assert.assertEquals(-1, len);
-
- readBuffer.reset();
- Assert.assertEquals("Buffer contents should match",
- ByteBuffer.wrap(TEST_ARRAY), readBuffer);
- }
-
- @Test
- public void testHeapLimit() throws Exception {
- ByteBuffer readBuffer = ByteBuffer.allocate(20);
- readBuffer.limit(8);
-
- FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(7));
-
- int len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer);
- Assert.assertEquals(7, len);
- Assert.assertEquals(7, readBuffer.position());
- Assert.assertEquals(8, readBuffer.limit());
-
- len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer);
- Assert.assertEquals(1, len);
- Assert.assertEquals(8, readBuffer.position());
- Assert.assertEquals(8, readBuffer.limit());
-
- len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer);
- Assert.assertEquals(0, len);
-
- readBuffer.flip();
- Assert.assertEquals("Buffer contents should match",
- ByteBuffer.wrap(TEST_ARRAY, 0, 8), readBuffer);
- }
-
- @Test
- public void testHeapPositionAndLimit() throws Exception {
- ByteBuffer readBuffer = ByteBuffer.allocate(20);
- readBuffer.position(5);
- readBuffer.limit(13);
- readBuffer.mark();
-
- FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(7));
-
- int len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer);
- Assert.assertEquals(7, len);
- Assert.assertEquals(12, readBuffer.position());
- Assert.assertEquals(13, readBuffer.limit());
-
- len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer);
- Assert.assertEquals(1, len);
- Assert.assertEquals(13, readBuffer.position());
- Assert.assertEquals(13, readBuffer.limit());
-
- len = H1SeekableInputStream.readHeapBuffer(hadoopStream, readBuffer);
- Assert.assertEquals(0, len);
-
- readBuffer.reset();
- Assert.assertEquals("Buffer contents should match",
- ByteBuffer.wrap(TEST_ARRAY, 0, 8), readBuffer);
- }
-
- @Test
- public void testDirectRead() throws Exception {
- ByteBuffer readBuffer = ByteBuffer.allocateDirect(20);
-
- FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream());
-
- int len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
- Assert.assertEquals(10, len);
- Assert.assertEquals(10, readBuffer.position());
- Assert.assertEquals(20, readBuffer.limit());
-
- len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
- Assert.assertEquals(-1, len);
-
- readBuffer.flip();
- Assert.assertEquals("Buffer contents should match",
- ByteBuffer.wrap(TEST_ARRAY), readBuffer);
- }
-
- @Test
- public void testDirectSmallBuffer() throws Exception {
- ByteBuffer readBuffer = ByteBuffer.allocateDirect(5);
-
- FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream());
-
- int len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
- Assert.assertEquals(5, len);
- Assert.assertEquals(5, readBuffer.position());
- Assert.assertEquals(5, readBuffer.limit());
-
- len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
- Assert.assertEquals(0, len);
-
- readBuffer.flip();
- Assert.assertEquals("Buffer contents should match",
- ByteBuffer.wrap(TEST_ARRAY, 0, 5), readBuffer);
- }
-
- @Test
- public void testDirectSmallReads() throws Exception {
- ByteBuffer readBuffer = ByteBuffer.allocateDirect(10);
-
- FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));
-
- int len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
- Assert.assertEquals(2, len);
- Assert.assertEquals(2, readBuffer.position());
- Assert.assertEquals(10, readBuffer.limit());
-
- len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
- Assert.assertEquals(3, len);
- Assert.assertEquals(5, readBuffer.position());
- Assert.assertEquals(10, readBuffer.limit());
-
- len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
- Assert.assertEquals(3, len);
- Assert.assertEquals(8, readBuffer.position());
- Assert.assertEquals(10, readBuffer.limit());
-
- len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
- Assert.assertEquals(2, len);
- Assert.assertEquals(10, readBuffer.position());
- Assert.assertEquals(10, readBuffer.limit());
-
- readBuffer.flip();
- Assert.assertEquals("Buffer contents should match",
- ByteBuffer.wrap(TEST_ARRAY), readBuffer);
- }
-
- @Test
- public void testDirectPosition() throws Exception {
- ByteBuffer readBuffer = ByteBuffer.allocateDirect(20);
- readBuffer.position(10);
- readBuffer.mark();
-
- FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(8));
-
- int len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
- Assert.assertEquals(8, len);
- Assert.assertEquals(18, readBuffer.position());
- Assert.assertEquals(20, readBuffer.limit());
-
- len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
- Assert.assertEquals(2, len);
- Assert.assertEquals(20, readBuffer.position());
- Assert.assertEquals(20, readBuffer.limit());
-
- len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
- Assert.assertEquals(-1, len);
-
- readBuffer.reset();
- Assert.assertEquals("Buffer contents should match",
- ByteBuffer.wrap(TEST_ARRAY), readBuffer);
- }
-
- @Test
- public void testDirectLimit() throws Exception {
- ByteBuffer readBuffer = ByteBuffer.allocate(20);
- readBuffer.limit(8);
-
- FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(7));
-
- int len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
- Assert.assertEquals(7, len);
- Assert.assertEquals(7, readBuffer.position());
- Assert.assertEquals(8, readBuffer.limit());
-
- len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
- Assert.assertEquals(1, len);
- Assert.assertEquals(8, readBuffer.position());
- Assert.assertEquals(8, readBuffer.limit());
-
- len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
- Assert.assertEquals(0, len);
-
- readBuffer.flip();
- Assert.assertEquals("Buffer contents should match",
- ByteBuffer.wrap(TEST_ARRAY, 0, 8), readBuffer);
- }
-
- @Test
- public void testDirectPositionAndLimit() throws Exception {
- ByteBuffer readBuffer = ByteBuffer.allocateDirect(20);
- readBuffer.position(5);
- readBuffer.limit(13);
- readBuffer.mark();
-
- FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(7));
-
- int len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
- Assert.assertEquals(7, len);
- Assert.assertEquals(12, readBuffer.position());
- Assert.assertEquals(13, readBuffer.limit());
-
- len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
- Assert.assertEquals(1, len);
- Assert.assertEquals(13, readBuffer.position());
- Assert.assertEquals(13, readBuffer.limit());
-
- len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
- Assert.assertEquals(0, len);
-
- readBuffer.reset();
- Assert.assertEquals("Buffer contents should match",
- ByteBuffer.wrap(TEST_ARRAY, 0, 8), readBuffer);
- }
-
- @Test
- public void testDirectSmallTempBufferSmallReads() throws Exception {
- byte[] temp = new byte[2]; // this will cause readDirectBuffer to loop
-
- ByteBuffer readBuffer = ByteBuffer.allocateDirect(10);
-
- FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));
-
- int len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, temp);
- Assert.assertEquals(2, len);
- Assert.assertEquals(2, readBuffer.position());
- Assert.assertEquals(10, readBuffer.limit());
-
- len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, temp);
- Assert.assertEquals(3, len);
- Assert.assertEquals(5, readBuffer.position());
- Assert.assertEquals(10, readBuffer.limit());
-
- len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, temp);
- Assert.assertEquals(3, len);
- Assert.assertEquals(8, readBuffer.position());
- Assert.assertEquals(10, readBuffer.limit());
-
- len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, temp);
- Assert.assertEquals(2, len);
- Assert.assertEquals(10, readBuffer.position());
- Assert.assertEquals(10, readBuffer.limit());
-
- len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, temp);
- Assert.assertEquals(-1, len);
-
- readBuffer.flip();
- Assert.assertEquals("Buffer contents should match",
- ByteBuffer.wrap(TEST_ARRAY), readBuffer);
- }
-
- @Test
- public void testDirectSmallTempBufferWithPositionAndLimit() throws Exception {
- byte[] temp = new byte[2]; // this will cause readDirectBuffer to loop
-
- ByteBuffer readBuffer = ByteBuffer.allocateDirect(20);
- readBuffer.position(5);
- readBuffer.limit(13);
- readBuffer.mark();
-
- FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(7));
-
- int len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, temp);
- Assert.assertEquals(7, len);
- Assert.assertEquals(12, readBuffer.position());
- Assert.assertEquals(13, readBuffer.limit());
-
- len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, temp);
- Assert.assertEquals(1, len);
- Assert.assertEquals(13, readBuffer.position());
- Assert.assertEquals(13, readBuffer.limit());
-
- len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, temp);
- Assert.assertEquals(0, len);
-
- readBuffer.reset();
- Assert.assertEquals("Buffer contents should match",
- ByteBuffer.wrap(TEST_ARRAY, 0, 8), readBuffer);
- }
-
- @Test
- public void testHeapReadFullySmallBuffer() throws Exception {
- ByteBuffer readBuffer = ByteBuffer.allocate(8);
-
- FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream());
-
- H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
- Assert.assertEquals(8, readBuffer.position());
- Assert.assertEquals(8, readBuffer.limit());
-
- H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
- Assert.assertEquals(8, readBuffer.position());
- Assert.assertEquals(8, readBuffer.limit());
-
- readBuffer.flip();
- Assert.assertEquals("Buffer contents should match",
- ByteBuffer.wrap(TEST_ARRAY, 0, 8), readBuffer);
- }
-
- @Test
- public void testHeapReadFullyLargeBuffer() throws Exception {
- final ByteBuffer readBuffer = ByteBuffer.allocate(20);
-
- final FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream());
-
- TestUtils.assertThrows("Should throw EOFException",
- EOFException.class, new Callable() {
- @Override
- public Object call() throws Exception {
- H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
- return null;
- }
- });
-
- Assert.assertEquals(0, readBuffer.position());
- Assert.assertEquals(20, readBuffer.limit());
- }
-
- @Test
- public void testHeapReadFullyJustRight() throws Exception {
- final ByteBuffer readBuffer = ByteBuffer.allocate(10);
-
- final FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream());
-
- // reads all of the bytes available without EOFException
- H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
- Assert.assertEquals(10, readBuffer.position());
- Assert.assertEquals(10, readBuffer.limit());
-
- // trying to read 0 more bytes doesn't result in EOFException
- H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
- Assert.assertEquals(10, readBuffer.position());
- Assert.assertEquals(10, readBuffer.limit());
-
- readBuffer.flip();
- Assert.assertEquals("Buffer contents should match",
- ByteBuffer.wrap(TEST_ARRAY), readBuffer);
- }
-
- @Test
- public void testHeapReadFullySmallReads() throws Exception {
- final ByteBuffer readBuffer = ByteBuffer.allocate(10);
-
- final FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));
-
- H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
- Assert.assertEquals(10, readBuffer.position());
- Assert.assertEquals(10, readBuffer.limit());
-
- H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
- Assert.assertEquals(10, readBuffer.position());
- Assert.assertEquals(10, readBuffer.limit());
-
- readBuffer.flip();
- Assert.assertEquals("Buffer contents should match",
- ByteBuffer.wrap(TEST_ARRAY), readBuffer);
- }
-
- @Test
- public void testHeapReadFullyPosition() throws Exception {
- final ByteBuffer readBuffer = ByteBuffer.allocate(10);
- readBuffer.position(3);
- readBuffer.mark();
-
- final FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));
-
- H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
- Assert.assertEquals(10, readBuffer.position());
- Assert.assertEquals(10, readBuffer.limit());
-
- H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
- Assert.assertEquals(10, readBuffer.position());
- Assert.assertEquals(10, readBuffer.limit());
-
- readBuffer.reset();
- Assert.assertEquals("Buffer contents should match",
- ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer);
- }
-
- @Test
- public void testHeapReadFullyLimit() throws Exception {
- final ByteBuffer readBuffer = ByteBuffer.allocate(10);
- readBuffer.limit(7);
-
- final FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));
-
- H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
- Assert.assertEquals(7, readBuffer.position());
- Assert.assertEquals(7, readBuffer.limit());
-
- H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
- Assert.assertEquals(7, readBuffer.position());
- Assert.assertEquals(7, readBuffer.limit());
-
- readBuffer.flip();
- Assert.assertEquals("Buffer contents should match",
- ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer);
-
- readBuffer.position(7);
- readBuffer.limit(10);
- H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
- Assert.assertEquals(10, readBuffer.position());
- Assert.assertEquals(10, readBuffer.limit());
-
- readBuffer.flip();
- Assert.assertEquals("Buffer contents should match",
- ByteBuffer.wrap(TEST_ARRAY), readBuffer);
- }
-
- @Test
- public void testHeapReadFullyPositionAndLimit() throws Exception {
- final ByteBuffer readBuffer = ByteBuffer.allocate(10);
- readBuffer.position(3);
- readBuffer.limit(7);
- readBuffer.mark();
-
- final FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));
-
- H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
- Assert.assertEquals(7, readBuffer.position());
- Assert.assertEquals(7, readBuffer.limit());
-
- H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
- Assert.assertEquals(7, readBuffer.position());
- Assert.assertEquals(7, readBuffer.limit());
-
- readBuffer.reset();
- Assert.assertEquals("Buffer contents should match",
- ByteBuffer.wrap(TEST_ARRAY, 0, 4), readBuffer);
-
- readBuffer.position(7);
- readBuffer.limit(10);
- H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
- Assert.assertEquals(10, readBuffer.position());
- Assert.assertEquals(10, readBuffer.limit());
-
- readBuffer.reset();
- Assert.assertEquals("Buffer contents should match",
- ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer);
- }
-
- @Test
- public void testDirectReadFullySmallBuffer() throws Exception {
- ByteBuffer readBuffer = ByteBuffer.allocateDirect(8);
-
- FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream());
-
- H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, TEMP.get());
- Assert.assertEquals(8, readBuffer.position());
- Assert.assertEquals(8, readBuffer.limit());
-
- H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, TEMP.get());
- Assert.assertEquals(8, readBuffer.position());
- Assert.assertEquals(8, readBuffer.limit());
-
- readBuffer.flip();
- Assert.assertEquals("Buffer contents should match",
- ByteBuffer.wrap(TEST_ARRAY, 0, 8), readBuffer);
- }
-
- @Test
- public void testDirectReadFullyLargeBuffer() throws Exception {
- final ByteBuffer readBuffer = ByteBuffer.allocateDirect(20);
-
- final FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream());
-
- TestUtils.assertThrows("Should throw EOFException",
- EOFException.class, new Callable() {
- @Override
- public Object call() throws Exception {
- H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, TEMP.get());
- return null;
- }
- });
-
- // NOTE: This behavior differs from readFullyHeapBuffer because direct uses
- // several read operations that will read up to the end of the input. This
- // is a correct value because the bytes in the buffer are valid. This
- // behavior can't be implemented for the heap buffer without using the read
- // method instead of the readFully method on the underlying
- // FSDataInputStream.
- Assert.assertEquals(10, readBuffer.position());
- Assert.assertEquals(20, readBuffer.limit());
- }
-
- @Test
- public void testDirectReadFullyJustRight() throws Exception {
- final ByteBuffer readBuffer = ByteBuffer.allocateDirect(10);
-
- final FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream());
-
- // reads all of the bytes available without EOFException
- H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, TEMP.get());
- Assert.assertEquals(10, readBuffer.position());
- Assert.assertEquals(10, readBuffer.limit());
-
- // trying to read 0 more bytes doesn't result in EOFException
- H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, TEMP.get());
- Assert.assertEquals(10, readBuffer.position());
- Assert.assertEquals(10, readBuffer.limit());
-
- readBuffer.flip();
- Assert.assertEquals("Buffer contents should match",
- ByteBuffer.wrap(TEST_ARRAY), readBuffer);
- }
-
- @Test
- public void testDirectReadFullySmallReads() throws Exception {
- final ByteBuffer readBuffer = ByteBuffer.allocateDirect(10);
-
- final FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));
-
- H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, TEMP.get());
- Assert.assertEquals(10, readBuffer.position());
- Assert.assertEquals(10, readBuffer.limit());
-
- H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, TEMP.get());
- Assert.assertEquals(10, readBuffer.position());
- Assert.assertEquals(10, readBuffer.limit());
-
- readBuffer.flip();
- Assert.assertEquals("Buffer contents should match",
- ByteBuffer.wrap(TEST_ARRAY), readBuffer);
- }
-
- @Test
- public void testDirectReadFullyPosition() throws Exception {
- final ByteBuffer readBuffer = ByteBuffer.allocateDirect(10);
- readBuffer.position(3);
- readBuffer.mark();
-
- final FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));
-
- H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, TEMP.get());
- Assert.assertEquals(10, readBuffer.position());
- Assert.assertEquals(10, readBuffer.limit());
-
- H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, TEMP.get());
- Assert.assertEquals(10, readBuffer.position());
- Assert.assertEquals(10, readBuffer.limit());
-
- readBuffer.reset();
- Assert.assertEquals("Buffer contents should match",
- ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer);
- }
-
- @Test
- public void testDirectReadFullyLimit() throws Exception {
- final ByteBuffer readBuffer = ByteBuffer.allocateDirect(10);
- readBuffer.limit(7);
-
- final FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));
-
- H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, TEMP.get());
- Assert.assertEquals(7, readBuffer.position());
- Assert.assertEquals(7, readBuffer.limit());
-
- H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, TEMP.get());
- Assert.assertEquals(7, readBuffer.position());
- Assert.assertEquals(7, readBuffer.limit());
-
- readBuffer.flip();
- Assert.assertEquals("Buffer contents should match",
- ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer);
-
- readBuffer.position(7);
- readBuffer.limit(10);
- H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, TEMP.get());
- Assert.assertEquals(10, readBuffer.position());
- Assert.assertEquals(10, readBuffer.limit());
-
- readBuffer.flip();
- Assert.assertEquals("Buffer contents should match",
- ByteBuffer.wrap(TEST_ARRAY), readBuffer);
- }
-
- @Test
- public void testDirectReadFullyPositionAndLimit() throws Exception {
- final ByteBuffer readBuffer = ByteBuffer.allocateDirect(10);
- readBuffer.position(3);
- readBuffer.limit(7);
- readBuffer.mark();
-
- final FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));
-
- H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, TEMP.get());
- Assert.assertEquals(7, readBuffer.position());
- Assert.assertEquals(7, readBuffer.limit());
-
- H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, TEMP.get());
- Assert.assertEquals(7, readBuffer.position());
- Assert.assertEquals(7, readBuffer.limit());
-
- readBuffer.reset();
- Assert.assertEquals("Buffer contents should match",
- ByteBuffer.wrap(TEST_ARRAY, 0, 4), readBuffer);
-
- readBuffer.position(7);
- readBuffer.limit(10);
- H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, TEMP.get());
- Assert.assertEquals(10, readBuffer.position());
- Assert.assertEquals(10, readBuffer.limit());
-
- readBuffer.reset();
- Assert.assertEquals("Buffer contents should match",
- ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer);
- }
-
- @Test
- public void testDirectReadFullySmallTempBufferWithPositionAndLimit() throws Exception {
- byte[] temp = new byte[2]; // this will cause readFully to loop
-
- final ByteBuffer readBuffer = ByteBuffer.allocateDirect(10);
- readBuffer.position(3);
- readBuffer.limit(7);
- readBuffer.mark();
-
- final FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));
-
- H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, temp);
- Assert.assertEquals(7, readBuffer.position());
- Assert.assertEquals(7, readBuffer.limit());
-
- H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, temp);
- Assert.assertEquals(7, readBuffer.position());
- Assert.assertEquals(7, readBuffer.limit());
-
- readBuffer.reset();
- Assert.assertEquals("Buffer contents should match",
- ByteBuffer.wrap(TEST_ARRAY, 0, 4), readBuffer);
-
- readBuffer.position(7);
- readBuffer.limit(10);
- H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, temp);
- Assert.assertEquals(10, readBuffer.position());
- Assert.assertEquals(10, readBuffer.limit());
-
- readBuffer.reset();
- Assert.assertEquals("Buffer contents should match",
- ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer);
- }
-}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop2ByteBufferReads.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop2ByteBufferReads.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop2ByteBufferReads.java
index 86b903c..68c9b3b 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop2ByteBufferReads.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop2ByteBufferReads.java
@@ -28,7 +28,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.Callable;
-import static org.apache.parquet.hadoop.util.MockInputStream.TEST_ARRAY;
+import static org.apache.parquet.hadoop.util.MockHadoopInputStream.TEST_ARRAY;
public class TestHadoop2ByteBufferReads {
@@ -59,7 +59,7 @@ public class TestHadoop2ByteBufferReads {
public void testHeapReadFullySmallBuffer() throws Exception {
ByteBuffer readBuffer = ByteBuffer.allocate(8);
- FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream());
+ FSDataInputStream hadoopStream = new FSDataInputStream(new MockHadoopInputStream());
MockBufferReader reader = new MockBufferReader(hadoopStream);
H2SeekableInputStream.readFully(reader, readBuffer);
@@ -79,7 +79,7 @@ public class TestHadoop2ByteBufferReads {
public void testHeapReadFullyLargeBuffer() throws Exception {
final ByteBuffer readBuffer = ByteBuffer.allocate(20);
- FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream());
+ FSDataInputStream hadoopStream = new FSDataInputStream(new MockHadoopInputStream());
final MockBufferReader reader = new MockBufferReader(hadoopStream);
TestUtils.assertThrows("Should throw EOFException",
@@ -105,7 +105,7 @@ public class TestHadoop2ByteBufferReads {
public void testHeapReadFullyJustRight() throws Exception {
ByteBuffer readBuffer = ByteBuffer.allocate(10);
- FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream());
+ FSDataInputStream hadoopStream = new FSDataInputStream(new MockHadoopInputStream());
MockBufferReader reader = new MockBufferReader(hadoopStream);
// reads all of the bytes available without EOFException
@@ -127,7 +127,7 @@ public class TestHadoop2ByteBufferReads {
public void testHeapReadFullySmallReads() throws Exception {
ByteBuffer readBuffer = ByteBuffer.allocate(10);
- FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));
+ FSDataInputStream hadoopStream = new FSDataInputStream(new MockHadoopInputStream(2, 3, 3));
MockBufferReader reader = new MockBufferReader(hadoopStream);
H2SeekableInputStream.readFully(reader, readBuffer);
@@ -149,7 +149,7 @@ public class TestHadoop2ByteBufferReads {
readBuffer.position(3);
readBuffer.mark();
- FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));
+ FSDataInputStream hadoopStream = new FSDataInputStream(new MockHadoopInputStream(2, 3, 3));
MockBufferReader reader = new MockBufferReader(hadoopStream);
H2SeekableInputStream.readFully(reader, readBuffer);
@@ -170,7 +170,7 @@ public class TestHadoop2ByteBufferReads {
ByteBuffer readBuffer = ByteBuffer.allocate(10);
readBuffer.limit(7);
- FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));
+ FSDataInputStream hadoopStream = new FSDataInputStream(new MockHadoopInputStream(2, 3, 3));
MockBufferReader reader = new MockBufferReader(hadoopStream);
H2SeekableInputStream.readFully(reader, readBuffer);
@@ -203,7 +203,7 @@ public class TestHadoop2ByteBufferReads {
readBuffer.limit(7);
readBuffer.mark();
- FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));
+ FSDataInputStream hadoopStream = new FSDataInputStream(new MockHadoopInputStream(2, 3, 3));
MockBufferReader reader = new MockBufferReader(hadoopStream);
H2SeekableInputStream.readFully(reader, readBuffer);
@@ -233,7 +233,7 @@ public class TestHadoop2ByteBufferReads {
public void testDirectReadFullySmallBuffer() throws Exception {
ByteBuffer readBuffer = ByteBuffer.allocateDirect(8);
- FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream());
+ FSDataInputStream hadoopStream = new FSDataInputStream(new MockHadoopInputStream());
MockBufferReader reader = new MockBufferReader(hadoopStream);
H2SeekableInputStream.readFully(reader, readBuffer);
@@ -253,7 +253,7 @@ public class TestHadoop2ByteBufferReads {
public void testDirectReadFullyLargeBuffer() throws Exception {
final ByteBuffer readBuffer = ByteBuffer.allocateDirect(20);
- FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream());
+ FSDataInputStream hadoopStream = new FSDataInputStream(new MockHadoopInputStream());
final MockBufferReader reader = new MockBufferReader(hadoopStream);
TestUtils.assertThrows("Should throw EOFException",
@@ -279,7 +279,7 @@ public class TestHadoop2ByteBufferReads {
public void testDirectReadFullyJustRight() throws Exception {
ByteBuffer readBuffer = ByteBuffer.allocateDirect(10);
- FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream());
+ FSDataInputStream hadoopStream = new FSDataInputStream(new MockHadoopInputStream());
MockBufferReader reader = new MockBufferReader(hadoopStream);
// reads all of the bytes available without EOFException
@@ -301,7 +301,7 @@ public class TestHadoop2ByteBufferReads {
public void testDirectReadFullySmallReads() throws Exception {
ByteBuffer readBuffer = ByteBuffer.allocateDirect(10);
- FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));
+ FSDataInputStream hadoopStream = new FSDataInputStream(new MockHadoopInputStream(2, 3, 3));
MockBufferReader reader = new MockBufferReader(hadoopStream);
H2SeekableInputStream.readFully(reader, readBuffer);
@@ -323,7 +323,7 @@ public class TestHadoop2ByteBufferReads {
readBuffer.position(3);
readBuffer.mark();
- FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));
+ FSDataInputStream hadoopStream = new FSDataInputStream(new MockHadoopInputStream(2, 3, 3));
MockBufferReader reader = new MockBufferReader(hadoopStream);
H2SeekableInputStream.readFully(reader, readBuffer);
@@ -344,7 +344,7 @@ public class TestHadoop2ByteBufferReads {
ByteBuffer readBuffer = ByteBuffer.allocateDirect(10);
readBuffer.limit(7);
- FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));
+ FSDataInputStream hadoopStream = new FSDataInputStream(new MockHadoopInputStream(2, 3, 3));
H2SeekableInputStream.Reader reader = new MockBufferReader(hadoopStream);
H2SeekableInputStream.readFully(reader, readBuffer);
@@ -377,7 +377,7 @@ public class TestHadoop2ByteBufferReads {
readBuffer.limit(7);
readBuffer.mark();
- FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));
+ FSDataInputStream hadoopStream = new FSDataInputStream(new MockHadoopInputStream(2, 3, 3));
MockBufferReader reader = new MockBufferReader(hadoopStream);
H2SeekableInputStream.readFully(reader, readBuffer);
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-tools/src/main/java/org/apache/parquet/tools/command/MergeCommand.java
----------------------------------------------------------------------
diff --git a/parquet-tools/src/main/java/org/apache/parquet/tools/command/MergeCommand.java b/parquet-tools/src/main/java/org/apache/parquet/tools/command/MergeCommand.java
index 5d79a49..fe64587 100644
--- a/parquet-tools/src/main/java/org/apache/parquet/tools/command/MergeCommand.java
+++ b/parquet-tools/src/main/java/org/apache/parquet/tools/command/MergeCommand.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.hadoop.util.HiddenFileFilter;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.metadata.FileMetaData;
@@ -91,7 +92,7 @@ public class MergeCommand extends ArgsOnlyCommand {
tooSmallFilesMerged = true;
}
- writer.appendFile(conf, input);
+ writer.appendFile(HadoopInputFile.fromPath(input, conf));
}
if (tooSmallFilesMerged) {
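For illustration only, a minimal sketch of a merge using the InputFile-based append call shown above. The output path, the use of the deprecated readFooter call, and the writer constructor chosen here are assumptions for the example, not part of this change:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.hadoop.ParquetFileReader;
    import org.apache.parquet.hadoop.ParquetFileWriter;
    import org.apache.parquet.hadoop.util.HadoopInputFile;
    import org.apache.parquet.schema.MessageType;

    public class AppendSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path input = new Path(args[0]);
        Path output = new Path(args[1]);

        // take the schema from the input file so the writer can be created
        MessageType schema = ParquetFileReader.readFooter(conf, input)
            .getFileMetaData().getSchema();

        ParquetFileWriter writer = new ParquetFileWriter(
            conf, schema, output, ParquetFileWriter.Mode.CREATE);
        writer.start();
        // new style: append whole row groups from an InputFile instead of (conf, Path)
        writer.appendFile(HadoopInputFile.fromPath(input, conf));
        writer.end(java.util.Collections.<String, String>emptyMap());
      }
    }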
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 44b0b62..05e3e47 100644
--- a/pom.xml
+++ b/pom.xml
@@ -262,7 +262,14 @@
<exclude>org/apache/parquet/avro/SpecificDataSupplier</exclude> <!-- made public -->
<exclude>org/apache/parquet/io/ColumnIOFactory$ColumnIOCreatorVisitor</exclude> <!-- removed non-API class -->
<exclude>org/apache/parquet/io/ColumnIOFactory/**</exclude> <!-- removed non-API class and methods-->
- <exclude>org/apache/parquet/hadoop/codec/SnappyCompressor</exclude> <!-- added synchronized modifier -->
+ <exclude>org/apache/parquet/hadoop/codec/SnappyCompressor</exclude> <!-- added synchronized modifier -->
+ <exclude>org/apache/parquet/bytes/BytesInput</exclude> <!-- moved to parquet-common -->
+ <exclude>org/apache/parquet/bytes/CapacityByteArrayOutputStream</exclude> <!-- moved to parquet-common -->
+ <exclude>org/apache/parquet/bytes/ConcatenatingByteArrayCollector</exclude> <!-- moved to parquet-common -->
+ <exclude>org/apache/parquet/bytes/LittleEndianDataInputStream</exclude> <!-- moved to parquet-common -->
+ <exclude>org/apache/parquet/bytes/LittleEndianDataOutputStream</exclude> <!-- moved to parquet-common -->
+ <exclude>org/apache/parquet/hadoop/metadata/CompressionCodecName</exclude> <!-- moved to parquet-common -->
+ <exclude>org/apache/parquet/hadoop/codec/CompressionCodecNotSupportedException</exclude> <!-- moved to parquet-common -->
</excludes>
</requireBackwardCompatibility>
</rules>
[2/4] parquet-mr git commit: PARQUET-1142: Add alternatives to Hadoop
classes in the API
Posted by bl...@apache.org.
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java b/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java
new file mode 100644
index 0000000..87c8ac9
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.bytes.ByteBufferAllocator;
+import org.apache.parquet.compression.CompressionCodecFactory;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.format.converter.ParquetMetadataConverter.MetadataFilter;
+import org.apache.parquet.hadoop.util.HadoopCodecs;
+
+import java.util.Map;
+
+import static org.apache.parquet.hadoop.ParquetInputFormat.DICTIONARY_FILTERING_ENABLED;
+import static org.apache.parquet.hadoop.ParquetInputFormat.RECORD_FILTERING_ENABLED;
+import static org.apache.parquet.hadoop.ParquetInputFormat.STATS_FILTERING_ENABLED;
+import static org.apache.parquet.hadoop.ParquetInputFormat.getFilter;
+import static org.apache.parquet.hadoop.UnmaterializableRecordCounter.BAD_RECORD_THRESHOLD_CONF_KEY;
+
+public class HadoopReadOptions extends ParquetReadOptions {
+ private final Configuration conf;
+
+ private HadoopReadOptions(boolean useSignedStringMinMax,
+ boolean useStatsFilter,
+ boolean useDictionaryFilter,
+ boolean useRecordFilter,
+ FilterCompat.Filter recordFilter,
+ MetadataFilter metadataFilter,
+ CompressionCodecFactory codecFactory,
+ ByteBufferAllocator allocator,
+ Map<String, String> properties,
+ Configuration conf) {
+ super(
+ useSignedStringMinMax, useStatsFilter, useDictionaryFilter, useRecordFilter, recordFilter,
+ metadataFilter, codecFactory, allocator, properties
+ );
+ this.conf = conf;
+ }
+
+ @Override
+ public String getProperty(String property) {
+ String value = super.getProperty(property);
+ if (value != null) {
+ return value;
+ }
+ return conf.get(property);
+ }
+
+ public Configuration getConf() {
+ return conf;
+ }
+
+ public static Builder builder(Configuration conf) {
+ return new Builder(conf);
+ }
+
+ public static class Builder extends ParquetReadOptions.Builder {
+ private final Configuration conf;
+
+ public Builder(Configuration conf) {
+ this.conf = conf;
+ useSignedStringMinMax(conf.getBoolean("parquet.strings.signed-min-max.enabled", false));
+      useDictionaryFilter(conf.getBoolean(DICTIONARY_FILTERING_ENABLED, true));
+      useStatsFilter(conf.getBoolean(STATS_FILTERING_ENABLED, true));
+ useRecordFilter(conf.getBoolean(RECORD_FILTERING_ENABLED, true));
+ withCodecFactory(HadoopCodecs.newFactory(conf, 0));
+ withRecordFilter(getFilter(conf));
+ String badRecordThresh = conf.get(BAD_RECORD_THRESHOLD_CONF_KEY);
+ if (badRecordThresh != null) {
+ set(BAD_RECORD_THRESHOLD_CONF_KEY, badRecordThresh);
+ }
+ }
+
+ @Override
+ public ParquetReadOptions build() {
+ return new HadoopReadOptions(
+ useSignedStringMinMax, useStatsFilter, useDictionaryFilter, useRecordFilter,
+ recordFilter, metadataFilter, codecFactory, allocator, properties, conf);
+ }
+ }
+}
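A usage sketch (not part of the patch): build Hadoop-backed options and hand them to the new InputFile-based reader entry point that the deprecation notes below point to. The file path is a placeholder:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.HadoopReadOptions;
    import org.apache.parquet.ParquetReadOptions;
    import org.apache.parquet.hadoop.ParquetFileReader;
    import org.apache.parquet.hadoop.util.HadoopInputFile;

    public class HadoopReadOptionsSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        ParquetReadOptions options = HadoopReadOptions.builder(conf)
            .useStatsFilter()          // keep row-group stats filtering on
            .build();

        HadoopInputFile file =
            HadoopInputFile.fromPath(new Path("/tmp/example.parquet"), conf);
        try (ParquetFileReader reader = ParquetFileReader.open(file, options)) {
          System.out.println("rows: " + reader.getRecordCount());
        }
      }
    }

The builder also seeds its defaults (stats, dictionary, and record filtering, the record filter itself, and the bad-record threshold) from the Configuration, so existing parquet.* settings keep working.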
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java b/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java
new file mode 100644
index 0000000..5f2f0a8
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet;
+
+import org.apache.parquet.bytes.ByteBufferAllocator;
+import org.apache.parquet.bytes.HeapByteBufferAllocator;
+import org.apache.parquet.compression.CompressionCodecFactory;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.util.HadoopCodecs;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
+
+// Internal use only
+public class ParquetReadOptions {
+ private static final boolean RECORD_FILTERING_ENABLED_DEFAULT = true;
+ private static final boolean STATS_FILTERING_ENABLED_DEFAULT = true;
+ private static final boolean DICTIONARY_FILTERING_ENABLED_DEFAULT = true;
+
+ private final boolean useSignedStringMinMax;
+ private final boolean useStatsFilter;
+ private final boolean useDictionaryFilter;
+ private final boolean useRecordFilter;
+ private final FilterCompat.Filter recordFilter;
+ private final ParquetMetadataConverter.MetadataFilter metadataFilter;
+ private final CompressionCodecFactory codecFactory;
+ private final ByteBufferAllocator allocator;
+ private final Map<String, String> properties;
+
+ ParquetReadOptions(boolean useSignedStringMinMax,
+ boolean useStatsFilter,
+ boolean useDictionaryFilter,
+ boolean useRecordFilter,
+ FilterCompat.Filter recordFilter,
+ ParquetMetadataConverter.MetadataFilter metadataFilter,
+ CompressionCodecFactory codecFactory,
+ ByteBufferAllocator allocator,
+ Map<String, String> properties) {
+ this.useSignedStringMinMax = useSignedStringMinMax;
+ this.useStatsFilter = useStatsFilter;
+ this.useDictionaryFilter = useDictionaryFilter;
+ this.useRecordFilter = useRecordFilter;
+ this.recordFilter = recordFilter;
+ this.metadataFilter = metadataFilter;
+ this.codecFactory = codecFactory;
+ this.allocator = allocator;
+ this.properties = Collections.unmodifiableMap(properties);
+ }
+
+ public boolean useSignedStringMinMax() {
+ return useSignedStringMinMax;
+ }
+
+ public boolean useStatsFilter() {
+ return useStatsFilter;
+ }
+
+ public boolean useDictionaryFilter() {
+ return useDictionaryFilter;
+ }
+
+ public boolean useRecordFilter() {
+ return useRecordFilter;
+ }
+
+ public FilterCompat.Filter getRecordFilter() {
+ return recordFilter;
+ }
+
+ public ParquetMetadataConverter.MetadataFilter getMetadataFilter() {
+ return metadataFilter;
+ }
+
+ public CompressionCodecFactory getCodecFactory() {
+ return codecFactory;
+ }
+
+ public ByteBufferAllocator getAllocator() {
+ return allocator;
+ }
+
+ public Set<String> getPropertyNames() {
+ return properties.keySet();
+ }
+
+ public String getProperty(String property) {
+ return properties.get(property);
+ }
+
+ public boolean isEnabled(String property, boolean defaultValue) {
+ if (properties.containsKey(property)) {
+ return Boolean.valueOf(properties.get(property));
+ } else {
+ return defaultValue;
+ }
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public static class Builder {
+ boolean useSignedStringMinMax = false;
+ boolean useStatsFilter = STATS_FILTERING_ENABLED_DEFAULT;
+ boolean useDictionaryFilter = DICTIONARY_FILTERING_ENABLED_DEFAULT;
+ boolean useRecordFilter = RECORD_FILTERING_ENABLED_DEFAULT;
+ FilterCompat.Filter recordFilter = null;
+ ParquetMetadataConverter.MetadataFilter metadataFilter = NO_FILTER;
+ // the page size parameter isn't used when only using the codec factory to get decompressors
+ CompressionCodecFactory codecFactory = HadoopCodecs.newFactory(0);
+ ByteBufferAllocator allocator = new HeapByteBufferAllocator();
+ Map<String, String> properties = new HashMap<>();
+
+ public Builder useSignedStringMinMax(boolean useSignedStringMinMax) {
+ this.useSignedStringMinMax = useSignedStringMinMax;
+ return this;
+ }
+
+ public Builder useSignedStringMinMax() {
+ this.useSignedStringMinMax = true;
+ return this;
+ }
+
+ public Builder useStatsFilter(boolean useStatsFilter) {
+ this.useStatsFilter = useStatsFilter;
+ return this;
+ }
+
+ public Builder useStatsFilter() {
+ this.useStatsFilter = true;
+ return this;
+ }
+
+ public Builder useDictionaryFilter(boolean useDictionaryFilter) {
+ this.useDictionaryFilter = useDictionaryFilter;
+ return this;
+ }
+
+ public Builder useDictionaryFilter() {
+ this.useDictionaryFilter = true;
+ return this;
+ }
+
+ public Builder useRecordFilter(boolean useRecordFilter) {
+ this.useRecordFilter = useRecordFilter;
+ return this;
+ }
+
+ public Builder useRecordFilter() {
+ this.useRecordFilter = true;
+ return this;
+ }
+
+ public Builder withRecordFilter(FilterCompat.Filter rowGroupFilter) {
+ this.recordFilter = rowGroupFilter;
+ return this;
+ }
+
+ public Builder withRange(long start, long end) {
+ this.metadataFilter = ParquetMetadataConverter.range(start, end);
+ return this;
+ }
+
+ public Builder withOffsets(long... rowGroupOffsets) {
+ this.metadataFilter = ParquetMetadataConverter.offsets(rowGroupOffsets);
+ return this;
+ }
+
+ public Builder withMetadataFilter(ParquetMetadataConverter.MetadataFilter metadataFilter) {
+ this.metadataFilter = metadataFilter;
+ return this;
+ }
+
+ public Builder withCodecFactory(CompressionCodecFactory codecFactory) {
+ this.codecFactory = codecFactory;
+ return this;
+ }
+
+ public Builder withAllocator(ByteBufferAllocator allocator) {
+ this.allocator = allocator;
+ return this;
+ }
+
+ public Builder set(String key, String value) {
+ properties.put(key, value);
+ return this;
+ }
+
+ public Builder copy(ParquetReadOptions options) {
+ useSignedStringMinMax(options.useSignedStringMinMax);
+ useStatsFilter(options.useStatsFilter);
+ useDictionaryFilter(options.useDictionaryFilter);
+ useRecordFilter(options.useRecordFilter);
+ withRecordFilter(options.recordFilter);
+ withMetadataFilter(options.metadataFilter);
+ withCodecFactory(options.codecFactory);
+ withAllocator(options.allocator);
+ for (Map.Entry<String, String> keyValue : options.properties.entrySet()) {
+ set(keyValue.getKey(), keyValue.getValue());
+ }
+ return this;
+ }
+
+ public ParquetReadOptions build() {
+ return new ParquetReadOptions(
+ useSignedStringMinMax, useStatsFilter, useDictionaryFilter, useRecordFilter,
+ recordFilter, metadataFilter, codecFactory, allocator, properties);
+ }
+ }
+}
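For illustration, a Hadoop-free options sketch using the builder above; the property key is made up for the example:

    import org.apache.parquet.ParquetReadOptions;

    public class ReadOptionsSketch {
      public static void main(String[] args) {
        ParquetReadOptions options = ParquetReadOptions.builder()
            .useDictionaryFilter(false)                // turn off dictionary-based row-group filtering
            .withRange(0, 128 * 1024 * 1024)           // only consider row groups in the first 128 MB
            .set("parquet.example.custom-key", "true") // free-form key/value property
            .build();

        // properties come back out through getProperty/isEnabled
        System.out.println(options.isEnabled("parquet.example.custom-key", false));
        System.out.println(options.useDictionaryFilter());
      }
    }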
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/main/java/org/apache/parquet/filter2/compat/RowGroupFilter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/compat/RowGroupFilter.java b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/compat/RowGroupFilter.java
index fd74799..68c38ce 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/compat/RowGroupFilter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/compat/RowGroupFilter.java
@@ -51,6 +51,10 @@ public class RowGroupFilter implements Visitor<List<BlockMetaData>> {
DICTIONARY
}
+ /**
+ * @deprecated will be removed in 2.0.0.
+ */
+ @Deprecated
public static List<BlockMetaData> filterRowGroups(Filter filter, List<BlockMetaData> blocks, MessageType schema) {
checkNotNull(filter, "filter");
return filter.accept(new RowGroupFilter(blocks, schema));
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
index aeb6152..bba7e62 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
@@ -38,6 +38,8 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.CorruptStatistics;
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.format.CompressionCodec;
import org.apache.parquet.format.PageEncodingStats;
import org.apache.parquet.hadoop.metadata.ColumnPath;
import org.apache.parquet.format.ColumnChunk;
@@ -89,10 +91,18 @@ public class ParquetMetadataConverter {
this(false);
}
+ /**
+ * @deprecated will be removed in 2.0.0; use {@code ParquetMetadataConverter(ParquetReadOptions)}
+ */
+ @Deprecated
public ParquetMetadataConverter(Configuration conf) {
this(conf.getBoolean("parquet.strings.signed-min-max.enabled", false));
}
+ public ParquetMetadataConverter(ParquetReadOptions options) {
+ this(options.useSignedStringMinMax());
+ }
+
private ParquetMetadataConverter(boolean useSignedStringMinMax) {
this.useSignedStringMinMax = useSignedStringMinMax;
}
@@ -193,7 +203,7 @@ public class ParquetMetadataConverter {
getType(columnMetaData.getType()),
toFormatEncodings(columnMetaData.getEncodings()),
Arrays.asList(columnMetaData.getPath().toArray()),
- columnMetaData.getCodec().getParquetCompressionCodec(),
+ toFormatCodec(columnMetaData.getCodec()),
columnMetaData.getValueCount(),
columnMetaData.getTotalUncompressedSize(),
columnMetaData.getTotalSize(),
@@ -246,6 +256,14 @@ public class ParquetMetadataConverter {
return cached;
}
+ private CompressionCodecName fromFormatCodec(CompressionCodec codec) {
+ return CompressionCodecName.valueOf(codec.toString());
+ }
+
+ private CompressionCodec toFormatCodec(CompressionCodecName codec) {
+ return CompressionCodec.valueOf(codec.toString());
+ }
+
public org.apache.parquet.column.Encoding getEncoding(Encoding encoding) {
return org.apache.parquet.column.Encoding.valueOf(encoding.name());
}
@@ -820,7 +838,7 @@ public class ParquetMetadataConverter {
ColumnChunkMetaData column = ColumnChunkMetaData.get(
path,
messageType.getType(path.toArray()).asPrimitiveType().getPrimitiveTypeName(),
- CompressionCodecName.fromParquet(metaData.codec),
+ fromFormatCodec(metaData.codec),
convertEncodingStats(metaData.getEncoding_stats()),
fromFormatEncodings(metaData.encodings),
fromParquetStatistics(
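The two private helpers added above translate between the Thrift-generated CompressionCodec enum and CompressionCodecName purely by constant name, which fits the move of CompressionCodecName to parquet-common noted in the pom.xml excludes. A stand-alone illustration of that round trip:

    import org.apache.parquet.format.CompressionCodec;
    import org.apache.parquet.hadoop.metadata.CompressionCodecName;

    public class CodecNameRoundTrip {
      public static void main(String[] args) {
        CompressionCodecName name = CompressionCodecName.SNAPPY;
        // toFormatCodec: metadata enum -> format enum, matched by constant name
        CompressionCodec formatCodec = CompressionCodec.valueOf(name.toString());
        // fromFormatCodec: format enum -> metadata enum, same mapping in reverse
        CompressionCodecName back = CompressionCodecName.valueOf(formatCodec.toString());
        System.out.println(formatCodec + " -> " + back);  // SNAPPY -> SNAPPY
      }
    }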
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java
index 8bf882f..8befa79 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java
@@ -36,9 +36,10 @@ import org.apache.hadoop.util.ReflectionUtils;
import org.apache.parquet.bytes.ByteBufferAllocator;
import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.compression.CompressionCodecFactory;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
-public class CodecFactory {
+public class CodecFactory implements CompressionCodecFactory {
protected static final Map<String, CompressionCodec> CODEC_BY_NAME = Collections
.synchronizedMap(new HashMap<String, CompressionCodec>());
@@ -118,7 +119,7 @@ public class CodecFactory {
output.put(decompressed);
}
- protected void release() {
+ public void release() {
if (decompressor != null) {
CodecPool.returnDecompressor(decompressor);
}
@@ -171,7 +172,7 @@ public class CodecFactory {
}
@Override
- protected void release() {
+ public void release() {
if (compressor != null) {
CodecPool.returnCompressor(compressor);
}
@@ -183,6 +184,7 @@ public class CodecFactory {
}
+ @Override
public BytesCompressor getCompressor(CompressionCodecName codecName) {
BytesCompressor comp = compressors.get(codecName);
if (comp == null) {
@@ -192,6 +194,7 @@ public class CodecFactory {
return comp;
}
+ @Override
public BytesDecompressor getDecompressor(CompressionCodecName codecName) {
BytesDecompressor decomp = decompressors.get(codecName);
if (decomp == null) {
@@ -235,6 +238,7 @@ public class CodecFactory {
}
}
+ @Override
public void release() {
for (BytesCompressor compressor : compressors.values()) {
compressor.release();
@@ -246,15 +250,23 @@ public class CodecFactory {
decompressors.clear();
}
- public static abstract class BytesCompressor {
+ /**
+ * @deprecated will be removed in 2.0.0; use CompressionCodecFactory.BytesInputCompressor instead.
+ */
+ @Deprecated
+ public static abstract class BytesCompressor implements CompressionCodecFactory.BytesInputCompressor {
public abstract BytesInput compress(BytesInput bytes) throws IOException;
public abstract CompressionCodecName getCodecName();
- protected abstract void release();
+ public abstract void release();
}
- public static abstract class BytesDecompressor {
+ /**
+ * @deprecated will be removed in 2.0.0; use CompressionCodecFactory.BytesInputDecompressor instead.
+ */
+ @Deprecated
+ public static abstract class BytesDecompressor implements CompressionCodecFactory.BytesInputDecompressor {
public abstract BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOException;
public abstract void decompress(ByteBuffer input, int compressedSize, ByteBuffer output, int uncompressedSize) throws IOException;
- protected abstract void release();
+ public abstract void release();
}
}
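A sketch of using the factory through the new CompressionCodecFactory interface rather than the Hadoop-specific class directly; HadoopCodecs.newFactory is the helper already used by the read options above, and the byte values are arbitrary:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.parquet.bytes.BytesInput;
    import org.apache.parquet.compression.CompressionCodecFactory;
    import org.apache.parquet.hadoop.metadata.CompressionCodecName;
    import org.apache.parquet.hadoop.util.HadoopCodecs;

    public class CodecFactorySketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // page size 0 is fine when the factory is only used for decompression
        CompressionCodecFactory codecs = HadoopCodecs.newFactory(conf, 0);
        CompressionCodecFactory.BytesInputDecompressor decompressor =
            codecs.getDecompressor(CompressionCodecName.UNCOMPRESSED);

        BytesInput roundTrip =
            decompressor.decompress(BytesInput.from(new byte[] {1, 2, 3}), 3);
        System.out.println(roundTrip.size());   // 3

        codecs.release();   // release() is now public on the interface
      }
    }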
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java
index f067679..37dfd6d 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java
@@ -33,6 +33,8 @@ import org.apache.parquet.column.page.DictionaryPage;
import org.apache.parquet.column.page.DictionaryPageReadStore;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.compression.CompressionCodecFactory;
+import org.apache.parquet.compression.CompressionCodecFactory.BytesInputDecompressor;
import org.apache.parquet.hadoop.CodecFactory.BytesDecompressor;
import org.apache.parquet.io.ParquetDecodingException;
import org.slf4j.Logger;
@@ -56,12 +58,12 @@ class ColumnChunkPageReadStore implements PageReadStore, DictionaryPageReadStore
*/
static final class ColumnChunkPageReader implements PageReader {
- private final BytesDecompressor decompressor;
+ private final BytesInputDecompressor decompressor;
private final long valueCount;
private final List<DataPage> compressedPages;
private final DictionaryPage compressedDictionaryPage;
- ColumnChunkPageReader(BytesDecompressor decompressor, List<DataPage> compressedPages, DictionaryPage compressedDictionaryPage) {
+ ColumnChunkPageReader(BytesInputDecompressor decompressor, List<DataPage> compressedPages, DictionaryPage compressedDictionaryPage) {
this.decompressor = decompressor;
this.compressedPages = new LinkedList<DataPage>(compressedPages);
this.compressedDictionaryPage = compressedDictionaryPage;
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java
index 344f3ec..58e79ac 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java
@@ -179,7 +179,7 @@ class DirectCodecFactory extends CodecFactory implements AutoCloseable {
}
@Override
- protected void release() {
+ public void release() {
DirectCodecPool.INSTANCE.returnDecompressor(decompressor);
}
}
@@ -221,7 +221,7 @@ class DirectCodecFactory extends CodecFactory implements AutoCloseable {
}
@Override
- protected void release() {
+ public void release() {
DirectCodecPool.INSTANCE.returnDirectDecompressor(decompressor);
extraDecompressor.release();
}
@@ -245,7 +245,7 @@ class DirectCodecFactory extends CodecFactory implements AutoCloseable {
}
@Override
- protected void release() {}
+ public void release() {}
}
@@ -269,7 +269,7 @@ class DirectCodecFactory extends CodecFactory implements AutoCloseable {
}
@Override
- protected void release() {}
+ public void release() {}
}
public class SnappyCompressor extends BytesCompressor {
@@ -311,7 +311,7 @@ class DirectCodecFactory extends CodecFactory implements AutoCloseable {
}
@Override
- protected void release() {
+ public void release() {
outgoing = DirectCodecFactory.this.release(outgoing);
incoming = DirectCodecFactory.this.release(incoming);
}
@@ -333,7 +333,7 @@ class DirectCodecFactory extends CodecFactory implements AutoCloseable {
}
@Override
- protected void release() {}
+ public void release() {}
}
static class DirectCodecPool {
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
index 88b3d2d..a048878 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
@@ -27,6 +27,8 @@ import java.util.Set;
import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.HadoopReadOptions;
+import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.filter.UnboundRecordFilter;
import org.apache.parquet.filter2.compat.FilterCompat;
@@ -47,7 +49,6 @@ import org.slf4j.LoggerFactory;
import static java.lang.String.format;
import static org.apache.parquet.Preconditions.checkNotNull;
import static org.apache.parquet.hadoop.ParquetInputFormat.RECORD_FILTERING_ENABLED;
-import static org.apache.parquet.hadoop.ParquetInputFormat.RECORD_FILTERING_ENABLED_DEFAULT;
import static org.apache.parquet.hadoop.ParquetInputFormat.STRICT_TYPE_CHECKING;
class InternalParquetRecordReader<T> {
@@ -160,6 +161,34 @@ class InternalParquetRecordReader<T> {
return (float) current / total;
}
+ public void initialize(ParquetFileReader reader, ParquetReadOptions options) {
+ // copy custom configuration to the Configuration passed to the ReadSupport
+ Configuration conf = new Configuration();
+ if (options instanceof HadoopReadOptions) {
+ conf = ((HadoopReadOptions) options).getConf();
+ }
+ for (String property : options.getPropertyNames()) {
+ conf.set(property, options.getProperty(property));
+ }
+
+ // initialize a ReadContext for this file
+ this.reader = reader;
+ FileMetaData parquetFileMetadata = reader.getFooter().getFileMetaData();
+ this.fileSchema = parquetFileMetadata.getSchema();
+ Map<String, String> fileMetadata = parquetFileMetadata.getKeyValueMetaData();
+ ReadSupport.ReadContext readContext = readSupport.init(new InitContext(conf, toSetMultiMap(fileMetadata), fileSchema));
+ this.columnIOFactory = new ColumnIOFactory(parquetFileMetadata.getCreatedBy());
+ this.requestedSchema = readContext.getRequestedSchema();
+ this.columnCount = requestedSchema.getPaths().size();
+ this.recordConverter = readSupport.prepareForRead(conf, fileMetadata, fileSchema, readContext);
+ this.strictTypeChecking = options.isEnabled(STRICT_TYPE_CHECKING, true);
+ this.total = reader.getRecordCount();
+ this.unmaterializableRecordCounter = new UnmaterializableRecordCounter(options, total);
+ this.filterRecords = options.useRecordFilter();
+ reader.setRequestedSchema(requestedSchema);
+ LOG.info("RecordReader initialized will read a total of {} records.", total);
+ }
+
public void initialize(ParquetFileReader reader, Configuration configuration)
throws IOException {
// initialize a ReadContext for this file
@@ -177,8 +206,7 @@ class InternalParquetRecordReader<T> {
this.strictTypeChecking = configuration.getBoolean(STRICT_TYPE_CHECKING, true);
this.total = reader.getRecordCount();
this.unmaterializableRecordCounter = new UnmaterializableRecordCounter(configuration, total);
- this.filterRecords = configuration.getBoolean(
- RECORD_FILTERING_ENABLED, RECORD_FILTERING_ENABLED_DEFAULT);
+ this.filterRecords = configuration.getBoolean(RECORD_FILTERING_ENABLED, true);
reader.setRequestedSchema(requestedSchema);
LOG.info("RecordReader initialized will read a total of {} records.", total);
}
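The property copy at the top of the new initialize(ParquetFileReader, ParquetReadOptions) is what keeps existing ReadSupport implementations working: whatever is set on the options is mirrored onto the Configuration they receive. A stand-alone sketch of that behaviour, with an illustrative key name:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.parquet.HadoopReadOptions;
    import org.apache.parquet.ParquetReadOptions;

    public class PropertyCopySketch {
      public static void main(String[] args) {
        ParquetReadOptions options = HadoopReadOptions.builder(new Configuration())
            .set("parquet.example.read-key", "42")    // illustrative key
            .build();

        // mirrors what initialize(...) does before calling ReadSupport.init
        Configuration readSupportConf = new Configuration();
        if (options instanceof HadoopReadOptions) {
          readSupportConf = ((HadoopReadOptions) options).getConf();
        }
        for (String property : options.getPropertyNames()) {
          readSupportConf.set(property, options.getProperty(property));
        }
        System.out.println(readSupportConf.get("parquet.example.read-key"));   // 42
      }
    }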
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index 1815bd6..1ace040 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -26,10 +26,6 @@ import static org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_
import static org.apache.parquet.hadoop.ParquetFileWriter.MAGIC;
import static org.apache.parquet.hadoop.ParquetFileWriter.PARQUET_COMMON_METADATA_FILE;
import static org.apache.parquet.hadoop.ParquetFileWriter.PARQUET_METADATA_FILE;
-import static org.apache.parquet.hadoop.ParquetInputFormat.DICTIONARY_FILTERING_ENABLED;
-import static org.apache.parquet.hadoop.ParquetInputFormat.DICTIONARY_FILTERING_ENABLED_DEFAULT;
-import static org.apache.parquet.hadoop.ParquetInputFormat.STATS_FILTERING_ENABLED;
-import static org.apache.parquet.hadoop.ParquetInputFormat.STATS_FILTERING_ENABLED_DEFAULT;
import java.io.Closeable;
import java.io.IOException;
@@ -51,17 +47,16 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.parquet.bytes.ByteBufferAllocator;
+import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.bytes.ByteBufferInputStream;
-import org.apache.parquet.bytes.HeapByteBufferAllocator;
import org.apache.parquet.column.Encoding;
import org.apache.parquet.column.page.DictionaryPageReadStore;
+import org.apache.parquet.compression.CompressionCodecFactory.BytesInputDecompressor;
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.filter2.compat.RowGroupFilter;
@@ -80,15 +75,14 @@ import org.apache.parquet.format.PageHeader;
import org.apache.parquet.format.Util;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.format.converter.ParquetMetadataConverter.MetadataFilter;
-import org.apache.parquet.hadoop.CodecFactory.BytesDecompressor;
import org.apache.parquet.hadoop.ColumnChunkPageReadStore.ColumnChunkPageReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.metadata.FileMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.HadoopReadOptions;
import org.apache.parquet.hadoop.util.HiddenFileFilter;
-import org.apache.parquet.hadoop.util.HadoopStreams;
import org.apache.parquet.io.SeekableInputStream;
import org.apache.parquet.hadoop.util.counters.BenchmarkCounter;
import org.apache.parquet.io.ParquetDecodingException;
@@ -119,6 +113,7 @@ public class ParquetFileReader implements Closeable {
* @param partFiles the part files to read
* @return the footers for those files using the summary file if possible.
* @throws IOException
+ * @deprecated metadata files are not recommended and will be removed in 2.0.0
*/
@Deprecated
public static List<Footer> readAllFootersInParallelUsingSummaryFiles(Configuration configuration, List<FileStatus> partFiles) throws IOException {
@@ -137,7 +132,9 @@ public class ParquetFileReader implements Closeable {
* @param skipRowGroups to skipRowGroups in the footers
* @return the footers for those files using the summary file if possible.
* @throws IOException
+ * @deprecated metadata files are not recommended and will be removed in 2.0.0
*/
+ @Deprecated
public static List<Footer> readAllFootersInParallelUsingSummaryFiles(
final Configuration configuration,
final Collection<FileStatus> partFiles,
@@ -233,6 +230,9 @@ public class ParquetFileReader implements Closeable {
}
}
+ /**
+ * @deprecated metadata files are not recommended and will be removed in 2.0.0
+ */
@Deprecated
public static List<Footer> readAllFootersInParallel(final Configuration configuration, List<FileStatus> partFiles) throws IOException {
return readAllFootersInParallel(configuration, partFiles, false);
@@ -246,7 +246,10 @@ public class ParquetFileReader implements Closeable {
* @param skipRowGroups to skip the rowGroup info
* @return the footers
* @throws IOException
+ * @deprecated will be removed in 2.0.0;
+ * use {@link ParquetFileReader#open(InputFile, ParquetReadOptions)}
*/
+ @Deprecated
public static List<Footer> readAllFootersInParallel(final Configuration configuration, List<FileStatus> partFiles, final boolean skipRowGroups) throws IOException {
List<Callable<Footer>> footers = new ArrayList<Callable<Footer>>();
for (final FileStatus currentFile : partFiles) {
@@ -271,7 +274,10 @@ public class ParquetFileReader implements Closeable {
/**
* Read the footers of all the files under that path (recursively)
* not using summary files.
+ * @deprecated will be removed in 2.0.0;
+ * use {@link ParquetFileReader#open(InputFile, ParquetReadOptions)}
*/
+ @Deprecated
public static List<Footer> readAllFootersInParallel(Configuration configuration, FileStatus fileStatus, boolean skipRowGroups) throws IOException {
List<FileStatus> statuses = listFiles(configuration, fileStatus);
return readAllFootersInParallel(configuration, statuses, skipRowGroups);
@@ -285,12 +291,18 @@ public class ParquetFileReader implements Closeable {
* @param fileStatus the root dir
* @return all the footers
* @throws IOException
+ * @deprecated will be removed in 2.0.0;
+ * use {@link ParquetFileReader#open(InputFile, ParquetReadOptions)}
*/
+ @Deprecated
public static List<Footer> readAllFootersInParallel(Configuration configuration, FileStatus fileStatus) throws IOException {
return readAllFootersInParallel(configuration, fileStatus, false);
}
-
+ /**
+ * @deprecated will be removed in 2.0.0;
+ * use {@link ParquetFileReader#open(InputFile, ParquetReadOptions)}
+ */
@Deprecated
public static List<Footer> readFooters(Configuration configuration, Path path) throws IOException {
return readFooters(configuration, status(configuration, path));
@@ -306,6 +318,8 @@ public class ParquetFileReader implements Closeable {
* @param pathStatus
* @return
* @throws IOException
+ * @deprecated will be removed in 2.0.0;
+ * use {@link ParquetFileReader#open(InputFile, ParquetReadOptions)}
*/
@Deprecated
public static List<Footer> readFooters(Configuration configuration, FileStatus pathStatus) throws IOException {
@@ -319,7 +333,10 @@ public class ParquetFileReader implements Closeable {
* @param pathStatus the root dir
* @return all the footers
* @throws IOException
+ * @deprecated will be removed in 2.0.0;
+ * use {@link ParquetFileReader#open(InputFile, ParquetReadOptions)}
*/
+ @Deprecated
public static List<Footer> readFooters(Configuration configuration, FileStatus pathStatus, boolean skipRowGroups) throws IOException {
List<FileStatus> files = listFiles(configuration, pathStatus);
return readAllFootersInParallelUsingSummaryFiles(configuration, files, skipRowGroups);
@@ -345,7 +362,9 @@ public class ParquetFileReader implements Closeable {
* @param summaryStatus
* @return the metadata translated for each file
* @throws IOException
+ * @deprecated metadata files are not recommended and will be removed in 2.0.0
*/
+ @Deprecated
public static List<Footer> readSummaryFile(Configuration configuration, FileStatus summaryStatus) throws IOException {
final Path parent = summaryStatus.getPath().getParent();
ParquetMetadata mergedFooters = readFooter(configuration, summaryStatus, filter(false));
@@ -394,6 +413,8 @@ public class ParquetFileReader implements Closeable {
* @param file the parquet File
* @return the metadata blocks in the footer
* @throws IOException if an error occurs while reading the file
+ * @deprecated will be removed in 2.0.0;
+ * use {@link ParquetFileReader#open(InputFile, ParquetReadOptions)}
*/
@Deprecated
public static final ParquetMetadata readFooter(Configuration configuration, Path file) throws IOException {
@@ -408,13 +429,16 @@ public class ParquetFileReader implements Closeable {
* @param filter the filter to apply to row groups
* @return the metadata with row groups filtered.
* @throws IOException if an error occurs while reading the file
+ * @deprecated will be removed in 2.0.0;
+ * use {@link ParquetFileReader#open(InputFile, ParquetReadOptions)}
*/
public static ParquetMetadata readFooter(Configuration configuration, Path file, MetadataFilter filter) throws IOException {
return readFooter(HadoopInputFile.fromPath(file, configuration), filter);
}
/**
- * @deprecated use {@link ParquetFileReader#readFooter(Configuration, FileStatus, MetadataFilter)}
+ * @deprecated will be removed in 2.0.0;
+ * use {@link ParquetFileReader#open(InputFile, ParquetReadOptions)}
*/
@Deprecated
public static final ParquetMetadata readFooter(Configuration configuration, FileStatus file) throws IOException {
@@ -428,7 +452,10 @@ public class ParquetFileReader implements Closeable {
* @param filter the filter to apply to row groups
* @return the metadata blocks in the footer
* @throws IOException if an error occurs while reading the file
+ * @deprecated will be removed in 2.0.0;
+ * use {@link ParquetFileReader#open(InputFile, ParquetReadOptions)}
*/
+ @Deprecated
public static final ParquetMetadata readFooter(Configuration configuration, FileStatus file, MetadataFilter filter) throws IOException {
return readFooter(HadoopInputFile.fromStatus(file, configuration), filter);
}
@@ -439,35 +466,32 @@ public class ParquetFileReader implements Closeable {
* @param filter the filter to apply to row groups
* @return the metadata blocks in the footer
* @throws IOException if an error occurs while reading the file
+ * @deprecated will be removed in 2.0.0;
+ * use {@link ParquetFileReader#open(InputFile, ParquetReadOptions)}
*/
- public static final ParquetMetadata readFooter(
- InputFile file, MetadataFilter filter) throws IOException {
- ParquetMetadataConverter converter;
- // TODO: remove this temporary work-around.
- // this is necessary to pass the Configuration to ParquetMetadataConverter
- // and should be removed when there is a non-Hadoop configuration.
+ @Deprecated
+ public static final ParquetMetadata readFooter(InputFile file, MetadataFilter filter) throws IOException {
+ ParquetReadOptions options;
if (file instanceof HadoopInputFile) {
- converter = new ParquetMetadataConverter(
- ((HadoopInputFile) file).getConfiguration());
+ options = HadoopReadOptions.builder(((HadoopInputFile) file).getConfiguration())
+ .withMetadataFilter(filter).build();
} else {
- converter = new ParquetMetadataConverter();
+ options = ParquetReadOptions.builder().withMetadataFilter(filter).build();
}
- try (SeekableInputStream in = file.newStream()) {
- return readFooter(converter, file.getLength(), file.toString(), in, filter);
+ try (SeekableInputStream in = file.newStream()) {
+ return readFooter(file, options, in);
}
}
- /**
- * Reads the meta data block in the footer of the file using provided input stream
- * @param fileLen length of the file
- * @param filePath file location
- * @param f input stream for the file
- * @param filter the filter to apply to row groups
- * @return the metadata blocks in the footer
- * @throws IOException if an error occurs while reading the file
- */
- private static final ParquetMetadata readFooter(ParquetMetadataConverter converter, long fileLen, String filePath, SeekableInputStream f, MetadataFilter filter) throws IOException {
+ private static final ParquetMetadata readFooter(InputFile file, ParquetReadOptions options, SeekableInputStream f) throws IOException {
+ ParquetMetadataConverter converter = new ParquetMetadataConverter(options);
+ return readFooter(file, options, f, converter);
+ }
+
+ private static final ParquetMetadata readFooter(InputFile file, ParquetReadOptions options, SeekableInputStream f, ParquetMetadataConverter converter) throws IOException {
+ long fileLen = file.getLength();
+ String filePath = file.toString();
LOG.debug("File length {}", fileLen);
int FOOTER_LENGTH_SIZE = 4;
if (fileLen < MAGIC.length + FOOTER_LENGTH_SIZE + MAGIC.length) { // MAGIC + data + footer + footerIndex + MAGIC
@@ -489,43 +513,75 @@ public class ParquetFileReader implements Closeable {
throw new RuntimeException("corrupted file: the footer index is not within the file: " + footerIndex);
}
f.seek(footerIndex);
- return converter.readParquetMetadata(f, filter);
+ return converter.readParquetMetadata(f, options.getMetadataFilter());
}
+ /**
+ * @deprecated will be removed in 2.0.0; use {@link #open(InputFile)}
+ */
+ @Deprecated
public static ParquetFileReader open(Configuration conf, Path file) throws IOException {
- return new ParquetFileReader(conf, file);
+ return new ParquetFileReader(HadoopInputFile.fromPath(file, conf),
+ HadoopReadOptions.builder(conf).build());
}
+ /**
+ * @deprecated will be removed in 2.0.0; use {@link #open(InputFile,ParquetReadOptions)}
+ */
+ @Deprecated
public static ParquetFileReader open(Configuration conf, Path file, MetadataFilter filter) throws IOException {
- return new ParquetFileReader(conf, file, filter);
+ return open(HadoopInputFile.fromPath(file, conf),
+ HadoopReadOptions.builder(conf).withMetadataFilter(filter).build());
}
+ /**
+ * @deprecated will be removed in 2.0.0
+ */
+ @Deprecated
public static ParquetFileReader open(Configuration conf, Path file, ParquetMetadata footer) throws IOException {
return new ParquetFileReader(conf, file, footer);
}
- private final CodecFactory codecFactory;
+ /**
+ * Open a {@link InputFile file}.
+ *
+ * @param file an input file
+ * @return an open ParquetFileReader
+ */
+ public static ParquetFileReader open(InputFile file) throws IOException {
+ return new ParquetFileReader(file, ParquetReadOptions.builder().build());
+ }
+
+ /**
+ * Open a {@link InputFile file} with {@link ParquetReadOptions options}.
+ *
+ * @param file an input file
+ * @return an open ParquetFileReader
+ */
+ public static ParquetFileReader open(InputFile file, ParquetReadOptions options) throws IOException {
+ return new ParquetFileReader(file, options);
+ }
+
+ private final InputFile file;
private final SeekableInputStream f;
- private final FileStatus fileStatus;
- private final Map<ColumnPath, ColumnDescriptor> paths = new HashMap<ColumnPath, ColumnDescriptor>();
+ private final ParquetReadOptions options;
+ private final Map<ColumnPath, ColumnDescriptor> paths = new HashMap<>();
private final FileMetaData fileMetaData; // may be null
- private final ByteBufferAllocator allocator;
- private final Configuration conf;
+ private final List<BlockMetaData> blocks;
// not final. in some cases, this may be lazily loaded for backward-compat.
private ParquetMetadata footer;
- // blocks can be filtered after they are read (or set in the constructor)
- private List<BlockMetaData> blocks;
private int currentBlock = 0;
private ColumnChunkPageReadStore currentRowGroup = null;
private DictionaryPageReader nextDictionaryReader = null;
/**
- * @deprecated use @link{ParquetFileReader(Configuration configuration, FileMetaData fileMetaData,
- * Path filePath, List<BlockMetaData> blocks, List<ColumnDescriptor> columns)} instead
+ * @deprecated use {@link ParquetFileReader(Configuration,FileMetaData,Path,List,List)} instead.
*/
- public ParquetFileReader(Configuration configuration, Path filePath, List<BlockMetaData> blocks, List<ColumnDescriptor> columns) throws IOException {
+ @Deprecated
+ public ParquetFileReader(Configuration configuration, Path filePath, List<BlockMetaData> blocks,
+ List<ColumnDescriptor> columns) throws IOException {
this(configuration, null, filePath, blocks, columns);
}
@@ -541,28 +597,14 @@ public class ParquetFileReader implements Closeable {
Configuration configuration, FileMetaData fileMetaData,
Path filePath, List<BlockMetaData> blocks, List<ColumnDescriptor> columns) throws IOException {
this.converter = new ParquetMetadataConverter(configuration);
- this.conf = configuration;
+ this.file = HadoopInputFile.fromPath(filePath, configuration);
this.fileMetaData = fileMetaData;
- FileSystem fs = filePath.getFileSystem(configuration);
- this.f = HadoopStreams.wrap(fs.open(filePath));
- this.fileStatus = fs.getFileStatus(filePath);
- this.blocks = blocks;
+ this.f = file.newStream();
+ this.options = HadoopReadOptions.builder(configuration).build();
+ this.blocks = filterRowGroups(blocks);
for (ColumnDescriptor col : columns) {
paths.put(ColumnPath.get(col.getPath()), col);
}
- // the page size parameter isn't meaningful when only using
- // the codec factory to get decompressors
- this.codecFactory = new CodecFactory(configuration, 0);
- this.allocator = new HeapByteBufferAllocator();
- }
-
- /**
- * @param configuration the Hadoop Configuration
- * @param file Path to a parquet file
- * @throws IOException if the file can not be opened
- */
- private ParquetFileReader(Configuration configuration, Path file) throws IOException {
- this(configuration, file, NO_FILTER);
}
/**
@@ -570,23 +612,13 @@ public class ParquetFileReader implements Closeable {
* @param file Path to a parquet file
* @param filter a {@link MetadataFilter} for selecting row groups
* @throws IOException if the file can not be opened
+ * @deprecated will be removed in 2.0.0;
+ * use {@link ParquetFileReader(InputFile,MetadataFilter)} instead
*/
+ @Deprecated
public ParquetFileReader(Configuration conf, Path file, MetadataFilter filter) throws IOException {
- this.converter = new ParquetMetadataConverter(conf);
- this.conf = conf;
- FileSystem fs = file.getFileSystem(conf);
- this.fileStatus = fs.getFileStatus(file);
- this.f = HadoopStreams.wrap(fs.open(file));
- this.footer = readFooter(converter, fileStatus.getLen(), fileStatus.getPath().toString(), f, filter);
- this.fileMetaData = footer.getFileMetaData();
- this.blocks = footer.getBlocks();
- for (ColumnDescriptor col : footer.getFileMetaData().getSchema().getColumns()) {
- paths.put(ColumnPath.get(col.getPath()), col);
- }
- // the page size parameter isn't meaningful when only using
- // the codec factory to get decompressors
- this.codecFactory = new CodecFactory(conf, 0);
- this.allocator = new HeapByteBufferAllocator();
+ this(HadoopInputFile.fromPath(file, conf),
+ HadoopReadOptions.builder(conf).withMetadataFilter(filter).build());
}
/**
@@ -595,29 +627,38 @@ public class ParquetFileReader implements Closeable {
* @param footer a {@link ParquetMetadata} footer already read from the file
* @throws IOException if the file can not be opened
*/
+ @Deprecated
public ParquetFileReader(Configuration conf, Path file, ParquetMetadata footer) throws IOException {
this.converter = new ParquetMetadataConverter(conf);
- this.conf = conf;
- FileSystem fs = file.getFileSystem(conf);
- this.fileStatus = fs.getFileStatus(file);
- this.f = HadoopStreams.wrap(fs.open(file));
+ this.file = HadoopInputFile.fromPath(file, conf);
+ this.f = this.file.newStream();
+ this.options = HadoopReadOptions.builder(conf).build();
this.footer = footer;
this.fileMetaData = footer.getFileMetaData();
- this.blocks = footer.getBlocks();
+ this.blocks = filterRowGroups(footer.getBlocks());
+ for (ColumnDescriptor col : footer.getFileMetaData().getSchema().getColumns()) {
+ paths.put(ColumnPath.get(col.getPath()), col);
+ }
+ }
+
+ public ParquetFileReader(InputFile file, ParquetReadOptions options) throws IOException {
+ this.converter = new ParquetMetadataConverter(options);
+ this.file = file;
+ this.f = file.newStream();
+ this.options = options;
+ this.footer = readFooter(file, options, f, converter);
+ this.fileMetaData = footer.getFileMetaData();
+ this.blocks = filterRowGroups(footer.getBlocks());
for (ColumnDescriptor col : footer.getFileMetaData().getSchema().getColumns()) {
paths.put(ColumnPath.get(col.getPath()), col);
}
- // the page size parameter isn't meaningful when only using
- // the codec factory to get decompressors
- this.codecFactory = new CodecFactory(conf, 0);
- this.allocator = new HeapByteBufferAllocator();
}
public ParquetMetadata getFooter() {
if (footer == null) {
try {
// don't read the row groups because this.blocks is always set
- this.footer = readFooter(converter, fileStatus.getLen(), fileStatus.getPath().toString(), f, SKIP_ROW_GROUPS);
+ this.footer = readFooter(file, options, f, converter);
} catch (IOException e) {
throw new ParquetDecodingException("Unable to read file footer", e);
}
@@ -640,25 +681,36 @@ public class ParquetFileReader implements Closeable {
return total;
}
+ /**
+ * @deprecated will be removed in 2.0.0; use {@link #getFile()} instead
+ */
+ @Deprecated
public Path getPath() {
- return fileStatus.getPath();
+ return new Path(file.toString());
+ }
+
+ public String getFile() {
+ return file.toString();
}
- void filterRowGroups(FilterCompat.Filter filter) throws IOException {
+ private List<BlockMetaData> filterRowGroups(List<BlockMetaData> blocks) throws IOException {
// set up data filters based on configured levels
- List<RowGroupFilter.FilterLevel> levels = new ArrayList<RowGroupFilter.FilterLevel>();
+ List<RowGroupFilter.FilterLevel> levels = new ArrayList<>();
- if (conf.getBoolean(
- STATS_FILTERING_ENABLED, STATS_FILTERING_ENABLED_DEFAULT)) {
+ if (options.useStatsFilter()) {
levels.add(STATISTICS);
}
- if (conf.getBoolean(
- DICTIONARY_FILTERING_ENABLED, DICTIONARY_FILTERING_ENABLED_DEFAULT)) {
+ if (options.useDictionaryFilter()) {
levels.add(DICTIONARY);
}
- this.blocks = RowGroupFilter.filterRowGroups(levels, filter, blocks, this);
+ FilterCompat.Filter recordFilter = options.getRecordFilter();
+ if (recordFilter != null) {
+ return RowGroupFilter.filterRowGroups(levels, recordFilter, blocks, this);
+ }
+
+ return blocks;
}
public List<BlockMetaData> getRowGroups() {
@@ -785,7 +837,7 @@ public class ParquetFileReader implements Closeable {
}
DictionaryPage compressedPage = readCompressedDictionary(pageHeader, f);
- BytesDecompressor decompressor = codecFactory.getDecompressor(meta.getCodec());
+ BytesInputDecompressor decompressor = options.getCodecFactory().getDecompressor(meta.getCodec());
return new DictionaryPage(
decompressor.decompress(compressedPage.getBytes(), compressedPage.getUncompressedSize()),
@@ -817,9 +869,7 @@ public class ParquetFileReader implements Closeable {
f.close();
}
} finally {
- if (codecFactory != null) {
- codecFactory.release();
- }
+ options.getCodecFactory().release();
}
}
@@ -929,7 +979,7 @@ public class ParquetFileReader implements Closeable {
" but got " + valuesCountReadSoFar + " values instead over " + pagesInChunk.size()
+ " pages ending at file offset " + (descriptor.fileOffset + pos()));
}
- BytesDecompressor decompressor = codecFactory.getDecompressor(descriptor.metadata.getCodec());
+ BytesInputDecompressor decompressor = options.getCodecFactory().getDecompressor(descriptor.metadata.getCodec());
return new ColumnChunkPageReader(decompressor, pagesInChunk, dictionaryPage);
}
@@ -1077,7 +1127,7 @@ public class ParquetFileReader implements Closeable {
f.seek(offset);
// Allocate the bytebuffer based on whether the FS can support it.
- ByteBuffer chunksByteBuffer = allocator.allocate(length);
+ ByteBuffer chunksByteBuffer = options.getAllocator().allocate(length);
f.readFully(chunksByteBuffer);
// report in a counter the data we just scanned
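For orientation, the deprecated static readFooter variants above all funnel into the new open(InputFile, ParquetReadOptions) entry point. Below is a minimal sketch of reading a footer through that path, using only classes and builder methods visible in this diff (HadoopInputFile, HadoopReadOptions, getFooter, getRowGroups); the class name and file path are illustrative only.

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.HadoopReadOptions;
    import org.apache.parquet.ParquetReadOptions;
    import org.apache.parquet.hadoop.ParquetFileReader;
    import org.apache.parquet.hadoop.metadata.ParquetMetadata;
    import org.apache.parquet.hadoop.util.HadoopInputFile;

    public class FooterSketch {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        Path path = new Path(args[0]); // an existing Parquet file

        // Row group filtering is now requested through the options builder
        // instead of the parquet.filter.* Configuration keys.
        ParquetReadOptions.Builder optionsBuilder = HadoopReadOptions.builder(conf);
        optionsBuilder.useStatsFilter();
        optionsBuilder.useDictionaryFilter();

        try (ParquetFileReader reader = ParquetFileReader.open(
            HadoopInputFile.fromPath(path, conf), optionsBuilder.build())) {
          ParquetMetadata footer = reader.getFooter();
          System.out.println("schema: " + footer.getFileMetaData().getSchema());
          System.out.println("row groups: " + reader.getRowGroups().size());
        }
      }
    }

With this change the reader filters row groups when it is constructed, so getRowGroups() already reflects the stats and dictionary filters requested in the options.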
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
index 57500bf..da8635d 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
@@ -23,7 +23,6 @@ import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE;
import static org.apache.parquet.hadoop.ParquetWriter.MAX_PADDING_SIZE_DEFAULT;
import java.io.IOException;
-import java.io.OutputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.HashMap;
@@ -36,7 +35,6 @@ import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -59,9 +57,13 @@ import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.hadoop.metadata.FileMetaData;
import org.apache.parquet.hadoop.metadata.GlobalMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.HadoopOutputFile;
import org.apache.parquet.hadoop.util.HadoopStreams;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.io.OutputFile;
import org.apache.parquet.io.SeekableInputStream;
import org.apache.parquet.io.ParquetEncodingException;
+import org.apache.parquet.io.PositionOutputStream;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.apache.parquet.schema.TypeUtil;
@@ -85,22 +87,6 @@ public class ParquetFileWriter {
public static final String PARQUET_COMMON_METADATA_FILE = "_common_metadata";
public static final int CURRENT_VERSION = 1;
- // need to supply a buffer size when setting block size. this is the default
- // for hadoop 1 to present. copying it avoids loading DFSConfigKeys.
- private static final int DFS_BUFFER_SIZE_DEFAULT = 4096;
-
- // visible for testing
- static final Set<String> BLOCK_FS_SCHEMES = new HashSet<String>();
- static {
- BLOCK_FS_SCHEMES.add("hdfs");
- BLOCK_FS_SCHEMES.add("webhdfs");
- BLOCK_FS_SCHEMES.add("viewfs");
- }
-
- private static boolean supportsBlockSize(FileSystem fs) {
- return BLOCK_FS_SCHEMES.contains(fs.getUri().getScheme());
- }
-
// File creation modes
public static enum Mode {
CREATE,
@@ -108,7 +94,7 @@ public class ParquetFileWriter {
}
private final MessageType schema;
- private final FSDataOutputStream out;
+ private final PositionOutputStream out;
private final AlignmentStrategy alignment;
// file data
@@ -193,11 +179,14 @@ public class ParquetFileWriter {
* @param schema the schema of the data
* @param file the file to write to
* @throws IOException if the file can not be created
+ * @deprecated will be removed in 2.0.0;
+ * use {@link ParquetFileWriter(OutputFile,MessageType,Mode,long,long)} instead
*/
+ @Deprecated
public ParquetFileWriter(Configuration configuration, MessageType schema,
Path file) throws IOException {
- this(configuration, schema, file, Mode.CREATE, DEFAULT_BLOCK_SIZE,
- MAX_PADDING_SIZE_DEFAULT);
+ this(HadoopOutputFile.fromPath(file, configuration),
+ schema, Mode.CREATE, DEFAULT_BLOCK_SIZE, MAX_PADDING_SIZE_DEFAULT);
}
/**
@@ -206,11 +195,14 @@ public class ParquetFileWriter {
* @param file the file to write to
* @param mode file creation mode
* @throws IOException if the file can not be created
+ * @deprecated will be removed in 2.0.0;
+ * use {@link ParquetFileWriter(OutputFile,MessageType,Mode,long,long)} instead
*/
+ @Deprecated
public ParquetFileWriter(Configuration configuration, MessageType schema,
Path file, Mode mode) throws IOException {
- this(configuration, schema, file, mode, DEFAULT_BLOCK_SIZE,
- MAX_PADDING_SIZE_DEFAULT);
+ this(HadoopOutputFile.fromPath(file, configuration),
+ schema, mode, DEFAULT_BLOCK_SIZE, MAX_PADDING_SIZE_DEFAULT);
}
/**
@@ -219,36 +211,54 @@ public class ParquetFileWriter {
* @param file the file to write to
* @param mode file creation mode
* @param rowGroupSize the row group size
+ * @param maxPaddingSize the maximum padding
* @throws IOException if the file can not be created
+ * @deprecated will be removed in 2.0.0;
+ * use {@link ParquetFileWriter(OutputFile,MessageType,Mode,long,long)} instead
*/
+ @Deprecated
public ParquetFileWriter(Configuration configuration, MessageType schema,
Path file, Mode mode, long rowGroupSize,
int maxPaddingSize)
throws IOException {
- TypeUtil.checkValidWriteSchema(schema);
- this.schema = schema;
- FileSystem fs = file.getFileSystem(configuration);
- boolean overwriteFlag = (mode == Mode.OVERWRITE);
+ this(HadoopOutputFile.fromPath(file, configuration),
+ schema, mode, rowGroupSize, maxPaddingSize);
+ }
- if (supportsBlockSize(fs)) {
- // use the default block size, unless row group size is larger
- long dfsBlockSize = Math.max(fs.getDefaultBlockSize(file), rowGroupSize);
+ /**
+ * @param file OutputFile to create or overwrite
+ * @param schema the schema of the data
+ * @param mode file creation mode
+ * @param rowGroupSize the row group size
+ * @param maxPaddingSize the maximum padding
+ * @throws IOException if the file can not be created
+ */
+ public ParquetFileWriter(OutputFile file, MessageType schema, Mode mode,
+ long rowGroupSize, int maxPaddingSize)
+ throws IOException {
+ TypeUtil.checkValidWriteSchema(schema);
- this.alignment = PaddingAlignment.get(
- dfsBlockSize, rowGroupSize, maxPaddingSize);
- this.out = fs.create(file, overwriteFlag, DFS_BUFFER_SIZE_DEFAULT,
- fs.getDefaultReplication(file), dfsBlockSize);
+ this.schema = schema;
+ long blockSize = rowGroupSize;
+ if (file.supportsBlockSize()) {
+ blockSize = Math.max(file.defaultBlockSize(), rowGroupSize);
+ this.alignment = PaddingAlignment.get(blockSize, rowGroupSize, maxPaddingSize);
} else {
this.alignment = NoAlignment.get(rowGroupSize);
- this.out = fs.create(file, overwriteFlag);
+ }
+
+ if (mode == Mode.OVERWRITE) {
+ this.out = file.createOrOverwrite(blockSize);
+ } else {
+ this.out = file.create(blockSize);
}
this.encodingStatsBuilder = new EncodingStats.Builder();
}
/**
- * FOR TESTING ONLY.
+ * FOR TESTING ONLY. This supports testing block padding behavior on the local FS.
*
* @param configuration Hadoop configuration
* @param schema the schema of the data
@@ -263,11 +273,10 @@ public class ParquetFileWriter {
this.schema = schema;
this.alignment = PaddingAlignment.get(
rowAndBlockSize, rowAndBlockSize, maxPaddingSize);
- this.out = fs.create(file, true, DFS_BUFFER_SIZE_DEFAULT,
- fs.getDefaultReplication(file), rowAndBlockSize);
+ this.out = HadoopStreams.wrap(
+ fs.create(file, true, 8192, fs.getDefaultReplication(file), rowAndBlockSize));
this.encodingStatsBuilder = new EncodingStats.Builder();
}
-
/**
* start the file
* @throws IOException
@@ -490,10 +499,23 @@ public class ParquetFileWriter {
currentBlock = null;
}
+ /**
+ * @deprecated will be removed in 2.0.0; use {@link #appendFile(InputFile)} instead
+ */
+ @Deprecated
public void appendFile(Configuration conf, Path file) throws IOException {
ParquetFileReader.open(conf, file).appendTo(this);
}
+ public void appendFile(InputFile file) throws IOException {
+ ParquetFileReader.open(file).appendTo(this);
+ }
+
+ /**
+ * @deprecated will be removed in 2.0.0;
+ * use {@link #appendRowGroups(SeekableInputStream,List,boolean)} instead
+ */
+ @Deprecated
public void appendRowGroups(FSDataInputStream file,
List<BlockMetaData> rowGroups,
boolean dropColumns) throws IOException {
@@ -508,13 +530,18 @@ public class ParquetFileWriter {
}
}
+ /**
+ * @deprecated will be removed in 2.0.0;
+ * use {@link #appendRowGroup(SeekableInputStream,BlockMetaData,boolean)} instead
+ */
+ @Deprecated
public void appendRowGroup(FSDataInputStream from, BlockMetaData rowGroup,
boolean dropColumns) throws IOException {
- appendRowGroup(from, rowGroup, dropColumns);
+ appendRowGroup(HadoopStreams.wrap(from), rowGroup, dropColumns);
}
public void appendRowGroup(SeekableInputStream from, BlockMetaData rowGroup,
- boolean dropColumns) throws IOException {
+ boolean dropColumns) throws IOException {
startBlock(rowGroup.getRowCount());
Map<String, ColumnChunkMetaData> columnsToCopy =
@@ -603,13 +630,13 @@ public class ParquetFileWriter {
/**
* Copy from a FS input stream to an output stream. Thread-safe
*
- * @param from a {@link FSDataInputStream}
- * @param to any {@link OutputStream}
+ * @param from a {@link SeekableInputStream}
+ * @param to any {@link PositionOutputStream}
* @param start where in the from stream to start copying
* @param length the number of bytes to copy
* @throws IOException
*/
- private static void copy(SeekableInputStream from, FSDataOutputStream to,
+ private static void copy(SeekableInputStream from, PositionOutputStream to,
long start, long length) throws IOException{
LOG.debug("Copying {} bytes at {} to {}" ,length , start , to.getPos());
from.seek(start);
@@ -642,7 +669,7 @@ public class ParquetFileWriter {
out.close();
}
- private static void serializeFooter(ParquetMetadata footer, FSDataOutputStream out) throws IOException {
+ private static void serializeFooter(ParquetMetadata footer, PositionOutputStream out) throws IOException {
long footerIndex = out.getPos();
org.apache.parquet.format.FileMetaData parquetMetadata = metadataConverter.toParquetMetadata(CURRENT_VERSION, footer);
writeFileMetaData(parquetMetadata, out);
@@ -654,7 +681,9 @@ public class ParquetFileWriter {
/**
* Given a list of metadata files, merge them into a single ParquetMetadata
* Requires that the schemas be compatible, and the extraMetadata be exactly equal.
+ * @deprecated metadata files are not recommended and will be removed in 2.0.0
*/
+ @Deprecated
public static ParquetMetadata mergeMetadataFiles(List<Path> files, Configuration conf) throws IOException {
Preconditions.checkArgument(!files.isEmpty(), "Cannot merge an empty list of metadata");
@@ -677,7 +706,9 @@ public class ParquetFileWriter {
* Requires that the schemas be compatible, and the extraMetaData be exactly equal.
* This is useful when merging 2 directories of parquet files into a single directory, as long
* as both directories were written with compatible schemas and equal extraMetaData.
+ * @deprecated metadata files are not recommended and will be removed in 2.0.0
*/
+ @Deprecated
public static void writeMergedMetadataFile(List<Path> files, Path outputPath, Configuration conf) throws IOException {
ParquetMetadata merged = mergeMetadataFiles(files, conf);
writeMetadataFile(outputPath, merged, outputPath.getFileSystem(conf));
@@ -688,8 +719,8 @@ public class ParquetFileWriter {
* @param configuration the configuration to use to get the FileSystem
* @param outputPath the directory to write the _metadata file to
* @param footers the list of footers to merge
- * @deprecated use the variant of writeMetadataFile that takes a {@link JobSummaryLevel} as an argument.
* @throws IOException
+ * @deprecated metadata files are not recommended and will be removed in 2.0.0
*/
@Deprecated
public static void writeMetadataFile(Configuration configuration, Path outputPath, List<Footer> footers) throws IOException {
@@ -698,7 +729,9 @@ public class ParquetFileWriter {
/**
* writes _common_metadata file, and optionally a _metadata file depending on the {@link JobSummaryLevel} provided
+ * @deprecated metadata files are not recommended and will be removed in 2.0.0
*/
+ @Deprecated
public static void writeMetadataFile(Configuration configuration, Path outputPath, List<Footer> footers, JobSummaryLevel level) throws IOException {
Preconditions.checkArgument(level == JobSummaryLevel.ALL || level == JobSummaryLevel.COMMON_ONLY,
"Unsupported level: " + level);
@@ -715,15 +748,23 @@ public class ParquetFileWriter {
writeMetadataFile(outputPath, metadataFooter, fs, PARQUET_COMMON_METADATA_FILE);
}
+ /**
+ * @deprecated metadata files are not recommended and will be removed in 2.0.0
+ */
+ @Deprecated
private static void writeMetadataFile(Path outputPathRoot, ParquetMetadata metadataFooter, FileSystem fs, String parquetMetadataFile)
throws IOException {
Path metaDataPath = new Path(outputPathRoot, parquetMetadataFile);
writeMetadataFile(metaDataPath, metadataFooter, fs);
}
+ /**
+ * @deprecated metadata files are not recommended and will be removed in 2.0.0
+ */
+ @Deprecated
private static void writeMetadataFile(Path outputPath, ParquetMetadata metadataFooter, FileSystem fs)
throws IOException {
- FSDataOutputStream metadata = fs.create(outputPath);
+ PositionOutputStream metadata = HadoopStreams.wrap(fs.create(outputPath));
metadata.write(MAGIC);
serializeFooter(metadataFooter, metadata);
metadata.close();
@@ -850,9 +891,9 @@ public class ParquetFileWriter {
}
private interface AlignmentStrategy {
- void alignForRowGroup(FSDataOutputStream out) throws IOException;
+ void alignForRowGroup(PositionOutputStream out) throws IOException;
- long nextRowGroupSize(FSDataOutputStream out) throws IOException;
+ long nextRowGroupSize(PositionOutputStream out) throws IOException;
}
private static class NoAlignment implements AlignmentStrategy {
@@ -867,11 +908,11 @@ public class ParquetFileWriter {
}
@Override
- public void alignForRowGroup(FSDataOutputStream out) {
+ public void alignForRowGroup(PositionOutputStream out) {
}
@Override
- public long nextRowGroupSize(FSDataOutputStream out) {
+ public long nextRowGroupSize(PositionOutputStream out) {
return rowGroupSize;
}
}
@@ -900,7 +941,7 @@ public class ParquetFileWriter {
}
@Override
- public void alignForRowGroup(FSDataOutputStream out) throws IOException {
+ public void alignForRowGroup(PositionOutputStream out) throws IOException {
long remaining = dfsBlockSize - (out.getPos() % dfsBlockSize);
if (isPaddingNeeded(remaining)) {
@@ -912,7 +953,7 @@ public class ParquetFileWriter {
}
@Override
- public long nextRowGroupSize(FSDataOutputStream out) throws IOException {
+ public long nextRowGroupSize(PositionOutputStream out) throws IOException {
if (maxPaddingSize <= 0) {
return rowGroupSize;
}
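The new ParquetFileWriter(OutputFile, MessageType, Mode, long, long) constructor together with appendFile(InputFile) makes it possible to concatenate files without touching FileSystem directly. The following is a rough sketch, assuming the inputs share one schema and that end(Map) still finalizes the footer as in earlier releases; the class name, file names, and size literals are illustrative.

    import java.io.IOException;
    import java.util.Collections;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.hadoop.ParquetFileReader;
    import org.apache.parquet.hadoop.ParquetFileWriter;
    import org.apache.parquet.hadoop.util.HadoopInputFile;
    import org.apache.parquet.hadoop.util.HadoopOutputFile;
    import org.apache.parquet.io.InputFile;
    import org.apache.parquet.schema.MessageType;

    public class MergeSketch {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        InputFile first = HadoopInputFile.fromPath(new Path("part-0.parquet"), conf);
        InputFile second = HadoopInputFile.fromPath(new Path("part-1.parquet"), conf);

        // Borrow the schema from the first input's footer.
        MessageType schema;
        try (ParquetFileReader reader = ParquetFileReader.open(first)) {
          schema = reader.getFooter().getFileMetaData().getSchema();
        }

        ParquetFileWriter writer = new ParquetFileWriter(
            HadoopOutputFile.fromPath(new Path("merged.parquet"), conf),
            schema, ParquetFileWriter.Mode.CREATE,
            128 * 1024 * 1024,   // row group size
            8 * 1024 * 1024);    // max padding
        writer.start();
        writer.appendFile(first);   // copies row groups without re-encoding
        writer.appendFile(second);
        writer.end(Collections.<String, String>emptyMap());
      }
    }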
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java
index 7c5b5be..979388d 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java
@@ -120,19 +120,16 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
* key to configure whether record-level filtering is enabled
*/
public static final String RECORD_FILTERING_ENABLED = "parquet.filter.record-level.enabled";
- static final boolean RECORD_FILTERING_ENABLED_DEFAULT = true;
/**
* key to configure whether row group stats filtering is enabled
*/
public static final String STATS_FILTERING_ENABLED = "parquet.filter.stats.enabled";
- static final boolean STATS_FILTERING_ENABLED_DEFAULT = true;
/**
* key to configure whether row group dictionary filtering is enabled
*/
public static final String DICTIONARY_FILTERING_ENABLED = "parquet.filter.dictionary.enabled";
- static final boolean DICTIONARY_FILTERING_ENABLED_DEFAULT = false;
/**
* key to turn on or off task side metadata loading (default true)
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
index 78af765..340ec11 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
@@ -42,6 +42,7 @@ import org.apache.parquet.hadoop.api.WriteSupport.WriteContext;
import org.apache.parquet.hadoop.codec.CodecConfig;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.hadoop.util.ConfigurationUtil;
+import org.apache.parquet.hadoop.util.HadoopOutputFile;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -383,8 +384,8 @@ public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> {
}
WriteContext init = writeSupport.init(conf);
- ParquetFileWriter w = new ParquetFileWriter(
- conf, init.getSchema(), file, Mode.CREATE, blockSize, maxPaddingSize);
+ ParquetFileWriter w = new ParquetFileWriter(HadoopOutputFile.fromPath(file, conf),
+ init.getSchema(), Mode.CREATE, blockSize, maxPaddingSize);
w.start();
float maxLoad = conf.getFloat(ParquetOutputFormat.MEMORY_POOL_RATIO,
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java
index ff9c811..1ba5380 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java
@@ -22,7 +22,8 @@ import static org.apache.parquet.Preconditions.checkNotNull;
import java.io.Closeable;
import java.io.IOException;
-import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
@@ -31,12 +32,17 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.Preconditions;
+import org.apache.parquet.compression.CompressionCodecFactory;
import org.apache.parquet.filter.UnboundRecordFilter;
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.filter2.compat.FilterCompat.Filter;
import org.apache.parquet.hadoop.api.ReadSupport;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.HadoopReadOptions;
import org.apache.parquet.hadoop.util.HiddenFileFilter;
+import org.apache.parquet.io.InputFile;
/**
* Read records from a Parquet file.
@@ -45,9 +51,8 @@ import org.apache.parquet.hadoop.util.HiddenFileFilter;
public class ParquetReader<T> implements Closeable {
private final ReadSupport<T> readSupport;
- private final Configuration conf;
- private final Iterator<Footer> footersIterator;
- private final Filter filter;
+ private final Iterator<InputFile> filesIterator;
+ private final ParquetReadOptions options;
private InternalParquetRecordReader<T> reader;
@@ -100,17 +105,22 @@ public class ParquetReader<T> implements Closeable {
}
private ParquetReader(Configuration conf,
- Path file,
- ReadSupport<T> readSupport,
- Filter filter) throws IOException {
- this.readSupport = readSupport;
- this.filter = checkNotNull(filter, "filter");
- this.conf = conf;
+ Path file,
+ ReadSupport<T> readSupport,
+ FilterCompat.Filter filter) throws IOException {
+ this(Collections.singletonList((InputFile) HadoopInputFile.fromPath(file, conf)),
+ HadoopReadOptions.builder(conf)
+ .withRecordFilter(checkNotNull(filter, "filter"))
+ .build(),
+ readSupport);
+ }
- FileSystem fs = file.getFileSystem(conf);
- List<FileStatus> statuses = Arrays.asList(fs.listStatus(file, HiddenFileFilter.INSTANCE));
- List<Footer> footers = ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(conf, statuses, false);
- this.footersIterator = footers.iterator();
+ private ParquetReader(List<InputFile> files,
+ ParquetReadOptions options,
+ ReadSupport<T> readSupport) throws IOException {
+ this.readSupport = readSupport;
+ this.options = options;
+ this.filesIterator = files.iterator();
}
/**
@@ -135,18 +145,15 @@ public class ParquetReader<T> implements Closeable {
reader.close();
reader = null;
}
- if (footersIterator.hasNext()) {
- Footer footer = footersIterator.next();
- ParquetFileReader fileReader = ParquetFileReader.open(
- conf, footer.getFile(), footer.getParquetMetadata());
+ if (filesIterator.hasNext()) {
+ InputFile file = filesIterator.next();
- // apply data filters
- fileReader.filterRowGroups(filter);
+ ParquetFileReader fileReader = ParquetFileReader.open(file, options);
- reader = new InternalParquetRecordReader<T>(readSupport, filter);
+ reader = new InternalParquetRecordReader<>(readSupport, options.getRecordFilter());
- reader.initialize(fileReader, conf);
+ reader.initialize(fileReader, options);
}
}
@@ -157,37 +164,114 @@ public class ParquetReader<T> implements Closeable {
}
}
+ public static <T> Builder<T> read(InputFile file) throws IOException {
+ return new Builder<>(file);
+ }
+
public static <T> Builder<T> builder(ReadSupport<T> readSupport, Path path) {
- return new Builder<T>(readSupport, path);
+ return new Builder<>(readSupport, path);
}
public static class Builder<T> {
private final ReadSupport<T> readSupport;
- private final Path file;
- private Filter filter;
- protected Configuration conf;
+ private final InputFile file;
+ private final Path path;
+ private Filter filter = null;
+ protected Configuration conf = new Configuration();
+ private ParquetReadOptions.Builder optionsBuilder = HadoopReadOptions.builder(conf);
+ @Deprecated
private Builder(ReadSupport<T> readSupport, Path path) {
this.readSupport = checkNotNull(readSupport, "readSupport");
- this.file = checkNotNull(path, "path");
- this.conf = new Configuration();
- this.filter = FilterCompat.NOOP;
+ this.file = null;
+ this.path = checkNotNull(path, "path");
}
+ @Deprecated
protected Builder(Path path) {
this.readSupport = null;
- this.file = checkNotNull(path, "path");
- this.conf = new Configuration();
- this.filter = FilterCompat.NOOP;
+ this.file = null;
+ this.path = checkNotNull(path, "path");
}
+ protected Builder(InputFile file) {
+ this.readSupport = null;
+ this.file = checkNotNull(file, "file");
+ this.path = null;
+ }
+
+ // when called, resets options to the defaults from conf
public Builder<T> withConf(Configuration conf) {
this.conf = checkNotNull(conf, "conf");
+
+ // previous versions didn't use the builder, so may set filter before conf. this maintains
+ // compatibility for filter. other options are reset by a new conf.
+ this.optionsBuilder = HadoopReadOptions.builder(conf);
+ if (filter != null) {
+ optionsBuilder.withRecordFilter(filter);
+ }
+
return this;
}
public Builder<T> withFilter(Filter filter) {
- this.filter = checkNotNull(filter, "filter");
+ this.filter = filter;
+ optionsBuilder.withRecordFilter(filter);
+ return this;
+ }
+
+ public Builder<T> useSignedStringMinMax(boolean useSignedStringMinMax) {
+ optionsBuilder.useSignedStringMinMax(useSignedStringMinMax);
+ return this;
+ }
+
+ public Builder<T> useSignedStringMinMax() {
+ optionsBuilder.useSignedStringMinMax();
+ return this;
+ }
+
+ public Builder<T> useStatsFilter(boolean useStatsFilter) {
+ optionsBuilder.useStatsFilter(useStatsFilter);
+ return this;
+ }
+
+ public Builder<T> useStatsFilter() {
+ optionsBuilder.useStatsFilter();
+ return this;
+ }
+
+ public Builder<T> useDictionaryFilter(boolean useDictionaryFilter) {
+ optionsBuilder.useDictionaryFilter(useDictionaryFilter);
+ return this;
+ }
+
+ public Builder<T> useDictionaryFilter() {
+ optionsBuilder.useDictionaryFilter();
+ return this;
+ }
+
+ public Builder<T> useRecordFilter(boolean useRecordFilter) {
+ optionsBuilder.useRecordFilter(useRecordFilter);
+ return this;
+ }
+
+ public Builder<T> useRecordFilter() {
+ optionsBuilder.useRecordFilter();
+ return this;
+ }
+
+ public Builder<T> withFileRange(long start, long end) {
+ optionsBuilder.withRange(start, end);
+ return this;
+ }
+
+ public Builder<T> withCodecFactory(CompressionCodecFactory codecFactory) {
+ optionsBuilder.withCodecFactory(codecFactory);
+ return this;
+ }
+
+ public Builder<T> set(String key, String value) {
+ optionsBuilder.set(key, value);
return this;
}
@@ -199,7 +283,29 @@ public class ParquetReader<T> implements Closeable {
}
public ParquetReader<T> build() throws IOException {
- return new ParquetReader<T>(conf, file, getReadSupport(), filter);
+ ParquetReadOptions options = optionsBuilder.build();
+
+ if (path != null) {
+ FileSystem fs = path.getFileSystem(conf);
+ FileStatus stat = fs.getFileStatus(path);
+
+ if (stat.isFile()) {
+ return new ParquetReader<>(
+ Collections.singletonList((InputFile) HadoopInputFile.fromStatus(stat, conf)),
+ options,
+ getReadSupport());
+
+ } else {
+ List<InputFile> files = new ArrayList<>();
+ for (FileStatus fileStatus : fs.listStatus(path, HiddenFileFilter.INSTANCE)) {
+ files.add(HadoopInputFile.fromStatus(fileStatus, conf));
+ }
+ return new ParquetReader<T>(files, options, getReadSupport());
+ }
+
+ } else {
+ return new ParquetReader<>(Collections.singletonList(file), options, getReadSupport());
+ }
}
}
}
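ParquetReader's builder gains pass-through setters for the same options. A sketch of the record-level read path using the example Group support classes follows; the file name and class name are illustrative, and read() is assumed to return null at end of input as in prior releases. Note that withConf resets the other options, so it should come before the use* calls.

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.example.data.Group;
    import org.apache.parquet.hadoop.ParquetReader;
    import org.apache.parquet.hadoop.example.GroupReadSupport;

    public class ReadSketch {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        try (ParquetReader<Group> reader =
                 ParquetReader.builder(new GroupReadSupport(), new Path("data.parquet"))
                     .withConf(conf)          // resets options to the defaults from conf
                     .useStatsFilter()        // then opt in to row group filtering
                     .useDictionaryFilter()
                     .build()) {
          Group record;
          while ((record = reader.read()) != null) {
            System.out.println(record);
          }
        }
      }
    }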
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java
index ebdc686..9ca8be9 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java
@@ -18,14 +18,9 @@
*/
package org.apache.parquet.hadoop;
-import static org.apache.parquet.filter2.compat.RowGroupFilter.FilterLevel.*;
-import static org.apache.parquet.format.converter.ParquetMetadataConverter.offsets;
-import static org.apache.parquet.format.converter.ParquetMetadataConverter.range;
import static org.apache.parquet.hadoop.ParquetInputFormat.SPLIT_FILES;
-import static org.apache.parquet.hadoop.ParquetInputFormat.getFilter;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
@@ -37,21 +32,21 @@ import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskInputOutputContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.parquet.CorruptDeltaByteArrays;
+import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.column.Encoding;
import org.apache.parquet.filter.UnboundRecordFilter;
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.filter2.compat.FilterCompat.Filter;
-import org.apache.parquet.filter2.compat.RowGroupFilter.FilterLevel;
-import org.apache.parquet.format.converter.ParquetMetadataConverter.MetadataFilter;
import org.apache.parquet.hadoop.api.ReadSupport;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.metadata.FileMetaData;
import org.apache.parquet.hadoop.util.ContextUtil;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.HadoopReadOptions;
import org.apache.parquet.hadoop.util.counters.BenchmarkCounter;
import org.apache.parquet.io.ParquetDecodingException;
import org.slf4j.Logger;
@@ -158,13 +153,16 @@ public class ParquetRecordReader<T> extends RecordReader<Void, T> {
long[] rowGroupOffsets = split.getRowGroupOffsets();
// if task.side.metadata is set, rowGroupOffsets is null
- MetadataFilter metadataFilter = (rowGroupOffsets != null ?
- offsets(rowGroupOffsets) :
- range(split.getStart(), split.getEnd()));
+ ParquetReadOptions.Builder optionsBuilder = HadoopReadOptions.builder(configuration);
+ if (rowGroupOffsets != null) {
+ optionsBuilder.withOffsets(rowGroupOffsets);
+ } else {
+ optionsBuilder.withRange(split.getStart(), split.getEnd());
+ }
// open a reader with the metadata filter
ParquetFileReader reader = ParquetFileReader.open(
- configuration, path, metadataFilter);
+ HadoopInputFile.fromPath(path, configuration), optionsBuilder.build());
if (rowGroupOffsets != null) {
// verify a row group was found for each offset
@@ -175,10 +173,6 @@ public class ParquetRecordReader<T> extends RecordReader<Void, T> {
+ " expected: " + Arrays.toString(rowGroupOffsets)
+ " found: " + blocks);
}
-
- } else {
- // apply data filters
- reader.filterRowGroups(getFilter(configuration));
}
if (!reader.getRowGroups().isEmpty()) {
[4/4] parquet-mr git commit: PARQUET-1142: Add alternatives to Hadoop classes in the API
Posted by bl...@apache.org.
PARQUET-1142: Add alternatives to Hadoop classes in the API
This updates the read and write paths to avoid using Hadoop classes where possible.
* Adds a generic compression interface, `CompressionCodecFactory`
* Adds `OutputFile` and `PositionOutputStream`
* Adds classes to help implementations wrap input and output streams: `DelegatingSeekableInputStream` and `DelegatingPositionOutputStream`
* Adds `ParquetReadOptions` to avoid passing options with `Configuration`
* Updates the read and write APIs to use new abstractions instead of Hadoop
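For callers outside Hadoop, the new abstractions can be implemented directly. Below is a hypothetical local-file InputFile built on java.nio, assuming DelegatingSeekableInputStream leaves only getPos() and seek() abstract and that InputFile exposes the getLength() and newStream() methods used by ParquetFileReader in this patch; LocalInputFile and LocalReadSketch are names invented for the sketch.

    import java.io.IOException;
    import java.nio.channels.Channels;
    import java.nio.channels.FileChannel;
    import java.nio.file.Files;
    import java.nio.file.Paths;
    import java.nio.file.StandardOpenOption;

    import org.apache.parquet.ParquetReadOptions;
    import org.apache.parquet.hadoop.ParquetFileReader;
    import org.apache.parquet.io.DelegatingSeekableInputStream;
    import org.apache.parquet.io.InputFile;
    import org.apache.parquet.io.SeekableInputStream;

    // Hypothetical helper: an InputFile over java.nio, with no Hadoop classes involved.
    class LocalInputFile implements InputFile {
      private final java.nio.file.Path path;

      LocalInputFile(java.nio.file.Path path) {
        this.path = path;
      }

      @Override
      public long getLength() throws IOException {
        return Files.size(path);
      }

      @Override
      public SeekableInputStream newStream() throws IOException {
        final FileChannel channel = FileChannel.open(path, StandardOpenOption.READ);
        return new DelegatingSeekableInputStream(Channels.newInputStream(channel)) {
          @Override
          public long getPos() throws IOException {
            return channel.position();
          }

          @Override
          public void seek(long newPos) throws IOException {
            channel.position(newPos);
          }
        };
      }
    }

    class LocalReadSketch {
      public static void main(String[] args) throws IOException {
        // Options built without a Configuration, using the new ParquetReadOptions builder.
        ParquetReadOptions options = ParquetReadOptions.builder().build();
        try (ParquetFileReader reader = ParquetFileReader.open(
            new LocalInputFile(Paths.get("data.parquet")), options)) {
          System.out.println("row groups: " + reader.getRowGroups().size());
        }
      }
    }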
Author: Ryan Blue <bl...@apache.org>
Closes #429 from rdblue/PARQUET-1142-add-hadoop-alternatives and squashes the following commits:
21500337b [Ryan Blue] PARQUET-1142: Fix NPE when not filtering with new read API.
35eddd735 [Ryan Blue] PARQUET-1142: Fix problems from Gabor's review.
da391b0d4 [Ryan Blue] PARQUET-1142: Fix binary incompatibilities.
2e3d693ab [Ryan Blue] PARQUET-1142: Update the read and write paths to use new files and streams.
8d57e089f [Ryan Blue] PARQUET-1142: Add OutputFile and PositionOutputStream.
42908a95e [Ryan Blue] PARQUET-1142: Extract non-Hadoop API from CodecFactory.
Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/8bfd9b4d
Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/8bfd9b4d
Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/8bfd9b4d
Branch: refs/heads/master
Commit: 8bfd9b4d8f4fb0a2b522c9328f67eb642066306b
Parents: 81f4801
Author: Ryan Blue <bl...@apache.org>
Authored: Wed Dec 13 11:27:54 2017 -0800
Committer: Ryan Blue <bl...@apache.org>
Committed: Wed Dec 13 11:27:54 2017 -0800
----------------------------------------------------------------------
parquet-common/pom.xml | 6 +
.../org/apache/parquet/bytes/BytesInput.java | 486 +++++++++++
.../bytes/CapacityByteArrayOutputStream.java | 337 ++++++++
.../bytes/ConcatenatingByteArrayCollector.java | 63 ++
.../bytes/LittleEndianDataInputStream.java | 424 +++++++++
.../bytes/LittleEndianDataOutputStream.java | 220 +++++
.../compression/CompressionCodecFactory.java | 47 +
.../CompressionCodecNotSupportedException.java | 38 +
.../hadoop/metadata/CompressionCodecName.java | 98 +++
.../io/DelegatingPositionOutputStream.java | 63 ++
.../io/DelegatingSeekableInputStream.java | 171 ++++
.../java/org/apache/parquet/io/InputFile.java | 9 +-
.../java/org/apache/parquet/io/OutputFile.java | 34 +
.../apache/parquet/io/PositionOutputStream.java | 39 +
.../org/apache/parquet/io/MockInputStream.java | 56 ++
.../io/TestDelegatingSeekableInputStream.java | 861 +++++++++++++++++++
.../org/apache/parquet/bytes/BytesInput.java | 486 -----------
.../bytes/CapacityByteArrayOutputStream.java | 337 --------
.../bytes/ConcatenatingByteArrayCollector.java | 63 --
.../bytes/LittleEndianDataInputStream.java | 424 ---------
.../bytes/LittleEndianDataOutputStream.java | 220 -----
.../org/apache/parquet/HadoopReadOptions.java | 98 +++
.../org/apache/parquet/ParquetReadOptions.java | 232 +++++
.../parquet/filter2/compat/RowGroupFilter.java | 4 +
.../converter/ParquetMetadataConverter.java | 22 +-
.../org/apache/parquet/hadoop/CodecFactory.java | 26 +-
.../hadoop/ColumnChunkPageReadStore.java | 6 +-
.../parquet/hadoop/DirectCodecFactory.java | 12 +-
.../hadoop/InternalParquetRecordReader.java | 34 +-
.../parquet/hadoop/ParquetFileReader.java | 254 +++---
.../parquet/hadoop/ParquetFileWriter.java | 147 ++--
.../parquet/hadoop/ParquetInputFormat.java | 3 -
.../parquet/hadoop/ParquetOutputFormat.java | 5 +-
.../apache/parquet/hadoop/ParquetReader.java | 174 +++-
.../parquet/hadoop/ParquetRecordReader.java | 26 +-
.../apache/parquet/hadoop/ParquetWriter.java | 50 +-
.../hadoop/UnmaterializableRecordCounter.java | 15 +
.../CompressionCodecNotSupportedException.java | 36 -
.../hadoop/metadata/CompressionCodecName.java | 98 ---
.../hadoop/util/H1SeekableInputStream.java | 101 +--
.../hadoop/util/H2SeekableInputStream.java | 20 +-
.../parquet/hadoop/util/HadoopCodecs.java | 39 +
.../parquet/hadoop/util/HadoopOutputFile.java | 100 +++
.../hadoop/util/HadoopPositionOutputStream.java | 66 ++
.../parquet/hadoop/util/HadoopStreams.java | 15 +
.../TestInputOutputFormatWithPadding.java | 6 +-
.../parquet/hadoop/TestParquetFileWriter.java | 1 +
.../hadoop/util/MockHadoopInputStream.java | 87 ++
.../parquet/hadoop/util/MockInputStream.java | 87 --
.../hadoop/util/TestHadoop1ByteBufferReads.java | 761 ----------------
.../hadoop/util/TestHadoop2ByteBufferReads.java | 30 +-
.../parquet/tools/command/MergeCommand.java | 3 +-
pom.xml | 9 +-
53 files changed, 4158 insertions(+), 2891 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-common/pom.xml
----------------------------------------------------------------------
diff --git a/parquet-common/pom.xml b/parquet-common/pom.xml
index b0357ba..7ae6068 100644
--- a/parquet-common/pom.xml
+++ b/parquet-common/pom.xml
@@ -37,6 +37,12 @@
<dependencies>
<dependency>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-format</artifactId>
+ <version>${parquet.format.version}</version>
+ </dependency>
+
+ <dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-common/src/main/java/org/apache/parquet/bytes/BytesInput.java
----------------------------------------------------------------------
diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/BytesInput.java b/parquet-common/src/main/java/org/apache/parquet/bytes/BytesInput.java
new file mode 100644
index 0000000..6e593c2
--- /dev/null
+++ b/parquet-common/src/main/java/org/apache/parquet/bytes/BytesInput.java
@@ -0,0 +1,486 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.bytes;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.List;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A source of bytes capable of writing itself to an output.
+ * A BytesInput should be consumed right away.
+ * It is not a container.
+ * For example if it is referring to a stream,
+ * subsequent BytesInput reads from the stream will be incorrect
+ * if the previous has not been consumed.
+ *
+ * @author Julien Le Dem
+ *
+ */
+abstract public class BytesInput {
+ private static final Logger LOG = LoggerFactory.getLogger(BytesInput.class);
+ private static final boolean DEBUG = false;//Log.DEBUG;
+ private static final EmptyBytesInput EMPTY_BYTES_INPUT = new EmptyBytesInput();
+
+ /**
+ * logically concatenate the provided inputs
+ * @param inputs the inputs to concatenate
+ * @return a concatenated input
+ */
+ public static BytesInput concat(BytesInput... inputs) {
+ return new SequenceBytesIn(Arrays.asList(inputs));
+ }
+
+ /**
+ * logically concatenate the provided inputs
+ * @param inputs the inputs to concatenate
+ * @return a concatenated input
+ */
+ public static BytesInput concat(List<BytesInput> inputs) {
+ return new SequenceBytesIn(inputs);
+ }
+
+ /**
+ * @param in
+ * @param bytes number of bytes to read
+ * @return a BytesInput that will read that number of bytes from the stream
+ */
+ public static BytesInput from(InputStream in, int bytes) {
+ return new StreamBytesInput(in, bytes);
+ }
+
+ /**
+ * @param buffer
+ * @param length number of bytes to read
+ * @return a BytesInput that will read the given bytes from the ByteBuffer
+ */
+ public static BytesInput from(ByteBuffer buffer, int offset, int length) {
+ return new ByteBufferBytesInput(buffer, offset, length);
+ }
+
+ /**
+ *
+ * @param in
+ * @return a Bytes input that will write the given bytes
+ */
+ public static BytesInput from(byte[] in) {
+ LOG.debug("BytesInput from array of {} bytes", in.length);
+ return new ByteArrayBytesInput(in, 0 , in.length);
+ }
+
+ public static BytesInput from(byte[] in, int offset, int length) {
+ LOG.debug("BytesInput from array of {} bytes", length);
+ return new ByteArrayBytesInput(in, offset, length);
+ }
+
+ /**
+ * @param intValue the int to write
+ * @return a BytesInput that will write 4 bytes in little endian
+ */
+ public static BytesInput fromInt(int intValue) {
+ return new IntBytesInput(intValue);
+ }
+
+ /**
+ * @param intValue the int to write
+ * @return a BytesInput that will write var int
+ */
+ public static BytesInput fromUnsignedVarInt(int intValue) {
+ return new UnsignedVarIntBytesInput(intValue);
+ }
+
+ /**
+ *
+ * @param intValue the int to write
+ */
+ public static BytesInput fromZigZagVarInt(int intValue) {
+ int zigZag = (intValue << 1) ^ (intValue >> 31);
+ return new UnsignedVarIntBytesInput(zigZag);
+ }
+
+ /**
+ * @param longValue the long to write
+ * @return a BytesInput that will write var long
+ */
+ public static BytesInput fromUnsignedVarLong(long longValue) {
+ return new UnsignedVarLongBytesInput(longValue);
+ }
+
+ /**
+ *
+ * @param longValue the long to write
+ */
+ public static BytesInput fromZigZagVarLong(long longValue) {
+ long zigZag = (longValue << 1) ^ (longValue >> 63);
+ return new UnsignedVarLongBytesInput(zigZag);
+ }
+
+ /**
+ * @param arrayOut
+ * @return a BytesInput that will write the content of the buffer
+ */
+ public static BytesInput from(CapacityByteArrayOutputStream arrayOut) {
+ return new CapacityBAOSBytesInput(arrayOut);
+ }
+
+ /**
+ * @param baos - stream to wrap into a BytesInput
+ * @return a BytesInput that will write the content of the buffer
+ */
+ public static BytesInput from(ByteArrayOutputStream baos) {
+ return new BAOSBytesInput(baos);
+ }
+
+ /**
+ * @return an empty bytes input
+ */
+ public static BytesInput empty() {
+ return EMPTY_BYTES_INPUT;
+ }
+
+ /**
+ * copies the input into a new byte array
+ * @param bytesInput
+ * @return
+ * @throws IOException
+ */
+ public static BytesInput copy(BytesInput bytesInput) throws IOException {
+ return from(bytesInput.toByteArray());
+ }
+
+ /**
+ * writes the bytes into a stream
+ * @param out
+ * @throws IOException
+ */
+ abstract public void writeAllTo(OutputStream out) throws IOException;
+
+ /**
+ *
+ * @return a new byte array materializing the contents of this input
+ * @throws IOException
+ */
+ public byte[] toByteArray() throws IOException {
+ BAOS baos = new BAOS((int)size());
+ this.writeAllTo(baos);
+ LOG.debug("converted {} to byteArray of {} bytes", size() , baos.size());
+ return baos.getBuf();
+ }
+
+ /**
+ *
+ * @return a new ByteBuffer materializing the contents of this input
+ * @throws IOException
+ */
+ public ByteBuffer toByteBuffer() throws IOException {
+ return ByteBuffer.wrap(toByteArray());
+ }
+
+ /**
+ *
+ * @return a new InputStream materializing the contents of this input
+ * @throws IOException
+ */
+ public InputStream toInputStream() throws IOException {
+ return new ByteBufferInputStream(toByteBuffer());
+ }
+
+ /**
+ *
+ * @return the size in bytes that would be written
+ */
+ abstract public long size();
+
+ private static final class BAOS extends ByteArrayOutputStream {
+ private BAOS(int size) {
+ super(size);
+ }
+
+ public byte[] getBuf() {
+ return this.buf;
+ }
+ }
+
+ private static class StreamBytesInput extends BytesInput {
+ private static final Logger LOG = LoggerFactory.getLogger(BytesInput.StreamBytesInput.class);
+ private final InputStream in;
+ private final int byteCount;
+
+ private StreamBytesInput(InputStream in, int byteCount) {
+ super();
+ this.in = in;
+ this.byteCount = byteCount;
+ }
+
+ @Override
+ public void writeAllTo(OutputStream out) throws IOException {
+ LOG.debug("write All {} bytes", byteCount);
+ // TODO: more efficient
+ out.write(this.toByteArray());
+ }
+
+ public byte[] toByteArray() throws IOException {
+ LOG.debug("read all {} bytes", byteCount);
+ byte[] buf = new byte[byteCount];
+ new DataInputStream(in).readFully(buf);
+ return buf;
+ }
+
+ @Override
+ public long size() {
+ return byteCount;
+ }
+
+ }
+
+ private static class SequenceBytesIn extends BytesInput {
+ private static final Logger LOG = LoggerFactory.getLogger(BytesInput.SequenceBytesIn.class);
+
+ private final List<BytesInput> inputs;
+ private final long size;
+
+ private SequenceBytesIn(List<BytesInput> inputs) {
+ this.inputs = inputs;
+ long total = 0;
+ for (BytesInput input : inputs) {
+ total += input.size();
+ }
+ this.size = total;
+ }
+
+ @SuppressWarnings("unused")
+ @Override
+ public void writeAllTo(OutputStream out) throws IOException {
+ for (BytesInput input : inputs) {
+
+ LOG.debug("write {} bytes to out", input.size());
+ if (input instanceof SequenceBytesIn) LOG.debug("{");
+ input.writeAllTo(out);
+ if (input instanceof SequenceBytesIn) LOG.debug("}");
+ }
+ }
+
+ @Override
+ public long size() {
+ return size;
+ }
+
+ }
+
+ private static class IntBytesInput extends BytesInput {
+
+ private final int intValue;
+
+ public IntBytesInput(int intValue) {
+ this.intValue = intValue;
+ }
+
+ @Override
+ public void writeAllTo(OutputStream out) throws IOException {
+ BytesUtils.writeIntLittleEndian(out, intValue);
+ }
+
+ public ByteBuffer toByteBuffer() throws IOException {
+ return ByteBuffer.allocate(4).putInt(0, intValue);
+ }
+
+ @Override
+ public long size() {
+ return 4;
+ }
+
+ }
+
+ private static class UnsignedVarIntBytesInput extends BytesInput {
+
+ private final int intValue;
+
+ public UnsignedVarIntBytesInput(int intValue) {
+ this.intValue = intValue;
+ }
+
+ @Override
+ public void writeAllTo(OutputStream out) throws IOException {
+ BytesUtils.writeUnsignedVarInt(intValue, out);
+ }
+
+ public ByteBuffer toByteBuffer() throws IOException {
+ ByteBuffer ret = ByteBuffer.allocate((int) size());
+ BytesUtils.writeUnsignedVarInt(intValue, ret);
+ return ret;
+ }
+
+ @Override
+ public long size() {
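+ // one byte per 7 value bits: ceil((32 - numberOfLeadingZeros) / 7); a value of 0 still takes one byte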
+ int s = (38 - Integer.numberOfLeadingZeros(intValue)) / 7;
+ return s == 0 ? 1 : s;
+ }
+ }
+
+ private static class UnsignedVarLongBytesInput extends BytesInput {
+
+ private final long longValue;
+
+ public UnsignedVarLongBytesInput(long longValue) {
+ this.longValue = longValue;
+ }
+
+ @Override
+ public void writeAllTo(OutputStream out) throws IOException {
+ BytesUtils.writeUnsignedVarLong(longValue, out);
+ }
+
+ @Override
+ public long size() {
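+ // same 7-bits-per-byte computation as the int variant, over up to 64 value bits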
+ int s = (70 - Long.numberOfLeadingZeros(longValue)) / 7;
+ return s == 0 ? 1 : s;
+ }
+ }
+
+ private static class EmptyBytesInput extends BytesInput {
+
+ @Override
+ public void writeAllTo(OutputStream out) throws IOException {
+ }
+
+ @Override
+ public long size() {
+ return 0;
+ }
+
+ public ByteBuffer toByteBuffer() throws IOException {
+ return ByteBuffer.allocate(0);
+ }
+
+ }
+
+ private static class CapacityBAOSBytesInput extends BytesInput {
+
+ private final CapacityByteArrayOutputStream arrayOut;
+
+ private CapacityBAOSBytesInput(CapacityByteArrayOutputStream arrayOut) {
+ this.arrayOut = arrayOut;
+ }
+
+ @Override
+ public void writeAllTo(OutputStream out) throws IOException {
+ arrayOut.writeTo(out);
+ }
+
+ @Override
+ public long size() {
+ return arrayOut.size();
+ }
+
+ }
+
+ private static class BAOSBytesInput extends BytesInput {
+
+ private final ByteArrayOutputStream arrayOut;
+
+ private BAOSBytesInput(ByteArrayOutputStream arrayOut) {
+ this.arrayOut = arrayOut;
+ }
+
+ @Override
+ public void writeAllTo(OutputStream out) throws IOException {
+ arrayOut.writeTo(out);
+ }
+
+ @Override
+ public long size() {
+ return arrayOut.size();
+ }
+
+ }
+
+ private static class ByteArrayBytesInput extends BytesInput {
+
+ private final byte[] in;
+ private final int offset;
+ private final int length;
+
+ private ByteArrayBytesInput(byte[] in, int offset, int length) {
+ this.in = in;
+ this.offset = offset;
+ this.length = length;
+ }
+
+ @Override
+ public void writeAllTo(OutputStream out) throws IOException {
+ out.write(in, offset, length);
+ }
+
+ public ByteBuffer toByteBuffer() throws IOException {
+ return ByteBuffer.wrap(in, offset, length);
+ }
+
+ @Override
+ public long size() {
+ return length;
+ }
+
+ }
+
+ private static class ByteBufferBytesInput extends BytesInput {
+
+ private final ByteBuffer byteBuf;
+ private final int length;
+ private final int offset;
+
+ private ByteBufferBytesInput(ByteBuffer byteBuf, int offset, int length) {
+ this.byteBuf = byteBuf;
+ this.offset = offset;
+ this.length = length;
+ }
+
+ @Override
+ public void writeAllTo(OutputStream out) throws IOException {
+ final WritableByteChannel outputChannel = Channels.newChannel(out);
+ byteBuf.position(offset);
+ ByteBuffer tempBuf = byteBuf.slice();
+ tempBuf.limit(length);
+ outputChannel.write(tempBuf);
+ }
+
+ @Override
+ public ByteBuffer toByteBuffer() throws IOException {
+ byteBuf.position(offset);
+ ByteBuffer buf = byteBuf.slice();
+ buf.limit(length);
+ return buf;
+ }
+
+ @Override
+ public long size() {
+ return length;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-common/src/main/java/org/apache/parquet/bytes/CapacityByteArrayOutputStream.java
----------------------------------------------------------------------
diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/CapacityByteArrayOutputStream.java b/parquet-common/src/main/java/org/apache/parquet/bytes/CapacityByteArrayOutputStream.java
new file mode 100644
index 0000000..92674d4
--- /dev/null
+++ b/parquet-common/src/main/java/org/apache/parquet/bytes/CapacityByteArrayOutputStream.java
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.bytes;
+
+import static java.lang.Math.max;
+import static java.lang.Math.pow;
+import static java.lang.String.format;
+import static org.apache.parquet.Preconditions.checkArgument;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.parquet.OutputStreamCloseException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Similar to a {@link ByteArrayOutputStream}, but uses a different strategy for growing that does not involve copying.
+ * Where ByteArrayOutputStream is backed by a single array that "grows" by copying into a new larger array, this output
+ * stream grows by allocating a new array (slab) and adding it to a list of previous arrays.
+ *
+ * Each new slab is allocated to be the same size as all the previous slabs combined, so these allocations become
+ * exponentially less frequent, just like ByteArrayOutputStream, with one difference. This output stream accepts a
+ * max capacity hint, which is a hint describing the max amount of data that will be written to this stream. As the
+ * total size of this stream nears this max, this stream starts to grow linearly instead of exponentially.
+ * So new slabs are allocated to be 1/5th of the max capacity hint,
+ * instead of equal to the total previous size of all slabs. This is useful because it prevents allocating roughly
+ * twice the needed space when a new slab is added just before the stream is done being used.
+ *
+ * When reusing this stream it will adjust the initial slab size based on the previous data size, aiming for fewer
+ * allocations, with the assumption that a similar amount of data will be written to this stream on re-use.
+ * See ({@link CapacityByteArrayOutputStream#reset()}).
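+ *
+ * <p>For illustration, a typical write-then-flush cycle looks like this (a sketch only; the
+ * byte array and target stream below are placeholders, and the sizes are arbitrary):
+ * <pre>
+ *   CapacityByteArrayOutputStream cbaos =
+ *       new CapacityByteArrayOutputStream(1024, 1024 * 1024, new HeapByteBufferAllocator());
+ *   cbaos.write(someBytes, 0, someBytes.length); // may add slabs as the data grows
+ *   cbaos.writeTo(targetStream);                 // replays every slab, in order
+ *   cbaos.reset();                               // releases slabs and re-tunes the initial slab size
+ * </pre>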
+ *
+ * @author Julien Le Dem
+ *
+ */
+public class CapacityByteArrayOutputStream extends OutputStream {
+ private static final Logger LOG = LoggerFactory.getLogger(CapacityByteArrayOutputStream.class);
+ private static final ByteBuffer EMPTY_SLAB = ByteBuffer.wrap(new byte[0]);
+
+ private int initialSlabSize;
+ private final int maxCapacityHint;
+ private final List<ByteBuffer> slabs = new ArrayList<ByteBuffer>();
+
+ private ByteBuffer currentSlab;
+ private int currentSlabIndex;
+ private int bytesAllocated = 0;
+ private int bytesUsed = 0;
+ private ByteBufferAllocator allocator;
+
+ /**
+ * Return an initial slab size such that a CapacityByteArrayOutputStream constructed with it
+ * will end up allocating targetNumSlabs in order to reach targetCapacity. This aims to be
+ * a balance between the overhead of creating new slabs and wasting memory by eagerly making
+ * initial slabs too big.
+ *
+ * Note that targetCapacity here need not match maxCapacityHint in the constructor of
+ * CapacityByteArrayOutputStream, though often that would make sense.
+ *
+ * @param minSlabSize no matter what we shouldn't make slabs any smaller than this
+ * @param targetCapacity after we've allocated targetNumSlabs how much capacity should we have?
+ * @param targetNumSlabs how many slabs should it take to reach targetCapacity?
+ */
+ public static int initialSlabSizeHeuristic(int minSlabSize, int targetCapacity, int targetNumSlabs) {
+ // initialSlabSize = (targetCapacity / (2^targetNumSlabs)) means we double targetNumSlabs times
+ // before reaching the targetCapacity
+ // eg for page size of 1MB we start at 1024 bytes.
+ // we also don't want to start too small, so we also apply a minimum.
+ return max(minSlabSize, ((int) (targetCapacity / pow(2, targetNumSlabs))));
+ }
+
+ public static CapacityByteArrayOutputStream withTargetNumSlabs(
+ int minSlabSize, int maxCapacityHint, int targetNumSlabs) {
+ return withTargetNumSlabs(minSlabSize, maxCapacityHint, targetNumSlabs, new HeapByteBufferAllocator());
+ }
+
+ /**
+ * Construct a CapacityByteArrayOutputStream configured such that its initial slab size is
+ * determined by {@link #initialSlabSizeHeuristic}, with targetCapacity == maxCapacityHint
+ */
+ public static CapacityByteArrayOutputStream withTargetNumSlabs(
+ int minSlabSize, int maxCapacityHint, int targetNumSlabs, ByteBufferAllocator allocator) {
+
+ return new CapacityByteArrayOutputStream(
+ initialSlabSizeHeuristic(minSlabSize, maxCapacityHint, targetNumSlabs),
+ maxCapacityHint, allocator);
+ }
+
+ /**
+ * Defaults maxCapacityHint to 1MB
+ * @param initialSlabSize the size to make the first slab
+ * @deprecated use {@link CapacityByteArrayOutputStream#CapacityByteArrayOutputStream(int, int, ByteBufferAllocator)}
+ */
+ @Deprecated
+ public CapacityByteArrayOutputStream(int initialSlabSize) {
+ this(initialSlabSize, 1024 * 1024, new HeapByteBufferAllocator());
+ }
+
+ /**
+ * Defaults maxCapacityHint to 1MB
+ * @param initialSlabSize the size to make the first slab
+ * @deprecated use {@link CapacityByteArrayOutputStream#CapacityByteArrayOutputStream(int, int, ByteBufferAllocator)}
+ */
+ @Deprecated
+ public CapacityByteArrayOutputStream(int initialSlabSize, ByteBufferAllocator allocator) {
+ this(initialSlabSize, 1024 * 1024, allocator);
+ }
+
+ /**
+ * @param initialSlabSize the size to make the first slab
+ * @param maxCapacityHint a hint (not guarantee) of the max amount of data written to this stream
+ * @deprecated use {@link CapacityByteArrayOutputStream#CapacityByteArrayOutputStream(int, int, ByteBufferAllocator)}
+ */
+ @Deprecated
+ public CapacityByteArrayOutputStream(int initialSlabSize, int maxCapacityHint) {
+ this(initialSlabSize, maxCapacityHint, new HeapByteBufferAllocator());
+ }
+
+ /**
+ * @param initialSlabSize the size to make the first slab
+ * @param maxCapacityHint a hint (not guarantee) of the max amount of data written to this stream
+ */
+ public CapacityByteArrayOutputStream(int initialSlabSize, int maxCapacityHint, ByteBufferAllocator allocator) {
+ checkArgument(initialSlabSize > 0, "initialSlabSize must be > 0");
+ checkArgument(maxCapacityHint > 0, "maxCapacityHint must be > 0");
+ checkArgument(maxCapacityHint >= initialSlabSize, String.format("maxCapacityHint can't be less than initialSlabSize (initialSlabSize=%d, maxCapacityHint=%d)", initialSlabSize, maxCapacityHint));
+ this.initialSlabSize = initialSlabSize;
+ this.maxCapacityHint = maxCapacityHint;
+ this.allocator = allocator;
+ reset();
+ }
+
+ /**
+ * the new slab is guaranteed to be at least minimumSize
+ * @param minimumSize the size of the data we want to copy in the new slab
+ */
+ private void addSlab(int minimumSize) {
+ int nextSlabSize;
+
+ if (bytesUsed == 0) {
+ nextSlabSize = initialSlabSize;
+ } else if (bytesUsed > maxCapacityHint / 5) {
+ // to avoid an overhead of up to twice the needed size, we get linear when approaching target page size
+ nextSlabSize = maxCapacityHint / 5;
+ } else {
+ // double the size every time
+ nextSlabSize = bytesUsed;
+ }
+
+ if (nextSlabSize < minimumSize) {
+ LOG.debug("slab size {} too small for value of size {}. Bumping up slab size", nextSlabSize, minimumSize);
+ nextSlabSize = minimumSize;
+ }
+
+ LOG.debug("used {} slabs, adding new slab of size {}", slabs.size(), nextSlabSize);
+
+ this.currentSlab = allocator.allocate(nextSlabSize);
+ this.slabs.add(currentSlab);
+ this.bytesAllocated += nextSlabSize;
+ this.currentSlabIndex = 0;
+ }
+
+ @Override
+ public void write(int b) {
+ if (!currentSlab.hasRemaining()) {
+ addSlab(1);
+ }
+ currentSlab.put(currentSlabIndex, (byte) b);
+ currentSlabIndex += 1;
+ currentSlab.position(currentSlabIndex);
+ bytesUsed += 1;
+ }
+
+ @Override
+ public void write(byte b[], int off, int len) {
+ if ((off < 0) || (off > b.length) || (len < 0) ||
+ ((off + len) - b.length > 0)) {
+ throw new IndexOutOfBoundsException(
+ String.format("Given byte array of size %d, with requested length(%d) and offset(%d)", b.length, len, off));
+ }
+ if (len >= currentSlab.remaining()) {
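+ // fill what remains of the current slab, then spill the rest into a newly added slab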
+ final int length1 = currentSlab.remaining();
+ currentSlab.put(b, off, length1);
+ bytesUsed += length1;
+ currentSlabIndex += length1;
+ final int length2 = len - length1;
+ addSlab(length2);
+ currentSlab.put(b, off + length1, length2);
+ currentSlabIndex = length2;
+ bytesUsed += length2;
+ } else {
+ currentSlab.put(b, off, len);
+ currentSlabIndex += len;
+ bytesUsed += len;
+ }
+ }
+
+ private void writeToOutput(OutputStream out, ByteBuffer buf, int len) throws IOException {
+ if (buf.hasArray()) {
+ out.write(buf.array(), buf.arrayOffset(), len);
+ } else {
+ // The OutputStream interface only takes a byte[], unfortunately this means that a ByteBuffer
+ // not backed by a byte array must be copied to fulfil this interface
+ byte[] copy = new byte[len];
+ buf.flip();
+ buf.get(copy);
+ out.write(copy);
+ }
+ }
+
+ /**
+ * Writes the complete contents of this buffer to the specified output stream argument. The output
+ * stream's write method (<code>out.write(slab, 0, slab.length)</code>) will be called once per slab.
+ *
+ * @param out the output stream to which to write the data.
+ * @exception IOException if an I/O error occurs.
+ */
+ public void writeTo(OutputStream out) throws IOException {
+ for (int i = 0; i < slabs.size() - 1; i++) {
+ writeToOutput(out, slabs.get(i), slabs.get(i).position());
+ }
+ writeToOutput(out, currentSlab, currentSlabIndex);
+ }
+
+ /**
+ * @return The total size in bytes of data written to this stream.
+ */
+ public long size() {
+ return bytesUsed;
+ }
+
+ /**
+ *
+ * @return The total size in bytes currently allocated for this stream.
+ */
+ public int getCapacity() {
+ return bytesAllocated;
+ }
+
+ /**
+ * When re-using an instance with reset, it will adjust slab size based on previous data size.
+ * The intent is to reuse the same instance for the same type of data (for example, the same column).
+ * The assumption is that the size in the buffer will be consistent.
+ */
+ public void reset() {
+ // readjust slab size.
+ // 7 = 2^3 - 1 so that doubling the initial size 3 times will get to the same size
+ this.initialSlabSize = max(bytesUsed / 7, initialSlabSize);
+ LOG.debug("initial slab of size {}", initialSlabSize);
+ for (ByteBuffer slab : slabs) {
+ allocator.release(slab);
+ }
+ this.slabs.clear();
+ this.bytesAllocated = 0;
+ this.bytesUsed = 0;
+ this.currentSlab = EMPTY_SLAB;
+ this.currentSlabIndex = 0;
+ }
+
+ /**
+ * @return the index of the last value written to this stream, which
+ * can be passed to {@link #setByte(long, byte)} in order to change it
+ */
+ public long getCurrentIndex() {
+ checkArgument(bytesUsed > 0, "This is an empty stream");
+ return bytesUsed - 1;
+ }
+
+ /**
+ * Replace the byte stored at position index in this stream with value
+ *
+ * @param index which byte to replace
+ * @param value the value to replace it with
+ */
+ public void setByte(long index, byte value) {
+ checkArgument(index < bytesUsed, "Index: " + index + " is >= the current size of: " + bytesUsed);
+
+ long seen = 0;
+ for (int i = 0; i < slabs.size(); i++) {
+ ByteBuffer slab = slabs.get(i);
+ if (index < seen + slab.limit()) {
+ // ok found index
+ slab.put((int)(index-seen), value);
+ break;
+ }
+ seen += slab.limit();
+ }
+ }
+
+ /**
+ * @param prefix a prefix to be used for every new line in the string
+ * @return a text representation of the memory usage of this structure
+ */
+ public String memUsageString(String prefix) {
+ return format("%s %s %d slabs, %,d bytes", prefix, getClass().getSimpleName(), slabs.size(), getCapacity());
+ }
+
+ /**
+ * @return the total number of allocated slabs
+ */
+ int getSlabCount() {
+ return slabs.size();
+ }
+
+ @Override
+ public void close() {
+ for (ByteBuffer slab : slabs) {
+ allocator.release(slab);
+ }
+ try {
+ super.close();
+ } catch (IOException e) {
+ throw new OutputStreamCloseException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-common/src/main/java/org/apache/parquet/bytes/ConcatenatingByteArrayCollector.java
----------------------------------------------------------------------
diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/ConcatenatingByteArrayCollector.java b/parquet-common/src/main/java/org/apache/parquet/bytes/ConcatenatingByteArrayCollector.java
new file mode 100644
index 0000000..d333168
--- /dev/null
+++ b/parquet-common/src/main/java/org/apache/parquet/bytes/ConcatenatingByteArrayCollector.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.bytes;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import static java.lang.String.format;
+
+public class ConcatenatingByteArrayCollector extends BytesInput {
+ private final List<byte[]> slabs = new ArrayList<byte[]>();
+ private long size = 0;
+
+ public void collect(BytesInput bytesInput) throws IOException {
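+ // materialize the input into its own byte[] so the collected data no longer depends on the caller's BytesInput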
+ byte[] bytes = bytesInput.toByteArray();
+ slabs.add(bytes);
+ size += bytes.length;
+ }
+
+ public void reset() {
+ size = 0;
+ slabs.clear();
+ }
+
+ @Override
+ public void writeAllTo(OutputStream out) throws IOException {
+ for (byte[] slab : slabs) {
+ out.write(slab);
+ }
+ }
+
+ @Override
+ public long size() {
+ return size;
+ }
+
+ /**
+ * @param prefix a prefix to be used for every new line in the string
+ * @return a text representation of the memory usage of this structure
+ */
+ public String memUsageString(String prefix) {
+ return format("%s %s %d slabs, %,d bytes", prefix, getClass().getSimpleName(), slabs.size(), size);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-common/src/main/java/org/apache/parquet/bytes/LittleEndianDataInputStream.java
----------------------------------------------------------------------
diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/LittleEndianDataInputStream.java b/parquet-common/src/main/java/org/apache/parquet/bytes/LittleEndianDataInputStream.java
new file mode 100644
index 0000000..a092753
--- /dev/null
+++ b/parquet-common/src/main/java/org/apache/parquet/bytes/LittleEndianDataInputStream.java
@@ -0,0 +1,424 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.bytes;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Based on DataInputStream but little endian and without the String/char methods
+ *
+ * @author Julien Le Dem
+ *
+ */
+public final class LittleEndianDataInputStream extends InputStream {
+
+ private final InputStream in;
+
+ /**
+ * Creates a LittleEndianDataInputStream that uses the specified
+ * underlying InputStream.
+ *
+ * @param in the specified input stream
+ */
+ public LittleEndianDataInputStream(InputStream in) {
+ this.in = in;
+ }
+
+ /**
+ * See the general contract of the <code>readFully</code>
+ * method of <code>DataInput</code>.
+ * <p>
+ * Bytes
+ * for this operation are read from the contained
+ * input stream.
+ *
+ * @param b the buffer into which the data is read.
+ * @exception EOFException if this input stream reaches the end before
+ * reading all the bytes.
+ * @exception IOException the stream has been closed and the contained
+ * input stream does not support reading after close, or
+ * another I/O error occurs.
+ * @see java.io.FilterInputStream#in
+ */
+ public final void readFully(byte b[]) throws IOException {
+ readFully(b, 0, b.length);
+ }
+
+ /**
+ * See the general contract of the <code>readFully</code>
+ * method of <code>DataInput</code>.
+ * <p>
+ * Bytes
+ * for this operation are read from the contained
+ * input stream.
+ *
+ * @param b the buffer into which the data is read.
+ * @param off the start offset of the data.
+ * @param len the number of bytes to read.
+ * @exception EOFException if this input stream reaches the end before
+ * reading all the bytes.
+ * @exception IOException the stream has been closed and the contained
+ * input stream does not support reading after close, or
+ * another I/O error occurs.
+ * @see java.io.FilterInputStream#in
+ */
+ public final void readFully(byte b[], int off, int len) throws IOException {
+ if (len < 0)
+ throw new IndexOutOfBoundsException();
+ int n = 0;
+ while (n < len) {
+ int count = in.read(b, off + n, len - n);
+ if (count < 0)
+ throw new EOFException();
+ n += count;
+ }
+ }
+
+ /**
+ * See the general contract of the <code>skipBytes</code>
+ * method of <code>DataInput</code>.
+ * <p>
+ * Bytes for this operation are read from the contained
+ * input stream.
+ *
+ * @param n the number of bytes to be skipped.
+ * @return the actual number of bytes skipped.
+ * @exception IOException if the contained input stream does not support
+ * seek, or the stream has been closed and
+ * the contained input stream does not support
+ * reading after close, or another I/O error occurs.
+ */
+ public final int skipBytes(int n) throws IOException {
+ int total = 0;
+ int cur = 0;
+
+ while ((total < n) && ((cur = (int) in.skip(n - total)) > 0)) {
+ total += cur;
+ }
+
+ return total;
+ }
+
+ /**
+ * @return the next byte of data, or -1 if the end of the stream is reached
+ * @throws IOException if an I/O error occurs
+ * @see java.io.InputStream#read()
+ */
+ public int read() throws IOException {
+ return in.read();
+ }
+
+ /**
+ * @return the hash code of the underlying input stream
+ * @see java.lang.Object#hashCode()
+ */
+ public int hashCode() {
+ return in.hashCode();
+ }
+
+ /**
+ * @param b the buffer into which the data is read
+ * @return the total number of bytes read into the buffer, or -1 if the end of the stream is reached
+ * @throws IOException if an I/O error occurs
+ * @see java.io.InputStream#read(byte[])
+ */
+ public int read(byte[] b) throws IOException {
+ return in.read(b);
+ }
+
+ /**
+ * @param obj the object to compare with
+ * @return true if the underlying input stream is equal to obj
+ * @see java.lang.Object#equals(java.lang.Object)
+ */
+ public boolean equals(Object obj) {
+ return in.equals(obj);
+ }
+
+ /**
+ * @param b the buffer into which the data is read
+ * @param off the start offset in the buffer
+ * @param len the maximum number of bytes to read
+ * @return the total number of bytes read into the buffer, or -1 if the end of the stream is reached
+ * @throws IOException if an I/O error occurs
+ * @see java.io.InputStream#read(byte[], int, int)
+ */
+ public int read(byte[] b, int off, int len) throws IOException {
+ return in.read(b, off, len);
+ }
+
+ /**
+ * @param n the number of bytes to be skipped
+ * @return the actual number of bytes skipped
+ * @throws IOException if an I/O error occurs
+ * @see java.io.InputStream#skip(long)
+ */
+ public long skip(long n) throws IOException {
+ return in.skip(n);
+ }
+
+ /**
+ * @return an estimate of the number of bytes that can be read (or skipped over) without blocking
+ * @throws IOException if an I/O error occurs
+ * @see java.io.InputStream#available()
+ */
+ public int available() throws IOException {
+ return in.available();
+ }
+
+ /**
+ * @throws IOException if an I/O error occurs while closing the underlying stream
+ * @see java.io.InputStream#close()
+ */
+ public void close() throws IOException {
+ in.close();
+ }
+
+ /**
+ * @param readlimit the maximum number of bytes that can be read before the mark position becomes invalid
+ * @see java.io.InputStream#mark(int)
+ */
+ public void mark(int readlimit) {
+ in.mark(readlimit);
+ }
+
+ /**
+ * @throws IOException if the stream has not been marked or the mark has been invalidated
+ * @see java.io.InputStream#reset()
+ */
+ public void reset() throws IOException {
+ in.reset();
+ }
+
+ /**
+ * @return true if the underlying input stream supports the mark and reset methods
+ * @see java.io.InputStream#markSupported()
+ */
+ public boolean markSupported() {
+ return in.markSupported();
+ }
+
+ /**
+ * See the general contract of the <code>readBoolean</code>
+ * method of <code>DataInput</code>.
+ * <p>
+ * Bytes for this operation are read from the contained
+ * input stream.
+ *
+ * @return the <code>boolean</code> value read.
+ * @exception EOFException if this input stream has reached the end.
+ * @exception IOException the stream has been closed and the contained
+ * input stream does not support reading after close, or
+ * another I/O error occurs.
+ * @see java.io.FilterInputStream#in
+ */
+ public final boolean readBoolean() throws IOException {
+ int ch = in.read();
+ if (ch < 0)
+ throw new EOFException();
+ return (ch != 0);
+ }
+
+ /**
+ * See the general contract of the <code>readByte</code>
+ * method of <code>DataInput</code>.
+ * <p>
+ * Bytes
+ * for this operation are read from the contained
+ * input stream.
+ *
+ * @return the next byte of this input stream as a signed 8-bit
+ * <code>byte</code>.
+ * @exception EOFException if this input stream has reached the end.
+ * @exception IOException the stream has been closed and the contained
+ * input stream does not support reading after close, or
+ * another I/O error occurs.
+ * @see java.io.FilterInputStream#in
+ */
+ public final byte readByte() throws IOException {
+ int ch = in.read();
+ if (ch < 0)
+ throw new EOFException();
+ return (byte)(ch);
+ }
+
+ /**
+ * See the general contract of the <code>readUnsignedByte</code>
+ * method of <code>DataInput</code>.
+ * <p>
+ * Bytes
+ * for this operation are read from the contained
+ * input stream.
+ *
+ * @return the next byte of this input stream, interpreted as an
+ * unsigned 8-bit number.
+ * @exception EOFException if this input stream has reached the end.
+ * @exception IOException the stream has been closed and the contained
+ * input stream does not support reading after close, or
+ * another I/O error occurs.
+ * @see java.io.FilterInputStream#in
+ */
+ public final int readUnsignedByte() throws IOException {
+ int ch = in.read();
+ if (ch < 0)
+ throw new EOFException();
+ return ch;
+ }
+
+ /**
+ * Bytes
+ * for this operation are read from the contained
+ * input stream.
+ *
+ * @return the next two bytes of this input stream, interpreted as a
+ * signed 16-bit number.
+ * @exception EOFException if this input stream reaches the end before
+ * reading two bytes.
+ * @exception IOException the stream has been closed and the contained
+ * input stream does not support reading after close, or
+ * another I/O error occurs.
+ * @see java.io.FilterInputStream#in
+ */
+ public final short readShort() throws IOException {
+ int ch2 = in.read();
+ int ch1 = in.read();
+ if ((ch1 | ch2) < 0)
+ throw new EOFException();
+ return (short)((ch1 << 8) + (ch2 << 0));
+ }
+
+ /**
+ * Bytes
+ * for this operation are read from the contained
+ * input stream.
+ *
+ * @return the next two bytes of this input stream, interpreted as an
+ * unsigned 16-bit integer.
+ * @exception EOFException if this input stream reaches the end before
+ * reading two bytes.
+ * @exception IOException the stream has been closed and the contained
+ * input stream does not support reading after close, or
+ * another I/O error occurs.
+ * @see java.io.FilterInputStream#in
+ */
+ public final int readUnsignedShort() throws IOException {
+ int ch2 = in.read();
+ int ch1 = in.read();
+ if ((ch1 | ch2) < 0)
+ throw new EOFException();
+ return (ch1 << 8) + (ch2 << 0);
+ }
+
+ /**
+ * Bytes
+ * for this operation are read from the contained
+ * input stream.
+ *
+ * @return the next four bytes of this input stream, interpreted as an
+ * <code>int</code>.
+ * @exception EOFException if this input stream reaches the end before
+ * reading four bytes.
+ * @exception IOException the stream has been closed and the contained
+ * input stream does not support reading after close, or
+ * another I/O error occurs.
+ * @see java.io.FilterInputStream#in
+ */
+ public final int readInt() throws IOException {
+ // TODO: has this been benchmarked against two alternate implementations?
+ // 1) Integer.reverseBytes(in.readInt())
+ // 2) keep a member byte[4], wrapped by an IntBuffer with appropriate endianness set,
+ // and call IntBuffer.get()
+ // Both seem like they might be faster.
+ int ch4 = in.read();
+ int ch3 = in.read();
+ int ch2 = in.read();
+ int ch1 = in.read();
+ if ((ch1 | ch2 | ch3 | ch4) < 0)
+ throw new EOFException();
+ return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));
+ }
+
+ private byte readBuffer[] = new byte[8];
+
+ /**
+ * Bytes
+ * for this operation are read from the contained
+ * input stream.
+ *
+ * @return the next eight bytes of this input stream, interpreted as a
+ * <code>long</code>.
+ * @exception EOFException if this input stream reaches the end before
+ * reading eight bytes.
+ * @exception IOException the stream has been closed and the contained
+ * input stream does not support reading after close, or
+ * another I/O error occurs.
+ * @see java.io.FilterInputStream#in
+ */
+ public final long readLong() throws IOException {
+ // TODO: see perf question above in readInt
+ readFully(readBuffer, 0, 8);
+ return (((long)readBuffer[7] << 56) +
+ ((long)(readBuffer[6] & 255) << 48) +
+ ((long)(readBuffer[5] & 255) << 40) +
+ ((long)(readBuffer[4] & 255) << 32) +
+ ((long)(readBuffer[3] & 255) << 24) +
+ ((readBuffer[2] & 255) << 16) +
+ ((readBuffer[1] & 255) << 8) +
+ ((readBuffer[0] & 255) << 0));
+ }
+
+ /**
+ * Bytes
+ * for this operation are read from the contained
+ * input stream.
+ *
+ * @return the next four bytes of this input stream, interpreted as a
+ * <code>float</code>.
+ * @exception EOFException if this input stream reaches the end before
+ * reading four bytes.
+ * @exception IOException the stream has been closed and the contained
+ * input stream does not support reading after close, or
+ * another I/O error occurs.
+ * @see java.lang.Float#intBitsToFloat(int)
+ */
+ public final float readFloat() throws IOException {
+ return Float.intBitsToFloat(readInt());
+ }
+
+ /**
+ * Bytes
+ * for this operation are read from the contained
+ * input stream.
+ *
+ * @return the next eight bytes of this input stream, interpreted as a
+ * <code>double</code>.
+ * @exception EOFException if this input stream reaches the end before
+ * reading eight bytes.
+ * @exception IOException the stream has been closed and the contained
+ * input stream does not support reading after close, or
+ * another I/O error occurs.
+ * @see java.lang.Double#longBitsToDouble(long)
+ */
+ public final double readDouble() throws IOException {
+ return Double.longBitsToDouble(readLong());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-common/src/main/java/org/apache/parquet/bytes/LittleEndianDataOutputStream.java
----------------------------------------------------------------------
diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/LittleEndianDataOutputStream.java b/parquet-common/src/main/java/org/apache/parquet/bytes/LittleEndianDataOutputStream.java
new file mode 100644
index 0000000..9d4a8a9
--- /dev/null
+++ b/parquet-common/src/main/java/org/apache/parquet/bytes/LittleEndianDataOutputStream.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.bytes;
+
+import org.apache.parquet.IOExceptionUtils;
+import org.apache.parquet.ParquetRuntimeException;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * Based on DataOutputStream but in little endian and without the String/char methods
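+ *
+ * For example (illustrative only), writing an int emits its low byte first:
+ * <pre>
+ *   LittleEndianDataOutputStream out =
+ *       new LittleEndianDataOutputStream(new ByteArrayOutputStream());
+ *   out.writeInt(1); // emits the bytes 01 00 00 00
+ * </pre>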
+ *
+ * @author Julien Le Dem
+ *
+ */
+public class LittleEndianDataOutputStream extends OutputStream {
+
+ private final OutputStream out;
+
+ /**
+ * Creates a new data output stream to write data to the specified
+ * underlying output stream. The counter <code>written</code> is
+ * set to zero.
+ *
+ * @param out the underlying output stream, to be saved for later
+ * use.
+ * @see java.io.FilterOutputStream#out
+ */
+ public LittleEndianDataOutputStream(OutputStream out) {
+ this.out = out;
+ }
+
+ /**
+ * Writes the specified byte (the low eight bits of the argument
+ * <code>b</code>) to the underlying output stream. If no exception
+ * is thrown, the counter <code>written</code> is incremented by
+ * <code>1</code>.
+ * <p>
+ * Implements the <code>write</code> method of <code>OutputStream</code>.
+ *
+ * @param b the <code>byte</code> to be written.
+ * @exception IOException if an I/O error occurs.
+ * @see java.io.FilterOutputStream#out
+ */
+ public void write(int b) throws IOException {
+ out.write(b);
+ }
+
+ /**
+ * Writes <code>len</code> bytes from the specified byte array
+ * starting at offset <code>off</code> to the underlying output stream.
+ * If no exception is thrown, the counter <code>written</code> is
+ * incremented by <code>len</code>.
+ *
+ * @param b the data.
+ * @param off the start offset in the data.
+ * @param len the number of bytes to write.
+ * @exception IOException if an I/O error occurs.
+ * @see java.io.FilterOutputStream#out
+ */
+ public void write(byte b[], int off, int len) throws IOException {
+ out.write(b, off, len);
+ }
+
+ /**
+ * Flushes this data output stream. This forces any buffered output
+ * bytes to be written out to the stream.
+ * <p>
+ * The <code>flush</code> method of <code>DataOutputStream</code>
+ * calls the <code>flush</code> method of its underlying output stream.
+ *
+ * @exception IOException if an I/O error occurs.
+ * @see java.io.FilterOutputStream#out
+ * @see java.io.OutputStream#flush()
+ */
+ public void flush() throws IOException {
+ out.flush();
+ }
+
+ /**
+ * Writes a <code>boolean</code> to the underlying output stream as
+ * a 1-byte value. The value <code>true</code> is written out as the
+ * value <code>(byte)1</code>; the value <code>false</code> is
+ * written out as the value <code>(byte)0</code>. If no exception is
+ * thrown, the counter <code>written</code> is incremented by
+ * <code>1</code>.
+ *
+ * @param v a <code>boolean</code> value to be written.
+ * @exception IOException if an I/O error occurs.
+ * @see java.io.FilterOutputStream#out
+ */
+ public final void writeBoolean(boolean v) throws IOException {
+ out.write(v ? 1 : 0);
+ }
+
+ /**
+ * Writes out a <code>byte</code> to the underlying output stream as
+ * a 1-byte value. If no exception is thrown, the counter
+ * <code>written</code> is incremented by <code>1</code>.
+ *
+ * @param v a <code>byte</code> value to be written.
+ * @exception IOException if an I/O error occurs.
+ * @see java.io.FilterOutputStream#out
+ */
+ public final void writeByte(int v) throws IOException {
+ out.write(v);
+ }
+
+ /**
+ * Writes a <code>short</code> to the underlying output stream as two
+ * bytes, low byte first. If no exception is thrown, the counter
+ * <code>written</code> is incremented by <code>2</code>.
+ *
+ * @param v a <code>short</code> to be written.
+ * @exception IOException if an I/O error occurs.
+ * @see java.io.FilterOutputStream#out
+ */
+ public final void writeShort(int v) throws IOException {
+ out.write((v >>> 0) & 0xFF);
+ out.write((v >>> 8) & 0xFF);
+ }
+
+ /**
+ * Writes an <code>int</code> to the underlying output stream as four
+ * bytes, low byte first. If no exception is thrown, the counter
+ * <code>written</code> is incremented by <code>4</code>.
+ *
+ * @param v an <code>int</code> to be written.
+ * @exception IOException if an I/O error occurs.
+ * @see java.io.FilterOutputStream#out
+ */
+ public final void writeInt(int v) throws IOException {
+ // TODO: see note in LittleEndianDataInputStream: maybe faster
+ // to use Integer.reverseBytes() and then writeInt, or a ByteBuffer
+ // approach
+ out.write((v >>> 0) & 0xFF);
+ out.write((v >>> 8) & 0xFF);
+ out.write((v >>> 16) & 0xFF);
+ out.write((v >>> 24) & 0xFF);
+ }
+
+ private byte writeBuffer[] = new byte[8];
+
+ /**
+ * Writes a <code>long</code> to the underlying output stream as eight
+ * bytes, low byte first. If no exception is thrown, the counter
+ * <code>written</code> is incremented by <code>8</code>.
+ *
+ * @param v a <code>long</code> to be written.
+ * @exception IOException if an I/O error occurs.
+ * @see java.io.FilterOutputStream#out
+ */
+ public final void writeLong(long v) throws IOException {
+ writeBuffer[7] = (byte)(v >>> 56);
+ writeBuffer[6] = (byte)(v >>> 48);
+ writeBuffer[5] = (byte)(v >>> 40);
+ writeBuffer[4] = (byte)(v >>> 32);
+ writeBuffer[3] = (byte)(v >>> 24);
+ writeBuffer[2] = (byte)(v >>> 16);
+ writeBuffer[1] = (byte)(v >>> 8);
+ writeBuffer[0] = (byte)(v >>> 0);
+ out.write(writeBuffer, 0, 8);
+ }
+
+ /**
+ * Converts the float argument to an <code>int</code> using the
+ * <code>floatToIntBits</code> method in class <code>Float</code>,
+ * and then writes that <code>int</code> value to the underlying
+ * output stream as a 4-byte quantity, low byte first. If no
+ * exception is thrown, the counter <code>written</code> is
+ * incremented by <code>4</code>.
+ *
+ * @param v a <code>float</code> value to be written.
+ * @exception IOException if an I/O error occurs.
+ * @see java.io.FilterOutputStream#out
+ * @see java.lang.Float#floatToIntBits(float)
+ */
+ public final void writeFloat(float v) throws IOException {
+ writeInt(Float.floatToIntBits(v));
+ }
+
+ /**
+ * Converts the double argument to a <code>long</code> using the
+ * <code>doubleToLongBits</code> method in class <code>Double</code>,
+ * and then writes that <code>long</code> value to the underlying
+ * output stream as an 8-byte quantity, low byte first. If no
+ * exception is thrown, the counter <code>written</code> is
+ * incremented by <code>8</code>.
+ *
+ * @param v a <code>double</code> value to be written.
+ * @exception IOException if an I/O error occurs.
+ * @see java.io.FilterOutputStream#out
+ * @see java.lang.Double#doubleToLongBits(double)
+ */
+ public final void writeDouble(double v) throws IOException {
+ writeLong(Double.doubleToLongBits(v));
+ }
+
+ public void close() {
+ IOExceptionUtils.closeQuietly(out);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-common/src/main/java/org/apache/parquet/compression/CompressionCodecFactory.java
----------------------------------------------------------------------
diff --git a/parquet-common/src/main/java/org/apache/parquet/compression/CompressionCodecFactory.java b/parquet-common/src/main/java/org/apache/parquet/compression/CompressionCodecFactory.java
new file mode 100644
index 0000000..5b1b657
--- /dev/null
+++ b/parquet-common/src/main/java/org/apache/parquet/compression/CompressionCodecFactory.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.compression;
+
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public interface CompressionCodecFactory {
+
+ BytesInputCompressor getCompressor(CompressionCodecName codecName);
+
+ BytesInputDecompressor getDecompressor(CompressionCodecName codecName);
+
+ void release();
+
+ interface BytesInputCompressor {
+ BytesInput compress(BytesInput bytes) throws IOException;
+ CompressionCodecName getCodecName();
+ void release();
+ }
+
+ interface BytesInputDecompressor {
+ BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOException;
+ void decompress(ByteBuffer input, int compressedSize, ByteBuffer output, int uncompressedSize) throws IOException;
+ void release();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-common/src/main/java/org/apache/parquet/hadoop/codec/CompressionCodecNotSupportedException.java
----------------------------------------------------------------------
diff --git a/parquet-common/src/main/java/org/apache/parquet/hadoop/codec/CompressionCodecNotSupportedException.java b/parquet-common/src/main/java/org/apache/parquet/hadoop/codec/CompressionCodecNotSupportedException.java
new file mode 100644
index 0000000..bf2da32
--- /dev/null
+++ b/parquet-common/src/main/java/org/apache/parquet/hadoop/codec/CompressionCodecNotSupportedException.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.codec;
+
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+
+/**
+ * This exception will be thrown when the codec is not supported by parquet, meaning there is no
+ * matching codec defined in {@link CompressionCodecName}
+ */
+public class CompressionCodecNotSupportedException extends RuntimeException {
+ private final Class codecClass;
+
+ public CompressionCodecNotSupportedException(Class codecClass) {
+ super("codec not supported: " + codecClass.getName());
+ this.codecClass = codecClass;
+ }
+
+ public Class getCodecClass() {
+ return codecClass;
+ }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-common/src/main/java/org/apache/parquet/hadoop/metadata/CompressionCodecName.java
----------------------------------------------------------------------
diff --git a/parquet-common/src/main/java/org/apache/parquet/hadoop/metadata/CompressionCodecName.java b/parquet-common/src/main/java/org/apache/parquet/hadoop/metadata/CompressionCodecName.java
new file mode 100644
index 0000000..8cdede0
--- /dev/null
+++ b/parquet-common/src/main/java/org/apache/parquet/hadoop/metadata/CompressionCodecName.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.metadata;
+
+
+import org.apache.parquet.format.CompressionCodec;
+import org.apache.parquet.hadoop.codec.CompressionCodecNotSupportedException;
+import java.util.Locale;
+
+public enum CompressionCodecName {
+ UNCOMPRESSED(null, CompressionCodec.UNCOMPRESSED, ""),
+ SNAPPY("org.apache.parquet.hadoop.codec.SnappyCodec", CompressionCodec.SNAPPY, ".snappy"),
+ GZIP("org.apache.hadoop.io.compress.GzipCodec", CompressionCodec.GZIP, ".gz"),
+ LZO("com.hadoop.compression.lzo.LzoCodec", CompressionCodec.LZO, ".lzo"),
+ BROTLI("org.apache.hadoop.io.compress.BrotliCodec", CompressionCodec.BROTLI, ".br"),
+ LZ4("org.apache.hadoop.io.compress.Lz4Codec", CompressionCodec.LZ4, ".lz4"),
+ ZSTD("org.apache.hadoop.io.compress.ZStandardCodec", CompressionCodec.ZSTD, ".zstd");
+
+ public static CompressionCodecName fromConf(String name) {
+ if (name == null) {
+ return UNCOMPRESSED;
+ }
+ return valueOf(name.toUpperCase(Locale.ENGLISH));
+ }
+
+ public static CompressionCodecName fromCompressionCodec(Class<?> clazz) {
+ if (clazz == null) {
+ return UNCOMPRESSED;
+ }
+ String name = clazz.getName();
+ for (CompressionCodecName codec : CompressionCodecName.values()) {
+ if (name.equals(codec.getHadoopCompressionCodecClassName())) {
+ return codec;
+ }
+ }
+ throw new CompressionCodecNotSupportedException(clazz);
+ }
+
+ public static CompressionCodecName fromParquet(CompressionCodec codec) {
+ for (CompressionCodecName codecName : CompressionCodecName.values()) {
+ if (codec.equals(codecName.parquetCompressionCodec)) {
+ return codecName;
+ }
+ }
+ throw new IllegalArgumentException("Unknown compression codec " + codec);
+ }
+
+ private final String hadoopCompressionCodecClass;
+ private final CompressionCodec parquetCompressionCodec;
+ private final String extension;
+
+ private CompressionCodecName(String hadoopCompressionCodecClass, CompressionCodec parquetCompressionCodec, String extension) {
+ this.hadoopCompressionCodecClass = hadoopCompressionCodecClass;
+ this.parquetCompressionCodec = parquetCompressionCodec;
+ this.extension = extension;
+ }
+
+ public String getHadoopCompressionCodecClassName() {
+ return hadoopCompressionCodecClass;
+ }
+
+ public Class getHadoopCompressionCodecClass() {
+ String codecClassName = getHadoopCompressionCodecClassName();
+ if (codecClassName==null) {
+ return null;
+ }
+ try {
+ return Class.forName(codecClassName);
+ } catch (ClassNotFoundException e) {
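+ // the codec implementation is not on the classpath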
+ return null;
+ }
+ }
+
+ public CompressionCodec getParquetCompressionCodec() {
+ return parquetCompressionCodec;
+ }
+
+ public String getExtension() {
+ return extension;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-common/src/main/java/org/apache/parquet/io/DelegatingPositionOutputStream.java
----------------------------------------------------------------------
diff --git a/parquet-common/src/main/java/org/apache/parquet/io/DelegatingPositionOutputStream.java b/parquet-common/src/main/java/org/apache/parquet/io/DelegatingPositionOutputStream.java
new file mode 100644
index 0000000..9e52428
--- /dev/null
+++ b/parquet-common/src/main/java/org/apache/parquet/io/DelegatingPositionOutputStream.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.io;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+public abstract class DelegatingPositionOutputStream extends PositionOutputStream {
+ private final OutputStream stream;
+
+ public DelegatingPositionOutputStream(OutputStream stream) {
+ this.stream = stream;
+ }
+
+ public OutputStream getStream() {
+ return stream;
+ }
+
+ @Override
+ public void close() throws IOException {
+ stream.close();
+ }
+
+ @Override
+ public void flush() throws IOException {
+ stream.flush();
+ }
+
+ @Override
+ public abstract long getPos() throws IOException;
+
+ @Override
+ public void write(int b) throws IOException {
+ stream.write(b);
+ }
+
+ @Override
+ public void write(byte[] b) throws IOException {
+ stream.write(b);
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ stream.write(b, off, len);
+ }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-common/src/main/java/org/apache/parquet/io/DelegatingSeekableInputStream.java
----------------------------------------------------------------------
diff --git a/parquet-common/src/main/java/org/apache/parquet/io/DelegatingSeekableInputStream.java b/parquet-common/src/main/java/org/apache/parquet/io/DelegatingSeekableInputStream.java
new file mode 100644
index 0000000..bc4940c
--- /dev/null
+++ b/parquet-common/src/main/java/org/apache/parquet/io/DelegatingSeekableInputStream.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.io;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * Implements read methods required by {@link SeekableInputStream} for generic input streams.
+ * <p>
+ * Implementations must implement {@link #getPos()} and {@link #seek(long)} and may optionally
+ * implement other read methods to improve performance.
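+ * <p>
+ * For example, a minimal file-backed subclass could delegate both methods to a
+ * {@link java.nio.channels.FileChannel} (an illustrative sketch; the class name is made up):
+ * <pre>
+ *   class FileSeekableInputStream extends DelegatingSeekableInputStream {
+ *     private final FileChannel channel;
+ *
+ *     FileSeekableInputStream(FileInputStream in) {
+ *       super(in);
+ *       this.channel = in.getChannel();
+ *     }
+ *
+ *     public long getPos() throws IOException {
+ *       return channel.position();
+ *     }
+ *
+ *     public void seek(long newPos) throws IOException {
+ *       channel.position(newPos);
+ *     }
+ *   }
+ * </pre>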
+ */
+public abstract class DelegatingSeekableInputStream extends SeekableInputStream {
+
+ private final int COPY_BUFFER_SIZE = 8192;
+ private final byte[] temp = new byte[COPY_BUFFER_SIZE];
+
+ private final InputStream stream;
+
+ public DelegatingSeekableInputStream(InputStream stream) {
+ this.stream = stream;
+ }
+
+ public InputStream getStream() {
+ return stream;
+ }
+
+ @Override
+ public void close() throws IOException {
+ stream.close();
+ }
+
+ @Override
+ public abstract long getPos() throws IOException;
+
+ @Override
+ public abstract void seek(long newPos) throws IOException;
+
+ @Override
+ public int read() throws IOException {
+ return stream.read();
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ return stream.read(b, off, len);
+ }
+
+ @Override
+ public void readFully(byte[] bytes) throws IOException {
+ readFully(stream, bytes, 0, bytes.length);
+ }
+
+ @Override
+ public void readFully(byte[] bytes, int start, int len) throws IOException {
+ readFully(stream, bytes, start, len);
+ }
+
+ @Override
+ public int read(ByteBuffer buf) throws IOException {
+ if (buf.hasArray()) {
+ return readHeapBuffer(stream, buf);
+ } else {
+ return readDirectBuffer(stream, buf, temp);
+ }
+ }
+
+ @Override
+ public void readFully(ByteBuffer buf) throws IOException {
+ if (buf.hasArray()) {
+ readFullyHeapBuffer(stream, buf);
+ } else {
+ readFullyDirectBuffer(stream, buf, temp);
+ }
+ }
+
+ // Visible for testing
+ static void readFully(InputStream f, byte[] bytes, int start, int len) throws IOException {
+ int offset = start;
+ int remaining = len;
+ while (remaining > 0) {
+ int bytesRead = f.read(bytes, offset, remaining);
+ if (bytesRead < 0) {
+ throw new EOFException(
+ "Reached the end of stream with " + remaining + " bytes left to read");
+ }
+
+ remaining -= bytesRead;
+ offset += bytesRead;
+ }
+ }
+
+ // Visible for testing
+ static int readHeapBuffer(InputStream f, ByteBuffer buf) throws IOException {
+ int bytesRead = f.read(buf.array(), buf.arrayOffset() + buf.position(), buf.remaining());
+ if (bytesRead < 0) {
+ // if this resulted in EOF, don't update position
+ return bytesRead;
+ } else {
+ buf.position(buf.position() + bytesRead);
+ return bytesRead;
+ }
+ }
+
+ // Visible for testing
+ static void readFullyHeapBuffer(InputStream f, ByteBuffer buf) throws IOException {
+ readFully(f, buf.array(), buf.arrayOffset() + buf.position(), buf.remaining());
+ buf.position(buf.limit());
+ }
+
+ // Visible for testing
+ static int readDirectBuffer(InputStream f, ByteBuffer buf, byte[] temp) throws IOException {
+ // copy all the bytes that return immediately, stopping at the first
+ // read that doesn't return a full buffer.
+ int nextReadLength = Math.min(buf.remaining(), temp.length);
+ int totalBytesRead = 0;
+ int bytesRead;
+
+ while ((bytesRead = f.read(temp, 0, nextReadLength)) == temp.length) {
+ buf.put(temp);
+ totalBytesRead += bytesRead;
+ nextReadLength = Math.min(buf.remaining(), temp.length);
+ }
+
+ if (bytesRead < 0) {
+ // return -1 if nothing was read
+ return totalBytesRead == 0 ? -1 : totalBytesRead;
+ } else {
+ // copy the last partial buffer
+ buf.put(temp, 0, bytesRead);
+ totalBytesRead += bytesRead;
+ return totalBytesRead;
+ }
+ }
+
+ // Visible for testing
+ static void readFullyDirectBuffer(InputStream f, ByteBuffer buf, byte[] temp) throws IOException {
+ int nextReadLength = Math.min(buf.remaining(), temp.length);
+ int bytesRead = 0;
+
+ while (nextReadLength > 0 && (bytesRead = f.read(temp, 0, nextReadLength)) >= 0) {
+ buf.put(temp, 0, bytesRead);
+ nextReadLength = Math.min(buf.remaining(), temp.length);
+ }
+
+ if (bytesRead < 0 && buf.remaining() > 0) {
+ throw new EOFException(
+ "Reached the end of stream with " + buf.remaining() + " bytes left to read");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-common/src/main/java/org/apache/parquet/io/InputFile.java
----------------------------------------------------------------------
diff --git a/parquet-common/src/main/java/org/apache/parquet/io/InputFile.java b/parquet-common/src/main/java/org/apache/parquet/io/InputFile.java
index e2c7cc0..f910074 100644
--- a/parquet-common/src/main/java/org/apache/parquet/io/InputFile.java
+++ b/parquet-common/src/main/java/org/apache/parquet/io/InputFile.java
@@ -28,15 +28,16 @@ import java.io.IOException;
public interface InputFile {
/**
- * Returns the total length of the file, in bytes.
+ * @return the total length of the file, in bytes.
* @throws IOException if the length cannot be determined
*/
long getLength() throws IOException;
/**
- * Opens a new {@link SeekableInputStream} for the underlying
- * data file.
- * @throws IOException if the stream cannot be opened.
+ * Open a new {@link SeekableInputStream} for the underlying data file.
+ *
+ * @return a new {@link SeekableInputStream} to read the file
+ * @throws IOException if the stream cannot be opened
*/
SeekableInputStream newStream() throws IOException;
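A short illustrative caller of this contract (assuming SeekableInputStream exposes seek(long) and close(), as the delegating implementation in this change suggests; the footer offset is only an example):

    // Illustrative only: how length and stream creation are meant to be combined.
    static void scan(InputFile file) throws java.io.IOException {
      long length = file.getLength();             // total size in bytes
      SeekableInputStream in = file.newStream();  // a fresh stream per call
      try {
        in.seek(length - 8);                      // e.g. position near the Parquet footer
        // ... read the footer length and magic here ...
      } finally {
        in.close();
      }
    }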
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-common/src/main/java/org/apache/parquet/io/OutputFile.java
----------------------------------------------------------------------
diff --git a/parquet-common/src/main/java/org/apache/parquet/io/OutputFile.java b/parquet-common/src/main/java/org/apache/parquet/io/OutputFile.java
new file mode 100644
index 0000000..2d6de44
--- /dev/null
+++ b/parquet-common/src/main/java/org/apache/parquet/io/OutputFile.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.io;
+
+import java.io.IOException;
+
+public interface OutputFile {
+
+ PositionOutputStream create(long blockSizeHint) throws IOException;
+
+ PositionOutputStream createOrOverwrite(long blockSizeHint) throws IOException;
+
+ boolean supportsBlockSize();
+
+ long defaultBlockSize();
+
+}
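OutputFile has no javadoc yet, so a hedged sketch of how a writer might consult it may help (the open helper and its parameters are invented for illustration; create is expected to fail when the target exists, createOrOverwrite to replace it):

    // Illustrative only: one plausible way a writer uses the contract.
    static PositionOutputStream open(OutputFile file, boolean overwrite, long rowGroupSize)
        throws java.io.IOException {
      if (file.supportsBlockSize()) {
        // e.g. HDFS reports a block size that a writer can use for padding decisions
        System.out.println("underlying block size: " + file.defaultBlockSize());
      }
      // the row group size is passed as the block size hint when creating the file
      return overwrite
          ? file.createOrOverwrite(rowGroupSize)
          : file.create(rowGroupSize);
    }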
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-common/src/main/java/org/apache/parquet/io/PositionOutputStream.java
----------------------------------------------------------------------
diff --git a/parquet-common/src/main/java/org/apache/parquet/io/PositionOutputStream.java b/parquet-common/src/main/java/org/apache/parquet/io/PositionOutputStream.java
new file mode 100644
index 0000000..066c46b
--- /dev/null
+++ b/parquet-common/src/main/java/org/apache/parquet/io/PositionOutputStream.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.io;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * {@code PositionOutputStream} is an abstract output stream with the methods needed
+ * by Parquet to write data to a file or Hadoop data stream.
+ */
+public abstract class PositionOutputStream extends OutputStream {
+
+ /**
+ * Reports the current position of this output stream.
+ *
+ * @return a long, the current position in bytes starting from 0
+ * @throws IOException when the underlying stream throws IOException
+ */
+ public abstract long getPos() throws IOException;
+
+}
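Taken together, the two new types can be implemented without Hadoop at all. A minimal sketch assuming plain java.nio file I/O; every class and field name below is invented for illustration and is not part of this change:

    import java.io.IOException;
    import java.io.OutputStream;
    import java.nio.file.Files;
    import java.nio.file.StandardOpenOption;
    import org.apache.parquet.io.OutputFile;
    import org.apache.parquet.io.PositionOutputStream;

    // Illustrative only: a local-file OutputFile plus a PositionOutputStream
    // that tracks its own position by counting bytes written.
    class LocalOutputFile implements OutputFile {
      private final java.nio.file.Path path;  // java.nio path, not Hadoop's Path

      LocalOutputFile(java.nio.file.Path path) {
        this.path = path;
      }

      @Override
      public PositionOutputStream create(long blockSizeHint) throws IOException {
        // CREATE_NEW fails if the file already exists
        return new CountingOutputStream(Files.newOutputStream(
            path, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE));
      }

      @Override
      public PositionOutputStream createOrOverwrite(long blockSizeHint) throws IOException {
        return new CountingOutputStream(Files.newOutputStream(
            path, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING,
            StandardOpenOption.WRITE));
      }

      @Override
      public boolean supportsBlockSize() {
        return false;  // a plain local file has no HDFS-style block size
      }

      @Override
      public long defaultBlockSize() {
        return 0;
      }

      private static class CountingOutputStream extends PositionOutputStream {
        private final OutputStream out;
        private long pos = 0;

        CountingOutputStream(OutputStream out) {
          this.out = out;
        }

        @Override
        public long getPos() {
          return pos;  // bytes written so far, starting from 0
        }

        @Override
        public void write(int b) throws IOException {
          out.write(b);
          pos += 1;
        }

        @Override
        public void write(byte[] b, int off, int len) throws IOException {
          out.write(b, off, len);
          pos += len;
        }

        @Override
        public void flush() throws IOException {
          out.flush();
        }

        @Override
        public void close() throws IOException {
          out.close();
        }
      }
    }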
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-common/src/test/java/org/apache/parquet/io/MockInputStream.java
----------------------------------------------------------------------
diff --git a/parquet-common/src/test/java/org/apache/parquet/io/MockInputStream.java b/parquet-common/src/test/java/org/apache/parquet/io/MockInputStream.java
new file mode 100644
index 0000000..42e3a8a
--- /dev/null
+++ b/parquet-common/src/test/java/org/apache/parquet/io/MockInputStream.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.io;
+
+import java.io.ByteArrayInputStream;
+
+class MockInputStream extends ByteArrayInputStream {
+
+ static final byte[] TEST_ARRAY = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
+
+ private int[] lengths;
+ private int current = 0;
+ MockInputStream(int... actualReadLengths) {
+ super(TEST_ARRAY);
+ this.lengths = actualReadLengths;
+ }
+
+ @Override
+ public synchronized int read(byte[] b, int off, int len) {
+ if (current < lengths.length) {
+ if (len <= lengths[current]) {
+ // when len == lengths[current], the next read will be 0 bytes
+ int bytesRead = super.read(b, off, len);
+ lengths[current] -= bytesRead;
+ return bytesRead;
+ } else {
+ int bytesRead = super.read(b, off, lengths[current]);
+ current += 1;
+ return bytesRead;
+ }
+ } else {
+ return super.read(b, off, len);
+ }
+ }
+
+ public long getPos() {
+ return this.pos;
+ }
+}
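The varargs passed to the constructor cap the size of each successive read, which is how the tests below simulate short reads. For example (illustrative values):

    MockInputStream stream = new MockInputStream(2, 3, 3);
    byte[] buf = new byte[10];
    int n1 = stream.read(buf, 0, 10);  // returns 2 (first cap)
    int n2 = stream.read(buf, 2, 8);   // returns 3 (second cap)
    int n3 = stream.read(buf, 5, 5);   // returns 3 (third cap)
    int n4 = stream.read(buf, 8, 2);   // returns 2 (no caps left, reads to the end)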
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-common/src/test/java/org/apache/parquet/io/TestDelegatingSeekableInputStream.java
----------------------------------------------------------------------
diff --git a/parquet-common/src/test/java/org/apache/parquet/io/TestDelegatingSeekableInputStream.java b/parquet-common/src/test/java/org/apache/parquet/io/TestDelegatingSeekableInputStream.java
new file mode 100644
index 0000000..078bc8f
--- /dev/null
+++ b/parquet-common/src/test/java/org/apache/parquet/io/TestDelegatingSeekableInputStream.java
@@ -0,0 +1,861 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.io;
+
+import org.apache.parquet.TestUtils;
+import org.junit.Assert;
+import org.junit.Test;
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.concurrent.Callable;
+
+import static org.apache.parquet.io.MockInputStream.TEST_ARRAY;
+
+
+public class TestDelegatingSeekableInputStream {
+
+ @Test
+ public void testReadFully() throws Exception {
+ byte[] buffer = new byte[5];
+
+ MockInputStream stream = new MockInputStream();
+ DelegatingSeekableInputStream.readFully(stream, buffer, 0, buffer.length);
+
+ Assert.assertArrayEquals("Byte array contents should match",
+ Arrays.copyOfRange(TEST_ARRAY, 0, 5), buffer);
+ Assert.assertEquals("Stream position should reflect bytes read", 5, stream.getPos());
+ }
+
+ @Test
+ public void testReadFullySmallReads() throws Exception {
+ byte[] buffer = new byte[5];
+
+ MockInputStream stream = new MockInputStream(2, 3, 3);
+ DelegatingSeekableInputStream.readFully(stream, buffer, 0, buffer.length);
+
+ Assert.assertArrayEquals("Byte array contents should match",
+ Arrays.copyOfRange(TEST_ARRAY, 0, 5), buffer);
+ Assert.assertEquals("Stream position should reflect bytes read", 5, stream.getPos());
+ }
+
+ @Test
+ public void testReadFullyJustRight() throws Exception {
+ final byte[] buffer = new byte[10];
+
+ final MockInputStream stream = new MockInputStream(2, 3, 3);
+ DelegatingSeekableInputStream.readFully(stream, buffer, 0, buffer.length);
+
+ Assert.assertArrayEquals("Byte array contents should match", TEST_ARRAY, buffer);
+ Assert.assertEquals("Stream position should reflect bytes read", 10, stream.getPos());
+
+ TestUtils.assertThrows("Should throw EOFException if no more bytes left",
+ EOFException.class, new Callable<Void>() {
+ @Override
+ public Void call() throws IOException {
+ DelegatingSeekableInputStream.readFully(stream, buffer, 0, 1);
+ return null;
+ }
+ });
+ }
+
+ @Test
+ public void testReadFullyUnderflow() throws Exception {
+ final byte[] buffer = new byte[11];
+
+ final MockInputStream stream = new MockInputStream(2, 3, 3);
+
+ TestUtils.assertThrows("Should throw EOFException if no more bytes left",
+ EOFException.class, new Callable<Void>() {
+ @Override
+ public Void call() throws IOException {
+ DelegatingSeekableInputStream.readFully(stream, buffer, 0, buffer.length);
+ return null;
+ }
+ });
+
+ Assert.assertArrayEquals("Should have consumed bytes",
+ TEST_ARRAY, Arrays.copyOfRange(buffer, 0, 10));
+ Assert.assertEquals("Stream position should reflect bytes read", 10, stream.getPos());
+ }
+
+ @Test
+ public void testReadFullyStartAndLength() throws IOException {
+ byte[] buffer = new byte[10];
+
+ MockInputStream stream = new MockInputStream();
+ DelegatingSeekableInputStream.readFully(stream, buffer, 2, 5);
+
+ Assert.assertArrayEquals("Byte array contents should match",
+ Arrays.copyOfRange(TEST_ARRAY, 0, 5), Arrays.copyOfRange(buffer, 2, 7));
+ Assert.assertEquals("Stream position should reflect bytes read", 5, stream.getPos());
+ }
+
+ @Test
+ public void testReadFullyZeroByteRead() throws IOException {
+ byte[] buffer = new byte[0];
+
+ MockInputStream stream = new MockInputStream();
+ DelegatingSeekableInputStream.readFully(stream, buffer, 0, buffer.length);
+
+ Assert.assertEquals("Stream position should reflect bytes read", 0, stream.getPos());
+ }
+
+ @Test
+ public void testReadFullySmallReadsWithStartAndLength() throws IOException {
+ byte[] buffer = new byte[10];
+
+ MockInputStream stream = new MockInputStream(2, 2, 3);
+ DelegatingSeekableInputStream.readFully(stream, buffer, 2, 5);
+
+ Assert.assertArrayEquals("Byte array contents should match",
+ Arrays.copyOfRange(TEST_ARRAY, 0, 5), Arrays.copyOfRange(buffer, 2, 7));
+ Assert.assertEquals("Stream position should reflect bytes read", 5, stream.getPos());
+ }
+
+ private static final ThreadLocal<byte[]> TEMP = new ThreadLocal<byte[]>() {
+ @Override
+ protected byte[] initialValue() {
+ return new byte[8192];
+ }
+ };
+
+ @Test
+ public void testHeapRead() throws Exception {
+ ByteBuffer readBuffer = ByteBuffer.allocate(20);
+
+ MockInputStream stream = new MockInputStream();
+
+ int len = DelegatingSeekableInputStream.readHeapBuffer(stream, readBuffer);
+ Assert.assertEquals(10, len);
+ Assert.assertEquals(10, readBuffer.position());
+ Assert.assertEquals(20, readBuffer.limit());
+
+ len = DelegatingSeekableInputStream.readHeapBuffer(stream, readBuffer);
+ Assert.assertEquals(-1, len);
+
+ readBuffer.flip();
+ Assert.assertEquals("Buffer contents should match",
+ ByteBuffer.wrap(TEST_ARRAY), readBuffer);
+ }
+
+ @Test
+ public void testHeapSmallBuffer() throws Exception {
+ ByteBuffer readBuffer = ByteBuffer.allocate(5);
+
+ MockInputStream stream = new MockInputStream();
+
+ int len = DelegatingSeekableInputStream.readHeapBuffer(stream, readBuffer);
+ Assert.assertEquals(5, len);
+ Assert.assertEquals(5, readBuffer.position());
+ Assert.assertEquals(5, readBuffer.limit());
+
+ len = DelegatingSeekableInputStream.readHeapBuffer(stream, readBuffer);
+ Assert.assertEquals(0, len);
+
+ readBuffer.flip();
+ Assert.assertEquals("Buffer contents should match",
+ ByteBuffer.wrap(TEST_ARRAY, 0, 5), readBuffer);
+ }
+
+ @Test
+ public void testHeapSmallReads() throws Exception {
+ ByteBuffer readBuffer = ByteBuffer.allocate(10);
+
+ MockInputStream stream = new MockInputStream(2, 3, 3);
+
+ int len = DelegatingSeekableInputStream.readHeapBuffer(stream, readBuffer);
+ Assert.assertEquals(2, len);
+ Assert.assertEquals(2, readBuffer.position());
+ Assert.assertEquals(10, readBuffer.limit());
+
+ len = DelegatingSeekableInputStream.readHeapBuffer(stream, readBuffer);
+ Assert.assertEquals(3, len);
+ Assert.assertEquals(5, readBuffer.position());
+ Assert.assertEquals(10, readBuffer.limit());
+
+ len = DelegatingSeekableInputStream.readHeapBuffer(stream, readBuffer);
+ Assert.assertEquals(3, len);
+ Assert.assertEquals(8, readBuffer.position());
+ Assert.assertEquals(10, readBuffer.limit());
+
+ len = DelegatingSeekableInputStream.readHeapBuffer(stream, readBuffer);
+ Assert.assertEquals(2, len);
+ Assert.assertEquals(10, readBuffer.position());
+ Assert.assertEquals(10, readBuffer.limit());
+
+ readBuffer.flip();
+ Assert.assertEquals("Buffer contents should match",
+ ByteBuffer.wrap(TEST_ARRAY), readBuffer);
+ }
+
+ @Test
+ public void testHeapPosition() throws Exception {
+ ByteBuffer readBuffer = ByteBuffer.allocate(20);
+ readBuffer.position(10);
+ readBuffer.mark();
+
+ MockInputStream stream = new MockInputStream(8);
+
+ int len = DelegatingSeekableInputStream.readHeapBuffer(stream, readBuffer);
+ Assert.assertEquals(8, len);
+ Assert.assertEquals(18, readBuffer.position());
+ Assert.assertEquals(20, readBuffer.limit());
+
+ len = DelegatingSeekableInputStream.readHeapBuffer(stream, readBuffer);
+ Assert.assertEquals(2, len);
+ Assert.assertEquals(20, readBuffer.position());
+ Assert.assertEquals(20, readBuffer.limit());
+
+ len = DelegatingSeekableInputStream.readHeapBuffer(stream, readBuffer);
+ Assert.assertEquals(-1, len);
+
+ readBuffer.reset();
+ Assert.assertEquals("Buffer contents should match",
+ ByteBuffer.wrap(TEST_ARRAY), readBuffer);
+ }
+
+ @Test
+ public void testHeapLimit() throws Exception {
+ ByteBuffer readBuffer = ByteBuffer.allocate(20);
+ readBuffer.limit(8);
+
+ MockInputStream stream = new MockInputStream(7);
+
+ int len = DelegatingSeekableInputStream.readHeapBuffer(stream, readBuffer);
+ Assert.assertEquals(7, len);
+ Assert.assertEquals(7, readBuffer.position());
+ Assert.assertEquals(8, readBuffer.limit());
+
+ len = DelegatingSeekableInputStream.readHeapBuffer(stream, readBuffer);
+ Assert.assertEquals(1, len);
+ Assert.assertEquals(8, readBuffer.position());
+ Assert.assertEquals(8, readBuffer.limit());
+
+ len = DelegatingSeekableInputStream.readHeapBuffer(stream, readBuffer);
+ Assert.assertEquals(0, len);
+
+ readBuffer.flip();
+ Assert.assertEquals("Buffer contents should match",
+ ByteBuffer.wrap(TEST_ARRAY, 0, 8), readBuffer);
+ }
+
+ @Test
+ public void testHeapPositionAndLimit() throws Exception {
+ ByteBuffer readBuffer = ByteBuffer.allocate(20);
+ readBuffer.position(5);
+ readBuffer.limit(13);
+ readBuffer.mark();
+
+ MockInputStream stream = new MockInputStream(7);
+
+ int len = DelegatingSeekableInputStream.readHeapBuffer(stream, readBuffer);
+ Assert.assertEquals(7, len);
+ Assert.assertEquals(12, readBuffer.position());
+ Assert.assertEquals(13, readBuffer.limit());
+
+ len = DelegatingSeekableInputStream.readHeapBuffer(stream, readBuffer);
+ Assert.assertEquals(1, len);
+ Assert.assertEquals(13, readBuffer.position());
+ Assert.assertEquals(13, readBuffer.limit());
+
+ len = DelegatingSeekableInputStream.readHeapBuffer(stream, readBuffer);
+ Assert.assertEquals(0, len);
+
+ readBuffer.reset();
+ Assert.assertEquals("Buffer contents should match",
+ ByteBuffer.wrap(TEST_ARRAY, 0, 8), readBuffer);
+ }
+
+ @Test
+ public void testDirectRead() throws Exception {
+ ByteBuffer readBuffer = ByteBuffer.allocateDirect(20);
+
+ MockInputStream stream = new MockInputStream();
+
+ int len = DelegatingSeekableInputStream.readDirectBuffer(stream, readBuffer, TEMP.get());
+ Assert.assertEquals(10, len);
+ Assert.assertEquals(10, readBuffer.position());
+ Assert.assertEquals(20, readBuffer.limit());
+
+ len = DelegatingSeekableInputStream.readDirectBuffer(stream, readBuffer, TEMP.get());
+ Assert.assertEquals(-1, len);
+
+ readBuffer.flip();
+ Assert.assertEquals("Buffer contents should match",
+ ByteBuffer.wrap(TEST_ARRAY), readBuffer);
+ }
+
+ @Test
+ public void testDirectSmallBuffer() throws Exception {
+ ByteBuffer readBuffer = ByteBuffer.allocateDirect(5);
+
+ MockInputStream stream = new MockInputStream();
+
+ int len = DelegatingSeekableInputStream.readDirectBuffer(stream, readBuffer, TEMP.get());
+ Assert.assertEquals(5, len);
+ Assert.assertEquals(5, readBuffer.position());
+ Assert.assertEquals(5, readBuffer.limit());
+
+ len = DelegatingSeekableInputStream.readDirectBuffer(stream, readBuffer, TEMP.get());
+ Assert.assertEquals(0, len);
+
+ readBuffer.flip();
+ Assert.assertEquals("Buffer contents should match",
+ ByteBuffer.wrap(TEST_ARRAY, 0, 5), readBuffer);
+ }
+
+ @Test
+ public void testDirectSmallReads() throws Exception {
+ ByteBuffer readBuffer = ByteBuffer.allocateDirect(10);
+
+ MockInputStream stream = new MockInputStream(2, 3, 3);
+
+ int len = DelegatingSeekableInputStream.readDirectBuffer(stream, readBuffer, TEMP.get());
+ Assert.assertEquals(2, len);
+ Assert.assertEquals(2, readBuffer.position());
+ Assert.assertEquals(10, readBuffer.limit());
+
+ len = DelegatingSeekableInputStream.readDirectBuffer(stream, readBuffer, TEMP.get());
+ Assert.assertEquals(3, len);
+ Assert.assertEquals(5, readBuffer.position());
+ Assert.assertEquals(10, readBuffer.limit());
+
+ len = DelegatingSeekableInputStream.readDirectBuffer(stream, readBuffer, TEMP.get());
+ Assert.assertEquals(3, len);
+ Assert.assertEquals(8, readBuffer.position());
+ Assert.assertEquals(10, readBuffer.limit());
+
+ len = DelegatingSeekableInputStream.readDirectBuffer(stream, readBuffer, TEMP.get());
+ Assert.assertEquals(2, len);
+ Assert.assertEquals(10, readBuffer.position());
+ Assert.assertEquals(10, readBuffer.limit());
+
+ readBuffer.flip();
+ Assert.assertEquals("Buffer contents should match",
+ ByteBuffer.wrap(TEST_ARRAY), readBuffer);
+ }
+
+ @Test
+ public void testDirectPosition() throws Exception {
+ ByteBuffer readBuffer = ByteBuffer.allocateDirect(20);
+ readBuffer.position(10);
+ readBuffer.mark();
+
+ MockInputStream stream = new MockInputStream(8);
+
+ int len = DelegatingSeekableInputStream.readDirectBuffer(stream, readBuffer, TEMP.get());
+ Assert.assertEquals(8, len);
+ Assert.assertEquals(18, readBuffer.position());
+ Assert.assertEquals(20, readBuffer.limit());
+
+ len = DelegatingSeekableInputStream.readDirectBuffer(stream, readBuffer, TEMP.get());
+ Assert.assertEquals(2, len);
+ Assert.assertEquals(20, readBuffer.position());
+ Assert.assertEquals(20, readBuffer.limit());
+
+ len = DelegatingSeekableInputStream.readDirectBuffer(stream, readBuffer, TEMP.get());
+ Assert.assertEquals(-1, len);
+
+ readBuffer.reset();
+ Assert.assertEquals("Buffer contents should match",
+ ByteBuffer.wrap(TEST_ARRAY), readBuffer);
+ }
+
+ @Test
+ public void testDirectLimit() throws Exception {
+ ByteBuffer readBuffer = ByteBuffer.allocate(20);
+ readBuffer.limit(8);
+
+ MockInputStream stream = new MockInputStream(7);
+
+ int len = DelegatingSeekableInputStream.readDirectBuffer(stream, readBuffer, TEMP.get());
+ Assert.assertEquals(7, len);
+ Assert.assertEquals(7, readBuffer.position());
+ Assert.assertEquals(8, readBuffer.limit());
+
+ len = DelegatingSeekableInputStream.readDirectBuffer(stream, readBuffer, TEMP.get());
+ Assert.assertEquals(1, len);
+ Assert.assertEquals(8, readBuffer.position());
+ Assert.assertEquals(8, readBuffer.limit());
+
+ len = DelegatingSeekableInputStream.readDirectBuffer(stream, readBuffer, TEMP.get());
+ Assert.assertEquals(0, len);
+
+ readBuffer.flip();
+ Assert.assertEquals("Buffer contents should match",
+ ByteBuffer.wrap(TEST_ARRAY, 0, 8), readBuffer);
+ }
+
+ @Test
+ public void testDirectPositionAndLimit() throws Exception {
+ ByteBuffer readBuffer = ByteBuffer.allocateDirect(20);
+ readBuffer.position(5);
+ readBuffer.limit(13);
+ readBuffer.mark();
+
+ MockInputStream stream = new MockInputStream(7);
+
+ int len = DelegatingSeekableInputStream.readDirectBuffer(stream, readBuffer, TEMP.get());
+ Assert.assertEquals(7, len);
+ Assert.assertEquals(12, readBuffer.position());
+ Assert.assertEquals(13, readBuffer.limit());
+
+ len = DelegatingSeekableInputStream.readDirectBuffer(stream, readBuffer, TEMP.get());
+ Assert.assertEquals(1, len);
+ Assert.assertEquals(13, readBuffer.position());
+ Assert.assertEquals(13, readBuffer.limit());
+
+ len = DelegatingSeekableInputStream.readDirectBuffer(stream, readBuffer, TEMP.get());
+ Assert.assertEquals(0, len);
+
+ readBuffer.reset();
+ Assert.assertEquals("Buffer contents should match",
+ ByteBuffer.wrap(TEST_ARRAY, 0, 8), readBuffer);
+ }
+
+ @Test
+ public void testDirectSmallTempBufferSmallReads() throws Exception {
+ byte[] temp = new byte[2]; // this will cause readDirectBuffer to loop
+
+ ByteBuffer readBuffer = ByteBuffer.allocateDirect(10);
+
+ MockInputStream stream = new MockInputStream(2, 3, 3);
+
+ int len = DelegatingSeekableInputStream.readDirectBuffer(stream, readBuffer, temp);
+ Assert.assertEquals(2, len);
+ Assert.assertEquals(2, readBuffer.position());
+ Assert.assertEquals(10, readBuffer.limit());
+
+ len = DelegatingSeekableInputStream.readDirectBuffer(stream, readBuffer, temp);
+ Assert.assertEquals(3, len);
+ Assert.assertEquals(5, readBuffer.position());
+ Assert.assertEquals(10, readBuffer.limit());
+
+ len = DelegatingSeekableInputStream.readDirectBuffer(stream, readBuffer, temp);
+ Assert.assertEquals(3, len);
+ Assert.assertEquals(8, readBuffer.position());
+ Assert.assertEquals(10, readBuffer.limit());
+
+ len = DelegatingSeekableInputStream.readDirectBuffer(stream, readBuffer, temp);
+ Assert.assertEquals(2, len);
+ Assert.assertEquals(10, readBuffer.position());
+ Assert.assertEquals(10, readBuffer.limit());
+
+ len = DelegatingSeekableInputStream.readDirectBuffer(stream, readBuffer, temp);
+ Assert.assertEquals(-1, len);
+
+ readBuffer.flip();
+ Assert.assertEquals("Buffer contents should match",
+ ByteBuffer.wrap(TEST_ARRAY), readBuffer);
+ }
+
+ @Test
+ public void testDirectSmallTempBufferWithPositionAndLimit() throws Exception {
+ byte[] temp = new byte[2]; // this will cause readDirectBuffer to loop
+
+ ByteBuffer readBuffer = ByteBuffer.allocateDirect(20);
+ readBuffer.position(5);
+ readBuffer.limit(13);
+ readBuffer.mark();
+
+ MockInputStream stream = new MockInputStream(7);
+
+ int len = DelegatingSeekableInputStream.readDirectBuffer(stream, readBuffer, temp);
+ Assert.assertEquals(7, len);
+ Assert.assertEquals(12, readBuffer.position());
+ Assert.assertEquals(13, readBuffer.limit());
+
+ len = DelegatingSeekableInputStream.readDirectBuffer(stream, readBuffer, temp);
+ Assert.assertEquals(1, len);
+ Assert.assertEquals(13, readBuffer.position());
+ Assert.assertEquals(13, readBuffer.limit());
+
+ len = DelegatingSeekableInputStream.readDirectBuffer(stream, readBuffer, temp);
+ Assert.assertEquals(0, len);
+
+ readBuffer.reset();
+ Assert.assertEquals("Buffer contents should match",
+ ByteBuffer.wrap(TEST_ARRAY, 0, 8), readBuffer);
+ }
+
+ @Test
+ public void testHeapReadFullySmallBuffer() throws Exception {
+ ByteBuffer readBuffer = ByteBuffer.allocate(8);
+
+ MockInputStream stream = new MockInputStream();
+
+ DelegatingSeekableInputStream.readFullyHeapBuffer(stream, readBuffer);
+ Assert.assertEquals(8, readBuffer.position());
+ Assert.assertEquals(8, readBuffer.limit());
+
+ DelegatingSeekableInputStream.readFullyHeapBuffer(stream, readBuffer);
+ Assert.assertEquals(8, readBuffer.position());
+ Assert.assertEquals(8, readBuffer.limit());
+
+ readBuffer.flip();
+ Assert.assertEquals("Buffer contents should match",
+ ByteBuffer.wrap(TEST_ARRAY, 0, 8), readBuffer);
+ }
+
+ @Test
+ public void testHeapReadFullyLargeBuffer() throws Exception {
+ final ByteBuffer readBuffer = ByteBuffer.allocate(20);
+
+ final MockInputStream stream = new MockInputStream();
+
+ TestUtils.assertThrows("Should throw EOFException",
+ EOFException.class, new Callable() {
+ @Override
+ public Object call() throws Exception {
+ DelegatingSeekableInputStream.readFullyHeapBuffer(stream, readBuffer);
+ return null;
+ }
+ });
+
+ Assert.assertEquals(0, readBuffer.position());
+ Assert.assertEquals(20, readBuffer.limit());
+ }
+
+ @Test
+ public void testHeapReadFullyJustRight() throws Exception {
+ final ByteBuffer readBuffer = ByteBuffer.allocate(10);
+
+ MockInputStream stream = new MockInputStream();
+
+ // reads all of the bytes available without EOFException
+ DelegatingSeekableInputStream.readFullyHeapBuffer(stream, readBuffer);
+ Assert.assertEquals(10, readBuffer.position());
+ Assert.assertEquals(10, readBuffer.limit());
+
+ // trying to read 0 more bytes doesn't result in EOFException
+ DelegatingSeekableInputStream.readFullyHeapBuffer(stream, readBuffer);
+ Assert.assertEquals(10, readBuffer.position());
+ Assert.assertEquals(10, readBuffer.limit());
+
+ readBuffer.flip();
+ Assert.assertEquals("Buffer contents should match",
+ ByteBuffer.wrap(TEST_ARRAY), readBuffer);
+ }
+
+ @Test
+ public void testHeapReadFullySmallReads() throws Exception {
+ final ByteBuffer readBuffer = ByteBuffer.allocate(10);
+
+ MockInputStream stream = new MockInputStream(2, 3, 3);
+
+ DelegatingSeekableInputStream.readFullyHeapBuffer(stream, readBuffer);
+ Assert.assertEquals(10, readBuffer.position());
+ Assert.assertEquals(10, readBuffer.limit());
+
+ DelegatingSeekableInputStream.readFullyHeapBuffer(stream, readBuffer);
+ Assert.assertEquals(10, readBuffer.position());
+ Assert.assertEquals(10, readBuffer.limit());
+
+ readBuffer.flip();
+ Assert.assertEquals("Buffer contents should match",
+ ByteBuffer.wrap(TEST_ARRAY), readBuffer);
+ }
+
+ @Test
+ public void testHeapReadFullyPosition() throws Exception {
+ final ByteBuffer readBuffer = ByteBuffer.allocate(10);
+ readBuffer.position(3);
+ readBuffer.mark();
+
+ MockInputStream stream = new MockInputStream(2, 3, 3);
+
+ DelegatingSeekableInputStream.readFullyHeapBuffer(stream, readBuffer);
+ Assert.assertEquals(10, readBuffer.position());
+ Assert.assertEquals(10, readBuffer.limit());
+
+ DelegatingSeekableInputStream.readFullyHeapBuffer(stream, readBuffer);
+ Assert.assertEquals(10, readBuffer.position());
+ Assert.assertEquals(10, readBuffer.limit());
+
+ readBuffer.reset();
+ Assert.assertEquals("Buffer contents should match",
+ ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer);
+ }
+
+ @Test
+ public void testHeapReadFullyLimit() throws Exception {
+ final ByteBuffer readBuffer = ByteBuffer.allocate(10);
+ readBuffer.limit(7);
+
+ MockInputStream stream = new MockInputStream(2, 3, 3);
+
+ DelegatingSeekableInputStream.readFullyHeapBuffer(stream, readBuffer);
+ Assert.assertEquals(7, readBuffer.position());
+ Assert.assertEquals(7, readBuffer.limit());
+
+ DelegatingSeekableInputStream.readFullyHeapBuffer(stream, readBuffer);
+ Assert.assertEquals(7, readBuffer.position());
+ Assert.assertEquals(7, readBuffer.limit());
+
+ readBuffer.flip();
+ Assert.assertEquals("Buffer contents should match",
+ ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer);
+
+ readBuffer.position(7);
+ readBuffer.limit(10);
+ DelegatingSeekableInputStream.readFullyHeapBuffer(stream, readBuffer);
+ Assert.assertEquals(10, readBuffer.position());
+ Assert.assertEquals(10, readBuffer.limit());
+
+ readBuffer.flip();
+ Assert.assertEquals("Buffer contents should match",
+ ByteBuffer.wrap(TEST_ARRAY), readBuffer);
+ }
+
+ @Test
+ public void testHeapReadFullyPositionAndLimit() throws Exception {
+ final ByteBuffer readBuffer = ByteBuffer.allocate(10);
+ readBuffer.position(3);
+ readBuffer.limit(7);
+ readBuffer.mark();
+
+ MockInputStream stream = new MockInputStream(2, 3, 3);
+
+ DelegatingSeekableInputStream.readFullyHeapBuffer(stream, readBuffer);
+ Assert.assertEquals(7, readBuffer.position());
+ Assert.assertEquals(7, readBuffer.limit());
+
+ DelegatingSeekableInputStream.readFullyHeapBuffer(stream, readBuffer);
+ Assert.assertEquals(7, readBuffer.position());
+ Assert.assertEquals(7, readBuffer.limit());
+
+ readBuffer.reset();
+ Assert.assertEquals("Buffer contents should match",
+ ByteBuffer.wrap(TEST_ARRAY, 0, 4), readBuffer);
+
+ readBuffer.position(7);
+ readBuffer.limit(10);
+ DelegatingSeekableInputStream.readFullyHeapBuffer(stream, readBuffer);
+ Assert.assertEquals(10, readBuffer.position());
+ Assert.assertEquals(10, readBuffer.limit());
+
+ readBuffer.reset();
+ Assert.assertEquals("Buffer contents should match",
+ ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer);
+ }
+
+ @Test
+ public void testDirectReadFullySmallBuffer() throws Exception {
+ ByteBuffer readBuffer = ByteBuffer.allocateDirect(8);
+
+ MockInputStream stream = new MockInputStream();
+
+ DelegatingSeekableInputStream.readFullyDirectBuffer(stream, readBuffer, TEMP.get());
+ Assert.assertEquals(8, readBuffer.position());
+ Assert.assertEquals(8, readBuffer.limit());
+
+ DelegatingSeekableInputStream.readFullyDirectBuffer(stream, readBuffer, TEMP.get());
+ Assert.assertEquals(8, readBuffer.position());
+ Assert.assertEquals(8, readBuffer.limit());
+
+ readBuffer.flip();
+ Assert.assertEquals("Buffer contents should match",
+ ByteBuffer.wrap(TEST_ARRAY, 0, 8), readBuffer);
+ }
+
+ @Test
+ public void testDirectReadFullyLargeBuffer() throws Exception {
+ final ByteBuffer readBuffer = ByteBuffer.allocateDirect(20);
+
+ final MockInputStream stream = new MockInputStream();
+
+ TestUtils.assertThrows("Should throw EOFException",
+ EOFException.class, new Callable() {
+ @Override
+ public Object call() throws Exception {
+ DelegatingSeekableInputStream.readFullyDirectBuffer(stream, readBuffer, TEMP.get());
+ return null;
+ }
+ });
+
+ // NOTE: This behavior differs from readFullyHeapBuffer because direct uses
+ // several read operations that will read up to the end of the input. This
+ // is a correct value because the bytes in the buffer are valid. This
+ // behavior can't be implemented for the heap buffer without using the read
+ // method instead of the readFully method on the underlying input
+ // stream.
+ Assert.assertEquals(10, readBuffer.position());
+ Assert.assertEquals(20, readBuffer.limit());
+ }
+
+ @Test
+ public void testDirectReadFullyJustRight() throws Exception {
+ final ByteBuffer readBuffer = ByteBuffer.allocateDirect(10);
+
+ MockInputStream stream = new MockInputStream();
+
+ // reads all of the bytes available without EOFException
+ DelegatingSeekableInputStream.readFullyDirectBuffer(stream, readBuffer, TEMP.get());
+ Assert.assertEquals(10, readBuffer.position());
+ Assert.assertEquals(10, readBuffer.limit());
+
+ // trying to read 0 more bytes doesn't result in EOFException
+ DelegatingSeekableInputStream.readFullyDirectBuffer(stream, readBuffer, TEMP.get());
+ Assert.assertEquals(10, readBuffer.position());
+ Assert.assertEquals(10, readBuffer.limit());
+
+ readBuffer.flip();
+ Assert.assertEquals("Buffer contents should match",
+ ByteBuffer.wrap(TEST_ARRAY), readBuffer);
+ }
+
+ @Test
+ public void testDirectReadFullySmallReads() throws Exception {
+ final ByteBuffer readBuffer = ByteBuffer.allocateDirect(10);
+
+ MockInputStream stream = new MockInputStream(2, 3, 3);
+
+ DelegatingSeekableInputStream.readFullyDirectBuffer(stream, readBuffer, TEMP.get());
+ Assert.assertEquals(10, readBuffer.position());
+ Assert.assertEquals(10, readBuffer.limit());
+
+ DelegatingSeekableInputStream.readFullyDirectBuffer(stream, readBuffer, TEMP.get());
+ Assert.assertEquals(10, readBuffer.position());
+ Assert.assertEquals(10, readBuffer.limit());
+
+ readBuffer.flip();
+ Assert.assertEquals("Buffer contents should match",
+ ByteBuffer.wrap(TEST_ARRAY), readBuffer);
+ }
+
+ @Test
+ public void testDirectReadFullyPosition() throws Exception {
+ final ByteBuffer readBuffer = ByteBuffer.allocateDirect(10);
+ readBuffer.position(3);
+ readBuffer.mark();
+
+ MockInputStream stream = new MockInputStream(2, 3, 3);
+
+ DelegatingSeekableInputStream.readFullyDirectBuffer(stream, readBuffer, TEMP.get());
+ Assert.assertEquals(10, readBuffer.position());
+ Assert.assertEquals(10, readBuffer.limit());
+
+ DelegatingSeekableInputStream.readFullyDirectBuffer(stream, readBuffer, TEMP.get());
+ Assert.assertEquals(10, readBuffer.position());
+ Assert.assertEquals(10, readBuffer.limit());
+
+ readBuffer.reset();
+ Assert.assertEquals("Buffer contents should match",
+ ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer);
+ }
+
+ @Test
+ public void testDirectReadFullyLimit() throws Exception {
+ final ByteBuffer readBuffer = ByteBuffer.allocateDirect(10);
+ readBuffer.limit(7);
+
+ MockInputStream stream = new MockInputStream(2, 3, 3);
+
+ DelegatingSeekableInputStream.readFullyDirectBuffer(stream, readBuffer, TEMP.get());
+ Assert.assertEquals(7, readBuffer.position());
+ Assert.assertEquals(7, readBuffer.limit());
+
+ DelegatingSeekableInputStream.readFullyDirectBuffer(stream, readBuffer, TEMP.get());
+ Assert.assertEquals(7, readBuffer.position());
+ Assert.assertEquals(7, readBuffer.limit());
+
+ readBuffer.flip();
+ Assert.assertEquals("Buffer contents should match",
+ ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer);
+
+ readBuffer.position(7);
+ readBuffer.limit(10);
+ DelegatingSeekableInputStream.readFullyDirectBuffer(stream, readBuffer, TEMP.get());
+ Assert.assertEquals(10, readBuffer.position());
+ Assert.assertEquals(10, readBuffer.limit());
+
+ readBuffer.flip();
+ Assert.assertEquals("Buffer contents should match",
+ ByteBuffer.wrap(TEST_ARRAY), readBuffer);
+ }
+
+ @Test
+ public void testDirectReadFullyPositionAndLimit() throws Exception {
+ final ByteBuffer readBuffer = ByteBuffer.allocateDirect(10);
+ readBuffer.position(3);
+ readBuffer.limit(7);
+ readBuffer.mark();
+
+ MockInputStream stream = new MockInputStream(2, 3, 3);
+
+ DelegatingSeekableInputStream.readFullyDirectBuffer(stream, readBuffer, TEMP.get());
+ Assert.assertEquals(7, readBuffer.position());
+ Assert.assertEquals(7, readBuffer.limit());
+
+ DelegatingSeekableInputStream.readFullyDirectBuffer(stream, readBuffer, TEMP.get());
+ Assert.assertEquals(7, readBuffer.position());
+ Assert.assertEquals(7, readBuffer.limit());
+
+ readBuffer.reset();
+ Assert.assertEquals("Buffer contents should match",
+ ByteBuffer.wrap(TEST_ARRAY, 0, 4), readBuffer);
+
+ readBuffer.position(7);
+ readBuffer.limit(10);
+ DelegatingSeekableInputStream.readFullyDirectBuffer(stream, readBuffer, TEMP.get());
+ Assert.assertEquals(10, readBuffer.position());
+ Assert.assertEquals(10, readBuffer.limit());
+
+ readBuffer.reset();
+ Assert.assertEquals("Buffer contents should match",
+ ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer);
+ }
+
+ @Test
+ public void testDirectReadFullySmallTempBufferWithPositionAndLimit() throws Exception {
+ byte[] temp = new byte[2]; // this will cause readFully to loop
+
+ final ByteBuffer readBuffer = ByteBuffer.allocateDirect(10);
+ readBuffer.position(3);
+ readBuffer.limit(7);
+ readBuffer.mark();
+
+ MockInputStream stream = new MockInputStream(2, 3, 3);
+
+ DelegatingSeekableInputStream.readFullyDirectBuffer(stream, readBuffer, temp);
+ Assert.assertEquals(7, readBuffer.position());
+ Assert.assertEquals(7, readBuffer.limit());
+
+ DelegatingSeekableInputStream.readFullyDirectBuffer(stream, readBuffer, temp);
+ Assert.assertEquals(7, readBuffer.position());
+ Assert.assertEquals(7, readBuffer.limit());
+
+ readBuffer.reset();
+ Assert.assertEquals("Buffer contents should match",
+ ByteBuffer.wrap(TEST_ARRAY, 0, 4), readBuffer);
+
+ readBuffer.position(7);
+ readBuffer.limit(10);
+ DelegatingSeekableInputStream.readFullyDirectBuffer(stream, readBuffer, temp);
+ Assert.assertEquals(10, readBuffer.position());
+ Assert.assertEquals(10, readBuffer.limit());
+
+ readBuffer.reset();
+ Assert.assertEquals("Buffer contents should match",
+ ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer);
+ }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-encoding/src/main/java/org/apache/parquet/bytes/BytesInput.java
----------------------------------------------------------------------
diff --git a/parquet-encoding/src/main/java/org/apache/parquet/bytes/BytesInput.java b/parquet-encoding/src/main/java/org/apache/parquet/bytes/BytesInput.java
deleted file mode 100644
index 6e593c2..0000000
--- a/parquet-encoding/src/main/java/org/apache/parquet/bytes/BytesInput.java
+++ /dev/null
@@ -1,486 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.parquet.bytes;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Arrays;
-import java.util.List;
-import java.nio.ByteBuffer;
-import java.nio.channels.Channels;
-import java.nio.channels.WritableByteChannel;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * A source of bytes capable of writing itself to an output.
- * A BytesInput should be consumed right away.
- * It is not a container.
- * For example if it is referring to a stream,
- * subsequent BytesInput reads from the stream will be incorrect
- * if the previous has not been consumed.
- *
- * @author Julien Le Dem
- *
- */
-abstract public class BytesInput {
- private static final Logger LOG = LoggerFactory.getLogger(BytesInput.class);
- private static final boolean DEBUG = false;//Log.DEBUG;
- private static final EmptyBytesInput EMPTY_BYTES_INPUT = new EmptyBytesInput();
-
- /**
- * logically concatenate the provided inputs
- * @param inputs the inputs to concatenate
- * @return a concatenated input
- */
- public static BytesInput concat(BytesInput... inputs) {
- return new SequenceBytesIn(Arrays.asList(inputs));
- }
-
- /**
- * logically concatenate the provided inputs
- * @param inputs the inputs to concatenate
- * @return a concatenated input
- */
- public static BytesInput concat(List<BytesInput> inputs) {
- return new SequenceBytesIn(inputs);
- }
-
- /**
- * @param in
- * @param bytes number of bytes to read
- * @return a BytesInput that will read that number of bytes from the stream
- */
- public static BytesInput from(InputStream in, int bytes) {
- return new StreamBytesInput(in, bytes);
- }
-
- /**
- * @param buffer
- * @param length number of bytes to read
- * @return a BytesInput that will read the given bytes from the ByteBuffer
- */
- public static BytesInput from(ByteBuffer buffer, int offset, int length) {
- return new ByteBufferBytesInput(buffer, offset, length);
- }
-
- /**
- *
- * @param in
- * @return a Bytes input that will write the given bytes
- */
- public static BytesInput from(byte[] in) {
- LOG.debug("BytesInput from array of {} bytes", in.length);
- return new ByteArrayBytesInput(in, 0 , in.length);
- }
-
- public static BytesInput from(byte[] in, int offset, int length) {
- LOG.debug("BytesInput from array of {} bytes", length);
- return new ByteArrayBytesInput(in, offset, length);
- }
-
- /**
- * @param intValue the int to write
- * @return a BytesInput that will write 4 bytes in little endian
- */
- public static BytesInput fromInt(int intValue) {
- return new IntBytesInput(intValue);
- }
-
- /**
- * @param intValue the int to write
- * @return a BytesInput that will write var int
- */
- public static BytesInput fromUnsignedVarInt(int intValue) {
- return new UnsignedVarIntBytesInput(intValue);
- }
-
- /**
- *
- * @param intValue the int to write
- */
- public static BytesInput fromZigZagVarInt(int intValue) {
- int zigZag = (intValue << 1) ^ (intValue >> 31);
- return new UnsignedVarIntBytesInput(zigZag);
- }
-
- /**
- * @param longValue the long to write
- * @return a BytesInput that will write var long
- */
- public static BytesInput fromUnsignedVarLong(long longValue) {
- return new UnsignedVarLongBytesInput(longValue);
- }
-
- /**
- *
- * @param longValue the long to write
- */
- public static BytesInput fromZigZagVarLong(long longValue) {
- long zigZag = (longValue << 1) ^ (longValue >> 63);
- return new UnsignedVarLongBytesInput(zigZag);
- }
-
- /**
- * @param arrayOut
- * @return a BytesInput that will write the content of the buffer
- */
- public static BytesInput from(CapacityByteArrayOutputStream arrayOut) {
- return new CapacityBAOSBytesInput(arrayOut);
- }
-
- /**
- * @param baos - stream to wrap into a BytesInput
- * @return a BytesInput that will write the content of the buffer
- */
- public static BytesInput from(ByteArrayOutputStream baos) {
- return new BAOSBytesInput(baos);
- }
-
- /**
- * @return an empty bytes input
- */
- public static BytesInput empty() {
- return EMPTY_BYTES_INPUT;
- }
-
- /**
- * copies the input into a new byte array
- * @param bytesInput
- * @return
- * @throws IOException
- */
- public static BytesInput copy(BytesInput bytesInput) throws IOException {
- return from(bytesInput.toByteArray());
- }
-
- /**
- * writes the bytes into a stream
- * @param out
- * @throws IOException
- */
- abstract public void writeAllTo(OutputStream out) throws IOException;
-
- /**
- *
- * @return a new byte array materializing the contents of this input
- * @throws IOException
- */
- public byte[] toByteArray() throws IOException {
- BAOS baos = new BAOS((int)size());
- this.writeAllTo(baos);
- LOG.debug("converted {} to byteArray of {} bytes", size() , baos.size());
- return baos.getBuf();
- }
-
- /**
- *
- * @return a new ByteBuffer materializing the contents of this input
- * @throws IOException
- */
- public ByteBuffer toByteBuffer() throws IOException {
- return ByteBuffer.wrap(toByteArray());
- }
-
- /**
- *
- * @return a new InputStream materializing the contents of this input
- * @throws IOException
- */
- public InputStream toInputStream() throws IOException {
- return new ByteBufferInputStream(toByteBuffer());
- }
-
- /**
- *
- * @return the size in bytes that would be written
- */
- abstract public long size();
-
- private static final class BAOS extends ByteArrayOutputStream {
- private BAOS(int size) {
- super(size);
- }
-
- public byte[] getBuf() {
- return this.buf;
- }
- }
-
- private static class StreamBytesInput extends BytesInput {
- private static final Logger LOG = LoggerFactory.getLogger(BytesInput.StreamBytesInput.class);
- private final InputStream in;
- private final int byteCount;
-
- private StreamBytesInput(InputStream in, int byteCount) {
- super();
- this.in = in;
- this.byteCount = byteCount;
- }
-
- @Override
- public void writeAllTo(OutputStream out) throws IOException {
- LOG.debug("write All {} bytes", byteCount);
- // TODO: more efficient
- out.write(this.toByteArray());
- }
-
- public byte[] toByteArray() throws IOException {
- LOG.debug("read all {} bytes", byteCount);
- byte[] buf = new byte[byteCount];
- new DataInputStream(in).readFully(buf);
- return buf;
- }
-
- @Override
- public long size() {
- return byteCount;
- }
-
- }
-
- private static class SequenceBytesIn extends BytesInput {
- private static final Logger LOG = LoggerFactory.getLogger(BytesInput.SequenceBytesIn.class);
-
- private final List<BytesInput> inputs;
- private final long size;
-
- private SequenceBytesIn(List<BytesInput> inputs) {
- this.inputs = inputs;
- long total = 0;
- for (BytesInput input : inputs) {
- total += input.size();
- }
- this.size = total;
- }
-
- @SuppressWarnings("unused")
- @Override
- public void writeAllTo(OutputStream out) throws IOException {
- for (BytesInput input : inputs) {
-
- LOG.debug("write {} bytes to out", input.size());
- if (input instanceof SequenceBytesIn) LOG.debug("{");
- input.writeAllTo(out);
- if (input instanceof SequenceBytesIn) LOG.debug("}");
- }
- }
-
- @Override
- public long size() {
- return size;
- }
-
- }
-
- private static class IntBytesInput extends BytesInput {
-
- private final int intValue;
-
- public IntBytesInput(int intValue) {
- this.intValue = intValue;
- }
-
- @Override
- public void writeAllTo(OutputStream out) throws IOException {
- BytesUtils.writeIntLittleEndian(out, intValue);
- }
-
- public ByteBuffer toByteBuffer() throws IOException {
- return ByteBuffer.allocate(4).putInt(0, intValue);
- }
-
- @Override
- public long size() {
- return 4;
- }
-
- }
-
- private static class UnsignedVarIntBytesInput extends BytesInput {
-
- private final int intValue;
-
- public UnsignedVarIntBytesInput(int intValue) {
- this.intValue = intValue;
- }
-
- @Override
- public void writeAllTo(OutputStream out) throws IOException {
- BytesUtils.writeUnsignedVarInt(intValue, out);
- }
-
- public ByteBuffer toByteBuffer() throws IOException {
- ByteBuffer ret = ByteBuffer.allocate((int) size());
- BytesUtils.writeUnsignedVarInt(intValue, ret);
- return ret;
- }
-
- @Override
- public long size() {
- int s = (38 - Integer.numberOfLeadingZeros(intValue)) / 7;
- return s == 0 ? 1 : s;
- }
- }
-
- private static class UnsignedVarLongBytesInput extends BytesInput {
-
- private final long longValue;
-
- public UnsignedVarLongBytesInput(long longValue) {
- this.longValue = longValue;
- }
-
- @Override
- public void writeAllTo(OutputStream out) throws IOException {
- BytesUtils.writeUnsignedVarLong(longValue, out);
- }
-
- @Override
- public long size() {
- int s = (70 - Long.numberOfLeadingZeros(longValue)) / 7;
- return s == 0 ? 1 : s;
- }
- }
-
- private static class EmptyBytesInput extends BytesInput {
-
- @Override
- public void writeAllTo(OutputStream out) throws IOException {
- }
-
- @Override
- public long size() {
- return 0;
- }
-
- public ByteBuffer toByteBuffer() throws IOException {
- return ByteBuffer.allocate(0);
- }
-
- }
-
- private static class CapacityBAOSBytesInput extends BytesInput {
-
- private final CapacityByteArrayOutputStream arrayOut;
-
- private CapacityBAOSBytesInput(CapacityByteArrayOutputStream arrayOut) {
- this.arrayOut = arrayOut;
- }
-
- @Override
- public void writeAllTo(OutputStream out) throws IOException {
- arrayOut.writeTo(out);
- }
-
- @Override
- public long size() {
- return arrayOut.size();
- }
-
- }
-
- private static class BAOSBytesInput extends BytesInput {
-
- private final ByteArrayOutputStream arrayOut;
-
- private BAOSBytesInput(ByteArrayOutputStream arrayOut) {
- this.arrayOut = arrayOut;
- }
-
- @Override
- public void writeAllTo(OutputStream out) throws IOException {
- arrayOut.writeTo(out);
- }
-
- @Override
- public long size() {
- return arrayOut.size();
- }
-
- }
-
- private static class ByteArrayBytesInput extends BytesInput {
-
- private final byte[] in;
- private final int offset;
- private final int length;
-
- private ByteArrayBytesInput(byte[] in, int offset, int length) {
- this.in = in;
- this.offset = offset;
- this.length = length;
- }
-
- @Override
- public void writeAllTo(OutputStream out) throws IOException {
- out.write(in, offset, length);
- }
-
- public ByteBuffer toByteBuffer() throws IOException {
- return ByteBuffer.wrap(in, offset, length);
- }
-
- @Override
- public long size() {
- return length;
- }
-
- }
-
- private static class ByteBufferBytesInput extends BytesInput {
-
- private final ByteBuffer byteBuf;
- private final int length;
- private final int offset;
-
- private ByteBufferBytesInput(ByteBuffer byteBuf, int offset, int length) {
- this.byteBuf = byteBuf;
- this.offset = offset;
- this.length = length;
- }
-
- @Override
- public void writeAllTo(OutputStream out) throws IOException {
- final WritableByteChannel outputChannel = Channels.newChannel(out);
- byteBuf.position(offset);
- ByteBuffer tempBuf = byteBuf.slice();
- tempBuf.limit(length);
- outputChannel.write(tempBuf);
- }
-
- @Override
- public ByteBuffer toByteBuffer() throws IOException {
- byteBuf.position(offset);
- ByteBuffer buf = byteBuf.slice();
- buf.limit(length);
- return buf;
- }
-
- @Override
- public long size() {
- return length;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-encoding/src/main/java/org/apache/parquet/bytes/CapacityByteArrayOutputStream.java
----------------------------------------------------------------------
diff --git a/parquet-encoding/src/main/java/org/apache/parquet/bytes/CapacityByteArrayOutputStream.java b/parquet-encoding/src/main/java/org/apache/parquet/bytes/CapacityByteArrayOutputStream.java
deleted file mode 100644
index 92674d4..0000000
--- a/parquet-encoding/src/main/java/org/apache/parquet/bytes/CapacityByteArrayOutputStream.java
+++ /dev/null
@@ -1,337 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.parquet.bytes;
-
-import static java.lang.Math.max;
-import static java.lang.Math.pow;
-import static java.lang.String.format;
-import static org.apache.parquet.Preconditions.checkArgument;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.parquet.OutputStreamCloseException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Similar to a {@link ByteArrayOutputStream}, but uses a different strategy for growing that does not involve copying.
- * Where ByteArrayOutputStream is backed by a single array that "grows" by copying into a new larger array, this output
- * stream grows by allocating a new array (slab) and adding it to a list of previous arrays.
- *
- * Each new slab is allocated to be the same size as all the previous slabs combined, so these allocations become
- * exponentially less frequent, just like ByteArrayOutputStream, with one difference. This output stream accepts a
- * max capacity hint, which is a hint describing the max amount of data that will be written to this stream. As the
- * total size of this stream nears this max, this stream starts to grow linearly instead of exponentially.
- * So new slabs are allocated to be 1/5th of the max capacity hint,
- * instead of equal to the total previous size of all slabs. This is useful because it prevents allocating roughly
- * twice the needed space when a new slab is added just before the stream is done being used.
- *
- * When reusing this stream it will adjust the initial slab size based on the previous data size, aiming for fewer
- * allocations, with the assumption that a similar amount of data will be written to this stream on re-use.
- * See ({@link CapacityByteArrayOutputStream#reset()}).
- *
- * @author Julien Le Dem
- *
- */
-public class CapacityByteArrayOutputStream extends OutputStream {
- private static final Logger LOG = LoggerFactory.getLogger(CapacityByteArrayOutputStream.class);
- private static final ByteBuffer EMPTY_SLAB = ByteBuffer.wrap(new byte[0]);
-
- private int initialSlabSize;
- private final int maxCapacityHint;
- private final List<ByteBuffer> slabs = new ArrayList<ByteBuffer>();
-
- private ByteBuffer currentSlab;
- private int currentSlabIndex;
- private int bytesAllocated = 0;
- private int bytesUsed = 0;
- private ByteBufferAllocator allocator;
-
- /**
- * Return an initial slab size such that a CapacityByteArrayOutputStream constructed with it
- * will end up allocating targetNumSlabs in order to reach targetCapacity. This aims to be
- * a balance between the overhead of creating new slabs and wasting memory by eagerly making
- * initial slabs too big.
- *
- * Note that targetCapacity here need not match maxCapacityHint in the constructor of
- * CapacityByteArrayOutputStream, though often that would make sense.
- *
- * @param minSlabSize no matter what we shouldn't make slabs any smaller than this
- * @param targetCapacity after we've allocated targetNumSlabs how much capacity should we have?
- * @param targetNumSlabs how many slabs should it take to reach targetCapacity?
- */
- public static int initialSlabSizeHeuristic(int minSlabSize, int targetCapacity, int targetNumSlabs) {
- // initialSlabSize = (targetCapacity / (2^targetNumSlabs)) means we double targetNumSlabs times
- // before reaching the targetCapacity
- // eg for page size of 1MB we start at 1024 bytes.
- // we also don't want to start too small, so we also apply a minimum.
- return max(minSlabSize, ((int) (targetCapacity / pow(2, targetNumSlabs))));
- }
-
- public static CapacityByteArrayOutputStream withTargetNumSlabs(
- int minSlabSize, int maxCapacityHint, int targetNumSlabs) {
- return withTargetNumSlabs(minSlabSize, maxCapacityHint, targetNumSlabs, new HeapByteBufferAllocator());
- }
-
- /**
- * Construct a CapacityByteArrayOutputStream configured such that its initial slab size is
- * determined by {@link #initialSlabSizeHeuristic}, with targetCapacity == maxCapacityHint
- */
- public static CapacityByteArrayOutputStream withTargetNumSlabs(
- int minSlabSize, int maxCapacityHint, int targetNumSlabs, ByteBufferAllocator allocator) {
-
- return new CapacityByteArrayOutputStream(
- initialSlabSizeHeuristic(minSlabSize, maxCapacityHint, targetNumSlabs),
- maxCapacityHint, allocator);
- }
-
- /**
- * Defaults maxCapacityHint to 1MB
- * @param initialSlabSize
- * @deprecated use {@link CapacityByteArrayOutputStream#CapacityByteArrayOutputStream(int, int, ByteBufferAllocator)}
- */
- @Deprecated
- public CapacityByteArrayOutputStream(int initialSlabSize) {
- this(initialSlabSize, 1024 * 1024, new HeapByteBufferAllocator());
- }
-
- /**
- * Defaults maxCapacityHint to 1MB
- * @param initialSlabSize
- * @deprecated use {@link CapacityByteArrayOutputStream#CapacityByteArrayOutputStream(int, int, ByteBufferAllocator)}
- */
- @Deprecated
- public CapacityByteArrayOutputStream(int initialSlabSize, ByteBufferAllocator allocator) {
- this(initialSlabSize, 1024 * 1024, allocator);
- }
-
- /**
- * @param initialSlabSize the size to make the first slab
- * @param maxCapacityHint a hint (not guarantee) of the max amount of data written to this stream
- * @deprecated use {@link CapacityByteArrayOutputStream#CapacityByteArrayOutputStream(int, int, ByteBufferAllocator)}
- */
- @Deprecated
- public CapacityByteArrayOutputStream(int initialSlabSize, int maxCapacityHint) {
- this(initialSlabSize, maxCapacityHint, new HeapByteBufferAllocator());
- }
-
- /**
- * @param initialSlabSize the size to make the first slab
- * @param maxCapacityHint a hint (not guarantee) of the max amount of data written to this stream
- */
- public CapacityByteArrayOutputStream(int initialSlabSize, int maxCapacityHint, ByteBufferAllocator allocator) {
- checkArgument(initialSlabSize > 0, "initialSlabSize must be > 0");
- checkArgument(maxCapacityHint > 0, "maxCapacityHint must be > 0");
- checkArgument(maxCapacityHint >= initialSlabSize, String.format("maxCapacityHint can't be less than initialSlabSize %d %d", initialSlabSize, maxCapacityHint));
- this.initialSlabSize = initialSlabSize;
- this.maxCapacityHint = maxCapacityHint;
- this.allocator = allocator;
- reset();
- }
-
- /**
- * The new slab is guaranteed to be at least minimumSize.
- * @param minimumSize the size of the data we want to copy in the new slab
- */
- private void addSlab(int minimumSize) {
- int nextSlabSize;
-
- if (bytesUsed == 0) {
- nextSlabSize = initialSlabSize;
- } else if (bytesUsed > maxCapacityHint / 5) {
- // to avoid an overhead of up to twice the needed size, we get linear when approaching target page size
- nextSlabSize = maxCapacityHint / 5;
- } else {
- // double the size every time
- nextSlabSize = bytesUsed;
- }
-
- if (nextSlabSize < minimumSize) {
- LOG.debug("slab size {} too small for value of size {}. Bumping up slab size", nextSlabSize, minimumSize);
- nextSlabSize = minimumSize;
- }
-
- LOG.debug("used {} slabs, adding new slab of size {}", slabs.size(), nextSlabSize);
-
- this.currentSlab = allocator.allocate(nextSlabSize);
- this.slabs.add(currentSlab);
- this.bytesAllocated += nextSlabSize;
- this.currentSlabIndex = 0;
- }
-
- @Override
- public void write(int b) {
- if (!currentSlab.hasRemaining()) {
- addSlab(1);
- }
- currentSlab.put(currentSlabIndex, (byte) b);
- currentSlabIndex += 1;
- currentSlab.position(currentSlabIndex);
- bytesUsed += 1;
- }
-
- @Override
- public void write(byte b[], int off, int len) {
- if ((off < 0) || (off > b.length) || (len < 0) ||
- ((off + len) - b.length > 0)) {
- throw new IndexOutOfBoundsException(
- String.format("Given byte array of size %d, with requested length(%d) and offset(%d)", b.length, len, off));
- }
- if (len >= currentSlab.remaining()) {
- final int length1 = currentSlab.remaining();
- currentSlab.put(b, off, length1);
- bytesUsed += length1;
- currentSlabIndex += length1;
- final int length2 = len - length1;
- addSlab(length2);
- currentSlab.put(b, off + length1, length2);
- currentSlabIndex = length2;
- bytesUsed += length2;
- } else {
- currentSlab.put(b, off, len);
- currentSlabIndex += len;
- bytesUsed += len;
- }
- }
-
- private void writeToOutput(OutputStream out, ByteBuffer buf, int len) throws IOException {
- if (buf.hasArray()) {
- out.write(buf.array(), buf.arrayOffset(), len);
- } else {
- // The OutputStream interface only takes a byte[], unfortunately this means that a ByteBuffer
- // not backed by a byte array must be copied to fulfil this interface
- byte[] copy = new byte[len];
- buf.flip();
- buf.get(copy);
- out.write(copy);
- }
- }
-
- /**
- * Writes the complete contents of this buffer to the specified output stream argument. The output
- * stream's write method <code>out.write(slab, 0, slab.length)</code> will be called once per slab.
- *
- * @param out the output stream to which to write the data.
- * @exception IOException if an I/O error occurs.
- */
- public void writeTo(OutputStream out) throws IOException {
- for (int i = 0; i < slabs.size() - 1; i++) {
- writeToOutput(out, slabs.get(i), slabs.get(i).position());
- }
- writeToOutput(out, currentSlab, currentSlabIndex);
- }
-
- /**
- * @return The total size in bytes of data written to this stream.
- */
- public long size() {
- return bytesUsed;
- }
-
- /**
- *
- * @return The total size in bytes currently allocated for this stream.
- */
- public int getCapacity() {
- return bytesAllocated;
- }
-
- /**
- * When re-using an instance with reset, it will adjust slab size based on previous data size.
- * The intent is to reuse the same instance for the same type of data (for example, the same column).
- * The assumption is that the size in the buffer will be consistent.
- */
- public void reset() {
- // readjust slab size.
- // 7 = 2^3 - 1 so that doubling the initial size 3 times will get to the same size
- this.initialSlabSize = max(bytesUsed / 7, initialSlabSize);
- LOG.debug("initial slab of size {}", initialSlabSize);
- for (ByteBuffer slab : slabs) {
- allocator.release(slab);
- }
- this.slabs.clear();
- this.bytesAllocated = 0;
- this.bytesUsed = 0;
- this.currentSlab = EMPTY_SLAB;
- this.currentSlabIndex = 0;
- }
-
- /**
- * @return the index of the last value written to this stream, which
- * can be passed to {@link #setByte(long, byte)} in order to change it
- */
- public long getCurrentIndex() {
- checkArgument(bytesUsed > 0, "This is an empty stream");
- return bytesUsed - 1;
- }
-
- /**
- * Replace the byte stored at position index in this stream with value
- *
- * @param index which byte to replace
- * @param value the value to replace it with
- */
- public void setByte(long index, byte value) {
- checkArgument(index < bytesUsed, "Index: " + index + " is >= the current size of: " + bytesUsed);
-
- long seen = 0;
- for (int i = 0; i < slabs.size(); i++) {
- ByteBuffer slab = slabs.get(i);
- if (index < seen + slab.limit()) {
- // ok found index
- slab.put((int)(index-seen), value);
- break;
- }
- seen += slab.limit();
- }
- }
-
- /**
- * @param prefix a prefix to be used for every new line in the string
- * @return a text representation of the memory usage of this structure
- */
- public String memUsageString(String prefix) {
- return format("%s %s %d slabs, %,d bytes", prefix, getClass().getSimpleName(), slabs.size(), getCapacity());
- }
-
- /**
- * @return the total number of allocated slabs
- */
- int getSlabCount() {
- return slabs.size();
- }
-
- @Override
- public void close() {
- for (ByteBuffer slab : slabs) {
- allocator.release(slab);
- }
- try {
- super.close();
- } catch (IOException e) {
- throw new OutputStreamCloseException(e);
- }
- }
-}
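
For orientation while this class is moved around the tree, here is a minimal usage sketch of the CapacityByteArrayOutputStream removed in the hunk above. It is illustrative only and not part of the patch: the sketch class name is made up, and it assumes the withTargetNumSlabs factory shown above plus HeapByteBufferAllocator from the same org.apache.parquet.bytes package. With minSlabSize=64, maxCapacityHint=1 MB and targetNumSlabs=10, initialSlabSizeHeuristic returns max(64, 1048576 / 2^10) = 1024, so the first slab is 1 KB and the total capacity doubles per slab until the linear maxCapacityHint/5 regime.

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;

    import org.apache.parquet.bytes.CapacityByteArrayOutputStream;
    import org.apache.parquet.bytes.HeapByteBufferAllocator;

    public class CapacityStreamSketch {
      public static void main(String[] args) throws IOException {
        // Initial slab = max(64, 1048576 / 2^10) = 1024 bytes; subsequent slabs
        // double the total capacity (1024, 1024, 2048, ...) until maxCapacityHint / 5.
        CapacityByteArrayOutputStream out = CapacityByteArrayOutputStream.withTargetNumSlabs(
            64, 1024 * 1024, 10, new HeapByteBufferAllocator());

        byte[] chunk = new byte[256];
        for (int i = 0; i < 12; i++) {
          out.write(chunk, 0, chunk.length);   // 3 KB total, spread over slabs of 1024, 1024, 2048
        }
        out.write(0x7F);                       // single-byte write into the current slab

        // Patch the last byte written, e.g. to fix up a prefix after the fact.
        out.setByte(out.getCurrentIndex(), (byte) 0x00);

        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        out.writeTo(sink);                     // one write per slab
        System.out.println(out.memUsageString("buf:") + " size=" + out.size());

        out.reset();                           // releases slabs, re-tunes initialSlabSize for reuse
        out.close();
      }
    }
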
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-encoding/src/main/java/org/apache/parquet/bytes/ConcatenatingByteArrayCollector.java
----------------------------------------------------------------------
diff --git a/parquet-encoding/src/main/java/org/apache/parquet/bytes/ConcatenatingByteArrayCollector.java b/parquet-encoding/src/main/java/org/apache/parquet/bytes/ConcatenatingByteArrayCollector.java
deleted file mode 100644
index d333168..0000000
--- a/parquet-encoding/src/main/java/org/apache/parquet/bytes/ConcatenatingByteArrayCollector.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.parquet.bytes;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.List;
-
-import static java.lang.String.format;
-
-public class ConcatenatingByteArrayCollector extends BytesInput {
- private final List<byte[]> slabs = new ArrayList<byte[]>();
- private long size = 0;
-
- public void collect(BytesInput bytesInput) throws IOException {
- byte[] bytes = bytesInput.toByteArray();
- slabs.add(bytes);
- size += bytes.length;
- }
-
- public void reset() {
- size = 0;
- slabs.clear();
- }
-
- @Override
- public void writeAllTo(OutputStream out) throws IOException {
- for (byte[] slab : slabs) {
- out.write(slab);
- }
- }
-
- @Override
- public long size() {
- return size;
- }
-
- /**
- * @param prefix a prefix to be used for every new line in the string
- * @return a text representation of the memory usage of this structure
- */
- public String memUsageString(String prefix) {
- return format("%s %s %d slabs, %,d bytes", prefix, getClass().getSimpleName(), slabs.size(), size);
- }
-
-}
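
The collector deleted above is small enough that its contract fits in a few lines: each collect() call materializes the BytesInput into its own byte[] slab, writeAllTo replays the slabs in order, and reset() drops them for reuse. The following sketch is illustrative only; the class name is made up, and it assumes the BytesInput.from(byte[]) factory from the same org.apache.parquet.bytes package.

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    import org.apache.parquet.bytes.BytesInput;
    import org.apache.parquet.bytes.ConcatenatingByteArrayCollector;

    public class CollectorSketch {
      public static void main(String[] args) throws IOException {
        ConcatenatingByteArrayCollector collector = new ConcatenatingByteArrayCollector();

        // Each collect() copies the BytesInput into one byte[] slab and grows size.
        collector.collect(BytesInput.from("page-1;".getBytes(StandardCharsets.US_ASCII)));
        collector.collect(BytesInput.from("page-2;".getBytes(StandardCharsets.US_ASCII)));

        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        collector.writeAllTo(sink);                       // slabs written back to back
        System.out.println(sink.toString("US-ASCII"));    // page-1;page-2;
        System.out.println(collector.memUsageString("collector:"));

        collector.reset();                                // drop slabs for reuse
      }
    }
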
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-encoding/src/main/java/org/apache/parquet/bytes/LittleEndianDataInputStream.java
----------------------------------------------------------------------
diff --git a/parquet-encoding/src/main/java/org/apache/parquet/bytes/LittleEndianDataInputStream.java b/parquet-encoding/src/main/java/org/apache/parquet/bytes/LittleEndianDataInputStream.java
deleted file mode 100644
index a092753..0000000
--- a/parquet-encoding/src/main/java/org/apache/parquet/bytes/LittleEndianDataInputStream.java
+++ /dev/null
@@ -1,424 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.parquet.bytes;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-
-/**
- * Based on DataInputStream but little endian and without the String/char methods
- *
- * @author Julien Le Dem
- *
- */
-public final class LittleEndianDataInputStream extends InputStream {
-
- private final InputStream in;
-
- /**
- * Creates a LittleEndianDataInputStream that uses the specified
- * underlying InputStream.
- *
- * @param in the specified input stream
- */
- public LittleEndianDataInputStream(InputStream in) {
- this.in = in;
- }
-
- /**
- * See the general contract of the <code>readFully</code>
- * method of <code>DataInput</code>.
- * <p>
- * Bytes
- * for this operation are read from the contained
- * input stream.
- *
- * @param b the buffer into which the data is read.
- * @exception EOFException if this input stream reaches the end before
- * reading all the bytes.
- * @exception IOException the stream has been closed and the contained
- * input stream does not support reading after close, or
- * another I/O error occurs.
- * @see java.io.FilterInputStream#in
- */
- public final void readFully(byte b[]) throws IOException {
- readFully(b, 0, b.length);
- }
-
- /**
- * See the general contract of the <code>readFully</code>
- * method of <code>DataInput</code>.
- * <p>
- * Bytes
- * for this operation are read from the contained
- * input stream.
- *
- * @param b the buffer into which the data is read.
- * @param off the start offset of the data.
- * @param len the number of bytes to read.
- * @exception EOFException if this input stream reaches the end before
- * reading all the bytes.
- * @exception IOException the stream has been closed and the contained
- * input stream does not support reading after close, or
- * another I/O error occurs.
- * @see java.io.FilterInputStream#in
- */
- public final void readFully(byte b[], int off, int len) throws IOException {
- if (len < 0)
- throw new IndexOutOfBoundsException();
- int n = 0;
- while (n < len) {
- int count = in.read(b, off + n, len - n);
- if (count < 0)
- throw new EOFException();
- n += count;
- }
- }
-
- /**
- * See the general contract of the <code>skipBytes</code>
- * method of <code>DataInput</code>.
- * <p>
- * Bytes for this operation are read from the contained
- * input stream.
- *
- * @param n the number of bytes to be skipped.
- * @return the actual number of bytes skipped.
- * @exception IOException if the contained input stream does not support
- * seek, or the stream has been closed and
- * the contained input stream does not support
- * reading after close, or another I/O error occurs.
- */
- public final int skipBytes(int n) throws IOException {
- int total = 0;
- int cur = 0;
-
- while ((total<n) && ((cur = (int) in.skip(n-total)) > 0)) {
- total += cur;
- }
-
- return total;
- }
-
- /**
- * @return
- * @throws IOException
- * @see java.io.InputStream#read()
- */
- public int read() throws IOException {
- return in.read();
- }
-
- /**
- * @return
- * @see java.lang.Object#hashCode()
- */
- public int hashCode() {
- return in.hashCode();
- }
-
- /**
- * @param b
- * @return
- * @throws IOException
- * @see java.io.InputStream#read(byte[])
- */
- public int read(byte[] b) throws IOException {
- return in.read(b);
- }
-
- /**
- * @param obj
- * @return
- * @see java.lang.Object#equals(java.lang.Object)
- */
- public boolean equals(Object obj) {
- return in.equals(obj);
- }
-
- /**
- * @param b
- * @param off
- * @param len
- * @return
- * @throws IOException
- * @see java.io.InputStream#read(byte[], int, int)
- */
- public int read(byte[] b, int off, int len) throws IOException {
- return in.read(b, off, len);
- }
-
- /**
- * @param n
- * @return
- * @throws IOException
- * @see java.io.InputStream#skip(long)
- */
- public long skip(long n) throws IOException {
- return in.skip(n);
- }
-
- /**
- * @return
- * @throws IOException
- * @see java.io.InputStream#available()
- */
- public int available() throws IOException {
- return in.available();
- }
-
- /**
- * @throws IOException
- * @see java.io.InputStream#close()
- */
- public void close() throws IOException {
- in.close();
- }
-
- /**
- * @param readlimit
- * @see java.io.InputStream#mark(int)
- */
- public void mark(int readlimit) {
- in.mark(readlimit);
- }
-
- /**
- * @throws IOException
- * @see java.io.InputStream#reset()
- */
- public void reset() throws IOException {
- in.reset();
- }
-
- /**
- * @return
- * @see java.io.InputStream#markSupported()
- */
- public boolean markSupported() {
- return in.markSupported();
- }
-
- /**
- * See the general contract of the <code>readBoolean</code>
- * method of <code>DataInput</code>.
- * <p>
- * Bytes for this operation are read from the contained
- * input stream.
- *
- * @return the <code>boolean</code> value read.
- * @exception EOFException if this input stream has reached the end.
- * @exception IOException the stream has been closed and the contained
- * input stream does not support reading after close, or
- * another I/O error occurs.
- * @see java.io.FilterInputStream#in
- */
- public final boolean readBoolean() throws IOException {
- int ch = in.read();
- if (ch < 0)
- throw new EOFException();
- return (ch != 0);
- }
-
- /**
- * See the general contract of the <code>readByte</code>
- * method of <code>DataInput</code>.
- * <p>
- * Bytes
- * for this operation are read from the contained
- * input stream.
- *
- * @return the next byte of this input stream as a signed 8-bit
- * <code>byte</code>.
- * @exception EOFException if this input stream has reached the end.
- * @exception IOException the stream has been closed and the contained
- * input stream does not support reading after close, or
- * another I/O error occurs.
- * @see java.io.FilterInputStream#in
- */
- public final byte readByte() throws IOException {
- int ch = in.read();
- if (ch < 0)
- throw new EOFException();
- return (byte)(ch);
- }
-
- /**
- * See the general contract of the <code>readUnsignedByte</code>
- * method of <code>DataInput</code>.
- * <p>
- * Bytes
- * for this operation are read from the contained
- * input stream.
- *
- * @return the next byte of this input stream, interpreted as an
- * unsigned 8-bit number.
- * @exception EOFException if this input stream has reached the end.
- * @exception IOException the stream has been closed and the contained
- * input stream does not support reading after close, or
- * another I/O error occurs.
- * @see java.io.FilterInputStream#in
- */
- public final int readUnsignedByte() throws IOException {
- int ch = in.read();
- if (ch < 0)
- throw new EOFException();
- return ch;
- }
-
- /**
- * Bytes
- * for this operation are read from the contained
- * input stream.
- *
- * @return the next two bytes of this input stream, interpreted as a
- * signed 16-bit number.
- * @exception EOFException if this input stream reaches the end before
- * reading two bytes.
- * @exception IOException the stream has been closed and the contained
- * input stream does not support reading after close, or
- * another I/O error occurs.
- * @see java.io.FilterInputStream#in
- */
- public final short readShort() throws IOException {
- int ch2 = in.read();
- int ch1 = in.read();
- if ((ch1 | ch2) < 0)
- throw new EOFException();
- return (short)((ch1 << 8) + (ch2 << 0));
- }
-
- /**
- * Bytes
- * for this operation are read from the contained
- * input stream.
- *
- * @return the next two bytes of this input stream, interpreted as an
- * unsigned 16-bit integer.
- * @exception EOFException if this input stream reaches the end before
- * reading two bytes.
- * @exception IOException the stream has been closed and the contained
- * input stream does not support reading after close, or
- * another I/O error occurs.
- * @see java.io.FilterInputStream#in
- */
- public final int readUnsignedShort() throws IOException {
- int ch2 = in.read();
- int ch1 = in.read();
- if ((ch1 | ch2) < 0)
- throw new EOFException();
- return (ch1 << 8) + (ch2 << 0);
- }
-
- /**
- * Bytes
- * for this operation are read from the contained
- * input stream.
- *
- * @return the next four bytes of this input stream, interpreted as an
- * <code>int</code>.
- * @exception EOFException if this input stream reaches the end before
- * reading four bytes.
- * @exception IOException the stream has been closed and the contained
- * input stream does not support reading after close, or
- * another I/O error occurs.
- * @see java.io.FilterInputStream#in
- */
- public final int readInt() throws IOException {
- // TODO: has this been benchmarked against two alternate implementations?
- // 1) Integer.reverseBytes(in.readInt())
- // 2) keep a member byte[4], wrapped by an IntBuffer with appropriate endianness set,
- // and call IntBuffer.get()
- // Both seem like they might be faster.
- int ch4 = in.read();
- int ch3 = in.read();
- int ch2 = in.read();
- int ch1 = in.read();
- if ((ch1 | ch2 | ch3 | ch4) < 0)
- throw new EOFException();
- return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));
- }
-
- private byte readBuffer[] = new byte[8];
-
- /**
- * Bytes
- * for this operation are read from the contained
- * input stream.
- *
- * @return the next eight bytes of this input stream, interpreted as a
- * <code>long</code>.
- * @exception EOFException if this input stream reaches the end before
- * reading eight bytes.
- * @exception IOException the stream has been closed and the contained
- * input stream does not support reading after close, or
- * another I/O error occurs.
- * @see java.io.FilterInputStream#in
- */
- public final long readLong() throws IOException {
- // TODO: see perf question above in readInt
- readFully(readBuffer, 0, 8);
- return (((long)readBuffer[7] << 56) +
- ((long)(readBuffer[6] & 255) << 48) +
- ((long)(readBuffer[5] & 255) << 40) +
- ((long)(readBuffer[4] & 255) << 32) +
- ((long)(readBuffer[3] & 255) << 24) +
- ((readBuffer[2] & 255) << 16) +
- ((readBuffer[1] & 255) << 8) +
- ((readBuffer[0] & 255) << 0));
- }
-
- /**
- * Bytes
- * for this operation are read from the contained
- * input stream.
- *
- * @return the next four bytes of this input stream, interpreted as a
- * <code>float</code>.
- * @exception EOFException if this input stream reaches the end before
- * reading four bytes.
- * @exception IOException the stream has been closed and the contained
- * input stream does not support reading after close, or
- * another I/O error occurs.
- * @see java.lang.Float#intBitsToFloat(int)
- */
- public final float readFloat() throws IOException {
- return Float.intBitsToFloat(readInt());
- }
-
- /**
- * Bytes
- * for this operation are read from the contained
- * input stream.
- *
- * @return the next eight bytes of this input stream, interpreted as a
- * <code>double</code>.
- * @exception EOFException if this input stream reaches the end before
- * reading eight bytes.
- * @exception IOException the stream has been closed and the contained
- * input stream does not support reading after close, or
- * another I/O error occurs.
- * @see java.lang.Double#longBitsToDouble(long)
- */
- public final double readDouble() throws IOException {
- return Double.longBitsToDouble(readLong());
- }
-
-}
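
Since byte order is the whole point of this class, a worked example of what readInt and readLong return for a little-endian byte sequence may help. Illustrative only; the sketch class name is made up.

    import java.io.ByteArrayInputStream;
    import java.io.IOException;

    import org.apache.parquet.bytes.LittleEndianDataInputStream;

    public class LittleEndianReadSketch {
      public static void main(String[] args) throws IOException {
        // 78 56 34 12 is the little-endian encoding of 0x12345678;
        // the trailing 8 bytes encode the long value 1 the same way.
        byte[] data = {
            (byte) 0x78, (byte) 0x56, (byte) 0x34, (byte) 0x12,
            1, 0, 0, 0, 0, 0, 0, 0
        };

        LittleEndianDataInputStream in =
            new LittleEndianDataInputStream(new ByteArrayInputStream(data));
        System.out.println(Integer.toHexString(in.readInt())); // 12345678
        System.out.println(in.readLong());                     // 1
        in.close();
      }
    }
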
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-encoding/src/main/java/org/apache/parquet/bytes/LittleEndianDataOutputStream.java
----------------------------------------------------------------------
diff --git a/parquet-encoding/src/main/java/org/apache/parquet/bytes/LittleEndianDataOutputStream.java b/parquet-encoding/src/main/java/org/apache/parquet/bytes/LittleEndianDataOutputStream.java
deleted file mode 100644
index 9d4a8a9..0000000
--- a/parquet-encoding/src/main/java/org/apache/parquet/bytes/LittleEndianDataOutputStream.java
+++ /dev/null
@@ -1,220 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.parquet.bytes;
-
-import org.apache.parquet.IOExceptionUtils;
-import org.apache.parquet.ParquetRuntimeException;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-/**
- * Based on DataOutputStream but in little endian and without the String/char methods
- *
- * @author Julien Le Dem
- *
- */
-public class LittleEndianDataOutputStream extends OutputStream {
-
- private final OutputStream out;
-
- /**
- * Creates a new data output stream to write data to the specified
- * underlying output stream. The counter <code>written</code> is
- * set to zero.
- *
- * @param out the underlying output stream, to be saved for later
- * use.
- * @see java.io.FilterOutputStream#out
- */
- public LittleEndianDataOutputStream(OutputStream out) {
- this.out = out;
- }
-
- /**
- * Writes the specified byte (the low eight bits of the argument
- * <code>b</code>) to the underlying output stream. If no exception
- * is thrown, the counter <code>written</code> is incremented by
- * <code>1</code>.
- * <p>
- * Implements the <code>write</code> method of <code>OutputStream</code>.
- *
- * @param b the <code>byte</code> to be written.
- * @exception IOException if an I/O error occurs.
- * @see java.io.FilterOutputStream#out
- */
- public void write(int b) throws IOException {
- out.write(b);
- }
-
- /**
- * Writes <code>len</code> bytes from the specified byte array
- * starting at offset <code>off</code> to the underlying output stream.
- * If no exception is thrown, the counter <code>written</code> is
- * incremented by <code>len</code>.
- *
- * @param b the data.
- * @param off the start offset in the data.
- * @param len the number of bytes to write.
- * @exception IOException if an I/O error occurs.
- * @see java.io.FilterOutputStream#out
- */
- public void write(byte b[], int off, int len) throws IOException {
- out.write(b, off, len);
- }
-
- /**
- * Flushes this data output stream. This forces any buffered output
- * bytes to be written out to the stream.
- * <p>
- * The <code>flush</code> method of <code>DataOutputStream</code>
- * calls the <code>flush</code> method of its underlying output stream.
- *
- * @exception IOException if an I/O error occurs.
- * @see java.io.FilterOutputStream#out
- * @see java.io.OutputStream#flush()
- */
- public void flush() throws IOException {
- out.flush();
- }
-
- /**
- * Writes a <code>boolean</code> to the underlying output stream as
- * a 1-byte value. The value <code>true</code> is written out as the
- * value <code>(byte)1</code>; the value <code>false</code> is
- * written out as the value <code>(byte)0</code>. If no exception is
- * thrown, the counter <code>written</code> is incremented by
- * <code>1</code>.
- *
- * @param v a <code>boolean</code> value to be written.
- * @exception IOException if an I/O error occurs.
- * @see java.io.FilterOutputStream#out
- */
- public final void writeBoolean(boolean v) throws IOException {
- out.write(v ? 1 : 0);
- }
-
- /**
- * Writes out a <code>byte</code> to the underlying output stream as
- * a 1-byte value. If no exception is thrown, the counter
- * <code>written</code> is incremented by <code>1</code>.
- *
- * @param v a <code>byte</code> value to be written.
- * @exception IOException if an I/O error occurs.
- * @see java.io.FilterOutputStream#out
- */
- public final void writeByte(int v) throws IOException {
- out.write(v);
- }
-
- /**
- * Writes a <code>short</code> to the underlying output stream as two
- * bytes, low byte first. If no exception is thrown, the counter
- * <code>written</code> is incremented by <code>2</code>.
- *
- * @param v a <code>short</code> to be written.
- * @exception IOException if an I/O error occurs.
- * @see java.io.FilterOutputStream#out
- */
- public final void writeShort(int v) throws IOException {
- out.write((v >>> 0) & 0xFF);
- out.write((v >>> 8) & 0xFF);
- }
-
- /**
- * Writes an <code>int</code> to the underlying output stream as four
- * bytes, low byte first. If no exception is thrown, the counter
- * <code>written</code> is incremented by <code>4</code>.
- *
- * @param v an <code>int</code> to be written.
- * @exception IOException if an I/O error occurs.
- * @see java.io.FilterOutputStream#out
- */
- public final void writeInt(int v) throws IOException {
- // TODO: see note in LittleEndianDataInputStream: maybe faster
- // to use Integer.reverseBytes() and then writeInt, or a ByteBuffer
- // approach
- out.write((v >>> 0) & 0xFF);
- out.write((v >>> 8) & 0xFF);
- out.write((v >>> 16) & 0xFF);
- out.write((v >>> 24) & 0xFF);
- }
-
- private byte writeBuffer[] = new byte[8];
-
- /**
- * Writes a <code>long</code> to the underlying output stream as eight
- * bytes, low byte first. If no exception is thrown, the counter
- * <code>written</code> is incremented by <code>8</code>.
- *
- * @param v a <code>long</code> to be written.
- * @exception IOException if an I/O error occurs.
- * @see java.io.FilterOutputStream#out
- */
- public final void writeLong(long v) throws IOException {
- writeBuffer[7] = (byte)(v >>> 56);
- writeBuffer[6] = (byte)(v >>> 48);
- writeBuffer[5] = (byte)(v >>> 40);
- writeBuffer[4] = (byte)(v >>> 32);
- writeBuffer[3] = (byte)(v >>> 24);
- writeBuffer[2] = (byte)(v >>> 16);
- writeBuffer[1] = (byte)(v >>> 8);
- writeBuffer[0] = (byte)(v >>> 0);
- out.write(writeBuffer, 0, 8);
- }
-
- /**
- * Converts the float argument to an <code>int</code> using the
- * <code>floatToIntBits</code> method in class <code>Float</code>,
- * and then writes that <code>int</code> value to the underlying
- * output stream as a 4-byte quantity, low byte first. If no
- * exception is thrown, the counter <code>written</code> is
- * incremented by <code>4</code>.
- *
- * @param v a <code>float</code> value to be written.
- * @exception IOException if an I/O error occurs.
- * @see java.io.FilterOutputStream#out
- * @see java.lang.Float#floatToIntBits(float)
- */
- public final void writeFloat(float v) throws IOException {
- writeInt(Float.floatToIntBits(v));
- }
-
- /**
- * Converts the double argument to a <code>long</code> using the
- * <code>doubleToLongBits</code> method in class <code>Double</code>,
- * and then writes that <code>long</code> value to the underlying
- * output stream as an 8-byte quantity, low byte first. If no
- * exception is thrown, the counter <code>written</code> is
- * incremented by <code>8</code>.
- *
- * @param v a <code>double</code> value to be written.
- * @exception IOException if an I/O error occurs.
- * @see java.io.FilterOutputStream#out
- * @see java.lang.Double#doubleToLongBits(double)
- */
- public final void writeDouble(double v) throws IOException {
- writeLong(Double.doubleToLongBits(v));
- }
-
- public void close() {
- IOExceptionUtils.closeQuietly(out);
- }
-
-}
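
The output-stream side mirrors the input side byte for byte, so a round trip is the simplest check of the encoding: writeInt(0x12345678) must emit 78 56 34 12 and read back unchanged. Again an illustrative sketch with a made-up class name, pairing this class with the LittleEndianDataInputStream from the previous hunk.

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;

    import org.apache.parquet.bytes.LittleEndianDataInputStream;
    import org.apache.parquet.bytes.LittleEndianDataOutputStream;

    public class LittleEndianRoundTripSketch {
      public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        LittleEndianDataOutputStream out = new LittleEndianDataOutputStream(buffer);
        out.writeInt(0x12345678);      // emitted as 78 56 34 12
        out.writeDouble(2.5);          // Double.doubleToLongBits, low byte first
        out.flush();

        byte[] bytes = buffer.toByteArray();
        System.out.printf("first byte: %02x%n", bytes[0]);      // 78

        LittleEndianDataInputStream in =
            new LittleEndianDataInputStream(new ByteArrayInputStream(bytes));
        System.out.println(Integer.toHexString(in.readInt()));  // 12345678
        System.out.println(in.readDouble());                    // 2.5
      }
    }
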