You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by gu...@apache.org on 2014/03/19 22:14:54 UTC

svn commit: r1579403 - in /hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez: HashTableLoader.java MapRecordProcessor.java RecordProcessor.java ReduceRecordProcessor.java TezCacheAccess.java TezContext.java TezProcessor.java

Author: gunther
Date: Wed Mar 19 21:14:53 2014
New Revision: 1579403

URL: http://svn.apache.org/r1579403
Log:
HIVE-6613: Control when spcific Inputs / Outputs are started (Siddharth Seth via Gunther Hagleitner)

Added:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezCacheAccess.java
Modified:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezContext.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java?rev=1579403&r1=1579402&r2=1579403&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java Wed Mar 19 21:14:53 2014
@@ -36,8 +36,8 @@ import org.apache.hadoop.hive.ql.exec.pe
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
 import org.apache.hadoop.hive.serde2.ByteStream.Output;
-import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.tez.runtime.api.LogicalInput;
@@ -75,6 +75,7 @@ public class HashTableLoader implements 
         HiveConf.ConfVars.HIVEHASHTABLELOADFACTOR);
     boolean useLazyRows = HiveConf.getBoolVar(hconf, HiveConf.ConfVars.HIVEMAPJOINLAZYHASHTABLE);
 
+    TezCacheAccess tezCacheAccess = TezCacheAccess.createInstance(hconf);
     // We only check if we can use optimized keys here; that is ok because we don't
     // create optimized keys in MapJoin if hash map doesn't have optimized keys.
     if (!HiveConf.getBoolVar(hconf, HiveConf.ConfVars.HIVEMAPJOINUSEOPTIMIZEDKEYS)) {
@@ -86,7 +87,8 @@ public class HashTableLoader implements 
         continue;
       }
 
-      LogicalInput input = tezContext.getInput(parentToInput.get(pos));
+      String inputName = parentToInput.get(pos);
+      LogicalInput input = tezContext.getInput(inputName);
 
       try {
         KeyValueReader kvReader = (KeyValueReader) input.getReader();
@@ -119,6 +121,9 @@ public class HashTableLoader implements 
       } catch (Exception e) {
         throw new HiveException(e);
       }
