You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by je...@apache.org on 2016/03/02 21:02:18 UTC
tez git commit: TEZ-3115. Shuffle string handling adds significant memory overhead (jeagles)
Repository: tez
Updated Branches:
refs/heads/master ac0fd8bb3 -> 3f5a7f35d
TEZ-3115. Shuffle string handling adds significant memory overhead (jeagles)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/3f5a7f35
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/3f5a7f35
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/3f5a7f35
Branch: refs/heads/master
Commit: 3f5a7f35dd5a8e28fcf56c2e2bdad86f8d7febbf
Parents: ac0fd8b
Author: Jonathan Eagles <je...@yahoo-inc.com>
Authored: Wed Mar 2 14:02:04 2016 -0600
Committer: Jonathan Eagles <je...@yahoo-inc.com>
Committed: Wed Mar 2 14:02:04 2016 -0600
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../library/common/shuffle/ShuffleUtils.java | 11 +-
.../orderedgrouped/FetcherOrderedGrouped.java | 26 ++-
.../common/shuffle/orderedgrouped/MapHost.java | 137 +++++++++---
.../common/shuffle/orderedgrouped/Shuffle.java | 5 +-
.../ShuffleInputEventHandlerOrderedGrouped.java | 30 +--
.../orderedgrouped/ShuffleScheduler.java | 88 ++++++--
.../shuffle/orderedgrouped/TestFetcher.java | 45 ++--
...tShuffleInputEventHandlerOrderedGrouped.java | 18 +-
.../orderedgrouped/TestShuffleScheduler.java | 213 ++++++++++---------
10 files changed, 349 insertions(+), 226 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/3f5a7f35/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 47913ab..5a7ae58 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
TEZ-3029. Add an onError method to service plugin contexts.
ALL CHANGES:
+ TEZ-3115. Shuffle string handling adds significant memory overhead
TEZ-3151. Expose DAG credentials to plugins.
TEZ-3149. Tez-tools: Add username in DagInfo.
TEZ-2988. DAGAppMaster::shutdownTezAM should return with a no-op if it has been invoked earlier.
@@ -395,6 +396,7 @@ INCOMPATIBLE CHANGES
TEZ-2949. Allow duplicate dag names within session for Tez.
ALL CHANGES:
+ TEZ-3115. Shuffle string handling adds significant memory overhead
TEZ-3149. Tez-tools: Add username in DagInfo.
TEZ-2988. DAGAppMaster::shutdownTezAM should return with a no-op if it has been invoked earlier.
TEZ-3141. mapreduce.task.timeout is not translated to container heartbeat timeout
http://git-wip-us.apache.org/repos/asf/tez/blob/3f5a7f35/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
index e8bf6ae..013a002 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
@@ -190,18 +190,13 @@ public class ShuffleUtils {
}
}
- // TODO NEWTEZ handle ssl shuffle
public static StringBuilder constructBaseURIForShuffleHandler(String host,
int port, int partition, String appId, int dagIdentifier, boolean sslShuffle) {
- return constructBaseURIForShuffleHandler(host + ":" + String.valueOf(port),
- partition, appId, dagIdentifier, sslShuffle);
- }
-
- public static StringBuilder constructBaseURIForShuffleHandler(String hostIdentifier,
- int partition, String appId, int dagIdentifier, boolean sslShuffle) {
final String http_protocol = (sslShuffle) ? "https://" : "http://";
StringBuilder sb = new StringBuilder(http_protocol);
- sb.append(hostIdentifier);
+ sb.append(host);
+ sb.append(":");
+ sb.append(port);
sb.append("/");
sb.append("mapOutput?job=");
sb.append(appId.replace("application", "job"));
http://git-wip-us.apache.org/repos/asf/tez/blob/3f5a7f35/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
index 93f083d..51bdf68 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
@@ -72,7 +72,10 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
private final ExceptionReporter exceptionReporter;
private final int id;
private final String logIdentifier;
- private final String localShuffleHostPort;
+ private final String localShuffleHost;
+ private final int localShufflePort;
+ private final String applicationId;
+ private final int dagId;
private final MapHost mapHost;
private final int currentPartition;
@@ -82,6 +85,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
private final JobTokenSecretManager jobTokenSecretManager;
final HttpConnectionParams httpConnectionParams;
+ private final boolean sslShuffle;
@VisibleForTesting
volatile boolean stopped = false;
@@ -118,7 +122,10 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
TezCounter wrongMapErrsCounter,
TezCounter connectionErrsCounter,
TezCounter wrongReduceErrsCounter,
- boolean asyncHttp) {
+ String applicationId,
+ int dagId,
+ boolean asyncHttp,
+ boolean sslShuffle) {
this.scheduler = scheduler;
this.allocator = allocator;
this.metrics = metrics;
@@ -134,6 +141,8 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
this.wrongMapErrs = wrongMapErrsCounter;
this.connectionErrs = connectionErrsCounter;
this.wrongReduceErrs = wrongReduceErrsCounter;
+ this.applicationId = applicationId;
+ this.dagId = dagId;
this.ifileReadAhead = ifileReadAhead;
this.ifileReadAheadLength = ifileReadAheadLength;
@@ -145,9 +154,11 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
this.codec = null;
}
this.conf = conf;
- this.localShuffleHostPort = localHostname + ":" + String.valueOf(shufflePort);
+ this.localShuffleHost = localHostname;
+ this.localShufflePort = shufflePort;
this.localDiskFetchEnabled = localDiskFetchEnabled;
+ this.sslShuffle = sslShuffle;
this.logIdentifier = "fetcher [" + srcNameTrimmed + "] #" + id;
}
@@ -157,8 +168,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
try {
metrics.threadBusy();
- String hostPort = mapHost.getHostIdentifier();
- if (localDiskFetchEnabled && hostPort.equals(localShuffleHostPort)) {
+ if (localDiskFetchEnabled && mapHost.getHost().equals(localShuffleHost) && mapHost.getPort() == localShufflePort) {
setupLocalDiskFetch(mapHost);
} else {
// Shuffle
@@ -319,8 +329,9 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
throws IOException {
boolean connectSucceeded = false;
try {
- URL url = ShuffleUtils.constructInputURL(host.getBaseUrl(), attempts,
- httpConnectionParams.isKeepAlive());
+ StringBuilder baseURI = ShuffleUtils.constructBaseURIForShuffleHandler(host.getHost(),
+ host.getPort(), host.getPartitionId(), applicationId, dagId, sslShuffle);
+ URL url = ShuffleUtils.constructInputURL(baseURI.toString(), attempts, httpConnectionParams.isKeepAlive());
httpConnection = ShuffleUtils.getHttpConnection(asyncHttp, url, httpConnectionParams,
logIdentifier, jobTokenSecretManager);
connectSucceeded = httpConnection.connect();
@@ -734,6 +745,5 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
remaining.put(id.toString(), id);
}
}
-
}
http://git-wip-us.apache.org/repos/asf/tez/blob/3f5a7f35/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapHost.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapHost.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapHost.java
index 3116568..486d8c5 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapHost.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapHost.java
@@ -25,51 +25,138 @@ import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
@Private
class MapHost {
-
+
public static enum State {
IDLE, // No map outputs available
BUSY, // Map outputs are being fetched
PENDING, // Known map outputs which need to be fetched
PENALIZED // Host penalized due to shuffle failures
}
-
+
+ public static class HostPort {
+
+ final String host;
+ final int port;
+
+ HostPort(String host, int port) {
+ this.host = host;
+ this.port = port;
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((host == null) ? 0 : host.hashCode());
+ result = prime * result + port;
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ HostPort other = (HostPort) obj;
+ if (host == null) {
+ if (other.host != null)
+ return false;
+ } else if (!host.equals(other.host))
+ return false;
+ if (port != other.port)
+ return false;
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ return "HostPort [host=" + host + ", port=" + port + "]";
+ }
+ }
+
+ public static class HostPortPartition {
+
+ final String host;
+ final int port;
+ final int partition;
+
+ HostPortPartition(String host, int port, int partition) {
+ this.host = host;
+ this.port = port;
+ this.partition = partition;
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((host == null) ? 0 : host.hashCode());
+ result = prime * result + partition;
+ result = prime * result + port;
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ HostPortPartition other = (HostPortPartition) obj;
+ if (partition != other.partition)
+ return false;
+ if (host == null) {
+ if (other.host != null)
+ return false;
+ } else if (!host.equals(other.host))
+ return false;
+ if (port != other.port)
+ return false;
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ return "HostPortPartition [host=" + host + ", port=" + port + ", partition=" + partition + "]";
+ }
+ }
+
private State state = State.IDLE;
- private final String hostIdentifier;
- private final int partitionId;
- private final String baseUrl;
- private final String identifier;
+ private final String host;
+ private final int port;
+ private final int partition;
// Tracks attempt IDs
private List<InputAttemptIdentifier> maps = new ArrayList<InputAttemptIdentifier>();
- public MapHost(int partitionId, String hostPort, String baseUrl) {
- this.partitionId = partitionId;
- this.hostIdentifier = hostPort;
- this.baseUrl = baseUrl;
- this.identifier = createIdentifier(hostPort, partitionId);
- }
-
- public static String createIdentifier(String hostName, int partitionId) {
- return hostName + ":" + Integer.toString(partitionId);
+ public MapHost(String host, int port, int partition) {
+ this.host = host;
+ this.port = port;
+ this.partition = partition;
}
- public String getIdentifier() {
- return identifier;
- }
-
public int getPartitionId() {
- return partitionId;
+ return partition;
}
public State getState() {
return state;
}
- public String getHostIdentifier() {
- return hostIdentifier;
+ public String getHost() {
+ return host;
+ }
+
+ public int getPort() {
+ return port;
}
- public String getBaseUrl() {
- return baseUrl;
+ public String getHostIdentifier() {
+ return host + ":" + port;
}
public synchronized void addKnownMap(InputAttemptIdentifier srcAttempt) {
@@ -112,7 +199,7 @@ class MapHost {
@Override
public String toString() {
- return hostIdentifier;
+ return getHostIdentifier();
}
/**
http://git-wip-us.apache.org/repos/asf/tez/blob/3f5a7f35/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
index db43651..fa66b7e 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
@@ -150,8 +150,6 @@ public class Shuffle implements ExceptionReporter {
+ (codec == null ? "None" : codec.getClass().getName())
+ ", ifileReadAhead: " + ifileReadAhead);
- boolean sslShuffle = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL,
- TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL_DEFAULT);
startTime = System.currentTimeMillis();
merger = new MergeManager(
this.conf,
@@ -188,8 +186,7 @@ public class Shuffle implements ExceptionReporter {
eventHandler= new ShuffleInputEventHandlerOrderedGrouped(
inputContext,
- scheduler,
- sslShuffle);
+ scheduler);
ExecutorService rawExecutor = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder()
.setDaemon(true).setNameFormat("ShuffleAndMergeRunner {" + srcNameTrimmed + "}").build());
http://git-wip-us.apache.org/repos/asf/tez/blob/3f5a7f35/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
index 6e6d967..7991485 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
@@ -19,16 +19,15 @@
package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;
import java.io.IOException;
-import java.net.URI;
import java.util.BitSet;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
-import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
import org.apache.tez.runtime.library.common.shuffle.ShuffleEventHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hadoop.util.StringInterner;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.dag.api.TezUncheckedException;
@@ -49,7 +48,6 @@ public class ShuffleInputEventHandlerOrderedGrouped implements ShuffleEventHandl
private final ShuffleScheduler scheduler;
private final InputContext inputContext;
- private final boolean sslShuffle;
private final AtomicInteger nextToLogEventCount = new AtomicInteger(0);
private final AtomicInteger numDmeEvents = new AtomicInteger(0);
@@ -57,10 +55,9 @@ public class ShuffleInputEventHandlerOrderedGrouped implements ShuffleEventHandl
private final AtomicInteger numDmeEventsNoData = new AtomicInteger(0);
public ShuffleInputEventHandlerOrderedGrouped(InputContext inputContext,
- ShuffleScheduler scheduler, boolean sslShuffle) {
+ ShuffleScheduler scheduler) {
this.inputContext = inputContext;
this.scheduler = scheduler;
- this.sslShuffle = sslShuffle;
}
@Override
@@ -103,17 +100,19 @@ public class ShuffleInputEventHandlerOrderedGrouped implements ShuffleEventHandl
throw new TezUncheckedException("Unable to parse DataMovementEvent payload", e);
}
int partitionId = dmEvent.getSourceIndex();
+ InputAttemptIdentifier srcAttemptIdentifier = constructInputAttemptIdentifier(dmEvent, shufflePayload);
+
if (LOG.isDebugEnabled()) {
LOG.debug("DME srcIdx: " + partitionId + ", targetIdx: " + dmEvent.getTargetIndex()
+ ", attemptNum: " + dmEvent.getVersion() + ", payload: " +
ShuffleUtils.stringify(shufflePayload));
}
+
if (shufflePayload.hasEmptyPartitions()) {
try {
byte[] emptyPartitions = TezCommonUtils.decompressByteStringToByteArray(shufflePayload.getEmptyPartitions());
BitSet emptyPartitionsBitSet = TezUtilsInternal.fromByteArray(emptyPartitions);
if (emptyPartitionsBitSet.get(partitionId)) {
- InputAttemptIdentifier srcAttemptIdentifier = constructInputAttemptIdentifier(dmEvent, shufflePayload);
if (LOG.isDebugEnabled()) {
LOG.debug(
"Source partition: " + partitionId + " did not generate any data. SrcAttempt: ["
@@ -129,13 +128,10 @@ public class ShuffleInputEventHandlerOrderedGrouped implements ShuffleEventHandl
}
}
- InputAttemptIdentifier srcAttemptIdentifier = constructInputAttemptIdentifier(dmEvent, shufflePayload);
-
- URI baseUri = getBaseURI(shufflePayload.getHost(), shufflePayload.getPort(), partitionId);
- scheduler.addKnownMapOutput(shufflePayload.getHost(), shufflePayload.getPort(),
- partitionId, baseUri.toString(), srcAttemptIdentifier);
+ scheduler.addKnownMapOutput(StringInterner.weakIntern(shufflePayload.getHost()), shufflePayload.getPort(),
+ partitionId, srcAttemptIdentifier);
}
-
+
private void processTaskFailedEvent(InputFailedEvent ifEvent) {
InputAttemptIdentifier taIdentifier = new InputAttemptIdentifier(ifEvent.getTargetIndex(), ifEvent.getVersion());
scheduler.obsoleteInput(taIdentifier);
@@ -144,14 +140,6 @@ public class ShuffleInputEventHandlerOrderedGrouped implements ShuffleEventHandl
}
}
- @VisibleForTesting
- URI getBaseURI(String host, int port, int partitionId) {
- StringBuilder sb = ShuffleUtils.constructBaseURIForShuffleHandler(host, port,
- partitionId, inputContext.getApplicationId().toString(), inputContext.getDagIdentifier(), sslShuffle);
- URI u = URI.create(sb.toString());
- return u;
- }
-
/**
* Helper method to create InputAttemptIdentifier
*
@@ -161,7 +149,7 @@ public class ShuffleInputEventHandlerOrderedGrouped implements ShuffleEventHandl
*/
private InputAttemptIdentifier constructInputAttemptIdentifier(DataMovementEvent dmEvent,
DataMovementEventPayloadProto shufflePayload) {
- String pathComponent = (shufflePayload.hasPathComponent()) ? shufflePayload.getPathComponent() : null;
+ String pathComponent = (shufflePayload.hasPathComponent()) ? StringInterner.weakIntern(shufflePayload.getPathComponent()) : null;
int spillEventId = shufflePayload.getSpillId();
InputAttemptIdentifier srcAttemptIdentifier = null;
if (shufflePayload.hasSpillId()) {
http://git-wip-us.apache.org/repos/asf/tez/blob/3f5a7f35/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
index 8cba2a6..2f6e490 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
@@ -73,12 +73,58 @@ import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.common.TezRuntimeUtils;
import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
+import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MapHost.HostPort;
+import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MapHost.HostPortPartition;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MapOutput.Type;
import com.google.common.collect.Lists;
class ShuffleScheduler {
+ public static class PathPartition {
+
+ final String path;
+ final int partition;
+
+ PathPartition(String path, int partition) {
+ this.path = path;
+ this.partition = partition;
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((path == null) ? 0 : path.hashCode());
+ result = prime * result + partition;
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ PathPartition other = (PathPartition) obj;
+ if (path == null) {
+ if (other.path != null)
+ return false;
+ } else if (!path.equals(other.path))
+ return false;
+ if (partition != other.partition)
+ return false;
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ return "PathPartition [path=" + path + ", partition=" + partition + "]";
+ }
+ }
+
@VisibleForTesting
enum ShuffleErrors {
IO_ERROR,
@@ -101,11 +147,11 @@ class ShuffleScheduler {
private final int numInputs;
private int numFetchedSpills;
@VisibleForTesting
- final Map<String, MapHost> mapLocations = new HashMap<String, MapHost>();
+ final Map<HostPortPartition, MapHost> mapLocations = new HashMap<HostPortPartition, MapHost>();
//TODO Clean this and other maps at some point
@VisibleForTesting
- final ConcurrentMap<String, InputAttemptIdentifier> pathToIdentifierMap
- = new ConcurrentHashMap<String, InputAttemptIdentifier>();
+ final ConcurrentMap<PathPartition, InputAttemptIdentifier> pathToIdentifierMap
+ = new ConcurrentHashMap<PathPartition, InputAttemptIdentifier>();
//To track shuffleInfo events when finalMerge is disabled in source or pipelined shuffle is
// enabled in source.
@@ -122,9 +168,8 @@ class ShuffleScheduler {
private final Referee referee;
@VisibleForTesting
final Map<InputAttemptIdentifier, IntWritable> failureCounts = new HashMap<InputAttemptIdentifier,IntWritable>();
- final Set<String> uniqueHosts = Sets.newHashSet();
- private final Map<String,IntWritable> hostFailures =
- new HashMap<String,IntWritable>();
+ final Set<HostPort> uniqueHosts = Sets.newHashSet();
+ private final Map<HostPort,IntWritable> hostFailures = new HashMap<HostPort,IntWritable>();
private final InputContext inputContext;
private final TezCounter shuffledInputsCounter;
private final TezCounter skippedInputCounter;
@@ -166,7 +211,10 @@ class ShuffleScheduler {
private final boolean localDiskFetchEnabled;
private final String localHostname;
private final int shufflePort;
+ private final String applicationId;
+ private final int dagId;
private final boolean asyncHttp;
+ private final boolean sslShuffle;
private final TezCounter ioErrsCounter;
private final TezCounter wrongLengthErrsCounter;
@@ -275,6 +323,8 @@ class ShuffleScheduler {
TezRuntimeConfiguration
.TEZ_RUNTIME_SHUFFLE_FAILED_CHECK_SINCE_LAST_COMPLETION_DEFAULT);
+ this.applicationId = inputContext.getApplicationId().toString();
+ this.dagId = inputContext.getDagIdentifier();
this.localHostname = inputContext.getExecutionContext().getHostName();
final ByteBuffer shuffleMetadata =
inputContext.getServiceProviderMetaData(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID);
@@ -311,6 +361,8 @@ class ShuffleScheduler {
this.startTime = startTime;
this.lastProgressTime = startTime;
+ this.sslShuffle = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL,
+ TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL_DEFAULT);
this.asyncHttp = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_USE_ASYNC_HTTP, false);
this.httpConnectionParams = ShuffleUtils.getHttpConnectionParams(conf);
this.shuffleMetrics = new ShuffleClientMetrics(inputContext.getDAGName(),
@@ -477,7 +529,7 @@ class ShuffleScheduler {
failureCounts.remove(srcAttemptIdentifier);
if (host != null) {
- hostFailures.remove(host.getHostIdentifier());
+ hostFailures.remove(new HostPort(host.getHost(), host.getPort()));
}
output.commit();
@@ -691,7 +743,7 @@ class ShuffleScheduler {
private void penalizeHost(MapHost host, int failures) {
host.penalize();
- String hostPort = host.getHostIdentifier();
+ HostPort hostPort = new HostPort(host.getHost(), host.getPort());
// TODO TEZ-922 hostFailures isn't really used for anything apart from
// hasFailedAcrossNodes().Factor it into error
// reporting / potential blacklisting of hosts.
@@ -766,7 +818,7 @@ class ShuffleScheduler {
(int) Math.ceil(numUniqueHosts * hostFailureFraction));
int total = 0;
boolean failedAcrossNodes = false;
- for(String host : uniqueHosts) {
+ for(HostPort host : uniqueHosts) {
IntWritable failures = hostFailures.get(host);
if (failures != null && failures.get() > minFailurePerHost) {
total++;
@@ -926,17 +978,13 @@ class ShuffleScheduler {
public synchronized void addKnownMapOutput(String inputHostName,
int port,
int partitionId,
- String hostUrl,
InputAttemptIdentifier srcAttempt) {
- String hostPort = (inputHostName + ":" + String.valueOf(port));
- uniqueHosts.add(hostPort);
- String identifier = MapHost.createIdentifier(hostPort, partitionId);
-
+ uniqueHosts.add(new HostPort(inputHostName, port));
+ HostPortPartition identifier = new HostPortPartition(inputHostName, port, partitionId);
MapHost host = mapLocations.get(identifier);
if (host == null) {
- host = new MapHost(partitionId, hostPort, hostUrl);
- assert identifier.equals(host.getIdentifier());
+ host = new MapHost(inputHostName, port, partitionId);
mapLocations.put(identifier, host);
}
@@ -1150,8 +1198,8 @@ class ShuffleScheduler {
}
- private String getIdentifierFromPathAndReduceId(String path, int reduceId) {
- return path + "_" + reduceId;
+ private PathPartition getIdentifierFromPathAndReduceId(String path, int reduceId) {
+ return new PathPartition(path, reduceId);
}
/**
@@ -1274,7 +1322,7 @@ class ShuffleScheduler {
count++;
if (LOG.isDebugEnabled()) {
LOG.debug(srcNameTrimmed + ": " + "Scheduling fetch for inputHost: {}",
- mapHost.getIdentifier());
+ mapHost.getHostIdentifier() + ":" + mapHost.getPartitionId());
}
FetcherOrderedGrouped fetcherOrderedGrouped = constructFetcherForHost(mapHost);
runningFetchers.add(fetcherOrderedGrouped);
@@ -1299,7 +1347,7 @@ class ShuffleScheduler {
shuffleMetrics, exceptionReporter, jobTokenSecretManager, ifileReadAhead, ifileReadAheadLength,
codec, conf, localDiskFetchEnabled, localHostname, shufflePort, srcNameTrimmed, mapHost,
ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter,
- connectionErrsCounter, wrongReduceErrsCounter, asyncHttp);
+ connectionErrsCounter, wrongReduceErrsCounter, applicationId, dagId, asyncHttp, sslShuffle);
}
private class FetchFutureCallback implements FutureCallback<Void> {
http://git-wip-us.apache.org/repos/asf/tez/blob/3f5a7f35/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
index 20fb9a9..89d35f4 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
@@ -57,6 +57,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.dag.api.TezConfiguration;
@@ -79,6 +80,8 @@ public class TestFetcher {
public static final String SHUFFLE_INPUT_FILE_PREFIX = "shuffle_input_file_";
public static final String HOST = "localhost";
public static final int PORT = 65;
+ public static final int DAG_ID = 1;
+ public static final String APP_ID = "application_1234_1";
private TezCounters tezCounters = new TezCounters();
private TezCounter ioErrsCounter = tezCounters.findCounter(ShuffleScheduler.SHUFFLE_ERR_GRP_NAME,
@@ -114,7 +117,7 @@ public class TestFetcher {
doReturn(new TezCounters()).when(inputContext).getCounters();
doReturn("src vertex").when(inputContext).getSourceVertexName();
- MapHost mapHost = new MapHost(0, HOST + ":" + PORT, "baseurl");
+ MapHost mapHost = new MapHost(HOST, PORT, 0);
InputAttemptIdentifier inputAttemptIdentifier = new InputAttemptIdentifier(0, 0, "attempt");
mapHost.addKnownMap(inputAttemptIdentifier);
List<InputAttemptIdentifier> mapsForHost = Lists.newArrayList(inputAttemptIdentifier);
@@ -124,7 +127,7 @@ public class TestFetcher {
new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0,
null, conf, false, HOST, PORT, "src vertex", mapHost, ioErrsCounter,
wrongLengthErrsCounter, badIdErrsCounter,
- wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, false);
+ wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, false, false);
fetcher.call();
verify(scheduler).getMapsForHost(mapHost);
@@ -147,12 +150,12 @@ public class TestFetcher {
final boolean ENABLE_LOCAL_FETCH = true;
final boolean DISABLE_LOCAL_FETCH = false;
- MapHost mapHost = new MapHost(0, HOST + ":" + PORT, "baseurl");
+ MapHost mapHost = new MapHost(HOST, PORT, 0);
FetcherOrderedGrouped fetcher =
new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0,
null, conf, ENABLE_LOCAL_FETCH, HOST, PORT, "src vertex", mapHost, ioErrsCounter,
wrongLengthErrsCounter, badIdErrsCounter,
- wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, false);
+ wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, false, false);
// when local mode is enabled and host and port matches use local fetch
FetcherOrderedGrouped spyFetcher = spy(fetcher);
@@ -164,12 +167,12 @@ public class TestFetcher {
verify(spyFetcher, never()).copyFromHost(any(MapHost.class));
// if hostname does not match use http
- mapHost = new MapHost(0, HOST + "_OTHER" + ":" + PORT, "baseurl");
+ mapHost = new MapHost(HOST + "_OTHER", PORT, 0);
fetcher =
new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0,
null, conf, ENABLE_LOCAL_FETCH, HOST, PORT, "src vertex", mapHost, ioErrsCounter,
wrongLengthErrsCounter, badIdErrsCounter,
- wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, false);
+ wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, false, false);
spyFetcher = spy(fetcher);
doNothing().when(spyFetcher).setupLocalDiskFetch(mapHost);
@@ -179,12 +182,12 @@ public class TestFetcher {
verify(spyFetcher, times(1)).copyFromHost(mapHost);
// if port does not match use http
- mapHost = new MapHost(0, HOST + ":" + (PORT + 1), "baseurl");
+ mapHost = new MapHost(HOST, PORT + 1, 0);
fetcher =
new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0,
null, conf, ENABLE_LOCAL_FETCH, HOST, PORT, "src vertex", mapHost, ioErrsCounter,
wrongLengthErrsCounter, badIdErrsCounter,
- wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, false);
+ wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, false, false);
spyFetcher = spy(fetcher);
doNothing().when(spyFetcher).setupLocalDiskFetch(mapHost);
@@ -194,11 +197,11 @@ public class TestFetcher {
verify(spyFetcher, times(1)).copyFromHost(mapHost);
//if local fetch is not enabled
- mapHost = new MapHost(0, HOST + ":" + PORT, "baseurl");
+ mapHost = new MapHost(HOST, PORT, 0);
fetcher = new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0,
null, conf, DISABLE_LOCAL_FETCH, HOST, PORT, "src vertex", mapHost, ioErrsCounter,
wrongLengthErrsCounter, badIdErrsCounter,
- wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, false);
+ wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, false, false);
spyFetcher = spy(fetcher);
doNothing().when(spyFetcher).setupLocalDiskFetch(mapHost);
@@ -219,11 +222,10 @@ public class TestFetcher {
when(inputContext.getCounters()).thenReturn(new TezCounters());
when(inputContext.getSourceVertexName()).thenReturn("");
- MapHost host = new MapHost(1, HOST + ":" + PORT,
- "http://" + HOST + ":" + PORT + "/mapOutput?job=job_123&&reduce=1&map=");
+ MapHost host = new MapHost(HOST, PORT, 1);
FetcherOrderedGrouped fetcher = new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0,
null, conf, true, HOST, PORT, "src vertex", host, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter,
- wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, false);
+ wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, false, false);
FetcherOrderedGrouped spyFetcher = spy(fetcher);
@@ -361,13 +363,13 @@ public class TestFetcher {
InputContext inputContext = mock(InputContext.class);
when(inputContext.getCounters()).thenReturn(new TezCounters());
when(inputContext.getSourceVertexName()).thenReturn("");
+ when(inputContext.getApplicationId()).thenReturn(ApplicationId.newInstance(0, 1));
HttpConnectionParams httpConnectionParams = ShuffleUtils.getHttpConnectionParams(conf);
- final MapHost host = new MapHost(1, HOST + ":" + PORT,
- "http://" + HOST + ":" + PORT + "/mapOutput?job=job_123&&reduce=1&map=");
+ final MapHost host = new MapHost(HOST, PORT, 1);
FetcherOrderedGrouped mockFetcher = new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0,
null, conf, false, HOST, PORT, "src vertex", host, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter,
- wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, false);
+ wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, false, false);
final FetcherOrderedGrouped fetcher = spy(mockFetcher);
@@ -379,7 +381,7 @@ public class TestFetcher {
doReturn(srcAttempts).when(scheduler).getMapsForHost(host);
doReturn(true).when(fetcher).setupConnection(any(MapHost.class), any(Collection.class));
- URL url = ShuffleUtils.constructInputURL(host.getBaseUrl(), srcAttempts, false);
+ URL url = ShuffleUtils.constructInputURL("http://" + HOST + ":" + PORT + "/mapOutput?job=job_123&&reduce=1&map=", srcAttempts, false);
fetcher.httpConnection = new FakeHttpConnection(url, null, "", null);
doAnswer(new Answer<MapOutput>() {
@@ -452,14 +454,13 @@ public class TestFetcher {
doReturn(new byte[10]).when(jobMgr).computeHash(any(byte[].class));
HttpConnectionParams httpConnectionParams = ShuffleUtils.getHttpConnectionParams(conf);
- final MapHost host = new MapHost(1, HOST + ":" + PORT,
- "http://" + HOST + ":" + PORT + "/mapOutput?job=job_123&&reduce=1&map=");
+ final MapHost host = new MapHost(HOST, PORT, 1);
FetcherOrderedGrouped mockFetcher =
new FetcherOrderedGrouped(httpConnectionParams, scheduler, merger, metrics, shuffle, jobMgr,
false, 0,
null, conf, false, HOST, PORT, "src vertex", host, ioErrsCounter,
wrongLengthErrsCounter, badIdErrsCounter,
- wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, true);
+ wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, true, false);
final FetcherOrderedGrouped fetcher = spy(mockFetcher);
fetcher.remaining = new LinkedHashMap<String, InputAttemptIdentifier>();
final List<InputAttemptIdentifier> srcAttempts = Arrays.asList(
@@ -521,12 +522,12 @@ public class TestFetcher {
MergeManager merger = mock(MergeManager.class);
ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class);
Shuffle shuffle = mock(Shuffle.class);
- MapHost mapHost = new MapHost(0, HOST + ":" + PORT, "baseurl");
+ MapHost mapHost = new MapHost(HOST, PORT, 0);
FetcherOrderedGrouped fetcher =
new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0,
null, conf, false, HOST, PORT, "src vertex", mapHost, ioErrsCounter,
wrongLengthErrsCounter, badIdErrsCounter,
- wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, false);
+ wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, false, false);
fetcher.populateRemainingMap(new LinkedList<InputAttemptIdentifier>(Arrays.asList(srcAttempts)));
Assert.assertEquals(expectedSrcAttempts.length, fetcher.remaining.size());
Iterator<Entry<String, InputAttemptIdentifier>> iterator = fetcher.remaining.entrySet().iterator();
http://git-wip-us.apache.org/repos/asf/tez/blob/3f5a7f35/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
index de066fe..26aa298 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
@@ -153,7 +153,7 @@ public class TestShuffleInputEventHandlerOrderedGrouped {
0,
"src vertex");
scheduler = spy(realScheduler);
- handler = new ShuffleInputEventHandlerOrderedGrouped(inputContext, scheduler, false);
+ handler = new ShuffleInputEventHandlerOrderedGrouped(inputContext, scheduler);
mergeManager = mock(MergeManager.class);
}
@@ -167,9 +167,8 @@ public class TestShuffleInputEventHandlerOrderedGrouped {
new InputAttemptIdentifier(inputIdx, attemptNum,
PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0);
handler.handleEvents(Collections.singletonList(dme1));
- String baseUri = handler.getBaseURI(HOST, PORT, attemptNum).toString();
int partitionId = attemptNum;
- verify(scheduler).addKnownMapOutput(eq(HOST), eq(PORT), eq(partitionId), eq(baseUri), eq(id1));
+ verify(scheduler).addKnownMapOutput(eq(HOST), eq(PORT), eq(partitionId), eq(id1));
verify(scheduler).pipelinedShuffleInfoEventsMap.containsKey(id1.getInputIdentifier());
//Send final_update event.
@@ -178,10 +177,9 @@ public class TestShuffleInputEventHandlerOrderedGrouped {
new InputAttemptIdentifier(inputIdx, attemptNum,
PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.FINAL_UPDATE, 1);
handler.handleEvents(Collections.singletonList(dme2));
- baseUri = handler.getBaseURI(HOST, PORT, attemptNum).toString();
partitionId = attemptNum;
assertTrue(scheduler.pipelinedShuffleInfoEventsMap.containsKey(id2.getInputIdentifier()));
- verify(scheduler).addKnownMapOutput(eq(HOST), eq(PORT), eq(partitionId), eq(baseUri), eq(id2));
+ verify(scheduler).addKnownMapOutput(eq(HOST), eq(PORT), eq(partitionId), eq(id2));
assertTrue(scheduler.pipelinedShuffleInfoEventsMap.containsKey(id2.getInputIdentifier()));
MapHost host = scheduler.getHost();
@@ -222,7 +220,6 @@ public class TestShuffleInputEventHandlerOrderedGrouped {
//Process attempt #1 first
int attemptNum = 1;
int inputIdx = 1;
- String baseUri = handler.getBaseURI(HOST, PORT, attemptNum).toString();
Event dme1 = createDataMovementEvent(attemptNum, inputIdx, null, false, true, true, 0, attemptNum);
handler.handleEvents(Collections.singletonList(dme1));
@@ -231,7 +228,7 @@ public class TestShuffleInputEventHandlerOrderedGrouped {
new InputAttemptIdentifier(inputIdx, attemptNum,
PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0);
- verify(scheduler, times(1)).addKnownMapOutput(eq(HOST), eq(PORT), eq(1), eq(baseUri), eq(id1));
+ verify(scheduler, times(1)).addKnownMapOutput(eq(HOST), eq(PORT), eq(1), eq(id1));
assertTrue("Shuffle info events should not be empty for pipelined shuffle",
!scheduler.pipelinedShuffleInfoEventsMap.isEmpty());
@@ -257,10 +254,9 @@ public class TestShuffleInputEventHandlerOrderedGrouped {
handler.handleEvents(events);
InputAttemptIdentifier expectedIdentifier = new InputAttemptIdentifier(targetIdx, 0,
PATH_COMPONENT);
- String baseUri = handler.getBaseURI(HOST, PORT, srcIdx).toString();
int partitionId = srcIdx;
verify(scheduler).addKnownMapOutput(eq(HOST), eq(PORT), eq(partitionId),
- eq(baseUri), eq(expectedIdentifier));
+ eq(expectedIdentifier));
assertTrue("Shuffle info events should be empty for regular shuffle codepath",
scheduler.pipelinedShuffleInfoEventsMap.isEmpty());
}
@@ -313,12 +309,10 @@ public class TestShuffleInputEventHandlerOrderedGrouped {
false);
events.add(dme);
handler.handleEvents(events);
- String baseUri = handler.getBaseURI(HOST, PORT, srcIdx).toString();
int partitionId = srcIdx;
InputAttemptIdentifier expectedIdentifier =
new InputAttemptIdentifier(taskIndex, 0, PATH_COMPONENT);
- verify(scheduler).addKnownMapOutput(eq(HOST), eq(PORT), eq(partitionId), eq(baseUri),
- eq(expectedIdentifier));
+ verify(scheduler).addKnownMapOutput(eq(HOST), eq(PORT), eq(partitionId), eq(expectedIdentifier));
}
private ByteString createEmptyPartitionByteString(int... emptyPartitions) throws IOException {
http://git-wip-us.apache.org/repos/asf/tez/blob/3f5a7f35/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
index f7ef309..15cfa48 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
@@ -89,7 +89,7 @@ public class TestShuffleScheduler {
for (int i = 0; i < numInputs; i++) {
InputAttemptIdentifier inputAttemptIdentifier =
new InputAttemptIdentifier(i, 0, "attempt_");
- scheduler.addKnownMapOutput("host" + i, 10000, 1, "hostUrl", inputAttemptIdentifier);
+ scheduler.addKnownMapOutput("host" + i, 10000, 1, inputAttemptIdentifier);
identifiers[i] = inputAttemptIdentifier;
}
@@ -134,7 +134,7 @@ public class TestShuffleScheduler {
for (int i = 0; i < numInputs; i++) {
InputAttemptIdentifier inputAttemptIdentifier =
new InputAttemptIdentifier(i, 0, "attempt_");
- scheduler.addKnownMapOutput("host" + i, 10000, 1, "hostUrl", inputAttemptIdentifier);
+ scheduler.addKnownMapOutput("host" + i, 10000, 1, inputAttemptIdentifier);
identifiers[i] = inputAttemptIdentifier;
}
@@ -192,7 +192,7 @@ public class TestShuffleScheduler {
InputAttemptIdentifier inputAttemptIdentifier =
new InputAttemptIdentifier(i, 0, "attempt_");
scheduler.addKnownMapOutput("host" + (i % totalProducerNodes),
- 10000, i, "hostUrl", inputAttemptIdentifier);
+ 10000, i, inputAttemptIdentifier);
}
//100 succeeds
@@ -202,16 +202,16 @@ public class TestShuffleScheduler {
MapOutput mapOutput = MapOutput
.createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class),
100, false);
- scheduler.copySucceeded(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
- + ":" + 10000, ""), 100, 200, startTime + (i * 100), mapOutput, false);
+ scheduler.copySucceeded(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
+ 10000, i), 100, 200, startTime + (i * 100), mapOutput, false);
}
//99 fails
for (int i = 100; i < 199; i++) {
InputAttemptIdentifier inputAttemptIdentifier =
new InputAttemptIdentifier(i, 0, "attempt_");
- scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
- + ":" + 10000, ""), false, true, false);
+ scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
+ 10000, i), false, true, false);
}
@@ -219,9 +219,9 @@ public class TestShuffleScheduler {
new InputAttemptIdentifier(200, 0, "attempt_");
//Should fail here and report exception as reducer is not healthy
- scheduler.copyFailed(inputAttemptIdentifier, new MapHost(200, "host" + (200 %
- totalProducerNodes)
- + ":" + 10000, ""), false, true, false);
+ scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (200 %
+ totalProducerNodes),
+ 10000, 200), false, true, false);
int minFailurePerHost = conf.getInt(
TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MIN_FAILURES_PER_HOST,
@@ -261,7 +261,7 @@ public class TestShuffleScheduler {
InputAttemptIdentifier inputAttemptIdentifier =
new InputAttemptIdentifier(i, 0, "attempt_");
scheduler.addKnownMapOutput("host" + (i % totalProducerNodes),
- 10000, i, "hostUrl", inputAttemptIdentifier);
+ 10000, i, inputAttemptIdentifier);
}
assertEquals(320, scheduler.remainingMaps.get());
@@ -282,8 +282,8 @@ public class TestShuffleScheduler {
MapOutput mapOutput = MapOutput
.createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class),
100, false);
- scheduler.copySucceeded(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
- + ":" + 10000, ""), 100, 200, startTime + (i * 100), mapOutput, false);
+ scheduler.copySucceeded(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
+ 10000, i), 100, 200, startTime + (i * 100), mapOutput, false);
}
assertEquals(10, scheduler.remainingMaps.get());
@@ -292,8 +292,8 @@ public class TestShuffleScheduler {
for (int i = 190; i < 200; i++) {
InputAttemptIdentifier inputAttemptIdentifier =
new InputAttemptIdentifier(i, 0, "attempt_");
- scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
- + ":" + 10000, ""), false, true, false);
+ scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
+ 10000, i), false, true, false);
}
//Shuffle has not stalled. so no issues.
@@ -304,9 +304,9 @@ public class TestShuffleScheduler {
InputAttemptIdentifier inputAttemptIdentifier =
new InputAttemptIdentifier(190, 0, "attempt_");
- scheduler.copyFailed(inputAttemptIdentifier, new MapHost(190, "host" +
- (190 % totalProducerNodes)
- + ":" + 10000, ""), false, true, false);
+ scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" +
+ (190 % totalProducerNodes),
+ 10000, 190), false, true, false);
//Even when it is stalled, need (320 - 300 = 20) * 3 = 60 failures
verify(scheduler.reporter, times(0)).reportException(any(Throwable.class));
@@ -317,16 +317,16 @@ public class TestShuffleScheduler {
for (int i = 190; i < 200; i++) {
inputAttemptIdentifier =
new InputAttemptIdentifier(i, 0, "attempt_");
- scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
- + ":" + 10000, ""), false, true, false);
- scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
- + ":" + 10000, ""), false, true, false);
- scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
- + ":" + 10000, ""), false, true, false);
- scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
- + ":" + 10000, ""), false, true, false);
- scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
- + ":" + 10000, ""), false, true, false);
+ scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
+ 10000, i), false, true, false);
+ scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
+ 10000, i), false, true, false);
+ scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
+ 10000, i), false, true, false);
+ scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
+ 10000, i), false, true, false);
+ scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
+ 10000, i), false, true, false);
}
assertEquals(61, scheduler.failedShufflesSinceLastCompletion);
@@ -338,12 +338,12 @@ public class TestShuffleScheduler {
for (int i = 110; i < 120; i++) {
inputAttemptIdentifier =
new InputAttemptIdentifier(i, 0, "attempt_");
- scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
- + ":" + 10000, ""), false, true, false);
- scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
- + ":" + 10000, ""), false, true, false);
- scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
- + ":" + 10000, ""), false, true, false);
+ scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
+ 10000, i), false, true, false);
+ scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
+ 10000, i), false, true, false);
+ scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
+ 10000, i), false, true, false);
}
// Should fail now due to fetcherHealthy. (stall has already happened and
@@ -377,7 +377,7 @@ public class TestShuffleScheduler {
InputAttemptIdentifier inputAttemptIdentifier =
new InputAttemptIdentifier(i, 0, "attempt_");
scheduler.addKnownMapOutput("host" + (i % totalProducerNodes),
- 10000, i, "hostUrl", inputAttemptIdentifier);
+ 10000, i, inputAttemptIdentifier);
}
//319 succeeds
@@ -387,15 +387,15 @@ public class TestShuffleScheduler {
MapOutput mapOutput = MapOutput
.createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class),
100, false);
- scheduler.copySucceeded(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
- + ":" + 10000, ""), 100, 200, startTime + (i * 100), mapOutput, false);
+ scheduler.copySucceeded(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
+ 10000, i), 100, 200, startTime + (i * 100), mapOutput, false);
}
//1 fails (last fetch)
InputAttemptIdentifier inputAttemptIdentifier =
new InputAttemptIdentifier(319, 0, "attempt_");
- scheduler.copyFailed(inputAttemptIdentifier, new MapHost(319, "host" + (319 % totalProducerNodes)
- + ":" + 10000, ""), false, true, false);
+ scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (319 % totalProducerNodes),
+ 10000, 319), false, true, false);
//stall the shuffle
scheduler.lastProgressTime = System.currentTimeMillis() - 1000000;
@@ -403,15 +403,15 @@ public class TestShuffleScheduler {
assertEquals(scheduler.remainingMaps.get(), 1);
//Retry for 3 more times
- scheduler.copyFailed(inputAttemptIdentifier, new MapHost(319, "host" + (319 %
- totalProducerNodes)
- + ":" + 10000, ""), false, true, false);
- scheduler.copyFailed(inputAttemptIdentifier, new MapHost(319, "host" + (319 %
- totalProducerNodes)
- + ":" + 10000, ""), false, true, false);
- scheduler.copyFailed(inputAttemptIdentifier, new MapHost(319, "host" + (319 %
- totalProducerNodes)
- + ":" + 10000, ""), false, true, false);
+ scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (319 %
+ totalProducerNodes),
+ 10000, 319), false, true, false);
+ scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (319 %
+ totalProducerNodes),
+ 10000, 310), false, true, false);
+ scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (319 %
+ totalProducerNodes),
+ 10000, 310), false, true, false);
// failedShufflesSinceLastCompletion has crossed the limits. Throw error
verify(shuffle, times(0)).reportException(any(Throwable.class));
@@ -442,7 +442,7 @@ public class TestShuffleScheduler {
InputAttemptIdentifier inputAttemptIdentifier =
new InputAttemptIdentifier(i, 0, "attempt_");
scheduler.addKnownMapOutput("host" + (i % totalProducerNodes),
- 10000, i, "hostUrl", inputAttemptIdentifier);
+ 10000, i, inputAttemptIdentifier);
}
//Tasks fail in 20% of nodes 3 times, but are able to proceed further
@@ -450,20 +450,20 @@ public class TestShuffleScheduler {
InputAttemptIdentifier inputAttemptIdentifier =
new InputAttemptIdentifier(i, 0, "attempt_");
- scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i %
- totalProducerNodes) + ":" + 10000, ""), false, true, false);
+ scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i %
+ totalProducerNodes), 10000, i), false, true, false);
- scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i %
- totalProducerNodes) + ":" + 10000, ""), false, true, false);
+ scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i %
+ totalProducerNodes), 10000, i), false, true, false);
- scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i %
- totalProducerNodes) + ":" + 10000, ""), false, true, false);
+ scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i %
+ totalProducerNodes), 10000, i), false, true, false);
MapOutput mapOutput = MapOutput
.createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class),
100, false);
- scheduler.copySucceeded(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
- + ":" + 10000, ""), 100, 200, startTime + (i * 100), mapOutput, false);
+ scheduler.copySucceeded(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
+ 10000, i), 100, 200, startTime + (i * 100), mapOutput, false);
}
//319 succeeds
@@ -473,15 +473,15 @@ public class TestShuffleScheduler {
MapOutput mapOutput = MapOutput
.createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class),
100, false);
- scheduler.copySucceeded(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
- + ":" + 10000, ""), 100, 200, startTime + (i * 100), mapOutput, false);
+ scheduler.copySucceeded(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
+ 10000, i), 100, 200, startTime + (i * 100), mapOutput, false);
}
//1 fails (last fetch)
InputAttemptIdentifier inputAttemptIdentifier =
new InputAttemptIdentifier(319, 0, "attempt_");
- scheduler.copyFailed(inputAttemptIdentifier, new MapHost(319, "host" + (319 % totalProducerNodes)
- + ":" + 10000, ""), false, true, false);
+ scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (319 % totalProducerNodes),
+ 10000, 319), false, true, false);
//stall the shuffle (but within limits)
scheduler.lastProgressTime = System.currentTimeMillis() - 100000;
@@ -489,15 +489,15 @@ public class TestShuffleScheduler {
assertEquals(scheduler.remainingMaps.get(), 1);
//Retry for 3 more times
- scheduler.copyFailed(inputAttemptIdentifier, new MapHost(319, "host" + (319 %
- totalProducerNodes)
- + ":" + 10000, ""), false, true, false);
- scheduler.copyFailed(inputAttemptIdentifier, new MapHost(319, "host" + (319 %
- totalProducerNodes)
- + ":" + 10000, ""), false, true, false);
- scheduler.copyFailed(inputAttemptIdentifier, new MapHost(319, "host" + (319 %
- totalProducerNodes)
- + ":" + 10000, ""), false, true, false);
+ scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (319 %
+ totalProducerNodes),
+ 10000, 319), false, true, false);
+ scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (319 %
+ totalProducerNodes),
+ 10000, 319), false, true, false);
+ scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (319 %
+ totalProducerNodes),
+ 10000, 319), false, true, false);
// failedShufflesSinceLastCompletion has crossed the limits. 20% of other nodes had failures as
// well. However, it has failed only in one host. So this should proceed
@@ -506,9 +506,9 @@ public class TestShuffleScheduler {
//stall the shuffle (but within limits)
scheduler.lastProgressTime = System.currentTimeMillis() - 300000;
- scheduler.copyFailed(inputAttemptIdentifier, new MapHost(319, "host" + (319 %
- totalProducerNodes)
- + ":" + 10000, ""), false, true, false);
+ scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (319 %
+ totalProducerNodes),
+ 10000, 319), false, true, false);
verify(shuffle, times(1)).reportException(any(Throwable.class));
}
@@ -537,7 +537,7 @@ public class TestShuffleScheduler {
InputAttemptIdentifier inputAttemptIdentifier =
new InputAttemptIdentifier(i, 0, "attempt_");
scheduler.addKnownMapOutput("host" + (i % totalProducerNodes),
- 10000, i, "hostUrl", inputAttemptIdentifier);
+ 10000, i, inputAttemptIdentifier);
}
//318 succeeds
@@ -547,15 +547,15 @@ public class TestShuffleScheduler {
MapOutput mapOutput = MapOutput
.createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class),
100, false);
- scheduler.copySucceeded(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
- + ":" + 10000, ""), 100, 200, startTime + (i * 100), mapOutput, false);
+ scheduler.copySucceeded(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
+ 10000, i), 100, 200, startTime + (i * 100), mapOutput, false);
}
//1 fails (last fetch)
InputAttemptIdentifier inputAttemptIdentifier =
new InputAttemptIdentifier(318, 0, "attempt_");
- scheduler.copyFailed(inputAttemptIdentifier, new MapHost(318, "host" + (318 % totalProducerNodes)
- + ":" + 10000, ""), false, true, false);
+ scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (318 % totalProducerNodes),
+ 10000, 318), false, true, false);
//stall the shuffle
scheduler.lastProgressTime = System.currentTimeMillis() - 1000000;
@@ -563,15 +563,15 @@ public class TestShuffleScheduler {
assertEquals(scheduler.remainingMaps.get(), 1);
//Retry for 3 more times
- scheduler.copyFailed(inputAttemptIdentifier, new MapHost(318, "host" + (318 %
- totalProducerNodes)
- + ":" + 10000, ""), false, true, false);
- scheduler.copyFailed(inputAttemptIdentifier, new MapHost(318, "host" + (318 %
- totalProducerNodes)
- + ":" + 10000, ""), false, true, false);
- scheduler.copyFailed(inputAttemptIdentifier, new MapHost(318, "host" + (318 %
- totalProducerNodes)
- + ":" + 10000, ""), false, true, false);
+ scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (318 %
+ totalProducerNodes),
+ 10000, 318), false, true, false);
+ scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (318 %
+ totalProducerNodes),
+ 10000, 318), false, true, false);
+ scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (318 %
+ totalProducerNodes),
+ 10000, 318), false, true, false);
//Shuffle has not received the events completely. So do not bail out yet.
verify(shuffle, times(0)).reportException(any(Throwable.class));
@@ -616,7 +616,7 @@ public class TestShuffleScheduler {
InputAttemptIdentifier inputAttemptIdentifier =
new InputAttemptIdentifier(i, 0, "attempt_");
scheduler.addKnownMapOutput("host" + (i % totalProducerNodes),
- 10000, i, "hostUrl", inputAttemptIdentifier);
+ 10000, i, inputAttemptIdentifier);
}
//10 succeeds
@@ -626,16 +626,16 @@ public class TestShuffleScheduler {
MapOutput mapOutput = MapOutput
.createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class),
100, false);
- scheduler.copySucceeded(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
- + ":" + 10000, ""), 100, 200, startTime + (i * 100), mapOutput, false);
+ scheduler.copySucceeded(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
+ 10000, i), 100, 200, startTime + (i * 100), mapOutput, false);
}
//5 fetches fail once
for (int i = 10; i < 15; i++) {
InputAttemptIdentifier inputAttemptIdentifier =
new InputAttemptIdentifier(i, 0, "attempt_");
- scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
- + ":" + 10000, ""), false, true, false);
+ scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
+ 10000, i), false, true, false);
}
assertTrue(scheduler.failureCounts.size() >= 5);
@@ -648,10 +648,10 @@ public class TestShuffleScheduler {
for (int i = 10; i < 15; i++) {
InputAttemptIdentifier inputAttemptIdentifier =
new InputAttemptIdentifier(i, 0, "attempt_");
- scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
- + ":" + 10000, ""), false, true, false);
- scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
- + ":" + 10000, ""), false, true, false);
+ scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
+ 10000, i), false, true, false);
+ scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
+ 10000, i), false, true, false);
}
boolean checkFailedFetchSinceLastCompletion = conf.getBoolean
@@ -692,7 +692,7 @@ public class TestShuffleScheduler {
InputAttemptIdentifier inputAttemptIdentifier =
new InputAttemptIdentifier(i, 0, "attempt_");
scheduler.addKnownMapOutput("host" + (i % totalProducerNodes), 10000, i,
- "hostUrl", inputAttemptIdentifier);
+ inputAttemptIdentifier);
}
//100 succeeds
@@ -703,7 +703,7 @@ public class TestShuffleScheduler {
.createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class),
100, false);
scheduler.copySucceeded(inputAttemptIdentifier,
- new MapHost(i, "host" + (i % totalProducerNodes) + ":" + 10000, ""),
+ new MapHost("host" + (i % totalProducerNodes), 10000, i),
100, 200, startTime + (i * 100), mapOutput, false);
}
@@ -712,16 +712,16 @@ public class TestShuffleScheduler {
InputAttemptIdentifier inputAttemptIdentifier =
new InputAttemptIdentifier(i, 0, "attempt_");
scheduler.copyFailed(inputAttemptIdentifier,
- new MapHost(i, "host" + (i % totalProducerNodes) + ":" + 10000, ""),
+ new MapHost("host" + (i % totalProducerNodes), 10000, i),
false, true, false);
scheduler.copyFailed(inputAttemptIdentifier,
- new MapHost(i, "host" + (i % totalProducerNodes) + ":" + 10000, ""),
+ new MapHost("host" + (i % totalProducerNodes), 10000, i),
false, true, false);
scheduler.copyFailed(inputAttemptIdentifier,
- new MapHost(i, "host" + (i % totalProducerNodes) + ":" + 10000, ""),
+ new MapHost("host" + (i % totalProducerNodes), 10000, i),
false, true, false);
scheduler.copyFailed(inputAttemptIdentifier,
- new MapHost(i, "host" + (i % totalProducerNodes) + ":" + 10000, ""),
+ new MapHost("host" + (i % totalProducerNodes), 10000, i),
false, true, false);
}
@@ -754,7 +754,7 @@ public class TestShuffleScheduler {
InputAttemptIdentifier inputAttemptIdentifier =
new InputAttemptIdentifier(0, 0, "attempt_");
- scheduler.addKnownMapOutput("host0", 10000, 0, "hostUrl", inputAttemptIdentifier);
+ scheduler.addKnownMapOutput("host0", 10000, 0, inputAttemptIdentifier);
assertTrue(scheduler.pendingHosts.size() == 1);
assertTrue(scheduler.pendingHosts.iterator().next().getState() == MapHost.State.PENDING);
@@ -765,12 +765,13 @@ public class TestShuffleScheduler {
//Should not get host, as it is added to penalty loop
MapHost host = scheduler.getHost();
- assertFalse(host.getIdentifier(), host.getIdentifier().equalsIgnoreCase("host0:10000"));
+ assertFalse("Host identifier mismatch", (host.getHost() + ":" + host.getPort() + ":" + host.getPartitionId()).equalsIgnoreCase("host0:10000"));
+
//Refree thread would release it after INITIAL_PENALTY timeout
Thread.sleep(ShuffleScheduler.INITIAL_PENALTY + 1000);
host = scheduler.getHost();
- assertFalse(host.getIdentifier(), host.getIdentifier().equalsIgnoreCase("host0:10000"));
+ assertFalse("Host identifier mismatch", (host.getHost() + ":" + host.getPort() + ":" + host.getPartitionId()).equalsIgnoreCase("host0:10000"));
}
@Test(timeout = 5000)
@@ -801,7 +802,7 @@ public class TestShuffleScheduler {
for (int i = 0; i < numInputs; i++) {
InputAttemptIdentifier inputAttemptIdentifier =
new InputAttemptIdentifier(i, 0, "attempt_");
- scheduler.addKnownMapOutput("host" + i, 10000, 1, "hostUrl", inputAttemptIdentifier);
+ scheduler.addKnownMapOutput("host" + i, 10000, 1, inputAttemptIdentifier);
identifiers[i] = inputAttemptIdentifier;
}