You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by gu...@apache.org on 2014/03/19 22:29:00 UTC
svn commit: r1579408 - in
/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez:
HashTableLoader.java MapRecordProcessor.java RecordProcessor.java
ReduceRecordProcessor.java TezCacheAccess.java TezContext.java
TezProcessor.java
Author: gunther
Date: Wed Mar 19 21:29:00 2014
New Revision: 1579408
URL: http://svn.apache.org/r1579408
Log:
HIVE-6613: Control when specific Inputs / Outputs are started (Siddharth Seth via Gunther Hagleitner)
Added:
hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezCacheAccess.java
Modified:
hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezContext.java
hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
Modified: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java?rev=1579408&r1=1579407&r2=1579408&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java Wed Mar 19 21:29:00 2014
@@ -36,8 +36,8 @@ import org.apache.hadoop.hive.ql.exec.pe
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
import org.apache.hadoop.hive.serde2.ByteStream.Output;
-import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Writable;
import org.apache.tez.runtime.api.LogicalInput;
@@ -75,6 +75,7 @@ public class HashTableLoader implements
HiveConf.ConfVars.HIVEHASHTABLELOADFACTOR);
boolean useLazyRows = HiveConf.getBoolVar(hconf, HiveConf.ConfVars.HIVEMAPJOINLAZYHASHTABLE);
+ TezCacheAccess tezCacheAccess = TezCacheAccess.createInstance(hconf);
// We only check if we can use optimized keys here; that is ok because we don't
// create optimized keys in MapJoin if hash map doesn't have optimized keys.
if (!HiveConf.getBoolVar(hconf, HiveConf.ConfVars.HIVEMAPJOINUSEOPTIMIZEDKEYS)) {
@@ -86,7 +87,8 @@ public class HashTableLoader implements
continue;
}
- LogicalInput input = tezContext.getInput(parentToInput.get(pos));
+ String inputName = parentToInput.get(pos);
+ LogicalInput input = tezContext.getInput(inputName);
try {
KeyValueReader kvReader = (KeyValueReader) input.getReader();
@@ -119,6 +121,9 @@ public class HashTableLoader implements
} catch (Exception e) {
throw new HiveException(e);
}
+ // Register that the Input has been cached.
+ tezCacheAccess.registerCachedInput(inputName);
+ LOG.info("Setting Input: " + inputName + " as cached");
}
if (lastKey == null) {
lastKey = new MapJoinKeyObject(); // No rows in tables, the key type doesn't matter.
Modified: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java?rev=1579408&r1=1579407&r2=1579408&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java Wed Mar 19 21:29:00 2014
@@ -35,24 +35,26 @@ import org.apache.hadoop.hive.ql.exec.Op
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.reportStats;
import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
+import org.apache.hadoop.hive.ql.exec.tez.TezProcessor.TezKVOutputCollector;
import org.apache.hadoop.hive.ql.exec.vector.VectorMapOperator;
import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.util.StringUtils;
import org.apache.tez.mapreduce.input.MRInputLegacy;
import org.apache.tez.mapreduce.processor.MRTaskReporter;
import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.TezProcessorContext;
import org.apache.tez.runtime.library.api.KeyValueReader;
/**
* Process input from tez LogicalInput and write output - for a map plan
* Just pump the records through the query plan.
*/
-public class MapRecordProcessor extends RecordProcessor{
+public class MapRecordProcessor extends RecordProcessor {
private MapOperator mapOp;
@@ -63,18 +65,13 @@ public class MapRecordProcessor extends
private MapWork mapWork;
@Override
- void init(JobConf jconf, MRTaskReporter mrReporter, Map<String, LogicalInput> inputs,
- Map<String, OutputCollector> outMap){
+ void init(JobConf jconf, TezProcessorContext processorContext, MRTaskReporter mrReporter,
+ Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws Exception {
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS);
- super.init(jconf, mrReporter, inputs, outMap);
+ super.init(jconf, processorContext, mrReporter, inputs, outputs);
//Update JobConf using MRInput, info like filename comes via this
- MRInputLegacy mrInput = getMRInput(inputs);
- try {
- mrInput.init();
- } catch (IOException e) {
- throw new RuntimeException("Failed while initializing MRInput", e);
- }
+ MRInputLegacy mrInput = TezProcessor.getMRInput(inputs);
Configuration updatedConf = mrInput.getConfigUpdates();
if (updatedConf != null) {
for (Entry<String, String> entry : updatedConf) {
@@ -82,6 +79,14 @@ public class MapRecordProcessor extends
}
}
+ createOutputMap();
+ // Start all the Outputs.
+ for (Entry<String, LogicalOutput> outputEntry : outputs.entrySet()) {
+ l4j.info("Starting Output: " + outputEntry.getKey());
+ ((TezKVOutputCollector) outMap.get(outputEntry.getKey())).initialize();
+ outputEntry.getValue().start();
+ }
+
ObjectCache cache = ObjectCacheFactory.getCache(jconf);
try {
@@ -143,25 +148,10 @@ public class MapRecordProcessor extends
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS);
}
- private MRInputLegacy getMRInput(Map<String, LogicalInput> inputs) {
- //there should be only one MRInput
- MRInputLegacy theMRInput = null;
- for(LogicalInput inp : inputs.values()){
- if(inp instanceof MRInputLegacy){
- if(theMRInput != null){
- throw new IllegalArgumentException("Only one MRInput is expected");
- }
- //a better logic would be to find the alias
- theMRInput = (MRInputLegacy)inp;
- }
- }
- return theMRInput;
- }
-
@Override
void run() throws IOException{
- MRInputLegacy in = getMRInput(inputs);
+ MRInputLegacy in = TezProcessor.getMRInput(inputs);
KeyValueReader reader = in.getReader();
//process records until done
Modified: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java?rev=1579408&r1=1579407&r2=1579408&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java Wed Mar 19 21:29:00 2014
@@ -16,20 +16,26 @@
* limitations under the License.
*/
package org.apache.hadoop.hive.ql.exec.tez;
-import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.net.URLClassLoader;
import java.util.Arrays;
import java.util.Map;
+import java.util.Map.Entry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.tez.TezProcessor.TezKVOutputCollector;
import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.tez.mapreduce.processor.MRTaskReporter;
import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.TezProcessorContext;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
/**
* Process input from tez LogicalInput and write output
@@ -39,7 +45,9 @@ public abstract class RecordProcessor {
protected JobConf jconf;
protected Map<String, LogicalInput> inputs;
+ protected Map<String, LogicalOutput> outputs;
protected Map<String, OutputCollector> outMap;
+ protected TezProcessorContext processorContext;
public static final Log l4j = LogFactory.getLog(RecordProcessor.class);
@@ -54,20 +62,22 @@ public abstract class RecordProcessor {
protected PerfLogger perfLogger = PerfLogger.getPerfLogger();
protected String CLASS_NAME = RecordProcessor.class.getName();
-
/**
* Common initialization code for RecordProcessors
* @param jconf
+ * @param processorContext the {@link TezProcessorContext}
* @param mrReporter
- * @param inputs
- * @param out
+ * @param inputs map of Input names to {@link LogicalInput}s
+ * @param outputs map of Output names to {@link LogicalOutput}s
+ * @throws Exception
*/
- void init(JobConf jconf, MRTaskReporter mrReporter, Map<String, LogicalInput> inputs,
- Map<String, OutputCollector> outMap){
+ void init(JobConf jconf, TezProcessorContext processorContext, MRTaskReporter mrReporter,
+ Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws Exception {
this.jconf = jconf;
this.reporter = mrReporter;
this.inputs = inputs;
- this.outMap = outMap;
+ this.outputs = outputs;
+ this.processorContext = processorContext;
// Allocate the bean at the beginning -
memoryMXBean = ManagementFactory.getMemoryMXBean();
@@ -92,9 +102,9 @@ public abstract class RecordProcessor {
/**
* start processing the inputs and writing output
- * @throws IOException
+ * @throws Exception
*/
- abstract void run() throws IOException;
+ abstract void run() throws Exception;
abstract void close();
@@ -132,4 +142,12 @@ public abstract class RecordProcessor {
return 10 * cntr;
}
+  /**
+   * Builds {@link #outMap} by wrapping every {@link LogicalOutput} in {@link #outputs} with a
+   * {@link TezKVOutputCollector}, keyed by the same Output name.
+   *
+   * <p>The collectors are only constructed here; each one must still have
+   * {@code initialize()} called on it before anything is collected through it.
+   *
+   * @throws IllegalStateException if the output map has already been created
+   */
+  protected void createOutputMap() {
+    Preconditions.checkState(outMap == null, "Outputs should only be setup once");
+    outMap = Maps.newHashMap();
+    for (Entry<String, LogicalOutput> named : outputs.entrySet()) {
+      outMap.put(named.getKey(), new TezKVOutputCollector(named.getValue()));
+    }
+  }
}
Modified: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java?rev=1579408&r1=1579407&r2=1579408&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java Wed Mar 19 21:29:00 2014
@@ -23,6 +23,7 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -35,6 +36,7 @@ import org.apache.hadoop.hive.ql.exec.Op
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.reportStats;
import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
+import org.apache.hadoop.hive.ql.exec.tez.TezProcessor.TezKVOutputCollector;
import org.apache.hadoop.hive.ql.exec.tez.tools.InputMerger;
import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -53,9 +55,14 @@ import org.apache.hadoop.mapred.OutputCo
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.tez.mapreduce.processor.MRTaskReporter;
+import org.apache.tez.runtime.api.Input;
import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.TezProcessorContext;
import org.apache.tez.runtime.library.api.KeyValuesReader;
+import com.google.common.collect.Lists;
+
/**
* Process input from tez LogicalInput and write output - for a map plan
* Just pump the records through the query plan.
@@ -88,10 +95,10 @@ public class ReduceRecordProcessor exte
List<Object> row = new ArrayList<Object>(Utilities.reduceFieldNameList.size());
@Override
- void init(JobConf jconf, MRTaskReporter mrReporter, Map<String, LogicalInput> inputs,
- Map<String, OutputCollector> outMap){
+ void init(JobConf jconf, TezProcessorContext processorContext, MRTaskReporter mrReporter,
+ Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws Exception {
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS);
- super.init(jconf, mrReporter, inputs, outMap);
+ super.init(jconf, processorContext, mrReporter, inputs, outputs);
ObjectCache cache = ObjectCacheFactory.getCache(jconf);
@@ -163,6 +170,7 @@ public class ReduceRecordProcessor exte
if (dummyOps != null) {
children.addAll(dummyOps);
}
+ createOutputMap();
OperatorUtils.setChildrenCollector(children, outMap);
reducer.setReporter(reporter);
@@ -182,10 +190,20 @@ public class ReduceRecordProcessor exte
}
@Override
- void run() throws IOException{
+ void run() throws Exception {
List<LogicalInput> shuffleInputs = getShuffleInputs(inputs);
- KeyValuesReader kvsReader;
+ if (shuffleInputs != null) {
+ l4j.info("Waiting for ShuffleInputs to become ready");
+ processorContext.waitForAllInputsReady(new ArrayList<Input>(shuffleInputs));
+ }
+ for (Entry<String, LogicalOutput> outputEntry : outputs.entrySet()) {
+ l4j.info("Starting Output: " + outputEntry.getKey());
+ ((TezKVOutputCollector) outMap.get(outputEntry.getKey())).initialize();
+ outputEntry.getValue().start();
+ }
+
+ KeyValuesReader kvsReader;
try {
if(shuffleInputs.size() == 1){
//no merging of inputs required
Added: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezCacheAccess.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezCacheAccess.java?rev=1579408&view=auto
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezCacheAccess.java (added)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezCacheAccess.java Wed Mar 19 21:29:00 2014
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.ObjectCache;
+import org.apache.hadoop.hive.ql.exec.ObjectCacheFactory;
+
+/**
+ * Access to the Object cache from Tez, along with utility methods for accessing specific Keys.
+ */
+public class TezCacheAccess {
+
+  /** Object-cache key under which the set of cached Input names is stored. */
+  private static final String CACHED_INPUT_KEY = "CACHED_INPUTS";
+
+  /**
+   * Guards the read-modify-write of the cached-input set held in the object cache.
+   *
+   * <p>This must be static: callers obtain a brand-new TezCacheAccess from
+   * {@link #createInstance(Configuration)} on every use, so a per-instance lock would never be
+   * shared between two callers and would not make the check-then-create in
+   * {@link #registerCachedInput(String)} atomic. Note it only serializes callers in this JVM.
+   */
+  private static final ReentrantLock CACHED_INPUT_LOCK = new ReentrantLock();
+
+  private final ObjectCache cache;
+
+  private TezCacheAccess(ObjectCache cache) {
+    this.cache = cache;
+  }
+
+  /**
+   * Creates an accessor backed by the object cache that
+   * {@link ObjectCacheFactory#getCache(Configuration)} returns for {@code conf}.
+   *
+   * @param conf configuration used to look up the object cache
+   * @return a new accessor
+   */
+  public static TezCacheAccess createInstance(Configuration conf) {
+    ObjectCache cache = ObjectCacheFactory.getCache(conf);
+    return new TezCacheAccess(cache);
+  }
+
+  /**
+   * Reports whether an Input was previously registered through
+   * {@link #registerCachedInput(String)}.
+   *
+   * @param inputName the name of the Input
+   * @return true if {@code inputName} is in the cached-input set; false if it is absent or no
+   *     set has been stored in the object cache yet
+   */
+  public boolean isInputCached(String inputName) {
+    CACHED_INPUT_LOCK.lock();
+    try {
+      @SuppressWarnings("unchecked")
+      Set<String> cachedInputs = (Set<String>) cache.retrieve(CACHED_INPUT_KEY);
+      return cachedInputs != null && cachedInputs.contains(inputName);
+    } finally {
+      CACHED_INPUT_LOCK.unlock();
+    }
+  }
+
+  /**
+   * Records in the object cache that the named Input has been cached. The backing set is
+   * created and stored on first use.
+   *
+   * @param inputName the name of the Input
+   */
+  public void registerCachedInput(String inputName) {
+    CACHED_INPUT_LOCK.lock();
+    try {
+      @SuppressWarnings("unchecked")
+      Set<String> cachedInputs = (Set<String>) cache.retrieve(CACHED_INPUT_KEY);
+      if (cachedInputs == null) {
+        cachedInputs = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
+        cache.cache(CACHED_INPUT_KEY, cachedInputs);
+      }
+      cachedInputs.add(inputName);
+    } finally {
+      CACHED_INPUT_LOCK.unlock();
+    }
+  }
+
+}
Modified: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezContext.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezContext.java?rev=1579408&r1=1579407&r2=1579408&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezContext.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezContext.java Wed Mar 19 21:29:00 2014
@@ -22,6 +22,7 @@ import java.util.Map;
import org.apache.hadoop.hive.ql.exec.MapredContext;
import org.apache.hadoop.mapred.JobConf;
import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.LogicalOutput;
/**
* TezContext contains additional context only available with Tez
@@ -30,6 +31,8 @@ public class TezContext extends MapredCo
// all the inputs for the tez processor
private Map<String, LogicalInput> inputs;
+
+ private Map<String, LogicalOutput> outputs;
public TezContext(boolean isMap, JobConf jobConf) {
super(isMap, jobConf);
@@ -38,6 +41,10 @@ public class TezContext extends MapredCo
public void setInputs(Map<String, LogicalInput> inputs) {
this.inputs = inputs;
}
+
+ /**
+ * Stores the processor's outputs so they can later be looked up by name via getOutput.
+ *
+ * @param outputs map of Output names to {@link LogicalOutput}s; kept by reference, not copied
+ */
+ public void setOutputs(Map<String, LogicalOutput> outputs) {
+ this.outputs = outputs;
+ }
public LogicalInput getInput(String name) {
if (inputs == null) {
@@ -45,4 +52,11 @@ public class TezContext extends MapredCo
}
return inputs.get(name);
}
+
+  /**
+   * Looks up a Tez output by name.
+   *
+   * @param name the Output name
+   * @return the matching {@link LogicalOutput}; null when no outputs have been set, or when the
+   *     output map holds nothing under {@code name}
+   */
+  public LogicalOutput getOutput(String name) {
+    return (outputs == null) ? null : outputs.get(name);
+  }
}
Modified: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java?rev=1579408&r1=1579407&r2=1579408&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java Wed Mar 19 21:29:00 2014
@@ -21,15 +21,16 @@ import java.text.NumberFormat;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.ql.exec.tez.TezProcessor.KVOutputCollector;
import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.tez.common.TezUtils;
+import org.apache.tez.mapreduce.input.MRInputLegacy;
import org.apache.tez.mapreduce.processor.MRTaskReporter;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.LogicalIOProcessor;
@@ -43,6 +44,9 @@ import org.apache.tez.runtime.library.ap
* Does what ExecMapper and ExecReducer does for hive in MR framework.
*/
public class TezProcessor implements LogicalIOProcessor {
+
+
+
private static final Log LOG = LogFactory.getLog(TezProcessor.class);
private boolean isMap = false;
@@ -123,32 +127,34 @@ public class TezProcessor implements Log
// in case of broadcast-join read the broadcast edge inputs
// (possibly asynchronously)
- LOG.info("Running map: " + processorContext.getUniqueIdentifier());
- for (LogicalInput input : inputs.values()) {
- input.start();
- }
- for (LogicalOutput output : outputs.values()) {
- output.start();
- }
-
- Map<String, OutputCollector> outMap = new HashMap<String, OutputCollector>();
+ LOG.info("Running task: " + processorContext.getUniqueIdentifier());
- for (String outputName: outputs.keySet()) {
- LOG.info("Handling output: " + outputName);
- KeyValueWriter kvWriter = (KeyValueWriter) outputs.get(outputName).getWriter();
- OutputCollector collector = new KVOutputCollector(kvWriter);
- outMap.put(outputName, collector);
- }
-
- if(isMap){
+ if (isMap) {
rproc = new MapRecordProcessor();
- }
- else{
+ MRInputLegacy mrInput = getMRInput(inputs);
+ try {
+ mrInput.init();
+ } catch (IOException e) {
+ throw new RuntimeException("Failed while initializing MRInput", e);
+ }
+ } else {
rproc = new ReduceRecordProcessor();
}
+ TezCacheAccess cacheAccess = TezCacheAccess.createInstance(jobConf);
+ // Start the actual Inputs. After MRInput initialization.
+ for (Entry<String, LogicalInput> inputEntry : inputs.entrySet()) {
+ if (!cacheAccess.isInputCached(inputEntry.getKey())) {
+ inputEntry.getValue().start();
+ } else {
+ LOG.info("Input: " + inputEntry.getKey() + " is already cached. Skipping start");
+ }
+ }
+
+ // Outputs will be started later by the individual Processors.
+
MRTaskReporter mrReporter = new MRTaskReporter(processorContext);
- rproc.init(jobConf, mrReporter, inputs, outMap);
+ rproc.init(jobConf, processorContext, mrReporter, inputs, outputs);
rproc.run();
//done - output does not need to be committed as hive does not use outputcommitter
@@ -156,19 +162,39 @@ public class TezProcessor implements Log
}
/**
- * KVOutputCollector. OutputCollector that writes using KVWriter
- *
+ * KVOutputCollector. OutputCollector that writes using KVWriter.
+ * Must be initialized before it is used.
+ *
*/
- static class KVOutputCollector implements OutputCollector {
- private final KeyValueWriter output;
+ static class TezKVOutputCollector implements OutputCollector {
+ private KeyValueWriter writer;
+ private final LogicalOutput output;
- KVOutputCollector(KeyValueWriter output) {
- this.output = output;
+ TezKVOutputCollector(LogicalOutput logicalOutput) {
+ this.output = logicalOutput;
+ }
+
+ void initialize() throws Exception {
+ this.writer = (KeyValueWriter) output.getWriter();
}
public void collect(Object key, Object value) throws IOException {
- output.write(key, value);
+ writer.write(key, value);
}
}
+  /**
+   * Finds the single {@link MRInputLegacy} among the task's Inputs.
+   *
+   * @param inputs map of Input names to {@link LogicalInput}s
+   * @return the only MRInputLegacy in {@code inputs}, or null if none is present (callers
+   *     dereference the result, so a missing MRInput surfaces there)
+   * @throws IllegalArgumentException if more than one MRInputLegacy is present
+   */
+  static MRInputLegacy getMRInput(Map<String, LogicalInput> inputs) {
+    MRInputLegacy found = null;
+    for (LogicalInput candidate : inputs.values()) {
+      if (!(candidate instanceof MRInputLegacy)) {
+        continue;
+      }
+      if (found != null) {
+        throw new IllegalArgumentException("Only one MRInput is expected");
+      }
+      // A better approach would be to locate the MRInput by its alias.
+      found = (MRInputLegacy) candidate;
+    }
+    return found;
+  }
}