Posted to commits@hive.apache.org by cw...@apache.org on 2011/08/26 20:35:48 UTC

svn commit: r1162202 - in /hive/trunk/ql/src/java/org/apache/hadoop/hive/ql: exec/LimitOperator.java parse/SemanticAnalyzer.java

Author: cws
Date: Fri Aug 26 18:35:48 2011
New Revision: 1162202

URL: http://svn.apache.org/viewvc?rev=1162202&view=rev
Log:
HIVE-2385. Local Mode can be more aggressive if LIMIT optimization is on (Siying Dong via cws)
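
In outline, the patch does two things: SemanticAnalyzer now feeds the
local-mode eligibility check an input size estimated from the LIMIT clause
(rather than the raw input size) whenever the global LIMIT optimization is
enabled, and LimitOperator no longer fails its least-rows check on the map
side. A standalone sketch of the decision being changed (class, method, and
the byte threshold here are illustrative assumptions, not Hive APIs; the
real check in MapRedTask.isEligibleForLocalMode also weighs the reducer and
file counts, as the second hunk below shows):

    public class LocalModeDecision {
        static boolean eligibleForLocalMode(long estimatedInput, long maxBytes) {
            // before this patch the raw input size was passed here; after it,
            // a LIMIT-bounded estimate may be passed instead (see the
            // SemanticAnalyzer hunk below)
            return estimatedInput <= maxBytes;
        }

        public static void main(String[] args) {
            long rawInput = 1L << 30;          // 1 GB of table data
            long limitEstimate = 100_000_000L; // bound implied by the LIMIT
            long maxBytes = 128L << 20;        // assumed local-mode threshold
            System.out.println(eligibleForLocalMode(rawInput, maxBytes));      // false
            System.out.println(eligibleForLocalMode(limitEstimate, maxBytes)); // true
        }
    }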

Modified:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java?rev=1162202&r1=1162201&r2=1162202&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java Fri Aug 26 18:35:48 2011
@@ -34,6 +34,7 @@ public class LimitOperator extends Opera
   protected transient int limit;
   protected transient int leastRow;
   protected transient int currCount;
+  protected transient boolean isMap;
 
   @Override
   protected void initializeOp(Configuration hconf) throws HiveException {
@@ -41,6 +42,7 @@ public class LimitOperator extends Opera
     limit = conf.getLimit();
     leastRow = conf.getLeastRows();
     currCount = 0;
+    isMap = hconf.getBoolean("mapred.task.is.map", true);
   }
 
   @Override
@@ -65,7 +67,7 @@ public class LimitOperator extends Opera
 
   @Override
   public void closeOp(boolean abort) throws HiveException {
-    if (currCount < leastRow) {
+    if (!isMap && currCount < leastRow) {
       throw new HiveException("No sufficient row found");
     }
   }
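
The new isMap guard matters because the least-rows check backs the global
LIMIT optimization's too-few-rows detection: when Hive reads only part of
the input, the job should fail if too few rows come through overall, but an
individual map task may legitimately see fewer than leastRow rows since the
input is split across many mappers, so only non-map tasks enforce the
check. A minimal standalone sketch of the guarded check (illustrative, not
Hive code; IllegalStateException stands in for HiveException):

    public class LeastRowsCheck {
        static void closeOp(boolean isMap, int currCount, int leastRow) {
            if (!isMap && currCount < leastRow) {
                throw new IllegalStateException("No sufficient row found");
            }
        }

        public static void main(String[] args) {
            closeOp(true, 3, 10);   // map task under the threshold: passes
            closeOp(false, 12, 10); // reduce task with enough rows: passes
            closeOp(false, 3, 10);  // reduce task with too few rows: throws
        }
    }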

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java?rev=1162202&r1=1162201&r2=1162202&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java Fri Aug 26 18:35:48 2011
@@ -27,9 +27,9 @@ import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TreeSet;
+import java.util.Map.Entry;
 import java.util.regex.Pattern;
 import java.util.regex.PatternSyntaxException;
 
@@ -92,7 +92,6 @@ import org.apache.hadoop.hive.ql.metadat
 import org.apache.hadoop.hive.ql.optimizer.GenMRFileSink1;
 import org.apache.hadoop.hive.ql.optimizer.GenMROperator;
 import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext;
-import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMapRedCtx;
 import org.apache.hadoop.hive.ql.optimizer.GenMRRedSink1;
 import org.apache.hadoop.hive.ql.optimizer.GenMRRedSink2;
 import org.apache.hadoop.hive.ql.optimizer.GenMRRedSink3;
@@ -102,6 +101,7 @@ import org.apache.hadoop.hive.ql.optimiz
 import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
 import org.apache.hadoop.hive.ql.optimizer.MapJoinFactory;
 import org.apache.hadoop.hive.ql.optimizer.Optimizer;
+import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMapRedCtx;
 import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalContext;
 import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalOptimizer;
 import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner;
@@ -122,7 +122,6 @@ import org.apache.hadoop.hive.ql.plan.Ex
 import org.apache.hadoop.hive.ql.plan.FetchWork;
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.FilterDesc;
-import org.apache.hadoop.hive.ql.plan.FilterDesc.sampleDesc;
 import org.apache.hadoop.hive.ql.plan.ForwardDesc;
 import org.apache.hadoop.hive.ql.plan.GroupByDesc;
 import org.apache.hadoop.hive.ql.plan.HiveOperation;
@@ -145,12 +144,13 @@ import org.apache.hadoop.hive.ql.plan.Ta
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
 import org.apache.hadoop.hive.ql.plan.UDTFDesc;
 import org.apache.hadoop.hive.ql.plan.UnionDesc;
+import org.apache.hadoop.hive.ql.plan.FilterDesc.sampleDesc;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.session.SessionState.ResourceType;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.Mode;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFHash;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.Mode;
 import org.apache.hadoop.hive.serde.Constants;
 import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe;
@@ -158,9 +158,9 @@ import org.apache.hadoop.hive.serde2.Ser
 import org.apache.hadoop.hive.serde2.SerDeUtils;
 import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
@@ -6977,7 +6977,7 @@ public class SemanticAnalyzer extends Ba
       }
     }
 
-    decideExecMode(rootTasks, ctx);
+    decideExecMode(rootTasks, ctx, globalLimitCtx);
 
     if (qb.isCTAS()) {
       // generate a DDL task and make it a dependent task of the leaf
@@ -8001,7 +8001,8 @@ public class SemanticAnalyzer extends Ba
     }
   }
 
-  private void decideExecMode(List<Task<? extends Serializable>> rootTasks, Context ctx)
+  private void decideExecMode(List<Task<? extends Serializable>> rootTasks, Context ctx,
+      GlobalLimitCtx globalLimitCtx)
     throws SemanticException {
 
     // bypass for explain queries for now
@@ -8032,14 +8033,32 @@ public class SemanticAnalyzer extends Ba
           (ctx, (MapredWork)mrtask.getWork(), p);
         int numReducers = getNumberOfReducers(mrtask.getWork(), conf);
 
+        long estimatedInput;
+
+        if (globalLimitCtx != null && globalLimitCtx.isEnable()) {
+          // If the global limit optimization is triggered, we will
+          // estimate input data actually needed based on limit rows.
+          // estimated Input = (num_limit * max_size_per_row) * (estimated_map + 2)
+          //
+          long sizePerRow = HiveConf.getLongVar(conf,
+              HiveConf.ConfVars.HIVELIMITMAXROWSIZE);
+          estimatedInput = globalLimitCtx.getGlobalLimit() * sizePerRow;
+          long minSplitSize = HiveConf.getLongVar(conf,
+              HiveConf.ConfVars.MAPREDMINSPLITSIZE);
+          long estimatedNumMap = inputSummary.getLength() / minSplitSize + 1;
+          estimatedInput = estimatedInput * (estimatedNumMap + 1);
+        } else {
+          estimatedInput = inputSummary.getLength();
+        }
+
         if (LOG.isDebugEnabled()) {
           LOG.debug("Task: " + mrtask.getId() + ", Summary: " +
                    inputSummary.getLength() + "," + inputSummary.getFileCount() + ","
-                   + numReducers);
+                   + numReducers + ", estimated Input: " + estimatedInput);
         }
 
         if(MapRedTask.isEligibleForLocalMode(conf, numReducers,
-            inputSummary.getLength(), inputSummary.getFileCount()) != null) {
+            estimatedInput, inputSummary.getFileCount()) != null) {
           hasNonLocalJob = true;
           break;
         }else{
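
To make the estimate in the hunk above concrete, here is a standalone
sketch of the same arithmetic (class and method names are illustrative, not
Hive APIs, and the sample values are assumptions):

    public class EstimatedInputSketch {
        static long estimate(long globalLimit, long sizePerRow,
                             long inputLength, long minSplitSize) {
            long estimatedInput = globalLimit * sizePerRow;        // num_limit * max_size_per_row
            long estimatedNumMap = inputLength / minSplitSize + 1; // rough mapper count
            return estimatedInput * (estimatedNumMap + 1);         // i.e. * (estimated_map + 2)
        }

        public static void main(String[] args) {
            // e.g. LIMIT 100 with a 100,000-byte max row size over 1 GB of
            // input and a 128 MB min split size: 10 MB * 10 = 100 MB, so the
            // query can still qualify for local mode even though the raw
            // input is 1 GB.
            System.out.println(estimate(100, 100_000L, 1L << 30, 128L << 20));
        }
    }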