You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by je...@apache.org on 2016/08/12 21:45:46 UTC

tez git commit: TEZ-3408. Allow Task Output Files to reside in DAG specific directories for Custom Shuffle Handler (Kuhu Shukla via jeagles)

Repository: tez
Updated Branches:
  refs/heads/TEZ-3334 53ea6f5b3 -> 0e1d27743


TEZ-3408. Allow Task Output Files to reside in DAG specific directories for Custom Shuffle Handler (Kuhu Shukla via jeagles)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/0e1d2774
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/0e1d2774
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/0e1d2774

Branch: refs/heads/TEZ-3334
Commit: 0e1d2774307b196a9e763d52327570d8306630cb
Parents: 53ea6f5
Author: Jonathan Eagles <je...@yahoo-inc.com>
Authored: Fri Aug 12 16:45:26 2016 -0500
Committer: Jonathan Eagles <je...@yahoo-inc.com>
Committed: Fri Aug 12 16:45:26 2016 -0500

----------------------------------------------------------------------
 TEZ-3334-CHANGES.txt                            |  1 +
 .../processor/map/TestMapProcessor.java         |  4 +-
 .../apache/tez/auxservices/ShuffleHandler.java  | 60 ++++++++++-----
 .../tez/auxservices/TestShuffleHandler.java     | 78 ++++++++++++--------
 .../tez/runtime/library/common/Constants.java   |  1 +
 .../runtime/library/common/TezRuntimeUtils.java | 12 ++-
 .../runtime/library/common/shuffle/Fetcher.java | 14 +++-
 .../library/common/shuffle/ShuffleUtils.java    |  6 ++
 .../impl/SimpleFetchedInputAllocator.java       |  9 ++-
 .../orderedgrouped/FetcherOrderedGrouped.java   | 14 +++-
 .../shuffle/orderedgrouped/MergeManager.java    |  6 +-
 .../common/task/local/output/TezTaskOutput.java |  6 +-
 .../task/local/output/TezTaskOutputFiles.java   | 28 ++++---
 .../runtime/library/input/UnorderedKVInput.java |  4 +-
 .../impl/TestSimpleFetchedInputAllocator.java   |  5 +-
 .../TestUnorderedPartitionedKVWriter.java       |  9 ++-
 16 files changed, 175 insertions(+), 82 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/0e1d2774/TEZ-3334-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-3334-CHANGES.txt b/TEZ-3334-CHANGES.txt
index 2122cca..749edfe 100644
--- a/TEZ-3334-CHANGES.txt
+++ b/TEZ-3334-CHANGES.txt
@@ -4,6 +4,7 @@ Apache Tez Change Log
 INCOMPATIBLE CHANGES:
 
 ALL CHANGES:
+  TEZ-3408. Allow Task Output Files to reside in DAG specific directories for Custom Shuffle Handler
   TEZ-3238. Shuffle service name should be configureable and should not be hardcoded to ‘mapreduce_shuffle’
   TEZ-3390. Package Shuffle Handler as a shaded uber-jar
   TEZ-3378. Move Shuffle Handler configuration into the Tez namespace

