You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2015/09/04 04:55:05 UTC

[16/28] hive git commit: HIVE-11689 : minor flow changes to ORC split generation (Sergey Shelukhin, reviewed by Prasanth Jayachandran and Swarnim Kulkarni)

HIVE-11689 : minor flow changes to ORC split generation (Sergey Shelukhin, reviewed by Prasanth Jayachandran and Swarnim Kulkarni)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/f530f44d
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/f530f44d
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/f530f44d

Branch: refs/heads/llap
Commit: f530f44d1d95c2da2485d53f0855f8f8e0646005
Parents: c0690a6
Author: Sergey Shelukhin <se...@apache.org>
Authored: Tue Sep 1 11:23:14 2015 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Tue Sep 1 11:23:14 2015 -0700

----------------------------------------------------------------------
 .../hadoop/hive/ql/io/orc/OrcInputFormat.java   | 169 +++++++++++--------
 .../hive/ql/io/orc/TestInputOutputFormat.java   |  13 +-
 2 files changed, 107 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/f530f44d/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 8c138b9..05efc5f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -28,6 +28,7 @@ import java.util.Map;
 import java.util.NavigableMap;
 import java.util.TreeMap;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -51,6 +52,7 @@ import org.apache.hadoop.hive.ql.io.AcidInputFormat;
 import org.apache.hadoop.hive.ql.io.AcidInputFormat.DeltaMetaData;
 import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.AcidUtils.Directory;
 import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
 import org.apache.hadoop.hive.ql.io.InputFormatChecker;
 import org.apache.hadoop.hive.ql.io.RecordIdentifier;
@@ -371,6 +373,7 @@ public class OrcInputFormat  implements InputFormat<NullWritable, OrcStruct>,
     private final Configuration conf;
     private static Cache<Path, FileInfo> footerCache;
     private static ExecutorService threadPool = null;
+    private static ExecutorCompletionService<AcidDirInfo> ecs = null;
     private final int numBuckets;
     private final long maxSize;
     private final long minSize;
@@ -416,6 +419,7 @@ public class OrcInputFormat  implements InputFormat<NullWritable, OrcStruct>,
           threadPool = Executors.newFixedThreadPool(numThreads,
               new ThreadFactoryBuilder().setDaemon(true)
                   .setNameFormat("ORC_GET_SPLITS #%d").build());
+          ecs = new ExecutorCompletionService<AcidDirInfo>(threadPool);
         }
 
         if (footerCache == null && cacheStripeDetails) {
@@ -433,10 +437,34 @@ public class OrcInputFormat  implements InputFormat<NullWritable, OrcStruct>,
     }
   }
 
+  /**
+   * The full ACID directory information needed for splits; no more calls to HDFS needed.
+   * We could just live with AcidUtils.Directory but...
+   * 1) That doesn't contain base files.
+   * 2) We save fs for convenience to avoid getting it twice.
+   */
+  @VisibleForTesting
+  static final class AcidDirInfo {
+    public AcidDirInfo(FileSystem fs, Path splitPath, Directory acidInfo,
+        List<HdfsFileStatusWithId> baseOrOriginalFiles) {
+      this.splitPath = splitPath;
+      this.acidInfo = acidInfo;
+      this.baseOrOriginalFiles = baseOrOriginalFiles;
+      this.fs = fs;
+    }
+
+    final FileSystem fs;
+    final Path splitPath;
+    final AcidUtils.Directory acidInfo;
+    final List<HdfsFileStatusWithId> baseOrOriginalFiles;
+  }
+
+  @VisibleForTesting
   interface SplitStrategy<T> {
     List<T> getSplits() throws IOException;
   }
 
