Posted to commits@hive.apache.org by ha...@apache.org on 2013/07/07 08:42:24 UTC

svn commit: r1500375 - in /hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse: MapReduceCompiler.java SemanticAnalyzer.java

Author: hashutosh
Date: Sun Jul  7 06:42:24 2013
New Revision: 1500375

URL: http://svn.apache.org/r1500375
Log:
HIVE-4811 : (Slightly) break up the SemanticAnalyzer monstrosity (Gunther Hagleitner via Ashutosh Chauhan)
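
For context, the change moves MapReduce plan generation out of SemanticAnalyzer and
into the new MapReduceCompiler class. A minimal sketch of the resulting call pattern,
using the names that appear in the SemanticAnalyzer hunk near the end of this diff
(conf, console, db, pCtx, rootTasks, inputs, outputs) and assuming it runs in a method
that propagates SemanticException:

    // Sketch only; mirrors the delegation added to SemanticAnalyzer below.
    MapReduceCompiler compiler = new MapReduceCompiler();
    compiler.init(conf, console, db);                    // session config, console, metastore handle
    compiler.compile(pCtx, rootTasks, inputs, outputs);  // populates rootTasks with the map-reduce task DAG
    fetchTask = pCtx.getFetchTask();                     // compile() may attach a FetchTask to the parse context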

Added:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java
Modified:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java?rev=1500375&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java Sun Jul  7 06:42:24 2013
@@ -0,0 +1,650 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.parse;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.ColumnStatsTask;
+import org.apache.hadoop.hive.ql.exec.ConditionalTask;
+import org.apache.hadoop.hive.ql.exec.FetchTask;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.StatsTask;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.UnionOperator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.ExecDriver;
+import org.apache.hadoop.hive.ql.exec.MapRedTask;
+import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.Rule;
+import org.apache.hadoop.hive.ql.lib.RuleRegExp;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.optimizer.GenMRFileSink1;
+import org.apache.hadoop.hive.ql.optimizer.GenMROperator;
+import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext;
+import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMapRedCtx;
+import org.apache.hadoop.hive.ql.optimizer.GenMRRedSink1;
+import org.apache.hadoop.hive.ql.optimizer.GenMRRedSink2;
+import org.apache.hadoop.hive.ql.optimizer.GenMRRedSink3;
+import org.apache.hadoop.hive.ql.optimizer.GenMRTableScan1;
+import org.apache.hadoop.hive.ql.optimizer.GenMRUnion1;
+import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
+import org.apache.hadoop.hive.ql.optimizer.MapJoinFactory;
+import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalContext;
+import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalOptimizer;
+import org.apache.hadoop.hive.ql.plan.ColumnStatsDesc;
+import org.apache.hadoop.hive.ql.plan.ColumnStatsWork;
+import org.apache.hadoop.hive.ql.plan.CreateTableDesc;
+import org.apache.hadoop.hive.ql.plan.DDLWork;
+import org.apache.hadoop.hive.ql.plan.FetchWork;
+import org.apache.hadoop.hive.ql.plan.LoadFileDesc;
+import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
+import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.MoveWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.PlanUtils;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.hadoop.hive.shims.ShimLoader;
+
+public class MapReduceCompiler {
+
+  protected final Log LOG = LogFactory.getLog(MapReduceCompiler.class);
+  private Hive db;
+  protected LogHelper console;
+  private HiveConf conf;
+
+
+  public MapReduceCompiler() {
+  }
+
+  public void init(HiveConf conf, LogHelper console, Hive db) {
+    this.conf = conf;
+    this.db = db;
+    this.console = console;
+  }
+
+  @SuppressWarnings({"nls", "unchecked"})
+  public void compile(final ParseContext pCtx, final List<Task<? extends Serializable>> rootTasks,
+      final HashSet<ReadEntity> inputs, final HashSet<WriteEntity> outputs) throws SemanticException {
+
+    Context ctx = pCtx.getContext();
+    GlobalLimitCtx globalLimitCtx = pCtx.getGlobalLimitCtx();
+    QB qb = pCtx.getQB();
+    List<Task<MoveWork>> mvTask = new ArrayList<Task<MoveWork>>();
+
+    List<LoadTableDesc> loadTableWork = pCtx.getLoadTableWork();
+    List<LoadFileDesc> loadFileWork = pCtx.getLoadFileWork();
+
+    boolean isCStats = qb.isAnalyzeRewrite();
+
+    if (pCtx.getFetchTask() != null) {
+      return;
+    }
+
+    /*
+     * In case of a select, use a fetch task instead of a move task.
+     * If the select is from analyze table column rewrite, don't create a fetch task. Instead create
+     * a column stats task later.
+     */
+    if (pCtx.getQB().getIsQuery() && !isCStats) {
+      if ((!loadTableWork.isEmpty()) || (loadFileWork.size() != 1)) {
+        throw new SemanticException(ErrorMsg.GENERIC_ERROR.getMsg());
+      }
+      String cols = loadFileWork.get(0).getColumns();
+      String colTypes = loadFileWork.get(0).getColumnTypes();
+
+      String resFileFormat = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT);
+      TableDesc resultTab = PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, resFileFormat);
+
+      FetchWork fetch = new FetchWork(new Path(loadFileWork.get(0).getSourceDir()).toString(),
+          resultTab, qb.getParseInfo().getOuterQueryLimit());
+
+      pCtx.setFetchTask((FetchTask) TaskFactory.get(fetch, conf));
+
+      // For the FetchTask, the limit optimization requires we fetch all the rows
+      // in memory and count how many rows we get. It's not practical if the
+      // limit factor is too big
+      int fetchLimit = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVELIMITOPTMAXFETCH);
+      if (globalLimitCtx.isEnable() && globalLimitCtx.getGlobalLimit() > fetchLimit) {
+        LOG.info("For FetchTask, LIMIT " + globalLimitCtx.getGlobalLimit() + " > " + fetchLimit
+            + ". Doesn't qualify limit optimiztion.");
+        globalLimitCtx.disableOpt();
+      }
+    } else if (!isCStats) {
+      for (LoadTableDesc ltd : loadTableWork) {
+        Task<MoveWork> tsk = TaskFactory.get(new MoveWork(null, null, ltd, null, false), conf);
+        mvTask.add(tsk);
+        // Check whether this load stales any indexes, and auto-update them if configured to
+        if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEINDEXAUTOUPDATE)) {
+          IndexUpdater indexUpdater = new IndexUpdater(loadTableWork, inputs, conf);
+          try {
+            List<Task<? extends Serializable>> indexUpdateTasks = indexUpdater
+                .generateUpdateTasks();
+            for (Task<? extends Serializable> updateTask : indexUpdateTasks) {
+              tsk.addDependentTask(updateTask);
+            }
+          } catch (HiveException e) {
+            console
+                .printInfo("WARNING: could not auto-update stale indexes, which are not in sync");
+          }
+        }
+      }
+
+      boolean oneLoadFile = true;
+      for (LoadFileDesc lfd : loadFileWork) {
+        if (qb.isCTAS()) {
+          assert (oneLoadFile); // should not have more than 1 load file for
+          // CTAS
+          // make the movetask's destination directory the table's destination.
+          String location = qb.getTableDesc().getLocation();
+          if (location == null) {
+            // get the table's default location
+            Table dumpTable;
+            Path targetPath;
+            try {
+              dumpTable = db.newTable(qb.getTableDesc().getTableName());
+              if (!db.databaseExists(dumpTable.getDbName())) {
+                throw new SemanticException("ERROR: The database " + dumpTable.getDbName()
+                    + " does not exist.");
+              }
+              Warehouse wh = new Warehouse(conf);
+              targetPath = wh.getTablePath(db.getDatabase(dumpTable.getDbName()), dumpTable
+                  .getTableName());
+            } catch (HiveException e) {
+              throw new SemanticException(e);
+            } catch (MetaException e) {
+              throw new SemanticException(e);
+            }
+
+            location = targetPath.toString();
+          }
+          lfd.setTargetDir(location);
+
+          oneLoadFile = false;
+        }
+        mvTask.add(TaskFactory.get(new MoveWork(null, null, null, lfd, false), conf));
+      }
+    }
+
+    // generate map reduce plans
+    ParseContext tempParseContext = getParseContext(pCtx, rootTasks);
+    GenMRProcContext procCtx = new GenMRProcContext(
+        conf,
+        new HashMap<Operator<? extends OperatorDesc>, Task<? extends Serializable>>(),
+        new ArrayList<Operator<? extends OperatorDesc>>(), tempParseContext,
+        mvTask, rootTasks,
+        new LinkedHashMap<Operator<? extends OperatorDesc>, GenMapRedCtx>(),
+        inputs, outputs);
+
+    // create a walker which walks the tree in a DFS manner while maintaining
+    // the operator stack.
+    // The dispatcher generates the plan from the operator tree
+    Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+    opRules.put(new RuleRegExp(new String("R1"),
+        TableScanOperator.getOperatorName() + "%"),
+        new GenMRTableScan1());
+    opRules.put(new RuleRegExp(new String("R2"),
+        TableScanOperator.getOperatorName() + "%.*" + ReduceSinkOperator.getOperatorName() + "%"),
+        new GenMRRedSink1());
+    opRules.put(new RuleRegExp(new String("R3"),
+        ReduceSinkOperator.getOperatorName() + "%.*" + ReduceSinkOperator.getOperatorName() + "%"),
+        new GenMRRedSink2());
+    opRules.put(new RuleRegExp(new String("R4"),
+        FileSinkOperator.getOperatorName() + "%"),
+        new GenMRFileSink1());
+    opRules.put(new RuleRegExp(new String("R5"),
+        UnionOperator.getOperatorName() + "%"),
+        new GenMRUnion1());
+    opRules.put(new RuleRegExp(new String("R6"),
+        UnionOperator.getOperatorName() + "%.*" + ReduceSinkOperator.getOperatorName() + "%"),
+        new GenMRRedSink3());
+    opRules.put(new RuleRegExp(new String("R7"),
+        MapJoinOperator.getOperatorName() + "%"),
+        MapJoinFactory.getTableScanMapJoin());
+
+    // The dispatcher fires the processor corresponding to the closest matching
+    // rule and passes the context along
+    Dispatcher disp = new DefaultRuleDispatcher(new GenMROperator(), opRules,
+        procCtx);
+
+    GraphWalker ogw = new GenMapRedWalker(disp);
+    ArrayList<Node> topNodes = new ArrayList<Node>();
+    topNodes.addAll(pCtx.getTopOps().values());
+    ogw.startWalking(topNodes, null);
+
+    /*
+     * If the query was the result of analyze table column compute statistics rewrite, create
+     * a column stats task instead of a fetch task to persist stats to the metastore.
+     */
+    if (isCStats) {
+      genColumnStatsTask(qb, loadTableWork, loadFileWork, rootTasks);
+    }
+
+    // Reduce sink does not have any children.
+    // Since the plan by now has been broken up into multiple tasks,
+    // iterate over all tasks.
+    // For each task, go over all operators recursively
+    for (Task<? extends Serializable> rootTask : rootTasks) {
+      breakTaskTree(rootTask);
+    }
+
+    // For each task, set the key descriptor for the reducer
+    for (Task<? extends Serializable> rootTask : rootTasks) {
+      GenMapRedUtils.setKeyAndValueDescForTaskTree(rootTask);
+    }
+
+    // If a task contains an operator which instructs that
+    // BucketizedHiveInputFormat be used, mark the task's work accordingly
+    for (Task<? extends Serializable> rootTask : rootTasks) {
+      setInputFormat(rootTask);
+    }
+
+    PhysicalContext physicalContext = new PhysicalContext(conf,
+        getParseContext(pCtx, rootTasks), ctx, rootTasks, pCtx.getFetchTask());
+    PhysicalOptimizer physicalOptimizer = new PhysicalOptimizer(
+        physicalContext, conf);
+    physicalOptimizer.optimize();
+
+    // For each operator, generate the counters if needed
+    if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEJOBPROGRESS)) {
+      for (Task<? extends Serializable> rootTask : rootTasks) {
+        generateCountersTask(rootTask);
+      }
+    }
+
+    decideExecMode(rootTasks, ctx, globalLimitCtx);
+
+    if (qb.isCTAS()) {
+      // generate a DDL task and make it a dependent task of the leaf
+      CreateTableDesc crtTblDesc = qb.getTableDesc();
+
+      crtTblDesc.validate();
+
+      // Clear the output for CTAS since we don't need the output from
+      // the mapredWork; the DDLWork at the tail of the chain will
+      // have the output
+      outputs.clear();
+
+      Task<? extends Serializable> crtTblTask = TaskFactory.get(new DDLWork(
+          inputs, outputs, crtTblDesc), conf);
+
+      // find all leaf tasks and make the DDLTask as a dependent task of all of
+      // them
+      HashSet<Task<? extends Serializable>> leaves = new HashSet<Task<? extends Serializable>>();
+      getLeafTasks(rootTasks, leaves);
+      assert (leaves.size() > 0);
+      for (Task<? extends Serializable> task : leaves) {
+        if (task instanceof StatsTask) {
+          // StatsTask requires the table to already exist
+          for (Task<? extends Serializable> parentOfStatsTask : task.getParentTasks()) {
+            parentOfStatsTask.addDependentTask(crtTblTask);
+          }
+          for (Task<? extends Serializable> parentOfCrtTblTask : crtTblTask.getParentTasks()) {
+            parentOfCrtTblTask.removeDependentTask(task);
+          }
+          crtTblTask.addDependentTask(task);
+        } else {
+          task.addDependentTask(crtTblTask);
+        }
+      }
+    }
+
+    if (globalLimitCtx.isEnable() && pCtx.getFetchTask() != null) {
+      LOG.info("set least row check for FetchTask: " + globalLimitCtx.getGlobalLimit());
+      pCtx.getFetchTask().getWork().setLeastNumRows(globalLimitCtx.getGlobalLimit());
+    }
+
+    if (globalLimitCtx.isEnable() && globalLimitCtx.getLastReduceLimitDesc() != null) {
+      LOG.info("set least row check for LimitDesc: " + globalLimitCtx.getGlobalLimit());
+      globalLimitCtx.getLastReduceLimitDesc().setLeastRows(globalLimitCtx.getGlobalLimit());
+      List<ExecDriver> mrTasks = Utilities.getMRTasks(rootTasks);
+      for (ExecDriver tsk : mrTasks) {
+        tsk.setRetryCmdWhenFail(true);
+      }
+    }
+  }
+
+  private void setInputFormat(MapredWork work, Operator<? extends OperatorDesc> op) {
+    if (op.isUseBucketizedHiveInputFormat()) {
+      work.setUseBucketizedHiveInputFormat(true);
+      return;
+    }
+
+    if (op.getChildOperators() != null) {
+      for (Operator<? extends OperatorDesc> childOp : op.getChildOperators()) {
+        setInputFormat(work, childOp);
+      }
+    }
+  }
+
+  // loop over all the tasks recursively
+  private void setInputFormat(Task<? extends Serializable> task) {
+    if (task instanceof ExecDriver) {
+      MapredWork work = (MapredWork) task.getWork();
+      HashMap<String, Operator<? extends OperatorDesc>> opMap = work.getAliasToWork();
+      if (!opMap.isEmpty()) {
+        for (Operator<? extends OperatorDesc> op : opMap.values()) {
+          setInputFormat(work, op);
+        }
+      }
+    } else if (task instanceof ConditionalTask) {
+      List<Task<? extends Serializable>> listTasks
+        = ((ConditionalTask) task).getListTasks();
+      for (Task<? extends Serializable> tsk : listTasks) {
+        setInputFormat(tsk);
+      }
+    }
+
+    if (task.getChildTasks() != null) {
+      for (Task<? extends Serializable> childTask : task.getChildTasks()) {
+        setInputFormat(childTask);
+      }
+    }
+  }
+
+  // loop over all the tasks recursively
+  private void generateCountersTask(Task<? extends Serializable> task) {
+    if (task instanceof ExecDriver) {
+      HashMap<String, Operator<? extends OperatorDesc>> opMap = ((MapredWork) task
+          .getWork()).getAliasToWork();
+      if (!opMap.isEmpty()) {
+        for (Operator<? extends OperatorDesc> op : opMap.values()) {
+          generateCountersOperator(op);
+        }
+      }
+
+      Operator<? extends OperatorDesc> reducer = ((MapredWork) task.getWork())
+          .getReducer();
+      if (reducer != null) {
+        LOG.info("Generating counters for operator " + reducer);
+        generateCountersOperator(reducer);
+      }
+    } else if (task instanceof ConditionalTask) {
+      List<Task<? extends Serializable>> listTasks = ((ConditionalTask) task)
+          .getListTasks();
+      for (Task<? extends Serializable> tsk : listTasks) {
+        generateCountersTask(tsk);
+      }
+    }
+
+    // Start the counters from scratch - a hack for hadoop 17.
+    Operator.resetLastEnumUsed();
+
+    if (task.getChildTasks() == null) {
+      return;
+    }
+
+    for (Task<? extends Serializable> childTask : task.getChildTasks()) {
+      generateCountersTask(childTask);
+    }
+  }
+
+  private void generateCountersOperator(Operator<? extends OperatorDesc> op) {
+    op.assignCounterNameToEnum();
+
+    if (op.getChildOperators() == null) {
+      return;
+    }
+
+    for (Operator<? extends OperatorDesc> child : op.getChildOperators()) {
+      generateCountersOperator(child);
+    }
+  }
+
+  public ParseContext getParseContext(ParseContext pCtx, List<Task<? extends Serializable>> rootTasks) {
+    return new ParseContext(conf, pCtx.getQB(), pCtx.getParseTree(),
+        pCtx.getOpToPartPruner(), pCtx.getOpToPartList(), pCtx.getTopOps(),
+        pCtx.getTopSelOps(), pCtx.getOpParseCtx(), pCtx.getJoinContext(),
+        pCtx.getSmbMapJoinContext(), pCtx.getTopToTable(), pCtx.getFsopToTable(),
+        pCtx.getLoadTableWork(), pCtx.getLoadFileWork(), pCtx.getContext(),
+        pCtx.getIdToTableNameMap(), pCtx.getDestTableId(), pCtx.getUCtx(),
+        pCtx.getListMapJoinOpsNoReducer(), pCtx.getGroupOpToInputTables(),
+        pCtx.getPrunedPartitions(), pCtx.getOpToSamplePruner(), pCtx.getGlobalLimitCtx(),
+        pCtx.getNameToSplitSample(), pCtx.getSemanticInputs(), rootTasks,
+        pCtx.getOpToPartToSkewedPruner(), pCtx.getViewAliasToInput(),
+        pCtx.getReduceSinkOperatorsAddedByEnforceBucketingSorting(),
+        pCtx.getQueryProperties());
+  }
+
+  // loop over all the tasks recursively
+  private void breakTaskTree(Task<? extends Serializable> task) {
+
+    if (task instanceof ExecDriver) {
+      HashMap<String, Operator<? extends OperatorDesc>> opMap = ((MapredWork) task
+          .getWork()).getAliasToWork();
+      if (!opMap.isEmpty()) {
+        for (Operator<? extends OperatorDesc> op : opMap.values()) {
+          breakOperatorTree(op);
+        }
+      }
+    } else if (task instanceof ConditionalTask) {
+      List<Task<? extends Serializable>> listTasks = ((ConditionalTask) task)
+          .getListTasks();
+      for (Task<? extends Serializable> tsk : listTasks) {
+        breakTaskTree(tsk);
+      }
+    }
+
+    if (task.getChildTasks() == null) {
+      return;
+    }
+
+    for (Task<? extends Serializable> childTask : task.getChildTasks()) {
+      breakTaskTree(childTask);
+    }
+  }
+
+  // loop over all the operators recursively
+  private void breakOperatorTree(Operator<? extends OperatorDesc> topOp) {
+    if (topOp instanceof ReduceSinkOperator) {
+      topOp.setChildOperators(null);
+    }
+
+    if (topOp.getChildOperators() == null) {
+      return;
+    }
+
+    for (Operator<? extends OperatorDesc> op : topOp.getChildOperators()) {
+      breakOperatorTree(op);
+    }
+  }
+
+  /**
+   * A helper function to generate a column stats task on top of map-red task. The column stats
+   * task fetches from the output of the map-red task, constructs the column stats object and
+   * persists it to the metastore.
+   *
+   * This method generates a plan with a column stats task on top of map-red task and sets up the
+   * appropriate metadata to be used during execution.
+   *
+   * @param qb
+   */
+  @SuppressWarnings("unchecked")
+  private void genColumnStatsTask(QB qb, List<LoadTableDesc> loadTableWork,
+      List<LoadFileDesc> loadFileWork, List<Task<? extends Serializable>> rootTasks) {
+    QBParseInfo qbParseInfo = qb.getParseInfo();
+    ColumnStatsTask cStatsTask = null;
+    ColumnStatsWork cStatsWork = null;
+    FetchWork fetch = null;
+    String tableName = qbParseInfo.getTableName();
+    String partName = qbParseInfo.getPartName();
+    List<String> colName = qbParseInfo.getColName();
+    List<String> colType = qbParseInfo.getColType();
+    boolean isTblLevel = qbParseInfo.isTblLvl();
+
+    String cols = loadFileWork.get(0).getColumns();
+    String colTypes = loadFileWork.get(0).getColumnTypes();
+
+    String resFileFormat = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT);
+    TableDesc resultTab = PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, resFileFormat);
+
+    fetch = new FetchWork(new Path(loadFileWork.get(0).getSourceDir()).toString(),
+        resultTab, qb.getParseInfo().getOuterQueryLimit());
+
+    ColumnStatsDesc cStatsDesc = new ColumnStatsDesc(tableName, partName,
+        colName, colType, isTblLevel);
+    cStatsWork = new ColumnStatsWork(fetch, cStatsDesc);
+    cStatsTask = (ColumnStatsTask) TaskFactory.get(cStatsWork, conf);
+    rootTasks.add(cStatsTask);
+  }
+
+  /**
+   * Find all leaf tasks of the list of root tasks.
+   */
+  private void getLeafTasks(List<Task<? extends Serializable>> rootTasks,
+      HashSet<Task<? extends Serializable>> leaves) {
+
+    for (Task<? extends Serializable> root : rootTasks) {
+      getLeafTasks(root, leaves);
+    }
+  }
+
+  private void getLeafTasks(Task<? extends Serializable> task,
+      HashSet<Task<? extends Serializable>> leaves) {
+    if (task.getDependentTasks() == null) {
+      if (!leaves.contains(task)) {
+        leaves.add(task);
+      }
+    } else {
+      getLeafTasks(task.getDependentTasks(), leaves);
+    }
+  }
+
+  /**
+   * Make a best guess at trying to find the number of reducers
+   */
+  private static int getNumberOfReducers(MapredWork mrwork, HiveConf conf) {
+    if (mrwork.getReducer() == null) {
+      return 0;
+    }
+
+    if (mrwork.getNumReduceTasks() >= 0) {
+      return mrwork.getNumReduceTasks();
+    }
+
+    return conf.getIntVar(HiveConf.ConfVars.HADOOPNUMREDUCERS);
+  }
+
+  private void decideExecMode(List<Task<? extends Serializable>> rootTasks, Context ctx,
+      GlobalLimitCtx globalLimitCtx)
+      throws SemanticException {
+
+    // bypass for explain queries for now
+    if (ctx.getExplain()) {
+      return;
+    }
+
+    // user has told us to run in local mode or doesn't want auto-local mode
+    if (ctx.isLocalOnlyExecutionMode() ||
+        !conf.getBoolVar(HiveConf.ConfVars.LOCALMODEAUTO)) {
+      return;
+    }
+
+    final Context lCtx = ctx;
+    PathFilter p = new PathFilter() {
+      public boolean accept(Path file) {
+        return !lCtx.isMRTmpFileURI(file.toUri().getPath());
+      }
+    };
+    List<ExecDriver> mrtasks = Utilities.getMRTasks(rootTasks);
+
+    // map-reduce jobs will be run locally based on data size
+    // first find out if any of the jobs needs to run non-locally
+    boolean hasNonLocalJob = false;
+    for (ExecDriver mrtask : mrtasks) {
+      try {
+        ContentSummary inputSummary = Utilities.getInputSummary
+            (ctx, (MapredWork) mrtask.getWork(), p);
+        int numReducers = getNumberOfReducers(mrtask.getWork(), conf);
+
+        long estimatedInput;
+
+        if (globalLimitCtx != null && globalLimitCtx.isEnable()) {
+          // If the global limit optimization is triggered, we will
+          // estimate input data actually needed based on limit rows.
+          // estimated Input = (num_limit * max_size_per_row) * (estimated_map + 2)
+          //
+          long sizePerRow = HiveConf.getLongVar(conf,
+              HiveConf.ConfVars.HIVELIMITMAXROWSIZE);
+          estimatedInput = globalLimitCtx.getGlobalLimit() * sizePerRow;
+          long minSplitSize = HiveConf.getLongVar(conf,
+              HiveConf.ConfVars.MAPREDMINSPLITSIZE);
+          long estimatedNumMap = inputSummary.getLength() / minSplitSize + 1;
+          estimatedInput = estimatedInput * (estimatedNumMap + 1);
+        } else {
+          estimatedInput = inputSummary.getLength();
+        }
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Task: " + mrtask.getId() + ", Summary: " +
+              inputSummary.getLength() + "," + inputSummary.getFileCount() + ","
+              + numReducers + ", estimated Input: " + estimatedInput);
+        }
+
+        if (MapRedTask.isEligibleForLocalMode(conf, numReducers,
+            estimatedInput, inputSummary.getFileCount()) != null) {
+          hasNonLocalJob = true;
+          break;
+        } else {
+          mrtask.setLocalMode(true);
+        }
+      } catch (IOException e) {
+        throw new SemanticException(e);
+      }
+    }
+
+    if (!hasNonLocalJob) {
+      // Entire query can be run locally.
+      // Save the current tracker value and restore it when done.
+      ctx.setOriginalTracker(ShimLoader.getHadoopShims().getJobLauncherRpcAddress(conf));
+      ShimLoader.getHadoopShims().setJobLauncherRpcAddress(conf, "local");
+      console.printInfo("Automatically selecting local only mode for query");
+    }
+  }
+}
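
A note on the plan-generation core of compile() above: it performs a rule-driven walk
of the operator DAG, mapping operator-name regexes to GenMR* processors and letting a
GenMapRedWalker dispatch the closest-matching rule at each node. The following is a
stripped-down, illustrative sketch of that pattern, not an excerpt from the commit; it
uses only classes referenced in MapReduceCompiler.java and assumes procCtx and pCtx are
built as in compile():

    // R1: a table scan starts a new map-reduce work unit; operators that
    // match no rule fall through to the default GenMROperator processor.
    Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
    opRules.put(new RuleRegExp("R1", TableScanOperator.getOperatorName() + "%"),
        new GenMRTableScan1());

    Dispatcher disp = new DefaultRuleDispatcher(new GenMROperator(), opRules, procCtx);
    GraphWalker ogw = new GenMapRedWalker(disp);   // DFS over the operator tree
    ArrayList<Node> topNodes = new ArrayList<Node>();
    topNodes.addAll(pCtx.getTopOps().values());
    ogw.startWalking(topNodes, null);              // processors attach tasks through procCtx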

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java?rev=1500375&r1=1500374&r2=1500375&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java Sun Jul  7 06:42:24 2013
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.hive.ql.parse;
 
-import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -38,9 +37,7 @@ import org.antlr.runtime.tree.Tree;
 import org.antlr.runtime.tree.TreeWizard;
 import org.antlr.runtime.tree.TreeWizard.ContextVisitor;
 import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.common.ObjectPair;
@@ -51,22 +48,17 @@ import org.apache.hadoop.hive.metastore.
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Order;
-import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.QueryProperties;
 import org.apache.hadoop.hive.ql.exec.AbstractMapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.ArchiveUtils;
 import org.apache.hadoop.hive.ql.exec.ColumnInfo;
-import org.apache.hadoop.hive.ql.exec.ColumnStatsTask;
-import org.apache.hadoop.hive.ql.exec.ConditionalTask;
 import org.apache.hadoop.hive.ql.exec.ExecDriver;
-import org.apache.hadoop.hive.ql.exec.FetchTask;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.FunctionInfo;
 import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
 import org.apache.hadoop.hive.ql.exec.GroupByOperator;
 import org.apache.hadoop.hive.ql.exec.JoinOperator;
-import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.MapRedTask;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorFactory;
@@ -75,7 +67,6 @@ import org.apache.hadoop.hive.ql.exec.Re
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.RowSchema;
 import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
-import org.apache.hadoop.hive.ql.exec.StatsTask;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
@@ -89,13 +80,9 @@ import org.apache.hadoop.hive.ql.io.Comb
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
 import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
 import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
-import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
 import org.apache.hadoop.hive.ql.lib.Dispatcher;
 import org.apache.hadoop.hive.ql.lib.GraphWalker;
 import org.apache.hadoop.hive.ql.lib.Node;
-import org.apache.hadoop.hive.ql.lib.NodeProcessor;
-import org.apache.hadoop.hive.ql.lib.Rule;
-import org.apache.hadoop.hive.ql.lib.RuleRegExp;
 import org.apache.hadoop.hive.ql.metadata.DummyPartition;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -104,20 +91,7 @@ import org.apache.hadoop.hive.ql.metadat
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
-import org.apache.hadoop.hive.ql.optimizer.GenMRFileSink1;
-import org.apache.hadoop.hive.ql.optimizer.GenMROperator;
-import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext;
-import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMapRedCtx;
-import org.apache.hadoop.hive.ql.optimizer.GenMRRedSink1;
-import org.apache.hadoop.hive.ql.optimizer.GenMRRedSink2;
-import org.apache.hadoop.hive.ql.optimizer.GenMRRedSink3;
-import org.apache.hadoop.hive.ql.optimizer.GenMRTableScan1;
-import org.apache.hadoop.hive.ql.optimizer.GenMRUnion1;
-import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
-import org.apache.hadoop.hive.ql.optimizer.MapJoinFactory;
 import org.apache.hadoop.hive.ql.optimizer.Optimizer;
-import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalContext;
-import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalOptimizer;
 import org.apache.hadoop.hive.ql.optimizer.unionproc.UnionProcContext;
 import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.tableSpec.SpecType;
 import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.OrderExpression;
@@ -139,8 +113,6 @@ import org.apache.hadoop.hive.ql.parse.W
 import org.apache.hadoop.hive.ql.parse.WindowingSpec.WindowFunctionSpec;
 import org.apache.hadoop.hive.ql.parse.WindowingSpec.WindowSpec;
 import org.apache.hadoop.hive.ql.plan.AggregationDesc;
-import org.apache.hadoop.hive.ql.plan.ColumnStatsDesc;
-import org.apache.hadoop.hive.ql.plan.ColumnStatsWork;
 import org.apache.hadoop.hive.ql.plan.CreateTableDesc;
 import org.apache.hadoop.hive.ql.plan.CreateTableLikeDesc;
 import org.apache.hadoop.hive.ql.plan.CreateViewDesc;
@@ -154,7 +126,6 @@ import org.apache.hadoop.hive.ql.plan.Ex
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeNullDesc;
 import org.apache.hadoop.hive.ql.plan.ExtractDesc;
-import org.apache.hadoop.hive.ql.plan.FetchWork;
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.FilterDesc;
 import org.apache.hadoop.hive.ql.plan.FilterDesc.sampleDesc;
@@ -170,8 +141,6 @@ import org.apache.hadoop.hive.ql.plan.Li
 import org.apache.hadoop.hive.ql.plan.LoadFileDesc;
 import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
-import org.apache.hadoop.hive.ql.plan.MapredWork;
-import org.apache.hadoop.hive.ql.plan.MoveWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PTFDesc;
 import org.apache.hadoop.hive.ql.plan.PTFDesc.OrderExpressionDef;
@@ -205,7 +174,6 @@ import org.apache.hadoop.hive.serde2.obj
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
-import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.mapred.InputFormat;
 
 /**
@@ -885,7 +853,7 @@ public class SemanticAnalyzer extends Ba
               ErrorMsg.ORDERBY_DISTRIBUTEBY_CONFLICT.getMsg()));
         }
         break;
-        
+
       case HiveParser.TOK_SORTBY:
      // Get the sort by aliases - these are aliased to the entries in the
         // select list
@@ -8224,434 +8192,6 @@ public class SemanticAnalyzer extends Ba
     }
   }
 
-  /**
-   * A helper function to generate a column stats task on top of map-red task. The column stats
-   * task fetches from the output of the map-red task, constructs the column stats object and
-   * persists it to the metastore.
-   *
-   * This method generates a plan with a column stats task on top of map-red task and sets up the
-   * appropriate metadata to be used during execution.
-   *
-   * @param qb
-   */
-  private void genColumnStatsTask(QB qb) {
-    QBParseInfo qbParseInfo = qb.getParseInfo();
-    ColumnStatsTask cStatsTask = null;
-    ColumnStatsWork cStatsWork = null;
-    FetchWork fetch = null;
-    String tableName = qbParseInfo.getTableName();
-    String partName = qbParseInfo.getPartName();
-    List<String> colName = qbParseInfo.getColName();
-    List<String> colType = qbParseInfo.getColType();
-    boolean isTblLevel = qbParseInfo.isTblLvl();
-
-    String cols = loadFileWork.get(0).getColumns();
-    String colTypes = loadFileWork.get(0).getColumnTypes();
-
-    String resFileFormat = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT);
-    TableDesc resultTab = PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, resFileFormat);
-
-    fetch = new FetchWork(new Path(loadFileWork.get(0).getSourceDir()).toString(),
-        resultTab, qb.getParseInfo().getOuterQueryLimit());
-
-    ColumnStatsDesc cStatsDesc = new ColumnStatsDesc(tableName, partName,
-        colName, colType, isTblLevel);
-    cStatsWork = new ColumnStatsWork(fetch, cStatsDesc);
-    cStatsTask = (ColumnStatsTask) TaskFactory.get(cStatsWork, conf);
-    rootTasks.add(cStatsTask);
-  }
-
-  @SuppressWarnings("nls")
-  private void genMapRedTasks(ParseContext pCtx) throws SemanticException {
-    boolean isCStats = qb.isAnalyzeRewrite();
-
-    if (pCtx.getFetchTask() != null) {
-      // replaced by single fetch task
-      initParseCtx(pCtx);
-      return;
-    }
-
-    initParseCtx(pCtx);
-    List<Task<MoveWork>> mvTask = new ArrayList<Task<MoveWork>>();
-
-    /*
-     * In case of a select, use a fetch task instead of a move task.
-     * If the select is from analyze table column rewrite, don't create a fetch task. Instead create
-     * a column stats task later.
-     */
-    if (qb.getIsQuery() && !isCStats) {
-      if ((!loadTableWork.isEmpty()) || (loadFileWork.size() != 1)) {
-        throw new SemanticException(ErrorMsg.GENERIC_ERROR.getMsg());
-      }
-      String cols = loadFileWork.get(0).getColumns();
-      String colTypes = loadFileWork.get(0).getColumnTypes();
-
-      String resFileFormat = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT);
-      TableDesc resultTab = PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, resFileFormat);
-
-      FetchWork fetch = new FetchWork(new Path(loadFileWork.get(0).getSourceDir()).toString(),
-          resultTab, qb.getParseInfo().getOuterQueryLimit());
-
-      FetchTask fetchTask = (FetchTask) TaskFactory.get(fetch, conf);
-      setFetchTask(fetchTask);
-
-      // For the FetchTask, the limit optimiztion requires we fetch all the rows
-      // in memory and count how many rows we get. It's not practical if the
-      // limit factor is too big
-      int fetchLimit = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVELIMITOPTMAXFETCH);
-      if (globalLimitCtx.isEnable() && globalLimitCtx.getGlobalLimit() > fetchLimit) {
-        LOG.info("For FetchTask, LIMIT " + globalLimitCtx.getGlobalLimit() + " > " + fetchLimit
-            + ". Doesn't qualify limit optimiztion.");
-        globalLimitCtx.disableOpt();
-      }
-    } else if (!isCStats) {
-      for (LoadTableDesc ltd : loadTableWork) {
-        Task<MoveWork> tsk = TaskFactory.get(new MoveWork(null, null, ltd, null, false),
-            conf);
-        mvTask.add(tsk);
-        // Check to see if we are stale'ing any indexes and auto-update them if we want
-        if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEINDEXAUTOUPDATE)) {
-          IndexUpdater indexUpdater = new IndexUpdater(loadTableWork, getInputs(), conf);
-          try {
-            List<Task<? extends Serializable>> indexUpdateTasks = indexUpdater
-                .generateUpdateTasks();
-            for (Task<? extends Serializable> updateTask : indexUpdateTasks) {
-              tsk.addDependentTask(updateTask);
-            }
-          } catch (HiveException e) {
-            console
-                .printInfo("WARNING: could not auto-update stale indexes, which are not in sync");
-          }
-        }
-      }
-
-      boolean oneLoadFile = true;
-      for (LoadFileDesc lfd : loadFileWork) {
-        if (qb.isCTAS()) {
-          assert (oneLoadFile); // should not have more than 1 load file for
-          // CTAS
-          // make the movetask's destination directory the table's destination.
-          String location = qb.getTableDesc().getLocation();
-          if (location == null) {
-            // get the table's default location
-            Table dumpTable;
-            Path targetPath;
-            try {
-              dumpTable = db.newTable(qb.getTableDesc().getTableName());
-              if (!db.databaseExists(dumpTable.getDbName())) {
-                throw new SemanticException("ERROR: The database " + dumpTable.getDbName()
-                    + " does not exist.");
-              }
-              Warehouse wh = new Warehouse(conf);
-              targetPath = wh.getTablePath(db.getDatabase(dumpTable.getDbName()), dumpTable
-                  .getTableName());
-            } catch (HiveException e) {
-              throw new SemanticException(e);
-            } catch (MetaException e) {
-              throw new SemanticException(e);
-            }
-
-            location = targetPath.toString();
-          }
-          lfd.setTargetDir(location);
-
-          oneLoadFile = false;
-        }
-        mvTask.add(TaskFactory.get(new MoveWork(null, null, null, lfd, false),
-            conf));
-      }
-    }
-
-    // generate map reduce plans
-    ParseContext tempParseContext = getParseContext();
-    GenMRProcContext procCtx = new GenMRProcContext(
-        conf,
-        new HashMap<Operator<? extends OperatorDesc>, Task<? extends Serializable>>(),
-        new ArrayList<Operator<? extends OperatorDesc>>(), tempParseContext,
-        mvTask, rootTasks,
-        new LinkedHashMap<Operator<? extends OperatorDesc>, GenMapRedCtx>(),
-        inputs, outputs);
-
-    // create a walker which walks the tree in a DFS manner while maintaining
-    // the operator stack.
-    // The dispatcher generates the plan from the operator tree
-    Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
-    opRules.put(new RuleRegExp(new String("R1"),
-        TableScanOperator.getOperatorName() + "%"),
-        new GenMRTableScan1());
-    opRules.put(new RuleRegExp(new String("R2"),
-        TableScanOperator.getOperatorName() + "%.*" + ReduceSinkOperator.getOperatorName() + "%"),
-        new GenMRRedSink1());
-    opRules.put(new RuleRegExp(new String("R3"),
-        ReduceSinkOperator.getOperatorName() + "%.*" + ReduceSinkOperator.getOperatorName() + "%"),
-        new GenMRRedSink2());
-    opRules.put(new RuleRegExp(new String("R4"),
-        FileSinkOperator.getOperatorName() + "%"),
-        new GenMRFileSink1());
-    opRules.put(new RuleRegExp(new String("R5"),
-        UnionOperator.getOperatorName() + "%"),
-        new GenMRUnion1());
-    opRules.put(new RuleRegExp(new String("R6"),
-        UnionOperator.getOperatorName() + "%.*" + ReduceSinkOperator.getOperatorName() + "%"),
-        new GenMRRedSink3());
-    opRules.put(new RuleRegExp(new String("R7"),
-        MapJoinOperator.getOperatorName() + "%"),
-        MapJoinFactory.getTableScanMapJoin());
-
-    // The dispatcher fires the processor corresponding to the closest matching
-    // rule and passes the context along
-    Dispatcher disp = new DefaultRuleDispatcher(new GenMROperator(), opRules,
-        procCtx);
-
-    GraphWalker ogw = new GenMapRedWalker(disp);
-    ArrayList<Node> topNodes = new ArrayList<Node>();
-    topNodes.addAll(topOps.values());
-    ogw.startWalking(topNodes, null);
-
-    /*
-     * If the query was the result of analyze table column compute statistics rewrite, create
-     * a column stats task instead of a fetch task to persist stats to the metastore.
-     */
-    if (isCStats) {
-      genColumnStatsTask(qb);
-    }
-
-    // reduce sink does not have any kids - since the plan by now has been
-    // broken up into multiple
-    // tasks, iterate over all tasks.
-    // For each task, go over all operators recursively
-    for (Task<? extends Serializable> rootTask : rootTasks) {
-      breakTaskTree(rootTask);
-    }
-
-    // For each task, set the key descriptor for the reducer
-    for (Task<? extends Serializable> rootTask : rootTasks) {
-      GenMapRedUtils.setKeyAndValueDescForTaskTree(rootTask);
-    }
-
-    // If a task contains an operator which instructs bucketizedhiveinputformat
-    // to be used, please do so
-    for (Task<? extends Serializable> rootTask : rootTasks) {
-      setInputFormat(rootTask);
-    }
-
-    PhysicalContext physicalContext = new PhysicalContext(conf,
-        getParseContext(), ctx, rootTasks, fetchTask);
-    PhysicalOptimizer physicalOptimizer = new PhysicalOptimizer(
-        physicalContext, conf);
-    physicalOptimizer.optimize();
-
-    // For each operator, generate the counters if needed
-    if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEJOBPROGRESS)) {
-      for (Task<? extends Serializable> rootTask : rootTasks) {
-        generateCountersTask(rootTask);
-      }
-    }
-
-    decideExecMode(rootTasks, ctx, globalLimitCtx);
-
-    if (qb.isCTAS()) {
-      // generate a DDL task and make it a dependent task of the leaf
-      CreateTableDesc crtTblDesc = qb.getTableDesc();
-
-      crtTblDesc.validate();
-
-      // Clear the output for CTAS since we don't need the output from the
-      // mapredWork, the
-      // DDLWork at the tail of the chain will have the output
-      getOutputs().clear();
-
-      Task<? extends Serializable> crtTblTask = TaskFactory.get(new DDLWork(
-          getInputs(), getOutputs(), crtTblDesc), conf);
-
-      // find all leaf tasks and make the DDLTask as a dependent task of all of
-      // them
-      HashSet<Task<? extends Serializable>> leaves = new HashSet<Task<? extends Serializable>>();
-      getLeafTasks(rootTasks, leaves);
-      assert (leaves.size() > 0);
-      for (Task<? extends Serializable> task : leaves) {
-        if (task instanceof StatsTask) {
-          // StatsTask require table to already exist
-          for (Task<? extends Serializable> parentOfStatsTask : task.getParentTasks()) {
-            parentOfStatsTask.addDependentTask(crtTblTask);
-          }
-          for (Task<? extends Serializable> parentOfCrtTblTask : crtTblTask.getParentTasks()) {
-            parentOfCrtTblTask.removeDependentTask(task);
-          }
-          crtTblTask.addDependentTask(task);
-        } else {
-          task.addDependentTask(crtTblTask);
-        }
-      }
-    }
-
-    if (globalLimitCtx.isEnable() && fetchTask != null) {
-      int fetchLimit = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVELIMITOPTMAXFETCH);
-      LOG.info("set least row check for FetchTask: " + globalLimitCtx.getGlobalLimit());
-      fetchTask.getWork().setLeastNumRows(globalLimitCtx.getGlobalLimit());
-    }
-
-    if (globalLimitCtx.isEnable() && globalLimitCtx.getLastReduceLimitDesc() != null) {
-      LOG.info("set least row check for LimitDesc: " + globalLimitCtx.getGlobalLimit());
-      globalLimitCtx.getLastReduceLimitDesc().setLeastRows(globalLimitCtx.getGlobalLimit());
-      List<ExecDriver> mrTasks = Utilities.getMRTasks(rootTasks);
-      for (ExecDriver tsk : mrTasks) {
-        tsk.setRetryCmdWhenFail(true);
-      }
-    }
-  }
-
-  /**
-   * Find all leaf tasks of the list of root tasks.
-   */
-  private void getLeafTasks(List<Task<? extends Serializable>> rootTasks,
-      HashSet<Task<? extends Serializable>> leaves) {
-
-    for (Task<? extends Serializable> root : rootTasks) {
-      getLeafTasks(root, leaves);
-    }
-  }
-
-  private void getLeafTasks(Task<? extends Serializable> task,
-      HashSet<Task<? extends Serializable>> leaves) {
-    if (task.getDependentTasks() == null) {
-      if (!leaves.contains(task)) {
-        leaves.add(task);
-      }
-    } else {
-      getLeafTasks(task.getDependentTasks(), leaves);
-    }
-  }
-
-  // loop over all the tasks recursviely
-  private void generateCountersTask(Task<? extends Serializable> task) {
-    if (task instanceof ExecDriver) {
-      HashMap<String, Operator<? extends OperatorDesc>> opMap = ((MapredWork) task
-          .getWork()).getAliasToWork();
-      if (!opMap.isEmpty()) {
-        for (Operator<? extends OperatorDesc> op : opMap.values()) {
-          generateCountersOperator(op);
-        }
-      }
-
-      Operator<? extends OperatorDesc> reducer = ((MapredWork) task.getWork())
-          .getReducer();
-      if (reducer != null) {
-        LOG.info("Generating counters for operator " + reducer);
-        generateCountersOperator(reducer);
-      }
-    } else if (task instanceof ConditionalTask) {
-      List<Task<? extends Serializable>> listTasks = ((ConditionalTask) task)
-          .getListTasks();
-      for (Task<? extends Serializable> tsk : listTasks) {
-        generateCountersTask(tsk);
-      }
-    }
-
-    // Start the counters from scratch - a hack for hadoop 17.
-    Operator.resetLastEnumUsed();
-
-    if (task.getChildTasks() == null) {
-      return;
-    }
-
-    for (Task<? extends Serializable> childTask : task.getChildTasks()) {
-      generateCountersTask(childTask);
-    }
-  }
-
-  private void generateCountersOperator(Operator<? extends OperatorDesc> op) {
-    op.assignCounterNameToEnum();
-
-    if (op.getChildOperators() == null) {
-      return;
-    }
-
-    for (Operator<? extends OperatorDesc> child : op.getChildOperators()) {
-      generateCountersOperator(child);
-    }
-  }
-
-  // loop over all the tasks recursviely
-  private void breakTaskTree(Task<? extends Serializable> task) {
-
-    if (task instanceof ExecDriver) {
-      HashMap<String, Operator<? extends OperatorDesc>> opMap = ((MapredWork) task
-          .getWork()).getAliasToWork();
-      if (!opMap.isEmpty()) {
-        for (Operator<? extends OperatorDesc> op : opMap.values()) {
-          breakOperatorTree(op);
-        }
-      }
-    } else if (task instanceof ConditionalTask) {
-      List<Task<? extends Serializable>> listTasks = ((ConditionalTask) task)
-          .getListTasks();
-      for (Task<? extends Serializable> tsk : listTasks) {
-        breakTaskTree(tsk);
-      }
-    }
-
-    if (task.getChildTasks() == null) {
-      return;
-    }
-
-    for (Task<? extends Serializable> childTask : task.getChildTasks()) {
-      breakTaskTree(childTask);
-    }
-  }
-
-  // loop over all the operators recursviely
-  private void breakOperatorTree(Operator<? extends OperatorDesc> topOp) {
-    if (topOp instanceof ReduceSinkOperator) {
-      topOp.setChildOperators(null);
-    }
-
-    if (topOp.getChildOperators() == null) {
-      return;
-    }
-
-    for (Operator<? extends OperatorDesc> op : topOp.getChildOperators()) {
-      breakOperatorTree(op);
-    }
-  }
-
-  private void setInputFormat(MapredWork work, Operator<? extends OperatorDesc> op) {
-    if (op.isUseBucketizedHiveInputFormat()) {
-      work.setUseBucketizedHiveInputFormat(true);
-      return;
-    }
-
-    if (op.getChildOperators() != null) {
-      for (Operator<? extends OperatorDesc> childOp : op.getChildOperators()) {
-        setInputFormat(work, childOp);
-      }
-    }
-  }
-
-  // loop over all the tasks recursviely
-  private void setInputFormat(Task<? extends Serializable> task) {
-    if (task instanceof ExecDriver) {
-      MapredWork work = (MapredWork) task.getWork();
-      HashMap<String, Operator<? extends OperatorDesc>> opMap = work.getAliasToWork();
-      if (!opMap.isEmpty()) {
-        for (Operator<? extends OperatorDesc> op : opMap.values()) {
-          setInputFormat(work, op);
-        }
-      }
-    } else if (task instanceof ConditionalTask) {
-      List<Task<? extends Serializable>> listTasks = ((ConditionalTask) task).getListTasks();
-      for (Task<? extends Serializable> tsk : listTasks) {
-        setInputFormat(tsk);
-      }
-    }
-
-    if (task.getChildTasks() != null) {
-      for (Task<? extends Serializable> childTask : task.getChildTasks()) {
-        setInputFormat(childTask);
-      }
-    }
-  }
-
   @SuppressWarnings("nls")
   public Phase1Ctx initPhase1Ctx() {
 
@@ -8780,8 +8320,11 @@ public class SemanticAnalyzer extends Ba
     }
 
     // At this point we have the complete operator tree
-    // from which we want to find the reduce operator
-    genMapRedTasks(pCtx);
+    // from which we want to create the map-reduce plan
+    MapReduceCompiler compiler = new MapReduceCompiler();
+    compiler.init(conf, console, db);
+    compiler.compile(pCtx, rootTasks, inputs, outputs);
+    fetchTask = pCtx.getFetchTask();
 
     LOG.info("Completed plan generation");
 
@@ -9569,98 +9112,6 @@ public class SemanticAnalyzer extends Ba
     }
   }
 
-  private void decideExecMode(List<Task<? extends Serializable>> rootTasks, Context ctx,
-      GlobalLimitCtx globalLimitCtx)
-      throws SemanticException {
-
-    // bypass for explain queries for now
-    if (ctx.getExplain()) {
-      return;
-    }
-
-    // user has told us to run in local mode or doesn't want auto-local mode
-    if (ctx.isLocalOnlyExecutionMode() ||
-        !conf.getBoolVar(HiveConf.ConfVars.LOCALMODEAUTO)) {
-      return;
-    }
-
-    final Context lCtx = ctx;
-    PathFilter p = new PathFilter() {
-      public boolean accept(Path file) {
-        return !lCtx.isMRTmpFileURI(file.toUri().getPath());
-      }
-    };
-    List<ExecDriver> mrtasks = Utilities.getMRTasks(rootTasks);
-
-    // map-reduce jobs will be run locally based on data size
-    // first find out if any of the jobs needs to run non-locally
-    boolean hasNonLocalJob = false;
-    for (ExecDriver mrtask : mrtasks) {
-      try {
-        ContentSummary inputSummary = Utilities.getInputSummary
-            (ctx, (MapredWork) mrtask.getWork(), p);
-        int numReducers = getNumberOfReducers(mrtask.getWork(), conf);
-
-        long estimatedInput;
-
-        if (globalLimitCtx != null && globalLimitCtx.isEnable()) {
-          // If the global limit optimization is triggered, we will
-          // estimate input data actually needed based on limit rows.
-          // estimated Input = (num_limit * max_size_per_row) * (estimated_map + 2)
-          //
-          long sizePerRow = HiveConf.getLongVar(conf,
-              HiveConf.ConfVars.HIVELIMITMAXROWSIZE);
-          estimatedInput = globalLimitCtx.getGlobalLimit() * sizePerRow;
-          long minSplitSize = HiveConf.getLongVar(conf,
-              HiveConf.ConfVars.MAPREDMINSPLITSIZE);
-          long estimatedNumMap = inputSummary.getLength() / minSplitSize + 1;
-          estimatedInput = estimatedInput * (estimatedNumMap + 1);
-        } else {
-          estimatedInput = inputSummary.getLength();
-        }
-
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Task: " + mrtask.getId() + ", Summary: " +
-              inputSummary.getLength() + "," + inputSummary.getFileCount() + ","
-              + numReducers + ", estimated Input: " + estimatedInput);
-        }
-
-        if (MapRedTask.isEligibleForLocalMode(conf, numReducers,
-            estimatedInput, inputSummary.getFileCount()) != null) {
-          hasNonLocalJob = true;
-          break;
-        } else {
-          mrtask.setLocalMode(true);
-        }
-      } catch (IOException e) {
-        throw new SemanticException(e);
-      }
-    }
-
-    if (!hasNonLocalJob) {
-      // Entire query can be run locally.
-      // Save the current tracker value and restore it when done.
-      ctx.setOriginalTracker(ShimLoader.getHadoopShims().getJobLauncherRpcAddress(conf));
-      ShimLoader.getHadoopShims().setJobLauncherRpcAddress(conf, "local");
-      console.printInfo("Automatically selecting local only mode for query");
-    }
-  }
-
-  /**
-   * Make a best guess at trying to find the number of reducers
-   */
-  private static int getNumberOfReducers(MapredWork mrwork, HiveConf conf) {
-    if (mrwork.getReducer() == null) {
-      return 0;
-    }
-
-    if (mrwork.getNumReduceTasks() >= 0) {
-      return mrwork.getNumReduceTasks();
-    }
-
-    return conf.getIntVar(HiveConf.ConfVars.HADOOPNUMREDUCERS);
-  }
-
   // Process the position alias in GROUPBY and ORDERBY
   private void processPositionAlias(ASTNode ast) throws SemanticException {
     if (HiveConf.getBoolVar(conf,