You are viewing a plain-text version of this content. The canonical link to it ("here") was a hyperlink that is not preserved in this text version.
Posted to commits@tez.apache.org by ks...@apache.org on 2018/07/06 14:21:54 UTC
tez git commit: TEZ-3954. Reduce Tez Shuffle Handler Memory needs for holding TezIndexRecords (Jonathan Eagles via kshukla)
Repository: tez
Updated Branches:
refs/heads/master 06757e9d0 -> 3baab5581
TEZ-3954. Reduce Tez Shuffle Handler Memory needs for holding TezIndexRecords (Jonathan Eagles via kshukla)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/3baab558
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/3baab558
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/3baab558
Branch: refs/heads/master
Commit: 3baab55810ce5477c4048125e2b192bde9ec134d
Parents: 06757e9
Author: Kuhu Shukla <ks...@yahoo-inc.com>
Authored: Fri Jul 6 09:16:44 2018 -0500
Committer: Kuhu Shukla <ks...@yahoo-inc.com>
Committed: Fri Jul 6 09:16:44 2018 -0500
----------------------------------------------------------------------
.../apache/tez/auxservices/ShuffleHandler.java | 50 ++++++++++++++++----
.../tez/auxservices/TestShuffleHandler.java | 8 ++--
2 files changed, 45 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/3baab558/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java
index e22928e..24a821f 100644
--- a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java
+++ b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java
@@ -1146,7 +1146,7 @@ public class ShuffleHandler extends AuxiliaryService {
try {
MapOutputInfo info = reduceContext.getInfoMap().get(mapId);
if (info == null) {
- info = getMapOutputInfo(reduceContext.dagId, mapId,
+ info = getMapOutputInfo(reduceContext.dagId, mapId, reduceContext.getReduceRange(),
reduceContext.getJobId(),
reduceContext.getUser());
}
@@ -1204,7 +1204,7 @@ public class ShuffleHandler extends AuxiliaryService {
}
protected MapOutputInfo getMapOutputInfo(String dagId, String mapId,
- String jobId,
+ Range reduceRange, String jobId,
String user) throws IOException {
AttemptPathInfo pathInfo;
try {
@@ -1233,8 +1233,13 @@ public class ShuffleHandler extends AuxiliaryService {
pathInfo.indexPath);
}
+ MapOutputInfo outputInfo;
+ if (reduceRange.first == reduceRange.last) {
+ outputInfo = new MapOutputInfo(pathInfo.dataPath, spillRecord.getIndex(reduceRange.first), reduceRange);
+ } else {
- MapOutputInfo outputInfo = new MapOutputInfo(pathInfo.dataPath, spillRecord);
+ outputInfo = new MapOutputInfo(pathInfo.dataPath, spillRecord, reduceRange);
+ }
return outputInfo;
}
@@ -1262,12 +1267,12 @@ public class ShuffleHandler extends AuxiliaryService {
int reduceCountVSize = WritableUtils.getVIntSize(reduceRange.getLast() - reduceRange.getFirst() + 1);
for (String mapId : mapIds) {
contentLength += reduceCountVSize;
- MapOutputInfo outputInfo = getMapOutputInfo(dagId, mapId, jobId, user);
+ MapOutputInfo outputInfo = getMapOutputInfo(dagId, mapId, reduceRange, jobId, user);
if (mapOutputInfoMap.size() < mapOutputMetaInfoCacheSize) {
mapOutputInfoMap.put(mapId, outputInfo);
}
for (int reduce = reduceRange.getFirst(); reduce <= reduceRange.getLast(); reduce++) {
- TezIndexRecord indexRecord = outputInfo.spillRecord.getIndex(reduce);
+ TezIndexRecord indexRecord = outputInfo.getIndex(reduce);
ShuffleHeader header =
new ShuffleHeader(mapId, indexRecord.getPartLength(), indexRecord.getRawLength(), reduce);
@@ -1295,12 +1300,37 @@ public class ShuffleHandler extends AuxiliaryService {
}
class MapOutputInfo {
- final Path mapOutputFileName;
- final TezSpillRecord spillRecord;
+ private final Path mapOutputFileName;
+ private TezSpillRecord spillRecord;
+ private TezIndexRecord indexRecord;
+ private final Range reduceRange;
+
+ MapOutputInfo(Path mapOutputFileName, TezIndexRecord indexRecord, Range reduceRange) {
+ this.mapOutputFileName = mapOutputFileName;
+ this.indexRecord = indexRecord;
+ this.reduceRange = reduceRange;
+ }
- MapOutputInfo(Path mapOutputFileName, TezSpillRecord spillRecord) {
+ MapOutputInfo(Path mapOutputFileName, TezSpillRecord spillRecord, Range reduceRange) {
this.mapOutputFileName = mapOutputFileName;
this.spillRecord = spillRecord;
+ this.reduceRange = reduceRange;
+ }
+
+ TezIndexRecord getIndex(int index) {
+ if (index < reduceRange.first || index > reduceRange.last) {
+ throw new IllegalArgumentException("Reduce Index: " + index + " out of range for " + mapOutputFileName);
+ }
+ if (spillRecord != null) {
+ return spillRecord.getIndex(index);
+ } else {
+ return indexRecord;
+ }
+ }
+
+ public void finish() {
+ spillRecord = null;
+ indexRecord = null;
}
}
@@ -1356,7 +1386,7 @@ public class ShuffleHandler extends AuxiliaryService {
WritableUtils.writeVInt(dobRange, reduceRange.getLast() - reduceRange.getFirst() + 1);
ch.write(wrappedBuffer(dobRange.getData(), 0, dobRange.getLength()));
for (int reduce = reduceRange.getFirst(); reduce <= reduceRange.getLast(); reduce++) {
- TezIndexRecord index = outputInfo.spillRecord.getIndex(reduce);
+ TezIndexRecord index = outputInfo.getIndex(reduce);
// Records are only valid if they have a non-zero part length
if (index.getPartLength() != 0) {
if (firstIndex == null) {
@@ -1368,6 +1398,8 @@ public class ShuffleHandler extends AuxiliaryService {
ShuffleHeader header = new ShuffleHeader(mapId, index.getPartLength(), index.getRawLength(), reduce);
DataOutputBuffer dob = new DataOutputBuffer();
header.write(dob);
+ // Free the memory needed to store the spill and index records
+ outputInfo.finish();
ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
}
http://git-wip-us.apache.org/repos/asf/tez/blob/3baab558/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java b/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java
index 11c92fb..7d53abc 100644
--- a/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java
+++ b/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java
@@ -111,7 +111,7 @@ public class TestShuffleHandler {
}
@Override
protected MapOutputInfo getMapOutputInfo(String dagId, String mapId,
- String jobId,
+ Range reduceRange, String jobId,
String user)
throws IOException {
// Do nothing.
@@ -236,7 +236,7 @@ public class TestShuffleHandler {
return new Shuffle(conf) {
@Override
protected MapOutputInfo getMapOutputInfo(String dagId, String mapId,
- String jobId,
+ Range reduceRange, String jobId,
String user)
throws IOException {
return null;
@@ -346,7 +346,7 @@ public class TestShuffleHandler {
return new Shuffle(conf) {
@Override
protected MapOutputInfo getMapOutputInfo(String dagId, String mapId,
- String jobId, String user)
+ Range reduceRange, String jobId, String user)
throws IOException {
return null;
}
@@ -568,7 +568,7 @@ public class TestShuffleHandler {
return new Shuffle(conf) {
@Override
protected MapOutputInfo getMapOutputInfo(String dagId, String mapId,
- String jobId,
+ Range reduceRange, String jobId,
String user)
throws IOException {
// Do nothing.