You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2015/08/12 03:13:06 UTC

hive git commit: HIVE-11295 : LLAP: clean up ORC dependencies on object pools (Sergey Shelukhin, reviewed by Prasanth Jayachandran)

Repository: hive
Updated Branches:
  refs/heads/llap 2374cfb7a -> 3bf0a45f8


HIVE-11295 : LLAP: clean up ORC dependencies on object pools (Sergey Shelukhin, reviewed by Prasanth Jayachandran)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/3bf0a45f
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/3bf0a45f
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/3bf0a45f

Branch: refs/heads/llap
Commit: 3bf0a45f82ab3baced4c5b1cf5274d38715f0d5c
Parents: 2374cfb
Author: Sergey Shelukhin <se...@apache.org>
Authored: Tue Aug 11 18:01:15 2015 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Tue Aug 11 18:01:15 2015 -0700

----------------------------------------------------------------------
 .../hive/common/util/FixedSizedObjectPool.java  |  23 +-
 .../common/util/TestFixedSizedObjectPool.java   |  25 +-
 .../llap/io/decode/EncodedDataConsumer.java     |  15 +-
 .../llap/io/decode/OrcEncodedDataConsumer.java  |   2 +-
 .../llap/io/encoded/OrcEncodedDataReader.java   |  53 +++-
 .../hive/llap/cache/TestLowLevelCacheImpl.java  |   2 +-
 .../org/apache/hadoop/hive/llap/DebugUtils.java |   5 +-
 .../apache/hadoop/hive/ql/io/orc/Reader.java    |   1 -
 .../hive/ql/io/orc/encoded/CacheChunk.java      |  69 ++++
 .../hive/ql/io/orc/encoded/EncodedReader.java   |  24 +-
 .../ql/io/orc/encoded/EncodedReaderImpl.java    | 317 +++++++------------
 .../hadoop/hive/ql/io/orc/encoded/Reader.java   |  36 ++-
 .../hive/ql/io/orc/encoded/ReaderImpl.java      |   4 +-
 .../org/apache/hadoop/hive/common/Pool.java     |  32 ++
 14 files changed, 367 insertions(+), 241 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/3bf0a45f/common/src/java/org/apache/hive/common/util/FixedSizedObjectPool.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hive/common/util/FixedSizedObjectPool.java b/common/src/java/org/apache/hive/common/util/FixedSizedObjectPool.java
index b9d12c7..45e8a71 100644
--- a/common/src/java/org/apache/hive/common/util/FixedSizedObjectPool.java
+++ b/common/src/java/org/apache/hive/common/util/FixedSizedObjectPool.java
@@ -21,26 +21,16 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.common.Pool;
 
 import com.google.common.annotations.VisibleForTesting;
 
 /** Simple object pool of limited size. Implemented as a lock-free ring buffer;
  * may fail to produce items if there are too many concurrent users. */
