You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by je...@apache.org on 2017/02/10 00:13:56 UTC

tez git commit: TEZ-3612. Tez Shuffle Handler Content length does not match actual (jeagles)

Repository: tez
Updated Branches:
  refs/heads/TEZ-3334 c08eddf01 -> 1a746f7e1


TEZ-3612. Tez Shuffle Handler Content length does not match actual (jeagles)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/1a746f7e
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/1a746f7e
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/1a746f7e

Branch: refs/heads/TEZ-3334
Commit: 1a746f7e1ab3bb22d0be7e492ab8402734242fe9
Parents: c08eddf
Author: Jonathan Eagles <je...@yahoo-inc.com>
Authored: Thu Feb 9 18:12:57 2017 -0600
Committer: Jonathan Eagles <je...@yahoo-inc.com>
Committed: Thu Feb 9 18:12:57 2017 -0600

----------------------------------------------------------------------
 TEZ-3334-CHANGES.txt                            |  1 +
 .../apache/tez/auxservices/ShuffleHandler.java  | 35 ++++++++++++--------
 2 files changed, 23 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/1a746f7e/TEZ-3334-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-3334-CHANGES.txt b/TEZ-3334-CHANGES.txt
index 025f53d..7f0e1ee 100644
--- a/TEZ-3334-CHANGES.txt
+++ b/TEZ-3334-CHANGES.txt
@@ -4,6 +4,7 @@ Apache Tez Change Log
 INCOMPATIBLE CHANGES:
 
 ALL CHANGES:
+  TEZ-3612. Tez Shuffle Handler Content length does not match actual
   TEZ-3608. Fetcher can hang if copyMapOutput/fetchInputs returns early
   TEZ-3606. Fix debug log for empty partitions to the expanded partitionId in the Composite case
   TEZ-3604. Remove the compositeInputAttemptIdentifier from remaining list upon fetch completion in the Ordered case

http://git-wip-us.apache.org/repos/asf/tez/blob/1a746f7e/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java
index 91d42d8..85b781f 100644
--- a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java
+++ b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java
@@ -1182,7 +1182,21 @@ public class ShuffleHandler extends AuxiliaryService {
         throws IOException {
 
       long contentLength = 0;
+      // Content-Length only needs to be calculated for keep-alive connections
+      if (connectionKeepAliveEnabled || keepAliveParam) {
+        contentLength = getContentLength(mapIds, jobId, dagId, user, reduceRange, mapOutputInfoMap);
+      }
+
+      // Now set the response headers.
+      setResponseHeaders(response, keepAliveParam, contentLength);
+    }
+
+    long getContentLength(List<String> mapIds, String jobId, String dagId, String user, Range reduceRange, Map<String, MapOutputInfo> mapOutputInfoMap) throws IOException {
+      long contentLength = 0;
+      // Reduce count is written once per mapId
+      int reduceCountVSize = WritableUtils.getVIntSize(reduceRange.getLast() - reduceRange.getFirst() + 1);
       for (String mapId : mapIds) {
+        contentLength += reduceCountVSize;
         MapOutputInfo outputInfo = getMapOutputInfo(dagId, mapId, jobId, user);
         if (mapOutputInfoMap.size() < mapOutputMetaInfoCacheSize) {
           mapOutputInfoMap.put(mapId, outputInfo);
@@ -1199,25 +1213,20 @@ public class ShuffleHandler extends AuxiliaryService {
           contentLength += indexRecord.getPartLength();
         }
       }
-
-      // Now set the response headers.
-      setResponseHeaders(response, keepAliveParam, contentLength);
+      return contentLength;
     }
 
-    protected void setResponseHeaders(HttpResponse response,
-        boolean keepAliveParam, long contentLength) {
-      if (!connectionKeepAliveEnabled && !keepAliveParam) {
+    protected void setResponseHeaders(HttpResponse response, boolean keepAliveParam, long contentLength) {
+      if (connectionKeepAliveEnabled || keepAliveParam) {
+        response.setHeader(HttpHeaders.Names.CONTENT_LENGTH, String.valueOf(contentLength));
+        response.setHeader(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
+        response.setHeader(HttpHeaders.Values.KEEP_ALIVE, "timeout=" + connectionKeepAliveTimeOut);
+        LOG.info("Content Length in shuffle : " + contentLength);
+      } else {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Setting connection close header...");
         }
         response.setHeader(HttpHeaders.Names.CONNECTION, CONNECTION_CLOSE);
-      } else {
-        response.setHeader(HttpHeaders.Names.CONTENT_LENGTH,
-          String.valueOf(contentLength));
-        response.setHeader(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
-        response.setHeader(HttpHeaders.Values.KEEP_ALIVE, "timeout="
-            + connectionKeepAliveTimeOut);
-        LOG.info("Content Length in shuffle : " + contentLength);
       }
     }