You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2017/04/12 21:49:30 UTC
[1/3] hive git commit: HIVE-16390 : LLAP IO should take job config into account; also LLAP config should load defaults (Sergey Shelukhin, reviewed by Siddharth Seth)
Repository: hive
Updated Branches:
refs/heads/branch-2 04e5a4ee2 -> 156a25b6a
refs/heads/branch-2.3 96aaa337e -> a62911a61
refs/heads/master ce037d146 -> 3f681c227
HIVE-16390 : LLAP IO should take job config into account; also LLAP config should load defaults (Sergey Shelukhin, reviewed by Siddharth Seth)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/156a25b6
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/156a25b6
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/156a25b6
Branch: refs/heads/branch-2
Commit: 156a25b6aacf587f3418599c7d5e84a111ab7ca3
Parents: 04e5a4e
Author: sergey <se...@apache.org>
Authored: Wed Apr 12 14:14:57 2017 -0700
Committer: sergey <se...@apache.org>
Committed: Wed Apr 12 14:49:10 2017 -0700
----------------------------------------------------------------------
.../configuration/LlapDaemonConfiguration.java | 2 +-
.../llap/io/decode/OrcColumnVectorProducer.java | 6 ++--
.../llap/io/encoded/OrcEncodedDataReader.java | 33 +++++++++++++-------
3 files changed, 25 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/156a25b6/llap-server/src/java/org/apache/hadoop/hive/llap/configuration/LlapDaemonConfiguration.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/configuration/LlapDaemonConfiguration.java b/llap-server/src/java/org/apache/hadoop/hive/llap/configuration/LlapDaemonConfiguration.java
index 88f3b19..7219d36 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/configuration/LlapDaemonConfiguration.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/configuration/LlapDaemonConfiguration.java
@@ -36,7 +36,7 @@ public class LlapDaemonConfiguration extends Configuration {
public static final String[] SSL_DAEMON_CONFIGS = { "ssl-server.xml" };
public LlapDaemonConfiguration() {
- super(false);
+ super(true); // Load the defaults.
for (String conf : DAEMON_CONFIGS) {
addResource(conf);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/156a25b6/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
index ac031aa..121e169 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
@@ -79,9 +79,9 @@ public class OrcColumnVectorProducer implements ColumnVectorProducer {
cacheMetrics.incrCacheReadRequests();
OrcEncodedDataConsumer edc = new OrcEncodedDataConsumer(consumer, columnIds.size(),
_skipCorrupt, counters, ioMetrics);
- // Note: we use global conf here and ignore JobConf.
- OrcEncodedDataReader reader = new OrcEncodedDataReader(lowLevelCache, bufferManager,
- metadataCache, conf, split, columnIds, sarg, columnNames, edc, counters, readerSchema);
+ OrcEncodedDataReader reader = new OrcEncodedDataReader(
+ lowLevelCache, bufferManager, metadataCache, conf, job, split, columnIds, sarg,
+ columnNames, edc, counters, readerSchema);
edc.init(reader, reader);
return edc;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/156a25b6/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
index 076b0e1..03bc3ce 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
@@ -135,7 +135,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
private final OrcMetadataCache metadataCache;
private final LowLevelCache lowLevelCache;
private final BufferUsageManager bufferManager;
- private final Configuration conf;
+ private final Configuration daemonConf, jobConf;
private final FileSplit split;
private List<Integer> includedColumnIds;
private final SearchArgument sarg;
@@ -166,13 +166,14 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
boolean[] globalIncludes = null;
public OrcEncodedDataReader(LowLevelCache lowLevelCache, BufferUsageManager bufferManager,
- OrcMetadataCache metadataCache, Configuration conf, FileSplit split, List<Integer> columnIds,
- SearchArgument sarg, String[] columnNames, OrcEncodedDataConsumer consumer,
- QueryFragmentCounters counters, TypeDescription readerSchema) throws IOException {
+ OrcMetadataCache metadataCache, Configuration daemonConf, Configuration jobConf,
+ FileSplit split, List<Integer> columnIds, SearchArgument sarg, String[] columnNames,
+ OrcEncodedDataConsumer consumer, QueryFragmentCounters counters,
+ TypeDescription readerSchema) throws IOException {
this.lowLevelCache = lowLevelCache;
this.metadataCache = metadataCache;
this.bufferManager = bufferManager;
- this.conf = conf;
+ this.daemonConf = daemonConf;
this.split = split;
this.includedColumnIds = columnIds;
if (this.includedColumnIds != null) {
@@ -193,15 +194,22 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
orcReader = null;
// 1. Get file metadata from cache, or create the reader and read it.
// Don't cache the filesystem object for now; Tez closes it and FS cache will fix all that
- fs = split.getPath().getFileSystem(conf);
+ fs = split.getPath().getFileSystem(jobConf);
fileKey = determineFileId(fs, split,
- HiveConf.getBoolVar(conf, ConfVars.LLAP_CACHE_ALLOW_SYNTHETIC_FILEID));
+ HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_CACHE_ALLOW_SYNTHETIC_FILEID));
fileMetadata = getOrReadFileMetadata();
if (readerSchema == null) {
readerSchema = fileMetadata.getSchema();
}
globalIncludes = OrcInputFormat.genIncludedColumns(readerSchema, includedColumnIds);
- Reader.Options options = new Reader.Options(conf).include(globalIncludes);
+ // Do not allow users to override zero-copy setting. The rest can be taken from user config.
+ boolean useZeroCopy = OrcConf.USE_ZEROCOPY.getBoolean(daemonConf);
+ if (useZeroCopy != OrcConf.USE_ZEROCOPY.getBoolean(jobConf)) {
+ jobConf = new Configuration(jobConf);
+ jobConf.setBoolean(OrcConf.USE_ZEROCOPY.getAttribute(), useZeroCopy);
+ }
+ this.jobConf = jobConf;
+ Reader.Options options = new Reader.Options(jobConf).include(globalIncludes);
evolution = new SchemaEvolution(fileMetadata.getSchema(), readerSchema, options);
consumer.setFileMetadata(fileMetadata);
consumer.setIncludedColumns(globalIncludes);
@@ -481,7 +489,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
private void validateFileMetadata() throws IOException {
if (fileMetadata.getCompressionKind() == CompressionKind.NONE) return;
int bufferSize = fileMetadata.getCompressionBufferSize();
- long minAllocSize = HiveConf.getSizeVar(conf, ConfVars.LLAP_ALLOCATOR_MIN_ALLOC);
+ long minAllocSize = HiveConf.getSizeVar(daemonConf, ConfVars.LLAP_ALLOCATOR_MIN_ALLOC);
if (bufferSize < minAllocSize) {
LOG.warn("ORC compression buffer size (" + bufferSize + ") is smaller than LLAP low-level "
+ "cache minimum allocation size (" + minAllocSize + "). Decrease the value for "
@@ -563,12 +571,13 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
private void ensureOrcReader() throws IOException {
if (orcReader != null) return;
path = split.getPath();
- if (fileKey instanceof Long && HiveConf.getBoolVar(conf, ConfVars.LLAP_IO_USE_FILEID_PATH)) {
+ if (fileKey instanceof Long && HiveConf.getBoolVar(
+ daemonConf, ConfVars.LLAP_IO_USE_FILEID_PATH)) {
path = HdfsUtils.getFileIdPath(fs, path, (long)fileKey);
}
LlapIoImpl.ORC_LOGGER.trace("Creating reader for {} ({})", path, split.getPath());
long startTime = counters.startTimeCounter();
- ReaderOptions opts = OrcFile.readerOptions(conf).filesystem(fs).fileMetadata(fileMetadata);
+ ReaderOptions opts = OrcFile.readerOptions(jobConf).filesystem(fs).fileMetadata(fileMetadata);
if (split instanceof OrcSplit) {
OrcTail orcTail = ((OrcSplit) split).getOrcTail();
if (orcTail != null) {
@@ -655,7 +664,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
ensureOrcReader();
if (metadataReader != null) return;
long startTime = counters.startTimeCounter();
- boolean useZeroCopy = (conf != null) && OrcConf.USE_ZEROCOPY.getBoolean(conf);
+ boolean useZeroCopy = (daemonConf != null) && OrcConf.USE_ZEROCOPY.getBoolean(daemonConf);
metadataReader = RecordReaderUtils.createDefaultDataReader(
DataReaderProperties.builder()
.withBufferSize(orcReader.getCompressionSize())
[3/3] hive git commit: HIVE-16390 : LLAP IO should take job config into account; also LLAP config should load defaults (Sergey Shelukhin, reviewed by Siddharth Seth)
Posted by se...@apache.org.
HIVE-16390 : LLAP IO should take job config into account; also LLAP config should load defaults (Sergey Shelukhin, reviewed by Siddharth Seth)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/3f681c22
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/3f681c22
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/3f681c22
Branch: refs/heads/master
Commit: 3f681c22787424244490184d01b9a0b9c5a4dae5
Parents: ce037d1
Author: sergey <se...@apache.org>
Authored: Wed Apr 12 14:14:57 2017 -0700
Committer: sergey <se...@apache.org>
Committed: Wed Apr 12 14:49:23 2017 -0700
----------------------------------------------------------------------
.../configuration/LlapDaemonConfiguration.java | 2 +-
.../llap/io/decode/OrcColumnVectorProducer.java | 6 ++--
.../llap/io/encoded/OrcEncodedDataReader.java | 33 +++++++++++++-------
3 files changed, 25 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/3f681c22/llap-server/src/java/org/apache/hadoop/hive/llap/configuration/LlapDaemonConfiguration.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/configuration/LlapDaemonConfiguration.java b/llap-server/src/java/org/apache/hadoop/hive/llap/configuration/LlapDaemonConfiguration.java
index 88f3b19..7219d36 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/configuration/LlapDaemonConfiguration.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/configuration/LlapDaemonConfiguration.java
@@ -36,7 +36,7 @@ public class LlapDaemonConfiguration extends Configuration {
public static final String[] SSL_DAEMON_CONFIGS = { "ssl-server.xml" };
public LlapDaemonConfiguration() {
- super(false);
+ super(true); // Load the defaults.
for (String conf : DAEMON_CONFIGS) {
addResource(conf);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/3f681c22/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
index ac031aa..121e169 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
@@ -79,9 +79,9 @@ public class OrcColumnVectorProducer implements ColumnVectorProducer {
cacheMetrics.incrCacheReadRequests();
OrcEncodedDataConsumer edc = new OrcEncodedDataConsumer(consumer, columnIds.size(),
_skipCorrupt, counters, ioMetrics);
- // Note: we use global conf here and ignore JobConf.
- OrcEncodedDataReader reader = new OrcEncodedDataReader(lowLevelCache, bufferManager,
- metadataCache, conf, split, columnIds, sarg, columnNames, edc, counters, readerSchema);
+ OrcEncodedDataReader reader = new OrcEncodedDataReader(
+ lowLevelCache, bufferManager, metadataCache, conf, job, split, columnIds, sarg,
+ columnNames, edc, counters, readerSchema);
edc.init(reader, reader);
return edc;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/3f681c22/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
index 076b0e1..03bc3ce 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
@@ -135,7 +135,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
private final OrcMetadataCache metadataCache;
private final LowLevelCache lowLevelCache;
private final BufferUsageManager bufferManager;
- private final Configuration conf;
+ private final Configuration daemonConf, jobConf;
private final FileSplit split;
private List<Integer> includedColumnIds;
private final SearchArgument sarg;
@@ -166,13 +166,14 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
boolean[] globalIncludes = null;
public OrcEncodedDataReader(LowLevelCache lowLevelCache, BufferUsageManager bufferManager,
- OrcMetadataCache metadataCache, Configuration conf, FileSplit split, List<Integer> columnIds,
- SearchArgument sarg, String[] columnNames, OrcEncodedDataConsumer consumer,
- QueryFragmentCounters counters, TypeDescription readerSchema) throws IOException {
+ OrcMetadataCache metadataCache, Configuration daemonConf, Configuration jobConf,
+ FileSplit split, List<Integer> columnIds, SearchArgument sarg, String[] columnNames,
+ OrcEncodedDataConsumer consumer, QueryFragmentCounters counters,
+ TypeDescription readerSchema) throws IOException {
this.lowLevelCache = lowLevelCache;
this.metadataCache = metadataCache;
this.bufferManager = bufferManager;
- this.conf = conf;
+ this.daemonConf = daemonConf;
this.split = split;
this.includedColumnIds = columnIds;
if (this.includedColumnIds != null) {
@@ -193,15 +194,22 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
orcReader = null;
// 1. Get file metadata from cache, or create the reader and read it.
// Don't cache the filesystem object for now; Tez closes it and FS cache will fix all that
- fs = split.getPath().getFileSystem(conf);
+ fs = split.getPath().getFileSystem(jobConf);
fileKey = determineFileId(fs, split,
- HiveConf.getBoolVar(conf, ConfVars.LLAP_CACHE_ALLOW_SYNTHETIC_FILEID));
+ HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_CACHE_ALLOW_SYNTHETIC_FILEID));
fileMetadata = getOrReadFileMetadata();
if (readerSchema == null) {
readerSchema = fileMetadata.getSchema();
}
globalIncludes = OrcInputFormat.genIncludedColumns(readerSchema, includedColumnIds);
- Reader.Options options = new Reader.Options(conf).include(globalIncludes);
+ // Do not allow users to override zero-copy setting. The rest can be taken from user config.
+ boolean useZeroCopy = OrcConf.USE_ZEROCOPY.getBoolean(daemonConf);
+ if (useZeroCopy != OrcConf.USE_ZEROCOPY.getBoolean(jobConf)) {
+ jobConf = new Configuration(jobConf);
+ jobConf.setBoolean(OrcConf.USE_ZEROCOPY.getAttribute(), useZeroCopy);
+ }
+ this.jobConf = jobConf;
+ Reader.Options options = new Reader.Options(jobConf).include(globalIncludes);
evolution = new SchemaEvolution(fileMetadata.getSchema(), readerSchema, options);
consumer.setFileMetadata(fileMetadata);
consumer.setIncludedColumns(globalIncludes);
@@ -481,7 +489,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
private void validateFileMetadata() throws IOException {
if (fileMetadata.getCompressionKind() == CompressionKind.NONE) return;
int bufferSize = fileMetadata.getCompressionBufferSize();
- long minAllocSize = HiveConf.getSizeVar(conf, ConfVars.LLAP_ALLOCATOR_MIN_ALLOC);
+ long minAllocSize = HiveConf.getSizeVar(daemonConf, ConfVars.LLAP_ALLOCATOR_MIN_ALLOC);
if (bufferSize < minAllocSize) {
LOG.warn("ORC compression buffer size (" + bufferSize + ") is smaller than LLAP low-level "
+ "cache minimum allocation size (" + minAllocSize + "). Decrease the value for "
@@ -563,12 +571,13 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
private void ensureOrcReader() throws IOException {
if (orcReader != null) return;
path = split.getPath();
- if (fileKey instanceof Long && HiveConf.getBoolVar(conf, ConfVars.LLAP_IO_USE_FILEID_PATH)) {
+ if (fileKey instanceof Long && HiveConf.getBoolVar(
+ daemonConf, ConfVars.LLAP_IO_USE_FILEID_PATH)) {
path = HdfsUtils.getFileIdPath(fs, path, (long)fileKey);
}
LlapIoImpl.ORC_LOGGER.trace("Creating reader for {} ({})", path, split.getPath());
long startTime = counters.startTimeCounter();
- ReaderOptions opts = OrcFile.readerOptions(conf).filesystem(fs).fileMetadata(fileMetadata);
+ ReaderOptions opts = OrcFile.readerOptions(jobConf).filesystem(fs).fileMetadata(fileMetadata);
if (split instanceof OrcSplit) {
OrcTail orcTail = ((OrcSplit) split).getOrcTail();
if (orcTail != null) {
@@ -655,7 +664,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
ensureOrcReader();
if (metadataReader != null) return;
long startTime = counters.startTimeCounter();
- boolean useZeroCopy = (conf != null) && OrcConf.USE_ZEROCOPY.getBoolean(conf);
+ boolean useZeroCopy = (daemonConf != null) && OrcConf.USE_ZEROCOPY.getBoolean(daemonConf);
metadataReader = RecordReaderUtils.createDefaultDataReader(
DataReaderProperties.builder()
.withBufferSize(orcReader.getCompressionSize())
[2/3] hive git commit: HIVE-16390 : LLAP IO should take job config into account; also LLAP config should load defaults (Sergey Shelukhin, reviewed by Siddharth Seth)
Posted by se...@apache.org.
HIVE-16390 : LLAP IO should take job config into account; also LLAP config should load defaults (Sergey Shelukhin, reviewed by Siddharth Seth)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/a62911a6
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/a62911a6
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/a62911a6
Branch: refs/heads/branch-2.3
Commit: a62911a6160559ad6b201d4ee606ea1f896b95fd
Parents: 96aaa33
Author: sergey <se...@apache.org>
Authored: Wed Apr 12 14:14:57 2017 -0700
Committer: sergey <se...@apache.org>
Committed: Wed Apr 12 14:49:18 2017 -0700
----------------------------------------------------------------------
.../configuration/LlapDaemonConfiguration.java | 2 +-
.../llap/io/decode/OrcColumnVectorProducer.java | 6 ++--
.../llap/io/encoded/OrcEncodedDataReader.java | 33 +++++++++++++-------
3 files changed, 25 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/a62911a6/llap-server/src/java/org/apache/hadoop/hive/llap/configuration/LlapDaemonConfiguration.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/configuration/LlapDaemonConfiguration.java b/llap-server/src/java/org/apache/hadoop/hive/llap/configuration/LlapDaemonConfiguration.java
index 88f3b19..7219d36 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/configuration/LlapDaemonConfiguration.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/configuration/LlapDaemonConfiguration.java
@@ -36,7 +36,7 @@ public class LlapDaemonConfiguration extends Configuration {
public static final String[] SSL_DAEMON_CONFIGS = { "ssl-server.xml" };
public LlapDaemonConfiguration() {
- super(false);
+ super(true); // Load the defaults.
for (String conf : DAEMON_CONFIGS) {
addResource(conf);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/a62911a6/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
index ac031aa..121e169 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
@@ -79,9 +79,9 @@ public class OrcColumnVectorProducer implements ColumnVectorProducer {
cacheMetrics.incrCacheReadRequests();
OrcEncodedDataConsumer edc = new OrcEncodedDataConsumer(consumer, columnIds.size(),
_skipCorrupt, counters, ioMetrics);
- // Note: we use global conf here and ignore JobConf.
- OrcEncodedDataReader reader = new OrcEncodedDataReader(lowLevelCache, bufferManager,
- metadataCache, conf, split, columnIds, sarg, columnNames, edc, counters, readerSchema);
+ OrcEncodedDataReader reader = new OrcEncodedDataReader(
+ lowLevelCache, bufferManager, metadataCache, conf, job, split, columnIds, sarg,
+ columnNames, edc, counters, readerSchema);
edc.init(reader, reader);
return edc;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/a62911a6/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
index 076b0e1..03bc3ce 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
@@ -135,7 +135,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
private final OrcMetadataCache metadataCache;
private final LowLevelCache lowLevelCache;
private final BufferUsageManager bufferManager;
- private final Configuration conf;
+ private final Configuration daemonConf, jobConf;
private final FileSplit split;
private List<Integer> includedColumnIds;
private final SearchArgument sarg;
@@ -166,13 +166,14 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
boolean[] globalIncludes = null;
public OrcEncodedDataReader(LowLevelCache lowLevelCache, BufferUsageManager bufferManager,
- OrcMetadataCache metadataCache, Configuration conf, FileSplit split, List<Integer> columnIds,
- SearchArgument sarg, String[] columnNames, OrcEncodedDataConsumer consumer,
- QueryFragmentCounters counters, TypeDescription readerSchema) throws IOException {
+ OrcMetadataCache metadataCache, Configuration daemonConf, Configuration jobConf,
+ FileSplit split, List<Integer> columnIds, SearchArgument sarg, String[] columnNames,
+ OrcEncodedDataConsumer consumer, QueryFragmentCounters counters,
+ TypeDescription readerSchema) throws IOException {
this.lowLevelCache = lowLevelCache;
this.metadataCache = metadataCache;
this.bufferManager = bufferManager;
- this.conf = conf;
+ this.daemonConf = daemonConf;
this.split = split;
this.includedColumnIds = columnIds;
if (this.includedColumnIds != null) {
@@ -193,15 +194,22 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
orcReader = null;
// 1. Get file metadata from cache, or create the reader and read it.
// Don't cache the filesystem object for now; Tez closes it and FS cache will fix all that
- fs = split.getPath().getFileSystem(conf);
+ fs = split.getPath().getFileSystem(jobConf);
fileKey = determineFileId(fs, split,
- HiveConf.getBoolVar(conf, ConfVars.LLAP_CACHE_ALLOW_SYNTHETIC_FILEID));
+ HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_CACHE_ALLOW_SYNTHETIC_FILEID));
fileMetadata = getOrReadFileMetadata();
if (readerSchema == null) {
readerSchema = fileMetadata.getSchema();
}
globalIncludes = OrcInputFormat.genIncludedColumns(readerSchema, includedColumnIds);
- Reader.Options options = new Reader.Options(conf).include(globalIncludes);
+ // Do not allow users to override zero-copy setting. The rest can be taken from user config.
+ boolean useZeroCopy = OrcConf.USE_ZEROCOPY.getBoolean(daemonConf);
+ if (useZeroCopy != OrcConf.USE_ZEROCOPY.getBoolean(jobConf)) {
+ jobConf = new Configuration(jobConf);
+ jobConf.setBoolean(OrcConf.USE_ZEROCOPY.getAttribute(), useZeroCopy);
+ }
+ this.jobConf = jobConf;
+ Reader.Options options = new Reader.Options(jobConf).include(globalIncludes);
evolution = new SchemaEvolution(fileMetadata.getSchema(), readerSchema, options);
consumer.setFileMetadata(fileMetadata);
consumer.setIncludedColumns(globalIncludes);
@@ -481,7 +489,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
private void validateFileMetadata() throws IOException {
if (fileMetadata.getCompressionKind() == CompressionKind.NONE) return;
int bufferSize = fileMetadata.getCompressionBufferSize();
- long minAllocSize = HiveConf.getSizeVar(conf, ConfVars.LLAP_ALLOCATOR_MIN_ALLOC);
+ long minAllocSize = HiveConf.getSizeVar(daemonConf, ConfVars.LLAP_ALLOCATOR_MIN_ALLOC);
if (bufferSize < minAllocSize) {
LOG.warn("ORC compression buffer size (" + bufferSize + ") is smaller than LLAP low-level "
+ "cache minimum allocation size (" + minAllocSize + "). Decrease the value for "
@@ -563,12 +571,13 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
private void ensureOrcReader() throws IOException {
if (orcReader != null) return;
path = split.getPath();
- if (fileKey instanceof Long && HiveConf.getBoolVar(conf, ConfVars.LLAP_IO_USE_FILEID_PATH)) {
+ if (fileKey instanceof Long && HiveConf.getBoolVar(
+ daemonConf, ConfVars.LLAP_IO_USE_FILEID_PATH)) {
path = HdfsUtils.getFileIdPath(fs, path, (long)fileKey);
}
LlapIoImpl.ORC_LOGGER.trace("Creating reader for {} ({})", path, split.getPath());
long startTime = counters.startTimeCounter();
- ReaderOptions opts = OrcFile.readerOptions(conf).filesystem(fs).fileMetadata(fileMetadata);
+ ReaderOptions opts = OrcFile.readerOptions(jobConf).filesystem(fs).fileMetadata(fileMetadata);
if (split instanceof OrcSplit) {
OrcTail orcTail = ((OrcSplit) split).getOrcTail();
if (orcTail != null) {
@@ -655,7 +664,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
ensureOrcReader();
if (metadataReader != null) return;
long startTime = counters.startTimeCounter();
- boolean useZeroCopy = (conf != null) && OrcConf.USE_ZEROCOPY.getBoolean(conf);
+ boolean useZeroCopy = (daemonConf != null) && OrcConf.USE_ZEROCOPY.getBoolean(daemonConf);
metadataReader = RecordReaderUtils.createDefaultDataReader(
DataReaderProperties.builder()
.withBufferSize(orcReader.getCompressionSize())