You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ks...@apache.org on 2017/06/02 22:46:27 UTC

tez git commit: TEZ-3732. Reduce Object size of InputAttemptIdentifier and MapOutput for large jobs (Jonathan Eagles via kshukla)

Repository: tez
Updated Branches:
  refs/heads/master 0ee044c0e -> 29b45bc11


TEZ-3732. Reduce Object size of InputAttemptIdentifier and MapOutput for large jobs (Jonathan Eagles via kshukla)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/29b45bc1
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/29b45bc1
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/29b45bc1

Branch: refs/heads/master
Commit: 29b45bc1140a3a3a44a59a53375db1941184ba18
Parents: 0ee044c
Author: Kuhu Shukla <ks...@yahoo-inc.com>
Authored: Fri Jun 2 17:43:54 2017 -0500
Committer: Kuhu Shukla <ks...@yahoo-inc.com>
Committed: Fri Jun 2 17:43:54 2017 -0500

----------------------------------------------------------------------
 .../library/common/InputAttemptIdentifier.java  |  17 +-
 .../shuffle/orderedgrouped/MapOutput.java       | 242 ++++++++++++-------
 2 files changed, 172 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/29b45bc1/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java
index cc9c6ea..16172e1 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java
@@ -45,7 +45,7 @@ public class InputAttemptIdentifier {
    * These fields are added for additional information about the source and are not meant to
    * alter the way these sources would be stored in hashmap.
    */
-  private final SPILL_INFO fetchTypeInfo;
+  private final byte fetchTypeInfo;
   private final int spillEventId;
 
   public InputAttemptIdentifier(int inputIndex, int attemptNumber) {
@@ -66,7 +66,7 @@ public class InputAttemptIdentifier {
     this.attemptNumber = attemptNumber;
     this.pathComponent = pathComponent;
     this.shared = shared;
-    this.fetchTypeInfo = fetchTypeInfo;
+    this.fetchTypeInfo = (byte)fetchTypeInfo.ordinal();
     this.spillEventId = spillEventId;
     if (pathComponent != null && !pathComponent.startsWith(PATH_PREFIX)) {
       throw new TezUncheckedException(
@@ -91,7 +91,12 @@ public class InputAttemptIdentifier {
   }
 
   public SPILL_INFO getFetchTypeInfo() {
-    return fetchTypeInfo;
+    if (fetchTypeInfo == SPILL_INFO.INCREMENTAL_UPDATE.ordinal()) {
+      return SPILL_INFO.INCREMENTAL_UPDATE;
+    } else if (fetchTypeInfo == SPILL_INFO.FINAL_UPDATE.ordinal()) {
+      return SPILL_INFO.FINAL_UPDATE;
+    }
+    return SPILL_INFO.FINAL_MERGE_ENABLED;
   }
 
   public int getSpillEventId() {
@@ -99,8 +104,8 @@ public class InputAttemptIdentifier {
   }
 
   public boolean canRetrieveInputInChunks() {
-    return (fetchTypeInfo == SPILL_INFO.INCREMENTAL_UPDATE) ||
-        (fetchTypeInfo == SPILL_INFO.FINAL_UPDATE);
+    return (fetchTypeInfo == SPILL_INFO.INCREMENTAL_UPDATE.ordinal()) ||
+        (fetchTypeInfo == SPILL_INFO.FINAL_UPDATE.ordinal());
   }
 
   // PathComponent & shared does not need to be part of the hashCode and equals computation.
@@ -134,6 +139,6 @@ public class InputAttemptIdentifier {
   public String toString() {
     return "InputAttemptIdentifier [inputIdentifier=" + inputIdentifier
         + ", attemptNumber=" + attemptNumber + ", pathComponent="
-        + pathComponent + ", spillType=" + fetchTypeInfo.ordinal() + ", spillId=" + spillEventId  +"]";
+        + pathComponent + ", spillType=" + fetchTypeInfo + ", spillId=" + spillEventId  +"]";
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/29b45bc1/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapOutput.java
index b8dacef..488dd80 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapOutput.java
@@ -33,11 +33,11 @@ import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
 import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutputFiles;
 
 
-class MapOutput {
+abstract class MapOutput {
   private static final Logger LOG = LoggerFactory.getLogger(MapOutput.class);
   private static AtomicInteger ID = new AtomicInteger(0);
   
-  public static enum Type {
+  public enum Type {
     WAIT,
     MEMORY,
     DISK,
@@ -45,50 +45,17 @@ class MapOutput {
   }
 
   private final int id;
-  private final Type type;
   private InputAttemptIdentifier attemptIdentifier;
 
   private final boolean primaryMapOutput;
-  private final FetchedInputAllocatorOrderedGrouped callback;
+  protected final FetchedInputAllocatorOrderedGrouped callback;
 
-  // MEMORY
-  private BoundedByteArrayOutputStream byteStream;
-
-  // DISK
-  private final Path tmpOutputPath;
-  private final FileChunk outputPath;
-  private OutputStream disk;
-
-  private MapOutput(Type type, InputAttemptIdentifier attemptIdentifier, FetchedInputAllocatorOrderedGrouped callback,
-                    long size, Path outputPath, long offset, boolean primaryMapOutput,
-                    FileSystem fs, Path tmpOutputPath) {
+  private MapOutput(InputAttemptIdentifier attemptIdentifier, FetchedInputAllocatorOrderedGrouped callback,
+                    boolean primaryMapOutput) {
     this.id = ID.incrementAndGet();
-    this.type = type;
     this.attemptIdentifier = attemptIdentifier;
     this.callback = callback;
     this.primaryMapOutput = primaryMapOutput;
-
-    // Other type specific values
-
-    if (type == Type.MEMORY) {
-      // since we are passing an int from createMemoryMapOutput, its safe to cast to int
-      this.byteStream = new BoundedByteArrayOutputStream((int)size);
-    } else {
-      this.byteStream = null;
-    }
-
-    this.tmpOutputPath = tmpOutputPath;
-    this.disk = null;
-
-    if (type == Type.DISK || type == Type.DISK_DIRECT) {
-      if (type == Type.DISK_DIRECT) {
-        this.outputPath = new FileChunk(outputPath, offset, size, true, attemptIdentifier);
-      } else {
-        this.outputPath = new FileChunk(outputPath, offset, size, false, attemptIdentifier);
-      }
-    } else {
-      this.outputPath = null;
-    }
   }
 
   public static MapOutput createDiskMapOutput(InputAttemptIdentifier attemptIdentifier,
@@ -104,8 +71,8 @@ class MapOutput {
     Path tmpOutputPath = outputPath.suffix(String.valueOf(fetcher));
     long offset = 0;
 
-    MapOutput mapOutput = new MapOutput(Type.DISK, attemptIdentifier, callback, size, outputPath, offset,
-        primaryMapOutput, fs, tmpOutputPath);
+    DiskMapOutput mapOutput = new DiskMapOutput(attemptIdentifier, callback, size, outputPath, offset,
+        primaryMapOutput, tmpOutputPath);
     mapOutput.disk = fs.create(tmpOutputPath);
 
     return mapOutput;
@@ -114,19 +81,18 @@ class MapOutput {
   public static MapOutput createLocalDiskMapOutput(InputAttemptIdentifier attemptIdentifier,
                                                    FetchedInputAllocatorOrderedGrouped callback, Path path,  long offset,
                                                    long size, boolean primaryMapOutput)  {
-    return new MapOutput(Type.DISK_DIRECT, attemptIdentifier, callback, size, path, offset,
-        primaryMapOutput, null, null);
+    return new DiskDirectMapOutput(attemptIdentifier, callback, size, path, offset,
+        primaryMapOutput);
   }
 
   public static MapOutput createMemoryMapOutput(InputAttemptIdentifier attemptIdentifier,
                                                 FetchedInputAllocatorOrderedGrouped callback, int size,
                                                 boolean primaryMapOutput)  {
-    return new MapOutput(Type.MEMORY, attemptIdentifier, callback, size, null, -1, primaryMapOutput,
-        null, null);
+    return new InMemoryMapOutput(attemptIdentifier, callback, size, primaryMapOutput);
   }
 
   public static MapOutput createWaitMapOutput(InputAttemptIdentifier attemptIdentifier) {
-    return new MapOutput(Type.WAIT, attemptIdentifier, null, -1, null, -1, false, null, null);
+    return new WaitMapOutput(attemptIdentifier);
   }
 
   public boolean isPrimaryMapOutput() {
@@ -147,69 +113,39 @@ class MapOutput {
   }
 
   public FileChunk getOutputPath() {
-    return outputPath;
+    return null;
   }
 
   public byte[] getMemory() {
-    return byteStream.getBuffer();
+    return null;
   }
 
   public BoundedByteArrayOutputStream getArrayStream() {
-    return byteStream;
+    return null;
   }
   
   public OutputStream getDisk() {
-    return disk;
+    return null;
   }
 
   public InputAttemptIdentifier getAttemptIdentifier() {
     return this.attemptIdentifier;
   }
 
-  public Type getType() {
-    return type;
-  }
+  public abstract Type getType();
 
   public long getSize() {
-    if (type == Type.MEMORY) {
-      return byteStream.getLimit();
-    } else if (type == Type.DISK || type == Type.DISK_DIRECT) {
-      return outputPath.getLength();
-    }
     return -1;
   }
 
   public void commit() throws IOException {
-    if (type == Type.MEMORY) {
-      callback.closeInMemoryFile(this);
-    } else if (type == Type.DISK) {
-      callback.getLocalFileSystem().rename(tmpOutputPath, outputPath.getPath());
-      callback.closeOnDiskFile(outputPath);
-    } else if (type == Type.DISK_DIRECT) {
-      callback.closeOnDiskFile(outputPath);
-    } else {
-      throw new IOException("Cannot commit MapOutput of type WAIT!");
-    }
   }
   
   public void abort() {
-    if (type == Type.MEMORY) {
-      callback.unreserve(byteStream.getBuffer().length);
-    } else if (type == Type.DISK) {
-      try {
-        callback.getLocalFileSystem().delete(tmpOutputPath, true);
-      } catch (IOException ie) {
-        LOG.info("failure to clean up " + tmpOutputPath, ie);
-      }
-    } else if (type == Type.DISK_DIRECT) { //nothing to do.
-    } else {
-      throw new IllegalArgumentException
-                   ("Cannot commit MapOutput with of type WAIT!");
-    }
   }
   
   public String toString() {
-    return "MapOutput( AttemptIdentifier: " + attemptIdentifier + ", Type: " + type + ")";
+    return "MapOutput( AttemptIdentifier: " + attemptIdentifier + ", Type: " + getType() + ")";
   }
   
   public static class MapOutputComparator 
@@ -232,4 +168,148 @@ class MapOutput {
       }
     }
   }
+
+  private static class DiskDirectMapOutput extends MapOutput {
+    private final FileChunk outputPath;
+    private DiskDirectMapOutput(InputAttemptIdentifier attemptIdentifier, FetchedInputAllocatorOrderedGrouped callback,
+                      long size, Path outputPath, long offset, boolean primaryMapOutput) {
+      super(attemptIdentifier, callback, primaryMapOutput);
+      this.outputPath = new FileChunk(outputPath, offset, size, true, attemptIdentifier);
+    }
+
+    @Override
+    public FileChunk getOutputPath() {
+      return outputPath;
+    }
+
+    @Override
+    public long getSize() {
+      return outputPath.getLength();
+    }
+
+    @Override
+    public void commit() throws IOException {
+      callback.closeOnDiskFile(outputPath);
+    }
+
+    @Override
+    public void abort() {
+      // nothing to do
+    }
+
+    @Override
+    public Type getType() {
+      return Type.DISK_DIRECT;
+    }
+  }
+
+  private static class DiskMapOutput extends MapOutput {
+    private final Path tmpOutputPath;
+    private final FileChunk outputPath;
+    private OutputStream disk;
+    private DiskMapOutput(InputAttemptIdentifier attemptIdentifier, FetchedInputAllocatorOrderedGrouped callback,
+                                long size, Path outputPath, long offset, boolean primaryMapOutput, Path tmpOutputPath) {
+      super(attemptIdentifier, callback, primaryMapOutput);
+
+      this.tmpOutputPath = tmpOutputPath;
+      this.disk = null;
+      this.outputPath = new FileChunk(outputPath, offset, size, false, attemptIdentifier);
+    }
+
+    @Override
+    public FileChunk getOutputPath() {
+      return outputPath;
+    }
+
+    @Override
+    public OutputStream getDisk() {
+      return disk;
+    }
+
+    @Override
+    public long getSize() {
+      return outputPath.getLength();
+    }
+
+    @Override
+    public void commit() throws IOException {
+      callback.getLocalFileSystem().rename(tmpOutputPath, outputPath.getPath());
+      callback.closeOnDiskFile(outputPath);
+    }
+
+    @Override
+    public void abort() {
+      try {
+        callback.getLocalFileSystem().delete(tmpOutputPath, true);
+      } catch (IOException ie) {
+        LOG.info("failure to clean up " + tmpOutputPath, ie);
+      }
+    }
+
+    @Override
+    public Type getType() {
+      return Type.DISK;
+    }
+  }
+
+  private static class InMemoryMapOutput extends MapOutput {
+    private BoundedByteArrayOutputStream byteStream;
+    private InMemoryMapOutput(InputAttemptIdentifier attemptIdentifier,
+                              FetchedInputAllocatorOrderedGrouped callback,
+                              long size, boolean primaryMapOutput) {
+      super(attemptIdentifier, callback, primaryMapOutput);
+      this.byteStream = new BoundedByteArrayOutputStream((int)size);
+    }
+
+    @Override
+    public byte[] getMemory() {
+      return byteStream.getBuffer();
+    }
+
+    @Override
+    public BoundedByteArrayOutputStream getArrayStream() {
+      return byteStream;
+    }
+
+    @Override
+    public long getSize() {
+      return byteStream.getLimit();
+    }
+
+    @Override
+    public void commit() throws IOException {
+      callback.closeInMemoryFile(this);
+    }
+
+    @Override
+    public void abort() {
+      callback.unreserve(byteStream.getBuffer().length);
+    }
+
+    @Override
+    public Type getType() {
+      return Type.MEMORY;
+    }
+  }
+
+  private static class WaitMapOutput extends MapOutput {
+    private WaitMapOutput(InputAttemptIdentifier attemptIdentifier) {
+      super(attemptIdentifier, null, false);
+    }
+
+    @Override
+    public void commit() throws IOException {
+      throw new IOException("Cannot commit MapOutput of type WAIT!");
+    }
+
+    @Override
+    public void abort() {
+      throw new IllegalArgumentException("Cannot commit MapOutput of type WAIT!");
+    }
+
+    @Override
+    public Type getType() {
+      return Type.WAIT;
+    }
+  }
 }