You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by gu...@apache.org on 2013/10/24 07:25:52 UTC

svn commit: r1535281 - in /hive/branches/tez: common/src/java/org/apache/hadoop/hive/conf/ ql/src/java/org/apache/hadoop/hive/ql/io/orc/ ql/src/java/org/apache/hadoop/hive/ql/log/

Author: gunther
Date: Thu Oct 24 05:25:51 2013
New Revision: 1535281

URL: http://svn.apache.org/r1535281
Log:
HIVE-5639: Allow caching of Orc footers in Tez AM (Siddharth Seth via Gunther Hagleitner)

Modified:
    hive/branches/tez/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java

Modified: hive/branches/tez/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1535281&r1=1535280&r2=1535281&view=diff
==============================================================================
--- hive/branches/tez/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/branches/tez/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Thu Oct 24 05:25:51 2013
@@ -516,6 +516,10 @@ public class HiveConf extends Configurat
 
     HIVE_ORC_DICTIONARY_KEY_SIZE_THRESHOLD("hive.exec.orc.dictionary.key.size.threshold", 0.8f),
 
+    HIVE_ORC_INCLUDE_FILE_FOOTER_IN_SPLITS("hive.orc.splits.include.file.footer", false),
+    HIVE_ORC_CACHE_STRIPE_DETAILS_SIZE("hive.orc.cache.stripe.details.size", 10000),
+    HIVE_ORC_COMPUTE_SPLITS_NUM_THREADS("hive.orc.compute.splits.num.threads", 10),
+
     HIVESKEWJOIN("hive.optimize.skewjoin", false),
     HIVECONVERTJOIN("hive.auto.convert.join", true),
     HIVECONVERTJOINNOCONDITIONALTASK("hive.auto.convert.join.noconditionaltask", true),

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java?rev=1535281&r1=1535280&r2=1535281&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java Thu Oct 24 05:25:51 2013
@@ -27,6 +27,7 @@ import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -37,6 +38,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
@@ -58,6 +60,10 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.util.StringUtils;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 /**
  * A MapReduce/Hive input format for ORC files.
  */
@@ -69,6 +75,7 @@ public class OrcInputFormat  implements 
   private static final Log LOG = LogFactory.getLog(OrcInputFormat.class);
   static final String MIN_SPLIT_SIZE = "mapred.min.split.size";
   static final String MAX_SPLIT_SIZE = "mapred.max.split.size";
+
   private static final long DEFAULT_MIN_SPLIT_SIZE = 16 * 1024 * 1024;
   private static final long DEFAULT_MAX_SPLIT_SIZE = 256 * 1024 * 1024;
 
@@ -248,11 +255,15 @@ public class OrcInputFormat  implements 
       //If CombineHiveInputFormat is used, it works with FileSplit and not OrcSplit
       reader = OrcFile.createReader(fs, path);
     } else {
-      //We have OrcSplit, which has footer metadata cached, so used the appropriate reader
+      //We have OrcSplit, which may have footer metadata cached, so use the appropriate reader
       //constructor
       OrcSplit orcSplit = (OrcSplit) fSplit;
-      FileMetaInfo fMetaInfo = orcSplit.getFileMetaInfo();
-      reader = OrcFile.createReader(fs, path, fMetaInfo);
+      if (orcSplit.hasFooter()) {
+        FileMetaInfo fMetaInfo = orcSplit.getFileMetaInfo();
+        reader = OrcFile.createReader(fs, path, fMetaInfo);
+      } else {
+        reader = OrcFile.createReader(fs, path);
+      }
     }
     return new OrcRecordReader(reader, conf, fSplit.getStart(), fSplit.getLength());
   }
