Posted to commits@hive.apache.org by se...@apache.org on 2015/08/12 03:13:06 UTC
hive git commit: HIVE-11295 : LLAP: clean up ORC dependencies on object pools (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Repository: hive
Updated Branches:
refs/heads/llap 2374cfb7a -> 3bf0a45f8
HIVE-11295 : LLAP: clean up ORC dependencies on object pools (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/3bf0a45f
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/3bf0a45f
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/3bf0a45f
Branch: refs/heads/llap
Commit: 3bf0a45f82ab3baced4c5b1cf5274d38715f0d5c
Parents: 2374cfb
Author: Sergey Shelukhin <se...@apache.org>
Authored: Tue Aug 11 18:01:15 2015 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Tue Aug 11 18:01:15 2015 -0700
----------------------------------------------------------------------
.../hive/common/util/FixedSizedObjectPool.java | 23 +-
.../common/util/TestFixedSizedObjectPool.java | 25 +-
.../llap/io/decode/EncodedDataConsumer.java | 15 +-
.../llap/io/decode/OrcEncodedDataConsumer.java | 2 +-
.../llap/io/encoded/OrcEncodedDataReader.java | 53 +++-
.../hive/llap/cache/TestLowLevelCacheImpl.java | 2 +-
.../org/apache/hadoop/hive/llap/DebugUtils.java | 5 +-
.../apache/hadoop/hive/ql/io/orc/Reader.java | 1 -
.../hive/ql/io/orc/encoded/CacheChunk.java | 69 ++++
.../hive/ql/io/orc/encoded/EncodedReader.java | 24 +-
.../ql/io/orc/encoded/EncodedReaderImpl.java | 317 +++++++------------
.../hadoop/hive/ql/io/orc/encoded/Reader.java | 36 ++-
.../hive/ql/io/orc/encoded/ReaderImpl.java | 4 +-
.../org/apache/hadoop/hive/common/Pool.java | 32 ++
14 files changed, 367 insertions(+), 241 deletions(-)
----------------------------------------------------------------------
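For orientation: this patch moves the pool abstraction into storage-api as org.apache.hadoop.hive.common.Pool and turns FixedSizedObjectPool into an implementation of it. A minimal usage sketch of the resulting API, assuming only what the hunks below show (the element type, pool size, and class name PoolExample are illustrative, not part of the patch):

import org.apache.hadoop.hive.common.Pool;
import org.apache.hive.common.util.FixedSizedObjectPool;

public class PoolExample {
  public static void main(String[] args) {
    Pool<StringBuilder> pool = new FixedSizedObjectPool<>(64,
        new Pool.PoolObjectHelper<StringBuilder>() {
          @Override
          public StringBuilder create() {
            return new StringBuilder();     // called when take() finds no pooled object
          }
          @Override
          public void resetBeforeOffer(StringBuilder t) {
            t.setLength(0);                 // called before the object re-enters the pool
          }
        });
    StringBuilder sb = pool.take();         // pooled instance, or a fresh create()
    sb.append("scratch");
    pool.offer(sb);                         // reset runs; the put itself is best-effort
  }
}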
http://git-wip-us.apache.org/repos/asf/hive/blob/3bf0a45f/common/src/java/org/apache/hive/common/util/FixedSizedObjectPool.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hive/common/util/FixedSizedObjectPool.java b/common/src/java/org/apache/hive/common/util/FixedSizedObjectPool.java
index b9d12c7..45e8a71 100644
--- a/common/src/java/org/apache/hive/common/util/FixedSizedObjectPool.java
+++ b/common/src/java/org/apache/hive/common/util/FixedSizedObjectPool.java
@@ -21,26 +21,16 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.common.Pool;
import com.google.common.annotations.VisibleForTesting;
/** Simple object pool of limited size. Implemented as a lock-free ring buffer;
* may fail to produce items if there are too many concurrent users. */
-public class FixedSizedObjectPool<T> {
+public class FixedSizedObjectPool<T> implements Pool<T> {
public static final Log LOG = LogFactory.getLog(FixedSizedObjectPool.class);
/**
- * Object helper for objects stored in the pool.
- */
- public static abstract class PoolObjectHelper<T> {
- /** Called to create an object when one cannot be provided. */
- protected abstract T create();
- /** Called before the object is put in the pool (regardless of whether put succeeds),
- * if the pool size is not 0 . */
- protected void resetBeforeOffer(T t) {}
- }
-
- /**
* Ring buffer has two "markers" - where objects are present ('objects' list), and where they are
* removed ('empty' list). This class contains bit shifts and masks for one marker's components
* within a long, and provides utility methods to get/set the components.
@@ -141,12 +131,19 @@ public class FixedSizedObjectPool<T> {
casLog = doTraceLog ? new CasLog() : null;
}
+ @Override
public T take() {
T result = pool.length > 0 ? takeImpl() : null;
return (result == null) ? helper.create() : result;
}
- public boolean offer(T t) {
+ @Override
+ public void offer(T t) {
+ tryOffer(t);
+ }
+
+ @VisibleForTesting
+ public boolean tryOffer(T t) {
if (t == null || pool.length == 0) return false; // 0 size means no-pooling case - passthru.
helper.resetBeforeOffer(t);
return offerImpl(t);
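The offer()/tryOffer() split keeps the shared Pool contract fire-and-forget while tests can still assert whether a put actually landed. A short sketch of the difference ('helper' stands for any PoolObjectHelper, as in the test changes below):

FixedSizedObjectPool<Object> pool = new FixedSizedObjectPool<>(1, helper);
pool.offer(new Object());                   // Pool contract: best effort, no result
boolean fit = pool.tryOffer(new Object());  // test-only view: false here, the ring is full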
http://git-wip-us.apache.org/repos/asf/hive/blob/3bf0a45f/common/src/test/org/apache/hive/common/util/TestFixedSizedObjectPool.java
----------------------------------------------------------------------
diff --git a/common/src/test/org/apache/hive/common/util/TestFixedSizedObjectPool.java b/common/src/test/org/apache/hive/common/util/TestFixedSizedObjectPool.java
index 0d6f2b9..17b640f 100644
--- a/common/src/test/org/apache/hive/common/util/TestFixedSizedObjectPool.java
+++ b/common/src/test/org/apache/hive/common/util/TestFixedSizedObjectPool.java
@@ -30,6 +30,7 @@ import java.util.concurrent.FutureTask;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hive.common.util.FixedSizedObjectPool;
+import org.apache.hadoop.hive.common.Pool;
import org.junit.Test;
public class TestFixedSizedObjectPool {
@@ -67,7 +68,7 @@ public class TestFixedSizedObjectPool {
protected void doOneOp() {
Object o = new Object();
- if (pool.offer(o)) {
+ if (pool.tryOffer(o)) {
objects.add(o);
}
}
@@ -87,19 +88,27 @@ public class TestFixedSizedObjectPool {
}
}
- private static class DummyHelper extends FixedSizedObjectPool.PoolObjectHelper<Object> {
+ private static class DummyHelper implements Pool.PoolObjectHelper<Object> {
@Override
public Object create() {
return new Object();
}
+
+ @Override
+ public void resetBeforeOffer(Object t) {
+ }
}
- private static class OneObjHelper extends FixedSizedObjectPool.PoolObjectHelper<Object> {
+ private static class OneObjHelper implements Pool.PoolObjectHelper<Object> {
public static final Object THE_OBJECT = new Object();
@Override
public Object create() {
return THE_OBJECT;
}
+
+ @Override
+ public void resetBeforeOffer(Object t) {
+ }
}
@Test
@@ -111,9 +120,9 @@ public class TestFixedSizedObjectPool {
for (int i = 0; i < SIZE; ++i) {
Object obj = new Object();
offered.add(obj);
- assertTrue(pool.offer(obj));
+ assertTrue(pool.tryOffer(obj));
}
- assertFalse(pool.offer(newObj));
+ assertFalse(pool.tryOffer(newObj));
for (int i = 0; i < SIZE; ++i) {
Object obj = pool.take();
assertTrue(offered.remove(obj));
@@ -166,7 +175,7 @@ public class TestFixedSizedObjectPool {
for (int i = 0; i < (size >> 1); ++i) {
Object o = new Object();
allGiven.add(o);
- assertTrue(pool.offer(o));
+ assertTrue(pool.tryOffer(o));
}
@SuppressWarnings("unchecked")
FutureTask<Object>[] tasks = new FutureTask[TASK_COUNT];
@@ -217,9 +226,9 @@ public class TestFixedSizedObjectPool {
// Verify that we can drain the pool, then cycle it, i.e. the state is not corrupted.
while (pool.take() != OneObjHelper.THE_OBJECT);
for (int i = 0; i < size; ++i) {
- assertTrue(pool.offer(new Object()));
+ assertTrue(pool.tryOffer(new Object()));
}
- assertFalse(pool.offer(new Object()));
+ assertFalse(pool.tryOffer(new Object()));
for (int i = 0; i < size; ++i) {
assertTrue(OneObjHelper.THE_OBJECT != pool.take());
}
http://git-wip-us.apache.org/repos/asf/hive/blob/3bf0a45f/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java
index 21a78fe..23c2c51 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java
@@ -22,13 +22,13 @@ import java.util.HashMap;
import java.util.List;
import java.util.concurrent.Callable;
+import org.apache.hadoop.hive.common.Pool;
import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch;
import org.apache.hadoop.hive.llap.ConsumerFeedback;
import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch;
import org.apache.hadoop.hive.llap.metrics.LlapDaemonQueueMetrics;
import org.apache.hadoop.hive.ql.io.orc.encoded.Consumer;
import org.apache.hive.common.util.FixedSizedObjectPool;
-import org.apache.hive.common.util.FixedSizedObjectPool.PoolObjectHelper;
/**
*
@@ -52,11 +52,14 @@ public abstract class EncodedDataConsumer<BatchKey, BatchType extends EncodedCol
this.downstreamConsumer = consumer;
this.queueMetrics = queueMetrics;
cvbPool = new FixedSizedObjectPool<ColumnVectorBatch>(CVB_POOL_SIZE,
- new PoolObjectHelper<ColumnVectorBatch>() {
- @Override
- public ColumnVectorBatch create() {
- return new ColumnVectorBatch(colCount);
- }
+ new Pool.PoolObjectHelper<ColumnVectorBatch>() {
+ @Override
+ public ColumnVectorBatch create() {
+ return new ColumnVectorBatch(colCount);
+ }
+ @Override
+ public void resetBeforeOffer(ColumnVectorBatch t) {
+ }
});
}
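Since PoolObjectHelper is now an interface rather than an abstract class with a no-op default, helpers like the one above must spell out an empty resetBeforeOffer() even when there is nothing to reset. A hypothetical variant that would remove that boilerplate, assuming a Java 8 default method were an option for this branch:

public interface PoolObjectHelper<T> {
  T create();
  default void resetBeforeOffer(T t) {}  // no-op unless overridden
}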
http://git-wip-us.apache.org/repos/asf/hive/blob/3bf0a45f/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
index 3408ac0..85cb8d6 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
@@ -31,7 +31,7 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.io.orc.CompressionCodec;
import org.apache.hadoop.hive.ql.io.orc.encoded.Consumer;
import org.apache.hadoop.hive.ql.io.orc.encoded.OrcBatchKey;
-import org.apache.hadoop.hive.ql.io.orc.encoded.EncodedReaderImpl.OrcEncodedColumnBatch;
+import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.OrcEncodedColumnBatch;
import org.apache.hadoop.hive.ql.io.orc.EncodedTreeReaderFactory;
import org.apache.hadoop.hive.ql.io.orc.OrcProto;
import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl;
http://git-wip-us.apache.org/repos/asf/hive/blob/3bf0a45f/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
index 8066dfd..6de6932 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
@@ -13,6 +13,8 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.CallableWithNdc;
+import org.apache.hadoop.hive.common.Pool;
+import org.apache.hadoop.hive.common.Pool.PoolObjectHelper;
import org.apache.hadoop.hive.common.io.DataCache;
import org.apache.hadoop.hive.common.io.Allocator;
import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch.ColumnStreamData;
@@ -50,15 +52,16 @@ import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.SargApplier;
import org.apache.hadoop.hive.ql.io.orc.encoded.Consumer;
import org.apache.hadoop.hive.ql.io.orc.encoded.EncodedOrcFile;
import org.apache.hadoop.hive.ql.io.orc.encoded.EncodedReader;
-import org.apache.hadoop.hive.ql.io.orc.encoded.EncodedReaderImpl;
import org.apache.hadoop.hive.ql.io.orc.encoded.OrcBatchKey;
import org.apache.hadoop.hive.ql.io.orc.encoded.OrcCacheKey;
-import org.apache.hadoop.hive.ql.io.orc.encoded.EncodedReaderImpl.OrcEncodedColumnBatch;
+import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.OrcEncodedColumnBatch;
+import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.PoolFactory;
import org.apache.hadoop.hive.ql.io.orc.RecordReaderUtils;
import org.apache.hadoop.hive.ql.io.orc.StripeInformation;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hive.common.util.FixedSizedObjectPool;
/**
* This produces EncodedColumnBatch via ORC EncodedDataImpl.
@@ -69,6 +72,44 @@ import org.apache.hadoop.mapred.InputSplit;
public class OrcEncodedDataReader extends CallableWithNdc<Void>
implements ConsumerFeedback<OrcEncodedColumnBatch>, Consumer<OrcEncodedColumnBatch> {
private static final Log LOG = LogFactory.getLog(OrcEncodedDataReader.class);
+ public static final FixedSizedObjectPool<ColumnStreamData> CSD_POOL =
+ new FixedSizedObjectPool<>(8192, new PoolObjectHelper<ColumnStreamData>() {
+ @Override
+ public ColumnStreamData create() {
+ return new ColumnStreamData();
+ }
+ @Override
+ public void resetBeforeOffer(ColumnStreamData t) {
+ t.reset();
+ }
+ });
+ public static final FixedSizedObjectPool<OrcEncodedColumnBatch> ECB_POOL =
+ new FixedSizedObjectPool<>(1024, new PoolObjectHelper<OrcEncodedColumnBatch>() {
+ @Override
+ public OrcEncodedColumnBatch create() {
+ return new OrcEncodedColumnBatch();
+ }
+ @Override
+ public void resetBeforeOffer(OrcEncodedColumnBatch t) {
+ t.reset();
+ }
+ });
+ private static final PoolFactory POOL_FACTORY = new PoolFactory() {
+ @Override
+ public <T> Pool<T> createPool(int size, PoolObjectHelper<T> helper) {
+ return new FixedSizedObjectPool<>(size, helper);
+ }
+
+ @Override
+ public Pool<ColumnStreamData> createColumnStreamDataPool() {
+ return CSD_POOL;
+ }
+
+ @Override
+ public Pool<OrcEncodedColumnBatch> createEncodedColumnBatchPool() {
+ return ECB_POOL;
+ }
+ };
private final OrcMetadataCache metadataCache;
private final LowLevelCache lowLevelCache;
@@ -246,7 +287,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
ensureOrcReader();
// Reader creation updates HDFS counters; don't do it here.
DataWrapperForOrc dw = new DataWrapperForOrc();
- stripeReader = orcReader.encodedReader(fileId, dw, dw);
+ stripeReader = orcReader.encodedReader(fileId, dw, dw, POOL_FACTORY);
stripeReader.setDebugTracing(DebugUtils.isTraceOrcEnabled());
} catch (Throwable t) {
consumer.setError(t);
@@ -600,11 +641,11 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
}
}
lowLevelCache.releaseBuffers(data.getCacheBuffers());
- EncodedReaderImpl.SB_POOL.offer(data);
+ CSD_POOL.offer(data);
}
}
// We can offer ECB even with some streams not discarded; reset() will clear the arrays.
- EncodedReaderImpl.ECB_POOL.offer(ecb);
+ ECB_POOL.offer(ecb);
}
/**
@@ -745,7 +786,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
boolean[] isMissingAnyRgs = new boolean[cols.length];
int totalRgCount = getRgCount(fileMetadata.getStripes().get(key.stripeIx), rowIndexStride);
for (int rgIx = 0; rgIx < totalRgCount; ++rgIx) {
- OrcEncodedColumnBatch col = EncodedReaderImpl.ECB_POOL.take();
+ OrcEncodedColumnBatch col = ECB_POOL.take();
col.init(fileId, key.stripeIx, rgIx, cols.length);
boolean hasAnyCached = false;
try {
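Pulling the wiring in this file together, the batch lifecycle is roughly (condensed from the hunks above; error handling omitted):

OrcEncodedColumnBatch ecb = ECB_POOL.take();  // pooled, or freshly created
ecb.init(fileId, stripeIx, rgIx, colCount);   // rebind the batch to the current RG
// ... populate and hand to the consumer; once the downstream is done:
ECB_POOL.offer(ecb);                          // resetBeforeOffer() calls ecb.reset()

Because CSD_POOL and ECB_POOL are static, all OrcEncodedDataReader instances in a daemon share them; POOL_FACTORY hands those same pools to every EncodedReaderImpl it helps create.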
http://git-wip-us.apache.org/repos/asf/hive/blob/3bf0a45f/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
index 94684ca..1e673ad 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
@@ -40,7 +40,7 @@ import org.apache.hadoop.hive.common.io.DiskRangeList.CreateHelper;
import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
-import org.apache.hadoop.hive.ql.io.orc.encoded.EncodedReaderImpl.CacheChunk;
+import org.apache.hadoop.hive.ql.io.orc.encoded.CacheChunk;
import org.junit.Test;
public class TestLowLevelCacheImpl {
http://git-wip-us.apache.org/repos/asf/hive/blob/3bf0a45f/ql/src/java/org/apache/hadoop/hive/llap/DebugUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/DebugUtils.java b/ql/src/java/org/apache/hadoop/hive/llap/DebugUtils.java
index a5291ee..ea626d7 100644
--- a/ql/src/java/org/apache/hadoop/hive/llap/DebugUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/llap/DebugUtils.java
@@ -18,8 +18,6 @@
package org.apache.hadoop.hive.llap;
-import org.apache.hadoop.hive.ql.io.orc.encoded.EncodedReaderImpl;
-
/**
* A class that contains debug methods; also allows enabling the logging of various
* trace messages with low runtime cost, in order to investigate reproducible bugs.
@@ -30,9 +28,8 @@ public class DebugUtils {
return false;
}
- private final static boolean isTraceOrcEnabled = EncodedReaderImpl.LOG.isDebugEnabled();
public static boolean isTraceOrcEnabled() {
- return isTraceOrcEnabled; // TODO: temporary, should be hardcoded false
+ return false;
}
public static boolean isTraceLockingEnabled() {
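The reason to hardcode the flag (see also setDebugTracing in EncodedReader below) is that a constant false lets the guarded logging be eliminated outright. The pattern, with illustrative names:

private static final boolean TRACE = false;   // compile-time constant
static void process(Object buffer) {
  if (TRACE) {
    LOG.info("buffer state: " + buffer);      // dead code while TRACE is false
  }
}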
http://git-wip-us.apache.org/repos/asf/hive/blob/3bf0a45f/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
index 0c57758..a0c58a8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
@@ -22,7 +22,6 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
http://git-wip-us.apache.org/repos/asf/hive/blob/3bf0a45f/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/CacheChunk.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/CacheChunk.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/CacheChunk.java
new file mode 100644
index 0000000..b3c9169
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/CacheChunk.java
@@ -0,0 +1,69 @@
+package org.apache.hadoop.hive.ql.io.orc.encoded;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.hadoop.hive.common.io.DataCache;
+import org.apache.hadoop.hive.common.io.DiskRange;
+import org.apache.hadoop.hive.common.io.DiskRangeList;
+import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * DiskRange containing encoded, uncompressed data from cache.
+ * It should be hidden inside EncodedReaderImpl, but we need to expose it for tests.
+ */
+@VisibleForTesting
+public class CacheChunk extends DiskRangeList {
+ protected MemoryBuffer buffer;
+
+ public CacheChunk() {
+ super(-1, -1);
+ }
+
+ public void init(MemoryBuffer buffer, long offset, long end) {
+ this.buffer = buffer;
+ this.offset = offset;
+ this.end = end;
+ }
+
+ @Override
+ public boolean hasData() {
+ return buffer != null;
+ }
+
+ @Override
+ public ByteBuffer getData() {
+ // Callers duplicate the buffer, they have to for BufferChunk
+ return buffer.getByteBufferRaw();
+ }
+
+ @Override
+ public String toString() {
+ return "start: " + offset + " end: " + end + " cache buffer: " + getBuffer();
+ }
+
+ @Override
+ public DiskRange sliceAndShift(long offset, long end, long shiftBy) {
+ throw new UnsupportedOperationException("Cache chunk cannot be sliced - attempted ["
+ + this.offset + ", " + this.end + ") to [" + offset + ", " + end + ") ");
+ }
+
+ public MemoryBuffer getBuffer() {
+ return buffer;
+ }
+
+ public void setBuffer(MemoryBuffer buffer) {
+ this.buffer = buffer;
+ }
+
+ public void handleCacheCollision(DataCache cache,
+ MemoryBuffer replacementBuffer, List<MemoryBuffer> cacheBuffers) {
+ throw new UnsupportedOperationException();
+ }
+
+ public void reset() {
+ init(null, -1, -1);
+ }
+}
\ No newline at end of file
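In the reader these chunks cycle through a pool instead of being allocated per disk range; based on the EncodedReaderImpl hunks further down, the lifecycle is roughly (buffer/offset/end come from the cache's createCacheChunk callback):

CacheChunk tcc = POOLS.tccPool.take();  // reused instance; reset() cleared prior state
tcc.init(buffer, offset, end);          // point at cached, uncompressed bytes
// ... linked into the DiskRangeList, data served via getData() ...
POOLS.tccPool.offer(tcc);               // resetBeforeOffer() calls tcc.reset()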
http://git-wip-us.apache.org/repos/asf/hive/blob/3bf0a45f/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReader.java
index 7f582a4..0691050 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReader.java
@@ -24,16 +24,36 @@ import org.apache.hadoop.hive.ql.io.orc.StripeInformation;
import org.apache.hadoop.hive.ql.io.orc.OrcProto.ColumnEncoding;
import org.apache.hadoop.hive.ql.io.orc.OrcProto.RowIndex;
import org.apache.hadoop.hive.ql.io.orc.OrcProto.Stream;
-import org.apache.hadoop.hive.ql.io.orc.encoded.EncodedReaderImpl.OrcEncodedColumnBatch;
+import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.OrcEncodedColumnBatch;
public interface EncodedReader {
- // TODO#: document
+
+ /**
+ * Reads encoded data from ORC file.
+ * @param stripeIx Index of the stripe to read.
+ * @param stripe Externally provided metadata (from metadata reader or external cache).
+ * @param index Externally provided metadata (from metadata reader or external cache).
+ * @param encodings Externally provided metadata (from metadata reader or external cache).
+ * @param streams Externally provided metadata (from metadata reader or external cache).
+ * @param included The array of booleans indicating whether each column should be read.
+ * @param colRgs Arrays of row-group (RG) masks, one per column set to true in included,
+ * that are to be read; null in a position means all RGs for that column are to be read.
+ * @param consumer The sink for data that has been read.
+ */
void readEncodedColumns(int stripeIx, StripeInformation stripe,
RowIndex[] index, List<ColumnEncoding> encodings, List<Stream> streams,
boolean[] included, boolean[][] colRgs,
Consumer<OrcEncodedColumnBatch> consumer) throws IOException;
+ /**
+ * Closes the reader.
+ */
void close() throws IOException;
+ /**
+ * Controls low-level debug tracing. (Hopefully) allows the tracing checks to be eliminated
+ * entirely when this method is called with a constant value, similar to just checking a
+ * constant in the first place.
+ */
void setDebugTracing(boolean isEnabled);
}
\ No newline at end of file
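A hedged sketch of how a caller drives this interface, mirroring OrcEncodedDataReader above (the metadata variables are assumed to come from the metadata reader or cache):

EncodedReader er = orcReader.encodedReader(fileId, dataCache, dataReader, poolFactory);
er.setDebugTracing(DebugUtils.isTraceOrcEnabled());  // a constant lets checks be elided
// colRgs[i] == null means "read all row groups for included column i"
er.readEncodedColumns(stripeIx, stripe, indexes, encodings, streams,
    included, colRgs, consumer);
er.close();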
http://git-wip-us.apache.org/repos/asf/hive/blob/3bf0a45f/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
index 50780ac..ce503d9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
@@ -20,18 +20,18 @@ package org.apache.hadoop.hive.ql.io.orc.encoded;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.common.Pool;
+import org.apache.hadoop.hive.common.Pool.PoolObjectHelper;
import org.apache.hadoop.hive.common.io.DataCache;
import org.apache.hadoop.hive.common.io.DiskRange;
import org.apache.hadoop.hive.common.io.DiskRangeList;
import org.apache.hadoop.hive.common.io.DataCache.BooleanRef;
import org.apache.hadoop.hive.common.io.DataCache.DiskRangeListFactory;
import org.apache.hadoop.hive.common.io.DiskRangeList.CreateHelper;
-import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch;
import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch.ColumnStreamData;
import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
import org.apache.hadoop.hive.ql.io.orc.CompressionCodec;
@@ -47,10 +47,9 @@ import org.apache.hadoop.hive.ql.io.orc.OrcProto.RowIndexEntry;
import org.apache.hadoop.hive.ql.io.orc.OrcProto.Stream;
import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.BufferChunk;
import org.apache.hadoop.hive.ql.io.orc.RecordReaderUtils.ByteBufferAllocatorPool;
-import org.apache.hive.common.util.FixedSizedObjectPool;
-import org.apache.hive.common.util.FixedSizedObjectPool.PoolObjectHelper;
+import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.OrcEncodedColumnBatch;
+import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.PoolFactory;
-import com.google.common.annotations.VisibleForTesting;
/**
@@ -71,7 +70,7 @@ import com.google.common.annotations.VisibleForTesting;
*
* For non-dictionary case:
* 1) All the ColumnStreamData-s for normal data always have refcount 1; we return them once.
- * 2) At all times, every MB in such cases has +1 refcount for each time we return it as part of SB.
+ * 2) At all times, every MB in such cases has +1 refcount for each time we return it as part of CSD.
* 3) When caller is done, it therefore decrefs SB to 0, and decrefs all the MBs involved.
* 4) Additionally, we keep an extra +1 refcount "for the fetching thread". That way, if we return
* the MB to caller, and he decrefs it, the MB can't be evicted and will be there if we want to
@@ -83,78 +82,20 @@ import com.google.common.annotations.VisibleForTesting;
* 6) Given that RG end boundaries in ORC are estimates, we can request data from cache and then
* not use it; thus, at the end we go thru all the MBs, and release those not released by (5).
*/
-public class EncodedReaderImpl implements EncodedReader {
+class EncodedReaderImpl implements EncodedReader {
public static final Log LOG = LogFactory.getLog(EncodedReaderImpl.class);
- private static final FixedSizedObjectPool<ColumnReadContext> COLCTX_POOL =
- new FixedSizedObjectPool<>(256, new FixedSizedObjectPool.PoolObjectHelper<ColumnReadContext>() {
- @Override
- public ColumnReadContext create() {
- return new ColumnReadContext();
- }
- @Override
- public void resetBeforeOffer(ColumnReadContext t) {
- t.reset();
- }
- });
- private static final FixedSizedObjectPool<StreamContext> STREAMCTX_POOL =
- new FixedSizedObjectPool<>(256, new FixedSizedObjectPool.PoolObjectHelper<StreamContext>() {
- @Override
- public StreamContext create() {
- return new StreamContext();
- }
- @Override
- public void resetBeforeOffer(StreamContext t) {
- t.reset();
- }
- });
- public static final FixedSizedObjectPool<OrcEncodedColumnBatch> ECB_POOL =
- new FixedSizedObjectPool<>(1024, new PoolObjectHelper<OrcEncodedColumnBatch>() {
- @Override
- protected OrcEncodedColumnBatch create() {
- return new OrcEncodedColumnBatch();
- }
- @Override
- protected void resetBeforeOffer(OrcEncodedColumnBatch t) {
- t.reset();
- }
- });
- public static final FixedSizedObjectPool<ColumnStreamData> SB_POOL =
- new FixedSizedObjectPool<>(8192, new PoolObjectHelper<ColumnStreamData>() {
- @Override
- protected ColumnStreamData create() {
- return new ColumnStreamData();
- }
- @Override
- protected void resetBeforeOffer(ColumnStreamData t) {
- t.reset();
- }
- });
- private static final FixedSizedObjectPool<CacheChunk> TCC_POOL =
- new FixedSizedObjectPool<>(1024, new PoolObjectHelper<CacheChunk>() {
- @Override
- protected CacheChunk create() {
- return new CacheChunk();
- }
- @Override
- protected void resetBeforeOffer(CacheChunk t) {
- t.reset();
- }
- });
- private static final FixedSizedObjectPool<ProcCacheChunk> PCC_POOL =
- new FixedSizedObjectPool<>(1024, new PoolObjectHelper<ProcCacheChunk>() {
- @Override
- protected ProcCacheChunk create() {
- return new ProcCacheChunk();
- }
- @Override
- protected void resetBeforeOffer(ProcCacheChunk t) {
- t.reset();
- }
- });
+ private static final Object POOLS_CREATION_LOCK = new Object();
+ private static Pools POOLS;
+ private static class Pools {
+ Pool<CacheChunk> tccPool;
+ Pool<ProcCacheChunk> pccPool;
+ Pool<OrcEncodedColumnBatch> ecbPool;
+ Pool<ColumnStreamData> csdPool;
+ }
private final static DiskRangeListFactory CC_FACTORY = new DiskRangeListFactory() {
@Override
public DiskRangeList createCacheChunk(MemoryBuffer buffer, long offset, long end) {
- CacheChunk tcc = TCC_POOL.take();
+ CacheChunk tcc = POOLS.tccPool.take();
tcc.init(buffer, offset, end);
return tcc;
}
@@ -171,7 +112,8 @@ public class EncodedReaderImpl implements EncodedReader {
private boolean isDebugTracingEnabled;
public EncodedReaderImpl(long fileId, List<OrcProto.Type> types, CompressionCodec codec,
- int bufferSize, long strideRate, DataCache cache, DataReader dataReader) throws IOException {
+ int bufferSize, long strideRate, DataCache cache, DataReader dataReader, PoolFactory pf)
+ throws IOException {
this.fileId = fileId;
this.codec = codec;
this.types = types;
@@ -179,22 +121,26 @@ public class EncodedReaderImpl implements EncodedReader {
this.rowIndexStride = strideRate;
this.cache = cache;
this.dataReader = dataReader;
+ if (POOLS != null) return;
+ if (pf == null) {
+ pf = new NoopPoolFactory();
+ }
+ Pools pools = createPools(pf);
+ synchronized (POOLS_CREATION_LOCK) {
+ if (POOLS != null) return;
+ POOLS = pools;
+ }
}
/** Helper context for each column being read */
private static final class ColumnReadContext {
- public void init(int colIx, ColumnEncoding encoding, RowIndex rowIndex) {
+ public ColumnReadContext(int colIx, ColumnEncoding encoding, RowIndex rowIndex) {
this.encoding = encoding;
this.rowIndex = rowIndex;
this.colIx = colIx;
streamCount = 0;
}
- public void reset() {
- encoding = null;
- rowIndex = null;
- streamCount = 0;
- Arrays.fill(streams, null);
- }
+
public static final int MAX_STREAMS = OrcProto.Stream.Kind.ROW_INDEX_VALUE;
/** The number of streams that are part of this column. */
int streamCount = 0;
@@ -207,8 +153,7 @@ public class EncodedReaderImpl implements EncodedReader {
int colIx;
public void addStream(long offset, OrcProto.Stream stream, int indexIx) {
- StreamContext sctx = streams[streamCount++] = STREAMCTX_POOL.take();
- sctx.init(stream, offset, indexIx);
+ streams[streamCount++] = new StreamContext(stream, offset, indexIx);
}
@Override
@@ -229,17 +174,13 @@ public class EncodedReaderImpl implements EncodedReader {
}
private static final class StreamContext {
- public void init(OrcProto.Stream stream, long streamOffset, int streamIndexOffset) {
+ public StreamContext(OrcProto.Stream stream, long streamOffset, int streamIndexOffset) {
this.kind = stream.getKind();
this.length = stream.getLength();
this.offset = streamOffset;
this.streamIndexOffset = streamIndexOffset;
}
- void reset() {
- bufferIter = null;
- stripeLevelStream = null;
- kind = null;
- }
+
/** Offsets of each stream in the column. */
public long offset, length;
public int streamIndexOffset;
@@ -260,18 +201,6 @@ public class EncodedReaderImpl implements EncodedReader {
}
}
- public static final class OrcEncodedColumnBatch extends EncodedColumnBatch<OrcBatchKey> {
- public static final int ALL_RGS = -1;
- public void init(long fileId, int stripeIx, int rgIx, int columnCount) {
- if (batchKey == null) {
- batchKey = new OrcBatchKey(fileId, stripeIx, rgIx);
- } else {
- batchKey.set(fileId, stripeIx, rgIx);
- }
- resetColumnArrays(columnCount);
- }
- }
-
@Override
public void readEncodedColumns(int stripeIx, StripeInformation stripe,
RowIndex[] indexes, List<ColumnEncoding> encodings, List<Stream> streamList,
@@ -317,8 +246,8 @@ public class EncodedReaderImpl implements EncodedReader {
assert colCtxs[colRgIx] == null;
lastColIx = colIx;
includedRgs = colRgs[colRgIx];
- ctx = colCtxs[colRgIx] = COLCTX_POOL.take();
- ctx.init(colIx, encodings.get(colIx), indexes[colIx]);
+ ctx = colCtxs[colRgIx] = new ColumnReadContext(
+ colIx, encodings.get(colIx), indexes[colIx]);
if (isDebugTracingEnabled) {
LOG.info("Creating context " + colRgIx + " for column " + colIx + ":" + ctx.toString());
}
@@ -350,13 +279,12 @@ public class EncodedReaderImpl implements EncodedReader {
// No data to read for this stripe. Check if we have some included index-only columns.
// TODO: there may be a bug here. Could there be partial RG filtering on index-only column?
if (hasIndexOnlyCols && (includedRgs == null)) {
- OrcEncodedColumnBatch ecb = ECB_POOL.take();
+ OrcEncodedColumnBatch ecb = POOLS.ecbPool.take();
ecb.init(fileId, stripeIx, OrcEncodedColumnBatch.ALL_RGS, colRgs.length);
consumer.consumeData(ecb);
} else {
LOG.warn("Nothing to read for stripe [" + stripe + "]");
}
- releaseContexts(colCtxs);
return;
}
@@ -366,14 +294,14 @@ public class EncodedReaderImpl implements EncodedReader {
LOG.info("Resulting disk ranges to read (file " + fileId + "): "
+ RecordReaderUtils.stringifyDiskRanges(toRead.next));
}
- BooleanRef result = new BooleanRef();
- cache.getFileData(fileId, toRead.next, stripeOffset, CC_FACTORY, result);
+ BooleanRef isAllInCache = new BooleanRef();
+ cache.getFileData(fileId, toRead.next, stripeOffset, CC_FACTORY, isAllInCache);
if (isDebugTracingEnabled && LOG.isInfoEnabled()) {
LOG.info("Disk ranges after cache (file " + fileId + ", base offset " + stripeOffset
+ "): " + RecordReaderUtils.stringifyDiskRanges(toRead.next));
}
- if (!result.value) {
+ if (!isAllInCache.value) {
if (!isDataReaderOpen) {
this.dataReader.open();
isDataReaderOpen = true;
@@ -408,7 +336,7 @@ public class EncodedReaderImpl implements EncodedReader {
for (int rgIx = 0; rgIx < rgCount; ++rgIx) {
boolean isLastRg = rgIx == rgCount - 1;
// Create the batch we will use to return data for this RG.
- OrcEncodedColumnBatch ecb = ECB_POOL.take();
+ OrcEncodedColumnBatch ecb = POOLS.ecbPool.take();
ecb.init(fileId, stripeIx, rgIx, colRgs.length);
boolean isRGSelected = true;
for (int colIxMod = 0; colIxMod < colRgs.length; ++colIxMod) {
@@ -431,7 +359,7 @@ public class EncodedReaderImpl implements EncodedReader {
+ " column " + ctx.colIx + " RG " + rgIx + " at " + sctx.offset + ", " + sctx.length);
}
if (sctx.stripeLevelStream == null) {
- sctx.stripeLevelStream = SB_POOL.take();
+ sctx.stripeLevelStream = POOLS.csdPool.take();
sctx.stripeLevelStream.init(sctx.kind.getNumber());
// We will be using this for each RG while also sending RGs to processing.
// To avoid buffers being unlocked, run refcount one ahead; we will not increase
@@ -461,7 +389,7 @@ public class EncodedReaderImpl implements EncodedReader {
long endCOffset = sctx.offset + RecordReaderUtils.estimateRgEndOffset(
isCompressed, isLastRg, nextCOffsetRel, sctx.length, bufferSize);
long unlockUntilCOffset = sctx.offset + nextCOffsetRel;
- cb = createRgStreamBuffer(
+ cb = createRgColumnStreamData(
rgIx, isLastRg, ctx.colIx, sctx, cOffset, endCOffset, isCompressed);
boolean isStartOfStream = sctx.bufferIter == null;
DiskRangeList lastCached = readEncodedStream(stripeOffset,
@@ -478,7 +406,6 @@ public class EncodedReaderImpl implements EncodedReader {
consumer.consumeData(ecb);
}
}
- releaseContexts(colCtxs);
if (isDebugTracingEnabled) {
LOG.info("Disk ranges after preparing all the data "
@@ -502,10 +429,10 @@ public class EncodedReaderImpl implements EncodedReader {
}
- private ColumnStreamData createRgStreamBuffer(int rgIx, boolean isLastRg,
+ private ColumnStreamData createRgColumnStreamData(int rgIx, boolean isLastRg,
int colIx, StreamContext sctx, long cOffset, long endCOffset, boolean isCompressed) {
ColumnStreamData cb;
- cb = SB_POOL.take();
+ cb = POOLS.csdPool.take();
cb.init(sctx.kind.getNumber());
cb.incRef();
if (isDebugTracingEnabled) {
@@ -517,21 +444,6 @@ public class EncodedReaderImpl implements EncodedReader {
return cb;
}
-
- private void releaseContexts(ColumnReadContext[] colCtxs) {
- // Return all contexts to the pools.
- for (ColumnReadContext ctx : colCtxs) {
- if (ctx == null) continue;
- for (int i = 0; i < ctx.streamCount; ++i) {
- StreamContext sctx = ctx.streams[i];
- if (sctx == null) continue;
- STREAMCTX_POOL.offer(sctx);
- }
- COLCTX_POOL.offer(ctx);
- }
- }
-
-
private void releaseInitialRefcounts(DiskRangeList current) {
while (current != null) {
DiskRangeList toFree = current;
@@ -559,61 +471,6 @@ public class EncodedReaderImpl implements EncodedReader {
}
}
- /** DiskRange containing encoded, uncompressed data from cache. */
- @VisibleForTesting
- public static class CacheChunk extends DiskRangeList {
- protected MemoryBuffer buffer;
-
- public CacheChunk() {
- super(-1, -1);
- }
-
- public void init(MemoryBuffer buffer, long offset, long end) {
- this.buffer = buffer;
- this.offset = offset;
- this.end = end;
- }
-
- @Override
- public boolean hasData() {
- return buffer != null;
- }
-
- @Override
- public ByteBuffer getData() {
- // Callers duplicate the buffer, they have to for BufferChunk
- return buffer.getByteBufferRaw();
- }
-
- @Override
- public String toString() {
- return "start: " + offset + " end: " + end + " cache buffer: " + getBuffer();
- }
-
- @Override
- public DiskRange sliceAndShift(long offset, long end, long shiftBy) {
- throw new UnsupportedOperationException("Cache chunk cannot be sliced - attempted ["
- + this.offset + ", " + this.end + ") to [" + offset + ", " + end + ") ");
- }
-
- public MemoryBuffer getBuffer() {
- return buffer;
- }
-
- public void setBuffer(MemoryBuffer buffer) {
- this.buffer = buffer;
- }
-
- public void handleCacheCollision(DataCache cache,
- MemoryBuffer replacementBuffer, List<MemoryBuffer> cacheBuffers) {
- throw new UnsupportedOperationException();
- }
-
- public void reset() {
- init(null, -1, -1);
- }
- }
-
/**
* Fake cache chunk used for uncompressed data. Used in preRead for uncompressed files.
* Makes assumptions about preRead code; for example, we add chunks here when they are
@@ -811,7 +668,7 @@ public class EncodedReaderImpl implements EncodedReader {
}
private CacheChunk prepareRangesForCompressedRead(long cOffset, long endCOffset,
- long streamOffset, long unlockUntilCOffset, DiskRangeList current, ColumnStreamData streamBuffer,
+ long streamOffset, long unlockUntilCOffset, DiskRangeList current, ColumnStreamData columnStreamData,
List<ByteBuffer> toRelease, List<ProcCacheChunk> toDecompress) throws IOException {
if (cOffset > current.getOffset()) {
// Target compression block is in the middle of the range; slice the range in two.
@@ -828,7 +685,7 @@ public class EncodedReaderImpl implements EncodedReader {
LOG.info("Locking " + cc.getBuffer() + " due to reuse");
}
cache.reuseBuffer(cc.getBuffer());
- streamBuffer.getCacheBuffers().add(cc.getBuffer());
+ columnStreamData.getCacheBuffers().add(cc.getBuffer());
currentOffset = cc.getEnd();
if (isDebugTracingEnabled) {
LOG.info("Adding an already-uncompressed buffer " + cc.getBuffer());
@@ -848,7 +705,7 @@ public class EncodedReaderImpl implements EncodedReader {
// several disk ranges, so we might need to combine them.
BufferChunk bc = (BufferChunk)current;
ProcCacheChunk newCached = addOneCompressionBuffer(
- bc, streamBuffer.getCacheBuffers(), toDecompress, toRelease);
+ bc, columnStreamData.getCacheBuffers(), toDecompress, toRelease);
lastUncompressed = (newCached == null) ? lastUncompressed : newCached;
next = (newCached != null) ? newCached.next : null;
currentOffset = (next != null) ? next.getOffset() : -1;
@@ -863,7 +720,7 @@ public class EncodedReaderImpl implements EncodedReader {
}
private CacheChunk prepareRangesForUncompressedRead(long cOffset, long endCOffset,
- long streamOffset, long unlockUntilCOffset, DiskRangeList current, ColumnStreamData streamBuffer)
+ long streamOffset, long unlockUntilCOffset, DiskRangeList current, ColumnStreamData columnStreamData)
throws IOException {
long currentOffset = cOffset;
CacheChunk lastUncompressed = null;
@@ -877,10 +734,10 @@ public class EncodedReaderImpl implements EncodedReader {
}
cache.reuseBuffer(lastUncompressed.getBuffer());
if (isFirst) {
- streamBuffer.setIndexBaseOffset((int)(lastUncompressed.getOffset() - streamOffset));
+ columnStreamData.setIndexBaseOffset((int)(lastUncompressed.getOffset() - streamOffset));
isFirst = false;
}
- streamBuffer.getCacheBuffers().add(lastUncompressed.getBuffer());
+ columnStreamData.getCacheBuffers().add(lastUncompressed.getBuffer());
currentOffset = lastUncompressed.getEnd();
if (isDebugTracingEnabled) {
LOG.info("Adding an uncompressed buffer " + lastUncompressed.getBuffer());
@@ -1079,7 +936,7 @@ public class EncodedReaderImpl implements EncodedReader {
MemoryBuffer buffer = singleAlloc[0];
cache.reuseBuffer(buffer);
ByteBuffer dest = buffer.getByteBufferRaw();
- CacheChunk tcc = TCC_POOL.take();
+ CacheChunk tcc = POOLS.tccPool.take();
tcc.init(buffer, partOffset, candidateEnd);
copyAndReplaceUncompressedChunks(candidateCached, dest, tcc);
return tcc;
@@ -1092,7 +949,7 @@ public class EncodedReaderImpl implements EncodedReader {
MemoryBuffer buffer = singleAlloc[0];
cache.reuseBuffer(buffer);
ByteBuffer dest = buffer.getByteBufferRaw();
- CacheChunk tcc = TCC_POOL.take();
+ CacheChunk tcc = POOLS.tccPool.take();
tcc.init(buffer, bc.getOffset(), bc.getEnd());
copyUncompressedChunk(bc.getChunk(), dest);
bc.replaceSelfWith(tcc);
@@ -1138,9 +995,9 @@ public class EncodedReaderImpl implements EncodedReader {
public static void releaseCacheChunksIntoObjectPool(DiskRangeList current) {
while (current != null) {
if (current instanceof ProcCacheChunk) {
- PCC_POOL.offer((ProcCacheChunk)current);
+ POOLS.pccPool.offer((ProcCacheChunk)current);
} else if (current instanceof CacheChunk) {
- TCC_POOL.offer((CacheChunk)current);
+ POOLS.tccPool.offer((CacheChunk)current);
}
current = current.next;
}
@@ -1373,7 +1230,7 @@ public class EncodedReaderImpl implements EncodedReader {
// Add it to result in order we are processing.
cacheBuffers.add(futureAlloc);
// Add it to the list of work to decompress.
- ProcCacheChunk cc = PCC_POOL.take();
+ ProcCacheChunk cc = POOLS.pccPool.take();
cc.init(cbStartOffset, cbEndOffset, !isUncompressed,
fullCompressionBlock, futureAlloc, cacheBuffers.size() - 1);
toDecompress.add(cc);
@@ -1401,4 +1258,72 @@ public class EncodedReaderImpl implements EncodedReader {
private static ByteBuffer allocateBuffer(int size, boolean isDirect) {
return isDirect ? ByteBuffer.allocateDirect(size) : ByteBuffer.allocate(size);
}
+
+
+ private static Pools createPools(PoolFactory pf) {
+ Pools pools = new Pools();
+ pools.pccPool = pf.createPool(1024, new PoolObjectHelper<ProcCacheChunk>() {
+ @Override
+ public ProcCacheChunk create() {
+ return new ProcCacheChunk();
+ }
+ @Override
+ public void resetBeforeOffer(ProcCacheChunk t) {
+ t.reset();
+ }
+ });
+ pools.tccPool = pf.createPool(1024, new PoolObjectHelper<CacheChunk>() {
+ @Override
+ public CacheChunk create() {
+ return new CacheChunk();
+ }
+ @Override
+ public void resetBeforeOffer(CacheChunk t) {
+ t.reset();
+ }
+ });
+ pools.ecbPool = pf.createEncodedColumnBatchPool();
+ pools.csdPool = pf.createColumnStreamDataPool();
+ return pools;
+ }
+
+ /** Pool factory that is used if another one isn't specified - just creates the objects. */
+ private static class NoopPoolFactory implements PoolFactory {
+ @Override
+ public <T> Pool<T> createPool(int size, final PoolObjectHelper<T> helper) {
+ return new Pool<T>() {
+ public void offer(T t) {
+ }
+ public T take() {
+ return helper.create();
+ }
+ };
+ }
+
+ @Override
+ public Pool<OrcEncodedColumnBatch> createEncodedColumnBatchPool() {
+ return createPool(0, new PoolObjectHelper<OrcEncodedColumnBatch>() {
+ @Override
+ public OrcEncodedColumnBatch create() {
+ return new OrcEncodedColumnBatch();
+ }
+ @Override
+ public void resetBeforeOffer(OrcEncodedColumnBatch t) {
+ }
+ });
+ }
+
+ @Override
+ public Pool<ColumnStreamData> createColumnStreamDataPool() {
+ return createPool(0, new PoolObjectHelper<ColumnStreamData>() {
+ @Override
+ public ColumnStreamData create() {
+ return new ColumnStreamData();
+ }
+ @Override
+ public void resetBeforeOffer(ColumnStreamData t) {
+ }
+ });
+ }
+ }
}
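The one-time POOLS setup in the constructor above is a check/create/publish pattern: the unsynchronized fast-path read can race, but losing the race only costs a redundant Pools object that is discarded inside the lock. Condensed:

if (POOLS != null) return;          // fast path, no lock taken
Pools pools = createPools(pf);      // may run on several threads at once
synchronized (POOLS_CREATION_LOCK) {
  if (POOLS != null) return;        // another thread published first; drop ours
  POOLS = pools;                    // publish exactly once
}

A consequence worth noting: the first PoolFactory to reach this code wins for the process lifetime; later readers constructed with a different factory (or with null, which falls back to NoopPoolFactory) keep using the pools created first.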
http://git-wip-us.apache.org/repos/asf/hive/blob/3bf0a45f/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java
index c1f3f57..8e7c62e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java
@@ -20,13 +20,47 @@ package org.apache.hadoop.hive.ql.io.orc.encoded;
import java.io.IOException;
+import org.apache.hadoop.hive.common.Pool;
+import org.apache.hadoop.hive.common.Pool.PoolObjectHelper;
import org.apache.hadoop.hive.common.io.DataCache;
+import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch;
+import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch.ColumnStreamData;
import org.apache.hadoop.hive.ql.io.orc.DataReader;
/**
* The interface for reading encoded data from ORC files.
*/
public interface Reader extends org.apache.hadoop.hive.ql.io.orc.Reader {
+
+ /** The factory that can create (or return) the pools used by encoded reader. */
+ public interface PoolFactory {
+ <T> Pool<T> createPool(int size, PoolObjectHelper<T> helper);
+ Pool<OrcEncodedColumnBatch> createEncodedColumnBatchPool();
+ Pool<ColumnStreamData> createColumnStreamDataPool();
+ }
+
+ /** Implementation of EncodedColumnBatch for ORC. */
+ public static final class OrcEncodedColumnBatch extends EncodedColumnBatch<OrcBatchKey> {
+ /** RG index indicating the data applies for all RGs (e.g. a string dictionary). */
+ public static final int ALL_RGS = -1;
+ public void init(long fileId, int stripeIx, int rgIx, int columnCount) {
+ if (batchKey == null) {
+ batchKey = new OrcBatchKey(fileId, stripeIx, rgIx);
+ } else {
+ batchKey.set(fileId, stripeIx, rgIx);
+ }
+ resetColumnArrays(columnCount);
+ }
+ }
+
+ /**
+ * Creates the encoded reader.
+ * @param fileId File ID to read, to use for cache lookups and such.
+ * @param dataCache Data cache to use for cache lookups.
+ * @param dataReader Data reader to read data not found in cache (from disk, HDFS, and such).
+ * @param pf Pool factory to create object pools.
+ * @return The reader.
+ */
EncodedReader encodedReader(
- long fileId, DataCache dataCache, DataReader dataReader) throws IOException;
+ long fileId, DataCache dataCache, DataReader dataReader, PoolFactory pf) throws IOException;
}
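ALL_RGS marks a batch whose data spans every row group of a stripe; the index-only-columns path in EncodedReaderImpl uses it like this (a condensed restatement of that hunk):

OrcEncodedColumnBatch ecb = ecbPool.take();
ecb.init(fileId, stripeIx, OrcEncodedColumnBatch.ALL_RGS, colRgs.length);  // stripe-wide data
consumer.consumeData(ecb);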
http://git-wip-us.apache.org/repos/asf/hive/blob/3bf0a45f/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/ReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/ReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/ReaderImpl.java
index b9de5e0..7b3c2a0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/ReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/ReaderImpl.java
@@ -35,8 +35,8 @@ class ReaderImpl extends org.apache.hadoop.hive.ql.io.orc.ReaderImpl implements
@Override
public EncodedReader encodedReader(
- long fileId, DataCache dataCache, DataReader dataReader) throws IOException {
+ long fileId, DataCache dataCache, DataReader dataReader, PoolFactory pf) throws IOException {
return new EncodedReaderImpl(fileId, types,
- codec, bufferSize, rowIndexStride, dataCache, dataReader);
+ codec, bufferSize, rowIndexStride, dataCache, dataReader, pf);
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/3bf0a45f/storage-api/src/java/org/apache/hadoop/hive/common/Pool.java
----------------------------------------------------------------------
diff --git a/storage-api/src/java/org/apache/hadoop/hive/common/Pool.java b/storage-api/src/java/org/apache/hadoop/hive/common/Pool.java
new file mode 100644
index 0000000..e41a515
--- /dev/null
+++ b/storage-api/src/java/org/apache/hadoop/hive/common/Pool.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.common;
+
+/** Simple object pool to prevent GC on small objects passed between threads. */
+public interface Pool<T> {
+ /** Object helper for objects stored in the pool. */
+ public interface PoolObjectHelper<T> {
+ /** Called to create an object when one cannot be provided. */
+ T create();
+ /** Called before the object is put in the pool (regardless of whether put succeeds). */
+ void resetBeforeOffer(T t);
+ }
+
+ T take();
+ void offer(T t);
+}
\ No newline at end of file
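For reference, the degenerate no-pooling case of this contract (what EncodedReaderImpl's NoopPoolFactory builds when no factory is supplied) fits in a few lines; the method name 'passthrough' is illustrative:

static <T> Pool<T> passthrough(final Pool.PoolObjectHelper<T> helper) {
  return new Pool<T>() {
    @Override
    public T take() { return helper.create(); }  // no pooling: always allocate
    @Override
    public void offer(T t) { }                   // and always drop; GC reclaims
  };
}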