+      // Register that the Input has been cached.
+      tezCacheAccess.registerCachedInput(inputName);
+      LOG.info("Setting Input: " + inputName + " as cached");
     }
     if (lastKey == null) {
       lastKey = new MapJoinKeyObject(); // No rows in tables, the key type doesn't matter.

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java?rev=1579403&r1=1579402&r2=1579403&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java Wed Mar 19 21:14:53 2014
@@ -35,24 +35,26 @@ import org.apache.hadoop.hive.ql.exec.Op
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.reportStats;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
+import org.apache.hadoop.hive.ql.exec.tez.TezProcessor.TezKVOutputCollector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorMapOperator;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.tez.mapreduce.input.MRInputLegacy;
 import org.apache.tez.mapreduce.processor.MRTaskReporter;
 import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.TezProcessorContext;
 import org.apache.tez.runtime.library.api.KeyValueReader;
 
 /**
  * Process input from tez LogicalInput and write output - for a map plan
  * Just pump the records through the query plan.
  */
-public class MapRecordProcessor  extends RecordProcessor{
+public class MapRecordProcessor extends RecordProcessor {
 
 
   private MapOperator mapOp;
@@ -63,18 +65,13 @@ public class MapRecordProcessor  extends
   private MapWork mapWork;
 
   @Override
-  void init(JobConf jconf, MRTaskReporter mrReporter, Map<String, LogicalInput> inputs,
-      Map<String, OutputCollector> outMap){
+  void init(JobConf jconf, TezProcessorContext processorContext, MRTaskReporter mrReporter,
+      Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws Exception {
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS);
-    super.init(jconf, mrReporter, inputs, outMap);
+    super.init(jconf, processorContext, mrReporter, inputs, outputs);
 
     //Update JobConf using MRInput, info like filename comes via this
-    MRInputLegacy mrInput = getMRInput(inputs);
-    try {
-      mrInput.init();
-    } catch (IOException e) {
-      throw new RuntimeException("Failed while initializing MRInput", e);
-    }
+    MRInputLegacy mrInput = TezProcessor.getMRInput(inputs);
     Configuration updatedConf = mrInput.getConfigUpdates();
     if (updatedConf != null) {
       for (Entry<String, String> entry : updatedConf) {
@@ -82,6 +79,14 @@ public class MapRecordProcessor  extends
       }
     }
 
+    createOutputMap();
+    // Start all the Outputs.
+    for (Entry<String, LogicalOutput> outputEntry : outputs.entrySet()) {
+      l4j.info("Starting Output: " + outputEntry.getKey());
+      ((TezKVOutputCollector) outMap.get(outputEntry.getKey())).initialize();
+      outputEntry.getValue().start();
+    }
+
     ObjectCache cache = ObjectCacheFactory.getCache(jconf);
     try {
 
@@ -143,25 +148,10 @@ public class MapRecordProcessor  extends
     perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS);
   }
 
-  private MRInputLegacy getMRInput(Map<String, LogicalInput> inputs) {
-    //there should be only one MRInput
-    MRInputLegacy theMRInput = null;
-    for(LogicalInput inp : inputs.values()){
-      if(inp instanceof MRInputLegacy){
-        if(theMRInput != null){
-          throw new IllegalArgumentException("Only one MRInput is expected");
-        }
-        //a better logic would be to find the alias
-        theMRInput = (MRInputLegacy)inp;
-      }
-    }
-    return theMRInput;
-  }
-
   @Override
   void run() throws IOException{
 
-    MRInputLegacy in = getMRInput(inputs);
+    MRInputLegacy in = TezProcessor.getMRInput(inputs);
     KeyValueReader reader = in.getReader();
 
     //process records until done

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java?rev=1579403&r1=1579402&r2=1579403&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java Wed Mar 19 21:14:53 2014
@@ -16,20 +16,26 @@
  * limitations under the License.
  */
 package org.apache.hadoop.hive.ql.exec.tez;
-import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.lang.management.MemoryMXBean;
 import java.net.URLClassLoader;
 import java.util.Arrays;
 import java.util.Map;
+import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.tez.TezProcessor.TezKVOutputCollector;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.tez.mapreduce.processor.MRTaskReporter;
 import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.TezProcessorContext;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
 
 /**
  * Process input from tez LogicalInput and write output
@@ -39,7 +45,9 @@ public abstract class RecordProcessor  {
 
   protected JobConf jconf;
   protected Map<String, LogicalInput> inputs;
+  protected Map<String, LogicalOutput> outputs;
   protected Map<String, OutputCollector> outMap;
+  protected TezProcessorContext processorContext;
 
   public static final Log l4j = LogFactory.getLog(RecordProcessor.class);
 
@@ -54,20 +62,22 @@ public abstract class RecordProcessor  {
   protected PerfLogger perfLogger = PerfLogger.getPerfLogger();
   protected String CLASS_NAME = RecordProcessor.class.getName();
 
-
   /**
    * Common initialization code for RecordProcessors
    * @param jconf
+   * @param processorContext the {@link TezProcessorContext}
    * @param mrReporter
-   * @param inputs
-   * @param out
+   * @param inputs map of Input names to {@link LogicalInput}s
+   * @param outputs map of Output names to {@link LogicalOutput}s
+   * @throws Exception
    */
-  void init(JobConf jconf, MRTaskReporter mrReporter, Map<String, LogicalInput> inputs,
-      Map<String, OutputCollector> outMap){
+  void init(JobConf jconf, TezProcessorContext processorContext, MRTaskReporter mrReporter,
+      Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws Exception {
     this.jconf = jconf;
     this.reporter = mrReporter;
     this.inputs = inputs;
-    this.outMap = outMap;
+    this.outputs = outputs;
+    this.processorContext = processorContext;
 
     // Allocate the bean at the beginning -
     memoryMXBean = ManagementFactory.getMemoryMXBean();
@@ -92,9 +102,9 @@ public abstract class RecordProcessor  {
 
   /**
    * start processing the inputs and writing output
-   * @throws IOException
+   * @throws Exception
    */
-  abstract void run() throws IOException;
+  abstract void run() throws Exception;
 
 
   abstract void close();
@@ -132,4 +142,12 @@ public abstract class RecordProcessor  {
     return 10 * cntr;
   }
 
+  protected void createOutputMap() {
+    Preconditions.checkState(outMap == null, "Outputs should only be setup once");
+    outMap = Maps.newHashMap();
+    for (Entry<String, LogicalOutput> entry : outputs.entrySet()) {
+      TezKVOutputCollector collector = new TezKVOutputCollector(entry.getValue());
+      outMap.put(entry.getKey(), collector);
+    }
+  }
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java?rev=1579403&r1=1579402&r2=1579403&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java Wed Mar 19 21:14:53 2014
@@ -23,6 +23,7 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -35,6 +36,7 @@ import org.apache.hadoop.hive.ql.exec.Op
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.reportStats;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
+import org.apache.hadoop.hive.ql.exec.tez.TezProcessor.TezKVOutputCollector;
 import org.apache.hadoop.hive.ql.exec.tez.tools.InputMerger;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -53,9 +55,14 @@ import org.apache.hadoop.mapred.OutputCo
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.tez.mapreduce.processor.MRTaskReporter;
+import org.apache.tez.runtime.api.Input;
 import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.TezProcessorContext;
 import org.apache.tez.runtime.library.api.KeyValuesReader;
 
+import com.google.common.collect.Lists;
+
 /**
  * Process input from tez LogicalInput and write output - for a map plan
  * Just pump the records through the query plan.
@@ -88,10 +95,10 @@ public class ReduceRecordProcessor  exte
   List<Object> row = new ArrayList<Object>(Utilities.reduceFieldNameList.size());
 
   @Override
-  void init(JobConf jconf, MRTaskReporter mrReporter, Map<String, LogicalInput> inputs,
-      Map<String, OutputCollector> outMap){
+  void init(JobConf jconf, TezProcessorContext processorContext, MRTaskReporter mrReporter,
+      Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws Exception {
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS);
-    super.init(jconf, mrReporter, inputs, outMap);
+    super.init(jconf, processorContext, mrReporter, inputs, outputs);
 
     ObjectCache cache = ObjectCacheFactory.getCache(jconf);
 
@@ -163,6 +170,7 @@ public class ReduceRecordProcessor  exte
       if (dummyOps != null) {
         children.addAll(dummyOps);
       }
+      createOutputMap();
       OperatorUtils.setChildrenCollector(children, outMap);
 
       reducer.setReporter(reporter);
@@ -182,10 +190,20 @@ public class ReduceRecordProcessor  exte
   }
 
   @Override
-  void run() throws IOException{
+  void run() throws Exception {
     List<LogicalInput> shuffleInputs = getShuffleInputs(inputs);
-    KeyValuesReader kvsReader;
+    if (shuffleInputs != null) {
+      l4j.info("Waiting for ShuffleInputs to become ready");
+      processorContext.waitForAllInputsReady(new ArrayList<Input>(shuffleInputs));
+    }
 
+    for (Entry<String, LogicalOutput> outputEntry : outputs.entrySet()) {
+      l4j.info("Starting Output: " + outputEntry.getKey());
+      ((TezKVOutputCollector) outMap.get(outputEntry.getKey())).initialize();
+      outputEntry.getValue().start();
+    }
+
+    KeyValuesReader kvsReader;
     try {
       if(shuffleInputs.size() == 1){
         //no merging of inputs required

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezCacheAccess.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezCacheAccess.java?rev=1579403&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezCacheAccess.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezCacheAccess.java Wed Mar 19 21:14:53 2014
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.ObjectCache;
+import org.apache.hadoop.hive.ql.exec.ObjectCacheFactory;
+
+/**
+ * Access to the Object cache from Tez, along with utility methods for accessing specific Keys.
+ */
+public class TezCacheAccess {
+
+  private TezCacheAccess(ObjectCache cache) {
+    this.cache = cache;
+  }
+
+  private ObjectCache cache;
+
+  public static TezCacheAccess createInstance(Configuration conf) {
+    ObjectCache cache = ObjectCacheFactory.getCache(conf);
+    return new TezCacheAccess(cache);
+  }
+
+  private static final String CACHED_INPUT_KEY = "CACHED_INPUTS";
+  
+  private final ReentrantLock cachedInputLock = new ReentrantLock();
+
+  public boolean isInputCached(String inputName) {
+    this.cachedInputLock.lock();
+    try {
+      @SuppressWarnings("unchecked")
+      Set<String> cachedInputs = (Set<String>) cache.retrieve(CACHED_INPUT_KEY);
+      if (cachedInputs == null) {
+        return false;
+      } else {
+        return cachedInputs.contains(inputName);
+      }
+    } finally {
+      this.cachedInputLock.unlock();
+    }
+  }
+
+  public void registerCachedInput(String inputName) {
+    this.cachedInputLock.lock();
+    try {
+      @SuppressWarnings("unchecked")
+      Set<String> cachedInputs = (Set<String>) cache.retrieve(CACHED_INPUT_KEY);
+      if (cachedInputs == null) {
+        cachedInputs = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
+        cache.cache(CACHED_INPUT_KEY, cachedInputs);
+      }
+      cachedInputs.add(inputName);
+    } finally {
+      this.cachedInputLock.unlock();
+    }
+  }
+
+}

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezContext.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezContext.java?rev=1579403&r1=1579402&r2=1579403&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezContext.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezContext.java Wed Mar 19 21:14:53 2014
@@ -22,6 +22,7 @@ import java.util.Map;
 import org.apache.hadoop.hive.ql.exec.MapredContext;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.LogicalOutput;
 
 /**
  * TezContext contains additional context only available with Tez
@@ -30,6 +31,8 @@ public class TezContext extends MapredCo
 
   // all the inputs for the tez processor
   private Map<String, LogicalInput> inputs;
+  
+  private Map<String, LogicalOutput> outputs;
 
   public TezContext(boolean isMap, JobConf jobConf) {
     super(isMap, jobConf);
@@ -38,6 +41,10 @@ public class TezContext extends MapredCo
   public void setInputs(Map<String, LogicalInput> inputs) {
     this.inputs = inputs;
   }
+  
+  public void setOutputs(Map<String, LogicalOutput> outputs) {
+    this.outputs = outputs;
+  }
 
   public LogicalInput getInput(String name) {
     if (inputs == null) {
@@ -45,4 +52,11 @@ public class TezContext extends MapredCo
     }
     return inputs.get(name);
   }
+  
+  public LogicalOutput getOutput(String name) {
+    if (outputs == null) {
+      return null;
+    }
+    return outputs.get(name);
+  }
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java?rev=1579403&r1=1579402&r2=1579403&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java Wed Mar 19 21:14:53 2014
@@ -21,15 +21,16 @@ import java.text.NumberFormat;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.ql.exec.tez.TezProcessor.KVOutputCollector;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.tez.common.TezUtils;
+import org.apache.tez.mapreduce.input.MRInputLegacy;
 import org.apache.tez.mapreduce.processor.MRTaskReporter;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.LogicalIOProcessor;
@@ -43,6 +44,9 @@ import org.apache.tez.runtime.library.ap
  * Does what ExecMapper and ExecReducer does for hive in MR framework.
  */
 public class TezProcessor implements LogicalIOProcessor {
+
+  
+  
   private static final Log LOG = LogFactory.getLog(TezProcessor.class);
   private boolean isMap = false;
 
@@ -123,32 +127,34 @@ public class TezProcessor implements Log
     // in case of broadcast-join read the broadcast edge inputs
     // (possibly asynchronously)
 
-    LOG.info("Running map: " + processorContext.getUniqueIdentifier());
-    for (LogicalInput input : inputs.values()) {
-      input.start();
-    }
-    for (LogicalOutput output : outputs.values()) {
-      output.start();
-    }
-
-    Map<String, OutputCollector> outMap = new HashMap<String, OutputCollector>();
+    LOG.info("Running task: " + processorContext.getUniqueIdentifier());
 
-    for (String outputName: outputs.keySet()) {
-      LOG.info("Handling output: " + outputName);
-      KeyValueWriter kvWriter = (KeyValueWriter) outputs.get(outputName).getWriter();
-      OutputCollector collector = new KVOutputCollector(kvWriter);
-      outMap.put(outputName, collector);
-    }
-
-    if(isMap){
+    if (isMap) {
       rproc = new MapRecordProcessor();
-    }
-    else{
+      MRInputLegacy mrInput = getMRInput(inputs);
+      try {
+        mrInput.init();
+      } catch (IOException e) {
+        throw new RuntimeException("Failed while initializing MRInput", e);
+      }
+    } else {
       rproc = new ReduceRecordProcessor();
     }
 
+    TezCacheAccess cacheAccess = TezCacheAccess.createInstance(jobConf);
+    // Start the actual Inputs. After MRInput initialization.
+    for (Entry<String, LogicalInput> inputEntry : inputs.entrySet()) {
+      if (!cacheAccess.isInputCached(inputEntry.getKey())) {
+        inputEntry.getValue().start();
+      } else {
+        LOG.info("Input: " + inputEntry.getKey() + " is already cached. Skipping start");
+      }
+    }
+
+    // Outputs will be started later by the individual Processors.
+
     MRTaskReporter mrReporter = new MRTaskReporter(processorContext);
-    rproc.init(jobConf, mrReporter, inputs, outMap);
+    rproc.init(jobConf, processorContext, mrReporter, inputs, outputs);
     rproc.run();
 
     //done - output does not need to be committed as hive does not use outputcommitter
@@ -156,19 +162,39 @@ public class TezProcessor implements Log
   }
 
   /**
-   * KVOutputCollector. OutputCollector that writes using KVWriter
-   *
+   * KVOutputCollector. OutputCollector that writes using KVWriter.
+   * Must be initialized before it is used.
+   * 
    */
-  static class KVOutputCollector implements OutputCollector {
-    private final KeyValueWriter output;
+  static class TezKVOutputCollector implements OutputCollector {
+    private KeyValueWriter writer;
+    private final LogicalOutput output;
 
-    KVOutputCollector(KeyValueWriter output) {
-      this.output = output;
+    TezKVOutputCollector(LogicalOutput logicalOutput) {
+      this.output = logicalOutput;
+    }
+
+    void initialize() throws Exception {
+      this.writer = (KeyValueWriter) output.getWriter();
     }
 
     public void collect(Object key, Object value) throws IOException {
-        output.write(key, value);
+      writer.write(key, value);
     }
   }
 
+  static  MRInputLegacy getMRInput(Map<String, LogicalInput> inputs) {
+    //there should be only one MRInput
+    MRInputLegacy theMRInput = null;
+    for(LogicalInput inp : inputs.values()){
+      if(inp instanceof MRInputLegacy){
+        if(theMRInput != null){
+          throw new IllegalArgumentException("Only one MRInput is expected");
+        }
+        //a better logic would be to find the alias
+        theMRInput = (MRInputLegacy)inp;
+      }
+    }
+    return theMRInput;
+  }
 }