@@ -311,13 +322,17 @@ public class OrcInputFormat  implements 
    * the different worker threads.
    */
   static class Context {
-    private final ExecutorService threadPool = Executors.newFixedThreadPool(10);
+    private static Cache<Path, FileInfo> footerCache;
+    private final ExecutorService threadPool;
     private final List<OrcSplit> splits = new ArrayList<OrcSplit>(10000);
     private final List<Throwable> errors = new ArrayList<Throwable>();
     private final HadoopShims shims = ShimLoader.getHadoopShims();
-    private final Configuration conf;
     private final long maxSize;
     private final long minSize;
+    private final boolean footerInSplits;
+    private final boolean cacheStripeDetails;
+    private final AtomicInteger cacheHitCounter = new AtomicInteger(0);
+    private final AtomicInteger numFilesCounter = new AtomicInteger(0);
 
     /**
      * A count of the number of threads that may create more work for the
@@ -326,9 +341,24 @@ public class OrcInputFormat  implements 
     private int schedulers = 0;
 
     Context(Configuration conf) {
-      this.conf = conf;
       minSize = conf.getLong(MIN_SPLIT_SIZE, DEFAULT_MIN_SPLIT_SIZE);
       maxSize = conf.getLong(MAX_SPLIT_SIZE, DEFAULT_MAX_SPLIT_SIZE);
+      footerInSplits = HiveConf.getBoolVar(conf, ConfVars.HIVE_ORC_INCLUDE_FILE_FOOTER_IN_SPLITS);
+      int cacheStripeDetailsSize = HiveConf.getIntVar(conf,
+          ConfVars.HIVE_ORC_CACHE_STRIPE_DETAILS_SIZE);
+      int numThreads = HiveConf.getIntVar(conf, ConfVars.HIVE_ORC_COMPUTE_SPLITS_NUM_THREADS);
+
+      cacheStripeDetails = (cacheStripeDetailsSize > 0);
+
+      threadPool = Executors.newFixedThreadPool(numThreads,
+          new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ORC_GET_SPLITS #%d").build());
+
+      synchronized (Context.class) {
+        if (footerCache == null && cacheStripeDetails) {
+          footerCache = CacheBuilder.newBuilder().concurrencyLevel(numThreads)
+              .initialCapacity(cacheStripeDetailsSize).softValues().build();
+        }
+      }
     }
 
     int getSchedulers() {
@@ -413,16 +443,22 @@ public class OrcInputFormat  implements 
     @Override
     public void run() {
       try {
+        perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.ORC_GET_BLOCK_LOCATIONS);
         Iterator<FileStatus> itr = context.shims.listLocatedStatus(fs, dir,
             hiddenFileFilter);
         while (itr.hasNext()) {
           FileStatus file = itr.next();
           if (!file.isDir()) {
-            context.schedule(new SplitGenerator(context, fs, file));
+            FileInfo fileInfo = null;
+            if (context.cacheStripeDetails) {
+              fileInfo = verifyCachedFileInfo(file);
+            }
+            context.schedule(new SplitGenerator(context, fs, file, fileInfo));
           }
         }
         // mark the fact that we are done
         context.decrementSchedulers();
+        perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.ORC_GET_BLOCK_LOCATIONS);
       } catch (Throwable th) {
         context.decrementSchedulers();
         synchronized (context.errors) {
@@ -430,6 +466,34 @@ public class OrcInputFormat  implements 
         }
       }
     }
+
+    /**
+     * Returns the cached FileInfo for {@code file} if its modification time and length still
+     * match the file; otherwise invalidates any stale entry and returns null.
+     */
+    private FileInfo verifyCachedFileInfo(FileStatus file) {
+      context.numFilesCounter.incrementAndGet();
+      FileInfo fileInfo = Context.footerCache.getIfPresent(file.getPath());
+      if (fileInfo == null) {
+        // Logged once, at debug level only: this runs for every uncached file.
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Info not cached for path: " + file.getPath());
+        }
+        return null;
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Info cached for path: " + file.getPath());
+      }
+      if (fileInfo.modificationTime == file.getModificationTime() && fileInfo.size == file.getLen()) {
+        context.cacheHitCounter.incrementAndGet();
+        return fileInfo;
+      }
+      Context.footerCache.invalidate(file.getPath());
+      LOG.info("Meta-Info for : " + file.getPath() + " changed. CachedModificationTime: "
+          + fileInfo.modificationTime + ", CurrentModificationTime: " + file.getModificationTime()
+          + ", CachedLength: " + fileInfo.size + ", CurrentLength: " + file.getLen());
+      return null;
+    }
   }
 
   /**
@@ -442,13 +506,18 @@ public class OrcInputFormat  implements 
     private final FileStatus file;
     private final long blockSize;
     private final BlockLocation[] locations;
+    private final FileInfo fileInfo;
+    private Iterable<StripeInformation> stripes;
+    private FileMetaInfo fileMetaInfo;
+
 
     SplitGenerator(Context context, FileSystem fs,
-                   FileStatus file) throws IOException {
+                   FileStatus file, FileInfo fileInfo) throws IOException {
       this.context = context;
       this.fs = fs;
       this.file = file;
       this.blockSize = file.getBlockSize();
+      this.fileInfo = fileInfo;
       locations = context.shims.getLocations(fs, file);
     }
 
@@ -547,15 +616,15 @@ public class OrcInputFormat  implements 
     public void run() {
       perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.CREATE_ORC_SPLITS);
       try {
-        Reader orcReader = OrcFile.createReader(fs, file.getPath());
+        populateAndCacheStripeDetails();
         long currentOffset = -1;
         long currentLength = 0;
-        for(StripeInformation stripe: orcReader.getStripes()) {
+        for(StripeInformation stripe: stripes) {
           // if we are working on a stripe, over the min stripe size, and
           // crossed a block boundary, cut the input split here.
           if (currentOffset != -1 && currentLength > context.minSize &&
               (currentOffset / blockSize != stripe.getOffset() / blockSize)) {
-            createSplit(currentOffset, currentLength, orcReader.getFileMetaInfo());
+            createSplit(currentOffset, currentLength, fileMetaInfo);
             currentOffset = -1;
           }
           // if we aren't building a split, start a new one.
@@ -566,12 +635,12 @@ public class OrcInputFormat  implements 
             currentLength += stripe.getLength();
           }
           if (currentLength >= context.maxSize) {
-            createSplit(currentOffset, currentLength, orcReader.getFileMetaInfo());
+            createSplit(currentOffset, currentLength, fileMetaInfo);
             currentOffset = -1;
           }
         }
         if (currentOffset != -1) {
-          createSplit(currentOffset, currentLength, orcReader.getFileMetaInfo());
+          createSplit(currentOffset, currentLength, fileMetaInfo);
         }
       } catch (Throwable th) {
         synchronized (context.errors) {
@@ -580,12 +649,48 @@ public class OrcInputFormat  implements 
       }
       perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.CREATE_ORC_SPLITS);
     }
+
+
+
+
+    /**
+     * Fills in stripes and fileMetaInfo for this file: from the cached FileInfo when one was
+     * supplied, otherwise by reading the ORC footer (and caching the result if enabled).
+     *
+     * Errors propagate to run(), which calls this inside its try block, so its catch block
+     * handles a failed read and the stripe loop is never entered with a null stripes.
+     */
+    private void populateAndCacheStripeDetails() throws IOException {
+      if (fileInfo != null) {
+        stripes = fileInfo.stripeInfos;
+        fileMetaInfo = fileInfo.fileMetaInfo;
+        // The entry may have been cached while hive.orc.splits.include.file.footer was off.
+        if (fileMetaInfo == null && context.footerInSplits) {
+          Reader orcReader = OrcFile.createReader(fs, file.getPath());
+          // Set the field too, not only the cache entry, so this run's splits get the footer.
+          fileMetaInfo = orcReader.getFileMetaInfo();
+          fileInfo.fileMetaInfo = fileMetaInfo;
+        }
+        return;
+      }
+      // No valid cache entry was passed in: read the stripe list from the file itself.
+      Reader orcReader = OrcFile.createReader(fs, file.getPath());
+      stripes = orcReader.getStripes();
+      fileMetaInfo = context.footerInSplits ? orcReader.getFileMetaInfo() : null;
+      if (context.cacheStripeDetails) {
+        // Populate into cache.
+        Context.footerCache.put(file.getPath(),
+            new FileInfo(file.getModificationTime(), file.getLen(), stripes, fileMetaInfo));
+      }
+    }
+
   }
 
   @Override
   public InputSplit[] getSplits(JobConf job,
                                 int numSplits) throws IOException {
     // use threads to resolve directories into splits
+    perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.ORC_GET_SPLITS);
     Context context = new Context(job);
     for(Path dir: getInputPaths(job)) {
       FileSystem fs = dir.getFileSystem(job);
@@ -607,6 +712,32 @@ public class OrcInputFormat  implements 
     }
     InputSplit[] result = new InputSplit[context.splits.size()];
     context.splits.toArray(result);
