You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pr...@apache.org on 2016/10/21 07:38:51 UTC
hive git commit: HIVE-14920: S3: Optimize SimpleFetchOptimizer::checkThreshold() (Rajesh Balamohan reviewed by Ashutosh Chauhan)
Repository: hive
Updated Branches:
refs/heads/master 19999dad8 -> f2efa6a2b
HIVE-14920: S3: Optimize SimpleFetchOptimizer::checkThreshold() (Rajesh Balamohan reviewed by Ashutosh Chauhan)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/f2efa6a2
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/f2efa6a2
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/f2efa6a2
Branch: refs/heads/master
Commit: f2efa6a2be52f09e700c931a293c816a446bf619
Parents: 19999da
Author: Prasanth Jayachandran <pr...@apache.org>
Authored: Fri Oct 21 00:38:36 2016 -0700
Committer: Prasanth Jayachandran <pr...@apache.org>
Committed: Fri Oct 21 00:38:36 2016 -0700
----------------------------------------------------------------------
.../hive/ql/optimizer/SimpleFetchOptimizer.java | 115 +++++++++++++------
1 file changed, 82 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/f2efa6a2/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java
index eb0ba7b..0481110 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java
@@ -26,7 +26,16 @@ import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.RemoteIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.ContentSummary;
@@ -162,13 +171,7 @@ public class SimpleFetchOptimizer extends Transform {
return true;
}
}
- long remaining = threshold;
- remaining -= data.getInputLength(pctx, remaining);
- if (remaining < 0) {
- LOG.info("Threshold " + remaining + " exceeded for pseudoMR mode");
- return false;
- }
- return true;
+ return data.isDataLengthWitInThreshold(pctx, threshold);
}
// all we can handle is LimitOperator, FilterOperator SelectOperator and final FS
@@ -414,18 +417,16 @@ public class SimpleFetchOptimizer extends Transform {
return replaceFSwithLS(fileSink, work.getSerializationNullFormat());
}
- private long getInputLength(ParseContext pctx, long remaining) throws Exception {
+ private boolean isDataLengthWitInThreshold(ParseContext pctx, final long threshold)
+ throws Exception {
if (splitSample != null && splitSample.getTotalLength() != null) {
- return splitSample.getTotalLength();
- }
- if (splitSample != null) {
- return splitSample.getTargetSize(calculateLength(pctx, splitSample.estimateSourceSize(remaining)));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Threshold " + splitSample.getTotalLength() + " exceeded for pseudoMR mode");
+ }
+ return (threshold - splitSample.getTotalLength()) > 0;
}
- return calculateLength(pctx, remaining);
- }
- private long calculateLength(ParseContext pctx, long remaining) throws Exception {
- JobConf jobConf = new JobConf(pctx.getConf());
+ final JobConf jobConf = new JobConf(pctx.getConf());
Utilities.setColumnNameList(jobConf, scanOp, true);
Utilities.setColumnTypeList(jobConf, scanOp, true);
HiveStorageHandler handler = table.getStorageHandler();
@@ -434,41 +435,89 @@ public class SimpleFetchOptimizer extends Transform {
TableDesc tableDesc = Utilities.getTableDesc(table);
PlanUtils.configureInputJobPropertiesForStorageHandler(tableDesc);
Utilities.copyTableJobPropertiesToConf(tableDesc, jobConf);
- return estimator.estimate(jobConf, scanOp, remaining).getTotalLength();
+ long len = estimator.estimate(jobConf, scanOp, threshold).getTotalLength();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Threshold " + len + " exceeded for pseudoMR mode");
+ }
+ return (threshold - len) > 0;
}
if (table.isNonNative()) {
- return 0; // nothing can be done
+ return true; // nothing can be done
}
if (!table.isPartitioned()) {
- return getFileLength(jobConf, table.getPath(), table.getInputFormatClass());
+ long len = getPathLength(jobConf, table.getPath(), table.getInputFormatClass(), threshold);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Threshold " + len + " exceeded for pseudoMR mode");
+ }
+ return (threshold - len) > 0;
}
- long total = 0;
- for (Partition partition : partsList.getNotDeniedPartns()) {
- Path path = partition.getDataLocation();
- total += getFileLength(jobConf, path, partition.getInputFormatClass());
- if (total > remaining) {
- break;
+ final AtomicLong total = new AtomicLong(0);
+ //TODO: use common thread pool later?
+ int threadCount = HiveConf.getIntVar(pctx.getConf(),
+ HiveConf.ConfVars.HIVE_STATS_GATHER_NUM_THREADS);
+ final ExecutorService pool = (threadCount > 0) ?
+ Executors.newFixedThreadPool(threadCount,
+ new ThreadFactoryBuilder()
+ .setDaemon(true)
+ .setNameFormat("SimpleFetchOptimizer-FileLength-%d").build()) : null;
+ try {
+ List<Future> futures = Lists.newLinkedList();
+ for (final Partition partition : partsList.getNotDeniedPartns()) {
+ final Path path = partition.getDataLocation();
+ if (pool != null) {
+ futures.add(pool.submit(new Callable<Long>() {
+ @Override
+ public Long call() throws Exception {
+ long len = getPathLength(jobConf, path, partition.getInputFormatClass(), threshold);
+ LOG.trace(path + ", length=" + len);
+ return total.addAndGet(len);
+ }
+ }));
+ } else {
+ total.addAndGet(getPathLength(jobConf, path, partition.getInputFormatClass(), threshold));
+ }
+ }
+ if (pool != null) {
+ pool.shutdown();
+ for (Future<Long> future : futures) {
+ long totalLen = future.get();
+ if ((threshold - totalLen) <= 0) {
+ // early exit, as getting file lengths can be expensive in object stores.
+ return false;
+ }
+ }
+ }
+ return (threshold - total.get()) >= 0;
+ } finally {
+ LOG.info("Data set size=" + total.get() + ", threshold=" + threshold);
+ if (pool != null) {
+ pool.shutdownNow();
}
}
- return total;
}
- // from Utilities.getInputSummary()
- private long getFileLength(JobConf conf, Path path, Class<? extends InputFormat> clazz)
+ private long getPathLength(JobConf conf, Path path,
+ Class<? extends InputFormat> clazz, long threshold)
throws IOException {
- ContentSummary summary;
if (ContentSummaryInputFormat.class.isAssignableFrom(clazz)) {
InputFormat input = HiveInputFormat.getInputFormatFromCache(clazz, conf);
- summary = ((ContentSummaryInputFormat)input).getContentSummary(path, conf);
+ return ((ContentSummaryInputFormat)input).getContentSummary(path, conf).getLength();
} else {
FileSystem fs = path.getFileSystem(conf);
try {
- summary = fs.getContentSummary(path);
+ long length = 0;
+ RemoteIterator<LocatedFileStatus> results = fs.listFiles(path, true);
+ // No need to iterate more, when threshold is reached
+ // (beneficial especially for object stores)
+ while (length <= threshold && results.hasNext()) {
+ length += results.next().getLen();
+ }
+ LOG.trace("length=" + length + ", threshold=" + threshold);
+ return length;
} catch (FileNotFoundException e) {
return 0;
}
}
- return summary.getLength();
}
}