You are viewing a plain-text version of this content. The canonical link to it was a hyperlink ("here") that did not survive the plain-text conversion.
Posted to commits@hive.apache.org by se...@apache.org on 2015/10/14 02:44:06 UTC

hive git commit: HIVE-11856 : allow split strategies to run on threadpool (Sergey Shelukhin, reviewed by Vikram Dixit K)

Repository: hive
Updated Branches:
  refs/heads/master 7efb7308c -> d7f1b465b


HIVE-11856 : allow split strategies to run on threadpool (Sergey Shelukhin, reviewed by Vikram Dixit K)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/d7f1b465
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/d7f1b465
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/d7f1b465

Branch: refs/heads/master
Commit: d7f1b465b9853c772abd8ad4d8ead175f8ed5cb9
Parents: 7efb730
Author: Sergey Shelukhin <se...@apache.org>
Authored: Tue Oct 13 16:10:55 2015 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Tue Oct 13 17:43:54 2015 -0700

----------------------------------------------------------------------
 .../hadoop/hive/ql/io/orc/OrcInputFormat.java   | 65 +++++++++++++++++---
 1 file changed, 56 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/d7f1b465/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 690f8dd..b03e055 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -452,7 +452,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
 
     // We store all caches in variables to change the main one based on config.
     // This is not thread safe between different split generations (and wasn't anyway).
-    private static FooterCache footerCache;
+    private FooterCache footerCache;
     private static LocalCache localCache;
     private static MetastoreCache metaCache;
     private static ExecutorService threadPool = null;
@@ -596,7 +596,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
    * ETL strategy is used when spending little more time in split generation is acceptable
    * (split generation reads and caches file footers).
    */
