You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/04/22 20:31:06 UTC

svn commit: r1470655 - in /incubator/tez/branches/TEZ-1: tez-common/src/main/java/org/apache/tez/common/ tez-dag-api/src/main/java/org/apache/tez/dag/api/ tez-dag/src/main/java/org/apache/tez/dag/app/dag/ tez-dag/src/main/java/org/apache/tez/dag/app/da...

Author: sseth
Date: Mon Apr 22 18:31:05 2013
New Revision: 1470655

URL: http://svn.apache.org/r1470655
Log:
TEZ-56. Remove MR references from DAG, Vertex, Task, TaskAttempt. (sseth)

Added:
    incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/utils/
    incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/utils/DAGApps.java
    incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/security/Master.java
    incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/security/TokenCache.java
Modified:
    incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java
    incubator/tez/branches/TEZ-1/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAGConfiguration.java
    incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
    incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
    incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
    incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
    incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
    incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java

Modified: incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java?rev=1470655&r1=1470654&r2=1470655&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java (original)
+++ incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java Mon Apr 22 18:31:05 2013
@@ -250,4 +250,10 @@ public class TezJobConfig {
   public static final String TEZ_ENGINE_INPUT_BUFFER_PERCENT = 
       "tez.engine.task.input.buffer.percent";
   public static final float DEFAULT_TEZ_ENGINE_INPUT_BUFFER_PERCENT = 0.0f;
+  
+  // TODO This should be in DAGConfiguration
+  /* Config key for the local file that holds all the credentials for the
+   * job.
+   */
+  public static final String DAG_CREDENTIALS_BINARY =  "tez.dag.credentials.binary";
 }

Modified: incubator/tez/branches/TEZ-1/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAGConfiguration.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAGConfiguration.java?rev=1470655&r1=1470654&r2=1470655&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAGConfiguration.java (original)
+++ incubator/tez/branches/TEZ-1/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAGConfiguration.java Mon Apr 22 18:31:05 2013
@@ -94,6 +94,9 @@ public class DAGConfiguration extends Co
   public static final String DAG_AM_TASK_LISTENER_THREAD_COUNT = DAG_AM + "task.listener.thread-count";
   public static final int DAG_AM_TASK_LISTENER_THREAD_COUNT_DEFAULT = 30;
   