+    if (context.cacheStripeDetails) {
+      LOG.info("FooterCacheHitRatio: " + context.cacheHitCounter.get() + "/"
+          + context.numFilesCounter.get());
+    }
+    perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.ORC_GET_SPLITS);
     return result;
   }
-}
+
+  /**
+   * Cached split-generation details for one ORC file. modificationTime and size record the
+   * file state the entry was built from and are compared against the current FileStatus
+   * before the entry is reused; fileMetaInfo may be null and can be filled in later.
+   */
+  private static class FileInfo {
+    final long modificationTime;
+    final long size;
+    final Iterable<StripeInformation> stripeInfos;
+    // Not final: set after construction when footers are first requested for a cached file.
+    FileMetaInfo fileMetaInfo;
+
+    FileInfo(long modificationTime, long size, Iterable<StripeInformation> stripeInfos,
+        FileMetaInfo fileMetaInfo) {
+      this.modificationTime = modificationTime;
+      this.size = size;
+      this.stripeInfos = stripeInfos;
+      this.fileMetaInfo = fileMetaInfo;
+    }
+  }
+}
\ No newline at end of file

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java?rev=1535281&r1=1535280&r2=1535281&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java Thu Oct 24 05:25:51 2013
@@ -19,6 +19,7 @@ import org.apache.hadoop.mapred.FileSpli
  */
 public class OrcSplit extends FileSplit {
   private Reader.FileMetaInfo fileMetaInfo;
+  private boolean hasFooter;
 
   protected OrcSplit(){
     //The FileSplit() constructor in hadoop 0.20 and 1.x is package private so can't use it.
@@ -31,6 +32,7 @@ public class OrcSplit extends FileSplit 
       FileMetaInfo fileMetaInfo) {
     super(path, offset, length, hosts);
     this.fileMetaInfo = fileMetaInfo;
+    hasFooter = this.fileMetaInfo != null;
   }
 
   @Override
@@ -38,16 +40,22 @@ public class OrcSplit extends FileSplit 
     //serialize path, offset, length using FileSplit
     super.write(out);
 
-    //serialize FileMetaInfo fields
-    Text.writeString(out, fileMetaInfo.compressionType);
-    WritableUtils.writeVInt(out, fileMetaInfo.bufferSize);
-
-    //serialize FileMetaInfo field footer
-    ByteBuffer footerBuff = fileMetaInfo.footerBuffer;
-    footerBuff.reset();
-    //write length of buffer
-    WritableUtils.writeVInt(out, footerBuff.limit() - footerBuff.position());
-    out.write(footerBuff.array(), footerBuff.position(), footerBuff.limit() - footerBuff.position());
+    // Whether footer information follows.
+    out.writeBoolean(hasFooter);
+
+    if (hasFooter) {
+      // serialize FileMetaInfo fields
+      Text.writeString(out, fileMetaInfo.compressionType);
+      WritableUtils.writeVInt(out, fileMetaInfo.bufferSize);
+
+      // serialize FileMetaInfo field footer
+      ByteBuffer footerBuff = fileMetaInfo.footerBuffer;
+      footerBuff.reset();
+      // write length of buffer
+      WritableUtils.writeVInt(out, footerBuff.limit() - footerBuff.position());
+      out.write(footerBuff.array(), footerBuff.position(),
+          footerBuff.limit() - footerBuff.position());
+    }
   }
 
   @Override
