Posted to commits@hive.apache.org by ha...@apache.org on 2013/07/07 08:42:24 UTC
svn commit: r1500375 - in
/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse:
MapReduceCompiler.java SemanticAnalyzer.java
Author: hashutosh
Date: Sun Jul 7 06:42:24 2013
New Revision: 1500375
URL: http://svn.apache.org/r1500375
Log:
HIVE-4811 : (Slightly) break up the SemanticAnalyzer monstrosity (Gunther Hagleitner via Ashutosh Chauhan)
Added:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
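In short: the patch moves MapReduce plan generation out of SemanticAnalyzer (genMapRedTasks and its helpers genColumnStatsTask, breakTaskTree, setInputFormat, generateCountersTask, getLeafTasks, decideExecMode and getNumberOfReducers) into the new MapReduceCompiler class, and the analyzer now delegates to it. A minimal sketch of the new call sequence, as it appears in the SemanticAnalyzer hunk near the end of this mail (conf, console, db, pCtx, rootTasks, inputs, outputs and fetchTask are the analyzer's existing fields):

  // inside SemanticAnalyzer, once the operator tree is complete
  MapReduceCompiler compiler = new MapReduceCompiler();
  compiler.init(conf, console, db);                    // HiveConf, console logger, metastore handle
  compiler.compile(pCtx, rootTasks, inputs, outputs);  // fills rootTasks and pCtx's fetch task
  fetchTask = pCtx.getFetchTask();                     // analyzer picks the fetch task back up
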
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java?rev=1500375&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java Sun Jul 7 06:42:24 2013
@@ -0,0 +1,650 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.parse;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.ColumnStatsTask;
+import org.apache.hadoop.hive.ql.exec.ConditionalTask;
+import org.apache.hadoop.hive.ql.exec.FetchTask;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.StatsTask;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.UnionOperator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.ExecDriver;
+import org.apache.hadoop.hive.ql.exec.MapRedTask;
+import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.Rule;
+import org.apache.hadoop.hive.ql.lib.RuleRegExp;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.optimizer.GenMRFileSink1;
+import org.apache.hadoop.hive.ql.optimizer.GenMROperator;
+import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext;
+import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMapRedCtx;
+import org.apache.hadoop.hive.ql.optimizer.GenMRRedSink1;
+import org.apache.hadoop.hive.ql.optimizer.GenMRRedSink2;
+import org.apache.hadoop.hive.ql.optimizer.GenMRRedSink3;
+import org.apache.hadoop.hive.ql.optimizer.GenMRTableScan1;
+import org.apache.hadoop.hive.ql.optimizer.GenMRUnion1;
+import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
+import org.apache.hadoop.hive.ql.optimizer.MapJoinFactory;
+import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalContext;
+import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalOptimizer;
+import org.apache.hadoop.hive.ql.plan.ColumnStatsDesc;
+import org.apache.hadoop.hive.ql.plan.ColumnStatsWork;
+import org.apache.hadoop.hive.ql.plan.CreateTableDesc;
+import org.apache.hadoop.hive.ql.plan.DDLWork;
+import org.apache.hadoop.hive.ql.plan.FetchWork;
+import org.apache.hadoop.hive.ql.plan.LoadFileDesc;
+import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
+import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.MoveWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.PlanUtils;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.hadoop.hive.shims.ShimLoader;
+
+public class MapReduceCompiler {
+
+ protected final Log LOG = LogFactory.getLog(MapReduceCompiler.class);
+ private Hive db;
+ protected LogHelper console;
+ private HiveConf conf;
+
+
+ public MapReduceCompiler() {
+ }
+
+ public void init(HiveConf conf, LogHelper console, Hive db) {
+ this.conf = conf;
+ this.db = db;
+ this.console = console;
+ }
+
+ @SuppressWarnings({"nls", "unchecked"})
+ public void compile(final ParseContext pCtx, final List<Task<? extends Serializable>> rootTasks,
+ final HashSet<ReadEntity> inputs, final HashSet<WriteEntity> outputs) throws SemanticException {
+
+ Context ctx = pCtx.getContext();
+ GlobalLimitCtx globalLimitCtx = pCtx.getGlobalLimitCtx();
+ QB qb = pCtx.getQB();
+ List<Task<MoveWork>> mvTask = new ArrayList<Task<MoveWork>>();
+
+ List<LoadTableDesc> loadTableWork = pCtx.getLoadTableWork();
+ List<LoadFileDesc> loadFileWork = pCtx.getLoadFileWork();
+
+ boolean isCStats = qb.isAnalyzeRewrite();
+
+ if (pCtx.getFetchTask() != null) {
+ return;
+ }
+
+ /*
+ * In case of a select, use a fetch task instead of a move task.
+ * If the select is from analyze table column rewrite, don't create a fetch task. Instead create
+ * a column stats task later.
+ */
+ if (pCtx.getQB().getIsQuery() && !isCStats) {
+ if ((!loadTableWork.isEmpty()) || (loadFileWork.size() != 1)) {
+ throw new SemanticException(ErrorMsg.GENERIC_ERROR.getMsg());
+ }
+ String cols = loadFileWork.get(0).getColumns();
+ String colTypes = loadFileWork.get(0).getColumnTypes();
+
+ String resFileFormat = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT);
+ TableDesc resultTab = PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, resFileFormat);
+
+ FetchWork fetch = new FetchWork(new Path(loadFileWork.get(0).getSourceDir()).toString(),
+ resultTab, qb.getParseInfo().getOuterQueryLimit());
+
+ pCtx.setFetchTask((FetchTask) TaskFactory.get(fetch, conf));
+
+ // For the FetchTask, the limit optimization requires we fetch all the rows
+ // in memory and count how many rows we get. It's not practical if the
+ // limit factor is too big
+ int fetchLimit = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVELIMITOPTMAXFETCH);
+ if (globalLimitCtx.isEnable() && globalLimitCtx.getGlobalLimit() > fetchLimit) {
+ LOG.info("For FetchTask, LIMIT " + globalLimitCtx.getGlobalLimit() + " > " + fetchLimit
+ + ". Doesn't qualify limit optimiztion.");
+ globalLimitCtx.disableOpt();
+ }
+ } else if (!isCStats) {
+ for (LoadTableDesc ltd : loadTableWork) {
+ Task<MoveWork> tsk = TaskFactory.get(new MoveWork(null, null, ltd, null, false), conf);
+ mvTask.add(tsk);
+ // Check to see if we are staling any indexes, and auto-update them if configured to
+ if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEINDEXAUTOUPDATE)) {
+ IndexUpdater indexUpdater = new IndexUpdater(loadTableWork, inputs, conf);
+ try {
+ List<Task<? extends Serializable>> indexUpdateTasks = indexUpdater
+ .generateUpdateTasks();
+ for (Task<? extends Serializable> updateTask : indexUpdateTasks) {
+ tsk.addDependentTask(updateTask);
+ }
+ } catch (HiveException e) {
+ console
+ .printInfo("WARNING: could not auto-update stale indexes, which are not in sync");
+ }
+ }
+ }
+
+ boolean oneLoadFile = true;
+ for (LoadFileDesc lfd : loadFileWork) {
+ if (qb.isCTAS()) {
+ assert (oneLoadFile); // should not have more than 1 load file for
+ // CTAS
+ // make the movetask's destination directory the table's destination.
+ String location = qb.getTableDesc().getLocation();
+ if (location == null) {
+ // get the table's default location
+ Table dumpTable;
+ Path targetPath;
+ try {
+ dumpTable = db.newTable(qb.getTableDesc().getTableName());
+ if (!db.databaseExists(dumpTable.getDbName())) {
+ throw new SemanticException("ERROR: The database " + dumpTable.getDbName()
+ + " does not exist.");
+ }
+ Warehouse wh = new Warehouse(conf);
+ targetPath = wh.getTablePath(db.getDatabase(dumpTable.getDbName()), dumpTable
+ .getTableName());
+ } catch (HiveException e) {
+ throw new SemanticException(e);
+ } catch (MetaException e) {
+ throw new SemanticException(e);
+ }
+
+ location = targetPath.toString();
+ }
+ lfd.setTargetDir(location);
+
+ oneLoadFile = false;
+ }
+ mvTask.add(TaskFactory.get(new MoveWork(null, null, null, lfd, false), conf));
+ }
+ }
+
+ // generate map reduce plans
+ ParseContext tempParseContext = getParseContext(pCtx, rootTasks);
+ GenMRProcContext procCtx = new GenMRProcContext(
+ conf,
+ new HashMap<Operator<? extends OperatorDesc>, Task<? extends Serializable>>(),
+ new ArrayList<Operator<? extends OperatorDesc>>(), tempParseContext,
+ mvTask, rootTasks,
+ new LinkedHashMap<Operator<? extends OperatorDesc>, GenMapRedCtx>(),
+ inputs, outputs);
+
+ // create a walker which walks the tree in a DFS manner while maintaining
+ // the operator stack.
+ // The dispatcher generates the plan from the operator tree
+ Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+ opRules.put(new RuleRegExp(new String("R1"),
+ TableScanOperator.getOperatorName() + "%"),
+ new GenMRTableScan1());
+ opRules.put(new RuleRegExp(new String("R2"),
+ TableScanOperator.getOperatorName() + "%.*" + ReduceSinkOperator.getOperatorName() + "%"),
+ new GenMRRedSink1());
+ opRules.put(new RuleRegExp(new String("R3"),
+ ReduceSinkOperator.getOperatorName() + "%.*" + ReduceSinkOperator.getOperatorName() + "%"),
+ new GenMRRedSink2());
+ opRules.put(new RuleRegExp(new String("R4"),
+ FileSinkOperator.getOperatorName() + "%"),
+ new GenMRFileSink1());
+ opRules.put(new RuleRegExp(new String("R5"),
+ UnionOperator.getOperatorName() + "%"),
+ new GenMRUnion1());
+ opRules.put(new RuleRegExp(new String("R6"),
+ UnionOperator.getOperatorName() + "%.*" + ReduceSinkOperator.getOperatorName() + "%"),
+ new GenMRRedSink3());
+ opRules.put(new RuleRegExp(new String("R7"),
+ MapJoinOperator.getOperatorName() + "%"),
+ MapJoinFactory.getTableScanMapJoin());
+
+ // The dispatcher fires the processor corresponding to the closest matching
+ // rule and passes the context along
+ Dispatcher disp = new DefaultRuleDispatcher(new GenMROperator(), opRules,
+ procCtx);
+
+ GraphWalker ogw = new GenMapRedWalker(disp);
+ ArrayList<Node> topNodes = new ArrayList<Node>();
+ topNodes.addAll(pCtx.getTopOps().values());
+ ogw.startWalking(topNodes, null);
+
+ /*
+ * If the query was the result of analyze table column compute statistics rewrite, create
+ * a column stats task instead of a fetch task to persist stats to the metastore.
+ */
+ if (isCStats) {
+ genColumnStatsTask(qb, loadTableWork, loadFileWork, rootTasks);
+ }
+
+ // reduce sink does not have any kids - since the plan by now has been
+ // broken up into multiple
+ // tasks, iterate over all tasks.
+ // For each task, go over all operators recursively
+ for (Task<? extends Serializable> rootTask : rootTasks) {
+ breakTaskTree(rootTask);
+ }
+
+ // For each task, set the key descriptor for the reducer
+ for (Task<? extends Serializable> rootTask : rootTasks) {
+ GenMapRedUtils.setKeyAndValueDescForTaskTree(rootTask);
+ }
+
+ // If a task contains an operator which instructs bucketizedhiveinputformat
+ // to be used, set that input format on the task
+ for (Task<? extends Serializable> rootTask : rootTasks) {
+ setInputFormat(rootTask);
+ }
+
+ PhysicalContext physicalContext = new PhysicalContext(conf,
+ getParseContext(pCtx, rootTasks), ctx, rootTasks, pCtx.getFetchTask());
+ PhysicalOptimizer physicalOptimizer = new PhysicalOptimizer(
+ physicalContext, conf);
+ physicalOptimizer.optimize();
+
+ // For each operator, generate the counters if needed
+ if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEJOBPROGRESS)) {
+ for (Task<? extends Serializable> rootTask : rootTasks) {
+ generateCountersTask(rootTask);
+ }
+ }
+
+ decideExecMode(rootTasks, ctx, globalLimitCtx);
+
+ if (qb.isCTAS()) {
+ // generate a DDL task and make it a dependent task of the leaf
+ CreateTableDesc crtTblDesc = qb.getTableDesc();
+
+ crtTblDesc.validate();
+
+ // Clear the output for CTAS since we don't need the output from the
+ // mapredWork, the
+ // DDLWork at the tail of the chain will have the output
+ outputs.clear();
+
+ Task<? extends Serializable> crtTblTask = TaskFactory.get(new DDLWork(
+ inputs, outputs, crtTblDesc), conf);
+
+ // find all leaf tasks and make the DDLTask as a dependent task of all of
+ // them
+ HashSet<Task<? extends Serializable>> leaves = new HashSet<Task<? extends Serializable>>();
+ getLeafTasks(rootTasks, leaves);
+ assert (leaves.size() > 0);
+ for (Task<? extends Serializable> task : leaves) {
+ if (task instanceof StatsTask) {
+ // StatsTask requires the table to already exist
+ for (Task<? extends Serializable> parentOfStatsTask : task.getParentTasks()) {
+ parentOfStatsTask.addDependentTask(crtTblTask);
+ }
+ for (Task<? extends Serializable> parentOfCrtTblTask : crtTblTask.getParentTasks()) {
+ parentOfCrtTblTask.removeDependentTask(task);
+ }
+ crtTblTask.addDependentTask(task);
+ } else {
+ task.addDependentTask(crtTblTask);
+ }
+ }
+ }
+
+ if (globalLimitCtx.isEnable() && pCtx.getFetchTask() != null) {
+ LOG.info("set least row check for FetchTask: " + globalLimitCtx.getGlobalLimit());
+ pCtx.getFetchTask().getWork().setLeastNumRows(globalLimitCtx.getGlobalLimit());
+ }
+
+ if (globalLimitCtx.isEnable() && globalLimitCtx.getLastReduceLimitDesc() != null) {
+ LOG.info("set least row check for LimitDesc: " + globalLimitCtx.getGlobalLimit());
+ globalLimitCtx.getLastReduceLimitDesc().setLeastRows(globalLimitCtx.getGlobalLimit());
+ List<ExecDriver> mrTasks = Utilities.getMRTasks(rootTasks);
+ for (ExecDriver tsk : mrTasks) {
+ tsk.setRetryCmdWhenFail(true);
+ }
+ }
+ }
+
+ private void setInputFormat(MapredWork work, Operator<? extends OperatorDesc> op) {
+ if (op.isUseBucketizedHiveInputFormat()) {
+ work.setUseBucketizedHiveInputFormat(true);
+ return;
+ }
+
+ if (op.getChildOperators() != null) {
+ for (Operator<? extends OperatorDesc> childOp : op.getChildOperators()) {
+ setInputFormat(work, childOp);
+ }
+ }
+ }
+
+ // loop over all the tasks recursively
+ private void setInputFormat(Task<? extends Serializable> task) {
+ if (task instanceof ExecDriver) {
+ MapredWork work = (MapredWork) task.getWork();
+ HashMap<String, Operator<? extends OperatorDesc>> opMap = work.getAliasToWork();
+ if (!opMap.isEmpty()) {
+ for (Operator<? extends OperatorDesc> op : opMap.values()) {
+ setInputFormat(work, op);
+ }
+ }
+ } else if (task instanceof ConditionalTask) {
+ List<Task<? extends Serializable>> listTasks
+ = ((ConditionalTask) task).getListTasks();
+ for (Task<? extends Serializable> tsk : listTasks) {
+ setInputFormat(tsk);
+ }
+ }
+
+ if (task.getChildTasks() != null) {
+ for (Task<? extends Serializable> childTask : task.getChildTasks()) {
+ setInputFormat(childTask);
+ }
+ }
+ }
+
+ // loop over all the tasks recursively
+ private void generateCountersTask(Task<? extends Serializable> task) {
+ if (task instanceof ExecDriver) {
+ HashMap<String, Operator<? extends OperatorDesc>> opMap = ((MapredWork) task
+ .getWork()).getAliasToWork();
+ if (!opMap.isEmpty()) {
+ for (Operator<? extends OperatorDesc> op : opMap.values()) {
+ generateCountersOperator(op);
+ }
+ }
+
+ Operator<? extends OperatorDesc> reducer = ((MapredWork) task.getWork())
+ .getReducer();
+ if (reducer != null) {
+ LOG.info("Generating counters for operator " + reducer);
+ generateCountersOperator(reducer);
+ }
+ } else if (task instanceof ConditionalTask) {
+ List<Task<? extends Serializable>> listTasks = ((ConditionalTask) task)
+ .getListTasks();
+ for (Task<? extends Serializable> tsk : listTasks) {
+ generateCountersTask(tsk);
+ }
+ }
+
+ // Start the counters from scratch - a hack for hadoop 17.
+ Operator.resetLastEnumUsed();
+
+ if (task.getChildTasks() == null) {
+ return;
+ }
+
+ for (Task<? extends Serializable> childTask : task.getChildTasks()) {
+ generateCountersTask(childTask);
+ }
+ }
+
+ private void generateCountersOperator(Operator<? extends OperatorDesc> op) {
+ op.assignCounterNameToEnum();
+
+ if (op.getChildOperators() == null) {
+ return;
+ }
+
+ for (Operator<? extends OperatorDesc> child : op.getChildOperators()) {
+ generateCountersOperator(child);
+ }
+ }
+
+ public ParseContext getParseContext(ParseContext pCtx, List<Task<? extends Serializable>> rootTasks) {
+ return new ParseContext(conf, pCtx.getQB(), pCtx.getParseTree(),
+ pCtx.getOpToPartPruner(), pCtx.getOpToPartList(), pCtx.getTopOps(),
+ pCtx.getTopSelOps(), pCtx.getOpParseCtx(), pCtx.getJoinContext(),
+ pCtx.getSmbMapJoinContext(), pCtx.getTopToTable(), pCtx.getFsopToTable(),
+ pCtx.getLoadTableWork(), pCtx.getLoadFileWork(), pCtx.getContext(),
+ pCtx.getIdToTableNameMap(), pCtx.getDestTableId(), pCtx.getUCtx(),
+ pCtx.getListMapJoinOpsNoReducer(), pCtx.getGroupOpToInputTables(),
+ pCtx.getPrunedPartitions(), pCtx.getOpToSamplePruner(), pCtx.getGlobalLimitCtx(),
+ pCtx.getNameToSplitSample(), pCtx.getSemanticInputs(), rootTasks,
+ pCtx.getOpToPartToSkewedPruner(), pCtx.getViewAliasToInput(),
+ pCtx.getReduceSinkOperatorsAddedByEnforceBucketingSorting(),
+ pCtx.getQueryProperties());
+ }
+
+ // loop over all the tasks recursively
+ private void breakTaskTree(Task<? extends Serializable> task) {
+
+ if (task instanceof ExecDriver) {
+ HashMap<String, Operator<? extends OperatorDesc>> opMap = ((MapredWork) task
+ .getWork()).getAliasToWork();
+ if (!opMap.isEmpty()) {
+ for (Operator<? extends OperatorDesc> op : opMap.values()) {
+ breakOperatorTree(op);
+ }
+ }
+ } else if (task instanceof ConditionalTask) {
+ List<Task<? extends Serializable>> listTasks = ((ConditionalTask) task)
+ .getListTasks();
+ for (Task<? extends Serializable> tsk : listTasks) {
+ breakTaskTree(tsk);
+ }
+ }
+
+ if (task.getChildTasks() == null) {
+ return;
+ }
+
+ for (Task<? extends Serializable> childTask : task.getChildTasks()) {
+ breakTaskTree(childTask);
+ }
+ }
+
+ // loop over all the operators recursively
+ private void breakOperatorTree(Operator<? extends OperatorDesc> topOp) {
+ if (topOp instanceof ReduceSinkOperator) {
+ topOp.setChildOperators(null);
+ }
+
+ if (topOp.getChildOperators() == null) {
+ return;
+ }
+
+ for (Operator<? extends OperatorDesc> op : topOp.getChildOperators()) {
+ breakOperatorTree(op);
+ }
+ }
+
+ /**
+ * A helper function to generate a column stats task on top of map-red task. The column stats
+ * task fetches from the output of the map-red task, constructs the column stats object and
+ * persists it to the metastore.
+ *
+ * This method generates a plan with a column stats task on top of map-red task and sets up the
+ * appropriate metadata to be used during execution.
+ *
+ * @param qb
+ */
+ @SuppressWarnings("unchecked")
+ private void genColumnStatsTask(QB qb, List<LoadTableDesc> loadTableWork,
+ List<LoadFileDesc> loadFileWork, List<Task<? extends Serializable>> rootTasks) {
+ QBParseInfo qbParseInfo = qb.getParseInfo();
+ ColumnStatsTask cStatsTask = null;
+ ColumnStatsWork cStatsWork = null;
+ FetchWork fetch = null;
+ String tableName = qbParseInfo.getTableName();
+ String partName = qbParseInfo.getPartName();
+ List<String> colName = qbParseInfo.getColName();
+ List<String> colType = qbParseInfo.getColType();
+ boolean isTblLevel = qbParseInfo.isTblLvl();
+
+ String cols = loadFileWork.get(0).getColumns();
+ String colTypes = loadFileWork.get(0).getColumnTypes();
+
+ String resFileFormat = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT);
+ TableDesc resultTab = PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, resFileFormat);
+
+ fetch = new FetchWork(new Path(loadFileWork.get(0).getSourceDir()).toString(),
+ resultTab, qb.getParseInfo().getOuterQueryLimit());
+
+ ColumnStatsDesc cStatsDesc = new ColumnStatsDesc(tableName, partName,
+ colName, colType, isTblLevel);
+ cStatsWork = new ColumnStatsWork(fetch, cStatsDesc);
+ cStatsTask = (ColumnStatsTask) TaskFactory.get(cStatsWork, conf);
+ rootTasks.add(cStatsTask);
+ }
+
+ /**
+ * Find all leaf tasks of the list of root tasks.
+ */
+ private void getLeafTasks(List<Task<? extends Serializable>> rootTasks,
+ HashSet<Task<? extends Serializable>> leaves) {
+
+ for (Task<? extends Serializable> root : rootTasks) {
+ getLeafTasks(root, leaves);
+ }
+ }
+
+ private void getLeafTasks(Task<? extends Serializable> task,
+ HashSet<Task<? extends Serializable>> leaves) {
+ if (task.getDependentTasks() == null) {
+ if (!leaves.contains(task)) {
+ leaves.add(task);
+ }
+ } else {
+ getLeafTasks(task.getDependentTasks(), leaves);
+ }
+ }
+
+ /**
+ * Make a best guess at trying to find the number of reducers
+ */
+ private static int getNumberOfReducers(MapredWork mrwork, HiveConf conf) {
+ if (mrwork.getReducer() == null) {
+ return 0;
+ }
+
+ if (mrwork.getNumReduceTasks() >= 0) {
+ return mrwork.getNumReduceTasks();
+ }
+
+ return conf.getIntVar(HiveConf.ConfVars.HADOOPNUMREDUCERS);
+ }
+
+ private void decideExecMode(List<Task<? extends Serializable>> rootTasks, Context ctx,
+ GlobalLimitCtx globalLimitCtx)
+ throws SemanticException {
+
+ // bypass for explain queries for now
+ if (ctx.getExplain()) {
+ return;
+ }
+
+ // user has told us to run in local mode or doesn't want auto-local mode
+ if (ctx.isLocalOnlyExecutionMode() ||
+ !conf.getBoolVar(HiveConf.ConfVars.LOCALMODEAUTO)) {
+ return;
+ }
+
+ final Context lCtx = ctx;
+ PathFilter p = new PathFilter() {
+ public boolean accept(Path file) {
+ return !lCtx.isMRTmpFileURI(file.toUri().getPath());
+ }
+ };
+ List<ExecDriver> mrtasks = Utilities.getMRTasks(rootTasks);
+
+ // map-reduce jobs will be run locally based on data size
+ // first find out if any of the jobs needs to run non-locally
+ boolean hasNonLocalJob = false;
+ for (ExecDriver mrtask : mrtasks) {
+ try {
+ ContentSummary inputSummary = Utilities.getInputSummary
+ (ctx, (MapredWork) mrtask.getWork(), p);
+ int numReducers = getNumberOfReducers(mrtask.getWork(), conf);
+
+ long estimatedInput;
+
+ if (globalLimitCtx != null && globalLimitCtx.isEnable()) {
+ // If the global limit optimization is triggered, we will
+ // estimate input data actually needed based on limit rows.
+ // estimated Input = (num_limit * max_size_per_row) * (estimated_map + 2)
+ //
+ long sizePerRow = HiveConf.getLongVar(conf,
+ HiveConf.ConfVars.HIVELIMITMAXROWSIZE);
+ estimatedInput = globalLimitCtx.getGlobalLimit() * sizePerRow;
+ long minSplitSize = HiveConf.getLongVar(conf,
+ HiveConf.ConfVars.MAPREDMINSPLITSIZE);
+ long estimatedNumMap = inputSummary.getLength() / minSplitSize + 1;
+ estimatedInput = estimatedInput * (estimatedNumMap + 1);
+ } else {
+ estimatedInput = inputSummary.getLength();
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Task: " + mrtask.getId() + ", Summary: " +
+ inputSummary.getLength() + "," + inputSummary.getFileCount() + ","
+ + numReducers + ", estimated Input: " + estimatedInput);
+ }
+
+ if (MapRedTask.isEligibleForLocalMode(conf, numReducers,
+ estimatedInput, inputSummary.getFileCount()) != null) {
+ hasNonLocalJob = true;
+ break;
+ } else {
+ mrtask.setLocalMode(true);
+ }
+ } catch (IOException e) {
+ throw new SemanticException(e);
+ }
+ }
+
+ if (!hasNonLocalJob) {
+ // Entire query can be run locally.
+ // Save the current tracker value and restore it when done.
+ ctx.setOriginalTracker(ShimLoader.getHadoopShims().getJobLauncherRpcAddress(conf));
+ ShimLoader.getHadoopShims().setJobLauncherRpcAddress(conf, "local");
+ console.printInfo("Automatically selecting local only mode for query");
+ }
+ }
+}
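An aside on the mechanics of compile() above: it drives plan generation through Hive's generic rule-dispatch machinery. Operator-name regexes (R1 through R7) map to NodeProcessors, DefaultRuleDispatcher fires the processor of the closest-matching rule at each node, and GenMapRedWalker walks the operator DAG depth-first while maintaining the operator stack. Below is only a minimal, self-contained sketch of that dispatch pattern in plain Java; ToyNode, Processor and the "longest pattern wins" tie-break are hypothetical simplifications and do not reproduce RuleRegExp's actual cost-based matching.

  import java.util.ArrayList;
  import java.util.LinkedHashMap;
  import java.util.List;
  import java.util.Map;

  public class RuleDispatchSketch {

    // Toy node standing in for a Hive operator: a name plus children.
    static class ToyNode {
      final String name;
      final List<ToyNode> children = new ArrayList<ToyNode>();
      ToyNode(String name) { this.name = name; }
      ToyNode add(ToyNode c) { children.add(c); return this; }
    }

    // Toy processor standing in for a NodeProcessor.
    interface Processor {
      void process(ToyNode node, String walkPath);
    }

    public static void main(String[] args) {
      // Rules keyed by a regex over the concatenated names on the walk path,
      // loosely mirroring patterns such as "TS%.*RS%" in the GenMR rules above.
      Map<String, Processor> rules = new LinkedHashMap<String, Processor>();
      rules.put("TS>", (n, p) -> System.out.println("table-scan rule fired on " + n.name));
      rules.put("TS>.*RS>", (n, p) -> System.out.println("reduce-sink-under-scan rule fired on " + n.name));
      Processor fallback = (n, p) -> System.out.println("default processor on " + n.name);

      // TS -> SEL -> RS, a minimal stand-in for a map-reduce operator chain.
      ToyNode root = new ToyNode("TS").add(new ToyNode("SEL").add(new ToyNode("RS")));
      walk(root, "", rules, fallback);
    }

    // Depth-first walk; at each node the "closest" matching rule fires (here,
    // crudely, the longest matching pattern), else the default processor.
    static void walk(ToyNode n, String path, Map<String, Processor> rules, Processor fallback) {
      String newPath = path + n.name + ">";
      Processor best = fallback;
      int bestLen = -1;
      for (Map.Entry<String, Processor> e : rules.entrySet()) {
        if (newPath.matches(e.getKey()) && e.getKey().length() > bestLen) {
          best = e.getValue();
          bestLen = e.getKey().length();
        }
      }
      best.process(n, newPath);
      for (ToyNode c : n.children) {
        walk(c, newPath, rules, fallback);
      }
    }
  }
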
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java?rev=1500375&r1=1500374&r2=1500375&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java Sun Jul 7 06:42:24 2013
@@ -18,7 +18,6 @@
package org.apache.hadoop.hive.ql.parse;
-import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
@@ -38,9 +37,7 @@ import org.antlr.runtime.tree.Tree;
import org.antlr.runtime.tree.TreeWizard;
import org.antlr.runtime.tree.TreeWizard.ContextVisitor;
import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.common.ObjectPair;
@@ -51,22 +48,17 @@ import org.apache.hadoop.hive.metastore.
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Order;
-import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.QueryProperties;
import org.apache.hadoop.hive.ql.exec.AbstractMapJoinOperator;
import org.apache.hadoop.hive.ql.exec.ArchiveUtils;
import org.apache.hadoop.hive.ql.exec.ColumnInfo;
-import org.apache.hadoop.hive.ql.exec.ColumnStatsTask;
-import org.apache.hadoop.hive.ql.exec.ConditionalTask;
import org.apache.hadoop.hive.ql.exec.ExecDriver;
-import org.apache.hadoop.hive.ql.exec.FetchTask;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.exec.FunctionInfo;
import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
import org.apache.hadoop.hive.ql.exec.GroupByOperator;
import org.apache.hadoop.hive.ql.exec.JoinOperator;
-import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.exec.MapRedTask;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.OperatorFactory;
@@ -75,7 +67,6 @@ import org.apache.hadoop.hive.ql.exec.Re
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.exec.RowSchema;
import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
-import org.apache.hadoop.hive.ql.exec.StatsTask;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
@@ -89,13 +80,9 @@ import org.apache.hadoop.hive.ql.io.Comb
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
-import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
import org.apache.hadoop.hive.ql.lib.Dispatcher;
import org.apache.hadoop.hive.ql.lib.GraphWalker;
import org.apache.hadoop.hive.ql.lib.Node;
-import org.apache.hadoop.hive.ql.lib.NodeProcessor;
-import org.apache.hadoop.hive.ql.lib.Rule;
-import org.apache.hadoop.hive.ql.lib.RuleRegExp;
import org.apache.hadoop.hive.ql.metadata.DummyPartition;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -104,20 +91,7 @@ import org.apache.hadoop.hive.ql.metadat
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
-import org.apache.hadoop.hive.ql.optimizer.GenMRFileSink1;
-import org.apache.hadoop.hive.ql.optimizer.GenMROperator;
-import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext;
-import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMapRedCtx;
-import org.apache.hadoop.hive.ql.optimizer.GenMRRedSink1;
-import org.apache.hadoop.hive.ql.optimizer.GenMRRedSink2;
-import org.apache.hadoop.hive.ql.optimizer.GenMRRedSink3;
-import org.apache.hadoop.hive.ql.optimizer.GenMRTableScan1;
-import org.apache.hadoop.hive.ql.optimizer.GenMRUnion1;
-import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
-import org.apache.hadoop.hive.ql.optimizer.MapJoinFactory;
import org.apache.hadoop.hive.ql.optimizer.Optimizer;
-import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalContext;
-import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalOptimizer;
import org.apache.hadoop.hive.ql.optimizer.unionproc.UnionProcContext;
import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.tableSpec.SpecType;
import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.OrderExpression;
@@ -139,8 +113,6 @@ import org.apache.hadoop.hive.ql.parse.W
import org.apache.hadoop.hive.ql.parse.WindowingSpec.WindowFunctionSpec;
import org.apache.hadoop.hive.ql.parse.WindowingSpec.WindowSpec;
import org.apache.hadoop.hive.ql.plan.AggregationDesc;
-import org.apache.hadoop.hive.ql.plan.ColumnStatsDesc;
-import org.apache.hadoop.hive.ql.plan.ColumnStatsWork;
import org.apache.hadoop.hive.ql.plan.CreateTableDesc;
import org.apache.hadoop.hive.ql.plan.CreateTableLikeDesc;
import org.apache.hadoop.hive.ql.plan.CreateViewDesc;
@@ -154,7 +126,6 @@ import org.apache.hadoop.hive.ql.plan.Ex
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeNullDesc;
import org.apache.hadoop.hive.ql.plan.ExtractDesc;
-import org.apache.hadoop.hive.ql.plan.FetchWork;
import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.FilterDesc;
import org.apache.hadoop.hive.ql.plan.FilterDesc.sampleDesc;
@@ -170,8 +141,6 @@ import org.apache.hadoop.hive.ql.plan.Li
import org.apache.hadoop.hive.ql.plan.LoadFileDesc;
import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
-import org.apache.hadoop.hive.ql.plan.MapredWork;
-import org.apache.hadoop.hive.ql.plan.MoveWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.PTFDesc;
import org.apache.hadoop.hive.ql.plan.PTFDesc.OrderExpressionDef;
@@ -205,7 +174,6 @@ import org.apache.hadoop.hive.serde2.obj
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
-import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.mapred.InputFormat;
/**
@@ -885,7 +853,7 @@ public class SemanticAnalyzer extends Ba
ErrorMsg.ORDERBY_DISTRIBUTEBY_CONFLICT.getMsg()));
}
break;
-
+
case HiveParser.TOK_SORTBY:
// Get the sort by aliases - these are aliased to the entries in the
// select list
@@ -8224,434 +8192,6 @@ public class SemanticAnalyzer extends Ba
}
}
- /**
- * A helper function to generate a column stats task on top of map-red task. The column stats
- * task fetches from the output of the map-red task, constructs the column stats object and
- * persists it to the metastore.
- *
- * This method generates a plan with a column stats task on top of map-red task and sets up the
- * appropriate metadata to be used during execution.
- *
- * @param qb
- */
- private void genColumnStatsTask(QB qb) {
- QBParseInfo qbParseInfo = qb.getParseInfo();
- ColumnStatsTask cStatsTask = null;
- ColumnStatsWork cStatsWork = null;
- FetchWork fetch = null;
- String tableName = qbParseInfo.getTableName();
- String partName = qbParseInfo.getPartName();
- List<String> colName = qbParseInfo.getColName();
- List<String> colType = qbParseInfo.getColType();
- boolean isTblLevel = qbParseInfo.isTblLvl();
-
- String cols = loadFileWork.get(0).getColumns();
- String colTypes = loadFileWork.get(0).getColumnTypes();
-
- String resFileFormat = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT);
- TableDesc resultTab = PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, resFileFormat);
-
- fetch = new FetchWork(new Path(loadFileWork.get(0).getSourceDir()).toString(),
- resultTab, qb.getParseInfo().getOuterQueryLimit());
-
- ColumnStatsDesc cStatsDesc = new ColumnStatsDesc(tableName, partName,
- colName, colType, isTblLevel);
- cStatsWork = new ColumnStatsWork(fetch, cStatsDesc);
- cStatsTask = (ColumnStatsTask) TaskFactory.get(cStatsWork, conf);
- rootTasks.add(cStatsTask);
- }
-
- @SuppressWarnings("nls")
- private void genMapRedTasks(ParseContext pCtx) throws SemanticException {
- boolean isCStats = qb.isAnalyzeRewrite();
-
- if (pCtx.getFetchTask() != null) {
- // replaced by single fetch task
- initParseCtx(pCtx);
- return;
- }
-
- initParseCtx(pCtx);
- List<Task<MoveWork>> mvTask = new ArrayList<Task<MoveWork>>();
-
- /*
- * In case of a select, use a fetch task instead of a move task.
- * If the select is from analyze table column rewrite, don't create a fetch task. Instead create
- * a column stats task later.
- */
- if (qb.getIsQuery() && !isCStats) {
- if ((!loadTableWork.isEmpty()) || (loadFileWork.size() != 1)) {
- throw new SemanticException(ErrorMsg.GENERIC_ERROR.getMsg());
- }
- String cols = loadFileWork.get(0).getColumns();
- String colTypes = loadFileWork.get(0).getColumnTypes();
-
- String resFileFormat = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT);
- TableDesc resultTab = PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, resFileFormat);
-
- FetchWork fetch = new FetchWork(new Path(loadFileWork.get(0).getSourceDir()).toString(),
- resultTab, qb.getParseInfo().getOuterQueryLimit());
-
- FetchTask fetchTask = (FetchTask) TaskFactory.get(fetch, conf);
- setFetchTask(fetchTask);
-
- // For the FetchTask, the limit optimiztion requires we fetch all the rows
- // in memory and count how many rows we get. It's not practical if the
- // limit factor is too big
- int fetchLimit = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVELIMITOPTMAXFETCH);
- if (globalLimitCtx.isEnable() && globalLimitCtx.getGlobalLimit() > fetchLimit) {
- LOG.info("For FetchTask, LIMIT " + globalLimitCtx.getGlobalLimit() + " > " + fetchLimit
- + ". Doesn't qualify limit optimiztion.");
- globalLimitCtx.disableOpt();
- }
- } else if (!isCStats) {
- for (LoadTableDesc ltd : loadTableWork) {
- Task<MoveWork> tsk = TaskFactory.get(new MoveWork(null, null, ltd, null, false),
- conf);
- mvTask.add(tsk);
- // Check to see if we are stale'ing any indexes and auto-update them if we want
- if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEINDEXAUTOUPDATE)) {
- IndexUpdater indexUpdater = new IndexUpdater(loadTableWork, getInputs(), conf);
- try {
- List<Task<? extends Serializable>> indexUpdateTasks = indexUpdater
- .generateUpdateTasks();
- for (Task<? extends Serializable> updateTask : indexUpdateTasks) {
- tsk.addDependentTask(updateTask);
- }
- } catch (HiveException e) {
- console
- .printInfo("WARNING: could not auto-update stale indexes, which are not in sync");
- }
- }
- }
-
- boolean oneLoadFile = true;
- for (LoadFileDesc lfd : loadFileWork) {
- if (qb.isCTAS()) {
- assert (oneLoadFile); // should not have more than 1 load file for
- // CTAS
- // make the movetask's destination directory the table's destination.
- String location = qb.getTableDesc().getLocation();
- if (location == null) {
- // get the table's default location
- Table dumpTable;
- Path targetPath;
- try {
- dumpTable = db.newTable(qb.getTableDesc().getTableName());
- if (!db.databaseExists(dumpTable.getDbName())) {
- throw new SemanticException("ERROR: The database " + dumpTable.getDbName()
- + " does not exist.");
- }
- Warehouse wh = new Warehouse(conf);
- targetPath = wh.getTablePath(db.getDatabase(dumpTable.getDbName()), dumpTable
- .getTableName());
- } catch (HiveException e) {
- throw new SemanticException(e);
- } catch (MetaException e) {
- throw new SemanticException(e);
- }
-
- location = targetPath.toString();
- }
- lfd.setTargetDir(location);
-
- oneLoadFile = false;
- }
- mvTask.add(TaskFactory.get(new MoveWork(null, null, null, lfd, false),
- conf));
- }
- }
-
- // generate map reduce plans
- ParseContext tempParseContext = getParseContext();
- GenMRProcContext procCtx = new GenMRProcContext(
- conf,
- new HashMap<Operator<? extends OperatorDesc>, Task<? extends Serializable>>(),
- new ArrayList<Operator<? extends OperatorDesc>>(), tempParseContext,
- mvTask, rootTasks,
- new LinkedHashMap<Operator<? extends OperatorDesc>, GenMapRedCtx>(),
- inputs, outputs);
-
- // create a walker which walks the tree in a DFS manner while maintaining
- // the operator stack.
- // The dispatcher generates the plan from the operator tree
- Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
- opRules.put(new RuleRegExp(new String("R1"),
- TableScanOperator.getOperatorName() + "%"),
- new GenMRTableScan1());
- opRules.put(new RuleRegExp(new String("R2"),
- TableScanOperator.getOperatorName() + "%.*" + ReduceSinkOperator.getOperatorName() + "%"),
- new GenMRRedSink1());
- opRules.put(new RuleRegExp(new String("R3"),
- ReduceSinkOperator.getOperatorName() + "%.*" + ReduceSinkOperator.getOperatorName() + "%"),
- new GenMRRedSink2());
- opRules.put(new RuleRegExp(new String("R4"),
- FileSinkOperator.getOperatorName() + "%"),
- new GenMRFileSink1());
- opRules.put(new RuleRegExp(new String("R5"),
- UnionOperator.getOperatorName() + "%"),
- new GenMRUnion1());
- opRules.put(new RuleRegExp(new String("R6"),
- UnionOperator.getOperatorName() + "%.*" + ReduceSinkOperator.getOperatorName() + "%"),
- new GenMRRedSink3());
- opRules.put(new RuleRegExp(new String("R7"),
- MapJoinOperator.getOperatorName() + "%"),
- MapJoinFactory.getTableScanMapJoin());
-
- // The dispatcher fires the processor corresponding to the closest matching
- // rule and passes the context along
- Dispatcher disp = new DefaultRuleDispatcher(new GenMROperator(), opRules,
- procCtx);
-
- GraphWalker ogw = new GenMapRedWalker(disp);
- ArrayList<Node> topNodes = new ArrayList<Node>();
- topNodes.addAll(topOps.values());
- ogw.startWalking(topNodes, null);
-
- /*
- * If the query was the result of analyze table column compute statistics rewrite, create
- * a column stats task instead of a fetch task to persist stats to the metastore.
- */
- if (isCStats) {
- genColumnStatsTask(qb);
- }
-
- // reduce sink does not have any kids - since the plan by now has been
- // broken up into multiple
- // tasks, iterate over all tasks.
- // For each task, go over all operators recursively
- for (Task<? extends Serializable> rootTask : rootTasks) {
- breakTaskTree(rootTask);
- }
-
- // For each task, set the key descriptor for the reducer
- for (Task<? extends Serializable> rootTask : rootTasks) {
- GenMapRedUtils.setKeyAndValueDescForTaskTree(rootTask);
- }
-
- // If a task contains an operator which instructs bucketizedhiveinputformat
- // to be used, please do so
- for (Task<? extends Serializable> rootTask : rootTasks) {
- setInputFormat(rootTask);
- }
-
- PhysicalContext physicalContext = new PhysicalContext(conf,
- getParseContext(), ctx, rootTasks, fetchTask);
- PhysicalOptimizer physicalOptimizer = new PhysicalOptimizer(
- physicalContext, conf);
- physicalOptimizer.optimize();
-
- // For each operator, generate the counters if needed
- if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEJOBPROGRESS)) {
- for (Task<? extends Serializable> rootTask : rootTasks) {
- generateCountersTask(rootTask);
- }
- }
-
- decideExecMode(rootTasks, ctx, globalLimitCtx);
-
- if (qb.isCTAS()) {
- // generate a DDL task and make it a dependent task of the leaf
- CreateTableDesc crtTblDesc = qb.getTableDesc();
-
- crtTblDesc.validate();
-
- // Clear the output for CTAS since we don't need the output from the
- // mapredWork, the
- // DDLWork at the tail of the chain will have the output
- getOutputs().clear();
-
- Task<? extends Serializable> crtTblTask = TaskFactory.get(new DDLWork(
- getInputs(), getOutputs(), crtTblDesc), conf);
-
- // find all leaf tasks and make the DDLTask as a dependent task of all of
- // them
- HashSet<Task<? extends Serializable>> leaves = new HashSet<Task<? extends Serializable>>();
- getLeafTasks(rootTasks, leaves);
- assert (leaves.size() > 0);
- for (Task<? extends Serializable> task : leaves) {
- if (task instanceof StatsTask) {
- // StatsTask require table to already exist
- for (Task<? extends Serializable> parentOfStatsTask : task.getParentTasks()) {
- parentOfStatsTask.addDependentTask(crtTblTask);
- }
- for (Task<? extends Serializable> parentOfCrtTblTask : crtTblTask.getParentTasks()) {
- parentOfCrtTblTask.removeDependentTask(task);
- }
- crtTblTask.addDependentTask(task);
- } else {
- task.addDependentTask(crtTblTask);
- }
- }
- }
-
- if (globalLimitCtx.isEnable() && fetchTask != null) {
- int fetchLimit = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVELIMITOPTMAXFETCH);
- LOG.info("set least row check for FetchTask: " + globalLimitCtx.getGlobalLimit());
- fetchTask.getWork().setLeastNumRows(globalLimitCtx.getGlobalLimit());
- }
-
- if (globalLimitCtx.isEnable() && globalLimitCtx.getLastReduceLimitDesc() != null) {
- LOG.info("set least row check for LimitDesc: " + globalLimitCtx.getGlobalLimit());
- globalLimitCtx.getLastReduceLimitDesc().setLeastRows(globalLimitCtx.getGlobalLimit());
- List<ExecDriver> mrTasks = Utilities.getMRTasks(rootTasks);
- for (ExecDriver tsk : mrTasks) {
- tsk.setRetryCmdWhenFail(true);
- }
- }
- }
-
- /**
- * Find all leaf tasks of the list of root tasks.
- */
- private void getLeafTasks(List<Task<? extends Serializable>> rootTasks,
- HashSet<Task<? extends Serializable>> leaves) {
-
- for (Task<? extends Serializable> root : rootTasks) {
- getLeafTasks(root, leaves);
- }
- }
-
- private void getLeafTasks(Task<? extends Serializable> task,
- HashSet<Task<? extends Serializable>> leaves) {
- if (task.getDependentTasks() == null) {
- if (!leaves.contains(task)) {
- leaves.add(task);
- }
- } else {
- getLeafTasks(task.getDependentTasks(), leaves);
- }
- }
-
- // loop over all the tasks recursviely
- private void generateCountersTask(Task<? extends Serializable> task) {
- if (task instanceof ExecDriver) {
- HashMap<String, Operator<? extends OperatorDesc>> opMap = ((MapredWork) task
- .getWork()).getAliasToWork();
- if (!opMap.isEmpty()) {
- for (Operator<? extends OperatorDesc> op : opMap.values()) {
- generateCountersOperator(op);
- }
- }
-
- Operator<? extends OperatorDesc> reducer = ((MapredWork) task.getWork())
- .getReducer();
- if (reducer != null) {
- LOG.info("Generating counters for operator " + reducer);
- generateCountersOperator(reducer);
- }
- } else if (task instanceof ConditionalTask) {
- List<Task<? extends Serializable>> listTasks = ((ConditionalTask) task)
- .getListTasks();
- for (Task<? extends Serializable> tsk : listTasks) {
- generateCountersTask(tsk);
- }
- }
-
- // Start the counters from scratch - a hack for hadoop 17.
- Operator.resetLastEnumUsed();
-
- if (task.getChildTasks() == null) {
- return;
- }
-
- for (Task<? extends Serializable> childTask : task.getChildTasks()) {
- generateCountersTask(childTask);
- }
- }
-
- private void generateCountersOperator(Operator<? extends OperatorDesc> op) {
- op.assignCounterNameToEnum();
-
- if (op.getChildOperators() == null) {
- return;
- }
-
- for (Operator<? extends OperatorDesc> child : op.getChildOperators()) {
- generateCountersOperator(child);
- }
- }
-
- // loop over all the tasks recursviely
- private void breakTaskTree(Task<? extends Serializable> task) {
-
- if (task instanceof ExecDriver) {
- HashMap<String, Operator<? extends OperatorDesc>> opMap = ((MapredWork) task
- .getWork()).getAliasToWork();
- if (!opMap.isEmpty()) {
- for (Operator<? extends OperatorDesc> op : opMap.values()) {
- breakOperatorTree(op);
- }
- }
- } else if (task instanceof ConditionalTask) {
- List<Task<? extends Serializable>> listTasks = ((ConditionalTask) task)
- .getListTasks();
- for (Task<? extends Serializable> tsk : listTasks) {
- breakTaskTree(tsk);
- }
- }
-
- if (task.getChildTasks() == null) {
- return;
- }
-
- for (Task<? extends Serializable> childTask : task.getChildTasks()) {
- breakTaskTree(childTask);
- }
- }
-
- // loop over all the operators recursviely
- private void breakOperatorTree(Operator<? extends OperatorDesc> topOp) {
- if (topOp instanceof ReduceSinkOperator) {
- topOp.setChildOperators(null);
- }
-
- if (topOp.getChildOperators() == null) {
- return;
- }
-
- for (Operator<? extends OperatorDesc> op : topOp.getChildOperators()) {
- breakOperatorTree(op);
- }
- }
-
- private void setInputFormat(MapredWork work, Operator<? extends OperatorDesc> op) {
- if (op.isUseBucketizedHiveInputFormat()) {
- work.setUseBucketizedHiveInputFormat(true);
- return;
- }
-
- if (op.getChildOperators() != null) {
- for (Operator<? extends OperatorDesc> childOp : op.getChildOperators()) {
- setInputFormat(work, childOp);
- }
- }
- }
-
- // loop over all the tasks recursviely
- private void setInputFormat(Task<? extends Serializable> task) {
- if (task instanceof ExecDriver) {
- MapredWork work = (MapredWork) task.getWork();
- HashMap<String, Operator<? extends OperatorDesc>> opMap = work.getAliasToWork();
- if (!opMap.isEmpty()) {
- for (Operator<? extends OperatorDesc> op : opMap.values()) {
- setInputFormat(work, op);
- }
- }
- } else if (task instanceof ConditionalTask) {
- List<Task<? extends Serializable>> listTasks = ((ConditionalTask) task).getListTasks();
- for (Task<? extends Serializable> tsk : listTasks) {
- setInputFormat(tsk);
- }
- }
-
- if (task.getChildTasks() != null) {
- for (Task<? extends Serializable> childTask : task.getChildTasks()) {
- setInputFormat(childTask);
- }
- }
- }
-
@SuppressWarnings("nls")
public Phase1Ctx initPhase1Ctx() {
@@ -8780,8 +8320,11 @@ public class SemanticAnalyzer extends Ba
}
// At this point we have the complete operator tree
- // from which we want to find the reduce operator
- genMapRedTasks(pCtx);
+ // from which we want to create the map-reduce plan
+ MapReduceCompiler compiler = new MapReduceCompiler();
+ compiler.init(conf, console, db);
+ compiler.compile(pCtx, rootTasks, inputs, outputs);
+ fetchTask = pCtx.getFetchTask();
LOG.info("Completed plan generation");
@@ -9569,98 +9112,6 @@ public class SemanticAnalyzer extends Ba
}
}
- private void decideExecMode(List<Task<? extends Serializable>> rootTasks, Context ctx,
- GlobalLimitCtx globalLimitCtx)
- throws SemanticException {
-
- // bypass for explain queries for now
- if (ctx.getExplain()) {
- return;
- }
-
- // user has told us to run in local mode or doesn't want auto-local mode
- if (ctx.isLocalOnlyExecutionMode() ||
- !conf.getBoolVar(HiveConf.ConfVars.LOCALMODEAUTO)) {
- return;
- }
-
- final Context lCtx = ctx;
- PathFilter p = new PathFilter() {
- public boolean accept(Path file) {
- return !lCtx.isMRTmpFileURI(file.toUri().getPath());
- }
- };
- List<ExecDriver> mrtasks = Utilities.getMRTasks(rootTasks);
-
- // map-reduce jobs will be run locally based on data size
- // first find out if any of the jobs needs to run non-locally
- boolean hasNonLocalJob = false;
- for (ExecDriver mrtask : mrtasks) {
- try {
- ContentSummary inputSummary = Utilities.getInputSummary
- (ctx, (MapredWork) mrtask.getWork(), p);
- int numReducers = getNumberOfReducers(mrtask.getWork(), conf);
-
- long estimatedInput;
-
- if (globalLimitCtx != null && globalLimitCtx.isEnable()) {
- // If the global limit optimization is triggered, we will
- // estimate input data actually needed based on limit rows.
- // estimated Input = (num_limit * max_size_per_row) * (estimated_map + 2)
- //
- long sizePerRow = HiveConf.getLongVar(conf,
- HiveConf.ConfVars.HIVELIMITMAXROWSIZE);
- estimatedInput = globalLimitCtx.getGlobalLimit() * sizePerRow;
- long minSplitSize = HiveConf.getLongVar(conf,
- HiveConf.ConfVars.MAPREDMINSPLITSIZE);
- long estimatedNumMap = inputSummary.getLength() / minSplitSize + 1;
- estimatedInput = estimatedInput * (estimatedNumMap + 1);
- } else {
- estimatedInput = inputSummary.getLength();
- }
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Task: " + mrtask.getId() + ", Summary: " +
- inputSummary.getLength() + "," + inputSummary.getFileCount() + ","
- + numReducers + ", estimated Input: " + estimatedInput);
- }
-
- if (MapRedTask.isEligibleForLocalMode(conf, numReducers,
- estimatedInput, inputSummary.getFileCount()) != null) {
- hasNonLocalJob = true;
- break;
- } else {
- mrtask.setLocalMode(true);
- }
- } catch (IOException e) {
- throw new SemanticException(e);
- }
- }
-
- if (!hasNonLocalJob) {
- // Entire query can be run locally.
- // Save the current tracker value and restore it when done.
- ctx.setOriginalTracker(ShimLoader.getHadoopShims().getJobLauncherRpcAddress(conf));
- ShimLoader.getHadoopShims().setJobLauncherRpcAddress(conf, "local");
- console.printInfo("Automatically selecting local only mode for query");
- }
- }
-
- /**
- * Make a best guess at trying to find the number of reducers
- */
- private static int getNumberOfReducers(MapredWork mrwork, HiveConf conf) {
- if (mrwork.getReducer() == null) {
- return 0;
- }
-
- if (mrwork.getNumReduceTasks() >= 0) {
- return mrwork.getNumReduceTasks();
- }
-
- return conf.getIntVar(HiveConf.ConfVars.HADOOPNUMREDUCERS);
- }
-
// Process the position alias in GROUPBY and ORDERBY
private void processPositionAlias(ASTNode ast) throws SemanticException {
if (HiveConf.getBoolVar(conf,