+  public static final String DAG_AM_STAGING_DIR = DAG_AM + "staging-dir";
+  public static final String DAG_AM_STAGING_DIR_DEFAULT = "/tmp/hadoop-yarn/staging";
+  
   @Private
   public void setEdgeProperties(List<Edge> edges) {
     String[] edgeIds = new String[edges.size()];

Modified: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java?rev=1470655&r1=1470654&r2=1470655&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java (original)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java Mon Apr 22 18:31:05 2013
@@ -23,9 +23,8 @@ import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.JobACL;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.VertexLocationHint;
 import org.apache.tez.engine.records.TezDAGID;
@@ -69,7 +68,7 @@ public interface DAG {
   /**
    * @return the ACLs for this job for each type of JobACL given. 
    */
-  Map<JobACL, AccessControlList> getJobACLs();
+  Map<ApplicationAccessType, String> getJobACLs();
 
   /**
    * @return information for MR AppMasters (previously failed and current)
@@ -77,7 +76,7 @@ public interface DAG {
   // TODO Recovery
   //List<AMInfo> getAMInfos();
   
-  boolean checkAccess(UserGroupInformation callerUGI, JobACL jobOperation);
+  boolean checkAccess(UserGroupInformation callerUGI, ApplicationAccessType jobOperation);
 
   VertexLocationHint getVertexLocationHint(TezVertexID vertexId);
 }

Modified: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java?rev=1470655&r1=1470654&r2=1470655&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java (original)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java Mon Apr 22 18:31:05 2013
@@ -37,22 +37,16 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobACLsManager;
-import org.apache.hadoop.mapreduce.JobACL;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
-import org.apache.hadoop.mapreduce.lib.chain.ChainReducer;
-import org.apache.hadoop.mapreduce.security.TokenCache;
-import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
 import org.apache.hadoop.yarn.state.MultipleArcTransition;
 import org.apache.hadoop.yarn.state.SingleArcTransition;
@@ -81,8 +75,10 @@ import org.apache.tez.dag.app.dag.event.
 import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdate;
 import org.apache.tez.dag.app.dag.event.VertexEvent;
 import org.apache.tez.dag.app.dag.event.VertexEventType;
+import org.apache.tez.dag.utils.DAGApps;
 import org.apache.tez.engine.common.security.JobTokenIdentifier;
 import org.apache.tez.engine.common.security.JobTokenSecretManager;
+import org.apache.tez.engine.common.security.TokenCache;
 import org.apache.tez.engine.records.TezDAGID;
 import org.apache.tez.engine.records.TezTaskAttemptID;
 import org.apache.tez.engine.records.TezVertexID;
@@ -101,9 +97,8 @@ public class DAGImpl implements org.apac
   private final ApplicationAttemptId applicationAttemptId;
   private final TezDAGID dagId;
   private final Clock clock;
-  private final JobACLsManager aclsManager;
+  private final ApplicationACLsManager aclsManager;
   private final String username;
-  private final Map<JobACL, AccessControlList> jobACLs;
 
   // TODO Recovery
   //private final List<AMInfo> amInfos;
@@ -332,7 +327,7 @@ public class DAGImpl implements org.apac
       DAGLocationHint dagLocationHint) {
     this.applicationAttemptId = applicationAttemptId;
     this.dagId = dagId;
-    this.dagName = conf.get(JobContext.JOB_NAME, "<missing job name>");
+    this.dagName = conf.get(DAGConfiguration.JOB_NAME, "<missing job name>");
     this.conf = new DAGConfiguration(conf);
     // TODO Metrics
     //this.metrics = metrics;
@@ -354,9 +349,10 @@ public class DAGImpl implements org.apac
     this.fsTokens = fsTokenCredentials;
     this.jobTokenSecretManager = jobTokenSecretManager;
 
-    this.aclsManager = new JobACLsManager(conf);
+    this.aclsManager = new ApplicationACLsManager(conf);
     this.username = System.getProperty("user.name");
-    this.jobACLs = aclsManager.constructJobACLs(conf);
+    // TODO Construct ApplicationACLs
+    //      this.appACLs;
 
     this.dagLocationHint = dagLocationHint;
 
@@ -385,12 +381,9 @@ public class DAGImpl implements org.apac
 
   @Override
   public boolean checkAccess(UserGroupInformation callerUGI,
-      JobACL jobOperation) {
-    AccessControlList jobACL = jobACLs.get(jobOperation);
-    if (jobACL == null) {
-      return true;
-    }
-    return aclsManager.checkAccess(callerUGI, jobOperation, username, jobACL);
+      ApplicationAccessType jobOperation) {
+    return aclsManager.checkAccess(callerUGI, jobOperation, userName,
+        this.dagId.getApplicationId());
   }
 
   @Override
@@ -744,8 +737,9 @@ public class DAGImpl implements org.apac
    * @see org.apache.hadoop.mapreduce.v2.app2.job.Job#getJobACLs()
    */
   @Override
-  public Map<JobACL, AccessControlList> getJobACLs() {
-    return Collections.unmodifiableMap(jobACLs);
+  public Map<ApplicationAccessType, String> getJobACLs() {
+    // TODO ApplicationACLs
+    return null;
   }
 
   // TODO Recovery
@@ -756,34 +750,34 @@ public class DAGImpl implements org.apac
   }
   */
 
-  /**
-   * ChainMapper and ChainReducer must execute in parallel, so they're not
-   * compatible with uberization/LocalContainerLauncher (100% sequential).
-   */
-  private boolean isChainJob(Configuration conf) {
-    boolean isChainJob = false;
-    try {
-      String mapClassName = conf.get(MRJobConfig.MAP_CLASS_ATTR);
-      if (mapClassName != null) {
-        Class<?> mapClass = Class.forName(mapClassName);
-        if (ChainMapper.class.isAssignableFrom(mapClass))
-          isChainJob = true;
-      }
-    } catch (ClassNotFoundException cnfe) {
-      // don't care; assume it's not derived from ChainMapper
-    }
-    try {
-      String reduceClassName = conf.get(MRJobConfig.REDUCE_CLASS_ATTR);
-      if (reduceClassName != null) {
-        Class<?> reduceClass = Class.forName(reduceClassName);
-        if (ChainReducer.class.isAssignableFrom(reduceClass))
-          isChainJob = true;
-      }
-    } catch (ClassNotFoundException cnfe) {
-      // don't care; assume it's not derived from ChainReducer
-    }
-    return isChainJob;
-  }
+//  /**
+//   * ChainMapper and ChainReducer must execute in parallel, so they're not
+//   * compatible with uberization/LocalContainerLauncher (100% sequential).
+//   */
+//  private boolean isChainJob(Configuration conf) {
+//    boolean isChainJob = false;
+//    try {
+//      String mapClassName = conf.get(MRJobConfig.MAP_CLASS_ATTR);
+//      if (mapClassName != null) {
+//        Class<?> mapClass = Class.forName(mapClassName);
+//        if (ChainMapper.class.isAssignableFrom(mapClass))
+//          isChainJob = true;
+//      }
+//    } catch (ClassNotFoundException cnfe) {
+//      // don't care; assume it's not derived from ChainMapper
+//    }
+//    try {
+//      String reduceClassName = conf.get(MRJobConfig.REDUCE_CLASS_ATTR);
+//      if (reduceClassName != null) {
+//        Class<?> reduceClass = Class.forName(reduceClassName);
+//        if (ChainReducer.class.isAssignableFrom(reduceClass))
+//          isChainJob = true;
+//      }
+//    } catch (ClassNotFoundException cnfe) {
+//      // don't care; assume it's not derived from ChainReducer
+//    }
+//    return isChainJob;
+//  }
 
   /*
   private int getBlockSize() {
@@ -913,7 +907,7 @@ public class DAGImpl implements org.apac
       
       String user =
         UserGroupInformation.getCurrentUser().getShortUserName();
-      Path path = MRApps.getStagingAreaDir(job.conf, user);
+      Path path = DAGApps.getStagingAreaDir(job.conf, user);
       if(LOG.isDebugEnabled()) {
         LOG.debug("startJobs: parent=" + path + " child=" + dagIdString);
       }
@@ -940,6 +934,7 @@ public class DAGImpl implements org.apac
       // Upload the jobTokens onto the remote FS so that ContainerManager can
       // localize it to be used by the Containers(tasks)
       Credentials tokenStorage = new Credentials();
+      // TODO Consider sending the jobToken over RPC.
       TokenCache.setJobToken(job.jobToken, tokenStorage);
 
       if (UserGroupInformation.isSecurityEnabled()) {

Modified: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java?rev=1470655&r1=1470654&r2=1470655&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java (original)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java Mon Apr 22 18:31:05 2013
@@ -32,11 +32,6 @@ import java.util.concurrent.locks.Reentr
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.TezMRTypeConverter;
-import org.apache.hadoop.mapred.WrappedProgressSplitsBlock;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
@@ -59,8 +54,6 @@ import org.apache.hadoop.yarn.util.Recor
 import org.apache.tez.common.TezEngineTaskContext;
 import org.apache.tez.common.TezTaskContext;
 import org.apache.tez.common.counters.DAGCounter;
-import org.apache.tez.common.counters.TaskCounter;
-import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.DAGConfiguration;
 import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
@@ -96,7 +89,6 @@ import org.apache.tez.dag.app.dag.event.
 import org.apache.tez.dag.app.rm.AMSchedulerEventTAEnded;
 import org.apache.tez.dag.app.rm.AMSchedulerEventTALaunchRequest;
 import org.apache.tez.dag.app.speculate.SpeculatorEvent;
-import org.apache.tez.dag.app.taskclean.TaskCleanupEvent;
 import org.apache.tez.engine.common.security.JobTokenIdentifier;
 import org.apache.tez.engine.records.TezTaskAttemptID;
 import org.apache.tez.engine.records.TezTaskID;
@@ -124,7 +116,6 @@ public class TaskAttemptImpl implements 
   private final TezTaskAttemptID attemptId;
   private final Clock clock;
 //  private final TaskAttemptListener taskAttemptListener;
-  private final OutputCommitter committer;
   private final List<String> diagnostics = new ArrayList<String>();
   private final Lock readLock;
   private final Lock writeLock;
@@ -261,7 +252,7 @@ public class TaskAttemptImpl implements 
   @SuppressWarnings("rawtypes")
   public TaskAttemptImpl(TezTaskID taskId, int attemptNumber, EventHandler eventHandler,
       TaskAttemptListener tal, Path jobFile, int partition, 
-      DAGConfiguration conf, OutputCommitter committer,
+      DAGConfiguration conf,
       Token<JobTokenIdentifier> jobToken, Credentials credentials, Clock clock,
       TaskHeartbeatHandler taskHeartbeatHandler, AppContext appContext,
       String mrxModuleClassName, TaskLocationHint locationHint,
@@ -276,7 +267,6 @@ public class TaskAttemptImpl implements 
     this.jobFile = jobFile;
     this.partition = partition;
     this.conf = conf;
-    this.committer = committer;
     this.jobToken = jobToken;
     this.credentials = credentials;
     this.clock = clock;
@@ -703,64 +693,64 @@ public class TaskAttemptImpl implements 
   }
   */
   
-  private WrappedProgressSplitsBlock getProgressSplitBlock() {
-    return null;
-    // TODO
-    /*
-    readLock.lock();
-    try {
-      if (progressSplitBlock == null) {
-        progressSplitBlock = new WrappedProgressSplitsBlock(conf.getInt(
-            MRJobConfig.MR_AM_NUM_PROGRESS_SPLITS,
-            MRJobConfig.DEFAULT_MR_AM_NUM_PROGRESS_SPLITS));
-      }
-      return progressSplitBlock;
-    } finally {
-      readLock.unlock();
-    }
-    */
-  }
+//  private WrappedProgressSplitsBlock getProgressSplitBlock() {
+//    return null;
+//    // TODO
+//    /*
+//    readLock.lock();
+//    try {
+//      if (progressSplitBlock == null) {
+//        progressSplitBlock = new WrappedProgressSplitsBlock(conf.getInt(
+//            MRJobConfig.MR_AM_NUM_PROGRESS_SPLITS,
+//            MRJobConfig.DEFAULT_MR_AM_NUM_PROGRESS_SPLITS));
+//      }
+//      return progressSplitBlock;
+//    } finally {
+//      readLock.unlock();
+//    }
+//    */
+//  }
   
   private void updateProgressSplits() {
-    double newProgress = reportedStatus.progress;
-    newProgress = Math.max(Math.min(newProgress, 1.0D), 0.0D);
-    TezCounters counters = reportedStatus.counters;
-    if (counters == null)
-      return;
-
-    WrappedProgressSplitsBlock splitsBlock = getProgressSplitBlock();
-    if (splitsBlock != null) {
-      long now = clock.getTime();
-      long start = getLaunchTime();
-      
-      if (start == 0)
-        return;
-
-      if (start != 0 && now - start <= Integer.MAX_VALUE) {
-        splitsBlock.getProgressWallclockTime().extend(newProgress,
-            (int) (now - start));
-      }
-
-      TezCounter cpuCounter = counters.findCounter(TaskCounter.CPU_MILLISECONDS);
-      if (cpuCounter != null && cpuCounter.getValue() <= Integer.MAX_VALUE) {
-        splitsBlock.getProgressCPUTime().extend(newProgress,
-            (int) cpuCounter.getValue()); // long to int? TODO: FIX. Same below
-      }
-
-      TezCounter virtualBytes = counters
-        .findCounter(TaskCounter.VIRTUAL_MEMORY_BYTES);
-      if (virtualBytes != null) {
-        splitsBlock.getProgressVirtualMemoryKbytes().extend(newProgress,
-            (int) (virtualBytes.getValue() / (MEMORY_SPLITS_RESOLUTION)));
-      }
-
-      TezCounter physicalBytes = counters
-        .findCounter(TaskCounter.PHYSICAL_MEMORY_BYTES);
-      if (physicalBytes != null) {
-        splitsBlock.getProgressPhysicalMemoryKbytes().extend(newProgress,
-            (int) (physicalBytes.getValue() / (MEMORY_SPLITS_RESOLUTION)));
-      }
-    }
+//    double newProgress = reportedStatus.progress;
+//    newProgress = Math.max(Math.min(newProgress, 1.0D), 0.0D);
+//    TezCounters counters = reportedStatus.counters;
+//    if (counters == null)
+//      return;
+//
+//    WrappedProgressSplitsBlock splitsBlock = getProgressSplitBlock();
+//    if (splitsBlock != null) {
+//      long now = clock.getTime();
+//      long start = getLaunchTime();
+//      
+//      if (start == 0)
+//        return;
+//
+//      if (start != 0 && now - start <= Integer.MAX_VALUE) {
+//        splitsBlock.getProgressWallclockTime().extend(newProgress,
+//            (int) (now - start));
+//      }
+//
+//      TezCounter cpuCounter = counters.findCounter(TaskCounter.CPU_MILLISECONDS);
+//      if (cpuCounter != null && cpuCounter.getValue() <= Integer.MAX_VALUE) {
+//        splitsBlock.getProgressCPUTime().extend(newProgress,
+//            (int) cpuCounter.getValue()); // long to int? TODO: FIX. Same below
+//      }
+//
+//      TezCounter virtualBytes = counters
+//        .findCounter(TaskCounter.VIRTUAL_MEMORY_BYTES);
+//      if (virtualBytes != null) {
+//        splitsBlock.getProgressVirtualMemoryKbytes().extend(newProgress,
+//            (int) (virtualBytes.getValue() / (MEMORY_SPLITS_RESOLUTION)));
+//      }
+//
+//      TezCounter physicalBytes = counters
+//        .findCounter(TaskCounter.PHYSICAL_MEMORY_BYTES);
+//      if (physicalBytes != null) {
+//        splitsBlock.getProgressPhysicalMemoryKbytes().extend(newProgress,
+//            (int) (physicalBytes.getValue() / (MEMORY_SPLITS_RESOLUTION)));
+//      }
+//    }
   }
   
   private void maybeSendSpeculatorContainerRequired() {
@@ -778,10 +768,10 @@ public class TaskAttemptImpl implements 
   }
   
   private void sendTaskAttemptCleanupEvent() {
-    TaskAttemptContext taContext = 
-        new TaskAttemptContextImpl(this.conf, 
-            TezMRTypeConverter.fromTez(this.attemptId));
-    sendEvent(new TaskCleanupEvent(this.attemptId, this.committer, taContext));
+//    TaskAttemptContext taContext = 
+//        new TaskAttemptContextImpl(this.conf, 
+//            TezMRTypeConverter.fromTez(this.attemptId));
+//    sendEvent(new TaskCleanupEvent(this.attemptId, this.committer, taContext));
   }
 
   protected String[] resolveHosts(String[] src) {

Modified: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java?rev=1470655&r1=1470654&r2=1470655&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java (original)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java Mon Apr 22 18:31:05 2013
@@ -29,7 +29,6 @@ import java.util.concurrent.locks.Reentr
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.Clock;
@@ -71,7 +70,6 @@ import org.apache.tez.engine.records.Tez
 import org.apache.tez.engine.records.TezTaskAttemptID;
 import org.apache.tez.engine.records.TezTaskID;
 import org.apache.tez.engine.records.TezVertexID;
-import org.apache.tez.mapreduce.task.InitialTaskWithInMemSort;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -85,7 +83,6 @@ public class TaskImpl implements Task, E
 
   protected final DAGConfiguration conf;
   protected final Path jobFile;
-  protected final OutputCommitter committer;
   protected final int partition;
   protected final TaskAttemptListener taskAttemptListener;
   protected final TaskHeartbeatHandler taskHeartbeatHandler;
@@ -268,7 +265,7 @@ public class TaskImpl implements Task, E
 
   public TaskImpl(TezVertexID vertexId, int partition,
       EventHandler eventHandler, Path remoteJobConfFile, DAGConfiguration conf,
-      TaskAttemptListener taskAttemptListener, OutputCommitter committer,
+      TaskAttemptListener taskAttemptListener,
       Token<JobTokenIdentifier> jobToken,
       Credentials credentials, Clock clock,
       // TODO Recovery
@@ -295,7 +292,6 @@ public class TaskImpl implements Task, E
     this.taskAttemptListener = taskAttemptListener;
     this.taskHeartbeatHandler = thh;
     this.eventHandler = eventHandler;
-    this.committer = committer;
     this.credentials = credentials;
     this.jobToken = jobToken;
     // TODO Metrics
@@ -597,7 +593,7 @@ public class TaskImpl implements Task, E
 
   @Override
   public boolean needsWaitAfterOutputConsumable() {
-    if (mrxModuleClassName.equals(InitialTaskWithInMemSort.class.getName())) {
+    if (mrxModuleClassName.contains("InitialTaskWithInMemSort")) {
       return true;
     } else {
       return false;
@@ -617,7 +613,7 @@ public class TaskImpl implements Task, E
 
   TaskAttemptImpl createAttempt(int attemptNumber) {
     return new TaskAttemptImpl(getTaskId(), attemptNumber, eventHandler,
-        taskAttemptListener, null, 0, conf, committer,
+        taskAttemptListener, null, 0, conf,
         jobToken, credentials, clock, taskHeartbeatHandler,
         appContext, mrxModuleClassName, locationHint, taskResource,
         localResources, environment, (failedAttempts>0));

Modified: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java?rev=1470655&r1=1470654&r2=1470655&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java (original)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java Mon Apr 22 18:31:05 2013
@@ -842,7 +842,6 @@ public class VertexImpl implements org.a
                 null,
                 conf,
                 vertex.taskAttemptListener,
-                null,
                 vertex.jobToken,
                 vertex.fsTokens,
                 vertex.clock,

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/utils/DAGApps.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/utils/DAGApps.java?rev=1470655&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/utils/DAGApps.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/utils/DAGApps.java Mon Apr 22 18:31:05 2013
@@ -0,0 +1,34 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.utils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.tez.dag.api.DAGConfiguration;
+
+public class DAGApps {
+
+  private static final String STAGING_CONSTANT = ".staging";
+  public static Path getStagingAreaDir(Configuration conf, String user) {
+    return new Path(conf.get(DAGConfiguration.DAG_AM_STAGING_DIR,
+        DAGConfiguration.DAG_AM_STAGING_DIR_DEFAULT)
+        + Path.SEPARATOR + user + Path.SEPARATOR + STAGING_CONSTANT);
+  }
+  
+}

Added: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/security/Master.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/security/Master.java?rev=1470655&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/security/Master.java (added)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/security/Master.java Mon Apr 22 18:31:05 2013
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.common.security;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+@Private
+@Unstable
+public class Master {
+
+  public enum State {
+    INITIALIZING, RUNNING;
+  }
+
+  public static String getMasterUserName(Configuration conf) {
+    return conf.get(YarnConfiguration.RM_PRINCIPAL);
+  }
+
+  public static InetSocketAddress getMasterAddress(Configuration conf) {
+    return conf
+        .getSocketAddr(YarnConfiguration.RM_ADDRESS,
+            YarnConfiguration.DEFAULT_RM_ADDRESS,
+            YarnConfiguration.DEFAULT_RM_PORT);
+  }
+
+  public static String getMasterPrincipal(Configuration conf)
+      throws IOException {
+    String masterHostname = getMasterAddress(conf).getHostName();
+    // get kerberos principal for use as delegation token renewer
+    return SecurityUtil.getServerPrincipal(getMasterUserName(conf),
+        masterHostname);
+  }
+
+}

Added: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/security/TokenCache.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/security/TokenCache.java?rev=1470655&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/security/TokenCache.java (added)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/security/TokenCache.java Mon Apr 22 18:31:05 2013
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.common.security;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.tez.common.TezJobConfig;
+
+
+/**
+ * This class provides user-facing APIs for transferring secrets from
+ * the job client to the tasks.
+ * The secrets can be stored just before submission of jobs and read during
+ * the task execution.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class TokenCache {
+
+  private static final Log LOG = LogFactory.getLog(TokenCache.class);
+
+  /**
+   * Auxiliary method to get a user's secret key.
+   * @param credentials credentials to look in; may be null
+   * @param alias alias the secret key is looked up by
+   * @return secret key from the storage, or null if credentials is null
+   */
+  public static byte[] getSecretKey(Credentials credentials, Text alias) {
+    return credentials == null ? null : credentials.getSecretKey(alias);
+  }
+
+  /**
+   * Convenience method to obtain delegation tokens from namenodes
+   * corresponding to the paths passed; a no-op when security is disabled.
+   * @param credentials credentials that receive the obtained tokens
+   * @param ps array of paths
+   * @param conf configuration
+   * @throws IOException if the tokens cannot be obtained
+   */
+  public static void obtainTokensForNamenodes(Credentials credentials,
+      Path[] ps, Configuration conf) throws IOException {
+    if (!UserGroupInformation.isSecurityEnabled()) {
+      return;
+    }
+    obtainTokensForNamenodesInternal(credentials, ps, conf);
+  }
+
+  /**
+   * Remove jobtoken referrals which don't make sense in the context
+   * of the task execution.
+   *
+   * @param conf configuration the DAG_CREDENTIALS_BINARY key is unset in
+   */
+  public static void cleanUpTokenReferral(Configuration conf) {
+    conf.unset(TezJobConfig.DAG_CREDENTIALS_BINARY);
+  }
+
+  /** Obtains tokens once per distinct FileSystem behind the given paths. */
+  static void obtainTokensForNamenodesInternal(Credentials credentials,
+      Path[] ps, Configuration conf) throws IOException {
+    Set<FileSystem> fsSet = new HashSet<FileSystem>();
+    for (Path p : ps) {
+      fsSet.add(p.getFileSystem(conf));
+    }
+    for (FileSystem fs : fsSet) {
+      obtainTokensForNamenodesInternal(fs, credentials, conf);
+    }
+  }
+
+  /**
+   * Get delegation tokens for a specific FS.
+   * @param fs file system to obtain delegation tokens from
+   * @param credentials credentials that receive the obtained tokens
+   * @param conf configuration
+   * @throws IOException if the renewer principal cannot be determined
+   *           or the tokens cannot be obtained
+   */
+  static void obtainTokensForNamenodesInternal(FileSystem fs,
+      Credentials credentials, Configuration conf) throws IOException {
+    String delegTokenRenewer = Master.getMasterPrincipal(conf);
+    if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) {
+      throw new IOException(
+          "Can't get Master Kerberos principal for use as renewer");
+    }
+    mergeBinaryTokens(credentials, conf);
+
+    final Token<?>[] tokens = fs.addDelegationTokens(delegTokenRenewer,
+                                                     credentials);
+    if (tokens != null) {
+      for (Token<?> token : tokens) {
+        LOG.info("Got dt for " + fs.getUri() + "; " + token);
+      }
+    }
+  }
+
+  /** Merges tokens from the DAG_CREDENTIALS_BINARY file, if set, into creds. */
+  private static void mergeBinaryTokens(Credentials creds, Configuration conf) {
+    String binaryTokenFilename = conf.get(TezJobConfig.DAG_CREDENTIALS_BINARY);
+    if (binaryTokenFilename != null) {
+      Credentials binary;
+      try {
+        // qualify against the local FS instead of prepending "file:///",
+        // which mangles relative and Windows-style paths
+        binary = Credentials.readTokenStorageFile(FileSystem.getLocal(conf)
+            .makeQualified(new Path(binaryTokenFilename)), conf);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      // supplement existing tokens with the tokens in the binary file
+      creds.mergeAll(binary);
+    }
+  }
+
+  /** File name used on HDFS for generated job token. */
+  @InterfaceAudience.Private
+  public static final String JOB_TOKEN_HDFS_FILE = "jobToken";
+
+  /** Conf setting for job tokens cache file name. */
+  @InterfaceAudience.Private
+  public static final String JOB_TOKENS_FILENAME = "mapreduce.job.jobTokenFile";
+  private static final Text JOB_TOKEN = new Text("JobToken");
+  private static final Text SHUFFLE_TOKEN = new Text("MapReduceShuffleToken");
+
+  /**
+   * Load job token from a file.
+   * @param jobTokenFile path of the token file on the local file system
+   * @param conf configuration
+   * @return credentials read from the file
+   * @throws IOException if the token file cannot be read
+   */
+  @InterfaceAudience.Private
+  public static Credentials loadTokens(String jobTokenFile, Configuration conf)
+      throws IOException {
+    Path localJobTokenFile = new Path ("file:///" + jobTokenFile);
+    Credentials ts = Credentials.readTokenStorageFile(localJobTokenFile, conf);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Task: Loaded jobTokenFile from: "
+          + localJobTokenFile.toUri().getPath()
+          + "; num of sec keys  = " + ts.numberOfSecretKeys()
+          + " Number of tokens " + ts.numberOfTokens());
+    }
+    return ts;
+  }
+
+  /**
+   * Store job token.
+   * @param t job token to store
+   * @param credentials credentials the token is added to
+   */
+  @InterfaceAudience.Private
+  public static void setJobToken(Token<? extends TokenIdentifier> t,
+      Credentials credentials) {
+    credentials.addToken(JOB_TOKEN, t);
+  }
+
+  /** Returns the token stored under the JobToken alias in credentials. */
+  @SuppressWarnings("unchecked")
+  @InterfaceAudience.Private
+  public static Token<JobTokenIdentifier> getJobToken(Credentials credentials) {
+    return (Token<JobTokenIdentifier>) credentials.getToken(JOB_TOKEN);
+  }
+
+  /** Stores key in credentials under the shuffle-token alias. */
+  @InterfaceAudience.Private
+  public static void setShuffleSecretKey(byte[] key, Credentials credentials) {
+    credentials.addSecretKey(SHUFFLE_TOKEN, key);
+  }
+
+  /** Returns the secret key stored under the shuffle-token alias. */
+  @InterfaceAudience.Private
+  public static byte[] getShuffleSecretKey(Credentials credentials) {
+    return getSecretKey(credentials, SHUFFLE_TOKEN);
+  }
+}

Modified: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java?rev=1470655&r1=1470654&r2=1470655&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java (original)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java Mon Apr 22 18:31:05 2013
@@ -114,6 +114,8 @@ public class DeprecatedKeys {
     _(MRJobConfig.REDUCE_MEMTOMEM_ENABLED, TezJobConfig.TEZ_ENGINE_SHUFFLE_ENABLE_MEMTOMEM);
     
     _(MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT, TezJobConfig.TEZ_ENGINE_INPUT_BUFFER_PERCENT);
+    
+    _(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY, TezJobConfig.DAG_CREDENTIALS_BINARY);
   }
 
   private static void _(String oldKey, String newKey) {