Posted to commits@hive.apache.org by cw...@apache.org on 2011/08/26 20:35:48 UTC
svn commit: r1162202 - in /hive/trunk/ql/src/java/org/apache/hadoop/hive/ql:
exec/LimitOperator.java parse/SemanticAnalyzer.java
Author: cws
Date: Fri Aug 26 18:35:48 2011
New Revision: 1162202
URL: http://svn.apache.org/viewvc?rev=1162202&view=rev
Log:
HIVE-2385. Local Mode can be more aggressive if LIMIT optimization is on (Siying Dong via cws)
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java?rev=1162202&r1=1162201&r2=1162202&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java Fri Aug 26 18:35:48 2011
@@ -34,6 +34,7 @@ public class LimitOperator extends Opera
protected transient int limit;
protected transient int leastRow;
protected transient int currCount;
+ protected transient boolean isMap;

@Override
protected void initializeOp(Configuration hconf) throws HiveException {
@@ -41,6 +42,7 @@ public class LimitOperator extends Opera
limit = conf.getLimit();
leastRow = conf.getLeastRows();
currCount = 0;
+ isMap = hconf.getBoolean("mapred.task.is.map", true);
}

@Override
@@ -65,7 +67,7 @@ public class LimitOperator extends Opera

@Override
public void closeOp(boolean abort) throws HiveException {
- if (currCount < leastRow) {
+ if (!isMap && currCount < leastRow) {
throw new HiveException("No sufficient row found");
}
}
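For illustration, a minimal standalone sketch of the semantics the new isMap guard gives closeOp (the class and method below are hypothetical stand-ins, not Hive APIs). Each map task runs its own LimitOperator over only a slice of the input, so a single map task may legitimately finish below leastRow even when the job as a whole produces enough rows; only the reduce side sees the full tally, so the shortfall check now fires only there.

import java.util.Arrays;

public class LeastRowsGuardSketch {
  // Stand-in for LimitOperator.closeOp after this patch: the
  // least-rows check is skipped when running map-side.
  static void closeOp(boolean isMap, int currCount, int leastRow) {
    if (!isMap && currCount < leastRow) {
      throw new IllegalStateException("No sufficient row found");
    }
  }

  public static void main(String[] args) {
    int leastRow = 3;
    // Two map tasks each push 2 rows through their own operator.
    // Neither alone reaches leastRow; before this patch each would
    // have thrown, aborting a query that actually has enough rows.
    for (int mapSideCount : Arrays.asList(2, 2)) {
      closeOp(true, mapSideCount, leastRow);
    }
    // The reduce side sees the combined 4 rows, so the check passes.
    closeOp(false, 4, leastRow);
  }
}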
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java?rev=1162202&r1=1162201&r2=1162202&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java Fri Aug 26 18:35:48 2011
@@ -27,9 +27,9 @@ import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeSet;
+import java.util.Map.Entry;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
@@ -92,7 +92,6 @@ import org.apache.hadoop.hive.ql.metadat
import org.apache.hadoop.hive.ql.optimizer.GenMRFileSink1;
import org.apache.hadoop.hive.ql.optimizer.GenMROperator;
import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext;
-import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMapRedCtx;
import org.apache.hadoop.hive.ql.optimizer.GenMRRedSink1;
import org.apache.hadoop.hive.ql.optimizer.GenMRRedSink2;
import org.apache.hadoop.hive.ql.optimizer.GenMRRedSink3;
@@ -102,6 +101,7 @@ import org.apache.hadoop.hive.ql.optimiz
import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
import org.apache.hadoop.hive.ql.optimizer.MapJoinFactory;
import org.apache.hadoop.hive.ql.optimizer.Optimizer;
+import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMapRedCtx;
import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalContext;
import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalOptimizer;
import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner;
@@ -122,7 +122,6 @@ import org.apache.hadoop.hive.ql.plan.Ex
import org.apache.hadoop.hive.ql.plan.FetchWork;
import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.FilterDesc;
-import org.apache.hadoop.hive.ql.plan.FilterDesc.sampleDesc;
import org.apache.hadoop.hive.ql.plan.ForwardDesc;
import org.apache.hadoop.hive.ql.plan.GroupByDesc;
import org.apache.hadoop.hive.ql.plan.HiveOperation;
@@ -145,12 +144,13 @@ import org.apache.hadoop.hive.ql.plan.Ta
import org.apache.hadoop.hive.ql.plan.TableScanDesc;
import org.apache.hadoop.hive.ql.plan.UDTFDesc;
import org.apache.hadoop.hive.ql.plan.UnionDesc;
+import org.apache.hadoop.hive.ql.plan.FilterDesc.sampleDesc;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.session.SessionState.ResourceType;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.Mode;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFHash;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.Mode;
import org.apache.hadoop.hive.serde.Constants;
import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe;
@@ -158,9 +158,9 @@ import org.apache.hadoop.hive.serde2.Ser
import org.apache.hadoop.hive.serde2.SerDeUtils;
import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
@@ -6977,7 +6977,7 @@ public class SemanticAnalyzer extends Ba
}
}

- decideExecMode(rootTasks, ctx);
+ decideExecMode(rootTasks, ctx, globalLimitCtx);

if (qb.isCTAS()) {
// generate a DDL task and make it a dependent task of the leaf
@@ -8001,7 +8001,8 @@ public class SemanticAnalyzer extends Ba
}
}

- private void decideExecMode(List<Task<? extends Serializable>> rootTasks, Context ctx)
+ private void decideExecMode(List<Task<? extends Serializable>> rootTasks, Context ctx,
+ GlobalLimitCtx globalLimitCtx)
throws SemanticException {

// bypass for explain queries for now
@@ -8032,14 +8033,32 @@ public class SemanticAnalyzer extends Ba
(ctx, (MapredWork)mrtask.getWork(), p);
int numReducers = getNumberOfReducers(mrtask.getWork(), conf);
+ long estimatedInput;
+
+ if (globalLimitCtx != null && globalLimitCtx.isEnable()) {
+ // If the global limit optimization is triggered, we will
+ // estimate input data actually needed based on limit rows.
+ // estimated Input = (num_limit * max_size_per_row) * (estimated_map + 2)
+ //
+ long sizePerRow = HiveConf.getLongVar(conf,
+ HiveConf.ConfVars.HIVELIMITMAXROWSIZE);
+ estimatedInput = globalLimitCtx.getGlobalLimit() * sizePerRow;
+ long minSplitSize = HiveConf.getLongVar(conf,
+ HiveConf.ConfVars.MAPREDMINSPLITSIZE);
+ long estimatedNumMap = inputSummary.getLength() / minSplitSize + 1;
+ estimatedInput = estimatedInput * (estimatedNumMap + 1);
+ } else {
+ estimatedInput = inputSummary.getLength();
+ }
+
if (LOG.isDebugEnabled()) {
LOG.debug("Task: " + mrtask.getId() + ", Summary: " +
inputSummary.getLength() + "," + inputSummary.getFileCount() + ","
- + numReducers);
+ + numReducers + ", estimated Input: " + estimatedInput);
}

if(MapRedTask.isEligibleForLocalMode(conf, numReducers,
- inputSummary.getLength(), inputSummary.getFileCount()) != null) {
+ estimatedInput, inputSummary.getFileCount()) != null) {
hasNonLocalJob = true;
break;
}else{
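For illustration, the input estimation added above, extracted into a runnable sketch. Assumptions not taken from the diff: the class and method names are invented, hive.limit.row.max.size (HIVELIMITMAXROWSIZE) is taken at a 100000-byte default, and a 256 MB mapred.min.split.size (MAPREDMINSPLITSIZE) is picked for the worked example.

public class EstimatedInputSketch {
  // Mirrors the logic added to decideExecMode: with the global LIMIT
  // optimization on, the job is sized by the bytes the limit could
  // need -- (num_limit * max_size_per_row) scaled by a rough map
  // count -- instead of by the full input length.
  static long estimateInput(boolean limitOptEnabled, long globalLimit,
      long maxRowSize, long minSplitSize, long inputLength) {
    if (!limitOptEnabled) {
      return inputLength; // optimization off: use the real input size
    }
    long limitBytes = globalLimit * maxRowSize;
    long estimatedNumMap = inputLength / minSplitSize + 1; // floored, +1
    return limitBytes * (estimatedNumMap + 1); // one extra map of slack
  }

  public static void main(String[] args) {
    // Worked example: a LIMIT 10 query over 10 GB of input.
    long tenGb = 10L * 1024 * 1024 * 1024;
    long estimate = estimateInput(true, 10, 100000L, 256L * 1024 * 1024, tenGb);
    // 10 * 100000 * (40 + 1 + 1) = 42000000 bytes, far below the 10 GB
    // the pre-patch code would have passed to isEligibleForLocalMode.
    System.out.println(estimate); // 42000000
  }
}

That gap is what makes local mode "more aggressive": MapRedTask.isEligibleForLocalMode now compares this estimate (while still using the real file count) against the local-mode input threshold, so small-LIMIT queries over large tables can qualify for local execution.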