Posted to commits@hive.apache.org by ke...@apache.org on 2012/12/19 02:20:59 UTC

svn commit: r1423731 [1/2] - in /hive/trunk/ql/src: java/org/apache/hadoop/hive/ql/exec/ java/org/apache/hadoop/hive/ql/optimizer/ java/org/apache/hadoop/hive/ql/parse/ java/org/apache/hadoop/hive/ql/plan/ test/queries/clientpositive/ test/results/clie...

Author: kevinwilfong
Date: Wed Dec 19 01:20:56 2012
New Revision: 1423731

URL: http://svn.apache.org/viewvc?rev=1423731&view=rev
Log:
HIVE-3633. sort-merge join does not work with sub-queries. (njain via kevinwilfong)

Added:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DummyStoreOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractBucketJoinProc.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/DummyStoreDesc.java
    hive/trunk/ql/src/test/queries/clientpositive/smb_mapjoin_14.q
    hive/trunk/ql/src/test/results/clientpositive/smb_mapjoin_14.q.out
Modified:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapJoinOptimizer.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapJoinOptimizer.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/QB.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/QBJoinTree.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/TableAccessAnalyzer.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/SMBJoinDesc.java

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DummyStoreOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DummyStoreOperator.java?rev=1423731&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DummyStoreOperator.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DummyStoreOperator.java Wed Dec 19 01:20:56 2012
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.io.Serializable;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.DummyStoreDesc;
+import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
+
+/**
+ * For sort-merge joins, this is a dummy operator that stores the row for the
+ * small table before it reaches the sort-merge join operator.
+ *
+ * Consider a query like:
+ *
+ * select * from
+ *   (subq1 --> has a filter)
+ *   join
+ *   (subq2 --> has a filter)
+ * on some key
+ *
+ * Let us assume that subq1 is the small table (either specified by the user or inferred
+ * automatically). Since there can be multiple buckets/partitions of the table corresponding
+ * to subq1 for a given file in subq2, a priority queue is present in SMBMapJoinOperator to
+ * scan the various buckets and fetch the row with the smallest join key. The tree
+ * corresponding to subq1 needs to be evaluated in order to compute the join key (since the
+ * select list may transform the join key across different object inspectors).
+ *
+ * Therefore the following operator tree is created:
+ *
+ * TableScan (subq1) --> Select --> Filter --> DummyStore
+ *                                                         \
+ *                                                          \     SMBJoin
+ *                                                          /
+ *                                                         /
+ * TableScan (subq2) --> Select --> Filter
+ *
+ * In order to fetch the row with the smallest join key from the small table, the row from
+ * subq1 is partially processed and stored in DummyStore. For the actual processing of the
+ * join, SMBJoin (the child of DummyStore) is invoked on the transformed row. Note that before
+ * joins over sub-queries were supported, this was not needed: all transformations were done
+ * after SMBJoin, and for the small tables nothing could appear between TableScan and
+ * SMBJoin.
+ */
+public class DummyStoreOperator extends Operator<DummyStoreDesc> implements Serializable {
+
+  private transient InspectableObject result;
+
+  public DummyStoreOperator() {
+    super();
+  }
+
+  @Override
+  protected void initializeOp(Configuration hconf) throws HiveException {
+    outputObjInspector = inputObjInspectors[0];
+    result = new InspectableObject(null, outputObjInspector);
+    initializeChildren(hconf);
+  }
+
+  @Override
+  public void processOp(Object row, int tag) throws HiveException {
+    // Store the row
+    result.o = row;
+  }
+
+  @Override
+  public void reset() {
+    result = new InspectableObject(null, result.oi);
+  }
+
+  public InspectableObject getResult() {
+    return result;
+  }
+
+  @Override
+  public OperatorType getType() {
+    return OperatorType.FORWARD;
+  }
+}

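To make the contract described in the class comment concrete, here is a minimal sketch (a hypothetical helper, assuming forwardOp and sinkOp are wired up as in the SMBMapJoinOperator changes below) of how a small-table row is driven through the partial operator tree and read back from the dummy store:

    // Sketch: push one small-table row through the tree that ends in the
    // DummyStoreOperator, then read the (possibly absorbed) result back.
    private InspectableObject evalThroughSink(
        Operator<? extends OperatorDesc> forwardOp,
        DummyStoreOperator sinkOp,
        Object row) throws HiveException {
      sinkOp.reset();                 // clear the previously buffered row
      forwardOp.process(row, 0);      // run Select/Filter/... above the sink
      InspectableObject result = sinkOp.getResult();
      // result.o is null when a Filter in the tree absorbed the row
      return result.o == null ? null : result;
    }
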
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java?rev=1423731&r1=1423730&r2=1423731&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java Wed Dec 19 01:20:56 2012
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.hadoop.hive.ql.plan.CollectDesc;
+import org.apache.hadoop.hive.ql.plan.DummyStoreDesc;
 import org.apache.hadoop.hive.ql.plan.ExtractDesc;
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.FilterDesc;
@@ -91,6 +92,8 @@ public final class OperatorFactory {
         HashTableDummyOperator.class));
     opvec.add(new OpTuple<HashTableSinkDesc>(HashTableSinkDesc.class,
         HashTableSinkOperator.class));
+    opvec.add(new OpTuple<DummyStoreDesc>(DummyStoreDesc.class,
+        DummyStoreOperator.class));
   }
 
   public static <T extends OperatorDesc> Operator<T> get(Class<T> opClass) {

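With the new OpTuple registered, plan-construction code can obtain the operator generically. A usage sketch (hypothetical call site):

    // The factory now maps DummyStoreDesc to DummyStoreOperator like any
    // other descriptor/operator pair.
    Operator<DummyStoreDesc> dummy = OperatorFactory.get(DummyStoreDesc.class);
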
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java?rev=1423731&r1=1423730&r2=1423731&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java Wed Dec 19 01:20:56 2012
@@ -142,7 +142,7 @@ public class SMBMapJoinOperator extends 
     super.initializeLocalWork(hconf);
   }
 
-  public void initializeMapredLocalWork(MapJoinDesc conf, Configuration hconf,
+  public void initializeMapredLocalWork(MapJoinDesc mjConf, Configuration hconf,
       MapredLocalWork localWork, Log l4j) throws HiveException {
     if (localWork == null || localWorkInited) {
       return;
@@ -154,7 +154,13 @@ public class SMBMapJoinOperator extends 
     // create map local operators
     Map<String,FetchWork> aliasToFetchWork = localWork.getAliasToFetchWork();
     Map<String, Operator<? extends OperatorDesc>> aliasToWork = localWork.getAliasToWork();
+    Map<String, DummyStoreOperator> aliasToSinkWork = conf.getAliasToSink();
 
+    // The operator tree up to the sink operator needs to be processed while
+    // fetching the next row from the priority queue (which may contain multiple
+    // files of the small table for a given file of the big table). The remaining
+    // tree will be processed while processing the join.
+    // See the comments in DummyStoreOperator for additional explanation.
     for (Map.Entry<String, FetchWork> entry : aliasToFetchWork.entrySet()) {
       String alias = entry.getKey();
       FetchWork fetchWork = entry.getValue();
@@ -167,7 +173,9 @@ public class SMBMapJoinOperator extends 
       forwardOp.initialize(jobClone, new ObjectInspector[]{fetchOp.getOutputObjectInspector()});
       fetchOp.clearFetchContext();
 
-      MergeQueue mergeQueue = new MergeQueue(alias, fetchWork, jobClone);
+      DummyStoreOperator sinkOp = aliasToSinkWork.get(alias);
+
+      MergeQueue mergeQueue = new MergeQueue(alias, fetchWork, jobClone, forwardOp, sinkOp);
 
       aliasToMergeQueue.put(alias, mergeQueue);
       l4j.info("fetch operators for " + alias + " initialized");
@@ -515,15 +523,20 @@ public class SMBMapJoinOperator extends 
     String table = tagToAlias.get(tag);
     MergeQueue mergeQueue = aliasToMergeQueue.get(table);
 
-    Operator<? extends OperatorDesc> forwardOp = localWork.getAliasToWork()
-        .get(table);
+    // The operator tree up to the sink operator has already been processed while
+    // fetching the next row from the priority queue (which may contain multiple
+    // files of the small table for a given file of the big table). Now process
+    // the remaining tree. See the comments in DummyStoreOperator for additional
+    // explanation.
+    Operator<? extends OperatorDesc> forwardOp =
+        conf.getAliasToSink().get(table).getChildOperators().get(0);
     try {
       InspectableObject row = mergeQueue.getNextRow();
       if (row == null) {
         fetchDone[tag] = true;
         return;
       }
-      forwardOp.process(row.o, 0);
+      forwardOp.process(row.o, tag);
       // check if any operator had a fatal error or early exit during
       // execution
       if (forwardOp.getDone()) {
@@ -624,15 +637,21 @@ public class SMBMapJoinOperator extends 
     transient FetchOperator[] segments;
     transient List<ExprNodeEvaluator> keyFields;
     transient List<ObjectInspector> keyFieldOIs;
+    transient Operator<? extends OperatorDesc> forwardOp;
+    transient DummyStoreOperator sinkOp;
 
     // index of FetchOperator which is providing smallest one
     transient Integer currentMinSegment;
     transient ObjectPair<List<Object>, InspectableObject>[] keys;
 
-    public MergeQueue(String alias, FetchWork fetchWork, JobConf jobConf) {
+    public MergeQueue(String alias, FetchWork fetchWork, JobConf jobConf,
+        Operator<? extends OperatorDesc> forwardOp,
+        DummyStoreOperator sinkOp) {
       this.alias = alias;
       this.fetchWork = fetchWork;
       this.jobConf = jobConf;
+      this.forwardOp = forwardOp;
+      this.sinkOp = sinkOp;
     }
 
     // paths = bucket files of small table for current bucket file of big table
@@ -684,6 +703,7 @@ public class SMBMapJoinOperator extends 
       }
     }
 
+    @Override
     protected boolean lessThan(Object a, Object b) {
       return compareKeys(keys[(Integer) a].getFirst(), keys[(Integer)b].getFirst()) < 0;
     }
@@ -730,20 +750,31 @@ public class SMBMapJoinOperator extends 
     // return true if current min segment(FetchOperator) has next row
     private boolean next(Integer current) throws IOException, HiveException {
       if (keyFields == null) {
-        // joinKeys/joinKeysOI are initialized after making merge queue, so setup lazily at runtime
         byte tag = tagForAlias(alias);
+        // joinKeys/joinKeysOI are initialized after making merge queue, so setup lazily at runtime
         keyFields = joinKeys.get(tag);
         keyFieldOIs = joinKeysObjectInspectors.get(tag);
       }
       InspectableObject nextRow = segments[current].getNextRow();
-      if (nextRow != null) {
+      while (nextRow != null) {
+        sinkOp.reset();
         if (keys[current] == null) {
           keys[current] = new ObjectPair<List<Object>, InspectableObject>();
         }
-        // todo this should be changed to be evaluated lazily, especially for single segment case
-        keys[current].setFirst(JoinUtil.computeKeys(nextRow.o, keyFields, keyFieldOIs));
-        keys[current].setSecond(nextRow);
-        return true;
+
+        // Pass the row through the operator tree. It is guaranteed that at most
+        // one row can be produced from an input row.
+        forwardOp.process(nextRow.o, 0);
+        nextRow = sinkOp.getResult();
+
+        // It is possible that the row got absorbed in the operator tree.
+        if (nextRow.o != null) {
+          // todo this should be changed to be evaluated lazily, especially for single segment case
+          keys[current].setFirst(JoinUtil.computeKeys(nextRow.o, keyFields, keyFieldOIs));
+          keys[current].setSecond(nextRow);
+          return true;
+        }
+        nextRow = segments[current].getNextRow();
       }
       keys[current] = null;
       return false;

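Putting the hunks above together, the life of one small-table row is now split into two phases. A condensed sketch (variable names match the committed code; the surrounding scaffolding is hypothetical):

    // Phase 1 (fetch time, in MergeQueue.next): run the sub-query tree up to
    // the DummyStore so the join key is computed on the transformed row.
    sinkOp.reset();
    forwardOp.process(nextRow.o, 0);
    InspectableObject transformed = sinkOp.getResult(); // o == null if filtered out

    // Phase 2 (join time): forward the row returned by the merge queue into
    // the sub-tree below the sink, i.e. the SMBJoin itself.
    Operator<? extends OperatorDesc> belowSink =
        conf.getAliasToSink().get(table).getChildOperators().get(0);
    belowSink.process(transformed.o, tag);              // note the real tag, not 0
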
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractBucketJoinProc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractBucketJoinProc.java?rev=1423731&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractBucketJoinProc.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractBucketJoinProc.java Wed Dec 19 01:20:56 2012
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Stack;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+
+/**
+ * Common functionality for node processors that perform bucket map join optimization.
+ */
+abstract public class AbstractBucketJoinProc implements NodeProcessor {
+
+  private static final Log LOG = LogFactory.getLog(AbstractBucketJoinProc.class.getName());
+
+  public AbstractBucketJoinProc() {
+  }
+
+  @Override
+  abstract public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+      Object... nodeOutputs) throws SemanticException;
+
+  public List<String> toColumns(List<ExprNodeDesc> keys) {
+    List<String> columns = new ArrayList<String>();
+    for (ExprNodeDesc key : keys) {
+      if (!(key instanceof ExprNodeColumnDesc)) {
+        return null;
+      }
+      columns.add(((ExprNodeColumnDesc) key).getColumn());
+    }
+    return columns;
+  }
+}

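toColumns is deliberately strict: any key that is not a bare column reference makes it return null, and both optimizers treat that as a signal to bail out. An illustration with hypothetical inputs (assuming the usual plan and typeinfo imports):

    // A plain column reference maps to its column name ...
    List<ExprNodeDesc> plain = Arrays.<ExprNodeDesc>asList(
        new ExprNodeColumnDesc(TypeInfoFactory.intTypeInfo, "key", "a", false));
    List<String> cols = toColumns(plain);   // -> ["key"]

    // ... while any expression over the key, even a simple cast, yields null,
    // and the caller returns false to skip the bucketed join.
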
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapJoinOptimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapJoinOptimizer.java?rev=1423731&r1=1423730&r2=1423731&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapJoinOptimizer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapJoinOptimizer.java Wed Dec 19 01:20:56 2012
@@ -59,15 +59,15 @@ import org.apache.hadoop.hive.ql.metadat
 import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
+import org.apache.hadoop.hive.ql.parse.QB;
 import org.apache.hadoop.hive.ql.parse.QBJoinTree;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
-import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.parse.TableAccessAnalyzer;
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 
 /**
- *this transformation does bucket map join optimization.
+ * this transformation does bucket map join optimization.
  */
 public class BucketMapJoinOptimizer implements Transform {
 
@@ -82,21 +82,21 @@ public class BucketMapJoinOptimizer impl
 
     Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
     BucketMapjoinOptProcCtx bucketMapJoinOptimizeCtx =
-      new BucketMapjoinOptProcCtx(pctx.getConf());
+        new BucketMapjoinOptProcCtx(pctx.getConf());
 
     // process map joins with no reducers pattern
     opRules.put(new RuleRegExp("R1",
-      MapJoinOperator.getOperatorName() + "%"),
-      getBucketMapjoinProc(pctx));
+        MapJoinOperator.getOperatorName() + "%"),
+        getBucketMapjoinProc(pctx));
     opRules.put(new RuleRegExp("R2",
-      ReduceSinkOperator.getOperatorName() + "%.*" + MapJoinOperator.getOperatorName()),
-      getBucketMapjoinRejectProc(pctx));
+        ReduceSinkOperator.getOperatorName() + "%.*" + MapJoinOperator.getOperatorName()),
+        getBucketMapjoinRejectProc(pctx));
     opRules.put(new RuleRegExp(new String("R3"),
-      UnionOperator.getOperatorName() + "%.*" + MapJoinOperator.getOperatorName() + "%"),
-      getBucketMapjoinRejectProc(pctx));
+        UnionOperator.getOperatorName() + "%.*" + MapJoinOperator.getOperatorName() + "%"),
+        getBucketMapjoinRejectProc(pctx));
     opRules.put(new RuleRegExp(new String("R4"),
-      MapJoinOperator.getOperatorName() + "%.*" + MapJoinOperator.getOperatorName() + "%"),
-      getBucketMapjoinRejectProc(pctx));
+        MapJoinOperator.getOperatorName() + "%.*" + MapJoinOperator.getOperatorName() + "%"),
+        getBucketMapjoinRejectProc(pctx));
 
     // The dispatcher fires the processor corresponding to the closest matching
     // rule and passes the context along
@@ -113,7 +113,7 @@ public class BucketMapJoinOptimizer impl
   }
 
   private NodeProcessor getBucketMapjoinRejectProc(ParseContext pctx) {
-    return new NodeProcessor () {
+    return new NodeProcessor() {
       @Override
       public Object process(Node nd, Stack<Node> stack,
           NodeProcessorCtx procCtx, Object... nodeOutputs)
@@ -141,7 +141,7 @@ public class BucketMapJoinOptimizer impl
     };
   }
 
-  class BucketMapjoinOptProc implements NodeProcessor {
+  class BucketMapjoinOptProc extends AbstractBucketJoinProc implements NodeProcessor {
 
     protected ParseContext pGraphContext;
 
@@ -156,12 +156,12 @@ public class BucketMapJoinOptimizer impl
       BucketMapjoinOptProcCtx context = (BucketMapjoinOptProcCtx) procCtx;
       HiveConf conf = context.getConf();
 
-      if(context.getListOfRejectedMapjoins().contains(mapJoinOp)) {
+      if (context.getListOfRejectedMapjoins().contains(mapJoinOp)) {
         return false;
       }
 
       QBJoinTree joinCxt = this.pGraphContext.getMapJoinContext().get(mapJoinOp);
-      if(joinCxt == null) {
+      if (joinCxt == null) {
         return false;
       }
 
@@ -170,19 +170,27 @@ public class BucketMapJoinOptimizer impl
       String[] left = joinCxt.getLeftAliases();
       List<String> mapAlias = joinCxt.getMapAliases();
       String baseBigAlias = null;
-      for(String s : left) {
-        if(s != null && !joinAliases.contains(s)) {
-          joinAliases.add(s);
-          if(!mapAlias.contains(s)) {
-            baseBigAlias = s;
+
+      for (String s : left) {
+        if (s != null) {
+          String subQueryAlias = QB.getAppendedAliasFromId(joinCxt.getId(), s);
+          if (!joinAliases.contains(subQueryAlias)) {
+            joinAliases.add(subQueryAlias);
+            if (!mapAlias.contains(s)) {
+              baseBigAlias = subQueryAlias;
+            }
           }
         }
       }
-      for(String s : srcs) {
-        if(s != null && !joinAliases.contains(s)) {
-          joinAliases.add(s);
-          if(!mapAlias.contains(s)) {
-            baseBigAlias = s;
+
+      for (String s : srcs) {
+        if (s != null) {
+          String subQueryAlias = QB.getAppendedAliasFromId(joinCxt.getId(), s);
+          if (!joinAliases.contains(subQueryAlias)) {
+            joinAliases.add(subQueryAlias);
+            if (!mapAlias.contains(s)) {
+              baseBigAlias = subQueryAlias;
+            }
           }
         }
       }
@@ -194,7 +202,7 @@ public class BucketMapJoinOptimizer impl
           new LinkedHashMap<String, List<List<String>>>();
 
       Map<String, Operator<? extends OperatorDesc>> topOps =
-        this.pGraphContext.getTopOps();
+          this.pGraphContext.getTopOps();
       Map<TableScanOperator, Table> topToTable = this.pGraphContext.getTopToTable();
 
       // (partition to bucket file names) and (partition to bucket number) for
@@ -206,26 +214,60 @@ public class BucketMapJoinOptimizer impl
       boolean bigTablePartitioned = true;
       for (int index = 0; index < joinAliases.size(); index++) {
         String alias = joinAliases.get(index);
-        TableScanOperator tso = (TableScanOperator) topOps.get(alias);
-        if (tso == null) {
+        Operator<? extends OperatorDesc> topOp = joinCxt.getAliasToOpInfo().get(alias);
+        if (topOp == null) {
           return false;
         }
         List<String> keys = toColumns(mjDesc.getKeys().get((byte) index));
         if (keys == null || keys.isEmpty()) {
           return false;
         }
+        int oldKeySize = keys.size();
+        TableScanOperator tso = TableAccessAnalyzer.genRootTableScan(topOp, keys);
+        if (tso == null) {
+          return false;
+        }
+
+        // For nested sub-queries, the alias mapping is not maintained in QB currently.
+        if (topOps.containsValue(tso)) {
+          for (Map.Entry<String, Operator<? extends OperatorDesc>> topOpEntry : topOps.entrySet()) {
+            if (topOpEntry.getValue() == tso) {
+              String newAlias = topOpEntry.getKey();
+              joinAliases.set(index, newAlias);
+              if (baseBigAlias.equals(alias)) {
+                baseBigAlias = newAlias;
+              }
+              alias = newAlias;
+              break;
+            }
+          }
+        }
+        else {
+          // Ideally, this should never happen, and this should be an assert.
+          return false;
+        }
+
+        // The join keys cannot be transformed in the sub-query currently.
+        // TableAccessAnalyzer.genRootTableScan will only return the base table scan
+        // if the join keys are constants or a column. Even a simple cast of the join keys
+        // will result in a null table scan operator. In case of constant join keys, they would
+        // be removed, and the size before and after the genRootTableScan will be different.
+        if (keys.size() != oldKeySize) {
+          return false;
+        }
         if (orders == null) {
           orders = new Integer[keys.size()];
         }
 
         Table tbl = topToTable.get(tso);
-        if(tbl.isPartitioned()) {
+        if (tbl.isPartitioned()) {
           PrunedPartitionList prunedParts;
           try {
             prunedParts = pGraphContext.getOpToPartList().get(tso);
             if (prunedParts == null) {
-              prunedParts = PartitionPruner.prune(tbl, pGraphContext.getOpToPartPruner().get(tso), pGraphContext.getConf(), alias,
-                pGraphContext.getPrunedPartitions());
+              prunedParts = PartitionPruner.prune(tbl, pGraphContext.getOpToPartPruner().get(tso),
+                  pGraphContext.getConf(), alias,
+                  pGraphContext.getPrunedPartitions());
               pGraphContext.getOpToPartList().put(tso, prunedParts);
             }
           } catch (HiveException e) {
@@ -238,7 +280,7 @@ public class BucketMapJoinOptimizer impl
           // construct a mapping of (Partition->bucket file names) and (Partition -> bucket number)
           if (partitions.isEmpty()) {
             if (!alias.equals(baseBigAlias)) {
-              aliasToPartitionBucketNumberMapping.put(alias, Arrays.<Integer>asList());
+              aliasToPartitionBucketNumberMapping.put(alias, Arrays.<Integer> asList());
               aliasToPartitionBucketFileNamesMapping.put(alias, new ArrayList<List<String>>());
             }
           } else {
@@ -253,10 +295,10 @@ public class BucketMapJoinOptimizer impl
               int bucketCount = p.getBucketCount();
               if (fileNames.size() != bucketCount) {
                 String msg = "The number of buckets for table " +
-                  tbl.getTableName() + " partition " + p.getName() + " is " +
-                  p.getBucketCount() + ", whereas the number of files is " + fileNames.size();
+                    tbl.getTableName() + " partition " + p.getName() + " is " +
+                    p.getBucketCount() + ", whereas the number of files is " + fileNames.size();
                 throw new SemanticException(
-                  ErrorMsg.BUCKETED_TABLE_METADATA_INCORRECT.getMsg(msg));
+                    ErrorMsg.BUCKETED_TABLE_METADATA_INCORRECT.getMsg(msg));
               }
               if (alias.equals(baseBigAlias)) {
                 bigTblPartsToBucketFileNames.put(p, fileNames);
@@ -280,10 +322,10 @@ public class BucketMapJoinOptimizer impl
           // The number of files for the table should be same as number of buckets.
           if (fileNames.size() != num) {
             String msg = "The number of buckets for table " +
-              tbl.getTableName() + " is " + tbl.getNumBuckets() +
-              ", whereas the number of files is " + fileNames.size();
+                tbl.getTableName() + " is " + tbl.getNumBuckets() +
+                ", whereas the number of files is " + fileNames.size();
             throw new SemanticException(
-              ErrorMsg.BUCKETED_TABLE_METADATA_INCORRECT.getMsg(msg));
+                ErrorMsg.BUCKETED_TABLE_METADATA_INCORRECT.getMsg(msg));
           }
           if (alias.equals(baseBigAlias)) {
             bigTblPartsToBucketFileNames.put(null, fileNames);
@@ -308,10 +350,10 @@ public class BucketMapJoinOptimizer impl
       MapJoinDesc desc = mapJoinOp.getConf();
 
       Map<String, Map<String, List<String>>> aliasBucketFileNameMapping =
-        new LinkedHashMap<String, Map<String, List<String>>>();
+          new LinkedHashMap<String, Map<String, List<String>>>();
 
-      //sort bucket names for the big table
-      for(List<String> partBucketNames : bigTblPartsToBucketFileNames.values()) {
+      // sort bucket names for the big table
+      for (List<String> partBucketNames : bigTblPartsToBucketFileNames.values()) {
         Collections.sort(partBucketNames);
       }
 
@@ -333,7 +375,7 @@ public class BucketMapJoinOptimizer impl
 
         // for each bucket file in big table, get the corresponding bucket file
         // name in the small table.
-        //more than 1 partition in the big table, do the mapping for each partition
+        // more than 1 partition in the big table, do the mapping for each partition
         Iterator<Entry<Partition, List<String>>> bigTblPartToBucketNames =
             bigTblPartsToBucketFileNames.entrySet().iterator();
         Iterator<Entry<Partition, Integer>> bigTblPartToBucketNum = bigTblPartsToBucketNumber
@@ -376,17 +418,6 @@ public class BucketMapJoinOptimizer impl
       return null;
     }
 
-    private List<String> toColumns(List<ExprNodeDesc> keys) {
-      List<String> columns = new ArrayList<String>();
-      for (ExprNodeDesc key : keys) {
-        if (!(key instanceof ExprNodeColumnDesc)) {
-          return null;
-        }
-        columns.add(((ExprNodeColumnDesc) key).getColumn());
-      }
-      return columns;
-    }
-
     // convert partition to partition spec string
     private Map<String, List<String>> convert(Map<Partition, List<String>> mapping) {
       Map<String, List<String>> converted = new HashMap<String, List<String>>();
@@ -406,7 +437,7 @@ public class BucketMapJoinOptimizer impl
 
       for (int bindex = 0; bindex < bigTblBucketNameList.size(); bindex++) {
         ArrayList<String> resultFileNames = new ArrayList<String>();
-        for (int sindex = 0 ; sindex < smallTblBucketNums.size(); sindex++) {
+        for (int sindex = 0; sindex < smallTblBucketNums.size(); sindex++) {
           int smallTblBucketNum = smallTblBucketNums.get(sindex);
           List<String> smallTblFileNames = smallTblFilesList.get(sindex);
           if (bigTblBucketNum >= smallTblBucketNum) {

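The alias-resolution logic added above (and its twin in SortedMergeBucketMapJoinOptimizer below) follows a single pattern. A compact sketch of that pattern as a hypothetical helper, using only methods introduced or made public in this commit:

    // Resolve a join alias to its base TableScan and to the fully qualified
    // alias (e.g. "subq2:subq1:a") under which that scan is registered in
    // topOps. Returns null when the optimization must bail out.
    private String resolveBaseAlias(QBJoinTree joinCxt, String alias,
        List<String> keys, Map<String, Operator<? extends OperatorDesc>> topOps) {
      Operator<? extends OperatorDesc> topOp = joinCxt.getAliasToOpInfo().get(alias);
      if (topOp == null) {
        return null;
      }
      int oldKeySize = keys.size();
      TableScanOperator tso = TableAccessAnalyzer.genRootTableScan(topOp, keys);
      // Constant keys are dropped during the walk; a size change means the
      // keys were transformed in the sub-query (e.g. by a cast).
      if (tso == null || keys.size() != oldKeySize) {
        return null;
      }
      for (Map.Entry<String, Operator<? extends OperatorDesc>> e : topOps.entrySet()) {
        if (e.getValue() == tso) {
          return e.getKey();
        }
      }
      return null;  // ideally unreachable; treated as "cannot optimize"
    }
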
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapJoinOptimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapJoinOptimizer.java?rev=1423731&r1=1423730&r2=1423731&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapJoinOptimizer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapJoinOptimizer.java Wed Dec 19 01:20:56 2012
@@ -30,7 +30,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.Order;
 import org.apache.hadoop.hive.ql.ErrorMsg;
-import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
+import org.apache.hadoop.hive.ql.exec.DummyStoreOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
@@ -50,14 +50,12 @@ import org.apache.hadoop.hive.ql.metadat
 import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
+import org.apache.hadoop.hive.ql.parse.QB;
 import org.apache.hadoop.hive.ql.parse.QBJoinTree;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
-import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
-import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.parse.TableAccessAnalyzer;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.SMBJoinDesc;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
 
 //try to replace a bucket map join with a sorted merge map join
 public class SortedMergeBucketMapJoinOptimizer implements Transform {
@@ -104,7 +102,7 @@ public class SortedMergeBucketMapJoinOpt
     };
   }
 
-  class SortedMergeBucketMapjoinProc implements NodeProcessor {
+  class SortedMergeBucketMapjoinProc extends AbstractBucketJoinProc implements NodeProcessor {
     private ParseContext pGraphContext;
 
     public SortedMergeBucketMapjoinProc(ParseContext pctx) {
@@ -134,7 +132,9 @@ public class SortedMergeBucketMapJoinOpt
         return false;
       }
       String[] srcs = joinCxt.getBaseSrc();
-      int pos = 0;
+      for (int srcPos = 0; srcPos < srcs.length; srcPos++) {
+        srcs[srcPos] = QB.getAppendedAliasFromId(joinCxt.getId(), srcs[srcPos]);
+      }
 
       // All the tables/partitions columns should be sorted in the same order
       // For example, if tables A and B are being joined on columns c1, c2 and c3
@@ -142,15 +142,14 @@ public class SortedMergeBucketMapJoinOpt
       // c1, c2 and c3 are sorted in the same order.
       List<Order> sortColumnsFirstTable = new ArrayList<Order>();
 
-      for (String src : srcs) {
+      for (int pos = 0; pos < srcs.length; pos++) {
         tableSorted = tableSorted
             && isTableSorted(this.pGraphContext,
                              mapJoinOp,
                              joinCxt,
-                             src,
                              pos,
-                             sortColumnsFirstTable);
-        pos++;
+                             sortColumnsFirstTable,
+                             srcs);
       }
       if (!tableSorted) {
         //this is a mapjoin but not suit for a sort merge bucket map join. check outer joins
@@ -196,13 +195,55 @@ public class SortedMergeBucketMapJoinOpt
         this.pGraphContext.getListMapJoinOpsNoReducer().add(indexInListMapJoinNoReducer, smbJop);
       }
 
+      Map<String, DummyStoreOperator> aliasToSink =
+          new HashMap<String, DummyStoreOperator>();
+      // For all parents (other than the big table), insert a dummy store operator
+      /* Consider a query like:
+        *
+        * select * from
+        *   (subq1 --> has a filter)
+        *   join
+        *   (subq2 --> has a filter)
+        * on some key
+        *
+        * Let us assume that subq1 is the small table (either specified by the user or inferred
+        * automatically). The following operator tree will be created:
+        *
+        * TableScan (subq1) --> Select --> Filter --> DummyStore
+        *                                                         \
+        *                                                          \     SMBJoin
+        *                                                          /
+        *                                                         /
+        * TableScan (subq2) --> Select --> Filter
+        */
       List<? extends Operator> parentOperators = mapJoinOp.getParentOperators();
       for (int i = 0; i < parentOperators.size(); i++) {
         Operator par = parentOperators.get(i);
         int index = par.getChildOperators().indexOf(mapJoinOp);
         par.getChildOperators().remove(index);
-        par.getChildOperators().add(index, smbJop);
+        if (i == smbJoinDesc.getPosBigTable()) {
+          par.getChildOperators().add(index, smbJop);
+        }
+        else {
+          DummyStoreOperator dummyStoreOp = new DummyStoreOperator();
+          par.getChildOperators().add(index, dummyStoreOp);
+
+          List<Operator<? extends OperatorDesc>> childrenOps =
+              new ArrayList<Operator<? extends OperatorDesc>>();
+          childrenOps.add(smbJop);
+          dummyStoreOp.setChildOperators(childrenOps);
+
+          List<Operator<? extends OperatorDesc>> parentOps =
+              new ArrayList<Operator<? extends OperatorDesc>>();
+          parentOps.add(par);
+          dummyStoreOp.setParentOperators(parentOps);
+
+          aliasToSink.put(srcs[i], dummyStoreOp);
+          smbJop.getParentOperators().remove(i);
+          smbJop.getParentOperators().add(i, dummyStoreOp);
+        }
       }
+      smbJoinDesc.setAliasToSink(aliasToSink);
       List<? extends Operator> childOps = mapJoinOp.getChildOperators();
       for (int i = 0; i < childOps.size(); i++) {
         Operator child = childOps.get(i);
@@ -229,40 +270,74 @@ public class SortedMergeBucketMapJoinOpt
     private boolean isTableSorted(ParseContext pctx,
       MapJoinOperator op,
       QBJoinTree joinTree,
-      String alias,
       int pos,
-      List<Order> sortColumnsFirstTable)
+      List<Order> sortColumnsFirstTable,
+      String[] aliases)
       throws SemanticException {
-
-      Map<String, Operator<? extends OperatorDesc>> topOps = this.pGraphContext
-          .getTopOps();
+      String alias = aliases[pos];
       Map<TableScanOperator, Table> topToTable = this.pGraphContext
           .getTopToTable();
-      TableScanOperator tso = (TableScanOperator) topOps.get(alias);
+
+      /*
+       * Consider a query like:
+       *
+       * select -- mapjoin(subq1) --  * from
+       * (select a.key, a.value from tbl1 a) subq1
+       *   join
+       * (select a.key, a.value from tbl2 a) subq2
+       * on subq1.key = subq2.key;
+       *
+       * aliasToOpInfo contains the SelectOperator for subq1 and subq2.
+       * We need to traverse the tree (using TableAccessAnalyzer) to get to the base
+       * table. If the object being map-joined is a base table, then aliasToOpInfo
+       * contains the TableScanOperator, and TableAccessAnalyzer is a no-op.
+       */
+      Operator<? extends OperatorDesc> topOp = joinTree.getAliasToOpInfo().get(alias);
+      if (topOp == null) {
+        return false;
+      }
+      List<String> joinCols = toColumns(op.getConf().getKeys().get((byte) pos));
+      if (joinCols == null || joinCols.isEmpty()) {
+        return false;
+      }
+      TableScanOperator tso = TableAccessAnalyzer.genRootTableScan(topOp, joinCols);
       if (tso == null) {
         return false;
       }
 
-      List<ExprNodeDesc> keys = op.getConf().getKeys().get((byte) pos);
-      // get all join columns from join keys stored in MapJoinDesc
-      List<String> joinCols = new ArrayList<String>();
-      List<ExprNodeDesc> joinKeys = new ArrayList<ExprNodeDesc>();
-      joinKeys.addAll(keys);
-      while (joinKeys.size() > 0) {
-        ExprNodeDesc node = joinKeys.remove(0);
-        if (node instanceof ExprNodeColumnDesc) {
-          joinCols.addAll(node.getCols());
-        } else if (node instanceof ExprNodeGenericFuncDesc) {
-          ExprNodeGenericFuncDesc udfNode = ((ExprNodeGenericFuncDesc) node);
-          GenericUDF udf = udfNode.getGenericUDF();
-          if (!FunctionRegistry.isDeterministic(udf)) {
-            return false;
+      // For nested sub-queries, the alias mapping is not maintained in QB currently.
+      /*
+       * Consider a query like:
+       *
+       * select count(*) from
+       *   (
+       *     select key, count(*) from
+       *       (
+       *         select --mapjoin(a)-- a.key as key, a.value as val1, b.value as val2
+       *         from tbl1 a join tbl2 b on a.key = b.key
+       *       ) subq1
+       *     group by key
+       *   ) subq2;
+       *
+       * The table alias should be subq2:subq1:a which needs to be fetched from topOps.
+       */
+      if (pGraphContext.getTopOps().containsValue(tso)) {
+        for (Map.Entry<String, Operator<? extends OperatorDesc>> topOpEntry :
+          this.pGraphContext.getTopOps().entrySet()) {
+          if (topOpEntry.getValue() == tso) {
+            alias = topOpEntry.getKey();
+            aliases[pos] = alias;
+            break;
           }
-          joinKeys.addAll(0, udfNode.getChildExprs());
         }
       }
+      else {
+        // Ideally, this should never happen, and this should be an assert.
+        return false;
+      }
 
       Table tbl = topToTable.get(tso);
+
       if (tbl.isPartitioned()) {
         PrunedPartitionList prunedParts = null;
         try {

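The rewiring performed for each small-table parent, shown in isolation (a sketch; par, smbJop, mapJoinOp, srcs and aliasToSink are as in the committed loop):

    // Splice a DummyStoreOperator between a small-table parent and the SMB
    // join so that fetch-time processing can stop at the sink.
    DummyStoreOperator dummyStoreOp = new DummyStoreOperator();

    int index = par.getChildOperators().indexOf(mapJoinOp);
    par.getChildOperators().set(index, dummyStoreOp);   // parent -> sink

    List<Operator<? extends OperatorDesc>> children =
        new ArrayList<Operator<? extends OperatorDesc>>();
    children.add(smbJop);
    dummyStoreOp.setChildOperators(children);           // sink -> join

    List<Operator<? extends OperatorDesc>> parents =
        new ArrayList<Operator<? extends OperatorDesc>>();
    parents.add(par);
    dummyStoreOp.setParentOperators(parents);

    smbJop.getParentOperators().set(i, dummyStoreOp);
    aliasToSink.put(srcs[i], dummyStoreOp);             // read back at run time via
                                                        // SMBJoinDesc.getAliasToSink()
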
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/QB.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/QB.java?rev=1423731&r1=1423730&r2=1423731&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/QB.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/QB.java Wed Dec 19 01:20:56 2012
@@ -76,7 +76,20 @@ public class QB {
     }
     qbp = new QBParseInfo(alias, isSubQ);
     qbm = new QBMetaData();
-    id = (outer_id == null ? alias : outer_id + ":" + alias);
+    id = getAppendedAliasFromId(outer_id, alias);
+  }
+
+  // For sub-queries, the id and alias should be appended since the same aliases can be
+  // re-used within different sub-queries.
+  // For a query like:
+  // select ...
+  //   (select * from T1 a where ...) subq1
+  //  join
+  //   (select * from T2 a where ...) subq2
+  // ..
+  // the alias a is modified to subq1:a and subq2:a respectively, to identify the right sub-query.
+  public static String getAppendedAliasFromId(String outer_id, String alias) {
+    return (outer_id == null ? alias : outer_id + ":" + alias);
   }
 
   public QBParseInfo getParseInfo() {

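The qualified-alias scheme in action, with illustrative values:

    QB.getAppendedAliasFromId(null, "a");           // -> "a"             (top level)
    QB.getAppendedAliasFromId("subq1", "a");        // -> "subq1:a"
    QB.getAppendedAliasFromId("subq2:subq1", "a");  // -> "subq2:subq1:a" (nested)
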
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/QBJoinTree.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/QBJoinTree.java?rev=1423731&r1=1423730&r2=1423731&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/QBJoinTree.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/QBJoinTree.java Wed Dec 19 01:20:56 2012
@@ -22,8 +22,12 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
 
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+
 /**
  * Internal representation of the join tree.
  *
@@ -39,6 +43,11 @@ public class QBJoinTree implements Seria
   private JoinCond[] joinCond;
   private boolean noOuterJoin;
   private boolean noSemiJoin;
+  private Map<String, Operator<? extends OperatorDesc>> aliasToOpInfo;
+
+  // The subquery identifier from QB.
+  // It is of the form topSubQuery:innerSubQuery:...:innerMostSubQuery
+  private String id;
 
   // keeps track of the right-hand-side table name of the left-semi-join, and
   // its list of join keys
@@ -74,6 +83,7 @@ public class QBJoinTree implements Seria
     noOuterJoin = true;
     noSemiJoin = true;
     rhsSemijoin = new HashMap<String, ArrayList<ASTNode>>();
+    aliasToOpInfo = new HashMap<String, Operator<? extends OperatorDesc>>();
   }
 
   /**
@@ -320,4 +330,20 @@ public class QBJoinTree implements Seria
   public void setFilterMap(int[][] filterMap) {
     this.filterMap = filterMap;
   }
+
+  public Map<String, Operator<? extends OperatorDesc>> getAliasToOpInfo() {
+    return aliasToOpInfo;
+  }
+
+  public void setAliasToOpInfo(Map<String, Operator<? extends OperatorDesc>> aliasToOpInfo) {
+    this.aliasToOpInfo = aliasToOpInfo;
+  }
+
+  public String getId() {
+    return id;
+  }
+
+  public void setId(String id) {
+    this.id = id;
+  }
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java?rev=1423731&r1=1423730&r2=1423731&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java Wed Dec 19 01:20:56 2012
@@ -5630,7 +5630,7 @@ public class SemanticAnalyzer extends Ba
   }
 
   private Operator genJoinOperator(QB qb, QBJoinTree joinTree,
-      HashMap<String, Operator> map) throws SemanticException {
+      Map<String, Operator> map) throws SemanticException {
     QBJoinTree leftChild = joinTree.getJoinSrc();
     Operator joinSrcOp = null;
     if (leftChild != null) {
@@ -5829,7 +5829,7 @@ public class SemanticAnalyzer extends Ba
     }
   }
 
-  private Operator genJoinPlan(QB qb, HashMap<String, Operator> map)
+  private Operator genJoinPlan(QB qb, Map<String, Operator> map)
       throws SemanticException {
     QBJoinTree joinTree = qb.getQbJoinTree();
     Operator joinOp = genJoinOperator(qb, joinTree, map);
@@ -5841,7 +5841,7 @@ public class SemanticAnalyzer extends Ba
    * source operators. This procedure traverses the query tree recursively,
    */
   private void pushJoinFilters(QB qb, QBJoinTree joinTree,
-      HashMap<String, Operator> map) throws SemanticException {
+      Map<String, Operator> map) throws SemanticException {
     if (joinTree.getJoinSrc() != null) {
       pushJoinFilters(qb, joinTree.getJoinSrc(), map);
     }
@@ -5881,7 +5881,15 @@ public class SemanticAnalyzer extends Ba
     return cols;
   }
 
-  private QBJoinTree genUniqueJoinTree(QB qb, ASTNode joinParseTree)
+  // The join alias is modified before being inserted for consumption by sort-merge
+  // join queries. If the join is part of a sub-query, the alias is modified to include
+  // the sub-query alias.
+  private String getModifiedAlias(QB qb, String alias) {
+    return QB.getAppendedAliasFromId(qb.getId(), alias);
+  }
+
+  private QBJoinTree genUniqueJoinTree(QB qb, ASTNode joinParseTree,
+      Map<String, Operator> aliasToOpInfo)
       throws SemanticException {
     QBJoinTree joinTree = new QBJoinTree();
     joinTree.setNoOuterJoin(false);
@@ -5920,6 +5928,9 @@ public class SemanticAnalyzer extends Ba
         } else {
           rightAliases.add(alias);
         }
+        joinTree.getAliasToOpInfo().put(
+            getModifiedAlias(qb, alias), aliasToOpInfo.get(alias));
+        joinTree.setId(qb.getId());
         baseSrc.add(alias);
 
         preserved.add(lastPreserved);
@@ -5977,7 +5988,8 @@ public class SemanticAnalyzer extends Ba
     return joinTree;
   }
 
-  private QBJoinTree genJoinTree(QB qb, ASTNode joinParseTree)
+  private QBJoinTree genJoinTree(QB qb, ASTNode joinParseTree,
+      Map<String, Operator> aliasToOpInfo)
       throws SemanticException {
     QBJoinTree joinTree = new QBJoinTree();
     JoinCond[] condn = new JoinCond[1];
@@ -6024,8 +6036,11 @@ public class SemanticAnalyzer extends Ba
       String[] children = new String[2];
       children[0] = alias;
       joinTree.setBaseSrc(children);
+      joinTree.setId(qb.getId());
+      joinTree.getAliasToOpInfo().put(
+          getModifiedAlias(qb, alias), aliasToOpInfo.get(alias));
     } else if (isJoinToken(left)) {
-      QBJoinTree leftTree = genJoinTree(qb, left);
+      QBJoinTree leftTree = genJoinTree(qb, left, aliasToOpInfo);
       joinTree.setJoinSrc(leftTree);
       String[] leftChildAliases = leftTree.getLeftAliases();
       String leftAliases[] = new String[leftChildAliases.length + 1];
@@ -6054,6 +6069,10 @@ public class SemanticAnalyzer extends Ba
       }
       children[1] = alias;
       joinTree.setBaseSrc(children);
+      joinTree.setId(qb.getId());
+      joinTree.getAliasToOpInfo().put(
+          getModifiedAlias(qb, alias), aliasToOpInfo.get(alias));
       // remember rhs table for semijoin
       if (joinTree.getNoSemiJoin() == false) {
         joinTree.addRHSSemijoin(alias);
@@ -6158,6 +6177,7 @@ public class SemanticAnalyzer extends Ba
       rightAliases[i + trgtRightAliases.length] = nodeRightAliases[i];
     }
     target.setRightAliases(rightAliases);
+    target.getAliasToOpInfo().putAll(node.getAliasToOpInfo());
 
     String[] nodeBaseSrc = node.getBaseSrc();
     String[] trgtBaseSrc = target.getBaseSrc();
@@ -7477,7 +7497,7 @@ public class SemanticAnalyzer extends Ba
   public Operator genPlan(QB qb) throws SemanticException {
 
     // First generate all the opInfos for the elements in the from clause
-    HashMap<String, Operator> aliasToOpInfo = new HashMap<String, Operator>();
+    Map<String, Operator> aliasToOpInfo = new HashMap<String, Operator>();
 
     // Recurse over the subqueries to fill the subquery part of the plan
     for (String alias : qb.getSubqAliases()) {
@@ -7503,10 +7523,10 @@ public class SemanticAnalyzer extends Ba
       ASTNode joinExpr = qb.getParseInfo().getJoinExpr();
 
       if (joinExpr.getToken().getType() == HiveParser.TOK_UNIQUEJOIN) {
-        QBJoinTree joinTree = genUniqueJoinTree(qb, joinExpr);
+        QBJoinTree joinTree = genUniqueJoinTree(qb, joinExpr, aliasToOpInfo);
         qb.setQbJoinTree(joinTree);
       } else {
-        QBJoinTree joinTree = genJoinTree(qb, joinExpr);
+        QBJoinTree joinTree = genJoinTree(qb, joinExpr, aliasToOpInfo);
         qb.setQbJoinTree(joinTree);
         mergeJoinTree(qb);
       }
@@ -7542,7 +7562,7 @@ public class SemanticAnalyzer extends Ba
    * @throws SemanticException
    */
 
-  void genLateralViewPlans(HashMap<String, Operator> aliasToOpInfo, QB qb)
+  void genLateralViewPlans(Map<String, Operator> aliasToOpInfo, QB qb)
       throws SemanticException {
     Map<String, ArrayList<ASTNode>> aliasToLateralViews = qb.getParseInfo()
         .getAliasToLateralViews();

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/TableAccessAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/TableAccessAnalyzer.java?rev=1423731&r1=1423730&r2=1423731&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/TableAccessAnalyzer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/TableAccessAnalyzer.java Wed Dec 19 01:20:56 2012
@@ -226,10 +226,9 @@ public class TableAccessAnalyzer {
    * names on that table that map to the keys used for the input
    * operator (which is currently only a join or group by).
    */
-  private static TableScanOperator genRootTableScan(
+  public static TableScanOperator genRootTableScan(
       Operator<? extends OperatorDesc> op, List<String> keyNames) {
 
-    boolean complexTree = false;
     Operator<? extends OperatorDesc> currOp = op;
     List<String> currColNames = keyNames;
     List<Operator<? extends OperatorDesc>> parentOps = null;
@@ -238,26 +237,24 @@ public class TableAccessAnalyzer {
     // along the way that changes the rows from the table through
     // joins or aggregations. Only allowed operators are selects
     // and filters.
-    while (!complexTree) {
+    while (true) {
       parentOps = currOp.getParentOperators();
       if (parentOps == null) {
-        break;
+        return (TableScanOperator) currOp;
       }
 
       if (parentOps.size() > 1 ||
           !(currOp.columnNamesRowResolvedCanBeObtained())) {
-        complexTree = true;
+        return null;
       } else {
         // Generate the map of the input->output column name for the keys
         // we are about
         if (!TableAccessAnalyzer.genColNameMap(currOp, currColNames)) {
-          complexTree = true;
+          return null;
         }
         currOp = parentOps.get(0);
       }
     }
-
-    return complexTree? null: (TableScanOperator) currOp;
   }
 
   /*

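Now that genRootTableScan is public, the optimizers above call it directly. An illustrative summary of what the rewritten walk accepts (tree shapes are examples, not an exhaustive list), followed by the typical call pattern:

    // genRootTableScan(op, keys) walks single-parent chains upward:
    //   TS -> SEL -> FIL -> op       returns TS; keys remapped to TS columns
    //   TS -> GBY -> op              returns null (aggregation changes the rows)
    //   {TS1, TS2} -> JOIN -> op     returns null (more than one parent)
    List<String> keys = toColumns(mjDesc.getKeys().get((byte) pos));  // see above
    int oldKeySize = keys.size();
    TableScanOperator tso = TableAccessAnalyzer.genRootTableScan(topOp, keys);
    if (tso == null || keys.size() != oldKeySize) {
      return false;  // complex tree, or constant keys were dropped in the walk
    }
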
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/DummyStoreDesc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/DummyStoreDesc.java?rev=1423731&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/DummyStoreDesc.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/DummyStoreDesc.java Wed Dec 19 01:20:56 2012
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan;
+
+
+/**
+ * Dummy Store Desc. This is only used by sort-merge joins to store the
+ * result for the small table (sub-query) being scanned.
+ */
+@Explain(displayName = "Dummy Store")
+public class DummyStoreDesc extends AbstractOperatorDesc {
+  private static final long serialVersionUID = 1L;
+
+  public DummyStoreDesc() {
+  }
+
+  @Override
+  public DummyStoreDesc clone() {
+    return new DummyStoreDesc();
+  }
+}

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/SMBJoinDesc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/SMBJoinDesc.java?rev=1423731&r1=1423730&r2=1423731&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/SMBJoinDesc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/SMBJoinDesc.java Wed Dec 19 01:20:56 2012
@@ -20,16 +20,20 @@ package org.apache.hadoop.hive.ql.plan;
 
 import java.io.Serializable;
 import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.hive.ql.exec.DummyStoreOperator;
 
 @Explain(displayName = "Sorted Merge Bucket Map Join Operator")
 public class SMBJoinDesc extends MapJoinDesc implements Serializable {
 
   private static final long serialVersionUID = 1L;
-  
+
   private MapredLocalWork localWork;
-  
-  //keep a mapping from tag to the fetch operator alias 
+
+  //keep a mapping from tag to the fetch operator alias
   private HashMap<Byte, String> tagToAlias;
+  private Map<String, DummyStoreOperator> aliasToSink;
 
   public SMBJoinDesc(MapJoinDesc conf) {
     super(conf);
@@ -53,5 +57,12 @@ public class SMBJoinDesc extends MapJoin
   public void setTagToAlias(HashMap<Byte, String> tagToAlias) {
     this.tagToAlias = tagToAlias;
   }
-  
+
+  public Map<String, DummyStoreOperator> getAliasToSink() {
+    return aliasToSink;
+  }
+
+  public void setAliasToSink(Map<String, DummyStoreOperator> aliasToSink) {
+    this.aliasToSink = aliasToSink;
+  }
 }

Added: hive/trunk/ql/src/test/queries/clientpositive/smb_mapjoin_14.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/smb_mapjoin_14.q?rev=1423731&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/smb_mapjoin_14.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/smb_mapjoin_14.q Wed Dec 19 01:20:56 2012
@@ -0,0 +1,280 @@
+set hive.enforce.bucketing = true;
+set hive.enforce.sorting = true;
+set hive.exec.reducers.max = 1;
+
+CREATE TABLE tbl1(key int, value string) CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS;
+CREATE TABLE tbl2(key int, value string) CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS;
+
+insert overwrite table tbl1
+select * from src where key < 10;
+
+insert overwrite table tbl2
+select * from src where key < 10;
+
+set hive.optimize.bucketmapjoin = true;
+set hive.optimize.bucketmapjoin.sortedmerge = true;
+set hive.input.format = org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
+
+-- The mapjoin is being performed as part of a sub-query. It should be converted to a sort-merge join
+explain
+select count(*) from (
+  select /*+mapjoin(a)*/ a.key as key, a.value as val1, b.value as val2 from tbl1 a join tbl2 b on a.key = b.key
+) subq1;
+
+select count(*) from (
+  select /*+mapjoin(a)*/ a.key as key, a.value as val1, b.value as val2 from tbl1 a join tbl2 b on a.key = b.key
+) subq1;
+
+-- The mapjoin is being performed as part of a sub-query. It should be converted to a sort-merge join
+-- Add an order by at the end to make the results deterministic.
+explain
+select key, count(*) from 
+(
+  select /*+mapjoin(a)*/ a.key as key, a.value as val1, b.value as val2 from tbl1 a join tbl2 b on a.key = b.key
+) subq1
+group by key
+order by key;
+
+select key, count(*) from 
+(
+  select /*+mapjoin(a)*/ a.key as key, a.value as val1, b.value as val2 from tbl1 a join tbl2 b on a.key = b.key
+) subq1
+group by key
+order by key;
+
+-- The mapjoin is being performed inside more than one level of sub-query. It should be converted to a sort-merge join.
+explain
+select count(*) from
+(
+  select key, count(*) from 
+  (
+    select /*+mapjoin(a)*/ a.key as key, a.value as val1, b.value as val2 from tbl1 a join tbl2 b on a.key = b.key
+  ) subq1
+  group by key
+) subq2;
+
+select count(*) from
+(
+  select key, count(*) from 
+  (
+    select /*+mapjoin(a)*/ a.key as key, a.value as val1, b.value as val2 from tbl1 a join tbl2 b on a.key = b.key
+  ) subq1
+  group by key
+) subq2;
+
+-- A join is being performed across two different sub-queries, each of which performs a mapjoin.
+-- The mapjoin in each sub-query should be converted to a sort-merge join.
+explain
+select src1.key, src1.cnt1, src2.cnt1 from
+(
+  select key, count(*) as cnt1 from 
+  (
+    select /*+mapjoin(a)*/ a.key as key, a.value as val1, b.value as val2 from tbl1 a join tbl2 b on a.key = b.key
+  ) subq1 group by key
+) src1
+join
+(
+  select key, count(*) as cnt1 from 
+  (
+    select /*+mapjoin(a)*/ a.key as key, a.value as val1, b.value as val2 from tbl1 a join tbl2 b on a.key = b.key
+  ) subq2 group by key
+) src2
+on src1.key = src2.key
+order by src1.key, src1.cnt1, src2.cnt1;
+
+select src1.key, src1.cnt1, src2.cnt1 from
+(
+  select key, count(*) as cnt1 from 
+  (
+    select /*+mapjoin(a)*/ a.key as key, a.value as val1, b.value as val2 from tbl1 a join tbl2 b on a.key = b.key
+  ) subq1 group by key
+) src1
+join
+(
+  select key, count(*) as cnt1 from 
+  (
+    select /*+mapjoin(a)*/ a.key as key, a.value as val1, b.value as val2 from tbl1 a join tbl2 b on a.key = b.key
+  ) subq2 group by key
+) src2
+on src1.key = src2.key
+order by src1.key, src1.cnt1, src2.cnt1;
+
+-- The sub-query itself is being map-joined. Since the sub-query only contains selects and filters, it should
+-- be converted to a sort-merge join.
+explain
+select /*+mapjoin(subq1)*/ count(*) from 
+  (select a.key as key, a.value as value from tbl1 a where key < 6) subq1 
+    join
+  (select a.key as key, a.value as value from tbl2 a where key < 6) subq2
+  on subq1.key = subq2.key;
+
+select /*+mapjoin(subq1)*/ count(*) from 
+  (select a.key as key, a.value as value from tbl1 a where key < 6) subq1 
+    join
+  (select a.key as key, a.value as value from tbl2 a where key < 6) subq2
+  on subq1.key = subq2.key;
+
+-- The sub-query itself is being map-joined. Since the sub-query only contains selects and filters, it should
+-- be converted to a sort-merge join, even though there is more than one level of sub-query.
+explain
+select /*+mapjoin(subq2)*/ count(*) from 
+  (
+  select * from
+    (
+      select a.key as key, a.value as value from tbl1 a where key < 8
+    ) subq1 
+  where key < 6
+  ) subq2
+  join tbl2 b
+  on subq2.key = b.key;
+
+select /*+mapjoin(subq2)*/ count(*) from 
+  (
+  select * from
+    (
+      select a.key as key, a.value as value from tbl1 a where key < 8
+    ) subq1 
+  where key < 6
+  ) subq2
+  join tbl2 b
+  on subq2.key = b.key;
+
+-- Both the big table and the small table are nested sub-queries, i.e. more than one level of sub-query.
+-- The join should be converted to a sort-merge join.
+explain
+select /*+mapjoin(subq2)*/ count(*) from 
+  (
+  select * from
+    (
+      select a.key as key, a.value as value from tbl1 a where key < 8
+    ) subq1 
+  where key < 6
+  ) subq2
+  join
+  (
+  select * from
+    (
+      select a.key as key, a.value as value from tbl1 a where key < 8
+    ) subq3 
+  where key < 6
+  ) subq4
+  on subq2.key = subq4.key;
+
+select /*+mapjoin(subq2)*/ count(*) from 
+  (
+  select * from
+    (
+      select a.key as key, a.value as value from tbl1 a where key < 8
+    ) subq1 
+  where key < 6
+  ) subq2
+  join
+  (
+  select * from
+    (
+      select a.key as key, a.value as value from tbl1 a where key < 8
+    ) subq3 
+  where key < 6
+  ) subq4
+  on subq2.key = subq4.key;
+
+-- The sub-query itself is being map-joined. Since the sub-query only contains selects and filters and the join key
+-- is not modified, it should be converted to a sort-merge join. Note that the sub-query modifies one
+-- column (value), but that column is not part of the join key.
+explain
+select /*+mapjoin(subq1)*/ count(*) from 
+  (select a.key as key, concat(a.value, a.value) as value from tbl1 a where key < 8) subq1 
+    join
+  (select a.key as key, concat(a.value, a.value) as value from tbl2 a where key < 8) subq2
+  on subq1.key = subq2.key;
+
+select /*+mapjoin(subq1)*/ count(*) from 
+  (select a.key as key, concat(a.value, a.value) as value from tbl1 a where key < 8) subq1 
+    join
+  (select a.key as key, concat(a.value, a.value) as value from tbl2 a where key < 8) subq2
+  on subq1.key = subq2.key;
+
+-- Since the join key is modified by the sub-query, neither a sort-merge join nor a bucketized map-side
+-- join should be performed.
+explain
+select /*+mapjoin(subq1)*/ count(*) from 
+  (select a.key +1 as key, concat(a.value, a.value) as value from tbl1 a) subq1 
+    join
+  (select a.key +1 as key, concat(a.value, a.value) as value from tbl2 a) subq2
+  on subq1.key = subq2.key;
+
+select /*+mapjoin(subq1)*/ count(*) from 
+  (select a.key +1 as key, concat(a.value, a.value) as value from tbl1 a) subq1 
+    join
+  (select a.key +1 as key, concat(a.value, a.value) as value from tbl2 a) subq2
+  on subq1.key = subq2.key;
+
+-- The small table is a sub-query and the big table is not.
+-- It should be converted to a sort-merge join.
+explain
+select /*+mapjoin(subq1)*/ count(*) from 
+  (select a.key as key, a.value as value from tbl1 a where key < 6) subq1 
+    join tbl2 a on subq1.key = a.key;
+
+select /*+mapjoin(subq1)*/ count(*) from 
+  (select a.key as key, a.value as value from tbl1 a where key < 6) subq1 
+    join tbl2 a on subq1.key = a.key;
+
+-- The big table is a sub-query and the small table is not.
+-- It should be converted to a sort-merge join.
+explain
+select /*+mapjoin(a)*/ count(*) from 
+  (select a.key as key, a.value as value from tbl1 a where key < 6) subq1 
+    join tbl2 a on subq1.key = a.key;
+
+select /*+mapjoin(a)*/ count(*) from 
+  (select a.key as key, a.value as value from tbl1 a where key < 6) subq1 
+    join tbl2 a on subq1.key = a.key;
+
+-- There are more than 2 inputs to the join, all of them being sub-queries.
+-- It should be converted to a sort-merge join.
+explain
+select /*+mapjoin(subq1, subq2)*/ count(*) from 
+  (select a.key as key, a.value as value from tbl1 a where key < 6) subq1 
+    join
+  (select a.key as key, a.value as value from tbl2 a where key < 6) subq2
+  on (subq1.key = subq2.key)
+    join
+  (select a.key as key, a.value as value from tbl2 a where key < 6) subq3
+  on (subq1.key = subq3.key);
+
+select /*+mapjoin(subq1, subq2)*/ count(*) from 
+  (select a.key as key, a.value as value from tbl1 a where key < 6) subq1 
+    join
+  (select a.key as key, a.value as value from tbl2 a where key < 6) subq2
+  on subq1.key = subq2.key
+    join
+  (select a.key as key, a.value as value from tbl2 a where key < 6) subq3
+  on (subq1.key = subq3.key);
+
+-- The mapjoin is being performed on a nested sub-query, and an aggregation is performed after that.
+-- The join should be converted to a sort-merge join.
+explain
+select count(*) from (
+  select /*+mapjoin(subq2)*/ subq2.key as key, subq2.value as value1, b.value as value2 from
+  (
+    select * from
+    (
+      select a.key as key, a.value as value from tbl1 a where key < 8
+    ) subq1
+    where key < 6
+  ) subq2
+join tbl2 b
+on subq2.key = b.key) a;
+
+select count(*) from (
+  select /*+mapjoin(subq2)*/ subq2.key as key, subq2.value as value1, b.value as value2 from
+  (
+    select * from
+    (
+      select a.key as key, a.value as value from tbl1 a where key < 8
+    ) subq1
+    where key < 6
+  ) subq2
+join tbl2 b
+on subq2.key = b.key) a;
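
Assuming the ant-based qfile harness Hive used at this time, the new test can presumably be run on its own with something like "ant test -Dtestcase=TestCliDriver -Dqfile=smb_mapjoin_14.q", with the driver diffing the query output against the checked-in .q.out file.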