@@ -55,20 +63,27 @@ public class OrcSplit extends FileSplit 
     //deserialize path, offset, length using FileSplit
     super.readFields(in);
 
-    //deserialize FileMetaInfo fields
-    String compressionType = Text.readString(in);
-    int bufferSize = WritableUtils.readVInt(in);
-
-    //deserialize FileMetaInfo field footer
-    int footerBuffSize = WritableUtils.readVInt(in);
-    ByteBuffer footerBuff = ByteBuffer.allocate(footerBuffSize);
-    in.readFully(footerBuff.array(), 0, footerBuffSize);
+    hasFooter = in.readBoolean();
+
+    if (hasFooter) {
+      // deserialize FileMetaInfo fields
+      String compressionType = Text.readString(in);
+      int bufferSize = WritableUtils.readVInt(in);
+
+      // deserialize FileMetaInfo field footer
+      int footerBuffSize = WritableUtils.readVInt(in);
+      ByteBuffer footerBuff = ByteBuffer.allocate(footerBuffSize);
+      in.readFully(footerBuff.array(), 0, footerBuffSize);
 
-    fileMetaInfo = new FileMetaInfo(compressionType, bufferSize, footerBuff);
+      fileMetaInfo = new FileMetaInfo(compressionType, bufferSize, footerBuff);
+    }
   }
 
   public FileMetaInfo getFileMetaInfo(){
     return fileMetaInfo;
   }
 