+  @VisibleForTesting
   static final class SplitInfo extends ACIDSplitStrategy {
     private final Context context;
     private final FileSystem fs;
@@ -638,7 +666,7 @@ public class OrcInputFormat  implements InputFormat<NullWritable, OrcStruct>,
    * Given a directory, get the list of files and blocks in those files.
    * To parallelize file generator use "mapreduce.input.fileinputformat.list-status.num-threads"
    */
-  static final class FileGenerator implements Callable<SplitStrategy> {
+  static final class FileGenerator implements Callable<AcidDirInfo> {
     private final Context context;
     private final FileSystem fs;
     private final Path dir;
@@ -652,69 +680,14 @@ public class OrcInputFormat  implements InputFormat<NullWritable, OrcStruct>,
     }
 
     @Override
-    public SplitStrategy call() throws IOException {
-      final SplitStrategy splitStrategy;
+    public AcidDirInfo call() throws IOException {
       AcidUtils.Directory dirInfo = AcidUtils.getAcidState(dir,
           context.conf, context.transactionList, useFileIds);
-      List<DeltaMetaData> deltas = AcidUtils.serializeDeltas(dirInfo.getCurrentDirectories());
       Path base = dirInfo.getBaseDirectory();
-      List<HdfsFileStatusWithId> original = dirInfo.getOriginalFiles();
-      boolean[] covered = new boolean[context.numBuckets];
-      boolean isOriginal = base == null;
-
-      // if we have a base to work from
-      if (base != null || !original.isEmpty()) {
-
-        // find the base files (original or new style)
-        List<HdfsFileStatusWithId> children = original;
-        if (base != null) {
-          children = findBaseFiles(base, useFileIds);
-        }
-
-        long totalFileSize = 0;
-        for (HdfsFileStatusWithId child : children) {
-          totalFileSize += child.getFileStatus().getLen();
-          AcidOutputFormat.Options opts = AcidUtils.parseBaseBucketFilename
-              (child.getFileStatus().getPath(), context.conf);
-          int b = opts.getBucket();
-          // If the bucket is in the valid range, mark it as covered.
-          // I wish Hive actually enforced bucketing all of the time.
-          if (b >= 0 && b < covered.length) {
-            covered[b] = true;
-          }
-        }
-
-        int numFiles = children.size();
-        long avgFileSize = totalFileSize / numFiles;
-        int totalFiles = context.numFilesCounter.addAndGet(numFiles);
-        switch(context.splitStrategyKind) {
-          case BI:
-            // BI strategy requested through config
-            splitStrategy = new BISplitStrategy(context, fs, dir, children, isOriginal,
-                deltas, covered);
-            break;
-          case ETL:
-            // ETL strategy requested through config
-            splitStrategy = new ETLSplitStrategy(context, fs, dir, children, isOriginal,
-                deltas, covered);
-            break;
-          default:
-            // HYBRID strategy
-            if (avgFileSize > context.maxSize || totalFiles <= context.minSplits) {
-              splitStrategy = new ETLSplitStrategy(context, fs, dir, children, isOriginal, deltas,
-                  covered);
-            } else {
-              splitStrategy = new BISplitStrategy(context, fs, dir, children, isOriginal, deltas,
-                  covered);
-            }
-            break;
-        }
-      } else {
-        // no base, only deltas
-        splitStrategy = new ACIDSplitStrategy(dir, context.numBuckets, deltas, covered);
-      }
-
-      return splitStrategy;
+      // find the base files (original or new style)
+      List<HdfsFileStatusWithId> children = (base == null)
+          ? dirInfo.getOriginalFiles() : findBaseFiles(base, useFileIds);
+      return new AcidDirInfo(fs, dir, dirInfo, children);
     }
 
     private List<HdfsFileStatusWithId> findBaseFiles(
@@ -1052,21 +1025,24 @@ public class OrcInputFormat  implements InputFormat<NullWritable, OrcStruct>,
     // use threads to resolve directories into splits
     Context context = new Context(conf, numSplits);
     List<OrcSplit> splits = Lists.newArrayList();
-    List<Future<?>> pathFutures = Lists.newArrayList();
-    List<Future<?>> splitFutures = Lists.newArrayList();
+    List<Future<AcidDirInfo>> pathFutures = Lists.newArrayList();
+    List<Future<List<OrcSplit>>> splitFutures = Lists.newArrayList();
 
     // multi-threaded file statuses and split strategy
     boolean useFileIds = HiveConf.getBoolVar(conf, ConfVars.HIVE_ORC_INCLUDE_FILE_ID_IN_SPLITS);
-    for (Path dir : getInputPaths(conf)) {
+    Path[] paths = getInputPaths(conf);
+    for (Path dir : paths) {
       FileSystem fs = dir.getFileSystem(conf);
       FileGenerator fileGenerator = new FileGenerator(context, fs, dir, useFileIds);
-      pathFutures.add(context.threadPool.submit(fileGenerator));
+      pathFutures.add(Context.ecs.submit(fileGenerator));
     }
 
     // complete path futures and schedule split generation
     try {
-      for (Future<?> pathFuture : pathFutures) {
-        SplitStrategy splitStrategy = (SplitStrategy) pathFuture.get();
+      for (int notIndex = 0; notIndex < paths.length; ++notIndex) {
+        AcidDirInfo adi = Context.ecs.take().get();
+        SplitStrategy splitStrategy = determineSplitStrategy(
+            context, adi.fs, adi.splitPath, adi.acidInfo, adi.baseOrOriginalFiles);
 
         if (isDebugEnabled) {
           LOG.debug(splitStrategy);
@@ -1075,7 +1051,7 @@ public class OrcInputFormat  implements InputFormat<NullWritable, OrcStruct>,
         if (splitStrategy instanceof ETLSplitStrategy) {
           List<SplitInfo> splitInfos = splitStrategy.getSplits();
           for (SplitInfo splitInfo : splitInfos) {
-            splitFutures.add(context.threadPool.submit(new SplitGenerator(splitInfo)));
+            splitFutures.add(Context.threadPool.submit(new SplitGenerator(splitInfo)));
           }
         } else {
           splits.addAll(splitStrategy.getSplits());
@@ -1083,8 +1059,8 @@ public class OrcInputFormat  implements InputFormat<NullWritable, OrcStruct>,
       }
 
       // complete split futures
-      for (Future<?> splitFuture : splitFutures) {
-        splits.addAll((Collection<? extends OrcSplit>) splitFuture.get());
+      for (Future<List<OrcSplit>> splitFuture : splitFutures) {
+        splits.addAll(splitFuture.get());
       }
     } catch (Exception e) {
       cancelFutures(pathFutures);
@@ -1106,8 +1082,8 @@ public class OrcInputFormat  implements InputFormat<NullWritable, OrcStruct>,
     return splits;
   }
 
-  private static void cancelFutures(List<Future<?>> futures) {
-    for (Future future : futures) {
+  private static <T> void cancelFutures(List<Future<T>> futures) {
+    for (Future<T> future : futures) {
       future.cancel(true);
     }
   }
@@ -1375,6 +1351,55 @@ public class OrcInputFormat  implements InputFormat<NullWritable, OrcStruct>,
         directory);
   }
 
+
+  @VisibleForTesting
+  static SplitStrategy determineSplitStrategy(Context context, FileSystem fs, Path dir,
+      AcidUtils.Directory dirInfo, List<HdfsFileStatusWithId> baseOrOriginalFiles) {
+    Path base = dirInfo.getBaseDirectory();
+    List<HdfsFileStatusWithId> original = dirInfo.getOriginalFiles();
+    List<DeltaMetaData> deltas = AcidUtils.serializeDeltas(dirInfo.getCurrentDirectories());
+    boolean[] covered = new boolean[context.numBuckets];
+    boolean isOriginal = base == null;
+
+    // if we have a base to work from
+    if (base != null || !original.isEmpty()) {
+      long totalFileSize = 0;
+      for (HdfsFileStatusWithId child : baseOrOriginalFiles) {
+        totalFileSize += child.getFileStatus().getLen();
+        AcidOutputFormat.Options opts = AcidUtils.parseBaseBucketFilename
+            (child.getFileStatus().getPath(), context.conf);
+        int b = opts.getBucket();
+        // If the bucket is in the valid range, mark it as covered.
+        // I wish Hive actually enforced bucketing all of the time.
+        if (b >= 0 && b < covered.length) {
+          covered[b] = true;
+        }
+      }
+
+      int numFiles = baseOrOriginalFiles.size();
+      long avgFileSize = totalFileSize / numFiles;
+      int totalFiles = context.numFilesCounter.addAndGet(numFiles);
+      switch(context.splitStrategyKind) {
+        case BI:
+          // BI strategy requested through config
+          return new BISplitStrategy(context, fs, dir, baseOrOriginalFiles, isOriginal, deltas, covered);
+        case ETL:
+          // ETL strategy requested through config
+          return new ETLSplitStrategy(context, fs, dir, baseOrOriginalFiles, isOriginal, deltas, covered);
+        default:
+          // HYBRID strategy
+          if (avgFileSize > context.maxSize || totalFiles <= context.minSplits) {
+            return new ETLSplitStrategy(context, fs, dir, baseOrOriginalFiles, isOriginal, deltas, covered);
+          } else {
+            return new BISplitStrategy(context, fs, dir, baseOrOriginalFiles, isOriginal, deltas, covered);
+          }
+      }
+    } else {
+      // no base, only deltas
+      return new ACIDSplitStrategy(dir, context.numBuckets, deltas, covered);
+    }
+  }
+
   @Override
   public RawReader<OrcStruct> getRawReader(Configuration conf,
                                            boolean collapseEvents,

http://git-wip-us.apache.org/repos/asf/hive/blob/f530f44d/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
index ce86cd8..8ba4d2e 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
@@ -484,7 +484,7 @@ public class TestInputOutputFormat {
               conf, n);
           OrcInputFormat.FileGenerator gen = new OrcInputFormat.FileGenerator(
               context, fs, new MockPath(fs, "mock:/a/b"), false);
-          final SplitStrategy splitStrategy = gen.call();
+          final SplitStrategy splitStrategy = createSplitStrategy(context, gen);
           assertTrue(
               String.format(
                   "Split strategy for %d files x %d size for %d splits", c, s,
@@ -508,7 +508,7 @@ public class TestInputOutputFormat {
     OrcInputFormat.FileGenerator gen =
       new OrcInputFormat.FileGenerator(context, fs,
           new MockPath(fs, "mock:/a/b"), false);
-    SplitStrategy splitStrategy = gen.call();
+    OrcInputFormat.SplitStrategy splitStrategy = createSplitStrategy(context, gen);
     assertEquals(true, splitStrategy instanceof OrcInputFormat.BISplitStrategy);
 
     conf.set("mapreduce.input.fileinputformat.split.maxsize", "500");
@@ -521,11 +521,18 @@ public class TestInputOutputFormat {
         new MockFile("mock:/a/b/part-04", 1000, new byte[1000]));
     gen = new OrcInputFormat.FileGenerator(context, fs,
             new MockPath(fs, "mock:/a/b"), false);
-    splitStrategy = gen.call();
+    splitStrategy = createSplitStrategy(context, gen);
     assertEquals(true, splitStrategy instanceof OrcInputFormat.ETLSplitStrategy);
 
   }
 
+  private OrcInputFormat.SplitStrategy createSplitStrategy(
+      OrcInputFormat.Context context, OrcInputFormat.FileGenerator gen) throws IOException {
+    OrcInputFormat.AcidDirInfo adi = gen.call();
+    return OrcInputFormat.determineSplitStrategy(
+        context, adi.fs, adi.splitPath, adi.acidInfo, adi.baseOrOriginalFiles);
+  }
+
   public static class MockBlock {
     int offset;
     int length;