You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2017/04/12 21:49:32 UTC

[3/3] hive git commit: HIVE-16390 : LLAP IO should take job config into account; also LLAP config should load defaults (Sergey Shelukhin, reviewed by Siddharth Seth)

HIVE-16390 : LLAP IO should take job config into account; also LLAP config should load defaults (Sergey Shelukhin, reviewed by Siddharth Seth)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/3f681c22
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/3f681c22
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/3f681c22

Branch: refs/heads/master
Commit: 3f681c22787424244490184d01b9a0b9c5a4dae5
Parents: ce037d1
Author: sergey <se...@apache.org>
Authored: Wed Apr 12 14:14:57 2017 -0700
Committer: sergey <se...@apache.org>
Committed: Wed Apr 12 14:49:23 2017 -0700

----------------------------------------------------------------------
 .../configuration/LlapDaemonConfiguration.java  |  2 +-
 .../llap/io/decode/OrcColumnVectorProducer.java |  6 ++--
 .../llap/io/encoded/OrcEncodedDataReader.java   | 33 +++++++++++++-------
 3 files changed, 25 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/3f681c22/llap-server/src/java/org/apache/hadoop/hive/llap/configuration/LlapDaemonConfiguration.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/configuration/LlapDaemonConfiguration.java b/llap-server/src/java/org/apache/hadoop/hive/llap/configuration/LlapDaemonConfiguration.java
