Posted to commits@hive.apache.org by zs...@apache.org on 2009/06/04 03:21:35 UTC
svn commit: r781633 [3/13] - in /hadoop/hive/trunk: ./
common/src/java/org/apache/hadoop/hive/conf/ data/scripts/
ql/src/java/org/apache/hadoop/hive/ql/exec/
ql/src/java/org/apache/hadoop/hive/ql/optimizer/
ql/src/java/org/apache/hadoop/hive/ql/parse/ ...
Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink4.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink4.java?rev=781633&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink4.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink4.java Thu Jun 4 01:21:30 2009
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer;
+
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Stack;
+import java.io.Serializable;
+
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.plan.mapredWork;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMapRedCtx;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+
+/**
+ * Processor for the rule - map join followed by reduce sink
+ */
+public class GenMRRedSink4 implements NodeProcessor {
+
+ public GenMRRedSink4() {
+ }
+
+ /**
+ * Reduce Sink encountered
+ * @param nd the reduce sink operator encountered
+ * @param opProcCtx context
+ */
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx opProcCtx, Object... nodeOutputs) throws SemanticException {
+ ReduceSinkOperator op = (ReduceSinkOperator)nd;
+ GenMRProcContext ctx = (GenMRProcContext)opProcCtx;
+
+ ParseContext parseCtx = ctx.getParseCtx();
+
+ // the map-join consisted of a bunch of map-only jobs, and the plan has been split after the mapjoin
+ Operator<? extends Serializable> reducer = op.getChildOperators().get(0);
+ Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = ctx.getMapCurrCtx();
+ GenMapRedCtx mapredCtx = mapCurrCtx.get(op.getParentOperators().get(0));
+ Task<? extends Serializable> currTask = mapredCtx.getCurrTask();
+ mapredWork plan = (mapredWork) currTask.getWork();
+ HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap = ctx.getOpTaskMap();
+ Task<? extends Serializable> opMapTask = opTaskMap.get(reducer);
+
+ ctx.setCurrTask(currTask);
+
+ // If the plan for this reducer does not exist, initialize the plan
+ if (opMapTask == null) {
+ // When the reducer is encountered for the first time
+ if (plan.getReducer() == null)
+ GenMapRedUtils.initMapJoinPlan(op, ctx, true, false, true, -1);
+ // When mapjoin is followed by a multi-table insert
+ else
+ GenMapRedUtils.splitPlan(op, ctx);
+ }
+ // There is a join after mapjoin. One of the branches of mapjoin has already been initialized.
+ // Initialize the current branch, and join with the original plan.
+ else {
+ assert plan.getReducer() != reducer;
+ GenMapRedUtils.joinPlan(op, currTask, opMapTask, ctx, -1, false, true, false);
+ }
+
+ mapCurrCtx.put(op, new GenMapRedCtx(ctx.getCurrTask(), ctx.getCurrTopOp(), ctx.getCurrAliasId()));
+
+ // the mapjoin operator has been processed
+ ctx.setCurrMapJoinOp(null);
+ return null;
+ }
+}
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java?rev=781633&r1=781632&r2=781633&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java Thu Jun 4 01:21:30 2009
@@ -97,7 +97,6 @@
GenMRUnionCtx uCtxTask = ctx.getUnionTask(union);
Task<? extends Serializable> uTask = null;
- pos = UnionProcFactory.getPositionParent(union, stack);
Operator<? extends Serializable> parent = union.getParentOperators().get(pos);
mapredWork uPlan = null;
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java?rev=781633&r1=781632&r2=781633&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java Thu Jun 4 01:21:30 2009
@@ -18,46 +18,46 @@
package org.apache.hadoop.hive.ql.optimizer;
+import java.util.Iterator;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Set;
-import java.util.Stack;
import java.io.Serializable;
-import java.io.File;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
-import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.JoinOperator;
import org.apache.hadoop.hive.ql.exec.UnionOperator;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.exec.OperatorFactory;
+import org.apache.hadoop.hive.ql.plan.fetchWork;
import org.apache.hadoop.hive.ql.plan.mapredWork;
+import org.apache.hadoop.hive.ql.plan.mapredLocalWork;
+import org.apache.hadoop.hive.ql.plan.mapJoinDesc;
import org.apache.hadoop.hive.ql.plan.reduceSinkDesc;
import org.apache.hadoop.hive.ql.plan.tableDesc;
import org.apache.hadoop.hive.ql.plan.partitionDesc;
import org.apache.hadoop.hive.ql.plan.fileSinkDesc;
import org.apache.hadoop.hive.ql.plan.PlanUtils;
+import org.apache.hadoop.hive.ql.plan.tableScanDesc;
import org.apache.hadoop.hive.ql.metadata.*;
import org.apache.hadoop.hive.ql.parse.*;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
-import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMapRedCtx;
import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMRUnionCtx;
+import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMRMapJoinCtx;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
/**
* General-purpose utility functions for the Processor to convert operators into map-reduce tasks
@@ -111,6 +111,91 @@
}
/**
+ * Initialize the current plan by adding it to root tasks
+ * @param op the map join operator encountered
+ * @param opProcCtx processing context
+ * @param pos position of the parent
+ */
+ public static void initMapJoinPlan(Operator<? extends Serializable> op, GenMRProcContext opProcCtx, boolean readInputMapJoin, boolean readInputUnion,
+ boolean setReducer, int pos)
+ throws SemanticException {
+ Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = opProcCtx.getMapCurrCtx();
+ assert (((pos == -1) && (readInputMapJoin)) || (pos != -1));
+ int parentPos = (pos == -1) ? 0 : pos;
+ GenMapRedCtx mapredCtx = mapCurrCtx.get(op.getParentOperators().get(parentPos));
+ Task<? extends Serializable> currTask = mapredCtx.getCurrTask();
+ mapredWork plan = (mapredWork) currTask.getWork();
+ HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap = opProcCtx.getOpTaskMap();
+ Operator<? extends Serializable> currTopOp = opProcCtx.getCurrTopOp();
+
+ // The mapjoin has already been encountered. Some context must be stored about that
+ if (readInputMapJoin) {
+ MapJoinOperator currMapJoinOp = opProcCtx.getCurrMapJoinOp();
+ assert currMapJoinOp != null;
+ boolean local = ((pos == -1) || (pos == ((mapJoinDesc)currMapJoinOp.getConf()).getPosBigTable())) ? false : true;
+
+ if (setReducer) {
+ Operator<? extends Serializable> reducer = op.getChildOperators().get(0);
+ plan.setReducer(reducer);
+ opTaskMap.put(reducer, currTask);
+ if (reducer.getClass() == JoinOperator.class)
+ plan.setNeedsTagging(true);
+ reduceSinkDesc desc = (reduceSinkDesc)op.getConf();
+ plan.setNumReduceTasks(desc.getNumReducers());
+ }
+ else
+ opTaskMap.put(op, currTask);
+
+ if (!readInputUnion) {
+ GenMRMapJoinCtx mjCtx = opProcCtx.getMapJoinCtx(currMapJoinOp);
+ String taskTmpDir;
+ tableDesc tt_desc;
+ Operator<? extends Serializable> rootOp;
+
+ if (mjCtx.getOldMapJoin() == null) {
+ taskTmpDir = mjCtx.getTaskTmpDir();
+ tt_desc = mjCtx.getTTDesc();
+ rootOp = mjCtx.getRootMapJoinOp();
+ }
+ else {
+ GenMRMapJoinCtx oldMjCtx = opProcCtx.getMapJoinCtx(mjCtx.getOldMapJoin());
+ taskTmpDir = oldMjCtx.getTaskTmpDir();
+ tt_desc = oldMjCtx.getTTDesc();
+ rootOp = oldMjCtx.getRootMapJoinOp();
+ }
+
+ setTaskPlan(taskTmpDir, taskTmpDir, rootOp, plan, local, tt_desc);
+ }
+ else {
+ initUnionPlan(opProcCtx, currTask, false);
+ }
+
+ opProcCtx.setCurrMapJoinOp(null);
+ }
+ else {
+ mapJoinDesc desc = (mapJoinDesc)op.getConf();
+
+ // The map is overloaded to keep track of mapjoins also
+ opTaskMap.put(op, currTask);
+
+ List<Task<? extends Serializable>> rootTasks = opProcCtx.getRootTasks();
+ rootTasks.add(currTask);
+
+ assert currTopOp != null;
+ List<Operator<? extends Serializable>> seenOps = opProcCtx.getSeenOps();
+ String currAliasId = opProcCtx.getCurrAliasId();
+
+ seenOps.add(currTopOp);
+ boolean local = (pos == desc.getPosBigTable()) ? false : true;
+ setTaskPlan(currAliasId, currTopOp, plan, local, opProcCtx);
+ }
+
+ opProcCtx.setCurrTask(currTask);
+ opProcCtx.setCurrTopOp(null);
+ opProcCtx.setCurrAliasId(null);
+ }
+
+ /**
* Initialize the current union plan.
*
* @param op the reduce sink operator encountered
@@ -130,20 +215,17 @@
plan.setNumReduceTasks(desc.getNumReducers());
- List<Task<? extends Serializable>> rootTasks = opProcCtx.getRootTasks();
-
- // rootTasks.add(currTask);
if (reducer.getClass() == JoinOperator.class)
plan.setNeedsTagging(true);
- initUnionPlan(opProcCtx, currTask);
+ initUnionPlan(opProcCtx, currTask, false);
}
/*
* It is an idempotent function that adds the various intermediate files as sources for the
* union. The plan has already been created.
*/
- public static void initUnionPlan(GenMRProcContext opProcCtx, Task<? extends Serializable> currTask) {
+ public static void initUnionPlan(GenMRProcContext opProcCtx, Task<? extends Serializable> currTask, boolean local) {
mapredWork plan = (mapredWork) currTask.getWork();
UnionOperator currUnionOp = opProcCtx.getCurrUnionOp();
assert currUnionOp != null;
@@ -155,9 +237,10 @@
assert !taskTmpDirLst.isEmpty() && !tt_descLst.isEmpty();
assert taskTmpDirLst.size() == tt_descLst.size();
int size = taskTmpDirLst.size();
-
+ assert local == false;
+
for (int pos = 0; pos < size; pos++) {
- String taskTmpDir = taskTmpDirLst.get(pos);
+ String taskTmpDir = taskTmpDirLst.get(pos);
tableDesc tt_desc = tt_descLst.get(pos);
if (plan.getPathToAliases().get(taskTmpDir) == null) {
plan.getPathToAliases().put(taskTmpDir, new ArrayList<String>());
@@ -170,20 +253,38 @@
/**
* Merge the current task with the task for the current reducer
- * @param task for the old task for the current reducer
+ * @param op operator being processed
+ * @param oldTask the old task for the current reducer
+ * @param task the current task for the current reducer
* @param opProcCtx processing context
+ * @param pos position of the parent in the stack
*/
- public static void joinPlan(ReduceSinkOperator op,
+ public static void joinPlan(Operator<? extends Serializable> op,
Task<? extends Serializable> oldTask,
Task<? extends Serializable> task,
- GenMRProcContext opProcCtx) throws SemanticException {
+ GenMRProcContext opProcCtx,
+ int pos, boolean split,
+ boolean readMapJoinData,
+ boolean readUnionData) throws SemanticException {
Task<? extends Serializable> currTask = task;
mapredWork plan = (mapredWork) currTask.getWork();
Operator<? extends Serializable> currTopOp = opProcCtx.getCurrTopOp();
-
+ List<Task<? extends Serializable>> parTasks = null;
+
// terminate the old task and make current task dependent on it
- if (oldTask != null) {
- splitTasks(op, oldTask, currTask, opProcCtx);
+ if (split) {
+ assert oldTask != null;
+ splitTasks((ReduceSinkOperator)op, oldTask, currTask, opProcCtx, true, false, 0);
+ }
+ else {
+ if ((oldTask != null) && (oldTask.getParentTasks() != null) && !oldTask.getParentTasks().isEmpty()) {
+ parTasks = new ArrayList<Task<? extends Serializable>>();
+ parTasks.addAll(oldTask.getParentTasks());
+
+ Object[] parTaskArr = parTasks.toArray();
+ for (int i = 0; i < parTaskArr.length; i++)
+ ((Task<? extends Serializable>)parTaskArr[i]).removeDependentTask(oldTask);
+ }
}
if (currTopOp != null) {
@@ -192,12 +293,55 @@
if (!seenOps.contains(currTopOp)) {
seenOps.add(currTopOp);
- setTaskPlan(currAliasId, currTopOp, plan, false, opProcCtx);
+ boolean local = false;
+ if (pos != -1)
+ local = (pos == ((mapJoinDesc)op.getConf()).getPosBigTable()) ? false : true;
+ setTaskPlan(currAliasId, currTopOp, plan, local, opProcCtx);
}
currTopOp = null;
opProcCtx.setCurrTopOp(currTopOp);
}
-
+ else if (opProcCtx.getCurrMapJoinOp() != null) {
+ MapJoinOperator mjOp = opProcCtx.getCurrMapJoinOp();
+ if (readUnionData) {
+ initUnionPlan(opProcCtx, currTask, false);
+ }
+ else {
+ GenMRMapJoinCtx mjCtx = opProcCtx.getMapJoinCtx(mjOp);
+
+ // In case of map-join followed by map-join, the file needs to be obtained from the old map join
+ MapJoinOperator oldMapJoin = mjCtx.getOldMapJoin();
+ String taskTmpDir = null;
+ tableDesc tt_desc = null;
+ Operator<? extends Serializable> rootOp = null;
+
+ if (oldMapJoin == null) {
+ taskTmpDir = mjCtx.getTaskTmpDir();
+ tt_desc = mjCtx.getTTDesc();
+ rootOp = mjCtx.getRootMapJoinOp();
+ }
+ else {
+ GenMRMapJoinCtx oldMjCtx = opProcCtx.getMapJoinCtx(oldMapJoin);
+ assert oldMjCtx != null;
+ taskTmpDir = oldMjCtx.getTaskTmpDir();
+ tt_desc = oldMjCtx.getTTDesc();
+ rootOp = oldMjCtx.getRootMapJoinOp();
+ }
+
+ boolean local = ((pos == -1) || (pos == ((mapJoinDesc)mjOp.getConf()).getPosBigTable())) ? false : true;
+ setTaskPlan(taskTmpDir, taskTmpDir, rootOp, plan, local, tt_desc);
+ }
+ opProcCtx.setCurrMapJoinOp(null);
+
+ if ((oldTask != null) && (parTasks != null)) {
+ for (Task<? extends Serializable> parTask : parTasks)
+ parTask.addDependentTask(currTask);
+ }
+
+ if (opProcCtx.getRootTasks().contains(currTask))
+ opProcCtx.getRootTasks().remove(currTask);
+ }
+
opProcCtx.setCurrTask(currTask);
}
@@ -224,7 +368,7 @@
opTaskMap.put(reducer, redTask);
Task<? extends Serializable> currTask = opProcCtx.getCurrTask();
- splitTasks(op, currTask, redTask, opProcCtx);
+ splitTasks(op, currTask, redTask, opProcCtx, true, false, 0);
opProcCtx.getRootOps().add(op);
}
@@ -242,58 +386,144 @@
ParseContext parseCtx = opProcCtx.getParseCtx();
Set<ReadEntity> inputs = opProcCtx.getInputs();
- if (!local) {
- // Generate the map work for this alias_id
- PartitionPruner pruner = parseCtx.getAliasToPruner().get(alias_id);
- Set<Partition> parts = null;
- try {
- // pass both confirmed and unknown partitions through the map-reduce framework
- PartitionPruner.PrunedPartitionList partsList = pruner.prune();
-
- parts = partsList.getConfirmedPartns();
- parts.addAll(partsList.getUnknownPartns());
- } catch (HiveException e) {
- // Has to use full name to make sure it does not conflict with org.apache.commons.lang.StringUtils
- LOG.error(org.apache.hadoop.util.StringUtils.stringifyException(e));
- throw new SemanticException(e.getMessage(), e);
- }
- SamplePruner samplePruner = parseCtx.getAliasToSamplePruner().get(alias_id);
-
- for (Partition part : parts) {
- if (part.getTable().isPartitioned())
- inputs.add(new ReadEntity(part));
- else
- inputs.add(new ReadEntity(part.getTable()));
-
- // Later the properties have to come from the partition as opposed
- // to from the table in order to support versioning.
- Path paths[];
- if (samplePruner != null) {
- paths = samplePruner.prune(part);
- }
- else {
- paths = part.getPath();
- }
+ ArrayList<Path> partDir = new ArrayList<Path>();
+ ArrayList<partitionDesc> partDesc = new ArrayList<partitionDesc>();
+
+ Path tblDir = null;
+ tableDesc tblDesc = null;
+
+ // Generate the map work for this alias_id
+ PartitionPruner pruner = parseCtx.getAliasToPruner().get(alias_id);
+ Set<Partition> parts = null;
+ try {
+ // pass both confirmed and unknown partitions through the map-reduce framework
+ PartitionPruner.PrunedPartitionList partsList = pruner.prune();
+
+ parts = partsList.getConfirmedPartns();
+ parts.addAll(partsList.getUnknownPartns());
+ partitionDesc aliasPartnDesc = null;
+ if (parts.isEmpty()) {
+ if (!partsList.getDeniedPartns().isEmpty())
+ aliasPartnDesc = Utilities.getPartitionDesc(partsList.getDeniedPartns().iterator().next());
+ }
+ else {
+ aliasPartnDesc = Utilities.getPartitionDesc(parts.iterator().next());
+ }
+ plan.getAliasToPartnInfo().put(alias_id, aliasPartnDesc);
+ } catch (HiveException e) {
+ // Has to use full name to make sure it does not conflict with org.apache.commons.lang.StringUtils
+ LOG.error(org.apache.hadoop.util.StringUtils.stringifyException(e));
+ throw new SemanticException(e.getMessage(), e);
+ }
+ SamplePruner samplePruner = parseCtx.getAliasToSamplePruner().get(alias_id);
+
+ for (Partition part : parts) {
+ if (part.getTable().isPartitioned())
+ inputs.add(new ReadEntity(part));
+ else
+ inputs.add(new ReadEntity(part.getTable()));
+
+ // Later the properties have to come from the partition as opposed
+ // to from the table in order to support versioning.
+ Path paths[];
+ if (samplePruner != null) {
+ paths = samplePruner.prune(part);
+ }
+ else {
+ paths = part.getPath();
+ }
+
+ // is it a partitioned table ?
+ if (!part.getTable().isPartitioned()) {
+ assert ((tblDir == null) && (tblDesc == null));
+
+ tblDir = paths[0];
+ tblDesc = Utilities.getTableDesc(part.getTable());
+ }
+
+ for (Path p: paths) {
+ String path = p.toString();
+ LOG.debug("Adding " + path + " of table" + alias_id);
- for (Path p: paths) {
- String path = p.toString();
- LOG.debug("Adding " + path + " of table" + alias_id);
- // Add the path to alias mapping
- if (plan.getPathToAliases().get(path) == null) {
- plan.getPathToAliases().put(path, new ArrayList<String>());
- }
- plan.getPathToAliases().get(path).add(alias_id);
- plan.getPathToPartitionInfo().put(path, Utilities.getPartitionDesc(part));
- LOG.debug("Information added for path " + path);
+ partDir.add(p);
+ partDesc.add(Utilities.getPartitionDesc(part));
+ }
+ }
+
+ Iterator<Path> iterPath = partDir.iterator();
+ Iterator<partitionDesc> iterPartnDesc = partDesc.iterator();
+
+ if (!local) {
+ while (iterPath.hasNext()) {
+ assert iterPartnDesc.hasNext();
+ String path = iterPath.next().toString();
+
+ partitionDesc prtDesc = iterPartnDesc.next();
+
+ // Add the path to alias mapping
+ if (plan.getPathToAliases().get(path) == null) {
+ plan.getPathToAliases().put(path, new ArrayList<String>());
}
+ plan.getPathToAliases().get(path).add(alias_id);
+ plan.getPathToPartitionInfo().put(path, prtDesc);
+ LOG.debug("Information added for path " + path);
}
+
+ assert plan.getAliasToWork().get(alias_id) == null;
plan.getAliasToWork().put(alias_id, topOp);
- LOG.debug("Created Map Work for " + alias_id);
}
else {
- FileSinkOperator fOp = (FileSinkOperator) topOp;
- fileSinkDesc fConf = (fileSinkDesc)fOp.getConf();
// populate local work if needed
+ mapredLocalWork localPlan = plan.getMapLocalWork();
+ if (localPlan == null)
+ localPlan = new mapredLocalWork(
+ new LinkedHashMap<String, Operator<? extends Serializable>>(),
+ new LinkedHashMap<String, fetchWork>());
+
+ assert localPlan.getAliasToWork().get(alias_id) == null;
+ assert localPlan.getAliasToFetchWork().get(alias_id) == null;
+ localPlan.getAliasToWork().put(alias_id, topOp);
+ if (tblDir == null)
+ localPlan.getAliasToFetchWork().put(alias_id, new fetchWork(fetchWork.convertPathToStringArray(partDir), partDesc));
+ else
+ localPlan.getAliasToFetchWork().put(alias_id, new fetchWork(tblDir.toString(), tblDesc));
+ plan.setMapLocalWork(localPlan);
+ }
+ }
+
+
+ /**
+ * Set up the given path and alias in the mapredWork
+ * @param alias current alias
+ * @param topOp the top operator of the stack
+ * @param plan current plan
+ * @param local whether you need to add to map-reduce or local work
+ * @param tt_desc table descriptor
+ */
+ public static void setTaskPlan(String path, String alias, Operator<? extends Serializable> topOp,
+ mapredWork plan, boolean local, tableDesc tt_desc)
+ throws SemanticException {
+
+ if (!local) {
+ if (plan.getPathToAliases().get(path) == null)
+ plan.getPathToAliases().put(path, new ArrayList<String>());
+ plan.getPathToAliases().get(path).add(alias);
+ plan.getPathToPartitionInfo().put(path, new partitionDesc(tt_desc, null));
+ plan.getAliasToWork().put(alias, topOp);
+ }
+ else {
+ // populate local work if needed
+ mapredLocalWork localPlan = plan.getMapLocalWork();
+ if (localPlan == null)
+ localPlan = new mapredLocalWork(
+ new LinkedHashMap<String, Operator<? extends Serializable>>(),
+ new LinkedHashMap<String, fetchWork>());
+
+ assert localPlan.getAliasToWork().get(alias) == null;
+ assert localPlan.getAliasToFetchWork().get(alias) == null;
+ localPlan.getAliasToWork().put(alias, topOp);
+ localPlan.getAliasToFetchWork().put(alias, new fetchWork(alias, tt_desc));
+ plan.setMapLocalWork(localPlan);
}
}
@@ -360,11 +590,14 @@
* @param oldTask the parent task
* @param task the child task
* @param opProcCtx context
+ * @param setReducer whether the reducer needs to be set
+ * @param posn position of the parent
**/
- private static void splitTasks(ReduceSinkOperator op,
+ public static void splitTasks(Operator<? extends Serializable> op,
Task<? extends Serializable> parentTask,
Task<? extends Serializable> childTask,
- GenMRProcContext opProcCtx) throws SemanticException {
+ GenMRProcContext opProcCtx, boolean setReducer,
+ boolean local, int posn) throws SemanticException {
mapredWork plan = (mapredWork) childTask.getWork();
Operator<? extends Serializable> currTopOp = opProcCtx.getCurrTopOp();
@@ -380,7 +613,7 @@
Context baseCtx = parseCtx.getContext();
String taskTmpDir = baseCtx.getMRTmpFileURI();
- Operator<? extends Serializable> parent = op.getParentOperators().get(0);
+ Operator<? extends Serializable> parent = op.getParentOperators().get(posn);
tableDesc tt_desc =
PlanUtils.getBinaryTableDesc(PlanUtils.getFieldSchemasFromRowSchema(parent.getSchema(), "temporarycol"));
@@ -403,37 +636,59 @@
List<Operator<? extends Serializable>> parentOpList = new ArrayList<Operator<? extends Serializable>>();
parentOpList.add(parent);
fs_op.setParentOperators(parentOpList);
+
+ // create a dummy tableScan operator on top of op
+ Operator<? extends Serializable> ts_op =
+ putOpInsertMap(OperatorFactory.get(tableScanDesc.class, parent.getSchema()), null, parseCtx);
+
+ childOpList = new ArrayList<Operator<? extends Serializable>>();
+ childOpList.add(op);
+ ts_op.setChildOperators(childOpList);
+ op.getParentOperators().set(posn, ts_op);
+
+ Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = opProcCtx.getMapCurrCtx();
+ mapCurrCtx.put(ts_op, new GenMapRedCtx(childTask, null, null));
- Operator<? extends Serializable> reducer = op.getChildOperators().get(0);
-
- String streamDesc;
+ String streamDesc = taskTmpDir;
mapredWork cplan = (mapredWork) childTask.getWork();
+
+ if (setReducer) {
+ Operator<? extends Serializable> reducer = op.getChildOperators().get(0);
- if (reducer.getClass() == JoinOperator.class) {
- String origStreamDesc;
- streamDesc = "$INTNAME";
- origStreamDesc = streamDesc;
- int pos = 0;
- while (cplan.getAliasToWork().get(streamDesc) != null)
- streamDesc = origStreamDesc.concat(String.valueOf(++pos));
+ if (reducer.getClass() == JoinOperator.class) {
+ String origStreamDesc;
+ streamDesc = "$INTNAME";
+ origStreamDesc = streamDesc;
+ int pos = 0;
+ while (cplan.getAliasToWork().get(streamDesc) != null)
+ streamDesc = origStreamDesc.concat(String.valueOf(++pos));
+ }
+
+ // TODO: Allocate work to remove the temporary files and make that
+ // dependent on the redTask
+ if (reducer.getClass() == JoinOperator.class)
+ cplan.setNeedsTagging(true);
}
- else
- streamDesc = taskTmpDir;
-
+
// Add the path to alias mapping
- if (cplan.getPathToAliases().get(taskTmpDir) == null) {
- cplan.getPathToAliases().put(taskTmpDir, new ArrayList<String>());
+ setTaskPlan(taskTmpDir, streamDesc, ts_op, cplan, local, tt_desc);
+
+ // This can be cleaned up as a function table in future
+ if (op instanceof MapJoinOperator) {
+ MapJoinOperator mjOp = (MapJoinOperator)op;
+ opProcCtx.setCurrMapJoinOp(mjOp);
+ GenMRMapJoinCtx mjCtx = opProcCtx.getMapJoinCtx(mjOp);
+ if (mjCtx == null)
+ mjCtx = new GenMRMapJoinCtx(taskTmpDir, tt_desc, ts_op, null);
+ else {
+ mjCtx.setTaskTmpDir(taskTmpDir);
+ mjCtx.setTTDesc(tt_desc);
+ mjCtx.setRootMapJoinOp(ts_op);
+ }
+ opProcCtx.setMapJoinCtx(mjOp, mjCtx);
+ opProcCtx.getMapCurrCtx().put(parent, new GenMapRedCtx(childTask, null, null));
}
- cplan.getPathToAliases().get(taskTmpDir).add(streamDesc);
- cplan.getPathToPartitionInfo().put(taskTmpDir, new partitionDesc(tt_desc, null));
- cplan.getAliasToWork().put(streamDesc, op);
-
- // TODO: Allocate work to remove the temporary files and make that
- // dependent on the redTask
- if (reducer.getClass() == JoinOperator.class)
- cplan.setNeedsTagging(true);
-
currTopOp = null;
String currAliasId = null;
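Taken together, the splitTasks changes above describe the physical shape of a split: the parent task now ends in a FileSinkOperator writing to a scratch directory, and the child task begins with a dummy TableScanOperator reading that same directory. Schematically (a sketch of the intent, with made-up labels, not output of any tool):

    parent task:  ... -> parent -> FS(taskTmpDir)
    child task:   TS(taskTmpDir) -> op -> ...     (op is the ReduceSinkOperator or MapJoinOperator being split on)

When op is a MapJoinOperator, the scratch directory, the intermediate tableDesc and the dummy TableScan root are also recorded in a GenMRMapJoinCtx, so that a later rule (GenMRRedSink4 or one of the MapJoinFactory processors below) can attach the follow-on task to the correct intermediate input.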
Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java?rev=781633&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java Thu Jun 4 01:21:30 2009
@@ -0,0 +1,362 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Stack;
+import java.util.Map;
+import java.util.HashMap;
+
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorFactory;
+import org.apache.hadoop.hive.ql.exec.UnionOperator;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.ErrorMsg;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMRUnionCtx;
+import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMapRedCtx;
+import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMRMapJoinCtx;
+import org.apache.hadoop.hive.ql.optimizer.unionproc.UnionProcContext;
+import org.apache.hadoop.hive.ql.plan.mapJoinDesc;
+import org.apache.hadoop.hive.ql.plan.mapredWork;
+import org.apache.hadoop.hive.ql.plan.PlanUtils;
+import org.apache.hadoop.hive.ql.plan.tableDesc;
+import org.apache.hadoop.hive.ql.plan.fileSinkDesc;
+import org.apache.hadoop.hive.conf.HiveConf;
+
+/**
+ * Operator factory for MapJoin processing
+ */
+public class MapJoinFactory {
+
+ public static int getPositionParent(MapJoinOperator op, Stack<Node> stack) {
+ int pos = 0;
+ int size = stack.size();
+ assert size >= 2 && stack.get(size - 1) == op;
+ Operator<? extends Serializable> parent = (Operator<? extends Serializable>)stack.get(size - 2);
+ List<Operator<? extends Serializable>> parOp = op.getParentOperators();
+ pos = parOp.indexOf(parent);
+ assert pos < parOp.size();
+ return pos;
+ }
+
+ /**
+ * TableScan followed by MapJoin
+ */
+ public static class TableScanMapJoin implements NodeProcessor {
+
+ @Override
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+ Object... nodeOutputs) throws SemanticException {
+ MapJoinOperator mapJoin = (MapJoinOperator)nd;
+ GenMRProcContext ctx = (GenMRProcContext)procCtx;
+
+ // find the branch on which this processor was invoked
+ int pos = getPositionParent(mapJoin, stack);
+
+ Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = ctx.getMapCurrCtx();
+ GenMapRedCtx mapredCtx = mapCurrCtx.get(mapJoin.getParentOperators().get(pos));
+ Task<? extends Serializable> currTask = mapredCtx.getCurrTask();
+ mapredWork currPlan = (mapredWork) currTask.getWork();
+ Operator<? extends Serializable> currTopOp = mapredCtx.getCurrTopOp();
+ String currAliasId = mapredCtx.getCurrAliasId();
+ Operator<? extends Serializable> reducer = mapJoin;
+ HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap = ctx.getOpTaskMap();
+ Task<? extends Serializable> opMapTask = opTaskMap.get(reducer);
+
+ ctx.setCurrTopOp(currTopOp);
+ ctx.setCurrAliasId(currAliasId);
+ ctx.setCurrTask(currTask);
+
+ // If the plan for this reducer does not exist, initialize the plan
+ if (opMapTask == null) {
+ assert currPlan.getReducer() == null;
+ GenMapRedUtils.initMapJoinPlan(mapJoin, ctx, false, false, false, pos);
+ }
+ // The current plan can be thrown away after being merged with the original plan
+ else {
+ GenMapRedUtils.joinPlan(mapJoin, null, opMapTask, ctx, pos, false, false, false);
+ currTask = opMapTask;
+ ctx.setCurrTask(currTask);
+ }
+
+ mapCurrCtx.put(mapJoin, new GenMapRedCtx(ctx.getCurrTask(), ctx.getCurrTopOp(), ctx.getCurrAliasId()));
+ return null;
+ }
+ }
+
+ /**
+ * ReduceSink followed by MapJoin
+ */
+ public static class ReduceSinkMapJoin implements NodeProcessor {
+
+ @Override
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+ Object... nodeOutputs) throws SemanticException {
+ MapJoinOperator mapJoin = (MapJoinOperator)nd;
+ GenMRProcContext opProcCtx = (GenMRProcContext)procCtx;
+
+ mapredWork cplan = GenMapRedUtils.getMapRedWork();
+ ParseContext parseCtx = opProcCtx.getParseCtx();
+ Task<? extends Serializable> redTask = TaskFactory.get(cplan, parseCtx.getConf());
+ Task<? extends Serializable> currTask = opProcCtx.getCurrTask();
+
+ // find the branch on which this processor was invoked
+ int pos = getPositionParent(mapJoin, stack);
+ boolean local = (pos == ((mapJoinDesc)mapJoin.getConf()).getPosBigTable()) ? false : true;
+
+ GenMapRedUtils.splitTasks(mapJoin, currTask, redTask, opProcCtx, false, local, pos);
+
+ currTask = opProcCtx.getCurrTask();
+ HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap = opProcCtx.getOpTaskMap();
+ Task<? extends Serializable> opMapTask = opTaskMap.get(mapJoin);
+
+ // If the plan for this reducer does not exist, initialize the plan
+ if (opMapTask == null) {
+ assert cplan.getReducer() == null;
+ opTaskMap.put(mapJoin, currTask);
+ opProcCtx.setCurrMapJoinOp(null);
+ }
+ // The current plan can be thrown away after being merged with the original plan
+ else {
+ GenMapRedUtils.joinPlan(mapJoin, currTask, opMapTask, opProcCtx, pos, false, false, false);
+ currTask = opMapTask;
+ opProcCtx.setCurrTask(currTask);
+ }
+
+ return null;
+ }
+ }
+
+ /**
+ * MapJoin followed by Select
+ */
+ public static class MapJoin implements NodeProcessor {
+
+ /**
+ * Create a task by splitting the plan below the join. The reason we have to do this while
+ * processing the Select and not the MapJoin is the walker. While processing a node, it is not safe
+ * to alter its children because that will decide the course of the walk. It is perfectly fine to muck around
+ * with its parents though, since those nodes have already been visited.
+ */
+ @Override
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+ Object... nodeOutputs) throws SemanticException {
+
+ SelectOperator sel = (SelectOperator)nd;
+ MapJoinOperator mapJoin = (MapJoinOperator)sel.getParentOperators().get(0);
+ assert sel.getParentOperators().size() == 1;
+
+ GenMRProcContext ctx = (GenMRProcContext)procCtx;
+ ParseContext parseCtx = ctx.getParseCtx();
+ ctx.setCurrMapJoinOp(mapJoin);
+
+ Task<? extends Serializable> currTask = ctx.getCurrTask();
+ GenMRMapJoinCtx mjCtx = ctx.getMapJoinCtx(mapJoin);
+ if (mjCtx == null) {
+ mjCtx = new GenMRMapJoinCtx();
+ ctx.setMapJoinCtx(mapJoin, mjCtx);
+ }
+
+ mapredWork mjPlan = GenMapRedUtils.getMapRedWork();
+ Task<? extends Serializable> mjTask = TaskFactory.get(mjPlan, parseCtx.getConf());
+
+ tableDesc tt_desc =
+ PlanUtils.getLazySimpleSerDeTableDesc(PlanUtils.getFieldSchemasFromRowSchema(mapJoin.getSchema(), "temporarycol"));
+
+ // generate the temporary file
+ Context baseCtx = parseCtx.getContext();
+ String taskTmpDir = baseCtx.getMRTmpFileURI();
+
+ // Remember the scratch directory, descriptor and root operator in the map-join context
+ mjCtx.setTaskTmpDir(taskTmpDir);
+ mjCtx.setTTDesc(tt_desc);
+ mjCtx.setRootMapJoinOp(sel);
+
+ sel.setParentOperators(null);
+
+ // Create a file sink operator for this file name
+ Operator<? extends Serializable> fs_op =
+ OperatorFactory.get
+ (new fileSinkDesc(taskTmpDir, tt_desc,
+ parseCtx.getConf().getBoolVar(HiveConf.ConfVars.COMPRESSINTERMEDIATE)),
+ mapJoin.getSchema());
+
+ assert mapJoin.getChildOperators().size() == 1;
+ mapJoin.getChildOperators().set(0, fs_op);
+
+ List<Operator<? extends Serializable>> parentOpList = new ArrayList<Operator<? extends Serializable>>();
+ parentOpList.add(mapJoin);
+ fs_op.setParentOperators(parentOpList);
+
+ currTask.addDependentTask(mjTask);
+
+ ctx.setCurrTask(mjTask);
+ ctx.setCurrAliasId(null);
+ ctx.setCurrTopOp(null);
+
+ Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = ctx.getMapCurrCtx();
+ mapCurrCtx.put((Operator<? extends Serializable>)nd, new GenMapRedCtx(ctx.getCurrTask(), null, null));
+
+ return null;
+ }
+ }
+
+ /**
+ * MapJoin followed by MapJoin
+ */
+ public static class MapJoinMapJoin implements NodeProcessor {
+
+ @Override
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+ Object... nodeOutputs) throws SemanticException {
+ MapJoinOperator mapJoin = (MapJoinOperator)nd;
+ GenMRProcContext ctx = (GenMRProcContext)procCtx;
+
+ ParseContext parseCtx = ctx.getParseCtx();
+ MapJoinOperator oldMapJoin = ctx.getCurrMapJoinOp();
+ assert oldMapJoin != null;
+ GenMRMapJoinCtx mjCtx = ctx.getMapJoinCtx(mapJoin);
+ if (mjCtx != null)
+ mjCtx.setOldMapJoin(oldMapJoin);
+ else
+ ctx.setMapJoinCtx(mapJoin, new GenMRMapJoinCtx(null, null, null, oldMapJoin));
+ ctx.setCurrMapJoinOp(mapJoin);
+
+ // find the branch on which this processor was invoked
+ int pos = getPositionParent(mapJoin, stack);
+
+ Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = ctx.getMapCurrCtx();
+ GenMapRedCtx mapredCtx = mapCurrCtx.get(mapJoin.getParentOperators().get(pos));
+ Task<? extends Serializable> currTask = mapredCtx.getCurrTask();
+ mapredWork currPlan = (mapredWork) currTask.getWork();
+ String currAliasId = mapredCtx.getCurrAliasId();
+ Operator<? extends Serializable> reducer = mapJoin;
+ HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap = ctx.getOpTaskMap();
+ Task<? extends Serializable> opMapTask = opTaskMap.get(reducer);
+
+ ctx.setCurrTask(currTask);
+
+ // If the plan for this reducer does not exist, initialize the plan
+ if (opMapTask == null) {
+ assert currPlan.getReducer() == null;
+ GenMapRedUtils.initMapJoinPlan(mapJoin, ctx, true, false, false, pos);
+ }
+ // The current plan can be thrown away after being merged with the original plan
+ else {
+ GenMapRedUtils.joinPlan(mapJoin, currTask, opMapTask, ctx, pos, false, true, false);
+ currTask = opMapTask;
+ ctx.setCurrTask(currTask);
+ }
+
+ mapCurrCtx.put(mapJoin, new GenMapRedCtx(ctx.getCurrTask(), null, null));
+ return null;
+ }
+ }
+
+ /**
+ * Union followed by MapJoin
+ */
+ public static class UnionMapJoin implements NodeProcessor {
+
+ @Override
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+ Object... nodeOutputs) throws SemanticException {
+ GenMRProcContext ctx = (GenMRProcContext)procCtx;
+
+ ParseContext parseCtx = ctx.getParseCtx();
+ UnionProcContext uCtx = parseCtx.getUCtx();
+
+ // union was map only - no special processing needed
+ if (uCtx.isMapOnlySubq())
+ return (new TableScanMapJoin()).process(nd, stack, procCtx, nodeOutputs);
+
+ UnionOperator currUnion = ctx.getCurrUnionOp();
+ assert currUnion != null;
+ GenMRUnionCtx unionCtx = ctx.getUnionTask(currUnion);
+ MapJoinOperator mapJoin = (MapJoinOperator)nd;
+
+ // find the branch on which this processor was invoked
+ int pos = getPositionParent(mapJoin, stack);
+
+ Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = ctx.getMapCurrCtx();
+ GenMapRedCtx mapredCtx = mapCurrCtx.get(mapJoin.getParentOperators().get(pos));
+ Task<? extends Serializable> currTask = mapredCtx.getCurrTask();
+ mapredWork currPlan = (mapredWork) currTask.getWork();
+ Operator<? extends Serializable> reducer = mapJoin;
+ HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap = ctx.getOpTaskMap();
+ Task<? extends Serializable> opMapTask = opTaskMap.get(reducer);
+
+ // union result cannot be a map table
+ boolean local = (pos == ((mapJoinDesc)mapJoin.getConf()).getPosBigTable()) ? false : true;
+ if (local)
+ throw new SemanticException(ErrorMsg.INVALID_MAPJOIN_TABLE.getMsg());
+
+ // If the plan for this reducer does not exist, initialize the plan
+ if (opMapTask == null) {
+ assert currPlan.getReducer() == null;
+ ctx.setCurrMapJoinOp(mapJoin);
+ GenMapRedUtils.initMapJoinPlan(mapJoin, ctx, true, true, false, pos);
+ ctx.setCurrUnionOp(null);
+ }
+ // The current plan can be thrown away after being merged with the original plan
+ else {
+ Task<? extends Serializable> uTask = ctx.getUnionTask(ctx.getCurrUnionOp()).getUTask();
+ if (uTask.getId().equals(opMapTask.getId()))
+ GenMapRedUtils.joinPlan(mapJoin, null, opMapTask, ctx, pos, false, false, true);
+ else
+ GenMapRedUtils.joinPlan(mapJoin, uTask, opMapTask, ctx, pos, false, false, true);
+ currTask = opMapTask;
+ ctx.setCurrTask(currTask);
+ }
+
+ mapCurrCtx.put(mapJoin, new GenMapRedCtx(ctx.getCurrTask(), ctx.getCurrTopOp(), ctx.getCurrAliasId()));
+ return null;
+ }
+ }
+
+ public static NodeProcessor getTableScanMapJoin() {
+ return new TableScanMapJoin();
+ }
+
+ public static NodeProcessor getUnionMapJoin() {
+ return new UnionMapJoin();
+ }
+
+ public static NodeProcessor getReduceSinkMapJoin() {
+ return new ReduceSinkMapJoin();
+ }
+
+ public static NodeProcessor getMapJoin() {
+ return new MapJoin();
+ }
+
+ public static NodeProcessor getMapJoinMapJoin() {
+ return new MapJoinMapJoin();
+ }
+}
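The getters above are the hooks by which these processors get bound to operator-tree patterns when the map-reduce task-generation walker is assembled (that wiring lives in SemanticAnalyzer.genMapRedTasks, which is part of this commit but not shown in this excerpt). A minimal sketch of such a registration is below; the rule names, the pattern strings and the helper method itself are illustrative assumptions, not the exact code the commit adds:

    // Illustrative sketch only: binds the map-join processors (plus GenMRRedSink4 above)
    // to operator-tree patterns for the rule-based walker. Assumes the usual imports from
    // org.apache.hadoop.hive.ql.lib, ql.parse and ql.optimizer; the pattern strings are guesses.
    static void wireMapJoinRules(GenMRProcContext procCtx, java.util.Collection<Node> topNodes)
        throws SemanticException {
      java.util.Map<Rule, NodeProcessor> opRules = new java.util.LinkedHashMap<Rule, NodeProcessor>();
      opRules.put(new RuleRegExp("R1", "TS%.*MAPJOIN%"),      MapJoinFactory.getTableScanMapJoin());
      opRules.put(new RuleRegExp("R2", "RS%.*MAPJOIN%"),      MapJoinFactory.getReduceSinkMapJoin());
      opRules.put(new RuleRegExp("R3", "UNION%.*MAPJOIN%"),   MapJoinFactory.getUnionMapJoin());
      opRules.put(new RuleRegExp("R4", "MAPJOIN%.*MAPJOIN%"), MapJoinFactory.getMapJoinMapJoin());
      opRules.put(new RuleRegExp("R5", "MAPJOIN%SEL%"),       MapJoinFactory.getMapJoin());
      opRules.put(new RuleRegExp("R6", "MAPJOIN%.*RS%"),      new GenMRRedSink4());

      // operators matching no rule fall through to the default processor
      Dispatcher disp = new DefaultRuleDispatcher(new GenMROperator(), opRules, procCtx);
      GraphWalker walker = new GenMapRedWalker(disp);
      walker.startWalking(new java.util.ArrayList<Node>(topNodes), null);
    }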
Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java?rev=781633&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java Thu Jun 4 01:21:30 2009
@@ -0,0 +1,339 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Vector;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.ql.exec.JoinOperator;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorFactory;
+import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
+import org.apache.hadoop.hive.ql.exec.RowSchema;
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
+import org.apache.hadoop.hive.ql.parse.ASTNode;
+import org.apache.hadoop.hive.ql.parse.ErrorMsg;
+import org.apache.hadoop.hive.ql.parse.OpParseContext;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.QBJoinTree;
+import org.apache.hadoop.hive.ql.parse.RowResolver;
+import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.TypeCheckProcFactory;
+import org.apache.hadoop.hive.ql.parse.joinCond;
+import org.apache.hadoop.hive.ql.plan.PlanUtils;
+import org.apache.hadoop.hive.ql.plan.exprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.exprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.mapJoinDesc;
+import org.apache.hadoop.hive.ql.plan.selectDesc;
+import org.apache.hadoop.hive.ql.plan.tableDesc;
+import org.apache.hadoop.hive.ql.plan.joinDesc;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+
+/**
+ * Implementation of one of the rule-based map join optimizations. The user passes hints to specify map-joins and, during this optimization,
+ * all user-specified map joins are converted to MapJoins - the reduce sink operators above the join are converted to map sink operators.
+ * In the future, once statistics are implemented, this transformation can also be done based on costs.
+ */
+public class MapJoinProcessor implements Transform {
+ private ParseContext pGraphContext;
+
+ /**
+ * empty constructor
+ */
+ public MapJoinProcessor() {
+ pGraphContext = null;
+ }
+
+ @SuppressWarnings("nls")
+ private Operator<? extends Serializable> putOpInsertMap(Operator<? extends Serializable> op, RowResolver rr) {
+ OpParseContext ctx = new OpParseContext(rr);
+ pGraphContext.getOpParseCtx().put(op, ctx);
+ return op;
+ }
+
+ /**
+ * Convert a regular join to a map-side join.
+ * @param op join operator
+ * @param joinTree qb join tree
+ * @param mapJoinPos position of the source to be read as part of map-reduce framework. All other sources are cached in memory
+ */
+ private void convertMapJoin(ParseContext pctx, JoinOperator op, QBJoinTree joinTree, int mapJoinPos) throws SemanticException {
+ // outer join cannot be performed on a table which is being cached
+ joinDesc desc = op.getConf();
+ org.apache.hadoop.hive.ql.plan.joinCond[] condns = desc.getConds();
+ for (org.apache.hadoop.hive.ql.plan.joinCond condn : condns) {
+ if (condn.getType() == joinDesc.FULL_OUTER_JOIN)
+ throw new SemanticException(ErrorMsg.NO_OUTER_MAPJOIN.getMsg());
+ if ((condn.getType() == joinDesc.LEFT_OUTER_JOIN) && (condn.getLeft() != mapJoinPos))
+ throw new SemanticException(ErrorMsg.NO_OUTER_MAPJOIN.getMsg());
+ if ((condn.getType() == joinDesc.RIGHT_OUTER_JOIN) && (condn.getRight() != mapJoinPos))
+ throw new SemanticException(ErrorMsg.NO_OUTER_MAPJOIN.getMsg());
+ }
+
+ RowResolver outputRS = new RowResolver();
+ Map<Byte, List<exprNodeDesc>> keyExprMap = new HashMap<Byte, List<exprNodeDesc>>();
+ Map<Byte, List<exprNodeDesc>> valueExprMap = new HashMap<Byte, List<exprNodeDesc>>();
+
+ // Walk over all the sources (which are guaranteed to be reduce sink operators).
+ // The join outputs a concatenation of all the inputs.
+ QBJoinTree leftSrc = joinTree.getJoinSrc();
+
+ List<Operator<? extends Serializable>> parentOps = op.getParentOperators();
+ List<Operator<? extends Serializable>> newParentOps = new ArrayList<Operator<? extends Serializable>>();
+
+ // found a source which is not to be stored in memory
+ if (leftSrc != null) {
+ // assert mapJoinPos == 0;
+ Operator<? extends Serializable> parentOp = parentOps.get(0);
+ assert parentOp.getParentOperators().size() == 1;
+ Operator<? extends Serializable> grandParentOp = parentOp.getParentOperators().get(0);
+
+ grandParentOp.removeChild(parentOp);
+ newParentOps.add(grandParentOp);
+ }
+
+ int pos = 0;
+ // Remove parent reduce-sink operators
+ for (String src : joinTree.getBaseSrc()) {
+ if (src != null) {
+ Operator<? extends Serializable> parentOp = parentOps.get(pos);
+ assert parentOp.getParentOperators().size() == 1;
+ Operator<? extends Serializable> grandParentOp = parentOp.getParentOperators().get(0);
+
+ grandParentOp.removeChild(parentOp);
+
+ newParentOps.add(grandParentOp);
+ }
+ pos++;
+ }
+
+ int keyLength = 0;
+ int outputPos = 0;
+
+ // create the map-join operator
+ for (pos = 0; pos < newParentOps.size(); pos++) {
+ RowResolver inputRS = pGraphContext.getOpParseCtx().get(newParentOps.get(pos)).getRR();
+
+ List<exprNodeDesc> keys = new ArrayList<exprNodeDesc>();
+ List<exprNodeDesc> values = new ArrayList<exprNodeDesc>();
+
+ // Compute the join keys for this input
+ Vector<ASTNode> exprs = joinTree.getExpressions().get(pos);
+ for (int i = 0; i < exprs.size(); i++) {
+ ASTNode expr = exprs.get(i);
+ keys.add(SemanticAnalyzer.genExprNodeDesc(expr, inputRS));
+ }
+
+ if (pos == 0)
+ keyLength = keys.size();
+ else
+ assert (keyLength == keys.size());
+
+ keyExprMap.put(new Byte((byte)pos), keys);
+
+ Iterator<String> keysIter = inputRS.getTableNames().iterator();
+ while (keysIter.hasNext())
+ {
+ String key = keysIter.next();
+ HashMap<String, ColumnInfo> rrMap = inputRS.getFieldMap(key);
+ Iterator<String> fNamesIter = rrMap.keySet().iterator();
+ while (fNamesIter.hasNext())
+ {
+ String field = fNamesIter.next();
+ ColumnInfo valueInfo = inputRS.get(key, field);
+ values.add(new exprNodeColumnDesc(valueInfo.getType(), valueInfo.getInternalName()));
+ if (outputRS.get(key, field) == null)
+ outputRS.put(key, field, new ColumnInfo((Integer.valueOf(outputPos++)).toString(),
+ valueInfo.getType()));
+ }
+ }
+
+ valueExprMap.put(new Byte((byte)pos), values);
+ }
+
+ // implicit type conversion hierarchy
+ for (int k = 0; k < keyLength; k++) {
+ // Find the common class for type conversion
+ TypeInfo commonType = keyExprMap.get(new Byte((byte)0)).get(k).getTypeInfo();
+ for (int i=1; i < newParentOps.size(); i++) {
+ TypeInfo a = commonType;
+ TypeInfo b = keyExprMap.get(new Byte((byte)i)).get(k).getTypeInfo();
+ commonType = FunctionRegistry.getCommonClass(a, b);
+ if (commonType == null) {
+ throw new SemanticException("Cannot do equality join on different types: " + a.getTypeName() + " and " + b.getTypeName());
+ }
+ }
+
+ // Add implicit type conversion if necessary
+ for (int i=0; i < newParentOps.size(); i++) {
+ if (!commonType.equals(keyExprMap.get(new Byte((byte)i)).get(k).getTypeInfo())) {
+ keyExprMap.get(new Byte((byte)i)).set(k, TypeCheckProcFactory.DefaultExprProcessor.getFuncExprNodeDesc(commonType.getTypeName(), keyExprMap.get(new Byte((byte)i)).get(k)));
+ }
+ }
+ }
+
+ org.apache.hadoop.hive.ql.plan.joinCond[] joinCondns = new org.apache.hadoop.hive.ql.plan.joinCond[joinTree.getJoinCond().length];
+ for (int i = 0; i < joinTree.getJoinCond().length; i++) {
+ joinCond condn = joinTree.getJoinCond()[i];
+ joinCondns[i] = new org.apache.hadoop.hive.ql.plan.joinCond(condn);
+ }
+
+ Operator[] newPar = new Operator[newParentOps.size()];
+ pos = 0;
+ for (Operator<? extends Serializable> o : newParentOps)
+ newPar[pos++] = o;
+
+ List<exprNodeDesc> keyCols = keyExprMap.get(new Byte((byte)0));
+ StringBuilder keyOrder = new StringBuilder();
+ for (int i=0; i < keyCols.size(); i++) {
+ keyOrder.append("+");
+ }
+
+ tableDesc keyTableDesc =
+ PlanUtils.getLazySimpleSerDeTableDesc(PlanUtils.getFieldSchemasFromColumnList(keyCols, "mapjoinkey"));
+
+ List<tableDesc> valueTableDescs = new ArrayList<tableDesc>();
+
+ for (pos = 0; pos < newParentOps.size(); pos++) {
+ List<exprNodeDesc> valueCols = valueExprMap.get(new Byte((byte)pos));
+ keyOrder = new StringBuilder();
+ for (int i=0; i < valueCols.size(); i++) {
+ keyOrder.append("+");
+ }
+
+ tableDesc valueTableDesc =
+ PlanUtils.getLazySimpleSerDeTableDesc(PlanUtils.getFieldSchemasFromColumnList(valueCols, "mapjoinvalue"));
+
+ valueTableDescs.add(valueTableDesc);
+ }
+
+ MapJoinOperator mapJoinOp = (MapJoinOperator)putOpInsertMap(OperatorFactory.getAndMakeChild(
+ new mapJoinDesc(keyExprMap, keyTableDesc, valueExprMap, valueTableDescs, mapJoinPos, joinCondns),
+ new RowSchema(outputRS.getColumnInfos()), newPar), outputRS);
+
+ // change the children of the original join operator to point to the map join operator
+ List<Operator<? extends Serializable>> childOps = op.getChildOperators();
+ for (Operator<? extends Serializable> childOp : childOps)
+ childOp.replaceParent(op, mapJoinOp);
+
+ // TODO: do as part of replaceParent
+ mapJoinOp.setChildOperators(childOps);
+ mapJoinOp.setParentOperators(newParentOps);
+ op.setChildOperators(null);
+ op.setParentOperators(null);
+
+ // create a dummy select to select all columns
+ genSelectPlan(pctx, mapJoinOp);
+ }
+
+ private void genSelectPlan(ParseContext pctx, Operator<? extends Serializable> input) {
+ List<Operator<? extends Serializable>> childOps = input.getChildOperators();
+ input.setChildOperators(null);
+
+ // create a dummy select - This select is needed by the walker to split the mapJoin later on
+ RowResolver inputRR = pctx.getOpParseCtx().get(input).getRR();
+ SelectOperator sel =
+ (SelectOperator)putOpInsertMap(OperatorFactory.getAndMakeChild(
+ new selectDesc(true), new RowSchema(inputRR.getColumnInfos()), input), inputRR);
+
+ // Insert the select operator in between.
+ sel.setChildOperators(childOps);
+ for (Operator<? extends Serializable> ch: childOps) {
+ ch.replaceParent(input, sel);
+ }
+ }
+
+ /**
+ * Can this join be converted to a map-side join?
+ * @param op join operator
+ * @param joinTree qb join tree
+ * @return -1 if it cannot be converted to a map-side join, position of the map join node otherwise
+ */
+ private int mapSideJoin(JoinOperator op, QBJoinTree joinTree) throws SemanticException {
+ int mapJoinPos = -1;
+ if (joinTree.isMapSideJoin()) {
+ int pos = 0;
+ // In a map-side join, exactly one table is not present in memory.
+ // The client provides the list of tables which can be cached in memory via a hint.
+ if (joinTree.getJoinSrc() != null)
+ mapJoinPos = pos;
+ for (String src : joinTree.getBaseSrc()) {
+ if (src != null) {
+ if (!joinTree.getMapAliases().contains(src)) {
+ if (mapJoinPos >= 0)
+ return -1;
+ mapJoinPos = pos;
+ }
+ }
+ pos++;
+ }
+
+ // All tables are to be cached - this is not possible. In the future, we could support this by randomly
+ // leaving one table out of the list of tables to be cached
+ if (mapJoinPos == -1)
+ throw new SemanticException(ErrorMsg.INVALID_MAPJOIN_HINT.getMsg(pGraphContext.getQB().getParseInfo().getHints()));
+ }
+
+ return mapJoinPos;
+ }
+
+ /**
+ * Transform the query tree. For each join, check if it is a map-side join (user specified). If yes,
+ * convert it to a map-side join.
+ * @param pactx current parse context
+ */
+ public ParseContext transform(ParseContext pactx) throws SemanticException {
+ this.pGraphContext = pactx;
+
+ // traverse all the joins and convert them if necessary
+ if (pGraphContext.getJoinContext() != null) {
+ Map<JoinOperator, QBJoinTree> joinMap = new HashMap<JoinOperator, QBJoinTree>();
+
+ Set<Map.Entry<JoinOperator, QBJoinTree>> joinCtx = pGraphContext.getJoinContext().entrySet();
+ Iterator<Map.Entry<JoinOperator, QBJoinTree>> joinCtxIter = joinCtx.iterator();
+ while (joinCtxIter.hasNext()) {
+ Map.Entry<JoinOperator, QBJoinTree> joinEntry = joinCtxIter.next();
+ JoinOperator joinOp = joinEntry.getKey();
+ QBJoinTree qbJoin = joinEntry.getValue();
+ int mapJoinPos = mapSideJoin(joinOp, qbJoin);
+ if (mapJoinPos >= 0) {
+ convertMapJoin(pactx, joinOp, qbJoin, mapJoinPos);
+ }
+ else {
+ joinMap.put(joinOp, qbJoin);
+ }
+ }
+
+ // store the new joinContext
+ pGraphContext.setJoinContext(joinMap);
+ }
+
+ return pGraphContext;
+ }
+}
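For a hinted two-way join, the net effect of convertMapJoin plus genSelectPlan is the following operator-tree rewrite (schematic; the aliases a and b are made up, with b being the table named in the MAPJOIN hint):

    before:   TS(a) -> RS \
                           JOIN -> children
              TS(b) -> RS /

    after:    TS(a) \
                     MAPJOIN -> SEL(*) -> children
              TS(b) /

The parent ReduceSinkOperators are detached and their grandparents become the MapJoin's direct parents; mapSideJoin picks the one table that is streamed through the map-reduce framework (here a) while the hinted tables are cached in memory; and the dummy SelectOperator exists only so that the task-generation walker (MapJoinFactory.MapJoin above) has a safe place to split the plan below the join.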
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java?rev=781633&r1=781632&r2=781633&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java Thu Jun 4 01:21:30 2009
@@ -50,6 +50,7 @@
if (hiveConf.getBoolean("hive.optimize.ppd", false))
transformations.add(new PredicatePushDown());
transformations.add(new UnionProcessor());
+ transformations.add(new MapJoinProcessor());
}
/**
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java?rev=781633&r1=781632&r2=781633&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java Thu Jun 4 01:21:30 2009
@@ -507,7 +507,7 @@
prop.setProperty("columns.types", colTypes[1]);
fetchWork fetch = new fetchWork(
- ctx.getResFile(),
+ ctx.getResFile().toString(),
new tableDesc(LazySimpleSerDe.class, TextInputFormat.class, IgnoreKeyTextOutputFormat.class, prop),
-1
);
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ErrorMsg.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ErrorMsg.java?rev=781633&r1=781632&r2=781633&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ErrorMsg.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ErrorMsg.java Thu Jun 4 01:21:30 2009
@@ -78,6 +78,9 @@
INVALID_INPUT_FORMAT_TYPE("Input Format must implement InputFormat"),
INVALID_OUTPUT_FORMAT_TYPE("Output Format must implement HiveOutputFormat, otherwise it should be either IgnoreKeyTextOutputFormat or SequenceFileOutputFormat"),
NO_VALID_PARTN("The query does not reference any valid partition. To run this query, set hive.mapred.mode=nonstrict"),
+ NO_OUTER_MAPJOIN("Map Join cannot be performed with Outer join"),
+ INVALID_MAPJOIN_HINT("neither table specified as map-table"),
+ INVALID_MAPJOIN_TABLE("result of a union cannot be a map table"),
NON_BUCKETED_TABLE("Sampling Expression Needed for Non-Bucketed Table");
private String mesg;
@@ -110,7 +113,7 @@
return getText((ASTNode)tree.getChild(tree.getChildCount() - 1));
}
- String getMsg(ASTNode tree) {
+ public String getMsg(ASTNode tree) {
return "line " + getLine(tree) + ":" + getCharPositionInLine(tree) + " " + mesg + " " + getText(tree);
}
@@ -126,7 +129,7 @@
return getMsg((ASTNode)tree, reason);
}
- String getMsg() {
+ public String getMsg() {
return mesg;
}
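getMsg() and getMsg(ASTNode) are widened from package-private to public so that classes outside the parse package, such as the MapJoinProcessor above, can build SemanticExceptions from the new error codes. A minimal usage sketch; the wrapper class is illustrative only:

    import org.apache.hadoop.hive.ql.parse.ASTNode;
    import org.apache.hadoop.hive.ql.parse.ErrorMsg;
    import org.apache.hadoop.hive.ql.parse.SemanticException;

    class MapJoinHintCheck {
      // Raise the plain message, e.g. for an outer join that was hinted as a map join.
      static void rejectOuterMapJoin() throws SemanticException {
        throw new SemanticException(ErrorMsg.NO_OUTER_MAPJOIN.getMsg());
      }

      // getMsg(ASTNode) prefixes the message with the line/column of the offending node.
      static void rejectHint(ASTNode hints) throws SemanticException {
        throw new SemanticException(ErrorMsg.INVALID_MAPJOIN_HINT.getMsg(hints));
      }
    }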
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/Hive.g
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/Hive.g?rev=781633&r1=781632&r2=781633&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/Hive.g (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/Hive.g Thu Jun 4 01:21:30 2009
@@ -125,6 +125,10 @@
TOK_LIMIT;
TOK_TABLEPROPERTY;
TOK_IFNOTEXISTS;
+TOK_HINTLIST;
+TOK_HINT;
+TOK_MAPJOIN;
+TOK_HINTARGLIST;
}
@@ -624,7 +628,49 @@
@init { msgs.push("select list"); }
@after { msgs.pop(); }
:
- selectItem ( COMMA selectItem )* -> selectItem+
+ hintClause? selectItem ( COMMA selectItem )* -> hintClause? selectItem+
+ ;
+
+hintClause
+@init { msgs.push("hint clause"); }
+@after { msgs.pop(); }
+ :
+ DIVIDE STAR PLUS hintList STAR DIVIDE -> ^(TOK_HINTLIST hintList)
+ ;
+
+hintList
+@init { msgs.push("hint list"); }
+@after { msgs.pop(); }
+ :
+ hintItem (COMMA hintItem)* -> hintItem+
+ ;
+
+hintItem
+@init { msgs.push("hint item"); }
+@after { msgs.pop(); }
+ :
+ hintName (LPAREN hintArgs RPAREN)? -> ^(TOK_HINT hintName hintArgs)
+ ;
+
+hintName
+@init { msgs.push("hint name"); }
+@after { msgs.pop(); }
+ :
+ KW_MAPJOIN -> TOK_MAPJOIN
+ ;
+
+hintArgs
+@init { msgs.push("hint arguments"); }
+@after { msgs.pop(); }
+ :
+ hintArgName (COMMA hintArgName)* -> ^(TOK_HINTARGLIST hintArgName+)
+ ;
+
+hintArgName
+@init { msgs.push("hint argument name"); }
+@after { msgs.pop(); }
+ :
+ Identifier
;
selectItem
@@ -1178,6 +1224,7 @@
KW_THEN: 'THEN';
KW_ELSE: 'ELSE';
KW_END: 'END';
+KW_MAPJOIN: 'MAPJOIN';
// Operators
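With these grammar additions a query such as SELECT /*+ MAPJOIN(b) */ a.key, b.value FROM a JOIN b ON (a.key = b.key) is accepted, and the hint becomes a TOK_HINTLIST child of the select clause that doPhase1 stores via QBParseInfo.setHints (see the SemanticAnalyzer changes below). A small sketch of pushing such a query through the parser; ParseDriver and its parse(String) entry point are assumed from the existing parse package, not introduced by this commit:

    import org.apache.hadoop.hive.ql.parse.ASTNode;
    import org.apache.hadoop.hive.ql.parse.ParseDriver;

    class HintParseExample {
      public static void main(String[] args) throws Exception {
        String query = "SELECT /*+ MAPJOIN(b) */ a.key, b.value "
                     + "FROM a JOIN b ON (a.key = b.key)";
        ASTNode tree = new ParseDriver().parse(query);
        // With the new rules, the select list's first child is the hint list
        // rather than the first select item.
        System.out.println(tree.toStringTree());
      }
    }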
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java?rev=781633&r1=781632&r2=781633&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java Thu Jun 4 01:21:30 2009
@@ -21,7 +21,9 @@
import java.io.Serializable;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.hive.ql.exec.JoinOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.plan.loadFileDesc;
import org.apache.hadoop.hive.ql.plan.loadTableDesc;
@@ -47,6 +49,7 @@
private HashMap<String, Operator<? extends Serializable>> topOps;
private HashMap<String, Operator<? extends Serializable>> topSelOps;
private HashMap<Operator<? extends Serializable>, OpParseContext> opParseCtx;
+ private Map<JoinOperator, QBJoinTree> joinContext;
private List<loadTableDesc> loadTableWork;
private List<loadFileDesc> loadFileWork;
private Context ctx;
@@ -82,6 +85,7 @@
HashMap<String, Operator<? extends Serializable>> topOps,
HashMap<String, Operator<? extends Serializable>> topSelOps,
HashMap<Operator<? extends Serializable>, OpParseContext> opParseCtx,
+ Map<JoinOperator, QBJoinTree> joinContext,
List<loadTableDesc> loadTableWork, List<loadFileDesc> loadFileWork,
Context ctx, HashMap<String, String> idToTableNameMap, int destTableId, UnionProcContext uCtx) {
this.conf = conf;
@@ -89,6 +93,7 @@
this.ast = ast;
this.aliasToPruner = aliasToPruner;
this.aliasToSamplePruner = aliasToSamplePruner;
+ this.joinContext = joinContext;
this.loadFileWork = loadFileWork;
this.loadTableWork = loadTableWork;
this.opParseCtx = opParseCtx;
@@ -292,4 +297,18 @@
this.uCtx = uCtx;
}
+ /**
+ * @return the joinContext
+ */
+ public Map<JoinOperator, QBJoinTree> getJoinContext() {
+ return joinContext;
+ }
+
+ /**
+ * @param joinContext the joinContext to set
+ */
+ public void setJoinContext(Map<JoinOperator, QBJoinTree> joinContext) {
+ this.joinContext = joinContext;
+ }
+
}
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/PartitionPruner.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/PartitionPruner.java?rev=781633&r1=781632&r2=781633&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/PartitionPruner.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/PartitionPruner.java Thu Jun 4 01:21:30 2009
@@ -416,13 +416,17 @@
// unknown partitions - may/may not satisfy the partition criteria
private Set<Partition> unknownPartns;
+ // denied partitions - do not satisfy the partition criteria
+ private Set<Partition> deniedPartns;
+
/**
* @param confirmedPartns confirmed partitions
* @param unknownPartns unknown partitions
*/
- public PrunedPartitionList(Set<Partition> confirmedPartns, Set<Partition> unknownPartns) {
+ public PrunedPartitionList(Set<Partition> confirmedPartns, Set<Partition> unknownPartns, Set<Partition> deniedPartns) {
this.confirmedPartns = confirmedPartns;
this.unknownPartns = unknownPartns;
+ this.deniedPartns = deniedPartns;
}
/**
@@ -442,6 +446,14 @@
}
/**
+ * get denied partitions
+ * @return deniedPartns denied partitions
+ */
+ public Set<Partition> getDeniedPartns() {
+ return deniedPartns;
+ }
+
+ /**
* set confirmed partitions
* @param confirmedPartns confirmed partitions
*/
@@ -470,6 +482,7 @@
LinkedHashSet<Partition> true_parts = new LinkedHashSet<Partition>();
LinkedHashSet<Partition> unkn_parts = new LinkedHashSet<Partition>();
+ LinkedHashSet<Partition> denied_parts = new LinkedHashSet<Partition>();
try {
StructObjectInspector rowObjectInspector = (StructObjectInspector)this.tab.getDeserializer().getObjectInspector();
@@ -505,6 +518,10 @@
Boolean r = (Boolean) ((PrimitiveObjectInspector)evaluateResultOI).getPrimitiveJavaObject(evaluateResultO);
LOG.trace("prune result for partition " + partSpec + ": " + r);
if (Boolean.FALSE.equals(r)) {
+ if (denied_parts.isEmpty()) {
+ Partition part = Hive.get().getPartition(tab, partSpec, Boolean.FALSE);
+ denied_parts.add(part);
+ }
LOG.trace("pruned partition: " + partSpec);
} else {
Partition part = Hive.get().getPartition(tab, partSpec, Boolean.FALSE);
@@ -529,7 +546,7 @@
}
// Now return the set of partitions
- return new PrunedPartitionList(true_parts, unkn_parts);
+ return new PrunedPartitionList(true_parts, unkn_parts, denied_parts);
}
public Table getTable() {
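prune() now also remembers one representative partition that failed the predicate, in the new denied set, alongside the confirmed and unknown sets. A short consumer sketch; the reporting class is illustrative, and the getter names for the two pre-existing sets are assumed to follow the same pattern as getDeniedPartns shown above:

    import java.util.Set;
    import org.apache.hadoop.hive.ql.metadata.Partition;
    import org.apache.hadoop.hive.ql.parse.PartitionPruner;

    class PrunedPartitionReport {
      static void report(PartitionPruner.PrunedPartitionList parts) {
        Set<Partition> confirmed = parts.getConfirmedPartns(); // satisfy the predicate
        Set<Partition> unknown = parts.getUnknownPartns();     // may or may not satisfy it
        Set<Partition> denied = parts.getDeniedPartns();       // definitely pruned out
        System.out.println(confirmed.size() + " confirmed, " + unknown.size()
            + " unknown, " + denied.size() + " denied");
      }
    }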
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/QBJoinTree.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/QBJoinTree.java?rev=781633&r1=781632&r2=781633&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/QBJoinTree.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/QBJoinTree.java Thu Jun 4 01:21:30 2009
@@ -19,6 +19,7 @@
package org.apache.hadoop.hive.ql.parse;
import java.util.Vector;
+import java.util.List;
/**
* Internal representation of the join tree
@@ -40,6 +41,10 @@
// filters
private Vector<Vector<ASTNode>> filters;
+
+ // user asked for map-side join
+ private boolean mapSideJoin;
+ private List<String> mapAliases;
/**
* constructor
@@ -140,6 +145,33 @@
this.filters = filters;
}
+ /**
+ * @return the mapSideJoin
+ */
+ public boolean isMapSideJoin() {
+ return mapSideJoin;
+ }
+
+ /**
+ * @param mapSideJoin the mapSideJoin to set
+ */
+ public void setMapSideJoin(boolean mapSideJoin) {
+ this.mapSideJoin = mapSideJoin;
+ }
+
+ /**
+ * @return the mapAliases
+ */
+ public List<String> getMapAliases() {
+ return mapAliases;
+ }
+
+ /**
+ * @param mapAliases the mapAliases to set
+ */
+ public void setMapAliases(List<String> mapAliases) {
+ this.mapAliases = mapAliases;
+ }
}
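The two new fields let genJoinTree (later in this commit) record which aliases the MAPJOIN hint asks to keep in memory. A minimal sketch of flagging a join tree; the helper is stand-alone and the alias list is passed in rather than read from the hint AST:

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.hive.ql.parse.QBJoinTree;

    class MapJoinFlagging {
      static void flagAsMapJoin(QBJoinTree joinTree, List<String> hintedAliases) {
        List<String> mapAliases = new ArrayList<String>(hintedAliases);
        joinTree.setMapAliases(mapAliases); // aliases to be cached in memory
        joinTree.setMapSideJoin(true);      // consulted later by MapJoinProcessor
      }
    }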
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java?rev=781633&r1=781632&r2=781633&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java Thu Jun 4 01:21:30 2009
@@ -34,6 +34,7 @@
private boolean isSubQ;
private String alias;
private ASTNode joinExpr;
+ private ASTNode hints;
private HashMap<String, ASTNode> aliasToSrc;
private HashMap<String, ASTNode> nameToDest;
private HashMap<String, TableSample> nameToSample;
@@ -331,5 +332,12 @@
return true;
}
-
+
+ public void setHints(ASTNode hint) {
+ this.hints = hint;
+ }
+
+ public ASTNode getHints() {
+ return hints;
+ }
}
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java?rev=781633&r1=781632&r2=781633&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java Thu Jun 4 01:21:30 2009
@@ -72,6 +72,7 @@
import org.apache.hadoop.hive.ql.metadata.InvalidTableException;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.optimizer.MapJoinFactory;
import org.apache.hadoop.hive.ql.optimizer.GenMRFileSink1;
import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
import org.apache.hadoop.hive.ql.optimizer.GenMROperator;
@@ -84,6 +85,7 @@
import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMapRedCtx;
import org.apache.hadoop.hive.ql.optimizer.unionproc.UnionProcContext;
import org.apache.hadoop.hive.ql.optimizer.GenMRRedSink3;
+import org.apache.hadoop.hive.ql.optimizer.GenMRRedSink4;
import org.apache.hadoop.hive.ql.plan.*;
import org.apache.hadoop.hive.ql.exec.*;
import org.apache.hadoop.hive.ql.plan.PlanUtils;
@@ -141,6 +143,7 @@
private HashMap<Operator<? extends Serializable>, OpParseContext> opParseCtx;
private List<loadTableDesc> loadTableWork;
private List<loadFileDesc> loadFileWork;
+ private Map<JoinOperator, QBJoinTree> joinContext;
private QB qb;
private ASTNode ast;
private int destTableId;
@@ -171,6 +174,7 @@
this.loadTableWork = new ArrayList<loadTableDesc>();
this.loadFileWork = new ArrayList<loadFileDesc>();
opParseCtx = new HashMap<Operator<? extends Serializable>, OpParseContext>();
+ joinContext = new HashMap<JoinOperator, QBJoinTree>();
this.destTableId = 1;
this.uCtx = null;
@@ -192,6 +196,9 @@
qb = null;
ast = null;
uCtx = null;
+ this.aliasToSamplePruner.clear();
+ this.joinContext.clear();
+ this.opParseCtx.clear();
}
public void init(ParseContext pctx) {
@@ -202,15 +209,17 @@
opParseCtx = pctx.getOpParseCtx();
loadTableWork = pctx.getLoadTableWork();
loadFileWork = pctx.getLoadFileWork();
+ joinContext = pctx.getJoinContext();
ctx = pctx.getContext();
destTableId = pctx.getDestTableId();
idToTableNameMap = pctx.getIdToTableNameMap();
this.uCtx = pctx.getUCtx();
+ qb = pctx.getQB();
}
public ParseContext getParseContext() {
return new ParseContext(conf, qb, ast, aliasToPruner, aliasToSamplePruner, topOps,
- topSelOps, opParseCtx, loadTableWork, loadFileWork, ctx, idToTableNameMap, destTableId, uCtx);
+ topSelOps, opParseCtx, joinContext, loadTableWork, loadFileWork, ctx, idToTableNameMap, destTableId, uCtx);
}
@SuppressWarnings("nls")
@@ -424,6 +433,10 @@
case HiveParser.TOK_SELECT:
qb.countSel();
qbp.setSelExprForClause(ctx_1.dest, ast);
+
+ if (((ASTNode)ast.getChild(0)).getToken().getType() == HiveParser.TOK_HINTLIST)
+ qbp.setHints((ASTNode)ast.getChild(0));
+
LinkedHashMap<String, ASTNode> aggregations = doPhase1GetAggregationsFromSelect(ast);
qbp.setAggregationExprsForClause(ctx_1.dest, aggregations);
qbp.setDistinctFuncExprForClause(ctx_1.dest,
@@ -1220,11 +1233,16 @@
RowResolver inputRR = opParseCtx.get(input).getRR();
// SELECT * or SELECT TRANSFORM(*)
boolean selectStar = false;
+ int posn = 0;
+ boolean hintPresent = (selExprList.getChild(0).getType() == HiveParser.TOK_HINTLIST);
+ if (hintPresent) {
+ posn++;
+ }
- boolean isInTransform = (selExprList.getChild(0).getChild(0).getType()
+ boolean isInTransform = (selExprList.getChild(posn).getChild(0).getType()
== HiveParser.TOK_TRANSFORM);
if (isInTransform) {
- trfm = (ASTNode) selExprList.getChild(0).getChild(0);
+ trfm = (ASTNode) selExprList.getChild(posn).getChild(0);
}
// The list of expressions after SELECT or SELECT TRANSFORM.
@@ -1232,7 +1250,7 @@
LOG.debug("genSelectPlan: input = " + inputRR.toString());
// Iterate over all expression (either after SELECT, or in SELECT TRANSFORM)
- for (int i = 0; i < exprList.getChildCount(); ++i) {
+ for (int i = posn; i < exprList.getChildCount(); ++i) {
// child can be EXPR AS ALIAS, or EXPR.
ASTNode child = (ASTNode) exprList.getChild(i);
@@ -1294,8 +1312,7 @@
pos = Integer.valueOf(pos.intValue() + 1);
}
}
- selectStar = selectStar && exprList.getChildCount() == 1;
-
+ selectStar = selectStar && exprList.getChildCount() == posn + 1;
Map<String, exprNodeDesc> colExprMap = new HashMap<String, exprNodeDesc>();
for (int i=0; i<col_list.size(); i++) {
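Because a hint list, when present, is now the first child of the select clause, genSelectPlan starts reading select expressions at position 1 and the SELECT * check compares the child count against posn + 1 instead of 1. The offset computation, reduced to a stand-alone helper (the surrounding class is illustrative):

    import org.apache.hadoop.hive.ql.parse.ASTNode;
    import org.apache.hadoop.hive.ql.parse.HiveParser;

    class SelectHintOffset {
      /** Index of the first real select expression, skipping a leading hint list. */
      static int firstSelectExprPos(ASTNode selExprList) {
        boolean hintPresent = selExprList.getChild(0).getType() == HiveParser.TOK_HINTLIST;
        return hintPresent ? 1 : 0;
      }
    }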
@@ -2553,7 +2570,7 @@
int pos = 0;
int outputPos = 0;
- HashMap<Byte, ArrayList<exprNodeDesc>> exprMap = new HashMap<Byte, ArrayList<exprNodeDesc>>();
+ HashMap<Byte, List<exprNodeDesc>> exprMap = new HashMap<Byte, List<exprNodeDesc>>();
Map<String, exprNodeDesc> colExprMap = new HashMap<String, exprNodeDesc>();
HashMap<Integer, Set<String>> posToAliasMap = new HashMap<Integer, Set<String>>();
for (Operator input : right)
@@ -2694,7 +2711,9 @@
// Type checking and implicit type conversion for join keys
genJoinOperatorTypeCheck(joinSrcOp, srcOps);
- return genJoinOperatorChildren(joinTree, joinSrcOp, srcOps);
+ JoinOperator joinOp = (JoinOperator)genJoinOperatorChildren(joinTree, joinSrcOp, srcOps);
+ joinContext.put(joinOp, joinTree);
+ return joinOp;
}
private void genJoinOperatorTypeCheck(Operator left, Operator[] right) throws SemanticException {
@@ -2735,6 +2754,7 @@
for (int i=0; i<right.length; i++) {
Operator oi = (i==0 && right[i] == null ? left : right[i]);
reduceSinkDesc now = ((ReduceSinkOperator)(oi)).getConf();
+
now.setKeySerializeInfo(
PlanUtils.getBinarySortableTableDesc(
PlanUtils.getFieldSchemasFromColumnList(now.getKeyCols(), "joinkey"),
@@ -2772,8 +2792,29 @@
pos++;
}
}
+
+ private List<String> getMapSideJoinTables(QB qb) {
+ List<String> cols = null;
+ ASTNode hints = qb.getParseInfo().getHints();
+ for (int pos = 0; pos < hints.getChildCount(); pos++) {
+ ASTNode hint = (ASTNode)hints.getChild(pos);
+ if (((ASTNode)hint.getChild(0)).getToken().getType() == HiveParser.TOK_MAPJOIN) {
+ ASTNode hintTblNames = (ASTNode)hint.getChild(1);
+ int numCh = hintTblNames.getChildCount();
+ for (int tblPos = 0; tblPos < numCh; tblPos++) {
+ String tblName = ((ASTNode)hintTblNames.getChild(tblPos)).getText();
+ if (cols == null)
+ cols = new ArrayList<String>();
+ if (!cols.contains(tblName))
+ cols.add(tblName);
+ }
+ }
+ }
+
+ return cols;
+ }
- private QBJoinTree genJoinTree(ASTNode joinParseTree)
+ private QBJoinTree genJoinTree(QB qb, ASTNode joinParseTree)
throws SemanticException {
QBJoinTree joinTree = new QBJoinTree();
joinCond[] condn = new joinCond[1];
@@ -2818,7 +2859,7 @@
joinTree.setBaseSrc(children);
}
else if (isJoinToken(left)) {
- QBJoinTree leftTree = genJoinTree(left);
+ QBJoinTree leftTree = genJoinTree(qb, left);
joinTree.setJoinSrc(leftTree);
String[] leftChildAliases = leftTree.getLeftAliases();
String leftAliases[] = new String[leftChildAliases.length + 1];
@@ -2861,6 +2902,35 @@
if (leftSrc.size() == 1)
joinTree.setLeftAlias(leftSrc.get(0));
+ // check the hints to see if the user has specified a map-side join. This will be removed later on, once the cost-based
+ // infrastructure is in place
+ if (qb.getParseInfo().getHints() != null) {
+ List<String> mapSideTables = getMapSideJoinTables(qb);
+ List<String> mapAliases = joinTree.getMapAliases();
+
+ for (String mapTbl : mapSideTables) {
+ boolean mapTable = false;
+ for (String leftAlias : joinTree.getLeftAliases()) {
+ if (mapTbl.equals(leftAlias))
+ mapTable = true;
+ }
+ for (String rightAlias : joinTree.getRightAliases()) {
+ if (mapTbl.equals(rightAlias))
+ mapTable = true;
+ }
+
+ if (mapTable) {
+ if (mapAliases == null) {
+ mapAliases = new ArrayList<String>();
+ }
+ mapAliases.add(mapTbl);
+ joinTree.setMapSideJoin(true);
+ }
+ }
+
+ joinTree.setMapAliases(mapAliases);
+ }
+
return joinTree;
}
@@ -2930,6 +3000,14 @@
}
target.setJoinCond(newCondns);
+ if (target.isMapSideJoin()) {
+ assert node.isMapSideJoin();
+ List<String> mapAliases = target.getMapAliases();
+ for (String mapTbl : node.getMapAliases())
+ if (!mapAliases.contains(mapTbl))
+ mapAliases.add(mapTbl);
+ target.setMapAliases(mapAliases);
+ }
}
private int findMergePos(QBJoinTree node, QBJoinTree target) {
@@ -3447,7 +3525,7 @@
// process join
if (qb.getParseInfo().getJoinExpr() != null) {
ASTNode joinExpr = qb.getParseInfo().getJoinExpr();
- QBJoinTree joinTree = genJoinTree(joinExpr);
+ QBJoinTree joinTree = genJoinTree(qb, joinExpr);
qb.setQbJoinTree(joinTree);
mergeJoinTree(qb);
@@ -3507,7 +3585,7 @@
Table tab = ((Map.Entry<String, Table>)iter.next()).getValue();
if (!tab.isPartitioned()) {
if (qbParseInfo.getDestToWhereExpr().isEmpty())
- fetch = new fetchWork(tab.getPath(), Utilities.getTableDesc(tab), qb.getParseInfo().getOuterQueryLimit());
+ fetch = new fetchWork(tab.getPath().toString(), Utilities.getTableDesc(tab), qb.getParseInfo().getOuterQueryLimit());
inputs.add(new ReadEntity(tab));
}
else {
@@ -3515,7 +3593,7 @@
Iterator<Map.Entry<String, PartitionPruner>> iterP = aliasToPruner.entrySet().iterator();
PartitionPruner pr = ((Map.Entry<String, PartitionPruner>)iterP.next()).getValue();
if (pr.onlyContainsPartitionCols()) {
- List<Path> listP = new ArrayList<Path>();
+ List<String> listP = new ArrayList<String>();
List<partitionDesc> partP = new ArrayList<partitionDesc>();
PartitionPruner.PrunedPartitionList partsList = null;
Set<Partition> parts = null;
@@ -3527,7 +3605,7 @@
Iterator<Partition> iterParts = parts.iterator();
while (iterParts.hasNext()) {
Partition part = iterParts.next();
- listP.add(part.getPartitionPath());
+ listP.add(part.getPartitionPath().toString());
partP.add(Utilities.getPartitionDesc(part));
inputs.add(new ReadEntity(part));
}
@@ -3554,7 +3632,7 @@
throw new SemanticException(ErrorMsg.GENERIC_ERROR.getMsg());
String cols = loadFileWork.get(0).getColumns();
- fetch = new fetchWork(new Path(loadFileWork.get(0).getSourceDir()),
+ fetch = new fetchWork(new Path(loadFileWork.get(0).getSourceDir()).toString(),
new tableDesc(LazySimpleSerDe.class, TextInputFormat.class,
IgnoreKeyTextOutputFormat.class,
Utilities.makeProperties(
@@ -3590,6 +3668,12 @@
opRules.put(new RuleRegExp(new String("R4"), "FS%"), new GenMRFileSink1());
opRules.put(new RuleRegExp(new String("R5"), "UNION%"), new GenMRUnion1());
opRules.put(new RuleRegExp(new String("R6"), "UNION%.*RS%"), new GenMRRedSink3());
+ opRules.put(new RuleRegExp(new String("R6"), "MAPJOIN%.*RS%"), new GenMRRedSink4());
+ opRules.put(new RuleRegExp(new String("R7"), "TS%.*MAPJOIN%"), MapJoinFactory.getTableScanMapJoin());
+ opRules.put(new RuleRegExp(new String("R8"), "RS%.*MAPJOIN%"), MapJoinFactory.getReduceSinkMapJoin());
+ opRules.put(new RuleRegExp(new String("R9"), "UNION%.*MAPJOIN%"), MapJoinFactory.getUnionMapJoin());
+ opRules.put(new RuleRegExp(new String("R10"), "MAPJOIN%.*MAPJOIN%"), MapJoinFactory.getMapJoinMapJoin());
+ opRules.put(new RuleRegExp(new String("R11"), "MAPJOIN%SEL%"), MapJoinFactory.getMapJoin());
// The dispatcher fires the processor corresponding to the closest matching rule and passes the context along
Dispatcher disp = new DefaultRuleDispatcher(new GenMROperator(), opRules, procCtx);
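The five new rules route map-join shapes to GenMRRedSink4 and the MapJoinFactory processors while the operator tree is walked; each regular expression matches the operator names along the walker's stack, so "TS%.*MAPJOIN%" fires when a table scan has a map join somewhere beneath it. A sketch of wiring one such rule into the existing dispatcher plumbing, with a no-op processor standing in for the real one:

    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.Stack;

    import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
    import org.apache.hadoop.hive.ql.lib.Dispatcher;
    import org.apache.hadoop.hive.ql.lib.Node;
    import org.apache.hadoop.hive.ql.lib.NodeProcessor;
    import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
    import org.apache.hadoop.hive.ql.lib.Rule;
    import org.apache.hadoop.hive.ql.lib.RuleRegExp;
    import org.apache.hadoop.hive.ql.parse.SemanticException;

    class MapJoinRuleWiring {
      static Dispatcher buildDispatcher(NodeProcessorCtx procCtx) {
        NodeProcessor noop = new NodeProcessor() {
          public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx,
              Object... nodeOutputs) throws SemanticException {
            return null; // placeholder; the commit dispatches to GenMRRedSink4 here
          }
        };
        Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
        // Fires for a reduce sink that follows a map join, as in the rules added above.
        opRules.put(new RuleRegExp("MAPJOIN-RS", "MAPJOIN%.*RS%"), noop);
        return new DefaultRuleDispatcher(noop, opRules, procCtx);
      }
    }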
@@ -3616,8 +3700,9 @@
if ((task instanceof MapRedTask) || (task instanceof ExecDriver)) {
HashMap<String, Operator<? extends Serializable>> opMap = ((mapredWork)task.getWork()).getAliasToWork();
if (!opMap.isEmpty())
- for (Operator<? extends Serializable> op: opMap.values())
+ for (Operator<? extends Serializable> op: opMap.values()) {
breakOperatorTree(op);
+ }
}
if (task.getChildTasks() == null)
@@ -3687,7 +3772,7 @@
ParseContext pCtx = new ParseContext(conf, qb, ast, aliasToPruner, aliasToSamplePruner, topOps,
- topSelOps, opParseCtx, loadTableWork, loadFileWork, ctx, idToTableNameMap, destTableId, uCtx);
+ topSelOps, opParseCtx, joinContext, loadTableWork, loadFileWork, ctx, idToTableNameMap, destTableId, uCtx);
Optimizer optm = new Optimizer();
optm.setPctx(pCtx);
@@ -3723,7 +3808,7 @@
* @throws SemanticException
*/
@SuppressWarnings("nls")
- private exprNodeDesc genExprNodeDesc(ASTNode expr, RowResolver input)
+ public static exprNodeDesc genExprNodeDesc(ASTNode expr, RowResolver input)
throws SemanticException {
// We recursively create the exprNodeDesc. Base cases: when we encounter
// a column ref, we convert that into an exprNodeColumnDesc; when we encounter
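Making genExprNodeDesc public and static lets components outside SemanticAnalyzer turn an expression AST plus a RowResolver into an exprNodeDesc. The fragment below only shows the call shape; both arguments are assumed to come from an existing parse context and the wrapper class is illustrative:

    import org.apache.hadoop.hive.ql.parse.ASTNode;
    import org.apache.hadoop.hive.ql.parse.RowResolver;
    import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
    import org.apache.hadoop.hive.ql.parse.SemanticException;
    import org.apache.hadoop.hive.ql.plan.exprNodeDesc;

    class ExprNodeDescCall {
      static exprNodeDesc toDesc(ASTNode expr, RowResolver inputRR) throws SemanticException {
        // Previously private to SemanticAnalyzer; now callable from other components.
        return SemanticAnalyzer.genExprNodeDesc(expr, inputRR);
      }
    }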
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java?rev=781633&r1=781632&r2=781633&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java Thu Jun 4 01:21:30 2009
@@ -53,7 +53,7 @@
new tableDesc(),
new ArrayList<tableDesc> (),
null,
- Integer.valueOf (1));
+ Integer.valueOf (1), null);
}
/**
@@ -155,7 +155,7 @@
/**
* Convert the ColumnList to FieldSchema list.
*/
- public static List<FieldSchema> getFieldSchemasFromColumnList(ArrayList<exprNodeDesc> cols,
+ public static List<FieldSchema> getFieldSchemasFromColumnList(List<exprNodeDesc> cols,
String fieldPrefix) {
List<FieldSchema> schemas = new ArrayList<FieldSchema>(cols.size());
for (int i=0; i<cols.size(); i++) {
@@ -205,9 +205,9 @@
int numReducers) {
return new reduceSinkDesc(keyCols, valueCols, tag, partitionCols, numReducers,
- getBinarySortableTableDesc(getFieldSchemasFromColumnList(keyCols, "reducesinkkey"), order),
- // Revert to DynamicSerDe: getBinaryTableDesc(getFieldSchemasFromColumnList(valueCols, "reducesinkvalue")));
- getLazySimpleSerDeTableDesc(getFieldSchemasFromColumnList(valueCols, "reducesinkvalue")));
+ getBinarySortableTableDesc(getFieldSchemasFromColumnList(keyCols, "reducesinkkey"), order),
+ // Revert to DynamicSerDe: getBinaryTableDesc(getFieldSchemasFromColumnList(valueCols, "reducesinkvalue")));
+ getLazySimpleSerDeTableDesc(getFieldSchemasFromColumnList(valueCols, "reducesinkvalue")));
}
/**