Posted to commits@hive.apache.org by ha...@apache.org on 2019/03/09 17:11:20 UTC

[hive] branch master updated: HIVE-21339 : LLAP: Cache hit also initializes an FS object (Prasanth Jayachandran via Gopal V)

This is an automated email from the ASF dual-hosted git repository.

hashutosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new d7a4872  HIVE-21339 : LLAP: Cache hit also initializes an FS object (Prasanth Jayachandran via Gopal V)
d7a4872 is described below

commit d7a4872ebe10fa15fb7b45d1a567c400bce4335d
Author: Prasanth Jayachandran <pr...@apache.org>
AuthorDate: Sat Mar 9 09:10:33 2019 -0800

    HIVE-21339 : LLAP: Cache hit also initializes an FS object (Prasanth Jayachandran via Gopal V)
    
    Signed-off-by: Ashutosh Chauhan <ha...@apache.org>
---
 .../hive/llap/io/encoded/OrcEncodedDataReader.java    | 19 +++++++++++--------
 .../org/apache/hadoop/hive/ql/exec/Utilities.java     | 10 ++++++++++
 2 files changed, 21 insertions(+), 8 deletions(-)
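
The patch replaces the eagerly created FileSystem field with a lazy supplier, so the
(potentially expensive) FileSystem object is only instantiated when the read path
actually needs it; a pure cache hit never pays that cost. Below is a minimal,
self-contained sketch of the pattern, not code from the patch itself: the interface
mirrors the SupplierWithCheckedException added to Utilities, while LazyFsDemo and
openFileSystem are illustrative stand-ins for the LLAP reader and a Hadoop FileSystem.

import java.io.IOException;

// A Supplier variant that may throw a checked exception, mirroring the
// interface this commit adds to Utilities.
@FunctionalInterface
interface SupplierWithCheckedException<T, X extends Exception> {
  T get() throws X;
}

public class LazyFsDemo {
  // Hypothetical expensive resource standing in for a Hadoop FileSystem.
  static String openFileSystem() throws IOException {
    System.out.println("FileSystem initialized");
    return "fs-instance";
  }

  public static void main(String[] args) throws IOException {
    // Capturing the supplier is cheap; nothing is initialized yet.
    SupplierWithCheckedException<String, IOException> fsSupplier =
        LazyFsDemo::openFileSystem;

    boolean cacheHit = true; // pretend the requested data is already cached
    if (!cacheHit) {
      // Only a cache miss pays the initialization cost.
      String fs = fsSupplier.get();
      System.out.println("Reading through " + fs);
    } else {
      System.out.println("Cache hit: no FileSystem created");
    }
  }
}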

diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
index e6d8b7a..6a00220 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
@@ -56,6 +56,7 @@ import org.apache.hadoop.hive.llap.io.metadata.MetadataCache;
 import org.apache.hadoop.hive.llap.io.metadata.MetadataCache.LlapBufferOrBuffers;
 import org.apache.hadoop.hive.llap.io.metadata.OrcFileMetadata;
 import org.apache.hadoop.hive.llap.io.metadata.OrcStripeMetadata;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.io.HdfsUtils;
 import org.apache.hadoop.hive.ql.io.orc.OrcFile;
 import org.apache.hadoop.hive.ql.io.orc.OrcFile.ReaderOptions;
@@ -167,7 +168,8 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
   private CompressionCodec codec;
   private Object fileKey;
   private final String cacheTag;
-  private FileSystem fs;
+
+  private Utilities.SupplierWithCheckedException<FileSystem, IOException> fsSupplier;
 
   /**
    * stripeRgs[stripeIx'] => boolean array (could be a bitmask) of rg-s that need to be read.
@@ -211,8 +213,8 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
         ? LlapUtil.getDbAndTableNameForMetrics(split.getPath(), true) : null;
     // 1. Get file metadata from cache, or create the reader and read it.
     // Don't cache the filesystem object for now; Tez closes it and FS cache will fix all that
-    fs = split.getPath().getFileSystem(jobConf);
-    fileKey = determineFileId(fs, split,
+    fsSupplier = Utilities.getFsSupplier(split.getPath(), jobConf);
+    fileKey = determineFileId(fsSupplier, split,
         HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_CACHE_ALLOW_SYNTHETIC_FILEID),
         HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_CACHE_DEFAULT_FS_FILE_ID),
         !HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_IO_USE_FILEID_PATH)
@@ -472,7 +474,8 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
     return true;
   }
 
-  private static Object determineFileId(FileSystem fs, FileSplit split,
+  private static Object determineFileId(Utilities.SupplierWithCheckedException<FileSystem, IOException> fsSupplier,
+    FileSplit split,
       boolean allowSynthetic, boolean checkDefaultFs, boolean forceSynthetic) throws IOException {
     if (split instanceof OrcSplit) {
       Object fileKey = ((OrcSplit)split).getFileKey();
@@ -481,7 +484,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
       }
     }
     LOG.warn("Split for " + split.getPath() + " (" + split.getClass() + ") does not have file ID");
-    return HdfsUtils.getFileId(fs, split.getPath(), allowSynthetic, checkDefaultFs, forceSynthetic);
+    return HdfsUtils.getFileId(fsSupplier.get(), split.getPath(), allowSynthetic, checkDefaultFs, forceSynthetic);
   }
 
   /**
@@ -513,11 +516,11 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
     path = split.getPath();
     if (fileKey instanceof Long && HiveConf.getBoolVar(
         daemonConf, ConfVars.LLAP_IO_USE_FILEID_PATH)) {
-      path = HdfsUtils.getFileIdPath(fs, path, (long)fileKey);
+      path = HdfsUtils.getFileIdPath(fsSupplier.get(), path, (long)fileKey);
     }
     LlapIoImpl.ORC_LOGGER.trace("Creating reader for {} ({})", path, split.getPath());
     long startTime = counters.startTimeCounter();
-    ReaderOptions opts = OrcFile.readerOptions(jobConf).filesystem(fs).fileMetadata(fileMetadata);
+    ReaderOptions opts = OrcFile.readerOptions(jobConf).filesystem(fsSupplier.get()).fileMetadata(fileMetadata);
     if (split instanceof OrcSplit) {
       OrcTail orcTail = ((OrcSplit) split).getOrcTail();
       if (orcTail != null) {
@@ -732,7 +735,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
     rawDataReader = RecordReaderUtils.createDefaultDataReader(
         DataReaderProperties.builder().withBufferSize(orcReader.getCompressionSize())
         .withCompression(orcReader.getCompressionKind())
-        .withFileSystem(fs).withPath(path)
+        .withFileSystem(fsSupplier.get()).withPath(path)
         .withTypeCount(orcReader.getSchema().getMaximumId() + 1)
         .withZeroCopy(useZeroCopy)
         .build());
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 018bc52..1df6094 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -255,6 +255,11 @@ public final class Utilities {
   private static final Object INPUT_SUMMARY_LOCK = new Object();
   private static final Object ROOT_HDFS_DIR_LOCK  = new Object();
 
+  @FunctionalInterface
+  public interface SupplierWithCheckedException<T, X extends Exception> {
+    T get() throws X;
+  }
+
   /**
    * ReduceField:
    * KEY: record key
@@ -4602,4 +4607,9 @@ public final class Utilities {
     }
     return passwd;
   }
+
+  public static SupplierWithCheckedException<FileSystem, IOException> getFsSupplier(final Path path,
+    final Configuration conf) {
+    return () -> path.getFileSystem(conf);
+  }
 }
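
Note that getFsSupplier deliberately returns a supplier that re-invokes
path.getFileSystem(conf) on every get() rather than holding the first result,
leaning on Hadoop's internal FileSystem cache; the in-code comment ("Don't cache
the filesystem object for now; Tez closes it and FS cache will fix all that")
records that choice. If a call site ever did want to cache the first result, a
hypothetical memoizing wrapper, not part of this commit, might look like:

// Sketch only: wraps a SupplierWithCheckedException so the delegate is
// invoked at most once and the result is reused afterwards.
public static <T, X extends Exception> SupplierWithCheckedException<T, X> memoize(
    final SupplierWithCheckedException<T, X> delegate) {
  return new SupplierWithCheckedException<T, X>() {
    private T value; // cached after the first successful get()

    @Override
    public synchronized T get() throws X {
      if (value == null) {
        value = delegate.get();
      }
      return value;
    }
  };
}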