+  public boolean hasFooter() {
+    return hasFooter;
+  }
 }

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java?rev=1535281&r1=1535280&r2=1535281&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java Thu Oct 24 05:25:51 2013
@@ -153,11 +153,15 @@ public class VectorizedOrcInputFormat ex
       //If CombineHiveInputFormat is used, it works with FileSplit and not OrcSplit
       reader = OrcFile.createReader(fs, path);
     } else {
-      //We have OrcSplit, which has footer metadata cached, so used the appropriate reader
+      //We have OrcSplit, which may have footer metadata cached, so use the appropriate reader
       //constructor
       OrcSplit orcSplit = (OrcSplit) fSplit;
-      FileMetaInfo fMetaInfo = orcSplit.getFileMetaInfo();
-      reader = OrcFile.createReader(fs, path, fMetaInfo);
+      if (orcSplit.hasFooter()) {
+        FileMetaInfo fMetaInfo = orcSplit.getFileMetaInfo();
+        reader = OrcFile.createReader(fs, path, fMetaInfo);
+      } else {
+        reader = OrcFile.createReader(fs, path);
+      }
     }
 
     return new VectorizedOrcRecordReader(reader, conf, fSplit);

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java?rev=1535281&r1=1535280&r2=1535281&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java Thu Oct 24 05:25:51 2013
@@ -65,6 +65,8 @@ public class PerfLogger {
   public static final String LOAD_HASHTABLE = "LoadHashtable";
   public static final String INIT_ORC_RECORD_READER = "OrcRecordReaderInit";
   public static final String CREATE_ORC_SPLITS = "OrcCreateSplits";
+  public static final String ORC_GET_SPLITS = "OrcGetSplits";
+  public static final String ORC_GET_BLOCK_LOCATIONS = "OrcGetBlockLocations";
 
   protected static final ThreadLocal<PerfLogger> perfLogger = new ThreadLocal<PerfLogger>();