Posted to commits@hive.apache.org by gu...@apache.org on 2014/09/24 09:03:38 UTC

svn commit: r1627235 [2/9] - in /hive/trunk: itests/src/test/resources/ itests/util/src/main/java/org/apache/hadoop/hive/ql/ metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/ ql/if/ ql/src/gen/thrift/gen-cpp/ ql/src/gen/thrift...

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java?rev=1627235&r1=1627234&r2=1627235&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java Wed Sep 24 07:03:35 2014
@@ -20,6 +20,23 @@ package org.apache.hadoop.hive.ql.exec.t
 import com.google.common.base.Function;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
+import com.google.protobuf.ByteString;
+
+import javax.security.auth.login.LoginException;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.commons.io.FilenameUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
@@ -32,6 +49,7 @@ import org.apache.hadoop.hive.conf.HiveC
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
 import org.apache.hadoop.hive.ql.exec.mr.ExecReducer;
@@ -47,10 +65,12 @@ import org.apache.hadoop.hive.ql.io.merg
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.MergeJoinWork;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
 import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;
 import org.apache.hadoop.hive.ql.plan.TezWork;
+import org.apache.hadoop.hive.ql.plan.TezWork.VertexType;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.stats.StatsFactory;
 import org.apache.hadoop.hive.ql.stats.StatsPublisher;
@@ -90,12 +110,16 @@ import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.VertexGroup;
 import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
 import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
+import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+import org.apache.tez.mapreduce.input.MRInput;
 import org.apache.tez.mapreduce.input.MRInputLegacy;
+import org.apache.tez.mapreduce.input.MultiMRInput;
 import org.apache.tez.mapreduce.output.MROutput;
 import org.apache.tez.mapreduce.partition.MRPartitioner;
+import org.apache.tez.mapreduce.protos.MRRuntimeProtos;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.comparator.TezBytesComparator;
 import org.apache.tez.runtime.library.common.serializer.TezBytesWritableSerialization;
@@ -104,21 +128,6 @@ import org.apache.tez.runtime.library.co
 import org.apache.tez.runtime.library.conf.UnorderedPartitionedKVEdgeConfig;
 import org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValueInput;
 
-import javax.security.auth.login.LoginException;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
 /**
  * DagUtils. DagUtils is a collection of helper methods to convert
  * map and reduce work to tez vertices and edges. It handles configuration
@@ -130,6 +139,11 @@ public class DagUtils {
   private static final Log LOG = LogFactory.getLog(DagUtils.class.getName());
   private static final String TEZ_DIR = "_tez_scratch_dir";
   private static DagUtils instance;
+  // The merge file currently being processed.
+  public static final String TEZ_MERGE_CURRENT_MERGE_FILE_PREFIX =
+      "hive.tez.current.merge.file.prefix";
+  // "A comma separated list of work names used as prefix.
+  public static final String TEZ_MERGE_WORK_FILE_PREFIXES = "hive.tez.merge.file.prefixes";
 
   private void addCredentials(MapWork mapWork, DAG dag) {
     Set<String> paths = mapWork.getPathToAliases().keySet();
@@ -238,8 +252,8 @@ public class DagUtils {
    * endpoints.
    */
   @SuppressWarnings("rawtypes")
-  public GroupInputEdge createEdge(VertexGroup group, JobConf vConf,
-      Vertex w, TezEdgeProperty edgeProp)
+  public GroupInputEdge createEdge(VertexGroup group, JobConf vConf, Vertex w,
+      TezEdgeProperty edgeProp, VertexType vertexType)
     throws IOException {
 
     Class mergeInputClass;
@@ -254,10 +268,14 @@ public class DagUtils {
     case CUSTOM_EDGE: {
       mergeInputClass = ConcatenatedMergedKeyValueInput.class;
       int numBuckets = edgeProp.getNumBuckets();
+      CustomVertexConfiguration vertexConf =
+          new CustomVertexConfiguration(numBuckets, vertexType, "");
+      DataOutputBuffer dob = new DataOutputBuffer();
+      vertexConf.write(dob);
       VertexManagerPluginDescriptor desc =
           VertexManagerPluginDescriptor.create(CustomPartitionVertex.class.getName());
-      ByteBuffer userPayload = ByteBuffer.allocate(4).putInt(numBuckets);
-      userPayload.flip();
+      byte[] userPayloadBytes = dob.getData();
+      ByteBuffer userPayload = ByteBuffer.wrap(userPayloadBytes);
       desc.setUserPayload(UserPayload.create(userPayload));
       w.setVertexManagerPlugin(desc);
       break;
@@ -289,17 +307,21 @@ public class DagUtils {
    * @param w The second vertex (sink)
    * @return
    */
-  public Edge createEdge(JobConf vConf, Vertex v, Vertex w,
-      TezEdgeProperty edgeProp)
+  public Edge createEdge(JobConf vConf, Vertex v, Vertex w, TezEdgeProperty edgeProp,
+      VertexType vertexType)
     throws IOException {
 
     switch(edgeProp.getEdgeType()) {
     case CUSTOM_EDGE: {
       int numBuckets = edgeProp.getNumBuckets();
-      ByteBuffer userPayload = ByteBuffer.allocate(4).putInt(numBuckets);
-      userPayload.flip();
+      CustomVertexConfiguration vertexConf =
+          new CustomVertexConfiguration(numBuckets, vertexType, "");
+      DataOutputBuffer dob = new DataOutputBuffer();
+      vertexConf.write(dob);
       VertexManagerPluginDescriptor desc = VertexManagerPluginDescriptor.create(
           CustomPartitionVertex.class.getName());
+      byte[] userPayloadBytes = dob.getData();
+      ByteBuffer userPayload = ByteBuffer.wrap(userPayloadBytes);
       desc.setUserPayload(UserPayload.create(userPayload));
       w.setVertexManagerPlugin(desc);
       break;
@@ -443,12 +465,61 @@ public class DagUtils {
     return MRHelpers.getJavaOptsForMRMapper(conf);
   }
 
+  private Vertex createVertex(JobConf conf, MergeJoinWork mergeJoinWork, LocalResource appJarLr,
+      List<LocalResource> additionalLr, FileSystem fs, Path mrScratchDir, Context ctx,
+      VertexType vertexType)
+      throws Exception {
+    Utilities.setMergeWork(conf, mergeJoinWork, mrScratchDir, false);
+    if (mergeJoinWork.getMainWork() instanceof MapWork) {
+      List<BaseWork> mapWorkList = mergeJoinWork.getBaseWorkList();
+      MapWork mapWork = (MapWork) (mergeJoinWork.getMainWork());
+      CommonMergeJoinOperator mergeJoinOp = mergeJoinWork.getMergeJoinOperator();
+      Vertex mergeVx =
+          createVertex(conf, mapWork, appJarLr, additionalLr, fs, mrScratchDir, ctx, vertexType);
+
+      // grouping happens in execution phase. Setting the class to TezGroupedSplitsInputFormat
+      // here would cause premature grouping, which would be incorrect.
+      Class inputFormatClass = HiveInputFormat.class;
+      conf.setClass("mapred.input.format.class", HiveInputFormat.class, InputFormat.class);
+      // mapreduce.tez.input.initializer.serialize.event.payload should be set
+      // to false when using this plug-in to avoid getting a serialized event at run-time.
+      conf.setBoolean("mapreduce.tez.input.initializer.serialize.event.payload", false);
+      for (int i = 0; i < mapWorkList.size(); i++) {
+
+        mapWork = (MapWork) (mapWorkList.get(i));
+        conf.set(TEZ_MERGE_CURRENT_MERGE_FILE_PREFIX, mapWork.getName());
+        conf.set(Utilities.INPUT_NAME, mapWork.getName());
+        LOG.info("Going through each work and adding MultiMRInput");
+        mergeVx.addDataSource(mapWork.getName(),
+            MultiMRInput.createConfigBuilder(conf, HiveInputFormat.class).build());
+      }
+
+      VertexManagerPluginDescriptor desc =
+        VertexManagerPluginDescriptor.create(CustomPartitionVertex.class.getName());
+      CustomVertexConfiguration vertexConf =
+          new CustomVertexConfiguration(mergeJoinWork.getMergeJoinOperator().getConf()
+              .getNumBuckets(), vertexType, mergeJoinWork.getBigTableAlias());
+      DataOutputBuffer dob = new DataOutputBuffer();
+      vertexConf.write(dob);
+      byte[] userPayload = dob.getData();
+      desc.setUserPayload(UserPayload.create(ByteBuffer.wrap(userPayload)));
+      mergeVx.setVertexManagerPlugin(desc);
+      return mergeVx;
+    } else {
+      Vertex mergeVx =
+          createVertex(conf, (ReduceWork) mergeJoinWork.getMainWork(), appJarLr, additionalLr, fs,
+              mrScratchDir, ctx);
+      return mergeVx;
+    }
+  }
+
   /*
    * Helper function to create Vertex from MapWork.
    */
   private Vertex createVertex(JobConf conf, MapWork mapWork,
       LocalResource appJarLr, List<LocalResource> additionalLr, FileSystem fs,
-      Path mrScratchDir, Context ctx, TezWork tezWork) throws Exception {
+      Path mrScratchDir, Context ctx, VertexType vertexType)
+      throws Exception {
 
     Path tezDir = getTezDir(mrScratchDir);
 
@@ -470,15 +541,8 @@ public class DagUtils {
     Class inputFormatClass = conf.getClass("mapred.input.format.class",
         InputFormat.class);
 
-    boolean vertexHasCustomInput = false;
-    if (tezWork != null) {
-      for (BaseWork baseWork : tezWork.getParents(mapWork)) {
-        if (tezWork.getEdgeType(baseWork, mapWork) == EdgeType.CUSTOM_EDGE) {
-          vertexHasCustomInput = true;
-        }
-      }
-    }
-
+    boolean vertexHasCustomInput = VertexType.isCustomInputType(vertexType);
+    LOG.info("Vertex has custom input? " + vertexHasCustomInput);
     if (vertexHasCustomInput) {
       groupSplitsInInputInitializer = false;
       // grouping happens in execution phase. The input payload should not enable grouping here,
@@ -513,6 +577,8 @@ public class DagUtils {
       }
     }
 
+    // remember mapping of plan to input
+    conf.set(Utilities.INPUT_NAME, mapWork.getName());
     if (HiveConf.getBoolVar(conf, ConfVars.HIVE_AM_SPLIT_GENERATION)
         && !mapWork.isUseOneNullRowInputFormat()) {
 
@@ -593,6 +659,7 @@ public class DagUtils {
       Path mrScratchDir, Context ctx) throws Exception {
 
     // set up operator plan
+    conf.set(Utilities.INPUT_NAME, reduceWork.getName());
     Utilities.setReduceWork(conf, reduceWork, mrScratchDir, false);
 
     // create the directories FileSinkOperators need
@@ -937,12 +1004,22 @@ public class DagUtils {
       return initializeVertexConf(conf, context, (MapWork)work);
     } else if (work instanceof ReduceWork) {
       return initializeVertexConf(conf, context, (ReduceWork)work);
+    } else if (work instanceof MergeJoinWork) {
+      return initializeVertexConf(conf, context, (MergeJoinWork) work);
     } else {
       assert false;
       return null;
     }
   }
 
+  private JobConf initializeVertexConf(JobConf conf, Context context, MergeJoinWork work) {
+    if (work.getMainWork() instanceof MapWork) {
+      return initializeVertexConf(conf, context, (MapWork) (work.getMainWork()));
+    } else {
+      return initializeVertexConf(conf, context, (ReduceWork) (work.getMainWork()));
+    }
+  }
+
   /**
    * Create a vertex from a given work object.
    *
@@ -958,18 +1035,21 @@ public class DagUtils {
    */
   public Vertex createVertex(JobConf conf, BaseWork work,
       Path scratchDir, LocalResource appJarLr,
-      List<LocalResource> additionalLr,
-      FileSystem fileSystem, Context ctx, boolean hasChildren, TezWork tezWork) throws Exception {
+      List<LocalResource> additionalLr, FileSystem fileSystem, Context ctx, boolean hasChildren,
+      TezWork tezWork, VertexType vertexType) throws Exception {
 
     Vertex v = null;
     // simply dispatch the call to the right method for the actual (sub-) type of
     // BaseWork.
     if (work instanceof MapWork) {
-      v = createVertex(conf, (MapWork) work, appJarLr,
-          additionalLr, fileSystem, scratchDir, ctx, tezWork);
+      v = createVertex(conf, (MapWork) work, appJarLr, additionalLr, fileSystem, scratchDir, ctx,
+              vertexType);
     } else if (work instanceof ReduceWork) {
       v = createVertex(conf, (ReduceWork) work, appJarLr,
           additionalLr, fileSystem, scratchDir, ctx);
+    } else if (work instanceof MergeJoinWork) {
+      v = createVertex(conf, (MergeJoinWork) work, appJarLr, additionalLr, fileSystem, scratchDir,
+              ctx, vertexType);
     } else {
       // something is seriously wrong if this is happening
       throw new HiveException(ErrorMsg.GENERIC_ERROR.getErrorCodedMsg());
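
Both createEdge() variants and the merge-join createVertex() above now build the CUSTOM_EDGE user payload the same way: instead of a raw 4-byte bucket count, a CustomVertexConfiguration carrying the bucket count, the vertex type and the big-table alias is serialized through a Hadoop DataOutputBuffer and attached to the CustomPartitionVertex manager. A minimal sketch of that pattern, assuming the CustomVertexConfiguration(int, VertexType, String) constructor and Writable-style write(DataOutput) method used in the diff, and a class placed in the same org.apache.hadoop.hive.ql.exec.tez package:

    import java.io.IOException;
    import java.nio.ByteBuffer;

    import org.apache.hadoop.hive.ql.plan.TezWork.VertexType;
    import org.apache.hadoop.io.DataOutputBuffer;
    import org.apache.tez.dag.api.UserPayload;
    import org.apache.tez.dag.api.Vertex;
    import org.apache.tez.dag.api.VertexManagerPluginDescriptor;

    // Sketch only: mirrors the payload construction used in DagUtils above.
    // The helper class name is illustrative, not part of the patch.
    final class CustomPayloadSketch {
      static void attachCustomVertexManager(Vertex vertex, int numBuckets,
          VertexType vertexType, String bigTableAlias) throws IOException {
        // Serialize the per-vertex configuration with its Writable-style write() method.
        CustomVertexConfiguration vertexConf =
            new CustomVertexConfiguration(numBuckets, vertexType, bigTableAlias);
        DataOutputBuffer dob = new DataOutputBuffer();
        vertexConf.write(dob);

        // Wrap the backing array, exactly as the patch does; the first dob.getLength()
        // bytes are the meaningful ones.
        ByteBuffer userPayload = ByteBuffer.wrap(dob.getData());

        VertexManagerPluginDescriptor desc =
            VertexManagerPluginDescriptor.create(CustomPartitionVertex.class.getName());
        desc.setUserPayload(UserPayload.create(userPayload));
        vertex.setVertexManagerPlugin(desc);
      }
    }

CustomPartitionVertex, a VertexManagerPlugin that runs in the Tez application master, is expected to read the same structure back from its user payload, so edge setup and merge-join vertex setup stay consistent.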

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java?rev=1627235&r1=1627234&r2=1627235&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java Wed Sep 24 07:03:35 2014
@@ -152,8 +152,21 @@ public class HiveSplitGenerator extends 
   public static Multimap<Integer, InputSplit> generateGroupedSplits(JobConf jobConf,
       Configuration conf, InputSplit[] splits, float waves, int availableSlots)
       throws Exception {
+    return generateGroupedSplits(jobConf, conf, splits, waves, availableSlots, null);
+  }
+
+  public static Multimap<Integer, InputSplit> generateGroupedSplits(JobConf jobConf,
+      Configuration conf, InputSplit[] splits, float waves, int availableSlots,
+      String inputName) throws Exception {
 
-    MapWork work = Utilities.getMapWork(jobConf);
+    MapWork work = null;
+    if (inputName != null) {
+      work = (MapWork) Utilities.getMergeWork(jobConf, inputName);
+      // work can still be null if there is no merge work for this input
+    }
+    if (work == null) {
+      work = Utilities.getMapWork(jobConf);
+    }
 
     Multimap<Integer, InputSplit> bucketSplitMultiMap =
         ArrayListMultimap.<Integer, InputSplit> create();

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java?rev=1627235&r1=1627234&r2=1627235&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java Wed Sep 24 07:03:35 2014
@@ -17,14 +17,20 @@
  */
 package org.apache.hadoop.hive.ql.exec.tez;
 
-import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.TreeMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.DummyStoreOperator;
 import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
 import org.apache.hadoop.hive.ql.exec.MapOperator;
 import org.apache.hadoop.hive.ql.exec.MapredContext;
@@ -36,15 +42,17 @@ import org.apache.hadoop.hive.ql.exec.Ut
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.ReportStats;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
 import org.apache.hadoop.hive.ql.exec.tez.TezProcessor.TezKVOutputCollector;
+import org.apache.hadoop.hive.ql.exec.tez.tools.KeyValueInputMerger;
 import org.apache.hadoop.hive.ql.exec.vector.VectorMapOperator;
+import org.apache.hadoop.hive.ql.io.IOContext;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.util.StringUtils;
 import org.apache.tez.mapreduce.input.MRInputLegacy;
+import org.apache.tez.mapreduce.input.MultiMRInput;
 import org.apache.tez.mapreduce.processor.MRTaskReporter;
+import org.apache.tez.runtime.api.Input;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.LogicalOutput;
 import org.apache.tez.runtime.api.ProcessorContext;
@@ -58,27 +66,61 @@ public class MapRecordProcessor extends 
 
 
   private MapOperator mapOp;
+  private final List<MapOperator> mergeMapOpList = new ArrayList<MapOperator>();
   public static final Log l4j = LogFactory.getLog(MapRecordProcessor.class);
-  private final ExecMapperContext execContext = new ExecMapperContext();
+  private MapRecordSource[] sources;
+  private final Map<String, MultiMRInput> multiMRInputMap = new HashMap<String, MultiMRInput>();
+  private int position = 0;
+  private boolean foundCachedMergeWork = false;
+  MRInputLegacy legacyMRInput = null;
+  private ExecMapperContext execContext = null;
   private boolean abort = false;
   protected static final String MAP_PLAN_KEY = "__MAP_PLAN__";
   private MapWork mapWork;
+  List<MapWork> mergeWorkList = null;
+  private static Map<Integer, DummyStoreOperator> connectOps =
+      new TreeMap<Integer, DummyStoreOperator>();
 
-  public MapRecordProcessor(JobConf jconf) {
+  public MapRecordProcessor(JobConf jconf) throws Exception {
     ObjectCache cache = ObjectCacheFactory.getCache(jconf);
+    execContext = new ExecMapperContext(jconf);
     execContext.setJc(jconf);
     // create map and fetch operators
     mapWork = (MapWork) cache.retrieve(MAP_PLAN_KEY);
     if (mapWork == null) {
       mapWork = Utilities.getMapWork(jconf);
       cache.cache(MAP_PLAN_KEY, mapWork);
-      l4j.info("Plan: "+mapWork);
+      l4j.debug("Plan: " + mapWork);
       for (String s: mapWork.getAliases()) {
-        l4j.info("Alias: "+s);
+        l4j.debug("Alias: " + s);
       }
     } else {
       Utilities.setMapWork(jconf, mapWork);
     }
+
+    String prefixes = jconf.get(DagUtils.TEZ_MERGE_WORK_FILE_PREFIXES);
+    if (prefixes != null) {
+      mergeWorkList = new ArrayList<MapWork>();
+      for (String prefix : prefixes.split(",")) {
+        MapWork mergeMapWork = (MapWork) cache.retrieve(prefix);
+        if (mergeMapWork != null) {
+          l4j.info("Found merge work in cache");
+          foundCachedMergeWork = true;
+          mergeWorkList.add(mergeMapWork);
+          continue;
+        }
+        if (foundCachedMergeWork) {
+          throw new Exception(
+              "Should find all work in cache else operator pipeline will be in non-deterministic state");
+        }
+
+        if ((prefix != null) && (prefix.isEmpty() == false)) {
+          mergeMapWork = (MapWork) Utilities.getMergeWork(jconf, prefix);
+          mergeWorkList.add(mergeMapWork);
+          cache.cache(prefix, mergeMapWork);
+        }
+      }
+    }
   }
 
   @Override
@@ -88,8 +130,8 @@ public class MapRecordProcessor extends 
     super.init(jconf, processorContext, mrReporter, inputs, outputs);
 
     //Update JobConf using MRInput, info like filename comes via this
-    MRInputLegacy mrInput = TezProcessor.getMRInput(inputs);
-    Configuration updatedConf = mrInput.getConfigUpdates();
+    legacyMRInput = getMRInput(inputs);
+    Configuration updatedConf = legacyMRInput.getConfigUpdates();
     if (updatedConf != null) {
       for (Entry<String, String> entry : updatedConf) {
         jconf.set(entry.getKey(), entry.getValue());
@@ -99,20 +141,52 @@ public class MapRecordProcessor extends 
     createOutputMap();
     // Start all the Outputs.
     for (Entry<String, LogicalOutput> outputEntry : outputs.entrySet()) {
-      l4j.info("Starting Output: " + outputEntry.getKey());
+      l4j.debug("Starting Output: " + outputEntry.getKey());
       outputEntry.getValue().start();
       ((TezKVOutputCollector) outMap.get(outputEntry.getKey())).initialize();
     }
 
     try {
+
       if (mapWork.getVectorMode()) {
         mapOp = new VectorMapOperator();
       } else {
         mapOp = new MapOperator();
       }
 
+      connectOps.clear();
+      if (mergeWorkList != null) {
+        MapOperator mergeMapOp = null;
+        for (MapWork mergeMapWork : mergeWorkList) {
+          processorContext.waitForAnyInputReady(Collections.singletonList((Input) (inputs
+              .get(mergeMapWork.getName()))));
+          if (mergeMapWork.getVectorMode()) {
+            mergeMapOp = new VectorMapOperator();
+          } else {
+            mergeMapOp = new MapOperator();
+          }
+
+          mergeMapOpList.add(mergeMapOp);
+          // initialize the merge operators first.
+          if (mergeMapOp != null) {
+            mergeMapOp.setConf(mergeMapWork);
+            l4j.info("Input name is " + mergeMapWork.getName());
+            jconf.set(Utilities.INPUT_NAME, mergeMapWork.getName());
+            mergeMapOp.setChildren(jconf);
+            if (foundCachedMergeWork == false) {
+              DummyStoreOperator dummyOp = getJoinParentOp(mergeMapOp);
+              connectOps.put(mergeMapWork.getTag(), dummyOp);
+            }
+            mergeMapOp.setExecContext(new ExecMapperContext(jconf));
+            mergeMapOp.initializeLocalWork(jconf);
+          }
+        }
+      }
+
       // initialize map operator
       mapOp.setConf(mapWork);
+      l4j.info("Main input name is " + mapWork.getName());
+      jconf.set(Utilities.INPUT_NAME, mapWork.getName());
       mapOp.setChildren(jconf);
       l4j.info(mapOp.dump(0));
 
@@ -121,12 +195,21 @@ public class MapRecordProcessor extends 
       ((TezContext) MapredContext.get()).setTezProcessorContext(processorContext);
       mapOp.setExecContext(execContext);
       mapOp.initializeLocalWork(jconf);
+
+      initializeMapRecordSources();
       mapOp.initialize(jconf, null);
+      if ((mergeMapOpList != null) && mergeMapOpList.isEmpty() == false) {
+        for (MapOperator mergeMapOp : mergeMapOpList) {
+          jconf.set(Utilities.INPUT_NAME, mergeMapOp.getConf().getName());
+          mergeMapOp.initialize(jconf, null);
+        }
+      }
 
       // Initialization isn't finished until all parents of all operators
       // are initialized. For broadcast joins that means initializing the
       // dummy parent operators as well.
       List<HashTableDummyOperator> dummyOps = mapWork.getDummyOps();
+      jconf.set(Utilities.INPUT_NAME, mapWork.getName());
       if (dummyOps != null) {
         for (Operator<? extends OperatorDesc> dummyOp : dummyOps){
           dummyOp.setExecContext(execContext);
@@ -151,54 +234,46 @@ public class MapRecordProcessor extends 
     perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS);
   }
 
-  @Override
-  void run() throws IOException{
-
-    MRInputLegacy in = TezProcessor.getMRInput(inputs);
-    KeyValueReader reader = in.getReader();
+  private void initializeMapRecordSources() throws Exception {
+    int size = mergeMapOpList.size() + 1; // the +1 is for the main map operator itself
+    sources = new MapRecordSource[size];
+    KeyValueReader reader = legacyMRInput.getReader();
+    position = mapOp.getConf().getTag();
+    sources[position] = new MapRecordSource();
+    sources[position].init(jconf, mapOp, reader);
+    for (MapOperator mapOp : mergeMapOpList) {
+      int tag = mapOp.getConf().getTag();
+      sources[tag] = new MapRecordSource();
+      String inputName = mapOp.getConf().getName();
+      MultiMRInput multiMRInput = multiMRInputMap.get(inputName);
+      Collection<KeyValueReader> kvReaders = multiMRInput.getKeyValueReaders();
+      l4j.debug("There are " + kvReaders.size() + " key-value readers for input " + inputName);
+      List<KeyValueReader> kvReaderList = new ArrayList<KeyValueReader>(kvReaders);
+      reader = new KeyValueInputMerger(kvReaderList);
+      sources[tag].init(jconf, mapOp, reader);
+    }
+    ((TezContext) MapredContext.get()).setRecordSources(sources);
+  }
 
-    //process records until done
-    while(reader.next()){
-      //ignore the key for maps -  reader.getCurrentKey();
-      Object value = reader.getCurrentValue();
-      boolean needMore = processRow(value);
-      if(!needMore){
-        break;
+  private DummyStoreOperator getJoinParentOp(Operator<? extends OperatorDesc> mergeMapOp) {
+    for (Operator<? extends OperatorDesc> childOp : mergeMapOp.getChildOperators()) {
+      if ((childOp.getChildOperators() == null) || (childOp.getChildOperators().isEmpty())) {
+        return (DummyStoreOperator) childOp;
+      } else {
+        return getJoinParentOp(childOp);
       }
     }
+    return null;
   }
 
+  @Override
+  void run() throws Exception {
 
-  /**
-   * @param value  value to process
-   * @return true if it is not done and can take more inputs
-   */
-  private boolean processRow(Object value) {
-    // reset the execContext for each new row
-    execContext.resetRow();
-
-    try {
-      if (mapOp.getDone()) {
-        return false; //done
-      } else {
-        // Since there is no concept of a group, we don't invoke
-        // startGroup/endGroup for a mapper
-        mapOp.process((Writable)value);
-        if (isLogInfoEnabled) {
-          logProgress();
-        }
-      }
-    } catch (Throwable e) {
-      abort = true;
-      if (e instanceof OutOfMemoryError) {
-        // Don't create a new object if we are already out of memory
-        throw (OutOfMemoryError) e;
-      } else {
-        l4j.fatal(StringUtils.stringifyException(e));
-        throw new RuntimeException(e);
+    while (sources[position].pushRecord()) {
+      if (isLogInfoEnabled) {
+        logProgress();
       }
     }
-    return true; //give me more
   }
 
   @Override
@@ -214,6 +289,11 @@ public class MapRecordProcessor extends 
         return;
       }
       mapOp.close(abort);
+      if (mergeMapOpList.isEmpty() == false) {
+        for (MapOperator mergeMapOp : mergeMapOpList) {
+          mergeMapOp.close(abort);
+        }
+      }
 
       // Need to close the dummyOps as well. The operator pipeline
       // is not considered "closed/done" unless all operators are
@@ -242,4 +322,27 @@ public class MapRecordProcessor extends 
       MapredContext.close();
     }
   }
+
+  public static Map<Integer, DummyStoreOperator> getConnectOps() {
+    return connectOps;
+  }
+
+  private MRInputLegacy getMRInput(Map<String, LogicalInput> inputs) throws Exception {
+    // there should be only one MRInput
+    MRInputLegacy theMRInput = null;
+    l4j.info("The input names are: " + Arrays.toString(inputs.keySet().toArray()));
+    for (Entry<String, LogicalInput> inp : inputs.entrySet()) {
+      if (inp.getValue() instanceof MRInputLegacy) {
+        if (theMRInput != null) {
+          throw new IllegalArgumentException("Only one MRInput is expected");
+        }
+        // a better logic would be to find the alias
+        theMRInput = (MRInputLegacy) inp.getValue();
+      } else if (inp.getValue() instanceof MultiMRInput) {
+        multiMRInputMap.put(inp.getKey(), (MultiMRInput) inp.getValue());
+      }
+    }
+    theMRInput.init();
+    return theMRInput;
+  }
 }
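
The merge-join reading path set up above works as follows: the main input keeps its single MRInputLegacy reader, while every merge input contributes a MultiMRInput whose key-value readers are combined by KeyValueInputMerger, and each resulting reader is bound to a MapRecordSource slot indexed by the operator tag. A condensed sketch of that wiring for one merge input, assuming the KeyValueInputMerger(List<KeyValueReader>) constructor and MultiMRInput.getKeyValueReaders() call used in the diff, with the sketch class living in the same package so the package-private MapRecordSource.init() is visible:

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.hive.ql.exec.MapOperator;
    import org.apache.hadoop.hive.ql.exec.tez.tools.KeyValueInputMerger;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.tez.mapreduce.input.MultiMRInput;
    import org.apache.tez.runtime.library.api.KeyValueReader;

    // Sketch only: binds one MapRecordSource to a merge-side MapOperator, merging all
    // key-value readers exposed by that input's MultiMRInput for this task.
    final class MergeSourceWiringSketch {
      static MapRecordSource bindMergeSource(JobConf jconf, MapOperator mergeMapOp,
          MultiMRInput multiMRInput) throws Exception {
        List<KeyValueReader> readers =
            new ArrayList<KeyValueReader>(multiMRInput.getKeyValueReaders());
        // KeyValueInputMerger presents the individual readers as a single KeyValueReader.
        KeyValueReader merged = new KeyValueInputMerger(readers);
        MapRecordSource source = new MapRecordSource();
        source.init(jconf, mergeMapOp, merged);
        return source;
      }
    }

The source array is indexed by each operator's tag (mapOp.getConf().getTag()), so run() drives only sources[position], while the remaining sources are registered with TezContext via setRecordSources() for the merge-join operator to pull from.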

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordSource.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordSource.java?rev=1627235&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordSource.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordSource.java Wed Sep 24 07:03:35 2014
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.MapOperator;
+import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.tez.mapreduce.input.MRInput;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+
+/**
+ * Process input from a Tez LogicalInput and write output - for a map plan, just pump the records
+ * through the query plan.
+ */
+
+public class MapRecordSource implements RecordSource {
+
+  public static final Log LOG = LogFactory.getLog(MapRecordSource.class);
+  private ExecMapperContext execContext = null;
+  private MapOperator mapOp = null;
+  private KeyValueReader reader = null;
+  private final boolean grouped = false;
+
+  void init(JobConf jconf, MapOperator mapOp, KeyValueReader reader) throws IOException {
+    execContext = new ExecMapperContext(jconf);
+    this.mapOp = mapOp;
+    this.reader = reader;
+  }
+
+  @Override
+  public final boolean isGrouped() {
+    return grouped;
+  }
+
+  @Override
+  public boolean pushRecord() throws HiveException {
+    execContext.resetRow();
+
+    try {
+      if (reader.next()) {
+        Object value;
+        try {
+          value = reader.getCurrentValue();
+        } catch (IOException e) {
+          throw new HiveException(e);
+        }
+        return processRow(value);
+      }
+    } catch (IOException e) {
+      throw new HiveException(e);
+    }
+    return false;
+  }
+
+  private boolean processRow(Object value) {
+    try {
+      if (mapOp.getDone()) {
+        return false; // done
+      } else {
+        // Since there is no concept of a group, we don't invoke
+        // startGroup/endGroup for a mapper
+        mapOp.process((Writable) value);
+      }
+    } catch (Throwable e) {
+      if (e instanceof OutOfMemoryError) {
+        // Don't create a new object if we are already out of memory
+        throw (OutOfMemoryError) e;
+      } else {
+        LOG.fatal(StringUtils.stringifyException(e));
+        throw new RuntimeException(e);
+      }
+    }
+    return true; // give me more
+  }
+
+}
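
MapRecordProcessor drives this class through the RecordSource interface rather than reading records itself. A minimal usage sketch for the plain (non-merge) case, with the caller assumed to live in the same package so the package-private init() is accessible:

    import org.apache.hadoop.hive.ql.exec.MapOperator;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.tez.mapreduce.input.MRInputLegacy;
    import org.apache.tez.runtime.library.api.KeyValueReader;

    // Sketch only: the basic push loop, as MapRecordProcessor.run() uses for its main input.
    final class MapSourceUsageSketch {
      static void pump(JobConf jconf, MapOperator mapOp, MRInputLegacy input) throws Exception {
        KeyValueReader reader = input.getReader();
        MapRecordSource source = new MapRecordSource();
        source.init(jconf, mapOp, reader);
        // pushRecord() feeds one record into the operator pipeline per call and returns
        // false once the reader is exhausted or the map operator reports it is done.
        while (source.pushRecord()) {
          // progress logging would go here, as in MapRecordProcessor.run()
        }
      }
    }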

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java?rev=1627235&r1=1627234&r2=1627235&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java Wed Sep 24 07:03:35 2014
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.exec.t
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.MapredContext;
 import org.apache.hadoop.hive.ql.exec.ObjectCacheFactory;
 import org.apache.hadoop.hive.ql.exec.Operator;
@@ -40,7 +41,9 @@ import org.apache.tez.runtime.api.Logica
 import org.apache.tez.runtime.api.ProcessorContext;
 import org.apache.tez.runtime.library.api.KeyValueReader;
 
+import java.io.IOException;
 import java.util.Map;
+import java.util.Map.Entry;
 
 /**
  * Record processor for fast merging of files.
@@ -51,11 +54,12 @@ public class MergeFileRecordProcessor ex
       .getLog(MergeFileRecordProcessor.class);
 
   protected Operator<? extends OperatorDesc> mergeOp;
-  private final ExecMapperContext execContext = new ExecMapperContext();
+  private ExecMapperContext execContext = null;
   protected static final String MAP_PLAN_KEY = "__MAP_PLAN__";
   private MergeFileWork mfWork;
+  MRInputLegacy mrInput = null;
   private boolean abort = false;
-  private Object[] row = new Object[2];
+  private final Object[] row = new Object[2];
 
   @Override
   void init(JobConf jconf, ProcessorContext processorContext,
@@ -63,16 +67,16 @@ public class MergeFileRecordProcessor ex
       Map<String, LogicalOutput> outputs) throws Exception {
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS);
     super.init(jconf, processorContext, mrReporter, inputs, outputs);
+    execContext = new ExecMapperContext(jconf);
 
     //Update JobConf using MRInput, info like filename comes via this
-    MRInputLegacy mrInput = TezProcessor.getMRInput(inputs);
+    mrInput = getMRInput(inputs);
     Configuration updatedConf = mrInput.getConfigUpdates();
     if (updatedConf != null) {
       for (Map.Entry<String, String> entry : updatedConf) {
         jconf.set(entry.getKey(), entry.getValue());
       }
     }
-
     createOutputMap();
     // Start all the Outputs.
     for (Map.Entry<String, LogicalOutput> outputEntry : outputs.entrySet()) {
@@ -127,8 +131,7 @@ public class MergeFileRecordProcessor ex
 
   @Override
   void run() throws Exception {
-    MRInputLegacy in = TezProcessor.getMRInput(inputs);
-    KeyValueReader reader = in.getReader();
+    KeyValueReader reader = mrInput.getReader();
 
     //process records until done
     while (reader.next()) {
@@ -205,4 +208,23 @@ public class MergeFileRecordProcessor ex
     return true; //give me more
   }
 
+  private MRInputLegacy getMRInput(Map<String, LogicalInput> inputs) throws Exception {
+    // there should be only one MRInput
+    MRInputLegacy theMRInput = null;
+    for (Entry<String, LogicalInput> inp : inputs.entrySet()) {
+      if (inp.getValue() instanceof MRInputLegacy) {
+        if (theMRInput != null) {
+          throw new IllegalArgumentException("Only one MRInput is expected");
+        }
+        // a better logic would be to find the alias
+        theMRInput = (MRInputLegacy) inp.getValue();
+      } else {
+        throw new IOException("Expecting only one input of type MRInputLegacy. Found type: "
+            + inp.getValue().getClass().getCanonicalName());
+      }
+    }
+    theMRInput.init();
+
+    return theMRInput;
+  }
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileTezProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileTezProcessor.java?rev=1627235&r1=1627234&r2=1627235&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileTezProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileTezProcessor.java Wed Sep 24 07:03:35 2014
@@ -39,12 +39,6 @@ public class MergeFileTezProcessor exten
   public void run(Map<String, LogicalInput> inputs,
       Map<String, LogicalOutput> outputs) throws Exception {
     rproc = new MergeFileRecordProcessor();
-    MRInputLegacy mrInput = getMRInput(inputs);
-    try {
-      mrInput.init();
-    } catch (IOException e) {
-      throw new RuntimeException("Failed while initializing MRInput", e);
-    }
     initializeAndRunProcessor(inputs, outputs);
   }
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java?rev=1627235&r1=1627234&r2=1627235&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java Wed Sep 24 07:03:35 2014
@@ -115,8 +115,7 @@ public abstract class RecordProcessor  {
    */
   protected void logCloseInfo() {
     long used_memory = memoryMXBean.getHeapMemoryUsage().getUsed();
-    l4j.info("ExecMapper: processed " + numRows + " rows: used memory = "
-        + used_memory);
+    l4j.info("TezProcessor: processed " + numRows + " rows/groups: used memory = " + used_memory);
   }
 
   /**
@@ -126,8 +125,7 @@ public abstract class RecordProcessor  {
     numRows++;
     if (numRows == nextUpdateCntr) {
       long used_memory = memoryMXBean.getHeapMemoryUsage().getUsed();
-      l4j.info("ExecMapper: processing " + numRows
-          + " rows: used memory = " + used_memory);
+      l4j.info("TezProcessor: processing " + numRows + " rows/groups: used memory = " + used_memory);
       nextUpdateCntr = getNextUpdateRecordCounter(numRows);
     }
   }

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordSource.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordSource.java?rev=1627235&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordSource.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordSource.java Wed Sep 24 07:03:35 2014
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+
+public interface RecordSource {
+  public boolean pushRecord() throws HiveException;
+  public boolean isGrouped();
+}
\ No newline at end of file

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java?rev=1627235&r1=1627234&r2=1627235&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java Wed Sep 24 07:03:35 2014
@@ -17,9 +17,7 @@
  */
 package org.apache.hadoop.hive.ql.exec.tez;
 
-import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -35,31 +33,13 @@ import org.apache.hadoop.hive.ql.exec.Op
 import org.apache.hadoop.hive.ql.exec.OperatorUtils;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.ReportStats;
-import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
 import org.apache.hadoop.hive.ql.exec.tez.TezProcessor.TezKVOutputCollector;
-import org.apache.hadoop.hive.ql.exec.tez.tools.InputMerger;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
-import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterFactory;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
-import org.apache.hadoop.hive.serde2.Deserializer;
-import org.apache.hadoop.hive.serde2.SerDe;
-import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.hadoop.hive.serde2.SerDeUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.util.StringUtils;
 import org.apache.tez.mapreduce.processor.MRTaskReporter;
 import org.apache.tez.runtime.api.Input;
 import org.apache.tez.runtime.api.LogicalInput;
@@ -76,39 +56,16 @@ public class ReduceRecordProcessor  exte
   private static final String REDUCE_PLAN_KEY = "__REDUCE_PLAN__";
 
   public static final Log l4j = LogFactory.getLog(ReduceRecordProcessor.class);
-  private final ExecMapperContext execContext = new ExecMapperContext();
-  private boolean abort = false;
-  private Deserializer inputKeyDeserializer;
-
-  // Input value serde needs to be an array to support different SerDe
-  // for different tags
-  private final SerDe[] inputValueDeserializer = new SerDe[Byte.MAX_VALUE];
 
-  TableDesc keyTableDesc;
-  TableDesc[] valueTableDesc;
+  private ReduceWork redWork;
 
-  ObjectInspector[] rowObjectInspector;
   private Operator<?> reducer;
-  private boolean isTagged = false;
-
-  private Object keyObject = null;
-  private BytesWritable groupKey;
-
-  private ReduceWork redWork;
 
-  private boolean vectorized = false;
+  private ReduceRecordSource[] sources;
 
-  List<Object> row = new ArrayList<Object>(Utilities.reduceFieldNameList.size());
+  private final byte position = 0;
 
-  private DataOutputBuffer buffer;
-  private VectorizedRowBatch[] batches;
-  // number of columns pertaining to keys in a vectorized row batch
-  private int keysColumnOffset;
-  private final int BATCH_SIZE = VectorizedRowBatch.DEFAULT_SIZE;
-  private StructObjectInspector keyStructInspector;
-  private StructObjectInspector[] valueStructInspectors;
-  /* this is only used in the error code path */
-  private List<VectorExpressionWriter>[] valueStringWriters;
+  private boolean abort;
 
   @Override
   void init(JobConf jconf, ProcessorContext processorContext, MRTaskReporter mrReporter,
@@ -118,10 +75,6 @@ public class ReduceRecordProcessor  exte
 
     ObjectCache cache = ObjectCacheFactory.getCache(jconf);
 
-    rowObjectInspector = new ObjectInspector[Byte.MAX_VALUE];
-    ObjectInspector[] valueObjectInspector = new ObjectInspector[Byte.MAX_VALUE];
-    ObjectInspector keyObjectInspector;
-
     redWork = (ReduceWork) cache.retrieve(REDUCE_PLAN_KEY);
     if (redWork == null) {
       redWork = Utilities.getReduceWork(jconf);
@@ -131,95 +84,35 @@ public class ReduceRecordProcessor  exte
     }
 
     reducer = redWork.getReducer();
-    reducer.setParentOperators(null); // clear out any parents as reducer is the
-    // root
-    isTagged = redWork.getNeedsTagging();
-    vectorized = redWork.getVectorMode();
+    reducer.getParentOperators().clear();
+    reducer.setParentOperators(null); // clear out any parents as reducer is the root
 
-    try {
-      keyTableDesc = redWork.getKeyDesc();
-      inputKeyDeserializer = ReflectionUtils.newInstance(keyTableDesc
-          .getDeserializerClass(), null);
-      SerDeUtils.initializeSerDe(inputKeyDeserializer, null, keyTableDesc.getProperties(), null);
-      keyObjectInspector = inputKeyDeserializer.getObjectInspector();
-      reducer.setGroupKeyObjectInspector(keyObjectInspector);
-      valueTableDesc = new TableDesc[redWork.getTagToValueDesc().size()];
-
-      if(vectorized) {
-        final int maxTags = redWork.getTagToValueDesc().size();
-        keyStructInspector = (StructObjectInspector)keyObjectInspector;
-        batches = new VectorizedRowBatch[maxTags];
-        valueStructInspectors = new StructObjectInspector[maxTags];
-        valueStringWriters = new List[maxTags];
-        keysColumnOffset = keyStructInspector.getAllStructFieldRefs().size();
-        buffer = new DataOutputBuffer();
-      }
+    int numTags = redWork.getTagToValueDesc().size();
 
-      for (int tag = 0; tag < redWork.getTagToValueDesc().size(); tag++) {
-        // We should initialize the SerDe with the TypeInfo when available.
-        valueTableDesc[tag] = redWork.getTagToValueDesc().get(tag);
-        inputValueDeserializer[tag] = (SerDe) ReflectionUtils.newInstance(
-            valueTableDesc[tag].getDeserializerClass(), null);
-        SerDeUtils.initializeSerDe(inputValueDeserializer[tag], null,
-                                   valueTableDesc[tag].getProperties(), null);
-        valueObjectInspector[tag] = inputValueDeserializer[tag]
-            .getObjectInspector();
-
-        ArrayList<ObjectInspector> ois = new ArrayList<ObjectInspector>();
-
-        if(vectorized) {
-          /* vectorization only works with struct object inspectors */
-          valueStructInspectors[tag] = (StructObjectInspector)valueObjectInspector[tag];
-
-          batches[tag] = VectorizedBatchUtil.constructVectorizedRowBatch(keyStructInspector,
-              valueStructInspectors[tag]);
-          final int totalColumns = keysColumnOffset +
-              valueStructInspectors[tag].getAllStructFieldRefs().size();
-          valueStringWriters[tag] = new ArrayList<VectorExpressionWriter>(totalColumns);
-          valueStringWriters[tag].addAll(Arrays
-              .asList(VectorExpressionWriterFactory
-                  .genVectorStructExpressionWritables(keyStructInspector)));
-          valueStringWriters[tag].addAll(Arrays
-              .asList(VectorExpressionWriterFactory
-                  .genVectorStructExpressionWritables(valueStructInspectors[tag])));
-
-          /*
-           * The row object inspector used by ReduceWork needs to be a **standard**
-           * struct object inspector, not just any struct object inspector.
-           */
-          ArrayList<String> colNames = new ArrayList<String>();
-          List<? extends StructField> fields = keyStructInspector.getAllStructFieldRefs();
-          for (StructField field: fields) {
-            colNames.add(Utilities.ReduceField.KEY.toString() + "." + field.getFieldName());
-            ois.add(field.getFieldObjectInspector());
-          }
-          fields = valueStructInspectors[tag].getAllStructFieldRefs();
-          for (StructField field: fields) {
-            colNames.add(Utilities.ReduceField.VALUE.toString() + "." + field.getFieldName());
-            ois.add(field.getFieldObjectInspector());
-          }
-          rowObjectInspector[tag] = ObjectInspectorFactory
-                  .getStandardStructObjectInspector(colNames, ois);
-        } else {
-          ois.add(keyObjectInspector);
-          ois.add(valueObjectInspector[tag]);
-          rowObjectInspector[tag] = ObjectInspectorFactory
-                  .getStandardStructObjectInspector(Utilities.reduceFieldNameList, ois);
-        }
+    ObjectInspector[] ois = new ObjectInspector[numTags];
+    sources = new ReduceRecordSource[numTags];
 
-      }
-    } catch (Exception e) {
-      throw new RuntimeException(e);
+    for (int tag = 0; tag < redWork.getTagToValueDesc().size(); tag++) {
+      TableDesc keyTableDesc = redWork.getKeyDesc();
+      TableDesc valueTableDesc = redWork.getTagToValueDesc().get(tag);
+      KeyValuesReader reader =
+          (KeyValuesReader) inputs.get(redWork.getTagToInput().get(tag)).getReader();
+
+      sources[tag] = new ReduceRecordSource();
+      sources[tag].init(jconf, reducer, redWork.getVectorMode(), keyTableDesc, valueTableDesc,
+          reader, tag == position, (byte) tag);
+      ois[tag] = sources[tag].getObjectInspector();
     }
 
     MapredContext.init(false, new JobConf(jconf));
     ((TezContext) MapredContext.get()).setInputs(inputs);
     ((TezContext) MapredContext.get()).setTezProcessorContext(processorContext);
+    ((TezContext) MapredContext.get()).setRecordSources(sources);
 
     // initialize reduce operator tree
     try {
       l4j.info(reducer.dump(0));
-      reducer.initialize(jconf, rowObjectInspector);
+      reducer.initialize(jconf, ois);
 
       // Initialization isn't finished until all parents of all operators
       // are initialized. For broadcast joins that means initializing the
@@ -227,7 +120,6 @@ public class ReduceRecordProcessor  exte
       List<HashTableDummyOperator> dummyOps = redWork.getDummyOps();
       if (dummyOps != null) {
         for (Operator<? extends OperatorDesc> dummyOp : dummyOps){
-          dummyOp.setExecContext(execContext);
           dummyOp.initialize(jconf, null);
         }
       }
@@ -271,28 +163,12 @@ public class ReduceRecordProcessor  exte
       ((TezKVOutputCollector) outMap.get(outputEntry.getKey())).initialize();
     }
 
-    KeyValuesReader kvsReader;
-    try {
-      if(shuffleInputs.size() == 1){
-        //no merging of inputs required
-        kvsReader = (KeyValuesReader) shuffleInputs.get(0).getReader();
-      }else {
-        //get a sort merged input
-        kvsReader = new InputMerger(shuffleInputs);
-      }
-    } catch (Exception e) {
-      throw new IOException(e);
-    }
-
-    while(kvsReader.next()){
-      Object key = kvsReader.getCurrentKey();
-      Iterable<Object> values = kvsReader.getCurrentValues();
-      boolean needMore = processRows(key, values);
-      if(!needMore){
-        break;
+    // run the operator pipeline
+    while (sources[position].pushRecord()) {
+      if (isLogInfoEnabled) {
+        logProgress();
       }
     }
-
   }
 
   /**
@@ -302,212 +178,22 @@ public class ReduceRecordProcessor  exte
    */
   private List<LogicalInput> getShuffleInputs(Map<String, LogicalInput> inputs) {
     //the reduce plan inputs have tags, add all inputs that have tags
-    Map<Integer, String> tag2input = redWork.getTagToInput();
+    Map<Integer, String> tagToinput = redWork.getTagToInput();
     ArrayList<LogicalInput> shuffleInputs = new ArrayList<LogicalInput>();
-    for(String inpStr : tag2input.values()){
+    for(String inpStr : tagToinput.values()){
+      if (inputs.get(inpStr) == null) {
+        throw new AssertionError("Cound not find input: " + inpStr);
+      }
       shuffleInputs.add(inputs.get(inpStr));
     }
     return shuffleInputs;
   }
 
-  /**
-   * @param key
-   * @param values
-   * @return true if it is not done and can take more inputs
-   */
-  private boolean processRows(Object key, Iterable<Object> values) {
-    if(reducer.getDone()){
-      //done - no more records needed
-      return false;
-    }
-
-    // reset the execContext for each new row
-    execContext.resetRow();
-
-    try {
-      BytesWritable keyWritable = (BytesWritable) key;
-      byte tag = 0;
-
-      if (isTagged) {
-        // remove the tag from key coming out of reducer
-        // and store it in separate variable.
-        int size = keyWritable.getLength() - 1;
-        tag = keyWritable.getBytes()[size];
-        keyWritable.setSize(size);
-      }
-
-      //Set the key, check if this is a new group or same group
-      if (!keyWritable.equals(this.groupKey)) {
-        // If a operator wants to do some work at the beginning of a group
-        if (groupKey == null) { // the first group
-          this.groupKey = new BytesWritable();
-        } else {
-          // If a operator wants to do some work at the end of a group
-          if(isLogTraceEnabled) {
-            l4j.trace("End Group");
-          }
-          reducer.endGroup();
-        }
-
-        try {
-          this.keyObject = inputKeyDeserializer.deserialize(keyWritable);
-        } catch (Exception e) {
-          throw new HiveException(
-              "Hive Runtime Error: Unable to deserialize reduce input key from "
-              + Utilities.formatBinaryString(keyWritable.getBytes(), 0,
-              keyWritable.getLength()) + " with properties "
-              + keyTableDesc.getProperties(), e);
-        }
-        groupKey.set(keyWritable.getBytes(), 0, keyWritable.getLength());
-        if (isLogTraceEnabled) {
-          l4j.trace("Start Group");
-        }
-        reducer.setGroupKeyObject(keyObject);
-        reducer.startGroup();
-      }
-      /* this.keyObject passed via reference */
-      if(vectorized) {
-        return processVectors(values, tag);
-      } else {
-        return processKeyValues(values, tag);
-      }
-    } catch (Throwable e) {
-      abort = true;
-      if (e instanceof OutOfMemoryError) {
-        // Don't create a new object if we are already out of memory
-        throw (OutOfMemoryError) e;
-      } else {
-        l4j.fatal(StringUtils.stringifyException(e));
-        throw new RuntimeException(e);
-      }
-    }
-  }
-
-  private Object deserializeValue(BytesWritable valueWritable, byte tag) throws HiveException {
-    try {
-      return inputValueDeserializer[tag].deserialize(valueWritable);
-    } catch (SerDeException e) {
-      throw new HiveException(
-          "Hive Runtime Error: Unable to deserialize reduce input value (tag="
-              + tag
-              + ") from "
-              + Utilities.formatBinaryString(valueWritable.getBytes(), 0,
-                  valueWritable.getLength()) + " with properties "
-              + valueTableDesc[tag].getProperties(), e);
-    }
-  }
-
-  /**
-   * @param values
-   * @return true if it is not done and can take more inputs
-   */
-  private boolean processKeyValues(Iterable<Object> values, byte tag) throws HiveException {
-
-    for (Object value : values) {
-      BytesWritable valueWritable = (BytesWritable) value;
-
-      row.clear();
-      row.add(this.keyObject);
-      row.add(deserializeValue(valueWritable, tag));
-
-      try {
-        reducer.processOp(row, tag);
-      } catch (Exception e) {
-        String rowString = null;
-        try {
-          rowString = SerDeUtils.getJSONString(row, rowObjectInspector[tag]);
-        } catch (Exception e2) {
-          rowString = "[Error getting row data with exception "
-              + StringUtils.stringifyException(e2) + " ]";
-        }
-        throw new HiveException("Hive Runtime Error while processing row (tag="
-            + tag + ") " + rowString, e);
-      }
-      if (isLogInfoEnabled) {
-        logProgress();
-      }
-    }
-    return true; //give me more
-  }
-
-  /**
-   * @param values
-   * @return true if it is not done and can take more inputs
-   */
-  private boolean processVectors(Iterable<Object> values, byte tag) throws HiveException {
-    VectorizedRowBatch batch = batches[tag];
-    batch.reset();
-    buffer.reset();
-
-    /* deserialize key into columns */
-    VectorizedBatchUtil.addRowToBatchFrom(keyObject, keyStructInspector,
-        0, 0, batch, buffer);
-    for(int i = 0; i < keysColumnOffset; i++) {
-      VectorizedBatchUtil.setRepeatingColumn(batch, i);
-    }
-
-    int rowIdx = 0;
-    try {
-      for (Object value : values) {
-        /* deserialize value into columns */
-        BytesWritable valueWritable = (BytesWritable) value;
-        Object valueObj = deserializeValue(valueWritable, tag);
-
-        VectorizedBatchUtil.addRowToBatchFrom(valueObj, valueStructInspectors[tag],
-            rowIdx, keysColumnOffset, batch, buffer);
-        rowIdx++;
-        if (rowIdx >= BATCH_SIZE) {
-          VectorizedBatchUtil.setBatchSize(batch, rowIdx);
-          reducer.processOp(batch, tag);
-          rowIdx = 0;
-          buffer.reset();
-          if (isLogInfoEnabled) {
-            logProgress();
-          }
-        }
-      }
-      if (rowIdx > 0) {
-        VectorizedBatchUtil.setBatchSize(batch, rowIdx);
-        reducer.processOp(batch, tag);
-        buffer.reset();
-      }
-      if (isLogInfoEnabled) {
-        logProgress();
-      }
-    } catch (Exception e) {
-      String rowString = null;
-      try {
-        /* batch.toString depends on this */
-        batch.setValueWriters(valueStringWriters[tag]
-            .toArray(new VectorExpressionWriter[0]));
-        rowString = batch.toString();
-      } catch (Exception e2) {
-        rowString = "[Error getting row data with exception "
-            + StringUtils.stringifyException(e2) + " ]";
-      }
-      throw new HiveException("Hive Runtime Error while processing vector batch (tag="
-          + tag + ") " + rowString, e);
-    }
-    return true; // give me more
-  }
-
   @Override
   void close(){
-    // check if there are IOExceptions
-    if (!abort) {
-      abort = execContext.getIoCxt().getIOExceptions();
-    }
-
     try {
-      if (groupKey != null) {
-        // If a operator wants to do some work at the end of a group
-        if(isLogTraceEnabled) {
-          l4j.trace("End Group");
-        }
-        reducer.endGroup();
-      }
-      if (isLogInfoEnabled) {
-        logCloseInfo();
+      for (ReduceRecordSource rs: sources) {
+        abort = abort && rs.close();
       }
 
       reducer.close(abort);

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java?rev=1627235&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java Wed Sep 24 07:03:35 2014
@@ -0,0 +1,385 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterFactory;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.tez.runtime.library.api.KeyValuesReader;
+
+/**
+ * Reads reduce-side input from a tez LogicalInput (via a KeyValuesReader) and
+ * pumps the records through the reduce-side query plan.
+ */
+@SuppressWarnings("deprecation")
+public class ReduceRecordSource implements RecordSource {
+
+  public static final Log l4j = LogFactory.getLog(ReduceRecordSource.class);
+
+  private static final String CLASS_NAME = ReduceRecordSource.class.getName();
+
+  private byte tag;
+
+  private boolean abort = false;
+
+  private static Deserializer inputKeyDeserializer;
+
+  // Input value SerDe; each tag gets its own ReduceRecordSource, so a single
+  // deserializer is sufficient here
+  private SerDe inputValueDeserializer;
+
+  TableDesc keyTableDesc;
+  TableDesc valueTableDesc;
+
+  ObjectInspector rowObjectInspector;
+  private Operator<?> reducer;
+
+  private Object keyObject = null;
+  private BytesWritable groupKey;
+
+  private boolean vectorized = false;
+
+  List<Object> row = new ArrayList<Object>(Utilities.reduceFieldNameList.size());
+
+  private DataOutputBuffer buffer;
+  private VectorizedRowBatch batch;
+
+  // number of columns pertaining to keys in a vectorized row batch
+  private int keysColumnOffset;
+  private final int BATCH_SIZE = VectorizedRowBatch.DEFAULT_SIZE;
+
+  private StructObjectInspector keyStructInspector;
+  private StructObjectInspector valueStructInspectors;
+
+  /* this is only used in the error code path */
+  private List<VectorExpressionWriter> valueStringWriters;
+
+  private KeyValuesReader reader;
+
+  private boolean handleGroupKey;
+
+  private ObjectInspector valueObjectInspector;
+
+  private final PerfLogger perfLogger = PerfLogger.getPerfLogger();
+
+  private Iterable<Object> valueWritables;
+  
+  private final boolean grouped = true;
+
+  void init(JobConf jconf, Operator<?> reducer, boolean vectorized, TableDesc keyTableDesc,
+      TableDesc valueTableDesc, KeyValuesReader reader, boolean handleGroupKey, byte tag)
+      throws Exception {
+
+    ObjectInspector keyObjectInspector;
+
+    this.reducer = reducer;
+    this.vectorized = vectorized;
+    this.keyTableDesc = keyTableDesc;
+    this.reader = reader;
+    this.handleGroupKey = handleGroupKey;
+    this.tag = tag;
+
+    try {
+      inputKeyDeserializer = ReflectionUtils.newInstance(keyTableDesc
+          .getDeserializerClass(), null);
+      SerDeUtils.initializeSerDe(inputKeyDeserializer, null, keyTableDesc.getProperties(), null);
+      keyObjectInspector = inputKeyDeserializer.getObjectInspector();
+      reducer.setGroupKeyObjectInspector(keyObjectInspector);
+
+      if(vectorized) {
+        keyStructInspector = (StructObjectInspector) keyObjectInspector;
+        keysColumnOffset = keyStructInspector.getAllStructFieldRefs().size();
+        buffer = new DataOutputBuffer();
+      }
+
+      // We should initialize the SerDe with the TypeInfo when available.
+      this.valueTableDesc = valueTableDesc;
+      inputValueDeserializer = (SerDe) ReflectionUtils.newInstance(
+          valueTableDesc.getDeserializerClass(), null);
+      SerDeUtils.initializeSerDe(inputValueDeserializer, null,
+          valueTableDesc.getProperties(), null);
+      valueObjectInspector = inputValueDeserializer.getObjectInspector();
+
+      ArrayList<ObjectInspector> ois = new ArrayList<ObjectInspector>();
+
+      if(vectorized) {
+        /* vectorization only works with struct object inspectors */
+        valueStructInspectors = (StructObjectInspector) valueObjectInspector;
+
+        batch = VectorizedBatchUtil.constructVectorizedRowBatch(keyStructInspector,
+            valueStructInspectors);
+
+        final int totalColumns = keysColumnOffset +
+            valueStructInspectors.getAllStructFieldRefs().size();
+        valueStringWriters = new ArrayList<VectorExpressionWriter>(totalColumns);
+        valueStringWriters.addAll(Arrays
+            .asList(VectorExpressionWriterFactory
+                .genVectorStructExpressionWritables(keyStructInspector)));
+        valueStringWriters.addAll(Arrays
+            .asList(VectorExpressionWriterFactory
+                .genVectorStructExpressionWritables(valueStructInspectors)));
+
+        /*
+         * The row object inspector used by ReduceWork needs to be a **standard**
+         * struct object inspector, not just any struct object inspector.
+         */
+        ArrayList<String> colNames = new ArrayList<String>();
+        List<? extends StructField> fields = keyStructInspector.getAllStructFieldRefs();
+        for (StructField field: fields) {
+          colNames.add(Utilities.ReduceField.KEY.toString() + "." + field.getFieldName());
+          ois.add(field.getFieldObjectInspector());
+        }
+        fields = valueStructInspectors.getAllStructFieldRefs();
+        for (StructField field: fields) {
+          colNames.add(Utilities.ReduceField.VALUE.toString() + "." + field.getFieldName());
+          ois.add(field.getFieldObjectInspector());
+        }
+        rowObjectInspector = ObjectInspectorFactory.getStandardStructObjectInspector(colNames, ois);
+      } else {
+        ois.add(keyObjectInspector);
+        ois.add(valueObjectInspector);
+        rowObjectInspector =
+            ObjectInspectorFactory.getStandardStructObjectInspector(Utilities.reduceFieldNameList,
+                ois);
+      }
+    } catch (Throwable e) {
+      abort = true;
+      if (e instanceof OutOfMemoryError) {
+        // Don't create a new object if we are already out of memory
+        throw (OutOfMemoryError) e;
+      } else {
+        throw new RuntimeException("Reduce operator initialization failed", e);
+      }
+    }
+    perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS);
+  }
+  
+  @Override
+  public final boolean isGrouped() {
+    return grouped;
+  }
+
+  @Override
+  public boolean pushRecord() throws HiveException {
+    BytesWritable keyWritable;
+
+    try {
+      if (!reader.next()) {
+        return false;
+      } else {
+        keyWritable = (BytesWritable) reader.getCurrentKey();
+        valueWritables = reader.getCurrentValues();
+      }
+
+      //Set the key, check if this is a new group or same group
+      try {
+        keyObject = inputKeyDeserializer.deserialize(keyWritable);
+      } catch (Exception e) {
+        throw new HiveException("Hive Runtime Error: Unable to deserialize reduce input key from "
+            + Utilities.formatBinaryString(keyWritable.getBytes(), 0, keyWritable.getLength())
+            + " with properties " + keyTableDesc.getProperties(), e);
+      }
+
+      if (handleGroupKey && !keyWritable.equals(this.groupKey)) {
+        // If an operator wants to do some work at the beginning of a group
+        if (groupKey == null) { // the first group
+          this.groupKey = new BytesWritable();
+        } else {
+          // If an operator wants to do some work at the end of a group
+          reducer.endGroup();
+        }
+
+        groupKey.set(keyWritable.getBytes(), 0, keyWritable.getLength());
+        reducer.setGroupKeyObject(keyObject);
+        reducer.startGroup();
+      }
+
+      /* this.keyObject passed via reference */
+      if(vectorized) {
+        processVectors(valueWritables, tag);
+      } else {
+        processKeyValues(valueWritables, tag);
+      }
+      return true;
+    } catch (Throwable e) {
+      abort = true;
+      if (e instanceof OutOfMemoryError) {
+        // Don't create a new object if we are already out of memory
+        throw (OutOfMemoryError) e;
+      } else {
+        l4j.fatal(StringUtils.stringifyException(e));
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  private Object deserializeValue(BytesWritable valueWritable, byte tag)
+      throws HiveException {
+
+    try {
+      return inputValueDeserializer.deserialize(valueWritable);
+    } catch (SerDeException e) {
+      throw new HiveException(
+          "Hive Runtime Error: Unable to deserialize reduce input value (tag="
+              + tag
+              + ") from "
+          + Utilities.formatBinaryString(valueWritable.getBytes(), 0, valueWritable.getLength())
+          + " with properties " + valueTableDesc.getProperties(), e);
+    }
+  }
+
+  /**
+   * Forwards each value of the current key to the reducer as a (key, value) row.
+   * @param values the serialized values associated with the current key
+   */
+  private void processKeyValues(Iterable<Object> values, byte tag) throws HiveException {
+    List<Object> passDownKey = null;
+    for (Object value : values) {
+      BytesWritable valueWritable = (BytesWritable) value;
+
+      row.clear();
+      if (passDownKey == null) {
+        row.add(this.keyObject);
+      } else {
+        row.add(passDownKey.get(0));
+      }
+      if ((passDownKey == null) && (reducer instanceof CommonMergeJoinOperator)) {
+        passDownKey =
+            (List<Object>) ObjectInspectorUtils.copyToStandardObject(row,
+                reducer.getInputObjInspectors()[tag], ObjectInspectorCopyOption.WRITABLE);
+        row.remove(0);
+        row.add(0, passDownKey.get(0));
+      }
+
+      row.add(deserializeValue(valueWritable, tag));
+
+      try {
+        reducer.processOp(row, tag);
+      } catch (Exception e) {
+        String rowString = null;
+        try {
+          rowString = SerDeUtils.getJSONString(row, rowObjectInspector);
+        } catch (Exception e2) {
+          rowString = "[Error getting row data with exception "
+              + StringUtils.stringifyException(e2) + " ]";
+        }
+        throw new HiveException("Hive Runtime Error while processing row (tag="
+            + tag + ") " + rowString, e);
+      }
+    }
+  }
+
+  /**
+   * Feeds the current key and its values to the reducer as vectorized batches.
+   * @param values the serialized values associated with the current key
+   */
+  private void processVectors(Iterable<Object> values, byte tag) throws HiveException {
+    batch.reset();
+
+    /* deserialize key into columns */
+    VectorizedBatchUtil.addRowToBatchFrom(keyObject, keyStructInspector,
+        0, 0, batch, buffer);
+    for(int i = 0; i < keysColumnOffset; i++) {
+      VectorizedBatchUtil.setRepeatingColumn(batch, i);
+    }
+
+    int rowIdx = 0;
+    try {
+      for (Object value : values) {
+        /* deserialize value into columns */
+        BytesWritable valueWritable = (BytesWritable) value;
+        Object valueObj = deserializeValue(valueWritable, tag);
+
+        VectorizedBatchUtil.addRowToBatchFrom(valueObj, valueStructInspectors,
+            rowIdx, keysColumnOffset, batch, buffer);
+        rowIdx++;
+        if (rowIdx >= BATCH_SIZE) {
+          VectorizedBatchUtil.setBatchSize(batch, rowIdx);
+          reducer.processOp(batch, tag);
+          rowIdx = 0;
+        }
+      }
+      if (rowIdx > 0) {
+        VectorizedBatchUtil.setBatchSize(batch, rowIdx);
+        reducer.processOp(batch, tag);
+      }
+    } catch (Exception e) {
+      String rowString = null;
+      try {
+        /* batch.toString depends on this */
+        batch.setValueWriters(valueStringWriters
+            .toArray(new VectorExpressionWriter[0]));
+        rowString = batch.toString();
+      } catch (Exception e2) {
+        rowString = "[Error getting row data with exception "
+            + StringUtils.stringifyException(e2) + " ]";
+      }
+      throw new HiveException("Hive Runtime Error while processing vector batch (tag="
+          + tag + ") " + rowString, e);
+    }
+  }
+
+  boolean close() throws Exception {
+    try {
+      if (handleGroupKey && groupKey != null) {
+        // If an operator wants to do some work at the end of a group
+        reducer.endGroup();
+      }
+    } catch (Exception e) {
+      if (!abort) {
+        // signal new failure to map-reduce
+        l4j.error("Hit error while closing operators - failing tree");
+        throw new RuntimeException("Hive Runtime Error while closing operators: "
+            + e.getMessage(), e);
+      }
+    }
+    return abort;
+  }
+
+  public ObjectInspector getObjectInspector() {
+    return rowObjectInspector;
+  }
+}
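
The new ReduceRecordSource owns the per-tag reduce input loop: pushRecord() pulls one key and its values from the KeyValuesReader, handles group boundaries when handleGroupKey is set, and pumps either plain rows or vectorized batches into the reducer operator, while close() ends the last group and reports the abort state. A minimal sketch of how a caller could drive it, assuming same-package access (init() and close() are package-private); the method and variable names below are illustrative, not part of the commit:

// Illustrative driver only; jconf, reducer, keyTableDesc, valueTableDesc and
// reader are assumed to be set up by the caller.
void drainReduceInput(JobConf jconf, Operator<?> reducer, TableDesc keyTableDesc,
    TableDesc valueTableDesc, KeyValuesReader reader) throws Exception {
  ReduceRecordSource source = new ReduceRecordSource();
  source.init(jconf, reducer, false /* vectorized */, keyTableDesc,
      valueTableDesc, reader, true /* handleGroupKey */, (byte) 0);

  // Each pushRecord() call consumes one key plus all of its values and pushes
  // the resulting rows into the reducer operator tree.
  while (source.pushRecord()) {
    // the source does all the per-key work itself
  }

  boolean abort = source.close();  // ends the last group, reports abort state
  reducer.close(abort);
}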

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezContext.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezContext.java?rev=1627235&r1=1627234&r2=1627235&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezContext.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezContext.java Wed Sep 24 07:03:35 2014
@@ -37,6 +37,8 @@ public class TezContext extends MapredCo
 
   private ProcessorContext processorContext;
 
+  private RecordSource[] sources;
+
   public TezContext(boolean isMap, JobConf jobConf) {
     super(isMap, jobConf);
   }
@@ -70,4 +72,12 @@ public class TezContext extends MapredCo
   public ProcessorContext getTezProcessorContext() {
     return processorContext;
   }
+
+  public RecordSource[] getRecordSources() {
+    return sources;
+  }
+
+  public void setRecordSources(RecordSource[] sources) {
+    this.sources = sources;
+  }
 }
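
TezContext now carries the RecordSource instances registered by the record processor, so operator-side code can look them up through the thread-local MapredContext. A hypothetical lookup is sketched below; only getRecordSources()/setRecordSources() come from this change, the surrounding method and checks are illustrative:

// Hypothetical consumer of the sources registered by the record processor.
void inspectSources() {
  TezContext tezContext = (TezContext) MapredContext.get();
  if (tezContext == null || tezContext.getRecordSources() == null) {
    return;
  }
  for (RecordSource source : tezContext.getRecordSources()) {
    if (source.isGrouped()) {
      // grouped (key, values) input, e.g. shuffled reduce-side input
    }
  }
}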

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java?rev=1627235&r1=1627234&r2=1627235&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java Wed Sep 24 07:03:35 2014
@@ -17,6 +17,14 @@
  */
 package org.apache.hadoop.hive.ql.exec.tez;
 
+import java.io.IOException;
+import java.text.NumberFormat;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -26,6 +34,7 @@ import org.apache.hadoop.mapred.OutputCo
 import org.apache.hadoop.util.StringUtils;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.mapreduce.input.MRInputLegacy;
+import org.apache.tez.mapreduce.input.MultiMRInput;
 import org.apache.tez.mapreduce.processor.MRTaskReporter;
 import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
 import org.apache.tez.runtime.api.Event;
@@ -34,11 +43,6 @@ import org.apache.tez.runtime.api.Logica
 import org.apache.tez.runtime.api.ProcessorContext;
 import org.apache.tez.runtime.library.api.KeyValueWriter;
 
-import java.io.IOException;
-import java.text.NumberFormat;
-import java.util.List;
-import java.util.Map;
-
 /**
  * Hive processor for Tez that forms the vertices in Tez and processes the data.
  * Does what ExecMapper and ExecReducer does for hive in MR framework.
@@ -90,7 +94,8 @@ public class TezProcessor extends Abstra
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_INITIALIZE_PROCESSOR);
     Configuration conf = TezUtils.createConfFromUserPayload(getContext().getUserPayload());
     this.jobConf = new JobConf(conf);
-    setupMRLegacyConfigs(getContext());
+    this.processorContext = getContext();
+    setupMRLegacyConfigs(processorContext);
     perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_INITIALIZE_PROCESSOR);
   }
 
@@ -130,12 +135,6 @@ public class TezProcessor extends Abstra
 
       if (isMap) {
         rproc = new MapRecordProcessor(jobConf);
-        MRInputLegacy mrInput = getMRInput(inputs);
-        try {
-          mrInput.init();
-        } catch (IOException e) {
-          throw new RuntimeException("Failed while initializing MRInput", e);
-        }
       } else {
         rproc = new ReduceRecordProcessor();
       }
@@ -148,18 +147,6 @@ public class TezProcessor extends Abstra
       throws Exception {
     Throwable originalThrowable = null;
     try {
-      TezCacheAccess cacheAccess = TezCacheAccess.createInstance(jobConf);
-      // Start the actual Inputs. After MRInput initialization.
-      for (Map.Entry<String, LogicalInput> inputEntry : inputs.entrySet()) {
-        if (!cacheAccess.isInputCached(inputEntry.getKey())) {
-          LOG.info("Input: " + inputEntry.getKey() + " is not cached");
-          inputEntry.getValue().start();
-        } else {
-          LOG.info("Input: " + inputEntry.getKey() +
-              " is already cached. Skipping start");
-        }
-      }
-
       // Outputs will be started later by the individual Processors.
 
       MRTaskReporter mrReporter = new MRTaskReporter(getContext());
@@ -214,19 +201,4 @@ public class TezProcessor extends Abstra
       writer.write(key, value);
     }
   }
-
-  static  MRInputLegacy getMRInput(Map<String, LogicalInput> inputs) {
-    //there should be only one MRInput
-    MRInputLegacy theMRInput = null;
-    for(LogicalInput inp : inputs.values()){
-      if(inp instanceof MRInputLegacy){
-        if(theMRInput != null){
-          throw new IllegalArgumentException("Only one MRInput is expected");
-        }
-        //a better logic would be to find the alias
-        theMRInput = (MRInputLegacy)inp;
-      }
-    }
-    return theMRInput;
-  }
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java?rev=1627235&r1=1627234&r2=1627235&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java Wed Sep 24 07:03:35 2014
@@ -37,6 +37,7 @@ import org.apache.hadoop.hive.ql.exec.Ut
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.MergeJoinWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
@@ -313,15 +314,16 @@ public class TezTask extends Task<TezWor
         for (BaseWork v: children) {
           // finally we can create the grouped edge
           GroupInputEdge e = utils.createEdge(group, parentConf,
-               workToVertex.get(v), work.getEdgeProperty(w, v));
+               workToVertex.get(v), work.getEdgeProperty(w, v), work.getVertexType(v));
 
           dag.addEdge(e);
         }
       } else {
         // Regular vertices
         JobConf wxConf = utils.initializeVertexConf(conf, ctx, w);
-        Vertex wx = utils.createVertex(wxConf, w, scratchDir, appJarLr,
-          additionalLr, fs, ctx, !isFinal, work);
+        Vertex wx =
+            utils.createVertex(wxConf, w, scratchDir, appJarLr, additionalLr, fs, ctx, !isFinal,
+                work, work.getVertexType(w));
         dag.addVertex(wx);
         utils.addCredentials(w, dag);
         perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_CREATE_VERTEX + w.getName());
@@ -335,7 +337,7 @@ public class TezTask extends Task<TezWor
 
           TezEdgeProperty edgeProp = work.getEdgeProperty(w, v);
 
-          e = utils.createEdge(wxConf, wx, workToVertex.get(v), edgeProp);
+          e = utils.createEdge(wxConf, wx, workToVertex.get(v), edgeProp, work.getVertexType(v));
           dag.addEdge(e);
         }
       }
@@ -386,6 +388,9 @@ public class TezTask extends Task<TezWor
     try {
       List<BaseWork> ws = work.getAllWork();
       for (BaseWork w: ws) {
+        if (w instanceof MergeJoinWork) {
+          w = ((MergeJoinWork) w).getMainWork();
+        }
         for (Operator<?> op: w.getAllOperators()) {
           op.jobClose(conf, rc == 0);
         }

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/KeyValueInputMerger.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/KeyValueInputMerger.java?rev=1627235&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/KeyValueInputMerger.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/KeyValueInputMerger.java Wed Sep 24 07:03:35 2014
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.tez.tools;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.List;
+import java.util.PriorityQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.BinaryComparable;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+
+/**
+ * A KeyValueReader implementation that returns a sorted stream of key-value
+ * pairs by doing a sorted merge of the key-value pairs in the LogicalInputs.
+ * Tags are in the last byte of the key, so no special handling for tags is required.
+ * Uses a priority queue to pick the KeyValueReader of the input that is next in
+ * sort order.
+ */
+public class KeyValueInputMerger extends KeyValueReader {
+
+  public static final Log l4j = LogFactory.getLog(KeyValueInputMerger.class);
+  private PriorityQueue<KeyValueReader> pQueue = null;
+  private KeyValueReader nextKVReader = null;
+
+  public KeyValueInputMerger(List<KeyValueReader> multiMRInputs) throws Exception {
+    //get KeyValuesReaders from the LogicalInput and add them to priority queue
+    int initialCapacity = multiMRInputs.size();
+    pQueue = new PriorityQueue<KeyValueReader>(initialCapacity, new KVReaderComparator());
+    l4j.info("Initialized the priority queue with multi mr inputs: " + multiMRInputs.size());
+    for (KeyValueReader input : multiMRInputs) {
+      addToQueue(input);
+    }
+  }
+
+  /**
+   * Adds the KeyValueReader to the queue if it has more key-value pairs.
+   *
+   * @param kvReader
+   * @throws IOException
+   */
+  private void addToQueue(KeyValueReader kvReader) throws IOException {
+    if (kvReader.next()) {
+      pQueue.add(kvReader);
+    }
+  }
+
+  /**
+   * @return true if another key-value pair is available, after advancing to it
+   * @throws IOException
+   */
+  @Override
+  public boolean next() throws IOException {
+    //add the previous nextKVReader back to queue
+    if(nextKVReader != null){
+      addToQueue(nextKVReader);
+    }
+
+    //get the new nextKVReader with lowest key
+    nextKVReader = pQueue.poll();
+    return nextKVReader != null;
+  }
+
+  @Override
+  public Object getCurrentKey() throws IOException {
+    return nextKVReader.getCurrentKey();
+  }
+
+  @Override
+  public Object getCurrentValue() throws IOException {
+    return nextKVReader.getCurrentValue();
+  }
+
+  /**
+   * Comparator that orders KeyValueReaders by their current record, as returned by getCurrentValue()
+   */
+  class KVReaderComparator implements Comparator<KeyValueReader> {
+
+    @Override
+    public int compare(KeyValueReader kvReadr1, KeyValueReader kvReadr2) {
+      try {
+        BinaryComparable key1 = (BinaryComparable) kvReadr1.getCurrentValue();
+        BinaryComparable key2 = (BinaryComparable) kvReadr2.getCurrentValue();
+        return key1.compareTo(key2);
+      } catch (IOException e) {
+        l4j.error("Caught exception while reading shuffle input", e);
+        //die!
+        throw new RuntimeException(e);
+      }
+    }
+  }
+}
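
KeyValueInputMerger lets a task read several already-sorted inputs as one stream by always polling the reader whose current record sorts lowest. A sketch of typical use, assuming a MultiMRInput that has already been started (the method and variable names are illustrative):

// Illustrative: merge the per-split readers of a started MultiMRInput into a
// single stream that yields records in merged sort order.
void readMerged(MultiMRInput multiMRInput) throws Exception {
  List<KeyValueReader> readers =
      new ArrayList<KeyValueReader>(multiMRInput.getKeyValueReaders());
  KeyValueInputMerger merger = new KeyValueInputMerger(readers);
  while (merger.next()) {
    Object key = merger.getCurrentKey();      // from the reader that sorts lowest
    Object value = merger.getCurrentValue();
    // process (key, value); records arrive in sort order across all inputs
  }
}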