-public class FixedSizedObjectPool<T> {
+public class FixedSizedObjectPool<T> implements Pool<T> {
   public static final Log LOG = LogFactory.getLog(FixedSizedObjectPool.class);
 
   /**
-   * Object helper for objects stored in the pool.
-   */
-  public static abstract class PoolObjectHelper<T> {
-    /** Called to create an object when one cannot be provided. */
-    protected abstract T create();
-    /** Called before the object is put in the pool (regardless of whether put succeeds),
-     *  if the pool size is not 0 . */
-    protected void resetBeforeOffer(T t) {}
-  }
-
-  /**
    * Ring buffer has two "markers" - where objects are present ('objects' list), and where they are
    * removed ('empty' list). This class contains bit shifts and masks for one marker's components
    * within a long, and provides utility methods to get/set the components.
@@ -141,12 +131,19 @@ public class FixedSizedObjectPool<T> {
     casLog = doTraceLog ? new CasLog() : null;
   }
 
+  @Override
   public T take() {
     T result = pool.length > 0 ? takeImpl() : null;
     return (result == null) ? helper.create() : result;
   }
 
-  public boolean offer(T t) {
+  @Override
+  public void offer(T t) {
+    tryOffer(t);
+  }
+
+  @VisibleForTesting
+  public boolean tryOffer(T t) {
     if (t == null || pool.length == 0) return false; // 0 size means no-pooling case - passthru.
     helper.resetBeforeOffer(t);
     return offerImpl(t);

http://git-wip-us.apache.org/repos/asf/hive/blob/3bf0a45f/common/src/test/org/apache/hive/common/util/TestFixedSizedObjectPool.java
----------------------------------------------------------------------
diff --git a/common/src/test/org/apache/hive/common/util/TestFixedSizedObjectPool.java b/common/src/test/org/apache/hive/common/util/TestFixedSizedObjectPool.java
index 0d6f2b9..17b640f 100644
--- a/common/src/test/org/apache/hive/common/util/TestFixedSizedObjectPool.java
+++ b/common/src/test/org/apache/hive/common/util/TestFixedSizedObjectPool.java
@@ -30,6 +30,7 @@ import java.util.concurrent.FutureTask;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hive.common.util.FixedSizedObjectPool;
+import org.apache.hadoop.hive.common.Pool;
 import org.junit.Test;
 
 public class TestFixedSizedObjectPool {
@@ -67,7 +68,7 @@ public class TestFixedSizedObjectPool {
 
     protected void doOneOp() {
       Object o = new Object();
-      if (pool.offer(o)) {
+      if (pool.tryOffer(o)) {
         objects.add(o);
       }
     }
@@ -87,19 +88,27 @@ public class TestFixedSizedObjectPool {
     }
   }
 
-  private static class DummyHelper extends FixedSizedObjectPool.PoolObjectHelper<Object> {
+  private static class DummyHelper implements Pool.PoolObjectHelper<Object> {
     @Override
     public Object create() {
       return new Object();
     }
+
+    @Override
+    public void resetBeforeOffer(Object t) {
+    }
   }
 
-  private static class OneObjHelper extends FixedSizedObjectPool.PoolObjectHelper<Object> {
+  private static class OneObjHelper implements Pool.PoolObjectHelper<Object> {
     public static final Object THE_OBJECT = new Object();
     @Override
     public Object create() {
       return THE_OBJECT;
     }
+
+    @Override
+    public void resetBeforeOffer(Object t) {
+    }
   }
 
   @Test
@@ -111,9 +120,9 @@ public class TestFixedSizedObjectPool {
     for (int i = 0; i < SIZE; ++i) {
       Object obj = new Object();
       offered.add(obj);
-      assertTrue(pool.offer(obj));
+      assertTrue(pool.tryOffer(obj));
     }
-    assertFalse(pool.offer(newObj));
+    assertFalse(pool.tryOffer(newObj));
     for (int i = 0; i < SIZE; ++i) {
       Object obj = pool.take();
       assertTrue(offered.remove(obj));
@@ -166,7 +175,7 @@ public class TestFixedSizedObjectPool {
     for (int i = 0; i < (size >> 1); ++i) {
       Object o = new Object();
       allGiven.add(o);
-      assertTrue(pool.offer(o));
+      assertTrue(pool.tryOffer(o));
     }
     @SuppressWarnings("unchecked")
     FutureTask<Object>[] tasks = new FutureTask[TASK_COUNT];
@@ -217,9 +226,9 @@ public class TestFixedSizedObjectPool {
     // Verify that we can drain the pool, then cycle it, i.e. the state is not corrupted.
     while (pool.take() != OneObjHelper.THE_OBJECT);
     for (int i = 0; i < size; ++i) {
-      assertTrue(pool.offer(new Object()));
+      assertTrue(pool.tryOffer(new Object()));
     }
-    assertFalse(pool.offer(new Object()));
+    assertFalse(pool.tryOffer(new Object()));
     for (int i = 0; i < size; ++i) {
       assertTrue(OneObjHelper.THE_OBJECT != pool.take());
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/3bf0a45f/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java
index 21a78fe..23c2c51 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java
@@ -22,13 +22,13 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.concurrent.Callable;
 
+import org.apache.hadoop.hive.common.Pool;
 import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch;
 import org.apache.hadoop.hive.llap.ConsumerFeedback;
 import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch;
 import org.apache.hadoop.hive.llap.metrics.LlapDaemonQueueMetrics;
 import org.apache.hadoop.hive.ql.io.orc.encoded.Consumer;
 import org.apache.hive.common.util.FixedSizedObjectPool;
-import org.apache.hive.common.util.FixedSizedObjectPool.PoolObjectHelper;
 
 /**
  *
@@ -52,11 +52,14 @@ public abstract class EncodedDataConsumer<BatchKey, BatchType extends EncodedCol
     this.downstreamConsumer = consumer;
     this.queueMetrics = queueMetrics;
     cvbPool = new FixedSizedObjectPool<ColumnVectorBatch>(CVB_POOL_SIZE,
-        new PoolObjectHelper<ColumnVectorBatch>() {
-              @Override
-              public ColumnVectorBatch create() {
-                return new ColumnVectorBatch(colCount);
-              }
+        new Pool.PoolObjectHelper<ColumnVectorBatch>() {
+          @Override
+          public ColumnVectorBatch create() {
+            return new ColumnVectorBatch(colCount);
+          }
+          @Override
+          public void resetBeforeOffer(ColumnVectorBatch t) {
+          }
         });
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/3bf0a45f/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
index 3408ac0..85cb8d6 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
@@ -31,7 +31,7 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.io.orc.CompressionCodec;
 import org.apache.hadoop.hive.ql.io.orc.encoded.Consumer;
 import org.apache.hadoop.hive.ql.io.orc.encoded.OrcBatchKey;
-import org.apache.hadoop.hive.ql.io.orc.encoded.EncodedReaderImpl.OrcEncodedColumnBatch;
+import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.OrcEncodedColumnBatch;
 import org.apache.hadoop.hive.ql.io.orc.EncodedTreeReaderFactory;
 import org.apache.hadoop.hive.ql.io.orc.OrcProto;
 import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl;

http://git-wip-us.apache.org/repos/asf/hive/blob/3bf0a45f/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
index 8066dfd..6de6932 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
@@ -13,6 +13,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.CallableWithNdc;
+import org.apache.hadoop.hive.common.Pool;
+import org.apache.hadoop.hive.common.Pool.PoolObjectHelper;
 import org.apache.hadoop.hive.common.io.DataCache;
 import org.apache.hadoop.hive.common.io.Allocator;
 import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch.ColumnStreamData;
@@ -50,15 +52,16 @@ import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.SargApplier;
 import org.apache.hadoop.hive.ql.io.orc.encoded.Consumer;
 import org.apache.hadoop.hive.ql.io.orc.encoded.EncodedOrcFile;
 import org.apache.hadoop.hive.ql.io.orc.encoded.EncodedReader;
-import org.apache.hadoop.hive.ql.io.orc.encoded.EncodedReaderImpl;
 import org.apache.hadoop.hive.ql.io.orc.encoded.OrcBatchKey;
 import org.apache.hadoop.hive.ql.io.orc.encoded.OrcCacheKey;
-import org.apache.hadoop.hive.ql.io.orc.encoded.EncodedReaderImpl.OrcEncodedColumnBatch;
+import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.OrcEncodedColumnBatch;
+import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.PoolFactory;
 import org.apache.hadoop.hive.ql.io.orc.RecordReaderUtils;
 import org.apache.hadoop.hive.ql.io.orc.StripeInformation;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hive.common.util.FixedSizedObjectPool;
 
 /**
  * This produces EncodedColumnBatch via ORC EncodedDataImpl.
@@ -69,6 +72,44 @@ import org.apache.hadoop.mapred.InputSplit;
 public class OrcEncodedDataReader extends CallableWithNdc<Void>
     implements ConsumerFeedback<OrcEncodedColumnBatch>, Consumer<OrcEncodedColumnBatch> {
   private static final Log LOG = LogFactory.getLog(OrcEncodedDataReader.class);
+  public static final FixedSizedObjectPool<ColumnStreamData> CSD_POOL =
+      new FixedSizedObjectPool<>(8192, new PoolObjectHelper<ColumnStreamData>() {
+        @Override
+        public ColumnStreamData create() {
+          return new ColumnStreamData();
+        }
+        @Override
+        public void resetBeforeOffer(ColumnStreamData t) {
+          t.reset();
+        }
+      });
+  public static final FixedSizedObjectPool<OrcEncodedColumnBatch> ECB_POOL =
+      new FixedSizedObjectPool<>(1024, new PoolObjectHelper<OrcEncodedColumnBatch>() {
+        @Override
+        public OrcEncodedColumnBatch create() {
+          return new OrcEncodedColumnBatch();
+        }
+        @Override
+        public void resetBeforeOffer(OrcEncodedColumnBatch t) {
+          t.reset();
+        }
+      });
+  private static final PoolFactory POOL_FACTORY = new PoolFactory() {
+    @Override
+    public <T> Pool<T> createPool(int size, PoolObjectHelper<T> helper) {
+      return new FixedSizedObjectPool<>(size, helper);
+    }
+
+    @Override
+    public Pool<ColumnStreamData> createColumnStreamDataPool() {
+      return CSD_POOL;
+    }
+
+    @Override
+    public Pool<OrcEncodedColumnBatch> createEncodedColumnBatchPool() {
+      return ECB_POOL;
+    }
+  };
 
   private final OrcMetadataCache metadataCache;
   private final LowLevelCache lowLevelCache;
@@ -246,7 +287,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
       ensureOrcReader();
       // Reader creating updates HDFS counters, don't do it here.
       DataWrapperForOrc dw = new DataWrapperForOrc();
-      stripeReader = orcReader.encodedReader(fileId, dw, dw);
+      stripeReader = orcReader.encodedReader(fileId, dw, dw, POOL_FACTORY);
       stripeReader.setDebugTracing(DebugUtils.isTraceOrcEnabled());
     } catch (Throwable t) {
       consumer.setError(t);
@@ -600,11 +641,11 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
           }
         }
         lowLevelCache.releaseBuffers(data.getCacheBuffers());
-        EncodedReaderImpl.SB_POOL.offer(data);
+        CSD_POOL.offer(data);
       }
     }
     // We can offer ECB even with some streams not discarded; reset() will clear the arrays.
-    EncodedReaderImpl.ECB_POOL.offer(ecb);
+    ECB_POOL.offer(ecb);
   }
 
   /**
@@ -745,7 +786,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
       boolean[] isMissingAnyRgs = new boolean[cols.length];
       int totalRgCount = getRgCount(fileMetadata.getStripes().get(key.stripeIx), rowIndexStride);
       for (int rgIx = 0; rgIx < totalRgCount; ++rgIx) {
-        OrcEncodedColumnBatch col = EncodedReaderImpl.ECB_POOL.take();
+        OrcEncodedColumnBatch col = ECB_POOL.take();
         col.init(fileId, key.stripeIx, rgIx, cols.length);
         boolean hasAnyCached = false;
         try {

http://git-wip-us.apache.org/repos/asf/hive/blob/3bf0a45f/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
index 94684ca..1e673ad 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
@@ -40,7 +40,7 @@ import org.apache.hadoop.hive.common.io.DiskRangeList.CreateHelper;
 import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
 import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
 import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
-import org.apache.hadoop.hive.ql.io.orc.encoded.EncodedReaderImpl.CacheChunk;
+import org.apache.hadoop.hive.ql.io.orc.encoded.CacheChunk;
 import org.junit.Test;
 
 public class TestLowLevelCacheImpl {

http://git-wip-us.apache.org/repos/asf/hive/blob/3bf0a45f/ql/src/java/org/apache/hadoop/hive/llap/DebugUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/DebugUtils.java b/ql/src/java/org/apache/hadoop/hive/llap/DebugUtils.java
index a5291ee..ea626d7 100644
--- a/ql/src/java/org/apache/hadoop/hive/llap/DebugUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/llap/DebugUtils.java
@@ -18,8 +18,6 @@
 
 package org.apache.hadoop.hive.llap;
 
-import org.apache.hadoop.hive.ql.io.orc.encoded.EncodedReaderImpl;
-
 /**
  * A class that contains debug methods; also allows enabling the logging of various
  * trace messages with low runtime cost, in order to investigate reproducible bugs.
@@ -30,9 +28,8 @@ public class DebugUtils {
     return false;
   }
 
-  private final static boolean isTraceOrcEnabled = EncodedReaderImpl.LOG.isDebugEnabled();
   public static boolean isTraceOrcEnabled() {
-    return isTraceOrcEnabled; // TODO: temporary, should be hardcoded false
+    return false;
   }
 
   public static boolean isTraceLockingEnabled() {

http://git-wip-us.apache.org/repos/asf/hive/blob/3bf0a45f/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
index 0c57758..a0c58a8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
@@ -22,7 +22,6 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
 
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 

http://git-wip-us.apache.org/repos/asf/hive/blob/3bf0a45f/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/CacheChunk.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/CacheChunk.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/CacheChunk.java
new file mode 100644
index 0000000..b3c9169
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/CacheChunk.java
@@ -0,0 +1,69 @@
+package org.apache.hadoop.hive.ql.io.orc.encoded;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.hadoop.hive.common.io.DataCache;
+import org.apache.hadoop.hive.common.io.DiskRange;
+import org.apache.hadoop.hive.common.io.DiskRangeList;
+import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * DiskRange containing encoded, uncompressed data from cache.
+ * It should be hidden inside EncodedReaderImpl, but we need to expose it for tests.
+ */
+@VisibleForTesting
+public class CacheChunk extends DiskRangeList {
+  protected MemoryBuffer buffer;
+
+  public CacheChunk() {
+    super(-1, -1);
+  }
+
+  public void init(MemoryBuffer buffer, long offset, long end) {
+    this.buffer = buffer;
+    this.offset = offset;
+    this.end = end;
+  }
+
+  @Override
+  public boolean hasData() {
+    return buffer != null;
+  }
+
+  @Override
+  public ByteBuffer getData() {
+    // Callers duplicate the buffer themselves; they already have to do that for BufferChunk.
+    return buffer.getByteBufferRaw();
+  }
+
+  @Override
+  public String toString() {
+    return "start: " + offset + " end: " + end + " cache buffer: " + getBuffer();
+  }
+
+  @Override
+  public DiskRange sliceAndShift(long offset, long end, long shiftBy) {
+    throw new UnsupportedOperationException("Cache chunk cannot be sliced - attempted ["
+        + this.offset + ", " + this.end + ") to [" + offset + ", " + end + ") ");
+  }
+
+  public MemoryBuffer getBuffer() {
+    return buffer;
+  }
+
+  public void setBuffer(MemoryBuffer buffer) {
+    this.buffer = buffer;
+  }
+
+  public void handleCacheCollision(DataCache cache,
+      MemoryBuffer replacementBuffer, List<MemoryBuffer> cacheBuffers) {
+    throw new UnsupportedOperationException();
+  }
+
+  public void reset() {
+    init(null, -1, -1);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/3bf0a45f/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReader.java
index 7f582a4..0691050 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReader.java
@@ -24,16 +24,36 @@ import org.apache.hadoop.hive.ql.io.orc.StripeInformation;
 import org.apache.hadoop.hive.ql.io.orc.OrcProto.ColumnEncoding;
 import org.apache.hadoop.hive.ql.io.orc.OrcProto.RowIndex;
 import org.apache.hadoop.hive.ql.io.orc.OrcProto.Stream;
-import org.apache.hadoop.hive.ql.io.orc.encoded.EncodedReaderImpl.OrcEncodedColumnBatch;
+import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.OrcEncodedColumnBatch;
 
 public interface EncodedReader {
-  // TODO#: document
+
+  /**
+   * Reads encoded data from ORC file.
+   * @param stripeIx Index of the stripe to read.
+   * @param stripe Externally provided stripe metadata (from metadata reader or external cache).
+   * @param index Externally provided row indexes (from metadata reader or external cache).
+   * @param encodings Externally provided column encodings (from metadata reader or external cache).
+   * @param streams Externally provided stream metadata (from metadata reader or external cache).
+   * @param included The array of booleans indicating whether each column should be read.
+   * @param colRgs Arrays of rgs, per column set to true in included, that are to be read.
+   *               null in each respective position means all rgs for this column need to be read.
+   * @param consumer The sink for data that has been read.
+   */
   void readEncodedColumns(int stripeIx, StripeInformation stripe,
       RowIndex[] index, List<ColumnEncoding> encodings, List<Stream> streams,
       boolean[] included, boolean[][] colRgs,
       Consumer<OrcEncodedColumnBatch> consumer) throws IOException;
 
+  /**
+   * Closes the reader.
+   */
   void close() throws IOException;
 
+  /**
+   * Controls the low-level debug tracing. (Hopefully) allows for an optimization where tracing
+   * checks are entirely eliminated because this method is called with a constant value, similar
+   * to just checking the constant in the first place.
+   */
   void setDebugTracing(boolean isEnabled);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/3bf0a45f/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
index 50780ac..ce503d9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
@@ -20,18 +20,18 @@ package org.apache.hadoop.hive.ql.io.orc.encoded;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.common.Pool;
+import org.apache.hadoop.hive.common.Pool.PoolObjectHelper;
 import org.apache.hadoop.hive.common.io.DataCache;
 import org.apache.hadoop.hive.common.io.DiskRange;
 import org.apache.hadoop.hive.common.io.DiskRangeList;
 import org.apache.hadoop.hive.common.io.DataCache.BooleanRef;
 import org.apache.hadoop.hive.common.io.DataCache.DiskRangeListFactory;
 import org.apache.hadoop.hive.common.io.DiskRangeList.CreateHelper;
-import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch;
 import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch.ColumnStreamData;
 import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
 import org.apache.hadoop.hive.ql.io.orc.CompressionCodec;
@@ -47,10 +47,9 @@ import org.apache.hadoop.hive.ql.io.orc.OrcProto.RowIndexEntry;
 import org.apache.hadoop.hive.ql.io.orc.OrcProto.Stream;
 import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.BufferChunk;
 import org.apache.hadoop.hive.ql.io.orc.RecordReaderUtils.ByteBufferAllocatorPool;
-import org.apache.hive.common.util.FixedSizedObjectPool;
-import org.apache.hive.common.util.FixedSizedObjectPool.PoolObjectHelper;
+import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.OrcEncodedColumnBatch;
+import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.PoolFactory;
 
-import com.google.common.annotations.VisibleForTesting;
 
 
 /**
@@ -71,7 +70,7 @@ import com.google.common.annotations.VisibleForTesting;
  *
  * For non-dictionary case:
  * 1) All the ColumnStreamData-s for normal data always have refcount 1; we return them once.
- * 2) At all times, every MB in such cases has +1 refcount for each time we return it as part of SB.
+ * 2) At all times, every MB in such cases has +1 refcount for each time we return it as part of CSD.
  * 3) When caller is done, it therefore decrefs SB to 0, and decrefs all the MBs involved.
  * 4) Additionally, we keep an extra +1 refcount "for the fetching thread". That way, if we return
  *    the MB to caller, and he decrefs it, the MB can't be evicted and will be there if we want to
@@ -83,78 +82,20 @@ import com.google.common.annotations.VisibleForTesting;
  * 6) Given that RG end boundaries in ORC are estimates, we can request data from cache and then
  *    not use it; thus, at the end we go thru all the MBs, and release those not released by (5).
  */
-public class EncodedReaderImpl implements EncodedReader {
+class EncodedReaderImpl implements EncodedReader {
   public static final Log LOG = LogFactory.getLog(EncodedReaderImpl.class);
-  private static final FixedSizedObjectPool<ColumnReadContext> COLCTX_POOL =
-      new FixedSizedObjectPool<>(256, new FixedSizedObjectPool.PoolObjectHelper<ColumnReadContext>() {
-        @Override
-        public ColumnReadContext create() {
-          return new ColumnReadContext();
-        }
-        @Override
-        public void resetBeforeOffer(ColumnReadContext t) {
-          t.reset();
-        }
-      });
-  private static final FixedSizedObjectPool<StreamContext> STREAMCTX_POOL =
-      new FixedSizedObjectPool<>(256, new FixedSizedObjectPool.PoolObjectHelper<StreamContext>() {
-        @Override
-        public StreamContext create() {
-          return new StreamContext();
-        }
-        @Override
-        public void resetBeforeOffer(StreamContext t) {
-          t.reset();
-        }
-      });
-  public static final FixedSizedObjectPool<OrcEncodedColumnBatch> ECB_POOL =
-      new FixedSizedObjectPool<>(1024, new PoolObjectHelper<OrcEncodedColumnBatch>() {
-        @Override
-        protected OrcEncodedColumnBatch create() {
-          return new OrcEncodedColumnBatch();
-        }
-        @Override
-        protected void resetBeforeOffer(OrcEncodedColumnBatch t) {
-          t.reset();
-        }
-      });
-  public static final FixedSizedObjectPool<ColumnStreamData> SB_POOL =
-      new FixedSizedObjectPool<>(8192, new PoolObjectHelper<ColumnStreamData>() {
-        @Override
-        protected ColumnStreamData create() {
-          return new ColumnStreamData();
-        }
-        @Override
-        protected void resetBeforeOffer(ColumnStreamData t) {
-          t.reset();
-        }
-      });
-  private static final FixedSizedObjectPool<CacheChunk> TCC_POOL =
-      new FixedSizedObjectPool<>(1024, new PoolObjectHelper<CacheChunk>() {
-        @Override
-        protected CacheChunk create() {
-          return new CacheChunk();
-        }
-        @Override
-        protected void resetBeforeOffer(CacheChunk t) {
-          t.reset();
-        }
-      });
-  private static final FixedSizedObjectPool<ProcCacheChunk> PCC_POOL =
-      new FixedSizedObjectPool<>(1024, new PoolObjectHelper<ProcCacheChunk>() {
-        @Override
-        protected ProcCacheChunk create() {
-          return new ProcCacheChunk();
-        }
-        @Override
-        protected void resetBeforeOffer(ProcCacheChunk t) {
-          t.reset();
-        }
-      });
+  private static final Object POOLS_CREATION_LOCK = new Object();
+  private static Pools POOLS;
+  private static class Pools {
+    Pool<CacheChunk> tccPool;
+    Pool<ProcCacheChunk> pccPool;
+    Pool<OrcEncodedColumnBatch> ecbPool;
+    Pool<ColumnStreamData> csdPool;
+  }
   private final static DiskRangeListFactory CC_FACTORY = new DiskRangeListFactory() {
         @Override
         public DiskRangeList createCacheChunk(MemoryBuffer buffer, long offset, long end) {
-          CacheChunk tcc = TCC_POOL.take();
+          CacheChunk tcc = POOLS.tccPool.take();
           tcc.init(buffer, offset, end);
           return tcc;
         }
@@ -171,7 +112,8 @@ public class EncodedReaderImpl implements EncodedReader {
   private boolean isDebugTracingEnabled;
 
   public EncodedReaderImpl(long fileId, List<OrcProto.Type> types, CompressionCodec codec,
-      int bufferSize, long strideRate, DataCache cache, DataReader dataReader) throws IOException {
+      int bufferSize, long strideRate, DataCache cache, DataReader dataReader, PoolFactory pf)
+          throws IOException {
     this.fileId = fileId;
     this.codec = codec;
     this.types = types;
@@ -179,22 +121,26 @@ public class EncodedReaderImpl implements EncodedReader {
     this.rowIndexStride = strideRate;
     this.cache = cache;
     this.dataReader = dataReader;
+    if (POOLS != null) return;
+    if (pf == null) {
+      pf = new NoopPoolFactory();
+    }
+    Pools pools = createPools(pf);
+    synchronized (POOLS_CREATION_LOCK) {
+      if (POOLS != null) return;
+      POOLS = pools;
+    }
   }
 
   /** Helper context for each column being read */
   private static final class ColumnReadContext {
-    public void init(int colIx, ColumnEncoding encoding, RowIndex rowIndex) {
+    public ColumnReadContext(int colIx, ColumnEncoding encoding, RowIndex rowIndex) {
       this.encoding = encoding;
       this.rowIndex = rowIndex;
       this.colIx = colIx;
       streamCount = 0;
     }
-    public void reset() {
-      encoding = null;
-      rowIndex = null;
-      streamCount = 0;
-      Arrays.fill(streams, null);
-    }
+
     public static final int MAX_STREAMS = OrcProto.Stream.Kind.ROW_INDEX_VALUE;
     /** The number of streams that are part of this column. */
     int streamCount = 0;
@@ -207,8 +153,7 @@ public class EncodedReaderImpl implements EncodedReader {
     int colIx;
 
     public void addStream(long offset, OrcProto.Stream stream, int indexIx) {
-      StreamContext sctx = streams[streamCount++] = STREAMCTX_POOL.take();
-      sctx.init(stream, offset, indexIx);
+      streams[streamCount++] = new StreamContext(stream, offset, indexIx);
     }
 
     @Override
@@ -229,17 +174,13 @@ public class EncodedReaderImpl implements EncodedReader {
   }
 
   private static final class StreamContext {
-    public void init(OrcProto.Stream stream, long streamOffset, int streamIndexOffset) {
+    public StreamContext(OrcProto.Stream stream, long streamOffset, int streamIndexOffset) {
       this.kind = stream.getKind();
       this.length = stream.getLength();
       this.offset = streamOffset;
       this.streamIndexOffset = streamIndexOffset;
     }
-    void reset() {
-      bufferIter = null;
-      stripeLevelStream = null;
-      kind = null;
-    }
+
     /** Offsets of each stream in the column. */
     public long offset, length;
     public int streamIndexOffset;
@@ -260,18 +201,6 @@ public class EncodedReaderImpl implements EncodedReader {
     }
   }
 
-  public static final class OrcEncodedColumnBatch extends EncodedColumnBatch<OrcBatchKey> {
-    public static final int ALL_RGS = -1;
-    public void init(long fileId, int stripeIx, int rgIx, int columnCount) {
-      if (batchKey == null) {
-        batchKey = new OrcBatchKey(fileId, stripeIx, rgIx);
-      } else {
-        batchKey.set(fileId, stripeIx, rgIx);
-      }
-      resetColumnArrays(columnCount);
-    }
-  }
-
   @Override
   public void readEncodedColumns(int stripeIx, StripeInformation stripe,
       RowIndex[] indexes, List<ColumnEncoding> encodings, List<Stream> streamList,
@@ -317,8 +246,8 @@ public class EncodedReaderImpl implements EncodedReader {
         assert colCtxs[colRgIx] == null;
         lastColIx = colIx;
         includedRgs = colRgs[colRgIx];
-        ctx = colCtxs[colRgIx] = COLCTX_POOL.take();
-        ctx.init(colIx, encodings.get(colIx), indexes[colIx]);
+        ctx = colCtxs[colRgIx] = new ColumnReadContext(
+            colIx, encodings.get(colIx), indexes[colIx]);
         if (isDebugTracingEnabled) {
           LOG.info("Creating context " + colRgIx + " for column " + colIx + ":" + ctx.toString());
         }
@@ -350,13 +279,12 @@ public class EncodedReaderImpl implements EncodedReader {
       // No data to read for this stripe. Check if we have some included index-only columns.
       // TODO: there may be a bug here. Could there be partial RG filtering on index-only column?
       if (hasIndexOnlyCols && (includedRgs == null)) {
-        OrcEncodedColumnBatch ecb = ECB_POOL.take();
+        OrcEncodedColumnBatch ecb = POOLS.ecbPool.take();
         ecb.init(fileId, stripeIx, OrcEncodedColumnBatch.ALL_RGS, colRgs.length);
         consumer.consumeData(ecb);
       } else {
         LOG.warn("Nothing to read for stripe [" + stripe + "]");
       }
-      releaseContexts(colCtxs);
       return;
     }
 
@@ -366,14 +294,14 @@ public class EncodedReaderImpl implements EncodedReader {
       LOG.info("Resulting disk ranges to read (file " + fileId + "): "
           + RecordReaderUtils.stringifyDiskRanges(toRead.next));
     }
-    BooleanRef result = new BooleanRef();
-    cache.getFileData(fileId, toRead.next, stripeOffset, CC_FACTORY, result);
+    BooleanRef isAllInCache = new BooleanRef();
+    cache.getFileData(fileId, toRead.next, stripeOffset, CC_FACTORY, isAllInCache);
     if (isDebugTracingEnabled && LOG.isInfoEnabled()) {
       LOG.info("Disk ranges after cache (file " + fileId + ", base offset " + stripeOffset
           + "): " + RecordReaderUtils.stringifyDiskRanges(toRead.next));
     }
 
-    if (!result.value) {
+    if (!isAllInCache.value) {
       if (!isDataReaderOpen) {
         this.dataReader.open();
         isDataReaderOpen = true;
@@ -408,7 +336,7 @@ public class EncodedReaderImpl implements EncodedReader {
     for (int rgIx = 0; rgIx < rgCount; ++rgIx) {
       boolean isLastRg = rgIx == rgCount - 1;
       // Create the batch we will use to return data for this RG.
-      OrcEncodedColumnBatch ecb = ECB_POOL.take();
+      OrcEncodedColumnBatch ecb = POOLS.ecbPool.take();
       ecb.init(fileId, stripeIx, rgIx, colRgs.length);
       boolean isRGSelected = true;
       for (int colIxMod = 0; colIxMod < colRgs.length; ++colIxMod) {
@@ -431,7 +359,7 @@ public class EncodedReaderImpl implements EncodedReader {
                   + " column " + ctx.colIx + " RG " + rgIx + " at " + sctx.offset + ", " + sctx.length);
             }
             if (sctx.stripeLevelStream == null) {
-              sctx.stripeLevelStream = SB_POOL.take();
+              sctx.stripeLevelStream = POOLS.csdPool.take();
               sctx.stripeLevelStream.init(sctx.kind.getNumber());
               // We will be using this for each RG while also sending RGs to processing.
               // To avoid buffers being unlocked, run refcount one ahead; we will not increase
@@ -461,7 +389,7 @@ public class EncodedReaderImpl implements EncodedReader {
             long endCOffset = sctx.offset + RecordReaderUtils.estimateRgEndOffset(
                 isCompressed, isLastRg, nextCOffsetRel, sctx.length, bufferSize);
             long unlockUntilCOffset = sctx.offset + nextCOffsetRel;
-            cb = createRgStreamBuffer(
+            cb = createRgColumnStreamData(
                 rgIx, isLastRg, ctx.colIx, sctx, cOffset, endCOffset, isCompressed);
             boolean isStartOfStream = sctx.bufferIter == null;
             DiskRangeList lastCached = readEncodedStream(stripeOffset,
@@ -478,7 +406,6 @@ public class EncodedReaderImpl implements EncodedReader {
         consumer.consumeData(ecb);
       }
     }
-    releaseContexts(colCtxs);
 
     if (isDebugTracingEnabled) {
       LOG.info("Disk ranges after preparing all the data "
@@ -502,10 +429,10 @@ public class EncodedReaderImpl implements EncodedReader {
   }
 
 
-  private ColumnStreamData createRgStreamBuffer(int rgIx, boolean isLastRg,
+  private ColumnStreamData createRgColumnStreamData(int rgIx, boolean isLastRg,
       int colIx, StreamContext sctx, long cOffset, long endCOffset, boolean isCompressed) {
     ColumnStreamData cb;
-    cb = SB_POOL.take();
+    cb = POOLS.csdPool.take();
     cb.init(sctx.kind.getNumber());
     cb.incRef();
     if (isDebugTracingEnabled) {
@@ -517,21 +444,6 @@ public class EncodedReaderImpl implements EncodedReader {
     return cb;
   }
 
-
-  private void releaseContexts(ColumnReadContext[] colCtxs) {
-    // Return all contexts to the pools.
-    for (ColumnReadContext ctx : colCtxs) {
-      if (ctx == null) continue;
-      for (int i = 0; i < ctx.streamCount; ++i) {
-        StreamContext sctx = ctx.streams[i];
-        if (sctx == null) continue;
-        STREAMCTX_POOL.offer(sctx);
-      }
-      COLCTX_POOL.offer(ctx);
-    }
-  }
-
-
   private void releaseInitialRefcounts(DiskRangeList current) {
     while (current != null) {
       DiskRangeList toFree = current;
@@ -559,61 +471,6 @@ public class EncodedReaderImpl implements EncodedReader {
     }
   }
 
-  /** DiskRange containing encoded, uncompressed data from cache. */
-  @VisibleForTesting
-  public static class CacheChunk extends DiskRangeList {
-    protected MemoryBuffer buffer;
-
-    public CacheChunk() {
-      super(-1, -1);
-    }
-
-    public void init(MemoryBuffer buffer, long offset, long end) {
-      this.buffer = buffer;
-      this.offset = offset;
-      this.end = end;
-    }
-
-    @Override
-    public boolean hasData() {
-      return buffer != null;
-    }
-
-    @Override
-    public ByteBuffer getData() {
-      // Callers duplicate the buffer, they have to for BufferChunk
-      return buffer.getByteBufferRaw();
-    }
-
-    @Override
-    public String toString() {
-      return "start: " + offset + " end: " + end + " cache buffer: " + getBuffer();
-    }
-
-    @Override
-    public DiskRange sliceAndShift(long offset, long end, long shiftBy) {
-      throw new UnsupportedOperationException("Cache chunk cannot be sliced - attempted ["
-          + this.offset + ", " + this.end + ") to [" + offset + ", " + end + ") ");
-    }
-
-    public MemoryBuffer getBuffer() {
-      return buffer;
-    }
-
-    public void setBuffer(MemoryBuffer buffer) {
-      this.buffer = buffer;
-    }
-
-    public void handleCacheCollision(DataCache cache,
-        MemoryBuffer replacementBuffer, List<MemoryBuffer> cacheBuffers) {
-      throw new UnsupportedOperationException();
-    }
-
-    public void reset() {
-      init(null, -1, -1);
-    }
-  }
-
   /**
    * Fake cache chunk used for uncompressed data. Used in preRead for uncompressed files.
    * Makes assumptions about preRead code; for example, we add chunks here when they are
@@ -811,7 +668,7 @@ public class EncodedReaderImpl implements EncodedReader {
   }
 
   private CacheChunk prepareRangesForCompressedRead(long cOffset, long endCOffset,
-      long streamOffset, long unlockUntilCOffset, DiskRangeList current, ColumnStreamData streamBuffer,
+      long streamOffset, long unlockUntilCOffset, DiskRangeList current, ColumnStreamData columnStreamData,
       List<ByteBuffer> toRelease, List<ProcCacheChunk> toDecompress) throws IOException {
     if (cOffset > current.getOffset()) {
       // Target compression block is in the middle of the range; slice the range in two.
@@ -828,7 +685,7 @@ public class EncodedReaderImpl implements EncodedReader {
           LOG.info("Locking " + cc.getBuffer() + " due to reuse");
         }
         cache.reuseBuffer(cc.getBuffer());
-        streamBuffer.getCacheBuffers().add(cc.getBuffer());
+        columnStreamData.getCacheBuffers().add(cc.getBuffer());
         currentOffset = cc.getEnd();
         if (isDebugTracingEnabled) {
           LOG.info("Adding an already-uncompressed buffer " + cc.getBuffer());
@@ -848,7 +705,7 @@ public class EncodedReaderImpl implements EncodedReader {
         // several disk ranges, so we might need to combine them.
         BufferChunk bc = (BufferChunk)current;
         ProcCacheChunk newCached = addOneCompressionBuffer(
-            bc, streamBuffer.getCacheBuffers(), toDecompress, toRelease);
+            bc, columnStreamData.getCacheBuffers(), toDecompress, toRelease);
         lastUncompressed = (newCached == null) ? lastUncompressed : newCached;
         next = (newCached != null) ? newCached.next : null;
         currentOffset = (next != null) ? next.getOffset() : -1;
@@ -863,7 +720,7 @@ public class EncodedReaderImpl implements EncodedReader {
   }
 
   private CacheChunk prepareRangesForUncompressedRead(long cOffset, long endCOffset,
-      long streamOffset, long unlockUntilCOffset, DiskRangeList current, ColumnStreamData streamBuffer)
+      long streamOffset, long unlockUntilCOffset, DiskRangeList current, ColumnStreamData columnStreamData)
           throws IOException {
     long currentOffset = cOffset;
     CacheChunk lastUncompressed = null;
@@ -877,10 +734,10 @@ public class EncodedReaderImpl implements EncodedReader {
       }
       cache.reuseBuffer(lastUncompressed.getBuffer());
       if (isFirst) {
-        streamBuffer.setIndexBaseOffset((int)(lastUncompressed.getOffset() - streamOffset));
+        columnStreamData.setIndexBaseOffset((int)(lastUncompressed.getOffset() - streamOffset));
         isFirst = false;
       }
-      streamBuffer.getCacheBuffers().add(lastUncompressed.getBuffer());
+      columnStreamData.getCacheBuffers().add(lastUncompressed.getBuffer());
       currentOffset = lastUncompressed.getEnd();
       if (isDebugTracingEnabled) {
         LOG.info("Adding an uncompressed buffer " + lastUncompressed.getBuffer());
@@ -1079,7 +936,7 @@ public class EncodedReaderImpl implements EncodedReader {
     MemoryBuffer buffer = singleAlloc[0];
     cache.reuseBuffer(buffer);
     ByteBuffer dest = buffer.getByteBufferRaw();
-    CacheChunk tcc = TCC_POOL.take();
+    CacheChunk tcc = POOLS.tccPool.take();
     tcc.init(buffer, partOffset, candidateEnd);
     copyAndReplaceUncompressedChunks(candidateCached, dest, tcc);
     return tcc;
@@ -1092,7 +949,7 @@ public class EncodedReaderImpl implements EncodedReader {
     MemoryBuffer buffer = singleAlloc[0];
     cache.reuseBuffer(buffer);
     ByteBuffer dest = buffer.getByteBufferRaw();
-    CacheChunk tcc = TCC_POOL.take();
+    CacheChunk tcc = POOLS.tccPool.take();
     tcc.init(buffer, bc.getOffset(), bc.getEnd());
     copyUncompressedChunk(bc.getChunk(), dest);
     bc.replaceSelfWith(tcc);
@@ -1138,9 +995,9 @@ public class EncodedReaderImpl implements EncodedReader {
   public static void releaseCacheChunksIntoObjectPool(DiskRangeList current) {
     while (current != null) {
       if (current instanceof ProcCacheChunk) {
-        PCC_POOL.offer((ProcCacheChunk)current);
+        POOLS.pccPool.offer((ProcCacheChunk)current);
       } else if (current instanceof CacheChunk) {
-        TCC_POOL.offer((CacheChunk)current);
+        POOLS.tccPool.offer((CacheChunk)current);
       }
       current = current.next;
     }
@@ -1373,7 +1230,7 @@ public class EncodedReaderImpl implements EncodedReader {
     // Add it to result in order we are processing.
     cacheBuffers.add(futureAlloc);
     // Add it to the list of work to decompress.
-    ProcCacheChunk cc = PCC_POOL.take();
+    ProcCacheChunk cc = POOLS.pccPool.take();
     cc.init(cbStartOffset, cbEndOffset, !isUncompressed,
         fullCompressionBlock, futureAlloc, cacheBuffers.size() - 1);
     toDecompress.add(cc);
@@ -1401,4 +1258,72 @@ public class EncodedReaderImpl implements EncodedReader {
   private static ByteBuffer allocateBuffer(int size, boolean isDirect) {
     return isDirect ? ByteBuffer.allocateDirect(size) : ByteBuffer.allocate(size);
   }
+
+  /** Builds the chunk, column batch and stream data pools using the given factory. */
+  private static Pools createPools(PoolFactory pf) {
+    Pools pools = new Pools();
+    pools.pccPool = pf.createPool(1024, new PoolObjectHelper<ProcCacheChunk>() {
+      @Override
+      public ProcCacheChunk create() {
+        return new ProcCacheChunk();
+      }
+      @Override
+      public void resetBeforeOffer(ProcCacheChunk t) {
+        t.reset(); // Clear the chunk's state before it is offered back.
+      }
+    });
+    pools.tccPool = pf.createPool(1024, new PoolObjectHelper<CacheChunk>() {
+      @Override
+      public CacheChunk create() {
+        return new CacheChunk();
+      }
+      @Override
+      public void resetBeforeOffer(CacheChunk t) {
+        t.reset(); // Clear the chunk's state before it is offered back.
+      }
+    });
+    pools.ecbPool = pf.createEncodedColumnBatchPool();
+    pools.csdPool = pf.createColumnStreamDataPool();
+    return pools;
+  }
+
+  /** Fallback factory used when none is supplied; take() always creates, offer() is a no-op. */
+  private static class NoopPoolFactory implements PoolFactory {
+    @Override
+    public <T> Pool<T> createPool(int size, final PoolObjectHelper<T> helper) { // size is unused
+      return new Pool<T>() {
+        public void offer(T t) { // Nothing is kept; helper.resetBeforeOffer() is never called.
+        }
+        public T take() {
+          return helper.create();
+        }
+      };
+    }
+
+    @Override
+    public Pool<OrcEncodedColumnBatch> createEncodedColumnBatchPool() {
+      return createPool(0, new PoolObjectHelper<OrcEncodedColumnBatch>() {
+        @Override
+        public OrcEncodedColumnBatch create() {
+          return new OrcEncodedColumnBatch();
+        }
+        @Override
+        public void resetBeforeOffer(OrcEncodedColumnBatch t) {
+        }
+      });
+    }
+
+    @Override
+    public Pool<ColumnStreamData> createColumnStreamDataPool() {
+      return createPool(0, new PoolObjectHelper<ColumnStreamData>() {
+        @Override
+        public ColumnStreamData create() {
+          return new ColumnStreamData();
+        }
+        @Override
+        public void resetBeforeOffer(ColumnStreamData t) {
+        }
+      });
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/3bf0a45f/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java
index c1f3f57..8e7c62e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java
@@ -20,13 +20,47 @@ package org.apache.hadoop.hive.ql.io.orc.encoded;
 
 import java.io.IOException;
 
+import org.apache.hadoop.hive.common.Pool;
+import org.apache.hadoop.hive.common.Pool.PoolObjectHelper;
 import org.apache.hadoop.hive.common.io.DataCache;
+import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch;
+import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch.ColumnStreamData;
 import org.apache.hadoop.hive.ql.io.orc.DataReader;
 
 /**
  * The interface for reading encoded data from ORC files.
  */
 public interface Reader extends org.apache.hadoop.hive.ql.io.orc.Reader {
+
+  /** The factory that can create (or return) the pools used by the encoded reader. */
+  public interface PoolFactory {
+    <T> Pool<T> createPool(int size, PoolObjectHelper<T> helper);
+    Pool<OrcEncodedColumnBatch> createEncodedColumnBatchPool();
+    Pool<ColumnStreamData> createColumnStreamDataPool();
+  }
+
+  /** Implementation of EncodedColumnBatch for ORC. */
+  public static final class OrcEncodedColumnBatch extends EncodedColumnBatch<OrcBatchKey> {
+    /** RG index indicating that the data applies to all RGs (e.g. a string dictionary). */
+    public static final int ALL_RGS = -1;
+    public void init(long fileId, int stripeIx, int rgIx, int columnCount) {
+      if (batchKey == null) {
+        batchKey = new OrcBatchKey(fileId, stripeIx, rgIx);
+      } else { // Reuse the existing key object rather than allocating a new one.
+        batchKey.set(fileId, stripeIx, rgIx);
+      }
+      resetColumnArrays(columnCount);
+    }
+  }
+
+  /**
+   * Creates the encoded reader.
+   * @param fileId File ID to read, to use for cache lookups and such.
+   * @param dataCache Data cache to use for cache lookups.
+   * @param dataReader Data reader to read data not found in cache (from disk, HDFS, and such).
+   * @param pf Pool factory to create object pools.
+   * @return The reader.
+   */
   EncodedReader encodedReader(
-      long fileId, DataCache dataCache, DataReader dataReader) throws IOException;
+      long fileId, DataCache dataCache, DataReader dataReader, PoolFactory pf) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/3bf0a45f/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/ReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/ReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/ReaderImpl.java
index b9de5e0..7b3c2a0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/ReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/ReaderImpl.java
@@ -35,8 +35,8 @@ class ReaderImpl extends org.apache.hadoop.hive.ql.io.orc.ReaderImpl implements
 
   @Override
   public EncodedReader encodedReader(
-      long fileId, DataCache dataCache, DataReader dataReader) throws IOException {
+      long fileId, DataCache dataCache, DataReader dataReader, PoolFactory pf) throws IOException {
     return new EncodedReaderImpl(fileId, types,
-        codec, bufferSize, rowIndexStride, dataCache, dataReader);
+        codec, bufferSize, rowIndexStride, dataCache, dataReader, pf);
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/3bf0a45f/storage-api/src/java/org/apache/hadoop/hive/common/Pool.java
----------------------------------------------------------------------
diff --git a/storage-api/src/java/org/apache/hadoop/hive/common/Pool.java b/storage-api/src/java/org/apache/hadoop/hive/common/Pool.java
new file mode 100644
index 0000000..e41a515
--- /dev/null
+++ b/storage-api/src/java/org/apache/hadoop/hive/common/Pool.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.common;
+
+/** Simple object pool to prevent GC on small objects passed between threads. */
+public interface Pool<T> {
+  /** Callbacks used to create pooled objects and to reset them before they are offered. */
+  public interface PoolObjectHelper<T> {
+    /** Called to create a new object when one cannot be provided. */
+    T create();
+    /** Called before the object is put in the pool (regardless of whether put succeeds). */
+    void resetBeforeOffer(T t);
+  }
+
+  T take(); // Obtains an object from the pool.
+  void offer(T t); // Offers an object back to the pool for reuse.
+}
\ No newline at end of file