You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by gu...@apache.org on 2014/09/05 22:16:10 UTC
svn commit: r1622783 [2/3] - in /hive/trunk: ./
common/src/java/org/apache/hadoop/hive/conf/ data/files/ hbase-handler/
itests/src/test/resources/
itests/util/src/main/java/org/apache/hadoop/hive/ql/ ql/ ql/if/
ql/src/gen/thrift/gen-cpp/ ql/src/gen/thr...
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java Fri Sep 5 20:16:08 2014
@@ -18,45 +18,29 @@
package org.apache.hadoop.hive.ql.exec.tez;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.debug.Utils;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.exec.MapredContext;
import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
import org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper;
import org.apache.hadoop.hive.ql.exec.persistence.MapJoinBytesTableContainer;
import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey;
-import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKeyObject;
-import org.apache.hadoop.hive.ql.exec.persistence.LazyFlatRowContainer;
-import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey;
import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectSerDeContext;
import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer;
import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
-import org.apache.hadoop.hive.serde2.ByteStream.Output;
import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryFactory;
-import org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe;
-import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryStruct;
-import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryUtils;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
-import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Writable;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.library.api.KeyValueReader;
@@ -73,6 +57,7 @@ public class HashTableLoader implements
private Configuration hconf;
private MapJoinDesc desc;
private MapJoinKey lastKey = null;
+ private int rowCount = 0;
@Override
public void init(ExecMapperContext context, Configuration hconf, MapJoinOperator joinOp) {
@@ -125,6 +110,7 @@ public class HashTableLoader implements
: new HashMapWrapper(hconf, keyCount);
while (kvReader.next()) {
+ rowCount++;
lastKey = tableContainer.putRow(keyCtx, (Writable)kvReader.getCurrentKey(),
valCtx, (Writable)kvReader.getCurrentValue());
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HivePreWarmProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HivePreWarmProcessor.java?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HivePreWarmProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HivePreWarmProcessor.java Fri Sep 5 20:16:08 2014
@@ -25,15 +25,15 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.ReadaheadPool;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.LogicalIOProcessor;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.LogicalOutput;
-import org.apache.tez.runtime.api.TezProcessorContext;
+import org.apache.tez.runtime.api.ProcessorContext;
import java.net.URL;
import java.net.JarURLConnection;
-import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.Map;
@@ -48,7 +48,7 @@ import javax.crypto.Mac;
*
* @see Config for configuring the HivePreWarmProcessor
*/
-public class HivePreWarmProcessor implements LogicalIOProcessor {
+public class HivePreWarmProcessor extends AbstractLogicalIOProcessor {
private static boolean prewarmed = false;
@@ -56,10 +56,13 @@ public class HivePreWarmProcessor implem
private Configuration conf;
+ public HivePreWarmProcessor(ProcessorContext context) {
+ super(context);
+ }
+
@Override
- public void initialize(TezProcessorContext processorContext)
- throws Exception {
- byte[] userPayload = processorContext.getUserPayload();
+ public void initialize() throws Exception {
+ UserPayload userPayload = getContext().getUserPayload();
this.conf = TezUtils.createConfFromUserPayload(userPayload);
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java Fri Sep 5 20:16:08 2014
@@ -35,19 +35,23 @@ import org.apache.hadoop.mapred.FileSpli
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.split.TezMapReduceSplitsGrouper;
import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.VertexLocationHint;
+import org.apache.tez.dag.api.TaskLocationHint;
import org.apache.tez.mapreduce.hadoop.InputSplitInfoMem;
-import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRInputUserPayloadProto;
import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto;
import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitsProto;
import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.TezRootInputInitializer;
-import org.apache.tez.runtime.api.TezRootInputInitializerContext;
-import org.apache.tez.runtime.api.events.RootInputConfigureVertexTasksEvent;
-import org.apache.tez.runtime.api.events.RootInputDataInformationEvent;
+import org.apache.tez.runtime.api.InputInitializer;
+import org.apache.tez.runtime.api.InputInitializerContext;
+import org.apache.tez.runtime.api.InputSpecUpdate;
+import org.apache.tez.runtime.api.events.InputConfigureVertexTasksEvent;
+import org.apache.tez.runtime.api.events.InputDataInformationEvent;
+import org.apache.tez.runtime.api.events.InputInitializerEvent;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Lists;
@@ -59,20 +63,30 @@ import com.google.common.collect.Multima
* making sure that splits from different partitions are only grouped if they
* are of the same schema, format and serde
*/
-public class HiveSplitGenerator implements TezRootInputInitializer {
+@SuppressWarnings("deprecation")
+public class HiveSplitGenerator extends InputInitializer {
private static final Log LOG = LogFactory.getLog(HiveSplitGenerator.class);
- private final SplitGrouper grouper = new SplitGrouper();
+ private static final SplitGrouper grouper = new SplitGrouper();
+ private final DynamicPartitionPruner pruner = new DynamicPartitionPruner();
+ private InputInitializerContext context;
+
+ public HiveSplitGenerator(InputInitializerContext initializerContext) {
+ super(initializerContext);
+ }
@Override
- public List<Event> initialize(TezRootInputInitializerContext rootInputContext) throws Exception {
+ public List<Event> initialize() throws Exception {
+ InputInitializerContext rootInputContext = getContext();
+
+ context = rootInputContext;
MRInputUserPayloadProto userPayloadProto =
- MRHelpers.parseMRInputPayload(rootInputContext.getUserPayload());
+ MRInputHelpers.parseMRInputPayload(rootInputContext.getInputUserPayload());
Configuration conf =
- MRHelpers.createConfFromByteString(userPayloadProto.getConfigurationBytes());
+ TezUtils.createConfFromByteString(userPayloadProto.getConfigurationBytes());
boolean sendSerializedEvents =
conf.getBoolean("mapreduce.tez.input.initializer.serialize.event.payload", true);
@@ -81,42 +95,66 @@ public class HiveSplitGenerator implemen
JobConf jobConf = new JobConf(conf);
ShimLoader.getHadoopShims().getMergedCredentials(jobConf);
+ MapWork work = Utilities.getMapWork(jobConf);
+
+ // perform dynamic partition pruning
+ pruner.prune(work, jobConf, context);
+
InputSplitInfoMem inputSplitInfo = null;
- String realInputFormatName = userPayloadProto.getInputFormatName();
- if (realInputFormatName != null && !realInputFormatName.isEmpty()) {
- inputSplitInfo = generateGroupedSplits(rootInputContext, jobConf, conf, realInputFormatName);
+ String realInputFormatName = conf.get("mapred.input.format.class");
+ boolean groupingEnabled = userPayloadProto.getGroupingEnabled();
+ if (groupingEnabled) {
+ // Need to instantiate the realInputFormat
+ InputFormat<?, ?> inputFormat =
+ (InputFormat<?, ?>) ReflectionUtils.newInstance(Class.forName(realInputFormatName),
+ jobConf);
+
+ int totalResource = rootInputContext.getTotalAvailableResource().getMemory();
+ int taskResource = rootInputContext.getVertexTaskResource().getMemory();
+ int availableSlots = totalResource / taskResource;
+
+ // Create the un-grouped splits
+ float waves =
+ conf.getFloat(TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_WAVES,
+ TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_WAVES_DEFAULT);
+
+ InputSplit[] splits = inputFormat.getSplits(jobConf, (int) (availableSlots * waves));
+ LOG.info("Number of input splits: " + splits.length + ". " + availableSlots
+ + " available slots, " + waves + " waves. Input format is: " + realInputFormatName);
+
+ Multimap<Integer, InputSplit> groupedSplits =
+ generateGroupedSplits(jobConf, conf, splits, waves, availableSlots);
+ // And finally return them in a flat array
+ InputSplit[] flatSplits = groupedSplits.values().toArray(new InputSplit[0]);
+ LOG.info("Number of grouped splits: " + flatSplits.length);
+
+ List<TaskLocationHint> locationHints = grouper.createTaskLocationHints(flatSplits);
+
+ Utilities.clearWork(jobConf);
+
+ inputSplitInfo =
+ new InputSplitInfoMem(flatSplits, locationHints, flatSplits.length, null, jobConf);
} else {
- inputSplitInfo = MRHelpers.generateInputSplitsToMem(jobConf);
+ // no need for grouping and the target #of tasks.
+ // This code path should never be triggered at the moment. If grouping is disabled,
+ // DAGUtils uses MRInputAMSplitGenerator.
+ // If this is used in the future - make sure to disable grouping in the payload, if it isn't already disabled
+ throw new RuntimeException(
+ "HiveInputFormat does not support non-grouped splits, InputFormatName is: "
+ + realInputFormatName);
+ // inputSplitInfo = MRInputHelpers.generateInputSplitsToMem(jobConf, false, 0);
}
return createEventList(sendSerializedEvents, inputSplitInfo);
}
- private InputSplitInfoMem generateGroupedSplits(TezRootInputInitializerContext context,
- JobConf jobConf, Configuration conf, String realInputFormatName) throws Exception {
- int totalResource = context.getTotalAvailableResource().getMemory();
- int taskResource = context.getVertexTaskResource().getMemory();
- int availableSlots = totalResource / taskResource;
-
- float waves =
- conf.getFloat(TezConfiguration.TEZ_AM_GROUPING_SPLIT_WAVES,
- TezConfiguration.TEZ_AM_GROUPING_SPLIT_WAVES_DEFAULT);
+ public static Multimap<Integer, InputSplit> generateGroupedSplits(JobConf jobConf,
+ Configuration conf, InputSplit[] splits, float waves, int availableSlots)
+ throws Exception {
MapWork work = Utilities.getMapWork(jobConf);
- LOG.info("Grouping splits for " + work.getName() + ". " + availableSlots + " available slots, "
- + waves + " waves. Input format is: " + realInputFormatName);
-
- // Need to instantiate the realInputFormat
- InputFormat<?, ?> inputFormat =
- (InputFormat<?, ?>) ReflectionUtils
- .newInstance(Class.forName(realInputFormatName), jobConf);
-
- // Create the un-grouped splits
- InputSplit[] splits = inputFormat.getSplits(jobConf, (int) (availableSlots * waves));
- LOG.info("Number of input splits: " + splits.length);
-
Multimap<Integer, InputSplit> bucketSplitMultiMap =
ArrayListMultimap.<Integer, InputSplit> create();
@@ -159,41 +197,42 @@ public class HiveSplitGenerator implemen
Multimap<Integer, InputSplit> groupedSplits =
grouper.group(jobConf, bucketSplitMultiMap, availableSlots, waves);
- // And finally return them in a flat array
- InputSplit[] flatSplits = groupedSplits.values().toArray(new InputSplit[0]);
- LOG.info("Number of grouped splits: " + flatSplits.length);
-
- List<TaskLocationHint> locationHints = grouper.createTaskLocationHints(flatSplits);
-
- Utilities.clearWork(jobConf);
-
- return new InputSplitInfoMem(flatSplits, locationHints, flatSplits.length, null, jobConf);
+ return groupedSplits;
}
private List<Event> createEventList(boolean sendSerializedEvents, InputSplitInfoMem inputSplitInfo) {
List<Event> events = Lists.newArrayListWithCapacity(inputSplitInfo.getNumTasks() + 1);
- RootInputConfigureVertexTasksEvent configureVertexEvent =
- new RootInputConfigureVertexTasksEvent(inputSplitInfo.getNumTasks(),
- inputSplitInfo.getTaskLocationHints());
+ InputConfigureVertexTasksEvent configureVertexEvent =
+ InputConfigureVertexTasksEvent.create(inputSplitInfo.getNumTasks(),
+ VertexLocationHint.create(inputSplitInfo.getTaskLocationHints()),
+ InputSpecUpdate.getDefaultSinglePhysicalInputSpecUpdate());
events.add(configureVertexEvent);
if (sendSerializedEvents) {
MRSplitsProto splitsProto = inputSplitInfo.getSplitsProto();
int count = 0;
for (MRSplitProto mrSplit : splitsProto.getSplitsList()) {
- RootInputDataInformationEvent diEvent =
- new RootInputDataInformationEvent(count++, mrSplit.toByteArray());
+ InputDataInformationEvent diEvent = InputDataInformationEvent.createWithSerializedPayload(
+ count++, mrSplit.toByteString().asReadOnlyByteBuffer());
events.add(diEvent);
}
} else {
int count = 0;
for (org.apache.hadoop.mapred.InputSplit split : inputSplitInfo.getOldFormatSplits()) {
- RootInputDataInformationEvent diEvent = new RootInputDataInformationEvent(count++, split);
+ InputDataInformationEvent diEvent = InputDataInformationEvent.createWithObjectPayload(
+ count++, split);
events.add(diEvent);
}
}
return events;
}
+
+ @Override
+ public void handleInputInitializerEvent(List<InputInitializerEvent> events) throws Exception {
+ for (InputInitializerEvent e : events) {
+ pruner.getQueue().put(e);
+ }
+ }
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java Fri Sep 5 20:16:08 2014
@@ -47,7 +47,7 @@ import org.apache.tez.mapreduce.input.MR
import org.apache.tez.mapreduce.processor.MRTaskReporter;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.LogicalOutput;
-import org.apache.tez.runtime.api.TezProcessorContext;
+import org.apache.tez.runtime.api.ProcessorContext;
import org.apache.tez.runtime.library.api.KeyValueReader;
/**
@@ -64,8 +64,25 @@ public class MapRecordProcessor extends
protected static final String MAP_PLAN_KEY = "__MAP_PLAN__";
private MapWork mapWork;
+ public MapRecordProcessor(JobConf jconf) {
+ ObjectCache cache = ObjectCacheFactory.getCache(jconf);
+ execContext.setJc(jconf);
+ // create map and fetch operators
+ mapWork = (MapWork) cache.retrieve(MAP_PLAN_KEY);
+ if (mapWork == null) {
+ mapWork = Utilities.getMapWork(jconf);
+ cache.cache(MAP_PLAN_KEY, mapWork);
+ l4j.info("Plan: "+mapWork);
+ for (String s: mapWork.getAliases()) {
+ l4j.info("Alias: "+s);
+ }
+ } else {
+ Utilities.setMapWork(jconf, mapWork);
+ }
+ }
+
@Override
- void init(JobConf jconf, TezProcessorContext processorContext, MRTaskReporter mrReporter,
+ void init(JobConf jconf, ProcessorContext processorContext, MRTaskReporter mrReporter,
Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws Exception {
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS);
super.init(jconf, processorContext, mrReporter, inputs, outputs);
@@ -87,22 +104,7 @@ public class MapRecordProcessor extends
((TezKVOutputCollector) outMap.get(outputEntry.getKey())).initialize();
}
- ObjectCache cache = ObjectCacheFactory.getCache(jconf);
try {
-
- execContext.setJc(jconf);
- // create map and fetch operators
- mapWork = (MapWork) cache.retrieve(MAP_PLAN_KEY);
- if (mapWork == null) {
- mapWork = Utilities.getMapWork(jconf);
- cache.cache(MAP_PLAN_KEY, mapWork);
- l4j.info("Plan: "+mapWork);
- for (String s: mapWork.getAliases()) {
- l4j.info("Alias: "+s);
- }
- } else {
- Utilities.setMapWork(jconf, mapWork);
- }
if (mapWork.getVectorMode()) {
mapOp = new VectorMapOperator();
} else {
@@ -115,7 +117,8 @@ public class MapRecordProcessor extends
l4j.info(mapOp.dump(0));
MapredContext.init(true, new JobConf(jconf));
- ((TezContext)MapredContext.get()).setInputs(inputs);
+ ((TezContext) MapredContext.get()).setInputs(inputs);
+ ((TezContext) MapredContext.get()).setTezProcessorContext(processorContext);
mapOp.setExecContext(execContext);
mapOp.initializeLocalWork(jconf);
mapOp.initialize(jconf, null);
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapTezProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapTezProcessor.java?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapTezProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapTezProcessor.java Fri Sep 5 20:16:08 2014
@@ -17,11 +17,15 @@
*/
package org.apache.hadoop.hive.ql.exec.tez;
+import org.apache.tez.runtime.api.ProcessorContext;
+
/**
* Subclass that is used to indicate if this is a map or reduce process
*/
public class MapTezProcessor extends TezProcessor {
- public MapTezProcessor(){
- super(true);
+
+ public MapTezProcessor(ProcessorContext context) {
+ super(context);
+ this.isMap = true;
}
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java Fri Sep 5 20:16:08 2014
@@ -20,24 +20,40 @@ package org.apache.hadoop.hive.ql.exec.t
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.tez.runtime.common.objectregistry.ObjectLifeCycle;
-import org.apache.tez.runtime.common.objectregistry.ObjectRegistry;
-import org.apache.tez.runtime.common.objectregistry.ObjectRegistryFactory;
+import org.apache.tez.runtime.api.ObjectRegistry;
+import com.google.common.base.Preconditions;
/**
* ObjectCache. Tez implementation based on the tez object registry.
*
*/
public class ObjectCache implements org.apache.hadoop.hive.ql.exec.ObjectCache {
-
+
private static final Log LOG = LogFactory.getLog(ObjectCache.class.getName());
- private final ObjectRegistry registry = ObjectRegistryFactory.getObjectRegistry();
+
+ // ObjectRegistry is available via the Input/Output/ProcessorContext.
+ // This is setup as part of the Tez Processor construction, so that it is available whenever an
+ // instance of the ObjectCache is created. The assumption is that Tez will initialize the Processor
+ // before anything else.
+ private volatile static ObjectRegistry staticRegistry;
+
+ private final ObjectRegistry registry;
+
+ public ObjectCache() {
+ Preconditions.checkNotNull(staticRegistry,
+ "Object registry not setup yet. This should have been setup by the TezProcessor");
+ registry = staticRegistry;
+ }
+ public static void setupObjectRegistry(ObjectRegistry objectRegistry) {
+ staticRegistry = objectRegistry;
+ }
+
@Override
public void cache(String key, Object value) {
LOG.info("Adding " + key + " to cache with value " + value);
- registry.add(ObjectLifeCycle.VERTEX, key, value);
+ registry.cacheForVertex(key, value);
}
@Override
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java Fri Sep 5 20:16:08 2014
@@ -32,7 +32,7 @@ import org.apache.hadoop.mapred.OutputCo
import org.apache.tez.mapreduce.processor.MRTaskReporter;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.LogicalOutput;
-import org.apache.tez.runtime.api.TezProcessorContext;
+import org.apache.tez.runtime.api.ProcessorContext;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
@@ -47,7 +47,7 @@ public abstract class RecordProcessor {
protected Map<String, LogicalInput> inputs;
protected Map<String, LogicalOutput> outputs;
protected Map<String, OutputCollector> outMap;
- protected TezProcessorContext processorContext;
+ protected ProcessorContext processorContext;
public static final Log l4j = LogFactory.getLog(RecordProcessor.class);
@@ -72,7 +72,7 @@ public abstract class RecordProcessor {
* @param outputs map of Output names to {@link LogicalOutput}s
* @throws Exception
*/
- void init(JobConf jconf, TezProcessorContext processorContext, MRTaskReporter mrReporter,
+ void init(JobConf jconf, ProcessorContext processorContext, MRTaskReporter mrReporter,
Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws Exception {
this.jconf = jconf;
this.reporter = mrReporter;
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java Fri Sep 5 20:16:08 2014
@@ -20,7 +20,6 @@ package org.apache.hadoop.hive.ql.exec.t
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -59,14 +58,13 @@ import org.apache.hadoop.hive.serde2.obj
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.tez.mapreduce.processor.MRTaskReporter;
import org.apache.tez.runtime.api.Input;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.LogicalOutput;
-import org.apache.tez.runtime.api.TezProcessorContext;
+import org.apache.tez.runtime.api.ProcessorContext;
import org.apache.tez.runtime.library.api.KeyValuesReader;
/**
@@ -113,7 +111,7 @@ public class ReduceRecordProcessor exte
private List<VectorExpressionWriter>[] valueStringWriters;
@Override
- void init(JobConf jconf, TezProcessorContext processorContext, MRTaskReporter mrReporter,
+ void init(JobConf jconf, ProcessorContext processorContext, MRTaskReporter mrReporter,
Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws Exception {
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS);
super.init(jconf, processorContext, mrReporter, inputs, outputs);
@@ -140,7 +138,7 @@ public class ReduceRecordProcessor exte
try {
keyTableDesc = redWork.getKeyDesc();
- inputKeyDeserializer = (SerDe) ReflectionUtils.newInstance(keyTableDesc
+ inputKeyDeserializer = ReflectionUtils.newInstance(keyTableDesc
.getDeserializerClass(), null);
SerDeUtils.initializeSerDe(inputKeyDeserializer, null, keyTableDesc.getProperties(), null);
keyObjectInspector = inputKeyDeserializer.getObjectInspector();
@@ -152,7 +150,7 @@ public class ReduceRecordProcessor exte
keyStructInspector = (StructObjectInspector)keyObjectInspector;
batches = new VectorizedRowBatch[maxTags];
valueStructInspectors = new StructObjectInspector[maxTags];
- valueStringWriters = (List<VectorExpressionWriter>[])new List[maxTags];
+ valueStringWriters = new List[maxTags];
keysColumnOffset = keyStructInspector.getAllStructFieldRefs().size();
buffer = new DataOutputBuffer();
}
@@ -215,7 +213,8 @@ public class ReduceRecordProcessor exte
}
MapredContext.init(false, new JobConf(jconf));
- ((TezContext)MapredContext.get()).setInputs(inputs);
+ ((TezContext) MapredContext.get()).setInputs(inputs);
+ ((TezContext) MapredContext.get()).setTezProcessorContext(processorContext);
// initialize reduce operator tree
try {
@@ -306,7 +305,7 @@ public class ReduceRecordProcessor exte
Map<Integer, String> tag2input = redWork.getTagToInput();
ArrayList<LogicalInput> shuffleInputs = new ArrayList<LogicalInput>();
for(String inpStr : tag2input.values()){
- shuffleInputs.add((LogicalInput)inputs.get(inpStr));
+ shuffleInputs.add(inputs.get(inpStr));
}
return shuffleInputs;
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceTezProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceTezProcessor.java?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceTezProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceTezProcessor.java Fri Sep 5 20:16:08 2014
@@ -17,11 +17,15 @@
*/
package org.apache.hadoop.hive.ql.exec.tez;
+import org.apache.tez.runtime.api.ProcessorContext;
+
/**
* Subclass that is used to indicate if this is a map or reduce process
*/
public class ReduceTezProcessor extends TezProcessor {
- public ReduceTezProcessor(){
- super(false);
+
+ public ReduceTezProcessor(ProcessorContext context) {
+ super(context);
+ this.isMap = false;
}
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java Fri Sep 5 20:16:08 2014
@@ -35,7 +35,7 @@ import org.apache.hadoop.mapred.FileSpli
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.split.TezGroupedSplit;
import org.apache.hadoop.mapred.split.TezMapredSplitsGrouper;
-import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
+import org.apache.tez.dag.api.TaskLocationHint;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Lists;
@@ -141,13 +141,13 @@ public class SplitGrouper {
String rack = (split instanceof TezGroupedSplit) ? ((TezGroupedSplit) split).getRack() : null;
if (rack == null) {
if (split.getLocations() != null) {
- locationHints.add(new TaskLocationHint(new HashSet<String>(Arrays.asList(split
+ locationHints.add(TaskLocationHint.createTaskLocationHint(new HashSet<String>(Arrays.asList(split
.getLocations())), null));
} else {
- locationHints.add(new TaskLocationHint(null, null));
+ locationHints.add(TaskLocationHint.createTaskLocationHint(null, null));
}
} else {
- locationHints.add(new TaskLocationHint(null, Collections.singleton(rack)));
+ locationHints.add(TaskLocationHint.createTaskLocationHint(null, Collections.singleton(rack)));
}
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezContext.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezContext.java?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezContext.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezContext.java Fri Sep 5 20:16:08 2014
@@ -23,6 +23,7 @@ import org.apache.hadoop.hive.ql.exec.Ma
import org.apache.hadoop.mapred.JobConf;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.ProcessorContext;
/**
* TezContext contains additional context only available with Tez
@@ -31,9 +32,11 @@ public class TezContext extends MapredCo
// all the inputs for the tez processor
private Map<String, LogicalInput> inputs;
-
+
private Map<String, LogicalOutput> outputs;
+ private ProcessorContext processorContext;
+
public TezContext(boolean isMap, JobConf jobConf) {
super(isMap, jobConf);
}
@@ -41,7 +44,7 @@ public class TezContext extends MapredCo
public void setInputs(Map<String, LogicalInput> inputs) {
this.inputs = inputs;
}
-
+
public void setOutputs(Map<String, LogicalOutput> outputs) {
this.outputs = outputs;
}
@@ -52,11 +55,19 @@ public class TezContext extends MapredCo
}
return inputs.get(name);
}
-
+
public LogicalOutput getOutput(String name) {
if (outputs == null) {
return null;
}
return outputs.get(name);
}
+
+ public void setTezProcessorContext(ProcessorContext processorContext) {
+ this.processorContext = processorContext;
+ }
+
+ public ProcessorContext getTezProcessorContext() {
+ return processorContext;
+ }
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java Fri Sep 5 20:16:08 2014
@@ -142,7 +142,7 @@ public class TezJobMonitor {
if (!running) {
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_SUBMIT_TO_RUNNING);
console.printInfo("Status: Running (application id: "
- +dagClient.getApplicationId()+")\n");
+ +dagClient.getExecutionContext()+")\n");
for (String s: progressMap.keySet()) {
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s);
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java Fri Sep 5 20:16:08 2014
@@ -33,23 +33,23 @@ import org.apache.hadoop.util.StringUtil
import org.apache.tez.common.TezUtils;
import org.apache.tez.mapreduce.input.MRInputLegacy;
import org.apache.tez.mapreduce.processor.MRTaskReporter;
+import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.LogicalIOProcessor;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.LogicalOutput;
-import org.apache.tez.runtime.api.TezProcessorContext;
+import org.apache.tez.runtime.api.ProcessorContext;
import org.apache.tez.runtime.library.api.KeyValueWriter;
/**
* Hive processor for Tez that forms the vertices in Tez and processes the data.
* Does what ExecMapper and ExecReducer does for hive in MR framework.
*/
-public class TezProcessor implements LogicalIOProcessor {
+public class TezProcessor extends AbstractLogicalIOProcessor {
private static final Log LOG = LogFactory.getLog(TezProcessor.class);
- private boolean isMap = false;
+ protected boolean isMap = false;
RecordProcessor rproc = null;
@@ -58,8 +58,6 @@ public class TezProcessor implements Log
private static final String CLASS_NAME = TezProcessor.class.getName();
private final PerfLogger perfLogger = PerfLogger.getPerfLogger();
- private TezProcessorContext processorContext;
-
protected static final NumberFormat taskIdFormat = NumberFormat.getInstance();
protected static final NumberFormat jobIdFormat = NumberFormat.getInstance();
static {
@@ -69,8 +67,9 @@ public class TezProcessor implements Log
jobIdFormat.setMinimumIntegerDigits(4);
}
- public TezProcessor(boolean isMap) {
- this.isMap = isMap;
+ public TezProcessor(ProcessorContext context) {
+ super(context);
+ ObjectCache.setupObjectRegistry(context.getObjectRegistry());
}
@Override
@@ -86,19 +85,15 @@ public class TezProcessor implements Log
}
@Override
- public void initialize(TezProcessorContext processorContext)
- throws IOException {
+ public void initialize() throws IOException {
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_INITIALIZE_PROCESSOR);
- this.processorContext = processorContext;
- //get the jobconf
- byte[] userPayload = processorContext.getUserPayload();
- Configuration conf = TezUtils.createConfFromUserPayload(userPayload);
+ Configuration conf = TezUtils.createConfFromUserPayload(getContext().getUserPayload());
this.jobConf = new JobConf(conf);
- setupMRLegacyConfigs(processorContext);
+ setupMRLegacyConfigs(getContext());
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_INITIALIZE_PROCESSOR);
}
- private void setupMRLegacyConfigs(TezProcessorContext processorContext) {
+ private void setupMRLegacyConfigs(ProcessorContext processorContext) {
// Hive "insert overwrite local directory" uses task id as dir name
// Setting the id in jobconf helps to have the similar dir name as MR
StringBuilder taskAttemptIdBuilder = new StringBuilder("attempt_");
@@ -133,10 +128,10 @@ public class TezProcessor implements Log
// in case of broadcast-join read the broadcast edge inputs
// (possibly asynchronously)
- LOG.info("Running task: " + processorContext.getUniqueIdentifier());
+ LOG.info("Running task: " + getContext().getUniqueIdentifier());
if (isMap) {
- rproc = new MapRecordProcessor();
+ rproc = new MapRecordProcessor(jobConf);
MRInputLegacy mrInput = getMRInput(inputs);
try {
mrInput.init();
@@ -160,8 +155,8 @@ public class TezProcessor implements Log
// Outputs will be started later by the individual Processors.
- MRTaskReporter mrReporter = new MRTaskReporter(processorContext);
- rproc.init(jobConf, processorContext, mrReporter, inputs, outputs);
+ MRTaskReporter mrReporter = new MRTaskReporter(getContext());
+ rproc.init(jobConf, getContext(), mrReporter, inputs, outputs);
rproc.run();
//done - output does not need to be committed as hive does not use outputcommitter
@@ -207,6 +202,7 @@ public class TezProcessor implements Log
this.writer = (KeyValueWriter) output.getWriter();
}
+ @Override
public void collect(Object key, Object value) throws IOException {
writer.write(key, value);
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java Fri Sep 5 20:16:08 2014
@@ -47,10 +47,8 @@ import org.apache.hadoop.hive.ql.session
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.tez.client.AMConfiguration;
-import org.apache.tez.client.PreWarmContext;
-import org.apache.tez.client.TezSession;
-import org.apache.tez.client.TezSessionConfiguration;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.dag.api.PreWarmVertex;
import org.apache.tez.dag.api.SessionNotRunning;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
@@ -67,7 +65,7 @@ public class TezSessionState {
private HiveConf conf;
private Path tezScratchDir;
private LocalResource appJarLr;
- private TezSession session;
+ private TezClient session;
private String sessionId;
private DagUtils utils;
private String queueName;
@@ -150,11 +148,6 @@ public class TezSessionState {
refreshLocalResourcesFromConf(conf);
- // generate basic tez config
- TezConfiguration tezConfig = new TezConfiguration(conf);
-
- tezConfig.set(TezConfiguration.TEZ_AM_STAGING_DIR, tezScratchDir.toUri().toString());
-
// unless already installed on all the cluster nodes, we'll have to
// localize hive-exec.jar as well.
appJarLr = createJarLocalResource(utils.getExecJarPathLocal());
@@ -168,15 +161,23 @@ public class TezSessionState {
// Create environment for AM.
Map<String, String> amEnv = new HashMap<String, String>();
- MRHelpers.updateEnvironmentForMRAM(conf, amEnv);
+ MRHelpers.updateEnvBasedOnMRAMEnv(conf, amEnv);
- AMConfiguration amConfig = new AMConfiguration(amEnv, commonLocalResources, tezConfig, null);
+ // and finally we're ready to create and start the session
+ // generate basic tez config
+ TezConfiguration tezConfig = new TezConfiguration(conf);
+ tezConfig.set(TezConfiguration.TEZ_AM_STAGING_DIR, tezScratchDir.toUri().toString());
- // configuration for the session
- TezSessionConfiguration sessionConfig = new TezSessionConfiguration(amConfig, tezConfig);
+ if (HiveConf.getBoolVar(conf, ConfVars.HIVE_PREWARM_ENABLED)) {
+ int n = HiveConf.getIntVar(conf, ConfVars.HIVE_PREWARM_NUM_CONTAINERS);
+ n = Math.max(tezConfig.getInt(
+ TezConfiguration.TEZ_AM_SESSION_MIN_HELD_CONTAINERS,
+ TezConfiguration.TEZ_AM_SESSION_MIN_HELD_CONTAINERS_DEFAULT), n);
+ tezConfig.setInt(TezConfiguration.TEZ_AM_SESSION_MIN_HELD_CONTAINERS, n);
+ }
- // and finally we're ready to create and start the session
- session = new TezSession("HIVE-" + sessionId, sessionConfig);
+ session = TezClient.create("HIVE-" + sessionId, tezConfig, true,
+ commonLocalResources, null);
LOG.info("Opening new Tez Session (id: " + sessionId
+ ", scratch dir: " + tezScratchDir + ")");
@@ -187,20 +188,30 @@ public class TezSessionState {
int n = HiveConf.getIntVar(conf, ConfVars.HIVE_PREWARM_NUM_CONTAINERS);
LOG.info("Prewarming " + n + " containers (id: " + sessionId
+ ", scratch dir: " + tezScratchDir + ")");
- PreWarmContext context = utils.createPreWarmContext(sessionConfig, n, commonLocalResources);
+ PreWarmVertex prewarmVertex = utils.createPreWarmVertex(tezConfig, n,
+ commonLocalResources);
try {
- session.preWarm(context);
- } catch (InterruptedException ie) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Hive Prewarm threw an exception ", ie);
+ session.preWarm(prewarmVertex);
+ } catch (IOException ie) {
+ if (ie.getMessage().contains("Interrupted while waiting")) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Hive Prewarm threw an exception ", ie);
+ }
+ } else {
+ throw ie;
}
}
}
-
+ try {
+ session.waitTillReady();
+ } catch(InterruptedException ie) {
+ //ignore
+ }
// In case we need to run some MR jobs, we'll run them under tez MR emulation. The session
// id is used for tez to reuse the current session rather than start a new one.
conf.set("mapreduce.framework.name", "yarn-tez");
- conf.set("mapreduce.tez.session.tokill-application-id", session.getApplicationId().toString());
+ conf.set("mapreduce.tez.session.tokill-application-id",
+ session.getAppMasterApplicationId().toString());
openSessions.add(this);
}
@@ -277,7 +288,7 @@ public class TezSessionState {
return sessionId;
}
- public TezSession getSession() {
+ public TezClient getSession() {
return session;
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java Fri Sep 5 20:16:08 2014
@@ -29,7 +29,6 @@ import java.util.Set;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.DriverContext;
import org.apache.hadoop.hive.ql.exec.Operator;
@@ -67,7 +66,7 @@ import org.apache.tez.dag.api.client.Sta
* using the Tez APIs directly.
*
*/
-@SuppressWarnings({"serial", "deprecation"})
+@SuppressWarnings({"serial"})
public class TezTask extends Task<TezWork> {
private static final String CLASS_NAME = TezTask.class.getName();
@@ -135,7 +134,7 @@ public class TezTask extends Task<TezWor
}
List<LocalResource> additionalLr = session.getLocalizedResources();
-
+
// log which resources we're adding (apart from the hive exec)
if (LOG.isDebugEnabled()) {
if (additionalLr == null || additionalLr.size() == 0) {
@@ -166,7 +165,7 @@ public class TezTask extends Task<TezWor
counters = client.getDAGStatus(statusGetOpts).getDAGCounters();
TezSessionPoolManager.getInstance().returnSession(session);
- if (LOG.isInfoEnabled()) {
+ if (LOG.isInfoEnabled() && counters != null) {
for (CounterGroup group: counters) {
LOG.info(group.getDisplayName() +":");
for (TezCounter counter: group) {
@@ -212,7 +211,7 @@ public class TezTask extends Task<TezWor
FileSystem fs = scratchDir.getFileSystem(conf);
// the name of the dag is what is displayed in the AM/Job UI
- DAG dag = new DAG(work.getName());
+ DAG dag = DAG.create(work.getName());
for (BaseWork w: ws) {
@@ -247,16 +246,14 @@ public class TezTask extends Task<TezWor
}
VertexGroup group = dag.createVertexGroup(w.getName(), vertexArray);
+ // For a vertex group, all Outputs use the same Key-class, Val-class and partitioner.
+ // Pick any one source vertex to figure out the Edge configuration.
+ JobConf parentConf = workToConf.get(unionWorkItems.get(0));
+
// now hook up the children
for (BaseWork v: children) {
- // need to pairwise patch up the configuration of the vertices
- for (BaseWork part: unionWorkItems) {
- utils.updateConfigurationForEdge(workToConf.get(part), workToVertex.get(part),
- workToConf.get(v), workToVertex.get(v));
- }
-
// finally we can create the grouped edge
- GroupInputEdge e = utils.createEdge(group, workToConf.get(v),
+ GroupInputEdge e = utils.createEdge(group, parentConf,
workToVertex.get(v), work.getEdgeProperty(w, v));
dag.addEdge(e);
@@ -279,7 +276,7 @@ public class TezTask extends Task<TezWor
TezEdgeProperty edgeProp = work.getEdgeProperty(w, v);
- e = utils.createEdge(wxConf, wx, workToConf.get(v), workToVertex.get(v), edgeProp);
+ e = utils.createEdge(wxConf, wx, workToVertex.get(v), edgeProp);
dag.addEdge(e);
}
}
@@ -305,7 +302,8 @@ public class TezTask extends Task<TezWor
try {
// ready to start execution on the cluster
- dagClient = sessionState.getSession().submitDAG(dag, resourceMap);
+ sessionState.getSession().addAppMasterLocalFiles(resourceMap);
+ dagClient = sessionState.getSession().submitDAG(dag);
} catch (SessionNotRunning nr) {
console.printInfo("Tez session was closed. Reopening...");
@@ -313,7 +311,7 @@ public class TezTask extends Task<TezWor
TezSessionPoolManager.getInstance().closeAndOpen(sessionState, this.conf);
console.printInfo("Session re-established.");
- dagClient = sessionState.getSession().submitDAG(dag, resourceMap);
+ dagClient = sessionState.getSession().submitDAG(dag);
}
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_SUBMIT_DAG);
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/InputMerger.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/InputMerger.java?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/InputMerger.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/InputMerger.java Fri Sep 5 20:16:08 2014
@@ -37,7 +37,7 @@ import org.apache.tez.runtime.library.ap
* Uses a priority queue to pick the KeyValuesReader of the input that is next in
* sort order.
*/
-public class InputMerger implements KeyValuesReader {
+public class InputMerger extends KeyValuesReader {
public static final Log l4j = LogFactory.getLog(ReduceRecordProcessor.class);
private PriorityQueue<KeyValuesReader> pQueue = null;
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java Fri Sep 5 20:16:08 2014
@@ -18,9 +18,11 @@
package org.apache.hadoop.hive.ql.exec.tez.tools;
import java.util.IdentityHashMap;
+import java.util.List;
import java.util.Map;
import org.apache.tez.runtime.api.Input;
+import org.apache.tez.runtime.api.MergedInputContext;
import org.apache.tez.runtime.api.MergedLogicalInput;
import org.apache.tez.runtime.api.Reader;
@@ -31,7 +33,11 @@ import org.apache.tez.runtime.api.Reader
public class TezMergedLogicalInput extends MergedLogicalInput {
private Map<Input, Boolean> readyInputs = new IdentityHashMap<Input, Boolean>();
-
+
+ public TezMergedLogicalInput(MergedInputContext context, List<Input> inputs) {
+ super(context, inputs);
+ }
+
@Override
public Reader getReader() throws Exception {
return new InputMerger(getInputs());
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/DefaultHivePartitioner.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/DefaultHivePartitioner.java?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/DefaultHivePartitioner.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/DefaultHivePartitioner.java Fri Sep 5 20:16:08 2014
@@ -18,14 +18,13 @@
package org.apache.hadoop.hive.ql.io;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapred.lib.HashPartitioner;
/** Partition keys by their {@link Object#hashCode()}. */
public class DefaultHivePartitioner<K2, V2> extends HashPartitioner<K2, V2> implements HivePartitioner<K2, V2> {
/** Use {@link Object#hashCode()} to partition. */
+ @Override
public int getBucket(K2 key, V2 value, int numBuckets) {
return (key.hashCode() & Integer.MAX_VALUE) % numBuckets;
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java Fri Sep 5 20:16:08 2014
@@ -13,6 +13,8 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.common.FileUtils;
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java Fri Sep 5 20:16:08 2014
@@ -941,12 +941,12 @@ public final class ConstantPropagateProc
return null;
}
- List<ExprNodeDesc> newChildren = new ArrayList<ExprNodeDesc>();
- for (ExprNodeDesc expr : pred.getChildren()) {
- ExprNodeDesc constant = foldExpr(expr, constants, cppCtx, op, 0, false);
- newChildren.add(constant);
+ ExprNodeDesc constant = foldExpr(pred, constants, cppCtx, op, 0, false);
+ if (constant instanceof ExprNodeGenericFuncDesc) {
+ conf.setFilterExpr((ExprNodeGenericFuncDesc) constant);
+ } else {
+ conf.setFilterExpr(null);
}
- pred.setChildren(newChildren);
return null;
}
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java Fri Sep 5 20:16:08 2014
@@ -19,6 +19,7 @@
package org.apache.hadoop.hive.ql.optimizer;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -27,6 +28,7 @@ import java.util.Stack;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator;
import org.apache.hadoop.hive.ql.exec.GroupByOperator;
import org.apache.hadoop.hive.ql.exec.JoinOperator;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
@@ -39,6 +41,7 @@ import org.apache.hadoop.hive.ql.lib.Nod
import org.apache.hadoop.hive.ql.parse.OptimizeTezProcContext;
import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
@@ -363,6 +366,19 @@ public class ConvertJoinMapJoin implemen
Operator<? extends OperatorDesc> parentBigTableOp
= mapJoinOp.getParentOperators().get(bigTablePosition);
if (parentBigTableOp instanceof ReduceSinkOperator) {
+ for (Operator<?> p : parentBigTableOp.getParentOperators()) {
+ // we might have generated a dynamic partition operator chain. Since
+ // we're removing the reduce sink we need do remove that too.
+ Set<Operator<?>> dynamicPartitionOperators = new HashSet<Operator<?>>();
+ for (Operator<?> c : p.getChildOperators()) {
+ if (hasDynamicPartitionBroadcast(c)) {
+ dynamicPartitionOperators.add(c);
+ }
+ }
+ for (Operator<?> c : dynamicPartitionOperators) {
+ p.removeChild(c);
+ }
+ }
mapJoinOp.getParentOperators().remove(bigTablePosition);
if (!(mapJoinOp.getParentOperators().contains(
parentBigTableOp.getParentOperators().get(0)))) {
@@ -380,4 +396,16 @@ public class ConvertJoinMapJoin implemen
return mapJoinOp;
}
+
+ private boolean hasDynamicPartitionBroadcast(Operator<?> op) {
+ if (op instanceof AppMasterEventOperator && op.getConf() instanceof DynamicPruningEventDesc) {
+ return true;
+ }
+ for (Operator<?> c : op.getChildOperators()) {
+ if (hasDynamicPartitionBroadcast(c)) {
+ return true;
+ }
+ }
+ return false;
+ }
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java Fri Sep 5 20:16:08 2014
@@ -36,6 +36,7 @@ import org.apache.hadoop.hive.ql.parse.P
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.ppd.PredicatePushDown;
import org.apache.hadoop.hive.ql.ppd.PredicateTransitivePropagate;
+import org.apache.hadoop.hive.ql.ppd.SyntheticJoinPredicate;
/**
* Implementation of the optimizer.
@@ -55,6 +56,7 @@ public class Optimizer {
transformations.add(new Generator());
if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTPPD)) {
transformations.add(new PredicateTransitivePropagate());
+ transformations.add(new SyntheticJoinPredicate());
transformations.add(new PredicatePushDown());
transformations.add(new PartitionPruner());
transformations.add(new PartitionConditionRemover());
@@ -125,8 +127,8 @@ public class Optimizer {
if(HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTIMIZEMETADATAQUERIES)) {
transformations.add(new StatsOptimizer());
}
- if (pctx.getContext().getExplain() ||
- HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
+ if (pctx.getContext().getExplain()
+ && !HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
transformations.add(new AnnotateWithStatistics());
transformations.add(new AnnotateWithOpTraits());
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/PrunerExpressionOperatorFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/PrunerExpressionOperatorFactory.java?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/PrunerExpressionOperatorFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/PrunerExpressionOperatorFactory.java Fri Sep 5 20:16:08 2014
@@ -186,8 +186,7 @@ public abstract class PrunerExpressionOp
return ((ExprNodeNullDesc) nd).clone();
}
- assert (false);
- return null;
+ return new ExprNodeConstantDesc(((ExprNodeDesc)nd).getTypeInfo(), null);
}
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java Fri Sep 5 20:16:08 2014
@@ -26,8 +26,6 @@ import java.util.Stack;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
@@ -42,7 +40,6 @@ import org.apache.hadoop.hive.ql.parse.G
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.hive.ql.plan.ColStatistics;
-import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.HashTableDummyDesc;
import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
@@ -134,7 +131,8 @@ public class ReduceSinkMapJoinProc imple
String prefix = Utilities.ReduceField.KEY.toString();
for (String keyCol : keyCols) {
ExprNodeDesc realCol = parentRS.getColumnExprMap().get(prefix + "." + keyCol);
- ColStatistics cs = StatsUtils.getColStatisticsFromExpression(null, stats, realCol);
+ ColStatistics cs =
+ StatsUtils.getColStatisticsFromExpression(context.conf, stats, realCol);
if (cs == null || cs.getCountDistint() <= 0) {
maxKeyCount = Long.MAX_VALUE;
break;
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/pcr/PcrExprProcFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/pcr/PcrExprProcFactory.java?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/pcr/PcrExprProcFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/pcr/PcrExprProcFactory.java Fri Sep 5 20:16:08 2014
@@ -396,8 +396,7 @@ public final class PcrExprProcFactory {
return new NodeInfoWrapper(WalkState.CONSTANT, null,
(ExprNodeDesc) nd);
}
- assert (false);
- return null;
+ return new NodeInfoWrapper(WalkState.UNKNOWN, null, (ExprNodeDesc)nd);
}
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java Fri Sep 5 20:16:08 2014
@@ -18,13 +18,18 @@
package org.apache.hadoop.hive.ql.optimizer.stats.annotation;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Stack;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator;
import org.apache.hadoop.hive.ql.exec.ColumnInfo;
import org.apache.hadoop.hive.ql.exec.CommonJoinOperator;
import org.apache.hadoop.hive.ql.exec.FilterOperator;
@@ -67,12 +72,8 @@ import org.apache.hadoop.hive.ql.udf.gen
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPOr;
import org.apache.hadoop.hive.serde.serdeConstants;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.Stack;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
public class StatsRulesProcFactory {
@@ -657,7 +658,8 @@ public class StatsRulesProcFactory {
if (parentStats != null) {
// worst case, in the absence of column statistics assume half the rows are emitted
- if (gop.getChildOperators().get(0) instanceof ReduceSinkOperator) {
+ if (gop.getChildOperators().get(0) instanceof ReduceSinkOperator
+ || gop.getChildOperators().get(0) instanceof AppMasterEventOperator) {
// map side
stats = parentStats.clone();
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/FileSinkProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/FileSinkProcessor.java?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/FileSinkProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/FileSinkProcessor.java Fri Sep 5 20:16:08 2014
@@ -22,16 +22,14 @@ import java.util.Stack;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.lib.NodeProcessor;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
-import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
/**
- * FileSinkProcessor handles addition of merge, move and stats tasks for filesinks
+ * FileSinkProcessor is a simple rule to remember seen file sinks for later
+ * processing.
*
*/
public class FileSinkProcessor implements NodeProcessor {
@@ -39,12 +37,6 @@ public class FileSinkProcessor implement
static final private Log LOG = LogFactory.getLog(FileSinkProcessor.class.getName());
@Override
- /*
- * (non-Javadoc)
- * we should ideally not modify the tree we traverse.
- * However, since we need to walk the tree at any time when we modify the
- * operator, we might as well do it here.
- */
public Object process(Node nd, Stack<Node> stack,
NodeProcessorCtx procCtx, Object... nodeOutputs)
throws SemanticException {
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java Fri Sep 5 20:16:08 2014
@@ -26,29 +26,28 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator;
import org.apache.hadoop.hive.ql.exec.DependencyCollectionTask;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
-import org.apache.hadoop.hive.ql.exec.UnionOperator;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.UnionOperator;
import org.apache.hadoop.hive.ql.exec.tez.TezTask;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.MoveWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
-import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
-import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;
import org.apache.hadoop.hive.ql.plan.TezWork;
/**
@@ -134,6 +133,15 @@ public class GenTezProcContext implement
// remember which reducesinks we've already connected
public final Set<ReduceSinkOperator> connectedReduceSinks;
+ // remember the event operators we've seen
+ public final Set<AppMasterEventOperator> eventOperatorSet;
+
+ // remember the event operators we've abandoned.
+ public final Set<AppMasterEventOperator> abandonedEventOperatorSet;
+
+ // remember the connections between ts and event
+ public final Map<TableScanOperator, List<AppMasterEventOperator>> tsToEventMap;
+
@SuppressWarnings("unchecked")
public GenTezProcContext(HiveConf conf, ParseContext parseContext,
List<Task<MoveWork>> moveTask, List<Task<? extends Serializable>> rootTasks,
@@ -165,6 +173,9 @@ public class GenTezProcContext implement
this.linkedFileSinks = new LinkedHashMap<Path, List<FileSinkDesc>>();
this.fileSinkSet = new LinkedHashSet<FileSinkOperator>();
this.connectedReduceSinks = new LinkedHashSet<ReduceSinkOperator>();
+ this.eventOperatorSet = new LinkedHashSet<AppMasterEventOperator>();
+ this.abandonedEventOperatorSet = new LinkedHashSet<AppMasterEventOperator>();
+ this.tsToEventMap = new LinkedHashMap<TableScanOperator, List<AppMasterEventOperator>>();
rootTasks.add(currentTask);
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java Fri Sep 5 20:16:08 2014
@@ -20,38 +20,43 @@ package org.apache.hadoop.hive.ql.parse;
import java.util.ArrayList;
import java.util.Deque;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
-import java.util.List;
import java.util.LinkedList;
-import java.util.Map;
+import java.util.List;
import java.util.Set;
-import org.apache.hadoop.fs.Path;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator;
import org.apache.hadoop.hive.ql.exec.FetchTask;
-import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.UnionOperator;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;
import org.apache.hadoop.hive.ql.plan.TezWork;
import org.apache.hadoop.hive.ql.plan.UnionWork;
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
+
/**
* GenTezUtils is a collection of shared helper methods to produce
* TezWork
@@ -119,12 +124,12 @@ public class GenTezUtils {
int maxReducers = context.conf.getIntVar(HiveConf.ConfVars.MAXREDUCERS);
// min we allow tez to pick
- int minPartition = Math.max(1, (int) (reduceSink.getConf().getNumReducers()
+ int minPartition = Math.max(1, (int) (reduceSink.getConf().getNumReducers()
* minPartitionFactor));
minPartition = (minPartition > maxReducers) ? maxReducers : minPartition;
// max we allow tez to pick
- int maxPartition = (int) (reduceSink.getConf().getNumReducers() * maxPartitionFactor);
+ int maxPartition = (int) (reduceSink.getConf().getNumReducers() * maxPartitionFactor);
maxPartition = (maxPartition > maxReducers) ? maxReducers : maxPartition;
reduceWork.setMinReduceTasks(minPartition);
@@ -210,18 +215,20 @@ public class GenTezUtils {
BaseWork work)
throws SemanticException {
- Set<Operator<?>> roots = work.getAllRootOperators();
+ List<Operator<?>> roots = new ArrayList<Operator<?>>();
+ roots.addAll(work.getAllRootOperators());
if (work.getDummyOps() != null) {
roots.addAll(work.getDummyOps());
}
+ roots.addAll(context.eventOperatorSet);
// need to clone the plan.
- Set<Operator<?>> newRoots = Utilities.cloneOperatorTree(conf, roots);
+ List<Operator<?>> newRoots = Utilities.cloneOperatorTree(conf, roots);
// we're cloning the operator plan but we're retaining the original work. That means
// that root operators have to be replaced with the cloned ops. The replacement map
// tells you what that mapping is.
- Map<Operator<?>, Operator<?>> replacementMap = new HashMap<Operator<?>, Operator<?>>();
+ BiMap<Operator<?>, Operator<?>> replacementMap = HashBiMap.create();
// there's some special handling for dummyOps required. Mapjoins won't be properly
// initialized if their dummy parents aren't initialized. Since we cloned the plan
@@ -231,11 +238,35 @@ public class GenTezUtils {
Iterator<Operator<?>> it = newRoots.iterator();
for (Operator<?> orig: roots) {
Operator<?> newRoot = it.next();
+
+ replacementMap.put(orig, newRoot);
+
if (newRoot instanceof HashTableDummyOperator) {
- dummyOps.add((HashTableDummyOperator)newRoot);
+ // dummy ops need to be updated to the cloned ones.
+ dummyOps.add((HashTableDummyOperator) newRoot);
+ it.remove();
+ } else if (newRoot instanceof AppMasterEventOperator) {
+ // event operators point to table scan operators. When cloning these we
+ // need to restore the original scan.
+ if (newRoot.getConf() instanceof DynamicPruningEventDesc) {
+ TableScanOperator ts = ((DynamicPruningEventDesc) orig.getConf()).getTableScan();
+ if (ts == null) {
+ throw new AssertionError("No table scan associated with dynamic event pruning. " + orig);
+ }
+ ((DynamicPruningEventDesc) newRoot.getConf()).setTableScan(ts);
+ }
it.remove();
} else {
- replacementMap.put(orig,newRoot);
+ if (newRoot instanceof TableScanOperator) {
+ if (context.tsToEventMap.containsKey(orig)) {
+ // we need to update event operators with the cloned table scan
+ for (AppMasterEventOperator event : context.tsToEventMap.get(orig)) {
+ ((DynamicPruningEventDesc) event.getConf()).setTableScan((TableScanOperator) newRoot);
+ }
+ }
+ }
+ context.rootToWorkMap.remove(orig);
+ context.rootToWorkMap.put(newRoot, work);
}
}
@@ -272,6 +303,15 @@ public class GenTezUtils {
desc.setLinkedFileSinkDesc(linked);
}
+ if (current instanceof AppMasterEventOperator) {
+ // remember for additional processing later
+ context.eventOperatorSet.add((AppMasterEventOperator) current);
+
+ // mark the original as abandoned. Don't need it anymore.
+ context.abandonedEventOperatorSet.add((AppMasterEventOperator) replacementMap.inverse()
+ .get(current));
+ }
+
if (current instanceof UnionOperator) {
Operator<?> parent = null;
int count = 0;
@@ -337,4 +377,87 @@ public class GenTezUtils {
}
}
}
+
+ /**
+ * processAppMasterEvent sets up the event descriptor and the MapWork.
+ *
+ * @param procCtx
+ * @param event
+ */
+ public void processAppMasterEvent(GenTezProcContext procCtx, AppMasterEventOperator event) {
+
+ if (procCtx.abandonedEventOperatorSet.contains(event)) {
+ // don't need this anymore
+ return;
+ }
+
+ DynamicPruningEventDesc eventDesc = (DynamicPruningEventDesc)event.getConf();
+ TableScanOperator ts = eventDesc.getTableScan();
+
+ MapWork work = (MapWork) procCtx.rootToWorkMap.get(ts);
+ if (work == null) {
+ throw new AssertionError("No work found for tablescan " + ts);
+ }
+
+ BaseWork enclosingWork = getEnclosingWork(event, procCtx);
+ if (enclosingWork == null) {
+ throw new AssertionError("Cannot find work for operator " + event);
+ }
+ String sourceName = enclosingWork.getName();
+
+ // store the vertex name in the operator pipeline
+ eventDesc.setVertexName(work.getName());
+ eventDesc.setInputName(work.getAliases().get(0));
+
+ // store table descriptor in map-work
+ if (!work.getEventSourceTableDescMap().containsKey(sourceName)) {
+ work.getEventSourceTableDescMap().put(sourceName, new LinkedList<TableDesc>());
+ }
+ List<TableDesc> tables = work.getEventSourceTableDescMap().get(sourceName);
+ tables.add(event.getConf().getTable());
+
+ // store column name in map-work
+ if (!work.getEventSourceColumnNameMap().containsKey(sourceName)) {
+ work.getEventSourceColumnNameMap().put(sourceName, new LinkedList<String>());
+ }
+ List<String> columns = work.getEventSourceColumnNameMap().get(sourceName);
+ columns.add(eventDesc.getTargetColumnName());
+
+ // store partition key expr in map-work
+ if (!work.getEventSourcePartKeyExprMap().containsKey(sourceName)) {
+ work.getEventSourcePartKeyExprMap().put(sourceName, new LinkedList<ExprNodeDesc>());
+ }
+ List<ExprNodeDesc> keys = work.getEventSourcePartKeyExprMap().get(sourceName);
+ keys.add(eventDesc.getPartKey());
+
+ }
+
+ /**
+ * getEnclosingWork finds the BaseWork any given operator belongs to.
+ */
+ public BaseWork getEnclosingWork(Operator<?> op, GenTezProcContext procCtx) {
+ List<Operator<?>> ops = new ArrayList<Operator<?>>();
+ findRoots(op, ops);
+ for (Operator<?> r : ops) {
+ BaseWork work = procCtx.rootToWorkMap.get(r);
+ if (work != null) {
+ return work;
+ }
+ }
+ return null;
+ }
+
+ /*
+ * findRoots returns all root operators (in ops) that result in operator op
+ */
+ private void findRoots(Operator<?> op, List<Operator<?>> ops) {
+ List<Operator<?>> parents = op.getParentOperators();
+ if (parents == null || parents.isEmpty()) {
+ ops.add(op);
+ return;
+ }
+ for (Operator<?> p : parents) {
+ findRoots(p, ops);
+ }
+ }
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/OptimizeTezProcContext.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/OptimizeTezProcContext.java?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/OptimizeTezProcContext.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/OptimizeTezProcContext.java Fri Sep 5 20:16:08 2014
@@ -23,13 +23,18 @@ import java.util.HashSet;
import java.util.Set;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+
/**
* OptimizeTezProcContext. OptimizeTezProcContext maintains information
* about the current operator plan as we walk the operator tree
@@ -47,19 +52,23 @@ public class OptimizeTezProcContext impl
public final Set<ReduceSinkOperator> visitedReduceSinks
= new HashSet<ReduceSinkOperator>();
+ public final Multimap<AppMasterEventOperator, TableScanOperator> eventOpToTableScanMap =
+ HashMultimap.create();
+
// rootOperators are all the table scan operators in sequence
// of traversal
- public final Deque<Operator<? extends OperatorDesc>> rootOperators;
+ public Deque<Operator<? extends OperatorDesc>> rootOperators;
- @SuppressWarnings("unchecked")
- public OptimizeTezProcContext(HiveConf conf, ParseContext parseContext,
- Set<ReadEntity> inputs, Set<WriteEntity> outputs,
- Deque<Operator<?>> rootOperators) {
+ public OptimizeTezProcContext(HiveConf conf, ParseContext parseContext, Set<ReadEntity> inputs,
+ Set<WriteEntity> outputs) {
this.conf = conf;
this.parseContext = parseContext;
this.inputs = inputs;
this.outputs = outputs;
- this.rootOperators = rootOperators;
+ }
+
+ public void setRootOperators(Deque<Operator<? extends OperatorDesc>> roots) {
+ this.rootOperators = roots;
}
}