You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2014/03/29 02:15:08 UTC

git commit: TEZ-989. Allow addition of resources to the AM for a running session. (sseth)

Repository: incubator-tez
Updated Branches:
  refs/heads/master f34c7f320 -> 34a557d13


TEZ-989. Allow addition of resources to the AM for a running session.
(sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/34a557d1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/34a557d1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/34a557d1

Branch: refs/heads/master
Commit: 34a557d1318aee9b23f5ac7ded9bccfd72508de1
Parents: f34c7f3
Author: Siddharth Seth <ss...@apache.org>
Authored: Fri Mar 28 18:14:51 2014 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri Mar 28 18:14:51 2014 -0700

----------------------------------------------------------------------
 .../org/apache/tez/client/TezClientUtils.java   |  13 +-
 .../java/org/apache/tez/client/TezSession.java  |  57 ++++++-
 .../src/main/proto/DAGClientAMProtocol.proto    |   1 +
 .../org/apache/tez/common/RuntimeUtils.java     | 117 +++++++++++++++
 .../apache/hadoop/mapred/YarnTezDagChild.java   |  49 ++-----
 ...DAGClientAMProtocolBlockingPBServerImpl.java |  13 +-
 .../org/apache/tez/dag/app/DAGAppMaster.java    | 147 ++++++++++++++++---
 .../org/apache/tez/dag/app/RecoveryParser.java  |   7 +-
 .../dag/app/dag/RootInputInitializerRunner.java |   8 +-
 .../dag/app/dag/event/DAGEventRecoverEvent.java |  21 ++-
 .../tez/dag/app/dag/event/DAGEventStartDag.java |  38 +++++
 .../apache/tez/dag/app/dag/impl/DAGImpl.java    |  20 ++-
 .../org/apache/tez/dag/app/dag/impl/Edge.java   |   2 +-
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |   2 +-
 .../tez/dag/app/dag/impl/VertexManager.java     |   2 +-
 .../dag/history/events/DAGSubmittedEvent.java   |  29 +++-
 .../tez/dag/utils/RelocalizationUtils.java      |  68 +++++++++
 tez-dag/src/main/proto/HistoryEvents.proto      |   3 +-
 .../tez/dag/app/dag/impl/TestDAGImpl.java       |   5 +-
 .../TestHistoryEventsProtoConversion.java       |   4 +-
 .../split/TezGroupedSplitsInputFormat.java      |  11 +-
 .../split/TezGroupedSplitsInputFormat.java      |   7 +-
 .../runtime/LogicalIOProcessorRuntimeTask.java  |   1 +
 .../org/apache/tez/runtime/RuntimeUtils.java    | 116 ---------------
 .../apache/tez/runtime/TestRuntimeUtils.java    |   1 +
 .../vertexmanager/TestShuffleVertexManager.java |   2 +-
 .../apache/tez/mapreduce/TestMRRJobsDAGApi.java | 131 ++++++++++++++++-
 .../java/org/apache/tez/test/TestTezJobs.java   |   1 -
 tez-tests/src/test/resources/test_jar           | Bin 0 -> 1101 bytes
 29 files changed, 649 insertions(+), 227 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/34a557d1/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
index 8b87fb0..2ec1939 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
@@ -34,7 +34,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
-import java.util.UUID;
 import java.util.Vector;
 import java.util.Map.Entry;
 
@@ -55,7 +54,6 @@ import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
@@ -480,7 +478,16 @@ public class TezClientUtils {
         DagTypeConverters.convertFromLocalResources(sessionJars);
       sessionJarsPBOutStream = FileSystem.create(fs, sessionJarsPath,
         new FsPermission(TEZ_AM_FILE_PERMISSION));
-      proto.writeTo(sessionJarsPBOutStream);
+      proto.writeDelimitedTo(sessionJarsPBOutStream);
+      
+      // Write out the initial list of resources which will be available in the AM
+      DAGProtos.PlanLocalResourcesProto amResourceProto;
+      if (amConfig.getLocalResources() != null && !amConfig.getLocalResources().isEmpty()) {
+        amResourceProto = DagTypeConverters.convertFromLocalResources(localResources);
+      } else {
+        amResourceProto = DAGProtos.PlanLocalResourcesProto.getDefaultInstance(); 
+      }
+      amResourceProto.writeDelimitedTo(sessionJarsPBOutStream);
     } finally {
       if (sessionJarsPBOutStream != null) {
         sessionJarsPBOutStream.close();

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/34a557d1/tez-api/src/main/java/org/apache/tez/client/TezSession.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezSession.java b/tez-api/src/main/java/org/apache/tez/client/TezSession.java
index ff7f00a..f131005 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezSession.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezSession.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.tez.common.TezYARNUtils;
@@ -54,6 +55,7 @@ import org.apache.tez.dag.api.client.rpc.DAGClientRPCImpl;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import com.google.protobuf.ServiceException;
 
 public class TezSession {
@@ -137,14 +139,50 @@ public class TezSession {
    * Submit a DAG to a Tez Session. Blocks until either the DAG is submitted to
    * the session or configured timeout period expires. Cleans up session if the
    * submission timed out.
+   * 
+   * Recommended API.
+   * 
    * @param dag DAG to be submitted to Session
    * @return DAGClient to monitor the DAG
    * @throws TezException
    * @throws IOException
    * @throws SessionNotRunning if session is not alive
    * @throws DAGSubmissionTimedOut if submission timed out
+   */  
+  public synchronized DAGClient submitDAG(DAG dag) throws TezException, IOException,
+      InterruptedException {
+    return submitDAG(dag, null);
+  }
+
+  /**
+   * Submit a DAG to a Tez Session. Blocks until either the DAG is submitted to
+   * the session or configured timeout period expires. Cleans up session if the
+   * submission timed out.
+   * 
+   * 
+   * 
+   * Allows specification of additional resources which will be added to the
+   * running AM's classpath for execution of this DAG.
+   * Caveats: Resources stack across DAG submissions and are never removed from the classpath. Only
+   * LocalResourceType.FILE is supported. All resources will be treated as private.
+   * 
+   * This method is not recommended unless there is no choice but to add resources to an existing session.
+   * Recommended usage is to set up AM local resources up front via AMConfiguration.
+   * 
+   * @param dag
+   *          DAG to be submitted to Session
+   * @param additionalAmResources
+   *          additional resources which should be localized in the AM while
+   *          executing this DAG.
+   * @return DAGClient to monitor the DAG
+   * @throws TezException
+   * @throws IOException
+   * @throws SessionNotRunning
+   *           if session is not alive
+   * @throws DAGSubmissionTimedOut
+   *           if submission timed out
    */
-  public synchronized DAGClient submitDAG(DAG dag)
+  public synchronized DAGClient submitDAG(DAG dag, Map<String, LocalResource> additionalAmResources)
     throws TezException, IOException, InterruptedException {
     verifySessionStateForSubmission();
 
@@ -152,6 +190,13 @@ public class TezSession {
     LOG.info("Submitting dag to TezSession"
       + ", sessionName=" + sessionName
       + ", applicationId=" + applicationId);
+    
+    if (additionalAmResources != null && !additionalAmResources.isEmpty()) {
+      for (LocalResource lr : additionalAmResources.values()) {
+        Preconditions.checkArgument(lr.getType() == LocalResourceType.FILE, "LocalResourceType: "
+            + lr.getType() + " is not supported, only " + LocalResourceType.FILE + " is supported");
+      }
+    }
 
     // Obtain DAG specific credentials.
     TezClientUtils.setupDAGCredentials(dag, sessionCredentials, sessionConfig.getTezConfiguration());
@@ -167,8 +212,12 @@ public class TezSession {
     }
     
     DAGPlan dagPlan = dag.createDag(sessionConfig.getTezConfiguration());
-    SubmitDAGRequestProto requestProto =
-        SubmitDAGRequestProto.newBuilder().setDAGPlan(dagPlan).build();
+    SubmitDAGRequestProto.Builder requestBuilder = SubmitDAGRequestProto.newBuilder();
+    requestBuilder.setDAGPlan(dagPlan).build();
+    if (additionalAmResources != null && !additionalAmResources.isEmpty()) {
+      requestBuilder.setAdditionalAmResources(DagTypeConverters
+          .convertFromLocalResources(additionalAmResources));
+    }
 
     DAGClientAMProtocolBlockingPB proxy = waitForProxy();
     if (proxy == null) {
@@ -183,7 +232,7 @@ public class TezSession {
     }
 
     try {
-      dagId = proxy.submitDAG(null, requestProto).getDagId();
+      dagId = proxy.submitDAG(null, requestBuilder.build()).getDagId();
     } catch (ServiceException e) {
       throw new TezException(e);
     }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/34a557d1/tez-api/src/main/proto/DAGClientAMProtocol.proto
----------------------------------------------------------------------
diff --git a/tez-api/src/main/proto/DAGClientAMProtocol.proto b/tez-api/src/main/proto/DAGClientAMProtocol.proto
index 525679e..8d33a19 100644
--- a/tez-api/src/main/proto/DAGClientAMProtocol.proto
+++ b/tez-api/src/main/proto/DAGClientAMProtocol.proto
@@ -61,6 +61,7 @@ message TryKillDAGResponseProto {
 
 message SubmitDAGRequestProto {
   optional DAGPlan d_a_g_plan = 1;
+  optional PlanLocalResourcesProto additional_am_resources = 2;
 }
 
 message SubmitDAGResponseProto {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/34a557d1/tez-common/src/main/java/org/apache/tez/common/RuntimeUtils.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/RuntimeUtils.java b/tez-common/src/main/java/org/apache/tez/common/RuntimeUtils.java
new file mode 100644
index 0000000..c3d98f7
--- /dev/null
+++ b/tez-common/src/main/java/org/apache/tez/common/RuntimeUtils.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.common;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.TezUncheckedException;
+
+@Private
+public class RuntimeUtils {
+  
+  private static final Map<String, Class<?>> CLAZZ_CACHE = new ConcurrentHashMap<String, Class<?>>();
+
+  @Private
+  public static Class<?> getClazz(String className) {
+    Class<?> clazz = CLAZZ_CACHE.get(className);
+    if (clazz == null) {
+      try {
+        clazz = Class.forName(className, true, Thread.currentThread().getContextClassLoader());
+      } catch (ClassNotFoundException e) {
+        throw new TezUncheckedException("Unable to load class: " + className, e);
+      }
+    }
+    return clazz;
+  }
+
+  private static <T> T getNewInstance(Class<T> clazz) {
+    T instance;
+    try {
+      instance = clazz.newInstance();
+    } catch (InstantiationException e) {
+      throw new TezUncheckedException(
+          "Unable to instantiate class with 0 arguments: " + clazz.getName(), e);
+    } catch (IllegalAccessException e) {
+      throw new TezUncheckedException(
+          "Unable to instantiate class with 0 arguments: " + clazz.getName(), e);
+    }
+    return instance;
+  }
+
+  @Private
+  public static <T> T createClazzInstance(String className) {
+    Class<?> clazz = getClazz(className);
+    @SuppressWarnings("unchecked")
+    T instance = (T) getNewInstance(clazz);
+    return instance;
+  }
+  
+  
+  @Private
+  public static synchronized void addResourcesToClasspath(List<URL> urls) {
+    ClassLoader classLoader = new URLClassLoader(urls.toArray(new URL[urls.size()]), Thread
+        .currentThread().getContextClassLoader());
+    Thread.currentThread().setContextClassLoader(classLoader);
+  }
+  
+  
+  // Parameters for addResourcesToSystemClassLoader
+  private static final Class<?>[] parameters = new Class[]{URL.class};
+  private static Method sysClassLoaderMethod = null;
+  
+  
+  @Private  
+  public static synchronized void addResourcesToSystemClassLoader(List<URL> urls) throws TezException {
+    URLClassLoader sysLoader = (URLClassLoader)ClassLoader.getSystemClassLoader();
+    
+    if (sysClassLoaderMethod == null) {
+      Class<?> sysClass = URLClassLoader.class;
+      Method method;
+      try {
+        method = sysClass.getDeclaredMethod("addURL", parameters);
+      } catch (SecurityException e) {
+        throw new TezException("Failed to get handle on method addURL", e);
+      } catch (NoSuchMethodException e) {
+        throw new TezException("Failed to get handle on method addURL", e);
+      }
+      method.setAccessible(true);
+      sysClassLoaderMethod = method;
+    }
+    for (URL url : urls) {
+      try {
+        sysClassLoaderMethod.invoke(sysLoader, new Object[] { url });
+      } catch (IllegalArgumentException e) {
+        throw new TezException("Failed to invoke addURL for rsrc: " + url, e);
+      } catch (IllegalAccessException e) {
+        throw new TezException("Failed to invoke addURL for rsrc: " + url, e);
+      } catch (InvocationTargetException e) {
+        throw new TezException("Failed to invoke addURL for rsrc: " + url, e);
+      }
+    }
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/34a557d1/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
index d64cc7f..d68dfad 100644
--- a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
+++ b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
@@ -23,6 +23,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.net.URI;
 import java.net.URL;
 import java.nio.ByteBuffer;
 import java.security.PrivilegedExceptionAction;
@@ -32,7 +33,6 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
@@ -70,8 +70,8 @@ import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.dag.utils.RelocalizationUtils;
 import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask;
-import org.apache.tez.runtime.RuntimeUtils;
 import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent;
 import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent;
 import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
@@ -87,8 +87,10 @@ import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl;
 import org.apache.tez.runtime.common.objectregistry.ObjectRegistryModule;
 import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
 
+import com.google.common.base.Function;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
@@ -490,7 +492,18 @@ public class YarnTezDagChild {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Additional Resources added to container: " + additionalResources);
         }
-        processAdditionalResources(additionalResources, defaultConf);
+
+        LOG.info("Localizing additional local resources for Task : " + additionalResources);
+        List<URL> downloadedUrls = RelocalizationUtils.processAdditionalResources(
+            Maps.transformValues(additionalResources, new Function<TezLocalResource, URI>() {
+              @Override
+              public URI apply(TezLocalResource input) {
+                return input.getUri();
+              }
+            }), defaultConf);
+        RelocalizationUtils.addUrlsToClassPath(downloadedUrls);
+
+        LOG.info("Done localizing additional resources");
         final TaskSpec taskSpec = containerTask.getTaskSpec();
         if (LOG.isDebugEnabled()) {
           LOG.debug("New container task context:"
@@ -708,34 +721,4 @@ public class YarnTezDagChild {
     }
     TezUtils.updateLoggers(addend);
   }
-
-  private static void processAdditionalResources(Map<String, TezLocalResource> additionalResources,
-      Configuration conf) throws IOException, TezException {
-    if (additionalResources == null || additionalResources.isEmpty()) {
-      return;
-    }
-
-    LOG.info("Downloading " + additionalResources.size() + "additional resources");
-    List<URL> urls = Lists.newArrayListWithCapacity(additionalResources.size());
-
-    for (Entry<String, TezLocalResource> lrEntry : additionalResources.entrySet()) {
-      Path dFile = downloadResource(lrEntry.getKey(), lrEntry.getValue(), conf);
-      urls.add(dFile.toUri().toURL());
-    }
-    RuntimeUtils.addResourcesToClasspath(urls);
-    LOG.info("Done localizing additional resources");
-  }
-
-  private static Path downloadResource(String destName, TezLocalResource resource,
-      Configuration conf) throws IOException {
-    resource.getUri();
-    FileSystem fs = FileSystem.get(resource.getUri(), conf);
-    Path cwd = new Path(System.getenv(Environment.PWD.name()));
-    Path dFile = new Path(cwd, destName);
-    Path srcPath = new Path(resource.getUri());
-    fs.copyToLocalFile(srcPath, dFile);
-    return dFile.makeQualified(FileSystem.getLocal(conf).getUri(), cwd);
-  }
-  
-  
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/34a557d1/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
index cca0827..f205f9f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
@@ -19,7 +19,9 @@
 package org.apache.tez.dag.api.client.rpc;
 
 import java.util.List;
+import java.util.Map;
 
+import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.tez.client.TezSessionStatus;
 import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.api.TezException;
@@ -49,8 +51,7 @@ import org.apache.tez.dag.app.DAGAppMaster.DAGClientHandler;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
 
-public class DAGClientAMProtocolBlockingPBServerImpl implements
-    DAGClientAMProtocolBlockingPB {
+public class DAGClientAMProtocolBlockingPBServerImpl implements DAGClientAMProtocolBlockingPB {
 
   DAGClientHandler real;
 
@@ -122,7 +123,12 @@ public class DAGClientAMProtocolBlockingPBServerImpl implements
       SubmitDAGRequestProto request) throws ServiceException {
     try{
       DAGPlan dagPlan = request.getDAGPlan();
-      String dagId = real.submitDAG(dagPlan);
+      Map<String, LocalResource> additionalResources = null;
+      if (request.hasAdditionalAmResources()) {
+        additionalResources = DagTypeConverters.convertFromPlanLocalResources(request
+            .getAdditionalAmResources());
+      }
+      String dagId = real.submitDAG(dagPlan, additionalResources);
       return SubmitDAGResponseProto.newBuilder().setDagId(dagId).build();
     } catch(TezException e) {
       throw wrapException(e);
@@ -166,5 +172,4 @@ public class DAGClientAMProtocolBlockingPBServerImpl implements
       throw wrapException(e);
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/34a557d1/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 526bea7..6a92161 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -25,6 +25,9 @@ import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.PrintWriter;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -34,6 +37,7 @@ import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
@@ -81,6 +85,7 @@ import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.tez.client.PreWarmContext;
 import org.apache.tez.client.TezSessionStatus;
+import org.apache.tez.common.TezConverterUtils;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.common.counters.Limits;
 import org.apache.tez.common.impl.LogUtils;
@@ -111,6 +116,7 @@ import org.apache.tez.dag.app.dag.event.DAGAppMasterEventDAGFinished;
 import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
 import org.apache.tez.dag.app.dag.event.DAGEvent;
 import org.apache.tez.dag.app.dag.event.DAGEventRecoverEvent;
+import org.apache.tez.dag.app.dag.event.DAGEventStartDag;
 import org.apache.tez.dag.app.dag.event.DAGEventType;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
@@ -138,10 +144,13 @@ import org.apache.tez.dag.history.events.DAGSubmittedEvent;
 import org.apache.tez.dag.history.utils.DAGUtils;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.utils.Graph;
+import org.apache.tez.dag.utils.RelocalizationUtils;
 import org.apache.tez.common.security.JobTokenSecretManager;
 import org.codehaus.jettison.json.JSONException;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.collect.Maps;
 
 /**
  * The Map-Reduce Application Master.
@@ -201,6 +210,8 @@ public class DAGAppMaster extends AbstractService {
   private VertexEventDispatcher vertexEventDispatcher;
   private TaskSchedulerEventHandler taskSchedulerEventHandler;
   private HistoryEventHandler historyEventHandler;
+  private final Map<String, LocalResource> amResources = new HashMap<String, LocalResource>();
+  private final Map<String, LocalResource> cumulativeAdditionalResources = new HashMap<String, LocalResource>();
   private final Map<String, LocalResource> sessionResources =
     new HashMap<String, LocalResource>();
 
@@ -370,10 +381,14 @@ public class DAGAppMaster extends AbstractService {
       try {
         sessionResourcesStream = new FileInputStream(
           TezConfiguration.TEZ_SESSION_LOCAL_RESOURCES_PB_FILE_NAME);
-        PlanLocalResourcesProto localResourcesProto =
-          PlanLocalResourcesProto.parseFrom(sessionResourcesStream);
+        PlanLocalResourcesProto sessionLocalResourcesProto =
+          PlanLocalResourcesProto.parseDelimitedFrom(sessionResourcesStream);
+        PlanLocalResourcesProto amLocalResourceProto = PlanLocalResourcesProto
+            .parseDelimitedFrom(sessionResourcesStream);
         sessionResources.putAll(DagTypeConverters.convertFromPlanLocalResources(
-          localResourcesProto));
+          sessionLocalResourcesProto));
+        amResources.putAll(DagTypeConverters.convertFromPlanLocalResources(amLocalResourceProto));
+        amResources.putAll(sessionResources);
       } finally {
         if (sessionResourcesStream != null) {
           sessionResourcesStream.close();
@@ -902,8 +917,8 @@ public class DAGAppMaster extends AbstractService {
     }
   }
 
-  synchronized String submitDAGToAppMaster(DAGPlan dagPlan)
-      throws TezException  {
+  synchronized String submitDAGToAppMaster(DAGPlan dagPlan,
+      Map<String, LocalResource> additionalResources) throws TezException {
     if(currentDAG != null
         && !state.equals(DAGAppMasterState.IDLE)) {
       throw new TezException("App master already running a DAG");
@@ -919,6 +934,8 @@ public class DAGAppMaster extends AbstractService {
     LOG.info("Starting DAG submitted via RPC");
 
     if (LOG.isDebugEnabled()) {
+      LOG.debug("Invoked with additional local resources: " + additionalResources);
+      
       LOG.debug("Writing DAG plan to: "
           + TezConfiguration.TEZ_PB_PLAN_TEXT_NAME);
 
@@ -935,7 +952,7 @@ public class DAGAppMaster extends AbstractService {
     }
 
     submittedDAGs.incrementAndGet();
-    startDAG(dagPlan);
+    startDAG(dagPlan, additionalResources);
     return currentDAG.getID().toString();
   }
 
@@ -981,7 +998,7 @@ public class DAGAppMaster extends AbstractService {
         + ", processor=" + preWarmContext.getProcessorDescriptor().getClassName()
         + ", numContainers=" + preWarmContext.getNumTasks()
         + ", containerResource=" + preWarmContext.getResource());
-    startDAG(dag.createDag(amConf));
+    startDAG(dag.createDag(amConf), null);
   }
 
   public class DAGClientHandler {
@@ -1036,8 +1053,9 @@ public class DAGAppMaster extends AbstractService {
       sendEvent(new DAGEvent(dag.getID(), DAGEventType.DAG_KILL));
     }
 
-    public synchronized String submitDAG(DAGPlan dagPlan) throws TezException {
-      return submitDAGToAppMaster(dagPlan);
+    public synchronized String submitDAG(DAGPlan dagPlan,
+        Map<String, LocalResource> additionalAmResources) throws TezException {
+      return submitDAGToAppMaster(dagPlan, additionalAmResources);
     }
 
     public synchronized void shutdownAM() {
@@ -1075,6 +1093,59 @@ public class DAGAppMaster extends AbstractService {
 
   }
 
+  private Map<String, LocalResource> getAdditionalLocalResourceDiff(
+      Map<String, LocalResource> additionalResources) {
+    if (additionalResources == null) {
+      return Collections.emptyMap();
+    } else {
+      // Check for existing resources.
+      Iterator<Entry<String, LocalResource>> lrIter = additionalResources.entrySet().iterator();
+      while (lrIter.hasNext()) {
+        Entry<String, LocalResource> lrEntry = lrIter.next();
+        LocalResource existing = amResources.get(lrEntry.getKey());
+        if (existing != null) {
+          if (!existing.equals(lrEntry.getValue())) {
+            throw new TezUncheckedException(
+                "Cannot add different additional resources with the same name : "
+                    + lrEntry.getKey() + ", Existing: [" + existing + "], New: ["
+                    + lrEntry.getValue() + "]");
+          } else {
+            lrIter.remove();
+          }
+        }
+      }
+      return containerSignatureMatcher.getAdditionalResources(amResources, additionalResources);
+    }
+  }
+
+  private List<URL> processAdditionalResources(Map<String, LocalResource> lrDiff)
+      throws TezException {
+    if (lrDiff == null || lrDiff.isEmpty()) {
+      return Collections.emptyList();
+    } else {
+      LOG.info("Localizing additional local resources for AM : " + lrDiff);
+      List<URL> downloadedURLs;
+      try {
+        downloadedURLs = RelocalizationUtils.processAdditionalResources(
+            Maps.transformValues(lrDiff, new Function<LocalResource, URI>() {
+
+              @Override
+              public URI apply(LocalResource input) {
+                try {
+                  return TezConverterUtils.getURIFromYarnURL(input.getResource());
+                } catch (URISyntaxException e) {
+                  throw new TezUncheckedException("Failed while handling : " + input, e);
+                }
+              }
+            }), getConfig());
+      } catch (IOException e) {
+        throw new TezException(e);
+      }
+      LOG.info("Done downloading additional AM resources");
+      return downloadedURLs;
+    }
+  }
+
   private class RunningAppContext implements AppContext {
 
     private DAG dag;
@@ -1396,7 +1467,7 @@ public class DAGAppMaster extends AbstractService {
     }
   }
 
-  private RecoveredDAGData recoverDAG() throws IOException {
+  private RecoveredDAGData recoverDAG() throws IOException, TezException {
     if (recoveryEnabled) {
       if (this.appAttemptID.getAttemptId() > 1) {
         LOG.info("Recovering data from previous attempts"
@@ -1424,8 +1495,7 @@ public class DAGAppMaster extends AbstractService {
     DefaultMetricsSystem.initialize("DAGAppMaster");
 
     this.appsStartTime = clock.getTime();
-    AMStartedEvent startEvent = new AMStartedEvent(appAttemptID,
-        appsStartTime);
+    AMStartedEvent startEvent = new AMStartedEvent(appAttemptID, appsStartTime);
     historyEventHandler.handle(
         new DAGHistoryEvent(startEvent));
 
@@ -1451,6 +1521,13 @@ public class DAGAppMaster extends AbstractService {
     }
 
     if (recoveredDAGData != null) {
+      List<URL> classpathUrls = null;
+      if (recoveredDAGData.cumulativeAdditionalResources != null) {
+        classpathUrls = processAdditionalResources(recoveredDAGData.cumulativeAdditionalResources);
+        amResources.putAll(recoveredDAGData.cumulativeAdditionalResources);
+        cumulativeAdditionalResources.putAll(recoveredDAGData.cumulativeAdditionalResources);
+      }
+      
       if (recoveredDAGData.isCompleted
           || recoveredDAGData.nonRecoverable) {
         LOG.info("Found previous DAG in completed or non-recoverable state"
@@ -1464,21 +1541,21 @@ public class DAGAppMaster extends AbstractService {
         if (recoveredDAGData.nonRecoverable) {
           DAGEventRecoverEvent recoverDAGEvent =
               new DAGEventRecoverEvent(recoveredDAGData.recoveredDAG.getID(),
-                  DAGState.FAILED);
+                  DAGState.FAILED, classpathUrls);
           dagEventDispatcher.handle(recoverDAGEvent);
           this.state = DAGAppMasterState.RUNNING;
         } else {
           DAGEventRecoverEvent recoverDAGEvent =
               new DAGEventRecoverEvent(recoveredDAGData.recoveredDAG.getID(),
-                  recoveredDAGData.dagState);
+                  recoveredDAGData.dagState, classpathUrls);
           dagEventDispatcher.handle(recoverDAGEvent);
           this.state = DAGAppMasterState.RUNNING;
         }
       } else {
         LOG.info("Found DAG to recover, dagId=" + recoveredDAGData.recoveredDAG.getID());
         _updateLoggers(recoveredDAGData.recoveredDAG, "");
-        DAGEvent recoverDAGEvent = new DAGEvent(recoveredDAGData.recoveredDAG.getID(),
-            DAGEventType.DAG_RECOVER);
+        DAGEventRecoverEvent recoverDAGEvent = new DAGEventRecoverEvent(
+            recoveredDAGData.recoveredDAG.getID(), classpathUrls);
         dagEventDispatcher.handle(recoverDAGEvent);
         this.state = DAGAppMasterState.RUNNING;
       }
@@ -1706,7 +1783,7 @@ public class DAGAppMaster extends AbstractService {
     }
   }
 
-  private void startDAG() throws IOException {
+  private void startDAG() throws IOException, TezException {
     FileInputStream dagPBBinaryStream = null;
     try {
       DAGPlan dagPlan = null;
@@ -1716,7 +1793,7 @@ public class DAGAppMaster extends AbstractService {
           TezConfiguration.TEZ_PB_PLAN_BINARY_NAME);
       dagPlan = DAGPlan.parseFrom(dagPBBinaryStream);
 
-      startDAG(dagPlan);
+      startDAG(dagPlan, null);
 
     } finally {
       if (dagPBBinaryStream != null) {
@@ -1725,7 +1802,8 @@ public class DAGAppMaster extends AbstractService {
     }
   }
 
-  private void startDAG(DAGPlan dagPlan) {
+  private void startDAG(DAGPlan dagPlan, Map<String, LocalResource> additionalAMResources)
+      throws TezException {
     long submitTime = this.clock.getTime();
     this.state = DAGAppMasterState.RUNNING;
     this.appName = dagPlan.getName();
@@ -1739,24 +1817,47 @@ public class DAGAppMaster extends AbstractService {
         LOG.debug("DAG has vertex " + v.getName());
       }
     }
+    
+    Map<String, LocalResource> lrDiff = getAdditionalLocalResourceDiff(additionalAMResources);
+    if (lrDiff != null) {
+      amResources.putAll(lrDiff);
+      cumulativeAdditionalResources.putAll(lrDiff);
+    }
 
     LOG.info("Running DAG: " + dagPlan.getName());
     // Job name is the same as the app name until we support multiple dags
     // for an app later
     DAGSubmittedEvent submittedEvent = new DAGSubmittedEvent(newDAG.getID(),
-        submitTime, dagPlan, this.appAttemptID);
+        submitTime, dagPlan, this.appAttemptID, cumulativeAdditionalResources);
     try {
       historyEventHandler.handleCriticalEvent(
           new DAGHistoryEvent(newDAG.getID(), submittedEvent));
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
-    startDAG(newDAG);
+    
+    startDAGExecution(newDAG, lrDiff);
   }
 
-  private void startDAG(DAG dag) {
+  private void startDAGExecution(DAG dag, final Map<String, LocalResource> additionalAmResources)
+      throws TezException {
     currentDAG = dag;
 
+    // Try localizing the actual resources.
+    List<URL> additionalUrlsForClasspath;
+    try {
+      additionalUrlsForClasspath = dag.getDagUGI().doAs(new PrivilegedExceptionAction<List<URL>>() {
+        @Override
+        public List<URL> run() throws Exception {
+          return processAdditionalResources(additionalAmResources);
+        }
+      });
+    } catch (IOException e) {
+      throw new TezException(e);
+    } catch (InterruptedException e) {
+      throw new TezException(e);
+    }
+
     // End of creating the job.
     ((RunningAppContext) context).setDAG(currentDAG);
 
@@ -1769,7 +1870,7 @@ public class DAGAppMaster extends AbstractService {
 
     // All components have started, start the job.
     /** create a job-start event to get this ball rolling */
-    DAGEvent startDagEvent = new DAGEvent(currentDAG.getID(), DAGEventType.DAG_START);
+    DAGEvent startDagEvent = new DAGEventStartDag(currentDAG.getID(), additionalUrlsForClasspath);
     /** send the job-start event. this triggers the job execution. */
     sendEvent(startDagEvent);
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/34a557d1/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
index 1a9b760..093069c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.app.dag.DAGState;
 import org.apache.tez.dag.app.dag.Task;
@@ -103,6 +104,7 @@ public class RecoveryParser {
     public boolean isCompleted = false;
     public boolean nonRecoverable = false;
     public String reason = null;
+    public Map<String, LocalResource> cumulativeAdditionalResources = null;
   }
 
   private static void parseSummaryFile(FSDataInputStream inputStream)
@@ -627,11 +629,14 @@ public class RecoveryParser {
       switch (eventType) {
         case DAG_SUBMITTED:
         {
+          DAGSubmittedEvent submittedEvent = (DAGSubmittedEvent) event;
           LOG.info("Recovering from event"
               + ", eventType=" + eventType
               + ", event=" + event.toString());
-          recoveredDAGData.recoveredDAG = dagAppMaster.createDAG(((DAGSubmittedEvent) event).getDAGPlan(),
+          recoveredDAGData.recoveredDAG = dagAppMaster.createDAG(submittedEvent.getDAGPlan(),
               lastInProgressDAG);
+          recoveredDAGData.cumulativeAdditionalResources = submittedEvent
+            .getCumulativeAdditionalLocalResources();
           recoveredDAGData.recoveredDagID = recoveredDAGData.recoveredDAG.getID();
           dagAppMaster.setCurrentDAG(recoveredDAGData.recoveredDAG);
           if (recoveredDAGData.nonRecoverable) {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/34a557d1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerRunner.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerRunner.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerRunner.java
index 7d3cd28..6b65af3 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerRunner.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerRunner.java
@@ -29,6 +29,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tez.common.RuntimeUtils;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.app.dag.event.VertexEventRootInputFailed;
 import org.apache.tez.dag.app.dag.event.VertexEventRootInputInitialized;
@@ -148,13 +149,12 @@ public class RootInputInitializerRunner {
       return events;
     }
 
-    private TezRootInputInitializer createInitializer()
-        throws ClassNotFoundException, InstantiationException,
+    private TezRootInputInitializer createInitializer() throws InstantiationException,
         IllegalAccessException {
       String className = input.getInitializerClassName();
       @SuppressWarnings("unchecked")
-      Class<? extends TezRootInputInitializer> clazz = (Class<? extends TezRootInputInitializer>) Class
-          .forName(className);
+      Class<? extends TezRootInputInitializer> clazz = (Class<? extends TezRootInputInitializer>) RuntimeUtils
+          .getClazz(className);
       TezRootInputInitializer initializer = clazz.newInstance();
       return initializer;
     }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/34a557d1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventRecoverEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventRecoverEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventRecoverEvent.java
index e64ad13..45e44f3 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventRecoverEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventRecoverEvent.java
@@ -18,20 +18,37 @@
 
 package org.apache.tez.dag.app.dag.event;
 
+import java.net.URL;
+import java.util.List;
+
 import org.apache.tez.dag.app.dag.DAGState;
 import org.apache.tez.dag.records.TezDAGID;
 
 public class DAGEventRecoverEvent extends DAGEvent {
 
   private final DAGState desiredState;
+  private final List<URL> additionalUrlsForClasspath;
 
-  public DAGEventRecoverEvent(TezDAGID dagId, DAGState desiredState) {
+  public DAGEventRecoverEvent(TezDAGID dagId, DAGState desiredState,
+      List<URL> additionalUrlsForClasspath) {
     super(dagId, DAGEventType.DAG_RECOVER);
     this.desiredState = desiredState;
+    this.additionalUrlsForClasspath = additionalUrlsForClasspath;
   }
-
+  
+  /**
+   * Creates a recover event that carries no desired state: the desired state is passed as
+   * null, so hasDesiredState() returns false for events built through this constructor.
+   */
+  public DAGEventRecoverEvent(TezDAGID dagId, List<URL> additionalUrlsForClasspath) {
+    this(dagId, null, additionalUrlsForClasspath);
+  }
+  
   public DAGState getDesiredState() {
     return desiredState;
   }
+  
+  /** Returns the classpath URLs exactly as they were passed to the constructor. */
+  public List<URL> getAdditionalUrlsForClasspath() {
+    return additionalUrlsForClasspath;
+  }
 
+  /** Returns true when a non-null desired state was supplied at construction. */
+  public boolean hasDesiredState() {
+    return desiredState != null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/34a557d1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventStartDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventStartDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventStartDag.java
new file mode 100644
index 0000000..9e6d75b
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventStartDag.java
@@ -0,0 +1,38 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.event;
+
+import java.net.URL;
+import java.util.List;
+
+import org.apache.tez.dag.records.TezDAGID;
+
+/**
+ * A DAG_START event that additionally carries a list of URLs intended for the classpath.
+ */
+public class DAGEventStartDag extends DAGEvent {
+
+  private final List<URL> additionalUrlsForClasspath;
+
+  /**
+   * @param dagId id of the DAG the event is addressed to
+   * @param additionalUrlsForClasspath URLs to carry with the event; stored as given, so a
+   *        null argument is returned as null by the getter
+   */
+  public DAGEventStartDag(TezDAGID dagId, List<URL> additionalUrlsForClasspath) {
+    super(dagId, DAGEventType.DAG_START);
+    this.additionalUrlsForClasspath = additionalUrlsForClasspath;
+  }
+
+  /** Returns the URLs supplied at construction, without copying. */
+  public List<URL> getAdditionalUrlsForClasspath() {
+    return additionalUrlsForClasspath;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/34a557d1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index 0e4c504..c92d51b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -19,6 +19,7 @@
 package org.apache.tez.dag.app.dag.impl;
 
 import java.io.IOException;
+import java.net.URL;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -88,6 +89,7 @@ import org.apache.tez.dag.app.dag.event.DAGEventDiagnosticsUpdate;
 import org.apache.tez.dag.app.dag.event.DAGEventRecoverEvent;
 import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdate;
 import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdateTAAssigned;
+import org.apache.tez.dag.app.dag.event.DAGEventStartDag;
 import org.apache.tez.dag.app.dag.event.DAGEventType;
 import org.apache.tez.dag.app.dag.event.DAGEventVertexCompleted;
 import org.apache.tez.dag.app.dag.event.DAGEventVertexReRunning;
@@ -105,6 +107,7 @@ import org.apache.tez.dag.history.events.VertexGroupCommitFinishedEvent;
 import org.apache.tez.dag.history.events.VertexGroupCommitStartedEvent;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.dag.utils.RelocalizationUtils;
 import org.apache.tez.dag.utils.TezBuilderUtils;
 import org.apache.tez.runtime.api.OutputCommitter;
 
@@ -1301,17 +1304,22 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
 
     @Override
     public DAGState transition(DAGImpl dag, DAGEvent dagEvent) {
-      if (dagEvent instanceof DAGEventRecoverEvent) {
+      DAGEventRecoverEvent recoverEvent = (DAGEventRecoverEvent) dagEvent;
+      if (recoverEvent.hasDesiredState()) {
         // DAG completed or final end state known
-        DAGEventRecoverEvent recoverEvent = (DAGEventRecoverEvent) dagEvent;
         dag.recoveredState = recoverEvent.getDesiredState();
       }
+      if (recoverEvent.getAdditionalUrlsForClasspath() != null) {
+        LOG.info("Added additional resources : [" + recoverEvent.getAdditionalUrlsForClasspath()
+            + "] to classpath");
+        RelocalizationUtils.addUrlsToClassPath(recoverEvent.getAdditionalUrlsForClasspath());
+      }
 
       switch (dag.recoveredState) {
         case NEW:
           // send DAG an Init and start events
           dag.eventHandler.handle(new DAGEvent(dag.getID(), DAGEventType.DAG_INIT));
-          dag.eventHandler.handle(new DAGEvent(dag.getID(), DAGEventType.DAG_START));
+          dag.eventHandler.handle(new DAGEventStartDag(dag.getID(), null));
           return DAGState.NEW;
         case INITED:
           // DAG inited but not started
@@ -1464,8 +1472,14 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
      */
     @Override
     public void transition(DAGImpl dag, DAGEvent event) {
+      DAGEventStartDag startEvent = (DAGEventStartDag) event;
       dag.startTime = dag.clock.getTime();
       dag.logJobHistoryStartedEvent();
+      List<URL> additionalUrlsForClasspath = startEvent.getAdditionalUrlsForClasspath();
+      if (additionalUrlsForClasspath != null) {
+        LOG.info("Added additional resources : [" + additionalUrlsForClasspath  + "] to classpath");
+        RelocalizationUtils.addUrlsToClassPath(additionalUrlsForClasspath);
+      }
       // TODO Metrics
       //job.metrics.runningJob(job);
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/34a557d1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
index aecb5d2..2e07a4d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
@@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.annotation.Nullable;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tez.common.RuntimeUtils;
 import org.apache.tez.dag.api.EdgeManager;
 import org.apache.tez.dag.api.EdgeManagerContext;
 import org.apache.tez.dag.api.EdgeManagerDescriptor;
@@ -37,7 +38,6 @@ import org.apache.tez.dag.app.dag.event.TaskAttemptEventOutputFailed;
 import org.apache.tez.dag.app.dag.event.TaskEventAddTezEvent;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
-import org.apache.tez.runtime.RuntimeUtils;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
 import org.apache.tez.runtime.api.events.DataMovementEvent;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/34a557d1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 6f1398a..8e7bf8d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.yarn.state.SingleArcTransition;
 import org.apache.hadoop.yarn.state.StateMachine;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
 import org.apache.hadoop.yarn.util.Clock;
+import org.apache.tez.common.RuntimeUtils;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.api.EdgeManagerDescriptor;
@@ -122,7 +123,6 @@ import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
-import org.apache.tez.runtime.RuntimeUtils;
 import org.apache.tez.runtime.api.OutputCommitter;
 import org.apache.tez.runtime.api.OutputCommitterContext;
 import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/34a557d1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
index 6f75481..f4404d5 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
@@ -28,6 +28,7 @@ import java.util.Set;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.tez.common.RuntimeUtils;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.EdgeManagerDescriptor;
 import org.apache.tez.dag.api.EdgeProperty;
@@ -42,7 +43,6 @@ import org.apache.tez.dag.app.dag.Vertex;
 import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
-import org.apache.tez.runtime.RuntimeUtils;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.events.RootInputDataInformationEvent;
 import org.apache.tez.runtime.api.events.VertexManagerEvent;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/34a557d1/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
index af83dc8..18f2205 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
@@ -21,9 +21,12 @@ package org.apache.tez.dag.history.events;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.Map;
 
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.api.records.DAGProtos;
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
@@ -45,16 +48,19 @@ public class DAGSubmittedEvent implements HistoryEvent, SummaryEvent {
   private long submitTime;
   private DAGProtos.DAGPlan dagPlan;
   private ApplicationAttemptId applicationAttemptId;
+  private Map<String, LocalResource> cumulativeAdditionalLocalResources;
 
   public DAGSubmittedEvent() {
   }
 
   public DAGSubmittedEvent(TezDAGID dagID, long submitTime,
-      DAGProtos.DAGPlan dagPlan, ApplicationAttemptId applicationAttemptId) {
+      DAGProtos.DAGPlan dagPlan, ApplicationAttemptId applicationAttemptId,
+      Map<String, LocalResource> cumulativeAdditionalLocalResources) {
     this.dagID = dagID;
     this.submitTime = submitTime;
     this.dagPlan = dagPlan;
     this.applicationAttemptId = applicationAttemptId;
+    this.cumulativeAdditionalLocalResources = cumulativeAdditionalLocalResources;
   }
 
   @Override
@@ -129,21 +135,28 @@ public class DAGSubmittedEvent implements HistoryEvent, SummaryEvent {
   }
 
   public DAGSubmittedProto toProto() {
-    return DAGSubmittedProto.newBuilder()
+    DAGSubmittedProto.Builder builder =DAGSubmittedProto.newBuilder()
         .setDagId(dagID.toString())
         .setApplicationAttemptId(applicationAttemptId.toString())
         .setDagPlan(dagPlan)
-        .setSubmitTime(submitTime)
-        .build();
-
+        .setSubmitTime(submitTime);
+    if (cumulativeAdditionalLocalResources != null && !cumulativeAdditionalLocalResources.isEmpty()) {
+      builder.setCumulativeAdditionalAmResources(DagTypeConverters
+          .convertFromLocalResources(cumulativeAdditionalLocalResources));
+    }
+    return builder.build();
   }
-
+ 
   public void fromProto(DAGSubmittedProto proto) {
     this.dagID = TezDAGID.fromString(proto.getDagId());
     this.dagPlan = proto.getDagPlan();
     this.submitTime = proto.getSubmitTime();
     this.applicationAttemptId = ConverterUtils.toApplicationAttemptId(
         proto.getApplicationAttemptId());
+    if (proto.hasCumulativeAdditionalAmResources()) {
+      this.cumulativeAdditionalLocalResources = DagTypeConverters.convertFromPlanLocalResources(proto
+          .getCumulativeAdditionalAmResources());
+    }
   }
 
   @Override
@@ -198,6 +211,10 @@ public class DAGSubmittedEvent implements HistoryEvent, SummaryEvent {
   public ApplicationAttemptId getApplicationAttemptId() {
     return applicationAttemptId;
   }
+  
+  /**
+   * Returns the cumulative additional local resources held by this event. The field is only
+   * assigned by the full constructor or by fromProto when the proto carries the optional
+   * field, so the result may be null.
+   */
+  public Map<String, LocalResource> getCumulativeAdditionalLocalResources() {
+    return this.cumulativeAdditionalLocalResources;
+  }
 
   public long getSubmitTime() {
     return submitTime;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/34a557d1/tez-dag/src/main/java/org/apache/tez/dag/utils/RelocalizationUtils.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/utils/RelocalizationUtils.java b/tez-dag/src/main/java/org/apache/tez/dag/utils/RelocalizationUtils.java
new file mode 100644
index 0000000..7a774b2
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/utils/RelocalizationUtils.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.utils;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URL;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.apache.tez.common.RuntimeUtils;
+import org.apache.tez.dag.api.TezException;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Static helpers for downloading additional resources into the directory named by the PWD
+ * environment variable and for adding the downloaded files to the classpath.
+ */
+public class RelocalizationUtils {
+
+  /**
+   * Downloads every resource in the map and returns the URLs of the local copies.
+   *
+   * @param additionalResources source URI of each resource, keyed by the file name to use for
+   *        the local copy; may be null or empty
+   * @param conf configuration used to obtain the source and local file systems
+   * @return URLs of the local copies in the map's iteration order; an empty list when the map
+   *         is null or empty
+   * @throws IOException if a copy fails or the PWD environment variable is not set
+   * @throws TezException declared for callers; this implementation does not throw it itself
+   */
+  public static List<URL> processAdditionalResources(Map<String, URI> additionalResources,
+      Configuration conf) throws IOException, TezException {
+    if (additionalResources == null || additionalResources.isEmpty()) {
+      return Collections.emptyList();
+    }
+
+    List<URL> urls = Lists.newArrayListWithCapacity(additionalResources.size());
+    for (Entry<String, URI> lrEntry : additionalResources.entrySet()) {
+      Path dFile = downloadResource(lrEntry.getKey(), lrEntry.getValue(), conf);
+      urls.add(dFile.toUri().toURL());
+    }
+    return urls;
+  }
+
+  /** Passes the given URLs to RuntimeUtils.addResourcesToClasspath. */
+  public static void addUrlsToClassPath(List<URL> urls) {
+    RuntimeUtils.addResourcesToClasspath(urls);
+  }
+
+  /**
+   * Copies {@code uri} to a file called {@code destName} inside the directory named by the PWD
+   * environment variable and returns that file's path qualified against the local file system.
+   *
+   * @throws IOException if PWD is unset or empty, or if the copy fails
+   */
+  private static Path downloadResource(String destName, URI uri, Configuration conf)
+      throws IOException {
+    FileSystem fs = FileSystem.get(uri, conf);
+    String pwd = System.getenv(Environment.PWD.name());
+    if (pwd == null || pwd.isEmpty()) {
+      // Path rejects null/empty strings with an unchecked exception, which would bypass the
+      // callers' IOException handling; report the problem through the declared type instead.
+      throw new IOException("Cannot localize " + destName + " : environment variable "
+          + Environment.PWD.name() + " is not set");
+    }
+    Path cwd = new Path(pwd);
+    Path dFile = new Path(cwd, destName);
+    Path srcPath = new Path(uri);
+    fs.copyToLocalFile(srcPath, dFile);
+    return dFile.makeQualified(FileSystem.getLocal(conf).getUri(), cwd);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/34a557d1/tez-dag/src/main/proto/HistoryEvents.proto
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/proto/HistoryEvents.proto b/tez-dag/src/main/proto/HistoryEvents.proto
index c640baa..c73870a 100644
--- a/tez-dag/src/main/proto/HistoryEvents.proto
+++ b/tez-dag/src/main/proto/HistoryEvents.proto
@@ -44,6 +44,7 @@ message DAGSubmittedProto {
   optional DAGPlan dag_plan = 2;
   optional int64 submit_time = 3;
   optional string application_attempt_id = 4;
+  optional PlanLocalResourcesProto cumulative_additional_am_resources = 5;
 }
 
 message DAGInitializedProto {
@@ -183,4 +184,4 @@ message SummaryEventProto {
 message VertexFinishStateProto {
   optional string vertex_id = 1;
   optional int32 state = 2;
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/34a557d1/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
index 60c7b31..4e790e6 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
@@ -77,6 +77,7 @@ import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
 import org.apache.tez.dag.app.dag.event.DAGEvent;
 import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdate;
 import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdate.UpdateType;
+import org.apache.tez.dag.app.dag.event.DAGEventStartDag;
 import org.apache.tez.dag.app.dag.event.DAGEventType;
 import org.apache.tez.dag.app.dag.event.DAGEventVertexCompleted;
 import org.apache.tez.dag.app.dag.event.TaskEvent;
@@ -654,7 +655,7 @@ public class TestDAGImpl {
   @SuppressWarnings("unchecked")
   private void startDAG(DAGImpl impl) {
     dispatcher.getEventHandler().handle(
-        new DAGEvent(impl.getID(), DAGEventType.DAG_START));
+        new DAGEventStartDag(impl.getID(), null));
     dispatcher.await();
     Assert.assertEquals(DAGState.RUNNING, impl.getState());
   }
@@ -1043,7 +1044,7 @@ public class TestDAGImpl {
   @Test(timeout = 5000)
   public void testInvalidEvent() {
     dispatcher.getEventHandler().handle(
-        new DAGEvent(dagId, DAGEventType.DAG_START));
+        new DAGEventStartDag(dagId, null));
     dispatcher.await();
     Assert.assertEquals(DAGState.ERROR, dag.getState());
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/34a557d1/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
index b9cae70..e533ffd 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.tez.common.RuntimeUtils;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.EdgeManagerDescriptor;
 import org.apache.tez.dag.api.VertexLocationHint;
@@ -42,7 +43,6 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto;
-import org.apache.tez.runtime.RuntimeUtils;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.impl.EventMetaData;
 import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
@@ -141,7 +141,7 @@ public class TestHistoryEventsProtoConversion {
         ApplicationId.newInstance(0, 1), 1), 1001l,
         DAGPlan.newBuilder().setName("foo").build(),
         ApplicationAttemptId.newInstance(
-            ApplicationId.newInstance(0, 1), 1));
+            ApplicationId.newInstance(0, 1), 1), null);
     DAGSubmittedEvent deserializedEvent = (DAGSubmittedEvent)
         testProtoConversion(event);
     Assert.assertEquals(event.getApplicationAttemptId(),

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/34a557d1/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java
index 6a215de..6e94c22 100644
--- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.common.RuntimeUtils;
 import org.apache.tez.dag.api.TezUncheckedException;
 
 import com.google.common.base.Preconditions;
@@ -90,15 +91,11 @@ public class TezGroupedSplitsInputFormat<K, V>
       }
     }
   }
-  
+
   static Class<?> getClassFromName(String name) {
-    try {
-      return Class.forName(name);
-    } catch (ClassNotFoundException e1) {
-      throw new TezUncheckedException(e1);
-    }
+    return RuntimeUtils.getClazz(name);
   }
-  
+
   public class TezGroupedSplitsRecordReader implements RecordReader<K, V> {
 
     TezGroupedSplit groupedSplit;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/34a557d1/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java
index 874d9a4..5fa3e79 100644
--- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.common.RuntimeUtils;
 import org.apache.tez.dag.api.TezUncheckedException;
 
 import com.google.common.base.Preconditions;
@@ -127,11 +128,7 @@ public class TezGroupedSplitsInputFormat<K, V> extends InputFormat<K, V>
   }
   
   static Class<?> getClassFromName(String name) {
-    try {
-      return Class.forName(name);
-    } catch (ClassNotFoundException e1) {
-      throw new TezUncheckedException(e1);
-    }
+    return RuntimeUtils.getClazz(name);
   }
   
   public class TezGroupedSplitsRecordReader  extends RecordReader<K, V> {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/34a557d1/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
index d2d9b62..d211a62 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
@@ -42,6 +42,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.tez.common.RuntimeUtils;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezUncheckedException;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/34a557d1/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeUtils.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeUtils.java
deleted file mode 100644
index d414f31..0000000
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeUtils.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.runtime;
-
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.tez.dag.api.TezException;
-import org.apache.tez.dag.api.TezUncheckedException;
-
-public class RuntimeUtils {
-  
-  private static final Map<String, Class<?>> CLAZZ_CACHE = new ConcurrentHashMap<String, Class<?>>();
-
-  @Private
-  public static Class<?> getClazz(String className) {
-    Class<?> clazz = CLAZZ_CACHE.get(className);
-    if (clazz == null) {
-      try {
-        clazz = Class.forName(className);
-      } catch (ClassNotFoundException e) {
-        throw new TezUncheckedException("Unable to load class: " + className, e);
-      }
-    }
-    return clazz;
-  }
-
-  private static <T> T getNewInstance(Class<T> clazz) {
-    T instance;
-    try {
-      instance = clazz.newInstance();
-    } catch (InstantiationException e) {
-      throw new TezUncheckedException(
-          "Unable to instantiate class with 0 arguments: " + clazz.getName(), e);
-    } catch (IllegalAccessException e) {
-      throw new TezUncheckedException(
-          "Unable to instantiate class with 0 arguments: " + clazz.getName(), e);
-    }
-    return instance;
-  }
-
-  @Private
-  public static <T> T createClazzInstance(String className) {
-    Class<?> clazz = getClazz(className);
-    @SuppressWarnings("unchecked")
-    T instance = (T) getNewInstance(clazz);
-    return instance;
-  }
-  
-  
-  @Private
-  public static synchronized void addResourcesToClasspath(List<URL> urls) {
-    ClassLoader classLoader = new URLClassLoader(urls.toArray(new URL[urls.size()]), Thread
-        .currentThread().getContextClassLoader());
-    Thread.currentThread().setContextClassLoader(classLoader);
-  }
-  
-  
-  // Parameters for addResourcesToSystemClassLoader
-  private static final Class<?>[] parameters = new Class[]{URL.class};
-  private static Method sysClassLoaderMethod = null;
-  
-  
-  @Private  
-  public static synchronized void addResourcesToSystemClassLoader(List<URL> urls) throws TezException {
-    URLClassLoader sysLoader = (URLClassLoader)ClassLoader.getSystemClassLoader();
-    
-    if (sysClassLoaderMethod == null) {
-      Class<?> sysClass = URLClassLoader.class;
-      Method method;
-      try {
-        method = sysClass.getDeclaredMethod("addURL", parameters);
-      } catch (SecurityException e) {
-        throw new TezException("Failed to get handle on method addURL", e);
-      } catch (NoSuchMethodException e) {
-        throw new TezException("Failed to get handle on method addURL", e);
-      }
-      method.setAccessible(true);
-      sysClassLoaderMethod = method;
-    }
-    for (URL url : urls) {
-      try {
-        sysClassLoaderMethod.invoke(sysLoader, new Object[] { url });
-      } catch (IllegalArgumentException e) {
-        throw new TezException("Failed to invoke addURL for rsrc: " + url, e);
-      } catch (IllegalAccessException e) {
-        throw new TezException("Failed to invoke addURL for rsrcs: " + urls, e);
-      } catch (InvocationTargetException e) {
-        throw new TezException("Failed to invoke addURL for rsrcs: " + urls, e);
-      }
-    }
-  }
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/34a557d1/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestRuntimeUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestRuntimeUtils.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestRuntimeUtils.java
index 5f76afa..f374cc0 100644
--- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestRuntimeUtils.java
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestRuntimeUtils.java
@@ -29,6 +29,7 @@ import java.util.Collections;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.tez.common.RuntimeUtils;
 import org.apache.tez.dag.api.TezException;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/34a557d1/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
index 334ebb4..2db972f 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
@@ -26,6 +26,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.RuntimeUtils;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.EdgeManager;
 import org.apache.tez.dag.api.EdgeManagerContext;
@@ -37,7 +38,6 @@ import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.VertexLocationHint;
 import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
 import org.apache.tez.dag.api.VertexManagerPluginContext;
-import org.apache.tez.runtime.RuntimeUtils;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.events.VertexManagerEvent;
 import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.VertexManagerEventPayloadProto;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/34a557d1/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
index bb71dfe..2c7661f 100644
--- a/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
+++ b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
@@ -21,6 +21,7 @@ package org.apache.tez.mapreduce;
 import java.io.IOException;
 import java.io.OutputStreamWriter;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Random;
 
@@ -53,6 +54,7 @@ import org.apache.tez.client.TezClientUtils;
 import org.apache.tez.client.TezSession;
 import org.apache.tez.client.TezSessionConfiguration;
 import org.apache.tez.client.TezSessionStatus;
+import org.apache.tez.common.RuntimeUtils;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.Edge;
 import org.apache.tez.dag.api.EdgeProperty;
@@ -83,7 +85,10 @@ import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
 import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
 import org.apache.tez.mapreduce.processor.map.MapProcessor;
 import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
+import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRInputUserPayloadProto;
+import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.TezRootInputInitializer;
+import org.apache.tez.runtime.api.TezRootInputInitializerContext;
 import org.apache.tez.runtime.library.input.ShuffledMergedInputLegacy;
 import org.apache.tez.runtime.library.output.OnFileSortedOutput;
 import org.apache.tez.test.MiniTezCluster;
@@ -92,6 +97,7 @@ import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+
 public class TestMRRJobsDAGApi {
 
   private static final Log LOG = LogFactory.getLog(TestMRRJobsDAGApi.class);
@@ -121,6 +127,7 @@ public class TestMRRJobsDAGApi {
           1, 1, 1);
       Configuration conf = new Configuration();
       conf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS
+      conf.setInt("yarn.nodemanager.delete.debug-delay-sec", 20000);
       mrrTezCluster.init(conf);
       mrrTezCluster.start();
     }
@@ -172,6 +179,92 @@ public class TestMRRJobsDAGApi {
     Assert.assertEquals(DAGStatus.State.SUCCEEDED, finalState);
   }
 
+  // Submit 2 jobs via RPC using a custom initializer. The second job is submitted with an
+  // additional local resource, which is verified by the initializer.
+  @Test(timeout = 120000)
+  public void testAMRelocalization() throws Exception {
+    Map<String, String> commonEnv = createCommonEnv();
+    Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String
+        .valueOf(new Random().nextInt(100000))));
+    remoteFs.mkdirs(remoteStagingDir);
+    TezConfiguration tezConf = new TezConfiguration(
+        mrrTezCluster.getConfig());
+    tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR,
+        remoteStagingDir.toString());
+
+    Map<String, LocalResource> amLocalResources =
+        new HashMap<String, LocalResource>();
+
+    AMConfiguration amConfig = new AMConfiguration(
+        commonEnv, amLocalResources,
+        tezConf, null);
+    TezSessionConfiguration tezSessionConfig =
+        new TezSessionConfiguration(amConfig, tezConf);
+    TezSession tezSession = new TezSession("testrelocalizationsession", tezSessionConfig);
+    tezSession.start();
+    Assert.assertEquals(TezSessionStatus.INITIALIZING,
+        tezSession.getSessionStatus());
+    // First job: no additional resources, so the initializer must not create its marker path.
+    State finalState = testMRRSleepJobDagSubmitCore(true, false, false,
+        tezSession, true, MRInputAMSplitGeneratorRelocalizationTest.class, null);
+    Assert.assertEquals(DAGStatus.State.SUCCEEDED, finalState);
+    Assert.assertFalse(remoteFs.exists(new Path("/tmp/relocalizationfilefound")));
+
+    // Start the second job with some additional resources.
+    // The bundled test_jar resource is copied to the remote FS as /tmp/test.jar first.
+    Path relocFilePath = new Path("/tmp/test.jar");
+    relocFilePath = remoteFs.makeQualified(relocFilePath);
+
+    java.net.URL url = Thread.currentThread().getContextClassLoader().getResource("test_jar");
+    FileSystem localFs = FileSystem.getLocal(tezConf);
+    Path localPath = new Path(url.toURI());
+    localPath = localFs.makeQualified(localPath);
+    LOG.info("Copying file from local path: " + localPath);
+
+    remoteFs.copyFromLocalFile(localPath, relocFilePath);
+
+    Map<String, LocalResource> additionalResources = new HashMap<String, LocalResource>();
+    additionalResources.put("test.jar", LocalResource.newInstance(
+        ConverterUtils.getYarnUrlFromPath(relocFilePath), LocalResourceType.FILE,
+        LocalResourceVisibility.PRIVATE, 0, 0));
+
+    Assert.assertEquals(TezSessionStatus.READY,
+        tezSession.getSessionStatus());
+    finalState = testMRRSleepJobDagSubmitCore(true, false, false,
+        tezSession, true, MRInputAMSplitGeneratorRelocalizationTest.class, additionalResources);
+    Assert.assertEquals(DAGStatus.State.SUCCEEDED, finalState);
+    Assert.assertEquals(TezSessionStatus.READY,
+        tezSession.getSessionStatus());
+    Assert.assertTrue(remoteFs.exists(new Path("/tmp/relocalizationfilefound")));
+
+    ApplicationId appId = tezSession.getApplicationId();
+    tezSession.stop();
+    Assert.assertEquals(TezSessionStatus.SHUTDOWN,
+        tezSession.getSessionStatus());
+
+    YarnClient yarnClient = YarnClient.createYarnClient();
+    yarnClient.init(mrrTezCluster.getConfig());
+    yarnClient.start();
+    try {
+      // Poll with a short pause (rather than spinning) until YARN reports a terminal state.
+      ApplicationReport appReport = yarnClient.getApplicationReport(appId);
+      while (appReport.getYarnApplicationState() != YarnApplicationState.FINISHED
+          && appReport.getYarnApplicationState() != YarnApplicationState.FAILED
+          && appReport.getYarnApplicationState() != YarnApplicationState.KILLED) {
+        Thread.sleep(100);
+        appReport = yarnClient.getApplicationReport(appId);
+      }
+
+      Assert.assertEquals(YarnApplicationState.FINISHED,
+          appReport.getYarnApplicationState());
+      Assert.assertEquals(FinalApplicationStatus.SUCCEEDED,
+          appReport.getFinalApplicationStatus());
+    } finally {
+      // Always release the client, even when an assertion above fails.
+      yarnClient.stop();
+    }
+  }
+  
   // Submits a DAG to AM via RPC after AM has started
   @Test(timeout = 120000)
   public void testMultipleMRRSleepJobViaSession() throws IOException,
@@ -199,12 +292,12 @@ public class TestMRRJobsDAGApi {
         tezSession.getSessionStatus());
 
     State finalState = testMRRSleepJobDagSubmitCore(true, false, false,
-        tezSession, false);
+        tezSession, false, null, null);
     Assert.assertEquals(DAGStatus.State.SUCCEEDED, finalState);
     Assert.assertEquals(TezSessionStatus.READY,
         tezSession.getSessionStatus());
     finalState = testMRRSleepJobDagSubmitCore(true, false, false,
-        tezSession, false);
+        tezSession, false, null, null);
     Assert.assertEquals(DAGStatus.State.SUCCEEDED, finalState);
     Assert.assertEquals(TezSessionStatus.READY,
         tezSession.getSessionStatus());
@@ -269,7 +362,7 @@ public class TestMRRJobsDAGApi {
       InterruptedException, TezException, ClassNotFoundException,
       YarnException {
     return testMRRSleepJobDagSubmitCore(dagViaRPC, killDagWhileRunning,
-        closeSessionBeforeSubmit, null, genSplitsInAM);
+        closeSessionBeforeSubmit, null, genSplitsInAM, null, null);
   }
 
   private Map<String, String> createCommonEnv() {
@@ -282,7 +375,9 @@ public class TestMRRJobsDAGApi {
       boolean killDagWhileRunning,
       boolean closeSessionBeforeSubmit,
       TezSession reUseTezSession,
-      boolean genSplitsInAM) throws IOException,
+      boolean genSplitsInAM,
+      Class<? extends TezRootInputInitializer> initializerClass,
+      Map<String, LocalResource> additionalLocalResources) throws IOException,
       InterruptedException, TezException, ClassNotFoundException,
       YarnException {
     LOG.info("\n\n\nStarting testMRRSleepJobDagSubmit().");
@@ -358,8 +453,10 @@ public class TestMRRJobsDAGApi {
     
     DAG dag = new DAG("testMRRSleepJobDagSubmit");
     int stage1NumTasks = genSplitsInAM ? -1 : inputSplitInfo.getNumTasks();
-    Class<? extends TezRootInputInitializer> inputInitializerClazz = genSplitsInAM ? MRInputAMSplitGenerator.class
+    Class<? extends TezRootInputInitializer> inputInitializerClazz =
+        genSplitsInAM ? (initializerClass == null ? MRInputAMSplitGenerator.class : initializerClass)
         : null;
+    LOG.info("Using initializer class: " + inputInitializerClazz);
     Vertex stage1Vertex = new Vertex("map", new ProcessorDescriptor(
         MapProcessor.class.getName()).setUserPayload(stage1Payload),
         stage1NumTasks, Resource.newInstance(256, 1));
@@ -496,7 +593,7 @@ public class TestMRRJobsDAGApi {
     if(dagViaRPC) {
       LOG.info("Submitting dag to tez session with appId="
           + tezSession.getApplicationId());
-      dagClient = tezSession.submitDAG(dag);
+      dagClient = tezSession.submitDAG(dag, additionalLocalResources);
       Assert.assertEquals(TezSessionStatus.RUNNING,
           tezSession.getSessionStatus());
     }
@@ -556,4 +653,26 @@ public class TestMRRJobsDAGApi {
       throw new TezUncheckedException("VertexGroups Test Failed");
     }
   }
+
+  // Creates /tmp/relocalizationfilefound if org.apache.tez.test.Test is loadable. Do not use this
+  // class in more than one test per run, since the path it writes to is not dynamic.
+  public static class MRInputAMSplitGeneratorRelocalizationTest extends MRInputAMSplitGenerator {
+    @Override
+    public List<Event> initialize(TezRootInputInitializerContext rootInputContext)  throws Exception {
+      MRInputUserPayloadProto userPayloadProto = MRHelpers
+          .parseMRInputPayload(rootInputContext.getUserPayload());
+      Configuration conf = MRHelpers.createConfFromByteString(userPayloadProto
+          .getConfigurationBytes());
+      // Probe for a class that testAMRelocalization only expects after adding test.jar.
+      try {
+        RuntimeUtils.getClazz("org.apache.tez.test.Test");
+        LOG.info("Class found");
+        FileSystem fs = FileSystem.get(conf);
+        fs.mkdirs(new Path("/tmp/relocalizationfilefound"));
+      } catch (TezUncheckedException e) {
+        LOG.info("Class not found");
+      }
+      return super.initialize(rootInputContext);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/34a557d1/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
index ebbc3c1..dd2fd75 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
@@ -141,6 +141,5 @@ public class TestTezJobs {
     assertNotNull(dagStatus.getDAGCounters().getGroup(FileSystemCounter.class.getName()));
     assertNotNull(dagStatus.getDAGCounters().findCounter(TaskCounter.GC_TIME_MILLIS));
     ExampleDriver.printDAGStatus(dagClient, new String[] { "SleepVertex" }, true, true);
-
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/34a557d1/tez-tests/src/test/resources/test_jar
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/resources/test_jar b/tez-tests/src/test/resources/test_jar
new file mode 100644
index 0000000..47414a2
Binary files /dev/null and b/tez-tests/src/test/resources/test_jar differ