You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/04/22 20:31:06 UTC
svn commit: r1470655 - in /incubator/tez/branches/TEZ-1:
tez-common/src/main/java/org/apache/tez/common/
tez-dag-api/src/main/java/org/apache/tez/dag/api/
tez-dag/src/main/java/org/apache/tez/dag/app/dag/
tez-dag/src/main/java/org/apache/tez/dag/app/da...
Author: sseth
Date: Mon Apr 22 18:31:05 2013
New Revision: 1470655
URL: http://svn.apache.org/r1470655
Log:
TEZ-56. Remove MR references from DAG, Vertex, Task, TaskAttempt. (sseth)
Added:
incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/utils/
incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/utils/DAGApps.java
incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/security/Master.java
incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/security/TokenCache.java
Modified:
incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java
incubator/tez/branches/TEZ-1/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAGConfiguration.java
incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java
Modified: incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java?rev=1470655&r1=1470654&r2=1470655&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java (original)
+++ incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java Mon Apr 22 18:31:05 2013
@@ -250,4 +250,10 @@ public class TezJobConfig {
public static final String TEZ_ENGINE_INPUT_BUFFER_PERCENT =
"tez.engine.task.input.buffer.percent";
public static final float DEFAULT_TEZ_ENGINE_INPUT_BUFFER_PERCENT = 0.0f;
+
+ // TODO This should be in DAGConfiguration
+ /**
+ * Configuration key naming the local file that holds all the credentials
+ * for the job.
+ */
+ public static final String DAG_CREDENTIALS_BINARY = "tez.dag.credentials.binary";
}
Modified: incubator/tez/branches/TEZ-1/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAGConfiguration.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAGConfiguration.java?rev=1470655&r1=1470654&r2=1470655&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAGConfiguration.java (original)
+++ incubator/tez/branches/TEZ-1/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAGConfiguration.java Mon Apr 22 18:31:05 2013
@@ -94,6 +94,9 @@ public class DAGConfiguration extends Co
public static final String DAG_AM_TASK_LISTENER_THREAD_COUNT = DAG_AM + "task.listener.thread-count";
public static final int DAG_AM_TASK_LISTENER_THREAD_COUNT_DEFAULT = 30;
+ /** Root directory under which per-user DAG staging areas are created. */
+ public static final String DAG_AM_STAGING_DIR = DAG_AM + "staging-dir";
+ /** Value used when {@link #DAG_AM_STAGING_DIR} is not configured. */
+ public static final String DAG_AM_STAGING_DIR_DEFAULT = "/tmp/hadoop-yarn/staging";
+
@Private
public void setEdgeProperties(List<Edge> edges) {
String[] edgeIds = new String[edges.size()];
Modified: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java?rev=1470655&r1=1470654&r2=1470655&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java (original)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java Mon Apr 22 18:31:05 2013
@@ -23,9 +23,8 @@ import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.VertexLocationHint;
import org.apache.tez.engine.records.TezDAGID;
@@ -69,7 +68,7 @@ public interface DAG {
/**
* @return the ACLs for this job for each type of JobACL given.
*/
- Map<JobACL, AccessControlList> getJobACLs();
+ Map<ApplicationAccessType, String> getJobACLs();
/**
* @return information for MR AppMasters (previously failed and current)
@@ -77,7 +76,7 @@ public interface DAG {
// TODO Recovery
//List<AMInfo> getAMInfos();
- boolean checkAccess(UserGroupInformation callerUGI, JobACL jobOperation);
+ boolean checkAccess(UserGroupInformation callerUGI, ApplicationAccessType jobOperation);
VertexLocationHint getVertexLocationHint(TezVertexID vertexId);
}
Modified: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java?rev=1470655&r1=1470654&r2=1470655&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java (original)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java Mon Apr 22 18:31:05 2013
@@ -37,22 +37,16 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobACLsManager;
-import org.apache.hadoop.mapreduce.JobACL;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
-import org.apache.hadoop.mapreduce.lib.chain.ChainReducer;
-import org.apache.hadoop.mapreduce.security.TokenCache;
-import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
import org.apache.hadoop.yarn.state.MultipleArcTransition;
import org.apache.hadoop.yarn.state.SingleArcTransition;
@@ -81,8 +75,10 @@ import org.apache.tez.dag.app.dag.event.
import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdate;
import org.apache.tez.dag.app.dag.event.VertexEvent;
import org.apache.tez.dag.app.dag.event.VertexEventType;
+import org.apache.tez.dag.utils.DAGApps;
import org.apache.tez.engine.common.security.JobTokenIdentifier;
import org.apache.tez.engine.common.security.JobTokenSecretManager;
+import org.apache.tez.engine.common.security.TokenCache;
import org.apache.tez.engine.records.TezDAGID;
import org.apache.tez.engine.records.TezTaskAttemptID;
import org.apache.tez.engine.records.TezVertexID;
@@ -101,9 +97,8 @@ public class DAGImpl implements org.apac
private final ApplicationAttemptId applicationAttemptId;
private final TezDAGID dagId;
private final Clock clock;
- private final JobACLsManager aclsManager;
+ private final ApplicationACLsManager aclsManager;
private final String username;
- private final Map<JobACL, AccessControlList> jobACLs;
// TODO Recovery
//private final List<AMInfo> amInfos;
@@ -332,7 +327,7 @@ public class DAGImpl implements org.apac
DAGLocationHint dagLocationHint) {
this.applicationAttemptId = applicationAttemptId;
this.dagId = dagId;
- this.dagName = conf.get(JobContext.JOB_NAME, "<missing job name>");
+ this.dagName = conf.get(DAGConfiguration.JOB_NAME, "<missing job name>");
this.conf = new DAGConfiguration(conf);
// TODO Metrics
//this.metrics = metrics;
@@ -354,9 +349,10 @@ public class DAGImpl implements org.apac
this.fsTokens = fsTokenCredentials;
this.jobTokenSecretManager = jobTokenSecretManager;
- this.aclsManager = new JobACLsManager(conf);
+ this.aclsManager = new ApplicationACLsManager(conf);
this.username = System.getProperty("user.name");
- this.jobACLs = aclsManager.constructJobACLs(conf);
+ // TODO Construct ApplicationACLs
+ // this.appACLs;
this.dagLocationHint = dagLocationHint;
@@ -385,12 +381,9 @@ public class DAGImpl implements org.apac
@Override
public boolean checkAccess(UserGroupInformation callerUGI,
- JobACL jobOperation) {
- AccessControlList jobACL = jobACLs.get(jobOperation);
- if (jobACL == null) {
- return true;
- }
- return aclsManager.checkAccess(callerUGI, jobOperation, username, jobACL);
+ ApplicationAccessType jobOperation) {
+ return aclsManager.checkAccess(callerUGI, jobOperation, userName,
+ this.dagId.getApplicationId());
}
@Override
@@ -744,8 +737,9 @@ public class DAGImpl implements org.apac
* @see org.apache.hadoop.mapreduce.v2.app2.job.Job#getJobACLs()
*/
@Override
- public Map<JobACL, AccessControlList> getJobACLs() {
- return Collections.unmodifiableMap(jobACLs);
+ public Map<ApplicationAccessType, String> getJobACLs() {
+ // TODO ApplicationACLs
+ return null;
}
// TODO Recovery
@@ -756,34 +750,34 @@ public class DAGImpl implements org.apac
}
*/
- /**
- * ChainMapper and ChainReducer must execute in parallel, so they're not
- * compatible with uberization/LocalContainerLauncher (100% sequential).
- */
- private boolean isChainJob(Configuration conf) {
- boolean isChainJob = false;
- try {
- String mapClassName = conf.get(MRJobConfig.MAP_CLASS_ATTR);
- if (mapClassName != null) {
- Class<?> mapClass = Class.forName(mapClassName);
- if (ChainMapper.class.isAssignableFrom(mapClass))
- isChainJob = true;
- }
- } catch (ClassNotFoundException cnfe) {
- // don't care; assume it's not derived from ChainMapper
- }
- try {
- String reduceClassName = conf.get(MRJobConfig.REDUCE_CLASS_ATTR);
- if (reduceClassName != null) {
- Class<?> reduceClass = Class.forName(reduceClassName);
- if (ChainReducer.class.isAssignableFrom(reduceClass))
- isChainJob = true;
- }
- } catch (ClassNotFoundException cnfe) {
- // don't care; assume it's not derived from ChainReducer
- }
- return isChainJob;
- }
+// /**
+// * ChainMapper and ChainReducer must execute in parallel, so they're not
+// * compatible with uberization/LocalContainerLauncher (100% sequential).
+// */
+// private boolean isChainJob(Configuration conf) {
+// boolean isChainJob = false;
+// try {
+// String mapClassName = conf.get(MRJobConfig.MAP_CLASS_ATTR);
+// if (mapClassName != null) {
+// Class<?> mapClass = Class.forName(mapClassName);
+// if (ChainMapper.class.isAssignableFrom(mapClass))
+// isChainJob = true;
+// }
+// } catch (ClassNotFoundException cnfe) {
+// // don't care; assume it's not derived from ChainMapper
+// }
+// try {
+// String reduceClassName = conf.get(MRJobConfig.REDUCE_CLASS_ATTR);
+// if (reduceClassName != null) {
+// Class<?> reduceClass = Class.forName(reduceClassName);
+// if (ChainReducer.class.isAssignableFrom(reduceClass))
+// isChainJob = true;
+// }
+// } catch (ClassNotFoundException cnfe) {
+// // don't care; assume it's not derived from ChainReducer
+// }
+// return isChainJob;
+// }
/*
private int getBlockSize() {
@@ -913,7 +907,7 @@ public class DAGImpl implements org.apac
String user =
UserGroupInformation.getCurrentUser().getShortUserName();
- Path path = MRApps.getStagingAreaDir(job.conf, user);
+ Path path = DAGApps.getStagingAreaDir(job.conf, user);
if(LOG.isDebugEnabled()) {
LOG.debug("startJobs: parent=" + path + " child=" + dagIdString);
}
@@ -940,6 +934,7 @@ public class DAGImpl implements org.apac
// Upload the jobTokens onto the remote FS so that ContainerManager can
// localize it to be used by the Containers(tasks)
Credentials tokenStorage = new Credentials();
+ // TODO Consider sending the jobToken over RPC.
TokenCache.setJobToken(job.jobToken, tokenStorage);
if (UserGroupInformation.isSecurityEnabled()) {
Modified: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java?rev=1470655&r1=1470654&r2=1470655&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java (original)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java Mon Apr 22 18:31:05 2013
@@ -32,11 +32,6 @@ import java.util.concurrent.locks.Reentr
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.TezMRTypeConverter;
-import org.apache.hadoop.mapred.WrappedProgressSplitsBlock;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
@@ -59,8 +54,6 @@ import org.apache.hadoop.yarn.util.Recor
import org.apache.tez.common.TezEngineTaskContext;
import org.apache.tez.common.TezTaskContext;
import org.apache.tez.common.counters.DAGCounter;
-import org.apache.tez.common.counters.TaskCounter;
-import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.DAGConfiguration;
import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
@@ -96,7 +89,6 @@ import org.apache.tez.dag.app.dag.event.
import org.apache.tez.dag.app.rm.AMSchedulerEventTAEnded;
import org.apache.tez.dag.app.rm.AMSchedulerEventTALaunchRequest;
import org.apache.tez.dag.app.speculate.SpeculatorEvent;
-import org.apache.tez.dag.app.taskclean.TaskCleanupEvent;
import org.apache.tez.engine.common.security.JobTokenIdentifier;
import org.apache.tez.engine.records.TezTaskAttemptID;
import org.apache.tez.engine.records.TezTaskID;
@@ -124,7 +116,6 @@ public class TaskAttemptImpl implements
private final TezTaskAttemptID attemptId;
private final Clock clock;
// private final TaskAttemptListener taskAttemptListener;
- private final OutputCommitter committer;
private final List<String> diagnostics = new ArrayList<String>();
private final Lock readLock;
private final Lock writeLock;
@@ -261,7 +252,7 @@ public class TaskAttemptImpl implements
@SuppressWarnings("rawtypes")
public TaskAttemptImpl(TezTaskID taskId, int attemptNumber, EventHandler eventHandler,
TaskAttemptListener tal, Path jobFile, int partition,
- DAGConfiguration conf, OutputCommitter committer,
+ DAGConfiguration conf,
Token<JobTokenIdentifier> jobToken, Credentials credentials, Clock clock,
TaskHeartbeatHandler taskHeartbeatHandler, AppContext appContext,
String mrxModuleClassName, TaskLocationHint locationHint,
@@ -276,7 +267,6 @@ public class TaskAttemptImpl implements
this.jobFile = jobFile;
this.partition = partition;
this.conf = conf;
- this.committer = committer;
this.jobToken = jobToken;
this.credentials = credentials;
this.clock = clock;
@@ -703,64 +693,64 @@ public class TaskAttemptImpl implements
}
*/
- private WrappedProgressSplitsBlock getProgressSplitBlock() {
- return null;
- // TODO
- /*
- readLock.lock();
- try {
- if (progressSplitBlock == null) {
- progressSplitBlock = new WrappedProgressSplitsBlock(conf.getInt(
- MRJobConfig.MR_AM_NUM_PROGRESS_SPLITS,
- MRJobConfig.DEFAULT_MR_AM_NUM_PROGRESS_SPLITS));
- }
- return progressSplitBlock;
- } finally {
- readLock.unlock();
- }
- */
- }
+// private WrappedProgressSplitsBlock getProgressSplitBlock() {
+// return null;
+// // TODO
+// /*
+// readLock.lock();
+// try {
+// if (progressSplitBlock == null) {
+// progressSplitBlock = new WrappedProgressSplitsBlock(conf.getInt(
+// MRJobConfig.MR_AM_NUM_PROGRESS_SPLITS,
+// MRJobConfig.DEFAULT_MR_AM_NUM_PROGRESS_SPLITS));
+// }
+// return progressSplitBlock;
+// } finally {
+// readLock.unlock();
+// }
+// */
+// }
private void updateProgressSplits() {
- double newProgress = reportedStatus.progress;
- newProgress = Math.max(Math.min(newProgress, 1.0D), 0.0D);
- TezCounters counters = reportedStatus.counters;
- if (counters == null)
- return;
-
- WrappedProgressSplitsBlock splitsBlock = getProgressSplitBlock();
- if (splitsBlock != null) {
- long now = clock.getTime();
- long start = getLaunchTime();
-
- if (start == 0)
- return;
-
- if (start != 0 && now - start <= Integer.MAX_VALUE) {
- splitsBlock.getProgressWallclockTime().extend(newProgress,
- (int) (now - start));
- }
-
- TezCounter cpuCounter = counters.findCounter(TaskCounter.CPU_MILLISECONDS);
- if (cpuCounter != null && cpuCounter.getValue() <= Integer.MAX_VALUE) {
- splitsBlock.getProgressCPUTime().extend(newProgress,
- (int) cpuCounter.getValue()); // long to int? TODO: FIX. Same below
- }
-
- TezCounter virtualBytes = counters
- .findCounter(TaskCounter.VIRTUAL_MEMORY_BYTES);
- if (virtualBytes != null) {
- splitsBlock.getProgressVirtualMemoryKbytes().extend(newProgress,
- (int) (virtualBytes.getValue() / (MEMORY_SPLITS_RESOLUTION)));
- }
-
- TezCounter physicalBytes = counters
- .findCounter(TaskCounter.PHYSICAL_MEMORY_BYTES);
- if (physicalBytes != null) {
- splitsBlock.getProgressPhysicalMemoryKbytes().extend(newProgress,
- (int) (physicalBytes.getValue() / (MEMORY_SPLITS_RESOLUTION)));
- }
- }
+// double newProgress = reportedStatus.progress;
+// newProgress = Math.max(Math.min(newProgress, 1.0D), 0.0D);
+// TezCounters counters = reportedStatus.counters;
+// if (counters == null)
+// return;
+//
+// WrappedProgressSplitsBlock splitsBlock = getProgressSplitBlock();
+// if (splitsBlock != null) {
+// long now = clock.getTime();
+// long start = getLaunchTime();
+//
+// if (start == 0)
+// return;
+//
+// if (start != 0 && now - start <= Integer.MAX_VALUE) {
+// splitsBlock.getProgressWallclockTime().extend(newProgress,
+// (int) (now - start));
+// }
+//
+// TezCounter cpuCounter = counters.findCounter(TaskCounter.CPU_MILLISECONDS);
+// if (cpuCounter != null && cpuCounter.getValue() <= Integer.MAX_VALUE) {
+// splitsBlock.getProgressCPUTime().extend(newProgress,
+// (int) cpuCounter.getValue()); // long to int? TODO: FIX. Same below
+// }
+//
+// TezCounter virtualBytes = counters
+// .findCounter(TaskCounter.VIRTUAL_MEMORY_BYTES);
+// if (virtualBytes != null) {
+// splitsBlock.getProgressVirtualMemoryKbytes().extend(newProgress,
+// (int) (virtualBytes.getValue() / (MEMORY_SPLITS_RESOLUTION)));
+// }
+//
+// TezCounter physicalBytes = counters
+// .findCounter(TaskCounter.PHYSICAL_MEMORY_BYTES);
+// if (physicalBytes != null) {
+// splitsBlock.getProgressPhysicalMemoryKbytes().extend(newProgress,
+// (int) (physicalBytes.getValue() / (MEMORY_SPLITS_RESOLUTION)));
+// }
+// }
}
private void maybeSendSpeculatorContainerRequired() {
@@ -778,10 +768,10 @@ public class TaskAttemptImpl implements
}
private void sendTaskAttemptCleanupEvent() {
- TaskAttemptContext taContext =
- new TaskAttemptContextImpl(this.conf,
- TezMRTypeConverter.fromTez(this.attemptId));
- sendEvent(new TaskCleanupEvent(this.attemptId, this.committer, taContext));
+// TaskAttemptContext taContext =
+// new TaskAttemptContextImpl(this.conf,
+// TezMRTypeConverter.fromTez(this.attemptId));
+// sendEvent(new TaskCleanupEvent(this.attemptId, this.committer, taContext));
}
protected String[] resolveHosts(String[] src) {
Modified: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java?rev=1470655&r1=1470654&r2=1470655&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java (original)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java Mon Apr 22 18:31:05 2013
@@ -29,7 +29,6 @@ import java.util.concurrent.locks.Reentr
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.Clock;
@@ -71,7 +70,6 @@ import org.apache.tez.engine.records.Tez
import org.apache.tez.engine.records.TezTaskAttemptID;
import org.apache.tez.engine.records.TezTaskID;
import org.apache.tez.engine.records.TezVertexID;
-import org.apache.tez.mapreduce.task.InitialTaskWithInMemSort;
import com.google.common.annotations.VisibleForTesting;
@@ -85,7 +83,6 @@ public class TaskImpl implements Task, E
protected final DAGConfiguration conf;
protected final Path jobFile;
- protected final OutputCommitter committer;
protected final int partition;
protected final TaskAttemptListener taskAttemptListener;
protected final TaskHeartbeatHandler taskHeartbeatHandler;
@@ -268,7 +265,7 @@ public class TaskImpl implements Task, E
public TaskImpl(TezVertexID vertexId, int partition,
EventHandler eventHandler, Path remoteJobConfFile, DAGConfiguration conf,
- TaskAttemptListener taskAttemptListener, OutputCommitter committer,
+ TaskAttemptListener taskAttemptListener,
Token<JobTokenIdentifier> jobToken,
Credentials credentials, Clock clock,
// TODO Recovery
@@ -295,7 +292,6 @@ public class TaskImpl implements Task, E
this.taskAttemptListener = taskAttemptListener;
this.taskHeartbeatHandler = thh;
this.eventHandler = eventHandler;
- this.committer = committer;
this.credentials = credentials;
this.jobToken = jobToken;
// TODO Metrics
@@ -597,7 +593,7 @@ public class TaskImpl implements Task, E
@Override
public boolean needsWaitAfterOutputConsumable() {
- if (mrxModuleClassName.equals(InitialTaskWithInMemSort.class.getName())) {
+ if (mrxModuleClassName.contains("InitialTaskWithInMemSort")) {
return true;
} else {
return false;
@@ -617,7 +613,7 @@ public class TaskImpl implements Task, E
TaskAttemptImpl createAttempt(int attemptNumber) {
return new TaskAttemptImpl(getTaskId(), attemptNumber, eventHandler,
- taskAttemptListener, null, 0, conf, committer,
+ taskAttemptListener, null, 0, conf,
jobToken, credentials, clock, taskHeartbeatHandler,
appContext, mrxModuleClassName, locationHint, taskResource,
localResources, environment, (failedAttempts>0));
Modified: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java?rev=1470655&r1=1470654&r2=1470655&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java (original)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java Mon Apr 22 18:31:05 2013
@@ -842,7 +842,6 @@ public class VertexImpl implements org.a
null,
conf,
vertex.taskAttemptListener,
- null,
vertex.jobToken,
vertex.fsTokens,
vertex.clock,
Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/utils/DAGApps.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/utils/DAGApps.java?rev=1470655&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/utils/DAGApps.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/utils/DAGApps.java Mon Apr 22 18:31:05 2013
@@ -0,0 +1,34 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.utils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.tez.dag.api.DAGConfiguration;
+
+/**
+ * Helpers for deriving DAG-related filesystem locations from configuration.
+ */
+public class DAGApps {
+
+  /** Name of the per-user subdirectory that holds staged DAG files. */
+  private static final String STAGING_CONSTANT = ".staging";
+
+  /**
+   * Builds the per-user staging directory for a DAG submission.
+   *
+   * @param conf configuration consulted for
+   *     {@code DAGConfiguration.DAG_AM_STAGING_DIR}, falling back to
+   *     {@code DAGConfiguration.DAG_AM_STAGING_DIR_DEFAULT}
+   * @param user user name appended beneath the staging root
+   * @return the path {@code <staging root>/<user>/.staging}
+   */
+  public static Path getStagingAreaDir(Configuration conf, String user) {
+    final String stagingRoot = conf.get(DAGConfiguration.DAG_AM_STAGING_DIR,
+        DAGConfiguration.DAG_AM_STAGING_DIR_DEFAULT);
+    final String stagingDir = stagingRoot + Path.SEPARATOR + user
+        + Path.SEPARATOR + STAGING_CONSTANT;
+    return new Path(stagingDir);
+  }
+
+}
Added: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/security/Master.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/security/Master.java?rev=1470655&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/security/Master.java (added)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/security/Master.java Mon Apr 22 18:31:05 2013
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.common.security;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+@Private
+@Unstable
+public class Master {
+
+ public enum State {
+ INITIALIZING, RUNNING;
+ }
+
+ public static String getMasterUserName(Configuration conf) {
+ return conf.get(YarnConfiguration.RM_PRINCIPAL);
+ }
+
+ public static InetSocketAddress getMasterAddress(Configuration conf) {
+ return conf
+ .getSocketAddr(YarnConfiguration.RM_ADDRESS,
+ YarnConfiguration.DEFAULT_RM_ADDRESS,
+ YarnConfiguration.DEFAULT_RM_PORT);
+ }
+
+ public static String getMasterPrincipal(Configuration conf)
+ throws IOException {
+ String masterHostname = getMasterAddress(conf).getHostName();
+ // get kerberos principal for use as delegation token renewer
+ return SecurityUtil.getServerPrincipal(getMasterUserName(conf),
+ masterHostname);
+ }
+
+}
Added: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/security/TokenCache.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/security/TokenCache.java?rev=1470655&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/security/TokenCache.java (added)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/security/TokenCache.java Mon Apr 22 18:31:05 2013
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.common.security;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.tez.common.TezJobConfig;
+
+
+/**
+ * This class provides user facing APIs for transferring secrets from
+ * the job client to the tasks.
+ * The secrets can be stored just before submission of jobs and read during
+ * the task execution.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class TokenCache {
+
+  private static final Log LOG = LogFactory.getLog(TokenCache.class);
+
+
+  /**
+   * Auxiliary method to get a user's secret key.
+   *
+   * @param credentials credentials to look in; may be {@code null}
+   * @param alias alias under which the secret key was stored
+   * @return the secret key returned by {@link Credentials#getSecretKey(Text)},
+   *         or {@code null} when {@code credentials} is {@code null}
+   */
+  public static byte[] getSecretKey(Credentials credentials, Text alias) {
+    if (credentials == null) {
+      return null;
+    }
+    return credentials.getSecretKey(alias);
+  }
+
+  /**
+   * Convenience method to obtain delegation tokens from namenodes
+   * corresponding to the paths passed. Does nothing when Hadoop security
+   * is not enabled.
+   *
+   * @param credentials credentials the obtained tokens are added to
+   * @param ps array of paths whose file systems need tokens
+   * @param conf configuration
+   * @throws IOException if the renewer principal cannot be determined or a
+   *         file system fails to issue tokens
+   */
+  public static void obtainTokensForNamenodes(Credentials credentials,
+      Path[] ps, Configuration conf) throws IOException {
+    if (!UserGroupInformation.isSecurityEnabled()) {
+      return;
+    }
+    obtainTokensForNamenodesInternal(credentials, ps, conf);
+  }
+
+  /**
+   * Remove the credentials-file referral
+   * ({@link TezJobConfig#DAG_CREDENTIALS_BINARY}), which doesn't make sense
+   * in the context of task execution.
+   *
+   * @param conf configuration to clean up; modified in place
+   */
+  public static void cleanUpTokenReferral(Configuration conf) {
+    conf.unset(TezJobConfig.DAG_CREDENTIALS_BINARY);
+  }
+
+  /**
+   * Obtains delegation tokens for the file systems backing {@code ps}.
+   * The file systems are first collected into a set so that repeated
+   * instances are processed only once.
+   *
+   * @param credentials credentials the obtained tokens are added to
+   * @param ps paths whose file systems need tokens
+   * @param conf configuration
+   * @throws IOException if resolving a file system or obtaining a token fails
+   */
+  static void obtainTokensForNamenodesInternal(Credentials credentials,
+      Path[] ps, Configuration conf) throws IOException {
+    Set<FileSystem> fsSet = new HashSet<FileSystem>();
+    for (Path p : ps) {
+      fsSet.add(p.getFileSystem(conf));
+    }
+    for (FileSystem fs : fsSet) {
+      obtainTokensForNamenodesInternal(fs, credentials, conf);
+    }
+  }
+
+  /**
+   * Get delegation tokens for a specific file system, using the master
+   * principal as the renewer. If {@link TezJobConfig#DAG_CREDENTIALS_BINARY}
+   * names a credentials file, its contents are merged into
+   * {@code credentials} before tokens are requested.
+   *
+   * @param fs file system to obtain tokens from
+   * @param credentials credentials the obtained tokens are added to
+   * @param conf configuration
+   * @throws IOException if no master Kerberos principal is available, or the
+   *         file system fails to issue tokens
+   */
+  static void obtainTokensForNamenodesInternal(FileSystem fs,
+      Credentials credentials, Configuration conf) throws IOException {
+    String delegTokenRenewer = Master.getMasterPrincipal(conf);
+    if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) {
+      throw new IOException(
+          "Can't get Master Kerberos principal for use as renewer");
+    }
+    mergeBinaryTokens(credentials, conf);
+
+    final Token<?>[] tokens = fs.addDelegationTokens(delegTokenRenewer,
+        credentials);
+    if (tokens != null) {
+      for (Token<?> token : tokens) {
+        LOG.info("Got dt for " + fs.getUri() + "; " + token);
+      }
+    }
+  }
+
+  /**
+   * Merges the local binary credentials file named by
+   * {@link TezJobConfig#DAG_CREDENTIALS_BINARY} into {@code creds}.
+   * Does nothing when that key is not set.
+   *
+   * @param creds credentials to supplement
+   * @param conf configuration holding the file name
+   * @throws RuntimeException if the configured file cannot be read; the
+   *         underlying {@link IOException} is kept as the cause
+   */
+  private static void mergeBinaryTokens(Credentials creds, Configuration conf) {
+    String binaryTokenFilename =
+        conf.get(TezJobConfig.DAG_CREDENTIALS_BINARY);
+    if (binaryTokenFilename == null) {
+      return;
+    }
+    Credentials binary;
+    try {
+      binary = Credentials.readTokenStorageFile(
+          new Path("file:///" + binaryTokenFilename), conf);
+    } catch (IOException e) {
+      // Kept unchecked as before, but name the file so the failure is
+      // diagnosable; the original IOException stays attached as the cause.
+      throw new RuntimeException("Failed to read binary credentials file "
+          + binaryTokenFilename, e);
+    }
+    // supplement existing tokens with the tokens in the binary file
+    creds.mergeAll(binary);
+  }
+
+  /**
+   * file name used on HDFS for generated job token
+   */
+  @InterfaceAudience.Private
+  public static final String JOB_TOKEN_HDFS_FILE = "jobToken";
+
+  /**
+   * conf setting for job tokens cache file name
+   */
+  @InterfaceAudience.Private
+  public static final String JOB_TOKENS_FILENAME = "mapreduce.job.jobTokenFile";
+  private static final Text JOB_TOKEN = new Text("JobToken");
+  private static final Text SHUFFLE_TOKEN = new Text("MapReduceShuffleToken");
+
+  /**
+   * Load job tokens and secrets from a token storage file on the local
+   * file system.
+   *
+   * @param jobTokenFile path of the token file on the local file system
+   * @param conf configuration
+   * @return the credentials read from the file
+   * @throws IOException if the file cannot be read
+   */
+  @InterfaceAudience.Private
+  public static Credentials loadTokens(String jobTokenFile, Configuration conf)
+      throws IOException {
+    Path localJobTokenFile = new Path("file:///" + jobTokenFile);
+
+    Credentials ts = Credentials.readTokenStorageFile(localJobTokenFile, conf);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Task: Loaded jobTokenFile from: " +
+          localJobTokenFile.toUri().getPath()
+          + "; num of sec keys = " + ts.numberOfSecretKeys() +
+          " Number of tokens " + ts.numberOfTokens());
+    }
+    return ts;
+  }
+
+  /**
+   * Store the job token in {@code credentials}.
+   *
+   * @param t job token to store
+   * @param credentials credentials to store it in
+   */
+  @InterfaceAudience.Private
+  public static void setJobToken(Token<? extends TokenIdentifier> t,
+      Credentials credentials) {
+    credentials.addToken(JOB_TOKEN, t);
+  }
+
+  /**
+   * Retrieve the job token stored by
+   * {@link #setJobToken(Token, Credentials)}.
+   *
+   * @param credentials credentials to read from
+   * @return job token, or whatever {@link Credentials#getToken(Text)} yields
+   *         when none was stored
+   */
+  // The cast is unchecked because Credentials stores tokens as Token<?>; this
+  // assumes only a JobTokenIdentifier token is stored under JOB_TOKEN.
+  @SuppressWarnings("unchecked")
+  @InterfaceAudience.Private
+  public static Token<JobTokenIdentifier> getJobToken(Credentials credentials) {
+    return (Token<JobTokenIdentifier>) credentials.getToken(JOB_TOKEN);
+  }
+
+  /**
+   * Store the shuffle secret key in {@code credentials}.
+   *
+   * @param key secret key bytes
+   * @param credentials credentials to store it in
+   */
+  @InterfaceAudience.Private
+  public static void setShuffleSecretKey(byte[] key, Credentials credentials) {
+    credentials.addSecretKey(SHUFFLE_TOKEN, key);
+  }
+
+  /**
+   * Retrieve the shuffle secret key.
+   *
+   * @param credentials credentials to read from; may be {@code null}
+   * @return the shuffle secret key, or {@code null} when
+   *         {@code credentials} is {@code null}
+   */
+  @InterfaceAudience.Private
+  public static byte[] getShuffleSecretKey(Credentials credentials) {
+    return getSecretKey(credentials, SHUFFLE_TOKEN);
+  }
+}
Modified: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java?rev=1470655&r1=1470654&r2=1470655&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java (original)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java Mon Apr 22 18:31:05 2013
@@ -114,6 +114,8 @@ public class DeprecatedKeys {
_(MRJobConfig.REDUCE_MEMTOMEM_ENABLED, TezJobConfig.TEZ_ENGINE_SHUFFLE_ENABLE_MEMTOMEM);
_(MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT, TezJobConfig.TEZ_ENGINE_INPUT_BUFFER_PERCENT);
+
+ _(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY, TezJobConfig.DAG_CREDENTIALS_BINARY);
}
private static void _(String oldKey, String newKey) {