Posted to commits@hive.apache.org by gu...@apache.org on 2013/10/08 05:32:02 UTC

svn commit: r1530140 - in /hive/branches/tez: eclipse-templates/ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ ql/src/java/org/apache/hadoop/hive/ql/o...

Author: gunther
Date: Tue Oct  8 03:32:00 2013
New Revision: 1530140

URL: http://svn.apache.org/r1530140
Log:
HIVE-5270: Enable hash joins using tez (Gunther Hagleitner)

Modified:
    hive/branches/tez/eclipse-templates/.classpath
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKey.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinRowContainer.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java

Modified: hive/branches/tez/eclipse-templates/.classpath
URL: http://svn.apache.org/viewvc/hive/branches/tez/eclipse-templates/.classpath?rev=1530140&r1=1530139&r2=1530140&view=diff
==============================================================================
--- hive/branches/tez/eclipse-templates/.classpath (original)
+++ hive/branches/tez/eclipse-templates/.classpath Tue Oct  8 03:32:00 2013
@@ -50,6 +50,7 @@
   <classpathentry kind="lib" path="build/ivy/lib/default/jline-@jline.version@.jar"/>
   <classpathentry kind="lib" path="build/ivy/lib/default/json-@json.version@.jar"/>
   <classpathentry kind="lib" path="build/ivy/lib/default/commons-compress-@commons-compress.version@.jar"/>
+  <classpathentry kind="lib" path="build/ivy/lib/default/commons-codec-@commons-codec.version@.jar"/>
   <classpathentry kind="lib" path="build/ivy/lib/default/commons-lang-@commons-lang.version@.jar"/>
   <classpathentry kind="lib" path="build/ivy/lib/default/commons-logging-@commons-logging.version@.jar"/>
   <classpathentry kind="lib" path="build/ivy/lib/default/commons-logging-api-@commons-logging-api.version@.jar"/>
@@ -93,15 +94,15 @@
   <classpathentry kind="lib" path="build/ivy/lib/default/tempus-fugit-@tempus-fugit.version@.jar"/>
   <classpathentry kind="lib" path="build/ivy/lib/default/tez-common-@tez.version@.jar"/>
   <classpathentry kind="lib" path="build/ivy/lib/default/tez-dag-@tez.version@.jar"/>
-  <classpathentry kind="lib" path="build/ivy/lib/default/tez-dag-api-@tez.version@.jar"/>
-  <classpathentry kind="lib" path="build/ivy/lib/default/tez-engine-@tez.version@.jar"/>
-  <classpathentry kind="lib" path="build/ivy/lib/default/tez-engine-api-@tez.version@.jar"/>
+  <classpathentry kind="lib" path="build/ivy/lib/default/tez-api-@tez.version@.jar"/>
+  <classpathentry kind="lib" path="build/ivy/lib/default/tez-runtime-library-@tez.version@.jar"/>
   <classpathentry kind="lib" path="build/ivy/lib/default/tez-mapreduce-@tez.version@.jar"/>
   <classpathentry kind="src" path="build/contrib/test/src"/>
   <classpathentry kind="src" path="build/metastore/gen/antlr/gen-java"/>
   <classpathentry kind="lib" path="build/testutils/hive-testutils-@HIVE_VERSION@.jar"/>
   <classpathentry kind="src" path="build/ql/test/src"/>
   <classpathentry kind="src" path="build/ql/gen/antlr/gen-java"/>
+  <classpathentry kind="src" path="build/ql/gen/vector"/>
   <classpathentry kind="src" path="beeline/src/java"/>
   <classpathentry kind="src" path="beeline/src/test"/>
   <classpathentry kind="src" path="cli/src/java"/>

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java?rev=1530140&r1=1530139&r2=1530140&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java Tue Oct  8 03:32:00 2013
@@ -68,6 +68,20 @@ public class MapJoinOperator extends Abs
     super(mjop);
   }
 
+  /*
+   * We need the base (operator.java) implementation of start/endGroup.
+   * The parent class has functionality in those that map join can't use.
+   */
+  @Override
+  public void endGroup() throws HiveException {
+    defaultEndGroup();
+  }
+
+  @Override
+  public void startGroup() throws HiveException {
+    defaultStartGroup();
+  }
+
   @Override
   protected void initializeOp(Configuration hconf) throws HiveException {
     super.initializeOp(hconf);
@@ -126,7 +140,8 @@ public class MapJoinOperator extends Abs
 
   private void loadHashTable() throws HiveException {
 
-    if (!this.getExecContext().getLocalWork().getInputFileChangeSensitive()) {
+    if (this.getExecContext().getLocalWork() == null
+        || !this.getExecContext().getLocalWork().getInputFileChangeSensitive()) {
       if (hashTblInitedOnce) {
         return;
       } else {
@@ -159,8 +174,8 @@ public class MapJoinOperator extends Abs
   public void processOp(Object row, int tag) throws HiveException {
     try {
       if (firstRow) {
-        // generate the map metadata
         generateMapMetaData();
+        loadHashTable();
         firstRow = false;
       }
       alias = (byte)tag;
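
A note on the processOp change above: hash-table loading now happens lazily when the first row arrives, and loadHashTable tolerates a missing local work (on Tez there is no MR-style local work to consult). A minimal sketch of the resulting first-row pattern, with the join logic itself elided:

    // Sketch of the firstRow-guarded lazy initialization used above.
    @Override
    public void processOp(Object row, int tag) throws HiveException {
      if (firstRow) {
        generateMapMetaData();  // build key/value serde contexts
        loadHashTable();        // populate mapJoinTables (no-op if already initialized)
        firstRow = false;
      }
      alias = (byte) tag;
      // ... probe the loaded hash tables with this row ...
    }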

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java?rev=1530140&r1=1530139&r2=1530140&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java Tue Oct  8 03:32:00 2013
@@ -515,8 +515,7 @@ public abstract class Operator<T extends
     }
   }
 
-  // If a operator wants to do some work at the beginning of a group
-  public void startGroup() throws HiveException {
+  protected final void defaultStartGroup() throws HiveException {
     LOG.debug("Starting group");
 
     if (childOperators == null) {
@@ -535,8 +534,7 @@ public abstract class Operator<T extends
     LOG.debug("Start group Done");
   }
 
-  // If an operator wants to do some work at the end of a group
-  public void endGroup() throws HiveException {
+  protected final void defaultEndGroup() throws HiveException {
     LOG.debug("Ending group");
 
     if (childOperators == null) {
@@ -555,6 +553,16 @@ public abstract class Operator<T extends
     LOG.debug("End group Done");
   }
 
+  // If an operator wants to do some work at the beginning of a group
+  public void startGroup() throws HiveException {
+    defaultStartGroup();
+  }
+
+  // If an operator wants to do some work at the end of a group
+  public void endGroup() throws HiveException {
+    defaultEndGroup();
+  }
+
   // an blocking operator (e.g. GroupByOperator and JoinOperator) can
   // override this method to forward its outputs
   public void flush() throws HiveException {
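
The effect of this refactoring: the shared child-forwarding logic now lives in the protected final defaultStartGroup/defaultEndGroup, while the public startGroup/endGroup stay overridable and merely delegate to them. A subclass can still layer per-group work on top of the defaults; MapJoinOperator (above) instead overrides the hooks to call the defaults directly, bypassing any behavior an intermediate parent class adds. A hypothetical override for illustration (flushGroupState is a placeholder, not part of the patch):

    @Override
    public void startGroup() throws HiveException {
      flushGroupState();     // subclass-specific per-group work
      defaultStartGroup();   // then the shared forwarding to children
    }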

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKey.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKey.java?rev=1530140&r1=1530139&r2=1530140&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKey.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKey.java Tue Oct  8 03:32:00 2013
@@ -79,21 +79,27 @@ public class MapJoinKey {
       return false;
     return true;
   }
-  @SuppressWarnings("unchecked")
-  public void read(MapJoinObjectSerDeContext context, ObjectInputStream in, Writable container) 
+
+  public void read(MapJoinObjectSerDeContext context, ObjectInputStream in, Writable container)
   throws IOException, SerDeException {
-    SerDe serde = context.getSerDe();
     container.readFields(in);
+    read(context, container);
+  }
+
+  @SuppressWarnings("unchecked")
+  public void read(MapJoinObjectSerDeContext context, Writable container) throws SerDeException {
+    SerDe serde = context.getSerDe();
     List<Object> value = (List<Object>)ObjectInspectorUtils.copyToStandardObject(serde.deserialize(container),
         serde.getObjectInspector(), ObjectInspectorCopyOption.WRITABLE);
+
     if(value == null) {
       key = EMPTY_OBJECT_ARRAY;
     } else {
       key = value.toArray();
     }
   }
-  
-  public void write(MapJoinObjectSerDeContext context, ObjectOutputStream out) 
+
+  public void write(MapJoinObjectSerDeContext context, ObjectOutputStream out)
   throws IOException, SerDeException {
     SerDe serde = context.getSerDe();
     ObjectInspector objectInspector = context.getStandardOI();

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinRowContainer.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinRowContainer.java?rev=1530140&r1=1530139&r2=1530140&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinRowContainer.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinRowContainer.java Tue Oct  8 03:32:00 2013
@@ -104,30 +104,34 @@ public class MapJoinRowContainer extends
     }
     return result;
   }
-  
-  @SuppressWarnings({"unchecked"})
-  public void read(MapJoinObjectSerDeContext context, ObjectInputStream in, Writable container) 
+
+  public void read(MapJoinObjectSerDeContext context, ObjectInputStream in, Writable container)
   throws IOException, SerDeException {
     clear();
-    SerDe serde = context.getSerDe();
     long numRows = in.readLong();
     for (long rowIndex = 0L; rowIndex < numRows; rowIndex++) {
-      container.readFields(in);      
-      List<Object> value = (List<Object>)ObjectInspectorUtils.copyToStandardObject(serde.deserialize(container),
-          serde.getObjectInspector(), ObjectInspectorCopyOption.WRITABLE);
-      if(value == null) {
-        add(toList(EMPTY_OBJECT_ARRAY));
-      } else {
-        Object[] valuesArray = value.toArray();
-        if (context.hasFilterTag()) {
-          aliasFilter &= ((ShortWritable)valuesArray[valuesArray.length - 1]).get();
-        }
-        add(toList(valuesArray));
+      container.readFields(in);
+      read(context, container);
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  public void read(MapJoinObjectSerDeContext context, Writable currentValue) throws SerDeException {
+    SerDe serde = context.getSerDe();
+    List<Object> value = (List<Object>)ObjectInspectorUtils.copyToStandardObject(serde.deserialize(currentValue),
+        serde.getObjectInspector(), ObjectInspectorCopyOption.WRITABLE);
+    if(value == null) {
+      add(toList(EMPTY_OBJECT_ARRAY));
+    } else {
+      Object[] valuesArray = value.toArray();
+      if (context.hasFilterTag()) {
+        aliasFilter &= ((ShortWritable)valuesArray[valuesArray.length - 1]).get();
       }
+      add(toList(valuesArray));
     }
   }
-  
-  public void write(MapJoinObjectSerDeContext context, ObjectOutputStream out) 
+
+  public void write(MapJoinObjectSerDeContext context, ObjectOutputStream out)
   throws IOException, SerDeException {
     SerDe serde = context.getSerDe();
     ObjectInspector valueObjectInspector = context.getStandardOI();
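
Both MapJoinKey and MapJoinRowContainer get the same split here: the stream-based read(context, in, container) now only fills the reusable Writable from the ObjectInputStream and delegates to a new read(context, writable) overload that deserializes an already-populated Writable. The MR path keeps using the former; the Tez HashTableLoader (below) uses the latter, since it receives Writables directly from a KeyValueReader. Roughly (variable names abbreviated):

    // MR path: stream -> Writable -> standard objects
    key.read(keyContext, objectInputStream, container);

    // Tez path: Writable from the broadcast input -> standard objects
    key.read(keyContext, (Writable) kvReader.getCurrentKey());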

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java?rev=1530140&r1=1530139&r2=1530140&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java Tue Oct  8 03:32:00 2013
@@ -40,6 +40,14 @@ public class MapJoinTableContainerSerDe 
     this.keyContext = keyContext;
     this.valueContext = valueContext;
   }
+
+  public MapJoinObjectSerDeContext getKeyContext() {
+    return keyContext;
+  }
+  public MapJoinObjectSerDeContext getValueContext() {
+    return valueContext;
+  }
+
   @SuppressWarnings({"unchecked"})
   public MapJoinTableContainer load(ObjectInputStream in) 
       throws HiveException {

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java?rev=1530140&r1=1530139&r2=1530140&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java Tue Oct  8 03:32:00 2013
@@ -17,18 +17,27 @@
  */
 package org.apache.hadoop.hive.ql.exec.tez;
 
+import java.io.IOException;
 import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.MapredContext;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
+import org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinRowContainer;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.io.Writable;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.library.api.KeyValueReader;
 
 /**
  * HashTableLoader for Tez constructs the hashtable from records read from
@@ -41,7 +50,6 @@ public class HashTableLoader implements 
   public HashTableLoader() {
   }
 
-  @SuppressWarnings("unused")
   @Override
   public void load(ExecMapperContext context,
       Configuration hconf,
@@ -49,8 +57,43 @@ public class HashTableLoader implements 
       byte posBigTable,
       MapJoinTableContainer[] mapJoinTables,
       MapJoinTableContainerSerDe[] mapJoinTableSerdes) throws HiveException {
+
     TezContext tezContext = (TezContext) MapredContext.get();
     Map<Integer, String> parentToInput = desc.getParentToInput();
+    int hashTableThreshold = HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHASHTABLETHRESHOLD);
+    float hashTableLoadFactor = HiveConf.getFloatVar(hconf,
+        HiveConf.ConfVars.HIVEHASHTABLELOADFACTOR);
+
+    for (int pos = 0; pos < mapJoinTables.length; pos++) {
+      if (pos == posBigTable) {
+        continue;
+      }
+
+      LogicalInput input = tezContext.getInput(parentToInput.get(pos));
+
+      try {
+        KeyValueReader kvReader = (KeyValueReader) input.getReader();
+
+        MapJoinTableContainer tableContainer = new HashMapWrapper(hashTableThreshold,
+            hashTableLoadFactor);
+
+        // simply read all the kv pairs into the hashtable.
+        while (kvReader.next()) {
+          MapJoinKey key = new MapJoinKey();
+          key.read(mapJoinTableSerdes[pos].getKeyContext(), (Writable)kvReader.getCurrentKey());
+          MapJoinRowContainer values = new MapJoinRowContainer();
+          values.read(mapJoinTableSerdes[pos].getValueContext(), (Writable)kvReader.getCurrentValue());
+          tableContainer.put(key, values);
+        }
+
+        mapJoinTables[pos] = tableContainer;
+      } catch (IOException e) {
+        throw new HiveException(e);
+      } catch (SerDeException e) {
+        throw new HiveException(e);
+      } catch (Exception e) {
+        throw new HiveException(e);
+      }
+    }
   }
-
 }
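
One detail in the loader above: all three catch clauses wrap the cause in a HiveException, and the trailing catch (Exception e) already subsumes the two specific ones. On Java 7+ the specific clauses could collapse into a multi-catch; a sketch of the equivalent shape (the branch predates that cleanup):

    } catch (IOException | SerDeException e) {
      throw new HiveException(e);
    } catch (Exception e) {
      throw new HiveException(e);
    }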

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java?rev=1530140&r1=1530139&r2=1530140&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java Tue Oct  8 03:32:00 2013
@@ -18,20 +18,24 @@
 package org.apache.hadoop.hive.ql.exec.tez;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
 import org.apache.hadoop.hive.ql.exec.MapOperator;
 import org.apache.hadoop.hive.ql.exec.MapredContext;
 import org.apache.hadoop.hive.ql.exec.ObjectCache;
 import org.apache.hadoop.hive.ql.exec.ObjectCacheFactory;
+import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.reportStats;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
 import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputCollector;
@@ -53,6 +57,7 @@ public class MapRecordProcessor  extends
   private final ExecMapperContext execContext = new ExecMapperContext();
   private boolean abort = false;
   protected static final String MAP_PLAN_KEY = "__MAP_PLAN__";
+  private MapWork mapWork;
 
   @Override
   void init(JobConf jconf, MRTaskReporter mrReporter, Map<String, LogicalInput> inputs,
@@ -73,15 +78,15 @@ public class MapRecordProcessor  extends
 
       execContext.setJc(jconf);
       // create map and fetch operators
-      MapWork mrwork = (MapWork) cache.retrieve(MAP_PLAN_KEY);
-      if (mrwork == null) {
-        mrwork = Utilities.getMapWork(jconf);
-        cache.cache(MAP_PLAN_KEY, mrwork);
+      mapWork = (MapWork) cache.retrieve(MAP_PLAN_KEY);
+      if (mapWork == null) {
+        mapWork = Utilities.getMapWork(jconf);
+        cache.cache(MAP_PLAN_KEY, mapWork);
       }
       mapOp = new MapOperator();
 
       // initialize map operator
-      mapOp.setConf(mrwork);
+      mapOp.setConf(mapWork);
       mapOp.setChildren(jconf);
       l4j.info(mapOp.dump(0));
 
@@ -91,6 +96,17 @@ public class MapRecordProcessor  extends
       mapOp.initializeLocalWork(jconf);
       mapOp.initialize(jconf, null);
 
+      // Initialization isn't finished until all parents of all operators
+      // are initialized. For broadcast joins that means initializing the
+      // dummy parent operators as well.
+      List<HashTableDummyOperator> dummyOps = mapWork.getDummyOps();
+      if (dummyOps != null) {
+        for (Operator<? extends OperatorDesc> dummyOp : dummyOps){
+          dummyOp.setExecContext(execContext);
+          dummyOp.initialize(jconf, null);
+        }
+      }
+
       mapOp.setOutputCollector(out);
       mapOp.setReporter(reporter);
       MapredContext.get().setReporter(reporter);
@@ -124,10 +140,6 @@ public class MapRecordProcessor  extends
 
   @Override
   void run() throws IOException{
-    if (inputs.size() != 1) {
-      throw new IllegalArgumentException("MapRecordProcessor expects single input"
-          + ", inputCount=" + inputs.size());
-    }
 
     MRInput in = getMRInput(inputs);
     KeyValueReader reader = in.getReader();
@@ -186,6 +198,17 @@ public class MapRecordProcessor  extends
     // detecting failed executions by exceptions thrown by the operator tree
     try {
       mapOp.close(abort);
+
+      // Need to close the dummyOps as well. The operator pipeline
+      // is not considered "closed/done" unless all operators are
+      // done. For broadcast joins that includes the dummy parents.
+      List<HashTableDummyOperator> dummyOps = mapWork.getDummyOps();
+      if (dummyOps != null) {
+        for (Operator<? extends OperatorDesc> dummyOp : dummyOps){
+          dummyOp.close(abort);
+        }
+      }
+
       if (isLogInfoEnabled) {
         logCloseInfo();
       }
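
The removed single-input assertion in run() is a consequence of broadcast joins: a map task may now see the MRInput for the big table plus one LogicalInput per small table, so inputs.size() == 1 no longer holds. Conceptually (the input names here are illustrative, not from the patch):

    // inputs may now look like:
    //   "MRInput" -> the big-table input that drives run()
    //   "Map 2"   -> broadcast input carrying a small table's hash-table records
    // getMRInput(inputs) selects the MRInput rather than assuming a single entry.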

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java?rev=1530140&r1=1530139&r2=1530140&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java Tue Oct  8 03:32:00 2013
@@ -25,6 +25,7 @@ import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
 import org.apache.hadoop.hive.ql.exec.MapredContext;
 import org.apache.hadoop.hive.ql.exec.ObjectCache;
 import org.apache.hadoop.hive.ql.exec.ObjectCacheFactory;
@@ -33,6 +34,7 @@ import org.apache.hadoop.hive.ql.exec.Ut
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.reportStats;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.serde2.Deserializer;
@@ -77,6 +79,8 @@ public class ReduceRecordProcessor  exte
   private Object keyObject = null;
   private BytesWritable groupKey;
 
+  private ReduceWork redWork;
+
   List<Object> row = new ArrayList<Object>(Utilities.reduceFieldNameList.size());
 
   @Override
@@ -90,7 +94,7 @@ public class ReduceRecordProcessor  exte
     ObjectInspector[] valueObjectInspector = new ObjectInspector[Byte.MAX_VALUE];
     ObjectInspector keyObjectInspector;
 
-    ReduceWork redWork = (ReduceWork) cache.retrieve(REDUCE_PLAN_KEY);
+    redWork = (ReduceWork) cache.retrieve(REDUCE_PLAN_KEY);
     if (redWork == null) {
       redWork = Utilities.getReduceWork(jconf);
       cache.cache(REDUCE_PLAN_KEY, redWork);
@@ -134,6 +138,18 @@ public class ReduceRecordProcessor  exte
     try {
       l4j.info(reducer.dump(0));
       reducer.initialize(jconf, rowObjectInspector);
+
+      // Initialization isn't finished until all parents of all operators
+      // are initialized. For broadcast joins that means initializing the
+      // dummy parent operators as well.
+      List<HashTableDummyOperator> dummyOps = redWork.getDummyOps();
+      if (dummyOps != null) {
+        for (Operator<? extends OperatorDesc> dummyOp : dummyOps){
+          dummyOp.setExecContext(execContext);
+          dummyOp.initialize(jconf, null);
+        }
+      }
+
     } catch (Throwable e) {
       abort = true;
       if (e instanceof OutOfMemoryError) {
@@ -153,10 +169,6 @@ public class ReduceRecordProcessor  exte
 
   @Override
   void run() throws IOException{
-    if (inputs.size() != 1) {
-      throw new IllegalArgumentException("ReduceRecordProcessor expects single input"
-          + ", inputCount=" + inputs.size());
-    }
 
     //TODO - changes this for joins
     ShuffledMergedInput in = (ShuffledMergedInput)inputs.values().iterator().next();
@@ -299,6 +311,16 @@ public class ReduceRecordProcessor  exte
       }
 
       reducer.close(abort);
+
+      // Need to close the dummyOps as well. The operator pipeline
+      // is not considered "closed/done" unless all operators are
+      // done. For broadcast joins that includes the dummy parents.
+      List<HashTableDummyOperator> dummyOps = redWork.getDummyOps();
+      if (dummyOps != null) {
+        for (Operator<? extends OperatorDesc> dummyOp : dummyOps){
+          dummyOp.close(abort);
+        }
+      }
       reportStats rps = new reportStats(reporter);
       reducer.preorderMap(rps);
 

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java?rev=1530140&r1=1530139&r2=1530140&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java Tue Oct  8 03:32:00 2013
@@ -527,6 +527,7 @@ abstract public class AbstractSMBJoinPro
     SortBucketJoinProcCtx joinContext,
     ParseContext parseContext) throws SemanticException {
     MapJoinOperator mapJoinOp = MapJoinProcessor.convertMapJoin(
+      parseContext.getConf(), 
       parseContext.getOpParseCtx(),
       joinOp,
       pGraphContext.getJoinContext().get(joinOp),

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java?rev=1530140&r1=1530139&r2=1530140&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java Tue Oct  8 03:32:00 2013
@@ -95,7 +95,7 @@ public class ConvertJoinMapJoin implemen
 
       long inputSize = currInputStat.getNumberOfBytes();
       if ((bigInputStat == null) ||
-          ((bigInputStat != null) && 
+          ((bigInputStat != null) &&
            (inputSize > bigInputStat.getNumberOfBytes()))) {
 
         if (bigTableFound) {
@@ -141,7 +141,7 @@ public class ConvertJoinMapJoin implemen
     }
 
     if (bigTablePosition == -1) {
-      // all tables have size 0. We let the suffle join handle this case.
+      // all tables have size 0. We let the shuffle join handle this case.
       return null;
     }
 
@@ -161,7 +161,7 @@ public class ConvertJoinMapJoin implemen
     // convert to a map join operator with this information
     ParseContext parseContext = context.parseContext;
     MapJoinOperator mapJoinOp = MapJoinProcessor.
-      convertJoinOpMapJoinOp(parseContext.getOpParseCtx(),
+      convertJoinOpMapJoinOp(context.conf, parseContext.getOpParseCtx(),
       joinOp, parseContext.getJoinContext().get(joinOp), bigTablePosition, true, false);
 
     Operator<? extends OperatorDesc> parentBigTableOp

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java?rev=1530140&r1=1530139&r2=1530140&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java Tue Oct  8 03:32:00 2013
@@ -237,13 +237,14 @@ public class MapJoinProcessor implements
    * @return the alias to the big table
    * @throws SemanticException
    */
-  public static String genMapJoinOpAndLocalWork(MapredWork newWork, JoinOperator op, int mapJoinPos)
+  public static String genMapJoinOpAndLocalWork(HiveConf conf, MapredWork newWork, 
+    JoinOperator op, int mapJoinPos)
       throws SemanticException {
     LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtxMap =
         newWork.getMapWork().getOpParseCtxMap();
     QBJoinTree newJoinTree = newWork.getMapWork().getJoinTree();
     // generate the map join operator; already checked the map join
-    MapJoinOperator newMapJoinOp = MapJoinProcessor.convertMapJoin(opParseCtxMap, op,
+    MapJoinOperator newMapJoinOp = MapJoinProcessor.convertMapJoin(conf, opParseCtxMap, op,
         newJoinTree, mapJoinPos, true, false);
     return genLocalWorkForMapJoin(newWork, newMapJoinOp, mapJoinPos);
   }
@@ -316,7 +317,7 @@ public class MapJoinProcessor implements
    *          are cached in memory
    * @param noCheckOuterJoin
    */
-  public static MapJoinOperator convertMapJoin(
+  public static MapJoinOperator convertMapJoin(HiveConf conf,
     LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtxMap,
     JoinOperator op, QBJoinTree joinTree, int mapJoinPos, boolean noCheckOuterJoin,
     boolean validateMapJoinTree)
@@ -374,7 +375,7 @@ public class MapJoinProcessor implements
     }
 
     // create the map-join operator
-    MapJoinOperator mapJoinOp = convertJoinOpMapJoinOp(opParseCtxMap,
+    MapJoinOperator mapJoinOp = convertJoinOpMapJoinOp(conf, opParseCtxMap,
         op, joinTree, mapJoinPos, noCheckOuterJoin, validateMapJoinTree);
 
 
@@ -395,7 +396,7 @@ public class MapJoinProcessor implements
     return mapJoinOp;
   }
 
-  public static MapJoinOperator convertJoinOpMapJoinOp(
+  public static MapJoinOperator convertJoinOpMapJoinOp(HiveConf hconf,
       LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtxMap,
       JoinOperator op, QBJoinTree joinTree, int mapJoinPos, boolean noCheckOuterJoin,
       boolean validateMapJoinTree)
@@ -433,9 +434,6 @@ public class MapJoinProcessor implements
       if (src != null) {
         Operator<? extends OperatorDesc> parentOp = op.getParentOperators().get(pos);
         assert parentOp.getParentOperators().size() == 1;
-        Operator<? extends OperatorDesc> grandParentOp =
-          parentOp.getParentOperators().get(0);
-
         oldReduceSinkParentOps.add(parentOp);
       }
       pos++;
@@ -536,8 +534,8 @@ public class MapJoinProcessor implements
     }
 
     List<String> outputColumnNames = op.getConf().getOutputColumnNames();
-    TableDesc keyTableDesc = PlanUtils.getMapJoinKeyTableDesc(PlanUtils
-        .getFieldSchemasFromColumnList(keyCols, MAPJOINKEY_FIELDPREFIX));
+    TableDesc keyTableDesc = PlanUtils.getMapJoinKeyTableDesc(hconf,
+        PlanUtils.getFieldSchemasFromColumnList(keyCols, MAPJOINKEY_FIELDPREFIX));
     JoinCondDesc[] joinCondns = op.getConf().getConds();
     MapJoinDesc mapJoinDescriptor = new MapJoinDesc(keyExprMap, keyTableDesc, newValueExprs,
         valueTableDescs, valueFiltedTableDescs, outputColumnNames, mapJoinPos, joinCondns,
@@ -589,14 +587,14 @@ public class MapJoinProcessor implements
    *          are cached in memory
    * @param noCheckOuterJoin
    */
-  public static MapJoinOperator convertSMBJoinToMapJoin(
+  public static MapJoinOperator convertSMBJoinToMapJoin(HiveConf hconf,
     Map<Operator<? extends OperatorDesc>, OpParseContext> opParseCtxMap,
     SMBMapJoinOperator smbJoinOp, QBJoinTree joinTree, int bigTablePos, boolean noCheckOuterJoin)
     throws SemanticException {
     // Create a new map join operator
     SMBJoinDesc smbJoinDesc = smbJoinOp.getConf();
     List<ExprNodeDesc> keyCols = smbJoinDesc.getKeys().get(Byte.valueOf((byte) 0));
-    TableDesc keyTableDesc = PlanUtils.getMapJoinKeyTableDesc(PlanUtils
+    TableDesc keyTableDesc = PlanUtils.getMapJoinKeyTableDesc(hconf, PlanUtils
         .getFieldSchemasFromColumnList(keyCols, MAPJOINKEY_FIELDPREFIX));
     MapJoinDesc mapJoinDesc = new MapJoinDesc(smbJoinDesc.getKeys(),
         keyTableDesc, smbJoinDesc.getExprs(),
@@ -644,8 +642,8 @@ public class MapJoinProcessor implements
 
     LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtxMap = pctx
         .getOpParseCtx();
-    MapJoinOperator mapJoinOp = convertMapJoin(opParseCtxMap, op, joinTree, mapJoinPos,
-        noCheckOuterJoin, true);
+    MapJoinOperator mapJoinOp = convertMapJoin(pctx.getConf(), opParseCtxMap, op,
+        joinTree, mapJoinPos, noCheckOuterJoin, true);
     // create a dummy select to select all columns
     genSelectPlan(pctx, mapJoinOp);
     return mapJoinOp;

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java?rev=1530140&r1=1530139&r2=1530140&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java Tue Oct  8 03:32:00 2013
@@ -2,21 +2,29 @@ package org.apache.hadoop.hive.ql.optimi
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.Stack;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorFactory;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.RowSchema;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
 import org.apache.hadoop.hive.ql.parse.GenTezProcContext;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.HashTableDummyDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.PlanUtils;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.plan.TezWork;
 import org.apache.hadoop.hive.ql.plan.TezWork.EdgeType;
 
@@ -51,6 +59,8 @@ public class ReduceSinkMapJoinProc imple
       context.mapJoinParentMap.put(mapJoinOp, parents);
     }
 
+    BaseWork myWork = null;
+
     while (childOp != null) {
       if ((childOp instanceof ReduceSinkOperator) || (childOp instanceof FileSinkOperator)) {
         /*
@@ -63,9 +73,9 @@ public class ReduceSinkMapJoinProc imple
          *
          */
 
-        BaseWork myWork = context.operatorWorkMap.get(childOp);
+        myWork = context.operatorWorkMap.get(childOp);
         BaseWork parentWork = context.operatorWorkMap.get(parentRS);
-          
+
         // set the link between mapjoin and parent vertex
         int pos = context.mapJoinParentMap.get(mapJoinOp).indexOf(parentRS);
         if (pos == -1) {
@@ -97,8 +107,56 @@ public class ReduceSinkMapJoinProc imple
       }
     }
 
+    // create the dummy operators
+    List<Operator<? extends OperatorDesc>> dummyOperators =
+        new ArrayList<Operator<? extends OperatorDesc>>();
+
+    // create a new operator: HashTableDummyOperator, which shares the table desc
+    HashTableDummyDesc desc = new HashTableDummyDesc();
+    @SuppressWarnings("unchecked")
+    HashTableDummyOperator dummyOp = (HashTableDummyOperator) OperatorFactory.get(desc);
+    TableDesc tbl;
+
+    // need to create the correct table descriptor for key/value
+    RowSchema rowSchema = parentRS.getParentOperators().get(0).getSchema();
+    tbl = PlanUtils.getReduceValueTableDesc(PlanUtils.getFieldSchemasFromRowSchema(rowSchema, ""));
+    dummyOp.getConf().setTbl(tbl);
+
+    Map<Byte, List<ExprNodeDesc>> keyExprMap = mapJoinOp.getConf().getKeys();
+    List<ExprNodeDesc> keyCols = keyExprMap.get(Byte.valueOf((byte) 0));
+    StringBuffer keyOrder = new StringBuffer();
+    for (ExprNodeDesc k: keyCols) {
+      keyOrder.append("+");
+    }
+    TableDesc keyTableDesc = PlanUtils.getReduceKeyTableDesc(PlanUtils
+        .getFieldSchemasFromColumnList(keyCols, "mapjoinkey"), keyOrder.toString());
+    mapJoinOp.getConf().setKeyTableDesc(keyTableDesc);
+
+    // let the dummy op be the parent of mapjoin op
+    mapJoinOp.replaceParent(parentRS, dummyOp);
+    List<Operator<? extends OperatorDesc>> dummyChildren =
+      new ArrayList<Operator<? extends OperatorDesc>>();
+    dummyChildren.add(mapJoinOp);
+    dummyOp.setChildOperators(dummyChildren);
+    dummyOperators.add(dummyOp);
+
     // cut the operator tree so as to not retain connections from the parent RS downstream
-    parentRS.removeChild(mapJoinOp);
+    List<Operator<? extends OperatorDesc>> childOperators = parentRS.getChildOperators();
+    int childIndex = childOperators.indexOf(mapJoinOp);
+    childOperators.remove(childIndex);
+
+    // the "work" needs to know about the dummy operators. They have to be separately initialized
+    // at task startup
+    if (myWork != null) {
+      myWork.addDummyOp(dummyOp);
+    } else {
+      List<Operator<?>> dummyList = dummyOperators;
+      if (context.linkChildOpWithDummyOp.containsKey(childOp)) {
+        dummyList = context.linkChildOpWithDummyOp.get(childOp);
+      }
+      dummyList.add(dummyOp);
+      context.linkChildOpWithDummyOp.put(childOp, dummyList);
+    }
     return true;
   }
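
The net effect of the rewiring above on the operator graph, as a sketch (the actual BROADCAST_EDGE between the two work items is added in GenTezWork, below):

    before:  ... -> parentRS -> MapJoinOperator -> ...
    after:   ... -> parentRS                     (ends the parent work; its output
                                                  travels over a broadcast edge)
             HashTableDummyOperator -> MapJoinOperator -> ...   (in the child work)

The dummy operator exists only to give the map join a root parent with the correct schema; it is either registered on the work item right away (addDummyOp) or parked in linkChildOpWithDummyOp until the child's work item has been created.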
 

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java?rev=1530140&r1=1530139&r2=1530140&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java Tue Oct  8 03:32:00 2013
@@ -189,7 +189,8 @@ public class CommonJoinTaskDispatcher ex
 
     // optimize this newWork given the big table position
     String bigTableAlias =
-        MapJoinProcessor.genMapJoinOpAndLocalWork(newWork, newJoinOp, bigTablePosition);
+        MapJoinProcessor.genMapJoinOpAndLocalWork(physicalContext.getParseContext().getConf(),
+            newWork, newJoinOp, bigTablePosition);
     return new ObjectPair<MapRedTask, String>(newTask, bigTableAlias);
   }
 

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java?rev=1530140&r1=1530139&r2=1530140&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java Tue Oct  8 03:32:00 2013
@@ -432,7 +432,8 @@ public class SortMergeJoinTaskDispatcher
     opParseContextMap.put(newSMBJoinOp, opParseContextMap.get(oldSMBJoinOp));
 
     // generate the map join operator
-    return MapJoinProcessor.convertSMBJoinToMapJoin(opParseContextMap, newSMBJoinOp,
+    return MapJoinProcessor.convertSMBJoinToMapJoin(physicalContext.getConf(),
+        opParseContextMap, newSMBJoinOp,
         joinTree, mapJoinPos, true);
   }
 }

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java?rev=1530140&r1=1530139&r2=1530140&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java Tue Oct  8 03:32:00 2013
@@ -94,6 +94,9 @@ public class GenTezProcContext implement
   // what position in the mapjoin the different parent work items will have.
   public final Map<MapJoinOperator, List<Operator<?>>> mapJoinParentMap;
 
+  // remember the dummy ops we created
+  public final Map<Operator<?>, List<Operator<?>>> linkChildOpWithDummyOp;
+
   @SuppressWarnings("unchecked")
   public GenTezProcContext(HiveConf conf, ParseContext parseContext,
       List<Task<MoveWork>> moveTask, List<Task<? extends Serializable>> rootTasks,
@@ -111,5 +114,6 @@ public class GenTezProcContext implement
     this.linkOpWithWorkMap = new HashMap<Operator<?>, List<BaseWork>>();
     this.operatorWorkMap = new HashMap<Operator<?>, BaseWork>();
     this.mapJoinParentMap = new HashMap<MapJoinOperator, List<Operator<?>>>();
+    this.linkChildOpWithDummyOp = new HashMap<Operator<?>, List<Operator<?>>>();
   }
 }

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java?rev=1530140&r1=1530139&r2=1530140&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java Tue Oct  8 03:32:00 2013
@@ -24,6 +24,7 @@ import java.util.Stack;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
@@ -118,7 +119,7 @@ public class GenTezWork implements NodeP
       GenMapRedUtils.setKeyAndValueDesc(reduceWork, reduceSink);
 
       // remember which parent belongs to which tag
-      reduceWork.getTagToInput().put(reduceSink.getConf().getTag(), 
+      reduceWork.getTagToInput().put(reduceSink.getConf().getTag(),
            context.preceedingWork.getName());
 
       tezWork.add(reduceWork);
@@ -151,12 +152,12 @@ public class GenTezWork implements NodeP
       ReduceSinkOperator rs = (ReduceSinkOperator) operator;
       ReduceWork rWork = (ReduceWork) followingWork;
       GenMapRedUtils.setKeyAndValueDesc(rWork, rs);
-      
+
       // remember which parent belongs to which tag
       rWork.getTagToInput().put(rs.getConf().getTag(), work.getName());
 
       // add dependency between the two work items
-      tezWork.connect(work, context.leafOperatorToFollowingWork.get(operator), 
+      tezWork.connect(work, context.leafOperatorToFollowingWork.get(operator),
          EdgeType.SIMPLE_EDGE);
     }
 
@@ -200,6 +201,11 @@ public class GenTezWork implements NodeP
     context.operatorWorkMap.put(operator, work);
     List<BaseWork> linkWorkList = context.linkOpWithWorkMap.get(operator);
     if (linkWorkList != null) {
+      if (context.linkChildOpWithDummyOp.containsKey(operator)) {
+        for (Operator<?> dummy: context.linkChildOpWithDummyOp.get(operator)) {
+          work.addDummyOp((HashTableDummyOperator) dummy);
+        }
+      }
       for (BaseWork parentWork : linkWorkList) {
         tezWork.connect(parentWork, work, EdgeType.BROADCAST_EDGE);
       }

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java?rev=1530140&r1=1530139&r2=1530140&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java Tue Oct  8 03:32:00 2013
@@ -19,10 +19,10 @@
 package org.apache.hadoop.hive.ql.plan;
 
 import java.util.ArrayList;
-import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
 
+import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 
 /**
@@ -32,6 +32,12 @@ import org.apache.hadoop.hive.ql.exec.Op
 @SuppressWarnings({"serial", "deprecation"})
 public abstract class BaseWork extends AbstractOperatorDesc {
 
+  // dummyOps is a reference to all the HashTableDummy operators in the
+  // plan. These have to be separately initialized when we set up a task.
+  // Their function is mainly as root ops to give the mapjoin the correct
+  // schema info.
+  List<HashTableDummyOperator> dummyOps;
+
   public BaseWork() {}
 
   public BaseWork(String name) {
@@ -58,6 +64,21 @@ public abstract class BaseWork extends A
     this.name = name;
   }
 
+  public List<HashTableDummyOperator> getDummyOps() {
+    return dummyOps;
+  }
+
+  public void setDummyOps(List<HashTableDummyOperator> dummyOps) {
+    this.dummyOps = dummyOps;
+  }
+
+  public void addDummyOp(HashTableDummyOperator dummyOp) {
+    if (dummyOps == null) {
+      dummyOps = new LinkedList<HashTableDummyOperator>();
+    }
+    dummyOps.add(dummyOp);
+  }
+
   protected abstract List<Operator<?>> getAllRootOperators();
 
   public List<Operator<?>> getAllOperators() {
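
The PlanUtils change below is the serde switch that makes the broadcast transfer work: when the Tez optimization flag (ConfVars.HIVE_OPTIMIZE_TEZ) is set, the map-join key descriptor uses BinarySortableSerDe with an all-ascending sort order (one "+" per key column), matching the ReduceSink that now ships the small table, instead of the MR path's LazyBinarySerDe. For a two-column key (k1 int, k2 string) the Tez branch would emit properties roughly like this (assuming the usual serdeConstants values; k1/k2 are made-up column names):

    columns=k1,k2
    columns.types=int,string
    serialization.sort.order=++
    serialization.lib=org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe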

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java?rev=1530140&r1=1530139&r2=1530140&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java Tue Oct  8 03:32:00 2013
@@ -29,7 +29,9 @@ import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.ql.exec.ColumnInfo;
@@ -376,14 +378,34 @@ public final class PlanUtils {
   /**
    * Generate the table descriptor for Map-side join key.
    */
-  public static TableDesc getMapJoinKeyTableDesc(List<FieldSchema> fieldSchemas) {
-    return new TableDesc(SequenceFileInputFormat.class,
-        SequenceFileOutputFormat.class, Utilities.makeProperties("columns",
-        MetaStoreUtils.getColumnNamesFromFieldSchema(fieldSchemas),
-        "columns.types", MetaStoreUtils
-        .getColumnTypesFromFieldSchema(fieldSchemas),
-        serdeConstants.ESCAPE_CHAR, "\\",
-        serdeConstants.SERIALIZATION_LIB,LazyBinarySerDe.class.getName()));
+  public static TableDesc getMapJoinKeyTableDesc(Configuration conf,
+      List<FieldSchema> fieldSchemas) {
+    if (HiveConf.getBoolVar(conf, ConfVars.HIVE_OPTIMIZE_TEZ)) {
+      // In tez we use a different way of transmitting the hash table.
+      // We basically use ReduceSinkOperators and set the transfer to
+      // be broadcast (instead of partitioned). As a consequence we use
+      // a different SerDe than in the MR mapjoin case.
+      StringBuffer order = new StringBuffer();
+      for (FieldSchema f: fieldSchemas) {
+        order.append("+");
+      }
+      return new TableDesc(
+          SequenceFileInputFormat.class, SequenceFileOutputFormat.class,
+          Utilities.makeProperties(serdeConstants.LIST_COLUMNS, MetaStoreUtils
+              .getColumnNamesFromFieldSchema(fieldSchemas),
+              serdeConstants.LIST_COLUMN_TYPES, MetaStoreUtils
+              .getColumnTypesFromFieldSchema(fieldSchemas),
+              serdeConstants.SERIALIZATION_SORT_ORDER, order.toString(),
+              serdeConstants.SERIALIZATION_LIB, BinarySortableSerDe.class.getName()));
+    } else {
+      return new TableDesc(SequenceFileInputFormat.class,
+          SequenceFileOutputFormat.class, Utilities.makeProperties("columns",
+              MetaStoreUtils.getColumnNamesFromFieldSchema(fieldSchemas),
+              "columns.types", MetaStoreUtils
+              .getColumnTypesFromFieldSchema(fieldSchemas),
+              serdeConstants.ESCAPE_CHAR, "\\",
+              serdeConstants.SERIALIZATION_LIB,LazyBinarySerDe.class.getName()));
+    }
   }
 
   /**
@@ -391,13 +413,14 @@ public final class PlanUtils {
    */
   public static TableDesc getMapJoinValueTableDesc(
       List<FieldSchema> fieldSchemas) {
-    return new TableDesc(SequenceFileInputFormat.class,
-        SequenceFileOutputFormat.class, Utilities.makeProperties("columns",
-        MetaStoreUtils.getColumnNamesFromFieldSchema(fieldSchemas),
-        "columns.types", MetaStoreUtils
-        .getColumnTypesFromFieldSchema(fieldSchemas),
-        serdeConstants.ESCAPE_CHAR, "\\",
-        serdeConstants.SERIALIZATION_LIB,LazyBinarySerDe.class.getName()));
+      return new TableDesc(SequenceFileInputFormat.class,
+          SequenceFileOutputFormat.class, Utilities.makeProperties(
+              serdeConstants.LIST_COLUMNS, MetaStoreUtils
+              .getColumnNamesFromFieldSchema(fieldSchemas),
+              serdeConstants.LIST_COLUMN_TYPES, MetaStoreUtils
+              .getColumnTypesFromFieldSchema(fieldSchemas),
+              serdeConstants.ESCAPE_CHAR, "\\",
+              serdeConstants.SERIALIZATION_LIB,LazyBinarySerDe.class.getName()));
   }
 
   /**