-  static final class ETLSplitStrategy implements SplitStrategy<SplitInfo> {
+  static final class ETLSplitStrategy implements SplitStrategy<SplitInfo>, Callable<Void> {
     Context context;
     FileSystem fs;
     List<HdfsFileStatusWithId> files;
@@ -604,6 +604,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     List<DeltaMetaData> deltas;
     Path dir;
     boolean[] covered;
+    private List<Future<List<OrcSplit>>> splitFuturesRef;
 
     public ETLSplitStrategy(Context context, FileSystem fs, Path dir,
         List<HdfsFileStatusWithId> children, boolean isOriginal, List<DeltaMetaData> deltas,
@@ -629,7 +630,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
 
       // Force local cache if we have deltas.
       FooterCache cache = context.cacheStripeDetails ?
-          (deltas == null ? Context.footerCache : Context.localCache) : null;
+          (deltas == null ? context.footerCache : Context.localCache) : null;
       if (cache != null) {
         FileInfo[] infos = cache.getAndValidate(files);
         for (int i = 0; i < files.size(); ++i) {
@@ -661,6 +662,34 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     public String toString() {
       return ETLSplitStrategy.class.getSimpleName() + " strategy for " + dir;
     }
+
+    public Future<Void> generateSplitWork(
+        Context context, List<Future<List<OrcSplit>>> splitFutures) throws IOException {
+      if (context.cacheStripeDetails && context.footerCache.isBlocking()) {
+        this.splitFuturesRef = splitFutures;
+        return Context.threadPool.submit(this);
+      } else {
+        runGetSplitsSync(splitFutures);
+        return null;
+      }
+    }
+
+    @Override
+    public Void call() throws IOException {
+      runGetSplitsSync(splitFuturesRef);
+      return null;
+    }
+
+    private void runGetSplitsSync(List<Future<List<OrcSplit>>> splitFutures) throws IOException {
+      List<SplitInfo> splits = getSplits();
+      List<Future<List<OrcSplit>>> localList = new ArrayList<>(splits.size());
+      for (SplitInfo splitInfo : splits) {
+        localList.add(Context.threadPool.submit(new SplitGenerator(splitInfo)));
+      }
+      synchronized (splitFutures) {
+        splitFutures.addAll(localList);
+      }
+    }
   }
 
   /**
@@ -1028,7 +1057,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
               && fileInfo.writerVersion != null;
           // We assume that if we needed to create a reader, we need to cache it to meta cache.
           // TODO: This will also needlessly overwrite it in local cache for now.
-          Context.footerCache.put(fileWithId.getFileId(), file, fileInfo.fileMetaInfo, orcReader);
+          context.footerCache.put(fileWithId.getFileId(), file, fileInfo.fileMetaInfo, orcReader);
         }
       } else {
         Reader orcReader = createOrcReader();
@@ -1041,7 +1070,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
             ((ReaderImpl) orcReader).getFileMetaInfo() : null;
         if (context.cacheStripeDetails) {
           Long fileId = fileWithId.getFileId();
-          Context.footerCache.put(fileId, file, fileMetaInfo, orcReader);
+          context.footerCache.put(fileId, file, fileMetaInfo, orcReader);
         }
       }
       includedCols = genIncludedColumns(types, context.conf, isOriginal);
@@ -1084,7 +1113,8 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     boolean useFileIds = HiveConf.getBoolVar(conf, ConfVars.HIVE_ORC_INCLUDE_FILE_ID_IN_SPLITS);
     List<OrcSplit> splits = Lists.newArrayList();
     List<Future<AcidDirInfo>> pathFutures = Lists.newArrayList();
-    List<Future<List<OrcSplit>>> splitFutures = Lists.newArrayList();
+    List<Future<Void>> strategyFutures = Lists.newArrayList();
+    final List<Future<List<OrcSplit>>> splitFutures = Lists.newArrayList();
 
     // multi-threaded file statuses and split strategy
     Path[] paths = getInputPaths(conf);
@@ -1109,9 +1139,10 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
         // Hack note - different split strategies return differently typed lists, yay Java.
         // This works purely by magic, because we know which strategy produces which type.
         if (splitStrategy instanceof ETLSplitStrategy) {
-          List<SplitInfo> splitInfos = ((ETLSplitStrategy)splitStrategy).getSplits();
-          for (SplitInfo splitInfo : splitInfos) {
-            splitFutures.add(Context.threadPool.submit(new SplitGenerator(splitInfo)));
+          Future<Void> ssFuture = ((ETLSplitStrategy)splitStrategy).generateSplitWork(
+              context, splitFutures);
+          if (ssFuture != null) {
+            strategyFutures.add(ssFuture);
           }
         } else {
           @SuppressWarnings("unchecked")
@@ -1121,11 +1152,16 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
       }
 
       // complete split futures
+      for (Future<Void> ssFuture : strategyFutures) {
+         ssFuture.get(); // Make sure we get exceptions strategies might have thrown.
+      }
+      // All the split strategies are done, so it must be safe to access splitFutures.
       for (Future<List<OrcSplit>> splitFuture : splitFutures) {
         splits.addAll(splitFuture.get());
       }
     } catch (Exception e) {
       cancelFutures(pathFutures);
+      cancelFutures(strategyFutures);
       cancelFutures(splitFutures);
       throw new RuntimeException("ORC split generation failed with exception: " + e.getMessage(), e);
     }
@@ -1557,6 +1593,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
    */
   public interface FooterCache {
     FileInfo[] getAndValidate(List<HdfsFileStatusWithId> files) throws IOException;
+    boolean isBlocking();
     void put(Long fileId, FileStatus file, FileMetaInfo fileMetaInfo, Reader orcReader)
         throws IOException;
   }
@@ -1619,6 +1656,11 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
           orcReader.getOrcProtoFileStatistics(), fileMetaInfo, orcReader.getWriterVersion(),
           fileId));
     }
+
+    @Override
+    public boolean isBlocking() {
+      return false;
+    }
   }
 
   /** Metastore-based footer cache storing serialized footers. Also has a local cache. */
@@ -1734,5 +1776,10 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     public void configure(HiveConf queryConfig) {
       this.conf = queryConfig;
     }
+
+    @Override
+    public boolean isBlocking() {
+      return true;
+    }
   }
 }