You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by je...@apache.org on 2015/10/21 20:17:44 UTC

tez git commit: TEZ-2850. Tez MergeManager OOM for small Map Outputs (jeagles)

Repository: tez
Updated Branches:
  refs/heads/master 8c61a6609 -> a9cfeb914


TEZ-2850. Tez MergeManager OOM for small Map Outputs (jeagles)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/a9cfeb91
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/a9cfeb91
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/a9cfeb91

Branch: refs/heads/master
Commit: a9cfeb914001c381877657ea39b5de5451740050
Parents: 8c61a66
Author: Jonathan Eagles <je...@yahoo-inc.com>
Authored: Wed Oct 21 13:15:28 2015 -0500
Committer: Jonathan Eagles <je...@yahoo-inc.com>
Committed: Wed Oct 21 13:16:33 2015 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  3 +++
 .../runtime/library/common/sort/impl/IFile.java | 25 ++++++++++++--------
 2 files changed, 18 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/a9cfeb91/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 2e5bbe7..0a07dde 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@ Release 0.8.2: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2850. Tez MergeManager OOM for small Map Outputs
   TEZ-1888. Fix javac warnings all over codebase.
   TEZ-2886. Ability to merge AM credentials with DAG credentials.
   TEZ-2896. Fix thread names used during Input/Output initialization.
@@ -217,6 +218,7 @@ Release 0.7.1: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES
+  TEZ-2850. Tez MergeManager OOM for small Map Outputs
   TEZ-2886. Ability to merge AM credentials with DAG credentials.
   TEZ-2896. Fix thread names used during Input/Output initialization.
   TEZ-2866. Tez UI: Newly added columns wont be displayed by default in tables
@@ -500,6 +502,7 @@ Release 0.6.3: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2850. Tez MergeManager OOM for small Map Outputs
   TEZ-2781. Fallback to send only TaskAttemptFailedEvent if taskFailed heartbeat fails
   TEZ-2855. Fix a potential NPE while routing VertexManager events.
   TEZ-2716. DefaultSorter.isRleNeeded not thread safe

http://git-wip-us.apache.org/repos/asf/tez/blob/a9cfeb91/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
index 8dcbf6d..20f44dd 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
@@ -501,10 +501,10 @@ public class IFile {
     protected int recNo = 1;
     protected int originalKeyLength;
     protected int prevKeyLength;
-    protected int currentKeyLength;
-    protected int currentValueLength;
     byte keyBytes[] = new byte[0];
 
+    protected int currentKeyLength;
+    protected int currentValueLength;
     long startPos;
 
     /**
@@ -564,21 +564,26 @@ public class IFile {
                   TezCounter readsCounter, TezCounter bytesReadCounter,
                   boolean readAhead, int readAheadLength,
                   int bufferSize, boolean isCompressed) throws IOException {
-      checksumIn = new IFileInputStream(in, length, readAhead, readAheadLength/*, isCompressed*/);
-      if (isCompressed && codec != null) {
-        decompressor = CodecPool.getDecompressor(codec);
-        if (decompressor != null) {
-          this.in = codec.createInputStream(checksumIn, decompressor);
+      if (in != null) {
+        checksumIn = new IFileInputStream(in, length, readAhead,
+            readAheadLength/* , isCompressed */);
+        if (isCompressed && codec != null) {
+          decompressor = CodecPool.getDecompressor(codec);
+          if (decompressor != null) {
+            this.in = codec.createInputStream(checksumIn, decompressor);
+          } else {
+            LOG.warn("Could not obtain decompressor from CodecPool");
+            this.in = checksumIn;
+          }
         } else {
-          LOG.warn("Could not obtain decompressor from CodecPool");
           this.in = checksumIn;
         }
+        startPos = checksumIn.getPosition();
       } else {
-        this.in = checksumIn;
+        this.in = null;
       }
 
       this.dataIn = new DataInputStream(this.in);
-      startPos = checksumIn.getPosition();
       this.readRecordsCounter = readsCounter;
       this.bytesReadCounter = bytesReadCounter;
       this.fileLength = length;