You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@tez.apache.org by ks...@apache.org on 2017/06/02 22:46:27 UTC
tez git commit: TEZ-3732. Reduce Object size of InputAttemptIdentifier and MapOutput for large jobs (Jonathan Eagles via kshukla)
Repository: tez
Updated Branches:
refs/heads/master 0ee044c0e -> 29b45bc11
TEZ-3732. Reduce Object size of InputAttemptIdentifier and MapOutput for large jobs (Jonathan Eagles via kshukla)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/29b45bc1
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/29b45bc1
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/29b45bc1
Branch: refs/heads/master
Commit: 29b45bc1140a3a3a44a59a53375db1941184ba18
Parents: 0ee044c
Author: Kuhu Shukla <ks...@yahoo-inc.com>
Authored: Fri Jun 2 17:43:54 2017 -0500
Committer: Kuhu Shukla <ks...@yahoo-inc.com>
Committed: Fri Jun 2 17:43:54 2017 -0500
----------------------------------------------------------------------
.../library/common/InputAttemptIdentifier.java | 17 +-
.../shuffle/orderedgrouped/MapOutput.java | 242 ++++++++++++-------
2 files changed, 172 insertions(+), 87 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/29b45bc1/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java
index cc9c6ea..16172e1 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java
@@ -45,7 +45,7 @@ public class InputAttemptIdentifier {
* These fields are added for additional information about the source and are not meant to
* alter the way these sources would be stored in hashmap.
*/
- private final SPILL_INFO fetchTypeInfo;
+ private final byte fetchTypeInfo;
private final int spillEventId;
public InputAttemptIdentifier(int inputIndex, int attemptNumber) {
@@ -66,7 +66,7 @@ public class InputAttemptIdentifier {
this.attemptNumber = attemptNumber;
this.pathComponent = pathComponent;
this.shared = shared;
- this.fetchTypeInfo = fetchTypeInfo;
+ this.fetchTypeInfo = (byte)fetchTypeInfo.ordinal();
this.spillEventId = spillEventId;
if (pathComponent != null && !pathComponent.startsWith(PATH_PREFIX)) {
throw new TezUncheckedException(
@@ -91,7 +91,12 @@ public class InputAttemptIdentifier {
}
public SPILL_INFO getFetchTypeInfo() {
- return fetchTypeInfo;
+ if (fetchTypeInfo == SPILL_INFO.INCREMENTAL_UPDATE.ordinal()) {
+ return SPILL_INFO.INCREMENTAL_UPDATE;
+ } else if (fetchTypeInfo == SPILL_INFO.FINAL_UPDATE.ordinal()) {
+ return SPILL_INFO.FINAL_UPDATE;
+ }
+ return SPILL_INFO.FINAL_MERGE_ENABLED;
}
public int getSpillEventId() {
@@ -99,8 +104,8 @@ public class InputAttemptIdentifier {
}
public boolean canRetrieveInputInChunks() {
- return (fetchTypeInfo == SPILL_INFO.INCREMENTAL_UPDATE) ||
- (fetchTypeInfo == SPILL_INFO.FINAL_UPDATE);
+ return (fetchTypeInfo == SPILL_INFO.INCREMENTAL_UPDATE.ordinal()) ||
+ (fetchTypeInfo == SPILL_INFO.FINAL_UPDATE.ordinal());
}
// PathComponent & shared does not need to be part of the hashCode and equals computation.
@@ -134,6 +139,6 @@ public class InputAttemptIdentifier {
public String toString() {
return "InputAttemptIdentifier [inputIdentifier=" + inputIdentifier
+ ", attemptNumber=" + attemptNumber + ", pathComponent="
- + pathComponent + ", spillType=" + fetchTypeInfo.ordinal() + ", spillId=" + spillEventId +"]";
+ + pathComponent + ", spillType=" + fetchTypeInfo + ", spillId=" + spillEventId +"]";
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/29b45bc1/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapOutput.java
index b8dacef..488dd80 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapOutput.java
@@ -33,11 +33,11 @@ import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutputFiles;
-class MapOutput {
+abstract class MapOutput {
private static final Logger LOG = LoggerFactory.getLogger(MapOutput.class);
private static AtomicInteger ID = new AtomicInteger(0);
- public static enum Type {
+ public enum Type {
WAIT,
MEMORY,
DISK,
@@ -45,50 +45,17 @@ class MapOutput {
}
private final int id;
- private final Type type;
private InputAttemptIdentifier attemptIdentifier;
private final boolean primaryMapOutput;
- private final FetchedInputAllocatorOrderedGrouped callback;
+ protected final FetchedInputAllocatorOrderedGrouped callback;
- // MEMORY
- private BoundedByteArrayOutputStream byteStream;
-
- // DISK
- private final Path tmpOutputPath;
- private final FileChunk outputPath;
- private OutputStream disk;
-
- private MapOutput(Type type, InputAttemptIdentifier attemptIdentifier, FetchedInputAllocatorOrderedGrouped callback,
- long size, Path outputPath, long offset, boolean primaryMapOutput,
- FileSystem fs, Path tmpOutputPath) {
+ private MapOutput(InputAttemptIdentifier attemptIdentifier, FetchedInputAllocatorOrderedGrouped callback,
+ boolean primaryMapOutput) {
this.id = ID.incrementAndGet();
- this.type = type;
this.attemptIdentifier = attemptIdentifier;
this.callback = callback;
this.primaryMapOutput = primaryMapOutput;
-
- // Other type specific values
-
- if (type == Type.MEMORY) {
- // since we are passing an int from createMemoryMapOutput, its safe to cast to int
- this.byteStream = new BoundedByteArrayOutputStream((int)size);
- } else {
- this.byteStream = null;
- }
-
- this.tmpOutputPath = tmpOutputPath;
- this.disk = null;
-
- if (type == Type.DISK || type == Type.DISK_DIRECT) {
- if (type == Type.DISK_DIRECT) {
- this.outputPath = new FileChunk(outputPath, offset, size, true, attemptIdentifier);
- } else {
- this.outputPath = new FileChunk(outputPath, offset, size, false, attemptIdentifier);
- }
- } else {
- this.outputPath = null;
- }
}
public static MapOutput createDiskMapOutput(InputAttemptIdentifier attemptIdentifier,
@@ -104,8 +71,8 @@ class MapOutput {
Path tmpOutputPath = outputPath.suffix(String.valueOf(fetcher));
long offset = 0;
- MapOutput mapOutput = new MapOutput(Type.DISK, attemptIdentifier, callback, size, outputPath, offset,
- primaryMapOutput, fs, tmpOutputPath);
+ DiskMapOutput mapOutput = new DiskMapOutput(attemptIdentifier, callback, size, outputPath, offset,
+ primaryMapOutput, tmpOutputPath);
mapOutput.disk = fs.create(tmpOutputPath);
return mapOutput;
@@ -114,19 +81,18 @@ class MapOutput {
public static MapOutput createLocalDiskMapOutput(InputAttemptIdentifier attemptIdentifier,
FetchedInputAllocatorOrderedGrouped callback, Path path, long offset,
long size, boolean primaryMapOutput) {
- return new MapOutput(Type.DISK_DIRECT, attemptIdentifier, callback, size, path, offset,
- primaryMapOutput, null, null);
+ return new DiskDirectMapOutput(attemptIdentifier, callback, size, path, offset,
+ primaryMapOutput);
}
public static MapOutput createMemoryMapOutput(InputAttemptIdentifier attemptIdentifier,
FetchedInputAllocatorOrderedGrouped callback, int size,
boolean primaryMapOutput) {
- return new MapOutput(Type.MEMORY, attemptIdentifier, callback, size, null, -1, primaryMapOutput,
- null, null);
+ return new InMemoryMapOutput(attemptIdentifier, callback, size, primaryMapOutput);
}
public static MapOutput createWaitMapOutput(InputAttemptIdentifier attemptIdentifier) {
- return new MapOutput(Type.WAIT, attemptIdentifier, null, -1, null, -1, false, null, null);
+ return new WaitMapOutput(attemptIdentifier);
}
public boolean isPrimaryMapOutput() {
@@ -147,69 +113,39 @@ class MapOutput {
}
public FileChunk getOutputPath() {
- return outputPath;
+ return null;
}
public byte[] getMemory() {
- return byteStream.getBuffer();
+ return null;
}
public BoundedByteArrayOutputStream getArrayStream() {
- return byteStream;
+ return null;
}
public OutputStream getDisk() {
- return disk;
+ return null;
}
public InputAttemptIdentifier getAttemptIdentifier() {
return this.attemptIdentifier;
}
- public Type getType() {
- return type;
- }
+ public abstract Type getType();
public long getSize() {
- if (type == Type.MEMORY) {
- return byteStream.getLimit();
- } else if (type == Type.DISK || type == Type.DISK_DIRECT) {
- return outputPath.getLength();
- }
return -1;
}
public void commit() throws IOException {
- if (type == Type.MEMORY) {
- callback.closeInMemoryFile(this);
- } else if (type == Type.DISK) {
- callback.getLocalFileSystem().rename(tmpOutputPath, outputPath.getPath());
- callback.closeOnDiskFile(outputPath);
- } else if (type == Type.DISK_DIRECT) {
- callback.closeOnDiskFile(outputPath);
- } else {
- throw new IOException("Cannot commit MapOutput of type WAIT!");
- }
}
public void abort() {
- if (type == Type.MEMORY) {
- callback.unreserve(byteStream.getBuffer().length);
- } else if (type == Type.DISK) {
- try {
- callback.getLocalFileSystem().delete(tmpOutputPath, true);
- } catch (IOException ie) {
- LOG.info("failure to clean up " + tmpOutputPath, ie);
- }
- } else if (type == Type.DISK_DIRECT) { //nothing to do.
- } else {
- throw new IllegalArgumentException
- ("Cannot commit MapOutput with of type WAIT!");
- }
}
public String toString() {
- return "MapOutput( AttemptIdentifier: " + attemptIdentifier + ", Type: " + type + ")";
+ return "MapOutput( AttemptIdentifier: " + attemptIdentifier + ", Type: " + getType() + ")";
}
public static class MapOutputComparator
@@ -232,4 +168,148 @@ class MapOutput {
}
}
}
+
+ private static class DiskDirectMapOutput extends MapOutput {
+ private final FileChunk outputPath;
+ private DiskDirectMapOutput(InputAttemptIdentifier attemptIdentifier, FetchedInputAllocatorOrderedGrouped callback,
+ long size, Path outputPath, long offset, boolean primaryMapOutput) {
+ super(attemptIdentifier, callback, primaryMapOutput);
+ this.outputPath = new FileChunk(outputPath, offset, size, true, attemptIdentifier);
+ }
+
+ @Override
+ public FileChunk getOutputPath() {
+ return outputPath;
+ }
+
+ @Override
+ public long getSize() {
+ return outputPath.getLength();
+ }
+
+ @Override
+ public void commit() throws IOException {
+ callback.closeOnDiskFile(outputPath);
+ }
+
+ @Override
+ public void abort() {
+ // nothing to do
+ }
+
+ @Override
+ public Type getType() {
+ return Type.DISK_DIRECT;
+ }
+ }
+
+ private static class DiskMapOutput extends MapOutput {
+ private final Path tmpOutputPath;
+ private final FileChunk outputPath;
+ private OutputStream disk;
+ private DiskMapOutput(InputAttemptIdentifier attemptIdentifier, FetchedInputAllocatorOrderedGrouped callback,
+ long size, Path outputPath, long offset, boolean primaryMapOutput, Path tmpOutputPath) {
+ super(attemptIdentifier, callback, primaryMapOutput);
+
+ this.tmpOutputPath = tmpOutputPath;
+ this.disk = null;
+ this.outputPath = new FileChunk(outputPath, offset, size, false, attemptIdentifier);
+ }
+
+ @Override
+ public FileChunk getOutputPath() {
+ return outputPath;
+ }
+
+ @Override
+ public OutputStream getDisk() {
+ return disk;
+ }
+
+ @Override
+ public long getSize() {
+ return outputPath.getLength();
+ }
+
+ @Override
+ public void commit() throws IOException {
+ callback.getLocalFileSystem().rename(tmpOutputPath, outputPath.getPath());
+ callback.closeOnDiskFile(outputPath);
+ }
+
+ @Override
+ public void abort() {
+ try {
+ callback.getLocalFileSystem().delete(tmpOutputPath, true);
+ } catch (IOException ie) {
+ LOG.info("failure to clean up " + tmpOutputPath, ie);
+ }
+ }
+
+ @Override
+ public Type getType() {
+ return Type.DISK;
+ }
+ }
+
+ private static class InMemoryMapOutput extends MapOutput {
+ private BoundedByteArrayOutputStream byteStream;
+ private InMemoryMapOutput(InputAttemptIdentifier attemptIdentifier,
+ FetchedInputAllocatorOrderedGrouped callback,
+ long size, boolean primaryMapOutput) {
+ super(attemptIdentifier, callback, primaryMapOutput);
+ this.byteStream = new BoundedByteArrayOutputStream((int)size);
+ }
+
+ @Override
+ public byte[] getMemory() {
+ return byteStream.getBuffer();
+ }
+
+ @Override
+ public BoundedByteArrayOutputStream getArrayStream() {
+ return byteStream;
+ }
+
+ @Override
+ public long getSize() {
+ return byteStream.getLimit();
+ }
+
+ @Override
+ public void commit() throws IOException {
+ callback.closeInMemoryFile(this);
+ }
+
+ @Override
+ public void abort() {
+ callback.unreserve(byteStream.getBuffer().length);
+ }
+
+ @Override
+ public Type getType() {
+ return Type.MEMORY;
+ }
+ }
+
+ private static class WaitMapOutput extends MapOutput {
+ private WaitMapOutput(InputAttemptIdentifier attemptIdentifier) {
+ super(attemptIdentifier, null, false);
+ }
+
+ @Override
+ public void commit() throws IOException {
+ throw new IOException("Cannot commit MapOutput of type WAIT!");
+ }
+
+ @Override
+ public void abort() {
+ throw new IllegalArgumentException("Cannot commit MapOutput of type WAIT!");
+ }
+
+ @Override
+ public Type getType() {
+ return Type.WAIT;
+ }
+ }
}