Posted to commits@hive.apache.org by se...@apache.org on 2016/03/15 21:50:26 UTC
[2/2] hive git commit: HIVE-12995 : LLAP: Synthetic file ids need collision checks (Sergey Shelukhin, reviewed by Gopal V)
HIVE-12995 : LLAP: Synthetic file ids need collision checks (Sergey Shelukhin, reviewed by Gopal V)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/26b5c7b5
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/26b5c7b5
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/26b5c7b5
Branch: refs/heads/master
Commit: 26b5c7b56a4f28ce3eabc0207566cce46b29b558
Parents: 4458b1a
Author: Sergey Shelukhin <se...@apache.org>
Authored: Tue Mar 15 13:49:35 2016 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Tue Mar 15 13:50:14 2016 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 2 +
.../llap/IncrementalObjectSizeEstimator.java | 7 +-
.../apache/hadoop/hive/llap/cache/Cache.java | 27 ---
.../hadoop/hive/llap/cache/LowLevelCache.java | 4 +-
.../hive/llap/cache/LowLevelCacheImpl.java | 28 +--
.../hadoop/hive/llap/cache/NoopCache.java | 33 ----
.../hive/llap/io/api/impl/LlapIoImpl.java | 6 +-
.../llap/io/decode/EncodedDataConsumer.java | 77 +-------
.../llap/io/decode/OrcColumnVectorProducer.java | 8 +-
.../llap/io/decode/OrcEncodedDataConsumer.java | 2 +-
.../llap/io/encoded/OrcEncodedDataReader.java | 183 ++++---------------
.../hive/llap/io/metadata/OrcFileMetadata.java | 33 ++--
.../hive/llap/io/metadata/OrcMetadataCache.java | 12 +-
.../llap/io/metadata/OrcStripeMetadata.java | 19 +-
orc/src/java/org/apache/orc/FileMetadata.java | 2 +-
.../org/apache/hadoop/hive/ql/io/HdfsUtils.java | 18 +-
.../hadoop/hive/ql/io/SyntheticFileId.java | 100 ++++++++++
.../hadoop/hive/ql/io/orc/OrcInputFormat.java | 97 ++++++----
.../apache/hadoop/hive/ql/io/orc/OrcSplit.java | 46 +++--
.../ql/io/orc/encoded/EncodedReaderImpl.java | 32 ++--
.../hive/ql/io/orc/encoded/OrcBatchKey.java | 20 +-
.../hive/ql/io/orc/encoded/OrcCacheKey.java | 58 ------
.../hadoop/hive/ql/io/orc/encoded/Reader.java | 10 +-
.../hive/ql/io/orc/encoded/ReaderImpl.java | 4 +-
.../hive/ql/io/orc/encoded/StreamUtils.java | 1 -
.../hive/ql/io/orc/TestInputOutputFormat.java | 18 +-
.../apache/hadoop/hive/common/io/DataCache.java | 4 +-
.../common/io/encoded/EncodedColumnBatch.java | 9 +-
28 files changed, 335 insertions(+), 525 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/26b5c7b5/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index c992433..9fd6648 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1223,6 +1223,8 @@ public class HiveConf extends Configuration {
"metastore calls if metastore metadata cache is used."),
HIVE_ORC_INCLUDE_FILE_ID_IN_SPLITS("hive.orc.splits.include.fileid", true,
"Include file ID in splits on file systems thaty support it."),
+ HIVE_ORC_ALLOW_SYNTHETIC_FILE_ID_IN_SPLITS("hive.orc.splits.allow.synthetic.fileid", true,
+ "Allow synthetic file ID in splits on file systems that don't have a native one."),
HIVE_ORC_CACHE_STRIPE_DETAILS_SIZE("hive.orc.cache.stripe.details.size", 10000,
"Max cache size for keeping meta info about orc splits cached in the client."),
HIVE_ORC_COMPUTE_SPLITS_NUM_THREADS("hive.orc.compute.splits.num.threads", 10,
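For context: a minimal sketch, separate from the commit itself, of how the new flag can be consulted. It follows the HiveConf.getBoolVar pattern used elsewhere in this diff; the wrapper class and method names are illustrative only.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;

public class SyntheticIdConfigExample {
  // Defaults to true per the definition added above; split generation can
  // check it before falling back to a synthetic file ID.
  static boolean allowSyntheticFileId(Configuration conf) {
    return HiveConf.getBoolVar(conf, ConfVars.HIVE_ORC_ALLOW_SYNTHETIC_FILE_ID_IN_SPLITS);
  }
}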
http://git-wip-us.apache.org/repos/asf/hive/blob/26b5c7b5/llap-server/src/java/org/apache/hadoop/hive/llap/IncrementalObjectSizeEstimator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/IncrementalObjectSizeEstimator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/IncrementalObjectSizeEstimator.java
index d33f724..7d68294 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/IncrementalObjectSizeEstimator.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/IncrementalObjectSizeEstimator.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hive.llap;
+import com.google.common.collect.Lists;
+import com.google.protobuf.UnknownFieldSet;
import java.lang.reflect.AccessibleObject;
import java.lang.reflect.Array;
import java.lang.reflect.Field;
@@ -35,13 +37,11 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import org.apache.hadoop.hive.llap.IncrementalObjectSizeEstimator.ObjectEstimator;
import org.apache.hadoop.hive.llap.cache.LlapCacheableBuffer;
import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
import org.apache.hadoop.hive.ql.util.JavaDataModel;
-import com.google.common.collect.Lists;
-import com.google.protobuf.UnknownFieldSet;
-
/**
* Creates size estimators for java objects. The estimators attempt to do most of the reflection
* work at initialization time, and also take some shortcuts, to minimize the amount of work done
@@ -622,6 +622,7 @@ public class IncrementalObjectSizeEstimator {
} catch (ClassNotFoundException e) {
// Ignore and hope for the best.
LlapIoImpl.LOG.warn("Cannot find " + className);
+ return;
}
IncrementalObjectSizeEstimator.createEstimators(clazz, sizeEstimators);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/26b5c7b5/llap-server/src/java/org/apache/hadoop/hive/llap/cache/Cache.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/Cache.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/Cache.java
deleted file mode 100644
index cee23a9..0000000
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/Cache.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.llap.cache;
-
-import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch.ColumnStreamData;
-
-/** Dummy interface for now, might be different. */
-public interface Cache<CacheKey> {
- public ColumnStreamData[] cacheOrGet(CacheKey key, ColumnStreamData[] value);
- public ColumnStreamData[] get(CacheKey key);
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/26b5c7b5/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCache.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCache.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCache.java
index 17d9fdf..1b61a6e 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCache.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCache.java
@@ -49,7 +49,7 @@ public interface LowLevelCache {
* Some sort of InvalidCacheChunk could be placed to avoid them. TODO
* @param base base offset for the ranges (stripe/stream offset in case of ORC).
*/
- DiskRangeList getFileData(long fileId, DiskRangeList range, long baseOffset,
+ DiskRangeList getFileData(Object fileKey, DiskRangeList range, long baseOffset,
DiskRangeListFactory factory, LowLevelCacheCounters qfCounters, BooleanRef gotAllData);
/**
@@ -57,6 +57,6 @@ public interface LowLevelCache {
* @return null if all data was put; bitmask indicating which chunks were not put otherwise;
* the replacement chunks from cache are updated directly in the array.
*/
- long[] putFileData(long fileId, DiskRange[] ranges, MemoryBuffer[] chunks,
+ long[] putFileData(Object fileKey, DiskRange[] ranges, MemoryBuffer[] chunks,
long baseOffset, Priority priority, LowLevelCacheCounters qfCounters);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/26b5c7b5/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
index 1132171..a60fed3 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
@@ -43,8 +43,8 @@ public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, Lla
private final EvictionAwareAllocator allocator;
private final AtomicInteger newEvictions = new AtomicInteger(0);
private Thread cleanupThread = null;
- private final ConcurrentHashMap<Long, FileCache> cache =
- new ConcurrentHashMap<Long, FileCache>();
+ private final ConcurrentHashMap<Object, FileCache> cache =
+ new ConcurrentHashMap<Object, FileCache>();
private final LowLevelCachePolicy cachePolicy;
private final long cleanupInterval;
private final LlapDaemonCacheMetrics metrics;
@@ -75,11 +75,11 @@ public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, Lla
}
@Override
- public DiskRangeList getFileData(long fileId, DiskRangeList ranges, long baseOffset,
+ public DiskRangeList getFileData(Object fileKey, DiskRangeList ranges, long baseOffset,
DiskRangeListFactory factory, LowLevelCacheCounters qfCounters, BooleanRef gotAllData) {
if (ranges == null) return null;
DiskRangeList prev = ranges.prev;
- FileCache subCache = cache.get(fileId);
+ FileCache subCache = cache.get(fileKey);
if (subCache == null || !subCache.incRef()) {
long totalMissed = ranges.getTotalLength();
metrics.incrCacheRequestedBytes(totalMissed);
@@ -232,11 +232,11 @@ public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, Lla
}
@Override
- public long[] putFileData(long fileId, DiskRange[] ranges, MemoryBuffer[] buffers,
+ public long[] putFileData(Object fileKey, DiskRange[] ranges, MemoryBuffer[] buffers,
long baseOffset, Priority priority, LowLevelCacheCounters qfCounters) {
long[] result = null;
assert buffers.length == ranges.length;
- FileCache subCache = getOrAddFileSubCache(fileId);
+ FileCache subCache = getOrAddFileSubCache(fileKey);
try {
for (int i = 0; i < ranges.length; ++i) {
LlapDataBuffer buffer = (LlapDataBuffer)buffers[i];
@@ -260,7 +260,7 @@ public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, Lla
}
if (DebugUtils.isTraceCachingEnabled()) {
LlapIoImpl.LOG.info("Trying to cache when the chunk is already cached for "
- + fileId + "@" + offset + " (base " + baseOffset + "); old " + oldVal
+ + fileKey + "@" + offset + " (base " + baseOffset + "); old " + oldVal
+ ", new " + buffer);
}
if (DebugUtils.isTraceLockingEnabled()) {
@@ -301,10 +301,10 @@ public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, Lla
* All this mess is necessary because we want to be able to remove sub-caches for fully
* evicted files. It may actually be better to have non-nested map with object keys?
*/
- private FileCache getOrAddFileSubCache(long fileId) {
+ private FileCache getOrAddFileSubCache(Object fileKey) {
FileCache newSubCache = null;
while (true) { // Overwhelmingly executes once.
- FileCache subCache = cache.get(fileId);
+ FileCache subCache = cache.get(fileKey);
if (subCache != null) {
if (subCache.incRef()) return subCache; // Main path - found it, incRef-ed it.
if (newSubCache == null) {
@@ -312,7 +312,7 @@ public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, Lla
newSubCache.incRef();
}
// Found a stale value we cannot incRef; try to replace it with new value.
- if (cache.replace(fileId, subCache, newSubCache)) return newSubCache;
+ if (cache.replace(fileKey, subCache, newSubCache)) return newSubCache;
continue; // Someone else replaced/removed a stale value, try again.
}
// No value found.
@@ -320,11 +320,11 @@ public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, Lla
newSubCache = new FileCache();
newSubCache.incRef();
}
- FileCache oldSubCache = cache.putIfAbsent(fileId, newSubCache);
+ FileCache oldSubCache = cache.putIfAbsent(fileKey, newSubCache);
if (oldSubCache == null) return newSubCache; // Main path 2 - created a new file cache.
if (oldSubCache.incRef()) return oldSubCache; // Someone created one in parallel.
// Someone created one in parallel and then it went stale.
- if (cache.replace(fileId, oldSubCache, newSubCache)) return newSubCache;
+ if (cache.replace(fileKey, oldSubCache, newSubCache)) return newSubCache;
// Someone else replaced/removed a parallel-added stale value, try again. Max confusion.
}
}
@@ -463,7 +463,7 @@ public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, Lla
// Iterate thru all the filecaches. This is best-effort.
// If these super-long-lived iterators affect the map in some bad way,
// we'd need to sleep once per round instead.
- Iterator<Map.Entry<Long, FileCache>> iter = cache.entrySet().iterator();
+ Iterator<Map.Entry<Object, FileCache>> iter = cache.entrySet().iterator();
boolean isPastEndTime = false;
while (iter.hasNext()) {
FileCache fc = iter.next().getValue();
@@ -516,7 +516,7 @@ public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, Lla
@Override
public String debugDumpForOom() {
StringBuilder sb = new StringBuilder("File cache state ");
- for (Map.Entry<Long, FileCache> e : cache.entrySet()) {
+ for (Map.Entry<Object, FileCache> e : cache.entrySet()) {
if (!e.getValue().incRef()) continue;
try {
sb.append("\n file " + e.getKey());
http://git-wip-us.apache.org/repos/asf/hive/blob/26b5c7b5/llap-server/src/java/org/apache/hadoop/hive/llap/cache/NoopCache.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/NoopCache.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/NoopCache.java
deleted file mode 100644
index d0461e8..0000000
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/NoopCache.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.llap.cache;
-
-import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch.ColumnStreamData;
-
-public class NoopCache<CacheKey> implements Cache<CacheKey> {
- @Override
- public ColumnStreamData[] cacheOrGet(CacheKey key, ColumnStreamData[] value) {
- return value;
- }
-
- @Override
- public ColumnStreamData[] get(CacheKey key) {
- return null; // TODO: ensure real implementation increases refcount
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/26b5c7b5/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
index d2c1907..dbee823 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
@@ -32,7 +32,6 @@ import org.apache.hadoop.hive.common.io.Allocator;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.llap.cache.BuddyAllocator;
import org.apache.hadoop.hive.llap.cache.BufferUsageManager;
-import org.apache.hadoop.hive.llap.cache.Cache;
import org.apache.hadoop.hive.llap.cache.EvictionAwareAllocator;
import org.apache.hadoop.hive.llap.cache.EvictionDispatcher;
import org.apache.hadoop.hive.llap.cache.LowLevelCacheImpl;
@@ -50,7 +49,6 @@ import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
import org.apache.hadoop.hive.llap.metrics.LlapDaemonQueueMetrics;
import org.apache.hadoop.hive.llap.metrics.MetricsUtils;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.hadoop.hive.ql.io.orc.encoded.OrcCacheKey;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.metrics2.util.MBeans;
@@ -91,8 +89,6 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> {
LOG.info("Started llap daemon metrics with displayName: " + displayName +
" sessionId: " + sessionId);
- Cache<OrcCacheKey> cache = null; // High-level cache is not implemented or supported.
-
OrcMetadataCache metadataCache = null;
LowLevelCacheImpl orcCache = null;
BufferUsageManager bufferManager = null;
@@ -131,7 +127,7 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> {
// TODO: this should depends on input format and be in a map, or something.
this.cvp = new OrcColumnVectorProducer(
- metadataCache, orcCache, bufferManager, cache, conf, cacheMetrics, queueMetrics);
+ metadataCache, orcCache, bufferManager, conf, cacheMetrics, queueMetrics);
if (LOGL.isInfoEnabled()) {
LOG.info("LLAP IO initialized");
}
http://git-wip-us.apache.org/repos/asf/hive/blob/26b5c7b5/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java
index b81e97d..137acb0 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java
@@ -17,9 +17,6 @@
*/
package org.apache.hadoop.hive.llap.io.decode;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
import java.util.concurrent.Callable;
import org.apache.hadoop.hive.common.Pool;
@@ -30,14 +27,9 @@ import org.apache.hadoop.hive.llap.metrics.LlapDaemonQueueMetrics;
import org.apache.hadoop.hive.ql.io.orc.encoded.Consumer;
import org.apache.hive.common.util.FixedSizedObjectPool;
-/**
- *
- */
public abstract class EncodedDataConsumer<BatchKey, BatchType extends EncodedColumnBatch<BatchKey>>
implements Consumer<BatchType>, ReadPipeline {
private volatile boolean isStopped = false;
- // TODO: use array, precreate array based on metadata first? Works for ORC. For now keep dumb.
- private final HashMap<BatchKey, BatchType> pendingData = new HashMap<>();
private ConsumerFeedback<BatchType> upstreamFeedback;
private final Consumer<ColumnVectorBatch> downstreamConsumer;
private Callable<Void> readCallable;
@@ -76,50 +68,15 @@ public abstract class EncodedDataConsumer<BatchKey, BatchType extends EncodedCol
@Override
public void consumeData(BatchType data) {
- // TODO: data arrives in whole batches now, not in columns. We could greatly simplify this.
- BatchType targetBatch = null;
- boolean localIsStopped = false;
- Integer targetBatchVersion = null;
- synchronized (pendingData) {
- localIsStopped = isStopped;
- if (!localIsStopped) {
- targetBatch = pendingData.get(data.getBatchKey());
- if (targetBatch == null) {
- targetBatch = data;
- pendingData.put(data.getBatchKey(), data);
- }
- // We have the map locked; the code the throws things away from map only bumps the version
- // under the same map lock; code the throws things away here only bumps the version when
- // the batch was taken out of the map.
- targetBatchVersion = targetBatch.version;
- }
- queueMetrics.setQueueSize(pendingData.size());
- }
- if (localIsStopped) {
- returnSourceData(data);
- return;
- }
- assert targetBatchVersion != null;
- synchronized (targetBatch) {
- if (targetBatch != data) {
- throw new UnsupportedOperationException("Merging is not supported");
- }
- synchronized (pendingData) {
- targetBatch = isStopped ? null : pendingData.remove(data.getBatchKey());
- // Check if someone already threw this away and changed the version.
- localIsStopped = (targetBatchVersion != targetBatch.version);
- }
- // We took the batch out of the map. No more contention with stop possible.
- }
- if (localIsStopped && (targetBatch != data)) {
+ if (isStopped) {
returnSourceData(data);
return;
}
long start = System.currentTimeMillis();
- decodeBatch(targetBatch, downstreamConsumer);
+ decodeBatch(data, downstreamConsumer);
long end = System.currentTimeMillis();
queueMetrics.addProcessingTime(end - start);
- returnSourceData(targetBatch);
+ returnSourceData(data);
}
/**
@@ -127,7 +84,6 @@ public abstract class EncodedDataConsumer<BatchKey, BatchType extends EncodedCol
* of the ECB in question; or, if ECB is still in pendingData, pendingData must be locked.
*/
private void returnSourceData(BatchType data) {
- ++data.version;
upstreamFeedback.returnData(data);
}
@@ -136,19 +92,12 @@ public abstract class EncodedDataConsumer<BatchKey, BatchType extends EncodedCol
@Override
public void setDone() {
- synchronized (pendingData) {
- if (!pendingData.isEmpty()) {
- throw new AssertionError("Not all data has been sent downstream: " + pendingData.size());
- }
- }
downstreamConsumer.setDone();
}
-
@Override
public void setError(Throwable t) {
downstreamConsumer.setError(t);
- dicardPendingData(false);
}
@Override
@@ -156,28 +105,10 @@ public abstract class EncodedDataConsumer<BatchKey, BatchType extends EncodedCol
cvbPool.offer(data);
}
- private void dicardPendingData(boolean isStopped) {
- List<BatchType> batches = new ArrayList<BatchType>(
- pendingData.size());
- synchronized (pendingData) {
- if (isStopped) {
- this.isStopped = true;
- }
- for (BatchType ecb : pendingData.values()) {
- ++ecb.version;
- batches.add(ecb);
- }
- pendingData.clear();
- }
- for (BatchType batch : batches) {
- upstreamFeedback.returnData(batch);
- }
- }
-
@Override
public void stop() {
upstreamFeedback.stop();
- dicardPendingData(true);
+ this.isStopped = true;
}
@Override
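For context: with the high-level cache removed, encoded data arrives only as whole batches, so the pendingData map and its version counters are no longer needed. A minimal sketch of the simplified consume path, using placeholder types rather than the real LLAP classes:

abstract class SimplifiedConsumer<B> {
  private volatile boolean isStopped = false;

  void consumeData(B batch) {
    if (isStopped) {
      returnSourceData(batch);  // stopped: give the buffers straight back upstream
      return;
    }
    decodeBatch(batch);         // hand the decoded batch downstream
    returnSourceData(batch);    // then release the encoded buffers
  }

  void stop() {
    isStopped = true;           // later batches are returned, not decoded
  }

  abstract void decodeBatch(B batch);
  abstract void returnSourceData(B batch);
}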
http://git-wip-us.apache.org/repos/asf/hive/blob/26b5c7b5/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
index 18191da..37fc8d0 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
@@ -23,7 +23,6 @@ import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.llap.cache.BufferUsageManager;
-import org.apache.hadoop.hive.llap.cache.Cache;
import org.apache.hadoop.hive.llap.cache.LowLevelCache;
import org.apache.hadoop.hive.llap.cache.LowLevelCacheImpl;
import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
@@ -34,14 +33,12 @@ import org.apache.hadoop.hive.llap.io.metadata.OrcMetadataCache;
import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
import org.apache.hadoop.hive.llap.metrics.LlapDaemonQueueMetrics;
import org.apache.hadoop.hive.ql.io.orc.encoded.Consumer;
-import org.apache.hadoop.hive.ql.io.orc.encoded.OrcCacheKey;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.mapred.FileSplit;
public class OrcColumnVectorProducer implements ColumnVectorProducer {
private final OrcMetadataCache metadataCache;
- private final Cache<OrcCacheKey> cache;
private final LowLevelCache lowLevelCache;
private final BufferUsageManager bufferManager;
private final Configuration conf;
@@ -50,7 +47,7 @@ public class OrcColumnVectorProducer implements ColumnVectorProducer {
private LlapDaemonQueueMetrics queueMetrics;
public OrcColumnVectorProducer(OrcMetadataCache metadataCache,
- LowLevelCacheImpl lowLevelCache, BufferUsageManager bufferManager, Cache<OrcCacheKey> cache,
+ LowLevelCacheImpl lowLevelCache, BufferUsageManager bufferManager,
Configuration conf, LlapDaemonCacheMetrics metrics, LlapDaemonQueueMetrics queueMetrics) {
if (LlapIoImpl.LOGL.isInfoEnabled()) {
LlapIoImpl.LOG.info("Initializing ORC column vector producer");
@@ -59,7 +56,6 @@ public class OrcColumnVectorProducer implements ColumnVectorProducer {
this.metadataCache = metadataCache;
this.lowLevelCache = lowLevelCache;
this.bufferManager = bufferManager;
- this.cache = cache;
this.conf = conf;
this._skipCorrupt = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_ORC_SKIP_CORRUPT_DATA);
this.cacheMetrics = metrics;
@@ -74,7 +70,7 @@ public class OrcColumnVectorProducer implements ColumnVectorProducer {
cacheMetrics.incrCacheReadRequests();
OrcEncodedDataConsumer edc = new OrcEncodedDataConsumer(consumer, columnIds.size(),
_skipCorrupt, counters, queueMetrics);
- OrcEncodedDataReader reader = new OrcEncodedDataReader(lowLevelCache, bufferManager, cache,
+ OrcEncodedDataReader reader = new OrcEncodedDataReader(lowLevelCache, bufferManager,
metadataCache, conf, split, columnIds, sarg, columnNames, edc, counters);
edc.init(reader, reader);
return edc;
http://git-wip-us.apache.org/repos/asf/hive/blob/26b5c7b5/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
index 28cae87..7ee263d 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
@@ -54,6 +54,7 @@ public class OrcEncodedDataConsumer
Consumer<ColumnVectorBatch> consumer, int colCount, boolean skipCorrupt,
QueryFragmentCounters counters, LlapDaemonQueueMetrics queueMetrics) {
super(consumer, colCount, queueMetrics);
+ // TODO: get rid of this
this.skipCorrupt = skipCorrupt;
this.counters = counters;
}
@@ -62,7 +63,6 @@ public class OrcEncodedDataConsumer
assert fileMetadata == null;
fileMetadata = f;
stripes = new OrcStripeMetadata[f.getStripes().size()];
- // TODO: get rid of this
codec = WriterImpl.createCodec(fileMetadata.getCompressionKind());
}
http://git-wip-us.apache.org/repos/asf/hive/blob/26b5c7b5/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
index 8111c6d..eb251a8 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
@@ -44,7 +44,6 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.ConsumerFeedback;
import org.apache.hadoop.hive.llap.DebugUtils;
import org.apache.hadoop.hive.llap.cache.BufferUsageManager;
-import org.apache.hadoop.hive.llap.cache.Cache;
import org.apache.hadoop.hive.llap.cache.LowLevelCache;
import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
@@ -71,7 +70,6 @@ import org.apache.hadoop.hive.ql.io.orc.encoded.Consumer;
import org.apache.hadoop.hive.ql.io.orc.encoded.EncodedOrcFile;
import org.apache.hadoop.hive.ql.io.orc.encoded.EncodedReader;
import org.apache.hadoop.hive.ql.io.orc.encoded.OrcBatchKey;
-import org.apache.hadoop.hive.ql.io.orc.encoded.OrcCacheKey;
import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.OrcEncodedColumnBatch;
import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.PoolFactory;
import org.apache.hadoop.hive.ql.io.orc.RecordReaderUtils;
@@ -90,7 +88,7 @@ import org.apache.tez.common.CallableWithNdc;
* consumer. It also serves as ConsumerFeedback that receives processed EncodedColumnBatch-es.
*/
public class OrcEncodedDataReader extends CallableWithNdc<Void>
- implements ConsumerFeedback<OrcEncodedColumnBatch>, Consumer<OrcEncodedColumnBatch> {
+ implements ConsumerFeedback<OrcEncodedColumnBatch> {
private static final Logger LOG = LoggerFactory.getLogger(OrcEncodedDataReader.class);
public static final FixedSizedObjectPool<ColumnStreamData> CSD_POOL =
new FixedSizedObjectPool<>(8192, new PoolObjectHelper<ColumnStreamData>() {
@@ -135,7 +133,6 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
private final LowLevelCache lowLevelCache;
private final BufferUsageManager bufferManager;
private final Configuration conf;
- private final Cache<OrcCacheKey> cache;
private final FileSplit split;
private List<Integer> columnIds;
private final SearchArgument sarg;
@@ -150,7 +147,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
private Reader orcReader;
private MetadataReader metadataReader;
private EncodedReader stripeReader;
- private Long fileId;
+ private Object fileKey;
private FileSystem fs;
/**
* readState[stripeIx'][colIx'] => boolean array (could be a bitmask) of rg-s that need to be
@@ -162,13 +159,12 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
private volatile boolean isPaused = false;
public OrcEncodedDataReader(LowLevelCache lowLevelCache, BufferUsageManager bufferManager,
- Cache<OrcCacheKey> cache, OrcMetadataCache metadataCache, Configuration conf,
- FileSplit split, List<Integer> columnIds, SearchArgument sarg, String[] columnNames,
- OrcEncodedDataConsumer consumer, QueryFragmentCounters counters) {
+ OrcMetadataCache metadataCache, Configuration conf, FileSplit split, List<Integer> columnIds,
+ SearchArgument sarg, String[] columnNames, OrcEncodedDataConsumer consumer,
+ QueryFragmentCounters counters) {
this.lowLevelCache = lowLevelCache;
this.metadataCache = metadataCache;
this.bufferManager = bufferManager;
- this.cache = cache;
this.conf = conf;
this.split = split;
this.columnIds = columnIds;
@@ -230,10 +226,10 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
// 1. Get file metadata from cache, or create the reader and read it.
// Don't cache the filesystem object for now; Tez closes it and FS cache will fix all that
fs = split.getPath().getFileSystem(conf);
- fileId = determineFileId(fs, split,
+ fileKey = determineFileId(fs, split,
HiveConf.getBoolVar(conf, ConfVars.LLAP_CACHE_ALLOW_SYNTHETIC_FILEID));
counters.setDesc(QueryFragmentCounters.Desc.FILE, split.getPath()
- + (fileId == null ? "" : " (" + fileId + ")"));
+ + (fileKey == null ? "" : " (" + fileKey + ")"));
try {
fileMetadata = getOrReadFileMetadata();
@@ -307,27 +303,13 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
// read for every stripe (null means read all of them - the usual path). In any case,
// readState will be modified for column x rgs that were fetched from high-level cache.
List<Integer>[] stripeColsToRead = null;
- if (cache != null) {
- try {
- stripeColsToRead = produceDataFromCache(stride);
- } catch (Throwable t) {
- // produceDataFromCache handles its own cleanup.
- consumer.setError(t);
- cleanupReaders();
- recordReaderTime(startTime);
- return null;
- }
- }
// 5. Create encoded data reader.
- // In case if we have high-level cache, we will intercept the data and add it there;
- // otherwise just pass the data directly to the consumer.
- Consumer<OrcEncodedColumnBatch> dataConsumer = (cache == null) ? this.consumer : this;
try {
ensureOrcReader();
// Reader creating updates HDFS counters, don't do it here.
DataWrapperForOrc dw = new DataWrapperForOrc();
- stripeReader = orcReader.encodedReader(fileId, dw, dw, POOL_FACTORY);
+ stripeReader = orcReader.encodedReader(fileKey, dw, dw, POOL_FACTORY);
stripeReader.setDebugTracing(DebugUtils.isTraceOrcEnabled());
} catch (Throwable t) {
consumer.setError(t);
@@ -338,9 +320,8 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
// 6. Read data.
// TODO: I/O threadpool could be here - one thread per stripe; for now, linear.
- boolean hasFileId = this.fileId != null;
- long fileId = hasFileId ? this.fileId : 0;
- OrcBatchKey stripeKey = hasFileId ? new OrcBatchKey(fileId, -1, 0) : null;
+ boolean hasFileId = this.fileKey != null;
+ OrcBatchKey stripeKey = hasFileId ? new OrcBatchKey(fileKey, -1, 0) : null;
for (int stripeIxMod = 0; stripeIxMod < readState.length; ++stripeIxMod) {
if (processStop()) {
cleanupReaders();
@@ -369,7 +350,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
if (colRgs.length > 0 && colRgs[0] == SargApplier.READ_NO_RGS) continue;
// 6.1. Determine the columns to read (usually the same as requested).
- if (cache == null || cols == null || cols.size() == colRgs.length) {
+ if (cols == null || cols.size() == colRgs.length) {
cols = columnIds;
stripeIncludes = globalIncludes;
} else {
@@ -393,7 +374,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
counters.incrCounter(LlapIOCounters.METADATA_CACHE_MISS);
ensureMetadataReader();
long startTimeHdfs = counters.startTimeCounter();
- stripeMetadata = new OrcStripeMetadata(new OrcBatchKey(fileId, stripeIx, 0),
+ stripeMetadata = new OrcStripeMetadata(new OrcBatchKey(fileKey, stripeIx, 0),
metadataReader, stripe, stripeIncludes, sargColumns);
counters.incrTimeCounter(LlapIOCounters.HDFS_TIME_NS, startTimeHdfs);
if (hasFileId && metadataCache != null) {
@@ -439,7 +420,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
// data it receives for one stripe. We could probably interrupt it, if it checked that.
stripeReader.readEncodedColumns(stripeIx, stripe, stripeMetadata.getRowIndexes(),
stripeMetadata.getEncodings(), stripeMetadata.getStreams(), stripeIncludes,
- colRgs, dataConsumer);
+ colRgs, consumer);
} catch (Throwable t) {
consumer.setError(t);
cleanupReaders();
@@ -450,7 +431,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
// Done with all the things.
recordReaderTime(startTime);
- dataConsumer.setDone();
+ consumer.setDone();
if (DebugUtils.isTraceMttEnabled()) {
LlapIoImpl.LOG.info("done processing " + split);
}
@@ -525,12 +506,12 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
return true;
}
- private static Long determineFileId(FileSystem fs, FileSplit split,
+ private static Object determineFileId(FileSystem fs, FileSplit split,
boolean allowSynthetic) throws IOException {
if (split instanceof OrcSplit) {
- Long fileId = ((OrcSplit)split).getFileId();
- if (fileId != null) {
- return fileId;
+ Object fileKey = ((OrcSplit)split).getFileKey();
+ if (fileKey != null) {
+ return fileKey;
}
}
LOG.warn("Split for " + split.getPath() + " (" + split.getClass() + ") does not have file ID");
@@ -600,8 +581,8 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
private void ensureOrcReader() throws IOException {
if (orcReader != null) return;
Path path = split.getPath();
- if (fileId != null && HiveConf.getBoolVar(conf, ConfVars.LLAP_IO_USE_FILEID_PATH)) {
- path = HdfsUtils.getFileIdPath(fs, path, fileId);
+ if (fileKey instanceof Long && HiveConf.getBoolVar(conf, ConfVars.LLAP_IO_USE_FILEID_PATH)) {
+ path = HdfsUtils.getFileIdPath(fs, path, (long)fileKey);
}
if (DebugUtils.isTraceOrcEnabled()) {
LOG.info("Creating reader for " + path + " (" + split.getPath() + ")");
@@ -617,8 +598,8 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
*/
private OrcFileMetadata getOrReadFileMetadata() throws IOException {
OrcFileMetadata metadata = null;
- if (fileId != null && metadataCache != null) {
- metadata = metadataCache.getFileMetadata(fileId);
+ if (fileKey != null && metadataCache != null) {
+ metadata = metadataCache.getFileMetadata(fileKey);
if (metadata != null) {
counters.incrCounter(LlapIOCounters.METADATA_CACHE_HIT);
return metadata;
@@ -628,8 +609,8 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
}
ensureOrcReader();
// We assume this call doesn't touch HDFS because everything is already read; don't add time.
- metadata = new OrcFileMetadata(fileId != null ? fileId : 0, orcReader);
- if (fileId == null || metadataCache == null) return metadata;
+ metadata = new OrcFileMetadata(fileKey, orcReader);
+ if (fileKey == null || metadataCache == null) return metadata;
return metadataCache.putFileMetadata(metadata);
}
@@ -639,9 +620,8 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
private ArrayList<OrcStripeMetadata> readStripesMetadata(
boolean[] globalInc, boolean[] sargColumns) throws IOException {
ArrayList<OrcStripeMetadata> result = new ArrayList<OrcStripeMetadata>(readState.length);
- boolean hasFileId = this.fileId != null;
- long fileId = hasFileId ? this.fileId : 0;
- OrcBatchKey stripeKey = hasFileId ? new OrcBatchKey(fileId, 0, 0) : null;
+ boolean hasFileId = this.fileKey != null;
+ OrcBatchKey stripeKey = hasFileId ? new OrcBatchKey(fileKey, 0, 0) : null;
for (int stripeIxMod = 0; stripeIxMod < readState.length; ++stripeIxMod) {
OrcStripeMetadata value = null;
int stripeIx = stripeIxMod + stripeIxFrom;
@@ -655,7 +635,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
StripeInformation si = fileMetadata.getStripes().get(stripeIx);
if (value == null) {
long startTime = counters.startTimeCounter();
- value = new OrcStripeMetadata(new OrcBatchKey(fileId, stripeIx, 0),
+ value = new OrcStripeMetadata(new OrcBatchKey(fileKey, stripeIx, 0),
metadataReader, si, globalInc, sargColumns);
counters.incrTimeCounter(LlapIOCounters.HDFS_TIME_NS, startTime);
if (hasFileId && metadataCache != null) {
@@ -836,105 +816,6 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
readState = new boolean[stripeIxTo - stripeIxFrom][][];
}
- // TODO: split by stripe? we do everything by stripe, and it might be faster
- /**
- * Takes the data from high-level cache for all stripes and returns to consumer.
- * @return List of columns to read per stripe, if any columns were fully eliminated by cache.
- */
- private List<Integer>[] produceDataFromCache(int rowIndexStride) throws IOException {
- OrcCacheKey key = new OrcCacheKey(fileId, -1, -1, -1);
- // For each stripe, keep a list of columns that are not fully in cache (null => all of them).
- @SuppressWarnings("unchecked")
- List<Integer>[] stripeColsNotInCache = new List[readState.length];
- for (int stripeIxMod = 0; stripeIxMod < readState.length; ++stripeIxMod) {
- key.stripeIx = stripeIxFrom + stripeIxMod;
- boolean[][] cols = readState[stripeIxMod];
- boolean[] isMissingAnyRgs = new boolean[cols.length];
- int totalRgCount = getRgCount(fileMetadata.getStripes().get(key.stripeIx), rowIndexStride);
- for (int rgIx = 0; rgIx < totalRgCount; ++rgIx) {
- OrcEncodedColumnBatch col = ECB_POOL.take();
- col.init(fileId, key.stripeIx, rgIx, cols.length);
- boolean hasAnyCached = false;
- try {
- key.rgIx = rgIx;
- for (int colIxMod = 0; colIxMod < cols.length; ++colIxMod) {
- boolean[] readMask = cols[colIxMod];
- // Check if RG is eliminated by SARG
- if ((readMask == SargApplier.READ_NO_RGS) || (readMask != SargApplier.READ_ALL_RGS
- && (readMask.length <= rgIx || !readMask[rgIx]))) continue;
- key.colIx = columnIds.get(colIxMod);
- ColumnStreamData[] cached = cache.get(key);
- if (cached == null) {
- isMissingAnyRgs[colIxMod] = true;
- continue;
- }
- assert cached.length == OrcEncodedColumnBatch.MAX_DATA_STREAMS;
- col.setAllStreamsData(colIxMod, key.colIx, cached);
- hasAnyCached = true;
- if (readMask == SargApplier.READ_ALL_RGS) {
- // We were going to read all RGs, but some were in cache, allocate the mask.
- cols[colIxMod] = readMask = new boolean[totalRgCount];
- Arrays.fill(readMask, true);
- }
- readMask[rgIx] = false; // Got from cache, don't read from disk.
- }
- } catch (Throwable t) {
- // TODO: Any cleanup needed to release data in col back to cache should be here.
- throw (t instanceof IOException) ? (IOException)t : new IOException(t);
- }
- if (hasAnyCached) {
- consumer.consumeData(col);
- }
- }
- boolean makeStripeColList = false; // By default assume we'll fetch all original columns.
- for (int colIxMod = 0; colIxMod < cols.length; ++colIxMod) {
- if (isMissingAnyRgs[colIxMod]) {
- if (makeStripeColList) {
- stripeColsNotInCache[stripeIxMod].add(columnIds.get(colIxMod));
- }
- } else if (!makeStripeColList) {
- // Some columns were fully in cache. Make a per-stripe col list, add previous columns.
- makeStripeColList = true;
- stripeColsNotInCache[stripeIxMod] = new ArrayList<Integer>(cols.length - 1);
- for (int i = 0; i < colIxMod; ++i) {
- stripeColsNotInCache[stripeIxMod].add(columnIds.get(i));
- }
- }
- }
- }
- return stripeColsNotInCache;
- }
-
- @Override
- public void setDone() {
- consumer.setDone();
- }
-
- @Override
- public void consumeData(OrcEncodedColumnBatch data) {
- // Store object in cache; create new key object - cannot be reused.
- assert cache != null;
- throw new UnsupportedOperationException("not implemented");
- /*for (int i = 0; i < data.getColumnData().length; ++i) {
- OrcCacheKey key = new OrcCacheKey(data.getBatchKey(), data.getColumnIxs()[i]);
- ColumnStreamData[] toCache = data.getColumnData()[i];
- ColumnStreamData[] cached = cache.cacheOrGet(key, toCache);
- if (toCache != cached) {
- for (ColumnStreamData sb : toCache) {
- if (sb.decRef() != 0) continue;
- lowLevelCache.releaseBuffers(sb.getCacheBuffers());
- }
- data.getColumnData()[i] = cached;
- }
- }
- consumer.consumeData(data);*/
- }
-
- @Override
- public void setError(Throwable t) {
- consumer.setError(t);
- }
-
private class DataWrapperForOrc implements DataReader, DataCache {
private final DataReader orcDataReader;
@@ -948,17 +829,17 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
}
@Override
- public DiskRangeList getFileData(long fileId, DiskRangeList range,
+ public DiskRangeList getFileData(Object fileKey, DiskRangeList range,
long baseOffset, DiskRangeListFactory factory, BooleanRef gotAllData) {
return (lowLevelCache == null) ? range : lowLevelCache.getFileData(
- fileId, range, baseOffset, factory, counters, gotAllData);
+ fileKey, range, baseOffset, factory, counters, gotAllData);
}
@Override
- public long[] putFileData(long fileId, DiskRange[] ranges,
+ public long[] putFileData(Object fileKey, DiskRange[] ranges,
MemoryBuffer[] data, long baseOffset) {
return (lowLevelCache == null) ? null : lowLevelCache.putFileData(
- fileId, ranges, data, baseOffset, Priority.NORMAL, counters);
+ fileKey, ranges, data, baseOffset, Priority.NORMAL, counters);
}
@Override
@@ -989,7 +870,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
DiskRangeList result = orcDataReader.readFileData(range, baseOffset, doForceDirect);
counters.recordHdfsTime(startTime);
if (DebugUtils.isTraceOrcEnabled() && LOG.isInfoEnabled()) {
- LOG.info("Disk ranges after disk read (file " + fileId + ", base offset " + baseOffset
+ LOG.info("Disk ranges after disk read (file " + fileKey + ", base offset " + baseOffset
+ "): " + RecordReaderUtils.stringifyDiskRanges(result));
}
return result;
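For context: the reader now distinguishes between key types when opening the file. A small sketch, separate from the commit and with illustrative names, of the distinction drawn in ensureOrcReader above:

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.HdfsUtils;

public class ReadPathExample {
  // Only a native Long file ID can be turned into an HDFS "by-id" path;
  // a SyntheticFileId still identifies the file for caching, but reads
  // have to go through the original path.
  static Path resolveReadPath(FileSystem fs, Path path, Object fileKey, boolean useFileIdPath) {
    if (fileKey instanceof Long && useFileIdPath) {
      return HdfsUtils.getFileIdPath(fs, path, (long) fileKey);
    }
    return path;
  }
}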
http://git-wip-us.apache.org/repos/asf/hive/blob/26b5c7b5/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileMetadata.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileMetadata.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileMetadata.java
index 2e4e0c5..4e42a0f 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileMetadata.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileMetadata.java
@@ -18,23 +18,22 @@
package org.apache.hadoop.hive.llap.io.metadata;
+import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
-
import org.apache.hadoop.hive.llap.IncrementalObjectSizeEstimator;
import org.apache.hadoop.hive.llap.IncrementalObjectSizeEstimator.ObjectEstimator;
import org.apache.hadoop.hive.llap.cache.EvictionDispatcher;
import org.apache.hadoop.hive.llap.cache.LlapCacheableBuffer;
-import org.apache.orc.CompressionKind;
-import org.apache.orc.FileMetadata;
+import org.apache.hadoop.hive.ql.io.SyntheticFileId;
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
import org.apache.hadoop.hive.ql.io.orc.Reader;
import org.apache.hadoop.hive.ql.io.orc.ReaderImpl.StripeInformationImpl;
-import org.apache.orc.StripeInformation;
+import org.apache.orc.CompressionKind;
+import org.apache.orc.FileMetadata;
import org.apache.orc.OrcProto;
-
-import com.google.common.annotations.VisibleForTesting;
+import org.apache.orc.StripeInformation;
/** ORC file metadata. Currently contains some duplicate info due to how different parts
* of ORC use different info. Ideally we would get rid of protobuf structs in code beyond reading,
@@ -46,7 +45,7 @@ public final class OrcFileMetadata extends LlapCacheableBuffer implements FileMe
private final List<OrcProto.StripeStatistics> stripeStats;
private final List<OrcProto.Type> types;
private final List<OrcProto.ColumnStatistics> fileStats;
- private final long fileId;
+ private final Object fileKey;
private final CompressionKind compressionKind;
private final int rowIndexStride;
private final int compressionBufferSize;
@@ -61,16 +60,18 @@ public final class OrcFileMetadata extends LlapCacheableBuffer implements FileMe
private final static HashMap<Class<?>, ObjectEstimator> SIZE_ESTIMATORS;
private final static ObjectEstimator SIZE_ESTIMATOR;
static {
- OrcFileMetadata ofm = createDummy(0);
+ OrcFileMetadata ofm = createDummy(new SyntheticFileId());
SIZE_ESTIMATORS = IncrementalObjectSizeEstimator.createEstimators(ofm);
IncrementalObjectSizeEstimator.addEstimator(
"com.google.protobuf.LiteralByteString", SIZE_ESTIMATORS);
+ // Add long for the regular file ID estimation.
+ IncrementalObjectSizeEstimator.createEstimators(Long.class, SIZE_ESTIMATORS);
SIZE_ESTIMATOR = SIZE_ESTIMATORS.get(OrcFileMetadata.class);
}
@VisibleForTesting
- public static OrcFileMetadata createDummy(int fileId) {
- OrcFileMetadata ofm = new OrcFileMetadata(fileId);
+ public static OrcFileMetadata createDummy(Object fileKey) {
+ OrcFileMetadata ofm = new OrcFileMetadata(fileKey);
ofm.stripes.add(new StripeInformationImpl(
OrcProto.StripeInformation.getDefaultInstance()));
ofm.fileStats.add(OrcProto.ColumnStatistics.getDefaultInstance());
@@ -87,8 +88,8 @@ public final class OrcFileMetadata extends LlapCacheableBuffer implements FileMe
}
// Ctor for memory estimation and tests
- private OrcFileMetadata(int fileId) {
- this.fileId = fileId;
+ private OrcFileMetadata(Object fileKey) {
+ this.fileKey = fileKey;
stripes = new ArrayList<StripeInformation>();
versionList = new ArrayList<Integer>();
fileStats = new ArrayList<>();
@@ -101,8 +102,8 @@ public final class OrcFileMetadata extends LlapCacheableBuffer implements FileMe
compressionKind = CompressionKind.NONE;
}
- public OrcFileMetadata(long fileId, Reader reader) {
- this.fileId = fileId;
+ public OrcFileMetadata(Object fileKey, Reader reader) {
+ this.fileKey = fileKey;
this.stripeStats = reader.getOrcProtoStripeStatistics();
this.compressionKind = reader.getCompressionKind();
this.compressionBufferSize = reader.getCompressionSize();
@@ -183,8 +184,8 @@ public final class OrcFileMetadata extends LlapCacheableBuffer implements FileMe
}
@Override
- public long getFileId() {
- return fileId;
+ public Object getFileKey() {
+ return fileKey;
}
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/26b5c7b5/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java
index 43c8fb3..e970137 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java
@@ -27,8 +27,8 @@ import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
import org.apache.hadoop.hive.ql.io.orc.encoded.OrcBatchKey;
public class OrcMetadataCache {
- private final ConcurrentHashMap<Long, OrcFileMetadata> metadata =
- new ConcurrentHashMap<Long, OrcFileMetadata>();
+ private final ConcurrentHashMap<Object, OrcFileMetadata> metadata =
+ new ConcurrentHashMap<Object, OrcFileMetadata>();
private final ConcurrentHashMap<OrcBatchKey, OrcStripeMetadata> stripeMetadata =
new ConcurrentHashMap<OrcBatchKey, OrcStripeMetadata>();
private final MemoryManager memoryManager;
@@ -42,7 +42,7 @@ public class OrcMetadataCache {
public OrcFileMetadata putFileMetadata(OrcFileMetadata metaData) {
long memUsage = metaData.getMemoryUsage();
memoryManager.reserveMemory(memUsage, false);
- OrcFileMetadata val = metadata.putIfAbsent(metaData.getFileId(), metaData);
+ OrcFileMetadata val = metadata.putIfAbsent(metaData.getFileKey(), metaData);
// See OrcFileMetadata; it is always unlocked, so we just "touch" it here to simulate use.
if (val == null) {
val = metaData;
@@ -75,12 +75,12 @@ public class OrcMetadataCache {
return stripeMetadata.get(stripeKey);
}
- public OrcFileMetadata getFileMetadata(long fileId) throws IOException {
- return metadata.get(fileId);
+ public OrcFileMetadata getFileMetadata(Object fileKey) throws IOException {
+ return metadata.get(fileKey);
}
public void notifyEvicted(OrcFileMetadata buffer) {
- metadata.remove(buffer.getFileId());
+ metadata.remove(buffer.getFileKey());
// See OrcFileMetadata - we don't clear the object, it will be GCed when released by users.
}
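For context: keying the metadata map on Object lets entries for native Long file IDs and SyntheticFileId keys coexist in one map, relying on each key type's equals/hashCode. A standalone sketch, with made-up file attributes:

import java.util.concurrent.ConcurrentHashMap;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.SyntheticFileId;

public class MixedKeyExample {
  public static void main(String[] args) {
    ConcurrentHashMap<Object, String> metadata = new ConcurrentHashMap<>();
    metadata.put(123456789L, "metadata for an HDFS file with a native ID");
    metadata.put(new SyntheticFileId(new Path("/tmp/t/f0"), 2048L, 1458078574000L),
        "metadata for a file on a file system without native IDs");
    // The lookup succeeds as long as the key is rebuilt from the same attributes.
    System.out.println(
        metadata.get(new SyntheticFileId(new Path("/tmp/t/f0"), 2048L, 1458078574000L)));
  }
}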
http://git-wip-us.apache.org/repos/asf/hive/blob/26b5c7b5/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java
index 8479d22..82187bd 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java
@@ -17,22 +17,21 @@
*/
package org.apache.hadoop.hive.llap.io.metadata;
+import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
-
import org.apache.hadoop.hive.llap.IncrementalObjectSizeEstimator;
import org.apache.hadoop.hive.llap.IncrementalObjectSizeEstimator.ObjectEstimator;
import org.apache.hadoop.hive.llap.cache.EvictionDispatcher;
import org.apache.hadoop.hive.llap.cache.LlapCacheableBuffer;
-import org.apache.orc.impl.MetadataReader;
-import org.apache.orc.impl.OrcIndex;
-import org.apache.orc.StripeInformation;
+import org.apache.hadoop.hive.ql.io.SyntheticFileId;
import org.apache.hadoop.hive.ql.io.orc.encoded.OrcBatchKey;
import org.apache.orc.OrcProto;
-
-import com.google.common.annotations.VisibleForTesting;
+import org.apache.orc.StripeInformation;
+import org.apache.orc.impl.MetadataReader;
+import org.apache.orc.impl.OrcIndex;
public class OrcStripeMetadata extends LlapCacheableBuffer {
private final OrcBatchKey stripeKey;
@@ -46,10 +45,12 @@ public class OrcStripeMetadata extends LlapCacheableBuffer {
private final static HashMap<Class<?>, ObjectEstimator> SIZE_ESTIMATORS;
private final static ObjectEstimator SIZE_ESTIMATOR;
static {
- OrcStripeMetadata osm = createDummy(0);
+ OrcStripeMetadata osm = createDummy(new SyntheticFileId());
SIZE_ESTIMATORS = IncrementalObjectSizeEstimator.createEstimators(osm);
IncrementalObjectSizeEstimator.addEstimator(
"com.google.protobuf.LiteralByteString", SIZE_ESTIMATORS);
+ // Add long for the regular file ID estimation.
+ IncrementalObjectSizeEstimator.createEstimators(Long.class, SIZE_ESTIMATORS);
SIZE_ESTIMATOR = SIZE_ESTIMATORS.get(OrcStripeMetadata.class);
}
@@ -65,7 +66,7 @@ public class OrcStripeMetadata extends LlapCacheableBuffer {
estimatedMemUsage = SIZE_ESTIMATOR.estimate(this, SIZE_ESTIMATORS);
}
- private OrcStripeMetadata(long id) {
+ private OrcStripeMetadata(Object id) {
stripeKey = new OrcBatchKey(id, 0, 0);
encodings = new ArrayList<>();
streams = new ArrayList<>();
@@ -73,7 +74,7 @@ public class OrcStripeMetadata extends LlapCacheableBuffer {
}
@VisibleForTesting
- public static OrcStripeMetadata createDummy(long id) {
+ public static OrcStripeMetadata createDummy(Object id) {
OrcStripeMetadata dummy = new OrcStripeMetadata(id);
dummy.encodings.add(OrcProto.ColumnEncoding.getDefaultInstance());
dummy.streams.add(OrcProto.Stream.getDefaultInstance());
http://git-wip-us.apache.org/repos/asf/hive/blob/26b5c7b5/orc/src/java/org/apache/orc/FileMetadata.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/FileMetadata.java b/orc/src/java/org/apache/orc/FileMetadata.java
index d63bdcc..807e696 100644
--- a/orc/src/java/org/apache/orc/FileMetadata.java
+++ b/orc/src/java/org/apache/orc/FileMetadata.java
@@ -44,7 +44,7 @@ public interface FileMetadata {
int getFlattenedColumnCount();
- long getFileId();
+ Object getFileKey();
List<Integer> getVersionList();
http://git-wip-us.apache.org/repos/asf/hive/blob/26b5c7b5/ql/src/java/org/apache/hadoop/hive/ql/io/HdfsUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HdfsUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HdfsUtils.java
index af64fc8..1a40847 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HdfsUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HdfsUtils.java
@@ -33,30 +33,18 @@ public class HdfsUtils {
private static final HadoopShims SHIMS = ShimLoader.getHadoopShims();
private static final Logger LOG = LoggerFactory.getLogger(HdfsUtils.class);
- public static Long getFileId(
+ public static Object getFileId(
FileSystem fileSystem, Path path, boolean allowSynthetic) throws IOException {
- String pathStr = path.toUri().getPath();
if (fileSystem instanceof DistributedFileSystem) {
- return SHIMS.getFileId(fileSystem, pathStr);
+ return SHIMS.getFileId(fileSystem, path.toUri().getPath());
}
if (!allowSynthetic) {
LOG.warn("Cannot get unique file ID from "
+ fileSystem.getClass().getSimpleName() + "; returning null");
return null;
}
- // If we are not on DFS, we just hash the file name + size and hope for the best.
- // TODO: we assume it only happens in tests. Fix?
- int nameHash = pathStr.hashCode();
FileStatus fs = fileSystem.getFileStatus(path);
- long fileSize = fs.getLen(), modTime = fs.getModificationTime();
- int fileSizeHash = (int)(fileSize ^ (fileSize >>> 32)),
- modTimeHash = (int)(modTime ^ (modTime >>> 32)),
- combinedHash = modTimeHash ^ fileSizeHash;
- long id = (((long)nameHash & 0xffffffffL) << 32) | ((long)combinedHash & 0xffffffffL);
- LOG.warn("Cannot get unique file ID from "
- + fileSystem.getClass().getSimpleName() + "; using " + id + " (" + pathStr
- + "," + nameHash + "," + fileSize + ")");
- return id;
+ return new SyntheticFileId(path, fs.getLen(), fs.getModificationTime());
}
// TODO: this relies on HDFS not changing the format; we assume if we could get inode ID, this
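For context: callers of the reworked getFileId now receive either a Long (native HDFS file ID), a SyntheticFileId, or null when synthetic IDs are disallowed. An illustrative caller, assuming the file exists on a local (non-DFS) file system:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.HdfsUtils;

public class FileKeyExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path path = new Path("file:///tmp/example.orc");       // hypothetical path
    FileSystem fs = path.getFileSystem(conf);
    // allowSynthetic = true: on a non-DFS file system this yields a
    // SyntheticFileId built from path hash, modification time and length.
    Object fileKey = HdfsUtils.getFileId(fs, path, true);
    System.out.println("file key: " + fileKey);
  }
}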
http://git-wip-us.apache.org/repos/asf/hive/blob/26b5c7b5/ql/src/java/org/apache/hadoop/hive/ql/io/SyntheticFileId.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/SyntheticFileId.java b/ql/src/java/org/apache/hadoop/hive/ql/io/SyntheticFileId.java
new file mode 100644
index 0000000..905bbb9
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/SyntheticFileId.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+
+public final class SyntheticFileId implements Writable {
+ private long pathHash;
+ private long modTime;
+ private long length;
+
+ /** Writable ctor. */
+ public SyntheticFileId() {
+ }
+
+ public SyntheticFileId(Path path, long len, long modificationTime) {
+ this.pathHash = hashCode(path.toUri().getPath());
+ this.modTime = modificationTime;
+ this.length = len;
+ }
+
+ public SyntheticFileId(FileStatus file) {
+ this(file.getPath(), file.getLen(), file.getModificationTime());
+ }
+
+ @Override
+ public String toString() {
+ return "[" + pathHash + ", " + modTime + ", " + length + "]";
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = prime + (int) (length ^ (length >>> 32));
+ result = prime * result + (int) (modTime ^ (modTime >>> 32));
+ return prime * result + (int) (pathHash ^ (pathHash >>> 32));
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) return true;
+ if (!(obj instanceof SyntheticFileId)) return false;
+ SyntheticFileId other = (SyntheticFileId)obj;
+ return length == other.length && modTime == other.modTime && pathHash == other.pathHash;
+ }
+
+ private long hashCode(String path) {
+ long h = 0;
+ for (int i = 0; i < path.length(); ++i) {
+ h = 1223 * h + path.charAt(i);
+ }
+ return h;
+ }
+
+ /** Length allows for some backward compatibility wrt field addition. */
+ private static final short THREE_LONGS = 24;
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeShort(THREE_LONGS);
+ out.writeLong(pathHash);
+ out.writeLong(modTime);
+ out.writeLong(length);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ short len = in.readShort();
+ if (len < THREE_LONGS) throw new IOException("Need at least " + THREE_LONGS + " bytes");
+ pathHash = in.readLong();
+ modTime = in.readLong();
+ length = in.readLong();
+ int extraBytes = len - THREE_LONGS;
+ if (extraBytes > 0) {
+ in.skipBytes(extraBytes);
+ }
+ }
+}
\ No newline at end of file
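Compared with the single packed long that the removed HdfsUtils code produced, the key above keeps the full 64-bit path hash, modification time and length, so two files compare equal only if all three fields match. A stand-alone sketch of that behavior, using hypothetical paths and values:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.io.SyntheticFileId;

    public class SyntheticFileIdDemo {
      public static void main(String[] args) {
        Path p = new Path("/warehouse/t1/part-00000.orc");
        // Same path and length, different modification times: distinct keys.
        SyntheticFileId before = new SyntheticFileId(p, 4096L, 1458000000000L);
        SyntheticFileId after = new SyntheticFileId(p, 4096L, 1458000100000L);
        System.out.println(before.equals(after)); // false - modTime differs
        System.out.println(before);               // prints [<pathHash>, 1458000000000, 4096]
      }
    }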
http://git-wip-us.apache.org/repos/asf/hive/blob/26b5c7b5/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 0ebcd2a..cd2a668 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -84,6 +84,7 @@ import org.apache.hadoop.hive.ql.io.LlapWrappableInputFormatInterface;
import org.apache.hadoop.hive.ql.io.RecordIdentifier;
import org.apache.hadoop.hive.ql.io.SelfDescribingInputFormatInterface;
import org.apache.hadoop.hive.ql.io.StatsProvidingRecordReader;
+import org.apache.hadoop.hive.ql.io.SyntheticFileId;
import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
@@ -727,10 +728,11 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
boolean[] covered;
private List<Future<List<OrcSplit>>> splitFuturesRef;
private final UserGroupInformation ugi;
+ private final boolean allowSyntheticFileIds;
public ETLSplitStrategy(Context context, FileSystem fs, Path dir,
List<HdfsFileStatusWithId> children, boolean isOriginal, List<DeltaMetaData> deltas,
- boolean[] covered, UserGroupInformation ugi) {
+ boolean[] covered, UserGroupInformation ugi, boolean allowSyntheticFileIds) {
assert !children.isEmpty();
this.context = context;
this.dirs = Lists.newArrayList(new ETLDir(dir, fs, children.size()));
@@ -739,6 +741,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
this.deltas = deltas;
this.covered = covered;
this.ugi = ugi;
+ this.allowSyntheticFileIds = allowSyntheticFileIds;
}
@Override
@@ -860,7 +863,8 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
List<Future<List<OrcSplit>>> localList = new ArrayList<>(splits.size());
UserGroupInformation tpUgi = ugi == null ? UserGroupInformation.getCurrentUser() : ugi;
for (SplitInfo splitInfo : splits) {
- localList.add(Context.threadPool.submit(new SplitGenerator(splitInfo, tpUgi)));
+ localList.add(Context.threadPool.submit(
+ new SplitGenerator(splitInfo, tpUgi, allowSyntheticFileIds)));
}
synchronized (splitFutures) {
splitFutures.addAll(localList);
@@ -873,16 +877,17 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
* as opposed to query execution (split generation does not read or cache file footers).
*/
static final class BISplitStrategy extends ACIDSplitStrategy {
- List<HdfsFileStatusWithId> fileStatuses;
- boolean isOriginal;
- List<DeltaMetaData> deltas;
- FileSystem fs;
- Context context;
- Path dir;
+ private final List<HdfsFileStatusWithId> fileStatuses;
+ private final boolean isOriginal;
+ private final List<DeltaMetaData> deltas;
+ private final FileSystem fs;
+ private final Context context;
+ private final Path dir;
+ private final boolean allowSyntheticFileIds;
public BISplitStrategy(Context context, FileSystem fs,
Path dir, List<HdfsFileStatusWithId> fileStatuses, boolean isOriginal,
- List<DeltaMetaData> deltas, boolean[] covered) {
+ List<DeltaMetaData> deltas, boolean[] covered, boolean allowSyntheticFileIds) {
super(dir, context.numBuckets, deltas, covered);
this.context = context;
this.fileStatuses = fileStatuses;
@@ -890,6 +895,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
this.deltas = deltas;
this.fs = fs;
this.dir = dir;
+ this.allowSyntheticFileIds = allowSyntheticFileIds;
}
@Override
@@ -900,7 +906,11 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
if (fileStatus.getLen() != 0) {
String[] hosts = SHIMS.getLocationsWithOffset(fs, fileStatus).firstEntry().getValue()
.getHosts();
- OrcSplit orcSplit = new OrcSplit(fileStatus.getPath(), file.getFileId(), 0,
+ Object fileKey = file.getFileId();
+ if (fileKey == null && allowSyntheticFileIds) {
+ fileKey = new SyntheticFileId(fileStatus);
+ }
+ OrcSplit orcSplit = new OrcSplit(fileStatus.getPath(), fileKey, 0,
fileStatus.getLen(), hosts, null, isOriginal, true, deltas, -1);
splits.add(orcSplit);
}
@@ -1029,8 +1039,8 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
static final class SplitGenerator implements Callable<List<OrcSplit>> {
private final Context context;
private final FileSystem fs;
- private final HdfsFileStatusWithId fileWithId;
private final FileStatus file;
+ private final Long fsFileId;
private final long blockSize;
private final TreeMap<Long, BlockLocation> locations;
private final FileInfo fileInfo;
@@ -1046,31 +1056,34 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
private long projColsUncompressedSize;
private final List<OrcSplit> deltaSplits;
private final UserGroupInformation ugi;
+ private final boolean allowSyntheticFileIds;
- public SplitGenerator(SplitInfo splitInfo, UserGroupInformation ugi) throws IOException {
+ public SplitGenerator(SplitInfo splitInfo, UserGroupInformation ugi,
+ boolean allowSyntheticFileIds) throws IOException {
this.ugi = ugi;
this.context = splitInfo.context;
this.fs = splitInfo.fs;
- this.fileWithId = splitInfo.fileWithId;
- this.file = this.fileWithId.getFileStatus();
+ this.file = splitInfo.fileWithId.getFileStatus();
+ this.fsFileId = splitInfo.fileWithId.getFileId();
this.blockSize = this.file.getBlockSize();
this.fileInfo = splitInfo.fileInfo;
// TODO: potential DFS call
- this.locations = SHIMS.getLocationsWithOffset(fs, fileWithId.getFileStatus());
+ this.locations = SHIMS.getLocationsWithOffset(fs, file);
this.isOriginal = splitInfo.isOriginal;
this.deltas = splitInfo.deltas;
this.hasBase = splitInfo.hasBase;
this.projColsUncompressedSize = -1;
this.deltaSplits = splitInfo.getSplits();
+ this.allowSyntheticFileIds = allowSyntheticFileIds;
}
Path getPath() {
- return fileWithId.getFileStatus().getPath();
+ return file.getPath();
}
@Override
public String toString() {
- return "splitter(" + fileWithId.getFileStatus().getPath() + ")";
+ return "splitter(" + file.getPath() + ")";
}
/**
@@ -1133,7 +1146,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
maxSize = Math.max(maxSize, val.get());
}
} else {
- throw new IOException("File " + fileWithId.getFileStatus().getPath().toString() +
+ throw new IOException("File " + file.getPath().toString() +
" should have had overlap on block starting at " + block.getOffset());
}
}
@@ -1161,7 +1174,11 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
final double splitRatio = (double) length / (double) fileLen;
final long scaledProjSize = projColsUncompressedSize > 0 ?
(long) (splitRatio * projColsUncompressedSize) : fileLen;
- return new OrcSplit(file.getPath(), fileWithId.getFileId(), offset, length, hosts,
+ Object fileKey = fsFileId;
+ if (fileKey == null && allowSyntheticFileIds) {
+ fileKey = new SyntheticFileId(file);
+ }
+ return new OrcSplit(file.getPath(), fileKey, offset, length, hosts,
fileMetaInfo, isOriginal, hasBase, deltas, scaledProjSize);
}
@@ -1274,7 +1291,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
&& fileInfo.writerVersion != null;
// We assume that if we needed to create a reader, we need to cache it to meta cache.
// TODO: This will also needlessly overwrite it in local cache for now.
- context.footerCache.put(fileWithId.getFileId(), file, fileInfo.fileMetaInfo, orcReader);
+ context.footerCache.put(fsFileId, file, fileInfo.fileMetaInfo, orcReader);
}
} else {
Reader orcReader = createOrcReader();
@@ -1286,8 +1303,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
fileMetaInfo = context.footerInSplits ?
((ReaderImpl) orcReader).getFileMetaInfo() : null;
if (context.cacheStripeDetails) {
- Long fileId = fileWithId.getFileId();
- context.footerCache.put(fileId, file, fileMetaInfo, orcReader);
+ context.footerCache.put(fsFileId, file, fileMetaInfo, orcReader);
}
}
includedCols = genIncludedColumns(types, context.conf, isOriginal);
@@ -1338,6 +1354,8 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
LOG.info("ORC pushdown predicate: " + context.sarg);
}
boolean useFileIds = HiveConf.getBoolVar(conf, ConfVars.HIVE_ORC_INCLUDE_FILE_ID_IN_SPLITS);
+ boolean allowSyntheticFileIds = useFileIds && HiveConf.getBoolVar(
+ conf, ConfVars.HIVE_ORC_ALLOW_SYNTHETIC_FILE_ID_IN_SPLITS);
List<OrcSplit> splits = Lists.newArrayList();
List<Future<AcidDirInfo>> pathFutures = Lists.newArrayList();
List<Future<Void>> strategyFutures = Lists.newArrayList();
@@ -1380,8 +1398,8 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
// We have received a new directory information, make a split strategy.
--resultsLeft;
- SplitStrategy<?> splitStrategy = determineSplitStrategy(combinedCtx, context,
- adi.fs, adi.splitPath, adi.acidInfo, adi.baseOrOriginalFiles, ugi);
+ SplitStrategy<?> splitStrategy = determineSplitStrategy(combinedCtx, context, adi.fs,
+ adi.splitPath, adi.acidInfo, adi.baseOrOriginalFiles, ugi, allowSyntheticFileIds);
if (splitStrategy == null) continue; // Combined.
if (isDebugEnabled) {
@@ -1451,12 +1469,13 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
private static SplitStrategy<?> combineOrCreateETLStrategy(CombinedCtx combinedCtx,
Context context, FileSystem fs, Path dir, List<HdfsFileStatusWithId> files,
List<DeltaMetaData> deltas, boolean[] covered, boolean isOriginal,
- UserGroupInformation ugi) {
+ UserGroupInformation ugi, boolean allowSyntheticFileIds) {
if (!deltas.isEmpty() || combinedCtx == null) {
- return new ETLSplitStrategy(context, fs, dir, files, isOriginal, deltas, covered, ugi);
+ return new ETLSplitStrategy(
+ context, fs, dir, files, isOriginal, deltas, covered, ugi, allowSyntheticFileIds);
} else if (combinedCtx.combined == null) {
combinedCtx.combined = new ETLSplitStrategy(
- context, fs, dir, files, isOriginal, deltas, covered, ugi);
+ context, fs, dir, files, isOriginal, deltas, covered, ugi, allowSyntheticFileIds);
combinedCtx.combineStartUs = System.nanoTime();
return null;
} else {
@@ -1465,11 +1484,12 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
switch (r) {
case YES: return null;
case NO_AND_CONTINUE:
- return new ETLSplitStrategy(context, fs, dir, files, isOriginal, deltas, covered, ugi);
+ return new ETLSplitStrategy(
+ context, fs, dir, files, isOriginal, deltas, covered, ugi, allowSyntheticFileIds);
case NO_AND_SWAP: {
ETLSplitStrategy oldBase = combinedCtx.combined;
combinedCtx.combined = new ETLSplitStrategy(
- context, fs, dir, files, isOriginal, deltas, covered, ugi);
+ context, fs, dir, files, isOriginal, deltas, covered, ugi, allowSyntheticFileIds);
combinedCtx.combineStartUs = System.nanoTime();
return oldBase;
}
@@ -1798,7 +1818,8 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
@VisibleForTesting
static SplitStrategy<?> determineSplitStrategy(CombinedCtx combinedCtx, Context context,
FileSystem fs, Path dir, AcidUtils.Directory dirInfo,
- List<HdfsFileStatusWithId> baseOrOriginalFiles, UserGroupInformation ugi) {
+ List<HdfsFileStatusWithId> baseOrOriginalFiles, UserGroupInformation ugi,
+ boolean allowSyntheticFileIds) {
Path base = dirInfo.getBaseDirectory();
List<HdfsFileStatusWithId> original = dirInfo.getOriginalFiles();
List<DeltaMetaData> deltas = AcidUtils.serializeDeltas(dirInfo.getCurrentDirectories());
@@ -1826,20 +1847,20 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
switch(context.splitStrategyKind) {
case BI:
// BI strategy requested through config
- return new BISplitStrategy(
- context, fs, dir, baseOrOriginalFiles, isOriginal, deltas, covered);
+ return new BISplitStrategy(context, fs, dir, baseOrOriginalFiles,
+ isOriginal, deltas, covered, allowSyntheticFileIds);
case ETL:
// ETL strategy requested through config
- return combineOrCreateETLStrategy(combinedCtx, context, fs,
- dir, baseOrOriginalFiles, deltas, covered, isOriginal, ugi);
+ return combineOrCreateETLStrategy(combinedCtx, context, fs, dir, baseOrOriginalFiles,
+ deltas, covered, isOriginal, ugi, allowSyntheticFileIds);
default:
// HYBRID strategy
if (avgFileSize > context.maxSize || totalFiles <= context.minSplits) {
- return combineOrCreateETLStrategy(combinedCtx, context, fs,
- dir, baseOrOriginalFiles, deltas, covered, isOriginal, ugi);
+ return combineOrCreateETLStrategy(combinedCtx, context, fs, dir, baseOrOriginalFiles,
+ deltas, covered, isOriginal, ugi, allowSyntheticFileIds);
} else {
- return new BISplitStrategy(
- context, fs, dir, baseOrOriginalFiles, isOriginal, deltas, covered);
+ return new BISplitStrategy(context, fs, dir, baseOrOriginalFiles,
+ isOriginal, deltas, covered, allowSyntheticFileIds);
}
}
} else {
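The allowSyntheticFileIds flag threaded through both strategies above is derived in getSplits from ConfVars.HIVE_ORC_INCLUDE_FILE_ID_IN_SPLITS and the new ConfVars.HIVE_ORC_ALLOW_SYNTHETIC_FILE_ID_IN_SPLITS. A small configuration sketch (the helper class is hypothetical) for keeping native file IDs while turning the synthetic fallback off, so splits on non-HDFS filesystems carry no file key:

    import org.apache.hadoop.hive.conf.HiveConf;

    public class SplitFileIdConfig {
      /** Keeps native HDFS file IDs in splits but disables the synthetic fallback. */
      public static HiveConf withoutSyntheticIds(HiveConf base) {
        HiveConf conf = new HiveConf(base);
        conf.setBoolVar(HiveConf.ConfVars.HIVE_ORC_INCLUDE_FILE_ID_IN_SPLITS, true);
        conf.setBoolVar(HiveConf.ConfVars.HIVE_ORC_ALLOW_SYNTHETIC_FILE_ID_IN_SPLITS, false);
        return conf;
      }
    }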
http://git-wip-us.apache.org/repos/asf/hive/blob/26b5c7b5/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
index 4a27ee7..407fd62 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
@@ -26,13 +26,13 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.orc.FileMetaInfo;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.ColumnarSplit;
import org.apache.hadoop.hive.ql.io.AcidInputFormat;
import org.apache.hadoop.hive.ql.io.LlapAwareSplit;
+import org.apache.hadoop.hive.ql.io.SyntheticFileId;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapred.FileSplit;
@@ -48,11 +48,11 @@ public class OrcSplit extends FileSplit implements ColumnarSplit, LlapAwareSplit
private boolean isOriginal;
private boolean hasBase;
private final List<AcidInputFormat.DeltaMetaData> deltas = new ArrayList<>();
- private OrcFile.WriterVersion writerVersion;
private long projColsUncompressedSize;
- private transient Long fileId;
+ private transient Object fileKey;
- static final int HAS_FILEID_FLAG = 8;
+ static final int HAS_SYNTHETIC_FILEID_FLAG = 16;
+ static final int HAS_LONG_FILEID_FLAG = 8;
static final int BASE_FLAG = 4;
static final int ORIGINAL_FLAG = 2;
static final int FOOTER_FLAG = 1;
@@ -64,13 +64,13 @@ public class OrcSplit extends FileSplit implements ColumnarSplit, LlapAwareSplit
super(null, 0, 0, (String[]) null);
}
- public OrcSplit(Path path, Long fileId, long offset, long length, String[] hosts,
+ public OrcSplit(Path path, Object fileId, long offset, long length, String[] hosts,
FileMetaInfo fileMetaInfo, boolean isOriginal, boolean hasBase,
List<AcidInputFormat.DeltaMetaData> deltas, long projectedDataSize) {
super(path, offset, length, hosts);
- // We could avoid serializing file ID and just replace the path with inode-based path.
- // However, that breaks bunch of stuff because Hive later looks up things by split path.
- this.fileId = fileId;
+ // For HDFS, we could avoid serializing file ID and just replace the path with inode-based
+ // path. However, that breaks a bunch of stuff because Hive later looks up things by split path.
+ this.fileKey = fileId;
this.fileMetaInfo = fileMetaInfo;
hasFooter = this.fileMetaInfo != null;
this.isOriginal = isOriginal;
@@ -84,10 +84,12 @@ public class OrcSplit extends FileSplit implements ColumnarSplit, LlapAwareSplit
//serialize path, offset, length using FileSplit
super.write(out);
+ boolean isFileIdLong = fileKey instanceof Long, isFileIdWritable = fileKey instanceof Writable;
int flags = (hasBase ? BASE_FLAG : 0) |
(isOriginal ? ORIGINAL_FLAG : 0) |
(hasFooter ? FOOTER_FLAG : 0) |
- (fileId != null ? HAS_FILEID_FLAG : 0);
+ (isFileIdLong ? HAS_LONG_FILEID_FLAG : 0) |
+ (isFileIdWritable ? HAS_SYNTHETIC_FILEID_FLAG : 0);
out.writeByte(flags);
out.writeInt(deltas.size());
for(AcidInputFormat.DeltaMetaData delta: deltas) {
@@ -109,8 +111,10 @@ public class OrcSplit extends FileSplit implements ColumnarSplit, LlapAwareSplit
footerBuff.limit() - footerBuff.position());
WritableUtils.writeVInt(out, fileMetaInfo.writerVersion.getId());
}
- if (fileId != null) {
- out.writeLong(fileId.longValue());
+ if (isFileIdLong) {
+ out.writeLong(((Long)fileKey).longValue());
+ } else if (isFileIdWritable) {
+ ((Writable)fileKey).write(out);
}
}
@@ -123,7 +127,11 @@ public class OrcSplit extends FileSplit implements ColumnarSplit, LlapAwareSplit
hasFooter = (FOOTER_FLAG & flags) != 0;
isOriginal = (ORIGINAL_FLAG & flags) != 0;
hasBase = (BASE_FLAG & flags) != 0;
- boolean hasFileId = (HAS_FILEID_FLAG & flags) != 0;
+ boolean hasLongFileId = (HAS_LONG_FILEID_FLAG & flags) != 0,
+ hasWritableFileId = (HAS_SYNTHETIC_FILEID_FLAG & flags) != 0;
+ if (hasLongFileId && hasWritableFileId) {
+ throw new IOException("Invalid split - both file ID types present");
+ }
deltas.clear();
int numDeltas = in.readInt();
@@ -148,8 +156,12 @@ public class OrcSplit extends FileSplit implements ColumnarSplit, LlapAwareSplit
fileMetaInfo = new FileMetaInfo(compressionType, bufferSize,
metadataSize, footerBuff, writerVersion);
}
- if (hasFileId) {
- fileId = in.readLong();
+ if (hasLongFileId) {
+ fileKey = in.readLong();
+ } else if (hasWritableFileId) {
+ SyntheticFileId fileId = new SyntheticFileId();
+ fileId.readFields(in);
+ this.fileKey = fileId;
}
}
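The split serialization above distinguishes a plain long ID from a Writable synthetic ID via two mutually exclusive flags, and readFields rejects a split that claims both. A round-trip sketch for the Writable path of the key itself, using Hadoop's in-memory buffers (not code from this commit):

    import java.io.IOException;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.io.SyntheticFileId;
    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.DataOutputBuffer;

    public class SyntheticFileIdRoundTrip {
      public static void main(String[] args) throws IOException {
        SyntheticFileId original =
            new SyntheticFileId(new Path("/warehouse/t1/part-00000.orc"), 4096L, 1458000000000L);

        DataOutputBuffer out = new DataOutputBuffer();
        original.write(out); // short length tag (24) followed by three longs

        DataInputBuffer in = new DataInputBuffer();
        in.reset(out.getData(), out.getLength());
        SyntheticFileId copy = new SyntheticFileId();
        copy.readFields(in); // skips any trailing fields announced by the length tag

        System.out.println(original.equals(copy)); // true
      }
    }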
@@ -186,8 +198,8 @@ public class OrcSplit extends FileSplit implements ColumnarSplit, LlapAwareSplit
return projColsUncompressedSize;
}
- public Long getFileId() {
- return fileId;
+ public Object getFileKey() {
+ return fileKey;
}
@Override