index 88f3b19..7219d36 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/configuration/LlapDaemonConfiguration.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/configuration/LlapDaemonConfiguration.java
@@ -36,7 +36,7 @@ public class LlapDaemonConfiguration extends Configuration {
   public static final String[] SSL_DAEMON_CONFIGS = { "ssl-server.xml" };
 
   public LlapDaemonConfiguration() {
-    super(false);
+    super(true); // Load the defaults.
     for (String conf : DAEMON_CONFIGS) {
       addResource(conf);
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/3f681c22/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
index ac031aa..121e169 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
@@ -79,9 +79,9 @@ public class OrcColumnVectorProducer implements ColumnVectorProducer {
     cacheMetrics.incrCacheReadRequests();
     OrcEncodedDataConsumer edc = new OrcEncodedDataConsumer(consumer, columnIds.size(),
         _skipCorrupt, counters, ioMetrics);
-    // Note: we use global conf here and ignore JobConf.
-    OrcEncodedDataReader reader = new OrcEncodedDataReader(lowLevelCache, bufferManager,
-        metadataCache, conf, split, columnIds, sarg, columnNames, edc, counters, readerSchema);
+    OrcEncodedDataReader reader = new OrcEncodedDataReader(
+        lowLevelCache, bufferManager, metadataCache, conf, job, split, columnIds, sarg,
+        columnNames, edc, counters, readerSchema);
     edc.init(reader, reader);
     return edc;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/3f681c22/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
index 076b0e1..03bc3ce 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
@@ -135,7 +135,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
   private final OrcMetadataCache metadataCache;
   private final LowLevelCache lowLevelCache;
   private final BufferUsageManager bufferManager;
-  private final Configuration conf;
+  private final Configuration daemonConf, jobConf;
   private final FileSplit split;
   private List<Integer> includedColumnIds;
   private final SearchArgument sarg;
@@ -166,13 +166,14 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
   boolean[] globalIncludes = null;
 
   public OrcEncodedDataReader(LowLevelCache lowLevelCache, BufferUsageManager bufferManager,
-      OrcMetadataCache metadataCache, Configuration conf, FileSplit split, List<Integer> columnIds,
-      SearchArgument sarg, String[] columnNames, OrcEncodedDataConsumer consumer,
-      QueryFragmentCounters counters, TypeDescription readerSchema) throws IOException {
+      OrcMetadataCache metadataCache, Configuration daemonConf, Configuration jobConf,
+      FileSplit split, List<Integer> columnIds, SearchArgument sarg, String[] columnNames,
+      OrcEncodedDataConsumer consumer, QueryFragmentCounters counters,
+      TypeDescription readerSchema) throws IOException {
     this.lowLevelCache = lowLevelCache;
     this.metadataCache = metadataCache;
     this.bufferManager = bufferManager;
-    this.conf = conf;
+    this.daemonConf = daemonConf;
     this.split = split;
     this.includedColumnIds = columnIds;
     if (this.includedColumnIds != null) {
@@ -193,15 +194,22 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
     orcReader = null;
     // 1. Get file metadata from cache, or create the reader and read it.
     // Don't cache the filesystem object for now; Tez closes it and FS cache will fix all that
-    fs = split.getPath().getFileSystem(conf);
+    fs = split.getPath().getFileSystem(jobConf);
     fileKey = determineFileId(fs, split,
-        HiveConf.getBoolVar(conf, ConfVars.LLAP_CACHE_ALLOW_SYNTHETIC_FILEID));
+        HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_CACHE_ALLOW_SYNTHETIC_FILEID));
     fileMetadata = getOrReadFileMetadata();
     if (readerSchema == null) {
       readerSchema = fileMetadata.getSchema();
     }
     globalIncludes = OrcInputFormat.genIncludedColumns(readerSchema, includedColumnIds);
-    Reader.Options options = new Reader.Options(conf).include(globalIncludes);
+    // Do not allow users to override zero-copy setting. The rest can be taken from user config.
+    boolean useZeroCopy = OrcConf.USE_ZEROCOPY.getBoolean(daemonConf);
+    if (useZeroCopy != OrcConf.USE_ZEROCOPY.getBoolean(jobConf)) {
+      jobConf = new Configuration(jobConf);
+      jobConf.setBoolean(OrcConf.USE_ZEROCOPY.getAttribute(), useZeroCopy);
+    }
+    this.jobConf = jobConf;
+    Reader.Options options = new Reader.Options(jobConf).include(globalIncludes);
     evolution = new SchemaEvolution(fileMetadata.getSchema(), readerSchema, options);
     consumer.setFileMetadata(fileMetadata);
     consumer.setIncludedColumns(globalIncludes);
@@ -481,7 +489,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
   private void validateFileMetadata() throws IOException {
     if (fileMetadata.getCompressionKind() == CompressionKind.NONE) return;
     int bufferSize = fileMetadata.getCompressionBufferSize();
-    long minAllocSize = HiveConf.getSizeVar(conf, ConfVars.LLAP_ALLOCATOR_MIN_ALLOC);
+    long minAllocSize = HiveConf.getSizeVar(daemonConf, ConfVars.LLAP_ALLOCATOR_MIN_ALLOC);
     if (bufferSize < minAllocSize) {
       LOG.warn("ORC compression buffer size (" + bufferSize + ") is smaller than LLAP low-level "
             + "cache minimum allocation size (" + minAllocSize + "). Decrease the value for "
@@ -563,12 +571,13 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
   private void ensureOrcReader() throws IOException {
     if (orcReader != null) return;
     path = split.getPath();
-    if (fileKey instanceof Long && HiveConf.getBoolVar(conf, ConfVars.LLAP_IO_USE_FILEID_PATH)) {
+    if (fileKey instanceof Long && HiveConf.getBoolVar(
+        daemonConf, ConfVars.LLAP_IO_USE_FILEID_PATH)) {
       path = HdfsUtils.getFileIdPath(fs, path, (long)fileKey);
     }
     LlapIoImpl.ORC_LOGGER.trace("Creating reader for {} ({})", path, split.getPath());
     long startTime = counters.startTimeCounter();
-    ReaderOptions opts = OrcFile.readerOptions(conf).filesystem(fs).fileMetadata(fileMetadata);
+    ReaderOptions opts = OrcFile.readerOptions(jobConf).filesystem(fs).fileMetadata(fileMetadata);
     if (split instanceof OrcSplit) {
       OrcTail orcTail = ((OrcSplit) split).getOrcTail();
       if (orcTail != null) {
@@ -655,7 +664,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
     ensureOrcReader();
     if (metadataReader != null) return;
     long startTime = counters.startTimeCounter();
-    boolean useZeroCopy = (conf != null) && OrcConf.USE_ZEROCOPY.getBoolean(conf);
+    boolean useZeroCopy = (daemonConf != null) && OrcConf.USE_ZEROCOPY.getBoolean(daemonConf);
     metadataReader = RecordReaderUtils.createDefaultDataReader(
         DataReaderProperties.builder()
         .withBufferSize(orcReader.getCompressionSize())