You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@orc.apache.org by pr...@apache.org on 2017/03/23 22:58:12 UTC
orc git commit: ORC-166 : add codec pool to ORC;
make sure end is called on underlying codecs (Sergey Shelukhin)
Repository: orc
Updated Branches:
refs/heads/master a393bca04 -> 37a76d529
ORC-166 : add codec pool to ORC; make sure end is called on underlying codecs (Sergey Shelukhin)
Fixes #102
Signed-off-by: Prasanth Jayachandran <pr...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/orc/repo
Commit: http://git-wip-us.apache.org/repos/asf/orc/commit/37a76d52
Tree: http://git-wip-us.apache.org/repos/asf/orc/tree/37a76d52
Diff: http://git-wip-us.apache.org/repos/asf/orc/diff/37a76d52
Branch: refs/heads/master
Commit: 37a76d52943fb660a81ec39b1d62cd9fa9379c74
Parents: a393bca
Author: Sergey Shelukhin <se...@apache.org>
Authored: Mon Mar 20 19:13:06 2017 -0700
Committer: Prasanth Jayachandran <pr...@apache.org>
Committed: Thu Mar 23 15:56:55 2017 -0700
----------------------------------------------------------------------
.../java/org/apache/orc/CompressionCodec.java | 5 +
.../src/java/org/apache/orc/DataReader.java | 4 +
.../src/java/org/apache/orc/PhysicalWriter.java | 2 +
.../org/apache/orc/impl/AircompressorCodec.java | 10 ++
.../java/org/apache/orc/impl/HadoopShims.java | 2 +
.../org/apache/orc/impl/HadoopShimsCurrent.java | 48 ++++++--
.../java/org/apache/orc/impl/OrcCodecPool.java | 99 +++++++++++++++++
.../src/java/org/apache/orc/impl/OrcTail.java | 14 +--
.../org/apache/orc/impl/PhysicalFsWriter.java | 9 +-
.../java/org/apache/orc/impl/ReaderImpl.java | 34 ++++--
.../org/apache/orc/impl/RecordReaderImpl.java | 19 ++--
.../org/apache/orc/impl/RecordReaderUtils.java | 12 +-
.../java/org/apache/orc/impl/SnappyCodec.java | 34 ++++--
.../java/org/apache/orc/impl/WriterImpl.java | 20 ++--
.../src/java/org/apache/orc/impl/ZlibCodec.java | 110 +++++++++++--------
.../test/org/apache/orc/TestVectorOrcFile.java | 110 ++++++++++++++++---
16 files changed, 423 insertions(+), 109 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/orc/blob/37a76d52/java/core/src/java/org/apache/orc/CompressionCodec.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/CompressionCodec.java b/java/core/src/java/org/apache/orc/CompressionCodec.java
index d569bb6..dd517b3 100644
--- a/java/core/src/java/org/apache/orc/CompressionCodec.java
+++ b/java/core/src/java/org/apache/orc/CompressionCodec.java
@@ -64,4 +64,9 @@ public interface CompressionCodec {
*/
CompressionCodec modify(EnumSet<Modifier> modifiers);
+ /** Resets the codec, preparing it for reuse. */
+ void reset();
+
+ /** Closes the codec, releasing the resources. */
+ void close();
}
http://git-wip-us.apache.org/repos/asf/orc/blob/37a76d52/java/core/src/java/org/apache/orc/DataReader.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/DataReader.java b/java/core/src/java/org/apache/orc/DataReader.java
index ae11bf3..3155862 100644
--- a/java/core/src/java/org/apache/orc/DataReader.java
+++ b/java/core/src/java/org/apache/orc/DataReader.java
@@ -78,4 +78,8 @@ public interface DataReader extends AutoCloseable, Cloneable {
@Override
public void close() throws IOException;
+
+ /** Returns the compression codec used by this data reader.
+ * @return the codec, or null if the data is not compressed */
+ CompressionCodec getCompressionCodec();
}
http://git-wip-us.apache.org/repos/asf/orc/blob/37a76d52/java/core/src/java/org/apache/orc/PhysicalWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/PhysicalWriter.java b/java/core/src/java/org/apache/orc/PhysicalWriter.java
index 9953d41..7589aa5 100644
--- a/java/core/src/java/org/apache/orc/PhysicalWriter.java
+++ b/java/core/src/java/org/apache/orc/PhysicalWriter.java
@@ -130,4 +130,6 @@ public interface PhysicalWriter {
OrcProto.StripeInformation.Builder dirEntry
) throws IOException;
+ /** Gets the compression codec used by this writer, or null if the output is not compressed. */
+ CompressionCodec getCompressionCodec();
}
http://git-wip-us.apache.org/repos/asf/orc/blob/37a76d52/java/core/src/java/org/apache/orc/impl/AircompressorCodec.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/AircompressorCodec.java b/java/core/src/java/org/apache/orc/impl/AircompressorCodec.java
index a304730..39d678c 100644
--- a/java/core/src/java/org/apache/orc/impl/AircompressorCodec.java
+++ b/java/core/src/java/org/apache/orc/impl/AircompressorCodec.java
@@ -99,4 +99,14 @@ public class AircompressorCodec implements CompressionCodec {
// snappy allows no modifications
return this;
}
+
+ @Override
+ public void reset() {
+ // Nothing to do.
+ }
+
+ @Override
+ public void close() {
+ // Nothing to do.
+ }
}
http://git-wip-us.apache.org/repos/asf/orc/blob/37a76d52/java/core/src/java/org/apache/orc/impl/HadoopShims.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/HadoopShims.java b/java/core/src/java/org/apache/orc/impl/HadoopShims.java
index 8d505d2..3047376 100644
--- a/java/core/src/java/org/apache/orc/impl/HadoopShims.java
+++ b/java/core/src/java/org/apache/orc/impl/HadoopShims.java
@@ -38,6 +38,8 @@ public interface HadoopShims {
interface DirectDecompressor {
void decompress(ByteBuffer var1, ByteBuffer var2) throws IOException;
+ void reset();
+ void end();
}
/**
http://git-wip-us.apache.org/repos/asf/orc/blob/37a76d52/java/core/src/java/org/apache/orc/impl/HadoopShimsCurrent.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/HadoopShimsCurrent.java b/java/core/src/java/org/apache/orc/impl/HadoopShimsCurrent.java
index 54a4824..9f40272 100644
--- a/java/core/src/java/org/apache/orc/impl/HadoopShimsCurrent.java
+++ b/java/core/src/java/org/apache/orc/impl/HadoopShimsCurrent.java
@@ -21,6 +21,7 @@ package org.apache.orc.impl;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.snappy.SnappyDecompressor;
+import org.apache.hadoop.io.compress.snappy.SnappyDecompressor.SnappyDirectDecompressor;
import org.apache.hadoop.io.compress.zlib.ZlibDecompressor;
import java.io.DataInputStream;
@@ -33,31 +34,62 @@ import java.nio.ByteBuffer;
*/
public class HadoopShimsCurrent implements HadoopShims {
- private static class DirectDecompressWrapper implements DirectDecompressor {
- private final org.apache.hadoop.io.compress.DirectDecompressor root;
+ private static class SnappyDirectDecompressWrapper implements DirectDecompressor {
+ private final SnappyDirectDecompressor root;
- DirectDecompressWrapper(org.apache.hadoop.io.compress.DirectDecompressor root) {
+ SnappyDirectDecompressWrapper(SnappyDirectDecompressor root) {
this.root = root;
}
- public void decompress(ByteBuffer input,
- ByteBuffer output) throws IOException {
+ public void decompress(ByteBuffer input, ByteBuffer output) throws IOException {
root.decompress(input, output);
}
+
+ @Override
+ public void reset() {
+ root.reset();
+ }
+
+ @Override
+ public void end() {
+ root.end();
+ }
+ }
+
+ private static class ZlibDirectDecompressWrapper implements DirectDecompressor {
+ private final ZlibDecompressor.ZlibDirectDecompressor root;
+
+ ZlibDirectDecompressWrapper(ZlibDecompressor.ZlibDirectDecompressor root) {
+ this.root = root;
+ }
+
+ public void decompress(ByteBuffer input, ByteBuffer output) throws IOException {
+ root.decompress(input, output);
+ }
+
+ @Override
+ public void reset() {
+ root.reset();
+ }
+
+ @Override
+ public void end() {
+ root.end();
+ }
}
public DirectDecompressor getDirectDecompressor(
DirectCompressionType codec) {
switch (codec) {
case ZLIB:
- return new DirectDecompressWrapper
+ return new ZlibDirectDecompressWrapper
(new ZlibDecompressor.ZlibDirectDecompressor());
case ZLIB_NOHEADER:
- return new DirectDecompressWrapper
+ return new ZlibDirectDecompressWrapper
(new ZlibDecompressor.ZlibDirectDecompressor
(ZlibDecompressor.CompressionHeader.NO_HEADER, 0));
case SNAPPY:
- return new DirectDecompressWrapper
+ return new SnappyDirectDecompressWrapper
(new SnappyDecompressor.SnappyDirectDecompressor());
default:
return null;
http://git-wip-us.apache.org/repos/asf/orc/blob/37a76d52/java/core/src/java/org/apache/orc/impl/OrcCodecPool.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/OrcCodecPool.java b/java/core/src/java/org/apache/orc/impl/OrcCodecPool.java
new file mode 100644
index 0000000..56b9896
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/OrcCodecPool.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.impl;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.orc.CompressionCodec;
+import org.apache.orc.CompressionKind;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A clone of the Hadoop codec pool for ORC, because ORC has its own codecs.
+ */
+public final class OrcCodecPool {
+ private static final Logger LOG = LoggerFactory.getLogger(OrcCodecPool.class);
+
+ /**
+ * A global codec pool, keyed by compression kind, used to save the expensive
+ * construction/destruction of (possibly native) compression codecs.
+ */
+ private static final ConcurrentHashMap<CompressionKind, List<CompressionCodec>> POOL =
+ new ConcurrentHashMap<>();
+
+ private static final int MAX_PER_KIND = 32;
+
+ public static CompressionCodec getCodec(CompressionKind kind) {
+ if (kind == CompressionKind.NONE) return null;
+ CompressionCodec codec = null;
+ List<CompressionCodec> codecList = POOL.get(kind);
+ if (codecList != null) {
+ synchronized (codecList) {
+ if (!codecList.isEmpty()) {
+ codec = codecList.remove(codecList.size() - 1);
+ }
+ }
+ }
+ if (codec == null) {
+ codec = WriterImpl.createCodec(kind);
+ LOG.info("Got brand-new codec " + kind);
+ } else {
+ LOG.debug("Got recycled codec");
+ }
+ return codec;
+ }
+
+ public static void returnCodec(CompressionKind kind, CompressionCodec codec) {
+ if (codec == null) {
+ return;
+ }
+ assert kind != CompressionKind.NONE;
+ codec.reset();
+ List<CompressionCodec> list = POOL.get(kind);
+ if (list == null) {
+ List<CompressionCodec> newList = new ArrayList<>();
+ List<CompressionCodec> oldList = POOL.putIfAbsent(kind, newList);
+ list = (oldList == null) ? newList : oldList;
+ }
+ synchronized (list) {
+ if (list.size() < MAX_PER_KIND) {
+ list.add(codec);
+ return;
+ }
+ }
+ // We didn't add the codec to the list.
+ codec.close();
+ }
+
+ public static int getPoolSize(CompressionKind kind) {
+ if (kind == CompressionKind.NONE) return 0;
+ List<CompressionCodec> codecList = POOL.get(kind);
+ if (codecList == null) return 0;
+ synchronized (codecList) {
+ return codecList.size();
+ }
+ }
+
+ private OrcCodecPool() {
+ // prevent instantiation
+ }
+}
http://git-wip-us.apache.org/repos/asf/orc/blob/37a76d52/java/core/src/java/org/apache/orc/impl/OrcTail.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/OrcTail.java b/java/core/src/java/org/apache/orc/impl/OrcTail.java
index b5f85fb..f2f80a5 100644
--- a/java/core/src/java/org/apache/orc/impl/OrcTail.java
+++ b/java/core/src/java/org/apache/orc/impl/OrcTail.java
@@ -86,10 +86,6 @@ public final class OrcTail {
return CompressionKind.valueOf(fileTail.getPostscript().getCompression().name());
}
- public CompressionCodec getCompressionCodec() {
- return WriterImpl.createCodec(getCompressionKind());
- }
-
public int getCompressionBufferSize() {
return (int) fileTail.getPostscript().getCompressionBlockSize();
}
@@ -108,9 +104,13 @@ public final class OrcTail {
public List<OrcProto.StripeStatistics> getStripeStatisticsProto() throws IOException {
if (serializedTail == null) return null;
if (metadata == null) {
- metadata = extractMetadata(serializedTail, 0,
- (int) fileTail.getPostscript().getMetadataLength(),
- getCompressionCodec(), getCompressionBufferSize());
+ CompressionCodec codec = OrcCodecPool.getCodec(getCompressionKind());
+ try {
+ metadata = extractMetadata(serializedTail, 0,
+ (int) fileTail.getPostscript().getMetadataLength(), codec, getCompressionBufferSize());
+ } finally {
+ OrcCodecPool.returnCodec(getCompressionKind(), codec);
+ }
// clear does not clear the contents but sets position to 0 and limit = capacity
serializedTail.clear();
}
http://git-wip-us.apache.org/repos/asf/orc/blob/37a76d52/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java b/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java
index dd4e23b..1769182 100644
--- a/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java
+++ b/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java
@@ -56,6 +56,7 @@ public class PhysicalFsWriter implements PhysicalWriter {
private final double paddingTolerance;
private final long defaultStripeSize;
private final CompressionKind compress;
+ private final CompressionCodec codec;
private final boolean addBlockPadding;
// the streams that make up the current stripe
@@ -89,12 +90,17 @@ public class PhysicalFsWriter implements PhysicalWriter {
compress, bufferSize);
rawWriter = fs.create(path, false, HDFS_BUFFER_SIZE,
fs.getDefaultReplication(path), blockSize);
- CompressionCodec codec = WriterImpl.createCodec(compress);
+ codec = OrcCodecPool.getCodec(compress);
writer = new OutStream("metadata", bufferSize, codec,
new DirectStream(rawWriter));
protobufWriter = CodedOutputStream.newInstance(writer);
}
+ @Override
+ public CompressionCodec getCompressionCodec() {
+ return codec;
+ }
+
private void padStripe(long indexSize, long dataSize, int footerSize) throws IOException {
this.stripeStart = rawWriter.getPos();
final long currentStripeSize = indexSize + dataSize + footerSize;
@@ -218,6 +224,7 @@ public class PhysicalFsWriter implements PhysicalWriter {
@Override
public void close() throws IOException {
+ OrcCodecPool.returnCodec(compress, codec);
rawWriter.close();
}
http://git-wip-us.apache.org/repos/asf/orc/blob/37a76d52/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/ReaderImpl.java b/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
index ac5cfb2..ad1bc1e 100644
--- a/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
@@ -63,7 +63,6 @@ public class ReaderImpl implements Reader {
private final long maxLength;
protected final Path path;
protected final org.apache.orc.CompressionKind compressionKind;
- protected CompressionCodec codec;
protected int bufferSize;
protected OrcProto.Metadata metadata;
private List<OrcProto.StripeStatistics> stripeStats;
@@ -359,7 +358,6 @@ public class ReaderImpl implements Reader {
if (fileMetadata != null) {
this.compressionKind = fileMetadata.getCompressionKind();
this.bufferSize = fileMetadata.getCompressionBufferSize();
- this.codec = WriterImpl.createCodec(compressionKind);
this.metadataSize = fileMetadata.getMetadataSize();
this.stripeStats = fileMetadata.getStripeStats();
this.versionList = fileMetadata.getVersionList();
@@ -381,7 +379,6 @@ public class ReaderImpl implements Reader {
tail = orcTail;
}
this.compressionKind = tail.getCompressionKind();
- this.codec = tail.getCompressionCodec();
this.bufferSize = tail.getCompressionBufferSize();
this.metadataSize = tail.getMetadataSize();
this.versionList = tail.getPostScript().getVersionList();
@@ -472,16 +469,21 @@ public class ReaderImpl implements Reader {
System.arraycopy(buffer.array(), psOffset, psBuffer, 0, psLen);
OrcProto.PostScript ps = OrcProto.PostScript.parseFrom(psBuffer);
int footerSize = (int) ps.getFooterLength();
- CompressionCodec codec = WriterImpl
- .createCodec(CompressionKind.valueOf(ps.getCompression().name()));
- OrcProto.Footer footer = extractFooter(buffer,
+ CompressionKind kind = CompressionKind.valueOf(ps.getCompression().name());
+ OrcProto.FileTail.Builder fileTailBuilder = null;
+ CompressionCodec codec = OrcCodecPool.getCodec(kind);
+ try {
+ OrcProto.Footer footer = extractFooter(buffer,
(int) (buffer.position() + ps.getMetadataLength()),
footerSize, codec, (int) ps.getCompressionBlockSize());
- OrcProto.FileTail.Builder fileTailBuilder = OrcProto.FileTail.newBuilder()
+ fileTailBuilder = OrcProto.FileTail.newBuilder()
.setPostscriptLength(psLen)
.setPostscript(ps)
.setFooter(footer)
.setFileLength(fileLength);
+ } finally {
+ OrcCodecPool.returnCodec(kind, codec);
+ }
// clear does not clear the contents but sets position to 0 and limit = capacity
buffer.clear();
return new OrcTail(fileTailBuilder.build(), buffer.slice(), modificationTime);
@@ -527,7 +529,7 @@ public class ReaderImpl implements Reader {
int psOffset = readSize - 1 - psLen;
ps = extractPostScript(buffer, path, psLen, psOffset);
bufferSize = (int) ps.getCompressionBlockSize();
- codec = WriterImpl.createCodec(CompressionKind.valueOf(ps.getCompression().name()));
+ CompressionKind compressionKind = CompressionKind.valueOf(ps.getCompression().name());
fileTailBuilder.setPostscriptLength(psLen).setPostscript(ps);
int footerSize = (int) ps.getFooterLength();
@@ -560,8 +562,13 @@ public class ReaderImpl implements Reader {
buffer.position(footerOffset);
ByteBuffer footerBuffer = buffer.slice();
buffer.reset();
- OrcProto.Footer footer = extractFooter(footerBuffer, 0, footerSize,
- codec, bufferSize);
+ OrcProto.Footer footer = null;
+ CompressionCodec codec = OrcCodecPool.getCodec(compressionKind);
+ try {
+ footer = extractFooter(footerBuffer, 0, footerSize, codec, bufferSize);
+ } finally {
+ OrcCodecPool.returnCodec(compressionKind, codec);
+ }
fileTailBuilder.setFooter(footer);
} finally {
try {
@@ -751,7 +758,12 @@ public class ReaderImpl implements Reader {
@Override
public List<StripeStatistics> getStripeStatistics() throws IOException {
if (metadata == null) {
- metadata = extractMetadata(tail.getSerializedTail(), 0, metadataSize, codec, bufferSize);
+ CompressionCodec codec = OrcCodecPool.getCodec(compressionKind);
+ try {
+ metadata = extractMetadata(tail.getSerializedTail(), 0, metadataSize, codec, bufferSize);
+ } finally {
+ OrcCodecPool.returnCodec(compressionKind, codec);
+ }
}
if (stripeStats == null) {
stripeStats = metadata.getStripeStatsList();
http://git-wip-us.apache.org/repos/asf/orc/blob/37a76d52/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java b/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
index f75d70a..b73ca1e 100644
--- a/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
@@ -17,6 +17,8 @@
*/
package org.apache.orc.impl;
+import org.apache.orc.CompressionKind;
+
import java.io.IOException;
import java.math.BigDecimal;
import java.sql.Date;
@@ -73,7 +75,6 @@ public class RecordReaderImpl implements RecordReader {
new ArrayList<StripeInformation>();
private OrcProto.StripeFooter stripeFooter;
private final long totalRowCount;
- private final CompressionCodec codec;
protected final TypeDescription schema;
private final List<OrcProto.Type> types;
private final int bufferSize;
@@ -98,7 +99,7 @@ public class RecordReaderImpl implements RecordReader {
private final DataReader dataReader;
private final boolean ignoreNonUtf8BloomFilter;
private final OrcFile.WriterVersion writerVersion;
-
+
/**
* Given a list of column names, find the given column and return the index.
*
@@ -205,7 +206,6 @@ public class RecordReaderImpl implements RecordReader {
}
this.schema = evolution.getReaderSchema();
this.path = fileReader.path;
- this.codec = fileReader.codec;
this.types = fileReader.types;
this.bufferSize = fileReader.bufferSize;
this.rowIndexStride = fileReader.rowIndexStride;
@@ -252,7 +252,6 @@ public class RecordReaderImpl implements RecordReader {
.build());
}
this.dataReader.open();
-
firstRow = skippedRows;
totalRowCount = rows;
Boolean skipCorrupt = options.getSkipCorruptRecords();
@@ -973,7 +972,8 @@ public class RecordReaderImpl implements RecordReader {
DiskRangeList toRead = new DiskRangeList(start, end);
bufferChunks = dataReader.readFileData(toRead, stripe.getOffset(), false);
List<OrcProto.Stream> streamDescriptions = stripeFooter.getStreamsList();
- createStreams(streamDescriptions, bufferChunks, null, codec, bufferSize, streams);
+ createStreams(streamDescriptions, bufferChunks, null,
+ dataReader.getCompressionCodec(), bufferSize, streams);
}
/**
@@ -1055,7 +1055,7 @@ public class RecordReaderImpl implements RecordReader {
private void readPartialDataStreams(StripeInformation stripe) throws IOException {
List<OrcProto.Stream> streamList = stripeFooter.getStreamsList();
DiskRangeList toRead = planReadPartialDataStreams(streamList,
- indexes, fileIncluded, includedRowGroups, codec != null,
+ indexes, fileIncluded, includedRowGroups, dataReader.getCompressionCodec() != null,
stripeFooter.getColumnsList(), types, bufferSize, true);
if (LOG.isDebugEnabled()) {
LOG.debug("chunks = " + RecordReaderUtils.stringifyDiskRanges(toRead));
@@ -1065,7 +1065,8 @@ public class RecordReaderImpl implements RecordReader {
LOG.debug("merge = " + RecordReaderUtils.stringifyDiskRanges(bufferChunks));
}
- createStreams(streamList, bufferChunks, fileIncluded, codec, bufferSize, streams);
+ createStreams(streamList, bufferChunks, fileIncluded,
+ dataReader.getCompressionCodec(), bufferSize, streams);
}
/**
@@ -1317,4 +1318,8 @@ public class RecordReaderImpl implements RecordReader {
}
return result;
}
+
+ public CompressionCodec getCompressionCodec() {
+ return dataReader.getCompressionCodec();
+ }
}
http://git-wip-us.apache.org/repos/asf/orc/blob/37a76d52/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java b/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
index aa47219..6006634 100644
--- a/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
+++ b/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.io.DiskRange;
import org.apache.hadoop.hive.common.io.DiskRangeList;
import org.apache.orc.CompressionCodec;
+import org.apache.orc.CompressionKind;
import org.apache.orc.DataReader;
import org.apache.orc.OrcFile;
import org.apache.orc.OrcProto;
@@ -150,12 +151,14 @@ public class RecordReaderUtils {
private final CompressionCodec codec;
private final int bufferSize;
private final int typeCount;
+ private CompressionKind compressionKind;
private DefaultDataReader(DataReaderProperties properties) {
this.fs = properties.getFileSystem();
this.path = properties.getPath();
this.useZeroCopy = properties.getZeroCopy();
- this.codec = WriterImpl.createCodec(properties.getCompression());
+ this.compressionKind = properties.getCompression();
+ this.codec = OrcCodecPool.getCodec(compressionKind);
this.bufferSize = properties.getBufferSize();
this.typeCount = properties.getTypeCount();
if (useZeroCopy) {
@@ -277,6 +280,9 @@ public class RecordReaderUtils {
@Override
public void close() throws IOException {
+ if (codec != null) {
+ OrcCodecPool.returnCodec(compressionKind, codec);
+ }
if (pool != null) {
pool.clear();
}
@@ -307,6 +313,10 @@ public class RecordReaderUtils {
}
}
+ @Override
+ public CompressionCodec getCompressionCodec() {
+ return codec;
+ }
}
public static DataReader createDefaultDataReader(DataReaderProperties properties) {
http://git-wip-us.apache.org/repos/asf/orc/blob/37a76d52/java/core/src/java/org/apache/orc/impl/SnappyCodec.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/SnappyCodec.java b/java/core/src/java/org/apache/orc/impl/SnappyCodec.java
index f4d828a..106c75d 100644
--- a/java/core/src/java/org/apache/orc/impl/SnappyCodec.java
+++ b/java/core/src/java/org/apache/orc/impl/SnappyCodec.java
@@ -29,6 +29,7 @@ public class SnappyCodec extends AircompressorCodec
private static final HadoopShims SHIMS = HadoopShims.Factory.get();
Boolean direct = null;
+ HadoopShims.DirectDecompressor decompressShim = null;
SnappyCodec() {
super(new SnappyCompressor(), new SnappyDecompressor());
@@ -47,12 +48,8 @@ public class SnappyCodec extends AircompressorCodec
public boolean isAvailable() {
if (direct == null) {
try {
- if (SHIMS.getDirectDecompressor(
- HadoopShims.DirectCompressionType.SNAPPY) != null) {
- direct = Boolean.valueOf(true);
- } else {
- direct = Boolean.valueOf(false);
- }
+ ensureShim();
+ direct = (decompressShim != null);
} catch (UnsatisfiedLinkError ule) {
direct = Boolean.valueOf(false);
}
@@ -63,9 +60,30 @@ public class SnappyCodec extends AircompressorCodec
@Override
public void directDecompress(ByteBuffer in, ByteBuffer out)
throws IOException {
- HadoopShims.DirectDecompressor decompressShim =
- SHIMS.getDirectDecompressor(HadoopShims.DirectCompressionType.SNAPPY);
+ ensureShim();
decompressShim.decompress(in, out);
out.flip(); // flip for read
}
+
+ private void ensureShim() {
+ if (decompressShim == null) {
+ decompressShim = SHIMS.getDirectDecompressor(HadoopShims.DirectCompressionType.SNAPPY);
+ }
+ }
+
+ @Override
+ public void reset() {
+ super.reset();
+ if (decompressShim != null) {
+ decompressShim.reset();
+ }
+ }
+
+ @Override
+ public void close() {
+ super.close();
+ if (decompressShim != null) {
+ decompressShim.end();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/orc/blob/37a76d52/java/core/src/java/org/apache/orc/impl/WriterImpl.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/WriterImpl.java b/java/core/src/java/org/apache/orc/impl/WriterImpl.java
index ce955e3..32820e1 100644
--- a/java/core/src/java/org/apache/orc/impl/WriterImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/WriterImpl.java
@@ -104,7 +104,6 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
private long adjustedStripeSize;
private final int rowIndexStride;
private final CompressionKind compress;
- private final CompressionCodec codec;
private int bufferSize;
private final long blockSize;
private final TypeDescription schema;
@@ -167,7 +166,6 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
this.rowIndexStride = opts.getRowIndexStride();
this.memoryManager = opts.getMemoryManager();
buildIndex = rowIndexStride > 0;
- codec = createCodec(compress);
int numColumns = schema.getMaximumId() + 1;
if (opts.isEnforceBufferSize()) {
this.bufferSize = opts.getBufferSize();
@@ -297,18 +295,20 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
}
CompressionCodec getCustomizedCodec(OrcProto.Stream.Kind kind) {
- CompressionCodec result = codec;
- if (codec != null) {
+ // TODO: modify may create a new codec here. We want to end() it when the stream is closed,
+ // but at this point there's no close() for the stream.
+ CompressionCodec result = physicalWriter.getCompressionCodec();
+ if (result != null) {
switch (kind) {
case BLOOM_FILTER:
case DATA:
case DICTIONARY_DATA:
case BLOOM_FILTER_UTF8:
if (compressionStrategy == OrcFile.CompressionStrategy.SPEED) {
- result = codec.modify(EnumSet.of(CompressionCodec.Modifier.FAST,
+ result = result.modify(EnumSet.of(CompressionCodec.Modifier.FAST,
CompressionCodec.Modifier.TEXT));
} else {
- result = codec.modify(EnumSet.of(CompressionCodec.Modifier.DEFAULT,
+ result = result.modify(EnumSet.of(CompressionCodec.Modifier.DEFAULT,
CompressionCodec.Modifier.TEXT));
}
break;
@@ -318,7 +318,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
case ROW_INDEX:
case SECONDARY:
// easily compressed using the fastest modes
- result = codec.modify(EnumSet.of(CompressionCodec.Modifier.FASTEST,
+ result = result.modify(EnumSet.of(CompressionCodec.Modifier.FASTEST,
CompressionCodec.Modifier.BINARY));
break;
default:
@@ -379,7 +379,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
* @return are the streams compressed
*/
public boolean isCompressed() {
- return codec != null;
+ return physicalWriter.getCompressionCodec() != null;
}
/**
@@ -2952,4 +2952,8 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
writeFileStatistics(builder, treeWriter);
return ReaderImpl.deserializeStats(builder.getStatisticsList());
}
+
+ public CompressionCodec getCompressionCodec() {
+ return physicalWriter.getCompressionCodec();
+ }
}
http://git-wip-us.apache.org/repos/asf/orc/blob/37a76d52/java/core/src/java/org/apache/orc/impl/ZlibCodec.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/ZlibCodec.java b/java/core/src/java/org/apache/orc/impl/ZlibCodec.java
index 4c16cd8..7732ee8 100644
--- a/java/core/src/java/org/apache/orc/impl/ZlibCodec.java
+++ b/java/core/src/java/org/apache/orc/impl/ZlibCodec.java
@@ -28,10 +28,12 @@ import org.apache.orc.CompressionCodec;
public class ZlibCodec implements CompressionCodec, DirectDecompressionCodec {
private static final HadoopShims SHIMS = HadoopShims.Factory.get();
+ // Note: shim path does not care about levels and strategies (only used for decompression).
+ private HadoopShims.DirectDecompressor decompressShim = null;
private Boolean direct = null;
- private final int level;
- private final int strategy;
+ private int level;
+ private int strategy;
public ZlibCodec() {
level = Deflater.DEFAULT_COMPRESSION;
@@ -46,29 +48,31 @@ public class ZlibCodec implements CompressionCodec, DirectDecompressionCodec {
@Override
public boolean compress(ByteBuffer in, ByteBuffer out,
ByteBuffer overflow) throws IOException {
- Deflater deflater = new Deflater(level, true);
- deflater.setStrategy(strategy);
int length = in.remaining();
- deflater.setInput(in.array(), in.arrayOffset() + in.position(), length);
- deflater.finish();
int outSize = 0;
- int offset = out.arrayOffset() + out.position();
- while (!deflater.finished() && (length > outSize)) {
- int size = deflater.deflate(out.array(), offset, out.remaining());
- out.position(size + out.position());
- outSize += size;
- offset += size;
- // if we run out of space in the out buffer, use the overflow
- if (out.remaining() == 0) {
- if (overflow == null) {
- deflater.end();
- return false;
+ Deflater deflater = new Deflater(level, true);
+ try {
+ deflater.setStrategy(strategy);
+ deflater.setInput(in.array(), in.arrayOffset() + in.position(), length);
+ deflater.finish();
+ int offset = out.arrayOffset() + out.position();
+ while (!deflater.finished() && (length > outSize)) {
+ int size = deflater.deflate(out.array(), offset, out.remaining());
+ out.position(size + out.position());
+ outSize += size;
+ offset += size;
+ // if we run out of space in the out buffer, use the overflow
+ if (out.remaining() == 0) {
+ if (overflow == null) {
+ return false;
+ }
+ out = overflow;
+ offset = out.arrayOffset() + out.position();
}
- out = overflow;
- offset = out.arrayOffset() + out.position();
}
+ } finally {
+ deflater.end();
}
- deflater.end();
return length > outSize;
}
@@ -81,21 +85,24 @@ public class ZlibCodec implements CompressionCodec, DirectDecompressionCodec {
}
Inflater inflater = new Inflater(true);
- inflater.setInput(in.array(), in.arrayOffset() + in.position(),
- in.remaining());
- while (!(inflater.finished() || inflater.needsDictionary() ||
- inflater.needsInput())) {
- try {
- int count = inflater.inflate(out.array(),
- out.arrayOffset() + out.position(),
- out.remaining());
- out.position(count + out.position());
- } catch (DataFormatException dfe) {
- throw new IOException("Bad compression data", dfe);
+ try {
+ inflater.setInput(in.array(), in.arrayOffset() + in.position(),
+ in.remaining());
+ while (!(inflater.finished() || inflater.needsDictionary() ||
+ inflater.needsInput())) {
+ try {
+ int count = inflater.inflate(out.array(),
+ out.arrayOffset() + out.position(),
+ out.remaining());
+ out.position(count + out.position());
+ } catch (DataFormatException dfe) {
+ throw new IOException("Bad compression data", dfe);
+ }
}
+ out.flip();
+ } finally {
+ inflater.end();
}
- out.flip();
- inflater.end();
in.position(in.limit());
}
@@ -104,12 +111,8 @@ public class ZlibCodec implements CompressionCodec, DirectDecompressionCodec {
if (direct == null) {
// see nowrap option in new Inflater(boolean) which disables zlib headers
try {
- if (SHIMS.getDirectDecompressor(
- HadoopShims.DirectCompressionType.ZLIB_NOHEADER) != null) {
- direct = Boolean.valueOf(true);
- } else {
- direct = Boolean.valueOf(false);
- }
+ ensureShim();
+ direct = (decompressShim != null);
} catch (UnsatisfiedLinkError ule) {
direct = Boolean.valueOf(false);
}
@@ -117,11 +120,16 @@ public class ZlibCodec implements CompressionCodec, DirectDecompressionCodec {
return direct.booleanValue();
}
+ private void ensureShim() {
+ if (decompressShim == null) {
+ decompressShim = SHIMS.getDirectDecompressor(
+ HadoopShims.DirectCompressionType.ZLIB_NOHEADER);
+ }
+ }
+
@Override
- public void directDecompress(ByteBuffer in, ByteBuffer out)
- throws IOException {
- HadoopShims.DirectDecompressor decompressShim =
- SHIMS.getDirectDecompressor(HadoopShims.DirectCompressionType.ZLIB_NOHEADER);
+ public void directDecompress(ByteBuffer in, ByteBuffer out) throws IOException {
+ ensureShim();
decompressShim.decompress(in, out);
out.flip(); // flip for read
}
@@ -163,4 +171,20 @@ public class ZlibCodec implements CompressionCodec, DirectDecompressionCodec {
}
return new ZlibCodec(l, s);
}
+
+ @Override
+ public void reset() {
+ level = Deflater.DEFAULT_COMPRESSION;
+ strategy = Deflater.DEFAULT_STRATEGY;
+ if (decompressShim != null) {
+ decompressShim.reset();
+ }
+ }
+
+ @Override
+ public void close() {
+ if (decompressShim != null) {
+ decompressShim.end();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/orc/blob/37a76d52/java/core/src/test/org/apache/orc/TestVectorOrcFile.java
----------------------------------------------------------------------
diff --git a/java/core/src/test/org/apache/orc/TestVectorOrcFile.java b/java/core/src/test/org/apache/orc/TestVectorOrcFile.java
index b7fa8ee..f975b73 100644
--- a/java/core/src/test/org/apache/orc/TestVectorOrcFile.java
+++ b/java/core/src/test/org/apache/orc/TestVectorOrcFile.java
@@ -18,6 +18,12 @@
package org.apache.orc;
+import org.apache.orc.impl.OrcCodecPool;
+
+import org.apache.orc.impl.WriterImpl;
+
+import org.apache.orc.OrcFile.WriterOptions;
+
import com.google.common.collect.Lists;
import org.apache.orc.impl.ReaderImpl;
@@ -70,11 +76,7 @@ import java.util.Map;
import java.util.Random;
import static junit.framework.TestCase.assertNotNull;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
/**
* Tests for the vectorized reader and writer for ORC files.
@@ -1703,16 +1705,8 @@ public class TestVectorOrcFile {
.compress(CompressionKind.SNAPPY)
.bufferSize(100));
VectorizedRowBatch batch = schema.createRowBatch();
- Random rand = new Random(12);
- batch.size = 1000;
- for(int b=0; b < 10; ++b) {
- for (int r=0; r < 1000; ++r) {
- ((LongColumnVector) batch.cols[0]).vector[r] = rand.nextInt();
- ((BytesColumnVector) batch.cols[1]).setVal(r,
- Integer.toHexString(rand.nextInt()).getBytes());
- }
- writer.addRowBatch(batch);
- }
+ Random rand;
+ writeRandomIntBytesBatches(writer, batch, 10, 1000);
writer.close();
Reader reader = OrcFile.createReader(testFilePath,
OrcFile.readerOptions(conf).filesystem(fs));
@@ -1834,6 +1828,92 @@ public class TestVectorOrcFile {
}
/**
+ * Write and read SNAPPY and ZLIB files; verify codecs are reused through OrcCodecPool.
+ * @throws Exception
+ */
+ @Test
+ public void testCodecPool() throws Exception {
+ TypeDescription schema = createInnerSchema();
+ VectorizedRowBatch batch = schema.createRowBatch();
+ WriterOptions opts = OrcFile.writerOptions(conf)
+ .setSchema(schema).stripeSize(1000).bufferSize(100);
+
+ CompressionCodec snappyCodec, zlibCodec;
+ snappyCodec = writeBatchesAndGetCodec(10, 1000, opts.compress(CompressionKind.SNAPPY), batch);
+ assertEquals(1, OrcCodecPool.getPoolSize(CompressionKind.SNAPPY));
+ Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf).filesystem(fs));
+ Assert.assertEquals(CompressionKind.SNAPPY, reader.getCompressionKind());
+ CompressionCodec codec = readBatchesAndGetCodec(reader, 10, 1000);
+ assertEquals(1, OrcCodecPool.getPoolSize(CompressionKind.SNAPPY));
+ assertSame(snappyCodec, codec);
+
+ reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf).filesystem(fs));
+ Assert.assertEquals(CompressionKind.SNAPPY, reader.getCompressionKind());
+ codec = readBatchesAndGetCodec(reader, 10, 1000);
+ assertSame(snappyCodec, codec);
+ assertEquals(1, OrcCodecPool.getPoolSize(CompressionKind.SNAPPY));
+
+ zlibCodec = writeBatchesAndGetCodec(10, 1000, opts.compress(CompressionKind.ZLIB), batch);
+ assertNotSame(snappyCodec, zlibCodec);
+ assertEquals(1, OrcCodecPool.getPoolSize(CompressionKind.ZLIB));
+ codec = writeBatchesAndGetCodec(10, 1000, opts.compress(CompressionKind.ZLIB), batch);
+ assertEquals(1, OrcCodecPool.getPoolSize(CompressionKind.ZLIB));
+ assertSame(zlibCodec, codec);
+
+ assertSame(snappyCodec, OrcCodecPool.getCodec(CompressionKind.SNAPPY));
+ CompressionCodec snappyCodec2 = writeBatchesAndGetCodec(
+ 10, 1000, opts.compress(CompressionKind.SNAPPY), batch);
+ assertNotSame(snappyCodec, snappyCodec2);
+ OrcCodecPool.returnCodec(CompressionKind.SNAPPY, snappyCodec);
+ reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf).filesystem(fs));
+ Assert.assertEquals(CompressionKind.SNAPPY, reader.getCompressionKind());
+ codec = readBatchesAndGetCodec(reader, 10, 1000);
+ assertEquals(2, OrcCodecPool.getPoolSize(CompressionKind.SNAPPY));
+ assertTrue(snappyCodec == codec || snappyCodec2 == codec);
+ }
+
+ private CompressionCodec writeBatchesAndGetCodec(int count, int size, WriterOptions opts,
+ VectorizedRowBatch batch) throws IOException {
+ fs.delete(testFilePath, false);
+ Writer writer = OrcFile.createWriter(testFilePath, opts);
+ writeRandomIntBytesBatches(writer, batch, count, size);
+ CompressionCodec codec = ((WriterImpl)writer).getCompressionCodec();
+ writer.close();
+ return codec;
+ }
+
+ private CompressionCodec readBatchesAndGetCodec(
+ Reader reader, int count, int size) throws IOException {
+ RecordReader rows = reader.rows();
+ VectorizedRowBatch batch = reader.getSchema().createRowBatch(size);
+ for (int b = 0; b < count; ++b) {
+ rows.nextBatch(batch);
+ }
+ CompressionCodec codec = ((RecordReaderImpl)rows).getCompressionCodec();
+ rows.close();
+ return codec;
+ }
+
+ private void readRandomBatches(
+ Reader reader, RecordReader rows, int count, int size) throws IOException {
+
+ }
+
+ private void writeRandomIntBytesBatches(
+ Writer writer, VectorizedRowBatch batch, int count, int size) throws IOException {
+ Random rand = new Random(12);
+ batch.size = size;
+ for(int b=0; b < count; ++b) {
+ for (int r=0; r < size; ++r) {
+ ((LongColumnVector) batch.cols[0]).vector[r] = rand.nextInt();
+ ((BytesColumnVector) batch.cols[1]).setVal(r,
+ Integer.toHexString(rand.nextInt()).getBytes());
+ }
+ writer.addRowBatch(batch);
+ }
+ }
+
+ /**
* Read and write a randomly generated snappy file.
* @throws Exception
*/