Posted to gitbox@hive.apache.org by GitBox <gi...@apache.org> on 2021/02/23 21:05:03 UTC

[GitHub] [hive] asinkovits commented on a change in pull request #1990: HIVE-24728: Low level reader for llap cache hydration

asinkovits commented on a change in pull request #1990:
URL: https://github.com/apache/hive/pull/1990#discussion_r580512723



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReader.java
##########
@@ -71,4 +72,7 @@ void readIndexStreams(OrcIndex index, StripeInformation stripe,
           throws IOException;
 
   void setStopped(AtomicBoolean isStopped);
+
+  void readDataRanges(DiskRangeList ranges) throws IOException;

Review comment:
       Renamed the method and added javadoc.
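    For illustration, a javadoc in this spirit would document the contract discussed in this thread (a sketch only; the renamed method and exact wording are in the updated PR, so the original name is kept here):

        /**
         * Reads the given disk ranges from cache or disk into the LLAP cache,
         * decompressing them as needed. The data is only loaded into the cache;
         * no buffers are handed back to the caller.
         */
        void readDataRanges(DiskRangeList ranges) throws IOException;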

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
##########
@@ -1085,6 +1084,61 @@ public DiskRangeList readEncodedStream(long baseOffset, DiskRangeList start, lon
     return lastUncompressed;
   }
 
+  @Override
+  public void readDataRanges(DiskRangeList ranges) throws IOException {
+    boolean hasFileId = this.fileKey != null;
+    long baseOffset = 0L;
+
+    // 2. Now, read all of the ranges from cache or disk.
+    IdentityHashMap<ByteBuffer, Boolean> toRelease = new IdentityHashMap<>();
+    MutateHelper toRead = getDataFromCacheAndDisk(ranges, 0, hasFileId, toRelease);
+
+    // 3. For uncompressed case, we need some special processing before read.
+    //preReadUncompressedStream(stripeOffset, iter, sctx.offset, sctx.offset + sctx.length, sctx.kind);
+    preReadUncompressedStreams(baseOffset, toRead, toRelease);
+
+    // 4. Decompress the data.
+    try {
+      ColumnStreamData csd = POOLS.csdPool.take();
+      csd.incRef();
+      DiskRangeList drl = toRead.next;
+      while (drl != null) {
+        drl = readEncodedStream(baseOffset, drl, drl.getOffset(), drl.getEnd(), csd, drl.getOffset(), drl.getEnd(),
+            toRelease);
+        for (MemoryBuffer buf : csd.getCacheBuffers()) {
+          cacheWrapper.releaseBuffer(buf);
+        }
+        if (drl != null) {
+          drl = drl.next;
+        }
+      }
+      csd.decRef();

Review comment:
       Currently no pooling is done, so it doesn't matter, but you are right that it should be there. Fixed.
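    One way the fix could look (a sketch, not the committed change): return the ColumnStreamData to the pool in a finally block, mirroring how the trace pool is used elsewhere in this PR:

        ColumnStreamData csd = POOLS.csdPool.take();
        csd.incRef();
        try {
          // ... walk the ranges and decompress, as in the loop above ...
        } finally {
          csd.decRef();
          POOLS.csdPool.offer(csd); // return to the pool even if decompression throws
        }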

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
##########
@@ -1085,6 +1084,61 @@ public DiskRangeList readEncodedStream(long baseOffset, DiskRangeList start, lon
     return lastUncompressed;
   }
 
+  @Override
+  public void readDataRanges(DiskRangeList ranges) throws IOException {
+    boolean hasFileId = this.fileKey != null;
+    long baseOffset = 0L;
+
+    // 2. Now, read all of the ranges from cache or disk.
+    IdentityHashMap<ByteBuffer, Boolean> toRelease = new IdentityHashMap<>();
+    MutateHelper toRead = getDataFromCacheAndDisk(ranges, 0, hasFileId, toRelease);
+
+    // 3. For uncompressed case, we need some special processing before read.
+    //preReadUncompressedStream(stripeOffset, iter, sctx.offset, sctx.offset + sctx.length, sctx.kind);

Review comment:
       Yes. Deleted.

##########
File path: llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/LowLevelDataReader.java
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.io.encoded;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.io.CacheTag;
+import org.apache.hadoop.hive.common.io.DataCache;
+import org.apache.hadoop.hive.common.io.DiskRangeList;
+import org.apache.hadoop.hive.common.io.FileMetadataCache;
+import org.apache.hadoop.hive.common.io.encoded.MemoryBufferOrBuffers;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.io.HdfsUtils;
+import org.apache.hadoop.hive.ql.io.orc.OrcFile;
+import org.apache.hadoop.hive.ql.io.orc.encoded.EncodedOrcFile;
+import org.apache.hadoop.hive.ql.io.orc.encoded.EncodedReader;
+import org.apache.hadoop.hive.ql.io.orc.encoded.IoTrace;
+import org.apache.hadoop.hive.ql.io.orc.encoded.LlapDataReader;
+import org.apache.hadoop.hive.ql.io.orc.encoded.Reader;
+import org.apache.hive.common.util.FixedSizedObjectPool;
+import org.apache.orc.OrcConf;
+import org.apache.orc.impl.DataReaderProperties;
+import org.apache.orc.impl.InStream;
+import org.apache.orc.impl.OrcCodecPool;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.function.Supplier;
+
+public class LowLevelDataReader implements AutoCloseable {
+
+  private Path path;
+  private Object fileKey;
+  private Configuration daemonConf;
+  private DataCache cache;
+  private FileMetadataCache metadataCache;
+  private CacheTag cacheTag;
+  private FixedSizedObjectPool<IoTrace> tracePool;
+
+  private Supplier<FileSystem> fsSupplier;
+
+  private Reader orcReader;
+  private LlapDataReader rawDataReader;
+  private EncodedReader encodedReader;
+
+
+  public LowLevelDataReader(Path path, Object fileKey, Configuration daemonConf, DataCache cache,
+      FileMetadataCache metadataCache, CacheTag cacheTag, FixedSizedObjectPool<IoTrace> tracePool) {
+    this.path = path;
+    this.fileKey = fileKey;
+    this.daemonConf = daemonConf;
+    this.cache = cache;
+    this.metadataCache = metadataCache;
+    this.cacheTag = cacheTag;
+    this.tracePool = tracePool;
+  }
+
+  public void init() throws IOException {
+    this.fsSupplier = getFsSupplier(path, daemonConf);
+    Object fileKey = HdfsUtils.getFileId(fsSupplier.get(), path,

Review comment:
       Good catch. Thanks.

##########
File path: llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/LowLevelDataReader.java
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.io.encoded;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.io.CacheTag;
+import org.apache.hadoop.hive.common.io.DataCache;
+import org.apache.hadoop.hive.common.io.DiskRangeList;
+import org.apache.hadoop.hive.common.io.FileMetadataCache;
+import org.apache.hadoop.hive.common.io.encoded.MemoryBufferOrBuffers;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.io.HdfsUtils;
+import org.apache.hadoop.hive.ql.io.orc.OrcFile;
+import org.apache.hadoop.hive.ql.io.orc.encoded.EncodedOrcFile;
+import org.apache.hadoop.hive.ql.io.orc.encoded.EncodedReader;
+import org.apache.hadoop.hive.ql.io.orc.encoded.IoTrace;
+import org.apache.hadoop.hive.ql.io.orc.encoded.LlapDataReader;
+import org.apache.hadoop.hive.ql.io.orc.encoded.Reader;
+import org.apache.hive.common.util.FixedSizedObjectPool;
+import org.apache.orc.OrcConf;
+import org.apache.orc.impl.DataReaderProperties;
+import org.apache.orc.impl.InStream;
+import org.apache.orc.impl.OrcCodecPool;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.function.Supplier;
+
+public class LowLevelDataReader implements AutoCloseable {
+
+  private Path path;
+  private Object fileKey;
+  private Configuration daemonConf;
+  private DataCache cache;
+  private FileMetadataCache metadataCache;
+  private CacheTag cacheTag;
+  private FixedSizedObjectPool<IoTrace> tracePool;
+
+  private Supplier<FileSystem> fsSupplier;
+
+  private Reader orcReader;
+  private LlapDataReader rawDataReader;
+  private EncodedReader encodedReader;
+
+
+  public LowLevelDataReader(Path path, Object fileKey, Configuration daemonConf, DataCache cache,
+      FileMetadataCache metadataCache, CacheTag cacheTag, FixedSizedObjectPool<IoTrace> tracePool) {
+    this.path = path;
+    this.fileKey = fileKey;
+    this.daemonConf = daemonConf;
+    this.cache = cache;
+    this.metadataCache = metadataCache;
+    this.cacheTag = cacheTag;
+    this.tracePool = tracePool;
+  }
+
+  public void init() throws IOException {
+    this.fsSupplier = getFsSupplier(path, daemonConf);
+    Object fileKey = HdfsUtils.getFileId(fsSupplier.get(), path,
+          HiveConf.getBoolVar(daemonConf, HiveConf.ConfVars.LLAP_CACHE_ALLOW_SYNTHETIC_FILEID),
+          HiveConf.getBoolVar(daemonConf, HiveConf.ConfVars.LLAP_CACHE_DEFAULT_FS_FILE_ID),
+          !HiveConf.getBoolVar(daemonConf, HiveConf.ConfVars.LLAP_IO_USE_FILEID_PATH));
+    if (!fileKey.equals(this.fileKey)) {
+      throw new IOException("File key mismatch.");
+    }
+    OrcFile.ReaderOptions opts = EncodedOrcFile.readerOptions(daemonConf).filesystem(fsSupplier);
+    orcReader = EncodedOrcFile.createReader(path, opts);
+  }
+
+  private static Supplier<FileSystem> getFsSupplier(final Path path, final Configuration conf) {
+    return () -> {
+      try {
+        return path.getFileSystem(conf);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    };
+  }
+
+  public void read(DiskRangeList ranges) throws IOException {

Review comment:
       OK, unfortunately not: the data is only loaded into the cache, and the buffers are released afterwards. I've renamed the method to describe the behaviour.
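    To make that contract explicit, the intended usage is roughly the following (a sketch using the pre-rename class name, and assuming path, fileKey, conf, cache, metadataCache, cacheTag and tracePool are in scope):

        try (LowLevelDataReader reader = new LowLevelDataReader(path, fileKey, conf, cache,
            metadataCache, cacheTag, tracePool)) {
          reader.init();
          reader.read(ranges); // populates the LLAP cache; nothing is returned to the caller
        }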

##########
File path: llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/LowLevelDataReader.java
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.io.encoded;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.io.CacheTag;
+import org.apache.hadoop.hive.common.io.DataCache;
+import org.apache.hadoop.hive.common.io.DiskRangeList;
+import org.apache.hadoop.hive.common.io.FileMetadataCache;
+import org.apache.hadoop.hive.common.io.encoded.MemoryBufferOrBuffers;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.io.HdfsUtils;
+import org.apache.hadoop.hive.ql.io.orc.OrcFile;
+import org.apache.hadoop.hive.ql.io.orc.encoded.EncodedOrcFile;
+import org.apache.hadoop.hive.ql.io.orc.encoded.EncodedReader;
+import org.apache.hadoop.hive.ql.io.orc.encoded.IoTrace;
+import org.apache.hadoop.hive.ql.io.orc.encoded.LlapDataReader;
+import org.apache.hadoop.hive.ql.io.orc.encoded.Reader;
+import org.apache.hive.common.util.FixedSizedObjectPool;
+import org.apache.orc.OrcConf;
+import org.apache.orc.impl.DataReaderProperties;
+import org.apache.orc.impl.InStream;
+import org.apache.orc.impl.OrcCodecPool;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.function.Supplier;
+
+public class LowLevelDataReader implements AutoCloseable {

Review comment:
       I've renamed the class to describe the behaviour, and added javadoc.
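    For reference, a class-level javadoc along these lines would capture the behaviour discussed in this thread (a sketch; the actual text is in the updated PR):

        /**
         * Reads ORC data ranges of a single file and loads them into the LLAP
         * cache, releasing the buffers afterwards. Used for LLAP cache hydration
         * rather than for serving data back to a caller.
         */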

##########
File path: llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/LowLevelDataReader.java
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.io.encoded;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.io.CacheTag;
+import org.apache.hadoop.hive.common.io.DataCache;
+import org.apache.hadoop.hive.common.io.DiskRangeList;
+import org.apache.hadoop.hive.common.io.FileMetadataCache;
+import org.apache.hadoop.hive.common.io.encoded.MemoryBufferOrBuffers;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.io.HdfsUtils;
+import org.apache.hadoop.hive.ql.io.orc.OrcFile;
+import org.apache.hadoop.hive.ql.io.orc.encoded.EncodedOrcFile;
+import org.apache.hadoop.hive.ql.io.orc.encoded.EncodedReader;
+import org.apache.hadoop.hive.ql.io.orc.encoded.IoTrace;
+import org.apache.hadoop.hive.ql.io.orc.encoded.LlapDataReader;
+import org.apache.hadoop.hive.ql.io.orc.encoded.Reader;
+import org.apache.hive.common.util.FixedSizedObjectPool;
+import org.apache.orc.OrcConf;
+import org.apache.orc.impl.DataReaderProperties;
+import org.apache.orc.impl.InStream;
+import org.apache.orc.impl.OrcCodecPool;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.function.Supplier;
+
+public class LowLevelDataReader implements AutoCloseable {
+
+  private Path path;
+  private Object fileKey;
+  private Configuration daemonConf;
+  private DataCache cache;
+  private FileMetadataCache metadataCache;
+  private CacheTag cacheTag;
+  private FixedSizedObjectPool<IoTrace> tracePool;
+
+  private Supplier<FileSystem> fsSupplier;
+
+  private Reader orcReader;
+  private LlapDataReader rawDataReader;
+  private EncodedReader encodedReader;
+
+
+  public LowLevelDataReader(Path path, Object fileKey, Configuration daemonConf, DataCache cache,
+      FileMetadataCache metadataCache, CacheTag cacheTag, FixedSizedObjectPool<IoTrace> tracePool) {
+    this.path = path;
+    this.fileKey = fileKey;
+    this.daemonConf = daemonConf;
+    this.cache = cache;
+    this.metadataCache = metadataCache;
+    this.cacheTag = cacheTag;
+    this.tracePool = tracePool;
+  }
+
+  public void init() throws IOException {
+    this.fsSupplier = getFsSupplier(path, daemonConf);
+    Object fileKey = HdfsUtils.getFileId(fsSupplier.get(), path,
+          HiveConf.getBoolVar(daemonConf, HiveConf.ConfVars.LLAP_CACHE_ALLOW_SYNTHETIC_FILEID),
+          HiveConf.getBoolVar(daemonConf, HiveConf.ConfVars.LLAP_CACHE_DEFAULT_FS_FILE_ID),
+          !HiveConf.getBoolVar(daemonConf, HiveConf.ConfVars.LLAP_IO_USE_FILEID_PATH));
+    if (!fileKey.equals(this.fileKey)) {
+      throw new IOException("File key mismatch.");
+    }
+    OrcFile.ReaderOptions opts = EncodedOrcFile.readerOptions(daemonConf).filesystem(fsSupplier);
+    orcReader = EncodedOrcFile.createReader(path, opts);
+  }
+
+  private static Supplier<FileSystem> getFsSupplier(final Path path, final Configuration conf) {

Review comment:
       The fsSupplier is required in multiple places, so I cannot remove it altogether, but I can make the static method package-private and use it in this class as well.
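    That is, only the visibility changes (sketch):

        // package-private so other classes in this package can reuse it
        static Supplier<FileSystem> getFsSupplier(final Path path, final Configuration conf) {
          return () -> {
            try {
              return path.getFileSystem(conf);
            } catch (IOException e) {
              throw new RuntimeException(e);
            }
          };
        }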

##########
File path: llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/LowLevelDataReader.java
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.io.encoded;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.io.CacheTag;
+import org.apache.hadoop.hive.common.io.DataCache;
+import org.apache.hadoop.hive.common.io.DiskRangeList;
+import org.apache.hadoop.hive.common.io.FileMetadataCache;
+import org.apache.hadoop.hive.common.io.encoded.MemoryBufferOrBuffers;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.io.HdfsUtils;
+import org.apache.hadoop.hive.ql.io.orc.OrcFile;
+import org.apache.hadoop.hive.ql.io.orc.encoded.EncodedOrcFile;
+import org.apache.hadoop.hive.ql.io.orc.encoded.EncodedReader;
+import org.apache.hadoop.hive.ql.io.orc.encoded.IoTrace;
+import org.apache.hadoop.hive.ql.io.orc.encoded.LlapDataReader;
+import org.apache.hadoop.hive.ql.io.orc.encoded.Reader;
+import org.apache.hive.common.util.FixedSizedObjectPool;
+import org.apache.orc.OrcConf;
+import org.apache.orc.impl.DataReaderProperties;
+import org.apache.orc.impl.InStream;
+import org.apache.orc.impl.OrcCodecPool;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.function.Supplier;
+
+public class LowLevelDataReader implements AutoCloseable {
+
+  private Path path;
+  private Object fileKey;
+  private Configuration daemonConf;
+  private DataCache cache;
+  private FileMetadataCache metadataCache;
+  private CacheTag cacheTag;
+  private FixedSizedObjectPool<IoTrace> tracePool;
+
+  private Supplier<FileSystem> fsSupplier;
+
+  private Reader orcReader;
+  private LlapDataReader rawDataReader;
+  private EncodedReader encodedReader;
+
+
+  public LowLevelDataReader(Path path, Object fileKey, Configuration daemonConf, DataCache cache,
+      FileMetadataCache metadataCache, CacheTag cacheTag, FixedSizedObjectPool<IoTrace> tracePool) {
+    this.path = path;
+    this.fileKey = fileKey;
+    this.daemonConf = daemonConf;
+    this.cache = cache;
+    this.metadataCache = metadataCache;
+    this.cacheTag = cacheTag;
+    this.tracePool = tracePool;
+  }
+
+  public void init() throws IOException {
+    this.fsSupplier = getFsSupplier(path, daemonConf);
+    Object fileKey = HdfsUtils.getFileId(fsSupplier.get(), path,
+          HiveConf.getBoolVar(daemonConf, HiveConf.ConfVars.LLAP_CACHE_ALLOW_SYNTHETIC_FILEID),
+          HiveConf.getBoolVar(daemonConf, HiveConf.ConfVars.LLAP_CACHE_DEFAULT_FS_FILE_ID),
+          !HiveConf.getBoolVar(daemonConf, HiveConf.ConfVars.LLAP_IO_USE_FILEID_PATH));
+    if (!fileKey.equals(this.fileKey)) {
+      throw new IOException("File key mismatch.");
+    }
+    OrcFile.ReaderOptions opts = EncodedOrcFile.readerOptions(daemonConf).filesystem(fsSupplier);
+    orcReader = EncodedOrcFile.createReader(path, opts);
+  }
+
+  private static Supplier<FileSystem> getFsSupplier(final Path path, final Configuration conf) {
+    return () -> {
+      try {
+        return path.getFileSystem(conf);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    };
+  }
+
+  public void read(DiskRangeList ranges) throws IOException {
+    boolean useZeroCopy = (daemonConf != null) && OrcConf.USE_ZEROCOPY.getBoolean(daemonConf);
+    InStream.StreamOptions options = InStream.options()
+        .withCodec(OrcCodecPool.getCodec(orcReader.getCompressionKind()))
+        .withBufferSize(orcReader.getCompressionSize());
+
+    rawDataReader = LlapRecordReaderUtils.createDefaultLlapDataReader(
+        DataReaderProperties.builder()
+            .withFileSystemSupplier(fsSupplier).withPath(path)
+            .withCompression(options)
+            .withZeroCopy(useZeroCopy)
+            .build());
+    rawDataReader.open();
+
+    IoTrace ioTrace = tracePool.take();
+
+    try {
+      encodedReader = orcReader.encodedReader(fileKey, cache, rawDataReader, null, ioTrace, false, cacheTag, false);
+      encodedReader.readDataRanges(ranges);
+    } finally {
+      tracePool.offer(ioTrace);
+    }
+  }
+
+  public void readFooter() {

Review comment:
       I've renamed the method to describe the behaviour.
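    For example (the new name here is hypothetical; the actual one is in the updated PR) — the method caches the file footer rather than reading it back:

        public void ensureFileMetadataCached() { // hypothetical name
          MemoryBufferOrBuffers tailBuffers = metadataCache.getFileMetadata(fileKey);
          if (tailBuffers == null) {
            ByteBuffer tailBufferBb = orcReader.getSerializedFileFooter();
            metadataCache.putFileMetadata(fileKey, tailBufferBb, cacheTag, null);
          }
        }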

##########
File path: llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/LowLevelDataReader.java
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.io.encoded;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.io.CacheTag;
+import org.apache.hadoop.hive.common.io.DataCache;
+import org.apache.hadoop.hive.common.io.DiskRangeList;
+import org.apache.hadoop.hive.common.io.FileMetadataCache;
+import org.apache.hadoop.hive.common.io.encoded.MemoryBufferOrBuffers;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.io.HdfsUtils;
+import org.apache.hadoop.hive.ql.io.orc.OrcFile;
+import org.apache.hadoop.hive.ql.io.orc.encoded.EncodedOrcFile;
+import org.apache.hadoop.hive.ql.io.orc.encoded.EncodedReader;
+import org.apache.hadoop.hive.ql.io.orc.encoded.IoTrace;
+import org.apache.hadoop.hive.ql.io.orc.encoded.LlapDataReader;
+import org.apache.hadoop.hive.ql.io.orc.encoded.Reader;
+import org.apache.hive.common.util.FixedSizedObjectPool;
+import org.apache.orc.OrcConf;
+import org.apache.orc.impl.DataReaderProperties;
+import org.apache.orc.impl.InStream;
+import org.apache.orc.impl.OrcCodecPool;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.function.Supplier;
+
+public class LowLevelDataReader implements AutoCloseable {
+
+  private Path path;
+  private Object fileKey;
+  private Configuration daemonConf;
+  private DataCache cache;
+  private FileMetadataCache metadataCache;
+  private CacheTag cacheTag;
+  private FixedSizedObjectPool<IoTrace> tracePool;
+
+  private Supplier<FileSystem> fsSupplier;
+
+  private Reader orcReader;
+  private LlapDataReader rawDataReader;
+  private EncodedReader encodedReader;
+
+
+  public LowLevelDataReader(Path path, Object fileKey, Configuration daemonConf, DataCache cache,
+      FileMetadataCache metadataCache, CacheTag cacheTag, FixedSizedObjectPool<IoTrace> tracePool) {
+    this.path = path;
+    this.fileKey = fileKey;
+    this.daemonConf = daemonConf;
+    this.cache = cache;
+    this.metadataCache = metadataCache;
+    this.cacheTag = cacheTag;
+    this.tracePool = tracePool;
+  }
+
+  public void init() throws IOException {
+    this.fsSupplier = getFsSupplier(path, daemonConf);
+    Object fileKey = HdfsUtils.getFileId(fsSupplier.get(), path,
+          HiveConf.getBoolVar(daemonConf, HiveConf.ConfVars.LLAP_CACHE_ALLOW_SYNTHETIC_FILEID),
+          HiveConf.getBoolVar(daemonConf, HiveConf.ConfVars.LLAP_CACHE_DEFAULT_FS_FILE_ID),
+          !HiveConf.getBoolVar(daemonConf, HiveConf.ConfVars.LLAP_IO_USE_FILEID_PATH));
+    if (!fileKey.equals(this.fileKey)) {
+      throw new IOException("File key mismatch.");
+    }
+    OrcFile.ReaderOptions opts = EncodedOrcFile.readerOptions(daemonConf).filesystem(fsSupplier);
+    orcReader = EncodedOrcFile.createReader(path, opts);
+  }
+
+  private static Supplier<FileSystem> getFsSupplier(final Path path, final Configuration conf) {
+    return () -> {
+      try {
+        return path.getFileSystem(conf);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    };
+  }
+
+  public void read(DiskRangeList ranges) throws IOException {
+    boolean useZeroCopy = (daemonConf != null) && OrcConf.USE_ZEROCOPY.getBoolean(daemonConf);
+    InStream.StreamOptions options = InStream.options()
+        .withCodec(OrcCodecPool.getCodec(orcReader.getCompressionKind()))
+        .withBufferSize(orcReader.getCompressionSize());
+
+    rawDataReader = LlapRecordReaderUtils.createDefaultLlapDataReader(
+        DataReaderProperties.builder()
+            .withFileSystemSupplier(fsSupplier).withPath(path)
+            .withCompression(options)
+            .withZeroCopy(useZeroCopy)
+            .build());
+    rawDataReader.open();
+
+    IoTrace ioTrace = tracePool.take();
+
+    try {
+      encodedReader = orcReader.encodedReader(fileKey, cache, rawDataReader, null, ioTrace, false, cacheTag, false);
+      encodedReader.readDataRanges(ranges);
+    } finally {
+      tracePool.offer(ioTrace);
+    }
+  }
+
+  public void readFooter() {
+    MemoryBufferOrBuffers tailBuffers = metadataCache.getFileMetadata(fileKey);
+    if (tailBuffers == null) {
+      ByteBuffer tailBufferBb = orcReader.getSerializedFileFooter();
+      metadataCache.putFileMetadata(fileKey, tailBufferBb, cacheTag, null);
+    }
+  }
+
+  @Override
+  public void close() throws Exception {
+    if (encodedReader != null) {

Review comment:
       Nope, the EncodedReaderImpl takes care of it. See:
   https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java#L820
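    For completeness, a plausible shape for close(), assuming EncodedReaderImpl#close releases the underlying rawDataReader as linked above (a sketch, not the committed code):

        @Override
        public void close() throws Exception {
          if (encodedReader != null) {
            encodedReader.close(); // assumed to also close the wrapped rawDataReader
          } else if (rawDataReader != null) {
            rawDataReader.close();
          }
        }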

##########
File path: llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/LowLevelDataReader.java
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.io.encoded;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.io.CacheTag;
+import org.apache.hadoop.hive.common.io.DataCache;
+import org.apache.hadoop.hive.common.io.DiskRangeList;
+import org.apache.hadoop.hive.common.io.FileMetadataCache;
+import org.apache.hadoop.hive.common.io.encoded.MemoryBufferOrBuffers;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.io.HdfsUtils;
+import org.apache.hadoop.hive.ql.io.orc.OrcFile;
+import org.apache.hadoop.hive.ql.io.orc.encoded.EncodedOrcFile;
+import org.apache.hadoop.hive.ql.io.orc.encoded.EncodedReader;
+import org.apache.hadoop.hive.ql.io.orc.encoded.IoTrace;
+import org.apache.hadoop.hive.ql.io.orc.encoded.LlapDataReader;
+import org.apache.hadoop.hive.ql.io.orc.encoded.Reader;
+import org.apache.hive.common.util.FixedSizedObjectPool;
+import org.apache.orc.OrcConf;
+import org.apache.orc.impl.DataReaderProperties;
+import org.apache.orc.impl.InStream;
+import org.apache.orc.impl.OrcCodecPool;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.function.Supplier;
+
+public class LowLevelDataReader implements AutoCloseable {
+
+  private Path path;
+  private Object fileKey;
+  private Configuration daemonConf;
+  private DataCache cache;
+  private FileMetadataCache metadataCache;
+  private CacheTag cacheTag;
+  private FixedSizedObjectPool<IoTrace> tracePool;
+
+  private Supplier<FileSystem> fsSupplier;
+
+  private Reader orcReader;
+  private LlapDataReader rawDataReader;
+  private EncodedReader encodedReader;
+
+
+  public LowLevelDataReader(Path path, Object fileKey, Configuration daemonConf, DataCache cache,
+      FileMetadataCache metadataCache, CacheTag cacheTag, FixedSizedObjectPool<IoTrace> tracePool) {
+    this.path = path;
+    this.fileKey = fileKey;
+    this.daemonConf = daemonConf;
+    this.cache = cache;
+    this.metadataCache = metadataCache;
+    this.cacheTag = cacheTag;
+    this.tracePool = tracePool;
+  }
+
+  public void init() throws IOException {
+    this.fsSupplier = getFsSupplier(path, daemonConf);
+    Object fileKey = HdfsUtils.getFileId(fsSupplier.get(), path,
+          HiveConf.getBoolVar(daemonConf, HiveConf.ConfVars.LLAP_CACHE_ALLOW_SYNTHETIC_FILEID),
+          HiveConf.getBoolVar(daemonConf, HiveConf.ConfVars.LLAP_CACHE_DEFAULT_FS_FILE_ID),
+          !HiveConf.getBoolVar(daemonConf, HiveConf.ConfVars.LLAP_IO_USE_FILEID_PATH));
+    if (!fileKey.equals(this.fileKey)) {
+      throw new IOException("File key mismatch.");
+    }
+    OrcFile.ReaderOptions opts = EncodedOrcFile.readerOptions(daemonConf).filesystem(fsSupplier);
+    orcReader = EncodedOrcFile.createReader(path, opts);
+  }
+
+  private static Supplier<FileSystem> getFsSupplier(final Path path, final Configuration conf) {
+    return () -> {
+      try {
+        return path.getFileSystem(conf);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    };
+  }
+
+  public void read(DiskRangeList ranges) throws IOException {
+    boolean useZeroCopy = (daemonConf != null) && OrcConf.USE_ZEROCOPY.getBoolean(daemonConf);
+    InStream.StreamOptions options = InStream.options()
+        .withCodec(OrcCodecPool.getCodec(orcReader.getCompressionKind()))
+        .withBufferSize(orcReader.getCompressionSize());
+
+    rawDataReader = LlapRecordReaderUtils.createDefaultLlapDataReader(
+        DataReaderProperties.builder()
+            .withFileSystemSupplier(fsSupplier).withPath(path)
+            .withCompression(options)
+            .withZeroCopy(useZeroCopy)
+            .build());
+    rawDataReader.open();
+
+    IoTrace ioTrace = tracePool.take();
+
+    try {
+      encodedReader = orcReader.encodedReader(fileKey, cache, rawDataReader, null, ioTrace, false, cacheTag, false);

Review comment:
       Comment added.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org