Posted to commits@hive.apache.org by br...@apache.org on 2014/09/08 06:38:26 UTC
svn commit: r1623263 [17/28] - in /hive/branches/spark: ./
accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/
ant/src/org/apache/hadoop/hive/ant/ beeline/src/java/org/apache/hive/beeline/
beeline/src/test/org/apache/hive/beeline/ bin/...
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java Mon Sep 8 04:38:17 2014
@@ -19,61 +19,62 @@
package org.apache.hadoop.hive.ql.exec.tez;
import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
import java.util.List;
-import java.util.ArrayList;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.tez.dag.api.EdgeManager;
-import org.apache.tez.dag.api.EdgeManagerContext;
+import org.apache.hadoop.io.DataInputByteBuffer;
+import org.apache.tez.dag.api.EdgeManagerPlugin;
+import org.apache.tez.dag.api.EdgeManagerPluginContext;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.InputReadErrorEvent;
-import com.google.common.collect.Multimap;
-
-public class CustomPartitionEdge implements EdgeManager {
+public class CustomPartitionEdge extends EdgeManagerPlugin {
private static final Log LOG = LogFactory.getLog(CustomPartitionEdge.class.getName());
CustomEdgeConfiguration conf = null;
+ final EdgeManagerPluginContext context;
// used by the framework at runtime. initialize is the real initializer at runtime
- public CustomPartitionEdge() {
+ public CustomPartitionEdge(EdgeManagerPluginContext context) {
+ super(context);
+ this.context = context;
}
+
@Override
- public int getNumDestinationTaskPhysicalInputs(int numSourceTasks,
- int destinationTaskIndex) {
- return numSourceTasks;
+ public int getNumDestinationTaskPhysicalInputs(int destinationTaskIndex) {
+ return context.getSourceVertexNumTasks();
}
@Override
- public int getNumSourceTaskPhysicalOutputs(int numDestinationTasks,
- int sourceTaskIndex) {
+ public int getNumSourceTaskPhysicalOutputs(int sourceTaskIndex) {
return conf.getNumBuckets();
}
@Override
- public int getNumDestinationConsumerTasks(int sourceTaskIndex, int numDestinationTasks) {
- return numDestinationTasks;
+ public int getNumDestinationConsumerTasks(int sourceTaskIndex) {
+ return context.getDestinationVertexNumTasks();
}
// called at runtime to initialize the custom edge.
@Override
- public void initialize(EdgeManagerContext context) {
- byte[] payload = context.getUserPayload();
+ public void initialize() {
+ ByteBuffer payload = context.getUserPayload().getPayload();
LOG.info("Initializing the edge, payload: " + payload);
if (payload == null) {
throw new RuntimeException("Invalid payload");
}
// De-serialization code
- DataInputBuffer dib = new DataInputBuffer();
- dib.reset(payload, payload.length);
+ DataInputByteBuffer dibb = new DataInputByteBuffer();
+ dibb.reset(payload);
conf = new CustomEdgeConfiguration();
try {
- conf.readFields(dib);
+ conf.readFields(dibb);
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -83,30 +84,25 @@ public class CustomPartitionEdge impleme
@Override
public void routeDataMovementEventToDestination(DataMovementEvent event,
- int sourceTaskIndex, int numDestinationTasks, Map<Integer, List<Integer>> mapDestTaskIndices) {
- int srcIndex = event.getSourceIndex();
- List<Integer> destTaskIndices = new ArrayList<Integer>();
- destTaskIndices.addAll(conf.getRoutingTable().get(srcIndex));
- mapDestTaskIndices.put(new Integer(sourceTaskIndex), destTaskIndices);
+ int sourceTaskIndex, int sourceOutputIndex, Map<Integer, List<Integer>> mapDestTaskIndices) {
+ List<Integer> outputIndices = Collections.singletonList(sourceTaskIndex);
+ for (Integer destIndex : conf.getRoutingTable().get(sourceOutputIndex)) {
+ mapDestTaskIndices.put(destIndex, outputIndices);
+ }
}
@Override
- public void routeInputSourceTaskFailedEventToDestination(int sourceTaskIndex,
- int numDestinationTasks, Map<Integer, List<Integer>> mapDestTaskIndices) {
- List<Integer> destTaskIndices = new ArrayList<Integer>();
- addAllDestinationTaskIndices(numDestinationTasks, destTaskIndices);
- mapDestTaskIndices.put(new Integer(sourceTaskIndex), destTaskIndices);
+ public void routeInputSourceTaskFailedEventToDestination(int sourceTaskIndex,
+ Map<Integer, List<Integer>> mapDestTaskIndices) {
+ List<Integer> outputIndices = Collections.singletonList(sourceTaskIndex);
+ for (int i = 0; i < context.getDestinationVertexNumTasks(); i++) {
+ mapDestTaskIndices.put(i, outputIndices);
+ }
}
@Override
- public int routeInputErrorEventToSource(InputReadErrorEvent event,
- int destinationTaskIndex) {
+ public int routeInputErrorEventToSource(InputReadErrorEvent event,
+ int destinationTaskIndex, int destinationFailedInputIndex) {
return event.getIndex();
}
-
- void addAllDestinationTaskIndices(int numDestinationTasks, List<Integer> taskIndices) {
- for(int i=0; i<numDestinationTasks; ++i) {
- taskIndices.add(new Integer(i));
- }
- }
}
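
The move from EdgeManager to EdgeManagerPlugin also changes how the edge payload travels: it is now a UserPayload wrapping a ByteBuffer, read back through DataInputByteBuffer instead of DataInputBuffer. Below is a minimal, self-contained sketch of that Writable-over-ByteBuffer round trip, with IntWritable standing in for CustomEdgeConfiguration:

import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.IntWritable;

public class PayloadRoundTrip {
  public static void main(String[] args) throws IOException {
    // Serialize: Writable -> DataOutputBuffer -> ByteBuffer (AM side)
    IntWritable out = new IntWritable(42);
    DataOutputBuffer dob = new DataOutputBuffer();
    out.write(dob);
    ByteBuffer payload = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());

    // Deserialize: ByteBuffer -> DataInputByteBuffer -> Writable (initialize() side)
    DataInputByteBuffer dibb = new DataInputByteBuffer();
    dibb.reset(payload);
    IntWritable in = new IntWritable();
    in.readFields(dibb);
    System.out.println(in.get()); // prints 42
  }
}
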
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java Mon Sep 8 04:38:17 2014
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -30,27 +31,30 @@ import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.split.TezGroupedSplitsInputFormat;
-import org.apache.tez.dag.api.EdgeManagerDescriptor;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.split.TezMapReduceSplitsGrouper;
+import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
import org.apache.tez.dag.api.InputDescriptor;
-import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.VertexLocationHint;
import org.apache.tez.dag.api.VertexManagerPlugin;
import org.apache.tez.dag.api.VertexManagerPluginContext;
-import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRInputUserPayloadProto;
import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto;
import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.events.RootInputConfigureVertexTasksEvent;
-import org.apache.tez.runtime.api.events.RootInputDataInformationEvent;
-import org.apache.tez.runtime.api.events.RootInputUpdatePayloadEvent;
+import org.apache.tez.runtime.api.InputSpecUpdate;
+import org.apache.tez.runtime.api.events.InputConfigureVertexTasksEvent;
+import org.apache.tez.runtime.api.events.InputDataInformationEvent;
+import org.apache.tez.runtime.api.events.InputUpdatePayloadEvent;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
import com.google.common.base.Preconditions;
@@ -59,40 +63,44 @@ import com.google.common.collect.HashMul
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
+import com.google.protobuf.ByteString;
/*
* Only works with old mapred API
* Will only work with a single MRInput for now.
*/
-public class CustomPartitionVertex implements VertexManagerPlugin {
+public class CustomPartitionVertex extends VertexManagerPlugin {
private static final Log LOG = LogFactory.getLog(CustomPartitionVertex.class.getName());
VertexManagerPluginContext context;
- private RootInputConfigureVertexTasksEvent configureVertexTaskEvent;
- private List<RootInputDataInformationEvent> dataInformationEvents;
+ private InputConfigureVertexTasksEvent configureVertexTaskEvent;
+ private List<InputDataInformationEvent> dataInformationEvents;
private int numBuckets = -1;
private Configuration conf = null;
private boolean rootVertexInitialized = false;
private final SplitGrouper grouper = new SplitGrouper();
+ private int taskCount = 0;
- public CustomPartitionVertex() {
+ public CustomPartitionVertex(VertexManagerPluginContext context) {
+ super(context);
}
@Override
- public void initialize(VertexManagerPluginContext context) {
- this.context = context;
- ByteBuffer byteBuf = ByteBuffer.wrap(context.getUserPayload());
+ public void initialize() {
+ this.context = getContext();
+ ByteBuffer byteBuf = context.getUserPayload().getPayload();
this.numBuckets = byteBuf.getInt();
}
@Override
public void onVertexStarted(Map<String, List<Integer>> completions) {
int numTasks = context.getVertexNumTasks(context.getVertexName());
- List<Integer> scheduledTasks = new ArrayList<Integer>(numTasks);
+ List<VertexManagerPluginContext.TaskWithLocationHint> scheduledTasks =
+ new ArrayList<VertexManagerPluginContext.TaskWithLocationHint>(numTasks);
for (int i = 0; i < numTasks; ++i) {
- scheduledTasks.add(new Integer(i));
+ scheduledTasks.add(new VertexManagerPluginContext.TaskWithLocationHint(new Integer(i), null));
}
context.scheduleVertexTasks(scheduledTasks);
}
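
onVertexStarted now schedules TaskWithLocationHint objects instead of bare Integer indices. A hedged sketch of just that call shape, against the same VertexManagerPluginContext methods used above (the null hint means no locality preference):

import java.util.ArrayList;
import java.util.List;

import org.apache.tez.dag.api.VertexManagerPluginContext;
import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint;

public class ScheduleSketch {
  // Schedule every task of the managed vertex with no location hint.
  static void scheduleAll(VertexManagerPluginContext context) {
    int numTasks = context.getVertexNumTasks(context.getVertexName());
    List<TaskWithLocationHint> tasks = new ArrayList<TaskWithLocationHint>(numTasks);
    for (int i = 0; i < numTasks; i++) {
      tasks.add(new TaskWithLocationHint(i, null)); // null: no locality preference
    }
    context.scheduleVertexTasks(tasks);
  }
}
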
@@ -114,6 +122,7 @@ public class CustomPartitionVertex imple
// ensure this method is called only once. Tez will call it once per Root
// Input.
Preconditions.checkState(rootVertexInitialized == false);
+ LOG.info("Root vertex not initialized");
rootVertexInitialized = true;
try {
// This is using the payload from the RootVertexInitializer corresponding
@@ -121,8 +130,8 @@ public class CustomPartitionVertex imple
// but that
// means serializing another instance.
MRInputUserPayloadProto protoPayload =
- MRHelpers.parseMRInputPayload(inputDescriptor.getUserPayload());
- this.conf = MRHelpers.createConfFromByteString(protoPayload.getConfigurationBytes());
+ MRInputHelpers.parseMRInputPayload(inputDescriptor.getUserPayload());
+ this.conf = TezUtils.createConfFromByteString(protoPayload.getConfigurationBytes());
/*
* Currently in tez, the flow of events is thus:
@@ -138,30 +147,27 @@ public class CustomPartitionVertex imple
*/
// This assumes that Grouping will always be used.
- // Changing the InputFormat - so that the correct one is initialized in
- // MRInput.
- this.conf.set("mapred.input.format.class", TezGroupedSplitsInputFormat.class.getName());
+ // Enabling grouping on the payload.
MRInputUserPayloadProto updatedPayload =
- MRInputUserPayloadProto.newBuilder(protoPayload)
- .setConfigurationBytes(MRHelpers.createByteStringFromConf(conf)).build();
- inputDescriptor.setUserPayload(updatedPayload.toByteArray());
+ MRInputUserPayloadProto.newBuilder(protoPayload).setGroupingEnabled(true).build();
+ inputDescriptor.setUserPayload(UserPayload.create(updatedPayload.toByteString().asReadOnlyByteBuffer()));
} catch (IOException e) {
e.printStackTrace();
throw new RuntimeException(e);
}
boolean dataInformationEventSeen = false;
- Map<Path, List<FileSplit>> pathFileSplitsMap = new TreeMap<Path, List<FileSplit>>();
+ Map<String, List<FileSplit>> pathFileSplitsMap = new TreeMap<String, List<FileSplit>>();
for (Event event : events) {
- if (event instanceof RootInputConfigureVertexTasksEvent) {
+ if (event instanceof InputConfigureVertexTasksEvent) {
// No tasks should have been started yet. Checked by initial state
// check.
Preconditions.checkState(dataInformationEventSeen == false);
Preconditions
.checkState(context.getVertexNumTasks(context.getVertexName()) == -1,
"Parallelism for the vertex should be set to -1 if the InputInitializer is setting parallelism");
- RootInputConfigureVertexTasksEvent cEvent = (RootInputConfigureVertexTasksEvent) event;
+ InputConfigureVertexTasksEvent cEvent = (InputConfigureVertexTasksEvent) event;
// The vertex cannot be configured until all DataEvents are seen - to
// build the routing table.
@@ -169,12 +175,12 @@ public class CustomPartitionVertex imple
dataInformationEvents =
Lists.newArrayListWithCapacity(configureVertexTaskEvent.getNumTasks());
}
- if (event instanceof RootInputUpdatePayloadEvent) {
+ if (event instanceof InputUpdatePayloadEvent) {
// this event can never occur. If it does, fail.
Preconditions.checkState(false);
- } else if (event instanceof RootInputDataInformationEvent) {
+ } else if (event instanceof InputDataInformationEvent) {
dataInformationEventSeen = true;
- RootInputDataInformationEvent diEvent = (RootInputDataInformationEvent) event;
+ InputDataInformationEvent diEvent = (InputDataInformationEvent) event;
dataInformationEvents.add(diEvent);
FileSplit fileSplit;
try {
@@ -182,10 +188,10 @@ public class CustomPartitionVertex imple
} catch (IOException e) {
throw new RuntimeException("Failed to get file split for event: " + diEvent);
}
- List<FileSplit> fsList = pathFileSplitsMap.get(fileSplit.getPath());
+ List<FileSplit> fsList = pathFileSplitsMap.get(fileSplit.getPath().getName());
if (fsList == null) {
fsList = new ArrayList<FileSplit>();
- pathFileSplitsMap.put(fileSplit.getPath(), fsList);
+ pathFileSplitsMap.put(fileSplit.getPath().getName(), fsList);
}
fsList.add(fileSplit);
}
@@ -195,21 +201,32 @@ public class CustomPartitionVertex imple
getBucketSplitMapForPath(pathFileSplitsMap);
try {
- int totalResource = context.getTotalAVailableResource().getMemory();
+ int totalResource = context.getTotalAvailableResource().getMemory();
int taskResource = context.getVertexTaskResource().getMemory();
float waves =
- conf.getFloat(TezConfiguration.TEZ_AM_GROUPING_SPLIT_WAVES,
- TezConfiguration.TEZ_AM_GROUPING_SPLIT_WAVES_DEFAULT);
+ conf.getFloat(TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_WAVES,
+ TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_WAVES_DEFAULT);
int availableSlots = totalResource / taskResource;
LOG.info("Grouping splits. " + availableSlots + " available slots, " + waves + " waves.");
+ JobConf jobConf = new JobConf(conf);
+ ShimLoader.getHadoopShims().getMergedCredentials(jobConf);
Multimap<Integer, InputSplit> bucketToGroupedSplitMap =
- grouper.group(conf, bucketToInitialSplitMap, availableSlots, waves);
+ HashMultimap.<Integer, InputSplit> create();
+ for (Integer key : bucketToInitialSplitMap.keySet()) {
+ InputSplit[] inputSplitArray =
+ (bucketToInitialSplitMap.get(key).toArray(new InputSplit[0]));
+ Multimap<Integer, InputSplit> groupedSplit =
+ HiveSplitGenerator.generateGroupedSplits(jobConf, conf, inputSplitArray, waves,
+ availableSlots);
+ bucketToGroupedSplitMap.putAll(key, groupedSplit.values());
+ }
+ LOG.info("We have grouped the splits into " + bucketToGroupedSplitMap.size() + " tasks");
processAllEvents(inputName, bucketToGroupedSplitMap);
- } catch (IOException e) {
+ } catch (Exception e) {
throw new RuntimeException(e);
}
}
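
Grouping now happens per bucket inside the vertex manager rather than through a single SplitGrouper.group(...) call. A toy version of that regrouping loop, with the grouper stubbed out (identityGroup below is a placeholder for HiveSplitGenerator.generateGroupedSplits, and String stands in for InputSplit, so this compiles with only Guava on the classpath):

import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;

public class RegroupSketch {
  static Multimap<Integer, String> regroup(Multimap<Integer, String> byBucket) {
    Multimap<Integer, String> grouped = HashMultimap.create();
    for (Integer bucket : byBucket.keySet()) {
      // The real code calls generateGroupedSplits(jobConf, conf, splits, waves, slots)
      grouped.putAll(bucket, identityGroup(byBucket.get(bucket)));
    }
    return grouped;
  }

  static Iterable<String> identityGroup(Iterable<String> splits) {
    return splits; // placeholder: no actual size-based grouping here
  }

  public static void main(String[] args) {
    Multimap<Integer, String> in = HashMultimap.create();
    in.put(0, "split-a");
    in.put(0, "split-b");
    in.put(1, "split-c");
    System.out.println(regroup(in));
  }
}
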
@@ -219,7 +236,6 @@ public class CustomPartitionVertex imple
Multimap<Integer, Integer> bucketToTaskMap = HashMultimap.<Integer, Integer> create();
List<InputSplit> finalSplits = Lists.newLinkedList();
- int taskCount = 0;
for (Entry<Integer, Collection<InputSplit>> entry : bucketToGroupedSplitMap.asMap().entrySet()) {
int bucketNum = entry.getKey();
Collection<InputSplit> initialSplits = entry.getValue();
@@ -232,12 +248,12 @@ public class CustomPartitionVertex imple
// Construct the EdgeManager descriptor to be used by all edges which need
// the routing table.
- EdgeManagerDescriptor hiveEdgeManagerDesc =
- new EdgeManagerDescriptor(CustomPartitionEdge.class.getName());
- byte[] payload = getBytePayload(bucketToTaskMap);
+ EdgeManagerPluginDescriptor hiveEdgeManagerDesc =
+ EdgeManagerPluginDescriptor.create(CustomPartitionEdge.class.getName());
+ UserPayload payload = getBytePayload(bucketToTaskMap);
hiveEdgeManagerDesc.setUserPayload(payload);
- Map<String, EdgeManagerDescriptor> emMap = Maps.newHashMap();
+ Map<String, EdgeManagerPluginDescriptor> emMap = Maps.newHashMap();
// Replace the edge manager for all vertices which have routing type custom.
for (Entry<String, EdgeProperty> edgeEntry : context.getInputVertexEdgeProperties().entrySet()) {
@@ -250,47 +266,51 @@ public class CustomPartitionVertex imple
LOG.info("Task count is " + taskCount);
- List<RootInputDataInformationEvent> taskEvents =
+ List<InputDataInformationEvent> taskEvents =
Lists.newArrayListWithCapacity(finalSplits.size());
// Re-serialize the splits after grouping.
int count = 0;
for (InputSplit inputSplit : finalSplits) {
- MRSplitProto serializedSplit = MRHelpers.createSplitProto(inputSplit);
- RootInputDataInformationEvent diEvent =
- new RootInputDataInformationEvent(count, serializedSplit.toByteArray());
+ MRSplitProto serializedSplit = MRInputHelpers.createSplitProto(inputSplit);
+ InputDataInformationEvent diEvent = InputDataInformationEvent.createWithSerializedPayload(
+ count, serializedSplit.toByteString().asReadOnlyByteBuffer());
diEvent.setTargetIndex(count);
count++;
taskEvents.add(diEvent);
}
// Replace the Edge Managers
+ Map<String, InputSpecUpdate> rootInputSpecUpdate =
+ new HashMap<String, InputSpecUpdate>();
+ rootInputSpecUpdate.put(
+ inputName,
+ InputSpecUpdate.getDefaultSinglePhysicalInputSpecUpdate());
context.setVertexParallelism(
taskCount,
- new VertexLocationHint(grouper.createTaskLocationHints(finalSplits
- .toArray(new InputSplit[finalSplits.size()]))), emMap);
+ VertexLocationHint.create(grouper.createTaskLocationHints(finalSplits
+ .toArray(new InputSplit[finalSplits.size()]))), emMap, rootInputSpecUpdate);
// Set the actual events for the tasks.
context.addRootInputEvents(inputName, taskEvents);
}
- private byte[] getBytePayload(Multimap<Integer, Integer> routingTable) throws IOException {
+ UserPayload getBytePayload(Multimap<Integer, Integer> routingTable) throws IOException {
CustomEdgeConfiguration edgeConf =
new CustomEdgeConfiguration(routingTable.keySet().size(), routingTable);
DataOutputBuffer dob = new DataOutputBuffer();
edgeConf.write(dob);
byte[] serialized = dob.getData();
-
- return serialized;
+ return UserPayload.create(ByteBuffer.wrap(serialized));
}
- private FileSplit getFileSplitFromEvent(RootInputDataInformationEvent event) throws IOException {
+ private FileSplit getFileSplitFromEvent(InputDataInformationEvent event) throws IOException {
InputSplit inputSplit = null;
if (event.getDeserializedUserPayload() != null) {
inputSplit = (InputSplit) event.getDeserializedUserPayload();
} else {
- MRSplitProto splitProto = MRSplitProto.parseFrom(event.getUserPayload());
+ MRSplitProto splitProto = MRSplitProto.parseFrom(ByteString.copyFrom(event.getUserPayload()));
SerializationFactory serializationFactory = new SerializationFactory(new Configuration());
- inputSplit = MRHelpers.createOldFormatSplitFromUserPayload(splitProto, serializationFactory);
+ inputSplit = MRInputHelpers.createOldFormatSplitFromUserPayload(splitProto, serializationFactory);
}
if (!(inputSplit instanceof FileSplit)) {
@@ -304,7 +324,7 @@ public class CustomPartitionVertex imple
* This method generates the map of bucket to file splits.
*/
private Multimap<Integer, InputSplit> getBucketSplitMapForPath(
- Map<Path, List<FileSplit>> pathFileSplitsMap) {
+ Map<String, List<FileSplit>> pathFileSplitsMap) {
int bucketNum = 0;
int fsCount = 0;
@@ -312,7 +332,7 @@ public class CustomPartitionVertex imple
Multimap<Integer, InputSplit> bucketToInitialSplitMap =
ArrayListMultimap.<Integer, InputSplit> create();
- for (Map.Entry<Path, List<FileSplit>> entry : pathFileSplitsMap.entrySet()) {
+ for (Map.Entry<String, List<FileSplit>> entry : pathFileSplitsMap.entrySet()) {
int bucketId = bucketNum % numBuckets;
for (FileSplit fsplit : entry.getValue()) {
fsCount++;
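
With pathFileSplitsMap now keyed by file name rather than Path, bucket assignment walks the distinct names in sorted order and deals them out round-robin. A self-contained sketch of that shape (the paths and the numBuckets value are invented):

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileSplit;

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;

public class BucketSplitSketch {
  public static void main(String[] args) {
    int numBuckets = 2; // invented
    FileSplit[] splits = {
        new FileSplit(new Path("/warehouse/t/000000_0"), 0, 100, new String[0]),
        new FileSplit(new Path("/warehouse/t/000001_0"), 0, 100, new String[0]) };

    // Key by file *name*, as the new code does.
    Map<String, List<FileSplit>> pathFileSplitsMap = new TreeMap<String, List<FileSplit>>();
    for (FileSplit fs : splits) {
      List<FileSplit> l = pathFileSplitsMap.get(fs.getPath().getName());
      if (l == null) {
        l = new ArrayList<FileSplit>();
        pathFileSplitsMap.put(fs.getPath().getName(), l);
      }
      l.add(fs);
    }

    // Round-robin buckets over distinct file names, as in getBucketSplitMapForPath.
    Multimap<Integer, FileSplit> bucketToSplits = ArrayListMultimap.create();
    int bucketNum = 0;
    for (Map.Entry<String, List<FileSplit>> e : pathFileSplitsMap.entrySet()) {
      bucketToSplits.putAll(bucketNum % numBuckets, e.getValue());
      bucketNum++;
    }
    System.out.println(bucketToSplits.keySet()); // [0, 1]
  }
}
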
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java Mon Sep 8 04:38:17 2014
@@ -29,6 +29,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import javax.security.auth.login.LoginException;
@@ -70,7 +71,6 @@ import org.apache.hadoop.io.DataOutputBu
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
-import org.apache.hadoop.mapred.split.TezGroupedSplitsInputFormat;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
@@ -79,43 +79,43 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
-import org.apache.tez.client.PreWarmContext;
-import org.apache.tez.client.TezSessionConfiguration;
+import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.DataSinkDescriptor;
+import org.apache.tez.dag.api.DataSourceDescriptor;
import org.apache.tez.dag.api.Edge;
-import org.apache.tez.dag.api.EdgeManagerDescriptor;
+import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
import org.apache.tez.dag.api.EdgeProperty;
-import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
-import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
-import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
import org.apache.tez.dag.api.GroupInputEdge;
import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.InputInitializerDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.PreWarmVertex;
import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.VertexGroup;
-import org.apache.tez.dag.api.VertexLocationHint;
import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
-import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
import org.apache.tez.mapreduce.hadoop.MRJobConfig;
-import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
import org.apache.tez.mapreduce.input.MRInputLegacy;
import org.apache.tez.mapreduce.output.MROutput;
import org.apache.tez.mapreduce.partition.MRPartitioner;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.common.comparator.TezBytesComparator;
+import org.apache.tez.runtime.library.common.serializer.TezBytesWritableSerialization;
+import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig;
+import org.apache.tez.runtime.library.conf.UnorderedKVEdgeConfig;
+import org.apache.tez.runtime.library.conf.UnorderedPartitionedKVEdgeConfig;
import org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValueInput;
-import org.apache.tez.runtime.library.input.ShuffledMergedInputLegacy;
-import org.apache.tez.runtime.library.input.ShuffledUnorderedKVInput;
-import org.apache.tez.runtime.library.output.OnFileSortedOutput;
-import org.apache.tez.runtime.library.output.OnFileUnorderedKVOutput;
-import org.apache.tez.runtime.library.output.OnFileUnorderedPartitionedKVOutput;
import com.google.common.base.Function;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
-import com.google.protobuf.ByteString;
/**
* DagUtils. DagUtils is a collection of helper methods to convert
@@ -131,7 +131,7 @@ public class DagUtils {
private void addCredentials(MapWork mapWork, DAG dag) {
Set<String> paths = mapWork.getPathToAliases().keySet();
- if (paths != null && !paths.isEmpty()) {
+ if (!paths.isEmpty()) {
Iterator<URI> pathIterator = Iterators.transform(paths.iterator(), new Function<String, URI>() {
@Override
public URI apply(String input) {
@@ -163,6 +163,7 @@ public class DagUtils {
JobConf conf = new JobConf(baseConf);
if (mapWork.getNumMapTasks() != null) {
+ // Is this required ?
conf.setInt(MRJobConfig.NUM_MAPS, mapWork.getNumMapTasks().intValue());
}
@@ -201,6 +202,12 @@ public class DagUtils {
inpFormat = CombineHiveInputFormat.class.getName();
}
+ if (mapWork.getDummyTableScan()) {
+ // hive input format doesn't handle the special condition of no paths + 1
+ // split correctly.
+ inpFormat = CombineHiveInputFormat.class.getName();
+ }
+
conf.set(TEZ_TMP_DIR_KEY, context.getMRTmpPath().toUri().toString());
conf.set("mapred.mapper.class", ExecMapper.class.getName());
conf.set("mapred.input.format.class", inpFormat);
@@ -213,20 +220,19 @@ public class DagUtils {
* Edge between them.
*
* @param group The parent VertexGroup
- * @param wConf The job conf of the child vertex
+ * @param vConf The job conf of one of the parent (grouped) vertices
* @param w The child vertex
* @param edgeProp the edge property of connection between the two
* endpoints.
*/
@SuppressWarnings("rawtypes")
- public GroupInputEdge createEdge(VertexGroup group, JobConf wConf,
+ public GroupInputEdge createEdge(VertexGroup group, JobConf vConf,
Vertex w, TezEdgeProperty edgeProp)
throws IOException {
Class mergeInputClass;
- LOG.info("Creating Edge between " + group.getGroupName() + " and " + w.getVertexName());
- w.getProcessorDescriptor().setUserPayload(MRHelpers.createUserPayloadFromConf(wConf));
+ LOG.info("Creating Edge between " + group.getGroupName() + " and " + w.getName());
EdgeType edgeType = edgeProp.getEdgeType();
switch (edgeType) {
@@ -237,9 +243,10 @@ public class DagUtils {
mergeInputClass = ConcatenatedMergedKeyValueInput.class;
int numBuckets = edgeProp.getNumBuckets();
VertexManagerPluginDescriptor desc =
- new VertexManagerPluginDescriptor(CustomPartitionVertex.class.getName());
- byte[] userPayload = ByteBuffer.allocate(4).putInt(numBuckets).array();
- desc.setUserPayload(userPayload);
+ VertexManagerPluginDescriptor.create(CustomPartitionVertex.class.getName());
+ ByteBuffer userPayload = ByteBuffer.allocate(4).putInt(numBuckets);
+ userPayload.flip();
+ desc.setUserPayload(UserPayload.create(userPayload));
w.setVertexManagerPlugin(desc);
break;
}
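
The flip() added above matters: ByteBuffer.allocate(4).putInt(n) leaves the buffer's position at 4, so a consumer reading from position to limit would see nothing. flip() resets the position to 0 and sets the limit to 4 before the buffer is handed to UserPayload.create. A two-line demonstration:

import java.nio.ByteBuffer;

public class FlipDemo {
  public static void main(String[] args) {
    ByteBuffer raw = ByteBuffer.allocate(4).putInt(13);
    raw.flip(); // without this, raw.getInt() throws BufferUnderflowException
    System.out.println(raw.getInt()); // prints 13
  }
}
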
@@ -257,47 +264,31 @@ public class DagUtils {
break;
}
- return new GroupInputEdge(group, w, createEdgeProperty(edgeProp),
- new InputDescriptor(mergeInputClass.getName()));
+ return GroupInputEdge.create(group, w, createEdgeProperty(edgeProp, vConf),
+ InputDescriptor.create(mergeInputClass.getName()));
}
/**
- * Given two vertices a, b update their configurations to be used in an Edge a-b
- */
- public void updateConfigurationForEdge(JobConf vConf, Vertex v, JobConf wConf, Vertex w)
- throws IOException {
-
- // Tez needs to setup output subsequent input pairs correctly
- MultiStageMRConfToTezTranslator.translateVertexConfToTez(wConf, vConf);
-
- // update payloads (configuration for the vertices might have changed)
- v.getProcessorDescriptor().setUserPayload(MRHelpers.createUserPayloadFromConf(vConf));
- w.getProcessorDescriptor().setUserPayload(MRHelpers.createUserPayloadFromConf(wConf));
- }
-
- /**
- * Given two vertices and their respective configuration objects createEdge
+ * Given two vertices and the configuration for the source vertex, createEdge
* will create an Edge object that connects the two.
*
- * @param vConf JobConf of the first vertex
+ * @param vConf JobConf of the first (source) vertex
* @param v The first vertex (source)
- * @param wConf JobConf of the second vertex
* @param w The second vertex (sink)
* @return
*/
- public Edge createEdge(JobConf vConf, Vertex v, JobConf wConf, Vertex w,
+ public Edge createEdge(JobConf vConf, Vertex v, Vertex w,
TezEdgeProperty edgeProp)
throws IOException {
- updateConfigurationForEdge(vConf, v, wConf, w);
-
switch(edgeProp.getEdgeType()) {
case CUSTOM_EDGE: {
int numBuckets = edgeProp.getNumBuckets();
- byte[] userPayload = ByteBuffer.allocate(4).putInt(numBuckets).array();
- VertexManagerPluginDescriptor desc = new VertexManagerPluginDescriptor(
+ ByteBuffer userPayload = ByteBuffer.allocate(4).putInt(numBuckets);
+ userPayload.flip();
+ VertexManagerPluginDescriptor desc = VertexManagerPluginDescriptor.create(
CustomPartitionVertex.class.getName());
- desc.setUserPayload(userPayload);
+ desc.setUserPayload(UserPayload.create(userPayload));
w.setVertexManagerPlugin(desc);
break;
}
@@ -309,71 +300,92 @@ public class DagUtils {
// nothing
}
- return new Edge(v, w, createEdgeProperty(edgeProp));
+ return Edge.create(v, w, createEdgeProperty(edgeProp, vConf));
}
/*
* Helper function to create an edge property from an edge type.
*/
@SuppressWarnings("rawtypes")
- private EdgeProperty createEdgeProperty(TezEdgeProperty edgeProp) throws IOException {
- DataMovementType dataMovementType;
- Class logicalInputClass;
- Class logicalOutputClass;
+ private EdgeProperty createEdgeProperty(TezEdgeProperty edgeProp, Configuration conf)
+ throws IOException {
+ MRHelpers.translateMRConfToTez(conf);
+ String keyClass = conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS);
+ String valClass = conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS);
+ String partitionerClassName = conf.get("mapred.partitioner.class");
+ Map<String, String> partitionerConf;
- EdgeProperty edgeProperty = null;
EdgeType edgeType = edgeProp.getEdgeType();
switch (edgeType) {
case BROADCAST_EDGE:
- dataMovementType = DataMovementType.BROADCAST;
- logicalOutputClass = OnFileUnorderedKVOutput.class;
- logicalInputClass = ShuffledUnorderedKVInput.class;
- break;
-
+ UnorderedKVEdgeConfig et1Conf = UnorderedKVEdgeConfig
+ .newBuilder(keyClass, valClass)
+ .setFromConfiguration(conf)
+ .setKeySerializationClass(TezBytesWritableSerialization.class.getName(), null)
+ .setValueSerializationClass(TezBytesWritableSerialization.class.getName(), null)
+ .build();
+ return et1Conf.createDefaultBroadcastEdgeProperty();
case CUSTOM_EDGE:
- dataMovementType = DataMovementType.CUSTOM;
- logicalOutputClass = OnFileUnorderedPartitionedKVOutput.class;
- logicalInputClass = ShuffledUnorderedKVInput.class;
- EdgeManagerDescriptor edgeDesc =
- new EdgeManagerDescriptor(CustomPartitionEdge.class.getName());
+ assert partitionerClassName != null;
+ partitionerConf = createPartitionerConf(partitionerClassName, conf);
+ UnorderedPartitionedKVEdgeConfig et2Conf = UnorderedPartitionedKVEdgeConfig
+ .newBuilder(keyClass, valClass, MRPartitioner.class.getName(), partitionerConf)
+ .setFromConfiguration(conf)
+ .setKeySerializationClass(TezBytesWritableSerialization.class.getName(), null)
+ .setValueSerializationClass(TezBytesWritableSerialization.class.getName(), null)
+ .build();
+ EdgeManagerPluginDescriptor edgeDesc =
+ EdgeManagerPluginDescriptor.create(CustomPartitionEdge.class.getName());
CustomEdgeConfiguration edgeConf =
new CustomEdgeConfiguration(edgeProp.getNumBuckets(), null);
DataOutputBuffer dob = new DataOutputBuffer();
edgeConf.write(dob);
byte[] userPayload = dob.getData();
- edgeDesc.setUserPayload(userPayload);
- edgeProperty =
- new EdgeProperty(edgeDesc,
- DataSourceType.PERSISTED,
- SchedulingType.SEQUENTIAL,
- new OutputDescriptor(logicalOutputClass.getName()),
- new InputDescriptor(logicalInputClass.getName()));
- break;
-
+ edgeDesc.setUserPayload(UserPayload.create(ByteBuffer.wrap(userPayload)));
+ return et2Conf.createDefaultCustomEdgeProperty(edgeDesc);
case CUSTOM_SIMPLE_EDGE:
- dataMovementType = DataMovementType.SCATTER_GATHER;
- logicalOutputClass = OnFileUnorderedPartitionedKVOutput.class;
- logicalInputClass = ShuffledUnorderedKVInput.class;
- break;
-
+ assert partitionerClassName != null;
+ partitionerConf = createPartitionerConf(partitionerClassName, conf);
+ UnorderedPartitionedKVEdgeConfig et3Conf = UnorderedPartitionedKVEdgeConfig
+ .newBuilder(keyClass, valClass, MRPartitioner.class.getName(), partitionerConf)
+ .setFromConfiguration(conf)
+ .setKeySerializationClass(TezBytesWritableSerialization.class.getName(), null)
+ .setValueSerializationClass(TezBytesWritableSerialization.class.getName(), null)
+ .build();
+ return et3Conf.createDefaultEdgeProperty();
case SIMPLE_EDGE:
default:
- dataMovementType = DataMovementType.SCATTER_GATHER;
- logicalOutputClass = OnFileSortedOutput.class;
- logicalInputClass = ShuffledMergedInputLegacy.class;
- break;
+ assert partitionerClassName != null;
+ partitionerConf = createPartitionerConf(partitionerClassName, conf);
+ OrderedPartitionedKVEdgeConfig et4Conf = OrderedPartitionedKVEdgeConfig
+ .newBuilder(keyClass, valClass, MRPartitioner.class.getName(), partitionerConf)
+ .setFromConfiguration(conf)
+ .setKeySerializationClass(TezBytesWritableSerialization.class.getName(),
+ TezBytesComparator.class.getName(), null)
+ .setValueSerializationClass(TezBytesWritableSerialization.class.getName(), null)
+ .build();
+ return et4Conf.createDefaultEdgeProperty();
}
+ }
- if (edgeProperty == null) {
- edgeProperty =
- new EdgeProperty(dataMovementType,
- DataSourceType.PERSISTED,
- SchedulingType.SEQUENTIAL,
- new OutputDescriptor(logicalOutputClass.getName()),
- new InputDescriptor(logicalInputClass.getName()));
+ /**
+ * Utility method to create a stripped down configuration for the MR partitioner.
+ *
+ * @param partitionerClassName
+ * the real MR partitioner class name
+ * @param baseConf
+ * a base configuration to extract relevant properties
+ * @return
+ */
+ private Map<String, String> createPartitionerConf(String partitionerClassName,
+ Configuration baseConf) {
+ Map<String, String> partitionerConf = new HashMap<String, String>();
+ partitionerConf.put("mapred.partitioner.class", partitionerClassName);
+ if (baseConf.get("mapreduce.totalorderpartitioner.path") != null) {
+ partitionerConf.put("mapreduce.totalorderpartitioner.path",
+ baseConf.get("mapreduce.totalorderpartitioner.path"));
}
-
- return edgeProperty;
+ return partitionerConf;
}
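
The structural change here is that edge properties now come from the Tez runtime-library config builders instead of hand-assembled EdgeProperty constructors. A hedged, compile-level sketch of the SIMPLE_EDGE case, minus the Hive-specific serialization tuning (the key, value, and wrapped partitioner class names are illustrative stand-ins, not what Hive resolves from the translated MR configuration):

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.mapreduce.partition.MRPartitioner;
import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig;

public class EdgeConfigSketch {
  public static EdgeProperty sortedShuffleEdge(Configuration conf) {
    // The stripped-down conf carries the real MR partitioner for MRPartitioner to wrap.
    Map<String, String> partitionerConf = new HashMap<String, String>();
    partitionerConf.put("mapred.partitioner.class",
        "org.apache.hadoop.mapred.lib.HashPartitioner"); // illustrative
    OrderedPartitionedKVEdgeConfig cfg = OrderedPartitionedKVEdgeConfig
        .newBuilder("org.apache.hadoop.io.BytesWritable",  // key class (illustrative)
            "org.apache.hadoop.io.BytesWritable",          // value class (illustrative)
            MRPartitioner.class.getName(), partitionerConf)
        .setFromConfiguration(conf)
        .build();
    return cfg.createDefaultEdgeProperty();
  }
}
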
/*
@@ -391,6 +403,15 @@ public class DagUtils {
}
/*
+ * Helper to setup default environment for a task in YARN.
+ */
+ private Map<String, String> getContainerEnvironment(Configuration conf, boolean isMap) {
+ Map<String, String> environment = new HashMap<String, String>();
+ MRHelpers.updateEnvBasedOnMRTaskEnv(conf, environment, isMap);
+ return environment;
+ }
+
+ /*
* Helper to determine what java options to use for the containers
* Falls back to Map-reduces map java opts if no tez specific options
* are set
@@ -400,14 +421,14 @@ public class DagUtils {
if (javaOpts != null && !javaOpts.isEmpty()) {
String logLevel = HiveConf.getVar(conf, HiveConf.ConfVars.HIVETEZLOGLEVEL);
List<String> logProps = Lists.newArrayList();
- MRHelpers.addLog4jSystemProperties(logLevel, logProps);
+ TezUtils.addLog4jSystemProperties(logLevel, logProps);
StringBuilder sb = new StringBuilder();
for (String str : logProps) {
sb.append(str).append(" ");
}
return javaOpts + " " + sb.toString();
}
- return MRHelpers.getMapJavaOpts(conf);
+ return MRHelpers.getJavaOptsForMRMapper(conf);
}
/*
@@ -425,18 +446,15 @@ public class DagUtils {
// create the directories FileSinkOperators need
Utilities.createTmpDirs(conf, mapWork);
- // Tez ask us to call this even if there's no preceding vertex
- MultiStageMRConfToTezTranslator.translateVertexConfToTez(conf, null);
-
// finally create the vertex
Vertex map = null;
// use tez to combine splits
- boolean useTezGroupedSplits = false;
+ boolean groupSplitsInInputInitializer;
+
+ DataSourceDescriptor dataSource;
int numTasks = -1;
- Class<HiveSplitGenerator> amSplitGeneratorClass = null;
- InputSplitInfo inputSplitInfo = null;
Class inputFormatClass = conf.getClass("mapred.input.format.class",
InputFormat.class);
@@ -450,9 +468,9 @@ public class DagUtils {
}
if (vertexHasCustomInput) {
- useTezGroupedSplits = false;
- // grouping happens in execution phase. Setting the class to TezGroupedSplitsInputFormat
- // here would cause pre-mature grouping which would be incorrect.
+ groupSplitsInInputInitializer = false;
+ // grouping happens in execution phase. The input payload should not enable grouping here,
+ // it will be enabled in the CustomVertex.
inputFormatClass = HiveInputFormat.class;
conf.setClass("mapred.input.format.class", HiveInputFormat.class, InputFormat.class);
// mapreduce.tez.input.initializer.serialize.event.payload should be set to false when using
@@ -462,49 +480,52 @@ public class DagUtils {
// we'll set up tez to combine splits for us iff the input format
// is HiveInputFormat
if (inputFormatClass == HiveInputFormat.class) {
- useTezGroupedSplits = true;
- conf.setClass("mapred.input.format.class", TezGroupedSplitsInputFormat.class, InputFormat.class);
+ groupSplitsInInputInitializer = true;
+ } else {
+ groupSplitsInInputInitializer = false;
}
}
if (HiveConf.getBoolVar(conf, ConfVars.HIVE_AM_SPLIT_GENERATION)
&& !mapWork.isUseOneNullRowInputFormat()) {
+
+ // set up the operator plan. (before setting up splits on the AM)
+ Utilities.setMapWork(conf, mapWork, mrScratchDir, false);
+
// if we're generating the splits in the AM, we just need to set
// the correct plugin.
- amSplitGeneratorClass = HiveSplitGenerator.class;
+ if (groupSplitsInInputInitializer) {
+ // Not setting a payload, since the MRInput payload is the same and can be accessed.
+ InputInitializerDescriptor descriptor = InputInitializerDescriptor.create(
+ HiveSplitGenerator.class.getName());
+ dataSource = MRInputLegacy.createConfigBuilder(conf, inputFormatClass).groupSplits(true)
+ .setCustomInitializerDescriptor(descriptor).build();
+ } else {
+ // Not HiveInputFormat, or a custom VertexManager will take care of grouping splits
+ dataSource = MRInputLegacy.createConfigBuilder(conf, inputFormatClass).groupSplits(false).build();
+ }
} else {
- // client side split generation means we have to compute them now
- inputSplitInfo = MRHelpers.generateInputSplits(conf,
- new Path(tezDir, "split_"+mapWork.getName().replaceAll(" ", "_")));
- numTasks = inputSplitInfo.getNumTasks();
+ // Setup client side split generation.
+ dataSource = MRInputHelpers.configureMRInputWithLegacySplitGeneration(conf, new Path(tezDir,
+ "split_" + mapWork.getName().replaceAll(" ", "_")), true);
+ numTasks = dataSource.getNumberOfShards();
+
+ // set up the operator plan. (after generating splits - that changes configs)
+ Utilities.setMapWork(conf, mapWork, mrScratchDir, false);
}
- // set up the operator plan
- Utilities.setMapWork(conf, mapWork, mrScratchDir, false);
-
- byte[] serializedConf = MRHelpers.createUserPayloadFromConf(conf);
- map = new Vertex(mapWork.getName(),
- new ProcessorDescriptor(MapTezProcessor.class.getName()).
+ UserPayload serializedConf = TezUtils.createUserPayloadFromConf(conf);
+ map = Vertex.create(mapWork.getName(),
+ ProcessorDescriptor.create(MapTezProcessor.class.getName()).
setUserPayload(serializedConf), numTasks, getContainerResource(conf));
- Map<String, String> environment = new HashMap<String, String>();
- MRHelpers.updateEnvironmentForMRTasks(conf, environment, true);
- map.setTaskEnvironment(environment);
- map.setJavaOpts(getContainerJavaOpts(conf));
+ map.setTaskEnvironment(getContainerEnvironment(conf, true));
+ map.setTaskLaunchCmdOpts(getContainerJavaOpts(conf));
assert mapWork.getAliasToWork().keySet().size() == 1;
+ // Add the actual source input
String alias = mapWork.getAliasToWork().keySet().iterator().next();
-
- byte[] mrInput = null;
- if (useTezGroupedSplits) {
- mrInput = MRHelpers.createMRInputPayloadWithGrouping(serializedConf,
- HiveInputFormat.class.getName());
- } else {
- mrInput = MRHelpers.createMRInputPayload(serializedConf, null);
- }
- map.addInput(alias,
- new InputDescriptor(MRInputLegacy.class.getName()).
- setUserPayload(mrInput), amSplitGeneratorClass);
+ map.addDataSource(alias, dataSource);
Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
localResources.put(getBaseName(appJarLr), appJarLr);
@@ -512,14 +533,7 @@ public class DagUtils {
localResources.put(getBaseName(lr), lr);
}
- if (inputSplitInfo != null) {
- // only relevant for client-side split generation
- map.setTaskLocationsHint(inputSplitInfo.getTaskLocationHints());
- MRHelpers.updateLocalResourcesForInputSplits(FileSystem.get(conf), inputSplitInfo,
- localResources);
- }
-
- map.setTaskLocalResources(localResources);
+ map.addTaskLocalFiles(localResources);
return map;
}
@@ -529,6 +543,7 @@ public class DagUtils {
private JobConf initializeVertexConf(JobConf baseConf, Context context, ReduceWork reduceWork) {
JobConf conf = new JobConf(baseConf);
+ // Is this required ?
conf.set("mapred.reducer.class", ExecReducer.class.getName());
boolean useSpeculativeExecReducers = HiveConf.getBoolVar(conf,
@@ -552,29 +567,22 @@ public class DagUtils {
// create the directories FileSinkOperators need
Utilities.createTmpDirs(conf, reduceWork);
- // Call once here, will be updated when we find edges
- MultiStageMRConfToTezTranslator.translateVertexConfToTez(conf, null);
-
// create the vertex
- Vertex reducer = new Vertex(reduceWork.getName(),
- new ProcessorDescriptor(ReduceTezProcessor.class.getName()).
- setUserPayload(MRHelpers.createUserPayloadFromConf(conf)),
+ Vertex reducer = Vertex.create(reduceWork.getName(),
+ ProcessorDescriptor.create(ReduceTezProcessor.class.getName()).
+ setUserPayload(TezUtils.createUserPayloadFromConf(conf)),
reduceWork.isAutoReduceParallelism() ? reduceWork.getMaxReduceTasks() : reduceWork
.getNumReduceTasks(), getContainerResource(conf));
- Map<String, String> environment = new HashMap<String, String>();
-
- MRHelpers.updateEnvironmentForMRTasks(conf, environment, false);
- reducer.setTaskEnvironment(environment);
-
- reducer.setJavaOpts(getContainerJavaOpts(conf));
+ reducer.setTaskEnvironment(getContainerEnvironment(conf, false));
+ reducer.setTaskLaunchCmdOpts(getContainerJavaOpts(conf));
Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
localResources.put(getBaseName(appJarLr), appJarLr);
for (LocalResource lr: additionalLr) {
localResources.put(getBaseName(lr), lr);
}
- reducer.setTaskLocalResources(localResources);
+ reducer.addTaskLocalFiles(localResources);
return reducer;
}
@@ -608,37 +616,29 @@ public class DagUtils {
}
/**
- * @param sessionConfig session configuration
* @param numContainers number of containers to pre-warm
* @param localResources additional resources to pre-warm with
- * @return prewarm context object
+ * @return prewarm vertex to run
*/
- public PreWarmContext createPreWarmContext(TezSessionConfiguration sessionConfig, int numContainers,
- Map<String, LocalResource> localResources) throws IOException, TezException {
+ public PreWarmVertex createPreWarmVertex(TezConfiguration conf,
+ int numContainers, Map<String, LocalResource> localResources) throws
+ IOException, TezException {
- Configuration conf = sessionConfig.getTezConfiguration();
+ ProcessorDescriptor prewarmProcDescriptor = ProcessorDescriptor.create(HivePreWarmProcessor.class.getName());
+ prewarmProcDescriptor.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
- ProcessorDescriptor prewarmProcDescriptor = new ProcessorDescriptor(HivePreWarmProcessor.class.getName());
- prewarmProcDescriptor.setUserPayload(MRHelpers.createUserPayloadFromConf(conf));
-
- PreWarmContext context = new PreWarmContext(prewarmProcDescriptor, getContainerResource(conf),
- numContainers, null);
+ PreWarmVertex prewarmVertex = PreWarmVertex.create("prewarm", prewarmProcDescriptor, numContainers, getContainerResource(conf));
Map<String, LocalResource> combinedResources = new HashMap<String, LocalResource>();
- combinedResources.putAll(sessionConfig.getSessionResources());
if (localResources != null) {
combinedResources.putAll(localResources);
}
- context.setLocalResources(combinedResources);
-
- /* boiler plate task env */
- Map<String, String> environment = new HashMap<String, String>();
- MRHelpers.updateEnvironmentForMRTasks(conf, environment, true);
- context.setEnvironment(environment);
- context.setJavaOpts(getContainerJavaOpts(conf));
- return context;
+ prewarmVertex.addTaskLocalFiles(localResources);
+ prewarmVertex.setTaskLaunchCmdOpts(getContainerJavaOpts(conf));
+ prewarmVertex.setTaskEnvironment(getContainerEnvironment(conf, false));
+ return prewarmVertex;
}
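
createPreWarmContext becoming createPreWarmVertex follows the same descriptor-based pattern as the other vertices. A hedged sketch with invented sizing (the create(...) call mirrors the one above; the container count and Resource.newInstance numbers are illustrative only):

import java.io.IOException;

import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.PreWarmVertex;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;

public class PreWarmSketch {
  public static PreWarmVertex build(TezConfiguration conf) throws IOException {
    ProcessorDescriptor proc = ProcessorDescriptor
        .create("org.apache.hadoop.hive.ql.exec.tez.HivePreWarmProcessor")
        .setUserPayload(TezUtils.createUserPayloadFromConf(conf));
    // 4 containers of 1 GB each: illustrative numbers only
    return PreWarmVertex.create("prewarm", proc, 4, Resource.newInstance(1024, 1));
  }
}
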
/**
@@ -710,7 +710,7 @@ public class DagUtils {
/**
* Localizes files, archives and jars from a provided array of names.
- * @param hdfsDirPathStr Destination directoty in HDFS.
+ * @param hdfsDirPathStr Destination directory in HDFS.
* @param conf Configuration.
* @param inputOutputJars The file names to localize.
* @return List<LocalResource> local resources to add to execution
@@ -784,7 +784,7 @@ public class DagUtils {
}
/**
- * @param pathStr - the string from which we try to determine the resource base name
+ * @param path - the path from which we try to determine the resource base name
* @return the name of the resource from a given path string.
*/
public String getResourceBaseName(Path path) {
@@ -830,9 +830,8 @@ public class DagUtils {
int waitAttempts =
conf.getInt(HiveConf.ConfVars.HIVE_LOCALIZE_RESOURCE_NUM_WAIT_ATTEMPTS.varname,
HiveConf.ConfVars.HIVE_LOCALIZE_RESOURCE_NUM_WAIT_ATTEMPTS.defaultIntVal);
- long sleepInterval =
- conf.getLong(HiveConf.ConfVars.HIVE_LOCALIZE_RESOURCE_WAIT_INTERVAL.varname,
- HiveConf.ConfVars.HIVE_LOCALIZE_RESOURCE_WAIT_INTERVAL.defaultLongVal);
+ long sleepInterval = HiveConf.getTimeVar(
+ conf, HiveConf.ConfVars.HIVE_LOCALIZE_RESOURCE_WAIT_INTERVAL, TimeUnit.MILLISECONDS);
LOG.info("Number of wait attempts: " + waitAttempts + ". Wait interval: "
+ sleepInterval);
boolean found = false;
@@ -873,7 +872,7 @@ public class DagUtils {
public JobConf createConfiguration(HiveConf hiveConf) throws IOException {
hiveConf.setBoolean("mapred.mapper.new-api", false);
- JobConf conf = (JobConf) MRHelpers.getBaseMRConfiguration(hiveConf);
+ JobConf conf = new JobConf(hiveConf);
conf.set("mapred.output.committer.class", NullOutputCommitter.class.getName());
@@ -919,7 +918,6 @@ public class DagUtils {
* @param work The instance of BaseWork representing the actual work to be performed
* by this vertex.
* @param scratchDir HDFS scratch dir for this execution unit.
- * @param list
* @param appJarLr Local resource for hive-exec.
* @param additionalLr
* @param fileSystem FS corresponding to scratchDir and LocalResources
@@ -963,9 +961,9 @@ public class DagUtils {
// final vertices need to have at least one output
if (!hasChildren) {
- v.addOutput("out_"+work.getName(),
- new OutputDescriptor(MROutput.class.getName())
- .setUserPayload(MRHelpers.createUserPayloadFromConf(conf)));
+ v.addDataSink("out_"+work.getName(), new DataSinkDescriptor(
+ OutputDescriptor.create(MROutput.class.getName())
+ .setUserPayload(TezUtils.createUserPayloadFromConf(conf)), null, null));
}
return v;
@@ -1033,16 +1031,16 @@ public class DagUtils {
if (edgeProp.isAutoReduce()) {
Configuration pluginConf = new Configuration(false);
VertexManagerPluginDescriptor desc =
- new VertexManagerPluginDescriptor(ShuffleVertexManager.class.getName());
+ VertexManagerPluginDescriptor.create(ShuffleVertexManager.class.getName());
pluginConf.setBoolean(
- ShuffleVertexManager.TEZ_AM_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL, true);
- pluginConf.setInt(ShuffleVertexManager.TEZ_AM_SHUFFLE_VERTEX_MANAGER_MIN_TASK_PARALLELISM,
+ ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL, true);
+ pluginConf.setInt(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_TASK_PARALLELISM,
edgeProp.getMinReducer());
pluginConf.setLong(
- ShuffleVertexManager.TEZ_AM_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE,
+ ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE,
edgeProp.getInputSizePerReducer());
- ByteString payload = MRHelpers.createByteStringFromConf(pluginConf);
- desc.setUserPayload(payload.toByteArray());
+ UserPayload payload = TezUtils.createUserPayloadFromConf(pluginConf);
+ desc.setUserPayload(payload);
v.setVertexManagerPlugin(desc);
}
}
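
The auto-reduce block above shows the general recipe for configuring a vertex manager in the new API: settings go into a throwaway Configuration, which TezUtils serializes into the plugin's UserPayload. A standalone sketch of that recipe (the numeric values are invented):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;

public class AutoParallelSketch {
  public static VertexManagerPluginDescriptor autoParallel() throws IOException {
    Configuration pluginConf = new Configuration(false);
    pluginConf.setBoolean(
        ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL, true);
    pluginConf.setInt(
        ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_TASK_PARALLELISM, 1); // invented
    pluginConf.setLong(
        ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE,
        256L * 1024 * 1024); // invented: 256 MB of input per task
    UserPayload payload = TezUtils.createUserPayloadFromConf(pluginConf);
    return VertexManagerPluginDescriptor.create(ShuffleVertexManager.class.getName())
        .setUserPayload(payload);
  }
}
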
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java Mon Sep 8 04:38:17 2014
@@ -18,45 +18,29 @@
package org.apache.hadoop.hive.ql.exec.tez;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.debug.Utils;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.exec.MapredContext;
import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
import org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper;
import org.apache.hadoop.hive.ql.exec.persistence.MapJoinBytesTableContainer;
import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey;
-import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKeyObject;
-import org.apache.hadoop.hive.ql.exec.persistence.LazyFlatRowContainer;
-import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey;
import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectSerDeContext;
import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer;
import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
-import org.apache.hadoop.hive.serde2.ByteStream.Output;
import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryFactory;
-import org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe;
-import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryStruct;
-import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryUtils;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
-import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Writable;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.library.api.KeyValueReader;
@@ -73,6 +57,7 @@ public class HashTableLoader implements
private Configuration hconf;
private MapJoinDesc desc;
private MapJoinKey lastKey = null;
+ private int rowCount = 0;
@Override
public void init(ExecMapperContext context, Configuration hconf, MapJoinOperator joinOp) {
@@ -125,6 +110,7 @@ public class HashTableLoader implements
: new HashMapWrapper(hconf, keyCount);
while (kvReader.next()) {
+ rowCount++;
lastKey = tableContainer.putRow(keyCtx, (Writable)kvReader.getCurrentKey(),
valCtx, (Writable)kvReader.getCurrentValue());
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HivePreWarmProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HivePreWarmProcessor.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HivePreWarmProcessor.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HivePreWarmProcessor.java Mon Sep 8 04:38:17 2014
@@ -25,15 +25,15 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.ReadaheadPool;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.LogicalIOProcessor;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.LogicalOutput;
-import org.apache.tez.runtime.api.TezProcessorContext;
+import org.apache.tez.runtime.api.ProcessorContext;
import java.net.URL;
import java.net.JarURLConnection;
-import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.Map;
@@ -48,7 +48,7 @@ import javax.crypto.Mac;
*
* @see Config for configuring the HivePreWarmProcessor
*/
-public class HivePreWarmProcessor implements LogicalIOProcessor {
+public class HivePreWarmProcessor extends AbstractLogicalIOProcessor {
private static boolean prewarmed = false;
@@ -56,10 +56,13 @@ public class HivePreWarmProcessor implem
private Configuration conf;
+ public HivePreWarmProcessor(ProcessorContext context) {
+ super(context);
+ }
+
@Override
- public void initialize(TezProcessorContext processorContext)
- throws Exception {
- byte[] userPayload = processorContext.getUserPayload();
+ public void initialize() throws Exception {
+ UserPayload userPayload = getContext().getUserPayload();
this.conf = TezUtils.createConfFromUserPayload(userPayload);
}
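
The processor-side pattern is the same as the edge and vertex managers: context in through the constructor, payload read in initialize(). A minimal do-nothing processor showing just that wiring (the method set is assumed from the AbstractLogicalIOProcessor contract used above):

import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.tez.common.TezUtils;
import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.LogicalOutput;
import org.apache.tez.runtime.api.ProcessorContext;

public class NoOpProcessor extends AbstractLogicalIOProcessor {
  private Configuration conf;

  public NoOpProcessor(ProcessorContext context) {
    super(context); // context now arrives via the constructor
  }

  @Override
  public void initialize() throws Exception {
    // The payload travels as a UserPayload, not a byte[]
    conf = TezUtils.createConfFromUserPayload(getContext().getUserPayload());
  }

  @Override
  public void run(Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs)
      throws Exception { /* no-op */ }

  @Override
  public void handleEvents(List<Event> processorEvents) { /* no-op */ }

  @Override
  public void close() throws Exception { /* no-op */ }
}
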
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java Mon Sep 8 04:38:17 2014
@@ -35,19 +35,23 @@ import org.apache.hadoop.mapred.FileSpli
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.split.TezMapReduceSplitsGrouper;
import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.VertexLocationHint;
+import org.apache.tez.dag.api.TaskLocationHint;
import org.apache.tez.mapreduce.hadoop.InputSplitInfoMem;
-import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRInputUserPayloadProto;
import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto;
import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitsProto;
import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.TezRootInputInitializer;
-import org.apache.tez.runtime.api.TezRootInputInitializerContext;
-import org.apache.tez.runtime.api.events.RootInputConfigureVertexTasksEvent;
-import org.apache.tez.runtime.api.events.RootInputDataInformationEvent;
+import org.apache.tez.runtime.api.InputInitializer;
+import org.apache.tez.runtime.api.InputInitializerContext;
+import org.apache.tez.runtime.api.InputSpecUpdate;
+import org.apache.tez.runtime.api.events.InputConfigureVertexTasksEvent;
+import org.apache.tez.runtime.api.events.InputDataInformationEvent;
+import org.apache.tez.runtime.api.events.InputInitializerEvent;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Lists;
@@ -59,20 +63,30 @@ import com.google.common.collect.Multima
* making sure that splits from different partitions are only grouped if they
* are of the same schema, format and serde
*/
-public class HiveSplitGenerator implements TezRootInputInitializer {
+@SuppressWarnings("deprecation")
+public class HiveSplitGenerator extends InputInitializer {
private static final Log LOG = LogFactory.getLog(HiveSplitGenerator.class);
- private final SplitGrouper grouper = new SplitGrouper();
+ private static final SplitGrouper grouper = new SplitGrouper();
+ private final DynamicPartitionPruner pruner = new DynamicPartitionPruner();
+ private InputInitializerContext context;
+
+ public HiveSplitGenerator(InputInitializerContext initializerContext) {
+ super(initializerContext);
+ }
@Override
- public List<Event> initialize(TezRootInputInitializerContext rootInputContext) throws Exception {
+ public List<Event> initialize() throws Exception {
+ InputInitializerContext rootInputContext = getContext();
+
+ context = rootInputContext;
MRInputUserPayloadProto userPayloadProto =
- MRHelpers.parseMRInputPayload(rootInputContext.getUserPayload());
+ MRInputHelpers.parseMRInputPayload(rootInputContext.getInputUserPayload());
Configuration conf =
- MRHelpers.createConfFromByteString(userPayloadProto.getConfigurationBytes());
+ TezUtils.createConfFromByteString(userPayloadProto.getConfigurationBytes());
boolean sendSerializedEvents =
conf.getBoolean("mapreduce.tez.input.initializer.serialize.event.payload", true);
@@ -81,42 +95,66 @@ public class HiveSplitGenerator implemen
JobConf jobConf = new JobConf(conf);
ShimLoader.getHadoopShims().getMergedCredentials(jobConf);
+ MapWork work = Utilities.getMapWork(jobConf);
+
+ // perform dynamic partition pruning
+ pruner.prune(work, jobConf, context);
+
InputSplitInfoMem inputSplitInfo = null;
- String realInputFormatName = userPayloadProto.getInputFormatName();
- if (realInputFormatName != null && !realInputFormatName.isEmpty()) {
- inputSplitInfo = generateGroupedSplits(rootInputContext, jobConf, conf, realInputFormatName);
+ String realInputFormatName = conf.get("mapred.input.format.class");
+ boolean groupingEnabled = userPayloadProto.getGroupingEnabled();
+ if (groupingEnabled) {
+ // Need to instantiate the realInputFormat
+ InputFormat<?, ?> inputFormat =
+ (InputFormat<?, ?>) ReflectionUtils.newInstance(Class.forName(realInputFormatName),
+ jobConf);
+
+ int totalResource = rootInputContext.getTotalAvailableResource().getMemory();
+ int taskResource = rootInputContext.getVertexTaskResource().getMemory();
+ int availableSlots = totalResource / taskResource;
+
+ // Create the un-grouped splits
+ float waves =
+ conf.getFloat(TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_WAVES,
+ TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_WAVES_DEFAULT);
+
+ InputSplit[] splits = inputFormat.getSplits(jobConf, (int) (availableSlots * waves));
+ LOG.info("Number of input splits: " + splits.length + ". " + availableSlots
+ + " available slots, " + waves + " waves. Input format is: " + realInputFormatName);
+
+ Multimap<Integer, InputSplit> groupedSplits =
+ generateGroupedSplits(jobConf, conf, splits, waves, availableSlots);
+ // And finally return them in a flat array
+ InputSplit[] flatSplits = groupedSplits.values().toArray(new InputSplit[0]);
+ LOG.info("Number of grouped splits: " + flatSplits.length);
+
+ List<TaskLocationHint> locationHints = grouper.createTaskLocationHints(flatSplits);
+
+ Utilities.clearWork(jobConf);
+
+ inputSplitInfo =
+ new InputSplitInfoMem(flatSplits, locationHints, flatSplits.length, null, jobConf);
} else {
- inputSplitInfo = MRHelpers.generateInputSplitsToMem(jobConf);
+ // No grouping needed, and no target number of tasks to compute here.
+ // This code path should never be triggered at the moment: when grouping is
+ // disabled, DAGUtils uses MRInputAMSplitGenerator instead.
+ // If this path is ever used in the future, make sure grouping is disabled in
+ // the payload, if it isn't already.
+ throw new RuntimeException(
+ "HiveInputFormat does not support non-grouped splits, InputFormatName is: "
+ + realInputFormatName);
+ // inputSplitInfo = MRInputHelpers.generateInputSplitsToMem(jobConf, false, 0);
}
return createEventList(sendSerializedEvents, inputSplitInfo);
}
- private InputSplitInfoMem generateGroupedSplits(TezRootInputInitializerContext context,
- JobConf jobConf, Configuration conf, String realInputFormatName) throws Exception {
- int totalResource = context.getTotalAvailableResource().getMemory();
- int taskResource = context.getVertexTaskResource().getMemory();
- int availableSlots = totalResource / taskResource;
-
- float waves =
- conf.getFloat(TezConfiguration.TEZ_AM_GROUPING_SPLIT_WAVES,
- TezConfiguration.TEZ_AM_GROUPING_SPLIT_WAVES_DEFAULT);
+ public static Multimap<Integer, InputSplit> generateGroupedSplits(JobConf jobConf,
+ Configuration conf, InputSplit[] splits, float waves, int availableSlots)
+ throws Exception {
MapWork work = Utilities.getMapWork(jobConf);
- LOG.info("Grouping splits for " + work.getName() + ". " + availableSlots + " available slots, "
- + waves + " waves. Input format is: " + realInputFormatName);
-
- // Need to instantiate the realInputFormat
- InputFormat<?, ?> inputFormat =
- (InputFormat<?, ?>) ReflectionUtils
- .newInstance(Class.forName(realInputFormatName), jobConf);
-
- // Create the un-grouped splits
- InputSplit[] splits = inputFormat.getSplits(jobConf, (int) (availableSlots * waves));
- LOG.info("Number of input splits: " + splits.length);
-
Multimap<Integer, InputSplit> bucketSplitMultiMap =
ArrayListMultimap.<Integer, InputSplit> create();
@@ -159,41 +197,42 @@ public class HiveSplitGenerator implemen
Multimap<Integer, InputSplit> groupedSplits =
grouper.group(jobConf, bucketSplitMultiMap, availableSlots, waves);
- // And finally return them in a flat array
- InputSplit[] flatSplits = groupedSplits.values().toArray(new InputSplit[0]);
- LOG.info("Number of grouped splits: " + flatSplits.length);
-
- List<TaskLocationHint> locationHints = grouper.createTaskLocationHints(flatSplits);
-
- Utilities.clearWork(jobConf);
-
- return new InputSplitInfoMem(flatSplits, locationHints, flatSplits.length, null, jobConf);
+ return groupedSplits;
}
private List<Event> createEventList(boolean sendSerializedEvents, InputSplitInfoMem inputSplitInfo) {
List<Event> events = Lists.newArrayListWithCapacity(inputSplitInfo.getNumTasks() + 1);
- RootInputConfigureVertexTasksEvent configureVertexEvent =
- new RootInputConfigureVertexTasksEvent(inputSplitInfo.getNumTasks(),
- inputSplitInfo.getTaskLocationHints());
+ InputConfigureVertexTasksEvent configureVertexEvent =
+ InputConfigureVertexTasksEvent.create(inputSplitInfo.getNumTasks(),
+ VertexLocationHint.create(inputSplitInfo.getTaskLocationHints()),
+ InputSpecUpdate.getDefaultSinglePhysicalInputSpecUpdate());
events.add(configureVertexEvent);
if (sendSerializedEvents) {
MRSplitsProto splitsProto = inputSplitInfo.getSplitsProto();
int count = 0;
for (MRSplitProto mrSplit : splitsProto.getSplitsList()) {
- RootInputDataInformationEvent diEvent =
- new RootInputDataInformationEvent(count++, mrSplit.toByteArray());
+ InputDataInformationEvent diEvent = InputDataInformationEvent.createWithSerializedPayload(
+ count++, mrSplit.toByteString().asReadOnlyByteBuffer());
events.add(diEvent);
}
} else {
int count = 0;
for (org.apache.hadoop.mapred.InputSplit split : inputSplitInfo.getOldFormatSplits()) {
- RootInputDataInformationEvent diEvent = new RootInputDataInformationEvent(count++, split);
+ InputDataInformationEvent diEvent = InputDataInformationEvent.createWithObjectPayload(
+ count++, split);
events.add(diEvent);
}
}
return events;
}
+
+ @Override
+ public void handleInputInitializerEvent(List<InputInitializerEvent> events) throws Exception {
+ for (InputInitializerEvent e : events) {
+ pruner.getQueue().put(e);
+ }
+ }
}
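
Two things stand out in the rewritten initializer above: split generation is
now sized from the vertex's actual resources instead of a precomputed task
count, and dynamic partition pruning events are drained into the new
DynamicPartitionPruner before splits are produced. The slot/wave arithmetic,
with hypothetical figures (a sketch, not from this commit):

    // Illustrative numbers for the computation in initialize():
    int totalResource = 102400;   // MB available to the whole vertex
    int taskResource = 1024;      // MB per task container
    int availableSlots = totalResource / taskResource;     // 100 slots

    // TEZ_GROUPING_SPLIT_WAVES oversubscribes slots so faster tasks can
    // pick up extra work; assume the configured value is 1.7.
    float waves = 1.7f;
    int requestedSplits = (int) (availableSlots * waves);  // 170

    // inputFormat.getSplits(jobConf, requestedSplits) yields the raw
    // splits; SplitGrouper.group(...) then coalesces them per bucket so
    // the final task count lands near availableSlots * waves.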
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java Mon Sep 8 04:38:17 2014
@@ -47,7 +47,7 @@ import org.apache.tez.mapreduce.input.MR
import org.apache.tez.mapreduce.processor.MRTaskReporter;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.LogicalOutput;
-import org.apache.tez.runtime.api.TezProcessorContext;
+import org.apache.tez.runtime.api.ProcessorContext;
import org.apache.tez.runtime.library.api.KeyValueReader;
/**
@@ -64,8 +64,25 @@ public class MapRecordProcessor extends
protected static final String MAP_PLAN_KEY = "__MAP_PLAN__";
private MapWork mapWork;
+ public MapRecordProcessor(JobConf jconf) {
+ ObjectCache cache = ObjectCacheFactory.getCache(jconf);
+ execContext.setJc(jconf);
+ // create map and fetch operators
+ mapWork = (MapWork) cache.retrieve(MAP_PLAN_KEY);
+ if (mapWork == null) {
+ mapWork = Utilities.getMapWork(jconf);
+ cache.cache(MAP_PLAN_KEY, mapWork);
+ l4j.info("Plan: "+mapWork);
+ for (String s: mapWork.getAliases()) {
+ l4j.info("Alias: "+s);
+ }
+ } else {
+ Utilities.setMapWork(jconf, mapWork);
+ }
+ }
+
@Override
- void init(JobConf jconf, TezProcessorContext processorContext, MRTaskReporter mrReporter,
+ void init(JobConf jconf, ProcessorContext processorContext, MRTaskReporter mrReporter,
Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws Exception {
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS);
super.init(jconf, processorContext, mrReporter, inputs, outputs);
@@ -87,22 +104,7 @@ public class MapRecordProcessor extends
((TezKVOutputCollector) outMap.get(outputEntry.getKey())).initialize();
}
- ObjectCache cache = ObjectCacheFactory.getCache(jconf);
try {
-
- execContext.setJc(jconf);
- // create map and fetch operators
- mapWork = (MapWork) cache.retrieve(MAP_PLAN_KEY);
- if (mapWork == null) {
- mapWork = Utilities.getMapWork(jconf);
- cache.cache(MAP_PLAN_KEY, mapWork);
- l4j.info("Plan: "+mapWork);
- for (String s: mapWork.getAliases()) {
- l4j.info("Alias: "+s);
- }
- } else {
- Utilities.setMapWork(jconf, mapWork);
- }
if (mapWork.getVectorMode()) {
mapOp = new VectorMapOperator();
} else {
@@ -115,7 +117,8 @@ public class MapRecordProcessor extends
l4j.info(mapOp.dump(0));
MapredContext.init(true, new JobConf(jconf));
- ((TezContext)MapredContext.get()).setInputs(inputs);
+ ((TezContext) MapredContext.get()).setInputs(inputs);
+ ((TezContext) MapredContext.get()).setTezProcessorContext(processorContext);
mapOp.setExecContext(execContext);
mapOp.initializeLocalWork(jconf);
mapOp.initialize(jconf, null);
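
The plan lookup itself is unchanged; it simply moved from init() into the new
constructor, so the (possibly cached) MapWork exists as soon as the processor
object does. The retrieve-or-compute pattern, condensed from the hunk above:

    ObjectCache cache = ObjectCacheFactory.getCache(jconf);
    MapWork mapWork = (MapWork) cache.retrieve(MAP_PLAN_KEY);
    if (mapWork == null) {
      // First task in this container: deserialize the plan and publish it.
      mapWork = Utilities.getMapWork(jconf);
      cache.cache(MAP_PLAN_KEY, mapWork);
    } else {
      // Container reuse: later tasks skip deserialization entirely.
      Utilities.setMapWork(jconf, mapWork);
    }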
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapTezProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapTezProcessor.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapTezProcessor.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapTezProcessor.java Mon Sep 8 04:38:17 2014
@@ -17,11 +17,15 @@
*/
package org.apache.hadoop.hive.ql.exec.tez;
+import org.apache.tez.runtime.api.ProcessorContext;
+
/**
* Subclass that is used to indicate if this is a map or reduce process
*/
public class MapTezProcessor extends TezProcessor {
- public MapTezProcessor(){
- super(true);
+
+ public MapTezProcessor(ProcessorContext context) {
+ super(context);
+ this.isMap = true;
}
}
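
Tez 0.5 instantiates processors reflectively through a (ProcessorContext)
constructor, which is why the map/reduce flag can no longer arrive as a
constructor argument and becomes a protected field instead. Roughly how the
framework-side creation works (a simplified sketch; the real code path lives
inside Tez, and this helper is an assumption, not its API):

    import java.lang.reflect.Constructor;
    import org.apache.tez.runtime.api.ProcessorContext;

    final class ProcessorFactorySketch {
      // Stand-in for Tez's reflective processor creation; the runtime
      // resolves the concrete class name from the DAG plan.
      static TezProcessor create(Class<? extends TezProcessor> clazz,
          ProcessorContext context) throws Exception {
        Constructor<? extends TezProcessor> ctor =
            clazz.getConstructor(ProcessorContext.class);
        return ctor.newInstance(context);
      }
    }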
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java Mon Sep 8 04:38:17 2014
@@ -20,24 +20,40 @@ package org.apache.hadoop.hive.ql.exec.t
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.tez.runtime.common.objectregistry.ObjectLifeCycle;
-import org.apache.tez.runtime.common.objectregistry.ObjectRegistry;
-import org.apache.tez.runtime.common.objectregistry.ObjectRegistryFactory;
+import org.apache.tez.runtime.api.ObjectRegistry;
+import com.google.common.base.Preconditions;
/**
* ObjectCache. Tez implementation based on the tez object registry.
*
*/
public class ObjectCache implements org.apache.hadoop.hive.ql.exec.ObjectCache {
-
+
private static final Log LOG = LogFactory.getLog(ObjectCache.class.getName());
- private final ObjectRegistry registry = ObjectRegistryFactory.getObjectRegistry();
+
+ // ObjectRegistry is available via the Input/Output/ProcessorContext.
+ // This is set up as part of the Tez Processor construction, so that it is available whenever an
+ // instance of the ObjectCache is created. The assumption is that Tez will initialize the Processor
+ // before anything else.
+ private volatile static ObjectRegistry staticRegistry;
+
+ private final ObjectRegistry registry;
+
+ public ObjectCache() {
+ Preconditions.checkNotNull(staticRegistry,
+ "Object registry not setup yet. This should have been setup by the TezProcessor");
+ registry = staticRegistry;
+ }
+ public static void setupObjectRegistry(ObjectRegistry objectRegistry) {
+ staticRegistry = objectRegistry;
+ }
+
@Override
public void cache(String key, Object value) {
LOG.info("Adding " + key + " to cache with value " + value);
- registry.add(ObjectLifeCycle.VERTEX, key, value);
+ registry.cacheForVertex(key, value);
}
@Override
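
The hand-off the comment block above describes is two-phase: the processor
publishes the registry it got from its context, and every ObjectCache built
afterwards binds to it. Intended ordering, as a sketch (mapWork is assumed to
be in scope):

    // 1. Inside the Tez processor, before any task code runs
    //    (ProcessorContext extends TaskContext, which exposes the registry):
    ObjectCache.setupObjectRegistry(getContext().getObjectRegistry());

    // 2. Any ObjectCache created later resolves against that registry:
    ObjectCache cache = new ObjectCache();
    cache.cache("__MAP_PLAN__", mapWork);      // cacheForVertex underneath
    Object plan = cache.retrieve("__MAP_PLAN__");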
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java Mon Sep 8 04:38:17 2014
@@ -32,7 +32,7 @@ import org.apache.hadoop.mapred.OutputCo
import org.apache.tez.mapreduce.processor.MRTaskReporter;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.LogicalOutput;
-import org.apache.tez.runtime.api.TezProcessorContext;
+import org.apache.tez.runtime.api.ProcessorContext;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
@@ -47,7 +47,7 @@ public abstract class RecordProcessor {
protected Map<String, LogicalInput> inputs;
protected Map<String, LogicalOutput> outputs;
protected Map<String, OutputCollector> outMap;
- protected TezProcessorContext processorContext;
+ protected ProcessorContext processorContext;
public static final Log l4j = LogFactory.getLog(RecordProcessor.class);
@@ -72,7 +72,7 @@ public abstract class RecordProcessor {
* @param outputs map of Output names to {@link LogicalOutput}s
* @throws Exception
*/
- void init(JobConf jconf, TezProcessorContext processorContext, MRTaskReporter mrReporter,
+ void init(JobConf jconf, ProcessorContext processorContext, MRTaskReporter mrReporter,
Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws Exception {
this.jconf = jconf;
this.reporter = mrReporter;
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java Mon Sep 8 04:38:17 2014
@@ -20,7 +20,6 @@ package org.apache.hadoop.hive.ql.exec.t
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -59,14 +58,13 @@ import org.apache.hadoop.hive.serde2.obj
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.tez.mapreduce.processor.MRTaskReporter;
import org.apache.tez.runtime.api.Input;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.LogicalOutput;
-import org.apache.tez.runtime.api.TezProcessorContext;
+import org.apache.tez.runtime.api.ProcessorContext;
import org.apache.tez.runtime.library.api.KeyValuesReader;
/**
@@ -113,7 +111,7 @@ public class ReduceRecordProcessor exte
private List<VectorExpressionWriter>[] valueStringWriters;
@Override
- void init(JobConf jconf, TezProcessorContext processorContext, MRTaskReporter mrReporter,
+ void init(JobConf jconf, ProcessorContext processorContext, MRTaskReporter mrReporter,
Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws Exception {
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS);
super.init(jconf, processorContext, mrReporter, inputs, outputs);
@@ -140,7 +138,7 @@ public class ReduceRecordProcessor exte
try {
keyTableDesc = redWork.getKeyDesc();
- inputKeyDeserializer = (SerDe) ReflectionUtils.newInstance(keyTableDesc
+ inputKeyDeserializer = ReflectionUtils.newInstance(keyTableDesc
.getDeserializerClass(), null);
SerDeUtils.initializeSerDe(inputKeyDeserializer, null, keyTableDesc.getProperties(), null);
keyObjectInspector = inputKeyDeserializer.getObjectInspector();
@@ -152,7 +150,7 @@ public class ReduceRecordProcessor exte
keyStructInspector = (StructObjectInspector)keyObjectInspector;
batches = new VectorizedRowBatch[maxTags];
valueStructInspectors = new StructObjectInspector[maxTags];
- valueStringWriters = (List<VectorExpressionWriter>[])new List[maxTags];
+ valueStringWriters = new List[maxTags];
keysColumnOffset = keyStructInspector.getAllStructFieldRefs().size();
buffer = new DataOutputBuffer();
}
@@ -215,7 +213,8 @@ public class ReduceRecordProcessor exte
}
MapredContext.init(false, new JobConf(jconf));
- ((TezContext)MapredContext.get()).setInputs(inputs);
+ ((TezContext) MapredContext.get()).setInputs(inputs);
+ ((TezContext) MapredContext.get()).setTezProcessorContext(processorContext);
// initialize reduce operator tree
try {
@@ -306,7 +305,7 @@ public class ReduceRecordProcessor exte
Map<Integer, String> tag2input = redWork.getTagToInput();
ArrayList<LogicalInput> shuffleInputs = new ArrayList<LogicalInput>();
for(String inpStr : tag2input.values()){
- shuffleInputs.add((LogicalInput)inputs.get(inpStr));
+ shuffleInputs.add(inputs.get(inpStr));
}
return shuffleInputs;
}
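
Most of this file's hunks are mechanical (the ProcessorContext rename, dead
imports), but the dropped casts deserve a word: Hadoop's
ReflectionUtils.newInstance is generic, so once the Class token carries the
right type the explicit (SerDe) and (LogicalInput) casts were redundant. For
reference (variable names are from the hunk; the declared type shown is
illustrative):

    // org.apache.hadoop.util.ReflectionUtils, unchanged Hadoop API:
    //   public static <T> T newInstance(Class<T> theClass, Configuration conf)
    // The Class token types the result, so no cast is needed:
    Deserializer inputKeyDeserializer =
        ReflectionUtils.newInstance(keyTableDesc.getDeserializerClass(), null);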
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceTezProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceTezProcessor.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceTezProcessor.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceTezProcessor.java Mon Sep 8 04:38:17 2014
@@ -17,11 +17,15 @@
*/
package org.apache.hadoop.hive.ql.exec.tez;
+import org.apache.tez.runtime.api.ProcessorContext;
+
/**
* Subclass that is used to indicate if this is a map or reduce process
*/
public class ReduceTezProcessor extends TezProcessor {
- public ReduceTezProcessor(){
- super(false);
+
+ public ReduceTezProcessor(ProcessorContext context) {
+ super(context);
+ this.isMap = false;
}
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java Mon Sep 8 04:38:17 2014
@@ -35,7 +35,7 @@ import org.apache.hadoop.mapred.FileSpli
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.split.TezGroupedSplit;
import org.apache.hadoop.mapred.split.TezMapredSplitsGrouper;
-import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
+import org.apache.tez.dag.api.TaskLocationHint;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Lists;
@@ -141,13 +141,13 @@ public class SplitGrouper {
String rack = (split instanceof TezGroupedSplit) ? ((TezGroupedSplit) split).getRack() : null;
if (rack == null) {
if (split.getLocations() != null) {
- locationHints.add(new TaskLocationHint(new HashSet<String>(Arrays.asList(split
+ locationHints.add(TaskLocationHint.createTaskLocationHint(new HashSet<String>(Arrays.asList(split
.getLocations())), null));
} else {
- locationHints.add(new TaskLocationHint(null, null));
+ locationHints.add(TaskLocationHint.createTaskLocationHint(null, null));
}
} else {
- locationHints.add(new TaskLocationHint(null, Collections.singleton(rack)));
+ locationHints.add(TaskLocationHint.createTaskLocationHint(null, Collections.singleton(rack)));
}
}
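
TaskLocationHint lost its public constructors in the same API pass, replaced
by static factories, which is all this hunk adapts to. The three variants
used above, in isolation (hosts and rack are made up for the sketch):

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.HashSet;
    import org.apache.tez.dag.api.TaskLocationHint;

    class LocationHintSketch {
      static void examples() {
        // host affinity, rack affinity, and "no preference":
        TaskLocationHint byHost = TaskLocationHint.createTaskLocationHint(
            new HashSet<String>(Arrays.asList("node1", "node2")), null);
        TaskLocationHint byRack = TaskLocationHint.createTaskLocationHint(
            null, Collections.singleton("/default-rack"));
        TaskLocationHint none =
            TaskLocationHint.createTaskLocationHint(null, null);
      }
    }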
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezContext.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezContext.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezContext.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezContext.java Mon Sep 8 04:38:17 2014
@@ -23,6 +23,7 @@ import org.apache.hadoop.hive.ql.exec.Ma
import org.apache.hadoop.mapred.JobConf;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.ProcessorContext;
/**
* TezContext contains additional context only available with Tez
@@ -31,9 +32,11 @@ public class TezContext extends MapredCo
// all the inputs for the tez processor
private Map<String, LogicalInput> inputs;
-
+
private Map<String, LogicalOutput> outputs;
+ private ProcessorContext processorContext;
+
public TezContext(boolean isMap, JobConf jobConf) {
super(isMap, jobConf);
}
@@ -41,7 +44,7 @@ public class TezContext extends MapredCo
public void setInputs(Map<String, LogicalInput> inputs) {
this.inputs = inputs;
}
-
+
public void setOutputs(Map<String, LogicalOutput> outputs) {
this.outputs = outputs;
}
@@ -52,11 +55,19 @@ public class TezContext extends MapredCo
}
return inputs.get(name);
}
-
+
public LogicalOutput getOutput(String name) {
if (outputs == null) {
return null;
}
return outputs.get(name);
}
+
+ public void setTezProcessorContext(ProcessorContext processorContext) {
+ this.processorContext = processorContext;
+ }
+
+ public ProcessorContext getTezProcessorContext() {
+ return processorContext;
+ }
}
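
Keeping the ProcessorContext on TezContext gives operator code a path back to
Tez runtime services through the existing MapredContext thread-local. A
hypothetical consumer (the counter group and name are invented for
illustration):

    // Somewhere inside an operator running under Tez:
    TezContext tezContext = (TezContext) MapredContext.get();
    ProcessorContext ctx = tezContext.getTezProcessorContext();
    if (ctx != null) {
      // ProcessorContext extends TaskContext, so counters, the user
      // payload, task identifiers etc. are all reachable from here.
      ctx.getCounters().findCounter("HIVE", "EXAMPLE_EVENTS").increment(1);
    }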
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java Mon Sep 8 04:38:17 2014
@@ -142,7 +142,7 @@ public class TezJobMonitor {
if (!running) {
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_SUBMIT_TO_RUNNING);
console.printInfo("Status: Running (application id: "
- +dagClient.getApplicationId()+")\n");
+ +dagClient.getExecutionContext()+")\n");
for (String s: progressMap.keySet()) {
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s);
}
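
One behavioral note on this hunk: DAGClient.getApplicationId() is gone from
the Tez 0.5 client API, and getExecutionContext() returns a human-readable
description of where the DAG is running rather than a bare id, so the
surviving "application id:" label no longer matches what gets printed. A
closer wording might be (a sketch, not part of this commit):

    console.printInfo("Status: Running ("
        + dagClient.getExecutionContext() + ")\n");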