Posted to commits@hive.apache.org by br...@apache.org on 2014/10/06 05:44:26 UTC

svn commit: r1629562 [4/38] - in /hive/branches/spark: ./ accumulo-handler/ beeline/ beeline/src/java/org/apache/hive/beeline/ bin/ext/ common/ common/src/java/org/apache/hadoop/hive/conf/ common/src/test/org/apache/hadoop/hive/common/type/ contrib/src...

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java?rev=1629562&r1=1629561&r2=1629562&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java Mon Oct  6 03:44:13 2014
@@ -33,10 +33,9 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.ql.io.IOContext;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
-import org.apache.hadoop.hive.ql.exec.tez.MapRecordProcessor;
 import org.apache.hadoop.hive.ql.io.RecordIdentifier;
-import org.apache.hadoop.hive.ql.io.IOContext;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
 import org.apache.hadoop.hive.ql.plan.MapWork;
@@ -182,7 +181,7 @@ public class MapOperator extends Operato
 
     PartitionDesc pd = ctx.partDesc;
     TableDesc td = pd.getTableDesc();
-
+    
     MapOpCtx opCtx = new MapOpCtx();
     // Use table properties in case of unpartitioned tables,
     // and the union of table properties and partition properties, with partition
@@ -206,42 +205,42 @@ public class MapOperator extends Operato
 
     opCtx.partTblObjectInspectorConverter = ObjectInspectorConverters.getConverter(
         partRawRowObjectInspector, opCtx.tblRawRowObjectInspector);
-
+    
     // Next check if this table has partitions and if so
     // get the list of partition names as well as allocate
     // the serdes for the partition columns
     String pcols = overlayedProps.getProperty(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS);
-
+    
     if (pcols != null && pcols.length() > 0) {
       String[] partKeys = pcols.trim().split("/");
       String pcolTypes = overlayedProps
           .getProperty(hive_metastoreConstants.META_TABLE_PARTITION_COLUMN_TYPES);
       String[] partKeyTypes = pcolTypes.trim().split(":");
-
+      
       if (partKeys.length > partKeyTypes.length) {
           throw new HiveException("Internal error : partKeys length, " +partKeys.length +
                   " greater than partKeyTypes length, " + partKeyTypes.length);
       }
-
+      
       List<String> partNames = new ArrayList<String>(partKeys.length);
       Object[] partValues = new Object[partKeys.length];
       List<ObjectInspector> partObjectInspectors = new ArrayList<ObjectInspector>(partKeys.length);
-
+      
       for (int i = 0; i < partKeys.length; i++) {
         String key = partKeys[i];
         partNames.add(key);
         ObjectInspector oi = PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector
             (TypeInfoFactory.getPrimitiveTypeInfo(partKeyTypes[i]));
-
+        
         // Partitions do not exist for this table
         if (partSpec == null) {
           // for partitionless table, initialize partValue to null
           partValues[i] = null;
         } else {
-            partValues[i] =
+            partValues[i] = 
                 ObjectInspectorConverters.
                 getConverter(PrimitiveObjectInspectorFactory.
-                    javaStringObjectInspector, oi).convert(partSpec.get(key));
+                    javaStringObjectInspector, oi).convert(partSpec.get(key)); 
         }
         partObjectInspectors.add(oi);
       }
@@ -338,8 +337,13 @@ public class MapOperator extends Operato
     return tableDescOI;
   }
 
+  private boolean isPartitioned(PartitionDesc pd) {
+    return pd.getPartSpec() != null && !pd.getPartSpec().isEmpty();
+  }
+
   public void setChildren(Configuration hconf) throws HiveException {
-    Path fpath = IOContext.get(hconf.get(Utilities.INPUT_NAME)).getInputPath();
+
+    Path fpath = IOContext.get().getInputPath();
 
     boolean schemeless = fpath.toUri().getScheme() == null;
 
@@ -635,8 +639,4 @@ public class MapOperator extends Operato
     return null;
   }
 
-  @Override
-  public Map<Integer, DummyStoreOperator> getTagToOperatorTree() {
-    return MapRecordProcessor.getConnectOps();
-  }
 }

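The MapOperator hunk above reverts setChildren() from the named-IOContext lookup (keyed by the iocontext.input.name property) back to the thread-local IOContext.get(). A minimal sketch of the path resolution after this change (illustrative, not part of the commit, and assuming only the IOContext API visible in the hunk):

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.io.IOContext;

    // Illustrative only: resolve the file currently being read by this mapper thread.
    public class CurrentInputPathSketch {
      public static Path currentInputPath() {
        // IOContext.get() returns the per-thread context populated by the record
        // reader; the removed IOContext.get(name) variant selected a context per
        // named input, which was only needed by the multi-input (merge join) code
        // paths dropped elsewhere in this revision.
        return IOContext.get().getInputPath();
      }
    }
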
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java?rev=1629562&r1=1629561&r2=1629562&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java Mon Oct  6 03:44:13 2014
@@ -353,7 +353,6 @@ public class MoveTask extends Task<MoveW
               pushFeed(FeedType.DYNAMIC_PARTITIONS, dps);
             }
 
-            long startTime = System.currentTimeMillis();
             // load the list of DP partitions and return the list of partition specs
             // TODO: In a follow-up to HIVE-1361, we should refactor loadDynamicPartitions
             // to use Utilities.getFullDPSpecs() to get the list of full partSpecs.
@@ -361,7 +360,7 @@ public class MoveTask extends Task<MoveW
             // iterate over it and call loadPartition() here.
             // The reason we don't do inside HIVE-1361 is the latter is large and we
             // want to isolate any potential issue it may introduce.
-            Map<Map<String, String>, Partition> dp =
+            ArrayList<LinkedHashMap<String, String>> dp =
               db.loadDynamicPartitions(
                 tbd.getSourcePath(),
                 tbd.getTable().getTableName(),
@@ -371,19 +370,16 @@ public class MoveTask extends Task<MoveW
                 tbd.getHoldDDLTime(),
                 isSkewedStoredAsDirs(tbd),
                 work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID);
-            console.printInfo("\t Time taken for load dynamic partitions : "  +
-                (System.currentTimeMillis() - startTime));
 
             if (dp.size() == 0 && conf.getBoolVar(HiveConf.ConfVars.HIVE_ERROR_ON_EMPTY_PARTITION)) {
               throw new HiveException("This query creates no partitions." +
                   " To turn off this error, set hive.error.on.empty.partition=false.");
             }
 
-            startTime = System.currentTimeMillis();
             // for each partition spec, get the partition
             // and put it to WriteEntity for post-exec hook
-            for(Map.Entry<Map<String, String>, Partition> entry : dp.entrySet()) {
-              Partition partn = entry.getValue();
+            for (LinkedHashMap<String, String> partSpec: dp) {
+              Partition partn = db.getPartition(table, partSpec, false);
 
               if (bucketCols != null || sortCols != null) {
                 updatePartitionBucketSortColumns(table, partn, bucketCols, numBuckets, sortCols);
@@ -416,10 +412,8 @@ public class MoveTask extends Task<MoveW
                     table.getCols());
               }
 
-              console.printInfo("\tLoading partition " + entry.getKey());
+              console.printInfo("\tLoading partition " + partSpec);
             }
-            console.printInfo("\t Time taken for adding to write entity : " +
-                (System.currentTimeMillis() - startTime));
             dc = null; // reset data container to prevent it being added again.
           } else { // static partitions
             List<String> partVals = MetaStoreUtils.getPvals(table.getPartCols(),

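With this MoveTask hunk, Hive.loadDynamicPartitions() returns the list of dynamic partition specs rather than a map from spec to Partition, so each Partition object is fetched again from the metastore via db.getPartition(table, partSpec, false). A minimal sketch of that consumption pattern (illustrative, not part of the commit; the Hive/Table names mirror the hunk above):

    import java.util.ArrayList;
    import java.util.LinkedHashMap;

    import org.apache.hadoop.hive.ql.metadata.Hive;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.ql.metadata.Partition;
    import org.apache.hadoop.hive.ql.metadata.Table;

    // Illustrative only: walk the returned partition specs and resolve each Partition.
    public class DynamicPartitionSketch {
      static void registerPartitions(Hive db, Table table,
          ArrayList<LinkedHashMap<String, String>> partSpecs) throws HiveException {
        for (LinkedHashMap<String, String> partSpec : partSpecs) {
          // One metastore lookup per dynamic partition; the removed map-returning
          // API handed the Partition objects back directly.
          Partition partn = db.getPartition(table, partSpec, false);
          // ... update bucket/sort columns, stats and the WriteEntity as MoveTask does ...
        }
      }
    }
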
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java?rev=1629562&r1=1629561&r2=1629562&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java Mon Oct  6 03:44:13 2014
@@ -146,7 +146,6 @@ public abstract class Operator<T extends
   /**
    * Implements the getChildren function for the Node Interface.
    */
-  @Override
   public ArrayList<Node> getChildren() {
 
     if (getChildOperators() == null) {
@@ -852,7 +851,6 @@ public abstract class Operator<T extends
    *
    * @return the name of the operator
    */
-  @Override
   public String getName() {
     return getOperatorName();
   }
@@ -1063,7 +1061,7 @@ public abstract class Operator<T extends
 
     if (parents != null) {
       for (Operator<? extends OperatorDesc> parent : parents) {
-        parentClones.add((parent.clone()));
+        parentClones.add((Operator<? extends OperatorDesc>)(parent.clone()));
       }
     }
 
@@ -1084,8 +1082,8 @@ public abstract class Operator<T extends
   public Operator<? extends OperatorDesc> cloneOp() throws CloneNotSupportedException {
     T descClone = (T) conf.clone();
     Operator<? extends OperatorDesc> ret =
-        OperatorFactory.getAndMakeChild(
-        descClone, getSchema());
+        (Operator<? extends OperatorDesc>) OperatorFactory.getAndMakeChild(
+            descClone, getSchema());
     return ret;
   }
 
@@ -1256,15 +1254,15 @@ public abstract class Operator<T extends
     }
     return null;
   }
-
+  
   public OpTraits getOpTraits() {
     if (conf != null) {
       return conf.getOpTraits();
     }
-
+    
     return null;
   }
-
+  
   public void setOpTraits(OpTraits metaInfo) {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Setting traits ("+metaInfo+") on "+this);
@@ -1301,17 +1299,7 @@ public abstract class Operator<T extends
 
   private static class DummyOperator extends Operator {
     public DummyOperator() { super("dummy"); }
-    @Override
     public void processOp(Object row, int tag) { }
-    @Override
     public OperatorType getType() { return null; }
   }
-
-  public Map<Integer, DummyStoreOperator> getTagToOperatorTree() {
-    if ((parentOperators == null) || (parentOperators.size() == 0)) {
-      return null;
-    }
-    Map<Integer, DummyStoreOperator> dummyOps = parentOperators.get(0).getTagToOperatorTree();
-    return dummyOps;
-  }
 }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java?rev=1629562&r1=1629561&r2=1629562&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java Mon Oct  6 03:44:13 2014
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.hive.ql.exec;
 
-import org.apache.hadoop.hive.ql.exec.vector.VectorAppMasterEventOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorExtractOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorFileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorFilterOperator;
@@ -32,7 +31,6 @@ import org.apache.hadoop.hive.ql.exec.ve
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.AppMasterEventDesc;
 import org.apache.hadoop.hive.ql.plan.CollectDesc;
-import org.apache.hadoop.hive.ql.plan.CommonMergeJoinDesc;
 import org.apache.hadoop.hive.ql.plan.DemuxDesc;
 import org.apache.hadoop.hive.ql.plan.DummyStoreDesc;
 import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc;
@@ -116,16 +114,10 @@ public final class OperatorFactory {
         RCFileMergeOperator.class));
     opvec.add(new OpTuple<OrcFileMergeDesc>(OrcFileMergeDesc.class,
         OrcFileMergeOperator.class));
-    opvec.add(new OpTuple<CommonMergeJoinDesc>(CommonMergeJoinDesc.class,
-        CommonMergeJoinOperator.class));
   }
 
   static {
     vectorOpvec = new ArrayList<OpTuple>();
-    vectorOpvec.add(new OpTuple<AppMasterEventDesc>(AppMasterEventDesc.class,
-        VectorAppMasterEventOperator.class));
-    vectorOpvec.add(new OpTuple<DynamicPruningEventDesc>(DynamicPruningEventDesc.class,
-        VectorAppMasterEventOperator.class));
     vectorOpvec.add(new OpTuple<SelectDesc>(SelectDesc.class, VectorSelectOperator.class));
     vectorOpvec.add(new OpTuple<GroupByDesc>(GroupByDesc.class, VectorGroupByOperator.class));
     vectorOpvec.add(new OpTuple<MapJoinDesc>(MapJoinDesc.class, VectorMapJoinOperator.class));

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java?rev=1629562&r1=1629561&r2=1629562&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java Mon Oct  6 03:44:13 2014
@@ -46,9 +46,6 @@ public class OperatorUtils {
   public static <T> Set<T> findOperators(Collection<Operator<?>> starts, Class<T> clazz) {
     Set<T> found = new HashSet<T>();
     for (Operator<?> start : starts) {
-      if (start == null) {
-        continue;
-      }
       findOperators(start, clazz, found);
     }
     return found;

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1629562&r1=1629561&r2=1629562&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Mon Oct  6 03:44:13 2014
@@ -89,7 +89,6 @@ import org.apache.hadoop.hive.ql.plan.Fi
 import org.apache.hadoop.hive.ql.plan.GroupByDesc;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
-import org.apache.hadoop.hive.ql.plan.MergeJoinWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
@@ -202,8 +201,6 @@ public final class Utilities {
   public static String HADOOP_LOCAL_FS = "file:///";
   public static String MAP_PLAN_NAME = "map.xml";
   public static String REDUCE_PLAN_NAME = "reduce.xml";
-  public static String MERGE_PLAN_NAME = "merge.xml";
-  public static final String INPUT_NAME = "iocontext.input.name";
   public static final String MAPRED_MAPPER_CLASS = "mapred.mapper.class";
   public static final String MAPRED_REDUCER_CLASS = "mapred.reducer.class";
 
@@ -294,39 +291,6 @@ public final class Utilities {
     return (ReduceWork) getBaseWork(conf, REDUCE_PLAN_NAME);
   }
 
-  public static Path setMergeWork(JobConf conf, MergeJoinWork mergeJoinWork, Path mrScratchDir,
-      boolean useCache) {
-    for (BaseWork baseWork : mergeJoinWork.getBaseWorkList()) {
-      setBaseWork(conf, baseWork, mrScratchDir, baseWork.getName() + MERGE_PLAN_NAME, useCache);
-      String prefixes = conf.get(DagUtils.TEZ_MERGE_WORK_FILE_PREFIXES);
-      if (prefixes == null) {
-        prefixes = baseWork.getName();
-      } else {
-        prefixes = prefixes + "," + baseWork.getName();
-      }
-      conf.set(DagUtils.TEZ_MERGE_WORK_FILE_PREFIXES, prefixes);
-    }
-
-    // nothing to return
-    return null;
-  }
-
-  public static BaseWork getMergeWork(JobConf jconf) {
-    if ((jconf.get(DagUtils.TEZ_MERGE_CURRENT_MERGE_FILE_PREFIX) == null)
-        || (jconf.get(DagUtils.TEZ_MERGE_CURRENT_MERGE_FILE_PREFIX).isEmpty())) {
-      return null;
-    }
-    return getMergeWork(jconf, jconf.get(DagUtils.TEZ_MERGE_CURRENT_MERGE_FILE_PREFIX));
-  }
-
-  public static BaseWork getMergeWork(JobConf jconf, String prefix) {
-    if (prefix == null || prefix.isEmpty()) {
-      return null;
-    }
-
-    return getBaseWork(jconf, prefix + MERGE_PLAN_NAME);
-  }
-
   public static void cacheBaseWork(Configuration conf, String name, BaseWork work,
       Path hiveScratchDir) {
     try {
@@ -411,8 +375,6 @@ public final class Utilities {
             throw new RuntimeException("unable to determine work from configuration ."
                 + MAPRED_REDUCER_CLASS +" was "+ conf.get(MAPRED_REDUCER_CLASS)) ;
           }
-        } else if (name.contains(MERGE_PLAN_NAME)) {
-          gWork = deserializePlan(in, MapWork.class, conf);
         }
         gWorkMap.put(path, gWork);
       } else {
@@ -646,14 +608,8 @@ public final class Utilities {
   }
 
   public static void setMapRedWork(Configuration conf, MapredWork w, Path hiveScratchDir) {
-    String useName = conf.get(INPUT_NAME);
-    if (useName == null) {
-      useName = "mapreduce";
-    }
-    conf.set(INPUT_NAME, useName);
     setMapWork(conf, w.getMapWork(), hiveScratchDir, true);
     if (w.getReduceWork() != null) {
-      conf.set(INPUT_NAME, useName);
       setReduceWork(conf, w.getReduceWork(), hiveScratchDir, true);
     }
   }
@@ -1890,7 +1846,7 @@ public final class Utilities {
 
       for (int i = 0; i < parts.length; ++i) {
         assert parts[i].isDir() : "dynamic partition " + parts[i].getPath()
-            + " is not a directory";
+            + " is not a direcgtory";
         FileStatus[] items = fs.listStatus(parts[i].getPath());
 
         // remove empty directory since DP insert should not generate empty partitions.

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java?rev=1629562&r1=1629561&r2=1629562&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java Mon Oct  6 03:44:13 2014
@@ -78,11 +78,10 @@ public class ExecMapper extends MapReduc
   private MapredLocalWork localWork = null;
   private boolean isLogInfoEnabled = false;
 
-  private ExecMapperContext execContext = null;
+  private final ExecMapperContext execContext = new ExecMapperContext();
 
   @Override
   public void configure(JobConf job) {
-    execContext = new ExecMapperContext(job);
     // Allocate the bean at the beginning -
     memoryMXBean = ManagementFactory.getMemoryMXBean();
     l4j.info("maximum memory = " + memoryMXBean.getHeapMemoryUsage().getMax());
@@ -293,7 +292,6 @@ public class ExecMapper extends MapReduc
       this.rp = rp;
     }
 
-    @Override
     public void func(Operator op) {
       Map<Enum<?>, Long> opStats = op.getStats();
       for (Map.Entry<Enum<?>, Long> e : opStats.entrySet()) {

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java?rev=1629562&r1=1629561&r2=1629562&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java Mon Oct  6 03:44:13 2014
@@ -22,7 +22,6 @@ import java.util.Map;
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.exec.FetchOperator;
-import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.io.IOContext;
 import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
 import org.apache.hadoop.mapred.JobConf;
@@ -61,9 +60,8 @@ public class ExecMapperContext {
     this.currentBigBucketFile = currentBigBucketFile;
   }
 
-  public ExecMapperContext(JobConf jc) {
-    this.jc = jc;
-    ioCxt = IOContext.get(jc.get(Utilities.INPUT_NAME));
+  public ExecMapperContext() {
+    ioCxt = IOContext.get();
   }
 
   public void clear() {

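ExecMapperContext now binds the thread-local IOContext in a no-argument constructor, and the JobConf is attached by the caller instead of being passed in. A minimal sketch of the wiring used by the ExecMapper and MapRecordProcessor hunks in this revision (illustrative, not part of the commit):

    import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
    import org.apache.hadoop.mapred.JobConf;

    // Illustrative only: construct the context first, then hand it the job conf.
    public class ExecContextWiringSketch {
      static ExecMapperContext newContext(JobConf job) {
        ExecMapperContext ctx = new ExecMapperContext(); // picks up IOContext.get() internally
        ctx.setJc(job);                                   // JobConf is now set separately
        return ctx;
      }
    }
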
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java?rev=1629562&r1=1629561&r2=1629562&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java Mon Oct  6 03:44:13 2014
@@ -91,7 +91,7 @@ public class MapredLocalTask extends Tas
 
   // not sure we need this exec context; but all the operators in the work
   // will pass this context throught
-  private ExecMapperContext execContext = null;
+  private ExecMapperContext execContext = new ExecMapperContext();
 
   private Process executor;
 
@@ -113,7 +113,6 @@ public class MapredLocalTask extends Tas
   public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext driverContext) {
     super.initialize(conf, queryPlan, driverContext);
     job = new JobConf(conf, ExecDriver.class);
-    execContext = new ExecMapperContext(job);
     //we don't use the HadoopJobExecHooks for local tasks
     this.jobExecHelper = new HadoopJobExecHelper(job, console, this, null);
   }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java?rev=1629562&r1=1629561&r2=1629562&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java Mon Oct  6 03:44:13 2014
@@ -59,14 +59,16 @@ public class SparkMapRecordHandler exten
   private static final String PLAN_KEY = "__MAP_PLAN__";
   private MapOperator mo;
   public static final Log l4j = LogFactory.getLog(SparkMapRecordHandler.class);
+  private boolean done;
+
   private MapredLocalWork localWork = null;
   private boolean isLogInfoEnabled = false;
 
-  private ExecMapperContext execContext;
+  private final ExecMapperContext execContext = new ExecMapperContext();
 
   public void init(JobConf job, OutputCollector output, Reporter reporter) {
     super.init(job, output, reporter);
-    execContext = new ExecMapperContext(job);
+
     isLogInfoEnabled = l4j.isInfoEnabled();
     ObjectCache cache = ObjectCacheFactory.getCache(job);
 
@@ -87,8 +89,6 @@ public class SparkMapRecordHandler exten
         mo = new MapOperator();
       }
       mo.setConf(mrwork);
-      l4j.info("Main input name is " + mrwork.getName());
-      jc.set(Utilities.INPUT_NAME, mrwork.getName());
       // initialize map operator
       mo.setChildren(job);
       l4j.info(mo.dump(0));

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java?rev=1629562&r1=1629561&r2=1629562&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java Mon Oct  6 03:44:13 2014
@@ -197,7 +197,6 @@ public class SparkPlanGenerator {
     }
     if (work instanceof MapWork) {
       List<Path> inputPaths = Utilities.getInputPaths(cloned, (MapWork) work, scratchDir, context, false);
-      cloned.set(Utilities.INPUT_NAME, work.getName());
       Utilities.setInputPaths(cloned, inputPaths);
       Utilities.setMapWork(cloned, (MapWork) work, scratchDir, false);
       Utilities.createTmpDirs(cloned, (MapWork) work);

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java?rev=1629562&r1=1629561&r2=1629562&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java Mon Oct  6 03:44:13 2014
@@ -31,10 +31,7 @@ import java.util.TreeMap;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.ql.plan.TezWork.VertexType;
 import org.apache.hadoop.hive.shims.ShimLoader;
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.DataInputByteBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.serializer.SerializationFactory;
 import org.apache.hadoop.mapred.FileSplit;
@@ -82,14 +79,9 @@ public class CustomPartitionVertex exten
   private List<InputDataInformationEvent> dataInformationEvents;
   private int numBuckets = -1;
   private Configuration conf = null;
+  private boolean rootVertexInitialized = false;
   private final SplitGrouper grouper = new SplitGrouper();
   private int taskCount = 0;
-  private VertexType vertexType;
-  private String mainWorkName;
-  private final Multimap<Integer, Integer> bucketToTaskMap = HashMultimap.<Integer, Integer> create();
-
-  private final Map<String, Multimap<Integer, InputSplit>> inputToGroupedSplitMap =
-      new HashMap<String, Multimap<Integer, InputSplit>>();
 
   public CustomPartitionVertex(VertexManagerPluginContext context) {
     super(context);
@@ -98,18 +90,8 @@ public class CustomPartitionVertex exten
   @Override
   public void initialize() {
     this.context = getContext();
-    ByteBuffer payload = context.getUserPayload().getPayload();
-    CustomVertexConfiguration vertexConf = new CustomVertexConfiguration();
-    DataInputByteBuffer dibb = new DataInputByteBuffer();
-    dibb.reset(payload);
-    try {
-      vertexConf.readFields(dibb);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-    this.numBuckets = vertexConf.getNumBuckets();
-    this.mainWorkName = vertexConf.getInputName();
-    this.vertexType = vertexConf.getVertexType();
+    ByteBuffer byteBuf = context.getUserPayload().getPayload();
+    this.numBuckets = byteBuf.getInt();
   }
 
   @Override
@@ -131,12 +113,17 @@ public class CustomPartitionVertex exten
   public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) {
   }
 
-  // One call per root Input
+  // One call per root Input - and for now only one is handled.
   @Override
   public void onRootVertexInitialized(String inputName, InputDescriptor inputDescriptor,
       List<Event> events) {
-    LOG.info("On root vertex initialized " + inputName);
 
+    // Ideally, since there's only 1 Input expected at the moment -
+    // ensure this method is called only once. Tez will call it once per Root
+    // Input.
+    Preconditions.checkState(rootVertexInitialized == false);
+    LOG.info("Root vertex not initialized");
+    rootVertexInitialized = true;
     try {
       // This is using the payload from the RootVertexInitializer corresponding
       // to InputName. Ideally it should be using it's own configuration class -
@@ -177,6 +164,9 @@ public class CustomPartitionVertex exten
         // No tasks should have been started yet. Checked by initial state
         // check.
         Preconditions.checkState(dataInformationEventSeen == false);
+        Preconditions
+            .checkState(context.getVertexNumTasks(context.getVertexName()) == -1,
+                "Parallelism for the vertex should be set to -1 if the InputInitializer is setting parallelism");
         InputConfigureVertexTasksEvent cEvent = (InputConfigureVertexTasksEvent) event;
 
         // The vertex cannot be configured until all DataEvents are seen - to
@@ -230,55 +220,21 @@ public class CustomPartitionVertex exten
             (bucketToInitialSplitMap.get(key).toArray(new InputSplit[0]));
         Multimap<Integer, InputSplit> groupedSplit =
             HiveSplitGenerator.generateGroupedSplits(jobConf, conf, inputSplitArray, waves,
-                availableSlots, inputName);
+            availableSlots);
         bucketToGroupedSplitMap.putAll(key, groupedSplit.values());
       }
 
-      LOG.info("We have grouped the splits into " + bucketToGroupedSplitMap);
-      if ((mainWorkName.isEmpty() == false) && (mainWorkName.compareTo(inputName) != 0)) {
-        /*
-         * this is the small table side. In case of SMB join, we may need to send each split to the
-         * corresponding bucket-based task on the other side. In case a split needs to go to
-         * multiple downstream tasks, we need to clone the event and send it to the right
-         * destination.
-         */
-        processAllSideEvents(inputName, bucketToGroupedSplitMap);
-      } else {
-        processAllEvents(inputName, bucketToGroupedSplitMap);
-      }
+      LOG.info("We have grouped the splits into " + bucketToGroupedSplitMap.size() + " tasks");
+      processAllEvents(inputName, bucketToGroupedSplitMap);
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
   }
 
-  private void processAllSideEvents(String inputName,
-      Multimap<Integer, InputSplit> bucketToGroupedSplitMap) throws IOException {
-    // the bucket to task map should have been setup by the big table.
-    if (bucketToTaskMap.isEmpty()) {
-      inputToGroupedSplitMap.put(inputName, bucketToGroupedSplitMap);
-      return;
-    }
-    List<InputDataInformationEvent> taskEvents = new ArrayList<InputDataInformationEvent>();
-    for (Entry<Integer, Collection<InputSplit>> entry : bucketToGroupedSplitMap.asMap().entrySet()) {
-      Collection<Integer> destTasks = bucketToTaskMap.get(entry.getKey());
-      for (Integer task : destTasks) {
-        for (InputSplit split : entry.getValue()) {
-          MRSplitProto serializedSplit = MRInputHelpers.createSplitProto(split);
-          InputDataInformationEvent diEvent =
-              InputDataInformationEvent.createWithSerializedPayload(task, serializedSplit
-                  .toByteString().asReadOnlyByteBuffer());
-          diEvent.setTargetIndex(task);
-          taskEvents.add(diEvent);
-        }
-      }
-    }
-
-    context.addRootInputEvents(inputName, taskEvents);
-  }
-
   private void processAllEvents(String inputName,
       Multimap<Integer, InputSplit> bucketToGroupedSplitMap) throws IOException {
 
+    Multimap<Integer, Integer> bucketToTaskMap = HashMultimap.<Integer, Integer> create();
     List<InputSplit> finalSplits = Lists.newLinkedList();
     for (Entry<Integer, Collection<InputSplit>> entry : bucketToGroupedSplitMap.asMap().entrySet()) {
       int bucketNum = entry.getKey();
@@ -292,13 +248,11 @@ public class CustomPartitionVertex exten
 
     // Construct the EdgeManager descriptor to be used by all edges which need
     // the routing table.
-    EdgeManagerPluginDescriptor hiveEdgeManagerDesc = null;
-    if ((vertexType == VertexType.MULTI_INPUT_INITIALIZED_EDGES)
-        || (vertexType == VertexType.INITIALIZED_EDGES)) {
-      hiveEdgeManagerDesc = EdgeManagerPluginDescriptor.create(CustomPartitionEdge.class.getName());
-      UserPayload payload = getBytePayload(bucketToTaskMap);
-      hiveEdgeManagerDesc.setUserPayload(payload);
-    }
+    EdgeManagerPluginDescriptor hiveEdgeManagerDesc =
+        EdgeManagerPluginDescriptor.create(CustomPartitionEdge.class.getName());
+    UserPayload payload = getBytePayload(bucketToTaskMap);
+    hiveEdgeManagerDesc.setUserPayload(payload);
+
     Map<String, EdgeManagerPluginDescriptor> emMap = Maps.newHashMap();
 
     // Replace the edge manager for all vertices which have routing type custom.
@@ -331,21 +285,13 @@ public class CustomPartitionVertex exten
     rootInputSpecUpdate.put(
         inputName,
         InputSpecUpdate.getDefaultSinglePhysicalInputSpecUpdate());
-    if ((mainWorkName.compareTo(inputName) == 0) || (mainWorkName.isEmpty())) {
-      context.setVertexParallelism(
-          taskCount,
-          VertexLocationHint.create(grouper.createTaskLocationHints(finalSplits
-              .toArray(new InputSplit[finalSplits.size()]))), emMap, rootInputSpecUpdate);
-    }
+    context.setVertexParallelism(
+        taskCount,
+        VertexLocationHint.create(grouper.createTaskLocationHints(finalSplits
+            .toArray(new InputSplit[finalSplits.size()]))), emMap, rootInputSpecUpdate);
 
     // Set the actual events for the tasks.
     context.addRootInputEvents(inputName, taskEvents);
-    if (inputToGroupedSplitMap.isEmpty() == false) {
-      for (Entry<String, Multimap<Integer, InputSplit>> entry : inputToGroupedSplitMap.entrySet()) {
-        processAllSideEvents(entry.getKey(), entry.getValue());
-      }
-      inputToGroupedSplitMap.clear();
-    }
   }
 
   UserPayload getBytePayload(Multimap<Integer, Integer> routingTable) throws IOException {
@@ -369,8 +315,7 @@ public class CustomPartitionVertex exten
 
     if (!(inputSplit instanceof FileSplit)) {
       throw new UnsupportedOperationException(
-          "Cannot handle splits other than FileSplit for the moment. Current input split type: "
-              + inputSplit.getClass().getSimpleName());
+          "Cannot handle splits other than FileSplit for the moment");
     }
     return (FileSplit) inputSplit;
   }
@@ -382,6 +327,7 @@ public class CustomPartitionVertex exten
       Map<String, List<FileSplit>> pathFileSplitsMap) {
 
     int bucketNum = 0;
+    int fsCount = 0;
 
     Multimap<Integer, InputSplit> bucketToInitialSplitMap =
         ArrayListMultimap.<Integer, InputSplit> create();
@@ -389,20 +335,14 @@ public class CustomPartitionVertex exten
     for (Map.Entry<String, List<FileSplit>> entry : pathFileSplitsMap.entrySet()) {
       int bucketId = bucketNum % numBuckets;
       for (FileSplit fsplit : entry.getValue()) {
+        fsCount++;
         bucketToInitialSplitMap.put(bucketId, fsplit);
       }
       bucketNum++;
     }
 
-    if (bucketNum < numBuckets) {
-      int loopedBucketId = 0;
-      for (; bucketNum < numBuckets; bucketNum++) {
-        for (InputSplit fsplit : bucketToInitialSplitMap.get(loopedBucketId)) {
-          bucketToInitialSplitMap.put(bucketNum, fsplit);
-        }
-        loopedBucketId++;
-      }
-    }
+    LOG.info("Total number of splits counted: " + fsCount + " and total files encountered: "
+        + pathFileSplitsMap.size());
 
     return bucketToInitialSplitMap;
   }

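With this version of CustomPartitionVertex the vertex manager's user payload is just the bucket count serialized as a single 4-byte integer (read in initialize() above; the matching writers appear in the DagUtils hunks below), replacing the CustomVertexConfiguration Writable. A minimal round trip of that encoding (illustrative, not part of the commit, using plain java.nio rather than the Tez UserPayload wrapper):

    import java.nio.ByteBuffer;

    // Illustrative only: encode and decode the numBuckets payload.
    public class BucketPayloadSketch {
      static ByteBuffer encode(int numBuckets) {
        ByteBuffer buf = ByteBuffer.allocate(4).putInt(numBuckets);
        buf.flip(); // reset position to 0 so the reader sees the int just written
        return buf;
      }

      static int decode(ByteBuffer payload) {
        return payload.getInt(); // mirrors this.numBuckets = byteBuf.getInt() in initialize()
      }

      public static void main(String[] args) {
        System.out.println(decode(encode(13))); // prints 13
      }
    }
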
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java?rev=1629562&r1=1629561&r2=1629562&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java Mon Oct  6 03:44:13 2014
@@ -20,23 +20,6 @@ package org.apache.hadoop.hive.ql.exec.t
 import com.google.common.base.Function;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
-import com.google.protobuf.ByteString;
-
-import javax.security.auth.login.LoginException;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
 import org.apache.commons.io.FilenameUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
@@ -49,7 +32,6 @@ import org.apache.hadoop.hive.conf.HiveC
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.ErrorMsg;
-import org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
 import org.apache.hadoop.hive.ql.exec.mr.ExecReducer;
@@ -65,12 +47,10 @@ import org.apache.hadoop.hive.ql.io.merg
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.MapWork;
-import org.apache.hadoop.hive.ql.plan.MergeJoinWork;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
 import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;
 import org.apache.hadoop.hive.ql.plan.TezWork;
-import org.apache.hadoop.hive.ql.plan.TezWork.VertexType;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.stats.StatsFactory;
 import org.apache.hadoop.hive.ql.stats.StatsPublisher;
@@ -110,16 +90,12 @@ import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.VertexGroup;
 import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
 import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
-import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
-import org.apache.tez.mapreduce.input.MRInput;
 import org.apache.tez.mapreduce.input.MRInputLegacy;
-import org.apache.tez.mapreduce.input.MultiMRInput;
 import org.apache.tez.mapreduce.output.MROutput;
 import org.apache.tez.mapreduce.partition.MRPartitioner;
-import org.apache.tez.mapreduce.protos.MRRuntimeProtos;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.comparator.TezBytesComparator;
 import org.apache.tez.runtime.library.common.serializer.TezBytesWritableSerialization;
@@ -128,6 +104,21 @@ import org.apache.tez.runtime.library.co
 import org.apache.tez.runtime.library.conf.UnorderedPartitionedKVEdgeConfig;
 import org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValueInput;
 
+import javax.security.auth.login.LoginException;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
 /**
  * DagUtils. DagUtils is a collection of helper methods to convert
  * map and reduce work to tez vertices and edges. It handles configuration
@@ -139,11 +130,6 @@ public class DagUtils {
   private static final Log LOG = LogFactory.getLog(DagUtils.class.getName());
   private static final String TEZ_DIR = "_tez_scratch_dir";
   private static DagUtils instance;
-  // The merge file being currently processed.
-  public static final String TEZ_MERGE_CURRENT_MERGE_FILE_PREFIX =
-      "hive.tez.current.merge.file.prefix";
-  // "A comma separated list of work names used as prefix.
-  public static final String TEZ_MERGE_WORK_FILE_PREFIXES = "hive.tez.merge.file.prefixes";
 
   private void addCredentials(MapWork mapWork, DAG dag) {
     Set<String> paths = mapWork.getPathToAliases().keySet();
@@ -252,8 +238,8 @@ public class DagUtils {
    * endpoints.
    */
   @SuppressWarnings("rawtypes")
-  public GroupInputEdge createEdge(VertexGroup group, JobConf vConf, Vertex w,
-      TezEdgeProperty edgeProp, VertexType vertexType)
+  public GroupInputEdge createEdge(VertexGroup group, JobConf vConf,
+      Vertex w, TezEdgeProperty edgeProp)
     throws IOException {
 
     Class mergeInputClass;
@@ -268,14 +254,10 @@ public class DagUtils {
     case CUSTOM_EDGE: {
       mergeInputClass = ConcatenatedMergedKeyValueInput.class;
       int numBuckets = edgeProp.getNumBuckets();
-      CustomVertexConfiguration vertexConf =
-          new CustomVertexConfiguration(numBuckets, vertexType, "");
-      DataOutputBuffer dob = new DataOutputBuffer();
-      vertexConf.write(dob);
       VertexManagerPluginDescriptor desc =
           VertexManagerPluginDescriptor.create(CustomPartitionVertex.class.getName());
-      byte[] userPayloadBytes = dob.getData();
-      ByteBuffer userPayload = ByteBuffer.wrap(userPayloadBytes);
+      ByteBuffer userPayload = ByteBuffer.allocate(4).putInt(numBuckets);
+      userPayload.flip();
       desc.setUserPayload(UserPayload.create(userPayload));
       w.setVertexManagerPlugin(desc);
       break;
@@ -307,21 +289,17 @@ public class DagUtils {
    * @param w The second vertex (sink)
    * @return
    */
-  public Edge createEdge(JobConf vConf, Vertex v, Vertex w, TezEdgeProperty edgeProp,
-      VertexType vertexType)
+  public Edge createEdge(JobConf vConf, Vertex v, Vertex w,
+      TezEdgeProperty edgeProp)
     throws IOException {
 
     switch(edgeProp.getEdgeType()) {
     case CUSTOM_EDGE: {
       int numBuckets = edgeProp.getNumBuckets();
-      CustomVertexConfiguration vertexConf =
-          new CustomVertexConfiguration(numBuckets, vertexType, "");
-      DataOutputBuffer dob = new DataOutputBuffer();
-      vertexConf.write(dob);
+      ByteBuffer userPayload = ByteBuffer.allocate(4).putInt(numBuckets);
+      userPayload.flip();
       VertexManagerPluginDescriptor desc = VertexManagerPluginDescriptor.create(
           CustomPartitionVertex.class.getName());
-      byte[] userPayloadBytes = dob.getData();
-      ByteBuffer userPayload = ByteBuffer.wrap(userPayloadBytes);
       desc.setUserPayload(UserPayload.create(userPayload));
       w.setVertexManagerPlugin(desc);
       break;
@@ -427,7 +405,7 @@ public class DagUtils {
    * from yarn. Falls back to Map-reduce's map size if tez
    * container size isn't set.
    */
-  public static Resource getContainerResource(Configuration conf) {
+  private Resource getContainerResource(Configuration conf) {
     int memory = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVETEZCONTAINERSIZE) > 0 ?
       HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVETEZCONTAINERSIZE) :
       conf.getInt(MRJobConfig.MAP_MEMORY_MB, MRJobConfig.DEFAULT_MAP_MEMORY_MB);
@@ -465,61 +443,12 @@ public class DagUtils {
     return MRHelpers.getJavaOptsForMRMapper(conf);
   }
 
-  private Vertex createVertex(JobConf conf, MergeJoinWork mergeJoinWork, LocalResource appJarLr,
-      List<LocalResource> additionalLr, FileSystem fs, Path mrScratchDir, Context ctx,
-      VertexType vertexType)
-      throws Exception {
-    Utilities.setMergeWork(conf, mergeJoinWork, mrScratchDir, false);
-    if (mergeJoinWork.getMainWork() instanceof MapWork) {
-      List<BaseWork> mapWorkList = mergeJoinWork.getBaseWorkList();
-      MapWork mapWork = (MapWork) (mergeJoinWork.getMainWork());
-      CommonMergeJoinOperator mergeJoinOp = mergeJoinWork.getMergeJoinOperator();
-      Vertex mergeVx =
-          createVertex(conf, mapWork, appJarLr, additionalLr, fs, mrScratchDir, ctx, vertexType);
-
-      // grouping happens in execution phase. Setting the class to TezGroupedSplitsInputFormat
-      // here would cause pre-mature grouping which would be incorrect.
-      Class inputFormatClass = HiveInputFormat.class;
-      conf.setClass("mapred.input.format.class", HiveInputFormat.class, InputFormat.class);
-      // mapreduce.tez.input.initializer.serialize.event.payload should be set
-      // to false when using this plug-in to avoid getting a serialized event at run-time.
-      conf.setBoolean("mapreduce.tez.input.initializer.serialize.event.payload", false);
-      for (int i = 0; i < mapWorkList.size(); i++) {
-
-        mapWork = (MapWork) (mapWorkList.get(i));
-        conf.set(TEZ_MERGE_CURRENT_MERGE_FILE_PREFIX, mapWork.getName());
-        conf.set(Utilities.INPUT_NAME, mapWork.getName());
-        LOG.info("Going through each work and adding MultiMRInput");
-        mergeVx.addDataSource(mapWork.getName(),
-            MultiMRInput.createConfigBuilder(conf, HiveInputFormat.class).build());
-      }
-
-      VertexManagerPluginDescriptor desc =
-        VertexManagerPluginDescriptor.create(CustomPartitionVertex.class.getName());
-      CustomVertexConfiguration vertexConf =
-          new CustomVertexConfiguration(mergeJoinWork.getMergeJoinOperator().getConf()
-              .getNumBuckets(), vertexType, mergeJoinWork.getBigTableAlias());
-      DataOutputBuffer dob = new DataOutputBuffer();
-      vertexConf.write(dob);
-      byte[] userPayload = dob.getData();
-      desc.setUserPayload(UserPayload.create(ByteBuffer.wrap(userPayload)));
-      mergeVx.setVertexManagerPlugin(desc);
-      return mergeVx;
-    } else {
-      Vertex mergeVx =
-          createVertex(conf, (ReduceWork) mergeJoinWork.getMainWork(), appJarLr, additionalLr, fs,
-              mrScratchDir, ctx);
-      return mergeVx;
-    }
-  }
-
   /*
    * Helper function to create Vertex from MapWork.
    */
   private Vertex createVertex(JobConf conf, MapWork mapWork,
       LocalResource appJarLr, List<LocalResource> additionalLr, FileSystem fs,
-      Path mrScratchDir, Context ctx, VertexType vertexType)
-      throws Exception {
+      Path mrScratchDir, Context ctx, TezWork tezWork) throws Exception {
 
     Path tezDir = getTezDir(mrScratchDir);
 
@@ -541,8 +470,15 @@ public class DagUtils {
     Class inputFormatClass = conf.getClass("mapred.input.format.class",
         InputFormat.class);
 
-    boolean vertexHasCustomInput = VertexType.isCustomInputType(vertexType);
-    LOG.info("Vertex has custom input? " + vertexHasCustomInput);
+    boolean vertexHasCustomInput = false;
+    if (tezWork != null) {
+      for (BaseWork baseWork : tezWork.getParents(mapWork)) {
+        if (tezWork.getEdgeType(baseWork, mapWork) == EdgeType.CUSTOM_EDGE) {
+          vertexHasCustomInput = true;
+        }
+      }
+    }
+
     if (vertexHasCustomInput) {
       groupSplitsInInputInitializer = false;
       // grouping happens in execution phase. The input payload should not enable grouping here,
@@ -577,8 +513,6 @@ public class DagUtils {
       }
     }
 
-    // remember mapping of plan to input
-    conf.set(Utilities.INPUT_NAME, mapWork.getName());
     if (HiveConf.getBoolVar(conf, ConfVars.HIVE_AM_SPLIT_GENERATION)
         && !mapWork.isUseOneNullRowInputFormat()) {
 
@@ -659,7 +593,6 @@ public class DagUtils {
       Path mrScratchDir, Context ctx) throws Exception {
 
     // set up operator plan
-    conf.set(Utilities.INPUT_NAME, reduceWork.getName());
     Utilities.setReduceWork(conf, reduceWork, mrScratchDir, false);
 
     // create the directories FileSinkOperators need
@@ -1004,22 +937,12 @@ public class DagUtils {
       return initializeVertexConf(conf, context, (MapWork)work);
     } else if (work instanceof ReduceWork) {
       return initializeVertexConf(conf, context, (ReduceWork)work);
-    } else if (work instanceof MergeJoinWork) {
-      return initializeVertexConf(conf, context, (MergeJoinWork) work);
     } else {
       assert false;
       return null;
     }
   }
 
-  private JobConf initializeVertexConf(JobConf conf, Context context, MergeJoinWork work) {
-    if (work.getMainWork() instanceof MapWork) {
-      return initializeVertexConf(conf, context, (MapWork) (work.getMainWork()));
-    } else {
-      return initializeVertexConf(conf, context, (ReduceWork) (work.getMainWork()));
-    }
-  }
-
   /**
    * Create a vertex from a given work object.
    *
@@ -1035,21 +958,18 @@ public class DagUtils {
    */
   public Vertex createVertex(JobConf conf, BaseWork work,
       Path scratchDir, LocalResource appJarLr,
-      List<LocalResource> additionalLr, FileSystem fileSystem, Context ctx, boolean hasChildren,
-      TezWork tezWork, VertexType vertexType) throws Exception {
+      List<LocalResource> additionalLr,
+      FileSystem fileSystem, Context ctx, boolean hasChildren, TezWork tezWork) throws Exception {
 
     Vertex v = null;
     // simply dispatch the call to the right method for the actual (sub-) type of
     // BaseWork.
     if (work instanceof MapWork) {
-      v = createVertex(conf, (MapWork) work, appJarLr, additionalLr, fileSystem, scratchDir, ctx,
-              vertexType);
+      v = createVertex(conf, (MapWork) work, appJarLr,
+          additionalLr, fileSystem, scratchDir, ctx, tezWork);
     } else if (work instanceof ReduceWork) {
       v = createVertex(conf, (ReduceWork) work, appJarLr,
           additionalLr, fileSystem, scratchDir, ctx);
-    } else if (work instanceof MergeJoinWork) {
-      v = createVertex(conf, (MergeJoinWork) work, appJarLr, additionalLr, fileSystem, scratchDir,
-              ctx, vertexType);
     } else {
       // something is seriously wrong if this is happening
       throw new HiveException(ErrorMsg.GENERIC_ERROR.getErrorCodedMsg());

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java?rev=1629562&r1=1629561&r2=1629562&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java Mon Oct  6 03:44:13 2014
@@ -152,21 +152,8 @@ public class HiveSplitGenerator extends 
   public static Multimap<Integer, InputSplit> generateGroupedSplits(JobConf jobConf,
       Configuration conf, InputSplit[] splits, float waves, int availableSlots)
       throws Exception {
-    return generateGroupedSplits(jobConf, conf, splits, waves, availableSlots, null);
-  }
-
-  public static Multimap<Integer, InputSplit> generateGroupedSplits(JobConf jobConf,
-      Configuration conf, InputSplit[] splits, float waves, int availableSlots,
-      String inputName) throws Exception {
 
-    MapWork work = null;
-    if (inputName != null) {
-      work = (MapWork) Utilities.getMergeWork(jobConf, inputName);
-      // work can still be null if there is no merge work for this input
-    }
-    if (work == null) {
-      work = Utilities.getMapWork(jobConf);
-    }
+    MapWork work = Utilities.getMapWork(jobConf);
 
     Multimap<Integer, InputSplit> bucketSplitMultiMap =
         ArrayListMultimap.<Integer, InputSplit> create();

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java?rev=1629562&r1=1629561&r2=1629562&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java Mon Oct  6 03:44:13 2014
@@ -17,20 +17,14 @@
  */
 package org.apache.hadoop.hive.ql.exec.tez;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
+import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.TreeMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.ql.exec.DummyStoreOperator;
 import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
 import org.apache.hadoop.hive.ql.exec.MapOperator;
 import org.apache.hadoop.hive.ql.exec.MapredContext;
@@ -42,17 +36,15 @@ import org.apache.hadoop.hive.ql.exec.Ut
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.ReportStats;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
 import org.apache.hadoop.hive.ql.exec.tez.TezProcessor.TezKVOutputCollector;
-import org.apache.hadoop.hive.ql.exec.tez.tools.KeyValueInputMerger;
 import org.apache.hadoop.hive.ql.exec.vector.VectorMapOperator;
-import org.apache.hadoop.hive.ql.io.IOContext;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.tez.mapreduce.input.MRInputLegacy;
-import org.apache.tez.mapreduce.input.MultiMRInput;
 import org.apache.tez.mapreduce.processor.MRTaskReporter;
-import org.apache.tez.runtime.api.Input;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.LogicalOutput;
 import org.apache.tez.runtime.api.ProcessorContext;
@@ -66,61 +58,27 @@ public class MapRecordProcessor extends 
 
 
   private MapOperator mapOp;
-  private final List<MapOperator> mergeMapOpList = new ArrayList<MapOperator>();
   public static final Log l4j = LogFactory.getLog(MapRecordProcessor.class);
-  private MapRecordSource[] sources;
-  private final Map<String, MultiMRInput> multiMRInputMap = new HashMap<String, MultiMRInput>();
-  private int position = 0;
-  private boolean foundCachedMergeWork = false;
-  MRInputLegacy legacyMRInput = null;
-  private ExecMapperContext execContext = null;
+  private final ExecMapperContext execContext = new ExecMapperContext();
   private boolean abort = false;
   protected static final String MAP_PLAN_KEY = "__MAP_PLAN__";
   private MapWork mapWork;
-  List<MapWork> mergeWorkList = null;
-  private static Map<Integer, DummyStoreOperator> connectOps =
-      new TreeMap<Integer, DummyStoreOperator>();
 
-  public MapRecordProcessor(JobConf jconf) throws Exception {
+  public MapRecordProcessor(JobConf jconf) {
     ObjectCache cache = ObjectCacheFactory.getCache(jconf);
-    execContext = new ExecMapperContext(jconf);
     execContext.setJc(jconf);
     // create map and fetch operators
     mapWork = (MapWork) cache.retrieve(MAP_PLAN_KEY);
     if (mapWork == null) {
       mapWork = Utilities.getMapWork(jconf);
       cache.cache(MAP_PLAN_KEY, mapWork);
-      l4j.debug("Plan: " + mapWork);
+      l4j.info("Plan: "+mapWork);
       for (String s: mapWork.getAliases()) {
-        l4j.debug("Alias: " + s);
+        l4j.info("Alias: "+s);
       }
     } else {
       Utilities.setMapWork(jconf, mapWork);
     }
-
-    String prefixes = jconf.get(DagUtils.TEZ_MERGE_WORK_FILE_PREFIXES);
-    if (prefixes != null) {
-      mergeWorkList = new ArrayList<MapWork>();
-      for (String prefix : prefixes.split(",")) {
-        MapWork mergeMapWork = (MapWork) cache.retrieve(prefix);
-        if (mergeMapWork != null) {
-          l4j.info("Found merge work in cache");
-          foundCachedMergeWork = true;
-          mergeWorkList.add(mergeMapWork);
-          continue;
-        }
-        if (foundCachedMergeWork) {
-          throw new Exception(
-              "Should find all work in cache else operator pipeline will be in non-deterministic state");
-        }
-
-        if ((prefix != null) && (prefix.isEmpty() == false)) {
-          mergeMapWork = (MapWork) Utilities.getMergeWork(jconf, prefix);
-          mergeWorkList.add(mergeMapWork);
-          cache.cache(prefix, mergeMapWork);
-        }
-      }
-    }
   }
 
   @Override
@@ -130,8 +88,8 @@ public class MapRecordProcessor extends 
     super.init(jconf, processorContext, mrReporter, inputs, outputs);
 
     //Update JobConf using MRInput, info like filename comes via this
-    legacyMRInput = getMRInput(inputs);
-    Configuration updatedConf = legacyMRInput.getConfigUpdates();
+    MRInputLegacy mrInput = TezProcessor.getMRInput(inputs);
+    Configuration updatedConf = mrInput.getConfigUpdates();
     if (updatedConf != null) {
       for (Entry<String, String> entry : updatedConf) {
         jconf.set(entry.getKey(), entry.getValue());
@@ -141,52 +99,20 @@ public class MapRecordProcessor extends 
     createOutputMap();
     // Start all the Outputs.
     for (Entry<String, LogicalOutput> outputEntry : outputs.entrySet()) {
-      l4j.debug("Starting Output: " + outputEntry.getKey());
+      l4j.info("Starting Output: " + outputEntry.getKey());
       outputEntry.getValue().start();
       ((TezKVOutputCollector) outMap.get(outputEntry.getKey())).initialize();
     }
 
     try {
-
       if (mapWork.getVectorMode()) {
         mapOp = new VectorMapOperator();
       } else {
         mapOp = new MapOperator();
       }
 
-      connectOps.clear();
-      if (mergeWorkList != null) {
-        MapOperator mergeMapOp = null;
-        for (MapWork mergeMapWork : mergeWorkList) {
-          processorContext.waitForAnyInputReady(Collections.singletonList((Input) (inputs
-              .get(mergeMapWork.getName()))));
-          if (mergeMapWork.getVectorMode()) {
-            mergeMapOp = new VectorMapOperator();
-          } else {
-            mergeMapOp = new MapOperator();
-          }
-
-          mergeMapOpList.add(mergeMapOp);
-          // initialize the merge operators first.
-          if (mergeMapOp != null) {
-            mergeMapOp.setConf(mergeMapWork);
-            l4j.info("Input name is " + mergeMapWork.getName());
-            jconf.set(Utilities.INPUT_NAME, mergeMapWork.getName());
-            mergeMapOp.setChildren(jconf);
-            if (foundCachedMergeWork == false) {
-              DummyStoreOperator dummyOp = getJoinParentOp(mergeMapOp);
-              connectOps.put(mergeMapWork.getTag(), dummyOp);
-            }
-            mergeMapOp.setExecContext(new ExecMapperContext(jconf));
-            mergeMapOp.initializeLocalWork(jconf);
-          }
-        }
-      }
-
       // initialize map operator
       mapOp.setConf(mapWork);
-      l4j.info("Main input name is " + mapWork.getName());
-      jconf.set(Utilities.INPUT_NAME, mapWork.getName());
       mapOp.setChildren(jconf);
       l4j.info(mapOp.dump(0));
 
@@ -195,21 +121,12 @@ public class MapRecordProcessor extends 
       ((TezContext) MapredContext.get()).setTezProcessorContext(processorContext);
       mapOp.setExecContext(execContext);
       mapOp.initializeLocalWork(jconf);
-
-      initializeMapRecordSources();
       mapOp.initialize(jconf, null);
-      if ((mergeMapOpList != null) && mergeMapOpList.isEmpty() == false) {
-        for (MapOperator mergeMapOp : mergeMapOpList) {
-          jconf.set(Utilities.INPUT_NAME, mergeMapOp.getConf().getName());
-          mergeMapOp.initialize(jconf, null);
-        }
-      }
 
       // Initialization isn't finished until all parents of all operators
       // are initialized. For broadcast joins that means initializing the
       // dummy parent operators as well.
       List<HashTableDummyOperator> dummyOps = mapWork.getDummyOps();
-      jconf.set(Utilities.INPUT_NAME, mapWork.getName());
       if (dummyOps != null) {
         for (Operator<? extends OperatorDesc> dummyOp : dummyOps){
           dummyOp.setExecContext(execContext);
@@ -234,46 +151,54 @@ public class MapRecordProcessor extends 
     perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS);
   }
 
-  private void initializeMapRecordSources() throws Exception {
-    int size = mergeMapOpList.size() + 1; // the +1 is for the main map operator itself
-    sources = new MapRecordSource[size];
-    KeyValueReader reader = legacyMRInput.getReader();
-    position = mapOp.getConf().getTag();
-    sources[position] = new MapRecordSource();
-    sources[position].init(jconf, mapOp, reader);
-    for (MapOperator mapOp : mergeMapOpList) {
-      int tag = mapOp.getConf().getTag();
-      sources[tag] = new MapRecordSource();
-      String inputName = mapOp.getConf().getName();
-      MultiMRInput multiMRInput = multiMRInputMap.get(inputName);
-      Collection<KeyValueReader> kvReaders = multiMRInput.getKeyValueReaders();
-      l4j.debug("There are " + kvReaders.size() + " key-value readers for input " + inputName);
-      List<KeyValueReader> kvReaderList = new ArrayList<KeyValueReader>(kvReaders);
-      reader = new KeyValueInputMerger(kvReaderList);
-      sources[tag].init(jconf, mapOp, reader);
-    }
-    ((TezContext) MapredContext.get()).setRecordSources(sources);
-  }
+  @Override
+  void run() throws IOException{
 
-  private DummyStoreOperator getJoinParentOp(Operator<? extends OperatorDesc> mergeMapOp) {
-    for (Operator<? extends OperatorDesc> childOp : mergeMapOp.getChildOperators()) {
-      if ((childOp.getChildOperators() == null) || (childOp.getChildOperators().isEmpty())) {
-        return (DummyStoreOperator) childOp;
-      } else {
-        return getJoinParentOp(childOp);
+    MRInputLegacy in = TezProcessor.getMRInput(inputs);
+    KeyValueReader reader = in.getReader();
+
+    //process records until done
+    while(reader.next()){
+      //ignore the key for maps -  reader.getCurrentKey();
+      Object value = reader.getCurrentValue();
+      boolean needMore = processRow(value);
+      if(!needMore){
+        break;
       }
     }
-    return null;
   }
 
-  @Override
-  void run() throws Exception {
 
-    while (sources[position].pushRecord()) {
-      if (isLogInfoEnabled) {
-        logProgress();
+  /**
+   * @param value  value to process
+   * @return true if it is not done and can take more inputs
+   */
+  private boolean processRow(Object value) {
+    // reset the execContext for each new row
+    execContext.resetRow();
+
+    try {
+      if (mapOp.getDone()) {
+        return false; //done
+      } else {
+        // Since there is no concept of a group, we don't invoke
+        // startGroup/endGroup for a mapper
+        mapOp.process((Writable)value);
+        if (isLogInfoEnabled) {
+          logProgress();
+        }
+      }
+    } catch (Throwable e) {
+      abort = true;
+      if (e instanceof OutOfMemoryError) {
+        // Don't create a new object if we are already out of memory
+        throw (OutOfMemoryError) e;
+      } else {
+        l4j.fatal(StringUtils.stringifyException(e));
+        throw new RuntimeException(e);
       }
     }
+    return true; //give me more
   }
 
   @Override
@@ -289,11 +214,6 @@ public class MapRecordProcessor extends 
         return;
       }
       mapOp.close(abort);
-      if (mergeMapOpList.isEmpty() == false) {
-        for (MapOperator mergeMapOp : mergeMapOpList) {
-          mergeMapOp.close(abort);
-        }
-      }
 
       // Need to close the dummyOps as well. The operator pipeline
       // is not considered "closed/done" unless all operators are
@@ -322,27 +242,4 @@ public class MapRecordProcessor extends 
       MapredContext.close();
     }
   }
-
-  public static Map<Integer, DummyStoreOperator> getConnectOps() {
-    return connectOps;
-  }
-
-  private MRInputLegacy getMRInput(Map<String, LogicalInput> inputs) throws Exception {
-    // there should be only one MRInput
-    MRInputLegacy theMRInput = null;
-    l4j.info("The input names are: " + Arrays.toString(inputs.keySet().toArray()));
-    for (Entry<String, LogicalInput> inp : inputs.entrySet()) {
-      if (inp.getValue() instanceof MRInputLegacy) {
-        if (theMRInput != null) {
-          throw new IllegalArgumentException("Only one MRInput is expected");
-        }
-        // a better logic would be to find the alias
-        theMRInput = (MRInputLegacy) inp.getValue();
-      } else if (inp.getValue() instanceof MultiMRInput) {
-        multiMRInputMap.put(inp.getKey(), (MultiMRInput) inp.getValue());
-      }
-    }
-    theMRInput.init();
-    return theMRInput;
-  }
 }
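
Editor's note on the MapRecordProcessor hunks above: the change drops the multi-input MapRecordSource/merge-work machinery and returns to a plain pull loop over a single MRInputLegacy reader, with a per-row processRow helper. Below is a minimal standalone sketch of that loop; RowReader and RowOperator are hypothetical stand-ins for Tez's KeyValueReader and Hive's MapOperator, not the real interfaces, and only the control flow shown in the diff is reproduced.

    // Sketch of the record loop restored in MapRecordProcessor.run()/processRow().
    import java.io.IOException;

    interface RowReader {
      boolean next() throws IOException;          // advance to the next record
      Object getCurrentValue() throws IOException;
    }

    interface RowOperator {
      boolean isDone();                           // operator wants no more input
      void process(Object row) throws Exception;
    }

    public class RecordLoopSketch {
      private boolean abort = false;

      void run(RowReader reader, RowOperator op) throws IOException {
        // process records until the reader is exhausted or the operator is done
        while (reader.next()) {
          Object value = reader.getCurrentValue();   // the key is ignored for maps
          if (!processRow(op, value)) {
            break;
          }
        }
      }

      /** @return true if the operator can take more rows */
      private boolean processRow(RowOperator op, Object value) {
        try {
          if (op.isDone()) {
            return false;                           // done, stop feeding rows
          }
          // no startGroup/endGroup for a mapper, each row is processed on its own
          op.process(value);
        } catch (Throwable e) {
          abort = true;
          if (e instanceof OutOfMemoryError) {
            throw (OutOfMemoryError) e;             // don't wrap when out of memory
          }
          throw new RuntimeException(e);
        }
        return true;                                // give me more
      }
    }

In the real class the loop additionally resets the ExecMapperContext before each row and calls logProgress() when info logging is enabled, as the added lines show.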

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java?rev=1629562&r1=1629561&r2=1629562&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java Mon Oct  6 03:44:13 2014
@@ -20,7 +20,6 @@ package org.apache.hadoop.hive.ql.exec.t
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.MapredContext;
 import org.apache.hadoop.hive.ql.exec.ObjectCacheFactory;
 import org.apache.hadoop.hive.ql.exec.Operator;
@@ -41,9 +40,7 @@ import org.apache.tez.runtime.api.Logica
 import org.apache.tez.runtime.api.ProcessorContext;
 import org.apache.tez.runtime.library.api.KeyValueReader;
 
-import java.io.IOException;
 import java.util.Map;
-import java.util.Map.Entry;
 
 /**
  * Record processor for fast merging of files.
@@ -54,12 +51,11 @@ public class MergeFileRecordProcessor ex
       .getLog(MergeFileRecordProcessor.class);
 
   protected Operator<? extends OperatorDesc> mergeOp;
-  private ExecMapperContext execContext = null;
+  private final ExecMapperContext execContext = new ExecMapperContext();
   protected static final String MAP_PLAN_KEY = "__MAP_PLAN__";
   private MergeFileWork mfWork;
-  MRInputLegacy mrInput = null;
   private boolean abort = false;
-  private final Object[] row = new Object[2];
+  private Object[] row = new Object[2];
 
   @Override
   void init(JobConf jconf, ProcessorContext processorContext,
@@ -67,16 +63,16 @@ public class MergeFileRecordProcessor ex
       Map<String, LogicalOutput> outputs) throws Exception {
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS);
     super.init(jconf, processorContext, mrReporter, inputs, outputs);
-    execContext = new ExecMapperContext(jconf);
 
     //Update JobConf using MRInput, info like filename comes via this
-    mrInput = getMRInput(inputs);
+    MRInputLegacy mrInput = TezProcessor.getMRInput(inputs);
     Configuration updatedConf = mrInput.getConfigUpdates();
     if (updatedConf != null) {
       for (Map.Entry<String, String> entry : updatedConf) {
         jconf.set(entry.getKey(), entry.getValue());
       }
     }
+
     createOutputMap();
     // Start all the Outputs.
     for (Map.Entry<String, LogicalOutput> outputEntry : outputs.entrySet()) {
@@ -131,7 +127,8 @@ public class MergeFileRecordProcessor ex
 
   @Override
   void run() throws Exception {
-    KeyValueReader reader = mrInput.getReader();
+    MRInputLegacy in = TezProcessor.getMRInput(inputs);
+    KeyValueReader reader = in.getReader();
 
     //process records until done
     while (reader.next()) {
@@ -208,23 +205,4 @@ public class MergeFileRecordProcessor ex
     return true; //give me more
   }
 
-  private MRInputLegacy getMRInput(Map<String, LogicalInput> inputs) throws Exception {
-    // there should be only one MRInput
-    MRInputLegacy theMRInput = null;
-    for (Entry<String, LogicalInput> inp : inputs.entrySet()) {
-      if (inp.getValue() instanceof MRInputLegacy) {
-        if (theMRInput != null) {
-          throw new IllegalArgumentException("Only one MRInput is expected");
-        }
-        // a better logic would be to find the alias
-        theMRInput = (MRInputLegacy) inp.getValue();
-      } else {
-        throw new IOException("Expecting only one input of type MRInputLegacy. Found type: "
-            + inp.getClass().getCanonicalName());
-      }
-    }
-    theMRInput.init();
-
-    return theMRInput;
-  }
 }
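
Editor's note: both record processors delete their private getMRInput helper here and call TezProcessor.getMRInput(inputs) instead. The deleted MapRecordProcessor variant also collected MultiMRInput entries, and the MergeFileRecordProcessor variant rejected any non-MRInputLegacy input; the shared logic is the "exactly one MRInput" selection sketched below. NamedInput and LegacyInput are hypothetical stand-ins for Tez's LogicalInput and MRInputLegacy, so this is an illustration of the pattern, not the real API.

    // Sketch of the single-MRInput lookup now centralised in TezProcessor.
    import java.util.LinkedHashMap;
    import java.util.Map;

    interface NamedInput {}

    class LegacyInput implements NamedInput {
      void init() { /* initialize splits, config updates, ... */ }
    }

    public class SingleInputLookup {
      static LegacyInput getMRInput(Map<String, NamedInput> inputs) {
        LegacyInput theInput = null;
        for (Map.Entry<String, NamedInput> e : inputs.entrySet()) {
          if (e.getValue() instanceof LegacyInput) {
            if (theInput != null) {
              throw new IllegalArgumentException("Only one MRInput is expected");
            }
            theInput = (LegacyInput) e.getValue();
          }
        }
        if (theInput == null) {
          throw new IllegalStateException("No MRInput found among the logical inputs");
        }
        theInput.init();                 // initialize before handing it to the caller
        return theInput;
      }

      public static void main(String[] args) {
        Map<String, NamedInput> inputs = new LinkedHashMap<>();
        inputs.put("map_1", new LegacyInput());
        System.out.println(getMRInput(inputs) != null);   // prints true
      }
    }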

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileTezProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileTezProcessor.java?rev=1629562&r1=1629561&r2=1629562&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileTezProcessor.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileTezProcessor.java Mon Oct  6 03:44:13 2014
@@ -39,6 +39,12 @@ public class MergeFileTezProcessor exten
   public void run(Map<String, LogicalInput> inputs,
       Map<String, LogicalOutput> outputs) throws Exception {
     rproc = new MergeFileRecordProcessor();
+    MRInputLegacy mrInput = getMRInput(inputs);
+    try {
+      mrInput.init();
+    } catch (IOException e) {
+      throw new RuntimeException("Failed while initializing MRInput", e);
+    }
     initializeAndRunProcessor(inputs, outputs);
   }
 }
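
Editor's note: the MergeFileTezProcessor.run change initializes the MRInput eagerly and converts the checked IOException into an unchecked failure before the record processor starts. A small sketch of that pattern, with InputSource as a hypothetical stand-in for MRInputLegacy:

    // Sketch of eager MRInput initialization with checked-to-unchecked wrapping.
    import java.io.IOException;

    class InputSource {
      void init() throws IOException { /* may fail while reading split metadata */ }
    }

    public class EagerInitSketch {
      static void runProcessor(InputSource mrInput, Runnable processor) {
        try {
          mrInput.init();               // initialize before any record is read
        } catch (IOException e) {
          throw new RuntimeException("Failed while initializing MRInput", e);
        }
        processor.run();                // then hand control to the record processor
      }

      public static void main(String[] args) {
        runProcessor(new InputSource(), () -> System.out.println("processor ran"));
      }
    }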

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java?rev=1629562&r1=1629561&r2=1629562&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java Mon Oct  6 03:44:13 2014
@@ -115,7 +115,8 @@ public abstract class RecordProcessor  {
    */
   protected void logCloseInfo() {
     long used_memory = memoryMXBean.getHeapMemoryUsage().getUsed();
-    l4j.info("TezProcessor: processed " + numRows + " rows/groups: used memory = " + used_memory);
+    l4j.info("ExecMapper: processed " + numRows + " rows: used memory = "
+        + used_memory);
   }
 
   /**
@@ -125,7 +126,8 @@ public abstract class RecordProcessor  {
     numRows++;
     if (numRows == nextUpdateCntr) {
       long used_memory = memoryMXBean.getHeapMemoryUsage().getUsed();
-      l4j.info("TezProcessor: processing " + numRows + " rows/groups: used memory = " + used_memory);
+      l4j.info("ExecMapper: processing " + numRows
+          + " rows: used memory = " + used_memory);
       nextUpdateCntr = getNextUpdateRecordCounter(numRows);
     }
   }
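
Editor's note: the RecordProcessor hunks only change the log prefix back to "ExecMapper", but the surrounding counter is worth spelling out: rows are counted and a log line with current heap usage is emitted only when a moving threshold is crossed. getNextUpdateRecordCounter is not shown in this diff, so the exponential back-off below is an assumption; the memory probe via MemoryMXBean matches the lines above.

    // Sketch of the throttled progress logging used by RecordProcessor.logProgress().
    import java.lang.management.ManagementFactory;
    import java.lang.management.MemoryMXBean;

    public class ProgressLogSketch {
      private final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
      private long numRows = 0;
      private long nextUpdateCntr = 1;

      void logProgress() {
        numRows++;
        if (numRows == nextUpdateCntr) {
          long usedMemory = memoryMXBean.getHeapMemoryUsage().getUsed();
          System.out.println("ExecMapper: processing " + numRows
              + " rows: used memory = " + usedMemory);
          nextUpdateCntr = getNextUpdateRecordCounter(numRows);
        }
      }

      // assumed policy: log at 1, 10, 100, ... rows (not taken from this hunk)
      private long getNextUpdateRecordCounter(long rows) {
        return rows * 10;
      }

      public static void main(String[] args) {
        ProgressLogSketch p = new ProgressLogSketch();
        for (int i = 0; i < 1000; i++) {
          p.logProgress();   // logs at rows 1, 10, 100, 1000
        }
      }
    }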