You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by sr...@apache.org on 2016/02/18 10:58:01 UTC
[07/24] tez git commit: TEZ-3076. Reduce merge memory overhead to support large number of in-memory mapoutputs (jeagles)
TEZ-3076. Reduce merge memory overhead to support large number of in-memory mapoutputs (jeagles)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/3ff360aa
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/3ff360aa
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/3ff360aa
Branch: refs/heads/TEZ-2980
Commit: 3ff360aa18373f2b4aa03648de905c690ce5a180
Parents: 7e636a5
Author: Jonathan Eagles <je...@yahoo-inc.com>
Authored: Fri Jan 29 13:43:59 2016 -0600
Committer: Jonathan Eagles <je...@yahoo-inc.com>
Committed: Fri Jan 29 13:43:59 2016 -0600
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../tez/dag/app/dag/impl/TestDAGRecovery.java | 1 -
.../library/common/InputAttemptIdentifier.java | 28 ++---
.../common/shuffle/DiskFetchedInput.java | 2 +-
.../library/common/shuffle/ShuffleUtils.java | 2 +-
.../impl/ShuffleInputEventHandlerImpl.java | 3 +-
.../common/shuffle/impl/ShuffleManager.java | 23 ++---
.../FetchedInputAllocatorOrderedGrouped.java | 3 +
.../shuffle/orderedgrouped/InMemoryReader.java | 102 ++++++++++++++++++-
.../shuffle/orderedgrouped/MapOutput.java | 32 +++---
.../shuffle/orderedgrouped/MergeManager.java | 11 +-
.../ShuffleInputEventHandlerOrderedGrouped.java | 3 +-
.../orderedgrouped/ShuffleScheduler.java | 29 +++---
.../runtime/library/common/sort/impl/IFile.java | 6 +-
.../library/common/shuffle/TestFetcher.java | 27 +++--
.../impl/TestShuffleInputEventHandlerImpl.java | 11 +-
.../shuffle/orderedgrouped/TestFetcher.java | 29 +++---
...tShuffleInputEventHandlerOrderedGrouped.java | 17 ++--
.../orderedgrouped/TestShuffleScheduler.java | 65 ++++++------
19 files changed, 238 insertions(+), 158 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/3ff360aa/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d69390c..6570f8b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@ Release 0.8.3: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-3076. Reduce merge memory overhead to support large number of in-memory mapoutputs
TEZ-3079. Fix tez-tfile parser documentation.
TEZ-3066. TaskAttemptFinishedEvent ConcurrentModificationException in recovery or history logging services.
TEZ-3036. Tez AM can hang on startup with no indication of error
@@ -322,6 +323,7 @@ INCOMPATIBLE CHANGES
TEZ-2949. Allow duplicate dag names within session for Tez.
ALL CHANGES
+ TEZ-3076. Reduce merge memory overhead to support large number of in-memory mapoutputs
TEZ-3066. TaskAttemptFinishedEvent ConcurrentModificationException in recovery or history logging services.
TEZ-3036. Tez AM can hang on startup with no indication of error
TEZ-3052. Task internal error due to Invalid event: T_ATTEMPT_FAILED at FAILED
http://git-wip-us.apache.org/repos/asf/tez/blob/3ff360aa/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
index 3a602bc..6be682d 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
@@ -132,7 +132,6 @@ import org.apache.tez.runtime.api.impl.EventMetaData;
import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
import org.apache.tez.runtime.api.impl.TaskSpec;
import org.apache.tez.runtime.api.impl.TezEvent;
-import org.apache.tez.runtime.library.common.InputIdentifier;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
http://git-wip-us.apache.org/repos/asf/tez/blob/3ff360aa/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java
index d70942c..cc9c6ea 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java
@@ -27,7 +27,7 @@ import org.apache.tez.dag.api.TezUncheckedException;
@Private
public class InputAttemptIdentifier {
- private final InputIdentifier inputIdentifier;
+ private final int inputIdentifier;
private final int attemptNumber;
private final String pathComponent;
private final boolean shared;
@@ -49,18 +49,18 @@ public class InputAttemptIdentifier {
private final int spillEventId;
public InputAttemptIdentifier(int inputIndex, int attemptNumber) {
- this(new InputIdentifier(inputIndex), attemptNumber, null);
+ this(inputIndex, attemptNumber, null);
}
- public InputAttemptIdentifier(InputIdentifier inputIdentifier, int attemptNumber, String pathComponent) {
+ public InputAttemptIdentifier(int inputIdentifier, int attemptNumber, String pathComponent) {
this(inputIdentifier, attemptNumber, pathComponent, false);
}
- public InputAttemptIdentifier(InputIdentifier inputIdentifier, int attemptNumber, String pathComponent, boolean shared) {
+ public InputAttemptIdentifier(int inputIdentifier, int attemptNumber, String pathComponent, boolean shared) {
this(inputIdentifier, attemptNumber, pathComponent, shared, SPILL_INFO.FINAL_MERGE_ENABLED, -1);
}
- public InputAttemptIdentifier(InputIdentifier inputIdentifier, int attemptNumber, String pathComponent,
+ public InputAttemptIdentifier(int inputIdentifier, int attemptNumber, String pathComponent,
boolean shared, SPILL_INFO fetchTypeInfo, int spillEventId) {
this.inputIdentifier = inputIdentifier;
this.attemptNumber = attemptNumber;
@@ -74,15 +74,7 @@ public class InputAttemptIdentifier {
}
}
- public InputAttemptIdentifier(int taskIndex, int attemptNumber, String pathComponent) {
- this(new InputIdentifier(taskIndex), attemptNumber, pathComponent);
- }
-
- public InputAttemptIdentifier(int taskIndex, int attemptNumber, String pathComponent, boolean shared) {
- this(new InputIdentifier(taskIndex), attemptNumber, pathComponent, shared);
- }
-
- public InputIdentifier getInputIdentifier() {
+ public int getInputIdentifier() {
return this.inputIdentifier;
}
@@ -117,8 +109,7 @@ public class InputAttemptIdentifier {
final int prime = 31;
int result = 1;
result = prime * result + attemptNumber;
- result = prime * result
- + ((inputIdentifier == null) ? 0 : inputIdentifier.hashCode());
+ result = prime * result + inputIdentifier;
return result;
}
@@ -133,10 +124,7 @@ public class InputAttemptIdentifier {
InputAttemptIdentifier other = (InputAttemptIdentifier) obj;
if (attemptNumber != other.attemptNumber)
return false;
- if (inputIdentifier == null) {
- if (other.inputIdentifier != null)
- return false;
- } else if (!inputIdentifier.equals(other.inputIdentifier))
+ if (inputIdentifier != other.inputIdentifier)
return false;
// do not compare pathComponent as they may not always be present
return true;
http://git-wip-us.apache.org/repos/asf/tez/blob/3ff360aa/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/DiskFetchedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/DiskFetchedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/DiskFetchedInput.java
index dfad39d..c873af7 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/DiskFetchedInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/DiskFetchedInput.java
@@ -50,7 +50,7 @@ public class DiskFetchedInput extends FetchedInput {
this.localFS = FileSystem.getLocal(conf).getRaw();
this.outputPath = filenameAllocator.getInputFileForWrite(
- this.inputAttemptIdentifier.getInputIdentifier().getInputIndex(), this
+ this.inputAttemptIdentifier.getInputIdentifier(), this
.inputAttemptIdentifier.getSpillEventId(), actualSize);
// Files are not clobbered due to the id being appended to the outputPath in the tmpPath,
// otherwise fetches for the same task but from different attempts would clobber each other.
http://git-wip-us.apache.org/repos/asf/tez/blob/3ff360aa/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
index 431ba38..e8bf6ae 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
@@ -498,7 +498,7 @@ public class ShuffleUtils {
private static String toShortString(InputAttemptIdentifier inputAttemptIdentifier) {
StringBuilder sb = new StringBuilder();
sb.append("{");
- sb.append(inputAttemptIdentifier.getInputIdentifier().getInputIndex());
+ sb.append(inputAttemptIdentifier.getInputIdentifier());
sb.append(", ").append(inputAttemptIdentifier.getAttemptNumber());
sb.append(", ").append(inputAttemptIdentifier.getPathComponent());
if (inputAttemptIdentifier.getFetchTypeInfo()
http://git-wip-us.apache.org/repos/asf/tez/blob/3ff360aa/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java
index 8fb1568..adc3432 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java
@@ -37,7 +37,6 @@ import org.apache.tez.runtime.api.InputContext;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.InputFailedEvent;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
-import org.apache.tez.runtime.library.common.InputIdentifier;
import org.apache.tez.runtime.library.common.shuffle.FetchedInputAllocator;
import org.apache.tez.runtime.library.common.shuffle.ShuffleEventHandler;
import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
@@ -176,7 +175,7 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler {
InputAttemptIdentifier.SPILL_INFO spillInfo = (lastEvent) ? InputAttemptIdentifier.SPILL_INFO
.FINAL_UPDATE : InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE;
srcAttemptIdentifier =
- new InputAttemptIdentifier(new InputIdentifier(dmEvent.getTargetIndex()), dmEvent
+ new InputAttemptIdentifier(dmEvent.getTargetIndex(), dmEvent
.getVersion(), pathComponent, isShared, spillInfo, spillEventId);
} else {
srcAttemptIdentifier =
http://git-wip-us.apache.org/repos/asf/tez/blob/3ff360aa/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
index b3e050a..7f2054b 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
@@ -67,7 +67,6 @@ import org.apache.tez.runtime.api.InputContext;
import org.apache.tez.runtime.api.events.InputReadErrorEvent;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
-import org.apache.tez.runtime.library.common.InputIdentifier;
import org.apache.tez.runtime.library.common.TezRuntimeUtils;
import org.apache.tez.runtime.library.common.shuffle.FetchResult;
import org.apache.tez.runtime.library.common.shuffle.FetchedInput;
@@ -111,7 +110,7 @@ public class ShuffleManager implements FetcherCallback {
private final BlockingQueue<FetchedInput> completedInputs;
private final AtomicBoolean inputReadyNotificationSent = new AtomicBoolean(false);
- private final Set<InputIdentifier> completedInputSet;
+ private final Set<Integer> completedInputSet;
private final ConcurrentMap<String, InputHost> knownSrcHosts;
private final BlockingQueue<InputHost> pendingHosts;
private final Set<InputAttemptIdentifier> obsoletedInputs;
@@ -171,7 +170,7 @@ public class ShuffleManager implements FetcherCallback {
//To track shuffleInfo events when finalMerge is disabled OR pipelined shuffle is enabled in source.
@VisibleForTesting
- final Map<InputIdentifier, ShuffleEventInfo> shuffleInfoEventsMap;
+ final Map<Integer, ShuffleEventInfo> shuffleInfoEventsMap;
// TODO More counters - FetchErrors, speed?
@@ -205,7 +204,7 @@ public class ShuffleManager implements FetcherCallback {
this.srcNameTrimmed = TezUtilsInternal.cleanVertexName(inputContext.getSourceVertexName());
- completedInputSet = Collections.newSetFromMap(new ConcurrentHashMap<InputIdentifier, Boolean>(numInputs));
+ completedInputSet = Collections.newSetFromMap(new ConcurrentHashMap<Integer, Boolean>(numInputs));
/**
* In case of pipelined shuffle, it is possible to get multiple FetchedInput per attempt.
* We do not know upfront the number of spills from source.
@@ -266,7 +265,7 @@ public class ShuffleManager implements FetcherCallback {
Arrays.sort(this.localDisks);
- shuffleInfoEventsMap = new ConcurrentHashMap<InputIdentifier, ShuffleEventInfo>();
+ shuffleInfoEventsMap = new ConcurrentHashMap<Integer, ShuffleEventInfo>();
LOG.info(srcNameTrimmed + ": numInputs=" + numInputs + ", compressionCodec="
+ (codec == null ? "NoCompressionCodec" : codec.getClass().getName()) + ", numFetchers="
@@ -479,7 +478,7 @@ public class ShuffleManager implements FetcherCallback {
return;
}
- InputIdentifier inputIdentifier = srcAttemptIdentifier.getInputIdentifier();
+ int inputIdentifier = srcAttemptIdentifier.getInputIdentifier();
if (shuffleInfoEventsMap.get(inputIdentifier) == null) {
shuffleInfoEventsMap.put(inputIdentifier, new ShuffleEventInfo(srcAttemptIdentifier));
}
@@ -501,7 +500,7 @@ public class ShuffleManager implements FetcherCallback {
public void addCompletedInputWithNoData(
InputAttemptIdentifier srcAttemptIdentifier) {
- InputIdentifier inputIdentifier = srcAttemptIdentifier.getInputIdentifier();
+ int inputIdentifier = srcAttemptIdentifier.getInputIdentifier();
if (LOG.isDebugEnabled()) {
LOG.debug("No input data exists for SrcTask: " + inputIdentifier + ". Marking as complete.");
}
@@ -558,7 +557,7 @@ public class ShuffleManager implements FetcherCallback {
ShuffleEventInfo(InputAttemptIdentifier input) {
- this.id = input.getInputIdentifier().getInputIndex() + "_" + input.getAttemptNumber();
+ this.id = input.getInputIdentifier() + "_" + input.getAttemptNumber();
this.eventsProcessed = new BitSet();
this.attemptNum = input.getAttemptNumber();
}
@@ -594,7 +593,7 @@ public class ShuffleManager implements FetcherCallback {
public void fetchSucceeded(String host, InputAttemptIdentifier srcAttemptIdentifier,
FetchedInput fetchedInput, long fetchedBytes, long decompressedLength, long copyDuration)
throws IOException {
- InputIdentifier inputIdentifier = srcAttemptIdentifier.getInputIdentifier();
+ int inputIdentifier = srcAttemptIdentifier.getInputIdentifier();
// Count irrespective of whether this is a copy of an already fetched input
lock.lock();
@@ -706,7 +705,7 @@ public class ShuffleManager implements FetcherCallback {
return;
}
- InputIdentifier inputIdentifier = srcAttemptIdentifier.getInputIdentifier();
+ int inputIdentifier = srcAttemptIdentifier.getInputIdentifier();
ShuffleEventInfo eventInfo = shuffleInfoEventsMap.get(inputIdentifier);
//for empty partition case
@@ -769,9 +768,9 @@ public class ShuffleManager implements FetcherCallback {
"Fetch failure while fetching from "
+ TezRuntimeUtils.getTaskAttemptIdentifier(
inputContext.getSourceVertexName(),
- srcAttemptIdentifier.getInputIdentifier().getInputIndex(),
+ srcAttemptIdentifier.getInputIdentifier(),
srcAttemptIdentifier.getAttemptNumber()),
- srcAttemptIdentifier.getInputIdentifier().getInputIndex(),
+ srcAttemptIdentifier.getInputIdentifier(),
srcAttemptIdentifier.getAttemptNumber());
List<Event> failedEvents = Lists.newArrayListWithCapacity(1);
http://git-wip-us.apache.org/repos/asf/tez/blob/3ff360aa/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetchedInputAllocatorOrderedGrouped.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetchedInputAllocatorOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetchedInputAllocatorOrderedGrouped.java
index ec1f8eb..7276f74 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetchedInputAllocatorOrderedGrouped.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetchedInputAllocatorOrderedGrouped.java
@@ -16,6 +16,7 @@ package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;
import java.io.IOException;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.FileChunk;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
@@ -29,6 +30,8 @@ public interface FetchedInputAllocatorOrderedGrouped {
void closeInMemoryFile(MapOutput mapOutput);
+ FileSystem getLocalFileSystem();
+
void closeOnDiskFile(FileChunk file);
void unreserve(long bytes);
http://git-wip-us.apache.org/repos/asf/tez/blob/3ff360aa/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryReader.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryReader.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryReader.java
index 75c552e..7860377 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryReader.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryReader.java
@@ -18,6 +18,7 @@
package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;
+import java.io.ByteArrayInputStream;
import java.io.DataInput;
import java.io.File;
import java.io.FileOutputStream;
@@ -37,9 +38,103 @@ import org.apache.tez.runtime.library.common.sort.impl.IFile.Reader;
@InterfaceStability.Unstable
public class InMemoryReader extends Reader {
+ private static class ByteArrayDataInput extends ByteArrayInputStream implements DataInput {
+
+ public ByteArrayDataInput(byte buf[], int offset, int length) {
+ super(buf, offset, length);
+ }
+
+ public void reset(byte[] input, int start, int length) {
+ this.buf = input;
+ this.count = start+length;
+ this.mark = start;
+ this.pos = start;
+ }
+
+ public byte[] getData() { return buf; }
+ public int getPosition() { return pos; }
+ public int getLength() { return count; }
+ public int getMark() { return mark; }
+
+ @Override
+ public void readFully(byte[] b) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void readFully(byte[] b, int off, int len) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int skipBytes(int n) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean readBoolean() throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public byte readByte() throws IOException {
+ return (byte)read();
+ }
+
+ @Override
+ public int readUnsignedByte() throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public short readShort() throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int readUnsignedShort() throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public char readChar() throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int readInt() throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long readLong() throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public float readFloat() throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public double readDouble() throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public String readLine() throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public String readUTF() throws IOException {
+ throw new UnsupportedOperationException();
+ }
+ }
+
private final InputAttemptIdentifier taskAttemptId;
private final MergeManager merger;
- DataInputBuffer memDataIn = new DataInputBuffer();
+ ByteArrayDataInput memDataIn;
private int start;
private int length;
private int originalKeyPos;
@@ -49,12 +144,12 @@ public class InMemoryReader extends Reader {
int length)
throws IOException {
super(null, length - start, null, null, null, false, 0, -1);
- this.merger = merger;
this.taskAttemptId = taskAttemptId;
+ this.merger = merger;
buffer = data;
bufferSize = (int) length;
- memDataIn.reset(buffer, start, length);
+ memDataIn = new ByteArrayDataInput(buffer, start, length);
this.start = start;
this.length = length;
}
@@ -160,7 +255,6 @@ public class InMemoryReader extends Reader {
public void close() {
// Release
- dataIn = null;
buffer = null;
// Inform the MergeManager
if (merger != null) {
http://git-wip-us.apache.org/repos/asf/tez/blob/3ff360aa/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapOutput.java
index f19cd55..7e3d983 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapOutput.java
@@ -47,17 +47,14 @@ class MapOutput {
private final int id;
private final Type type;
private InputAttemptIdentifier attemptIdentifier;
- private final long size;
private final boolean primaryMapOutput;
private final FetchedInputAllocatorOrderedGrouped callback;
// MEMORY
- private final byte[] memory;
private BoundedByteArrayOutputStream byteStream;
// DISK
- private final FileSystem localFS;
private final Path tmpOutputPath;
private final FileChunk outputPath;
private OutputStream disk;
@@ -71,18 +68,13 @@ class MapOutput {
this.callback = callback;
this.primaryMapOutput = primaryMapOutput;
- this.localFS = fs;
- this.size = size;
-
// Other type specific values
if (type == Type.MEMORY) {
// since we are passing an int from createMemoryMapOutput, its safe to cast to int
this.byteStream = new BoundedByteArrayOutputStream((int)size);
- this.memory = byteStream.getBuffer();
} else {
this.byteStream = null;
- this.memory = null;
}
this.tmpOutputPath = tmpOutputPath;
@@ -97,7 +89,6 @@ class MapOutput {
} else {
this.outputPath = null;
}
-
}
public static MapOutput createDiskMapOutput(InputAttemptIdentifier attemptIdentifier,
@@ -107,7 +98,7 @@ class MapOutput {
IOException {
FileSystem fs = FileSystem.getLocal(conf).getRaw();
Path outputpath = mapOutputFile.getInputFileForWrite(
- attemptIdentifier.getInputIdentifier().getInputIndex(), attemptIdentifier.getSpillEventId(), size);
+ attemptIdentifier.getInputIdentifier(), attemptIdentifier.getSpillEventId(), size);
// Files are not clobbered due to the id being appended to the outputPath in the tmpPath,
// otherwise fetches for the same task but from different attempts would clobber each other.
Path tmpOuputPath = outputpath.suffix(String.valueOf(fetcher));
@@ -115,7 +106,7 @@ class MapOutput {
MapOutput mapOutput = new MapOutput(Type.DISK, attemptIdentifier, callback, size, outputpath, offset,
primaryMapOutput, fs, tmpOuputPath);
- mapOutput.disk = mapOutput.localFS.create(tmpOuputPath);
+ mapOutput.disk = fs.create(tmpOuputPath);
return mapOutput;
}
@@ -160,7 +151,7 @@ class MapOutput {
}
public byte[] getMemory() {
- return memory;
+ return byteStream.getBuffer();
}
public BoundedByteArrayOutputStream getArrayStream() {
@@ -180,14 +171,19 @@ class MapOutput {
}
public long getSize() {
- return size;
+ if (type == Type.MEMORY) {
+ return byteStream.getLimit();
+ } else if (type == Type.DISK || type == Type.DISK_DIRECT) {
+ return outputPath.getLength();
+ }
+ return -1;
}
public void commit() throws IOException {
if (type == Type.MEMORY) {
callback.closeInMemoryFile(this);
} else if (type == Type.DISK) {
- localFS.rename(tmpOutputPath, outputPath.getPath());
+ callback.getLocalFileSystem().rename(tmpOutputPath, outputPath.getPath());
callback.closeOnDiskFile(outputPath);
} else if (type == Type.DISK_DIRECT) {
callback.closeOnDiskFile(outputPath);
@@ -198,10 +194,10 @@ class MapOutput {
public void abort() {
if (type == Type.MEMORY) {
- callback.unreserve(memory.length);
+ callback.unreserve(byteStream.getBuffer().length);
} else if (type == Type.DISK) {
try {
- localFS.delete(tmpOutputPath, false);
+ callback.getLocalFileSystem().delete(tmpOutputPath, true);
} catch (IOException ie) {
LOG.info("failure to clean up " + tmpOutputPath, ie);
}
@@ -223,9 +219,9 @@ class MapOutput {
return 0;
}
- if (o1.size < o2.size) {
+ if (o1.getSize() < o2.getSize()) {
return -1;
- } else if (o1.size > o2.size) {
+ } else if (o1.getSize() > o2.getSize()) {
return 1;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/3ff360aa/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
index 61ff338..dfa509f 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
@@ -503,6 +503,11 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
}
@Override
+ public FileSystem getLocalFileSystem() {
+ return localFS;
+ }
+
+ @Override
public synchronized void closeOnDiskFile(FileChunk file) {
//including only path & offset for valdiations.
for (FileChunk fileChunk : onDiskMapOutputs) {
@@ -726,7 +731,7 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
// All disk writes done by this merge are overhead - due to the lack of
// adequate memory to keep all segments in memory.
outputPath = mapOutputFile.getInputFileForWrite(
- srcTaskIdentifier.getInputIdentifier().getInputIndex(), srcTaskIdentifier.getSpillEventId(),
+ srcTaskIdentifier.getInputIdentifier(), srcTaskIdentifier.getSpillEventId(),
mergeOutputSize).suffix(Constants.MERGED_OUTPUT_PREFIX);
Writer writer = null;
@@ -863,7 +868,7 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
if (file0.isLocalFile()) {
// This is setup the same way a type DISK MapOutput is setup when fetching.
namePart = mapOutputFile.getSpillFileName(
- file0.getInputAttemptIdentifier().getInputIdentifier().getInputIndex(),
+ file0.getInputAttemptIdentifier().getInputIdentifier(),
file0.getInputAttemptIdentifier().getSpillEventId());
} else {
namePart = file0.getPath().getName().toString();
@@ -1032,7 +1037,7 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
long inMemToDiskBytes = 0;
boolean mergePhaseFinished = false;
if (inMemoryMapOutputs.size() > 0) {
- int srcTaskId = inMemoryMapOutputs.get(0).getAttemptIdentifier().getInputIdentifier().getInputIndex();
+ int srcTaskId = inMemoryMapOutputs.get(0).getAttemptIdentifier().getInputIdentifier();
inMemToDiskBytes = createInMemorySegments(inMemoryMapOutputs,
memDiskSegments,
this.postMergeMemLimit);
http://git-wip-us.apache.org/repos/asf/tez/blob/3ff360aa/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
index f8c9553..6e6d967 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
@@ -37,7 +37,6 @@ import org.apache.tez.runtime.api.InputContext;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.InputFailedEvent;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
-import org.apache.tez.runtime.library.common.InputIdentifier;
import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto;
@@ -170,7 +169,7 @@ public class ShuffleInputEventHandlerOrderedGrouped implements ShuffleEventHandl
InputAttemptIdentifier.SPILL_INFO info = (lastEvent) ? InputAttemptIdentifier.SPILL_INFO
.FINAL_UPDATE : InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE;
srcAttemptIdentifier =
- new InputAttemptIdentifier(new InputIdentifier(dmEvent.getTargetIndex()), dmEvent
+ new InputAttemptIdentifier(dmEvent.getTargetIndex(), dmEvent
.getVersion(), pathComponent, false, info, spillEventId);
} else {
srcAttemptIdentifier =
http://git-wip-us.apache.org/repos/asf/tez/blob/3ff360aa/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
index dcfb274..8cba2a6 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
@@ -71,7 +71,6 @@ import org.apache.tez.runtime.api.InputContext;
import org.apache.tez.runtime.api.events.InputReadErrorEvent;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
-import org.apache.tez.runtime.library.common.InputIdentifier;
import org.apache.tez.runtime.library.common.TezRuntimeUtils;
import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MapOutput.Type;
@@ -111,7 +110,7 @@ class ShuffleScheduler {
//To track shuffleInfo events when finalMerge is disabled in source or pipelined shuffle is
// enabled in source.
@VisibleForTesting
- final Map<InputIdentifier, ShuffleEventInfo> pipelinedShuffleInfoEventsMap;
+ final Map<Integer, ShuffleEventInfo> pipelinedShuffleInfoEventsMap;
@VisibleForTesting
final Set<MapHost> pendingHosts = new HashSet<MapHost>();
@@ -349,7 +348,7 @@ class ShuffleScheduler {
this.firstEventReceived = inputContext.getCounters().findCounter(TaskCounter.FIRST_EVENT_RECEIVED);
this.lastEventReceived = inputContext.getCounters().findCounter(TaskCounter.LAST_EVENT_RECEIVED);
- pipelinedShuffleInfoEventsMap = new HashMap<InputIdentifier, ShuffleEventInfo>();
+ pipelinedShuffleInfoEventsMap = new HashMap<Integer, ShuffleEventInfo>();
LOG.info("ShuffleScheduler running for sourceVertex: "
+ inputContext.getSourceVertexName() + " with configuration: "
+ "maxFetchFailuresBeforeReporting=" + maxFetchFailuresBeforeReporting
@@ -429,7 +428,7 @@ class ShuffleScheduler {
ShuffleEventInfo(InputAttemptIdentifier input) {
- this.id = input.getInputIdentifier().getInputIndex() + "_" + input.getAttemptNumber();
+ this.id = input.getInputIdentifier() + "_" + input.getAttemptNumber();
this.eventsProcessed = new BitSet();
this.attemptNum = input.getAttemptNumber();
}
@@ -467,7 +466,7 @@ class ShuffleScheduler {
) throws IOException {
inputContext.notifyProgress();
- if (!isInputFinished(srcAttemptIdentifier.getInputIdentifier().getInputIndex())) {
+ if (!isInputFinished(srcAttemptIdentifier.getInputIdentifier())) {
if (!isLocalFetch) {
/**
* Reset it only when it is a non-local-disk copy.
@@ -505,10 +504,10 @@ class ShuffleScheduler {
*/
if (!srcAttemptIdentifier.canRetrieveInputInChunks()) {
remainingMaps.decrementAndGet();
- setInputFinished(srcAttemptIdentifier.getInputIdentifier().getInputIndex());
+ setInputFinished(srcAttemptIdentifier.getInputIdentifier());
numFetchedSpills++;
} else {
- InputIdentifier inputIdentifier = srcAttemptIdentifier.getInputIdentifier();
+ int inputIdentifier = srcAttemptIdentifier.getInputIdentifier();
//Allow only one task attempt to proceed.
if (!validateInputAttemptForPipelinedShuffle(srcAttemptIdentifier)) {
return;
@@ -533,7 +532,7 @@ class ShuffleScheduler {
//check if we downloaded all spills pertaining to this InputAttemptIdentifier
if (eventInfo.isDone()) {
remainingMaps.decrementAndGet();
- setInputFinished(inputIdentifier.getInputIndex());
+ setInputFinished(inputIdentifier);
pipelinedShuffleInfoEventsMap.remove(inputIdentifier);
if (LOG.isTraceEnabled()) {
LOG.trace("Removing : " + srcAttemptIdentifier + ", pending: " +
@@ -560,7 +559,7 @@ class ShuffleScheduler {
if (LOG.isDebugEnabled()) {
LOG.debug("src task: "
+ TezRuntimeUtils.getTaskAttemptIdentifier(
- inputContext.getSourceVertexName(), srcAttemptIdentifier.getInputIdentifier().getInputIndex(),
+ inputContext.getSourceVertexName(), srcAttemptIdentifier.getInputIdentifier(),
srcAttemptIdentifier.getAttemptNumber()) + " done");
}
} else {
@@ -679,7 +678,7 @@ class ShuffleScheduler {
String errorMsg = "Failed " + attemptFailures + " times trying to "
+ "download from " + TezRuntimeUtils.getTaskAttemptIdentifier(
inputContext.getSourceVertexName(),
- srcAttempt.getInputIdentifier().getInputIndex(),
+ srcAttempt.getInputIdentifier(),
srcAttempt.getAttemptNumber()) + ". threshold=" + abortFailureLimit;
IOException ioe = new IOException(errorMsg);
// Shuffle knows how to deal with failures post shutdown via the onFailure hook
@@ -738,15 +737,15 @@ class ShuffleScheduler {
srcNameTrimmed + ": " + "Reporting fetch failure for InputIdentifier: "
+ srcAttempt + " taskAttemptIdentifier: " + TezRuntimeUtils
.getTaskAttemptIdentifier(inputContext.getSourceVertexName(),
- srcAttempt.getInputIdentifier().getInputIndex(),
+ srcAttempt.getInputIdentifier(),
srcAttempt.getAttemptNumber()) + " to AM.");
List<Event> failedEvents = Lists.newArrayListWithCapacity(1);
failedEvents.add(InputReadErrorEvent.create(
"Fetch failure for " + TezRuntimeUtils
.getTaskAttemptIdentifier(inputContext.getSourceVertexName(),
- srcAttempt.getInputIdentifier().getInputIndex(),
+ srcAttempt.getInputIdentifier(),
srcAttempt.getAttemptNumber()) + " to jobtracker.",
- srcAttempt.getInputIdentifier().getInputIndex(),
+ srcAttempt.getInputIdentifier(),
srcAttempt.getAttemptNumber()));
inputContext.sendEvents(failedEvents);
@@ -1014,7 +1013,7 @@ class ShuffleScheduler {
private boolean inputShouldBeConsumed(InputAttemptIdentifier id) {
return (!obsoleteInputs.contains(id) &&
- !isInputFinished(id.getInputIdentifier().getInputIndex()));
+ !isInputFinished(id.getInputIdentifier()));
}
public synchronized List<InputAttemptIdentifier> getMapsForHost(MapHost host) {
@@ -1029,7 +1028,7 @@ class ShuffleScheduler {
// This may be removed after TEZ-914
InputAttemptIdentifier id = listItr.next();
if (inputShouldBeConsumed(id)) {
- Integer inputNumber = Integer.valueOf(id.getInputIdentifier().getInputIndex());
+ Integer inputNumber = Integer.valueOf(id.getInputIdentifier());
List<InputAttemptIdentifier> oldIdList = dedupedList.get(inputNumber);
if (oldIdList == null || oldIdList.isEmpty()) {
http://git-wip-us.apache.org/repos/asf/tez/blob/3ff360aa/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
index 20f44dd..a99eb5e 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
@@ -496,7 +496,7 @@ public class IFile {
protected byte[] buffer = null;
protected int bufferSize = DEFAULT_BUFFER_SIZE;
- protected DataInputStream dataIn;
+ protected DataInputStream dataIn = null;
protected int recNo = 1;
protected int originalKeyLength;
@@ -583,7 +583,9 @@ public class IFile {
this.in = null;
}
- this.dataIn = new DataInputStream(this.in);
+ if (in != null) {
+ this.dataIn = new DataInputStream(this.in);
+ }
this.readRecordsCounter = readsCounter;
this.bytesReadCounter = bytesReadCounter;
this.fileLength = length;
http://git-wip-us.apache.org/repos/asf/tez/blob/3ff360aa/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
index 917dbcb..0aa112e 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
@@ -46,7 +46,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
-import org.apache.tez.runtime.library.common.InputIdentifier;
import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
import org.junit.Assert;
import org.junit.Test;
@@ -236,36 +235,36 @@ public class TestFetcher {
@Test(timeout=5000)
public void testInputAttemptIdentifierMap() {
InputAttemptIdentifier[] srcAttempts = {
- new InputAttemptIdentifier(new InputIdentifier(0), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0",
+ new InputAttemptIdentifier(0, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0",
false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0),
//duplicate entry
- new InputAttemptIdentifier(new InputIdentifier(0), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0",
+ new InputAttemptIdentifier(0, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0",
false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0),
// pipeline shuffle based identifiers, with multiple attempts
- new InputAttemptIdentifier(new InputIdentifier(1), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1",
+ new InputAttemptIdentifier(1, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1",
false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0),
- new InputAttemptIdentifier(new InputIdentifier(1), 2, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1",
+ new InputAttemptIdentifier(1, 2, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1",
false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0),
- new InputAttemptIdentifier(new InputIdentifier(1), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_2",
+ new InputAttemptIdentifier(1, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_2",
false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 1),
- new InputAttemptIdentifier(new InputIdentifier(1), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3",
+ new InputAttemptIdentifier(1, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3",
false, InputAttemptIdentifier.SPILL_INFO.FINAL_UPDATE, 2),
- new InputAttemptIdentifier(new InputIdentifier(2), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3",
+ new InputAttemptIdentifier(2, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3",
false, InputAttemptIdentifier.SPILL_INFO.FINAL_MERGE_ENABLED, 0)
};
InputAttemptIdentifier[] expectedSrcAttempts = {
- new InputAttemptIdentifier(new InputIdentifier(0), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0",
+ new InputAttemptIdentifier(0, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0",
false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0),
// pipeline shuffle based identifiers
- new InputAttemptIdentifier(new InputIdentifier(1), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1",
+ new InputAttemptIdentifier(1, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1",
false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0),
- new InputAttemptIdentifier(new InputIdentifier(1), 2, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1",
+ new InputAttemptIdentifier(1, 2, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1",
false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0),
- new InputAttemptIdentifier(new InputIdentifier(1), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_2",
+ new InputAttemptIdentifier(1, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_2",
false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 1),
- new InputAttemptIdentifier(new InputIdentifier(1), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3",
+ new InputAttemptIdentifier(1, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3",
false, InputAttemptIdentifier.SPILL_INFO.FINAL_UPDATE, 2),
- new InputAttemptIdentifier(new InputIdentifier(2), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3",
+ new InputAttemptIdentifier(2, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3",
false, InputAttemptIdentifier.SPILL_INFO.FINAL_MERGE_ENABLED, 0)
};
TezConfiguration conf = new TezConfiguration();
http://git-wip-us.apache.org/repos/asf/tez/blob/3ff360aa/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java
index c452898..5bbf0fb 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java
@@ -53,7 +53,6 @@ import org.apache.tez.runtime.api.ExecutionContext;
import org.apache.tez.runtime.api.InputContext;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
-import org.apache.tez.runtime.library.common.InputIdentifier;
import org.apache.tez.runtime.library.common.shuffle.FetchedInputAllocator;
import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto;
@@ -215,7 +214,7 @@ public class TestShuffleInputEventHandlerImpl {
Event dme = createDataMovementEvent(true, 0, 1, 0, false, new BitSet(), 4, 0);
handler.handleEvents(Collections.singletonList(dme));
- InputAttemptIdentifier expectedId1 = new InputAttemptIdentifier(new InputIdentifier(1), 0,
+ InputAttemptIdentifier expectedId1 = new InputAttemptIdentifier(1, 0,
PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0);
verify(shuffleManager, times(1)).addKnownInput(eq(HOST), eq(PORT), eq(expectedId1), eq(0));
@@ -223,7 +222,7 @@ public class TestShuffleInputEventHandlerImpl {
dme = createDataMovementEvent(true, 0, 1, 1, false, new BitSet(), 4, 0);
handler.handleEvents(Collections.singletonList(dme));
- InputAttemptIdentifier expectedId2 = new InputAttemptIdentifier(new InputIdentifier(1), 0,
+ InputAttemptIdentifier expectedId2 = new InputAttemptIdentifier(1, 0,
PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 1);
verify(shuffleManager, times(2)).addKnownInput(eq(HOST), eq(PORT), eq(expectedId2), eq(0));
@@ -252,7 +251,7 @@ public class TestShuffleInputEventHandlerImpl {
Event dme = createDataMovementEvent(true, 0, 1, 0, false, new BitSet(), 4, 1);
handler.handleEvents(Collections.singletonList(dme));
- InputAttemptIdentifier expected = new InputAttemptIdentifier(new InputIdentifier(1), 1,
+ InputAttemptIdentifier expected = new InputAttemptIdentifier(1, 1,
PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 1);
verify(shuffleManager, times(1)).addKnownInput(eq(HOST), eq(PORT), eq(expected), eq(0));
@@ -283,14 +282,14 @@ public class TestShuffleInputEventHandlerImpl {
Event dme = createDataMovementEvent(true, 0, 1, 0, false, bitSet, 4, 0);
handler.handleEvents(Collections.singletonList(dme));
- InputAttemptIdentifier expected = new InputAttemptIdentifier(new InputIdentifier(1), 0,
+ InputAttemptIdentifier expected = new InputAttemptIdentifier(1, 0,
PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0);
verify(shuffleManager, times(1)).addCompletedInputWithNoData(expected);
//0--> 1 with spill id 1 (attemptNum 0)
handler.handleEvents(Collections.singletonList(dme));
dme = createDataMovementEvent(true, 0, 1, 1, false, new BitSet(), 4, 0);
- expected = new InputAttemptIdentifier(new InputIdentifier(1), 0,
+ expected = new InputAttemptIdentifier(1, 0,
PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 1);
verify(shuffleManager, times(2)).addCompletedInputWithNoData(expected);
http://git-wip-us.apache.org/repos/asf/tez/blob/3ff360aa/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
index faa2d31..20fb9a9 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
@@ -53,7 +53,6 @@ import com.google.common.collect.Lists;
import org.apache.tez.http.HttpConnection;
import org.apache.tez.http.HttpConnectionParams;
import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.runtime.library.common.InputIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@@ -116,7 +115,7 @@ public class TestFetcher {
doReturn("src vertex").when(inputContext).getSourceVertexName();
MapHost mapHost = new MapHost(0, HOST + ":" + PORT, "baseurl");
- InputAttemptIdentifier inputAttemptIdentifier = new InputAttemptIdentifier(new InputIdentifier(0), 0, "attempt");
+ InputAttemptIdentifier inputAttemptIdentifier = new InputAttemptIdentifier(0, 0, "attempt");
mapHost.addKnownMap(inputAttemptIdentifier);
List<InputAttemptIdentifier> mapsForHost = Lists.newArrayList(inputAttemptIdentifier);
doReturn(mapsForHost).when(scheduler).getMapsForHost(mapHost);
@@ -484,36 +483,36 @@ public class TestFetcher {
@Test(timeout = 5000)
public void testInputAttemptIdentifierMap() {
InputAttemptIdentifier[] srcAttempts = {
- new InputAttemptIdentifier(new InputIdentifier(0), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0",
+ new InputAttemptIdentifier(0, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0",
false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0),
//duplicate entry
- new InputAttemptIdentifier(new InputIdentifier(0), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0",
+ new InputAttemptIdentifier(0, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0",
false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0),
// pipeline shuffle based identifiers, with multiple attempts
- new InputAttemptIdentifier(new InputIdentifier(1), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1",
+ new InputAttemptIdentifier(1, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1",
false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0),
- new InputAttemptIdentifier(new InputIdentifier(1), 2, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1",
+ new InputAttemptIdentifier(1, 2, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1",
false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0),
- new InputAttemptIdentifier(new InputIdentifier(1), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_2",
+ new InputAttemptIdentifier(1, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_2",
false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 1),
- new InputAttemptIdentifier(new InputIdentifier(1), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3",
+ new InputAttemptIdentifier(1, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3",
false, InputAttemptIdentifier.SPILL_INFO.FINAL_UPDATE, 2),
- new InputAttemptIdentifier(new InputIdentifier(2), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3",
+ new InputAttemptIdentifier(2, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3",
false, InputAttemptIdentifier.SPILL_INFO.FINAL_MERGE_ENABLED, 0)
};
InputAttemptIdentifier[] expectedSrcAttempts = {
- new InputAttemptIdentifier(new InputIdentifier(0), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0",
+ new InputAttemptIdentifier(0, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0",
false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0),
// pipeline shuffle based identifiers
- new InputAttemptIdentifier(new InputIdentifier(1), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1",
+ new InputAttemptIdentifier(1, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1",
false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0),
- new InputAttemptIdentifier(new InputIdentifier(1), 2, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1",
+ new InputAttemptIdentifier(1, 2, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1",
false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0),
- new InputAttemptIdentifier(new InputIdentifier(1), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_2",
+ new InputAttemptIdentifier(1, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_2",
false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 1),
- new InputAttemptIdentifier(new InputIdentifier(1), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3",
+ new InputAttemptIdentifier(1, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3",
false, InputAttemptIdentifier.SPILL_INFO.FINAL_UPDATE, 2),
- new InputAttemptIdentifier(new InputIdentifier(2), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3",
+ new InputAttemptIdentifier(2, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3",
false, InputAttemptIdentifier.SPILL_INFO.FINAL_MERGE_ENABLED, 0)
};
http://git-wip-us.apache.org/repos/asf/tez/blob/3ff360aa/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
index 88a1d20..de066fe 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
@@ -17,7 +17,6 @@ import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.InputFailedEvent;
import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
-import org.apache.tez.runtime.library.common.InputIdentifier;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
import org.junit.Before;
import org.junit.Test;
@@ -165,7 +164,7 @@ public class TestShuffleInputEventHandlerOrderedGrouped {
int inputIdx = 0;
Event dme1 = createDataMovementEvent(attemptNum, inputIdx, null, false, true, true, 0);
InputAttemptIdentifier id1 =
- new InputAttemptIdentifier(new InputIdentifier(inputIdx), attemptNum,
+ new InputAttemptIdentifier(inputIdx, attemptNum,
PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0);
handler.handleEvents(Collections.singletonList(dme1));
String baseUri = handler.getBaseURI(HOST, PORT, attemptNum).toString();
@@ -176,7 +175,7 @@ public class TestShuffleInputEventHandlerOrderedGrouped {
//Send final_update event.
Event dme2 = createDataMovementEvent(attemptNum, inputIdx, null, false, true, false, 1);
InputAttemptIdentifier id2 =
- new InputAttemptIdentifier(new InputIdentifier(inputIdx), attemptNum,
+ new InputAttemptIdentifier(inputIdx, attemptNum,
PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.FINAL_UPDATE, 1);
handler.handleEvents(Collections.singletonList(dme2));
baseUri = handler.getBaseURI(HOST, PORT, attemptNum).toString();
@@ -202,14 +201,14 @@ public class TestShuffleInputEventHandlerOrderedGrouped {
inputIdx = 1;
Event dme3 = createDataMovementEvent(attemptNum, inputIdx, null, false, true,
true, 1);
- InputAttemptIdentifier id3 = new InputAttemptIdentifier(new InputIdentifier(inputIdx),
+ InputAttemptIdentifier id3 = new InputAttemptIdentifier(inputIdx,
attemptNum, PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE,
0);
handler.handleEvents(Collections.singletonList(dme3));
//Send final_update event (empty partition directly invoking copySucceeded).
- InputAttemptIdentifier id4 = new InputAttemptIdentifier(new InputIdentifier(inputIdx),
+ InputAttemptIdentifier id4 = new InputAttemptIdentifier(inputIdx,
attemptNum, PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.FINAL_UPDATE, 1);
- assertTrue(!scheduler.isInputFinished(id4.getInputIdentifier().getInputIndex()));
+ assertTrue(!scheduler.isInputFinished(id4.getInputIdentifier()));
scheduler.copySucceeded(id4, null, 0, 0, 0, null, false);
assertTrue(!scheduler.isDone()); //we haven't downloaded another id yet
//Let the incremental event pass
@@ -229,7 +228,7 @@ public class TestShuffleInputEventHandlerOrderedGrouped {
handler.handleEvents(Collections.singletonList(dme1));
InputAttemptIdentifier id1 =
- new InputAttemptIdentifier(new InputIdentifier(inputIdx), attemptNum,
+ new InputAttemptIdentifier(inputIdx, attemptNum,
PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0);
verify(scheduler, times(1)).addKnownMapOutput(eq(HOST), eq(PORT), eq(1), eq(baseUri), eq(id1));
@@ -243,7 +242,7 @@ public class TestShuffleInputEventHandlerOrderedGrouped {
handler.handleEvents(Collections.singletonList(dme2));
InputAttemptIdentifier id2 =
- new InputAttemptIdentifier(new InputIdentifier(inputIdx), attemptNum,
+ new InputAttemptIdentifier(inputIdx, attemptNum,
PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0);
verify(scheduler, times(1)).reportExceptionForInput(any(IOException.class));
}
@@ -329,4 +328,4 @@ public class TestShuffleInputEventHandlerOrderedGrouped {
}
return TezCommonUtils.compressByteArrayToByteString(TezUtilsInternal.toByteArray(bitSet));
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/3ff360aa/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
index 1a6c3be..f7ef309 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
@@ -50,7 +50,6 @@ import org.apache.tez.runtime.api.InputContext;
import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
-import org.apache.tez.runtime.library.common.InputIdentifier;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -89,7 +88,7 @@ public class TestShuffleScheduler {
// Schedule all copies.
for (int i = 0; i < numInputs; i++) {
InputAttemptIdentifier inputAttemptIdentifier =
- new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+ new InputAttemptIdentifier(i, 0, "attempt_");
scheduler.addKnownMapOutput("host" + i, 10000, 1, "hostUrl", inputAttemptIdentifier);
identifiers[i] = inputAttemptIdentifier;
}
@@ -134,7 +133,7 @@ public class TestShuffleScheduler {
for (int i = 0; i < numInputs; i++) {
InputAttemptIdentifier inputAttemptIdentifier =
- new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+ new InputAttemptIdentifier(i, 0, "attempt_");
scheduler.addKnownMapOutput("host" + i, 10000, 1, "hostUrl", inputAttemptIdentifier);
identifiers[i] = inputAttemptIdentifier;
}
@@ -191,7 +190,7 @@ public class TestShuffleScheduler {
//Generate 320 events
for (int i = 0; i < 320; i++) {
InputAttemptIdentifier inputAttemptIdentifier =
- new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+ new InputAttemptIdentifier(i, 0, "attempt_");
scheduler.addKnownMapOutput("host" + (i % totalProducerNodes),
10000, i, "hostUrl", inputAttemptIdentifier);
}
@@ -199,7 +198,7 @@ public class TestShuffleScheduler {
//100 succeeds
for (int i = 0; i < 100; i++) {
InputAttemptIdentifier inputAttemptIdentifier =
- new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+ new InputAttemptIdentifier(i, 0, "attempt_");
MapOutput mapOutput = MapOutput
.createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class),
100, false);
@@ -210,14 +209,14 @@ public class TestShuffleScheduler {
//99 fails
for (int i = 100; i < 199; i++) {
InputAttemptIdentifier inputAttemptIdentifier =
- new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+ new InputAttemptIdentifier(i, 0, "attempt_");
scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
+ ":" + 10000, ""), false, true, false);
}
InputAttemptIdentifier inputAttemptIdentifier =
- new InputAttemptIdentifier(new InputIdentifier(200), 0, "attempt_");
+ new InputAttemptIdentifier(200, 0, "attempt_");
//Should fail here and report exception as reducer is not healthy
scheduler.copyFailed(inputAttemptIdentifier, new MapHost(200, "host" + (200 %
@@ -260,7 +259,7 @@ public class TestShuffleScheduler {
//Generate 0-200 events
for (int i = 0; i < 200; i++) {
InputAttemptIdentifier inputAttemptIdentifier =
- new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+ new InputAttemptIdentifier(i, 0, "attempt_");
scheduler.addKnownMapOutput("host" + (i % totalProducerNodes),
10000, i, "hostUrl", inputAttemptIdentifier);
}
@@ -269,7 +268,7 @@ public class TestShuffleScheduler {
//Generate 200-320 events with empty partitions
for (int i = 200; i < 320; i++) {
InputAttemptIdentifier inputAttemptIdentifier =
- new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+ new InputAttemptIdentifier(i, 0, "attempt_");
scheduler.copySucceeded(inputAttemptIdentifier, null, 0, 0, 0, null, true);
}
//120 are successful. so remaining is 200
@@ -279,7 +278,7 @@ public class TestShuffleScheduler {
//200 pending to be downloaded. Download 190.
for (int i = 0; i < 190; i++) {
InputAttemptIdentifier inputAttemptIdentifier =
- new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+ new InputAttemptIdentifier(i, 0, "attempt_");
MapOutput mapOutput = MapOutput
.createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class),
100, false);
@@ -292,7 +291,7 @@ public class TestShuffleScheduler {
//10 fails
for (int i = 190; i < 200; i++) {
InputAttemptIdentifier inputAttemptIdentifier =
- new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+ new InputAttemptIdentifier(i, 0, "attempt_");
scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
+ ":" + 10000, ""), false, true, false);
}
@@ -304,7 +303,7 @@ public class TestShuffleScheduler {
scheduler.lastProgressTime = System.currentTimeMillis() - 250000;
InputAttemptIdentifier inputAttemptIdentifier =
- new InputAttemptIdentifier(new InputIdentifier(190), 0, "attempt_");
+ new InputAttemptIdentifier(190, 0, "attempt_");
scheduler.copyFailed(inputAttemptIdentifier, new MapHost(190, "host" +
(190 % totalProducerNodes)
+ ":" + 10000, ""), false, true, false);
@@ -317,7 +316,7 @@ public class TestShuffleScheduler {
//fail to download 50 more times across attempts
for (int i = 190; i < 200; i++) {
inputAttemptIdentifier =
- new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+ new InputAttemptIdentifier(i, 0, "attempt_");
scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
+ ":" + 10000, ""), false, true, false);
scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
@@ -338,7 +337,7 @@ public class TestShuffleScheduler {
//fail another 30
for (int i = 110; i < 120; i++) {
inputAttemptIdentifier =
- new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+ new InputAttemptIdentifier(i, 0, "attempt_");
scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
+ ":" + 10000, ""), false, true, false);
scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
@@ -376,7 +375,7 @@ public class TestShuffleScheduler {
//Generate 320 events
for (int i = 0; i < 320; i++) {
InputAttemptIdentifier inputAttemptIdentifier =
- new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+ new InputAttemptIdentifier(i, 0, "attempt_");
scheduler.addKnownMapOutput("host" + (i % totalProducerNodes),
10000, i, "hostUrl", inputAttemptIdentifier);
}
@@ -384,7 +383,7 @@ public class TestShuffleScheduler {
//319 succeeds
for (int i = 0; i < 319; i++) {
InputAttemptIdentifier inputAttemptIdentifier =
- new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+ new InputAttemptIdentifier(i, 0, "attempt_");
MapOutput mapOutput = MapOutput
.createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class),
100, false);
@@ -394,7 +393,7 @@ public class TestShuffleScheduler {
//1 fails (last fetch)
InputAttemptIdentifier inputAttemptIdentifier =
- new InputAttemptIdentifier(new InputIdentifier(319), 0, "attempt_");
+ new InputAttemptIdentifier(319, 0, "attempt_");
scheduler.copyFailed(inputAttemptIdentifier, new MapHost(319, "host" + (319 % totalProducerNodes)
+ ":" + 10000, ""), false, true, false);
@@ -441,7 +440,7 @@ public class TestShuffleScheduler {
//Generate 320 events
for (int i = 0; i < 320; i++) {
InputAttemptIdentifier inputAttemptIdentifier =
- new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+ new InputAttemptIdentifier(i, 0, "attempt_");
scheduler.addKnownMapOutput("host" + (i % totalProducerNodes),
10000, i, "hostUrl", inputAttemptIdentifier);
}
@@ -449,7 +448,7 @@ public class TestShuffleScheduler {
//Tasks fail in 20% of nodes 3 times, but are able to proceed further
for (int i = 0; i < 64; i++) {
InputAttemptIdentifier inputAttemptIdentifier =
- new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+ new InputAttemptIdentifier(i, 0, "attempt_");
scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i %
totalProducerNodes) + ":" + 10000, ""), false, true, false);
@@ -470,7 +469,7 @@ public class TestShuffleScheduler {
//319 succeeds
for (int i = 64; i < 319; i++) {
InputAttemptIdentifier inputAttemptIdentifier =
- new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+ new InputAttemptIdentifier(i, 0, "attempt_");
MapOutput mapOutput = MapOutput
.createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class),
100, false);
@@ -480,7 +479,7 @@ public class TestShuffleScheduler {
//1 fails (last fetch)
InputAttemptIdentifier inputAttemptIdentifier =
- new InputAttemptIdentifier(new InputIdentifier(319), 0, "attempt_");
+ new InputAttemptIdentifier(319, 0, "attempt_");
scheduler.copyFailed(inputAttemptIdentifier, new MapHost(319, "host" + (319 % totalProducerNodes)
+ ":" + 10000, ""), false, true, false);
@@ -536,7 +535,7 @@ public class TestShuffleScheduler {
//Generate 319 events (last event has not arrived)
for (int i = 0; i < 319; i++) {
InputAttemptIdentifier inputAttemptIdentifier =
- new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+ new InputAttemptIdentifier(i, 0, "attempt_");
scheduler.addKnownMapOutput("host" + (i % totalProducerNodes),
10000, i, "hostUrl", inputAttemptIdentifier);
}
@@ -544,7 +543,7 @@ public class TestShuffleScheduler {
//318 succeeds
for (int i = 0; i < 319; i++) {
InputAttemptIdentifier inputAttemptIdentifier =
- new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+ new InputAttemptIdentifier(i, 0, "attempt_");
MapOutput mapOutput = MapOutput
.createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class),
100, false);
@@ -554,7 +553,7 @@ public class TestShuffleScheduler {
//1 fails (last fetch)
InputAttemptIdentifier inputAttemptIdentifier =
- new InputAttemptIdentifier(new InputIdentifier(318), 0, "attempt_");
+ new InputAttemptIdentifier(318, 0, "attempt_");
scheduler.copyFailed(inputAttemptIdentifier, new MapHost(318, "host" + (318 % totalProducerNodes)
+ ":" + 10000, ""), false, true, false);
@@ -615,7 +614,7 @@ public class TestShuffleScheduler {
//Generate 320 events (last event has not arrived)
for (int i = 0; i < 320; i++) {
InputAttemptIdentifier inputAttemptIdentifier =
- new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+ new InputAttemptIdentifier(i, 0, "attempt_");
scheduler.addKnownMapOutput("host" + (i % totalProducerNodes),
10000, i, "hostUrl", inputAttemptIdentifier);
}
@@ -623,7 +622,7 @@ public class TestShuffleScheduler {
//10 succeeds
for (int i = 0; i < 10; i++) {
InputAttemptIdentifier inputAttemptIdentifier =
- new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+ new InputAttemptIdentifier(i, 0, "attempt_");
MapOutput mapOutput = MapOutput
.createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class),
100, false);
@@ -634,7 +633,7 @@ public class TestShuffleScheduler {
//5 fetches fail once
for (int i = 10; i < 15; i++) {
InputAttemptIdentifier inputAttemptIdentifier =
- new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+ new InputAttemptIdentifier(i, 0, "attempt_");
scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
+ ":" + 10000, ""), false, true, false);
}
@@ -648,7 +647,7 @@ public class TestShuffleScheduler {
//5 fetches fail repeatedly
for (int i = 10; i < 15; i++) {
InputAttemptIdentifier inputAttemptIdentifier =
- new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+ new InputAttemptIdentifier(i, 0, "attempt_");
scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
+ ":" + 10000, ""), false, true, false);
scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
@@ -691,7 +690,7 @@ public class TestShuffleScheduler {
//Generate 320 events
for (int i = 0; i < 320; i++) {
InputAttemptIdentifier inputAttemptIdentifier =
- new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+ new InputAttemptIdentifier(i, 0, "attempt_");
scheduler.addKnownMapOutput("host" + (i % totalProducerNodes), 10000, i,
"hostUrl", inputAttemptIdentifier);
}
@@ -699,7 +698,7 @@ public class TestShuffleScheduler {
//100 succeeds
for (int i = 0; i < 100; i++) {
InputAttemptIdentifier inputAttemptIdentifier =
- new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+ new InputAttemptIdentifier(i, 0, "attempt_");
MapOutput mapOutput = MapOutput
.createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class),
100, false);
@@ -711,7 +710,7 @@ public class TestShuffleScheduler {
//99 fails
for (int i = 100; i < 199; i++) {
InputAttemptIdentifier inputAttemptIdentifier =
- new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+ new InputAttemptIdentifier(i, 0, "attempt_");
scheduler.copyFailed(inputAttemptIdentifier,
new MapHost(i, "host" + (i % totalProducerNodes) + ":" + 10000, ""),
false, true, false);
@@ -754,7 +753,7 @@ public class TestShuffleScheduler {
final ShuffleSchedulerForTest scheduler = createScheduler(startTime, 1, shuffle);
InputAttemptIdentifier inputAttemptIdentifier =
- new InputAttemptIdentifier(new InputIdentifier(0), 0, "attempt_");
+ new InputAttemptIdentifier(0, 0, "attempt_");
scheduler.addKnownMapOutput("host0", 10000, 0, "hostUrl", inputAttemptIdentifier);
assertTrue(scheduler.pendingHosts.size() == 1);
@@ -801,7 +800,7 @@ public class TestShuffleScheduler {
for (int i = 0; i < numInputs; i++) {
InputAttemptIdentifier inputAttemptIdentifier =
- new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+ new InputAttemptIdentifier(i, 0, "attempt_");
scheduler.addKnownMapOutput("host" + i, 10000, 1, "hostUrl", inputAttemptIdentifier);
identifiers[i] = inputAttemptIdentifier;
}