You are viewing a plain-text version of this content. (The canonical link to the original message was not preserved in this plain-text conversion.)
Posted to commits@hive.apache.org by gu...@apache.org on 2014/08/21 01:15:10 UTC
svn commit: r1619261 - in /hive/branches/tez/ql/src:
java/org/apache/hadoop/hive/ql/exec/tez/
java/org/apache/hadoop/hive/ql/exec/tez/tools/
java/org/apache/hadoop/hive/ql/metadata/
test/org/apache/hadoop/hive/ql/exec/tez/
Author: gunther
Date: Wed Aug 20 23:15:09 2014
New Revision: 1619261
URL: http://svn.apache.org/r1619261
Log:
HIVE-7808: Changes to work against Tez-0.5 RC (Siddharth Seth via Gunther Hagleitner)
Modified:
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HivePreWarmProcessor.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapTezProcessor.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceTezProcessor.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java
hive/branches/tez/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java?rev=1619261&r1=1619260&r2=1619261&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java Wed Aug 20 23:15:09 2014
@@ -19,29 +19,28 @@
package org.apache.hadoop.hive.ql.exec.tez;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.tez.dag.api.EdgeManager;
-import org.apache.tez.dag.api.EdgeManagerContext;
+import org.apache.hadoop.io.DataInputByteBuffer;
+import org.apache.tez.dag.api.EdgeManagerPlugin;
+import org.apache.tez.dag.api.EdgeManagerPluginContext;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.InputReadErrorEvent;
-import com.google.common.collect.Multimap;
-
-public class CustomPartitionEdge extends EdgeManager {
+public class CustomPartitionEdge extends EdgeManagerPlugin {
private static final Log LOG = LogFactory.getLog(CustomPartitionEdge.class.getName());
CustomEdgeConfiguration conf = null;
- EdgeManagerContext context = null;
+ final EdgeManagerPluginContext context;
// used by the framework at runtime. initialize is the real initializer at runtime
- public CustomPartitionEdge(EdgeManagerContext context) {
+ public CustomPartitionEdge(EdgeManagerPluginContext context) {
super(context);
this.context = context;
}
@@ -65,17 +64,17 @@ public class CustomPartitionEdge extends
// called at runtime to initialize the custom edge.
@Override
public void initialize() {
- byte[] payload = context.getUserPayload();
+ ByteBuffer payload = context.getUserPayload().getPayload();
LOG.info("Initializing the edge, payload: " + payload);
if (payload == null) {
throw new RuntimeException("Invalid payload");
}
// De-serialization code
- DataInputBuffer dib = new DataInputBuffer();
- dib.reset(payload, payload.length);
+ DataInputByteBuffer dibb = new DataInputByteBuffer();
+ dibb.reset(payload);
conf = new CustomEdgeConfiguration();
try {
- conf.readFields(dib);
+ conf.readFields(dibb);
} catch (IOException e) {
throw new RuntimeException(e);
}
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java?rev=1619261&r1=1619260&r2=1619261&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java Wed Aug 20 23:15:09 2014
@@ -37,24 +37,24 @@ import org.apache.hadoop.io.serializer.S
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.split.TezGroupedSplitsInputFormat;
-import org.apache.tez.dag.api.EdgeManagerDescriptor;
+import org.apache.hadoop.mapreduce.split.TezMapReduceSplitsGrouper;
+import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
import org.apache.tez.dag.api.InputDescriptor;
-import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.VertexLocationHint;
import org.apache.tez.dag.api.VertexManagerPlugin;
import org.apache.tez.dag.api.VertexManagerPluginContext;
-import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint;
-import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRInputUserPayloadProto;
import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto;
import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.RootInputSpecUpdate;
-import org.apache.tez.runtime.api.events.RootInputConfigureVertexTasksEvent;
-import org.apache.tez.runtime.api.events.RootInputDataInformationEvent;
-import org.apache.tez.runtime.api.events.RootInputUpdatePayloadEvent;
+import org.apache.tez.runtime.api.InputSpecUpdate;
+import org.apache.tez.runtime.api.events.InputConfigureVertexTasksEvent;
+import org.apache.tez.runtime.api.events.InputDataInformationEvent;
+import org.apache.tez.runtime.api.events.InputUpdatePayloadEvent;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
import com.google.common.base.Preconditions;
@@ -63,6 +63,7 @@ import com.google.common.collect.HashMul
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
+import com.google.protobuf.ByteString;
/*
* Only works with old mapred API
@@ -74,8 +75,8 @@ public class CustomPartitionVertex exten
VertexManagerPluginContext context;
- private RootInputConfigureVertexTasksEvent configureVertexTaskEvent;
- private List<RootInputDataInformationEvent> dataInformationEvents;
+ private InputConfigureVertexTasksEvent configureVertexTaskEvent;
+ private List<InputDataInformationEvent> dataInformationEvents;
private int numBuckets = -1;
private Configuration conf = null;
private boolean rootVertexInitialized = false;
@@ -89,7 +90,7 @@ public class CustomPartitionVertex exten
@Override
public void initialize() {
this.context = getContext();
- ByteBuffer byteBuf = ByteBuffer.wrap(context.getUserPayload());
+ ByteBuffer byteBuf = context.getUserPayload().getPayload();
this.numBuckets = byteBuf.getInt();
}
@@ -129,8 +130,8 @@ public class CustomPartitionVertex exten
// but that
// means serializing another instance.
MRInputUserPayloadProto protoPayload =
- MRHelpers.parseMRInputPayload(inputDescriptor.getUserPayload());
- this.conf = MRHelpers.createConfFromByteString(protoPayload.getConfigurationBytes());
+ MRInputHelpers.parseMRInputPayload(inputDescriptor.getUserPayload());
+ this.conf = TezUtils.createConfFromByteString(protoPayload.getConfigurationBytes());
/*
* Currently in tez, the flow of events is thus:
@@ -146,13 +147,10 @@ public class CustomPartitionVertex exten
*/
// This assumes that Grouping will always be used.
- // Changing the InputFormat - so that the correct one is initialized in
- // MRInput.
- this.conf.set("mapred.input.format.class", TezGroupedSplitsInputFormat.class.getName());
+ // Enabling grouping on the payload.
MRInputUserPayloadProto updatedPayload =
- MRInputUserPayloadProto.newBuilder(protoPayload)
- .setConfigurationBytes(MRHelpers.createByteStringFromConf(conf)).build();
- inputDescriptor.setUserPayload(updatedPayload.toByteArray());
+ MRInputUserPayloadProto.newBuilder(protoPayload).setGroupingEnabled(true).build();
+ inputDescriptor.setUserPayload(UserPayload.create(updatedPayload.toByteString().asReadOnlyByteBuffer()));
} catch (IOException e) {
e.printStackTrace();
throw new RuntimeException(e);
@@ -162,14 +160,14 @@ public class CustomPartitionVertex exten
Map<String, List<FileSplit>> pathFileSplitsMap = new TreeMap<String, List<FileSplit>>();
for (Event event : events) {
- if (event instanceof RootInputConfigureVertexTasksEvent) {
+ if (event instanceof InputConfigureVertexTasksEvent) {
// No tasks should have been started yet. Checked by initial state
// check.
Preconditions.checkState(dataInformationEventSeen == false);
Preconditions
.checkState(context.getVertexNumTasks(context.getVertexName()) == -1,
"Parallelism for the vertex should be set to -1 if the InputInitializer is setting parallelism");
- RootInputConfigureVertexTasksEvent cEvent = (RootInputConfigureVertexTasksEvent) event;
+ InputConfigureVertexTasksEvent cEvent = (InputConfigureVertexTasksEvent) event;
// The vertex cannot be configured until all DataEvents are seen - to
// build the routing table.
@@ -177,12 +175,12 @@ public class CustomPartitionVertex exten
dataInformationEvents =
Lists.newArrayListWithCapacity(configureVertexTaskEvent.getNumTasks());
}
- if (event instanceof RootInputUpdatePayloadEvent) {
+ if (event instanceof InputUpdatePayloadEvent) {
// this event can never occur. If it does, fail.
Preconditions.checkState(false);
- } else if (event instanceof RootInputDataInformationEvent) {
+ } else if (event instanceof InputDataInformationEvent) {
dataInformationEventSeen = true;
- RootInputDataInformationEvent diEvent = (RootInputDataInformationEvent) event;
+ InputDataInformationEvent diEvent = (InputDataInformationEvent) event;
dataInformationEvents.add(diEvent);
FileSplit fileSplit;
try {
@@ -206,8 +204,8 @@ public class CustomPartitionVertex exten
int totalResource = context.getTotalAvailableResource().getMemory();
int taskResource = context.getVertexTaskResource().getMemory();
float waves =
- conf.getFloat(TezConfiguration.TEZ_AM_GROUPING_SPLIT_WAVES,
- TezConfiguration.TEZ_AM_GROUPING_SPLIT_WAVES_DEFAULT);
+ conf.getFloat(TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_WAVES,
+ TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_WAVES_DEFAULT);
int availableSlots = totalResource / taskResource;
@@ -250,12 +248,12 @@ public class CustomPartitionVertex exten
// Construct the EdgeManager descriptor to be used by all edges which need
// the routing table.
- EdgeManagerDescriptor hiveEdgeManagerDesc =
- new EdgeManagerDescriptor(CustomPartitionEdge.class.getName());
- byte[] payload = getBytePayload(bucketToTaskMap);
+ EdgeManagerPluginDescriptor hiveEdgeManagerDesc =
+ EdgeManagerPluginDescriptor.create(CustomPartitionEdge.class.getName());
+ UserPayload payload = getBytePayload(bucketToTaskMap);
hiveEdgeManagerDesc.setUserPayload(payload);
- Map<String, EdgeManagerDescriptor> emMap = Maps.newHashMap();
+ Map<String, EdgeManagerPluginDescriptor> emMap = Maps.newHashMap();
// Replace the edge manager for all vertices which have routing type custom.
for (Entry<String, EdgeProperty> edgeEntry : context.getInputVertexEdgeProperties().entrySet()) {
@@ -268,52 +266,51 @@ public class CustomPartitionVertex exten
LOG.info("Task count is " + taskCount);
- List<RootInputDataInformationEvent> taskEvents =
+ List<InputDataInformationEvent> taskEvents =
Lists.newArrayListWithCapacity(finalSplits.size());
// Re-serialize the splits after grouping.
int count = 0;
for (InputSplit inputSplit : finalSplits) {
- MRSplitProto serializedSplit = MRHelpers.createSplitProto(inputSplit);
- RootInputDataInformationEvent diEvent =
- new RootInputDataInformationEvent(count, serializedSplit.toByteArray());
+ MRSplitProto serializedSplit = MRInputHelpers.createSplitProto(inputSplit);
+ InputDataInformationEvent diEvent =
+ InputDataInformationEvent.create(count, serializedSplit.toByteString().asReadOnlyByteBuffer());
diEvent.setTargetIndex(count);
count++;
taskEvents.add(diEvent);
}
// Replace the Edge Managers
- Map<String, RootInputSpecUpdate> rootInputSpecUpdate =
- new HashMap<String, RootInputSpecUpdate>();
+ Map<String, InputSpecUpdate> rootInputSpecUpdate =
+ new HashMap<String, InputSpecUpdate>();
rootInputSpecUpdate.put(
inputName,
- RootInputSpecUpdate.getDefaultSinglePhysicalInputSpecUpdate());
+ InputSpecUpdate.getDefaultSinglePhysicalInputSpecUpdate());
context.setVertexParallelism(
taskCount,
- new VertexLocationHint(grouper.createTaskLocationHints(finalSplits
+ VertexLocationHint.create(grouper.createTaskLocationHints(finalSplits
.toArray(new InputSplit[finalSplits.size()]))), emMap, rootInputSpecUpdate);
// Set the actual events for the tasks.
context.addRootInputEvents(inputName, taskEvents);
}
- private byte[] getBytePayload(Multimap<Integer, Integer> routingTable) throws IOException {
+ UserPayload getBytePayload(Multimap<Integer, Integer> routingTable) throws IOException {
CustomEdgeConfiguration edgeConf =
new CustomEdgeConfiguration(routingTable.keySet().size(), routingTable);
DataOutputBuffer dob = new DataOutputBuffer();
edgeConf.write(dob);
byte[] serialized = dob.getData();
-
- return serialized;
+ return UserPayload.create(ByteBuffer.wrap(serialized));
}
- private FileSplit getFileSplitFromEvent(RootInputDataInformationEvent event) throws IOException {
+ private FileSplit getFileSplitFromEvent(InputDataInformationEvent event) throws IOException {
InputSplit inputSplit = null;
if (event.getDeserializedUserPayload() != null) {
inputSplit = (InputSplit) event.getDeserializedUserPayload();
} else {
- MRSplitProto splitProto = MRSplitProto.parseFrom(event.getUserPayload());
+ MRSplitProto splitProto = MRSplitProto.parseFrom(ByteString.copyFrom(event.getUserPayload()));
SerializationFactory serializationFactory = new SerializationFactory(new Configuration());
- inputSplit = MRHelpers.createOldFormatSplitFromUserPayload(splitProto, serializationFactory);
+ inputSplit = MRInputHelpers.createOldFormatSplitFromUserPayload(splitProto, serializationFactory);
}
if (!(inputSplit instanceof FileSplit)) {
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java?rev=1619261&r1=1619260&r2=1619261&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java Wed Aug 20 23:15:09 2014
@@ -70,7 +70,6 @@ import org.apache.hadoop.io.DataOutputBu
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
-import org.apache.hadoop.mapred.split.TezGroupedSplitsInputFormat;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
@@ -79,43 +78,41 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
-import org.apache.tez.client.PreWarmVertex;
+import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.DataSinkDescriptor;
import org.apache.tez.dag.api.DataSourceDescriptor;
import org.apache.tez.dag.api.Edge;
-import org.apache.tez.dag.api.EdgeManagerDescriptor;
+import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.GroupInputEdge;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.InputInitializerDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.PreWarmVertex;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.VertexGroup;
-import org.apache.tez.dag.api.VertexLocationHint;
import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
-import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
-import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
import org.apache.tez.mapreduce.hadoop.MRJobConfig;
import org.apache.tez.mapreduce.input.MRInputLegacy;
import org.apache.tez.mapreduce.output.MROutput;
import org.apache.tez.mapreduce.partition.MRPartitioner;
-import org.apache.tez.runtime.api.TezRootInputInitializer;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
-import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfigurer;
-import org.apache.tez.runtime.library.conf.UnorderedPartitionedKVEdgeConfigurer;
-import org.apache.tez.runtime.library.conf.UnorderedUnpartitionedKVEdgeConfigurer;
+import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig;
+import org.apache.tez.runtime.library.conf.UnorderedKVEdgeConfig;
+import org.apache.tez.runtime.library.conf.UnorderedPartitionedKVEdgeConfig;
import org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValueInput;
import com.google.common.base.Function;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
-import com.google.protobuf.ByteString;
/**
* DagUtils. DagUtils is a collection of helper methods to convert
@@ -236,9 +233,10 @@ public class DagUtils {
mergeInputClass = ConcatenatedMergedKeyValueInput.class;
int numBuckets = edgeProp.getNumBuckets();
VertexManagerPluginDescriptor desc =
- new VertexManagerPluginDescriptor(CustomPartitionVertex.class.getName());
- byte[] userPayload = ByteBuffer.allocate(4).putInt(numBuckets).array();
- desc.setUserPayload(userPayload);
+ VertexManagerPluginDescriptor.create(CustomPartitionVertex.class.getName());
+ ByteBuffer userPayload = ByteBuffer.allocate(4).putInt(numBuckets);
+ userPayload.flip();
+ desc.setUserPayload(UserPayload.create(userPayload));
w.setVertexManagerPlugin(desc);
break;
}
@@ -256,8 +254,8 @@ public class DagUtils {
break;
}
- return new GroupInputEdge(group, w, createEdgeProperty(edgeProp, vConf),
- new InputDescriptor(mergeInputClass.getName()));
+ return GroupInputEdge.create(group, w, createEdgeProperty(edgeProp, vConf),
+ InputDescriptor.create(mergeInputClass.getName()));
}
/**
@@ -276,10 +274,11 @@ public class DagUtils {
switch(edgeProp.getEdgeType()) {
case CUSTOM_EDGE: {
int numBuckets = edgeProp.getNumBuckets();
- byte[] userPayload = ByteBuffer.allocate(4).putInt(numBuckets).array();
- VertexManagerPluginDescriptor desc = new VertexManagerPluginDescriptor(
+ ByteBuffer userPayload = ByteBuffer.allocate(4).putInt(numBuckets);
+ userPayload.flip();
+ VertexManagerPluginDescriptor desc = VertexManagerPluginDescriptor.create(
CustomPartitionVertex.class.getName());
- desc.setUserPayload(userPayload);
+ desc.setUserPayload(UserPayload.create(userPayload));
w.setVertexManagerPlugin(desc);
break;
}
@@ -291,7 +290,7 @@ public class DagUtils {
// nothing
}
- return new Edge(v, w, createEdgeProperty(edgeProp, vConf));
+ return Edge.create(v, w, createEdgeProperty(edgeProp, vConf));
}
/*
@@ -300,7 +299,7 @@ public class DagUtils {
@SuppressWarnings("rawtypes")
private EdgeProperty createEdgeProperty(TezEdgeProperty edgeProp, Configuration conf)
throws IOException {
- MRHelpers.translateVertexConfToTez(conf);
+ MRHelpers.translateMRConfToTez(conf);
String keyClass = conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS);
String valClass = conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS);
String partitionerClassName = conf.get("mapred.partitioner.class");
@@ -309,28 +308,28 @@ public class DagUtils {
EdgeType edgeType = edgeProp.getEdgeType();
switch (edgeType) {
case BROADCAST_EDGE:
- UnorderedUnpartitionedKVEdgeConfigurer et1Conf = UnorderedUnpartitionedKVEdgeConfigurer
+ UnorderedKVEdgeConfig et1Conf = UnorderedKVEdgeConfig
.newBuilder(keyClass, valClass).setFromConfiguration(conf).build();
return et1Conf.createDefaultBroadcastEdgeProperty();
case CUSTOM_EDGE:
assert partitionerClassName != null;
partitionerConf = createPartitionerConf(partitionerClassName, conf);
- UnorderedPartitionedKVEdgeConfigurer et2Conf = UnorderedPartitionedKVEdgeConfigurer
+ UnorderedPartitionedKVEdgeConfig et2Conf = UnorderedPartitionedKVEdgeConfig
.newBuilder(keyClass, valClass, MRPartitioner.class.getName(), partitionerConf)
.setFromConfiguration(conf).build();
- EdgeManagerDescriptor edgeDesc =
- new EdgeManagerDescriptor(CustomPartitionEdge.class.getName());
+ EdgeManagerPluginDescriptor edgeDesc =
+ EdgeManagerPluginDescriptor.create(CustomPartitionEdge.class.getName());
CustomEdgeConfiguration edgeConf =
new CustomEdgeConfiguration(edgeProp.getNumBuckets(), null);
DataOutputBuffer dob = new DataOutputBuffer();
edgeConf.write(dob);
byte[] userPayload = dob.getData();
- edgeDesc.setUserPayload(userPayload);
+ edgeDesc.setUserPayload(UserPayload.create(ByteBuffer.wrap(userPayload)));
return et2Conf.createDefaultCustomEdgeProperty(edgeDesc);
case CUSTOM_SIMPLE_EDGE:
assert partitionerClassName != null;
partitionerConf = createPartitionerConf(partitionerClassName, conf);
- UnorderedPartitionedKVEdgeConfigurer et3Conf = UnorderedPartitionedKVEdgeConfigurer
+ UnorderedPartitionedKVEdgeConfig et3Conf = UnorderedPartitionedKVEdgeConfig
.newBuilder(keyClass, valClass, MRPartitioner.class.getName(), partitionerConf)
.setFromConfiguration(conf).build();
return et3Conf.createDefaultEdgeProperty();
@@ -338,7 +337,7 @@ public class DagUtils {
default:
assert partitionerClassName != null;
partitionerConf = createPartitionerConf(partitionerClassName, conf);
- OrderedPartitionedKVEdgeConfigurer et4Conf = OrderedPartitionedKVEdgeConfigurer
+ OrderedPartitionedKVEdgeConfig et4Conf = OrderedPartitionedKVEdgeConfig
.newBuilder(keyClass, valClass, MRPartitioner.class.getName(), partitionerConf)
.setFromConfiguration(conf).build();
return et4Conf.createDefaultEdgeProperty();
@@ -384,7 +383,7 @@ public class DagUtils {
*/
private Map<String, String> getContainerEnvironment(Configuration conf, boolean isMap) {
Map<String, String> environment = new HashMap<String, String>();
- MRHelpers.updateEnvironmentForMRTasks(conf, environment, isMap);
+ MRHelpers.updateEnvBasedOnMRTaskEnv(conf, environment, isMap);
return environment;
}
@@ -398,14 +397,14 @@ public class DagUtils {
if (javaOpts != null && !javaOpts.isEmpty()) {
String logLevel = HiveConf.getVar(conf, HiveConf.ConfVars.HIVETEZLOGLEVEL);
List<String> logProps = Lists.newArrayList();
- MRHelpers.addLog4jSystemProperties(logLevel, logProps);
+ TezUtils.addLog4jSystemProperties(logLevel, logProps);
StringBuilder sb = new StringBuilder();
for (String str : logProps) {
sb.append(str).append(" ");
}
return javaOpts + " " + sb.toString();
}
- return MRHelpers.getMapJavaOpts(conf);
+ return MRHelpers.getJavaOptsForMRMapper(conf);
}
/*
@@ -427,11 +426,11 @@ public class DagUtils {
Vertex map = null;
// use tez to combine splits
- boolean useTezGroupedSplits = false;
+ boolean groupSplitsInInputInitializer;
+
+ DataSourceDescriptor dataSource;
int numTasks = -1;
- Class<? extends TezRootInputInitializer> amSplitGeneratorClass = null;
- InputSplitInfo inputSplitInfo = null;
Class inputFormatClass = conf.getClass("mapred.input.format.class",
InputFormat.class);
@@ -445,9 +444,9 @@ public class DagUtils {
}
if (vertexHasCustomInput) {
- useTezGroupedSplits = false;
- // grouping happens in execution phase. Setting the class to TezGroupedSplitsInputFormat
- // here would cause pre-mature grouping which would be incorrect.
+ groupSplitsInInputInitializer = false;
+ // grouping happens in execution phase. The input payload should not enable grouping here,
+ // it will be enabled in the CustomVertex.
inputFormatClass = HiveInputFormat.class;
conf.setClass("mapred.input.format.class", HiveInputFormat.class, InputFormat.class);
// mapreduce.tez.input.initializer.serialize.event.payload should be set to false when using
@@ -457,50 +456,48 @@ public class DagUtils {
// we'll set up tez to combine spits for us iff the input format
// is HiveInputFormat
if (inputFormatClass == HiveInputFormat.class) {
- useTezGroupedSplits = true;
- conf.setClass("mapred.input.format.class", HiveInputFormat.class, InputFormat.class);
+ groupSplitsInInputInitializer = true;
+ } else {
+ groupSplitsInInputInitializer = false;
}
}
+ // set up the operator plan. Before setting up Inputs since the config is updated.
+ Utilities.setMapWork(conf, mapWork, mrScratchDir, false);
+
if (HiveConf.getBoolVar(conf, ConfVars.HIVE_AM_SPLIT_GENERATION)
&& !mapWork.isUseOneNullRowInputFormat()) {
// if we're generating the splits in the AM, we just need to set
// the correct plugin.
- if (useTezGroupedSplits) {
- amSplitGeneratorClass = HiveSplitGenerator.class;
+ if (groupSplitsInInputInitializer) {
+ // Not setting a payload, since the MRInput payload is the same and can be accessed.
+ InputInitializerDescriptor descriptor = InputInitializerDescriptor.create(
+ HiveSplitGenerator.class.getName());
+ dataSource = MRInputLegacy.createConfigBuilder(conf, inputFormatClass).groupSplits(true)
+ .setCustomInitializerDescriptor(descriptor).build();
} else {
- amSplitGeneratorClass = MRInputAMSplitGenerator.class;
+ // Not HiveInputFormat, or a custom VertexManager will take care of grouping splits
+ dataSource = MRInputLegacy.createConfigBuilder(conf, inputFormatClass).groupSplits(false).build();
}
} else {
- // client side split generation means we have to compute them now
- inputSplitInfo = MRHelpers.generateInputSplits(conf,
- new Path(tezDir, "split_"+mapWork.getName().replaceAll(" ", "_")));
- numTasks = inputSplitInfo.getNumTasks();
+ // Setup client side split generation.
+ dataSource = MRInputHelpers.configureMRInputWithLegacySplitGeneration(conf, new Path(tezDir,
+ "split_" + mapWork.getName().replaceAll(" ", "_")), true);
+ numTasks = dataSource.getNumberOfShards();
}
- // set up the operator plan
- Utilities.setMapWork(conf, mapWork, mrScratchDir, false);
-
- byte[] serializedConf = MRHelpers.createUserPayloadFromConf(conf);
- map = new Vertex(mapWork.getName(),
- new ProcessorDescriptor(MapTezProcessor.class.getName()).
+ UserPayload serializedConf = TezUtils.createUserPayloadFromConf(conf);
+ map = Vertex.create(mapWork.getName(),
+ ProcessorDescriptor.create(MapTezProcessor.class.getName()).
setUserPayload(serializedConf), numTasks, getContainerResource(conf));
map.setTaskEnvironment(getContainerEnvironment(conf, true));
map.setTaskLaunchCmdOpts(getContainerJavaOpts(conf));
assert mapWork.getAliasToWork().keySet().size() == 1;
+ // Add the actual source input
String alias = mapWork.getAliasToWork().keySet().iterator().next();
-
- byte[] mrInput = null;
- if (useTezGroupedSplits) {
- mrInput = MRHelpers.createMRInputPayloadWithGrouping(serializedConf);
- } else {
- mrInput = MRHelpers.createMRInputPayload(serializedConf);
- }
- map.addDataSource(alias,
- new DataSourceDescriptor(new InputDescriptor(MRInputLegacy.class.getName()).
- setUserPayload(mrInput), new InputInitializerDescriptor(amSplitGeneratorClass.getName()).setUserPayload(mrInput),null));
+ map.addDataSource(alias, dataSource);
Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
localResources.put(getBaseName(appJarLr), appJarLr);
@@ -508,13 +505,6 @@ public class DagUtils {
localResources.put(getBaseName(lr), lr);
}
- if (inputSplitInfo != null) {
- // only relevant for client-side split generation
- map.setLocationHint(new VertexLocationHint(inputSplitInfo.getTaskLocationHints()));
- MRHelpers.updateLocalResourcesForInputSplits(FileSystem.get(conf), inputSplitInfo,
- localResources);
- }
-
map.setTaskLocalFiles(localResources);
return map;
}
@@ -550,9 +540,9 @@ public class DagUtils {
Utilities.createTmpDirs(conf, reduceWork);
// create the vertex
- Vertex reducer = new Vertex(reduceWork.getName(),
- new ProcessorDescriptor(ReduceTezProcessor.class.getName()).
- setUserPayload(MRHelpers.createUserPayloadFromConf(conf)),
+ Vertex reducer = Vertex.create(reduceWork.getName(),
+ ProcessorDescriptor.create(ReduceTezProcessor.class.getName()).
+ setUserPayload(TezUtils.createUserPayloadFromConf(conf)),
reduceWork.isAutoReduceParallelism() ? reduceWork.getMaxReduceTasks() : reduceWork
.getNumReduceTasks(), getContainerResource(conf));
@@ -606,10 +596,10 @@ public class DagUtils {
int numContainers, Map<String, LocalResource> localResources) throws
IOException, TezException {
- ProcessorDescriptor prewarmProcDescriptor = new ProcessorDescriptor(HivePreWarmProcessor.class.getName());
- prewarmProcDescriptor.setUserPayload(MRHelpers.createUserPayloadFromConf(conf));
+ ProcessorDescriptor prewarmProcDescriptor = ProcessorDescriptor.create(HivePreWarmProcessor.class.getName());
+ prewarmProcDescriptor.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
- PreWarmVertex prewarmVertex = new PreWarmVertex("prewarm", prewarmProcDescriptor, numContainers,getContainerResource(conf));
+ PreWarmVertex prewarmVertex = PreWarmVertex.create("prewarm", prewarmProcDescriptor, numContainers,getContainerResource(conf));
Map<String, LocalResource> combinedResources = new HashMap<String, LocalResource>();
@@ -855,7 +845,7 @@ public class DagUtils {
public JobConf createConfiguration(HiveConf hiveConf) throws IOException {
hiveConf.setBoolean("mapred.mapper.new-api", false);
- JobConf conf = (JobConf) MRHelpers.getBaseMRConfiguration(hiveConf);
+ JobConf conf = new JobConf(hiveConf);
conf.set("mapred.output.committer.class", NullOutputCommitter.class.getName());
@@ -946,8 +936,8 @@ public class DagUtils {
// final vertices need to have at least one output
if (!hasChildren) {
v.addDataSink("out_"+work.getName(), new DataSinkDescriptor(
- new OutputDescriptor(MROutput.class.getName())
- .setUserPayload(MRHelpers.createUserPayloadFromConf(conf)), null, null));
+ OutputDescriptor.create(MROutput.class.getName())
+ .setUserPayload(TezUtils.createUserPayloadFromConf(conf)), null, null));
}
return v;
@@ -1015,16 +1005,16 @@ public class DagUtils {
if (edgeProp.isAutoReduce()) {
Configuration pluginConf = new Configuration(false);
VertexManagerPluginDescriptor desc =
- new VertexManagerPluginDescriptor(ShuffleVertexManager.class.getName());
+ VertexManagerPluginDescriptor.create(ShuffleVertexManager.class.getName());
pluginConf.setBoolean(
- ShuffleVertexManager.TEZ_AM_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL, true);
- pluginConf.setInt(ShuffleVertexManager.TEZ_AM_SHUFFLE_VERTEX_MANAGER_MIN_TASK_PARALLELISM,
+ ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL, true);
+ pluginConf.setInt(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_TASK_PARALLELISM,
edgeProp.getMinReducer());
pluginConf.setLong(
- ShuffleVertexManager.TEZ_AM_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE,
+ ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE,
edgeProp.getInputSizePerReducer());
- ByteString payload = MRHelpers.createByteStringFromConf(pluginConf);
- desc.setUserPayload(payload.toByteArray());
+ UserPayload payload = TezUtils.createUserPayloadFromConf(pluginConf);
+ desc.setUserPayload(payload);
v.setVertexManagerPlugin(desc);
}
}
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HivePreWarmProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HivePreWarmProcessor.java?rev=1619261&r1=1619260&r2=1619261&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HivePreWarmProcessor.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HivePreWarmProcessor.java Wed Aug 20 23:15:09 2014
@@ -25,16 +25,15 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.ReadaheadPool;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.LogicalIOProcessor;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.LogicalOutput;
-import org.apache.tez.runtime.api.TezProcessorContext;
+import org.apache.tez.runtime.api.ProcessorContext;
import java.net.URL;
import java.net.JarURLConnection;
-import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.Map;
@@ -57,14 +56,13 @@ public class HivePreWarmProcessor extend
private Configuration conf;
- public HivePreWarmProcessor(TezProcessorContext context) {
+ public HivePreWarmProcessor(ProcessorContext context) {
super(context);
}
@Override
public void initialize() throws Exception {
- TezProcessorContext processorContext = getContext();
- byte[] userPayload = processorContext.getUserPayload();
+ UserPayload userPayload = getContext().getUserPayload();
this.conf = TezUtils.createConfFromUserPayload(userPayload);
}
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java?rev=1619261&r1=1619260&r2=1619261&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java Wed Aug 20 23:15:09 2014
@@ -35,22 +35,23 @@ import org.apache.hadoop.mapred.FileSpli
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.split.TezMapReduceSplitsGrouper;
import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.VertexLocationHint;
import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
import org.apache.tez.mapreduce.hadoop.InputSplitInfoMem;
-import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRInputUserPayloadProto;
import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto;
import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitsProto;
import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.RootInputSpecUpdate;
-import org.apache.tez.runtime.api.events.RootInputConfigureVertexTasksEvent;
-import org.apache.tez.runtime.api.events.RootInputDataInformationEvent;
-import org.apache.tez.runtime.api.events.RootInputInitializerEvent;
-import org.apache.tez.runtime.api.RootInputSpecUpdate;
-import org.apache.tez.runtime.api.TezRootInputInitializer;
-import org.apache.tez.runtime.api.TezRootInputInitializerContext;
+import org.apache.tez.runtime.api.events.InputConfigureVertexTasksEvent;
+import org.apache.tez.runtime.api.events.InputDataInformationEvent;
+import org.apache.tez.runtime.api.events.InputInitializerEvent;
+import org.apache.tez.runtime.api.InputInitializer;
+import org.apache.tez.runtime.api.InputInitializerContext;
+import org.apache.tez.runtime.api.InputSpecUpdate;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Lists;
@@ -62,25 +63,25 @@ import com.google.common.collect.Multima
* making sure that splits from different partitions are only grouped if they
* are of the same schema, format and serde
*/
-public class HiveSplitGenerator extends TezRootInputInitializer {
+public class HiveSplitGenerator extends InputInitializer {
private static final Log LOG = LogFactory.getLog(HiveSplitGenerator.class);
private static final SplitGrouper grouper = new SplitGrouper();
- public HiveSplitGenerator(TezRootInputInitializerContext initializerContext) {
+ public HiveSplitGenerator(InputInitializerContext initializerContext) {
super(initializerContext);
}
@Override
public List<Event> initialize() throws Exception {
- TezRootInputInitializerContext rootInputContext = getContext();
+ InputInitializerContext rootInputContext = getContext();
MRInputUserPayloadProto userPayloadProto =
- MRHelpers.parseMRInputPayload(rootInputContext.getUserPayload());
+ MRInputHelpers.parseMRInputPayload(rootInputContext.getInputUserPayload());
Configuration conf =
- MRHelpers.createConfFromByteString(userPayloadProto.getConfigurationBytes());
+ TezUtils.createConfFromByteString(userPayloadProto.getConfigurationBytes());
boolean sendSerializedEvents =
conf.getBoolean("mapreduce.tez.input.initializer.serialize.event.payload", true);
@@ -91,7 +92,8 @@ public class HiveSplitGenerator extends
InputSplitInfoMem inputSplitInfo = null;
String realInputFormatName = conf.get("mapred.input.format.class");
- if (realInputFormatName != null && !realInputFormatName.isEmpty()) {
+ boolean groupingEnabled = userPayloadProto.getGroupingEnabled();
+ if (groupingEnabled) {
// Need to instantiate the realInputFormat
InputFormat<?, ?> inputFormat =
(InputFormat<?, ?>) ReflectionUtils.newInstance(Class.forName(realInputFormatName),
@@ -103,8 +105,8 @@ public class HiveSplitGenerator extends
// Create the un-grouped splits
float waves =
- conf.getFloat(TezConfiguration.TEZ_AM_GROUPING_SPLIT_WAVES,
- TezConfiguration.TEZ_AM_GROUPING_SPLIT_WAVES_DEFAULT);
+ conf.getFloat(TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_WAVES,
+ TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_WAVES_DEFAULT);
InputSplit[] splits = inputFormat.getSplits(jobConf, (int) (availableSlots * waves));
LOG.info("Number of input splits: " + splits.length + ". " + availableSlots
@@ -124,7 +126,13 @@ public class HiveSplitGenerator extends
new InputSplitInfoMem(flatSplits, locationHints, flatSplits.length, null, jobConf);
} else {
// no need for grouping and the target #of tasks.
- inputSplitInfo = MRHelpers.generateInputSplitsToMem(jobConf, false, 0);
+ // This code path should never be triggered at the moment. If grouping is disabled,
+ // DAGUtils uses MRInputAMSplitGenerator.
+ // If this is used in the future - make sure to disable grouping in the payload, if it isn't already disabled
+ throw new RuntimeException(
+ "HiveInputFormat does not support non-grouped splits, InputFormatName is: "
+ + realInputFormatName);
+ // inputSplitInfo = MRInputHelpers.generateInputSplitsToMem(jobConf, false, 0);
}
return createEventList(sendSerializedEvents, inputSplitInfo);
@@ -181,31 +189,32 @@ public class HiveSplitGenerator extends
return groupedSplits;
}
- public void handleInputInitializerEvent(List<RootInputInitializerEvent> events) throws Exception {
+ @Override
+ public void handleInputInitializerEvent(List<InputInitializerEvent> events) throws Exception {
}
private List<Event> createEventList(boolean sendSerializedEvents, InputSplitInfoMem inputSplitInfo) {
List<Event> events = Lists.newArrayListWithCapacity(inputSplitInfo.getNumTasks() + 1);
- RootInputConfigureVertexTasksEvent configureVertexEvent = new
- RootInputConfigureVertexTasksEvent(inputSplitInfo.getNumTasks(),
- inputSplitInfo.getTaskLocationHints(),
- RootInputSpecUpdate.getDefaultSinglePhysicalInputSpecUpdate());
+ InputConfigureVertexTasksEvent configureVertexEvent =
+ InputConfigureVertexTasksEvent.create(inputSplitInfo.getNumTasks(),
+ VertexLocationHint.create(inputSplitInfo.getTaskLocationHints()),
+ InputSpecUpdate.getDefaultSinglePhysicalInputSpecUpdate());
events.add(configureVertexEvent);
if (sendSerializedEvents) {
MRSplitsProto splitsProto = inputSplitInfo.getSplitsProto();
int count = 0;
for (MRSplitProto mrSplit : splitsProto.getSplitsList()) {
- RootInputDataInformationEvent diEvent =
- new RootInputDataInformationEvent(count++, mrSplit.toByteArray());
+ InputDataInformationEvent diEvent =
+ InputDataInformationEvent.create(count++, mrSplit.toByteString().asReadOnlyByteBuffer());
events.add(diEvent);
}
} else {
int count = 0;
for (org.apache.hadoop.mapred.InputSplit split : inputSplitInfo.getOldFormatSplits()) {
- RootInputDataInformationEvent diEvent = new RootInputDataInformationEvent(count++, split);
+ InputDataInformationEvent diEvent = InputDataInformationEvent.create(count++, split);
events.add(diEvent);
}
}
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java?rev=1619261&r1=1619260&r2=1619261&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java Wed Aug 20 23:15:09 2014
@@ -47,7 +47,7 @@ import org.apache.tez.mapreduce.input.MR
import org.apache.tez.mapreduce.processor.MRTaskReporter;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.LogicalOutput;
-import org.apache.tez.runtime.api.TezProcessorContext;
+import org.apache.tez.runtime.api.ProcessorContext;
import org.apache.tez.runtime.library.api.KeyValueReader;
/**
@@ -65,7 +65,7 @@ public class MapRecordProcessor extends
private MapWork mapWork;
@Override
- void init(JobConf jconf, TezProcessorContext processorContext, MRTaskReporter mrReporter,
+ void init(JobConf jconf, ProcessorContext processorContext, MRTaskReporter mrReporter,
Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws Exception {
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS);
super.init(jconf, processorContext, mrReporter, inputs, outputs);
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapTezProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapTezProcessor.java?rev=1619261&r1=1619260&r2=1619261&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapTezProcessor.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapTezProcessor.java Wed Aug 20 23:15:09 2014
@@ -17,14 +17,14 @@
*/
package org.apache.hadoop.hive.ql.exec.tez;
-import org.apache.tez.runtime.api.TezProcessorContext;
+import org.apache.tez.runtime.api.ProcessorContext;
/**
* Subclass that is used to indicate if this is a map or reduce process
*/
public class MapTezProcessor extends TezProcessor {
- public MapTezProcessor(TezProcessorContext context) {
+ public MapTezProcessor(ProcessorContext context) {
super(context);
this.isMap = true;
}
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java?rev=1619261&r1=1619260&r2=1619261&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java Wed Aug 20 23:15:09 2014
@@ -20,7 +20,7 @@ package org.apache.hadoop.hive.ql.exec.t
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.tez.runtime.common.objectregistry.ObjectRegistry;
+import org.apache.tez.runtime.api.ObjectRegistry;
import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl;
/**
@@ -30,6 +30,7 @@ import org.apache.tez.runtime.common.obj
public class ObjectCache implements org.apache.hadoop.hive.ql.exec.ObjectCache {
private static final Log LOG = LogFactory.getLog(ObjectCache.class.getName());
+ // TODO HIVE-7809. This is broken. A new instance of ObjectRegistry should not be created.
private final ObjectRegistry registry = new ObjectRegistryImpl();
@Override
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java?rev=1619261&r1=1619260&r2=1619261&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java Wed Aug 20 23:15:09 2014
@@ -32,7 +32,7 @@ import org.apache.hadoop.mapred.OutputCo
import org.apache.tez.mapreduce.processor.MRTaskReporter;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.LogicalOutput;
-import org.apache.tez.runtime.api.TezProcessorContext;
+import org.apache.tez.runtime.api.ProcessorContext;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
@@ -47,7 +47,7 @@ public abstract class RecordProcessor {
protected Map<String, LogicalInput> inputs;
protected Map<String, LogicalOutput> outputs;
protected Map<String, OutputCollector> outMap;
- protected TezProcessorContext processorContext;
+ protected ProcessorContext processorContext;
public static final Log l4j = LogFactory.getLog(RecordProcessor.class);
@@ -72,7 +72,7 @@ public abstract class RecordProcessor {
* @param outputs map of Output names to {@link LogicalOutput}s
* @throws Exception
*/
- void init(JobConf jconf, TezProcessorContext processorContext, MRTaskReporter mrReporter,
+ void init(JobConf jconf, ProcessorContext processorContext, MRTaskReporter mrReporter,
Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws Exception {
this.jconf = jconf;
this.reporter = mrReporter;
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java?rev=1619261&r1=1619260&r2=1619261&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java Wed Aug 20 23:15:09 2014
@@ -20,7 +20,6 @@ package org.apache.hadoop.hive.ql.exec.t
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -59,14 +58,13 @@ import org.apache.hadoop.hive.serde2.obj
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.tez.mapreduce.processor.MRTaskReporter;
import org.apache.tez.runtime.api.Input;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.LogicalOutput;
-import org.apache.tez.runtime.api.TezProcessorContext;
+import org.apache.tez.runtime.api.ProcessorContext;
import org.apache.tez.runtime.library.api.KeyValuesReader;
/**
@@ -113,7 +111,7 @@ public class ReduceRecordProcessor exte
private List<VectorExpressionWriter>[] valueStringWriters;
@Override
- void init(JobConf jconf, TezProcessorContext processorContext, MRTaskReporter mrReporter,
+ void init(JobConf jconf, ProcessorContext processorContext, MRTaskReporter mrReporter,
Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws Exception {
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS);
super.init(jconf, processorContext, mrReporter, inputs, outputs);
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceTezProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceTezProcessor.java?rev=1619261&r1=1619260&r2=1619261&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceTezProcessor.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceTezProcessor.java Wed Aug 20 23:15:09 2014
@@ -17,14 +17,14 @@
*/
package org.apache.hadoop.hive.ql.exec.tez;
-import org.apache.tez.runtime.api.TezProcessorContext;
+import org.apache.tez.runtime.api.ProcessorContext;
/**
* Subclass that is used to indicate if this is a map or reduce process
*/
public class ReduceTezProcessor extends TezProcessor {
- public ReduceTezProcessor(TezProcessorContext context) {
+ public ReduceTezProcessor(ProcessorContext context) {
super(context);
this.isMap = false;
}
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java?rev=1619261&r1=1619260&r2=1619261&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java Wed Aug 20 23:15:09 2014
@@ -142,7 +142,7 @@ public class TezJobMonitor {
if (!running) {
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_SUBMIT_TO_RUNNING);
console.printInfo("Status: Running (application id: "
- +dagClient.getApplicationId()+")\n");
+ +dagClient.getExecutionContext()+")\n");
for (String s: progressMap.keySet()) {
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s);
}
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java?rev=1619261&r1=1619260&r2=1619261&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java Wed Aug 20 23:15:09 2014
@@ -18,7 +18,6 @@
package org.apache.hadoop.hive.ql.exec.tez;
import java.io.IOException;
import java.text.NumberFormat;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -35,10 +34,9 @@ import org.apache.tez.mapreduce.input.MR
import org.apache.tez.mapreduce.processor.MRTaskReporter;
import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.LogicalIOProcessor;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.LogicalOutput;
-import org.apache.tez.runtime.api.TezProcessorContext;
+import org.apache.tez.runtime.api.ProcessorContext;
import org.apache.tez.runtime.library.api.KeyValueWriter;
/**
@@ -59,8 +57,6 @@ public class TezProcessor extends Abstra
private static final String CLASS_NAME = TezProcessor.class.getName();
private final PerfLogger perfLogger = PerfLogger.getPerfLogger();
- private TezProcessorContext processorContext;
-
protected static final NumberFormat taskIdFormat = NumberFormat.getInstance();
protected static final NumberFormat jobIdFormat = NumberFormat.getInstance();
static {
@@ -70,7 +66,7 @@ public class TezProcessor extends Abstra
jobIdFormat.setMinimumIntegerDigits(4);
}
- public TezProcessor(TezProcessorContext context) {
+ public TezProcessor(ProcessorContext context) {
super(context);
}
@@ -88,18 +84,14 @@ public class TezProcessor extends Abstra
@Override
public void initialize() throws IOException {
- TezProcessorContext processorContext = getContext();
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_INITIALIZE_PROCESSOR);
- this.processorContext = processorContext;
- //get the jobconf
- byte[] userPayload = processorContext.getUserPayload();
- Configuration conf = TezUtils.createConfFromUserPayload(userPayload);
+ Configuration conf = TezUtils.createConfFromUserPayload(getContext().getUserPayload());
this.jobConf = new JobConf(conf);
- setupMRLegacyConfigs(processorContext);
+ setupMRLegacyConfigs(getContext());
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_INITIALIZE_PROCESSOR);
}
- private void setupMRLegacyConfigs(TezProcessorContext processorContext) {
+ private void setupMRLegacyConfigs(ProcessorContext processorContext) {
// Hive "insert overwrite local directory" uses task id as dir name
// Setting the id in jobconf helps to have the similar dir name as MR
StringBuilder taskAttemptIdBuilder = new StringBuilder("task");
@@ -134,7 +126,7 @@ public class TezProcessor extends Abstra
// in case of broadcast-join read the broadcast edge inputs
// (possibly asynchronously)
- LOG.info("Running task: " + processorContext.getUniqueIdentifier());
+ LOG.info("Running task: " + getContext().getUniqueIdentifier());
if (isMap) {
rproc = new MapRecordProcessor();
@@ -161,8 +153,8 @@ public class TezProcessor extends Abstra
// Outputs will be started later by the individual Processors.
- MRTaskReporter mrReporter = new MRTaskReporter(processorContext);
- rproc.init(jobConf, processorContext, mrReporter, inputs, outputs);
+ MRTaskReporter mrReporter = new MRTaskReporter(getContext());
+ rproc.init(jobConf, getContext(), mrReporter, inputs, outputs);
rproc.run();
//done - output does not need to be committed as hive does not use outputcommitter
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java?rev=1619261&r1=1619260&r2=1619261&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java Wed Aug 20 23:15:09 2014
@@ -41,21 +41,17 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.tez.client.TezClient;
-import org.apache.tez.client.PreWarmVertex;
+import org.apache.tez.dag.api.PreWarmVertex;
import org.apache.tez.dag.api.SessionNotRunning;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
-import org.apache.tez.dag.api.Vertex;
import org.apache.tez.mapreduce.hadoop.MRHelpers;
/**
@@ -165,7 +161,7 @@ public class TezSessionState {
// Create environment for AM.
Map<String, String> amEnv = new HashMap<String, String>();
- MRHelpers.updateEnvironmentForMRAM(conf, amEnv);
+ MRHelpers.updateEnvBasedOnMRAMEnv(conf, amEnv);
// and finally we're ready to create and start the session
// generate basic tez config
@@ -180,7 +176,7 @@ public class TezSessionState {
tezConfig.setInt(TezConfiguration.TEZ_AM_SESSION_MIN_HELD_CONTAINERS, n);
}
- session = new TezClient("HIVE-" + sessionId, tezConfig, true,
+ session = TezClient.create("HIVE-" + sessionId, tezConfig, true,
commonLocalResources, null);
LOG.info("Opening new Tez Session (id: " + sessionId
@@ -196,9 +192,13 @@ public class TezSessionState {
commonLocalResources);
try {
session.preWarm(prewarmVertex);
- } catch (InterruptedException ie) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Hive Prewarm threw an exception ", ie);
+ } catch (IOException ie) {
+ if (ie.getMessage().contains("Interrupted while waiting")) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Hive Prewarm threw an exception ", ie);
+ }
+ } else {
+ throw ie;
}
}
}
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java?rev=1619261&r1=1619260&r2=1619261&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java Wed Aug 20 23:15:09 2014
@@ -22,9 +22,9 @@ import java.util.List;
import java.util.Map;
import org.apache.tez.runtime.api.Input;
+import org.apache.tez.runtime.api.MergedInputContext;
import org.apache.tez.runtime.api.MergedLogicalInput;
import org.apache.tez.runtime.api.Reader;
-import org.apache.tez.runtime.api.TezMergedInputContext;
/**
* TezMergedLogicalInput is an adapter to make union input look like
@@ -34,7 +34,7 @@ public class TezMergedLogicalInput exten
private Map<Input, Boolean> readyInputs = new IdentityHashMap<Input, Boolean>();
- public TezMergedLogicalInput(TezMergedInputContext context, List<Input> inputs) {
+ public TezMergedLogicalInput(MergedInputContext context, List<Input> inputs) {
super(context, inputs);
}
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java?rev=1619261&r1=1619260&r2=1619261&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java Wed Aug 20 23:15:09 2014
@@ -14,7 +14,7 @@ import java.util.regex.Pattern;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.common.FileUtils;
-import org.apache.hadoop.hive.conf.HiveConf;;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaHook;
import org.apache.hadoop.hive.metastore.HiveMetaHookLoader;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
Modified: hive/branches/tez/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java?rev=1619261&r1=1619260&r2=1619261&view=diff
==============================================================================
--- hive/branches/tez/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java (original)
+++ hive/branches/tez/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java Wed Aug 20 23:15:09 2014
@@ -33,7 +33,6 @@ import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
-import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -97,7 +96,7 @@ public class TestTezTask {
@Override
public Vertex answer(InvocationOnMock invocation) throws Throwable {
Object[] args = invocation.getArguments();
- return new Vertex(((BaseWork)args[1]).getName(),
+ return Vertex.create(((BaseWork)args[1]).getName(),
mock(ProcessorDescriptor.class), 0, mock(Resource.class));
}
});
@@ -108,7 +107,7 @@ public class TestTezTask {
@Override
public Edge answer(InvocationOnMock invocation) throws Throwable {
Object[] args = invocation.getArguments();
- return new Edge((Vertex)args[1], (Vertex)args[2], mock(EdgeProperty.class));
+ return Edge.create((Vertex)args[1], (Vertex)args[2], mock(EdgeProperty.class));
}
});