Posted to commits@hive.apache.org by pv...@apache.org on 2020/07/14 08:55:03 UTC

[hive] branch master updated: HIVE-23824: LLAP - add API to look up ORC metadata for certain Path (Adam Szita reviewed by Peter Vary)

This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new b8adcfc  HIVE-23824: LLAP - add API to look up ORC metadata for certain Path (Adam Szita reviewed by Peter Vary)
b8adcfc is described below

commit b8adcfc02d4b2fa91a837caf42f47cfde496c77a
Author: Adam Szita <40...@users.noreply.github.com>
AuthorDate: Tue Jul 14 10:54:46 2020 +0200

    HIVE-23824: LLAP - add API to look up ORC metadata for certain Path (Adam Szita reviewed by Peter Vary)
    
    Closes (#1238)
---
 .../llap/IllegalCacheConfigurationException.java   |  30 +++++
 .../org/apache/hadoop/hive/llap/io/api/LlapIo.java |  17 +++
 .../hadoop/hive/llap/io/api/impl/LlapIoImpl.java   |  14 ++-
 .../hive/llap/io/encoded/OrcEncodedDataReader.java | 138 +++++++++++++++------
 .../hive/llap/cache/TestOrcMetadataCache.java      |  36 ++++++
 5 files changed, 193 insertions(+), 42 deletions(-)

diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/IllegalCacheConfigurationException.java b/llap-client/src/java/org/apache/hadoop/hive/llap/IllegalCacheConfigurationException.java
new file mode 100644
index 0000000..a7bd34c
--- /dev/null
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/IllegalCacheConfigurationException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap;
+
+import java.io.IOException;
+
+/**
+ * Signals that the LLAP IO cache is not configured properly (if at all) for an operation that relies
+ * on the cache being configured and available.
+ */
+public class IllegalCacheConfigurationException extends IOException {
+  public IllegalCacheConfigurationException(String message) {
+    super(message);
+  }
+}
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIo.java b/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIo.java
index 4a5bf73..d94a586 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIo.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIo.java
@@ -18,10 +18,16 @@
 
 package org.apache.hadoop.hive.llap.io.api;
 
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.io.CacheTag;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
 import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.InputFormat;
+import org.apache.orc.impl.OrcTail;
 
 public interface LlapIo<T> {
   InputFormat<NullWritable, T> getInputFormat(
@@ -36,6 +42,17 @@ public interface LlapIo<T> {
   long purge();
 
   /**
+   * Returns a deserialized OrcTail instance associated with the ORC file at the given path.
+   * The raw content is obtained from the cache, or from disk on a cache miss.
+   * @param path ORC file path
+   * @param conf job configuration
+   * @param tag CacheTag instance; required because it is needed for cache insertion on a miss
+   * @return the deserialized OrcTail
+   * @throws IOException if the file cannot be read, or if a cache-only read cannot be served from cache
+   */
+  OrcTail getOrcTailFromCache(Path path, Configuration conf, CacheTag tag) throws IOException;
+
+  /**
    * Handles request to evict entities specified in the request object.
    * @param protoRequest lists Hive entities (DB, table, etc..) whose LLAP buffers should be evicted.
    * @return number of evicted bytes.
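
For context, here is a minimal caller-side sketch of the new lookup API. This is not part of the commit: it assumes an initialized LLAP daemon whose LlapIo handle is available via LlapProxy.getIo(), and the table name in the CacheTag is made up.

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.common.io.CacheTag;
    import org.apache.hadoop.hive.llap.IllegalCacheConfigurationException;
    import org.apache.hadoop.hive.llap.io.api.LlapIo;
    import org.apache.hadoop.hive.llap.io.api.LlapProxy;
    import org.apache.orc.impl.OrcTail;

    public class OrcTailLookupSketch {
      /** Returns the ORC tail for the file at path, or null if LLAP cannot serve it. */
      static OrcTail lookupTail(Path path, Configuration conf) throws IOException {
        LlapIo<?> io = LlapProxy.getIo();            // assumes LLAP IO was initialized
        CacheTag tag = CacheTag.build("default.example_table");
        try {
          // Served from the metadata cache on a hit; read from disk and
          // inserted into the cache on a miss.
          return io.getOrcTailFromCache(path, conf, tag);
        } catch (IllegalCacheConfigurationException e) {
          // Cache is unavailable or no file ID could be determined for the path;
          // the caller should fall back to reading the file footer directly.
          return null;
        }
      }
    }
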
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
index 0c0e52e..da52734 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
@@ -29,6 +29,7 @@ import java.util.function.Predicate;
 
 import javax.management.ObjectName;
 
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.io.CacheTag;
 import org.apache.hadoop.hive.llap.ProactiveEviction;
 import org.apache.hadoop.hive.llap.cache.LlapCacheableBuffer;
@@ -66,6 +67,7 @@ import org.apache.hadoop.hive.llap.io.api.LlapIo;
 import org.apache.hadoop.hive.llap.io.decode.ColumnVectorProducer;
 import org.apache.hadoop.hive.llap.io.decode.GenericColumnVectorProducer;
 import org.apache.hadoop.hive.llap.io.decode.OrcColumnVectorProducer;
+import org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader;
 import org.apache.hadoop.hive.llap.io.metadata.MetadataCache;
 import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
 import org.apache.hadoop.hive.llap.metrics.LlapDaemonIOMetrics;
@@ -79,11 +81,7 @@ import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hive.common.util.FixedSizedObjectPool;
-
-
-
-
-
+import org.apache.orc.impl.OrcTail;
 
 
 import com.google.common.primitives.Ints;
@@ -363,4 +361,10 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch>, LlapIoDebugDump {
       child.debugDumpShort(sb);
     }
   }
+
+  @Override
+  public OrcTail getOrcTailFromCache(Path path, Configuration jobConf, CacheTag tag)
+      throws IOException {
+    return OrcEncodedDataReader.getOrcTailForPath(path, jobConf, tag, daemonConf, (MetadataCache) fileMetadataCache);
+  }
 }
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
index 479656f..5db26d6 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.ConsumerFeedback;
 import org.apache.hadoop.hive.llap.DebugUtils;
+import org.apache.hadoop.hive.llap.IllegalCacheConfigurationException;
 import org.apache.hadoop.hive.llap.LlapHiveUtils;
 import org.apache.hadoop.hive.llap.cache.BufferUsageManager;
 import org.apache.hadoop.hive.llap.cache.LlapDataBuffer;
@@ -225,11 +226,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
     // 1. Get file metadata from cache, or create the reader and read it.
     // Don't cache the filesystem object for now; Tez closes it and FS cache will fix all that
     fsSupplier = getFsSupplier(split.getPath(), jobConf);
-    fileKey = determineFileId(fsSupplier, split,
-        HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_CACHE_ALLOW_SYNTHETIC_FILEID),
-        HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_CACHE_DEFAULT_FS_FILE_ID),
-        !HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_IO_USE_FILEID_PATH)
-        );
+    fileKey = determineFileId(fsSupplier, split, daemonConf);
     fileMetadata = getFileFooterFromCacheOrDisk();
     final TypeDescription fileSchema = fileMetadata.getSchema();
 
@@ -497,8 +494,8 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
   }
 
   private static Object determineFileId(Supplier<FileSystem> fsSupplier,
-    FileSplit split,
-      boolean allowSynthetic, boolean checkDefaultFs, boolean forceSynthetic) throws IOException {
+    FileSplit split, Configuration daemonConf) throws IOException {
+
     if (split instanceof OrcSplit) {
       Object fileKey = ((OrcSplit)split).getFileKey();
       if (fileKey != null) {
@@ -506,7 +503,17 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
       }
     }
     LOG.warn("Split for " + split.getPath() + " (" + split.getClass() + ") does not have file ID");
-    return HdfsUtils.getFileId(fsSupplier.get(), split.getPath(), allowSynthetic, checkDefaultFs, forceSynthetic);
+    return determineFileId(fsSupplier, split.getPath(), daemonConf);
+  }
+
+  private static Object determineFileId(Supplier<FileSystem> fsSupplier, Path path, Configuration daemonConf)
+      throws IOException {
+
+    boolean allowSynthetic = HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_CACHE_ALLOW_SYNTHETIC_FILEID);
+    boolean checkDefaultFs = HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_CACHE_DEFAULT_FS_FILE_ID);
+    boolean forceSynthetic = !HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_IO_USE_FILEID_PATH);
+
+    return HdfsUtils.getFileId(fsSupplier.get(), path, allowSynthetic, checkDefaultFs, forceSynthetic);
   }
 
   /**
@@ -564,6 +571,50 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
   }
 
   /**
+   * Looks up metadata for the given ORC file in the cache, reading it in from the file on a cache miss.
+   * @param path ORC file path
+   * @param jobConf job configuration
+   * @param tag CacheTag needed for cache insertion on a miss
+   * @param daemonConf configuration of the LLAP daemon, used to determine the file ID
+   * @param metadataCache the metadata cache to look up and insert into
+   * @return the deserialized OrcTail
+   * @throws IOException if the file cannot be read, or if the cache is unavailable for the path
+   */
+  public static OrcTail getOrcTailForPath(Path path, Configuration jobConf, CacheTag tag,
+      Configuration daemonConf, MetadataCache metadataCache) throws IOException {
+    Supplier<FileSystem> fsSupplier = getFsSupplier(path, jobConf);
+    Object fileKey = determineFileId(fsSupplier, path, daemonConf);
+
+    if (fileKey == null || metadataCache == null) {
+      throw new IllegalCacheConfigurationException("LLAP metadata cache not available for path " + path.toString());
+    }
+
+    LlapBufferOrBuffers tailBuffers = metadataCache.getFileMetadata(fileKey);
+    try {
+      // Cache hit
+      if (tailBuffers != null) {
+        return getOrcTailFromLlapBuffers(tailBuffers);
+      }
+
+      // Cache miss
+      throwIfCacheOnlyRead(HiveConf.getBoolVar(jobConf, ConfVars.LLAP_IO_CACHE_ONLY));
+
+      ReaderOptions opts = EncodedOrcFile.readerOptions(jobConf).filesystem(fsSupplier);
+      Reader reader = EncodedOrcFile.createReader(path, opts);
+      ByteBuffer tailBufferBb = reader.getSerializedFileFooter();
+      tailBuffers = metadataCache.putFileMetadata(fileKey, tailBufferBb, tag, new AtomicBoolean(false));
+      return getOrcTailFromLlapBuffers(tailBuffers);
+
+    } finally {
+      // By this time the buffers have been locked, either at cache lookup or at cache insertion.
+      if (tailBuffers != null) {
+        metadataCache.decRefBuffer(tailBuffers);
+      }
+    }
+
+  }
+
+  /**
    *  Gets file metadata for the split from cache, or reads it from the file.
    */
   private OrcFileMetadata getFileFooterFromCacheOrDisk() throws IOException {
@@ -575,31 +626,8 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
       tailBuffers = metadataCache.getFileMetadata(fileKey);
       if (tailBuffers != null) {
         try {
-          MemoryBuffer tailBuffer = tailBuffers.getSingleBuffer();
-          ByteBuffer bb = null;
-          if (tailBuffer != null) {
-            bb = tailBuffer.getByteBufferDup();
-            // TODO: remove the copy after ORC-158 and ORC-197
-            // if (bb.isDirect()) {
-              ByteBuffer dupBb = tailBuffer.getByteBufferDup(); // Don't mess with the cached object.
-              bb = ByteBuffer.allocate(dupBb.remaining());
-              bb.put(dupBb);
-              bb.flip();
-            // }
-          } else {
-            // TODO: add the ability to extractFileTail to read from multiple buffers?
-            MemoryBuffer[] tailBufferArray = tailBuffers.getMultipleBuffers();
-            int totalSize = 0;
-            for (MemoryBuffer buf : tailBufferArray) {
-              totalSize += buf.getByteBufferRaw().remaining();
-            }
-            bb = ByteBuffer.allocate(totalSize);
-            for (MemoryBuffer buf : tailBufferArray) {
-              bb.put(buf.getByteBufferDup());
-            }
-            bb.flip();
-          }
-          OrcTail orcTail = ReaderImpl.extractFileTail(bb);
+          OrcTail orcTail = getOrcTailFromLlapBuffers(tailBuffers);
+          counters.incrCounter(LlapIOCounters.METADATA_CACHE_HIT);
           FileTail tail = orcTail.getFileTail();
           stats = orcTail.getStripeStatisticsProto();
           stripes = new ArrayList<>(tail.getFooter().getStripesCount());
@@ -608,15 +636,15 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
           }
           return new OrcFileMetadata(
               fileKey, tail.getFooter(), tail.getPostscript(), stats, stripes,
-            ReaderImpl.getFileVersion(tail.getPostscript().getVersionList()));
+              ReaderImpl.getFileVersion(tail.getPostscript().getVersionList()));
         } finally {
           // We don't need the buffer anymore.
           metadataCache.decRefBuffer(tailBuffers);
-          counters.incrCounter(LlapIOCounters.METADATA_CACHE_HIT);
         }
+      } else {
+        counters.incrCounter(LlapIOCounters.METADATA_CACHE_MISS);
+        throwIfCacheOnlyRead(isReadCacheOnly);
       }
-      counters.incrCounter(LlapIOCounters.METADATA_CACHE_MISS);
-      throwIfCacheOnlyRead(isReadCacheOnly);
     }
     ensureOrcReader();
     ByteBuffer tailBufferBb = orcReader.getSerializedFileFooter();
@@ -629,6 +657,42 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
         orcReader.getOrcProtoStripeStatistics(), orcReader.getStripes(), orcReader.getFileVersion());
   }
 
+
+  /**
+   * Utility function to produce a deserialized OrcTail instance from LLAP buffers retrieved from the metadata cache.
+   * Expects the buffers to be locked before invocation; the caller releases them afterwards.
+   * @param tailBuffers locked buffer(s) holding the serialized file tail
+   * @return the deserialized OrcTail
+   * @throws IOException if the file tail cannot be parsed
+   */
+  private static OrcTail getOrcTailFromLlapBuffers(LlapBufferOrBuffers tailBuffers) throws IOException {
+    MemoryBuffer tailBuffer = tailBuffers.getSingleBuffer();
+    ByteBuffer bb = null;
+    if (tailBuffer != null) {
+      bb = tailBuffer.getByteBufferDup();
+      // TODO: remove the copy after ORC-158 and ORC-197
+      // if (bb.isDirect()) {
+      ByteBuffer dupBb = tailBuffer.getByteBufferDup(); // Don't mess with the cached object.
+      bb = ByteBuffer.allocate(dupBb.remaining());
+      bb.put(dupBb);
+      bb.flip();
+      // }
+    } else {
+      // TODO: add the ability to extractFileTail to read from multiple buffers?
+      MemoryBuffer[] tailBufferArray = tailBuffers.getMultipleBuffers();
+      int totalSize = 0;
+      for (MemoryBuffer buf : tailBufferArray) {
+        totalSize += buf.getByteBufferRaw().remaining();
+      }
+      bb = ByteBuffer.allocate(totalSize);
+      for (MemoryBuffer buf : tailBufferArray) {
+        bb.put(buf.getByteBufferDup());
+      }
+      bb.flip();
+    }
+    return ReaderImpl.extractFileTail(bb);
+  }
+
   private OrcProto.StripeFooter buildStripeFooter(
       List<DiskRange> bcs, int len, CompressionCodec codec, int bufferSize) throws IOException {
     return OrcProto.StripeFooter.parseFrom(InStream.createCodedInputStream(
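
The multi-buffer branch of getOrcTailFromLlapBuffers above first sizes a single heap buffer from the cached segments and then copies them in before parsing. The same concatenation pattern in isolation, using only JDK ByteBuffers (a standalone illustration, not Hive code):

    import java.nio.ByteBuffer;

    final class ByteBufferConcat {
      /** Copies the remaining bytes of each source into one heap buffer, flipped for reading. */
      static ByteBuffer concat(ByteBuffer... sources) {
        int totalSize = 0;
        for (ByteBuffer src : sources) {
          totalSize += src.remaining();    // size the target without consuming the sources
        }
        ByteBuffer out = ByteBuffer.allocate(totalSize);
        for (ByteBuffer src : sources) {
          out.put(src.duplicate());        // duplicate() leaves the cached buffer's position intact
        }
        out.flip();                        // switch the target from writing to reading
        return out;
      }
    }
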
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java
index 1534864..818d43a 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java
@@ -24,16 +24,24 @@ import java.util.Random;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Predicate;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.io.CacheTag;
 import org.apache.hadoop.hive.common.io.DataCache;
 import org.apache.hadoop.hive.common.io.DiskRange;
 import org.apache.hadoop.hive.common.io.DiskRangeList;
 
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.IllegalCacheConfigurationException;
 import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
+import org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader;
 import org.apache.hadoop.hive.llap.io.metadata.MetadataCache;
 import org.apache.hadoop.hive.llap.io.metadata.MetadataCache.LlapBufferOrBuffers;
 import org.apache.hadoop.hive.llap.io.metadata.MetadataCache.LlapMetadataBuffer;
 import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
 import org.apache.hadoop.hive.ql.io.orc.encoded.IncompleteCb;
+import org.apache.orc.impl.OrcTail;
+
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -228,6 +236,34 @@ public class TestOrcMetadataCache {
     verifyResult(result, DRL, 4, 5);
   }
 
+  @Test
+  public void testGetOrcTailForPath() throws Exception {
+    DummyMemoryManager mm = new DummyMemoryManager();
+    DummyCachePolicy cp = new DummyCachePolicy();
+    final int MAX_ALLOC = 64;
+    LlapDaemonCacheMetrics metrics = LlapDaemonCacheMetrics.create("", "");
+    BuddyAllocator alloc = new BuddyAllocator(
+        false, false, 8, MAX_ALLOC, 1, 4096, 0, null, mm, metrics, null, true);
+    MetadataCache cache = new MetadataCache(alloc, mm, cp, true, metrics);
+
+    Path path = new Path("../data/files/alltypesorc");
+    Configuration jobConf = new Configuration();
+    Configuration daemonConf = new Configuration();
+    CacheTag tag = CacheTag.build("test-table");
+    OrcTail uncached = OrcEncodedDataReader.getOrcTailForPath(path, jobConf, tag, daemonConf, cache);
+    jobConf.set(HiveConf.ConfVars.LLAP_IO_CACHE_ONLY.varname, "true");
+    OrcTail cached = OrcEncodedDataReader.getOrcTailForPath(path, jobConf, tag, daemonConf, cache);
+    assertEquals(uncached.getSerializedTail(), cached.getSerializedTail());
+    assertEquals(uncached.getFileTail(), cached.getFileTail());
+  }
+
+  @Test(expected = IllegalCacheConfigurationException.class)
+  public void testGetOrcTailForPathCacheNotReady() throws Exception {
+    Path path = new Path("../data/files/alltypesorc");
+    Configuration conf = new Configuration();
+    OrcEncodedDataReader.getOrcTailForPath(path, conf, null, conf, null);
+  }
+
   private static final int INCOMPLETE = 0, DRL = 1;
   public void verifyResult(DiskRangeList result, long... vals) {
     for (int i = 0; i < vals.length; i += 3) {