You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2019/12/04 18:14:18 UTC

[hive] branch master updated: HIVE-22499 : LLAP: Add an EncodedReaderOptions to extend ORC impl for options (Mustafa Iman via Slim Bouguerra)

This is an automated email from the ASF dual-hosted git repository.

hashutosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 1ef05ef  HIVE-22499 : LLAP: Add an EncodedReaderOptions to extend ORC impl for options (Mustafa Iman via Slim Bouguerra)
1ef05ef is described below

commit 1ef05efba6bc50722f33afeceada383914cfbd59
Author: Ashutosh Chauhan <ha...@apache.org>
AuthorDate: Wed Dec 4 10:12:11 2019 -0800

    HIVE-22499 : LLAP: Add an EncodedReaderOptions to extend ORC impl for options (Mustafa Iman via Slim Bouguerra)
    
    Signed-off-by: Ashutosh Chauhan <ha...@apache.org>
---
 .../hive/llap/io/encoded/OrcEncodedDataReader.java | 23 +++++--
 .../org/apache/hadoop/hive/ql/exec/Utilities.java  |  5 --
 .../hive/ql/io/orc/encoded/EncodedOrcFile.java     | 40 +++++++++++
 .../hive/ql/io/orc/encoded/TestEncodedOrcFile.java | 80 ++++++++++++++++++++++
 4 files changed, 137 insertions(+), 11 deletions(-)

diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
index 9e61322..2893870 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
@@ -25,6 +25,7 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -58,7 +59,6 @@ import org.apache.hadoop.hive.llap.io.metadata.MetadataCache;
 import org.apache.hadoop.hive.llap.io.metadata.MetadataCache.LlapBufferOrBuffers;
 import org.apache.hadoop.hive.llap.io.metadata.OrcFileMetadata;
 import org.apache.hadoop.hive.llap.io.metadata.OrcStripeMetadata;
-import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.io.HdfsUtils;
 import org.apache.hadoop.hive.ql.io.orc.OrcFile;
 import org.apache.hadoop.hive.ql.io.orc.OrcFile.ReaderOptions;
@@ -173,7 +173,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
   private final CacheTag cacheTag;
   private final Map<Path, PartitionDesc> parts;
 
