Posted to commits@hive.apache.org by se...@apache.org on 2015/09/04 04:55:05 UTC
[16/28] hive git commit: HIVE-11689 : minor flow changes to ORC split generation (Sergey Shelukhin, reviewed by Prasanth Jayachandran and Swarnim Kulkarni)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/f530f44d
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/f530f44d
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/f530f44d
Branch: refs/heads/llap
Commit: f530f44d1d95c2da2485d53f0855f8f8e0646005
Parents: c0690a6
Author: Sergey Shelukhin <se...@apache.org>
Authored: Tue Sep 1 11:23:14 2015 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Tue Sep 1 11:23:14 2015 -0700
----------------------------------------------------------------------
.../hadoop/hive/ql/io/orc/OrcInputFormat.java | 169 +++++++++++--------
.../hive/ql/io/orc/TestInputOutputFormat.java | 13 +-
2 files changed, 107 insertions(+), 75 deletions(-)
----------------------------------------------------------------------
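What changed in the flow: FileGenerator.call() no longer picks a SplitStrategy; it just returns the directory listing as an AcidDirInfo, and getSplits() now drains those results in completion order through an ExecutorCompletionService, calling determineSplitStrategy as each listing arrives. Before, the loop iterated pathFutures in submission order, so Future.get() on the first (possibly slowest) directory blocked the ones behind it. A minimal, self-contained sketch of the completion-order pattern (all names below are illustrative stand-ins, not code from this commit):

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletionOrderSketch {
  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    ExecutorCompletionService<String> ecs =
        new ExecutorCompletionService<String>(pool);
    List<String> dirs = Arrays.asList("dirA", "dirB", "dirC");
    for (final String dir : dirs) {
      // Stands in for submitting one FileGenerator per input directory.
      ecs.submit(() -> {
        Thread.sleep((long) (Math.random() * 100)); // variable listing time
        return dir;                                 // stands in for AcidDirInfo
      });
    }
    for (int i = 0; i < dirs.size(); ++i) {
      // take() hands back whichever task finished first, so one slow
      // directory listing no longer stalls processing of the others.
      String done = ecs.take().get();
      System.out.println("ready: " + done); // getSplits() would call
                                            // determineSplitStrategy here
    }
    pool.shutdown();
  }
}
----------------------------------------------------------------------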
http://git-wip-us.apache.org/repos/asf/hive/blob/f530f44d/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 8c138b9..05efc5f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -28,6 +28,7 @@ import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -51,6 +52,7 @@ import org.apache.hadoop.hive.ql.io.AcidInputFormat;
import org.apache.hadoop.hive.ql.io.AcidInputFormat.DeltaMetaData;
import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.AcidUtils.Directory;
import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
import org.apache.hadoop.hive.ql.io.InputFormatChecker;
import org.apache.hadoop.hive.ql.io.RecordIdentifier;
@@ -371,6 +373,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
private final Configuration conf;
private static Cache<Path, FileInfo> footerCache;
private static ExecutorService threadPool = null;
+ private static ExecutorCompletionService<AcidDirInfo> ecs = null;
private final int numBuckets;
private final long maxSize;
private final long minSize;
@@ -416,6 +419,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
threadPool = Executors.newFixedThreadPool(numThreads,
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("ORC_GET_SPLITS #%d").build());
+ ecs = new ExecutorCompletionService<AcidDirInfo>(threadPool);
}
if (footerCache == null && cacheStripeDetails) {
@@ -433,10 +437,34 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
}
}
+ /**
+ * The full ACID directory information needed for splits; no more calls to HDFS needed.
+ * We could just live with AcidUtils.Directory but...
+ * 1) That doesn't contain base files.
+ * 2) We save fs for convenience to avoid getting it twice.
+ */
+ @VisibleForTesting
+ static final class AcidDirInfo {
+ public AcidDirInfo(FileSystem fs, Path splitPath, Directory acidInfo,
+ List<HdfsFileStatusWithId> baseOrOriginalFiles) {
+ this.splitPath = splitPath;
+ this.acidInfo = acidInfo;
+ this.baseOrOriginalFiles = baseOrOriginalFiles;
+ this.fs = fs;
+ }
+
+ final FileSystem fs;
+ final Path splitPath;
+ final AcidUtils.Directory acidInfo;
+ final List<HdfsFileStatusWithId> baseOrOriginalFiles;
+ }
+
+ @VisibleForTesting
interface SplitStrategy<T> {
List<T> getSplits() throws IOException;
}
+ @VisibleForTesting
static final class SplitInfo extends ACIDSplitStrategy {
private final Context context;
private final FileSystem fs;
@@ -638,7 +666,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
* Given a directory, get the list of files and blocks in those files.
* To parallelize the file generator, use "mapreduce.input.fileinputformat.list-status.num-threads"
*/
- static final class FileGenerator implements Callable<SplitStrategy> {
+ static final class FileGenerator implements Callable<AcidDirInfo> {
private final Context context;
private final FileSystem fs;
private final Path dir;
@@ -652,69 +680,14 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
}
@Override
- public SplitStrategy call() throws IOException {
- final SplitStrategy splitStrategy;
+ public AcidDirInfo call() throws IOException {
AcidUtils.Directory dirInfo = AcidUtils.getAcidState(dir,
context.conf, context.transactionList, useFileIds);
- List<DeltaMetaData> deltas = AcidUtils.serializeDeltas(dirInfo.getCurrentDirectories());
Path base = dirInfo.getBaseDirectory();
- List<HdfsFileStatusWithId> original = dirInfo.getOriginalFiles();
- boolean[] covered = new boolean[context.numBuckets];
- boolean isOriginal = base == null;
-
- // if we have a base to work from
- if (base != null || !original.isEmpty()) {
-
- // find the base files (original or new style)
- List<HdfsFileStatusWithId> children = original;
- if (base != null) {
- children = findBaseFiles(base, useFileIds);
- }
-
- long totalFileSize = 0;
- for (HdfsFileStatusWithId child : children) {
- totalFileSize += child.getFileStatus().getLen();
- AcidOutputFormat.Options opts = AcidUtils.parseBaseBucketFilename
- (child.getFileStatus().getPath(), context.conf);
- int b = opts.getBucket();
- // If the bucket is in the valid range, mark it as covered.
- // I wish Hive actually enforced bucketing all of the time.
- if (b >= 0 && b < covered.length) {
- covered[b] = true;
- }
- }
-
- int numFiles = children.size();
- long avgFileSize = totalFileSize / numFiles;
- int totalFiles = context.numFilesCounter.addAndGet(numFiles);
- switch(context.splitStrategyKind) {
- case BI:
- // BI strategy requested through config
- splitStrategy = new BISplitStrategy(context, fs, dir, children, isOriginal,
- deltas, covered);
- break;
- case ETL:
- // ETL strategy requested through config
- splitStrategy = new ETLSplitStrategy(context, fs, dir, children, isOriginal,
- deltas, covered);
- break;
- default:
- // HYBRID strategy
- if (avgFileSize > context.maxSize || totalFiles <= context.minSplits) {
- splitStrategy = new ETLSplitStrategy(context, fs, dir, children, isOriginal, deltas,
- covered);
- } else {
- splitStrategy = new BISplitStrategy(context, fs, dir, children, isOriginal, deltas,
- covered);
- }
- break;
- }
- } else {
- // no base, only deltas
- splitStrategy = new ACIDSplitStrategy(dir, context.numBuckets, deltas, covered);
- }
-
- return splitStrategy;
+ // find the base files (original or new style)
+ List<HdfsFileStatusWithId> children = (base == null)
+ ? dirInfo.getOriginalFiles() : findBaseFiles(base, useFileIds);
+ return new AcidDirInfo(fs, dir, dirInfo, children);
}
private List<HdfsFileStatusWithId> findBaseFiles(
@@ -1052,21 +1025,24 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
// use threads to resolve directories into splits
Context context = new Context(conf, numSplits);
List<OrcSplit> splits = Lists.newArrayList();
- List<Future<?>> pathFutures = Lists.newArrayList();
- List<Future<?>> splitFutures = Lists.newArrayList();
+ List<Future<AcidDirInfo>> pathFutures = Lists.newArrayList();
+ List<Future<List<OrcSplit>>> splitFutures = Lists.newArrayList();
// multi-threaded file statuses and split strategy
boolean useFileIds = HiveConf.getBoolVar(conf, ConfVars.HIVE_ORC_INCLUDE_FILE_ID_IN_SPLITS);
- for (Path dir : getInputPaths(conf)) {
+ Path[] paths = getInputPaths(conf);
+ for (Path dir : paths) {
FileSystem fs = dir.getFileSystem(conf);
FileGenerator fileGenerator = new FileGenerator(context, fs, dir, useFileIds);
- pathFutures.add(context.threadPool.submit(fileGenerator));
+ pathFutures.add(Context.ecs.submit(fileGenerator));
}
// complete path futures and schedule split generation
try {
- for (Future<?> pathFuture : pathFutures) {
- SplitStrategy splitStrategy = (SplitStrategy) pathFuture.get();
+ for (int notIndex = 0; notIndex < paths.length; ++notIndex) {
+ AcidDirInfo adi = Context.ecs.take().get();
+ SplitStrategy splitStrategy = determineSplitStrategy(
+ context, adi.fs, adi.splitPath, adi.acidInfo, adi.baseOrOriginalFiles);
if (isDebugEnabled) {
LOG.debug(splitStrategy);
@@ -1075,7 +1051,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
if (splitStrategy instanceof ETLSplitStrategy) {
List<SplitInfo> splitInfos = splitStrategy.getSplits();
for (SplitInfo splitInfo : splitInfos) {
- splitFutures.add(context.threadPool.submit(new SplitGenerator(splitInfo)));
+ splitFutures.add(Context.threadPool.submit(new SplitGenerator(splitInfo)));
}
} else {
splits.addAll(splitStrategy.getSplits());
@@ -1083,8 +1059,8 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
}
// complete split futures
- for (Future<?> splitFuture : splitFutures) {
- splits.addAll((Collection<? extends OrcSplit>) splitFuture.get());
+ for (Future<List<OrcSplit>> splitFuture : splitFutures) {
+ splits.addAll(splitFuture.get());
}
} catch (Exception e) {
cancelFutures(pathFutures);
@@ -1106,8 +1082,8 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
return splits;
}
- private static void cancelFutures(List<Future<?>> futures) {
- for (Future future : futures) {
+ private static <T> void cancelFutures(List<Future<T>> futures) {
+ for (Future<T> future : futures) {
future.cancel(true);
}
}
@@ -1375,6 +1351,55 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
directory);
}
+
+ @VisibleForTesting
+ static SplitStrategy determineSplitStrategy(Context context, FileSystem fs, Path dir,
+ AcidUtils.Directory dirInfo, List<HdfsFileStatusWithId> baseOrOriginalFiles) {
+ Path base = dirInfo.getBaseDirectory();
+ List<HdfsFileStatusWithId> original = dirInfo.getOriginalFiles();
+ List<DeltaMetaData> deltas = AcidUtils.serializeDeltas(dirInfo.getCurrentDirectories());
+ boolean[] covered = new boolean[context.numBuckets];
+ boolean isOriginal = base == null;
+
+ // if we have a base to work from
+ if (base != null || !original.isEmpty()) {
+ long totalFileSize = 0;
+ for (HdfsFileStatusWithId child : baseOrOriginalFiles) {
+ totalFileSize += child.getFileStatus().getLen();
+ AcidOutputFormat.Options opts = AcidUtils.parseBaseBucketFilename
+ (child.getFileStatus().getPath(), context.conf);
+ int b = opts.getBucket();
+ // If the bucket is in the valid range, mark it as covered.
+ // I wish Hive actually enforced bucketing all of the time.
+ if (b >= 0 && b < covered.length) {
+ covered[b] = true;
+ }
+ }
+
+ int numFiles = baseOrOriginalFiles.size();
+ long avgFileSize = totalFileSize / numFiles;
+ int totalFiles = context.numFilesCounter.addAndGet(numFiles);
+ switch(context.splitStrategyKind) {
+ case BI:
+ // BI strategy requested through config
+ return new BISplitStrategy(context, fs, dir, baseOrOriginalFiles, isOriginal, deltas, covered);
+ case ETL:
+ // ETL strategy requested through config
+ return new ETLSplitStrategy(context, fs, dir, baseOrOriginalFiles, isOriginal, deltas, covered);
+ default:
+ // HYBRID strategy
+ if (avgFileSize > context.maxSize || totalFiles <= context.minSplits) {
+ return new ETLSplitStrategy(context, fs, dir, baseOrOriginalFiles, isOriginal, deltas, covered);
+ } else {
+ return new BISplitStrategy(context, fs, dir, baseOrOriginalFiles, isOriginal, deltas, covered);
+ }
+ }
+ } else {
+ // no base, only deltas
+ return new ACIDSplitStrategy(dir, context.numBuckets, deltas, covered);
+ }
+ }
+
@Override
public RawReader<OrcStruct> getRawReader(Configuration conf,
boolean collapseEvents,
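----------------------------------------------------------------------
For reference, the HYBRID decision that determineSplitStrategy centralizes reduces to two comparisons. The sketch below restates it outside of Hive; only the rule itself comes from the hunk above, and the usual characterization is that ETL inspects file footers to split within files while BI produces cheap per-file splits (Strategy and StrategyRule are hypothetical names, not Hive classes):

// Hypothetical restatement of the selection rule above; not Hive code.
enum Strategy { BI, ETL, ACID }

final class StrategyRule {
  final long maxSize;   // plays the role of context.maxSize
  final int minSplits;  // plays the role of context.minSplits

  StrategyRule(long maxSize, int minSplits) {
    this.maxSize = maxSize;
    this.minSplits = minSplits;
  }

  // HYBRID default only; the forced BI/ETL config cases return directly.
  // Assumes numFiles > 0 when a base or original files exist, as the
  // original does.
  Strategy choose(boolean hasBaseOrOriginals, long totalFileSize,
      int numFiles, int totalFilesSoFar) {
    if (!hasBaseOrOriginals) {
      return Strategy.ACID;  // no base, only deltas
    }
    long avgFileSize = totalFileSize / numFiles;
    // Large files on average, or few files so far -> ETL (splits within
    // files); many small files -> BI (cheap per-file splits).
    return (avgFileSize > maxSize || totalFilesSoFar <= minSplits)
        ? Strategy.ETL : Strategy.BI;
  }
}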
http://git-wip-us.apache.org/repos/asf/hive/blob/f530f44d/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
index ce86cd8..8ba4d2e 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
@@ -484,7 +484,7 @@ public class TestInputOutputFormat {
conf, n);
OrcInputFormat.FileGenerator gen = new OrcInputFormat.FileGenerator(
context, fs, new MockPath(fs, "mock:/a/b"), false);
- final SplitStrategy splitStrategy = gen.call();
+ final SplitStrategy splitStrategy = createSplitStrategy(context, gen);
assertTrue(
String.format(
"Split strategy for %d files x %d size for %d splits", c, s,
@@ -508,7 +508,7 @@ public class TestInputOutputFormat {
OrcInputFormat.FileGenerator gen =
new OrcInputFormat.FileGenerator(context, fs,
new MockPath(fs, "mock:/a/b"), false);
- SplitStrategy splitStrategy = gen.call();
+ OrcInputFormat.SplitStrategy splitStrategy = createSplitStrategy(context, gen);
assertEquals(true, splitStrategy instanceof OrcInputFormat.BISplitStrategy);
conf.set("mapreduce.input.fileinputformat.split.maxsize", "500");
@@ -521,11 +521,18 @@ public class TestInputOutputFormat {
new MockFile("mock:/a/b/part-04", 1000, new byte[1000]));
gen = new OrcInputFormat.FileGenerator(context, fs,
new MockPath(fs, "mock:/a/b"), false);
- splitStrategy = gen.call();
+ splitStrategy = createSplitStrategy(context, gen);
assertEquals(true, splitStrategy instanceof OrcInputFormat.ETLSplitStrategy);
}
+ private OrcInputFormat.SplitStrategy createSplitStrategy(
+ OrcInputFormat.Context context, OrcInputFormat.FileGenerator gen) throws IOException {
+ OrcInputFormat.AcidDirInfo adi = gen.call();
+ return OrcInputFormat.determineSplitStrategy(
+ context, adi.fs, adi.splitPath, adi.acidInfo, adi.baseOrOriginalFiles);
+ }
+
public static class MockBlock {
int offset;
int length;
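----------------------------------------------------------------------
The test-side change shows the payoff of the refactor: FileGenerator.call() is now a pure HDFS-listing step and determineSplitStrategy a pure decision step, so the tests can exercise strategy selection directly through the createSplitStrategy helper instead of going through the completion-service plumbing.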