You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2019/12/04 18:14:18 UTC
[hive] branch master updated: HIVE-22499 : LLAP: Add an
EncodedReaderOptions to extend ORC impl for options (Mustafa Iman via Slim
Bouguerra)
This is an automated email from the ASF dual-hosted git repository.
hashutosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 1ef05ef HIVE-22499 : LLAP: Add an EncodedReaderOptions to extend ORC impl for options (Mustafa Iman via Slim Bouguerra)
1ef05ef is described below
commit 1ef05efba6bc50722f33afeceada383914cfbd59
Author: Ashutosh Chauhan <ha...@apache.org>
AuthorDate: Wed Dec 4 10:12:11 2019 -0800
HIVE-22499 : LLAP: Add an EncodedReaderOptions to extend ORC impl for options (Mustafa Iman via Slim Bouguerra)
Signed-off-by: Ashutosh Chauhan <ha...@apache.org>
---
.../hive/llap/io/encoded/OrcEncodedDataReader.java | 23 +++++--
.../org/apache/hadoop/hive/ql/exec/Utilities.java | 5 --
.../hive/ql/io/orc/encoded/EncodedOrcFile.java | 40 +++++++++++
.../hive/ql/io/orc/encoded/TestEncodedOrcFile.java | 80 ++++++++++++++++++++++
4 files changed, 137 insertions(+), 11 deletions(-)
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
index 9e61322..2893870 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
@@ -25,6 +25,7 @@ import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -58,7 +59,6 @@ import org.apache.hadoop.hive.llap.io.metadata.MetadataCache;
import org.apache.hadoop.hive.llap.io.metadata.MetadataCache.LlapBufferOrBuffers;
import org.apache.hadoop.hive.llap.io.metadata.OrcFileMetadata;
import org.apache.hadoop.hive.llap.io.metadata.OrcStripeMetadata;
-import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.io.HdfsUtils;
import org.apache.hadoop.hive.ql.io.orc.OrcFile;
import org.apache.hadoop.hive.ql.io.orc.OrcFile.ReaderOptions;
@@ -173,7 +173,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
private final CacheTag cacheTag;
private final Map<Path, PartitionDesc> parts;
- private Utilities.SupplierWithCheckedException<FileSystem, IOException> fsSupplier;
+ private Supplier<FileSystem> fsSupplier;
/**
* stripeRgs[stripeIx'] => boolean array (could be a bitmask) of rg-s that need to be read.
@@ -219,7 +219,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
? LlapHiveUtils.getDbAndTableNameForMetrics(split.getPath(), true, parts) : null;
// 1. Get file metadata from cache, or create the reader and read it.
// Don't cache the filesystem object for now; Tez closes it and FS cache will fix all that
- fsSupplier = Utilities.getFsSupplier(split.getPath(), jobConf);
+ fsSupplier = getFsSupplier(split.getPath(), jobConf);
fileKey = determineFileId(fsSupplier, split,
HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_CACHE_ALLOW_SYNTHETIC_FILEID),
HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_CACHE_DEFAULT_FS_FILE_ID),
@@ -276,6 +276,17 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
});
}
+ private static Supplier<FileSystem> getFsSupplier(final Path path,
+ final Configuration conf) {
+ return () -> { // the file system lookup is deferred until the supplier's get() is invoked
+ try {
+ return path.getFileSystem(conf);
+ } catch (IOException e) {
+ throw new RuntimeException(e); // rethrown unchecked, with the IOException kept as the cause
+ }
+ };
+ }
+
protected Void performDataRead() throws IOException, InterruptedException {
long startTime = counters.startTimeCounter();
LlapIoImpl.LOG.info("Processing data for file {}: {}", fileKey, split.getPath());
@@ -479,7 +490,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
return true;
}
- private static Object determineFileId(Utilities.SupplierWithCheckedException<FileSystem, IOException> fsSupplier,
+ private static Object determineFileId(Supplier<FileSystem> fsSupplier,
FileSplit split,
boolean allowSynthetic, boolean checkDefaultFs, boolean forceSynthetic) throws IOException {
if (split instanceof OrcSplit) {
@@ -525,7 +536,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
}
LlapIoImpl.ORC_LOGGER.trace("Creating reader for {} ({})", path, split.getPath());
long startTime = counters.startTimeCounter();
- ReaderOptions opts = OrcFile.readerOptions(jobConf).filesystem(fsSupplier.get()).fileMetadata(fileMetadata);
+ ReaderOptions opts = EncodedOrcFile.readerOptions(jobConf).filesystem(fsSupplier).fileMetadata(fileMetadata);
if (split instanceof OrcSplit) {
OrcTail orcTail = ((OrcSplit) split).getOrcTail();
if (orcTail != null) {
@@ -740,7 +751,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
rawDataReader = RecordReaderUtils.createDefaultDataReader(
DataReaderProperties.builder().withBufferSize(orcReader.getCompressionSize())
.withCompression(orcReader.getCompressionKind())
- .withFileSystem(fsSupplier.get()).withPath(path)
+ .withFileSystemSupplier(fsSupplier).withPath(path)
.withTypeCount(orcReader.getSchema().getMaximumId() + 1)
.withZeroCopy(useZeroCopy)
.build());
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 3deba27..3d30d09 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -4512,11 +4512,6 @@ public final class Utilities {
return passwd;
}
- public static SupplierWithCheckedException<FileSystem, IOException> getFsSupplier(final Path path,
- final Configuration conf) {
- return () -> path.getFileSystem(conf);
- }
-
/**
* Logs the class paths of the job class loader and the thread context class loader to the passed logger.
* Checks both loaders if getURLs method is available; if not, prints a message about this (instead of the class path)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedOrcFile.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedOrcFile.java
index 97a1b53..8920661 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedOrcFile.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedOrcFile.java
@@ -18,13 +18,53 @@
package org.apache.hadoop.hive.ql.io.orc.encoded;
import java.io.IOException;
+import java.util.function.Supplier;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.orc.OrcFile.ReaderOptions;
+/**
+ * Factory for encoded ORC readers and options.
+ */
public class EncodedOrcFile {
+
+ /**
+ * Extends ReaderOptions to accept a file system supplier
+ * instead of a fully initialized fs object.
+ * Within this class the supplier is invoked only from {@link #getFilesystem()},
+ * once per call; the result is not cached here.
+ */
+ public static class EncodedReaderOptions extends ReaderOptions {
+
+ private Supplier<FileSystem> fileSystemSupplier; // stays null until one of the filesystem(...) setters runs
+
+ public EncodedReaderOptions(Configuration configuration) {
+ super(configuration);
+ }
+
+ public EncodedReaderOptions filesystem(Supplier<FileSystem> fsSupplier) {
+ this.fileSystemSupplier = fsSupplier; // stored as-is; not invoked here
+ return this;
+ }
+
+ @Override
+ public EncodedReaderOptions filesystem(FileSystem fs) {
+ this.fileSystemSupplier = () -> fs; // wrap the ready-made fs so both setters feed the same field
+ return this;
+ }
+
+ @Override
+ public FileSystem getFilesystem() {
+ return fileSystemSupplier != null ? fileSystemSupplier.get() : null; // null when no supplier was set
+ }
+ }
+
public static Reader createReader(
Path path, ReaderOptions options) throws IOException {
return new ReaderImpl(path, options);
}
+
+ public static EncodedReaderOptions readerOptions(Configuration conf) {
+ return new EncodedReaderOptions(conf);
+ }
}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/encoded/TestEncodedOrcFile.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/encoded/TestEncodedOrcFile.java
new file mode 100644
index 0000000..8264723
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/encoded/TestEncodedOrcFile.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc.encoded;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.orc.OrcFile;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hive.common.util.MockFileSystem;
+import org.apache.orc.CompressionKind;
+import org.apache.orc.FileMetadata;
+import org.apache.orc.OrcProto;
+import org.apache.orc.impl.OrcTail;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test to verify lazy nature of EncodedOrcFile.
+ */
+public class TestEncodedOrcFile {
+
+ /**
+ * Creates an encoded ORC reader with a pre-supplied ORC tail and file metadata and
+ * opens a row reader on it; the test passes only if no file system is initialized.
+ */
+ @Test
+ public void testFileSystemIsNotInitializedWithKnownTail() throws IOException {
+ JobConf conf = new JobConf();
+ Path path = new Path("fmock:///testtable/bucket_0");
+ conf.set("hive.orc.splits.include.file.footer", "true");
+ conf.set("fs.defaultFS", "fmock:///");
+ // Hadoop looks up the implementation class under "fs.<scheme>.impl", so the key
+ // must use the "fmock" scheme shared by the path and the default FS above.
+ conf.set("fs.fmock.impl", FailingMockFileSystem.class.getName());
+
+ List<OrcProto.Type> types = new ArrayList<>();
+ types.add(OrcProto.Type.newBuilder().setKind(OrcProto.Type.Kind.BINARY).build());
+ FileMetadata dummyMetadata = mock(FileMetadata.class);
+ when(dummyMetadata.getTypes()).thenReturn(types);
+ when(dummyMetadata.getCompressionKind()).thenReturn(CompressionKind.NONE);
+ OrcFile.ReaderOptions readerOptions = EncodedOrcFile.readerOptions(conf)
+ .filesystem(() -> {
+ throw new RuntimeException("Filesystem should not have been initialized");
+ }).orcTail(new OrcTail(OrcProto.FileTail.getDefaultInstance(), null))
+ .fileMetadata(dummyMetadata);
+
+ // An ORC reader is created here. This should not cause file system initialization
+ // because the ORC tail is already provided and no real reads are made.
+ Reader reader = EncodedOrcFile.createReader(path, readerOptions);
+
+ // The following call initiates the creation of the data reader in the ORC reader.
+ // This should not cause file system initialization either, as we are still not
+ // making any real read.
+ reader.rows();
+ }
+
+ /** Mock file system whose initialize() always throws, to expose any eager initialization. */
+ private static class FailingMockFileSystem extends MockFileSystem {
+ @Override
+ public void initialize(URI uri, Configuration conf) {
+ throw new RuntimeException("Filesystem should not have been initialized");
+ }
+ }
+}