You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by je...@apache.org on 2017/02/10 00:13:56 UTC
tez git commit: TEZ-3612. Tez Shuffle Handler Content length does not
match actual (jeagles)
Repository: tez
Updated Branches:
refs/heads/TEZ-3334 c08eddf01 -> 1a746f7e1
TEZ-3612. Tez Shuffle Handler Content length does not match actual (jeagles)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/1a746f7e
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/1a746f7e
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/1a746f7e
Branch: refs/heads/TEZ-3334
Commit: 1a746f7e1ab3bb22d0be7e492ab8402734242fe9
Parents: c08eddf
Author: Jonathan Eagles <je...@yahoo-inc.com>
Authored: Thu Feb 9 18:12:57 2017 -0600
Committer: Jonathan Eagles <je...@yahoo-inc.com>
Committed: Thu Feb 9 18:12:57 2017 -0600
----------------------------------------------------------------------
TEZ-3334-CHANGES.txt | 1 +
.../apache/tez/auxservices/ShuffleHandler.java | 35 ++++++++++++--------
2 files changed, 23 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/1a746f7e/TEZ-3334-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-3334-CHANGES.txt b/TEZ-3334-CHANGES.txt
index 025f53d..7f0e1ee 100644
--- a/TEZ-3334-CHANGES.txt
+++ b/TEZ-3334-CHANGES.txt
@@ -4,6 +4,7 @@ Apache Tez Change Log
INCOMPATIBLE CHANGES:
ALL CHANGES:
+ TEZ-3612. Tez Shuffle Handler Content length does not match actual
TEZ-3608. Fetcher can hang if copyMapOutput/fetchInputs returns early
TEZ-3606. Fix debug log for empty partitions to the expanded partitionId in the Composite case
TEZ-3604. Remove the compositeInputAttemptIdentifier from remaining list upon fetch completion in the Ordered case
http://git-wip-us.apache.org/repos/asf/tez/blob/1a746f7e/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java
index 91d42d8..85b781f 100644
--- a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java
+++ b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java
@@ -1182,7 +1182,21 @@ public class ShuffleHandler extends AuxiliaryService {
throws IOException {
long contentLength = 0;
+ // Content-Length only needs calculated for keep-alive keep alive
+ if (connectionKeepAliveEnabled || keepAliveParam) {
+ contentLength = getContentLength(mapIds, jobId, dagId, user, reduceRange, mapOutputInfoMap);
+ }
+
+ // Now set the response headers.
+ setResponseHeaders(response, keepAliveParam, contentLength);
+ }
+
+ long getContentLength(List<String> mapIds, String jobId, String dagId, String user, Range reduceRange, Map<String, MapOutputInfo> mapOutputInfoMap) throws IOException {
+ long contentLength = 0;
+ // Reduce count is written once per mapId
+ int reduceCountVSize = WritableUtils.getVIntSize(reduceRange.getLast() - reduceRange.getFirst() + 1);
for (String mapId : mapIds) {
+ contentLength += reduceCountVSize;
MapOutputInfo outputInfo = getMapOutputInfo(dagId, mapId, jobId, user);
if (mapOutputInfoMap.size() < mapOutputMetaInfoCacheSize) {
mapOutputInfoMap.put(mapId, outputInfo);
@@ -1199,25 +1213,20 @@ public class ShuffleHandler extends AuxiliaryService {
contentLength += indexRecord.getPartLength();
}
}
-
- // Now set the response headers.
- setResponseHeaders(response, keepAliveParam, contentLength);
+ return contentLength;
}
- protected void setResponseHeaders(HttpResponse response,
- boolean keepAliveParam, long contentLength) {
- if (!connectionKeepAliveEnabled && !keepAliveParam) {
+ protected void setResponseHeaders(HttpResponse response, boolean keepAliveParam, long contentLength) {
+ if (connectionKeepAliveEnabled || keepAliveParam) {
+ response.setHeader(HttpHeaders.Names.CONTENT_LENGTH, String.valueOf(contentLength));
+ response.setHeader(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
+ response.setHeader(HttpHeaders.Values.KEEP_ALIVE, "timeout=" + connectionKeepAliveTimeOut);
+ LOG.info("Content Length in shuffle : " + contentLength);
+ } else {
if (LOG.isDebugEnabled()) {
LOG.debug("Setting connection close header...");
}
response.setHeader(HttpHeaders.Names.CONNECTION, CONNECTION_CLOSE);
- } else {
- response.setHeader(HttpHeaders.Names.CONTENT_LENGTH,
- String.valueOf(contentLength));
- response.setHeader(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
- response.setHeader(HttpHeaders.Values.KEEP_ALIVE, "timeout="
- + connectionKeepAliveTimeOut);
- LOG.info("Content Length in shuffle : " + contentLength);
}
}