http://git-wip-us.apache.org/repos/asf/tez/blob/0e1d2774/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
index 70f8763..b8f989c 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
@@ -154,7 +154,9 @@ public class TestMapProcessor {
     task.close();
     
     OutputContext outputContext = task.getOutputContexts().iterator().next();
-    TezTaskOutput mapOutputs = new TezTaskOutputFiles(jobConf, outputContext.getUniqueIdentifier());
+    TezTaskOutput mapOutputs = new TezTaskOutputFiles(
+        jobConf, outputContext.getUniqueIdentifier(),
+        outputContext.getDagIdentifier());
     
     
     // TODO NEWTEZ FIXME OutputCommitter verification

http://git-wip-us.apache.org/repos/asf/tez/blob/0e1d2774/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java
index af50cbf..d11dd2c 100644
--- a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java
+++ b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java
@@ -306,14 +306,16 @@ public class ShuffleHandler extends AuxiliaryService {
     private String user;
     private Map<String, Shuffle.MapOutputInfo> infoMap;
     private String jobId;
+    private String dagId;
 
     public ReduceContext(List<String> mapIds, int rId,
                          ChannelHandlerContext context, String usr,
                          Map<String, Shuffle.MapOutputInfo> mapOutputInfoMap,
-                         String jobId) {
+                         String jobId, String dagId) {
 
       this.mapIds = mapIds;
       this.reduceId = rId;
+      this.dagId = dagId;
       /**
       * Atomic count for tracking the no. of map outputs that are yet to
       * complete. Multiple futureListeners' operationComplete() can decrement
@@ -828,7 +830,7 @@ public class ShuffleHandler extends AuxiliaryService {
         @Override
         public AttemptPathInfo load(AttemptPathIdentifier key) throws
             Exception {
-          String base = getBaseLocation(key.jobId, key.user);
+          String base = getBaseLocation(key.jobId, key.dagId, key.user);
           String attemptBase = base + key.attemptId;
           Path indexFileName = lDirAlloc.getLocalPathToRead(
               attemptBase + "/" + INDEX_FILE_NAME, conf);
@@ -907,16 +909,18 @@ public class ShuffleHandler extends AuxiliaryService {
       final List<String> mapIds = splitMaps(q.get("map"));
       final List<String> reduceQ = q.get("reduce");
       final List<String> jobQ = q.get("job");
+      final List<String> dagIdQ = q.get("dag");
       if (LOG.isDebugEnabled()) {
         LOG.debug("RECV: " + request.getUri() +
             "\n  mapId: " + mapIds +
             "\n  reduceId: " + reduceQ +
             "\n  jobId: " + jobQ +
+            "\n  dagId: " + dagIdQ +
             "\n  keepAlive: " + keepAliveParam);
       }
 
-      if (mapIds == null || reduceQ == null || jobQ == null) {
-        sendError(ctx, "Required param job, map and reduce", BAD_REQUEST);
+      if (mapIds == null || reduceQ == null || jobQ == null || dagIdQ == null) {
+        sendError(ctx, "Required param job, dag, map and reduce", BAD_REQUEST);
         return;
       }
       if (reduceQ.size() != 1 || jobQ.size() != 1) {
@@ -933,9 +937,11 @@ public class ShuffleHandler extends AuxiliaryService {
       }
       int reduceId;
       String jobId;
+      String dagId;
       try {
         reduceId = Integer.parseInt(reduceQ.get(0));
         jobId = jobQ.get(0);
+        dagId = dagIdQ.get(0);
       } catch (NumberFormatException e) {
         sendError(ctx, "Bad reduce parameter", BAD_REQUEST);
         return;
@@ -965,7 +971,7 @@ public class ShuffleHandler extends AuxiliaryService {
       String user = userRsrc.get(jobId);
 
       try {
-        populateHeaders(mapIds, jobId, user, reduceId, request,
+        populateHeaders(mapIds, jobId, dagId, user, reduceId, request,
           response, keepAliveParam, mapOutputInfoMap);
       } catch(IOException e) {
         ch.write(response);
@@ -977,7 +983,7 @@ public class ShuffleHandler extends AuxiliaryService {
       ch.write(response);
       //Initialize one ReduceContext object per messageReceived call
       ReduceContext reduceContext = new ReduceContext(mapIds, reduceId, ctx,
-          user, mapOutputInfoMap, jobId);
+          user, mapOutputInfoMap, jobId, dagId);
       for (int i = 0; i < Math.min(maxSessionOpenFiles, mapIds.size()); i++) {
         ChannelFuture nextMap = sendMap(reduceContext);
         if(nextMap == null) {
@@ -1008,8 +1014,9 @@ public class ShuffleHandler extends AuxiliaryService {
         try {
           MapOutputInfo info = reduceContext.getInfoMap().get(mapId);
           if (info == null) {
-            info = getMapOutputInfo(mapId, reduceContext.getReduceId(),
-                reduceContext.getJobId(), reduceContext.getUser());
+            info = getMapOutputInfo(reduceContext.dagId, mapId,
+                reduceContext.getReduceId(), reduceContext.getJobId(),
+                reduceContext.getUser());
           }
           nextMap = sendMapOutput(
               reduceContext.getCtx(),
@@ -1041,7 +1048,7 @@ public class ShuffleHandler extends AuxiliaryService {
       return sb.toString();
     }
 
-    private String getBaseLocation(String jobId, String user) {
+    private String getBaseLocation(String jobId, String dagId, String user) {
       final JobID jobID = JobID.forName(jobId);
       final ApplicationId appID =
           ApplicationId.newInstance(Long.parseLong(jobID.getJtIdentifier()),
@@ -1049,16 +1056,17 @@ public class ShuffleHandler extends AuxiliaryService {
       final String baseStr =
           USERCACHE + "/" + user + "/"
               + APPCACHE + "/"
-              + appID.toString() + "/output" + "/";
+              + appID.toString() + "/dag_" + dagId + "/output" + "/";
       return baseStr;
     }
 
-    protected MapOutputInfo getMapOutputInfo(String mapId, int reduce,
-        String jobId, String user) throws IOException {
+    protected MapOutputInfo getMapOutputInfo(String dagId, String mapId,
+                                             int reduce, String jobId,
+                                             String user) throws IOException {
       AttemptPathInfo pathInfo;
       try {
         AttemptPathIdentifier identifier = new AttemptPathIdentifier(
-            jobId, user, mapId);
+            jobId, dagId, user, mapId);
         pathInfo = pathCache.get(identifier);
         if (LOG.isDebugEnabled()) {
           LOG.debug("Retrieved pathInfo for " + identifier +
@@ -1087,13 +1095,17 @@ public class ShuffleHandler extends AuxiliaryService {
     }
 
     protected void populateHeaders(List<String> mapIds, String jobId,
-        String user, int reduce, HttpRequest request, HttpResponse response,
-        boolean keepAliveParam, Map<String, MapOutputInfo> mapOutputInfoMap)
+                                   String dagId, String user,
+                                   int reduce, HttpRequest request,
+                                   HttpResponse response,
+                                   boolean keepAliveParam,
+                                   Map<String, MapOutputInfo> mapOutputInfoMap)
         throws IOException {
 
       long contentLength = 0;
       for (String mapId : mapIds) {
-        MapOutputInfo outputInfo = getMapOutputInfo(mapId, reduce, jobId, user);
+        MapOutputInfo outputInfo =
+            getMapOutputInfo(dagId, mapId, reduce, jobId, user);
         if (mapOutputInfoMap.size() < mapOutputMetaInfoCacheSize) {
           mapOutputInfoMap.put(mapId, outputInfo);
         }
@@ -1292,11 +1304,14 @@ public class ShuffleHandler extends AuxiliaryService {
 
   static class AttemptPathIdentifier {
     private final String jobId;
+    private final String dagId;
     private final String user;
     private final String attemptId;
 
-    public AttemptPathIdentifier(String jobId, String user, String attemptId) {
+    public AttemptPathIdentifier(String jobId, String dagID, String user,
+                                 String attemptId) {
       this.jobId = jobId;
+      this.dagId = dagID;
       this.user = user;
       this.attemptId = attemptId;
     }
@@ -1315,6 +1330,10 @@ public class ShuffleHandler extends AuxiliaryService {
       if (!attemptId.equals(that.attemptId)) {
         return false;
       }
+      if (dagId != that.dagId) {
+        return false;
+      }
+
       if (!jobId.equals(that.jobId)) {
         return false;
       }
@@ -1325,6 +1344,7 @@ public class ShuffleHandler extends AuxiliaryService {
     @Override
     public int hashCode() {
       int result = jobId.hashCode();
+      result = 31 * result + dagId.hashCode();
       result = 31 * result + attemptId.hashCode();
       return result;
     }
@@ -1332,8 +1352,10 @@ public class ShuffleHandler extends AuxiliaryService {
     @Override
     public String toString() {
       return "AttemptPathIdentifier{" +
-          "attemptId='" + attemptId + '\'' +
-          ", jobId='" + jobId + '\'' +
+          "jobId='" + jobId + '\'' +
+          ", dagId=" + dagId +
+          ", user='" + user + '\'' +
+          ", attemptId='" + attemptId + '\'' +
           '}';
     }
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/0e1d2774/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java b/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java
index c2bf361..31e32b4 100644
--- a/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java
+++ b/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java
@@ -99,7 +99,6 @@ import org.slf4j.LoggerFactory;
 public class TestShuffleHandler {
   static final long MiB = 1024 * 1024;
   private static final Logger LOG = LoggerFactory.getLogger(TestShuffleHandler.class);
-
   class MockShuffleHandler extends org.apache.tez.auxservices.ShuffleHandler {
     @Override
     protected Shuffle getShuffle(final Configuration conf) {
@@ -110,15 +109,19 @@ public class TestShuffleHandler {
             throws IOException {
         }
         @Override
-        protected MapOutputInfo getMapOutputInfo(String mapId, int reduce,
-            String jobId, String user) throws IOException {
+        protected MapOutputInfo getMapOutputInfo(String dagId, String mapId,
+                                                 int reduce, String jobId,
+                                                 String user)
+            throws IOException {
           // Do nothing.
           return null;
         }
         @Override
         protected void populateHeaders(List<String> mapIds, String jobId,
-            String user, int reduce, HttpRequest request,
-            HttpResponse response, boolean keepAliveParam,
+                                       String dagId, String user, int reduce,
+                                       HttpRequest request,
+                                       HttpResponse response,
+                                       boolean keepAliveParam,
             Map<String, MapOutputInfo> infoMap) throws IOException {
           // Do nothing.
         }
@@ -232,14 +235,18 @@ public class TestShuffleHandler {
         // replace the shuffle handler with one stubbed for testing
         return new Shuffle(conf) {
           @Override
-          protected MapOutputInfo getMapOutputInfo(String mapId, int reduce,
-              String jobId, String user) throws IOException {
+          protected MapOutputInfo getMapOutputInfo(String dagId, String mapId,
+                                                   int reduce, String jobId,
+                                                   String user)
+              throws IOException {
             return null;
           }
           @Override
           protected void populateHeaders(List<String> mapIds, String jobId,
-              String user, int reduce, HttpRequest request,
-              HttpResponse response, boolean keepAliveParam,
+                                         String dagId, String user, int reduce,
+                                         HttpRequest request,
+                                         HttpResponse response,
+                                         boolean keepAliveParam,
               Map<String, MapOutputInfo> infoMap) throws IOException {
             // Only set response headers and skip everything else
             // send some dummy value for content-length
@@ -294,7 +301,7 @@ public class TestShuffleHandler {
     // then closing the connection
     URL url = new URL("http://127.0.0.1:"
       + shuffleHandler.getConfig().get(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY)
-      + "/mapOutput?job=job_12345_1&reduce=1&map=attempt_12345_1_m_1_0");
+      + "/mapOutput?job=job_12345_1&dag=1&reduce=1&map=attempt_12345_1_m_1_0");
     HttpURLConnection conn = (HttpURLConnection)url.openConnection();
     conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME,
         ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
@@ -327,8 +334,10 @@ public class TestShuffleHandler {
         // replace the shuffle handler with one stubbed for testing
         return new Shuffle(conf) {
           @Override
-          protected MapOutputInfo getMapOutputInfo(String mapId, int reduce,
-              String jobId, String user) throws IOException {
+          protected MapOutputInfo getMapOutputInfo(String dagId, String mapId,
+                                                   int reduce,
+                                                   String jobId, String user)
+              throws IOException {
             return null;
           }
           @Override
@@ -339,9 +348,12 @@ public class TestShuffleHandler {
 
           @Override
           protected void populateHeaders(List<String> mapIds, String jobId,
-              String user, int reduce, HttpRequest request,
-              HttpResponse response, boolean keepAliveParam,
-              Map<String, MapOutputInfo> infoMap) throws IOException {
+                                         String dagId, String user,
+                                         int reduce, HttpRequest request,
+                                         HttpResponse response,
+                                         boolean keepAliveParam,
+                                         Map<String, MapOutputInfo> infoMap)
+              throws IOException {
             // Send some dummy data (populate content length details)
             ShuffleHeader header =
                 new ShuffleHeader("attempt_12345_1_m_1_0", 5678, 5678, 1);
@@ -409,7 +421,7 @@ public class TestShuffleHandler {
             + shuffleHandler.getConfig().get(
               ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY);
     URL url =
-        new URL(shuffleBaseURL + "/mapOutput?job=job_12345_1&reduce=1&"
+        new URL(shuffleBaseURL + "/mapOutput?job=job_12345_1&dag=1&reduce=1&"
             + "map=attempt_12345_1_m_1_0");
     HttpURLConnection conn = (HttpURLConnection) url.openConnection();
     conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME,
@@ -429,7 +441,7 @@ public class TestShuffleHandler {
 
     // For keepAlive via URL
     url =
-        new URL(shuffleBaseURL + "/mapOutput?job=job_12345_1&reduce=1&"
+        new URL(shuffleBaseURL + "/mapOutput?job=job_12345_1&dag=1&reduce=1&"
             + "map=attempt_12345_1_m_1_0&keepAlive=true");
     conn = (HttpURLConnection) url.openConnection();
     conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME,
@@ -465,7 +477,7 @@ public class TestShuffleHandler {
               + shuffleHandler.getConfig().get(
                 ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY);
       URL url =
-          new URL(shuffleBaseURL + "/mapOutput?job=job_12345_1&reduce=1&"
+          new URL(shuffleBaseURL + "/mapOutput?job=job_12345_1&dag=1&reduce=1&"
               + "map=attempt_12345_1_m_1_0");
       conn = (HttpURLConnection) url.openConnection();
       conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME,
@@ -503,7 +515,7 @@ public class TestShuffleHandler {
     // then closing the connection
     URL url = new URL("http://127.0.0.1:"
       + shuffleHandler.getConfig().get(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY)
-      + "/mapOutput?job=job_12345_1&reduce=1&map=attempt_12345_1_m_1_0");
+      + "/mapOutput?job=job_12345_1&&dag=1reduce=1&map=attempt_12345_1_m_1_0");
     for (int i = 0; i < failureNum; ++i) {
       HttpURLConnection conn = (HttpURLConnection)url.openConnection();
       conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME,
@@ -536,15 +548,19 @@ public class TestShuffleHandler {
         // replace the shuffle handler with one stubbed for testing
         return new Shuffle(conf) {
           @Override
-          protected MapOutputInfo getMapOutputInfo(String mapId, int reduce,
-              String jobId, String user) throws IOException {
+          protected MapOutputInfo getMapOutputInfo(String dagId, String mapId,
+                                                   int reduce, String jobId,
+                                                   String user)
+              throws IOException {
             // Do nothing.
             return null;
           }
           @Override
           protected void populateHeaders(List<String> mapIds, String jobId,
-              String user, int reduce, HttpRequest request,
-              HttpResponse response, boolean keepAliveParam,
+                                         String dagId, String user, int reduce,
+                                         HttpRequest request,
+                                         HttpResponse response,
+                                         boolean keepAliveParam,
               Map<String, MapOutputInfo> infoMap) throws IOException {
             // Do nothing.
           }
@@ -585,7 +601,7 @@ public class TestShuffleHandler {
     for (int i = 0; i < connAttempts; i++) {
       String URLstring = "http://127.0.0.1:"
            + shuffleHandler.getConfig().get(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY)
-           + "/mapOutput?job=job_12345_1&reduce=1&map=attempt_12345_1_m_"
+           + "/mapOutput?job=job_12345_1&dag=1&reduce=1&map=attempt_12345_1_m_"
            + i + "_0";
       URL url = new URL(URLstring);
       conns[i] = (HttpURLConnection)url.openConnection();
@@ -685,7 +701,7 @@ public class TestShuffleHandler {
               "http://127.0.0.1:"
                   + shuffleHandler.getConfig().get(
                       ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY)
-                  + "/mapOutput?job=job_12345_0001&reduce=" + reducerId
+                  + "/mapOutput?job=job_12345_0001&dag=1&reduce=" + reducerId
                   + "&map=attempt_12345_1_m_1_0");
       HttpURLConnection conn = (HttpURLConnection) url.openConnection();
       conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME,
@@ -722,7 +738,8 @@ public class TestShuffleHandler {
         StringUtils.join(Path.SEPARATOR,
             new String[] { logDir.getAbsolutePath(),
                 ShuffleHandler.USERCACHE, user,
-                ShuffleHandler.APPCACHE, appId, "output", appAttemptId });
+                ShuffleHandler.APPCACHE, appId,"dag_1/" + "output",
+                appAttemptId });
     File appAttemptDir = new File(attemptDir);
     appAttemptDir.mkdirs();
     System.out.println(appAttemptDir.getAbsolutePath());
@@ -924,7 +941,8 @@ public class TestShuffleHandler {
       Token<JobTokenIdentifier> jt) throws IOException {
     URL url = new URL("http://127.0.0.1:"
         + shuffle.getConfig().get(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY)
-        + "/mapOutput?job=job_12345_0001&reduce=0&map=attempt_12345_1_m_1_0");
+        + "/mapOutput?job=job_12345_0001&dag=1&reduce=0" +
+        "&map=attempt_12345_1_m_1_0");
     HttpURLConnection conn = (HttpURLConnection) url.openConnection();
     String encHash = SecureShuffleUtils.hashFromString(
         SecureShuffleUtils.buildMsgFrom(url),
@@ -967,7 +985,7 @@ public class TestShuffleHandler {
         return new Shuffle(conf) {
           @Override
           protected void populateHeaders(List<String> mapIds,
-              String outputBaseStr, String user, int reduce,
+              String outputBaseStr, String dagId, String user, int reduce,
               HttpRequest request, HttpResponse response,
               boolean keepAliveParam, Map<String, MapOutputInfo> infoMap)
               throws IOException {
@@ -1021,7 +1039,7 @@ public class TestShuffleHandler {
               "http://127.0.0.1:"
                   + shuffleHandler.getConfig().get(
                       ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY)
-                  + "/mapOutput?job=job_12345_0001&reduce=" + reducerId
+                  + "/mapOutput?job=job_12345_0001&dag=1&reduce=" + reducerId
                   + "&map=attempt_12345_1_m_1_0");
       HttpURLConnection conn = (HttpURLConnection) url.openConnection();
       conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME,
@@ -1116,7 +1134,7 @@ public class TestShuffleHandler {
     Mockito.doAnswer(new Answer() {
       @Override
       public Object answer(InvocationOnMock invocation) throws Throwable {
-        String uri = "/mapOutput?job=job_12345_1&reduce=1";
+        String uri = "/mapOutput?job=job_12345_1&dag=1&reduce=1";
         for (int i = 0; i < 100; i++)
           uri = uri.concat("&map=attempt_12345_1_m_" + i + "_0");
         return uri;

http://git-wip-us.apache.org/repos/asf/tez/blob/0e1d2774/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/Constants.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/Constants.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/Constants.java
index 827cafe..81921b2 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/Constants.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/Constants.java
@@ -25,6 +25,7 @@ public class Constants {
   // TODO NEWTEZ Check which of these constants are expecting specific pieces of information which are being removed - like taskAttemptId
   
   public static final String TEZ = "tez";
+  public static final String DAG_PREFIX = "dag_";
 
   public static final String MAP_OUTPUT_FILENAME_STRING = "file.out";
   public static final String MAP_OUTPUT_INDEX_SUFFIX_STRING = ".index";

http://git-wip-us.apache.org/repos/asf/tez/blob/0e1d2774/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java
index 819423f..c0b7210 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java
@@ -139,14 +139,18 @@ public class TezRuntimeUtils {
     }
     return partitioner;
   }
-  
-  public static TezTaskOutput instantiateTaskOutputManager(Configuration conf, OutputContext outputContext) {
+
+  public static TezTaskOutput instantiateTaskOutputManager(
+      Configuration conf, OutputContext outputContext) {
     Class<?> clazz = conf.getClass(Constants.TEZ_RUNTIME_TASK_OUTPUT_MANAGER,
         TezTaskOutputFiles.class);
     try {
-      Constructor<?> ctor = clazz.getConstructor(Configuration.class, String.class);
+      Constructor<?> ctor = clazz.getConstructor(Configuration.class, String
+          .class, int.class);
       ctor.setAccessible(true);
-      TezTaskOutput instance = (TezTaskOutput) ctor.newInstance(conf, outputContext.getUniqueIdentifier());
+      TezTaskOutput instance = (TezTaskOutput) ctor.newInstance(conf,
+          outputContext.getUniqueIdentifier(),
+          outputContext.getDagIdentifier());
       return instance;
     } catch (Exception e) {
       throw new TezUncheckedException(

http://git-wip-us.apache.org/repos/asf/tez/blob/0e1d2774/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
index 6cbff94..896f532 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
@@ -666,10 +666,16 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
     return idxRecord;
   }
 
-  private static final String getMapOutputFile(String pathComponent) {
-    return Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR + Path.SEPARATOR
-        + pathComponent + Path.SEPARATOR
-        + Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING;
+  private final String getMapOutputFile(String pathComponent) {
+    String outputPath = Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR + Path.SEPARATOR +
+        pathComponent + Path.SEPARATOR +
+        Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING;
+
+    if(ShuffleUtils.isTezShuffleHandler(conf)) {
+      return Constants.DAG_PREFIX + this.dagIdentifier + Path.SEPARATOR +
+          outputPath;
+    }
+    return outputPath;
   }
 
   @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/tez/blob/0e1d2774/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
index fa8533c..5d2444c 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
@@ -620,5 +620,11 @@ public class ShuffleUtils {
         sslFactory);
     return httpConnParams;
   }
+
+  public static boolean isTezShuffleHandler(Configuration config) {
+    return config.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
+        TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT).
+        contains("tez");
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/0e1d2774/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/SimpleFetchedInputAllocator.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/SimpleFetchedInputAllocator.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/SimpleFetchedInputAllocator.java
index 68c4781..f939cd1 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/SimpleFetchedInputAllocator.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/SimpleFetchedInputAllocator.java
@@ -65,15 +65,18 @@ public class SimpleFetchedInputAllocator implements FetchedInputAllocator,
   
   private volatile long usedMemory = 0;
 
-  public SimpleFetchedInputAllocator(String srcNameTrimmed, String uniqueIdentifier, Configuration conf,
-      long maxTaskAvailableMemory, long memoryAvailable) {
+  public SimpleFetchedInputAllocator(String srcNameTrimmed,
+                                     String uniqueIdentifier, int dagID,
+                                     Configuration conf,
+                                     long maxTaskAvailableMemory,
+                                     long memoryAvailable) {
     this.srcNameTrimmed = srcNameTrimmed;
     this.conf = conf;    
     this.maxAvailableTaskMemory = maxTaskAvailableMemory;
     this.initialMemoryAvailable = memoryAvailable;
     
     this.fileNameAllocator = new TezTaskOutputFiles(conf,
-        uniqueIdentifier);
+        uniqueIdentifier, dagID);
     this.localDirAllocator = new LocalDirAllocator(TezRuntimeFrameworkConfigs.LOCAL_DIRS);
     
     // Setup configuration

http://git-wip-us.apache.org/repos/asf/tez/blob/0e1d2774/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
index bcb75d2..b6599dc 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
@@ -717,9 +717,10 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
       throws IOException {
     LocalDirAllocator localDirAllocator = new LocalDirAllocator(TezRuntimeFrameworkConfigs.LOCAL_DIRS);
     suffix = suffix != null ? suffix : "";
-
-    String pathFromLocalDir = Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR + Path.SEPARATOR +
-        pathComponent + Path.SEPARATOR + Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING + suffix;
+    String outputPath = Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR + Path.SEPARATOR +
+        pathComponent + Path.SEPARATOR +
+        Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING + suffix;
+    String pathFromLocalDir = getPathForLocalDir(outputPath);
 
     return localDirAllocator.getLocalPathToRead(pathFromLocalDir.toString(), conf);
   }
@@ -750,5 +751,12 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
       remaining.put(id.toString(), id);
     }
   }
+
+  private String getPathForLocalDir(String suffix) {
+    if(ShuffleUtils.isTezShuffleHandler(conf)) {
+      return Constants.DAG_PREFIX + dagId + Path.SEPARATOR + suffix;
+    }
+    return suffix;
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/0e1d2774/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
index 26bdca7..a6f554c 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
@@ -177,8 +177,10 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
     this.reduceCombineInputCounter = reduceCombineInputCounter;
     this.spilledRecordsCounter = spilledRecordsCounter;
     this.mergedMapOutputsCounter = mergedMapOutputsCounter;
-    this.mapOutputFile = new TezTaskOutputFiles(conf, inputContext.getUniqueIdentifier());
-    
+    this.mapOutputFile = new TezTaskOutputFiles(conf,
+        inputContext.getUniqueIdentifier(),
+        inputContext.getDagIdentifier());
+
     this.localFS = localFS;
     this.rfs = ((LocalFileSystem)localFS).getRaw();
     

http://git-wip-us.apache.org/repos/asf/tez/blob/0e1d2774/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutput.java
index c41e4a6..414f3d0 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutput.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.tez.runtime.library.common.Constants;
 
 /**
  * Manipulate the working area for the transient store for components in tez-runtime-library
@@ -37,6 +38,7 @@ public abstract class TezTaskOutput {
 
   protected final Configuration conf;
   protected final String uniqueId;
+  protected final String dagId;
 
   /**
    * @param conf     the configuration from which local-dirs will be picked up
@@ -45,10 +47,12 @@ public abstract class TezTaskOutput {
    *                 container is used for multiple tasks, this id should be unique for inputs /
    *                 outputs spanning across tasks. This is also expected to be unique across all
    *                 tasks for a vertex.
+   * @param dagID    DAG identifier; combined with Constants.DAG_PREFIX to name the DAG-specific directory
    */
-  public TezTaskOutput(Configuration conf, String uniqueId) {
+  public TezTaskOutput(Configuration conf, String uniqueId, int dagID) {
     this.conf = conf;
     this.uniqueId = uniqueId;
+    this.dagId = Constants.DAG_PREFIX + dagID + Path.SEPARATOR;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/tez/blob/0e1d2774/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutputFiles.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutputFiles.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutputFiles.java
index 1e6fca3..97a2509 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutputFiles.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutputFiles.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
 import org.apache.tez.common.TezRuntimeFrameworkConfigs;
 import org.apache.tez.runtime.library.common.Constants;
+import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
 
 /**
  * Manipulate the working area for the transient store for components in tez-runtime-library
@@ -40,9 +41,9 @@ import org.apache.tez.runtime.library.common.Constants;
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
 public class TezTaskOutputFiles extends TezTaskOutput {
-  
-  public TezTaskOutputFiles(Configuration conf, String uniqueId) {
-    super(conf, uniqueId);
+
+  public TezTaskOutputFiles(Configuration conf, String uniqueId, int dagID) {
+    super(conf, uniqueId, dagID);
   }
 
   private static final Logger LOG = LoggerFactory.getLogger(TezTaskOutputFiles.class);
@@ -60,7 +61,8 @@ public class TezTaskOutputFiles extends TezTaskOutput {
     new LocalDirAllocator(TezRuntimeFrameworkConfigs.LOCAL_DIRS);
 
   /*
-   * ${appDir}/output/${uniqueId}
+   * if the shuffle service id does not contain "tez" (e.g. mapreduce_shuffle): "${appDir}/output/${uniqueId}"
+   * if the shuffle service id contains "tez" (e.g. tez_shuffle): "${appDir}/${DAG_PREFIX}${dagID}/output/${uniqueId}"
    */
   private Path getAttemptOutputDir() {
     if (LOG.isDebugEnabled()) {
@@ -68,7 +70,8 @@ public class TezTaskOutputFiles extends TezTaskOutput {
           + Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR + "/"
           + uniqueId);
     }
-    return new Path(Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR, uniqueId);
+    String dagPath = getDagOutputDir(Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR);
+    return new Path(dagPath, uniqueId);
   }
 
 
@@ -201,7 +204,8 @@ public class TezTaskOutputFiles extends TezTaskOutput {
   public Path getSpillFileForWrite(int spillNumber, long size)
       throws IOException {
     Preconditions.checkArgument(spillNumber >= 0, "Provide a valid spill number " + spillNumber);
-    Path taskAttemptDir = new Path(Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR,
+    String dagPath = getDagOutputDir(Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR);
+    Path taskAttemptDir = new Path(dagPath,
         String.format(SPILL_FILE_DIR_PATTERN, uniqueId, spillNumber));
     Path outputDir = new Path(taskAttemptDir, Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING);
     return lDirAlloc.getLocalPathForWrite(outputDir.toString(), size, conf);
@@ -222,8 +226,9 @@ public class TezTaskOutputFiles extends TezTaskOutput {
   public Path getSpillIndexFileForWrite(int spillNumber, long size)
       throws IOException {
     Preconditions.checkArgument(spillNumber >= 0, "Provide a valid spill number " + spillNumber);
-    Path taskAttemptDir = new Path(Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR,
-        String.format(SPILL_FILE_DIR_PATTERN, uniqueId, spillNumber));
+    String dagPath = getDagOutputDir(Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR);
+    Path taskAttemptDir = new Path(dagPath, String.format(
+        SPILL_FILE_DIR_PATTERN, uniqueId, spillNumber));
     Path outputDir = new Path(taskAttemptDir, Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING +
         Constants.TEZ_RUNTIME_TASK_OUTPUT_INDEX_SUFFIX_STRING);
     return lDirAlloc.getLocalPathForWrite(outputDir.toString(), size, conf);
@@ -247,7 +252,8 @@ public class TezTaskOutputFiles extends TezTaskOutput {
   @Override
   public Path getInputFileForWrite(int srcIdentifier,
       int spillNum, long size) throws IOException {
-    return lDirAlloc.getLocalPathForWrite(getSpillFileName(srcIdentifier, spillNum), size, conf);
+    String dagPath = getDagOutputDir(getSpillFileName(srcIdentifier, spillNum));
+    return lDirAlloc.getLocalPathForWrite(dagPath, size, conf);
   }
 
   /**
@@ -265,4 +271,8 @@ public class TezTaskOutputFiles extends TezTaskOutput {
   public String getSpillFileName(int srcId, int spillNum) {
     return String.format(SPILL_FILE_PATTERN, uniqueId, srcId, spillNum);
   }
+
+  private String getDagOutputDir(String child) {
+    return ShuffleUtils.isTezShuffleHandler(conf) ? dagId.concat(child) : child;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/0e1d2774/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
index ec9a191..2d6683a 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
@@ -134,7 +134,9 @@ public class UnorderedKVInput extends AbstractLogicalInput {
           TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_BUFFER_SIZE_DEFAULT);
 
       this.inputManager = new SimpleFetchedInputAllocator(
-          TezUtilsInternal.cleanVertexName(getContext().getSourceVertexName()), getContext().getUniqueIdentifier(), conf,
+          TezUtilsInternal.cleanVertexName(getContext().getSourceVertexName()),
+          getContext().getUniqueIdentifier(),
+          getContext().getDagIdentifier(), conf,
           getContext().getTotalMemoryAvailableToTask(),
           memoryUpdateCallbackHandler.getMemoryAssigned());
 

http://git-wip-us.apache.org/repos/asf/tez/blob/0e1d2774/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestSimpleFetchedInputAllocator.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestSimpleFetchedInputAllocator.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestSimpleFetchedInputAllocator.java
index 2f89b0f..1b63b17 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestSimpleFetchedInputAllocator.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestSimpleFetchedInputAllocator.java
@@ -52,8 +52,9 @@ public class TestSimpleFetchedInputAllocator {
     long inMemThreshold = (long) (bufferPercent * jvmMax);
     LOG.info("InMemThreshold: " + inMemThreshold);
 
-    SimpleFetchedInputAllocator inputManager = new SimpleFetchedInputAllocator("srcName", UUID.randomUUID().toString(),
-        conf, Runtime.getRuntime().maxMemory(), inMemThreshold);
+    SimpleFetchedInputAllocator inputManager = new SimpleFetchedInputAllocator(
+        "srcName", UUID.randomUUID().toString(), 123, conf,
+        Runtime.getRuntime().maxMemory(), inMemThreshold);
 
     long requestSize = (long) (0.4f * inMemThreshold);
     long compressedSize = 1l;

http://git-wip-us.apache.org/repos/asf/tez/blob/0e1d2774/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
index 4a0d1d5..031b44d 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
@@ -258,6 +258,7 @@ public class TestUnorderedPartitionedKVWriter {
     ApplicationId appId = ApplicationId.newInstance(10000000, 1);
     TezCounters counters = new TezCounters();
     String uniqueId = UUID.randomUUID().toString();
+    int dagId = 1;
     OutputContext outputContext = createMockOutputContext(counters, appId, uniqueId, defaultConf);
     Random random = new Random();
 
@@ -391,7 +392,7 @@ public class TestUnorderedPartitionedKVWriter {
 
     // Verify the data
     // Verify the actual data
-    TezTaskOutput taskOutput = new TezTaskOutputFiles(conf, uniqueId);
+    TezTaskOutput taskOutput = new TezTaskOutputFiles(conf, uniqueId, dagId);
     Path outputFilePath = kvWriter.finalOutPath;
     Path spillFilePath = kvWriter.finalIndexPath;
     if (numRecordsWritten > 0) {
@@ -526,6 +527,7 @@ public class TestUnorderedPartitionedKVWriter {
     ApplicationId appId = ApplicationId.newInstance(10000000, 1);
     TezCounters counters = new TezCounters();
     String uniqueId = UUID.randomUUID().toString();
+    int dagId = 1;
     OutputContext outputContext = createMockOutputContext(counters, appId, uniqueId, defaultConf);
 
     Configuration conf = createConfiguration(outputContext, IntWritable.class, LongWritable.class,
@@ -690,7 +692,7 @@ public class TestUnorderedPartitionedKVWriter {
     verify(outputContext, atLeast(1)).notifyProgress();
 
     // Verify if all spill files are available.
-    TezTaskOutput taskOutput = new TezTaskOutputFiles(conf, uniqueId);
+    TezTaskOutput taskOutput = new TezTaskOutputFiles(conf, uniqueId, dagId);
 
     if (numRecordsWritten > 0) {
       int numSpills = kvWriter.numSpills.get();
@@ -710,6 +712,7 @@ public class TestUnorderedPartitionedKVWriter {
     ApplicationId appId = ApplicationId.newInstance(10000000, 1);
     TezCounters counters = new TezCounters();
     String uniqueId = UUID.randomUUID().toString();
+    int dagId = 1;
     OutputContext outputContext = createMockOutputContext(counters, appId, uniqueId, defaultConf);
 
     Configuration conf = createConfiguration(outputContext, IntWritable.class, LongWritable.class,
@@ -847,7 +850,7 @@ public class TestUnorderedPartitionedKVWriter {
     }
 
     // Verify the actual data
-    TezTaskOutput taskOutput = new TezTaskOutputFiles(conf, uniqueId);
+    TezTaskOutput taskOutput = new TezTaskOutputFiles(conf, uniqueId, dagId);
     Path outputFilePath = kvWriter.finalOutPath;
     Path spillFilePath = kvWriter.finalIndexPath;