-  private Utilities.SupplierWithCheckedException<FileSystem, IOException> fsSupplier;
+  private Supplier<FileSystem> fsSupplier;
 
   /**
    * stripeRgs[stripeIx'] => boolean array (could be a bitmask) of rg-s that need to be read.
@@ -219,7 +219,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
         ? LlapHiveUtils.getDbAndTableNameForMetrics(split.getPath(), true, parts) : null;
     // 1. Get file metadata from cache, or create the reader and read it.
     // Don't cache the filesystem object for now; Tez closes it and FS cache will fix all that
-    fsSupplier = Utilities.getFsSupplier(split.getPath(), jobConf);
+    fsSupplier = getFsSupplier(split.getPath(), jobConf);
     fileKey = determineFileId(fsSupplier, split,
         HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_CACHE_ALLOW_SYNTHETIC_FILEID),
         HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_CACHE_DEFAULT_FS_FILE_ID),
@@ -276,6 +276,17 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
     });
   }
 
+  private static Supplier<FileSystem> getFsSupplier(final Path path,
+      final Configuration conf) {
+    return () -> {
+      try {
+        return path.getFileSystem(conf);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    };
+  }
+
   protected Void performDataRead() throws IOException, InterruptedException {
     long startTime = counters.startTimeCounter();
     LlapIoImpl.LOG.info("Processing data for file {}: {}", fileKey, split.getPath());
@@ -479,7 +490,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
     return true;
   }
 
-  private static Object determineFileId(Utilities.SupplierWithCheckedException<FileSystem, IOException> fsSupplier,
+  private static Object determineFileId(Supplier<FileSystem> fsSupplier,
     FileSplit split,
       boolean allowSynthetic, boolean checkDefaultFs, boolean forceSynthetic) throws IOException {
     if (split instanceof OrcSplit) {
@@ -525,7 +536,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
     }
     LlapIoImpl.ORC_LOGGER.trace("Creating reader for {} ({})", path, split.getPath());
     long startTime = counters.startTimeCounter();
-    ReaderOptions opts = OrcFile.readerOptions(jobConf).filesystem(fsSupplier.get()).fileMetadata(fileMetadata);
+    ReaderOptions opts = EncodedOrcFile.readerOptions(jobConf).filesystem(fsSupplier).fileMetadata(fileMetadata);
     if (split instanceof OrcSplit) {
       OrcTail orcTail = ((OrcSplit) split).getOrcTail();
       if (orcTail != null) {
@@ -740,7 +751,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
     rawDataReader = RecordReaderUtils.createDefaultDataReader(
         DataReaderProperties.builder().withBufferSize(orcReader.getCompressionSize())
         .withCompression(orcReader.getCompressionKind())
-        .withFileSystem(fsSupplier.get()).withPath(path)
+        .withFileSystemSupplier(fsSupplier).withPath(path)
         .withTypeCount(orcReader.getSchema().getMaximumId() + 1)
         .withZeroCopy(useZeroCopy)
         .build());
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 3deba27..3d30d09 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -4512,11 +4512,6 @@ public final class Utilities {
     return passwd;
   }
 
-  public static SupplierWithCheckedException<FileSystem, IOException> getFsSupplier(final Path path,
-    final Configuration conf) {
-    return () -> path.getFileSystem(conf);
-  }
-
   /**
    * Logs the class paths of the job class loader and the thread context class loader to the passed logger.
    * Checks both loaders if getURLs method is available; if not, prints a message about this (instead of the class path)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedOrcFile.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedOrcFile.java
index 97a1b53..8920661 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedOrcFile.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedOrcFile.java
@@ -18,13 +18,53 @@
 package org.apache.hadoop.hive.ql.io.orc.encoded;
 
 import java.io.IOException;
+import java.util.function.Supplier;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.io.orc.OrcFile.ReaderOptions;
 
+/**
+ * Factory for encoded ORC readers and options.
+ */
 public class EncodedOrcFile {
+
+  /**
+   * Extends ReaderOptions to accept a file system supplier
+   * instead of a fully initialized fs object.
+   */
+  public static class EncodedReaderOptions extends ReaderOptions {
+
+    private Supplier<FileSystem> fileSystemSupplier;
+
+    public EncodedReaderOptions(Configuration configuration) {
+      super(configuration);
+    }
+
+    public EncodedReaderOptions filesystem(Supplier<FileSystem> fsSupplier) {
+      this.fileSystemSupplier = fsSupplier;
+      return this;
+    }
+
+    @Override
+    public EncodedReaderOptions filesystem(FileSystem fs) {
+      this.fileSystemSupplier = () -> fs;
+      return this;
+    }
+
+    @Override
+    public FileSystem getFilesystem() {
+      return fileSystemSupplier != null ? fileSystemSupplier.get() : null;
+    }
+  }
+
   public static Reader createReader(
       Path path, ReaderOptions options) throws IOException {
     return new ReaderImpl(path, options);
   }
+
+  public static EncodedReaderOptions readerOptions(Configuration conf) {
+    return new EncodedReaderOptions(conf);
+  }
 }
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/encoded/TestEncodedOrcFile.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/encoded/TestEncodedOrcFile.java
new file mode 100644
index 0000000..8264723
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/encoded/TestEncodedOrcFile.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc.encoded;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.orc.OrcFile;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hive.common.util.MockFileSystem;
+import org.apache.orc.CompressionKind;
+import org.apache.orc.FileMetadata;
+import org.apache.orc.OrcProto;
+import org.apache.orc.impl.OrcTail;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Verifies that EncodedOrcFile initializes the file system lazily.
+ */
+public class TestEncodedOrcFile {
+
+  @Test
+  public void testFileSystemIsNotInitializedWithKnownTail() throws IOException {
+    JobConf conf = new JobConf();
+    Path path = new Path("fmock:///testtable/bucket_0");
+    conf.set("hive.orc.splits.include.file.footer", "true");
+    conf.set("fs.defaultFS", "fmock:///");
+    conf.set("fs.mock.impl", FailingMockFileSystem.class.getName());
+
+    List<OrcProto.Type> types = new ArrayList<>();
+    types.add(OrcProto.Type.newBuilder().setKind(OrcProto.Type.Kind.BINARY).build());
+    FileMetadata dummyMetadata = mock(FileMetadata.class);
+    when(dummyMetadata.getTypes()).thenReturn(types);
+    when(dummyMetadata.getCompressionKind()).thenReturn(CompressionKind.NONE);
+    OrcFile.ReaderOptions readerOptions = EncodedOrcFile.readerOptions(conf)
+        .filesystem(() -> {
+          throw new RuntimeException("Filesystem should not have been initialized");
+        }).orcTail(new OrcTail(OrcProto.FileTail.getDefaultInstance(), null))
+        .fileMetadata(dummyMetadata);
+
+    // An ORC reader is created here. This should not cause filesystem initialization
+    // because the ORC tail is already provided and we are not making any real reads.
+    Reader reader = EncodedOrcFile.createReader(path, readerOptions);
+
+    // The following call initiates creation of the data reader in the ORC reader. This
+    // should not cause file system initialization either, as we are still not making
+    // any real reads.
+    reader.rows();
+  }
+
+  private static class FailingMockFileSystem extends MockFileSystem {
+    @Override
+    public void initialize(URI uri, Configuration conf) {
+      throw new RuntimeException("Filesystem should not have been initialized");
+    }
+  }
+}