Posted to commits@hive.apache.org by zs...@apache.org on 2009/06/04 03:21:35 UTC

svn commit: r781633 [3/13] - in /hadoop/hive/trunk: ./ common/src/java/org/apache/hadoop/hive/conf/ data/scripts/ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ ql/src/java/org/apache/hadoop/hive/ql/parse/ ...

Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink4.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink4.java?rev=781633&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink4.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink4.java Thu Jun  4 01:21:30 2009
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer;
+
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Stack;
+import java.io.Serializable;
+
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.plan.mapredWork;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMapRedCtx;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+
+/**
+ * Processor for the rule - map join followed by reduce sink
+ */
+public class GenMRRedSink4 implements NodeProcessor {
+
+  public GenMRRedSink4() {
+  }
+
+  /**
+   * Reduce sink encountered
+   * @param nd the reduce sink operator encountered
+   * @param opProcCtx context
+   */
+  public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx opProcCtx, Object... nodeOutputs) throws SemanticException {
+    ReduceSinkOperator op = (ReduceSinkOperator)nd;
+    GenMRProcContext ctx = (GenMRProcContext)opProcCtx;
+
+    ParseContext parseCtx = ctx.getParseCtx();
+
+    // The map-join consisted of a bunch of map-only jobs, and it has been split after the mapjoin
+    Operator<? extends Serializable> reducer = op.getChildOperators().get(0);
+    Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = ctx.getMapCurrCtx();
+    GenMapRedCtx mapredCtx = mapCurrCtx.get(op.getParentOperators().get(0));
+    Task<? extends Serializable> currTask    = mapredCtx.getCurrTask();
+    mapredWork plan = (mapredWork) currTask.getWork();
+    HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap = ctx.getOpTaskMap();
+    Task<? extends Serializable> opMapTask = opTaskMap.get(reducer);
+    
+    ctx.setCurrTask(currTask);
+
+    // If the plan for this reducer does not exist, initialize the plan
+    if (opMapTask == null) {
+      // When the reducer is encountered for the first time
+      if (plan.getReducer() == null)
+        GenMapRedUtils.initMapJoinPlan(op, ctx, true, false, true, -1);
+      // When mapjoin is followed by a multi-table insert
+      else
+        GenMapRedUtils.splitPlan(op, ctx);
+    }
+    // There is a join after mapjoin. One of the branches of mapjoin has already been initialized.
+    // Initialize the current branch, and join with the original plan.
+    else {
+      assert plan.getReducer() != reducer;
+      GenMapRedUtils.joinPlan(op, currTask, opMapTask, ctx, -1, false, true, false);
+    }
+
+    mapCurrCtx.put(op, new GenMapRedCtx(ctx.getCurrTask(), ctx.getCurrTopOp(), ctx.getCurrAliasId()));
+    
+    // the mapjoin operator has been processed
+    ctx.setCurrMapJoinOp(null);
+    return null;
+  }
+}
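
For illustration only (not part of this commit): a minimal sketch of how a processor such as
GenMRRedSink4 gets driven over the operator tree, using the rule dispatcher and graph walker
from ql.lib. The rule pattern string "MAPJOIN%.*RS%" and the choice of DefaultGraphWalker are
assumptions made for the sketch; the actual registration happens during map-reduce task
generation in the semantic analyzer.

    import java.util.ArrayList;
    import java.util.LinkedHashMap;
    import java.util.Map;

    import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
    import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
    import org.apache.hadoop.hive.ql.lib.Dispatcher;
    import org.apache.hadoop.hive.ql.lib.GraphWalker;
    import org.apache.hadoop.hive.ql.lib.Node;
    import org.apache.hadoop.hive.ql.lib.NodeProcessor;
    import org.apache.hadoop.hive.ql.lib.Rule;
    import org.apache.hadoop.hive.ql.lib.RuleRegExp;
    import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext;
    import org.apache.hadoop.hive.ql.optimizer.GenMRRedSink4;
    import org.apache.hadoop.hive.ql.parse.SemanticException;

    public class GenMRRedSink4WiringSketch {
      // Walk the operator DAG rooted at topNodes and fire GenMRRedSink4 whenever a
      // reduce sink below a map join is reached (the pattern string is an assumption).
      public static void walk(GenMRProcContext procCtx, ArrayList<Node> topNodes)
          throws SemanticException {
        Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
        opRules.put(new RuleRegExp("R1", "MAPJOIN%.*RS%"), new GenMRRedSink4());
        // No default processor in this sketch; operators matching no rule are skipped.
        Dispatcher disp = new DefaultRuleDispatcher(null, opRules, procCtx);
        GraphWalker walker = new DefaultGraphWalker(disp);
        walker.startWalking(topNodes, null);
      }
    }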

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java?rev=781633&r1=781632&r2=781633&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java Thu Jun  4 01:21:30 2009
@@ -97,7 +97,6 @@
     GenMRUnionCtx uCtxTask = ctx.getUnionTask(union);
     Task<? extends Serializable> uTask = null;
 
-    pos = UnionProcFactory.getPositionParent(union, stack);
     Operator<? extends Serializable> parent = union.getParentOperators().get(pos);   
     mapredWork uPlan = null;
 

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java?rev=781633&r1=781632&r2=781633&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java Thu Jun  4 01:21:30 2009
@@ -18,46 +18,46 @@
 
 package org.apache.hadoop.hive.ql.optimizer;
 
+import java.util.Iterator;
 import java.util.List;
 import java.util.ArrayList;
 import java.util.Map;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.Set;
-import java.util.Stack;
 import java.io.Serializable;
-import java.io.File;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
-import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.JoinOperator;
 import org.apache.hadoop.hive.ql.exec.UnionOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.exec.OperatorFactory;
+import org.apache.hadoop.hive.ql.plan.fetchWork;
 import org.apache.hadoop.hive.ql.plan.mapredWork;
+import org.apache.hadoop.hive.ql.plan.mapredLocalWork;
+import org.apache.hadoop.hive.ql.plan.mapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.reduceSinkDesc;
 import org.apache.hadoop.hive.ql.plan.tableDesc;
 import org.apache.hadoop.hive.ql.plan.partitionDesc;
 import org.apache.hadoop.hive.ql.plan.fileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
+import org.apache.hadoop.hive.ql.plan.tableScanDesc;
 import org.apache.hadoop.hive.ql.metadata.*;
 import org.apache.hadoop.hive.ql.parse.*;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
-import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMapRedCtx;
 import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMRUnionCtx;
+import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMRMapJoinCtx;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 
 /**
  * General utility common functions for the Processor to convert operator into map-reduce tasks
@@ -111,6 +111,91 @@
   }
 
   /**
+   * Initialize the current plan by adding it to root tasks
+   * @param op the map join operator encountered
+   * @param opProcCtx processing context
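+   * @param readInputMapJoin whether the input to op is the output of a map join encountered earlier
+   * @param readInputUnion whether the input to op is the output of a union
+   * @param setReducer whether the reducer of the plan should be set from the child of op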
+   * @param pos position of the parent
+   */
+  public static void initMapJoinPlan(Operator<? extends Serializable> op, GenMRProcContext opProcCtx, boolean readInputMapJoin, boolean readInputUnion,
+      boolean setReducer, int pos) 
+    throws SemanticException {
+    Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = opProcCtx.getMapCurrCtx();
+    assert (((pos == -1) && (readInputMapJoin)) || (pos != -1));
+    int parentPos = (pos == -1) ? 0 : pos;
+    GenMapRedCtx mapredCtx = mapCurrCtx.get(op.getParentOperators().get(parentPos));
+    Task<? extends Serializable> currTask    = mapredCtx.getCurrTask();
+    mapredWork plan = (mapredWork) currTask.getWork();
+    HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap = opProcCtx.getOpTaskMap();
+    Operator<? extends Serializable> currTopOp = opProcCtx.getCurrTopOp();
+
+    // The mapjoin has already been encountered, and some context about it has been stored
+    if (readInputMapJoin) {
+      MapJoinOperator currMapJoinOp = opProcCtx.getCurrMapJoinOp();
+      assert currMapJoinOp != null;
+      boolean local = ((pos == -1) || (pos == ((mapJoinDesc)currMapJoinOp.getConf()).getPosBigTable())) ? false : true;
+
+      if (setReducer) {
+        Operator<? extends Serializable> reducer = op.getChildOperators().get(0);
+        plan.setReducer(reducer);
+        opTaskMap.put(reducer, currTask);      
+        if (reducer.getClass() == JoinOperator.class)
+          plan.setNeedsTagging(true);
+        reduceSinkDesc desc = (reduceSinkDesc)op.getConf();      
+        plan.setNumReduceTasks(desc.getNumReducers());
+      }
+      else 
+        opTaskMap.put(op, currTask);
+
+      if (!readInputUnion) {
+        GenMRMapJoinCtx mjCtx = opProcCtx.getMapJoinCtx(currMapJoinOp);
+        String taskTmpDir;
+        tableDesc tt_desc; 
+        Operator<? extends Serializable> rootOp;
+
+        if (mjCtx.getOldMapJoin() == null) {
+          taskTmpDir = mjCtx.getTaskTmpDir();
+          tt_desc = mjCtx.getTTDesc(); 
+          rootOp = mjCtx.getRootMapJoinOp();
+        }
+        else {
+          GenMRMapJoinCtx oldMjCtx = opProcCtx.getMapJoinCtx(mjCtx.getOldMapJoin());
+          taskTmpDir = oldMjCtx.getTaskTmpDir();
+          tt_desc = oldMjCtx.getTTDesc(); 
+          rootOp = oldMjCtx.getRootMapJoinOp();
+        }
+      
+        setTaskPlan(taskTmpDir, taskTmpDir, rootOp, plan, local, tt_desc);
+      }
+      else {
+        initUnionPlan(opProcCtx, currTask, false);
+      }
+        
+      opProcCtx.setCurrMapJoinOp(null);
+    }
+    else {
+      mapJoinDesc desc = (mapJoinDesc)op.getConf();
+
+      // The map is overloaded to keep track of mapjoins also
+      opTaskMap.put(op, currTask);
+      
+      List<Task<? extends Serializable>> rootTasks = opProcCtx.getRootTasks();
+      rootTasks.add(currTask);
+      
+      assert currTopOp != null;
+      List<Operator<? extends Serializable>> seenOps = opProcCtx.getSeenOps();
+      String currAliasId = opProcCtx.getCurrAliasId();
+      
+      seenOps.add(currTopOp);
+      boolean local = (pos == desc.getPosBigTable()) ? false : true;
+      setTaskPlan(currAliasId, currTopOp, plan, local, opProcCtx);
+    }
+
+    opProcCtx.setCurrTask(currTask);
+    opProcCtx.setCurrTopOp(null);
+    opProcCtx.setCurrAliasId(null);
+  }
+
+  /**
    * Initialize the current union plan.
    * 
    * @param op the reduce sink operator encountered
@@ -130,20 +215,17 @@
     
     plan.setNumReduceTasks(desc.getNumReducers());
 
-    List<Task<? extends Serializable>> rootTasks = opProcCtx.getRootTasks();
-
-    //    rootTasks.add(currTask);
     if (reducer.getClass() == JoinOperator.class)
       plan.setNeedsTagging(true);
 
-    initUnionPlan(opProcCtx, currTask); 
+    initUnionPlan(opProcCtx, currTask, false); 
   }
 
   /*
   * It is an idempotent function that adds various intermediate files as sources for the
    * union. The plan has already been created.
    */
-  public static void initUnionPlan(GenMRProcContext opProcCtx, Task<? extends Serializable> currTask) {
+  public static void initUnionPlan(GenMRProcContext opProcCtx, Task<? extends Serializable> currTask, boolean local) {
     mapredWork plan = (mapredWork) currTask.getWork();
     UnionOperator currUnionOp = opProcCtx.getCurrUnionOp();
     assert currUnionOp != null;
@@ -155,9 +237,10 @@
     assert !taskTmpDirLst.isEmpty() && !tt_descLst.isEmpty();
     assert taskTmpDirLst.size() == tt_descLst.size();
     int size = taskTmpDirLst.size();
-
+    assert local == false;
+    
     for (int pos = 0; pos < size; pos++) {
-      String taskTmpDir = taskTmpDirLst.get(pos);
+      String taskTmpDir = taskTmpDirLst.get(pos); 
       tableDesc tt_desc = tt_descLst.get(pos); 
       if (plan.getPathToAliases().get(taskTmpDir) == null) {
         plan.getPathToAliases().put(taskTmpDir, new ArrayList<String>());
@@ -170,20 +253,38 @@
 
   /**
    * Merge the current task with the task for the current reducer
-   * @param task for the old task for the current reducer
+   * @param op operator being processed
+   * @param oldTask the old task for the current reducer
+   * @param task the current task for the current reducer
    * @param opProcCtx processing context
+   * @param pos position of the parent in the stack
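+   * @param split whether the old task should be split off via splitTasks
+   * @param readMapJoinData whether the plan reads the output of a previously encountered map join
+   * @param readUnionData whether the plan reads the output of a union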
    */
-  public static void joinPlan(ReduceSinkOperator op,
+  public static void joinPlan(Operator<? extends Serializable> op,
                               Task<? extends Serializable> oldTask, 
                               Task<? extends Serializable> task, 
-                              GenMRProcContext opProcCtx) throws SemanticException {
+                              GenMRProcContext opProcCtx, 
+                              int pos, boolean split,
+                              boolean readMapJoinData, 
+                              boolean readUnionData) throws SemanticException {
     Task<? extends Serializable> currTask = task;
     mapredWork plan = (mapredWork) currTask.getWork();
     Operator<? extends Serializable> currTopOp = opProcCtx.getCurrTopOp();
-
+    List<Task<? extends Serializable>> parTasks = null;
+      
     // terminate the old task and make current task dependent on it
-    if (oldTask != null) {
-      splitTasks(op, oldTask, currTask, opProcCtx);
+    if (split) {
+      assert oldTask != null;
+      splitTasks((ReduceSinkOperator)op, oldTask, currTask, opProcCtx, true, false, 0);
+    } 
+    else {
+      if ((oldTask != null) && (oldTask.getParentTasks() != null) && !oldTask.getParentTasks().isEmpty()) {
+        parTasks = new ArrayList<Task<? extends Serializable>>();
+        parTasks.addAll(oldTask.getParentTasks());
+        
+        Object[] parTaskArr = parTasks.toArray();
+        for (int i = 0; i < parTaskArr.length; i++)
+          ((Task<? extends Serializable>)parTaskArr[i]).removeDependentTask(oldTask);
+      }
     }
 
     if (currTopOp != null) {
@@ -192,12 +293,55 @@
       
       if (!seenOps.contains(currTopOp)) {
         seenOps.add(currTopOp);
-        setTaskPlan(currAliasId, currTopOp, plan, false, opProcCtx);
+        boolean local = false;
+        if (pos != -1)
+          local = (pos == ((mapJoinDesc)op.getConf()).getPosBigTable()) ? false : true;
+        setTaskPlan(currAliasId, currTopOp, plan, local, opProcCtx);
       }
       currTopOp = null;
       opProcCtx.setCurrTopOp(currTopOp);
     }
-
+    else if (opProcCtx.getCurrMapJoinOp() != null) {
+      MapJoinOperator mjOp  = opProcCtx.getCurrMapJoinOp();
+      if (readUnionData) {
+        initUnionPlan(opProcCtx, currTask, false);
+      }
+      else {
+        GenMRMapJoinCtx mjCtx = opProcCtx.getMapJoinCtx(mjOp);
+      
+        // In case of map-join followed by map-join, the file needs to be obtained from the old map join
+        MapJoinOperator oldMapJoin = mjCtx.getOldMapJoin();
+        String          taskTmpDir = null;
+        tableDesc       tt_desc    = null; 
+        Operator<? extends Serializable> rootOp = null;
+      
+        if (oldMapJoin == null) {
+          taskTmpDir = mjCtx.getTaskTmpDir();
+          tt_desc    = mjCtx.getTTDesc();
+          rootOp     = mjCtx.getRootMapJoinOp();
+        }
+        else {
+          GenMRMapJoinCtx oldMjCtx = opProcCtx.getMapJoinCtx(oldMapJoin);
+          assert oldMjCtx != null;
+          taskTmpDir = oldMjCtx.getTaskTmpDir();
+          tt_desc    = oldMjCtx.getTTDesc();
+          rootOp     = oldMjCtx.getRootMapJoinOp();
+        }
+      
+        boolean local = ((pos == -1) || (pos == ((mapJoinDesc)mjOp.getConf()).getPosBigTable())) ? false : true;
+        setTaskPlan(taskTmpDir, taskTmpDir, rootOp, plan, local, tt_desc);
+      }
+      opProcCtx.setCurrMapJoinOp(null);
+      
+      if ((oldTask != null) && (parTasks != null)) {
+        for (Task<? extends Serializable> parTask : parTasks) 
+          parTask.addDependentTask(currTask);
+      }
+      
+      if (opProcCtx.getRootTasks().contains(currTask))
+        opProcCtx.getRootTasks().remove(currTask);
+    }
+    
     opProcCtx.setCurrTask(currTask);
   }
 
@@ -224,7 +368,7 @@
     opTaskMap.put(reducer, redTask);
     Task<? extends Serializable> currTask    = opProcCtx.getCurrTask();
 
-    splitTasks(op, currTask, redTask, opProcCtx);
+    splitTasks(op, currTask, redTask, opProcCtx, true, false, 0);
     opProcCtx.getRootOps().add(op);
   }
 
@@ -242,58 +386,144 @@
     ParseContext parseCtx = opProcCtx.getParseCtx();
     Set<ReadEntity> inputs = opProcCtx.getInputs();
 
-    if (!local) {
-      // Generate the map work for this alias_id
-      PartitionPruner pruner = parseCtx.getAliasToPruner().get(alias_id);
-      Set<Partition> parts = null;
-      try {
-        // pass both confirmed and unknown partitions through the map-reduce framework
-        PartitionPruner.PrunedPartitionList partsList = pruner.prune();
-
-        parts = partsList.getConfirmedPartns();
-        parts.addAll(partsList.getUnknownPartns());
-      } catch (HiveException e) {
-        // Has to use full name to make sure it does not conflict with org.apache.commons.lang.StringUtils
-        LOG.error(org.apache.hadoop.util.StringUtils.stringifyException(e));
-        throw new SemanticException(e.getMessage(), e);
-      }
-      SamplePruner samplePruner = parseCtx.getAliasToSamplePruner().get(alias_id);
-      
-      for (Partition part : parts) {
-        if (part.getTable().isPartitioned())
-          inputs.add(new ReadEntity(part));
-        else
-          inputs.add(new ReadEntity(part.getTable()));
-        
-        // Later the properties have to come from the partition as opposed
-        // to from the table in order to support versioning.
-        Path paths[];
-        if (samplePruner != null) {
-          paths = samplePruner.prune(part);
-        }
-        else {
-          paths = part.getPath();
-        }
+    ArrayList<Path> partDir = new ArrayList<Path>();
+    ArrayList<partitionDesc> partDesc = new ArrayList<partitionDesc>();
+
+    Path       tblDir  = null;
+    tableDesc  tblDesc = null;
+
+    // Generate the map work for this alias_id
+    PartitionPruner pruner = parseCtx.getAliasToPruner().get(alias_id);
+    Set<Partition> parts = null;
+    try {
+      // pass both confirmed and unknown partitions through the map-reduce framework
+      PartitionPruner.PrunedPartitionList partsList = pruner.prune();
+      
+      parts = partsList.getConfirmedPartns();
+      parts.addAll(partsList.getUnknownPartns());
+      partitionDesc aliasPartnDesc = null;
+      if (parts.isEmpty()) {
+        if (!partsList.getDeniedPartns().isEmpty())
+          aliasPartnDesc = Utilities.getPartitionDesc(partsList.getDeniedPartns().iterator().next());
+      }
+      else {
+        aliasPartnDesc = Utilities.getPartitionDesc(parts.iterator().next());
+      }
+      plan.getAliasToPartnInfo().put(alias_id, aliasPartnDesc);
+    } catch (HiveException e) {
+      // Has to use full name to make sure it does not conflict with org.apache.commons.lang.StringUtils
+      LOG.error(org.apache.hadoop.util.StringUtils.stringifyException(e));
+      throw new SemanticException(e.getMessage(), e);
+    }
+    SamplePruner samplePruner = parseCtx.getAliasToSamplePruner().get(alias_id);
+    
+    for (Partition part : parts) {
+      if (part.getTable().isPartitioned())
+        inputs.add(new ReadEntity(part));
+      else
+        inputs.add(new ReadEntity(part.getTable()));
+
+      // Later the properties have to come from the partition as opposed
+      // to from the table in order to support versioning.
+      Path paths[];
+      if (samplePruner != null) {
+        paths = samplePruner.prune(part);
+      }
+      else {
+        paths = part.getPath();
+      }
+      
+      // is it a partitioned table ?
+      if (!part.getTable().isPartitioned()) {
+        assert ((tblDir == null) && (tblDesc == null));
+
+        tblDir = paths[0];
+        tblDesc = Utilities.getTableDesc(part.getTable());
+      }
+
+      for (Path p: paths) {
+        String path = p.toString();
+        LOG.debug("Adding " + path + " of table" + alias_id);
         
-        for (Path p: paths) {
-          String path = p.toString();
-          LOG.debug("Adding " + path + " of table" + alias_id);
-          // Add the path to alias mapping
-          if (plan.getPathToAliases().get(path) == null) {
-            plan.getPathToAliases().put(path, new ArrayList<String>());
-          }
-          plan.getPathToAliases().get(path).add(alias_id);
-          plan.getPathToPartitionInfo().put(path, Utilities.getPartitionDesc(part));
-          LOG.debug("Information added for path " + path);
+        partDir.add(p);
+        partDesc.add(Utilities.getPartitionDesc(part));
+      }
+    }
+    
+    Iterator<Path>          iterPath      = partDir.iterator();
+    Iterator<partitionDesc> iterPartnDesc = partDesc.iterator();
+
+    if (!local) {
+      while (iterPath.hasNext()) {
+        assert iterPartnDesc.hasNext();
+        String path = iterPath.next().toString();
+
+        partitionDesc prtDesc = iterPartnDesc.next();
+
+        // Add the path to alias mapping
+        if (plan.getPathToAliases().get(path) == null) {
+          plan.getPathToAliases().put(path, new ArrayList<String>());
         }
+        plan.getPathToAliases().get(path).add(alias_id);
+        plan.getPathToPartitionInfo().put(path, prtDesc);
+        LOG.debug("Information added for path " + path);
       }
+
+      assert plan.getAliasToWork().get(alias_id) == null;
       plan.getAliasToWork().put(alias_id, topOp);
-      LOG.debug("Created Map Work for " + alias_id);
     }
     else {
-      FileSinkOperator fOp = (FileSinkOperator) topOp;
-      fileSinkDesc fConf = (fileSinkDesc)fOp.getConf();
       // populate local work if needed
+      mapredLocalWork localPlan = plan.getMapLocalWork();
+      if (localPlan == null)
+        localPlan = new mapredLocalWork(
+            new LinkedHashMap<String, Operator<? extends Serializable>>(),
+            new LinkedHashMap<String, fetchWork>());
+
+      assert localPlan.getAliasToWork().get(alias_id) == null;
+      assert localPlan.getAliasToFetchWork().get(alias_id) == null;
+      localPlan.getAliasToWork().put(alias_id, topOp);
+      if (tblDir == null)
+        localPlan.getAliasToFetchWork().put(alias_id, new fetchWork(fetchWork.convertPathToStringArray(partDir), partDesc));
+      else
+        localPlan.getAliasToFetchWork().put(alias_id, new fetchWork(tblDir.toString(), tblDesc));
+      plan.setMapLocalWork(localPlan);
+    }
+  }
+
+
+  /**
+   * set the task plan for the given path and alias in the mapredWork
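+   * @param path     input path corresponding to the alias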
+   * @param alias    current alias
+   * @param topOp    the top operator of the stack
+   * @param plan     current plan
+   * @param local    whether you need to add to map-reduce or local work
+   * @param tt_desc  table descriptor
+   */
+  public static void setTaskPlan(String path, String alias, Operator<? extends Serializable> topOp,
+                                 mapredWork plan, boolean local, tableDesc tt_desc) 
+    throws SemanticException {
+
+    if (!local) {
+      if (plan.getPathToAliases().get(path) == null)
+        plan.getPathToAliases().put(path, new ArrayList<String>());
+      plan.getPathToAliases().get(path).add(alias);
+      plan.getPathToPartitionInfo().put(path, new partitionDesc(tt_desc, null));
+      plan.getAliasToWork().put(alias, topOp);
+    }
+    else {
+      // populate local work if needed
+      mapredLocalWork localPlan = plan.getMapLocalWork();
+      if (localPlan == null)
+        localPlan = new mapredLocalWork(
+                                        new LinkedHashMap<String, Operator<? extends Serializable>>(),
+                                        new LinkedHashMap<String, fetchWork>());
+      
+      assert localPlan.getAliasToWork().get(alias) == null;
+      assert localPlan.getAliasToFetchWork().get(alias) == null;
+      localPlan.getAliasToWork().put(alias, topOp);
+      localPlan.getAliasToFetchWork().put(alias, new fetchWork(alias, tt_desc));
+      plan.setMapLocalWork(localPlan);
     }
   }
 
@@ -360,11 +590,14 @@
    * @param oldTask the parent task
    * @param task the child task
    * @param opProcCtx context
+   * @param setReducer does the reducer need to be set
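+   * @param local whether the intermediate file is read as local (fetch) work in the child plan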
+   * @param posn position of the parent
    **/
-  private static void splitTasks(ReduceSinkOperator op,
+  public static void splitTasks(Operator<? extends Serializable> op,
                                  Task<? extends Serializable> parentTask, 
                                  Task<? extends Serializable> childTask, 
-                                 GenMRProcContext opProcCtx) throws SemanticException {
+                                 GenMRProcContext opProcCtx, boolean setReducer,
+                                 boolean local, int posn) throws SemanticException {
     mapredWork plan = (mapredWork) childTask.getWork();
     Operator<? extends Serializable> currTopOp = opProcCtx.getCurrTopOp();
     
@@ -380,7 +613,7 @@
     Context baseCtx = parseCtx.getContext();
     String taskTmpDir = baseCtx.getMRTmpFileURI();
     
-    Operator<? extends Serializable> parent = op.getParentOperators().get(0);
+    Operator<? extends Serializable> parent = op.getParentOperators().get(posn);
     tableDesc tt_desc = 
       PlanUtils.getBinaryTableDesc(PlanUtils.getFieldSchemasFromRowSchema(parent.getSchema(), "temporarycol")); 
     
@@ -403,37 +636,59 @@
     List<Operator<? extends Serializable>> parentOpList = new ArrayList<Operator<? extends Serializable>>();
     parentOpList.add(parent);
     fs_op.setParentOperators(parentOpList);
+
+    // create a dummy tableScan operator on top of op
+    Operator<? extends Serializable> ts_op = 
+      putOpInsertMap(OperatorFactory.get(tableScanDesc.class, parent.getSchema()), null, parseCtx);
+    
+    childOpList = new ArrayList<Operator<? extends Serializable>>();
+    childOpList.add(op);
+    ts_op.setChildOperators(childOpList);
+    op.getParentOperators().set(posn, ts_op);
+   
+    Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = opProcCtx.getMapCurrCtx();
+    mapCurrCtx.put(ts_op, new GenMapRedCtx(childTask, null, null));
     
-    Operator<? extends Serializable> reducer = op.getChildOperators().get(0);
-    
-    String streamDesc;
+    String streamDesc = taskTmpDir;
     mapredWork cplan = (mapredWork) childTask.getWork();
+
+    if (setReducer) {
+      Operator<? extends Serializable> reducer = op.getChildOperators().get(0);
     
-    if (reducer.getClass() == JoinOperator.class) {
-      String origStreamDesc;
-      streamDesc = "$INTNAME";
-      origStreamDesc = streamDesc;
-      int pos = 0;
-      while (cplan.getAliasToWork().get(streamDesc) != null)
-        streamDesc = origStreamDesc.concat(String.valueOf(++pos));
+      if (reducer.getClass() == JoinOperator.class) {
+        String origStreamDesc;
+        streamDesc = "$INTNAME";
+        origStreamDesc = streamDesc;
+        int pos = 0;
+        while (cplan.getAliasToWork().get(streamDesc) != null)
+          streamDesc = origStreamDesc.concat(String.valueOf(++pos));
+      }
+
+      // TODO: Allocate work to remove the temporary files and make that
+      // dependent on the redTask
+      if (reducer.getClass() == JoinOperator.class)
+        cplan.setNeedsTagging(true);
     }
-    else
-      streamDesc = taskTmpDir;
-    
+        
     // Add the path to alias mapping
-    if (cplan.getPathToAliases().get(taskTmpDir) == null) {
-      cplan.getPathToAliases().put(taskTmpDir, new ArrayList<String>());
+    setTaskPlan(taskTmpDir, streamDesc, ts_op, cplan, local, tt_desc);
+
+    // This can be cleaned up as a function table in future
+    if (op instanceof MapJoinOperator) {
+      MapJoinOperator mjOp = (MapJoinOperator)op;
+      opProcCtx.setCurrMapJoinOp(mjOp);
+      GenMRMapJoinCtx mjCtx = opProcCtx.getMapJoinCtx(mjOp);
+      if (mjCtx == null)
+        mjCtx = new GenMRMapJoinCtx(taskTmpDir, tt_desc, ts_op, null);
+      else {
+        mjCtx.setTaskTmpDir(taskTmpDir);
+        mjCtx.setTTDesc(tt_desc);
+        mjCtx.setRootMapJoinOp(ts_op);
+      }
+      opProcCtx.setMapJoinCtx(mjOp, mjCtx);
+      opProcCtx.getMapCurrCtx().put(parent, new GenMapRedCtx(childTask, null, null));
     }
     
-    cplan.getPathToAliases().get(taskTmpDir).add(streamDesc);
-    cplan.getPathToPartitionInfo().put(taskTmpDir, new partitionDesc(tt_desc, null));
-    cplan.getAliasToWork().put(streamDesc, op);
-
-    // TODO: Allocate work to remove the temporary files and make that
-    // dependent on the redTask
-    if (reducer.getClass() == JoinOperator.class)
-      cplan.setNeedsTagging(true);
-
     currTopOp = null;
     String currAliasId = null;
     

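For illustration only (not part of this commit): a minimal usage sketch for the new
setTaskPlan(path, alias, rootOp, plan, local, tt_desc) overload added above, assuming the
temporary directory, root operator and table descriptor come from the surrounding context
(for example a GenMRMapJoinCtx), as in initMapJoinPlan/joinPlan. The helper class and method
names are hypothetical.

    import java.io.Serializable;

    import org.apache.hadoop.hive.ql.exec.Operator;
    import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
    import org.apache.hadoop.hive.ql.parse.SemanticException;
    import org.apache.hadoop.hive.ql.plan.mapredWork;
    import org.apache.hadoop.hive.ql.plan.tableDesc;

    public class SetTaskPlanSketch {
      // Hypothetical helper: register a map-join intermediate file as an input of the plan.
      public static void addIntermediateSource(String taskTmpDir, tableDesc tt_desc,
          Operator<? extends Serializable> rootOp, mapredWork plan, boolean local)
          throws SemanticException {
        // local == false: the temp dir becomes an ordinary map-reduce input
        //   (pathToAliases, pathToPartitionInfo and aliasToWork are populated).
        // local == true: the temp dir is read through mapredLocalWork as fetch work,
        //   i.e. it is the small, in-memory side of a map join.
        GenMapRedUtils.setTaskPlan(taskTmpDir, taskTmpDir, rootOp, plan, local, tt_desc);
      }
    }
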
Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java?rev=781633&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java Thu Jun  4 01:21:30 2009
@@ -0,0 +1,362 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Stack;
+import java.util.Map;
+import java.util.HashMap;
+
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorFactory;
+import org.apache.hadoop.hive.ql.exec.UnionOperator;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.ErrorMsg;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMRUnionCtx;
+import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMapRedCtx;
+import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMRMapJoinCtx;
+import org.apache.hadoop.hive.ql.optimizer.unionproc.UnionProcContext;
+import org.apache.hadoop.hive.ql.plan.mapJoinDesc;
+import org.apache.hadoop.hive.ql.plan.mapredWork;
+import org.apache.hadoop.hive.ql.plan.PlanUtils;
+import org.apache.hadoop.hive.ql.plan.tableDesc;
+import org.apache.hadoop.hive.ql.plan.fileSinkDesc;
+import org.apache.hadoop.hive.conf.HiveConf;
+
+/**
+ * Operator factory for MapJoin processing
+ */
+public class MapJoinFactory {
+
+  public static int getPositionParent(MapJoinOperator op, Stack<Node> stack) {
+    int pos = 0;
+    int size = stack.size();
+    assert size >= 2 && stack.get(size - 1) == op;
+    Operator<? extends Serializable> parent = (Operator<? extends Serializable>)stack.get(size - 2);
+    List<Operator<? extends Serializable>> parOp = op.getParentOperators();
+    pos = parOp.indexOf(parent);
+    assert pos < parOp.size(); 
+    return pos;
+  }
+  
+  /**
+   * TableScan followed by MapJoin
+   */
+  public static class TableScanMapJoin implements NodeProcessor {
+
+    @Override
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+        Object... nodeOutputs) throws SemanticException {
+      MapJoinOperator mapJoin = (MapJoinOperator)nd;
+      GenMRProcContext ctx = (GenMRProcContext)procCtx;
+
+      // find the branch on which this processor was invoked
+      int pos = getPositionParent(mapJoin, stack);
+
+      Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = ctx.getMapCurrCtx();
+      GenMapRedCtx mapredCtx = mapCurrCtx.get(mapJoin.getParentOperators().get(pos));
+      Task<? extends Serializable> currTask    = mapredCtx.getCurrTask();
+      mapredWork currPlan = (mapredWork) currTask.getWork();
+      Operator<? extends Serializable> currTopOp   = mapredCtx.getCurrTopOp();
+      String currAliasId = mapredCtx.getCurrAliasId();
+      Operator<? extends Serializable> reducer = mapJoin;
+      HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap = ctx.getOpTaskMap();
+      Task<? extends Serializable> opMapTask = opTaskMap.get(reducer);
+      
+      ctx.setCurrTopOp(currTopOp);
+      ctx.setCurrAliasId(currAliasId);
+      ctx.setCurrTask(currTask);
+      
+      // If the plan for this reducer does not exist, initialize the plan
+      if (opMapTask == null) {
+        assert currPlan.getReducer() == null;
+        GenMapRedUtils.initMapJoinPlan(mapJoin, ctx, false, false, false, pos);
+      }
+      // The current plan can be thrown away after being merged with the original plan
+      else {
+        GenMapRedUtils.joinPlan(mapJoin, null, opMapTask, ctx, pos, false, false, false);
+        currTask = opMapTask;
+        ctx.setCurrTask(currTask);
+      }
+      
+      mapCurrCtx.put(mapJoin, new GenMapRedCtx(ctx.getCurrTask(), ctx.getCurrTopOp(), ctx.getCurrAliasId()));
+      return null;
+    }
+  }
+  
+  /**
+   * ReduceSink followed by MapJoin
+   */
+  public static class ReduceSinkMapJoin implements NodeProcessor {
+
+    @Override
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+        Object... nodeOutputs) throws SemanticException {
+      MapJoinOperator mapJoin = (MapJoinOperator)nd;
+      GenMRProcContext opProcCtx = (GenMRProcContext)procCtx;
+      
+      mapredWork cplan = GenMapRedUtils.getMapRedWork();
+      ParseContext parseCtx = opProcCtx.getParseCtx();
+      Task<? extends Serializable> redTask  = TaskFactory.get(cplan, parseCtx.getConf());
+      Task<? extends Serializable> currTask = opProcCtx.getCurrTask();
+
+      // find the branch on which this processor was invoked
+      int pos = getPositionParent(mapJoin, stack);
+      boolean local = (pos == ((mapJoinDesc)mapJoin.getConf()).getPosBigTable()) ? false : true;
+      
+      GenMapRedUtils.splitTasks(mapJoin, currTask, redTask, opProcCtx, false, local, pos);
+
+      currTask = opProcCtx.getCurrTask();
+      HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap = opProcCtx.getOpTaskMap();
+      Task<? extends Serializable> opMapTask = opTaskMap.get(mapJoin);
+      
+      // If the plan for this reducer does not exist, initialize the plan
+      if (opMapTask == null) {
+        assert cplan.getReducer() == null;
+        opTaskMap.put(mapJoin, currTask);
+        opProcCtx.setCurrMapJoinOp(null);
+      }
+      // The current plan can be thrown away after being merged with the original plan
+      else {
+        GenMapRedUtils.joinPlan(mapJoin, currTask, opMapTask, opProcCtx, pos, false, false, false);
+        currTask = opMapTask;
+        opProcCtx.setCurrTask(currTask);
+      }
+      
+      return null;
+    }
+  }
+
+  /**
+   * MapJoin followed by Select
+   */
+  public static class MapJoin implements NodeProcessor {
+
+    /**
+     * Create a task by splitting the plan below the join. The reason we have to do so while
+     * processing the Select rather than the MapJoin is the walker: while processing a node, it is
+     * not safe to alter its children, because they determine the course of the walk. It is
+     * perfectly fine to modify its parents, though, since those nodes have already been visited.
+     */
+    @Override
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+        Object... nodeOutputs) throws SemanticException {
+
+      SelectOperator  sel     = (SelectOperator)nd;
+      MapJoinOperator mapJoin = (MapJoinOperator)sel.getParentOperators().get(0);
+      assert sel.getParentOperators().size() == 1;
+
+      GenMRProcContext ctx = (GenMRProcContext)procCtx;
+      ParseContext parseCtx = ctx.getParseCtx();
+      ctx.setCurrMapJoinOp(mapJoin);
+      
+      Task<? extends Serializable> currTask = ctx.getCurrTask();
+      GenMRMapJoinCtx mjCtx = ctx.getMapJoinCtx(mapJoin);
+      if (mjCtx == null) {
+        mjCtx = new GenMRMapJoinCtx();
+        ctx.setMapJoinCtx(mapJoin, mjCtx);
+      }
+      
+      mapredWork mjPlan = GenMapRedUtils.getMapRedWork();
+      Task<? extends Serializable> mjTask = TaskFactory.get(mjPlan, parseCtx.getConf());
+      
+      tableDesc tt_desc = 
+        PlanUtils.getLazySimpleSerDeTableDesc(PlanUtils.getFieldSchemasFromRowSchema(mapJoin.getSchema(), "temporarycol")); 
+      
+      // generate the temporary file
+      Context baseCtx = parseCtx.getContext();
+      String taskTmpDir = baseCtx.getMRTmpFileURI();
+      
+      // Add the path to alias mapping
+      mjCtx.setTaskTmpDir(taskTmpDir);
+      mjCtx.setTTDesc(tt_desc);
+      mjCtx.setRootMapJoinOp(sel);
+      
+      sel.setParentOperators(null);
+      
+      // Create a file sink operator for this file name
+      Operator<? extends Serializable> fs_op =
+        OperatorFactory.get
+        (new fileSinkDesc(taskTmpDir, tt_desc,
+                          parseCtx.getConf().getBoolVar(HiveConf.ConfVars.COMPRESSINTERMEDIATE)),
+         mapJoin.getSchema());
+      
+      assert mapJoin.getChildOperators().size() == 1;
+      mapJoin.getChildOperators().set(0, fs_op);
+      
+      List<Operator<? extends Serializable>> parentOpList = new ArrayList<Operator<? extends Serializable>>();
+      parentOpList.add(mapJoin);
+      fs_op.setParentOperators(parentOpList);
+      
+      currTask.addDependentTask(mjTask);
+      
+      ctx.setCurrTask(mjTask);
+      ctx.setCurrAliasId(null);
+      ctx.setCurrTopOp(null);
+      
+      Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = ctx.getMapCurrCtx();
+      mapCurrCtx.put((Operator<? extends Serializable>)nd, new GenMapRedCtx(ctx.getCurrTask(), null, null));
+      
+      return null;
+    }
+  }
+
+  /**
+   * MapJoin followed by MapJoin
+   */
+  public static class MapJoinMapJoin implements NodeProcessor {
+
+    @Override
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+        Object... nodeOutputs) throws SemanticException {
+      MapJoinOperator mapJoin = (MapJoinOperator)nd;
+      GenMRProcContext ctx = (GenMRProcContext)procCtx;
+
+      ParseContext parseCtx = ctx.getParseCtx();
+      MapJoinOperator oldMapJoin = ctx.getCurrMapJoinOp();
+      assert oldMapJoin != null;
+      GenMRMapJoinCtx mjCtx = ctx.getMapJoinCtx(mapJoin);
+      if (mjCtx != null)
+        mjCtx.setOldMapJoin(oldMapJoin);
+      else
+        ctx.setMapJoinCtx(mapJoin, new GenMRMapJoinCtx(null, null, null, oldMapJoin));
+      ctx.setCurrMapJoinOp(mapJoin);
+
+      // find the branch on which this processor was invoked
+      int pos = getPositionParent(mapJoin, stack);
+
+      Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = ctx.getMapCurrCtx();
+      GenMapRedCtx mapredCtx = mapCurrCtx.get(mapJoin.getParentOperators().get(pos));
+      Task<? extends Serializable> currTask    = mapredCtx.getCurrTask();
+      mapredWork currPlan = (mapredWork) currTask.getWork();
+      String currAliasId = mapredCtx.getCurrAliasId();
+      Operator<? extends Serializable> reducer = mapJoin;
+      HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap = ctx.getOpTaskMap();
+      Task<? extends Serializable> opMapTask = opTaskMap.get(reducer);
+      
+      ctx.setCurrTask(currTask);
+      
+      // If the plan for this reducer does not exist, initialize the plan
+      if (opMapTask == null) {
+        assert currPlan.getReducer() == null;
+        GenMapRedUtils.initMapJoinPlan(mapJoin, ctx, true, false, false, pos);
+      }
+      // The current plan can be thrown away after being merged with the original plan
+      else {
+        GenMapRedUtils.joinPlan(mapJoin, currTask, opMapTask, ctx, pos, false, true, false);
+        currTask = opMapTask;
+        ctx.setCurrTask(currTask);
+      }
+      
+      mapCurrCtx.put(mapJoin, new GenMapRedCtx(ctx.getCurrTask(), null, null));
+      return null;
+    }
+  }
+  
+  /**
+   * Union followed by MapJoin
+   */
+  public static class UnionMapJoin implements NodeProcessor {
+
+    @Override
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+        Object... nodeOutputs) throws SemanticException {
+      GenMRProcContext ctx = (GenMRProcContext)procCtx;
+
+      ParseContext parseCtx = ctx.getParseCtx();
+      UnionProcContext uCtx = parseCtx.getUCtx();
+
+      // union was map only - no special processing needed
+      if (uCtx.isMapOnlySubq())
+        return (new TableScanMapJoin()).process(nd, stack, procCtx, nodeOutputs);
+      
+      UnionOperator currUnion = ctx.getCurrUnionOp();
+      assert currUnion != null;
+      GenMRUnionCtx unionCtx = ctx.getUnionTask(currUnion);
+      MapJoinOperator mapJoin = (MapJoinOperator)nd;
+
+      // find the branch on which this processor was invoked
+      int pos = getPositionParent(mapJoin, stack);
+
+      Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = ctx.getMapCurrCtx();
+      GenMapRedCtx mapredCtx = mapCurrCtx.get(mapJoin.getParentOperators().get(pos));
+      Task<? extends Serializable> currTask    = mapredCtx.getCurrTask();
+      mapredWork currPlan = (mapredWork) currTask.getWork();
+      Operator<? extends Serializable> reducer = mapJoin;
+      HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap = ctx.getOpTaskMap();
+      Task<? extends Serializable> opMapTask = opTaskMap.get(reducer);
+      
+      // union result cannot be a map table
+      boolean local = (pos == ((mapJoinDesc)mapJoin.getConf()).getPosBigTable()) ? false : true;
+      if (local)
+        throw new SemanticException(ErrorMsg.INVALID_MAPJOIN_TABLE.getMsg());
+      
+      // If the plan for this reducer does not exist, initialize the plan
+      if (opMapTask == null) {
+        assert currPlan.getReducer() == null;
+        ctx.setCurrMapJoinOp(mapJoin);
+        GenMapRedUtils.initMapJoinPlan(mapJoin, ctx, true, true, false, pos);
+        ctx.setCurrUnionOp(null);
+      }
+      // The current plan can be thrown away after being merged with the original plan
+      else {
+        Task<? extends Serializable> uTask = ctx.getUnionTask(ctx.getCurrUnionOp()).getUTask();
+        if (uTask.getId().equals(opMapTask.getId()))
+          GenMapRedUtils.joinPlan(mapJoin, null, opMapTask, ctx, pos, false, false, true);
+        else
+          GenMapRedUtils.joinPlan(mapJoin, uTask, opMapTask, ctx, pos, false, false, true);
+        currTask = opMapTask;
+        ctx.setCurrTask(currTask);
+      }
+      
+      mapCurrCtx.put(mapJoin, new GenMapRedCtx(ctx.getCurrTask(), ctx.getCurrTopOp(), ctx.getCurrAliasId()));
+      return null;
+    }
+  }
+  
+  public static NodeProcessor getTableScanMapJoin() {
+    return new TableScanMapJoin();
+  }
+  
+  public static NodeProcessor getUnionMapJoin() {
+    return new UnionMapJoin();
+  }
+
+  public static NodeProcessor getReduceSinkMapJoin() {
+    return new ReduceSinkMapJoin();
+  }
+
+  public static NodeProcessor getMapJoin() {
+    return new MapJoin();
+  }
+
+  public static NodeProcessor getMapJoinMapJoin() {
+    return new MapJoinMapJoin();
+  }
+}
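
For illustration only (not part of this commit): a sketch of the rule map through which these
factory methods are expected to be consumed, in the same dispatcher style as the sketch shown
after GenMRRedSink4 above. The pattern strings are assumptions; they only indicate which
operator sequence each processor is meant to handle.

    import java.util.LinkedHashMap;
    import java.util.Map;

    import org.apache.hadoop.hive.ql.lib.NodeProcessor;
    import org.apache.hadoop.hive.ql.lib.Rule;
    import org.apache.hadoop.hive.ql.lib.RuleRegExp;
    import org.apache.hadoop.hive.ql.optimizer.MapJoinFactory;

    public class MapJoinRulesSketch {
      // Pattern strings are illustrative assumptions, not the exact rules of the commit.
      public static Map<Rule, NodeProcessor> mapJoinRules() {
        Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
        opRules.put(new RuleRegExp("R1", "TS%MAPJOIN%"), MapJoinFactory.getTableScanMapJoin());
        opRules.put(new RuleRegExp("R2", "RS%MAPJOIN%"), MapJoinFactory.getReduceSinkMapJoin());
        opRules.put(new RuleRegExp("R3", "UNION%MAPJOIN%"), MapJoinFactory.getUnionMapJoin());
        opRules.put(new RuleRegExp("R4", "MAPJOIN%MAPJOIN%"), MapJoinFactory.getMapJoinMapJoin());
        // The plan is split below the join while processing the Select that sits on top of
        // every map join, not while processing the MapJoin itself (see MapJoin above).
        opRules.put(new RuleRegExp("R5", "MAPJOIN%SEL%"), MapJoinFactory.getMapJoin());
        return opRules;
      }
    }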

Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java?rev=781633&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java Thu Jun  4 01:21:30 2009
@@ -0,0 +1,339 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Vector;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.ql.exec.JoinOperator;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorFactory;
+import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
+import org.apache.hadoop.hive.ql.exec.RowSchema;
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
+import org.apache.hadoop.hive.ql.parse.ASTNode;
+import org.apache.hadoop.hive.ql.parse.ErrorMsg;
+import org.apache.hadoop.hive.ql.parse.OpParseContext;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.QBJoinTree;
+import org.apache.hadoop.hive.ql.parse.RowResolver;
+import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.TypeCheckProcFactory;
+import org.apache.hadoop.hive.ql.parse.joinCond;
+import org.apache.hadoop.hive.ql.plan.PlanUtils;
+import org.apache.hadoop.hive.ql.plan.exprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.exprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.mapJoinDesc;
+import org.apache.hadoop.hive.ql.plan.selectDesc;
+import org.apache.hadoop.hive.ql.plan.tableDesc;
+import org.apache.hadoop.hive.ql.plan.joinDesc;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+
+/**
+ * Implementation of one of the rule-based map join optimizations. The user passes hints to specify map joins, and during this
+ * optimization all user-specified map joins are converted to MapJoins - the reduce sink operators above the join are removed.
+ * In the future, once statistics are implemented, this transformation can also be done based on costs.
+ */
+public class MapJoinProcessor implements Transform {
+  private ParseContext pGraphContext;
+
+  /**
+   * empty constructor
+   */
+	public MapJoinProcessor() {
+    pGraphContext = null;
+	}
+
+  @SuppressWarnings("nls")
+  private Operator<? extends Serializable> putOpInsertMap(Operator<? extends Serializable> op, RowResolver rr) {
+    OpParseContext ctx = new OpParseContext(rr);
+    pGraphContext.getOpParseCtx().put(op, ctx);
+    return op;
+  }
+  
+  /**
+   * convert a regular join to a map-side join. 
+   * @param op join operator
+   * @param qbJoin qb join tree
+   * @param mapJoinPos position of the source to be read as part of map-reduce framework. All other sources are cached in memory
+   */
+  private void convertMapJoin(ParseContext pctx, JoinOperator op, QBJoinTree joinTree, int mapJoinPos) throws SemanticException {
+    // outer join cannot be performed on a table which is being cached
+    joinDesc desc = op.getConf();
+    org.apache.hadoop.hive.ql.plan.joinCond[] condns = desc.getConds();
+    for (org.apache.hadoop.hive.ql.plan.joinCond condn : condns) {
+      if (condn.getType() == joinDesc.FULL_OUTER_JOIN)
+        throw new SemanticException(ErrorMsg.NO_OUTER_MAPJOIN.getMsg());
+      if ((condn.getType() == joinDesc.LEFT_OUTER_JOIN) && (condn.getLeft() != mapJoinPos))
+        throw new SemanticException(ErrorMsg.NO_OUTER_MAPJOIN.getMsg());
+      if ((condn.getType() == joinDesc.RIGHT_OUTER_JOIN) && (condn.getRight() != mapJoinPos))
+        throw new SemanticException(ErrorMsg.NO_OUTER_MAPJOIN.getMsg());
+    }
+
+    RowResolver outputRS = new RowResolver();
+    Map<Byte, List<exprNodeDesc>> keyExprMap   = new HashMap<Byte, List<exprNodeDesc>>();
+    Map<Byte, List<exprNodeDesc>> valueExprMap = new HashMap<Byte, List<exprNodeDesc>>();
+
+    // Walk over all the sources (which are guaranteed to be reduce sink operators). 
+    // The join outputs a concatenation of all the inputs.
+    QBJoinTree leftSrc = joinTree.getJoinSrc();
+
+    List<Operator<? extends Serializable>> parentOps = op.getParentOperators();
+    List<Operator<? extends Serializable>> newParentOps = new ArrayList<Operator<? extends Serializable>>();
+    
+    // found a source which is not to be stored in memory
+    if (leftSrc != null) {
+      //      assert mapJoinPos == 0;
+      Operator<? extends Serializable> parentOp = parentOps.get(0);
+      assert parentOp.getParentOperators().size() == 1;
+      Operator<? extends Serializable> grandParentOp = parentOp.getParentOperators().get(0);
+      
+      grandParentOp.removeChild(parentOp);
+      newParentOps.add(grandParentOp);
+    }
+
+    int pos = 0;
+    // Remove parent reduce-sink operators
+    for (String src : joinTree.getBaseSrc()) {
+      if (src != null) {
+        Operator<? extends Serializable> parentOp = parentOps.get(pos);
+        assert parentOp.getParentOperators().size() == 1;
+        Operator<? extends Serializable> grandParentOp = parentOp.getParentOperators().get(0);
+        
+        grandParentOp.removeChild(parentOp);
+
+        newParentOps.add(grandParentOp);
+      }
+      pos++;
+    }
+
+    int keyLength = 0;
+    int outputPos = 0;
+
+    // create the map-join operator
+    for (pos = 0; pos < newParentOps.size(); pos++) {
+      RowResolver inputRS = pGraphContext.getOpParseCtx().get(newParentOps.get(pos)).getRR();
+    
+      List<exprNodeDesc> keys   = new ArrayList<exprNodeDesc>();
+      List<exprNodeDesc> values = new ArrayList<exprNodeDesc>();
+
+      // Compute join keys and store in reduceKeys
+      Vector<ASTNode> exprs = joinTree.getExpressions().get(pos);
+      for (int i = 0; i < exprs.size(); i++) {
+        ASTNode expr = exprs.get(i);
+        keys.add(SemanticAnalyzer.genExprNodeDesc(expr, inputRS));
+      }
+
+      if (pos == 0)
+        keyLength = keys.size();
+      else
+        assert (keyLength == keys.size());
+    
+      keyExprMap.put(new Byte((byte)pos), keys);
+
+      Iterator<String> keysIter = inputRS.getTableNames().iterator();
+      while (keysIter.hasNext())
+      {
+        String key = keysIter.next();
+        HashMap<String, ColumnInfo> rrMap = inputRS.getFieldMap(key);
+        Iterator<String> fNamesIter = rrMap.keySet().iterator();
+        while (fNamesIter.hasNext())
+        {
+          String field = fNamesIter.next();
+          ColumnInfo valueInfo = inputRS.get(key, field);
+          values.add(new exprNodeColumnDesc(valueInfo.getType(), valueInfo.getInternalName()));
+          if (outputRS.get(key, field) == null)
+            outputRS.put(key, field, new ColumnInfo((Integer.valueOf(outputPos++)).toString(), 
+                                                    valueInfo.getType()));
+        }
+      }
+      
+      valueExprMap.put(new Byte((byte)pos), values);      
+    }
+
+    // implicit type conversion hierarchy
+    for (int k = 0; k < keyLength; k++) {
+      // Find the common class for type conversion
+      TypeInfo commonType = keyExprMap.get(new Byte((byte)0)).get(k).getTypeInfo();
+      for (int i=1; i < newParentOps.size(); i++) {
+        TypeInfo a = commonType;
+        TypeInfo b = keyExprMap.get(new Byte((byte)i)).get(k).getTypeInfo(); 
+        commonType = FunctionRegistry.getCommonClass(a, b);
+        if (commonType == null) {
+          throw new SemanticException("Cannot do equality join on different types: " + a.getTypeName() + " and " + b.getTypeName());
+        }
+      }
+      
+      // Add implicit type conversion if necessary
+      for (int i=0; i < newParentOps.size(); i++) {
+        if (!commonType.equals(keyExprMap.get(new Byte((byte)i)).get(k).getTypeInfo())) {
+          keyExprMap.get(new Byte((byte)i)).set(k, TypeCheckProcFactory.DefaultExprProcessor.getFuncExprNodeDesc(commonType.getTypeName(), keyExprMap.get(new Byte((byte)i)).get(k)));
+        }
+      }
+    }
+    
+    org.apache.hadoop.hive.ql.plan.joinCond[] joinCondns = new org.apache.hadoop.hive.ql.plan.joinCond[joinTree.getJoinCond().length];
+    for (int i = 0; i < joinTree.getJoinCond().length; i++) {
+      joinCond condn = joinTree.getJoinCond()[i];
+      joinCondns[i] = new org.apache.hadoop.hive.ql.plan.joinCond(condn);
+    }
+
+    Operator[] newPar = new Operator[newParentOps.size()];
+    pos = 0;
+    for (Operator<? extends Serializable> o : newParentOps)
+      newPar[pos++] = o;
+
+    List<exprNodeDesc> keyCols = keyExprMap.get(new Byte((byte)0));
+    StringBuilder keyOrder = new StringBuilder();
+    for (int i=0; i < keyCols.size(); i++) {
+      keyOrder.append("+");
+    }
+    
+    tableDesc keyTableDesc = 
+      PlanUtils.getLazySimpleSerDeTableDesc(PlanUtils.getFieldSchemasFromColumnList(keyCols, "mapjoinkey"));
+
+    List<tableDesc> valueTableDescs = new ArrayList<tableDesc>();
+    
+    for (pos = 0; pos < newParentOps.size(); pos++) {
+      List<exprNodeDesc> valueCols = valueExprMap.get(new Byte((byte)pos));
+      keyOrder = new StringBuilder();
+      for (int i=0; i < valueCols.size(); i++) {
+        keyOrder.append("+");
+      }
+              
+      tableDesc valueTableDesc = 
+        PlanUtils.getLazySimpleSerDeTableDesc(PlanUtils.getFieldSchemasFromColumnList(valueCols, "mapjoinvalue"));
+    
+      valueTableDescs.add(valueTableDesc);
+    }
+      
+    MapJoinOperator mapJoinOp = (MapJoinOperator)putOpInsertMap(OperatorFactory.getAndMakeChild(
+      new mapJoinDesc(keyExprMap, keyTableDesc, valueExprMap, valueTableDescs, mapJoinPos, joinCondns),
+      new RowSchema(outputRS.getColumnInfos()), newPar), outputRS);
+    
+    // change the children of the original join operator to point to the map join operator
+    List<Operator<? extends Serializable>> childOps = op.getChildOperators();
+    for (Operator<? extends Serializable> childOp : childOps) 
+      childOp.replaceParent(op, mapJoinOp);
+    
+    // TODO: do as part of replaceParent
+    mapJoinOp.setChildOperators(childOps);
+    mapJoinOp.setParentOperators(newParentOps);
+    op.setChildOperators(null);
+    op.setParentOperators(null);
+
+    // create a dummy select to select all columns
+    genSelectPlan(pctx, mapJoinOp);
+  }
+
+  private void genSelectPlan(ParseContext pctx, Operator<? extends Serializable> input) {
+    List<Operator<? extends Serializable>> childOps = input.getChildOperators();
+    input.setChildOperators(null);
+
+    // create a dummy select - this select is needed by the walker to split the map join later on
+    RowResolver inputRR = pctx.getOpParseCtx().get(input).getRR();
+    SelectOperator sel = 
+      (SelectOperator)putOpInsertMap(OperatorFactory.getAndMakeChild(
+                       new selectDesc(true), new RowSchema(inputRR.getColumnInfos()), input), inputRR);
+    
+    // Insert the select operator in between. 
+    sel.setChildOperators(childOps);
+    for (Operator<? extends Serializable> ch: childOps) {
+      ch.replaceParent(input, sel);
+    }
+  }
+
+  /**
+   * Can this join be converted to a map-side join?
+   * @param op join operator
+   * @param joinTree qb join tree
+   * @return -1 if the join cannot be converted to a map-side join, the position of the streamed (non-cached) table otherwise
+   */
+  private int mapSideJoin(JoinOperator op, QBJoinTree joinTree) throws SemanticException {
+    int mapJoinPos = -1;
+    if (joinTree.isMapSideJoin()) {
+      int pos = 0;
+      // In a map-side join, exactly one table is not present in memory.
+      // The client provides the list of tables which can be cached in memory via a hint.
+      if (joinTree.getJoinSrc() != null) 
+        mapJoinPos = pos;
+      for (String src : joinTree.getBaseSrc()) {
+        if (src != null) {
+          if (!joinTree.getMapAliases().contains(src)) {
+            if (mapJoinPos >= 0) 
+              return -1;
+            mapJoinPos = pos;
+          }
+        }
+        pos++;
+      }
+      
+      // All the tables are to be cached - this is not possible. In the future, we could support this
+      // by arbitrarily leaving one of the tables out of the list of tables to be cached.
+      if (mapJoinPos == -1) 
+        throw new SemanticException(ErrorMsg.INVALID_MAPJOIN_HINT.getMsg(pGraphContext.getQB().getParseInfo().getHints()));
+    }
+
+    return mapJoinPos;
+  }
+
+  /**
+   * Transform the query tree. For each join, check whether the user has asked for a map-side join
+   * via a hint; if so, convert it to a map join.
+   * @param pactx current parse context
+   */
+  public ParseContext transform(ParseContext pactx) throws SemanticException {
+    this.pGraphContext = pactx;
+
+    // traverse all the joins and convert them if necessary
+    if (pGraphContext.getJoinContext() != null) {
+      Map<JoinOperator, QBJoinTree> joinMap = new HashMap<JoinOperator, QBJoinTree>();
+      
+      Set<Map.Entry<JoinOperator, QBJoinTree>> joinCtx = pGraphContext.getJoinContext().entrySet();
+      Iterator<Map.Entry<JoinOperator, QBJoinTree>> joinCtxIter = joinCtx.iterator();
+      while (joinCtxIter.hasNext()) {
+        Map.Entry<JoinOperator, QBJoinTree> joinEntry = joinCtxIter.next();
+        JoinOperator joinOp = joinEntry.getKey();
+        QBJoinTree   qbJoin = joinEntry.getValue();
+        int mapJoinPos = mapSideJoin(joinOp, qbJoin);
+        if (mapJoinPos >= 0) {
+          convertMapJoin(pactx, joinOp, qbJoin, mapJoinPos);
+        }
+        else {
+          joinMap.put(joinOp, qbJoin);
+        }
+      }
+      
+      // store the new joinContext
+      pGraphContext.setJoinContext(joinMap);
+    }
+
+    return pGraphContext;
+  }
+}

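In rough terms, the tail of convertMapJoin shown above swaps the original JoinOperator out of the operator DAG and wires in a MapJoinOperator followed by a dummy select (a simplified sketch; newParentOps is assembled in the earlier part of the method, not shown in this chunk):

    <original parents>                    <newParentOps>
            |                                    |
       JoinOperator        becomes        MapJoinOperator
            |                                    |
       <children>                         SelectOperator (select *)
                                                 |
                                            <children>

As the comment in genSelectPlan notes, the dummy select exists so the walker can split the plan at the map join later; it is what the MAPJOIN%SEL% rule registered in SemanticAnalyzer (further down in this patch) matches against.
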
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java?rev=781633&r1=781632&r2=781633&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java Thu Jun  4 01:21:30 2009
@@ -50,6 +50,7 @@
     if (hiveConf.getBoolean("hive.optimize.ppd", false))
       transformations.add(new PredicatePushDown());
     transformations.add(new UnionProcessor());
+		transformations.add(new MapJoinProcessor());
 	}
 	
 	/**

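Each object added to transformations implements the optimizer's Transform interface, the same transform(ParseContext) contract that MapJoinProcessor implements above, and the passes run in registration order, so MapJoinProcessor sees the operator tree after predicate pushdown and union processing. A minimal sketch of the driving loop (illustrative only; the exact driver code in Optimizer.java is not part of this hunk, and a ParseContext field named pctx set via setPctx is assumed here):

    // Sketch: apply every registered transformation to the parse context, in order.
    public ParseContext optimize() throws SemanticException {
      for (Transform t : transformations) {
        // each pass returns a (possibly rewritten) ParseContext
        pctx = t.transform(pctx);
      }
      return pctx;
    }
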
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java?rev=781633&r1=781632&r2=781633&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java Thu Jun  4 01:21:30 2009
@@ -507,7 +507,7 @@
     prop.setProperty("columns.types", colTypes[1]);
 
     fetchWork fetch = new fetchWork(
-      ctx.getResFile(),
+      ctx.getResFile().toString(),
       new tableDesc(LazySimpleSerDe.class, TextInputFormat.class, IgnoreKeyTextOutputFormat.class, prop),
       -1
     );    

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ErrorMsg.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ErrorMsg.java?rev=781633&r1=781632&r2=781633&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ErrorMsg.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ErrorMsg.java Thu Jun  4 01:21:30 2009
@@ -78,6 +78,9 @@
   INVALID_INPUT_FORMAT_TYPE("Input Format must implement InputFormat"),
   INVALID_OUTPUT_FORMAT_TYPE("Output Format must implement HiveOutputFormat, otherwise it should be either IgnoreKeyTextOutputFormat or SequenceFileOutputFormat"),
   NO_VALID_PARTN("The query does not reference any valid partition. To run this query, set hive.mapred.mode=nonstrict"),
+  NO_OUTER_MAPJOIN("Map Join cannot be performed with Outer join"),
+  INVALID_MAPJOIN_HINT("neither table specified as map-table"),
+  INVALID_MAPJOIN_TABLE("result of a union cannot be a map table"),
   NON_BUCKETED_TABLE("Sampling Expression Needed for Non-Bucketed Table");
 
   private String mesg;
@@ -110,7 +113,7 @@
     return getText((ASTNode)tree.getChild(tree.getChildCount() - 1));
   }
 
-  String getMsg(ASTNode tree) {
+  public String getMsg(ASTNode tree) {
     return "line " + getLine(tree) + ":" + getCharPositionInLine(tree) + " " + mesg + " " + getText(tree);
   }
 
@@ -126,7 +129,7 @@
     return getMsg((ASTNode)tree, reason);
   }
 
-  String getMsg() {
+  public String getMsg() {
     return mesg;
   }
 

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/Hive.g
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/Hive.g?rev=781633&r1=781632&r2=781633&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/Hive.g (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/Hive.g Thu Jun  4 01:21:30 2009
@@ -125,6 +125,10 @@
 TOK_LIMIT;
 TOK_TABLEPROPERTY;
 TOK_IFNOTEXISTS;
+TOK_HINTLIST;
+TOK_HINT;
+TOK_MAPJOIN;
+TOK_HINTARGLIST;
 }
 
 
@@ -624,7 +628,49 @@
 @init { msgs.push("select list"); }
 @after { msgs.pop(); }
     :
-    selectItem ( COMMA  selectItem )* -> selectItem+
+    hintClause? selectItem ( COMMA  selectItem )* -> hintClause? selectItem+
+    ;
+
+hintClause
+@init { msgs.push("hint clause"); }
+@after { msgs.pop(); }
+    :
+    DIVIDE STAR PLUS hintList STAR DIVIDE -> ^(TOK_HINTLIST hintList)
+    ;
+
+hintList
+@init { msgs.push("hint list"); }
+@after { msgs.pop(); }
+    :
+    hintItem (COMMA hintItem)* -> hintItem+
+    ;
+
+hintItem
+@init { msgs.push("hint item"); }
+@after { msgs.pop(); }
+    :
+    hintName (LPAREN hintArgs RPAREN)? -> ^(TOK_HINT hintName hintArgs)
+    ;
+
+hintName
+@init { msgs.push("hint name"); }
+@after { msgs.pop(); }
+    :
+    KW_MAPJOIN -> TOK_MAPJOIN
+    ;
+
+hintArgs
+@init { msgs.push("hint arguments"); }
+@after { msgs.pop(); }
+    :
+    hintArgName (COMMA hintArgName)* -> ^(TOK_HINTARGLIST hintArgName+)
+    ;
+
+hintArgName
+@init { msgs.push("hint argument name"); }
+@after { msgs.pop(); }
+    :
+    Identifier
     ;
 
 selectItem
@@ -1178,6 +1224,7 @@
 KW_THEN: 'THEN';
 KW_ELSE: 'ELSE';
 KW_END: 'END';
+KW_MAPJOIN: 'MAPJOIN';
 
 // Operators
 

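With these rules, an optimizer hint is written between SELECT and the select list, for example (the aliases a and b are illustrative):

    SELECT /*+ MAPJOIN(b) */ a.key, b.value FROM a JOIN b ON (a.key = b.key)

The rewrite rules above turn the hint into a TOK_HINTLIST node that becomes the first child of TOK_SELECT, with one TOK_HINT child per hint carrying TOK_MAPJOIN and a TOK_HINTARGLIST of the hinted aliases. SemanticAnalyzer (below) looks for exactly this shape: a TOK_HINTLIST as child 0 of the select list.
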
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java?rev=781633&r1=781632&r2=781633&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java Thu Jun  4 01:21:30 2009
@@ -21,7 +21,9 @@
 import java.io.Serializable;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
+import org.apache.hadoop.hive.ql.exec.JoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.plan.loadFileDesc;
 import org.apache.hadoop.hive.ql.plan.loadTableDesc;
@@ -47,6 +49,7 @@
   private HashMap<String, Operator<? extends Serializable>> topOps;
   private HashMap<String, Operator<? extends Serializable>> topSelOps;
   private HashMap<Operator<? extends Serializable>, OpParseContext> opParseCtx;
+  private Map<JoinOperator, QBJoinTree> joinContext;
   private List<loadTableDesc> loadTableWork;
   private List<loadFileDesc> loadFileWork;
   private Context ctx;
@@ -82,6 +85,7 @@
       HashMap<String, Operator<? extends Serializable>> topOps,
       HashMap<String, Operator<? extends Serializable>> topSelOps,
       HashMap<Operator<? extends Serializable>, OpParseContext> opParseCtx,
+      Map<JoinOperator, QBJoinTree> joinContext,
       List<loadTableDesc> loadTableWork, List<loadFileDesc> loadFileWork,
       Context ctx, HashMap<String, String> idToTableNameMap, int destTableId, UnionProcContext uCtx) {
     this.conf = conf;
@@ -89,6 +93,7 @@
     this.ast = ast;
     this.aliasToPruner = aliasToPruner;
     this.aliasToSamplePruner = aliasToSamplePruner;
+    this.joinContext = joinContext;
     this.loadFileWork = loadFileWork;
     this.loadTableWork = loadTableWork;
     this.opParseCtx = opParseCtx;
@@ -292,4 +297,18 @@
     this.uCtx = uCtx;
   }
 
+  /**
+   * @return the joinContext
+   */
+  public Map<JoinOperator, QBJoinTree> getJoinContext() {
+    return joinContext;
+  }
+
+  /**
+   * @param joinContext the joinContext to set
+   */
+  public void setJoinContext(Map<JoinOperator, QBJoinTree> joinContext) {
+    this.joinContext = joinContext;
+  }
+
 }

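The new joinContext map is the link between semantic analysis and the MapJoinProcessor above: SemanticAnalyzer (below) records every JoinOperator it creates together with its QBJoinTree here, and MapJoinProcessor.transform walks this map to find joins whose join tree carries a MAPJOIN hint, storing back only the joins that were not converted.
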
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/PartitionPruner.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/PartitionPruner.java?rev=781633&r1=781632&r2=781633&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/PartitionPruner.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/PartitionPruner.java Thu Jun  4 01:21:30 2009
@@ -416,13 +416,17 @@
     // unknown partitions - may/may not satisfy the partition criteria
     private Set<Partition>  unknownPartns;
 
+    // denied partitions - do not satisfy the partition criteria
+    private Set<Partition> deniedPartns;
+
     /**
      * @param confirmedPartns  confirmed paritions
      * @param unknownPartns    unknown partitions
      */
-    public PrunedPartitionList(Set<Partition> confirmedPartns, Set<Partition> unknownPartns) {
+    public PrunedPartitionList(Set<Partition> confirmedPartns, Set<Partition> unknownPartns, Set<Partition> deniedPartns) {
       this.confirmedPartns  = confirmedPartns;
       this.unknownPartns    = unknownPartns;
+      this.deniedPartns     = deniedPartns;
     }
 
     /**
@@ -442,6 +446,14 @@
     }
 
     /**
+     * get denied partitions
+     * @return deniedPartns  denied partitions
+     */
+    public Set<Partition>  getDeniedPartns() {
+      return deniedPartns;
+    }
+
+    /**
      * set confirmed partitions
      * @param confirmedPartns  confirmed paritions
      */
@@ -470,6 +482,7 @@
 
     LinkedHashSet<Partition> true_parts = new LinkedHashSet<Partition>();
     LinkedHashSet<Partition> unkn_parts = new LinkedHashSet<Partition>();
+    LinkedHashSet<Partition> denied_parts = new LinkedHashSet<Partition>();
 
     try {
       StructObjectInspector rowObjectInspector = (StructObjectInspector)this.tab.getDeserializer().getObjectInspector();
@@ -505,6 +518,10 @@
             Boolean r = (Boolean) ((PrimitiveObjectInspector)evaluateResultOI).getPrimitiveJavaObject(evaluateResultO);
             LOG.trace("prune result for partition " + partSpec + ": " + r);
             if (Boolean.FALSE.equals(r)) {
+              if (denied_parts.isEmpty()) {
+                Partition part = Hive.get().getPartition(tab, partSpec, Boolean.FALSE);
+                denied_parts.add(part);
+              }
               LOG.trace("pruned partition: " + partSpec);
             } else {
               Partition part = Hive.get().getPartition(tab, partSpec, Boolean.FALSE);
@@ -529,7 +546,7 @@
     }
 
     // Now return the set of partitions
-    return new PrunedPartitionList(true_parts, unkn_parts);
+    return new PrunedPartitionList(true_parts, unkn_parts, denied_parts);
   }
 
   public Table getTable() {

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/QBJoinTree.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/QBJoinTree.java?rev=781633&r1=781632&r2=781633&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/QBJoinTree.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/QBJoinTree.java Thu Jun  4 01:21:30 2009
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.parse;
 
 import java.util.Vector;
+import java.util.List;
 
 /**
  * Internal representation of the join tree
@@ -40,6 +41,10 @@
 
   // filters
   private Vector<Vector<ASTNode>> filters;
+
+  // user asked for map-side join
+  private  boolean        mapSideJoin;
+  private  List<String>   mapAliases;
   
   /**
    * constructor 
@@ -140,6 +145,33 @@
 		this.filters = filters;
 	}
 
+  /**
+   * @return the mapSideJoin
+   */
+  public boolean isMapSideJoin() {
+    return mapSideJoin;
+  }
+
+  /**
+   * @param mapSideJoin the mapSideJoin to set
+   */
+  public void setMapSideJoin(boolean mapSideJoin) {
+    this.mapSideJoin = mapSideJoin;
+  }
+
+  /**
+   * @return the mapAliases
+   */
+  public List<String> getMapAliases() {
+    return mapAliases;
+  }
+
+  /**
+   * @param mapAliases the mapAliases to set
+   */
+  public void setMapAliases(List<String> mapAliases) {
+    this.mapAliases = mapAliases;
+  }
 }
 
 

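The mapSideJoin flag and mapAliases list record, per join tree, whether the user asked for a map-side join and which aliases should be held in memory. They are populated from the MAPJOIN hint in SemanticAnalyzer.genJoinTree, merged when adjacent join trees are combined (both below), and read back by MapJoinProcessor.mapSideJoin above to pick the single table that is streamed rather than cached.
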
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java?rev=781633&r1=781632&r2=781633&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java Thu Jun  4 01:21:30 2009
@@ -34,6 +34,7 @@
   private boolean isSubQ;
   private String alias;
   private ASTNode joinExpr;
+  private ASTNode hints;
   private HashMap<String, ASTNode> aliasToSrc;
   private HashMap<String, ASTNode> nameToDest;
   private HashMap<String, TableSample> nameToSample;
@@ -331,5 +332,12 @@
 
     return true;
   }
-  
+
+  public void setHints(ASTNode hint) {
+    this.hints = hint;
+  }
+
+  public ASTNode getHints() {
+    return hints;
+  }
 }

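The hints field simply stores the raw TOK_HINTLIST AST node captured while walking the query; getMapSideJoinTables in SemanticAnalyzer (below) later pulls the MAPJOIN aliases back out of it, and genJoinTree uses a non-null getHints() as the cue to look for map-side join requests.
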
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java?rev=781633&r1=781632&r2=781633&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java Thu Jun  4 01:21:30 2009
@@ -72,6 +72,7 @@
 import org.apache.hadoop.hive.ql.metadata.InvalidTableException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.optimizer.MapJoinFactory;
 import org.apache.hadoop.hive.ql.optimizer.GenMRFileSink1;
 import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
 import org.apache.hadoop.hive.ql.optimizer.GenMROperator;
@@ -84,6 +85,7 @@
 import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMapRedCtx;
 import org.apache.hadoop.hive.ql.optimizer.unionproc.UnionProcContext;
 import org.apache.hadoop.hive.ql.optimizer.GenMRRedSink3;
+import org.apache.hadoop.hive.ql.optimizer.GenMRRedSink4;
 import org.apache.hadoop.hive.ql.plan.*;
 import org.apache.hadoop.hive.ql.exec.*;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
@@ -141,6 +143,7 @@
   private HashMap<Operator<? extends Serializable>, OpParseContext> opParseCtx;
   private List<loadTableDesc> loadTableWork;
   private List<loadFileDesc> loadFileWork;
+  private Map<JoinOperator, QBJoinTree> joinContext;
   private QB qb;
   private ASTNode ast;
   private int destTableId;
@@ -171,6 +174,7 @@
     this.loadTableWork = new ArrayList<loadTableDesc>();
     this.loadFileWork = new ArrayList<loadFileDesc>();
     opParseCtx = new HashMap<Operator<? extends Serializable>, OpParseContext>();
+    joinContext = new HashMap<JoinOperator, QBJoinTree>();
     this.destTableId = 1;
     this.uCtx = null;
     
@@ -192,6 +196,9 @@
     qb = null;
     ast = null;
     uCtx = null;
+    this.aliasToSamplePruner.clear();
+    this.joinContext.clear();
+    this.opParseCtx.clear();
   }
 
   public void init(ParseContext pctx) {
@@ -202,15 +209,17 @@
     opParseCtx = pctx.getOpParseCtx();
     loadTableWork = pctx.getLoadTableWork();
     loadFileWork = pctx.getLoadFileWork();
+    joinContext = pctx.getJoinContext();
     ctx = pctx.getContext();
     destTableId = pctx.getDestTableId();
     idToTableNameMap = pctx.getIdToTableNameMap();
     this.uCtx = pctx.getUCtx();
+    qb = pctx.getQB();
   }
 
   public ParseContext getParseContext() {
     return new ParseContext(conf, qb, ast, aliasToPruner, aliasToSamplePruner, topOps, 
-                            topSelOps, opParseCtx, loadTableWork, loadFileWork, ctx, idToTableNameMap, destTableId, uCtx);
+                            topSelOps, opParseCtx, joinContext, loadTableWork, loadFileWork, ctx, idToTableNameMap, destTableId, uCtx);
   }
   
   @SuppressWarnings("nls")
@@ -424,6 +433,10 @@
       case HiveParser.TOK_SELECT:
         qb.countSel();
         qbp.setSelExprForClause(ctx_1.dest, ast);
+
+        if (((ASTNode)ast.getChild(0)).getToken().getType() == HiveParser.TOK_HINTLIST)
+          qbp.setHints((ASTNode)ast.getChild(0));
+
         LinkedHashMap<String, ASTNode> aggregations = doPhase1GetAggregationsFromSelect(ast);
         qbp.setAggregationExprsForClause(ctx_1.dest, aggregations);
         qbp.setDistinctFuncExprForClause(ctx_1.dest,
@@ -1220,11 +1233,16 @@
     RowResolver inputRR = opParseCtx.get(input).getRR();
     // SELECT * or SELECT TRANSFORM(*)
     boolean selectStar = false;
+    int posn = 0;
+    boolean hintPresent = (selExprList.getChild(0).getType() == HiveParser.TOK_HINTLIST);
+    if (hintPresent) {
+      posn++;
+    }
 
-    boolean isInTransform = (selExprList.getChild(0).getChild(0).getType() 
+    boolean isInTransform = (selExprList.getChild(posn).getChild(0).getType() 
         == HiveParser.TOK_TRANSFORM);
     if (isInTransform) {
-      trfm = (ASTNode) selExprList.getChild(0).getChild(0);
+      trfm = (ASTNode) selExprList.getChild(posn).getChild(0);
     }
     
     // The list of expressions after SELECT or SELECT TRANSFORM.
@@ -1232,7 +1250,7 @@
 
     LOG.debug("genSelectPlan: input = " + inputRR.toString());
     // Iterate over all expression (either after SELECT, or in SELECT TRANSFORM)
-    for (int i = 0; i < exprList.getChildCount(); ++i) {
+    for (int i = posn; i < exprList.getChildCount(); ++i) {
 
       // child can be EXPR AS ALIAS, or EXPR.
       ASTNode child = (ASTNode) exprList.getChild(i);
@@ -1294,8 +1312,7 @@
         pos = Integer.valueOf(pos.intValue() + 1);
       }
     }
-    selectStar = selectStar && exprList.getChildCount() == 1;
-
+    selectStar = selectStar && exprList.getChildCount() == posn + 1;
     
     Map<String, exprNodeDesc> colExprMap = new HashMap<String, exprNodeDesc>();
     for (int i=0; i<col_list.size(); i++) {
@@ -2553,7 +2570,7 @@
     int pos = 0;
     int outputPos = 0;
 
-    HashMap<Byte, ArrayList<exprNodeDesc>> exprMap = new HashMap<Byte, ArrayList<exprNodeDesc>>();
+    HashMap<Byte, List<exprNodeDesc>> exprMap = new HashMap<Byte, List<exprNodeDesc>>();
     Map<String, exprNodeDesc> colExprMap = new HashMap<String, exprNodeDesc>();
     HashMap<Integer, Set<String>> posToAliasMap = new HashMap<Integer, Set<String>>();
     for (Operator input : right)
@@ -2694,7 +2711,9 @@
     // Type checking and implicit type conversion for join keys
     genJoinOperatorTypeCheck(joinSrcOp, srcOps);
     
-    return genJoinOperatorChildren(joinTree, joinSrcOp, srcOps);
+    JoinOperator joinOp = (JoinOperator)genJoinOperatorChildren(joinTree, joinSrcOp, srcOps);
+    joinContext.put(joinOp, joinTree);
+    return joinOp;
   }
 
   private void genJoinOperatorTypeCheck(Operator left, Operator[] right) throws SemanticException {
@@ -2735,6 +2754,7 @@
     for (int i=0; i<right.length; i++) {
       Operator oi = (i==0 && right[i] == null ? left : right[i]);
       reduceSinkDesc now = ((ReduceSinkOperator)(oi)).getConf();
+
       now.setKeySerializeInfo(
           PlanUtils.getBinarySortableTableDesc(
               PlanUtils.getFieldSchemasFromColumnList(now.getKeyCols(), "joinkey"),
@@ -2772,8 +2792,29 @@
       pos++;
     }
   }
+
+  private List<String> getMapSideJoinTables(QB qb) {
+    List<String> cols = null;
+    ASTNode hints = qb.getParseInfo().getHints();
+    for (int pos = 0; pos < hints.getChildCount(); pos++) {
+      ASTNode hint = (ASTNode)hints.getChild(pos);
+      if (((ASTNode)hint.getChild(0)).getToken().getType() == HiveParser.TOK_MAPJOIN) {
+        ASTNode hintTblNames = (ASTNode)hint.getChild(1);
+        int numCh = hintTblNames.getChildCount();
+        for (int tblPos = 0; tblPos < numCh; tblPos++) {
+          String tblName = ((ASTNode)hintTblNames.getChild(tblPos)).getText();
+          if (cols == null)
+            cols = new ArrayList<String>();
+          if (!cols.contains(tblName))
+            cols.add(tblName);
+        }
+      }
+    }
+    
+    return cols;
+  }
   
-  private QBJoinTree genJoinTree(ASTNode joinParseTree)
+  private QBJoinTree genJoinTree(QB qb, ASTNode joinParseTree)
       throws SemanticException {
     QBJoinTree joinTree = new QBJoinTree();
     joinCond[] condn = new joinCond[1];
@@ -2818,7 +2859,7 @@
       joinTree.setBaseSrc(children);
     }
     else if (isJoinToken(left)) {
-      QBJoinTree leftTree = genJoinTree(left);
+      QBJoinTree leftTree = genJoinTree(qb, left);
       joinTree.setJoinSrc(leftTree);
       String[] leftChildAliases = leftTree.getLeftAliases();
       String leftAliases[] = new String[leftChildAliases.length + 1];
@@ -2861,6 +2902,35 @@
     if (leftSrc.size() == 1)
       joinTree.setLeftAlias(leftSrc.get(0));
 
+    // check the hints to see if the user has specified a map-side join. This will be removed later on, once the cost-based 
+    // infrastructure is in place
+    if (qb.getParseInfo().getHints() != null) {
+      List<String> mapSideTables = getMapSideJoinTables(qb);
+      List<String> mapAliases    = joinTree.getMapAliases();
+
+      for (String mapTbl : mapSideTables) {
+        boolean mapTable = false;
+        for (String leftAlias : joinTree.getLeftAliases()) {
+          if (mapTbl.equals(leftAlias))
+            mapTable = true;
+        }
+        for (String rightAlias : joinTree.getRightAliases()) {
+          if (mapTbl.equals(rightAlias))
+            mapTable = true;
+        }
+        
+        if (mapTable) {
+          if (mapAliases == null) {
+            mapAliases = new ArrayList<String>();
+          }
+          mapAliases.add(mapTbl);
+          joinTree.setMapSideJoin(true);
+        }
+      }
+
+      joinTree.setMapAliases(mapAliases);
+    }
+
     return joinTree;
   }
 
@@ -2930,6 +3000,14 @@
     }
 
     target.setJoinCond(newCondns);
+    if (target.isMapSideJoin()) {
+      assert node.isMapSideJoin();
+      List<String> mapAliases = target.getMapAliases();
+      for (String mapTbl : node.getMapAliases())
+        if (!mapAliases.contains(mapTbl))
+          mapAliases.add(mapTbl);
+      target.setMapAliases(mapAliases);
+    }
   }
 
   private int findMergePos(QBJoinTree node, QBJoinTree target) {
@@ -3447,7 +3525,7 @@
     // process join
     if (qb.getParseInfo().getJoinExpr() != null) {
       ASTNode joinExpr = qb.getParseInfo().getJoinExpr();
-      QBJoinTree joinTree = genJoinTree(joinExpr);
+      QBJoinTree joinTree = genJoinTree(qb, joinExpr);
       qb.setQbJoinTree(joinTree);
       mergeJoinTree(qb);
 
@@ -3507,7 +3585,7 @@
       Table tab = ((Map.Entry<String, Table>)iter.next()).getValue();
       if (!tab.isPartitioned()) {
         if (qbParseInfo.getDestToWhereExpr().isEmpty())
-          fetch = new fetchWork(tab.getPath(), Utilities.getTableDesc(tab), qb.getParseInfo().getOuterQueryLimit()); 
+          fetch = new fetchWork(tab.getPath().toString(), Utilities.getTableDesc(tab), qb.getParseInfo().getOuterQueryLimit()); 
         inputs.add(new ReadEntity(tab));
       }
       else {
@@ -3515,7 +3593,7 @@
           Iterator<Map.Entry<String, PartitionPruner>> iterP = aliasToPruner.entrySet().iterator();
           PartitionPruner pr = ((Map.Entry<String, PartitionPruner>)iterP.next()).getValue();
           if (pr.onlyContainsPartitionCols()) {
-            List<Path> listP = new ArrayList<Path>();
+            List<String> listP = new ArrayList<String>();
             List<partitionDesc> partP = new ArrayList<partitionDesc>();
             PartitionPruner.PrunedPartitionList partsList = null;
             Set<Partition> parts = null;
@@ -3527,7 +3605,7 @@
                 Iterator<Partition> iterParts = parts.iterator();
                 while (iterParts.hasNext()) {
                   Partition part = iterParts.next();
-                  listP.add(part.getPartitionPath());
+                  listP.add(part.getPartitionPath().toString());
                   partP.add(Utilities.getPartitionDesc(part));
                   inputs.add(new ReadEntity(part));
                 }
@@ -3554,7 +3632,7 @@
         throw new SemanticException(ErrorMsg.GENERIC_ERROR.getMsg());
       String cols = loadFileWork.get(0).getColumns();
     
-      fetch = new fetchWork(new Path(loadFileWork.get(0).getSourceDir()),
+      fetch = new fetchWork(new Path(loadFileWork.get(0).getSourceDir()).toString(),
       			                new tableDesc(LazySimpleSerDe.class, TextInputFormat.class,
                  			                		IgnoreKeyTextOutputFormat.class,
       			                           		Utilities.makeProperties(
@@ -3590,6 +3668,12 @@
     opRules.put(new RuleRegExp(new String("R4"), "FS%"), new GenMRFileSink1());
     opRules.put(new RuleRegExp(new String("R5"), "UNION%"), new GenMRUnion1());
     opRules.put(new RuleRegExp(new String("R6"), "UNION%.*RS%"), new GenMRRedSink3());
+    opRules.put(new RuleRegExp(new String("R6"), "MAPJOIN%.*RS%"), new GenMRRedSink4());
+    opRules.put(new RuleRegExp(new String("R7"), "TS%.*MAPJOIN%"), MapJoinFactory.getTableScanMapJoin());
+    opRules.put(new RuleRegExp(new String("R8"), "RS%.*MAPJOIN%"), MapJoinFactory.getReduceSinkMapJoin());
+    opRules.put(new RuleRegExp(new String("R9"), "UNION%.*MAPJOIN%"), MapJoinFactory.getUnionMapJoin());
+    opRules.put(new RuleRegExp(new String("R10"), "MAPJOIN%.*MAPJOIN%"), MapJoinFactory.getMapJoinMapJoin());
+    opRules.put(new RuleRegExp(new String("R11"), "MAPJOIN%SEL%"), MapJoinFactory.getMapJoin());
 
     // The dispatcher fires the processor corresponding to the closest matching rule and passes the context along
     Dispatcher disp = new DefaultRuleDispatcher(new GenMROperator(), opRules, procCtx);
@@ -3616,8 +3700,9 @@
     if ((task instanceof MapRedTask) || (task instanceof ExecDriver)) {
       HashMap<String, Operator<? extends Serializable>> opMap = ((mapredWork)task.getWork()).getAliasToWork();
       if (!opMap.isEmpty())
-        for (Operator<? extends Serializable> op: opMap.values())
+        for (Operator<? extends Serializable> op: opMap.values()) {
           breakOperatorTree(op);
+        }
     }
 
     if (task.getChildTasks() == null)
@@ -3687,7 +3772,7 @@
     
 
     ParseContext pCtx = new ParseContext(conf, qb, ast, aliasToPruner, aliasToSamplePruner, topOps, 
-    		                                 topSelOps, opParseCtx, loadTableWork, loadFileWork, ctx, idToTableNameMap, destTableId, uCtx);
+    		                                 topSelOps, opParseCtx, joinContext, loadTableWork, loadFileWork, ctx, idToTableNameMap, destTableId, uCtx);
   
     Optimizer optm = new Optimizer();
     optm.setPctx(pCtx);
@@ -3723,7 +3808,7 @@
    * @throws SemanticException
    */
   @SuppressWarnings("nls")
-  private exprNodeDesc genExprNodeDesc(ASTNode expr, RowResolver input)
+  public static exprNodeDesc genExprNodeDesc(ASTNode expr, RowResolver input)
   throws SemanticException {
     //  We recursively create the exprNodeDesc.  Base cases:  when we encounter 
     //  a column ref, we convert that into an exprNodeColumnDesc;  when we encounter 

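Tying the SemanticAnalyzer changes back to the grammar: when a hint is present, the select list parsed by Hive.g looks roughly like

    TOK_SELECT
        TOK_HINTLIST
            TOK_HINT
                TOK_MAPJOIN
                TOK_HINTARGLIST   (one Identifier per hinted alias)
        (select expressions ...)

which is why the phase-1 walk stores child 0 of TOK_SELECT as the hints when it is a TOK_HINTLIST, and why the select-plan code skips that child by starting at posn = 1. The new MAPJOIN rules (R6 "MAPJOIN%.*RS%" through R11 "MAPJOIN%SEL%") drive how tasks are generated around MapJoinOperators when the physical plan is produced; the processors they dispatch to, GenMRRedSink4 and the MapJoinFactory methods imported above, do the actual splitting.
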
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java?rev=781633&r1=781632&r2=781633&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java Thu Jun  4 01:21:30 2009
@@ -53,7 +53,7 @@
                           new tableDesc(),
                           new ArrayList<tableDesc> (),
                           null,
-                          Integer.valueOf (1));
+                          Integer.valueOf (1), null);
   }
   
   /** 
@@ -155,7 +155,7 @@
   /** 
    * Convert the ColumnList to FieldSchema list.
    */
-  public static List<FieldSchema> getFieldSchemasFromColumnList(ArrayList<exprNodeDesc> cols, 
+  public static List<FieldSchema> getFieldSchemasFromColumnList(List<exprNodeDesc> cols, 
       String fieldPrefix) {
     List<FieldSchema> schemas = new ArrayList<FieldSchema>(cols.size());
     for (int i=0; i<cols.size(); i++) {
@@ -205,9 +205,9 @@
                                                  int numReducers) {
     
     return new reduceSinkDesc(keyCols, valueCols, tag, partitionCols, numReducers, 
-        getBinarySortableTableDesc(getFieldSchemasFromColumnList(keyCols, "reducesinkkey"), order),
-        // Revert to DynamicSerDe: getBinaryTableDesc(getFieldSchemasFromColumnList(valueCols, "reducesinkvalue")));
-        getLazySimpleSerDeTableDesc(getFieldSchemasFromColumnList(valueCols, "reducesinkvalue")));
+      getBinarySortableTableDesc(getFieldSchemasFromColumnList(keyCols, "reducesinkkey"), order),
+      // Revert to DynamicSerDe: getBinaryTableDesc(getFieldSchemasFromColumnList(valueCols, "reducesinkvalue")));
+      getLazySimpleSerDeTableDesc(getFieldSchemasFromColumnList(valueCols, "reducesinkvalue")));
   }
 
   /**