Posted to commits@hive.apache.org by gu...@apache.org on 2014/09/24 09:03:38 UTC
svn commit: r1627235 [2/9] - in /hive/trunk: itests/src/test/resources/
itests/util/src/main/java/org/apache/hadoop/hive/ql/
metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/
ql/if/ ql/src/gen/thrift/gen-cpp/ ql/src/gen/thrift...
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java?rev=1627235&r1=1627234&r2=1627235&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java Wed Sep 24 07:03:35 2014
@@ -20,6 +20,23 @@ package org.apache.hadoop.hive.ql.exec.t
import com.google.common.base.Function;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
+import com.google.protobuf.ByteString;
+
+import javax.security.auth.login.LoginException;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
@@ -32,6 +49,7 @@ import org.apache.hadoop.hive.conf.HiveC
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
import org.apache.hadoop.hive.ql.exec.mr.ExecReducer;
@@ -47,10 +65,12 @@ import org.apache.hadoop.hive.ql.io.merg
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.MergeJoinWork;
import org.apache.hadoop.hive.ql.plan.ReduceWork;
import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;
import org.apache.hadoop.hive.ql.plan.TezWork;
+import org.apache.hadoop.hive.ql.plan.TezWork.VertexType;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.stats.StatsFactory;
import org.apache.hadoop.hive.ql.stats.StatsPublisher;
@@ -90,12 +110,16 @@ import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.VertexGroup;
import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
+import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
import org.apache.tez.mapreduce.hadoop.MRHelpers;
import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+import org.apache.tez.mapreduce.input.MRInput;
import org.apache.tez.mapreduce.input.MRInputLegacy;
+import org.apache.tez.mapreduce.input.MultiMRInput;
import org.apache.tez.mapreduce.output.MROutput;
import org.apache.tez.mapreduce.partition.MRPartitioner;
+import org.apache.tez.mapreduce.protos.MRRuntimeProtos;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.common.comparator.TezBytesComparator;
import org.apache.tez.runtime.library.common.serializer.TezBytesWritableSerialization;
@@ -104,21 +128,6 @@ import org.apache.tez.runtime.library.co
import org.apache.tez.runtime.library.conf.UnorderedPartitionedKVEdgeConfig;
import org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValueInput;
-import javax.security.auth.login.LoginException;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
/**
* DagUtils. DagUtils is a collection of helper methods to convert
* map and reduce work to tez vertices and edges. It handles configuration
@@ -130,6 +139,11 @@ public class DagUtils {
private static final Log LOG = LogFactory.getLog(DagUtils.class.getName());
private static final String TEZ_DIR = "_tez_scratch_dir";
private static DagUtils instance;
+ // The merge file being currently processed.
+ public static final String TEZ_MERGE_CURRENT_MERGE_FILE_PREFIX =
+ "hive.tez.current.merge.file.prefix";
+ // "A comma separated list of work names used as prefix.
+ public static final String TEZ_MERGE_WORK_FILE_PREFIXES = "hive.tez.merge.file.prefixes";
private void addCredentials(MapWork mapWork, DAG dag) {
Set<String> paths = mapWork.getPathToAliases().keySet();
@@ -238,8 +252,8 @@ public class DagUtils {
* endpoints.
*/
@SuppressWarnings("rawtypes")
- public GroupInputEdge createEdge(VertexGroup group, JobConf vConf,
- Vertex w, TezEdgeProperty edgeProp)
+ public GroupInputEdge createEdge(VertexGroup group, JobConf vConf, Vertex w,
+ TezEdgeProperty edgeProp, VertexType vertexType)
throws IOException {
Class mergeInputClass;
@@ -254,10 +268,14 @@ public class DagUtils {
case CUSTOM_EDGE: {
mergeInputClass = ConcatenatedMergedKeyValueInput.class;
int numBuckets = edgeProp.getNumBuckets();
+ CustomVertexConfiguration vertexConf =
+ new CustomVertexConfiguration(numBuckets, vertexType, "");
+ DataOutputBuffer dob = new DataOutputBuffer();
+ vertexConf.write(dob);
VertexManagerPluginDescriptor desc =
VertexManagerPluginDescriptor.create(CustomPartitionVertex.class.getName());
- ByteBuffer userPayload = ByteBuffer.allocate(4).putInt(numBuckets);
- userPayload.flip();
+ byte[] userPayloadBytes = dob.getData();
+ ByteBuffer userPayload = ByteBuffer.wrap(userPayloadBytes);
desc.setUserPayload(UserPayload.create(userPayload));
w.setVertexManagerPlugin(desc);
break;
@@ -289,17 +307,21 @@ public class DagUtils {
* @param w The second vertex (sink)
* @return
*/
- public Edge createEdge(JobConf vConf, Vertex v, Vertex w,
- TezEdgeProperty edgeProp)
+ public Edge createEdge(JobConf vConf, Vertex v, Vertex w, TezEdgeProperty edgeProp,
+ VertexType vertexType)
throws IOException {
switch(edgeProp.getEdgeType()) {
case CUSTOM_EDGE: {
int numBuckets = edgeProp.getNumBuckets();
- ByteBuffer userPayload = ByteBuffer.allocate(4).putInt(numBuckets);
- userPayload.flip();
+ CustomVertexConfiguration vertexConf =
+ new CustomVertexConfiguration(numBuckets, vertexType, "");
+ DataOutputBuffer dob = new DataOutputBuffer();
+ vertexConf.write(dob);
VertexManagerPluginDescriptor desc = VertexManagerPluginDescriptor.create(
CustomPartitionVertex.class.getName());
+ byte[] userPayloadBytes = dob.getData();
+ ByteBuffer userPayload = ByteBuffer.wrap(userPayloadBytes);
desc.setUserPayload(UserPayload.create(userPayload));
w.setVertexManagerPlugin(desc);
break;
@@ -443,12 +465,61 @@ public class DagUtils {
return MRHelpers.getJavaOptsForMRMapper(conf);
}
+ private Vertex createVertex(JobConf conf, MergeJoinWork mergeJoinWork, LocalResource appJarLr,
+ List<LocalResource> additionalLr, FileSystem fs, Path mrScratchDir, Context ctx,
+ VertexType vertexType)
+ throws Exception {
+ Utilities.setMergeWork(conf, mergeJoinWork, mrScratchDir, false);
+ if (mergeJoinWork.getMainWork() instanceof MapWork) {
+ List<BaseWork> mapWorkList = mergeJoinWork.getBaseWorkList();
+ MapWork mapWork = (MapWork) (mergeJoinWork.getMainWork());
+ CommonMergeJoinOperator mergeJoinOp = mergeJoinWork.getMergeJoinOperator();
+ Vertex mergeVx =
+ createVertex(conf, mapWork, appJarLr, additionalLr, fs, mrScratchDir, ctx, vertexType);
+
+ // grouping happens in execution phase. Setting the class to TezGroupedSplitsInputFormat
+ // here would cause pre-mature grouping which would be incorrect.
+ Class inputFormatClass = HiveInputFormat.class;
+ conf.setClass("mapred.input.format.class", HiveInputFormat.class, InputFormat.class);
+ // mapreduce.tez.input.initializer.serialize.event.payload should be set
+ // to false when using this plug-in to avoid getting a serialized event at run-time.
+ conf.setBoolean("mapreduce.tez.input.initializer.serialize.event.payload", false);
+ for (int i = 0; i < mapWorkList.size(); i++) {
+
+ mapWork = (MapWork) (mapWorkList.get(i));
+ conf.set(TEZ_MERGE_CURRENT_MERGE_FILE_PREFIX, mapWork.getName());
+ conf.set(Utilities.INPUT_NAME, mapWork.getName());
+ LOG.info("Going through each work and adding MultiMRInput");
+ mergeVx.addDataSource(mapWork.getName(),
+ MultiMRInput.createConfigBuilder(conf, HiveInputFormat.class).build());
+ }
+
+ VertexManagerPluginDescriptor desc =
+ VertexManagerPluginDescriptor.create(CustomPartitionVertex.class.getName());
+ CustomVertexConfiguration vertexConf =
+ new CustomVertexConfiguration(mergeJoinWork.getMergeJoinOperator().getConf()
+ .getNumBuckets(), vertexType, mergeJoinWork.getBigTableAlias());
+ DataOutputBuffer dob = new DataOutputBuffer();
+ vertexConf.write(dob);
+ byte[] userPayload = dob.getData();
+ desc.setUserPayload(UserPayload.create(ByteBuffer.wrap(userPayload)));
+ mergeVx.setVertexManagerPlugin(desc);
+ return mergeVx;
+ } else {
+ Vertex mergeVx =
+ createVertex(conf, (ReduceWork) mergeJoinWork.getMainWork(), appJarLr, additionalLr, fs,
+ mrScratchDir, ctx);
+ return mergeVx;
+ }
+ }
+
/*
* Helper function to create Vertex from MapWork.
*/
private Vertex createVertex(JobConf conf, MapWork mapWork,
LocalResource appJarLr, List<LocalResource> additionalLr, FileSystem fs,
- Path mrScratchDir, Context ctx, TezWork tezWork) throws Exception {
+ Path mrScratchDir, Context ctx, VertexType vertexType)
+ throws Exception {
Path tezDir = getTezDir(mrScratchDir);
@@ -470,15 +541,8 @@ public class DagUtils {
Class inputFormatClass = conf.getClass("mapred.input.format.class",
InputFormat.class);
- boolean vertexHasCustomInput = false;
- if (tezWork != null) {
- for (BaseWork baseWork : tezWork.getParents(mapWork)) {
- if (tezWork.getEdgeType(baseWork, mapWork) == EdgeType.CUSTOM_EDGE) {
- vertexHasCustomInput = true;
- }
- }
- }
-
+ boolean vertexHasCustomInput = VertexType.isCustomInputType(vertexType);
+ LOG.info("Vertex has custom input? " + vertexHasCustomInput);
if (vertexHasCustomInput) {
groupSplitsInInputInitializer = false;
// grouping happens in execution phase. The input payload should not enable grouping here,
@@ -513,6 +577,8 @@ public class DagUtils {
}
}
+ // remember mapping of plan to input
+ conf.set(Utilities.INPUT_NAME, mapWork.getName());
if (HiveConf.getBoolVar(conf, ConfVars.HIVE_AM_SPLIT_GENERATION)
&& !mapWork.isUseOneNullRowInputFormat()) {
@@ -593,6 +659,7 @@ public class DagUtils {
Path mrScratchDir, Context ctx) throws Exception {
// set up operator plan
+ conf.set(Utilities.INPUT_NAME, reduceWork.getName());
Utilities.setReduceWork(conf, reduceWork, mrScratchDir, false);
// create the directories FileSinkOperators need
@@ -937,12 +1004,22 @@ public class DagUtils {
return initializeVertexConf(conf, context, (MapWork)work);
} else if (work instanceof ReduceWork) {
return initializeVertexConf(conf, context, (ReduceWork)work);
+ } else if (work instanceof MergeJoinWork) {
+ return initializeVertexConf(conf, context, (MergeJoinWork) work);
} else {
assert false;
return null;
}
}
+ private JobConf initializeVertexConf(JobConf conf, Context context, MergeJoinWork work) {
+ if (work.getMainWork() instanceof MapWork) {
+ return initializeVertexConf(conf, context, (MapWork) (work.getMainWork()));
+ } else {
+ return initializeVertexConf(conf, context, (ReduceWork) (work.getMainWork()));
+ }
+ }
+
/**
* Create a vertex from a given work object.
*
@@ -958,18 +1035,21 @@ public class DagUtils {
*/
public Vertex createVertex(JobConf conf, BaseWork work,
Path scratchDir, LocalResource appJarLr,
- List<LocalResource> additionalLr,
- FileSystem fileSystem, Context ctx, boolean hasChildren, TezWork tezWork) throws Exception {
+ List<LocalResource> additionalLr, FileSystem fileSystem, Context ctx, boolean hasChildren,
+ TezWork tezWork, VertexType vertexType) throws Exception {
Vertex v = null;
// simply dispatch the call to the right method for the actual (sub-) type of
// BaseWork.
if (work instanceof MapWork) {
- v = createVertex(conf, (MapWork) work, appJarLr,
- additionalLr, fileSystem, scratchDir, ctx, tezWork);
+ v = createVertex(conf, (MapWork) work, appJarLr, additionalLr, fileSystem, scratchDir, ctx,
+ vertexType);
} else if (work instanceof ReduceWork) {
v = createVertex(conf, (ReduceWork) work, appJarLr,
additionalLr, fileSystem, scratchDir, ctx);
+ } else if (work instanceof MergeJoinWork) {
+ v = createVertex(conf, (MergeJoinWork) work, appJarLr, additionalLr, fileSystem, scratchDir,
+ ctx, vertexType);
} else {
// something is seriously wrong if this is happening
throw new HiveException(ErrorMsg.GENERIC_ERROR.getErrorCodedMsg());
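The CUSTOM_EDGE changes above replace the old 4-byte bucket-count payload with a serialized CustomVertexConfiguration (bucket count, vertex type, big-table alias). Below is a minimal sketch of the intended round trip; it assumes CustomVertexConfiguration is the Hive class used in this commit and that it follows the usual Hadoop Writable pattern (no-arg constructor plus readFields()), with the decode side living in CustomPartitionVertex.

    import java.io.IOException;
    import java.nio.ByteBuffer;

    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.DataOutputBuffer;
    import org.apache.tez.dag.api.UserPayload;

    public class CustomEdgePayloadSketch {
      // Producer side, as in createEdge() above: serialize the configuration
      // into a byte buffer and wrap it as the vertex manager's user payload.
      static UserPayload toPayload(CustomVertexConfiguration vertexConf) throws IOException {
        DataOutputBuffer dob = new DataOutputBuffer();
        vertexConf.write(dob);
        return UserPayload.create(ByteBuffer.wrap(dob.getData(), 0, dob.getLength()));
      }

      // Consumer side (assumed to be CustomPartitionVertex): read the bytes
      // back into a CustomVertexConfiguration via readFields().
      static CustomVertexConfiguration fromPayload(UserPayload payload) throws IOException {
        ByteBuffer bb = payload.getPayload().duplicate();
        byte[] bytes = new byte[bb.remaining()];
        bb.get(bytes);
        DataInputBuffer dib = new DataInputBuffer();
        dib.reset(bytes, 0, bytes.length);
        CustomVertexConfiguration vertexConf = new CustomVertexConfiguration();
        vertexConf.readFields(dib);
        return vertexConf;
      }
    }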
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java?rev=1627235&r1=1627234&r2=1627235&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java Wed Sep 24 07:03:35 2014
@@ -152,8 +152,21 @@ public class HiveSplitGenerator extends
public static Multimap<Integer, InputSplit> generateGroupedSplits(JobConf jobConf,
Configuration conf, InputSplit[] splits, float waves, int availableSlots)
throws Exception {
+ return generateGroupedSplits(jobConf, conf, splits, waves, availableSlots, null);
+ }
+
+ public static Multimap<Integer, InputSplit> generateGroupedSplits(JobConf jobConf,
+ Configuration conf, InputSplit[] splits, float waves, int availableSlots,
+ String inputName) throws Exception {
- MapWork work = Utilities.getMapWork(jobConf);
+ MapWork work = null;
+ if (inputName != null) {
+ work = (MapWork) Utilities.getMergeWork(jobConf, inputName);
+ // work can still be null if there is no merge work for this input
+ }
+ if (work == null) {
+ work = Utilities.getMapWork(jobConf);
+ }
Multimap<Integer, InputSplit> bucketSplitMultiMap =
ArrayListMultimap.<Integer, InputSplit> create();
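The new generateGroupedSplits overload takes an input name so that a merge-join input can resolve the MapWork registered for it via Utilities.getMergeWork; with a null name, or when nothing is registered under that name, it falls back to the main MapWork. A hedged sketch of a caller that forwards the per-input name stored in the JobConf (DagUtils sets Utilities.INPUT_NAME per input above); the class and method names here are illustrative only.

    import com.google.common.collect.Multimap;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.ql.exec.Utilities;
    import org.apache.hadoop.mapred.InputSplit;
    import org.apache.hadoop.mapred.JobConf;

    public class GroupedSplitsCallerSketch {
      static Multimap<Integer, InputSplit> group(JobConf jobConf, Configuration conf,
          InputSplit[] splits, float waves, int availableSlots) throws Exception {
        // Set by DagUtils for each data source; null for plain (non-merge) map work.
        String inputName = jobConf.get(Utilities.INPUT_NAME);
        return HiveSplitGenerator.generateGroupedSplits(jobConf, conf, splits, waves,
            availableSlots, inputName);
      }
    }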
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java?rev=1627235&r1=1627234&r2=1627235&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java Wed Sep 24 07:03:35 2014
@@ -17,14 +17,20 @@
*/
package org.apache.hadoop.hive.ql.exec.tez;
-import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.DummyStoreOperator;
import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
import org.apache.hadoop.hive.ql.exec.MapOperator;
import org.apache.hadoop.hive.ql.exec.MapredContext;
@@ -36,15 +42,17 @@ import org.apache.hadoop.hive.ql.exec.Ut
import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.ReportStats;
import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
import org.apache.hadoop.hive.ql.exec.tez.TezProcessor.TezKVOutputCollector;
+import org.apache.hadoop.hive.ql.exec.tez.tools.KeyValueInputMerger;
import org.apache.hadoop.hive.ql.exec.vector.VectorMapOperator;
+import org.apache.hadoop.hive.ql.io.IOContext;
import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
-import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.util.StringUtils;
import org.apache.tez.mapreduce.input.MRInputLegacy;
+import org.apache.tez.mapreduce.input.MultiMRInput;
import org.apache.tez.mapreduce.processor.MRTaskReporter;
+import org.apache.tez.runtime.api.Input;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.LogicalOutput;
import org.apache.tez.runtime.api.ProcessorContext;
@@ -58,27 +66,61 @@ public class MapRecordProcessor extends
private MapOperator mapOp;
+ private final List<MapOperator> mergeMapOpList = new ArrayList<MapOperator>();
public static final Log l4j = LogFactory.getLog(MapRecordProcessor.class);
- private final ExecMapperContext execContext = new ExecMapperContext();
+ private MapRecordSource[] sources;
+ private final Map<String, MultiMRInput> multiMRInputMap = new HashMap<String, MultiMRInput>();
+ private int position = 0;
+ private boolean foundCachedMergeWork = false;
+ MRInputLegacy legacyMRInput = null;
+ private ExecMapperContext execContext = null;
private boolean abort = false;
protected static final String MAP_PLAN_KEY = "__MAP_PLAN__";
private MapWork mapWork;
+ List<MapWork> mergeWorkList = null;
+ private static Map<Integer, DummyStoreOperator> connectOps =
+ new TreeMap<Integer, DummyStoreOperator>();
- public MapRecordProcessor(JobConf jconf) {
+ public MapRecordProcessor(JobConf jconf) throws Exception {
ObjectCache cache = ObjectCacheFactory.getCache(jconf);
+ execContext = new ExecMapperContext(jconf);
execContext.setJc(jconf);
// create map and fetch operators
mapWork = (MapWork) cache.retrieve(MAP_PLAN_KEY);
if (mapWork == null) {
mapWork = Utilities.getMapWork(jconf);
cache.cache(MAP_PLAN_KEY, mapWork);
- l4j.info("Plan: "+mapWork);
+ l4j.debug("Plan: " + mapWork);
for (String s: mapWork.getAliases()) {
- l4j.info("Alias: "+s);
+ l4j.debug("Alias: " + s);
}
} else {
Utilities.setMapWork(jconf, mapWork);
}
+
+ String prefixes = jconf.get(DagUtils.TEZ_MERGE_WORK_FILE_PREFIXES);
+ if (prefixes != null) {
+ mergeWorkList = new ArrayList<MapWork>();
+ for (String prefix : prefixes.split(",")) {
+ MapWork mergeMapWork = (MapWork) cache.retrieve(prefix);
+ if (mergeMapWork != null) {
+ l4j.info("Found merge work in cache");
+ foundCachedMergeWork = true;
+ mergeWorkList.add(mergeMapWork);
+ continue;
+ }
+ if (foundCachedMergeWork) {
+ throw new Exception(
+ "Should find all work in cache else operator pipeline will be in non-deterministic state");
+ }
+
+ if ((prefix != null) && (prefix.isEmpty() == false)) {
+ mergeMapWork = (MapWork) Utilities.getMergeWork(jconf, prefix);
+ mergeWorkList.add(mergeMapWork);
+ cache.cache(prefix, mergeMapWork);
+ }
+ }
+ }
}
@Override
@@ -88,8 +130,8 @@ public class MapRecordProcessor extends
super.init(jconf, processorContext, mrReporter, inputs, outputs);
//Update JobConf using MRInput, info like filename comes via this
- MRInputLegacy mrInput = TezProcessor.getMRInput(inputs);
- Configuration updatedConf = mrInput.getConfigUpdates();
+ legacyMRInput = getMRInput(inputs);
+ Configuration updatedConf = legacyMRInput.getConfigUpdates();
if (updatedConf != null) {
for (Entry<String, String> entry : updatedConf) {
jconf.set(entry.getKey(), entry.getValue());
@@ -99,20 +141,52 @@ public class MapRecordProcessor extends
createOutputMap();
// Start all the Outputs.
for (Entry<String, LogicalOutput> outputEntry : outputs.entrySet()) {
- l4j.info("Starting Output: " + outputEntry.getKey());
+ l4j.debug("Starting Output: " + outputEntry.getKey());
outputEntry.getValue().start();
((TezKVOutputCollector) outMap.get(outputEntry.getKey())).initialize();
}
try {
+
if (mapWork.getVectorMode()) {
mapOp = new VectorMapOperator();
} else {
mapOp = new MapOperator();
}
+ connectOps.clear();
+ if (mergeWorkList != null) {
+ MapOperator mergeMapOp = null;
+ for (MapWork mergeMapWork : mergeWorkList) {
+ processorContext.waitForAnyInputReady(Collections.singletonList((Input) (inputs
+ .get(mergeMapWork.getName()))));
+ if (mergeMapWork.getVectorMode()) {
+ mergeMapOp = new VectorMapOperator();
+ } else {
+ mergeMapOp = new MapOperator();
+ }
+
+ mergeMapOpList.add(mergeMapOp);
+ // initialize the merge operators first.
+ if (mergeMapOp != null) {
+ mergeMapOp.setConf(mergeMapWork);
+ l4j.info("Input name is " + mergeMapWork.getName());
+ jconf.set(Utilities.INPUT_NAME, mergeMapWork.getName());
+ mergeMapOp.setChildren(jconf);
+ if (foundCachedMergeWork == false) {
+ DummyStoreOperator dummyOp = getJoinParentOp(mergeMapOp);
+ connectOps.put(mergeMapWork.getTag(), dummyOp);
+ }
+ mergeMapOp.setExecContext(new ExecMapperContext(jconf));
+ mergeMapOp.initializeLocalWork(jconf);
+ }
+ }
+ }
+
// initialize map operator
mapOp.setConf(mapWork);
+ l4j.info("Main input name is " + mapWork.getName());
+ jconf.set(Utilities.INPUT_NAME, mapWork.getName());
mapOp.setChildren(jconf);
l4j.info(mapOp.dump(0));
@@ -121,12 +195,21 @@ public class MapRecordProcessor extends
((TezContext) MapredContext.get()).setTezProcessorContext(processorContext);
mapOp.setExecContext(execContext);
mapOp.initializeLocalWork(jconf);
+
+ initializeMapRecordSources();
mapOp.initialize(jconf, null);
+ if ((mergeMapOpList != null) && mergeMapOpList.isEmpty() == false) {
+ for (MapOperator mergeMapOp : mergeMapOpList) {
+ jconf.set(Utilities.INPUT_NAME, mergeMapOp.getConf().getName());
+ mergeMapOp.initialize(jconf, null);
+ }
+ }
// Initialization isn't finished until all parents of all operators
// are initialized. For broadcast joins that means initializing the
// dummy parent operators as well.
List<HashTableDummyOperator> dummyOps = mapWork.getDummyOps();
+ jconf.set(Utilities.INPUT_NAME, mapWork.getName());
if (dummyOps != null) {
for (Operator<? extends OperatorDesc> dummyOp : dummyOps){
dummyOp.setExecContext(execContext);
@@ -151,54 +234,46 @@ public class MapRecordProcessor extends
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS);
}
- @Override
- void run() throws IOException{
-
- MRInputLegacy in = TezProcessor.getMRInput(inputs);
- KeyValueReader reader = in.getReader();
+ private void initializeMapRecordSources() throws Exception {
+ int size = mergeMapOpList.size() + 1; // the +1 is for the main map operator itself
+ sources = new MapRecordSource[size];
+ KeyValueReader reader = legacyMRInput.getReader();
+ position = mapOp.getConf().getTag();
+ sources[position] = new MapRecordSource();
+ sources[position].init(jconf, mapOp, reader);
+ for (MapOperator mapOp : mergeMapOpList) {
+ int tag = mapOp.getConf().getTag();
+ sources[tag] = new MapRecordSource();
+ String inputName = mapOp.getConf().getName();
+ MultiMRInput multiMRInput = multiMRInputMap.get(inputName);
+ Collection<KeyValueReader> kvReaders = multiMRInput.getKeyValueReaders();
+ l4j.debug("There are " + kvReaders.size() + " key-value readers for input " + inputName);
+ List<KeyValueReader> kvReaderList = new ArrayList<KeyValueReader>(kvReaders);
+ reader = new KeyValueInputMerger(kvReaderList);
+ sources[tag].init(jconf, mapOp, reader);
+ }
+ ((TezContext) MapredContext.get()).setRecordSources(sources);
+ }
- //process records until done
- while(reader.next()){
- //ignore the key for maps - reader.getCurrentKey();
- Object value = reader.getCurrentValue();
- boolean needMore = processRow(value);
- if(!needMore){
- break;
+ private DummyStoreOperator getJoinParentOp(Operator<? extends OperatorDesc> mergeMapOp) {
+ for (Operator<? extends OperatorDesc> childOp : mergeMapOp.getChildOperators()) {
+ if ((childOp.getChildOperators() == null) || (childOp.getChildOperators().isEmpty())) {
+ return (DummyStoreOperator) childOp;
+ } else {
+ return getJoinParentOp(childOp);
}
}
+ return null;
}
+ @Override
+ void run() throws Exception {
- /**
- * @param value value to process
- * @return true if it is not done and can take more inputs
- */
- private boolean processRow(Object value) {
- // reset the execContext for each new row
- execContext.resetRow();
-
- try {
- if (mapOp.getDone()) {
- return false; //done
- } else {
- // Since there is no concept of a group, we don't invoke
- // startGroup/endGroup for a mapper
- mapOp.process((Writable)value);
- if (isLogInfoEnabled) {
- logProgress();
- }
- }
- } catch (Throwable e) {
- abort = true;
- if (e instanceof OutOfMemoryError) {
- // Don't create a new object if we are already out of memory
- throw (OutOfMemoryError) e;
- } else {
- l4j.fatal(StringUtils.stringifyException(e));
- throw new RuntimeException(e);
+ while (sources[position].pushRecord()) {
+ if (isLogInfoEnabled) {
+ logProgress();
}
}
- return true; //give me more
}
@Override
@@ -214,6 +289,11 @@ public class MapRecordProcessor extends
return;
}
mapOp.close(abort);
+ if (mergeMapOpList.isEmpty() == false) {
+ for (MapOperator mergeMapOp : mergeMapOpList) {
+ mergeMapOp.close(abort);
+ }
+ }
// Need to close the dummyOps as well. The operator pipeline
// is not considered "closed/done" unless all operators are
@@ -242,4 +322,27 @@ public class MapRecordProcessor extends
MapredContext.close();
}
}
+
+ public static Map<Integer, DummyStoreOperator> getConnectOps() {
+ return connectOps;
+ }
+
+ private MRInputLegacy getMRInput(Map<String, LogicalInput> inputs) throws Exception {
+ // there should be only one MRInput
+ MRInputLegacy theMRInput = null;
+ l4j.info("The input names are: " + Arrays.toString(inputs.keySet().toArray()));
+ for (Entry<String, LogicalInput> inp : inputs.entrySet()) {
+ if (inp.getValue() instanceof MRInputLegacy) {
+ if (theMRInput != null) {
+ throw new IllegalArgumentException("Only one MRInput is expected");
+ }
+ // a better logic would be to find the alias
+ theMRInput = (MRInputLegacy) inp.getValue();
+ } else if (inp.getValue() instanceof MultiMRInput) {
+ multiMRInputMap.put(inp.getKey(), (MultiMRInput) inp.getValue());
+ }
+ }
+ theMRInput.init();
+ return theMRInput;
+ }
}
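MapRecordProcessor consumes DagUtils.TEZ_MERGE_WORK_FILE_PREFIXES as a comma-separated list of merge work names and looks up each one in the object cache or via Utilities.getMergeWork. The producer side is not shown in this hunk; it is assumed to live in Utilities.setMergeWork, roughly along the lines of this sketch.

    import java.util.List;

    import org.apache.commons.lang.StringUtils;
    import org.apache.hadoop.mapred.JobConf;

    public class MergeWorkPrefixSketch {
      // Assumed producer side: register the names of the merge works (e.g.
      // "Map 2", "Map 3") as a comma separated list under the prefixes key.
      static void registerMergeWorkNames(JobConf conf, List<String> workNames) {
        conf.set(DagUtils.TEZ_MERGE_WORK_FILE_PREFIXES, StringUtils.join(workNames, ","));
      }

      // Consumer side, mirroring MapRecordProcessor above: split the list and
      // retrieve each MapWork by its prefix.
      static String[] mergeWorkNames(JobConf conf) {
        String prefixes = conf.get(DagUtils.TEZ_MERGE_WORK_FILE_PREFIXES);
        return prefixes == null ? new String[0] : prefixes.split(",");
      }
    }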
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordSource.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordSource.java?rev=1627235&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordSource.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordSource.java Wed Sep 24 07:03:35 2014
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.MapOperator;
+import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.tez.mapreduce.input.MRInput;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+
+/**
+ * Process input from tez LogicalInput and write output - for a map plan, just pump the records
+ * through the query plan.
+ */
+
+public class MapRecordSource implements RecordSource {
+
+ public static final Log LOG = LogFactory.getLog(MapRecordSource.class);
+ private ExecMapperContext execContext = null;
+ private MapOperator mapOp = null;
+ private KeyValueReader reader = null;
+ private final boolean grouped = false;
+
+ void init(JobConf jconf, MapOperator mapOp, KeyValueReader reader) throws IOException {
+ execContext = new ExecMapperContext(jconf);
+ this.mapOp = mapOp;
+ this.reader = reader;
+ }
+
+ @Override
+ public final boolean isGrouped() {
+ return grouped;
+ }
+
+ @Override
+ public boolean pushRecord() throws HiveException {
+ execContext.resetRow();
+
+ try {
+ if (reader.next()) {
+ Object value;
+ try {
+ value = reader.getCurrentValue();
+ } catch (IOException e) {
+ throw new HiveException(e);
+ }
+ return processRow(value);
+ }
+ } catch (IOException e) {
+ throw new HiveException(e);
+ }
+ return false;
+ }
+
+ private boolean processRow(Object value) {
+ try {
+ if (mapOp.getDone()) {
+ return false; // done
+ } else {
+ // Since there is no concept of a group, we don't invoke
+ // startGroup/endGroup for a mapper
+ mapOp.process((Writable) value);
+ }
+ } catch (Throwable e) {
+ if (e instanceof OutOfMemoryError) {
+ // Don't create a new object if we are already out of memory
+ throw (OutOfMemoryError) e;
+ } else {
+ LOG.fatal(StringUtils.stringifyException(e));
+ throw new RuntimeException(e);
+ }
+ }
+ return true; // give me more
+ }
+
+}
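MapRecordSource is one implementation of the RecordSource contract added further below: init() binds a MapOperator to a KeyValueReader, and each pushRecord() call pushes one record into the operator pipeline, returning false once the reader is exhausted or the operator reports it is done. A minimal sketch of a driving loop, in the spirit of the while (sources[position].pushRecord()) loop in MapRecordProcessor.run(); the class name here is illustrative.

    import org.apache.hadoop.hive.ql.metadata.HiveException;

    public class RecordSourceDriverSketch {
      // Drain a source: push records until it reports there is nothing left.
      static void drain(RecordSource source) throws HiveException {
        while (source.pushRecord()) {
          // per-record bookkeeping (progress logging, counters) would go here
        }
      }
    }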
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java?rev=1627235&r1=1627234&r2=1627235&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java Wed Sep 24 07:03:35 2014
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.exec.t
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.MapredContext;
import org.apache.hadoop.hive.ql.exec.ObjectCacheFactory;
import org.apache.hadoop.hive.ql.exec.Operator;
@@ -40,7 +41,9 @@ import org.apache.tez.runtime.api.Logica
import org.apache.tez.runtime.api.ProcessorContext;
import org.apache.tez.runtime.library.api.KeyValueReader;
+import java.io.IOException;
import java.util.Map;
+import java.util.Map.Entry;
/**
* Record processor for fast merging of files.
@@ -51,11 +54,12 @@ public class MergeFileRecordProcessor ex
.getLog(MergeFileRecordProcessor.class);
protected Operator<? extends OperatorDesc> mergeOp;
- private final ExecMapperContext execContext = new ExecMapperContext();
+ private ExecMapperContext execContext = null;
protected static final String MAP_PLAN_KEY = "__MAP_PLAN__";
private MergeFileWork mfWork;
+ MRInputLegacy mrInput = null;
private boolean abort = false;
- private Object[] row = new Object[2];
+ private final Object[] row = new Object[2];
@Override
void init(JobConf jconf, ProcessorContext processorContext,
@@ -63,16 +67,16 @@ public class MergeFileRecordProcessor ex
Map<String, LogicalOutput> outputs) throws Exception {
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS);
super.init(jconf, processorContext, mrReporter, inputs, outputs);
+ execContext = new ExecMapperContext(jconf);
//Update JobConf using MRInput, info like filename comes via this
- MRInputLegacy mrInput = TezProcessor.getMRInput(inputs);
+ mrInput = getMRInput(inputs);
Configuration updatedConf = mrInput.getConfigUpdates();
if (updatedConf != null) {
for (Map.Entry<String, String> entry : updatedConf) {
jconf.set(entry.getKey(), entry.getValue());
}
}
-
createOutputMap();
// Start all the Outputs.
for (Map.Entry<String, LogicalOutput> outputEntry : outputs.entrySet()) {
@@ -127,8 +131,7 @@ public class MergeFileRecordProcessor ex
@Override
void run() throws Exception {
- MRInputLegacy in = TezProcessor.getMRInput(inputs);
- KeyValueReader reader = in.getReader();
+ KeyValueReader reader = mrInput.getReader();
//process records until done
while (reader.next()) {
@@ -205,4 +208,23 @@ public class MergeFileRecordProcessor ex
return true; //give me more
}
+ private MRInputLegacy getMRInput(Map<String, LogicalInput> inputs) throws Exception {
+ // there should be only one MRInput
+ MRInputLegacy theMRInput = null;
+ for (Entry<String, LogicalInput> inp : inputs.entrySet()) {
+ if (inp.getValue() instanceof MRInputLegacy) {
+ if (theMRInput != null) {
+ throw new IllegalArgumentException("Only one MRInput is expected");
+ }
+ // a better logic would be to find the alias
+ theMRInput = (MRInputLegacy) inp.getValue();
+ } else {
+ throw new IOException("Expecting only one input of type MRInputLegacy. Found type: "
+ + inp.getValue().getClass().getCanonicalName());
+ }
+ }
+ theMRInput.init();
+
+ return theMRInput;
+ }
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileTezProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileTezProcessor.java?rev=1627235&r1=1627234&r2=1627235&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileTezProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileTezProcessor.java Wed Sep 24 07:03:35 2014
@@ -39,12 +39,6 @@ public class MergeFileTezProcessor exten
public void run(Map<String, LogicalInput> inputs,
Map<String, LogicalOutput> outputs) throws Exception {
rproc = new MergeFileRecordProcessor();
- MRInputLegacy mrInput = getMRInput(inputs);
- try {
- mrInput.init();
- } catch (IOException e) {
- throw new RuntimeException("Failed while initializing MRInput", e);
- }
initializeAndRunProcessor(inputs, outputs);
}
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java?rev=1627235&r1=1627234&r2=1627235&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java Wed Sep 24 07:03:35 2014
@@ -115,8 +115,7 @@ public abstract class RecordProcessor {
*/
protected void logCloseInfo() {
long used_memory = memoryMXBean.getHeapMemoryUsage().getUsed();
- l4j.info("ExecMapper: processed " + numRows + " rows: used memory = "
- + used_memory);
+ l4j.info("TezProcessor: processed " + numRows + " rows/groups: used memory = " + used_memory);
}
/**
@@ -126,8 +125,7 @@ public abstract class RecordProcessor {
numRows++;
if (numRows == nextUpdateCntr) {
long used_memory = memoryMXBean.getHeapMemoryUsage().getUsed();
- l4j.info("ExecMapper: processing " + numRows
- + " rows: used memory = " + used_memory);
+ l4j.info("TezProcessor: processing " + numRows + " rows/groups: used memory = " + used_memory);
nextUpdateCntr = getNextUpdateRecordCounter(numRows);
}
}
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordSource.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordSource.java?rev=1627235&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordSource.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordSource.java Wed Sep 24 07:03:35 2014
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+
+public interface RecordSource {
+ public boolean pushRecord() throws HiveException;
+ public boolean isGrouped();
+}
\ No newline at end of file
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java?rev=1627235&r1=1627234&r2=1627235&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java Wed Sep 24 07:03:35 2014
@@ -17,9 +17,7 @@
*/
package org.apache.hadoop.hive.ql.exec.tez;
-import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -35,31 +33,13 @@ import org.apache.hadoop.hive.ql.exec.Op
import org.apache.hadoop.hive.ql.exec.OperatorUtils;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.ReportStats;
-import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
import org.apache.hadoop.hive.ql.exec.tez.TezProcessor.TezKVOutputCollector;
-import org.apache.hadoop.hive.ql.exec.tez.tools.InputMerger;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
-import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterFactory;
import org.apache.hadoop.hive.ql.log.PerfLogger;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.ReduceWork;
import org.apache.hadoop.hive.ql.plan.TableDesc;
-import org.apache.hadoop.hive.serde2.Deserializer;
-import org.apache.hadoop.hive.serde2.SerDe;
-import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.hadoop.hive.serde2.SerDeUtils;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.util.StringUtils;
import org.apache.tez.mapreduce.processor.MRTaskReporter;
import org.apache.tez.runtime.api.Input;
import org.apache.tez.runtime.api.LogicalInput;
@@ -76,39 +56,16 @@ public class ReduceRecordProcessor exte
private static final String REDUCE_PLAN_KEY = "__REDUCE_PLAN__";
public static final Log l4j = LogFactory.getLog(ReduceRecordProcessor.class);
- private final ExecMapperContext execContext = new ExecMapperContext();
- private boolean abort = false;
- private Deserializer inputKeyDeserializer;
-
- // Input value serde needs to be an array to support different SerDe
- // for different tags
- private final SerDe[] inputValueDeserializer = new SerDe[Byte.MAX_VALUE];
- TableDesc keyTableDesc;
- TableDesc[] valueTableDesc;
+ private ReduceWork redWork;
- ObjectInspector[] rowObjectInspector;
private Operator<?> reducer;
- private boolean isTagged = false;
-
- private Object keyObject = null;
- private BytesWritable groupKey;
-
- private ReduceWork redWork;
- private boolean vectorized = false;
+ private ReduceRecordSource[] sources;
- List<Object> row = new ArrayList<Object>(Utilities.reduceFieldNameList.size());
+ private final byte position = 0;
- private DataOutputBuffer buffer;
- private VectorizedRowBatch[] batches;
- // number of columns pertaining to keys in a vectorized row batch
- private int keysColumnOffset;
- private final int BATCH_SIZE = VectorizedRowBatch.DEFAULT_SIZE;
- private StructObjectInspector keyStructInspector;
- private StructObjectInspector[] valueStructInspectors;
- /* this is only used in the error code path */
- private List<VectorExpressionWriter>[] valueStringWriters;
+ private boolean abort;
@Override
void init(JobConf jconf, ProcessorContext processorContext, MRTaskReporter mrReporter,
@@ -118,10 +75,6 @@ public class ReduceRecordProcessor exte
ObjectCache cache = ObjectCacheFactory.getCache(jconf);
- rowObjectInspector = new ObjectInspector[Byte.MAX_VALUE];
- ObjectInspector[] valueObjectInspector = new ObjectInspector[Byte.MAX_VALUE];
- ObjectInspector keyObjectInspector;
-
redWork = (ReduceWork) cache.retrieve(REDUCE_PLAN_KEY);
if (redWork == null) {
redWork = Utilities.getReduceWork(jconf);
@@ -131,95 +84,35 @@ public class ReduceRecordProcessor exte
}
reducer = redWork.getReducer();
- reducer.setParentOperators(null); // clear out any parents as reducer is the
- // root
- isTagged = redWork.getNeedsTagging();
- vectorized = redWork.getVectorMode();
+ reducer.getParentOperators().clear();
+ reducer.setParentOperators(null); // clear out any parents as reducer is the root
- try {
- keyTableDesc = redWork.getKeyDesc();
- inputKeyDeserializer = ReflectionUtils.newInstance(keyTableDesc
- .getDeserializerClass(), null);
- SerDeUtils.initializeSerDe(inputKeyDeserializer, null, keyTableDesc.getProperties(), null);
- keyObjectInspector = inputKeyDeserializer.getObjectInspector();
- reducer.setGroupKeyObjectInspector(keyObjectInspector);
- valueTableDesc = new TableDesc[redWork.getTagToValueDesc().size()];
-
- if(vectorized) {
- final int maxTags = redWork.getTagToValueDesc().size();
- keyStructInspector = (StructObjectInspector)keyObjectInspector;
- batches = new VectorizedRowBatch[maxTags];
- valueStructInspectors = new StructObjectInspector[maxTags];
- valueStringWriters = new List[maxTags];
- keysColumnOffset = keyStructInspector.getAllStructFieldRefs().size();
- buffer = new DataOutputBuffer();
- }
+ int numTags = redWork.getTagToValueDesc().size();
- for (int tag = 0; tag < redWork.getTagToValueDesc().size(); tag++) {
- // We should initialize the SerDe with the TypeInfo when available.
- valueTableDesc[tag] = redWork.getTagToValueDesc().get(tag);
- inputValueDeserializer[tag] = (SerDe) ReflectionUtils.newInstance(
- valueTableDesc[tag].getDeserializerClass(), null);
- SerDeUtils.initializeSerDe(inputValueDeserializer[tag], null,
- valueTableDesc[tag].getProperties(), null);
- valueObjectInspector[tag] = inputValueDeserializer[tag]
- .getObjectInspector();
-
- ArrayList<ObjectInspector> ois = new ArrayList<ObjectInspector>();
-
- if(vectorized) {
- /* vectorization only works with struct object inspectors */
- valueStructInspectors[tag] = (StructObjectInspector)valueObjectInspector[tag];
-
- batches[tag] = VectorizedBatchUtil.constructVectorizedRowBatch(keyStructInspector,
- valueStructInspectors[tag]);
- final int totalColumns = keysColumnOffset +
- valueStructInspectors[tag].getAllStructFieldRefs().size();
- valueStringWriters[tag] = new ArrayList<VectorExpressionWriter>(totalColumns);
- valueStringWriters[tag].addAll(Arrays
- .asList(VectorExpressionWriterFactory
- .genVectorStructExpressionWritables(keyStructInspector)));
- valueStringWriters[tag].addAll(Arrays
- .asList(VectorExpressionWriterFactory
- .genVectorStructExpressionWritables(valueStructInspectors[tag])));
-
- /*
- * The row object inspector used by ReduceWork needs to be a **standard**
- * struct object inspector, not just any struct object inspector.
- */
- ArrayList<String> colNames = new ArrayList<String>();
- List<? extends StructField> fields = keyStructInspector.getAllStructFieldRefs();
- for (StructField field: fields) {
- colNames.add(Utilities.ReduceField.KEY.toString() + "." + field.getFieldName());
- ois.add(field.getFieldObjectInspector());
- }
- fields = valueStructInspectors[tag].getAllStructFieldRefs();
- for (StructField field: fields) {
- colNames.add(Utilities.ReduceField.VALUE.toString() + "." + field.getFieldName());
- ois.add(field.getFieldObjectInspector());
- }
- rowObjectInspector[tag] = ObjectInspectorFactory
- .getStandardStructObjectInspector(colNames, ois);
- } else {
- ois.add(keyObjectInspector);
- ois.add(valueObjectInspector[tag]);
- rowObjectInspector[tag] = ObjectInspectorFactory
- .getStandardStructObjectInspector(Utilities.reduceFieldNameList, ois);
- }
+ ObjectInspector[] ois = new ObjectInspector[numTags];
+ sources = new ReduceRecordSource[numTags];
- }
- } catch (Exception e) {
- throw new RuntimeException(e);
+ for (int tag = 0; tag < redWork.getTagToValueDesc().size(); tag++) {
+ TableDesc keyTableDesc = redWork.getKeyDesc();
+ TableDesc valueTableDesc = redWork.getTagToValueDesc().get(tag);
+ KeyValuesReader reader =
+ (KeyValuesReader) inputs.get(redWork.getTagToInput().get(tag)).getReader();
+
+ sources[tag] = new ReduceRecordSource();
+ sources[tag].init(jconf, reducer, redWork.getVectorMode(), keyTableDesc, valueTableDesc,
+ reader, tag == position, (byte) tag);
+ ois[tag] = sources[tag].getObjectInspector();
}
MapredContext.init(false, new JobConf(jconf));
((TezContext) MapredContext.get()).setInputs(inputs);
((TezContext) MapredContext.get()).setTezProcessorContext(processorContext);
+ ((TezContext) MapredContext.get()).setRecordSources(sources);
// initialize reduce operator tree
try {
l4j.info(reducer.dump(0));
- reducer.initialize(jconf, rowObjectInspector);
+ reducer.initialize(jconf, ois);
// Initialization isn't finished until all parents of all operators
// are initialized. For broadcast joins that means initializing the
@@ -227,7 +120,6 @@ public class ReduceRecordProcessor exte
List<HashTableDummyOperator> dummyOps = redWork.getDummyOps();
if (dummyOps != null) {
for (Operator<? extends OperatorDesc> dummyOp : dummyOps){
- dummyOp.setExecContext(execContext);
dummyOp.initialize(jconf, null);
}
}
@@ -271,28 +163,12 @@ public class ReduceRecordProcessor exte
((TezKVOutputCollector) outMap.get(outputEntry.getKey())).initialize();
}
- KeyValuesReader kvsReader;
- try {
- if(shuffleInputs.size() == 1){
- //no merging of inputs required
- kvsReader = (KeyValuesReader) shuffleInputs.get(0).getReader();
- }else {
- //get a sort merged input
- kvsReader = new InputMerger(shuffleInputs);
- }
- } catch (Exception e) {
- throw new IOException(e);
- }
-
- while(kvsReader.next()){
- Object key = kvsReader.getCurrentKey();
- Iterable<Object> values = kvsReader.getCurrentValues();
- boolean needMore = processRows(key, values);
- if(!needMore){
- break;
+ // run the operator pipeline
+ while (sources[position].pushRecord()) {
+ if (isLogInfoEnabled) {
+ logProgress();
}
}
-
}
/**
@@ -302,212 +178,22 @@ public class ReduceRecordProcessor exte
*/
private List<LogicalInput> getShuffleInputs(Map<String, LogicalInput> inputs) {
//the reduce plan inputs have tags, add all inputs that have tags
- Map<Integer, String> tag2input = redWork.getTagToInput();
+ Map<Integer, String> tagToinput = redWork.getTagToInput();
ArrayList<LogicalInput> shuffleInputs = new ArrayList<LogicalInput>();
- for(String inpStr : tag2input.values()){
+ for(String inpStr : tagToinput.values()){
+ if (inputs.get(inpStr) == null) {
+ throw new AssertionError("Cound not find input: " + inpStr);
+ }
shuffleInputs.add(inputs.get(inpStr));
}
return shuffleInputs;
}
- /**
- * @param key
- * @param values
- * @return true if it is not done and can take more inputs
- */
- private boolean processRows(Object key, Iterable<Object> values) {
- if(reducer.getDone()){
- //done - no more records needed
- return false;
- }
-
- // reset the execContext for each new row
- execContext.resetRow();
-
- try {
- BytesWritable keyWritable = (BytesWritable) key;
- byte tag = 0;
-
- if (isTagged) {
- // remove the tag from key coming out of reducer
- // and store it in separate variable.
- int size = keyWritable.getLength() - 1;
- tag = keyWritable.getBytes()[size];
- keyWritable.setSize(size);
- }
-
- //Set the key, check if this is a new group or same group
- if (!keyWritable.equals(this.groupKey)) {
- // If a operator wants to do some work at the beginning of a group
- if (groupKey == null) { // the first group
- this.groupKey = new BytesWritable();
- } else {
- // If a operator wants to do some work at the end of a group
- if(isLogTraceEnabled) {
- l4j.trace("End Group");
- }
- reducer.endGroup();
- }
-
- try {
- this.keyObject = inputKeyDeserializer.deserialize(keyWritable);
- } catch (Exception e) {
- throw new HiveException(
- "Hive Runtime Error: Unable to deserialize reduce input key from "
- + Utilities.formatBinaryString(keyWritable.getBytes(), 0,
- keyWritable.getLength()) + " with properties "
- + keyTableDesc.getProperties(), e);
- }
- groupKey.set(keyWritable.getBytes(), 0, keyWritable.getLength());
- if (isLogTraceEnabled) {
- l4j.trace("Start Group");
- }
- reducer.setGroupKeyObject(keyObject);
- reducer.startGroup();
- }
- /* this.keyObject passed via reference */
- if(vectorized) {
- return processVectors(values, tag);
- } else {
- return processKeyValues(values, tag);
- }
- } catch (Throwable e) {
- abort = true;
- if (e instanceof OutOfMemoryError) {
- // Don't create a new object if we are already out of memory
- throw (OutOfMemoryError) e;
- } else {
- l4j.fatal(StringUtils.stringifyException(e));
- throw new RuntimeException(e);
- }
- }
- }
-
- private Object deserializeValue(BytesWritable valueWritable, byte tag) throws HiveException {
- try {
- return inputValueDeserializer[tag].deserialize(valueWritable);
- } catch (SerDeException e) {
- throw new HiveException(
- "Hive Runtime Error: Unable to deserialize reduce input value (tag="
- + tag
- + ") from "
- + Utilities.formatBinaryString(valueWritable.getBytes(), 0,
- valueWritable.getLength()) + " with properties "
- + valueTableDesc[tag].getProperties(), e);
- }
- }
-
- /**
- * @param values
- * @return true if it is not done and can take more inputs
- */
- private boolean processKeyValues(Iterable<Object> values, byte tag) throws HiveException {
-
- for (Object value : values) {
- BytesWritable valueWritable = (BytesWritable) value;
-
- row.clear();
- row.add(this.keyObject);
- row.add(deserializeValue(valueWritable, tag));
-
- try {
- reducer.processOp(row, tag);
- } catch (Exception e) {
- String rowString = null;
- try {
- rowString = SerDeUtils.getJSONString(row, rowObjectInspector[tag]);
- } catch (Exception e2) {
- rowString = "[Error getting row data with exception "
- + StringUtils.stringifyException(e2) + " ]";
- }
- throw new HiveException("Hive Runtime Error while processing row (tag="
- + tag + ") " + rowString, e);
- }
- if (isLogInfoEnabled) {
- logProgress();
- }
- }
- return true; //give me more
- }
-
- /**
- * @param values
- * @return true if it is not done and can take more inputs
- */
- private boolean processVectors(Iterable<Object> values, byte tag) throws HiveException {
- VectorizedRowBatch batch = batches[tag];
- batch.reset();
- buffer.reset();
-
- /* deserialize key into columns */
- VectorizedBatchUtil.addRowToBatchFrom(keyObject, keyStructInspector,
- 0, 0, batch, buffer);
- for(int i = 0; i < keysColumnOffset; i++) {
- VectorizedBatchUtil.setRepeatingColumn(batch, i);
- }
-
- int rowIdx = 0;
- try {
- for (Object value : values) {
- /* deserialize value into columns */
- BytesWritable valueWritable = (BytesWritable) value;
- Object valueObj = deserializeValue(valueWritable, tag);
-
- VectorizedBatchUtil.addRowToBatchFrom(valueObj, valueStructInspectors[tag],
- rowIdx, keysColumnOffset, batch, buffer);
- rowIdx++;
- if (rowIdx >= BATCH_SIZE) {
- VectorizedBatchUtil.setBatchSize(batch, rowIdx);
- reducer.processOp(batch, tag);
- rowIdx = 0;
- buffer.reset();
- if (isLogInfoEnabled) {
- logProgress();
- }
- }
- }
- if (rowIdx > 0) {
- VectorizedBatchUtil.setBatchSize(batch, rowIdx);
- reducer.processOp(batch, tag);
- buffer.reset();
- }
- if (isLogInfoEnabled) {
- logProgress();
- }
- } catch (Exception e) {
- String rowString = null;
- try {
- /* batch.toString depends on this */
- batch.setValueWriters(valueStringWriters[tag]
- .toArray(new VectorExpressionWriter[0]));
- rowString = batch.toString();
- } catch (Exception e2) {
- rowString = "[Error getting row data with exception "
- + StringUtils.stringifyException(e2) + " ]";
- }
- throw new HiveException("Hive Runtime Error while processing vector batch (tag="
- + tag + ") " + rowString, e);
- }
- return true; // give me more
- }
-
@Override
void close(){
- // check if there are IOExceptions
- if (!abort) {
- abort = execContext.getIoCxt().getIOExceptions();
- }
-
try {
- if (groupKey != null) {
- // If a operator wants to do some work at the end of a group
- if(isLogTraceEnabled) {
- l4j.trace("End Group");
- }
- reducer.endGroup();
- }
- if (isLogInfoEnabled) {
- logCloseInfo();
+ for (ReduceRecordSource rs: sources) {
+ abort = abort && rs.close();
}
reducer.close(abort);
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java?rev=1627235&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java Wed Sep 24 07:03:35 2014
@@ -0,0 +1,385 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterFactory;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.tez.runtime.library.api.KeyValuesReader;
+
+/**
+ * Process the records delivered by a Tez LogicalInput for a reduce plan and
+ * pump them through the reduce operator tree.
+ */
+@SuppressWarnings("deprecation")
+public class ReduceRecordSource implements RecordSource {
+
+ public static final Log l4j = LogFactory.getLog(ReduceRecordSource.class);
+
+ private static final String CLASS_NAME = ReduceRecordSource.class.getName();
+
+ private byte tag;
+
+ private boolean abort = false;
+
+ private Deserializer inputKeyDeserializer;
+
+ // Input value SerDe for this source's tag; each source handles
+ // exactly one tag, so a single SerDe suffices here
+ private SerDe inputValueDeserializer;
+
+ TableDesc keyTableDesc;
+ TableDesc valueTableDesc;
+
+ ObjectInspector rowObjectInspector;
+ private Operator<?> reducer;
+
+ private Object keyObject = null;
+ private BytesWritable groupKey;
+
+ private boolean vectorized = false;
+
+ List<Object> row = new ArrayList<Object>(Utilities.reduceFieldNameList.size());
+
+ private DataOutputBuffer buffer;
+ private VectorizedRowBatch batch;
+
+ // number of columns pertaining to keys in a vectorized row batch
+ private int keysColumnOffset;
+ private final int BATCH_SIZE = VectorizedRowBatch.DEFAULT_SIZE;
+
+ private StructObjectInspector keyStructInspector;
+ private StructObjectInspector valueStructInspectors;
+
+ /* this is only used in the error code path */
+ private List<VectorExpressionWriter> valueStringWriters;
+
+ private KeyValuesReader reader;
+
+ private boolean handleGroupKey;
+
+ private ObjectInspector valueObjectInspector;
+
+ private final PerfLogger perfLogger = PerfLogger.getPerfLogger();
+
+ private Iterable<Object> valueWritables;
+
+ private final boolean grouped = true;
+
+ void init(JobConf jconf, Operator<?> reducer, boolean vectorized, TableDesc keyTableDesc,
+ TableDesc valueTableDesc, KeyValuesReader reader, boolean handleGroupKey, byte tag)
+ throws Exception {
+
+ ObjectInspector keyObjectInspector;
+
+ this.reducer = reducer;
+ this.vectorized = vectorized;
+ this.keyTableDesc = keyTableDesc;
+ this.reader = reader;
+ this.handleGroupKey = handleGroupKey;
+ this.tag = tag;
+
+ try {
+ inputKeyDeserializer = ReflectionUtils.newInstance(keyTableDesc
+ .getDeserializerClass(), null);
+ SerDeUtils.initializeSerDe(inputKeyDeserializer, null, keyTableDesc.getProperties(), null);
+ keyObjectInspector = inputKeyDeserializer.getObjectInspector();
+ reducer.setGroupKeyObjectInspector(keyObjectInspector);
+
+ if(vectorized) {
+ keyStructInspector = (StructObjectInspector) keyObjectInspector;
+ keysColumnOffset = keyStructInspector.getAllStructFieldRefs().size();
+ buffer = new DataOutputBuffer();
+ }
+
+ // We should initialize the SerDe with the TypeInfo when available.
+ this.valueTableDesc = valueTableDesc;
+ inputValueDeserializer = (SerDe) ReflectionUtils.newInstance(
+ valueTableDesc.getDeserializerClass(), null);
+ SerDeUtils.initializeSerDe(inputValueDeserializer, null,
+ valueTableDesc.getProperties(), null);
+ valueObjectInspector = inputValueDeserializer.getObjectInspector();
+
+ ArrayList<ObjectInspector> ois = new ArrayList<ObjectInspector>();
+
+ if(vectorized) {
+ /* vectorization only works with struct object inspectors */
+ valueStructInspectors = (StructObjectInspector) valueObjectInspector;
+
+ batch = VectorizedBatchUtil.constructVectorizedRowBatch(keyStructInspector,
+ valueStructInspectors);
+
+ final int totalColumns = keysColumnOffset +
+ valueStructInspectors.getAllStructFieldRefs().size();
+ valueStringWriters = new ArrayList<VectorExpressionWriter>(totalColumns);
+ valueStringWriters.addAll(Arrays
+ .asList(VectorExpressionWriterFactory
+ .genVectorStructExpressionWritables(keyStructInspector)));
+ valueStringWriters.addAll(Arrays
+ .asList(VectorExpressionWriterFactory
+ .genVectorStructExpressionWritables(valueStructInspectors)));
+
+ /*
+ * The row object inspector used by ReduceWork needs to be a **standard**
+ * struct object inspector, not just any struct object inspector.
+ */
+ ArrayList<String> colNames = new ArrayList<String>();
+ List<? extends StructField> fields = keyStructInspector.getAllStructFieldRefs();
+ for (StructField field: fields) {
+ colNames.add(Utilities.ReduceField.KEY.toString() + "." + field.getFieldName());
+ ois.add(field.getFieldObjectInspector());
+ }
+ fields = valueStructInspectors.getAllStructFieldRefs();
+ for (StructField field: fields) {
+ colNames.add(Utilities.ReduceField.VALUE.toString() + "." + field.getFieldName());
+ ois.add(field.getFieldObjectInspector());
+ }
+ rowObjectInspector = ObjectInspectorFactory.getStandardStructObjectInspector(colNames, ois);
+ } else {
+ ois.add(keyObjectInspector);
+ ois.add(valueObjectInspector);
+ rowObjectInspector =
+ ObjectInspectorFactory.getStandardStructObjectInspector(Utilities.reduceFieldNameList,
+ ois);
+ }
+ } catch (Throwable e) {
+ abort = true;
+ if (e instanceof OutOfMemoryError) {
+ // Don't create a new object if we are already out of memory
+ throw (OutOfMemoryError) e;
+ } else {
+ throw new RuntimeException("Reduce operator initialization failed", e);
+ }
+ }
+ perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS);
+ }
+
+ @Override
+ public final boolean isGrouped() {
+ return grouped;
+ }
+
+ @Override
+ public boolean pushRecord() throws HiveException {
+ BytesWritable keyWritable;
+
+ try {
+ if (!reader.next()) {
+ return false;
+ } else {
+ keyWritable = (BytesWritable) reader.getCurrentKey();
+ valueWritables = reader.getCurrentValues();
+ }
+
+ //Set the key, check if this is a new group or same group
+ try {
+ keyObject = inputKeyDeserializer.deserialize(keyWritable);
+ } catch (Exception e) {
+ throw new HiveException("Hive Runtime Error: Unable to deserialize reduce input key from "
+ + Utilities.formatBinaryString(keyWritable.getBytes(), 0, keyWritable.getLength())
+ + " with properties " + keyTableDesc.getProperties(), e);
+ }
+
+ if (handleGroupKey && !keyWritable.equals(this.groupKey)) {
+ // If an operator wants to do some work at the beginning of a group
+ if (groupKey == null) { // the first group
+ this.groupKey = new BytesWritable();
+ } else {
+ // If an operator wants to do some work at the end of a group
+ reducer.endGroup();
+ }
+
+ groupKey.set(keyWritable.getBytes(), 0, keyWritable.getLength());
+ reducer.setGroupKeyObject(keyObject);
+ reducer.startGroup();
+ }
+
+ /* this.keyObject passed via reference */
+ if(vectorized) {
+ processVectors(valueWritables, tag);
+ } else {
+ processKeyValues(valueWritables, tag);
+ }
+ return true;
+ } catch (Throwable e) {
+ abort = true;
+ if (e instanceof OutOfMemoryError) {
+ // Don't create a new object if we are already out of memory
+ throw (OutOfMemoryError) e;
+ } else {
+ l4j.fatal(StringUtils.stringifyException(e));
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ private Object deserializeValue(BytesWritable valueWritable, byte tag)
+ throws HiveException {
+
+ try {
+ return inputValueDeserializer.deserialize(valueWritable);
+ } catch (SerDeException e) {
+ throw new HiveException(
+ "Hive Runtime Error: Unable to deserialize reduce input value (tag="
+ + tag
+ + ") from "
+ + Utilities.formatBinaryString(valueWritable.getBytes(), 0, valueWritable.getLength())
+ + " with properties " + valueTableDesc.getProperties(), e);
+ }
+ }
+
+ /**
+ * Forward all values of the current key to the reduce operator tree, one row at a time.
+ * @param values the values associated with the current key
+ */
+ private void processKeyValues(Iterable<Object> values, byte tag) throws HiveException {
+ List<Object> passDownKey = null;
+ for (Object value : values) {
+ BytesWritable valueWritable = (BytesWritable) value;
+
+ row.clear();
+ if (passDownKey == null) {
+ row.add(this.keyObject);
+ } else {
+ row.add(passDownKey.get(0));
+ }
+ if ((passDownKey == null) && (reducer instanceof CommonMergeJoinOperator)) {
+ passDownKey =
+ (List<Object>) ObjectInspectorUtils.copyToStandardObject(row,
+ reducer.getInputObjInspectors()[tag], ObjectInspectorCopyOption.WRITABLE);
+ row.remove(0);
+ row.add(0, passDownKey.get(0));
+ }
+
+ row.add(deserializeValue(valueWritable, tag));
+
+ try {
+ reducer.processOp(row, tag);
+ } catch (Exception e) {
+ String rowString = null;
+ try {
+ rowString = SerDeUtils.getJSONString(row, rowObjectInspector);
+ } catch (Exception e2) {
+ rowString = "[Error getting row data with exception "
+ + StringUtils.stringifyException(e2) + " ]";
+ }
+ throw new HiveException("Hive Runtime Error while processing row (tag="
+ + tag + ") " + rowString, e);
+ }
+ }
+ }
+
+ /**
+ * Forward all values of the current key to the reduce operator tree in vectorized batches.
+ * @param values the values associated with the current key
+ */
+ private void processVectors(Iterable<Object> values, byte tag) throws HiveException {
+ batch.reset();
+
+ /* deserialize key into columns */
+ VectorizedBatchUtil.addRowToBatchFrom(keyObject, keyStructInspector,
+ 0, 0, batch, buffer);
+ for(int i = 0; i < keysColumnOffset; i++) {
+ VectorizedBatchUtil.setRepeatingColumn(batch, i);
+ }
+
+ int rowIdx = 0;
+ try {
+ for (Object value : values) {
+ /* deserialize value into columns */
+ BytesWritable valueWritable = (BytesWritable) value;
+ Object valueObj = deserializeValue(valueWritable, tag);
+
+ VectorizedBatchUtil.addRowToBatchFrom(valueObj, valueStructInspectors,
+ rowIdx, keysColumnOffset, batch, buffer);
+ rowIdx++;
+ if (rowIdx >= BATCH_SIZE) {
+ VectorizedBatchUtil.setBatchSize(batch, rowIdx);
+ reducer.processOp(batch, tag);
+ rowIdx = 0;
+ }
+ }
+ if (rowIdx > 0) {
+ VectorizedBatchUtil.setBatchSize(batch, rowIdx);
+ reducer.processOp(batch, tag);
+ }
+ } catch (Exception e) {
+ String rowString = null;
+ try {
+ /* batch.toString depends on this */
+ batch.setValueWriters(valueStringWriters
+ .toArray(new VectorExpressionWriter[0]));
+ rowString = batch.toString();
+ } catch (Exception e2) {
+ rowString = "[Error getting row data with exception "
+ + StringUtils.stringifyException(e2) + " ]";
+ }
+ throw new HiveException("Hive Runtime Error while processing vector batch (tag="
+ + tag + ") " + rowString, e);
+ }
+ }
+
+ boolean close() throws Exception {
+ try {
+ if (handleGroupKey && groupKey != null) {
+ // If an operator wants to do some work at the end of a group
+ reducer.endGroup();
+ }
+ } catch (Exception e) {
+ if (!abort) {
+ // signal new failure to map-reduce
+ l4j.error("Hit error while closing operators - failing tree");
+ throw new RuntimeException("Hive Runtime Error while closing operators: "
+ + e.getMessage(), e);
+ }
+ }
+ return abort;
+ }
+
+ public ObjectInspector getObjectInspector() {
+ return rowObjectInspector;
+ }
+}
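
The class above exposes a small pull-style API: pushRecord() deserializes one key, handles the group boundary (startGroup/endGroup) when handleGroupKey is set, and forwards all of the key's values (or vectorized batches) into the operator tree; close() ends the last open group and reports whether the source aborted. The following is a minimal sketch of the driver loop a record processor might run on top of that API; the class name ReduceDriverSketch and the drive() method are illustrative only, while pushRecord()/close() and Operator.close(boolean) come from the code above.

    package org.apache.hadoop.hive.ql.exec.tez;

    import org.apache.hadoop.hive.ql.exec.Operator;

    // Sketch only: a simplified driver over a single ReduceRecordSource, assuming it
    // has already been init()'ed with the reducer, the key/value SerDes and the
    // KeyValuesReader for its tag.
    final class ReduceDriverSketch {
      void drive(ReduceRecordSource source, Operator<?> reducer) throws Exception {
        while (source.pushRecord()) {
          // each call pushes one key and all of its values (or one or more
          // vectorized batches) through the reduce operator tree
        }
        boolean abort = source.close();   // ends the last open group, reports failures
        reducer.close(abort);
      }
    }
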
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezContext.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezContext.java?rev=1627235&r1=1627234&r2=1627235&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezContext.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezContext.java Wed Sep 24 07:03:35 2014
@@ -37,6 +37,8 @@ public class TezContext extends MapredCo
private ProcessorContext processorContext;
+ private RecordSource[] sources;
+
public TezContext(boolean isMap, JobConf jobConf) {
super(isMap, jobConf);
}
@@ -70,4 +72,12 @@ public class TezContext extends MapredCo
public ProcessorContext getTezProcessorContext() {
return processorContext;
}
+
+ public RecordSource[] getRecordSources() {
+ return sources;
+ }
+
+ public void setRecordSources(RecordSource[] sources) {
+ this.sources = sources;
+ }
}
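
The record sources registered on TezContext are what lets code running inside the task (the merge-join path in particular) reach back to the inputs feeding it. A rough sketch of how a consumer might pick them up, assuming MapredContext.get() returns the TezContext installed by the record processor; the class name RecordSourceLookupSketch and the currentSources() method are illustrative only.

    package org.apache.hadoop.hive.ql.exec.tez;

    import org.apache.hadoop.hive.ql.exec.MapredContext;

    // Sketch only: fetch the RecordSources for the current task, if running on Tez.
    final class RecordSourceLookupSketch {
      static RecordSource[] currentSources() {
        MapredContext ctx = MapredContext.get();        // thread-local MR/Tez context
        if (ctx instanceof TezContext) {
          return ((TezContext) ctx).getRecordSources();
        }
        return new RecordSource[0];
      }
    }
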
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java?rev=1627235&r1=1627234&r2=1627235&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java Wed Sep 24 07:03:35 2014
@@ -17,6 +17,14 @@
*/
package org.apache.hadoop.hive.ql.exec.tez;
+import java.io.IOException;
+import java.text.NumberFormat;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -26,6 +34,7 @@ import org.apache.hadoop.mapred.OutputCo
import org.apache.hadoop.util.StringUtils;
import org.apache.tez.common.TezUtils;
import org.apache.tez.mapreduce.input.MRInputLegacy;
+import org.apache.tez.mapreduce.input.MultiMRInput;
import org.apache.tez.mapreduce.processor.MRTaskReporter;
import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
import org.apache.tez.runtime.api.Event;
@@ -34,11 +43,6 @@ import org.apache.tez.runtime.api.Logica
import org.apache.tez.runtime.api.ProcessorContext;
import org.apache.tez.runtime.library.api.KeyValueWriter;
-import java.io.IOException;
-import java.text.NumberFormat;
-import java.util.List;
-import java.util.Map;
-
/**
* Hive processor for Tez that forms the vertices in Tez and processes the data.
* Does what ExecMapper and ExecReducer does for hive in MR framework.
@@ -90,7 +94,8 @@ public class TezProcessor extends Abstra
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_INITIALIZE_PROCESSOR);
Configuration conf = TezUtils.createConfFromUserPayload(getContext().getUserPayload());
this.jobConf = new JobConf(conf);
- setupMRLegacyConfigs(getContext());
+ this.processorContext = getContext();
+ setupMRLegacyConfigs(processorContext);
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_INITIALIZE_PROCESSOR);
}
@@ -130,12 +135,6 @@ public class TezProcessor extends Abstra
if (isMap) {
rproc = new MapRecordProcessor(jobConf);
- MRInputLegacy mrInput = getMRInput(inputs);
- try {
- mrInput.init();
- } catch (IOException e) {
- throw new RuntimeException("Failed while initializing MRInput", e);
- }
} else {
rproc = new ReduceRecordProcessor();
}
@@ -148,18 +147,6 @@ public class TezProcessor extends Abstra
throws Exception {
Throwable originalThrowable = null;
try {
- TezCacheAccess cacheAccess = TezCacheAccess.createInstance(jobConf);
- // Start the actual Inputs. After MRInput initialization.
- for (Map.Entry<String, LogicalInput> inputEntry : inputs.entrySet()) {
- if (!cacheAccess.isInputCached(inputEntry.getKey())) {
- LOG.info("Input: " + inputEntry.getKey() + " is not cached");
- inputEntry.getValue().start();
- } else {
- LOG.info("Input: " + inputEntry.getKey() +
- " is already cached. Skipping start");
- }
- }
-
// Outputs will be started later by the individual Processors.
MRTaskReporter mrReporter = new MRTaskReporter(getContext());
@@ -214,19 +201,4 @@ public class TezProcessor extends Abstra
writer.write(key, value);
}
}
-
- static MRInputLegacy getMRInput(Map<String, LogicalInput> inputs) {
- //there should be only one MRInput
- MRInputLegacy theMRInput = null;
- for(LogicalInput inp : inputs.values()){
- if(inp instanceof MRInputLegacy){
- if(theMRInput != null){
- throw new IllegalArgumentException("Only one MRInput is expected");
- }
- //a better logic would be to find the alias
- theMRInput = (MRInputLegacy)inp;
- }
- }
- return theMRInput;
- }
}
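
With the MRInput lookup, init() and input-start calls dropped from TezProcessor, that work presumably moves into the individual record processors, which now also have to cope with the MultiMRInput used on the merge-join path. A hedged sketch of what locating and preparing the map-side input could look like there; MRInputLookupSketch and findMRInput are illustrative names, not part of the patch.

    package org.apache.hadoop.hive.ql.exec.tez;

    import java.io.IOException;
    import java.util.Map;

    import org.apache.tez.mapreduce.input.MRInputLegacy;
    import org.apache.tez.mapreduce.input.MultiMRInput;
    import org.apache.tez.runtime.api.LogicalInput;

    // Sketch only: pick out the map-side input among the task's LogicalInputs.
    final class MRInputLookupSketch {
      static LogicalInput findMRInput(Map<String, LogicalInput> inputs) throws IOException {
        for (LogicalInput in : inputs.values()) {
          if (in instanceof MRInputLegacy) {
            ((MRInputLegacy) in).init();   // the legacy input still needs an explicit init
            return in;
          }
          if (in instanceof MultiMRInput) {
            return in;                     // merge-join path: multiple grouped splits
          }
        }
        return null;
      }
    }
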
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java?rev=1627235&r1=1627234&r2=1627235&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java Wed Sep 24 07:03:35 2014
@@ -37,6 +37,7 @@ import org.apache.hadoop.hive.ql.exec.Ut
import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.MergeJoinWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.ReduceWork;
import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
@@ -313,15 +314,16 @@ public class TezTask extends Task<TezWor
for (BaseWork v: children) {
// finally we can create the grouped edge
GroupInputEdge e = utils.createEdge(group, parentConf,
- workToVertex.get(v), work.getEdgeProperty(w, v));
+ workToVertex.get(v), work.getEdgeProperty(w, v), work.getVertexType(v));
dag.addEdge(e);
}
} else {
// Regular vertices
JobConf wxConf = utils.initializeVertexConf(conf, ctx, w);
- Vertex wx = utils.createVertex(wxConf, w, scratchDir, appJarLr,
- additionalLr, fs, ctx, !isFinal, work);
+ Vertex wx =
+ utils.createVertex(wxConf, w, scratchDir, appJarLr, additionalLr, fs, ctx, !isFinal,
+ work, work.getVertexType(w));
dag.addVertex(wx);
utils.addCredentials(w, dag);
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_CREATE_VERTEX + w.getName());
@@ -335,7 +337,7 @@ public class TezTask extends Task<TezWor
TezEdgeProperty edgeProp = work.getEdgeProperty(w, v);
- e = utils.createEdge(wxConf, wx, workToVertex.get(v), edgeProp);
+ e = utils.createEdge(wxConf, wx, workToVertex.get(v), edgeProp, work.getVertexType(v));
dag.addEdge(e);
}
}
@@ -386,6 +388,9 @@ public class TezTask extends Task<TezWor
try {
List<BaseWork> ws = work.getAllWork();
for (BaseWork w: ws) {
+ if (w instanceof MergeJoinWork) {
+ w = ((MergeJoinWork) w).getMainWork();
+ }
for (Operator<?> op: w.getAllOperators()) {
op.jobClose(conf, rc == 0);
}
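
The jobClose loop above unwraps MergeJoinWork to its main work before visiting operators, presumably because the wrapper itself does not carry the operator tree that owns the job-level close hooks. The same idiom pulled into a tiny helper for clarity; WorkUnwrapSketch and mainWorkOf are illustrative names, only getMainWork() comes from the patch.

    package org.apache.hadoop.hive.ql.exec.tez;

    import org.apache.hadoop.hive.ql.plan.BaseWork;
    import org.apache.hadoop.hive.ql.plan.MergeJoinWork;

    // Sketch only: resolve a possibly-wrapped BaseWork to the work whose
    // operators should receive jobClose().
    final class WorkUnwrapSketch {
      static BaseWork mainWorkOf(BaseWork w) {
        return (w instanceof MergeJoinWork) ? ((MergeJoinWork) w).getMainWork() : w;
      }
    }
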
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/KeyValueInputMerger.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/KeyValueInputMerger.java?rev=1627235&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/KeyValueInputMerger.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/KeyValueInputMerger.java Wed Sep 24 07:03:35 2014
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.tez.tools;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.List;
+import java.util.PriorityQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.BinaryComparable;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+
+/**
+ * A KeyValuesReader implementation that returns a sorted stream of key-values
+ * by doing a sorted merge of the key-value in LogicalInputs.
+ * Tags are in the last byte of the key, so no special handling for tags is required.
+ * Uses a priority queue to pick the KeyValuesReader of the input that is next in
+ * sort order.
+ */
+public class KeyValueInputMerger extends KeyValueReader {
+
+ public static final Log l4j = LogFactory.getLog(KeyValueInputMerger.class);
+ private PriorityQueue<KeyValueReader> pQueue = null;
+ private KeyValueReader nextKVReader = null;
+
+ public KeyValueInputMerger(List<KeyValueReader> multiMRInputs) throws Exception {
+ //get a KeyValueReader per LogicalInput and add each to the priority queue
+ int initialCapacity = multiMRInputs.size();
+ pQueue = new PriorityQueue<KeyValueReader>(initialCapacity, new KVReaderComparator());
+ l4j.info("Initialized the priority queue with multi mr inputs: " + multiMRInputs.size());
+ for (KeyValueReader input : multiMRInputs) {
+ addToQueue(input);
+ }
+ }
+
+ /**
+ * Add the KeyValueReader to the queue if it still has key-value pairs to read
+ *
+ * @param kvReader
+ * @throws IOException
+ */
+ private void addToQueue(KeyValueReader kvReader) throws IOException {
+ if (kvReader.next()) {
+ pQueue.add(kvReader);
+ }
+ }
+
+ /**
+ * @return true if another key-value pair is available, after advancing to it
+ * @throws IOException
+ */
+ @Override
+ public boolean next() throws IOException {
+ //add the previous nextKVReader back to queue
+ if(nextKVReader != null){
+ addToQueue(nextKVReader);
+ }
+
+ //get the new nextKVReader with lowest key
+ nextKVReader = pQueue.poll();
+ return nextKVReader != null;
+ }
+
+ @Override
+ public Object getCurrentKey() throws IOException {
+ return nextKVReader.getCurrentKey();
+ }
+
+ @Override
+ public Object getCurrentValue() throws IOException {
+ return nextKVReader.getCurrentValue();
+ }
+
+ /**
+ * Comparator that orders KeyValueReaders by their current value
+ */
+ class KVReaderComparator implements Comparator<KeyValueReader> {
+
+ @Override
+ public int compare(KeyValueReader kvReader1, KeyValueReader kvReader2) {
+ try {
+ BinaryComparable val1 = (BinaryComparable) kvReader1.getCurrentValue();
+ BinaryComparable val2 = (BinaryComparable) kvReader2.getCurrentValue();
+ return val1.compareTo(val2);
+ } catch (IOException e) {
+ l4j.error("Caught exception while reading shuffle input", e);
+ //die!
+ throw new RuntimeException(e);
+ }
+ }
+ }
+}
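
Putting the merger to use is straightforward: hand it one KeyValueReader per underlying input (for the merge-join case, typically one per bucket file obtained from a MultiMRInput) and iterate it like any other KeyValueReader. A minimal sketch follows; MergerUsageSketch and consume() are illustrative names, not part of the patch.

    package org.apache.hadoop.hive.ql.exec.tez.tools;

    import java.util.List;

    import org.apache.tez.runtime.library.api.KeyValueReader;

    // Sketch only: drain several sorted inputs as one merged stream.
    final class MergerUsageSketch {
      static void consume(List<KeyValueReader> readers) throws Exception {
        KeyValueInputMerger merger = new KeyValueInputMerger(readers);
        while (merger.next()) {
          Object key = merger.getCurrentKey();
          Object value = merger.getCurrentValue();   // inputs are ordered by current value
          // process (key, value); rows arrive in KVReaderComparator's sort order
        }
      }
    }
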