You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ml...@apache.org on 2013/05/01 00:07:44 UTC
svn commit: r1477852 - in /incubator/tez/branches/TEZ-1:
tez-dag-api/src/main/java/org/apache/tez/dag/api/
tez-dag/src/main/java/org/apache/tez/dag/app/dag/
tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/
tez-dag/src/main/java/org/apache/tez/dag...
Author: mliddell
Date: Tue Apr 30 22:07:43 2013
New Revision: 1477852
URL: http://svn.apache.org/r1477852
Log:
TEZ-20: passing javaopts from DAG API through to taskattempts
Modified:
incubator/tez/branches/TEZ-1/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAGConfiguration.java
incubator/tez/branches/TEZ-1/tez-dag-api/src/main/java/org/apache/tez/dag/api/Vertex.java
incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java
incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventLaunchRequest.java
incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/utils/TezEngineChildJVM.java
incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
Modified: incubator/tez/branches/TEZ-1/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAGConfiguration.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAGConfiguration.java?rev=1477852&r1=1477851&r2=1477852&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAGConfiguration.java (original)
+++ incubator/tez/branches/TEZ-1/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAGConfiguration.java Tue Apr 30 22:07:43 2013
@@ -366,6 +366,8 @@ public class DAGConfiguration extends Co
setVertexEnv(vertex);
// set processor name
setVertexTaskModuleClassName(vertex);
+ //set javaOpts
+ setVertexJavaOpts(vertex.getVertexName(), vertex.getJavaOpts());
}
}
@@ -375,7 +377,7 @@ public class DAGConfiguration extends Co
public String[] getInputVertices(String vertexName) {
String[] vertices =
getStrings(TEZ_DAG_VERTEX_INPUT_VERTICES + "." + vertexName, EMPTY);
- return vertices == null? EMPTY : vertices;
+ return vertices == null ? EMPTY : vertices;
}
@Private
public void setInputVertices(String vertexName, List<Vertex> inputVertices) {
@@ -455,6 +457,18 @@ public class DAGConfiguration extends Co
String taskModule) {
set(TEZ_DAG_VERTEX_TASK_MODULE + "." + vertexName, taskModule);
}
+
+ public final String TEZ_DAG_VERTEX_JAVAOPTS= VERTEX + "java-opts";
+ @Private
+ public String getVertexJavaOpts(String vertexName) {
+ String opts = get(TEZ_DAG_VERTEX_JAVAOPTS + "." + vertexName);
+ return opts == null? "" : opts;
+ }
+
+ @Private
+ public void setVertexJavaOpts(String vertexName, String javaOpts){
+ set(TEZ_DAG_VERTEX_JAVAOPTS + "." + vertexName, javaOpts);
+ }
/// File used for storing location hints that are passed to the DAG
public static final String DAG_LOCATION_HINT_RESOURCE_FILE =
Modified: incubator/tez/branches/TEZ-1/tez-dag-api/src/main/java/org/apache/tez/dag/api/Vertex.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag-api/src/main/java/org/apache/tez/dag/api/Vertex.java?rev=1477852&r1=1477851&r2=1477852&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag-api/src/main/java/org/apache/tez/dag/api/Vertex.java (original)
+++ incubator/tez/branches/TEZ-1/tez-dag-api/src/main/java/org/apache/tez/dag/api/Vertex.java Tue Apr 30 22:07:43 2013
@@ -41,6 +41,8 @@ public class Vertex { // FIXME rename to
private final List<Vertex> outputVertices = new ArrayList<Vertex>();
private final List<String> inputEdgeIds = new ArrayList<String>();
private final List<String> outputEdgeIds = new ArrayList<String>();
+ private String javaOpts = "";
+
public Vertex(String vertexName, String processorName, int parallelism) {
this.vertexName = vertexName;
@@ -94,6 +96,14 @@ public class Vertex { // FIXME rename to
return taskEnvironment;
}
+ public void setJavaOpts(String javaOpts){
+ this. javaOpts = javaOpts;
+ }
+
+ public String getJavaOpts(){
+ return javaOpts;
+ }
+
@Override
public String toString() {
return "[" + vertexName + " : " + processorName + "]";
@@ -125,8 +135,6 @@ public class Vertex { // FIXME rename to
return outputEdgeIds;
}
- // FIXME add java opts?
-
// FIXME how do we support profiling? Can't profile all tasks.
}
Modified: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java?rev=1477852&r1=1477851&r2=1477852&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java (original)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java Tue Apr 30 22:07:43 2013
@@ -108,4 +108,6 @@ public interface TaskAttempt {
public Map<String, LocalResource> getLocalResources();
public Map<String, String> getEnvironment();
+
+ public String getJavaOpts();
}
Modified: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java?rev=1477852&r1=1477851&r2=1477852&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java (original)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java Tue Apr 30 22:07:43 2013
@@ -141,7 +141,7 @@ public class TaskAttemptImpl implements
private final Resource taskResource;
private final Map<String, LocalResource> localResources;
private final Map<String, String> environment;
-
+ private final String javaOpts;
private final boolean isRescheduled;
private boolean speculatorContainerRequestSent = false;
@@ -161,6 +161,7 @@ public class TaskAttemptImpl implements
STATUS_UPDATER = new StatusUpdaterTransition();
private final StateMachine<TaskAttemptStateInternal, TaskAttemptEventType, TaskAttemptEvent> stateMachine;
+
private static StateMachineFactory
<TaskAttemptImpl, TaskAttemptStateInternal, TaskAttemptEventType, TaskAttemptEvent>
stateMachineFactory
@@ -257,7 +258,8 @@ public class TaskAttemptImpl implements
TaskHeartbeatHandler taskHeartbeatHandler, AppContext appContext,
String mrxModuleClassName, TaskLocationHint locationHint,
Resource resource, Map<String, LocalResource> localResources,
- Map<String, String> environment, boolean isRescheduled) {
+ Map<String, String> environment,
+ String javaOpts, boolean isRescheduled) {
ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
this.readLock = rwLock.readLock();
this.writeLock = rwLock.writeLock();
@@ -281,6 +283,7 @@ public class TaskAttemptImpl implements
this.locationHint = locationHint;
this.localResources = localResources;
this.environment = environment;
+ this.javaOpts = javaOpts;
this.isRescheduled = isRescheduled;
}
@@ -889,7 +892,8 @@ public class TaskAttemptImpl implements
ta.localResources, remoteTaskContext, ta,
ta.credentials, ta.jobToken, hostArray,
rackArray,
- scheduleEvent.getPriority(), ta.environment, ta.conf);
+ scheduleEvent.getPriority(), ta.environment, //ta.javaOpts,
+ ta.conf);
ta.sendEvent(launchRequestEvent);
}
}
@@ -1275,4 +1279,10 @@ public class TaskAttemptImpl implements
public Map<String, String> getEnvironment() {
return this.environment;
}
+
+ @Override
+ public String getJavaOpts() {
+ return this.javaOpts;
+ }
+
}
Modified: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java?rev=1477852&r1=1477851&r2=1477852&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java (original)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java Tue Apr 30 22:07:43 2013
@@ -253,6 +253,8 @@ public class TaskImpl implements Task, E
private final boolean leafVertex;
+ private String javaOpts;
+
@Override
public TaskState getState() {
readLock.lock();
@@ -277,7 +279,8 @@ public class TaskImpl implements Task, E
String mrxModuleClassName,
boolean leafVertex, TaskLocationHint locationHint, Resource resource,
Map<String, LocalResource> localResources,
- Map<String, String> environment) {
+ Map<String, String> environment,
+ String javaOpts) {
this.conf = conf;
this.clock = clock;
this.jobFile = remoteJobConfFile;
@@ -308,7 +311,7 @@ public class TaskImpl implements Task, E
this.taskResource = resource;
this.localResources = localResources;
this.environment = environment;
-
+ this.javaOpts = javaOpts;
// TODO: Recovery
/*
// See if this is from a previous generation.
@@ -616,7 +619,7 @@ public class TaskImpl implements Task, E
taskAttemptListener, null, 0, conf,
jobToken, credentials, clock, taskHeartbeatHandler,
appContext, mrxModuleClassName, locationHint, taskResource,
- localResources, environment, (failedAttempts>0));
+ localResources, environment, javaOpts, (failedAttempts>0));
}
protected TaskAttempt getSuccessfulAttempt() {
Modified: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java?rev=1477852&r1=1477851&r2=1477852&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java (original)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java Tue Apr 30 22:07:43 2013
@@ -330,7 +330,8 @@ public class VertexImpl implements org.a
private VertexLocationHint vertexLocationHint;
private Map<String, LocalResource> localResources;
private Map<String, String> environment;
-
+ private String javaOpts;
+
public VertexImpl(TezVertexID vertexId, String vertexName,
TezConfiguration conf, EventHandler eventHandler,
TaskAttemptListener taskAttemptListener,
@@ -367,7 +368,9 @@ public class VertexImpl implements org.a
this.processorName = getDAGPlan().getVertexTaskModuleClassName(getName());
this.localResources = getDAGPlan().getVertexLocalResources(getName());
this.environment = getDAGPlan().getVertexEnv(getName());
-
+
+ this.javaOpts = getDAGPlan().getVertexJavaOpts(getName());
+
// This "this leak" is okay because the retained pointer is in an
// instance variable.
stateMachine = stateMachineFactory.make(this);
@@ -858,7 +861,8 @@ public class VertexImpl implements org.a
vertex.processorName, false,
locHint, vertex.taskResource,
vertex.localResources,
- vertex.environment);
+ vertex.environment,
+ vertex.javaOpts);
vertex.addTask(task);
LOG.info("Created task for vertex " + vertex.getVertexId() + ": " +
task.getTaskId());
Modified: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java?rev=1477852&r1=1477851&r2=1477852&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java (original)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java Tue Apr 30 22:07:43 2013
@@ -54,8 +54,7 @@ public class AMSchedulerEventTALaunchReq
TezTaskContext remoteTaskContext, TaskAttempt ta,
Credentials credentials, Token<JobTokenIdentifier> jobToken,
String[] hosts, String[] racks, Priority priority,
- Map<String, String> environment,
- TezConfiguration conf) {
+ Map<String, String> environment, TezConfiguration conf) {
super(AMSchedulerEventType.S_TA_LAUNCH_REQUEST);
this.attemptId = attemptId;
this.capability = capability;
Modified: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java?rev=1477852&r1=1477851&r2=1477852&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java (original)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java Tue Apr 30 22:07:43 2013
@@ -465,7 +465,8 @@ public class TaskSchedulerEventHandler e
// TODO getConf from AMSchedulerEventTALaunchRequest
event.getCredentials(), false, event.getConf(),
taskAttempt.getLocalResources(),
- taskAttempt.getEnvironment()));
+ taskAttempt.getEnvironment(),
+ taskAttempt.getJavaOpts()));
}
sendEvent(new AMContainerEventAssignTA(containerId,
taskAttempt.getID(), event.getRemoteTaskContext()));
Modified: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventLaunchRequest.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventLaunchRequest.java?rev=1477852&r1=1477851&r2=1477852&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventLaunchRequest.java (original)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventLaunchRequest.java Tue Apr 30 22:07:43 2013
@@ -38,13 +38,14 @@ public class AMContainerEventLaunchReque
private final TezConfiguration conf;
private final Map<String, LocalResource> localResources;
private final Map<String, String> environment;
+ private final String javaOpts;
public AMContainerEventLaunchRequest(ContainerId containerId,
TezVertexID vertexId,
Token<JobTokenIdentifier> jobToken,
Credentials credentials, boolean shouldProfile, TezConfiguration conf,
Map<String, LocalResource> localResources,
- Map<String, String> environment) {
+ Map<String, String> environment, String javaOpts) {
super(containerId, AMContainerEventType.C_LAUNCH_REQUEST);
this.vertexId = vertexId;
this.jobToken = jobToken;
@@ -53,6 +54,7 @@ public class AMContainerEventLaunchReque
this.conf = conf;
this.localResources = localResources;
this.environment = environment;
+ this.javaOpts = javaOpts;
}
public TezDAGID getDAGId() {
@@ -86,4 +88,8 @@ public class AMContainerEventLaunchReque
public Map<String, String> getEnvironment() {
return environment;
}
+
+ public String getJavaOpts() {
+ return javaOpts;
+ }
}
Modified: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java?rev=1477852&r1=1477851&r2=1477852&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java (original)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java Tue Apr 30 22:07:43 2013
@@ -152,6 +152,7 @@ public class AMContainerHelpers {
Token<JobTokenIdentifier> jobToken,
Resource assignedCapability, Map<String, LocalResource> localResources,
Map<String, String> vertexEnv,
+ String javaOpts,
TaskAttemptListener taskAttemptListener, Credentials credentials,
boolean shouldProfile, AppContext appContext) {
@@ -179,7 +180,7 @@ public class AMContainerHelpers {
// Set up the launch command
List<String> commands = TezEngineChildJVM.getVMCommand(
taskAttemptListener.getAddress(), conf, vertexId, containerId,
- vertexId.getDAGId().getApplicationId(), shouldProfile);
+ vertexId.getDAGId().getApplicationId(), shouldProfile, javaOpts);
// Duplicate the ByteBuffers for access by multiple containers.
Map<String, ByteBuffer> myServiceData = new HashMap<String, ByteBuffer>();
Modified: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java?rev=1477852&r1=1477851&r2=1477852&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java (original)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java Tue Apr 30 22:07:43 2013
@@ -327,6 +327,7 @@ public class AMContainerImpl implements
container.getContainer().getResource(),
event.getLocalResources(),
event.getEnvironment(),
+ event.getJavaOpts(),
container.taskAttemptListener, event.getCredentials(),
event.shouldProfile(), container.appContext);
Modified: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/utils/TezEngineChildJVM.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/utils/TezEngineChildJVM.java?rev=1477852&r1=1477851&r2=1477852&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/utils/TezEngineChildJVM.java (original)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/utils/TezEngineChildJVM.java Tue Apr 30 22:07:43 2013
@@ -22,6 +22,8 @@ import java.net.InetSocketAddress;
import java.util.List;
import java.util.Vector;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.YarnTezDagChild;
import org.apache.hadoop.yarn.api.ApplicationConstants;
@@ -30,10 +32,13 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.app.rm.container.AMContainerHelpers;
import org.apache.tez.engine.records.TezVertexID;
public class TezEngineChildJVM {
+ private static final Log LOG = LogFactory.getLog(TezEngineChildJVM.class);
+
// FIXME
public static enum LogName {
/** Log on the stdout of the task. */
@@ -71,19 +76,21 @@ public class TezEngineChildJVM {
public static List<String> getVMCommand(
InetSocketAddress taskAttemptListenerAddr, TezConfiguration conf,
TezVertexID vertexId,
- ContainerId containerId, ApplicationId jobID, boolean shouldProfile) {
+ ContainerId containerId, ApplicationId jobID, boolean shouldProfile,
+ String javaOpts) {
Vector<String> vargs = new Vector<String>(9);
vargs.add(Environment.JAVA_HOME.$() + "/bin/java");
- // Add child (task) java-vm options.
- // FIXME add support for child java opts
-
+ //set custom javaOpts
+ LOG.info("getVMCommand: javaOpts=" + javaOpts);
+ vargs.add(javaOpts);
+
Path childTmpDir = new Path(Environment.PWD.$(),
YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR);
vargs.add("-Djava.io.tmpdir=" + childTmpDir);
-
+
// FIXME Setup the log4j properties
// Decision to profile needs to be made in the scheduler.
Modified: incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java?rev=1477852&r1=1477851&r2=1477852&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java (original)
+++ incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java Tue Apr 30 22:07:43 2013
@@ -53,21 +53,10 @@ import org.apache.hadoop.mapreduce.Count
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.JobStatus;
-import org.apache.tez.dag.api.DAG;
-import org.apache.tez.dag.api.DAGConfiguration;
-import org.apache.tez.dag.api.Edge;
-import org.apache.tez.dag.api.EdgeProperty;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.EdgeProperty.ConnectionPattern;
import org.apache.tez.dag.api.EdgeProperty.SourceType;
-import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
import org.apache.tez.engine.lib.input.ShuffledMergedInput;
import org.apache.tez.engine.lib.output.OnFileSortedOutput;
-import org.apache.tez.mapreduce.hadoop.DeprecatedKeys;
-import org.apache.tez.mapreduce.hadoop.MRJobConfig;
-import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
-import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
import org.apache.hadoop.mapreduce.QueueAclsInfo;
import org.apache.hadoop.mapreduce.QueueInfo;
import org.apache.hadoop.mapreduce.TaskAttemptID;
@@ -108,6 +97,17 @@ import org.apache.hadoop.yarn.factory.pr
import org.apache.hadoop.yarn.util.Apps;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.DAGConfiguration;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
+import org.apache.tez.mapreduce.hadoop.DeprecatedKeys;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
+import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
import com.google.common.annotations.VisibleForTesting;
@@ -579,7 +579,10 @@ public class YARNRunner implements Clien
String mapProcessor = "org.apache.tez.mapreduce.processor.map.MapProcessor";
Vertex mapVertex = new Vertex(
MultiStageMRConfigUtil.getInitialMapVertexName(),
- mapProcessor, numMaps);
+ mapProcessor, numMaps);
+
+ // Set java opts example:
+ // mapVertex.setJavaOpts("-DmapperTestArg=val");
// FIXME set up map environment
Map<String, String> mapEnv = new HashMap<String, String>();
@@ -606,7 +609,8 @@ public class YARNRunner implements Clien
LOG.info("XXXX Adding map vertex to DAG"
+ ", vertexName=" + mapVertex.getVertexName()
+ ", processor=" + mapVertex.getProcessorName()
- + ", parrellism=" + mapVertex.getParallelism());
+ + ", parrellism=" + mapVertex.getParallelism()
+ + ", javaOpts=" + mapVertex.getJavaOpts());
dag.addVertex(mapVertex);
Vertex[] intermediateVertices = null;
@@ -646,7 +650,8 @@ public class YARNRunner implements Clien
LOG.info("XXXX Adding reduce vertex to DAG"
+ ", vertexName=" + reduceVertex.getVertexName()
+ ", processor=" + reduceVertex.getProcessorName()
- + ", parrellism=" + reduceVertex.getParallelism());
+ + ", parrellism=" + reduceVertex.getParallelism()
+ + ", javaOpts=" + reduceVertex.getJavaOpts());
dag.addVertex(reduceVertex);
EdgeProperty edgeProperty =
@@ -676,7 +681,6 @@ public class YARNRunner implements Clien
reduceVertex, edgeProperty);
dag.addEdge(finalEdge);
}
-
}
return dag;