You are viewing a plain-text version of this content. (The link to the canonical version was lost when this page was converted to plain text.)
Posted to commits@tez.apache.org by je...@apache.org on 2016/08/12 21:45:46 UTC
tez git commit: TEZ-3408. Allow Task Output Files to reside in DAG
specific directories for Custom Shuffle Handler (Kuhu Shukla via jeagles)
Repository: tez
Updated Branches:
refs/heads/TEZ-3334 53ea6f5b3 -> 0e1d27743
TEZ-3408. Allow Task Output Files to reside in DAG specific directories for Custom Shuffle Handler (Kuhu Shukla via jeagles)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/0e1d2774
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/0e1d2774
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/0e1d2774
Branch: refs/heads/TEZ-3334
Commit: 0e1d2774307b196a9e763d52327570d8306630cb
Parents: 53ea6f5
Author: Jonathan Eagles <je...@yahoo-inc.com>
Authored: Fri Aug 12 16:45:26 2016 -0500
Committer: Jonathan Eagles <je...@yahoo-inc.com>
Committed: Fri Aug 12 16:45:26 2016 -0500
----------------------------------------------------------------------
TEZ-3334-CHANGES.txt | 1 +
.../processor/map/TestMapProcessor.java | 4 +-
.../apache/tez/auxservices/ShuffleHandler.java | 60 ++++++++++-----
.../tez/auxservices/TestShuffleHandler.java | 78 ++++++++++++--------
.../tez/runtime/library/common/Constants.java | 1 +
.../runtime/library/common/TezRuntimeUtils.java | 12 ++-
.../runtime/library/common/shuffle/Fetcher.java | 14 +++-
.../library/common/shuffle/ShuffleUtils.java | 6 ++
.../impl/SimpleFetchedInputAllocator.java | 9 ++-
.../orderedgrouped/FetcherOrderedGrouped.java | 14 +++-
.../shuffle/orderedgrouped/MergeManager.java | 6 +-
.../common/task/local/output/TezTaskOutput.java | 6 +-
.../task/local/output/TezTaskOutputFiles.java | 28 ++++---
.../runtime/library/input/UnorderedKVInput.java | 4 +-
.../impl/TestSimpleFetchedInputAllocator.java | 5 +-
.../TestUnorderedPartitionedKVWriter.java | 9 ++-
16 files changed, 175 insertions(+), 82 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/0e1d2774/TEZ-3334-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-3334-CHANGES.txt b/TEZ-3334-CHANGES.txt
index 2122cca..749edfe 100644
--- a/TEZ-3334-CHANGES.txt
+++ b/TEZ-3334-CHANGES.txt
@@ -4,6 +4,7 @@ Apache Tez Change Log
INCOMPATIBLE CHANGES:
ALL CHANGES:
+ TEZ-3408. Allow Task Output Files to reside in DAG specific directories for Custom Shuffle Handler
TEZ-3238. Shuffle service name should be configureable and should not be hardcoded to ‘mapreduce_shuffle’
TEZ-3390. Package Shuffle Handler as a shaded uber-jar
TEZ-3378. Move Shuffle Handler configuration into the Tez namespace
http://git-wip-us.apache.org/repos/asf/tez/blob/0e1d2774/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
index 70f8763..b8f989c 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
@@ -154,7 +154,9 @@ public class TestMapProcessor {
task.close();
OutputContext outputContext = task.getOutputContexts().iterator().next();
- TezTaskOutput mapOutputs = new TezTaskOutputFiles(jobConf, outputContext.getUniqueIdentifier());
+ TezTaskOutput mapOutputs = new TezTaskOutputFiles(
+ jobConf, outputContext.getUniqueIdentifier(),
+ outputContext.getDagIdentifier());
// TODO NEWTEZ FIXME OutputCommitter verification
http://git-wip-us.apache.org/repos/asf/tez/blob/0e1d2774/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java
index af50cbf..d11dd2c 100644
--- a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java
+++ b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java
@@ -306,14 +306,16 @@ public class ShuffleHandler extends AuxiliaryService {
private String user;
private Map<String, Shuffle.MapOutputInfo> infoMap;
private String jobId;
+ private String dagId;
public ReduceContext(List<String> mapIds, int rId,
ChannelHandlerContext context, String usr,
Map<String, Shuffle.MapOutputInfo> mapOutputInfoMap,
- String jobId) {
+ String jobId, String dagId) {
this.mapIds = mapIds;
this.reduceId = rId;
+ this.dagId = dagId;
/**
* Atomic count for tracking the no. of map outputs that are yet to
* complete. Multiple futureListeners' operationComplete() can decrement
@@ -828,7 +830,7 @@ public class ShuffleHandler extends AuxiliaryService {
@Override
public AttemptPathInfo load(AttemptPathIdentifier key) throws
Exception {
- String base = getBaseLocation(key.jobId, key.user);
+ String base = getBaseLocation(key.jobId, key.dagId, key.user);
String attemptBase = base + key.attemptId;
Path indexFileName = lDirAlloc.getLocalPathToRead(
attemptBase + "/" + INDEX_FILE_NAME, conf);
@@ -907,16 +909,18 @@ public class ShuffleHandler extends AuxiliaryService {
final List<String> mapIds = splitMaps(q.get("map"));
final List<String> reduceQ = q.get("reduce");
final List<String> jobQ = q.get("job");
+ final List<String> dagIdQ = q.get("dag");
if (LOG.isDebugEnabled()) {
LOG.debug("RECV: " + request.getUri() +
"\n mapId: " + mapIds +
"\n reduceId: " + reduceQ +
"\n jobId: " + jobQ +
+ "\n dagId: " + dagIdQ +
"\n keepAlive: " + keepAliveParam);
}
- if (mapIds == null || reduceQ == null || jobQ == null) {
- sendError(ctx, "Required param job, map and reduce", BAD_REQUEST);
+ if (mapIds == null || reduceQ == null || jobQ == null || dagIdQ == null) {
+ sendError(ctx, "Required param job, dag, map and reduce", BAD_REQUEST);
return;
}
if (reduceQ.size() != 1 || jobQ.size() != 1) {
@@ -933,9 +937,11 @@ public class ShuffleHandler extends AuxiliaryService {
}
int reduceId;
String jobId;
+ String dagId;
try {
reduceId = Integer.parseInt(reduceQ.get(0));
jobId = jobQ.get(0);
+ dagId = dagIdQ.get(0);
} catch (NumberFormatException e) {
sendError(ctx, "Bad reduce parameter", BAD_REQUEST);
return;
@@ -965,7 +971,7 @@ public class ShuffleHandler extends AuxiliaryService {
String user = userRsrc.get(jobId);
try {
- populateHeaders(mapIds, jobId, user, reduceId, request,
+ populateHeaders(mapIds, jobId, dagId, user, reduceId, request,
response, keepAliveParam, mapOutputInfoMap);
} catch(IOException e) {
ch.write(response);
@@ -977,7 +983,7 @@ public class ShuffleHandler extends AuxiliaryService {
ch.write(response);
//Initialize one ReduceContext object per messageReceived call
ReduceContext reduceContext = new ReduceContext(mapIds, reduceId, ctx,
- user, mapOutputInfoMap, jobId);
+ user, mapOutputInfoMap, jobId, dagId);
for (int i = 0; i < Math.min(maxSessionOpenFiles, mapIds.size()); i++) {
ChannelFuture nextMap = sendMap(reduceContext);
if(nextMap == null) {
@@ -1008,8 +1014,9 @@ public class ShuffleHandler extends AuxiliaryService {
try {
MapOutputInfo info = reduceContext.getInfoMap().get(mapId);
if (info == null) {
- info = getMapOutputInfo(mapId, reduceContext.getReduceId(),
- reduceContext.getJobId(), reduceContext.getUser());
+ info = getMapOutputInfo(reduceContext.dagId, mapId,
+ reduceContext.getReduceId(), reduceContext.getJobId(),
+ reduceContext.getUser());
}
nextMap = sendMapOutput(
reduceContext.getCtx(),
@@ -1041,7 +1048,7 @@ public class ShuffleHandler extends AuxiliaryService {
return sb.toString();
}
- private String getBaseLocation(String jobId, String user) {
+ private String getBaseLocation(String jobId, String dagId, String user) {
final JobID jobID = JobID.forName(jobId);
final ApplicationId appID =
ApplicationId.newInstance(Long.parseLong(jobID.getJtIdentifier()),
@@ -1049,16 +1056,17 @@ public class ShuffleHandler extends AuxiliaryService {
final String baseStr =
USERCACHE + "/" + user + "/"
+ APPCACHE + "/"
- + appID.toString() + "/output" + "/";
+ + appID.toString() + "/dag_" + dagId + "/output" + "/";
return baseStr;
}
- protected MapOutputInfo getMapOutputInfo(String mapId, int reduce,
- String jobId, String user) throws IOException {
+ protected MapOutputInfo getMapOutputInfo(String dagId, String mapId,
+ int reduce, String jobId,
+ String user) throws IOException {
AttemptPathInfo pathInfo;
try {
AttemptPathIdentifier identifier = new AttemptPathIdentifier(
- jobId, user, mapId);
+ jobId, dagId, user, mapId);
pathInfo = pathCache.get(identifier);
if (LOG.isDebugEnabled()) {
LOG.debug("Retrieved pathInfo for " + identifier +
@@ -1087,13 +1095,17 @@ public class ShuffleHandler extends AuxiliaryService {
}
protected void populateHeaders(List<String> mapIds, String jobId,
- String user, int reduce, HttpRequest request, HttpResponse response,
- boolean keepAliveParam, Map<String, MapOutputInfo> mapOutputInfoMap)
+ String dagId, String user,
+ int reduce, HttpRequest request,
+ HttpResponse response,
+ boolean keepAliveParam,
+ Map<String, MapOutputInfo> mapOutputInfoMap)
throws IOException {
long contentLength = 0;
for (String mapId : mapIds) {
- MapOutputInfo outputInfo = getMapOutputInfo(mapId, reduce, jobId, user);
+ MapOutputInfo outputInfo =
+ getMapOutputInfo(dagId, mapId, reduce, jobId, user);
if (mapOutputInfoMap.size() < mapOutputMetaInfoCacheSize) {
mapOutputInfoMap.put(mapId, outputInfo);
}
@@ -1292,11 +1304,14 @@ public class ShuffleHandler extends AuxiliaryService {
static class AttemptPathIdentifier {
private final String jobId;
+ private final String dagId;
private final String user;
private final String attemptId;
- public AttemptPathIdentifier(String jobId, String user, String attemptId) {
+ public AttemptPathIdentifier(String jobId, String dagId, String user,
+ String attemptId) {
this.jobId = jobId;
+ this.dagId = dagId; // DAG id taken from the request's "dag" query parameter
this.user = user;
this.attemptId = attemptId;
}
@@ -1315,6 +1330,10 @@ public class ShuffleHandler extends AuxiliaryService {
if (!attemptId.equals(that.attemptId)) {
return false;
}
+ if (dagId != that.dagId) {
+ return false;
+ }
+
if (!jobId.equals(that.jobId)) {
return false;
}
@@ -1325,6 +1344,7 @@ public class ShuffleHandler extends AuxiliaryService {
@Override
public int hashCode() {
int result = jobId.hashCode();
+ result = 31 * result + dagId.hashCode(); // NOTE(review): NPEs if dagId is null; equals() also compares dagId
result = 31 * result + attemptId.hashCode();
return result;
}
@@ -1332,8 +1352,10 @@ public class ShuffleHandler extends AuxiliaryService {
@Override
public String toString() {
return "AttemptPathIdentifier{" +
- "attemptId='" + attemptId + '\'' +
- ", jobId='" + jobId + '\'' +
+ "jobId='" + jobId + '\'' +
+ ", dagId='" + dagId + '\'' +
+ ", user='" + user + '\'' +
+ ", attemptId='" + attemptId + '\'' +
'}';
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/0e1d2774/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java b/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java
index c2bf361..31e32b4 100644
--- a/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java
+++ b/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java
@@ -99,7 +99,6 @@ import org.slf4j.LoggerFactory;
public class TestShuffleHandler {
static final long MiB = 1024 * 1024;
private static final Logger LOG = LoggerFactory.getLogger(TestShuffleHandler.class);
-
class MockShuffleHandler extends org.apache.tez.auxservices.ShuffleHandler {
@Override
protected Shuffle getShuffle(final Configuration conf) {
@@ -110,15 +109,19 @@ public class TestShuffleHandler {
throws IOException {
}
@Override
- protected MapOutputInfo getMapOutputInfo(String mapId, int reduce,
- String jobId, String user) throws IOException {
+ protected MapOutputInfo getMapOutputInfo(String dagId, String mapId,
+ int reduce, String jobId,
+ String user)
+ throws IOException { // test stub: ignores every argument, including dagId, and returns null
// Do nothing.
return null;
}
@Override
protected void populateHeaders(List<String> mapIds, String jobId,
- String user, int reduce, HttpRequest request,
- HttpResponse response, boolean keepAliveParam,
+ String dagId, String user, int reduce, // dagId accepted for the new signature; unused by this stub
+ HttpRequest request,
+ HttpResponse response,
+ boolean keepAliveParam,
Map<String, MapOutputInfo> infoMap) throws IOException {
// Do nothing.
}
@@ -232,14 +235,18 @@ public class TestShuffleHandler {
// replace the shuffle handler with one stubbed for testing
return new Shuffle(conf) {
@Override
- protected MapOutputInfo getMapOutputInfo(String mapId, int reduce,
- String jobId, String user) throws IOException {
+ protected MapOutputInfo getMapOutputInfo(String dagId, String mapId,
+ int reduce, String jobId,
+ String user)
+ throws IOException { // stub: no on-disk lookup; always returns null
return null;
}
@Override
protected void populateHeaders(List<String> mapIds, String jobId,
- String user, int reduce, HttpRequest request,
- HttpResponse response, boolean keepAliveParam,
+ String dagId, String user, int reduce,
+ HttpRequest request,
+ HttpResponse response,
+ boolean keepAliveParam,
Map<String, MapOutputInfo> infoMap) throws IOException {
// Only set response headers and skip everything else
// send some dummy value for content-length
@@ -294,7 +301,7 @@ public class TestShuffleHandler {
// then closing the connection
URL url = new URL("http://127.0.0.1:"
+ shuffleHandler.getConfig().get(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY)
- + "/mapOutput?job=job_12345_1&reduce=1&map=attempt_12345_1_m_1_0");
+ + "/mapOutput?job=job_12345_1&dag=1&reduce=1&map=attempt_12345_1_m_1_0");
HttpURLConnection conn = (HttpURLConnection)url.openConnection();
conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME,
ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
@@ -327,8 +334,10 @@ public class TestShuffleHandler {
// replace the shuffle handler with one stubbed for testing
return new Shuffle(conf) {
@Override
- protected MapOutputInfo getMapOutputInfo(String mapId, int reduce,
- String jobId, String user) throws IOException {
+ protected MapOutputInfo getMapOutputInfo(String dagId, String mapId,
+ int reduce,
+ String jobId, String user)
+ throws IOException { // stub: no on-disk lookup; always returns null
return null;
}
@Override
@@ -339,9 +348,12 @@ public class TestShuffleHandler {
@Override
protected void populateHeaders(List<String> mapIds, String jobId,
- String user, int reduce, HttpRequest request,
- HttpResponse response, boolean keepAliveParam,
- Map<String, MapOutputInfo> infoMap) throws IOException {
+ String dagId, String user,
+ int reduce, HttpRequest request,
+ HttpResponse response,
+ boolean keepAliveParam,
+ Map<String, MapOutputInfo> infoMap)
+ throws IOException {
// Send some dummy data (populate content length details)
ShuffleHeader header =
new ShuffleHeader("attempt_12345_1_m_1_0", 5678, 5678, 1);
@@ -409,7 +421,7 @@ public class TestShuffleHandler {
+ shuffleHandler.getConfig().get(
ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY);
URL url =
- new URL(shuffleBaseURL + "/mapOutput?job=job_12345_1&reduce=1&"
+ new URL(shuffleBaseURL + "/mapOutput?job=job_12345_1&dag=1&reduce=1&"
+ "map=attempt_12345_1_m_1_0");
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME,
@@ -429,7 +441,7 @@ public class TestShuffleHandler {
// For keepAlive via URL
url =
- new URL(shuffleBaseURL + "/mapOutput?job=job_12345_1&reduce=1&"
+ new URL(shuffleBaseURL + "/mapOutput?job=job_12345_1&dag=1&reduce=1&"
+ "map=attempt_12345_1_m_1_0&keepAlive=true");
conn = (HttpURLConnection) url.openConnection();
conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME,
@@ -465,7 +477,7 @@ public class TestShuffleHandler {
+ shuffleHandler.getConfig().get(
ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY);
URL url =
- new URL(shuffleBaseURL + "/mapOutput?job=job_12345_1&reduce=1&"
+ new URL(shuffleBaseURL + "/mapOutput?job=job_12345_1&dag=1&reduce=1&"
+ "map=attempt_12345_1_m_1_0");
conn = (HttpURLConnection) url.openConnection();
conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME,
@@ -503,7 +515,7 @@ public class TestShuffleHandler {
// then closing the connection
URL url = new URL("http://127.0.0.1:"
+ shuffleHandler.getConfig().get(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY)
- + "/mapOutput?job=job_12345_1&reduce=1&map=attempt_12345_1_m_1_0");
+ + "/mapOutput?job=job_12345_1&&dag=1reduce=1&map=attempt_12345_1_m_1_0");
for (int i = 0; i < failureNum; ++i) {
HttpURLConnection conn = (HttpURLConnection)url.openConnection();
conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME,
@@ -536,15 +548,19 @@ public class TestShuffleHandler {
// replace the shuffle handler with one stubbed for testing
return new Shuffle(conf) {
@Override
- protected MapOutputInfo getMapOutputInfo(String mapId, int reduce,
- String jobId, String user) throws IOException {
+ protected MapOutputInfo getMapOutputInfo(String dagId, String mapId,
+ int reduce, String jobId,
+ String user)
+ throws IOException { // test stub: ignores every argument, including dagId, and returns null
// Do nothing.
return null;
}
@Override
protected void populateHeaders(List<String> mapIds, String jobId,
- String user, int reduce, HttpRequest request,
- HttpResponse response, boolean keepAliveParam,
+ String dagId, String user, int reduce, // dagId accepted for the new signature; unused by this stub
+ HttpRequest request,
+ HttpResponse response,
+ boolean keepAliveParam,
Map<String, MapOutputInfo> infoMap) throws IOException {
// Do nothing.
}
@@ -585,7 +601,7 @@ public class TestShuffleHandler {
for (int i = 0; i < connAttempts; i++) {
String URLstring = "http://127.0.0.1:"
+ shuffleHandler.getConfig().get(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY)
- + "/mapOutput?job=job_12345_1&reduce=1&map=attempt_12345_1_m_"
+ + "/mapOutput?job=job_12345_1&dag=1&reduce=1&map=attempt_12345_1_m_"
+ i + "_0";
URL url = new URL(URLstring);
conns[i] = (HttpURLConnection)url.openConnection();
@@ -685,7 +701,7 @@ public class TestShuffleHandler {
"http://127.0.0.1:"
+ shuffleHandler.getConfig().get(
ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY)
- + "/mapOutput?job=job_12345_0001&reduce=" + reducerId
+ + "/mapOutput?job=job_12345_0001&dag=1&reduce=" + reducerId
+ "&map=attempt_12345_1_m_1_0");
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME,
@@ -722,7 +738,8 @@ public class TestShuffleHandler {
StringUtils.join(Path.SEPARATOR,
new String[] { logDir.getAbsolutePath(),
ShuffleHandler.USERCACHE, user,
- ShuffleHandler.APPCACHE, appId, "output", appAttemptId });
+ ShuffleHandler.APPCACHE, appId,"dag_1/" + "output",
+ appAttemptId });
File appAttemptDir = new File(attemptDir);
appAttemptDir.mkdirs();
System.out.println(appAttemptDir.getAbsolutePath());
@@ -924,7 +941,8 @@ public class TestShuffleHandler {
Token<JobTokenIdentifier> jt) throws IOException {
URL url = new URL("http://127.0.0.1:"
+ shuffle.getConfig().get(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY)
- + "/mapOutput?job=job_12345_0001&reduce=0&map=attempt_12345_1_m_1_0");
+ + "/mapOutput?job=job_12345_0001&dag=1&reduce=0" +
+ "&map=attempt_12345_1_m_1_0");
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
String encHash = SecureShuffleUtils.hashFromString(
SecureShuffleUtils.buildMsgFrom(url),
@@ -967,7 +985,7 @@ public class TestShuffleHandler {
return new Shuffle(conf) {
@Override
protected void populateHeaders(List<String> mapIds,
- String outputBaseStr, String user, int reduce,
+ String outputBaseStr, String dagId, String user, int reduce,
HttpRequest request, HttpResponse response,
boolean keepAliveParam, Map<String, MapOutputInfo> infoMap)
throws IOException {
@@ -1021,7 +1039,7 @@ public class TestShuffleHandler {
"http://127.0.0.1:"
+ shuffleHandler.getConfig().get(
ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY)
- + "/mapOutput?job=job_12345_0001&reduce=" + reducerId
+ + "/mapOutput?job=job_12345_0001&dag=1&reduce=" + reducerId
+ "&map=attempt_12345_1_m_1_0");
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME,
@@ -1116,7 +1134,7 @@ public class TestShuffleHandler {
Mockito.doAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
- String uri = "/mapOutput?job=job_12345_1&reduce=1";
+ String uri = "/mapOutput?job=job_12345_1&dag=1&reduce=1";
for (int i = 0; i < 100; i++)
uri = uri.concat("&map=attempt_12345_1_m_" + i + "_0");
return uri;
http://git-wip-us.apache.org/repos/asf/tez/blob/0e1d2774/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/Constants.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/Constants.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/Constants.java
index 827cafe..81921b2 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/Constants.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/Constants.java
@@ -25,6 +25,7 @@ public class Constants {
// TODO NEWTEZ Check which of these constants are expecting specific pieces of information which are being removed - like taskAttemptId
public static final String TEZ = "tez";
+ public static final String DAG_PREFIX = "dag_";
public static final String MAP_OUTPUT_FILENAME_STRING = "file.out";
public static final String MAP_OUTPUT_INDEX_SUFFIX_STRING = ".index";
http://git-wip-us.apache.org/repos/asf/tez/blob/0e1d2774/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java
index 819423f..c0b7210 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java
@@ -139,14 +139,18 @@ public class TezRuntimeUtils {
}
return partitioner;
}
-
- public static TezTaskOutput instantiateTaskOutputManager(Configuration conf, OutputContext outputContext) {
+
+ public static TezTaskOutput instantiateTaskOutputManager(
+ Configuration conf, OutputContext outputContext) {
Class<?> clazz = conf.getClass(Constants.TEZ_RUNTIME_TASK_OUTPUT_MANAGER,
TezTaskOutputFiles.class);
try {
- Constructor<?> ctor = clazz.getConstructor(Configuration.class, String.class);
+ Constructor<?> ctor = clazz.getConstructor(Configuration.class, String
+ .class, int.class);
ctor.setAccessible(true);
- TezTaskOutput instance = (TezTaskOutput) ctor.newInstance(conf, outputContext.getUniqueIdentifier());
+ TezTaskOutput instance = (TezTaskOutput) ctor.newInstance(conf,
+ outputContext.getUniqueIdentifier(),
+ outputContext.getDagIdentifier());
return instance;
} catch (Exception e) {
throw new TezUncheckedException(
http://git-wip-us.apache.org/repos/asf/tez/blob/0e1d2774/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
index 6cbff94..896f532 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
@@ -666,10 +666,16 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
return idxRecord;
}
- private static final String getMapOutputFile(String pathComponent) {
- return Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR + Path.SEPARATOR
- + pathComponent + Path.SEPARATOR
- + Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING;
+ private String getMapOutputFile(String pathComponent) {
+ String outputPath = Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR + Path.SEPARATOR +
+ pathComponent + Path.SEPARATOR +
+ Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING;
+ // The Tez shuffle handler serves outputs from a per-DAG "dag_<id>/" subdirectory.
+ if (ShuffleUtils.isTezShuffleHandler(conf)) {
+ return Constants.DAG_PREFIX + this.dagIdentifier + Path.SEPARATOR +
+ outputPath;
+ }
+ return outputPath;
}
@VisibleForTesting
http://git-wip-us.apache.org/repos/asf/tez/blob/0e1d2774/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
index fa8533c..5d2444c 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
@@ -620,5 +620,11 @@ public class ShuffleUtils {
sslFactory);
return httpConnParams;
}
+
+ public static boolean isTezShuffleHandler(Configuration config) {
+ String serviceId = config.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
+ TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
+ return serviceId.contains("tez"); // heuristic: any configured service id containing "tez"
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/0e1d2774/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/SimpleFetchedInputAllocator.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/SimpleFetchedInputAllocator.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/SimpleFetchedInputAllocator.java
index 68c4781..f939cd1 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/SimpleFetchedInputAllocator.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/SimpleFetchedInputAllocator.java
@@ -65,15 +65,18 @@ public class SimpleFetchedInputAllocator implements FetchedInputAllocator,
private volatile long usedMemory = 0;
- public SimpleFetchedInputAllocator(String srcNameTrimmed, String uniqueIdentifier, Configuration conf,
- long maxTaskAvailableMemory, long memoryAvailable) {
+ public SimpleFetchedInputAllocator(String srcNameTrimmed,
+ String uniqueIdentifier, int dagID,
+ Configuration conf,
+ long maxTaskAvailableMemory,
+ long memoryAvailable) {
this.srcNameTrimmed = srcNameTrimmed;
this.conf = conf;
this.maxAvailableTaskMemory = maxTaskAvailableMemory;
this.initialMemoryAvailable = memoryAvailable;
this.fileNameAllocator = new TezTaskOutputFiles(conf,
- uniqueIdentifier);
+ uniqueIdentifier, dagID);
this.localDirAllocator = new LocalDirAllocator(TezRuntimeFrameworkConfigs.LOCAL_DIRS);
// Setup configuration
http://git-wip-us.apache.org/repos/asf/tez/blob/0e1d2774/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
index bcb75d2..b6599dc 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
@@ -717,9 +717,10 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
throws IOException {
LocalDirAllocator localDirAllocator = new LocalDirAllocator(TezRuntimeFrameworkConfigs.LOCAL_DIRS);
suffix = suffix != null ? suffix : "";
-
- String pathFromLocalDir = Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR + Path.SEPARATOR +
- pathComponent + Path.SEPARATOR + Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING + suffix;
+ String outputPath = Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR + Path.SEPARATOR +
+ pathComponent + Path.SEPARATOR +
+ Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING + suffix;
+ String pathFromLocalDir = getPathForLocalDir(outputPath);
return localDirAllocator.getLocalPathToRead(pathFromLocalDir.toString(), conf);
}
@@ -750,5 +751,12 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
remaining.put(id.toString(), id);
}
}
+
+ private String getPathForLocalDir(String outputPath) {
+ if (ShuffleUtils.isTezShuffleHandler(conf)) { // Tez handler reads from "dag_<id>/<outputPath>"
+ return Constants.DAG_PREFIX + dagId + Path.SEPARATOR + outputPath;
+ }
+ return outputPath;
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/0e1d2774/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
index 26bdca7..a6f554c 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
@@ -177,8 +177,10 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
this.reduceCombineInputCounter = reduceCombineInputCounter;
this.spilledRecordsCounter = spilledRecordsCounter;
this.mergedMapOutputsCounter = mergedMapOutputsCounter;
- this.mapOutputFile = new TezTaskOutputFiles(conf, inputContext.getUniqueIdentifier());
-
+ this.mapOutputFile = new TezTaskOutputFiles(conf,
+ inputContext.getUniqueIdentifier(),
+ inputContext.getDagIdentifier());
+
this.localFS = localFS;
this.rfs = ((LocalFileSystem)localFS).getRaw();
http://git-wip-us.apache.org/repos/asf/tez/blob/0e1d2774/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutput.java
index c41e4a6..414f3d0 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutput.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.tez.runtime.library.common.Constants;
/**
* Manipulate the working area for the transient store for components in tez-runtime-library
@@ -37,6 +38,7 @@ public abstract class TezTaskOutput {
protected final Configuration conf;
protected final String uniqueId;
+ protected final String dagId;
/**
* @param conf the configuration from which local-dirs will be picked up
@@ -45,10 +47,12 @@ public abstract class TezTaskOutput {
* container is used for multiple tasks, this id should be unique for inputs /
* outputs spanning across tasks. This is also expected to be unique across all
* tasks for a vertex.
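+ * @param dagID numeric identifier of the DAG this task belongs to; used to build the per-DAG directory prefix {@code Constants.DAG_PREFIX + dagID + Path.SEPARATOR}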
*/
- public TezTaskOutput(Configuration conf, String uniqueId) {
+ public TezTaskOutput(Configuration conf, String uniqueId, int dagID) {
this.conf = conf;
this.uniqueId = uniqueId;
+ this.dagId = Constants.DAG_PREFIX + dagID + Path.SEPARATOR;
}
/**
http://git-wip-us.apache.org/repos/asf/tez/blob/0e1d2774/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutputFiles.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutputFiles.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutputFiles.java
index 1e6fca3..97a2509 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutputFiles.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutputFiles.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.tez.common.TezRuntimeFrameworkConfigs;
import org.apache.tez.runtime.library.common.Constants;
+import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
/**
* Manipulate the working area for the transient store for components in tez-runtime-library
@@ -40,9 +41,9 @@ import org.apache.tez.runtime.library.common.Constants;
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class TezTaskOutputFiles extends TezTaskOutput {
-
- public TezTaskOutputFiles(Configuration conf, String uniqueId) {
- super(conf, uniqueId);
+
+ public TezTaskOutputFiles(Configuration conf, String uniqueId, int dagID) {
+ super(conf, uniqueId, dagID);
}
private static final Logger LOG = LoggerFactory.getLogger(TezTaskOutputFiles.class);
@@ -60,7 +61,8 @@ public class TezTaskOutputFiles extends TezTaskOutput {
new LocalDirAllocator(TezRuntimeFrameworkConfigs.LOCAL_DIRS);
/*
- * ${appDir}/output/${uniqueId}
+ * any non-Tez shuffle handler (e.g. mapreduce_shuffle): "${appDir}/output/${uniqueId}"
+ * Tez shuffle handler (tez_shuffle): "${appDir}/${DAG_PREFIX}${dagID}/output/${uniqueId}"
*/
private Path getAttemptOutputDir() {
if (LOG.isDebugEnabled()) {
@@ -68,7 +70,8 @@ public class TezTaskOutputFiles extends TezTaskOutput {
+ Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR + "/"
+ uniqueId);
}
- return new Path(Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR, uniqueId);
+ String dagPath = getDagOutputDir(Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR);
+ return new Path(dagPath, uniqueId);
}
@@ -201,7 +204,8 @@ public class TezTaskOutputFiles extends TezTaskOutput {
public Path getSpillFileForWrite(int spillNumber, long size)
throws IOException {
Preconditions.checkArgument(spillNumber >= 0, "Provide a valid spill number " + spillNumber);
- Path taskAttemptDir = new Path(Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR,
+ String dagPath = getDagOutputDir(Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR);
+ Path taskAttemptDir = new Path(dagPath,
String.format(SPILL_FILE_DIR_PATTERN, uniqueId, spillNumber));
Path outputDir = new Path(taskAttemptDir, Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING);
return lDirAlloc.getLocalPathForWrite(outputDir.toString(), size, conf);
@@ -222,8 +226,9 @@ public class TezTaskOutputFiles extends TezTaskOutput {
public Path getSpillIndexFileForWrite(int spillNumber, long size)
throws IOException {
Preconditions.checkArgument(spillNumber >= 0, "Provide a valid spill number " + spillNumber);
- Path taskAttemptDir = new Path(Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR,
- String.format(SPILL_FILE_DIR_PATTERN, uniqueId, spillNumber));
+ String dagPath = getDagOutputDir(Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR);
+ Path taskAttemptDir = new Path(dagPath, String.format(
+ SPILL_FILE_DIR_PATTERN, uniqueId, spillNumber));
Path outputDir = new Path(taskAttemptDir, Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING +
Constants.TEZ_RUNTIME_TASK_OUTPUT_INDEX_SUFFIX_STRING);
return lDirAlloc.getLocalPathForWrite(outputDir.toString(), size, conf);
@@ -247,7 +252,8 @@ public class TezTaskOutputFiles extends TezTaskOutput {
@Override
public Path getInputFileForWrite(int srcIdentifier,
int spillNum, long size) throws IOException {
- return lDirAlloc.getLocalPathForWrite(getSpillFileName(srcIdentifier, spillNum), size, conf);
+ String dagPath = getDagOutputDir(getSpillFileName(srcIdentifier, spillNum));
+ return lDirAlloc.getLocalPathForWrite(dagPath, size, conf);
}
/**
@@ -265,4 +271,8 @@ public class TezTaskOutputFiles extends TezTaskOutput {
public String getSpillFileName(int srcId, int spillNum) {
return String.format(SPILL_FILE_PATTERN, uniqueId, srcId, spillNum);
}
+
+ private String getDagOutputDir(String child) {
+ return ShuffleUtils.isTezShuffleHandler(conf) ? dagId.concat(child) : child;
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/0e1d2774/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
index ec9a191..2d6683a 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
@@ -134,7 +134,9 @@ public class UnorderedKVInput extends AbstractLogicalInput {
TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_BUFFER_SIZE_DEFAULT);
this.inputManager = new SimpleFetchedInputAllocator(
- TezUtilsInternal.cleanVertexName(getContext().getSourceVertexName()), getContext().getUniqueIdentifier(), conf,
+ TezUtilsInternal.cleanVertexName(getContext().getSourceVertexName()),
+ getContext().getUniqueIdentifier(),
+ getContext().getDagIdentifier(), conf,
getContext().getTotalMemoryAvailableToTask(),
memoryUpdateCallbackHandler.getMemoryAssigned());
http://git-wip-us.apache.org/repos/asf/tez/blob/0e1d2774/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestSimpleFetchedInputAllocator.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestSimpleFetchedInputAllocator.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestSimpleFetchedInputAllocator.java
index 2f89b0f..1b63b17 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestSimpleFetchedInputAllocator.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestSimpleFetchedInputAllocator.java
@@ -52,8 +52,9 @@ public class TestSimpleFetchedInputAllocator {
long inMemThreshold = (long) (bufferPercent * jvmMax);
LOG.info("InMemThreshold: " + inMemThreshold);
- SimpleFetchedInputAllocator inputManager = new SimpleFetchedInputAllocator("srcName", UUID.randomUUID().toString(),
- conf, Runtime.getRuntime().maxMemory(), inMemThreshold);
+ SimpleFetchedInputAllocator inputManager = new SimpleFetchedInputAllocator(
+ "srcName", UUID.randomUUID().toString(), 123, conf,
+ Runtime.getRuntime().maxMemory(), inMemThreshold);
long requestSize = (long) (0.4f * inMemThreshold);
long compressedSize = 1l;
http://git-wip-us.apache.org/repos/asf/tez/blob/0e1d2774/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
index 4a0d1d5..031b44d 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
@@ -258,6 +258,7 @@ public class TestUnorderedPartitionedKVWriter {
ApplicationId appId = ApplicationId.newInstance(10000000, 1);
TezCounters counters = new TezCounters();
String uniqueId = UUID.randomUUID().toString();
+ int dagId = 1;
OutputContext outputContext = createMockOutputContext(counters, appId, uniqueId, defaultConf);
Random random = new Random();
@@ -391,7 +392,7 @@ public class TestUnorderedPartitionedKVWriter {
// Verify the data
// Verify the actual data
- TezTaskOutput taskOutput = new TezTaskOutputFiles(conf, uniqueId);
+ TezTaskOutput taskOutput = new TezTaskOutputFiles(conf, uniqueId, dagId);
Path outputFilePath = kvWriter.finalOutPath;
Path spillFilePath = kvWriter.finalIndexPath;
if (numRecordsWritten > 0) {
@@ -526,6 +527,7 @@ public class TestUnorderedPartitionedKVWriter {
ApplicationId appId = ApplicationId.newInstance(10000000, 1);
TezCounters counters = new TezCounters();
String uniqueId = UUID.randomUUID().toString();
+ int dagId = 1;
OutputContext outputContext = createMockOutputContext(counters, appId, uniqueId, defaultConf);
Configuration conf = createConfiguration(outputContext, IntWritable.class, LongWritable.class,
@@ -690,7 +692,7 @@ public class TestUnorderedPartitionedKVWriter {
verify(outputContext, atLeast(1)).notifyProgress();
// Verify if all spill files are available.
- TezTaskOutput taskOutput = new TezTaskOutputFiles(conf, uniqueId);
+ TezTaskOutput taskOutput = new TezTaskOutputFiles(conf, uniqueId, dagId);
if (numRecordsWritten > 0) {
int numSpills = kvWriter.numSpills.get();
@@ -710,6 +712,7 @@ public class TestUnorderedPartitionedKVWriter {
ApplicationId appId = ApplicationId.newInstance(10000000, 1);
TezCounters counters = new TezCounters();
String uniqueId = UUID.randomUUID().toString();
+ int dagId = 1;
OutputContext outputContext = createMockOutputContext(counters, appId, uniqueId, defaultConf);
Configuration conf = createConfiguration(outputContext, IntWritable.class, LongWritable.class,
@@ -847,7 +850,7 @@ public class TestUnorderedPartitionedKVWriter {
}
// Verify the actual data
- TezTaskOutput taskOutput = new TezTaskOutputFiles(conf, uniqueId);
+ TezTaskOutput taskOutput = new TezTaskOutputFiles(conf, uniqueId, dagId);
Path outputFilePath = kvWriter.finalOutPath;
Path spillFilePath = kvWriter.finalIndexPath;