You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2015/10/14 02:44:06 UTC
hive git commit: HIVE-11856 : allow split strategies to run on threadpool (Sergey Shelukhin, reviewed by Vikram Dixit K)
Repository: hive
Updated Branches:
refs/heads/master 7efb7308c -> d7f1b465b
HIVE-11856 : allow split strategies to run on threadpool (Sergey Shelukhin, reviewed by Vikram Dixit K)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/d7f1b465
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/d7f1b465
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/d7f1b465
Branch: refs/heads/master
Commit: d7f1b465b9853c772abd8ad4d8ead175f8ed5cb9
Parents: 7efb730
Author: Sergey Shelukhin <se...@apache.org>
Authored: Tue Oct 13 16:10:55 2015 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Tue Oct 13 17:43:54 2015 -0700
----------------------------------------------------------------------
.../hadoop/hive/ql/io/orc/OrcInputFormat.java | 65 +++++++++++++++++---
1 file changed, 56 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/d7f1b465/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 690f8dd..b03e055 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -452,7 +452,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
// We store all caches in variables to change the main one based on config.
// This is not thread safe between different split generations (and wasn't anyway).
- private static FooterCache footerCache;
+ private FooterCache footerCache;
private static LocalCache localCache;
private static MetastoreCache metaCache;
private static ExecutorService threadPool = null;
@@ -596,7 +596,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
* ETL strategy is used when spending little more time in split generation is acceptable
* (split generation reads and caches file footers).
*/
- static final class ETLSplitStrategy implements SplitStrategy<SplitInfo> {
+ static final class ETLSplitStrategy implements SplitStrategy<SplitInfo>, Callable<Void> {
Context context;
FileSystem fs;
List<HdfsFileStatusWithId> files;
@@ -604,6 +604,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
List<DeltaMetaData> deltas;
Path dir;
boolean[] covered;
+ private List<Future<List<OrcSplit>>> splitFuturesRef;
public ETLSplitStrategy(Context context, FileSystem fs, Path dir,
List<HdfsFileStatusWithId> children, boolean isOriginal, List<DeltaMetaData> deltas,
@@ -629,7 +630,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
// Force local cache if we have deltas.
FooterCache cache = context.cacheStripeDetails ?
- (deltas == null ? Context.footerCache : Context.localCache) : null;
+ (deltas == null ? context.footerCache : Context.localCache) : null;
if (cache != null) {
FileInfo[] infos = cache.getAndValidate(files);
for (int i = 0; i < files.size(); ++i) {
@@ -661,6 +662,34 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
public String toString() {
return ETLSplitStrategy.class.getSimpleName() + " strategy for " + dir;
}
+
+ public Future<Void> generateSplitWork(
+ Context context, List<Future<List<OrcSplit>>> splitFutures) throws IOException {
+ if (context.cacheStripeDetails && context.footerCache.isBlocking()) {
+ this.splitFuturesRef = splitFutures;
+ return Context.threadPool.submit(this);
+ } else {
+ runGetSplitsSync(splitFutures);
+ return null;
+ }
+ }
+
+ @Override
+ public Void call() throws IOException {
+ runGetSplitsSync(splitFuturesRef);
+ return null;
+ }
+
+ private void runGetSplitsSync(List<Future<List<OrcSplit>>> splitFutures) throws IOException {
+ List<SplitInfo> splits = getSplits();
+ List<Future<List<OrcSplit>>> localList = new ArrayList<>(splits.size());
+ for (SplitInfo splitInfo : splits) {
+ localList.add(Context.threadPool.submit(new SplitGenerator(splitInfo)));
+ }
+ synchronized (splitFutures) {
+ splitFutures.addAll(localList);
+ }
+ }
}
/**
@@ -1028,7 +1057,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
&& fileInfo.writerVersion != null;
// We assume that if we needed to create a reader, we need to cache it to meta cache.
// TODO: This will also needlessly overwrite it in local cache for now.
- Context.footerCache.put(fileWithId.getFileId(), file, fileInfo.fileMetaInfo, orcReader);
+ context.footerCache.put(fileWithId.getFileId(), file, fileInfo.fileMetaInfo, orcReader);
}
} else {
Reader orcReader = createOrcReader();
@@ -1041,7 +1070,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
((ReaderImpl) orcReader).getFileMetaInfo() : null;
if (context.cacheStripeDetails) {
Long fileId = fileWithId.getFileId();
- Context.footerCache.put(fileId, file, fileMetaInfo, orcReader);
+ context.footerCache.put(fileId, file, fileMetaInfo, orcReader);
}
}
includedCols = genIncludedColumns(types, context.conf, isOriginal);
@@ -1084,7 +1113,8 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
boolean useFileIds = HiveConf.getBoolVar(conf, ConfVars.HIVE_ORC_INCLUDE_FILE_ID_IN_SPLITS);
List<OrcSplit> splits = Lists.newArrayList();
List<Future<AcidDirInfo>> pathFutures = Lists.newArrayList();
- List<Future<List<OrcSplit>>> splitFutures = Lists.newArrayList();
+ List<Future<Void>> strategyFutures = Lists.newArrayList();
+ final List<Future<List<OrcSplit>>> splitFutures = Lists.newArrayList();
// multi-threaded file statuses and split strategy
Path[] paths = getInputPaths(conf);
@@ -1109,9 +1139,10 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
// Hack note - different split strategies return differently typed lists, yay Java.
// This works purely by magic, because we know which strategy produces which type.
if (splitStrategy instanceof ETLSplitStrategy) {
- List<SplitInfo> splitInfos = ((ETLSplitStrategy)splitStrategy).getSplits();
- for (SplitInfo splitInfo : splitInfos) {
- splitFutures.add(Context.threadPool.submit(new SplitGenerator(splitInfo)));
+ Future<Void> ssFuture = ((ETLSplitStrategy)splitStrategy).generateSplitWork(
+ context, splitFutures);
+ if (ssFuture != null) {
+ strategyFutures.add(ssFuture);
}
} else {
@SuppressWarnings("unchecked")
@@ -1121,11 +1152,16 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
}
// complete split futures
+ for (Future<Void> ssFuture : strategyFutures) {
+ ssFuture.get(); // Make sure we get exceptions strategies might have thrown.
+ }
+ // All the split strategies are done, so it must be safe to access splitFutures.
for (Future<List<OrcSplit>> splitFuture : splitFutures) {
splits.addAll(splitFuture.get());
}
} catch (Exception e) {
cancelFutures(pathFutures);
+ cancelFutures(strategyFutures);
cancelFutures(splitFutures);
throw new RuntimeException("ORC split generation failed with exception: " + e.getMessage(), e);
}
@@ -1557,6 +1593,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
*/
public interface FooterCache {
FileInfo[] getAndValidate(List<HdfsFileStatusWithId> files) throws IOException;
+ boolean isBlocking();
void put(Long fileId, FileStatus file, FileMetaInfo fileMetaInfo, Reader orcReader)
throws IOException;
}
@@ -1619,6 +1656,11 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
orcReader.getOrcProtoFileStatistics(), fileMetaInfo, orcReader.getWriterVersion(),
fileId));
}
+
+ @Override
+ public boolean isBlocking() {
+ return false;
+ }
}
/** Metastore-based footer cache storing serialized footers. Also has a local cache. */
@@ -1734,5 +1776,10 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
public void configure(HiveConf queryConfig) {
this.conf = queryConfig;
}
+
+ @Override
+ public boolean isBlocking() {
+ return true;
+ }
}
}