Posted to commits@orc.apache.org by pr...@apache.org on 2017/03/23 22:58:12 UTC

orc git commit: ORC-166: add codec pool to ORC; make sure end() is called on underlying codecs (Sergey Shelukhin)

Repository: orc
Updated Branches:
  refs/heads/master a393bca04 -> 37a76d529


ORC-166: add codec pool to ORC; make sure end() is called on underlying codecs (Sergey Shelukhin)

Fixes #102

Signed-off-by: Prasanth Jayachandran <pr...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/orc/repo
Commit: http://git-wip-us.apache.org/repos/asf/orc/commit/37a76d52
Tree: http://git-wip-us.apache.org/repos/asf/orc/tree/37a76d52
Diff: http://git-wip-us.apache.org/repos/asf/orc/diff/37a76d52

Branch: refs/heads/master
Commit: 37a76d52943fb660a81ec39b1d62cd9fa9379c74
Parents: a393bca
Author: Sergey Shelukhin <se...@apache.org>
Authored: Mon Mar 20 19:13:06 2017 -0700
Committer: Prasanth Jayachandran <pr...@apache.org>
Committed: Thu Mar 23 15:56:55 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/orc/CompressionCodec.java   |   5 +
 .../src/java/org/apache/orc/DataReader.java     |   4 +
 .../src/java/org/apache/orc/PhysicalWriter.java |   2 +
 .../org/apache/orc/impl/AircompressorCodec.java |  10 ++
 .../java/org/apache/orc/impl/HadoopShims.java   |   2 +
 .../org/apache/orc/impl/HadoopShimsCurrent.java |  48 ++++++--
 .../java/org/apache/orc/impl/OrcCodecPool.java  |  99 +++++++++++++++++
 .../src/java/org/apache/orc/impl/OrcTail.java   |  14 +--
 .../org/apache/orc/impl/PhysicalFsWriter.java   |   9 +-
 .../java/org/apache/orc/impl/ReaderImpl.java    |  34 ++++--
 .../org/apache/orc/impl/RecordReaderImpl.java   |  19 ++--
 .../org/apache/orc/impl/RecordReaderUtils.java  |  12 +-
 .../java/org/apache/orc/impl/SnappyCodec.java   |  34 ++++--
 .../java/org/apache/orc/impl/WriterImpl.java    |  20 ++--
 .../src/java/org/apache/orc/impl/ZlibCodec.java | 110 +++++++++++--------
 .../test/org/apache/orc/TestVectorOrcFile.java  | 110 ++++++++++++++++---
 16 files changed, 423 insertions(+), 109 deletions(-)
----------------------------------------------------------------------
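
At its core, the patch replaces per-use codec construction with a borrow/return
pool keyed by CompressionKind. A minimal caller-side sketch of the pattern,
assuming only the OrcCodecPool API added in this commit (the class and method
names here are hypothetical, for illustration):

    import org.apache.orc.CompressionCodec;
    import org.apache.orc.CompressionKind;
    import org.apache.orc.impl.OrcCodecPool;

    public class CodecPoolUsage {
      // Borrow a codec, use it, and always hand it back in finally.
      // returnCodec() resets the codec and re-pools it, or close()s it
      // when the per-kind pool is already at capacity.
      static void useCodec(CompressionKind kind) {
        CompressionCodec codec = OrcCodecPool.getCodec(kind); // null for NONE
        try {
          // ... compress or decompress with the codec ...
        } finally {
          OrcCodecPool.returnCodec(kind, codec);              // null-safe
        }
      }
    }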


http://git-wip-us.apache.org/repos/asf/orc/blob/37a76d52/java/core/src/java/org/apache/orc/CompressionCodec.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/CompressionCodec.java b/java/core/src/java/org/apache/orc/CompressionCodec.java
index d569bb6..dd517b3 100644
--- a/java/core/src/java/org/apache/orc/CompressionCodec.java
+++ b/java/core/src/java/org/apache/orc/CompressionCodec.java
@@ -64,4 +64,9 @@ public interface CompressionCodec {
    */
   CompressionCodec modify(EnumSet<Modifier> modifiers);
 
+  /** Resets the codec, preparing it for reuse. */
+  void reset();
+
+  /** Closes the codec, releasing the resources. */
+  void close();
 }

http://git-wip-us.apache.org/repos/asf/orc/blob/37a76d52/java/core/src/java/org/apache/orc/DataReader.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/DataReader.java b/java/core/src/java/org/apache/orc/DataReader.java
index ae11bf3..3155862 100644
--- a/java/core/src/java/org/apache/orc/DataReader.java
+++ b/java/core/src/java/org/apache/orc/DataReader.java
@@ -78,4 +78,8 @@ public interface DataReader extends AutoCloseable, Cloneable {
 
   @Override
   public void close() throws IOException;
+
+  /** Returns the compression codec used by this data reader.
+   * @return the codec, or null if the data is not compressed. */
+  CompressionCodec getCompressionCodec();
 }

http://git-wip-us.apache.org/repos/asf/orc/blob/37a76d52/java/core/src/java/org/apache/orc/PhysicalWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/PhysicalWriter.java b/java/core/src/java/org/apache/orc/PhysicalWriter.java
index 9953d41..7589aa5 100644
--- a/java/core/src/java/org/apache/orc/PhysicalWriter.java
+++ b/java/core/src/java/org/apache/orc/PhysicalWriter.java
@@ -130,4 +130,6 @@ public interface PhysicalWriter {
                        OrcProto.StripeInformation.Builder dirEntry
                        ) throws IOException;
 
+  /** Returns the compression codec used by this writer. */
+  CompressionCodec getCompressionCodec();
 }

http://git-wip-us.apache.org/repos/asf/orc/blob/37a76d52/java/core/src/java/org/apache/orc/impl/AircompressorCodec.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/AircompressorCodec.java b/java/core/src/java/org/apache/orc/impl/AircompressorCodec.java
index a304730..39d678c 100644
--- a/java/core/src/java/org/apache/orc/impl/AircompressorCodec.java
+++ b/java/core/src/java/org/apache/orc/impl/AircompressorCodec.java
@@ -99,4 +99,14 @@ public class AircompressorCodec implements CompressionCodec {
     // snappy allows no modifications
     return this;
   }
+
+  @Override
+  public void reset() {
+    // Nothing to do.
+  }
+
+  @Override
+  public void close() {
+    // Nothing to do.
+  }
 }

http://git-wip-us.apache.org/repos/asf/orc/blob/37a76d52/java/core/src/java/org/apache/orc/impl/HadoopShims.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/HadoopShims.java b/java/core/src/java/org/apache/orc/impl/HadoopShims.java
index 8d505d2..3047376 100644
--- a/java/core/src/java/org/apache/orc/impl/HadoopShims.java
+++ b/java/core/src/java/org/apache/orc/impl/HadoopShims.java
@@ -38,6 +38,8 @@ public interface HadoopShims {
 
   interface DirectDecompressor {
     void decompress(ByteBuffer var1, ByteBuffer var2) throws IOException;
+    void reset();
+    void end();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/orc/blob/37a76d52/java/core/src/java/org/apache/orc/impl/HadoopShimsCurrent.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/HadoopShimsCurrent.java b/java/core/src/java/org/apache/orc/impl/HadoopShimsCurrent.java
index 54a4824..9f40272 100644
--- a/java/core/src/java/org/apache/orc/impl/HadoopShimsCurrent.java
+++ b/java/core/src/java/org/apache/orc/impl/HadoopShimsCurrent.java
@@ -21,6 +21,7 @@ package org.apache.orc.impl;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.compress.snappy.SnappyDecompressor;
+import org.apache.hadoop.io.compress.snappy.SnappyDecompressor.SnappyDirectDecompressor;
 import org.apache.hadoop.io.compress.zlib.ZlibDecompressor;
 
 import java.io.DataInputStream;
@@ -33,31 +34,62 @@ import java.nio.ByteBuffer;
  */
 public class HadoopShimsCurrent implements HadoopShims {
 
-  private static class DirectDecompressWrapper implements DirectDecompressor {
-    private final org.apache.hadoop.io.compress.DirectDecompressor root;
+  private static class SnappyDirectDecompressWrapper implements DirectDecompressor {
+    private final SnappyDirectDecompressor root;
 
-    DirectDecompressWrapper(org.apache.hadoop.io.compress.DirectDecompressor root) {
+    SnappyDirectDecompressWrapper(SnappyDirectDecompressor root) {
       this.root = root;
     }
 
-    public void decompress(ByteBuffer input,
-                           ByteBuffer output) throws IOException {
+    public void decompress(ByteBuffer input, ByteBuffer output) throws IOException {
       root.decompress(input, output);
     }
+
+    @Override
+    public void reset() {
+      root.reset();
+    }
+
+    @Override
+    public void end() {
+      root.end();
+    }
+  }
+
+  private static class ZlibDirectDecompressWrapper implements DirectDecompressor {
+    private final ZlibDecompressor.ZlibDirectDecompressor root;
+
+    ZlibDirectDecompressWrapper(ZlibDecompressor.ZlibDirectDecompressor root) {
+      this.root = root;
+    }
+
+    public void decompress(ByteBuffer input, ByteBuffer output) throws IOException {
+      root.decompress(input, output);
+    }
+
+    @Override
+    public void reset() {
+      root.reset();
+    }
+
+    @Override
+    public void end() {
+      root.end();
+    }
   }
 
   public DirectDecompressor getDirectDecompressor(
       DirectCompressionType codec) {
     switch (codec) {
       case ZLIB:
-        return new DirectDecompressWrapper
+        return new ZlibDirectDecompressWrapper
             (new ZlibDecompressor.ZlibDirectDecompressor());
       case ZLIB_NOHEADER:
-        return new DirectDecompressWrapper
+        return new ZlibDirectDecompressWrapper
             (new ZlibDecompressor.ZlibDirectDecompressor
                 (ZlibDecompressor.CompressionHeader.NO_HEADER, 0));
       case SNAPPY:
-        return new DirectDecompressWrapper
+        return new SnappyDirectDecompressWrapper
             (new SnappyDecompressor.SnappyDirectDecompressor());
       default:
         return null;
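
Why the single DirectDecompressWrapper became two typed wrappers: Hadoop's
org.apache.hadoop.io.compress.DirectDecompressor interface declares only
decompress(ByteBuffer, ByteBuffer); reset() and end() live on the Decompressor
interface that the concrete Zlib/Snappy classes also implement. Holding the
concrete types lets the wrappers forward reset()/end() without casting. For
comparison, a hypothetical single-wrapper version (not part of the patch)
would need a cross-cast:

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import org.apache.hadoop.io.compress.Decompressor;
    import org.apache.hadoop.io.compress.DirectDecompressor;
    import org.apache.orc.impl.HadoopShims;

    class CastingDecompressWrapper implements HadoopShims.DirectDecompressor {
      private final DirectDecompressor root;

      CastingDecompressWrapper(DirectDecompressor root) {
        this.root = root;
      }

      public void decompress(ByteBuffer in, ByteBuffer out) throws IOException {
        root.decompress(in, out);
      }

      public void reset() {
        // Works only because the concrete Hadoop decompressors also implement
        // Decompressor; the typed wrappers above avoid this fragile cast.
        ((Decompressor) root).reset();
      }

      public void end() {
        ((Decompressor) root).end();
      }
    }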

http://git-wip-us.apache.org/repos/asf/orc/blob/37a76d52/java/core/src/java/org/apache/orc/impl/OrcCodecPool.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/OrcCodecPool.java b/java/core/src/java/org/apache/orc/impl/OrcCodecPool.java
new file mode 100644
index 0000000..56b9896
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/OrcCodecPool.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.impl;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.orc.CompressionCodec;
+import org.apache.orc.CompressionKind;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A clone of the Hadoop codec pool for ORC, needed because ORC has its own codec classes.
+ */
+public final class OrcCodecPool {
+  private static final Logger LOG = LoggerFactory.getLogger(OrcCodecPool.class);
+
+  /**
+   * A global decompressor pool used to save the expensive
+   * construction/destruction of (possibly native) decompression codecs.
+   */
+  private static final ConcurrentHashMap<CompressionKind, List<CompressionCodec>> POOL =
+      new ConcurrentHashMap<>();
+
+  private static final int MAX_PER_KIND = 32;
+
+  public static CompressionCodec getCodec(CompressionKind kind) {
+    if (kind == CompressionKind.NONE) return null;
+    CompressionCodec codec = null;
+    List<CompressionCodec> codecList = POOL.get(kind);
+    if (codecList != null) {
+      synchronized (codecList) {
+        if (!codecList.isEmpty()) {
+          codec = codecList.remove(codecList.size() - 1);
+        }
+      }
+    }
+    if (codec == null) {
+      codec = WriterImpl.createCodec(kind);
+      LOG.info("Got brand-new codec " + kind);
+    } else {
+      LOG.debug("Got recycled codec");
+    }
+    return codec;
+  }
+
+  public static void returnCodec(CompressionKind kind, CompressionCodec codec) {
+    if (codec == null) {
+      return;
+    }
+    assert kind != CompressionKind.NONE;
+    codec.reset();
+    List<CompressionCodec> list = POOL.get(kind);
+    if (list == null) {
+      List<CompressionCodec> newList = new ArrayList<>();
+      List<CompressionCodec> oldList = POOL.putIfAbsent(kind, newList);
+      list = (oldList == null) ? newList : oldList;
+    }
+    synchronized (list) {
+      if (list.size() < MAX_PER_KIND) {
+        list.add(codec);
+        return;
+      }
+    }
+    // We didn't add the codec to the list.
+    codec.close();
+  }
+
+  public static int getPoolSize(CompressionKind kind) {
+    if (kind == CompressionKind.NONE) return 0;
+    List<CompressionCodec> codecList = POOL.get(kind);
+    if (codecList == null) return 0;
+    synchronized (codecList) {
+      return codecList.size();
+    }
+  }
+
+  private OrcCodecPool() {
+    // prevent instantiation
+  }
+}
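
Taken in isolation (and assuming a fresh, empty pool with a ZLIB codec
available), the pool above behaves as in this small sketch; PoolBehavior and
its main method are hypothetical:

    import org.apache.orc.CompressionCodec;
    import org.apache.orc.CompressionKind;
    import org.apache.orc.impl.OrcCodecPool;

    public class PoolBehavior {
      public static void main(String[] args) {
        // Two outstanding borrows yield two distinct codec instances.
        CompressionCodec a = OrcCodecPool.getCodec(CompressionKind.ZLIB);
        CompressionCodec b = OrcCodecPool.getCodec(CompressionKind.ZLIB);
        assert a != b;

        // Returning them fills the per-kind pool (capped at MAX_PER_KIND).
        OrcCodecPool.returnCodec(CompressionKind.ZLIB, a);
        OrcCodecPool.returnCodec(CompressionKind.ZLIB, b);
        assert OrcCodecPool.getPoolSize(CompressionKind.ZLIB) == 2;

        // The next borrow reuses a pooled instance rather than creating one.
        CompressionCodec c = OrcCodecPool.getCodec(CompressionKind.ZLIB);
        assert c == a || c == b;
        OrcCodecPool.returnCodec(CompressionKind.ZLIB, c);
      }
    }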

http://git-wip-us.apache.org/repos/asf/orc/blob/37a76d52/java/core/src/java/org/apache/orc/impl/OrcTail.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/OrcTail.java b/java/core/src/java/org/apache/orc/impl/OrcTail.java
index b5f85fb..f2f80a5 100644
--- a/java/core/src/java/org/apache/orc/impl/OrcTail.java
+++ b/java/core/src/java/org/apache/orc/impl/OrcTail.java
@@ -86,10 +86,6 @@ public final class OrcTail {
     return CompressionKind.valueOf(fileTail.getPostscript().getCompression().name());
   }
 
-  public CompressionCodec getCompressionCodec() {
-    return WriterImpl.createCodec(getCompressionKind());
-  }
-
   public int getCompressionBufferSize() {
     return (int) fileTail.getPostscript().getCompressionBlockSize();
   }
@@ -108,9 +104,13 @@ public final class OrcTail {
   public List<OrcProto.StripeStatistics> getStripeStatisticsProto() throws IOException {
     if (serializedTail == null) return null;
     if (metadata == null) {
-      metadata = extractMetadata(serializedTail, 0,
-          (int) fileTail.getPostscript().getMetadataLength(),
-          getCompressionCodec(), getCompressionBufferSize());
+      CompressionCodec codec = OrcCodecPool.getCodec(getCompressionKind());
+      try {
+        metadata = extractMetadata(serializedTail, 0,
+            (int) fileTail.getPostscript().getMetadataLength(), codec, getCompressionBufferSize());
+      } finally {
+        OrcCodecPool.returnCodec(getCompressionKind(), codec);
+      }
       // clear does not clear the contents but sets position to 0 and limit = capacity
       serializedTail.clear();
     }

http://git-wip-us.apache.org/repos/asf/orc/blob/37a76d52/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java b/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java
index dd4e23b..1769182 100644
--- a/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java
+++ b/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java
@@ -56,6 +56,7 @@ public class PhysicalFsWriter implements PhysicalWriter {
   private final double paddingTolerance;
   private final long defaultStripeSize;
   private final CompressionKind compress;
+  private final CompressionCodec codec;
   private final boolean addBlockPadding;
 
   // the streams that make up the current stripe
@@ -89,12 +90,17 @@ public class PhysicalFsWriter implements PhysicalWriter {
         compress, bufferSize);
     rawWriter = fs.create(path, false, HDFS_BUFFER_SIZE,
         fs.getDefaultReplication(path), blockSize);
-    CompressionCodec codec = WriterImpl.createCodec(compress);
+    codec = OrcCodecPool.getCodec(compress);
     writer = new OutStream("metadata", bufferSize, codec,
         new DirectStream(rawWriter));
     protobufWriter = CodedOutputStream.newInstance(writer);
   }
 
+  @Override
+  public CompressionCodec getCompressionCodec() {
+    return codec;
+  }
+
   private void padStripe(long indexSize, long dataSize, int footerSize) throws IOException {
     this.stripeStart = rawWriter.getPos();
     final long currentStripeSize = indexSize + dataSize + footerSize;
@@ -218,6 +224,7 @@ public class PhysicalFsWriter implements PhysicalWriter {
 
   @Override
   public void close() throws IOException {
+    OrcCodecPool.returnCodec(compress, codec);
     rawWriter.close();
   }
 

http://git-wip-us.apache.org/repos/asf/orc/blob/37a76d52/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/ReaderImpl.java b/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
index ac5cfb2..ad1bc1e 100644
--- a/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
@@ -63,7 +63,6 @@ public class ReaderImpl implements Reader {
   private final long maxLength;
   protected final Path path;
   protected final org.apache.orc.CompressionKind compressionKind;
-  protected CompressionCodec codec;
   protected int bufferSize;
   protected OrcProto.Metadata metadata;
   private List<OrcProto.StripeStatistics> stripeStats;
@@ -359,7 +358,6 @@ public class ReaderImpl implements Reader {
     if (fileMetadata != null) {
       this.compressionKind = fileMetadata.getCompressionKind();
       this.bufferSize = fileMetadata.getCompressionBufferSize();
-      this.codec = WriterImpl.createCodec(compressionKind);
       this.metadataSize = fileMetadata.getMetadataSize();
       this.stripeStats = fileMetadata.getStripeStats();
       this.versionList = fileMetadata.getVersionList();
@@ -381,7 +379,6 @@ public class ReaderImpl implements Reader {
         tail = orcTail;
       }
       this.compressionKind = tail.getCompressionKind();
-      this.codec = tail.getCompressionCodec();
       this.bufferSize = tail.getCompressionBufferSize();
       this.metadataSize = tail.getMetadataSize();
       this.versionList = tail.getPostScript().getVersionList();
@@ -472,16 +469,21 @@ public class ReaderImpl implements Reader {
     System.arraycopy(buffer.array(), psOffset, psBuffer, 0, psLen);
     OrcProto.PostScript ps = OrcProto.PostScript.parseFrom(psBuffer);
     int footerSize = (int) ps.getFooterLength();
-    CompressionCodec codec = WriterImpl
-        .createCodec(CompressionKind.valueOf(ps.getCompression().name()));
-    OrcProto.Footer footer = extractFooter(buffer,
+    CompressionKind kind = CompressionKind.valueOf(ps.getCompression().name());
+    OrcProto.FileTail.Builder fileTailBuilder = null;
+    CompressionCodec codec = OrcCodecPool.getCodec(kind);
+    try {
+      OrcProto.Footer footer = extractFooter(buffer,
         (int) (buffer.position() + ps.getMetadataLength()),
         footerSize, codec, (int) ps.getCompressionBlockSize());
-    OrcProto.FileTail.Builder fileTailBuilder = OrcProto.FileTail.newBuilder()
+      fileTailBuilder = OrcProto.FileTail.newBuilder()
         .setPostscriptLength(psLen)
         .setPostscript(ps)
         .setFooter(footer)
         .setFileLength(fileLength);
+    } finally {
+      OrcCodecPool.returnCodec(kind, codec);
+    }
     // clear does not clear the contents but sets position to 0 and limit = capacity
     buffer.clear();
     return new OrcTail(fileTailBuilder.build(), buffer.slice(), modificationTime);
@@ -527,7 +529,7 @@ public class ReaderImpl implements Reader {
       int psOffset = readSize - 1 - psLen;
       ps = extractPostScript(buffer, path, psLen, psOffset);
       bufferSize = (int) ps.getCompressionBlockSize();
-      codec = WriterImpl.createCodec(CompressionKind.valueOf(ps.getCompression().name()));
+      CompressionKind compressionKind = CompressionKind.valueOf(ps.getCompression().name());
       fileTailBuilder.setPostscriptLength(psLen).setPostscript(ps);
 
       int footerSize = (int) ps.getFooterLength();
@@ -560,8 +562,13 @@ public class ReaderImpl implements Reader {
       buffer.position(footerOffset);
       ByteBuffer footerBuffer = buffer.slice();
       buffer.reset();
-      OrcProto.Footer footer = extractFooter(footerBuffer, 0, footerSize,
-          codec, bufferSize);
+      OrcProto.Footer footer = null;
+      CompressionCodec codec = OrcCodecPool.getCodec(compressionKind);
+      try {
+        footer = extractFooter(footerBuffer, 0, footerSize, codec, bufferSize);
+      } finally {
+        OrcCodecPool.returnCodec(compressionKind, codec);
+      }
       fileTailBuilder.setFooter(footer);
     } finally {
       try {
@@ -751,7 +758,12 @@ public class ReaderImpl implements Reader {
   @Override
   public List<StripeStatistics> getStripeStatistics() throws IOException {
     if (metadata == null) {
-      metadata = extractMetadata(tail.getSerializedTail(), 0, metadataSize, codec, bufferSize);
+      CompressionCodec codec = OrcCodecPool.getCodec(compressionKind);
+      try {
+        metadata = extractMetadata(tail.getSerializedTail(), 0, metadataSize, codec, bufferSize);
+      } finally {
+        OrcCodecPool.returnCodec(compressionKind, codec);
+      }
     }
     if (stripeStats == null) {
       stripeStats = metadata.getStripeStatsList();

http://git-wip-us.apache.org/repos/asf/orc/blob/37a76d52/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java b/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
index f75d70a..b73ca1e 100644
--- a/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
@@ -17,6 +17,8 @@
  */
 package org.apache.orc.impl;
 
+import org.apache.orc.CompressionKind;
+
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.sql.Date;
@@ -73,7 +75,6 @@ public class RecordReaderImpl implements RecordReader {
       new ArrayList<StripeInformation>();
   private OrcProto.StripeFooter stripeFooter;
   private final long totalRowCount;
-  private final CompressionCodec codec;
   protected final TypeDescription schema;
   private final List<OrcProto.Type> types;
   private final int bufferSize;
@@ -98,7 +99,7 @@ public class RecordReaderImpl implements RecordReader {
   private final DataReader dataReader;
   private final boolean ignoreNonUtf8BloomFilter;
   private final OrcFile.WriterVersion writerVersion;
-
+  
   /**
    * Given a list of column names, find the given column and return the index.
    *
@@ -205,7 +206,6 @@ public class RecordReaderImpl implements RecordReader {
     }
     this.schema = evolution.getReaderSchema();
     this.path = fileReader.path;
-    this.codec = fileReader.codec;
     this.types = fileReader.types;
     this.bufferSize = fileReader.bufferSize;
     this.rowIndexStride = fileReader.rowIndexStride;
@@ -252,7 +252,6 @@ public class RecordReaderImpl implements RecordReader {
               .build());
     }
     this.dataReader.open();
-
     firstRow = skippedRows;
     totalRowCount = rows;
     Boolean skipCorrupt = options.getSkipCorruptRecords();
@@ -973,7 +972,8 @@ public class RecordReaderImpl implements RecordReader {
     DiskRangeList toRead = new DiskRangeList(start, end);
     bufferChunks = dataReader.readFileData(toRead, stripe.getOffset(), false);
     List<OrcProto.Stream> streamDescriptions = stripeFooter.getStreamsList();
-    createStreams(streamDescriptions, bufferChunks, null, codec, bufferSize, streams);
+    createStreams(streamDescriptions, bufferChunks, null,
+        dataReader.getCompressionCodec(), bufferSize, streams);
   }
 
   /**
@@ -1055,7 +1055,7 @@ public class RecordReaderImpl implements RecordReader {
   private void readPartialDataStreams(StripeInformation stripe) throws IOException {
     List<OrcProto.Stream> streamList = stripeFooter.getStreamsList();
     DiskRangeList toRead = planReadPartialDataStreams(streamList,
-        indexes, fileIncluded, includedRowGroups, codec != null,
+        indexes, fileIncluded, includedRowGroups, dataReader.getCompressionCodec() != null,
         stripeFooter.getColumnsList(), types, bufferSize, true);
     if (LOG.isDebugEnabled()) {
       LOG.debug("chunks = " + RecordReaderUtils.stringifyDiskRanges(toRead));
@@ -1065,7 +1065,8 @@ public class RecordReaderImpl implements RecordReader {
       LOG.debug("merge = " + RecordReaderUtils.stringifyDiskRanges(bufferChunks));
     }
 
-    createStreams(streamList, bufferChunks, fileIncluded, codec, bufferSize, streams);
+    createStreams(streamList, bufferChunks, fileIncluded,
+        dataReader.getCompressionCodec(), bufferSize, streams);
   }
 
   /**
@@ -1317,4 +1318,8 @@ public class RecordReaderImpl implements RecordReader {
     }
     return result;
   }
+
+  public CompressionCodec getCompressionCodec() {
+    return dataReader.getCompressionCodec();
+  }
 }

http://git-wip-us.apache.org/repos/asf/orc/blob/37a76d52/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java b/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
index aa47219..6006634 100644
--- a/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
+++ b/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.io.DiskRange;
 import org.apache.hadoop.hive.common.io.DiskRangeList;
 import org.apache.orc.CompressionCodec;
+import org.apache.orc.CompressionKind;
 import org.apache.orc.DataReader;
 import org.apache.orc.OrcFile;
 import org.apache.orc.OrcProto;
@@ -150,12 +151,14 @@ public class RecordReaderUtils {
     private final CompressionCodec codec;
     private final int bufferSize;
     private final int typeCount;
+    private CompressionKind compressionKind;
 
     private DefaultDataReader(DataReaderProperties properties) {
       this.fs = properties.getFileSystem();
       this.path = properties.getPath();
       this.useZeroCopy = properties.getZeroCopy();
-      this.codec = WriterImpl.createCodec(properties.getCompression());
+      this.compressionKind = properties.getCompression();
+      this.codec = OrcCodecPool.getCodec(compressionKind);
       this.bufferSize = properties.getBufferSize();
       this.typeCount = properties.getTypeCount();
       if (useZeroCopy) {
@@ -277,6 +280,9 @@ public class RecordReaderUtils {
 
     @Override
     public void close() throws IOException {
+      if (codec != null) {
+        OrcCodecPool.returnCodec(compressionKind, codec);
+      }
       if (pool != null) {
         pool.clear();
       }
@@ -307,6 +313,10 @@ public class RecordReaderUtils {
       }
     }
 
+    @Override
+    public CompressionCodec getCompressionCodec() {
+      return codec;
+    }
   }
 
   public static DataReader createDefaultDataReader(DataReaderProperties properties) {
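
DefaultDataReader now borrows its codec from the pool for its whole lifetime
and hands it back in close(), mirroring PhysicalFsWriter earlier in the patch.
The shape of that long-lived borrow, as a hypothetical holder class (not part
of the patch):

    import org.apache.orc.CompressionCodec;
    import org.apache.orc.CompressionKind;
    import org.apache.orc.impl.OrcCodecPool;

    class PooledCodecHolder implements AutoCloseable {
      private final CompressionKind kind;
      private CompressionCodec codec;

      PooledCodecHolder(CompressionKind kind) {
        this.kind = kind;
        this.codec = OrcCodecPool.getCodec(kind); // null when kind is NONE
      }

      CompressionCodec getCompressionCodec() {
        return codec;
      }

      @Override
      public void close() {
        if (codec != null) {
          OrcCodecPool.returnCodec(kind, codec); // reset + pool, or close
          codec = null;                          // guard against double return
        }
      }
    }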

http://git-wip-us.apache.org/repos/asf/orc/blob/37a76d52/java/core/src/java/org/apache/orc/impl/SnappyCodec.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/SnappyCodec.java b/java/core/src/java/org/apache/orc/impl/SnappyCodec.java
index f4d828a..106c75d 100644
--- a/java/core/src/java/org/apache/orc/impl/SnappyCodec.java
+++ b/java/core/src/java/org/apache/orc/impl/SnappyCodec.java
@@ -29,6 +29,7 @@ public class SnappyCodec extends AircompressorCodec
   private static final HadoopShims SHIMS = HadoopShims.Factory.get();
 
   Boolean direct = null;
+  HadoopShims.DirectDecompressor decompressShim = null;
 
   SnappyCodec() {
     super(new SnappyCompressor(), new SnappyDecompressor());
@@ -47,12 +48,8 @@ public class SnappyCodec extends AircompressorCodec
   public boolean isAvailable() {
     if (direct == null) {
       try {
-        if (SHIMS.getDirectDecompressor(
-            HadoopShims.DirectCompressionType.SNAPPY) != null) {
-          direct = Boolean.valueOf(true);
-        } else {
-          direct = Boolean.valueOf(false);
-        }
+        ensureShim();
+        direct = (decompressShim != null);
       } catch (UnsatisfiedLinkError ule) {
         direct = Boolean.valueOf(false);
       }
@@ -63,9 +60,30 @@ public class SnappyCodec extends AircompressorCodec
   @Override
   public void directDecompress(ByteBuffer in, ByteBuffer out)
       throws IOException {
-    HadoopShims.DirectDecompressor decompressShim =
-        SHIMS.getDirectDecompressor(HadoopShims.DirectCompressionType.SNAPPY);
+    ensureShim();
     decompressShim.decompress(in, out);
     out.flip(); // flip for read
   }
+
+  private void ensureShim() {
+    if (decompressShim == null) {
+      decompressShim = SHIMS.getDirectDecompressor(HadoopShims.DirectCompressionType.SNAPPY);
+    }
+  }
+
+  @Override
+  public void reset() {
+    super.reset();
+    if (decompressShim != null) {
+      decompressShim.reset();
+    }
+  }
+
+  @Override
+  public void close() {
+    super.close();
+    if (decompressShim != null) {
+      decompressShim.end();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/orc/blob/37a76d52/java/core/src/java/org/apache/orc/impl/WriterImpl.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/WriterImpl.java b/java/core/src/java/org/apache/orc/impl/WriterImpl.java
index ce955e3..32820e1 100644
--- a/java/core/src/java/org/apache/orc/impl/WriterImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/WriterImpl.java
@@ -104,7 +104,6 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
   private long adjustedStripeSize;
   private final int rowIndexStride;
   private final CompressionKind compress;
-  private final CompressionCodec codec;
   private int bufferSize;
   private final long blockSize;
   private final TypeDescription schema;
@@ -167,7 +166,6 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     this.rowIndexStride = opts.getRowIndexStride();
     this.memoryManager = opts.getMemoryManager();
     buildIndex = rowIndexStride > 0;
-    codec = createCodec(compress);
     int numColumns = schema.getMaximumId() + 1;
     if (opts.isEnforceBufferSize()) {
       this.bufferSize = opts.getBufferSize();
@@ -297,18 +295,20 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
   }
 
   CompressionCodec getCustomizedCodec(OrcProto.Stream.Kind kind) {
-    CompressionCodec result = codec;
-    if (codec != null) {
+    // TODO: modify may create a new codec here. We want to end() it when the stream is closed,
+    //       but at this point there's no close() for the stream.
+    CompressionCodec result = physicalWriter.getCompressionCodec();
+    if (result != null) {
       switch (kind) {
         case BLOOM_FILTER:
         case DATA:
         case DICTIONARY_DATA:
         case BLOOM_FILTER_UTF8:
           if (compressionStrategy == OrcFile.CompressionStrategy.SPEED) {
-            result = codec.modify(EnumSet.of(CompressionCodec.Modifier.FAST,
+            result = result.modify(EnumSet.of(CompressionCodec.Modifier.FAST,
                 CompressionCodec.Modifier.TEXT));
           } else {
-            result = codec.modify(EnumSet.of(CompressionCodec.Modifier.DEFAULT,
+            result = result.modify(EnumSet.of(CompressionCodec.Modifier.DEFAULT,
                 CompressionCodec.Modifier.TEXT));
           }
           break;
@@ -318,7 +318,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
         case ROW_INDEX:
         case SECONDARY:
           // easily compressed using the fastest modes
-          result = codec.modify(EnumSet.of(CompressionCodec.Modifier.FASTEST,
+          result = result.modify(EnumSet.of(CompressionCodec.Modifier.FASTEST,
               CompressionCodec.Modifier.BINARY));
           break;
         default:
@@ -379,7 +379,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
      * @return are the streams compressed
      */
     public boolean isCompressed() {
-      return codec != null;
+      return physicalWriter.getCompressionCodec() != null;
     }
 
     /**
@@ -2952,4 +2952,8 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     writeFileStatistics(builder, treeWriter);
     return ReaderImpl.deserializeStats(builder.getStatisticsList());
   }
+
+  public CompressionCodec getCompressionCodec() {
+    return physicalWriter.getCompressionCodec();
+  }
 }
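
The TODO above flags a real gap rather than a style nit: modify() can return a
fresh codec instance (ZlibCodec.modify below returns new ZlibCodec(l, s) for
non-default settings), and that derived instance is never pooled, reset, or
closed. A hypothetical sketch of the path (not part of the patch):

    import java.util.EnumSet;
    import org.apache.orc.CompressionCodec;

    public class ModifyLeakSketch {
      // The pooled codec is managed by OrcCodecPool, but the instance that
      // modify() returns may be a different object; nothing ever calls
      // close() on it, so any state it later acquires could go un-ended.
      static CompressionCodec customize(CompressionCodec pooled) {
        CompressionCodec derived = pooled.modify(
            EnumSet.of(CompressionCodec.Modifier.FASTEST,
                       CompressionCodec.Modifier.BINARY));
        // derived != pooled is possible; only 'pooled' is ever returned
        // to the pool.
        return derived;
      }
    }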

http://git-wip-us.apache.org/repos/asf/orc/blob/37a76d52/java/core/src/java/org/apache/orc/impl/ZlibCodec.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/ZlibCodec.java b/java/core/src/java/org/apache/orc/impl/ZlibCodec.java
index 4c16cd8..7732ee8 100644
--- a/java/core/src/java/org/apache/orc/impl/ZlibCodec.java
+++ b/java/core/src/java/org/apache/orc/impl/ZlibCodec.java
@@ -28,10 +28,12 @@ import org.apache.orc.CompressionCodec;
 
 public class ZlibCodec implements CompressionCodec, DirectDecompressionCodec {
   private static final HadoopShims SHIMS = HadoopShims.Factory.get();
+  // Note: the shim path ignores compression level and strategy; it is only used for decompression.
+  private HadoopShims.DirectDecompressor decompressShim = null;
   private Boolean direct = null;
 
-  private final int level;
-  private final int strategy;
+  private int level;
+  private int strategy;
 
   public ZlibCodec() {
     level = Deflater.DEFAULT_COMPRESSION;
@@ -46,29 +48,31 @@ public class ZlibCodec implements CompressionCodec, DirectDecompressionCodec {
   @Override
   public boolean compress(ByteBuffer in, ByteBuffer out,
                           ByteBuffer overflow) throws IOException {
-    Deflater deflater = new Deflater(level, true);
-    deflater.setStrategy(strategy);
     int length = in.remaining();
-    deflater.setInput(in.array(), in.arrayOffset() + in.position(), length);
-    deflater.finish();
     int outSize = 0;
-    int offset = out.arrayOffset() + out.position();
-    while (!deflater.finished() && (length > outSize)) {
-      int size = deflater.deflate(out.array(), offset, out.remaining());
-      out.position(size + out.position());
-      outSize += size;
-      offset += size;
-      // if we run out of space in the out buffer, use the overflow
-      if (out.remaining() == 0) {
-        if (overflow == null) {
-          deflater.end();
-          return false;
+    Deflater deflater = new Deflater(level, true);
+    try {
+      deflater.setStrategy(strategy);
+      deflater.setInput(in.array(), in.arrayOffset() + in.position(), length);
+      deflater.finish();
+      int offset = out.arrayOffset() + out.position();
+      while (!deflater.finished() && (length > outSize)) {
+        int size = deflater.deflate(out.array(), offset, out.remaining());
+        out.position(size + out.position());
+        outSize += size;
+        offset += size;
+        // if we run out of space in the out buffer, use the overflow
+        if (out.remaining() == 0) {
+          if (overflow == null) {
+            return false;
+          }
+          out = overflow;
+          offset = out.arrayOffset() + out.position();
         }
-        out = overflow;
-        offset = out.arrayOffset() + out.position();
       }
+    } finally {
+      deflater.end();
     }
-    deflater.end();
     return length > outSize;
   }
 
@@ -81,21 +85,24 @@ public class ZlibCodec implements CompressionCodec, DirectDecompressionCodec {
     }
 
     Inflater inflater = new Inflater(true);
-    inflater.setInput(in.array(), in.arrayOffset() + in.position(),
-                      in.remaining());
-    while (!(inflater.finished() || inflater.needsDictionary() ||
-             inflater.needsInput())) {
-      try {
-        int count = inflater.inflate(out.array(),
-                                     out.arrayOffset() + out.position(),
-                                     out.remaining());
-        out.position(count + out.position());
-      } catch (DataFormatException dfe) {
-        throw new IOException("Bad compression data", dfe);
+    try {
+      inflater.setInput(in.array(), in.arrayOffset() + in.position(),
+                        in.remaining());
+      while (!(inflater.finished() || inflater.needsDictionary() ||
+               inflater.needsInput())) {
+        try {
+          int count = inflater.inflate(out.array(),
+                                       out.arrayOffset() + out.position(),
+                                       out.remaining());
+          out.position(count + out.position());
+        } catch (DataFormatException dfe) {
+          throw new IOException("Bad compression data", dfe);
+        }
       }
+      out.flip();
+    } finally {
+      inflater.end();
     }
-    out.flip();
-    inflater.end();
     in.position(in.limit());
   }
 
@@ -104,12 +111,8 @@ public class ZlibCodec implements CompressionCodec, DirectDecompressionCodec {
     if (direct == null) {
       // see nowrap option in new Inflater(boolean) which disables zlib headers
       try {
-        if (SHIMS.getDirectDecompressor(
-            HadoopShims.DirectCompressionType.ZLIB_NOHEADER) != null) {
-          direct = Boolean.valueOf(true);
-        } else {
-          direct = Boolean.valueOf(false);
-        }
+        ensureShim();
+        direct = (decompressShim != null);
       } catch (UnsatisfiedLinkError ule) {
         direct = Boolean.valueOf(false);
       }
@@ -117,11 +120,16 @@ public class ZlibCodec implements CompressionCodec, DirectDecompressionCodec {
     return direct.booleanValue();
   }
 
+  private void ensureShim() {
+    if (decompressShim == null) {
+      decompressShim = SHIMS.getDirectDecompressor(
+          HadoopShims.DirectCompressionType.ZLIB_NOHEADER);
+    }
+  }
+
   @Override
-  public void directDecompress(ByteBuffer in, ByteBuffer out)
-      throws IOException {
-    HadoopShims.DirectDecompressor decompressShim =
-        SHIMS.getDirectDecompressor(HadoopShims.DirectCompressionType.ZLIB_NOHEADER);
+  public void directDecompress(ByteBuffer in, ByteBuffer out) throws IOException {
+    ensureShim();
     decompressShim.decompress(in, out);
     out.flip(); // flip for read
   }
@@ -163,4 +171,20 @@ public class ZlibCodec implements CompressionCodec, DirectDecompressionCodec {
     }
     return new ZlibCodec(l, s);
   }
+
+  @Override
+  public void reset() {
+    level = Deflater.DEFAULT_COMPRESSION;
+    strategy = Deflater.DEFAULT_STRATEGY;
+    if (decompressShim != null) {
+      decompressShim.reset();
+    }
+  }
+
+  @Override
+  public void close() {
+    if (decompressShim != null) {
+      decompressShim.end();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/orc/blob/37a76d52/java/core/src/test/org/apache/orc/TestVectorOrcFile.java
----------------------------------------------------------------------
diff --git a/java/core/src/test/org/apache/orc/TestVectorOrcFile.java b/java/core/src/test/org/apache/orc/TestVectorOrcFile.java
index b7fa8ee..f975b73 100644
--- a/java/core/src/test/org/apache/orc/TestVectorOrcFile.java
+++ b/java/core/src/test/org/apache/orc/TestVectorOrcFile.java
@@ -18,6 +18,12 @@
 
 package org.apache.orc;
 
+import org.apache.orc.impl.OrcCodecPool;
+
+import org.apache.orc.impl.WriterImpl;
+
+import org.apache.orc.OrcFile.WriterOptions;
+
 import com.google.common.collect.Lists;
 
 import org.apache.orc.impl.ReaderImpl;
@@ -70,11 +76,7 @@ import java.util.Map;
 import java.util.Random;
 
 import static junit.framework.TestCase.assertNotNull;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
 
 /**
  * Tests for the vectorized reader and writer for ORC files.
@@ -1703,16 +1705,7 @@ public class TestVectorOrcFile {
                                          .compress(CompressionKind.SNAPPY)
                                          .bufferSize(100));
     VectorizedRowBatch batch = schema.createRowBatch();
-    Random rand = new Random(12);
-    batch.size = 1000;
-    for(int b=0; b < 10; ++b) {
-      for (int r=0; r < 1000; ++r) {
-        ((LongColumnVector) batch.cols[0]).vector[r] = rand.nextInt();
-        ((BytesColumnVector) batch.cols[1]).setVal(r,
-            Integer.toHexString(rand.nextInt()).getBytes());
-      }
-      writer.addRowBatch(batch);
-    }
+    writeRandomIntBytesBatches(writer, batch, 10, 1000);
     writer.close();
     Reader reader = OrcFile.createReader(testFilePath,
         OrcFile.readerOptions(conf).filesystem(fs));
@@ -1834,6 +1827,87 @@
   }
 
   /**
+   * Read and write a file; verify codec usage.
+   * @throws Exception
+   */
+  @Test
+  public void testCodecPool() throws Exception {
+    TypeDescription schema = createInnerSchema();
+    VectorizedRowBatch batch = schema.createRowBatch();
+    WriterOptions opts = OrcFile.writerOptions(conf)
+        .setSchema(schema).stripeSize(1000).bufferSize(100);
+
+    CompressionCodec snappyCodec, zlibCodec;
+    snappyCodec = writeBatchesAndGetCodec(10, 1000, opts.compress(CompressionKind.SNAPPY), batch);
+    assertEquals(1, OrcCodecPool.getPoolSize(CompressionKind.SNAPPY));
+    Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf).filesystem(fs));
+    Assert.assertEquals(CompressionKind.SNAPPY, reader.getCompressionKind());
+    CompressionCodec codec = readBatchesAndGetCodec(reader, 10, 1000);
+    assertEquals(1, OrcCodecPool.getPoolSize(CompressionKind.SNAPPY));
+    assertSame(snappyCodec, codec);
+
+    reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf).filesystem(fs));
+    Assert.assertEquals(CompressionKind.SNAPPY, reader.getCompressionKind());
+    codec = readBatchesAndGetCodec(reader, 10, 1000);
+    assertSame(snappyCodec, codec);
+    assertEquals(1, OrcCodecPool.getPoolSize(CompressionKind.SNAPPY));
+
+    zlibCodec = writeBatchesAndGetCodec(10, 1000, opts.compress(CompressionKind.ZLIB), batch);
+    assertNotSame(snappyCodec, zlibCodec);
+    assertEquals(1, OrcCodecPool.getPoolSize(CompressionKind.ZLIB));
+    codec = writeBatchesAndGetCodec(10, 1000, opts.compress(CompressionKind.ZLIB), batch);
+    assertEquals(1, OrcCodecPool.getPoolSize(CompressionKind.ZLIB));
+    assertSame(zlibCodec, codec);
+
+    assertSame(snappyCodec, OrcCodecPool.getCodec(CompressionKind.SNAPPY));
+    CompressionCodec snappyCodec2 = writeBatchesAndGetCodec(
+        10, 1000, opts.compress(CompressionKind.SNAPPY), batch);
+    assertNotSame(snappyCodec, snappyCodec2);
+    OrcCodecPool.returnCodec(CompressionKind.SNAPPY, snappyCodec);
+    reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf).filesystem(fs));
+    Assert.assertEquals(CompressionKind.SNAPPY, reader.getCompressionKind());
+    codec = readBatchesAndGetCodec(reader, 10, 1000);
+    assertEquals(2, OrcCodecPool.getPoolSize(CompressionKind.SNAPPY));
+    assertTrue(snappyCodec == codec || snappyCodec2 == codec);
+  }
+
+  private CompressionCodec writeBatchesAndGetCodec(int count, int size, WriterOptions opts,
+      VectorizedRowBatch batch) throws IOException {
+    fs.delete(testFilePath, false);
+    Writer writer = OrcFile.createWriter(testFilePath, opts);
+    writeRandomIntBytesBatches(writer, batch, count, size);
+    CompressionCodec codec = ((WriterImpl)writer).getCompressionCodec();
+    writer.close();
+    return codec;
+  }
+
+  private CompressionCodec readBatchesAndGetCodec(
+      Reader reader, int count, int size) throws IOException {
+    RecordReader rows = reader.rows();
+    VectorizedRowBatch batch = reader.getSchema().createRowBatch(size);
+    for (int b = 0; b < count; ++b) {
+      rows.nextBatch(batch);
+    }
+    CompressionCodec codec = ((RecordReaderImpl)rows).getCompressionCodec();
+    rows.close();
+    return codec;
+  }
+
+  private void writeRandomIntBytesBatches(
+      Writer writer, VectorizedRowBatch batch, int count, int size) throws IOException {
+    Random rand = new Random(12);
+    batch.size = size;
+    for(int b=0; b < count; ++b) {
+      for (int r=0; r < size; ++r) {
+        ((LongColumnVector) batch.cols[0]).vector[r] = rand.nextInt();
+        ((BytesColumnVector) batch.cols[1]).setVal(r,
+            Integer.toHexString(rand.nextInt()).getBytes());
+      }
+      writer.addRowBatch(batch);
+    }
+  }
+
+  /**
    * Read and write a randomly generated snappy file.
    * @throws Exception
    */