You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by je...@apache.org on 2016/03/02 21:02:18 UTC

tez git commit: TEZ-3115. Shuffle string handling adds significant memory overhead (jeagles)

Repository: tez
Updated Branches:
  refs/heads/master ac0fd8bb3 -> 3f5a7f35d


TEZ-3115. Shuffle string handling adds significant memory overhead (jeagles)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/3f5a7f35
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/3f5a7f35
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/3f5a7f35

Branch: refs/heads/master
Commit: 3f5a7f35dd5a8e28fcf56c2e2bdad86f8d7febbf
Parents: ac0fd8b
Author: Jonathan Eagles <je...@yahoo-inc.com>
Authored: Wed Mar 2 14:02:04 2016 -0600
Committer: Jonathan Eagles <je...@yahoo-inc.com>
Committed: Wed Mar 2 14:02:04 2016 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../library/common/shuffle/ShuffleUtils.java    |  11 +-
 .../orderedgrouped/FetcherOrderedGrouped.java   |  26 ++-
 .../common/shuffle/orderedgrouped/MapHost.java  | 137 +++++++++---
 .../common/shuffle/orderedgrouped/Shuffle.java  |   5 +-
 .../ShuffleInputEventHandlerOrderedGrouped.java |  30 +--
 .../orderedgrouped/ShuffleScheduler.java        |  88 ++++++--
 .../shuffle/orderedgrouped/TestFetcher.java     |  45 ++--
 ...tShuffleInputEventHandlerOrderedGrouped.java |  18 +-
 .../orderedgrouped/TestShuffleScheduler.java    | 213 ++++++++++---------
 10 files changed, 349 insertions(+), 226 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/3f5a7f35/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 47913ab..5a7ae58 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
   TEZ-3029. Add an onError method to service plugin contexts.
 
 ALL CHANGES:
+  TEZ-3115. Shuffle string handling adds significant memory overhead
   TEZ-3151. Expose DAG credentials to plugins.
   TEZ-3149. Tez-tools: Add username in DagInfo.
   TEZ-2988. DAGAppMaster::shutdownTezAM should return with a no-op if it has been invoked earlier.
@@ -395,6 +396,7 @@ INCOMPATIBLE CHANGES
   TEZ-2949. Allow duplicate dag names within session for Tez.
 
 ALL CHANGES:
+  TEZ-3115. Shuffle string handling adds significant memory overhead
   TEZ-3149. Tez-tools: Add username in DagInfo.
   TEZ-2988. DAGAppMaster::shutdownTezAM should return with a no-op if it has been invoked earlier.
   TEZ-3141. mapreduce.task.timeout is not translated to container heartbeat timeout

http://git-wip-us.apache.org/repos/asf/tez/blob/3f5a7f35/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
index e8bf6ae..013a002 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
@@ -190,18 +190,13 @@ public class ShuffleUtils {
     }
   }
 
-  // TODO NEWTEZ handle ssl shuffle
   public static StringBuilder constructBaseURIForShuffleHandler(String host,
       int port, int partition, String appId, int dagIdentifier, boolean sslShuffle) {
-    return constructBaseURIForShuffleHandler(host + ":" + String.valueOf(port),
-      partition, appId, dagIdentifier, sslShuffle);
-  }
-  
-  public static StringBuilder constructBaseURIForShuffleHandler(String hostIdentifier,
-      int partition, String appId, int dagIdentifier, boolean sslShuffle) {
     final String http_protocol = (sslShuffle) ? "https://" : "http://";
     StringBuilder sb = new StringBuilder(http_protocol);
-    sb.append(hostIdentifier);
+    sb.append(host);
+    sb.append(":");
+    sb.append(port);
     sb.append("/");
     sb.append("mapOutput?job=");
     sb.append(appId.replace("application", "job"));

http://git-wip-us.apache.org/repos/asf/tez/blob/3f5a7f35/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
index 93f083d..51bdf68 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
@@ -72,7 +72,10 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
   private final ExceptionReporter exceptionReporter;
   private final int id;
   private final String logIdentifier;
-  private final String localShuffleHostPort;
+  private final String localShuffleHost;
+  private final int localShufflePort;
+  private final String applicationId;
+ private final int dagId;
   private final MapHost mapHost;
 
   private final int currentPartition;
@@ -82,6 +85,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
   private final JobTokenSecretManager jobTokenSecretManager;
 
   final HttpConnectionParams httpConnectionParams;
+  private final boolean sslShuffle;
 
   @VisibleForTesting
   volatile boolean stopped = false;
@@ -118,7 +122,10 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
                                TezCounter wrongMapErrsCounter,
                                TezCounter connectionErrsCounter,
                                TezCounter wrongReduceErrsCounter,
-                               boolean asyncHttp) {
+                               String applicationId,
+                               int dagId,
+                               boolean asyncHttp,
+                               boolean sslShuffle) {
     this.scheduler = scheduler;
     this.allocator = allocator;
     this.metrics = metrics;
@@ -134,6 +141,8 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
     this.wrongMapErrs = wrongMapErrsCounter;
     this.connectionErrs = connectionErrsCounter;
     this.wrongReduceErrs = wrongReduceErrsCounter;
+    this.applicationId = applicationId;
+    this.dagId = dagId;
 
     this.ifileReadAhead = ifileReadAhead;
     this.ifileReadAheadLength = ifileReadAheadLength;
@@ -145,9 +154,11 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
       this.codec = null;
     }
     this.conf = conf;
-    this.localShuffleHostPort = localHostname + ":" + String.valueOf(shufflePort);
+    this.localShuffleHost = localHostname;
+    this.localShufflePort = shufflePort;
 
     this.localDiskFetchEnabled = localDiskFetchEnabled;
+    this.sslShuffle = sslShuffle;
 
     this.logIdentifier = "fetcher [" + srcNameTrimmed + "] #" + id;
   }
@@ -157,8 +168,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
     try {
       metrics.threadBusy();
 
-      String hostPort = mapHost.getHostIdentifier();
-      if (localDiskFetchEnabled && hostPort.equals(localShuffleHostPort)) {
+      if (localDiskFetchEnabled && mapHost.getHost().equals(localShuffleHost) && mapHost.getPort() == localShufflePort) {
         setupLocalDiskFetch(mapHost);
       } else {
         // Shuffle
@@ -319,8 +329,9 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
       throws IOException {
     boolean connectSucceeded = false;
     try {
-      URL url = ShuffleUtils.constructInputURL(host.getBaseUrl(), attempts,
-          httpConnectionParams.isKeepAlive());
+      StringBuilder baseURI = ShuffleUtils.constructBaseURIForShuffleHandler(host.getHost(),
+          host.getPort(), host.getPartitionId(), applicationId, dagId, sslShuffle);
+      URL url = ShuffleUtils.constructInputURL(baseURI.toString(), attempts, httpConnectionParams.isKeepAlive());
       httpConnection = ShuffleUtils.getHttpConnection(asyncHttp, url, httpConnectionParams,
           logIdentifier, jobTokenSecretManager);
       connectSucceeded = httpConnection.connect();
@@ -734,6 +745,5 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
       remaining.put(id.toString(), id);
     }
   }
-
 }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/3f5a7f35/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapHost.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapHost.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapHost.java
index 3116568..486d8c5 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapHost.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapHost.java
@@ -25,51 +25,138 @@ import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
 
 @Private
 class MapHost {
-  
+
   public static enum State {
     IDLE,               // No map outputs available
     BUSY,               // Map outputs are being fetched
     PENDING,            // Known map outputs which need to be fetched
     PENALIZED           // Host penalized due to shuffle failures
   }
-  
+
+  public static class HostPort {
+
+    final String host;
+    final int port;
+
+    HostPort(String host, int port) {
+      this.host = host;
+      this.port = port;
+    }
+
+    @Override
+    public int hashCode() {
+      final int prime = 31;
+      int result = 1;
+      result = prime * result + ((host == null) ? 0 : host.hashCode());
+      result = prime * result + port;
+      return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj)
+        return true;
+      if (obj == null)
+        return false;
+      if (getClass() != obj.getClass())
+        return false;
+      HostPort other = (HostPort) obj;
+      if (host == null) {
+        if (other.host != null)
+          return false;
+      } else if (!host.equals(other.host))
+        return false;
+      if (port != other.port)
+        return false;
+      return true;
+    }
+
+    @Override
+    public String toString() {
+      return "HostPort [host=" + host + ", port=" + port + "]";
+    }
+  }
+
+  public static class HostPortPartition {
+
+    final String host;
+    final int port;
+    final int partition;
+
+    HostPortPartition(String host, int port, int partition) {
+      this.host = host;
+      this.port = port;
+      this.partition = partition;
+    }
+
+    @Override
+    public int hashCode() {
+      final int prime = 31;
+      int result = 1;
+      result = prime * result + ((host == null) ? 0 : host.hashCode());
+      result = prime * result + partition;
+      result = prime * result + port;
+      return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj)
+        return true;
+      if (obj == null)
+        return false;
+      if (getClass() != obj.getClass())
+        return false;
+      HostPortPartition other = (HostPortPartition) obj;
+      if (partition != other.partition)
+        return false;
+      if (host == null) {
+        if (other.host != null)
+          return false;
+      } else if (!host.equals(other.host))
+        return false;
+      if (port != other.port)
+        return false;
+      return true;
+    }
+
+    @Override
+    public String toString() {
+      return "HostPortPartition [host=" + host + ", port=" + port + ", partition=" + partition + "]";
+    }
+  }
+
   private State state = State.IDLE;
-  private final String hostIdentifier;
-  private final int partitionId;
-  private final String baseUrl;
-  private final String identifier;
+  private final String host;
+  private final int port;
+  private final int partition;
   // Tracks attempt IDs
   private List<InputAttemptIdentifier> maps = new ArrayList<InputAttemptIdentifier>();
   
-  public MapHost(int partitionId, String hostPort, String baseUrl) {
-    this.partitionId = partitionId;
-    this.hostIdentifier = hostPort;
-    this.baseUrl = baseUrl;
-    this.identifier = createIdentifier(hostPort, partitionId);
-  }
-  
-  public static String createIdentifier(String hostName, int partitionId) {
-    return hostName + ":" + Integer.toString(partitionId);
+  public MapHost(String host, int port, int partition) {
+    this.host = host;
+    this.port = port;
+    this.partition = partition;
   }
 
-  public String getIdentifier() {
-    return identifier;
-  }
-  
   public int getPartitionId() {
-    return partitionId;
+    return partition;
   }
 
   public State getState() {
     return state;
   }
 
-  public String getHostIdentifier() {
-    return hostIdentifier;
+  public String getHost() {
+    return host;
+  }
+
+  public int getPort() {
+    return port;
   }
 
-  public String getBaseUrl() {
-    return baseUrl;
+  public String getHostIdentifier() {
+    return host + ":" + port;
   }
 
   public synchronized void addKnownMap(InputAttemptIdentifier srcAttempt) {
@@ -112,7 +199,7 @@ class MapHost {
   
   @Override
   public String toString() {
-    return hostIdentifier;
+    return getHostIdentifier();
   }
   
   /**

http://git-wip-us.apache.org/repos/asf/tez/blob/3f5a7f35/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
index db43651..fa66b7e 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
@@ -150,8 +150,6 @@ public class Shuffle implements ExceptionReporter {
         + (codec == null ? "None" : codec.getClass().getName())
         + ", ifileReadAhead: " + ifileReadAhead);
 
-    boolean sslShuffle = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL,
-      TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL_DEFAULT);
     startTime = System.currentTimeMillis();
     merger = new MergeManager(
         this.conf,
@@ -188,8 +186,7 @@ public class Shuffle implements ExceptionReporter {
 
     eventHandler= new ShuffleInputEventHandlerOrderedGrouped(
         inputContext,
-        scheduler,
-        sslShuffle);
+        scheduler);
     
     ExecutorService rawExecutor = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder()
         .setDaemon(true).setNameFormat("ShuffleAndMergeRunner {" + srcNameTrimmed + "}").build());

http://git-wip-us.apache.org/repos/asf/tez/blob/3f5a7f35/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
index 6e6d967..7991485 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
@@ -19,16 +19,15 @@
 package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;
 
 import java.io.IOException;
-import java.net.URI;
 import java.util.BitSet;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.ByteString;
 import org.apache.tez.runtime.library.common.shuffle.ShuffleEventHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.hadoop.util.StringInterner;
 import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.common.TezUtilsInternal;
 import org.apache.tez.dag.api.TezUncheckedException;
@@ -49,7 +48,6 @@ public class ShuffleInputEventHandlerOrderedGrouped implements ShuffleEventHandl
   private final ShuffleScheduler scheduler;
   private final InputContext inputContext;
 
-  private final boolean sslShuffle;
 
   private final AtomicInteger nextToLogEventCount = new AtomicInteger(0);
   private final AtomicInteger numDmeEvents = new AtomicInteger(0);
@@ -57,10 +55,9 @@ public class ShuffleInputEventHandlerOrderedGrouped implements ShuffleEventHandl
   private final AtomicInteger numDmeEventsNoData = new AtomicInteger(0);
 
   public ShuffleInputEventHandlerOrderedGrouped(InputContext inputContext,
-                                                ShuffleScheduler scheduler, boolean sslShuffle) {
+                                                ShuffleScheduler scheduler) {
     this.inputContext = inputContext;
     this.scheduler = scheduler;
-    this.sslShuffle = sslShuffle;
   }
 
   @Override
@@ -103,17 +100,19 @@ public class ShuffleInputEventHandlerOrderedGrouped implements ShuffleEventHandl
       throw new TezUncheckedException("Unable to parse DataMovementEvent payload", e);
     } 
     int partitionId = dmEvent.getSourceIndex();
+    InputAttemptIdentifier srcAttemptIdentifier = constructInputAttemptIdentifier(dmEvent, shufflePayload);
+
     if (LOG.isDebugEnabled()) {
       LOG.debug("DME srcIdx: " + partitionId + ", targetIdx: " + dmEvent.getTargetIndex()
           + ", attemptNum: " + dmEvent.getVersion() + ", payload: " +
           ShuffleUtils.stringify(shufflePayload));
     }
+
     if (shufflePayload.hasEmptyPartitions()) {
       try {
         byte[] emptyPartitions = TezCommonUtils.decompressByteStringToByteArray(shufflePayload.getEmptyPartitions());
         BitSet emptyPartitionsBitSet = TezUtilsInternal.fromByteArray(emptyPartitions);
         if (emptyPartitionsBitSet.get(partitionId)) {
-          InputAttemptIdentifier srcAttemptIdentifier = constructInputAttemptIdentifier(dmEvent, shufflePayload);
           if (LOG.isDebugEnabled()) {
             LOG.debug(
                 "Source partition: " + partitionId + " did not generate any data. SrcAttempt: ["
@@ -129,13 +128,10 @@ public class ShuffleInputEventHandlerOrderedGrouped implements ShuffleEventHandl
       }
     }
 
-    InputAttemptIdentifier srcAttemptIdentifier = constructInputAttemptIdentifier(dmEvent, shufflePayload);
-
-    URI baseUri = getBaseURI(shufflePayload.getHost(), shufflePayload.getPort(), partitionId);
-    scheduler.addKnownMapOutput(shufflePayload.getHost(), shufflePayload.getPort(),
-        partitionId, baseUri.toString(), srcAttemptIdentifier);
+    scheduler.addKnownMapOutput(StringInterner.weakIntern(shufflePayload.getHost()), shufflePayload.getPort(),
+        partitionId, srcAttemptIdentifier);
   }
-  
+
   private void processTaskFailedEvent(InputFailedEvent ifEvent) {
     InputAttemptIdentifier taIdentifier = new InputAttemptIdentifier(ifEvent.getTargetIndex(), ifEvent.getVersion());
     scheduler.obsoleteInput(taIdentifier);
@@ -144,14 +140,6 @@ public class ShuffleInputEventHandlerOrderedGrouped implements ShuffleEventHandl
     }
   }
 
-  @VisibleForTesting
-  URI getBaseURI(String host, int port, int partitionId) {
-    StringBuilder sb = ShuffleUtils.constructBaseURIForShuffleHandler(host, port,
-      partitionId, inputContext.getApplicationId().toString(), inputContext.getDagIdentifier(), sslShuffle);
-    URI u = URI.create(sb.toString());
-    return u;
-  }
-
   /**
    * Helper method to create InputAttemptIdentifier
    *
@@ -161,7 +149,7 @@ public class ShuffleInputEventHandlerOrderedGrouped implements ShuffleEventHandl
    */
   private InputAttemptIdentifier constructInputAttemptIdentifier(DataMovementEvent dmEvent,
       DataMovementEventPayloadProto shufflePayload) {
-    String pathComponent = (shufflePayload.hasPathComponent()) ? shufflePayload.getPathComponent() : null;
+    String pathComponent = (shufflePayload.hasPathComponent()) ? StringInterner.weakIntern(shufflePayload.getPathComponent()) : null;
     int spillEventId = shufflePayload.getSpillId();
     InputAttemptIdentifier srcAttemptIdentifier = null;
     if (shufflePayload.hasSpillId()) {

http://git-wip-us.apache.org/repos/asf/tez/blob/3f5a7f35/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
index 8cba2a6..2f6e490 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
@@ -73,12 +73,58 @@ import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
 import org.apache.tez.runtime.library.common.TezRuntimeUtils;
 import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
+import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MapHost.HostPort;
+import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MapHost.HostPortPartition;
 import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MapOutput.Type;
 
 import com.google.common.collect.Lists;
 
 class ShuffleScheduler {
 
+  public static class PathPartition {
+
+    final String path;
+    final int partition;
+
+    PathPartition(String path, int partition) {
+      this.path = path;
+      this.partition = partition;
+    }
+
+    @Override
+    public int hashCode() {
+      final int prime = 31;
+      int result = 1;
+      result = prime * result + ((path == null) ? 0 : path.hashCode());
+      result = prime * result + partition;
+      return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj)
+        return true;
+      if (obj == null)
+        return false;
+      if (getClass() != obj.getClass())
+        return false;
+      PathPartition other = (PathPartition) obj;
+      if (path == null) {
+        if (other.path != null)
+          return false;
+      } else if (!path.equals(other.path))
+        return false;
+      if (partition != other.partition)
+        return false;
+      return true;
+    }
+
+    @Override
+    public String toString() {
+      return "PathPartition [path=" + path + ", partition=" + partition + "]";
+    }
+  }
+
   @VisibleForTesting
   enum ShuffleErrors {
     IO_ERROR,
@@ -101,11 +147,11 @@ class ShuffleScheduler {
   private final int numInputs;
   private int numFetchedSpills;
   @VisibleForTesting
-  final Map<String, MapHost> mapLocations = new HashMap<String, MapHost>();
+  final Map<HostPortPartition, MapHost> mapLocations = new HashMap<HostPortPartition, MapHost>();
   //TODO Clean this and other maps at some point
   @VisibleForTesting
-  final ConcurrentMap<String, InputAttemptIdentifier> pathToIdentifierMap
-      = new ConcurrentHashMap<String, InputAttemptIdentifier>();
+  final ConcurrentMap<PathPartition, InputAttemptIdentifier> pathToIdentifierMap
+      = new ConcurrentHashMap<PathPartition, InputAttemptIdentifier>();
 
   //To track shuffleInfo events when finalMerge is disabled in source or pipelined shuffle is
   // enabled in source.
@@ -122,9 +168,8 @@ class ShuffleScheduler {
   private final Referee referee;
   @VisibleForTesting
   final Map<InputAttemptIdentifier, IntWritable> failureCounts = new HashMap<InputAttemptIdentifier,IntWritable>();
-  final Set<String> uniqueHosts = Sets.newHashSet();
-  private final Map<String,IntWritable> hostFailures = 
-    new HashMap<String,IntWritable>();
+  final Set<HostPort> uniqueHosts = Sets.newHashSet();
+  private final Map<HostPort,IntWritable> hostFailures = new HashMap<HostPort,IntWritable>();
   private final InputContext inputContext;
   private final TezCounter shuffledInputsCounter;
   private final TezCounter skippedInputCounter;
@@ -166,7 +211,10 @@ class ShuffleScheduler {
   private final boolean localDiskFetchEnabled;
   private final String localHostname;
   private final int shufflePort;
+  private final String applicationId;
+  private final int dagId;
   private final boolean asyncHttp;
+  private final boolean sslShuffle;
 
   private final TezCounter ioErrsCounter;
   private final TezCounter wrongLengthErrsCounter;
@@ -275,6 +323,8 @@ class ShuffleScheduler {
             TezRuntimeConfiguration
                 .TEZ_RUNTIME_SHUFFLE_FAILED_CHECK_SINCE_LAST_COMPLETION_DEFAULT);
 
+    this.applicationId = inputContext.getApplicationId().toString();
+    this.dagId = inputContext.getDagIdentifier();
     this.localHostname = inputContext.getExecutionContext().getHostName();
     final ByteBuffer shuffleMetadata =
         inputContext.getServiceProviderMetaData(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID);
@@ -311,6 +361,8 @@ class ShuffleScheduler {
     this.startTime = startTime;
     this.lastProgressTime = startTime;
 
+    this.sslShuffle = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL,
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL_DEFAULT);
     this.asyncHttp = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_USE_ASYNC_HTTP, false);
     this.httpConnectionParams = ShuffleUtils.getHttpConnectionParams(conf);
     this.shuffleMetrics = new ShuffleClientMetrics(inputContext.getDAGName(),
@@ -477,7 +529,7 @@ class ShuffleScheduler {
 
         failureCounts.remove(srcAttemptIdentifier);
         if (host != null) {
-          hostFailures.remove(host.getHostIdentifier());
+          hostFailures.remove(new HostPort(host.getHost(), host.getPort()));
         }
 
         output.commit();
@@ -691,7 +743,7 @@ class ShuffleScheduler {
   private void penalizeHost(MapHost host, int failures) {
     host.penalize();
 
-    String hostPort = host.getHostIdentifier();
+    HostPort hostPort = new HostPort(host.getHost(), host.getPort());
     // TODO TEZ-922 hostFailures isn't really used for anything apart from
     // hasFailedAcrossNodes().Factor it into error
     // reporting / potential blacklisting of hosts.
@@ -766,7 +818,7 @@ class ShuffleScheduler {
         (int) Math.ceil(numUniqueHosts * hostFailureFraction));
     int total = 0;
     boolean failedAcrossNodes = false;
-    for(String host : uniqueHosts) {
+    for(HostPort host : uniqueHosts) {
       IntWritable failures = hostFailures.get(host);
       if (failures != null && failures.get() > minFailurePerHost) {
         total++;
@@ -926,17 +978,13 @@ class ShuffleScheduler {
   public synchronized void addKnownMapOutput(String inputHostName,
                                              int port,
                                              int partitionId,
-                                             String hostUrl,
                                              InputAttemptIdentifier srcAttempt) {
-    String hostPort = (inputHostName + ":" + String.valueOf(port));
-    uniqueHosts.add(hostPort);
-    String identifier = MapHost.createIdentifier(hostPort, partitionId);
-
+    uniqueHosts.add(new HostPort(inputHostName, port));
+    HostPortPartition identifier = new HostPortPartition(inputHostName, port, partitionId);
 
     MapHost host = mapLocations.get(identifier);
     if (host == null) {
-      host = new MapHost(partitionId, hostPort, hostUrl);
-      assert identifier.equals(host.getIdentifier());
+      host = new MapHost(inputHostName, port, partitionId);
       mapLocations.put(identifier, host);
     }
 
@@ -1150,8 +1198,8 @@ class ShuffleScheduler {
     
   }
   
-  private String getIdentifierFromPathAndReduceId(String path, int reduceId) {
-    return path + "_" + reduceId;
+  private PathPartition getIdentifierFromPathAndReduceId(String path, int reduceId) {
+    return new PathPartition(path, reduceId);
   }
   
   /**
@@ -1274,7 +1322,7 @@ class ShuffleScheduler {
                 count++;
                 if (LOG.isDebugEnabled()) {
                   LOG.debug(srcNameTrimmed + ": " + "Scheduling fetch for inputHost: {}",
-                      mapHost.getIdentifier());
+                      mapHost.getHostIdentifier() + ":" + mapHost.getPartitionId());
                 }
                 FetcherOrderedGrouped fetcherOrderedGrouped = constructFetcherForHost(mapHost);
                 runningFetchers.add(fetcherOrderedGrouped);
@@ -1299,7 +1347,7 @@ class ShuffleScheduler {
         shuffleMetrics, exceptionReporter, jobTokenSecretManager, ifileReadAhead, ifileReadAheadLength,
         codec, conf, localDiskFetchEnabled, localHostname, shufflePort, srcNameTrimmed, mapHost,
         ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter,
-        connectionErrsCounter, wrongReduceErrsCounter, asyncHttp);
+        connectionErrsCounter, wrongReduceErrsCounter, applicationId, dagId, asyncHttp, sslShuffle);
   }
 
   private class FetchFutureCallback implements FutureCallback<Void> {

http://git-wip-us.apache.org/repos/asf/tez/blob/3f5a7f35/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
index 20fb9a9..89d35f4 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
@@ -57,6 +57,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.common.security.JobTokenSecretManager;
 import org.apache.tez.dag.api.TezConfiguration;
@@ -79,6 +80,8 @@ public class TestFetcher {
   public static final String SHUFFLE_INPUT_FILE_PREFIX = "shuffle_input_file_";
   public static final String HOST = "localhost";
   public static final int PORT = 65;
+  public static final int DAG_ID = 1;
+  public static final String APP_ID = "application_1234_1";
 
   private TezCounters tezCounters = new TezCounters();
   private TezCounter ioErrsCounter = tezCounters.findCounter(ShuffleScheduler.SHUFFLE_ERR_GRP_NAME,
@@ -114,7 +117,7 @@ public class TestFetcher {
     doReturn(new TezCounters()).when(inputContext).getCounters();
     doReturn("src vertex").when(inputContext).getSourceVertexName();
 
-    MapHost mapHost = new MapHost(0, HOST + ":" + PORT, "baseurl");
+    MapHost mapHost = new MapHost(HOST, PORT, 0);
     InputAttemptIdentifier inputAttemptIdentifier = new InputAttemptIdentifier(0, 0, "attempt");
     mapHost.addKnownMap(inputAttemptIdentifier);
     List<InputAttemptIdentifier> mapsForHost = Lists.newArrayList(inputAttemptIdentifier);
@@ -124,7 +127,7 @@ public class TestFetcher {
         new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0,
             null, conf, false, HOST, PORT, "src vertex", mapHost, ioErrsCounter,
             wrongLengthErrsCounter, badIdErrsCounter,
-            wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, false);
+            wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, false, false);
 
     fetcher.call();
     verify(scheduler).getMapsForHost(mapHost);
@@ -147,12 +150,12 @@ public class TestFetcher {
 
     final boolean ENABLE_LOCAL_FETCH = true;
     final boolean DISABLE_LOCAL_FETCH = false;
-    MapHost mapHost = new MapHost(0, HOST + ":" + PORT, "baseurl");
+    MapHost mapHost = new MapHost(HOST, PORT, 0);
     FetcherOrderedGrouped fetcher =
         new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0,
             null, conf, ENABLE_LOCAL_FETCH, HOST, PORT, "src vertex", mapHost, ioErrsCounter,
             wrongLengthErrsCounter, badIdErrsCounter,
-            wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, false);
+            wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, false, false);
 
     // when local mode is enabled and host and port matches use local fetch
     FetcherOrderedGrouped spyFetcher = spy(fetcher);
@@ -164,12 +167,12 @@ public class TestFetcher {
     verify(spyFetcher, never()).copyFromHost(any(MapHost.class));
 
     // if hostname does not match use http
-    mapHost = new MapHost(0, HOST + "_OTHER" + ":" + PORT, "baseurl");
+    mapHost = new MapHost(HOST + "_OTHER", PORT, 0);
     fetcher =
         new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0,
             null, conf, ENABLE_LOCAL_FETCH, HOST, PORT, "src vertex", mapHost, ioErrsCounter,
             wrongLengthErrsCounter, badIdErrsCounter,
-            wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, false);
+            wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, false, false);
     spyFetcher = spy(fetcher);
     doNothing().when(spyFetcher).setupLocalDiskFetch(mapHost);
 
@@ -179,12 +182,12 @@ public class TestFetcher {
     verify(spyFetcher, times(1)).copyFromHost(mapHost);
 
     // if port does not match use http
-    mapHost = new MapHost(0, HOST + ":" + (PORT + 1), "baseurl");
+    mapHost = new MapHost(HOST, PORT + 1, 0);
     fetcher =
         new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0,
             null, conf, ENABLE_LOCAL_FETCH, HOST, PORT, "src vertex", mapHost, ioErrsCounter,
             wrongLengthErrsCounter, badIdErrsCounter,
-            wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, false);
+            wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, false, false);
     spyFetcher = spy(fetcher);
     doNothing().when(spyFetcher).setupLocalDiskFetch(mapHost);
 
@@ -194,11 +197,11 @@ public class TestFetcher {
     verify(spyFetcher, times(1)).copyFromHost(mapHost);
 
     //if local fetch is not enabled
-    mapHost = new MapHost(0, HOST + ":" + PORT, "baseurl");
+    mapHost = new MapHost(HOST, PORT, 0);
     fetcher = new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0,
         null, conf, DISABLE_LOCAL_FETCH, HOST, PORT, "src vertex", mapHost, ioErrsCounter,
         wrongLengthErrsCounter, badIdErrsCounter,
-        wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, false);
+        wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, false, false);
     spyFetcher = spy(fetcher);
     doNothing().when(spyFetcher).setupLocalDiskFetch(mapHost);
 
@@ -219,11 +222,10 @@ public class TestFetcher {
     when(inputContext.getCounters()).thenReturn(new TezCounters());
     when(inputContext.getSourceVertexName()).thenReturn("");
 
-    MapHost host = new MapHost(1, HOST + ":" + PORT,
-        "http://" + HOST + ":" + PORT + "/mapOutput?job=job_123&&reduce=1&map=");
+    MapHost host = new MapHost(HOST, PORT, 1);
     FetcherOrderedGrouped fetcher = new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0,
         null, conf, true, HOST, PORT, "src vertex", host, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter,
-        wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, false);
+        wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, false, false);
     FetcherOrderedGrouped spyFetcher = spy(fetcher);
 
 
@@ -361,13 +363,13 @@ public class TestFetcher {
     InputContext inputContext = mock(InputContext.class);
     when(inputContext.getCounters()).thenReturn(new TezCounters());
     when(inputContext.getSourceVertexName()).thenReturn("");
+    when(inputContext.getApplicationId()).thenReturn(ApplicationId.newInstance(0, 1));
 
     HttpConnectionParams httpConnectionParams = ShuffleUtils.getHttpConnectionParams(conf);
-    final MapHost host = new MapHost(1, HOST + ":" + PORT,
-        "http://" + HOST + ":" + PORT + "/mapOutput?job=job_123&&reduce=1&map=");
+    final MapHost host = new MapHost(HOST, PORT, 1);
     FetcherOrderedGrouped mockFetcher = new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0,
         null, conf, false, HOST, PORT, "src vertex", host, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter,
-        wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, false);
+        wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, false, false);
     final FetcherOrderedGrouped fetcher = spy(mockFetcher);
 
 
@@ -379,7 +381,7 @@ public class TestFetcher {
     doReturn(srcAttempts).when(scheduler).getMapsForHost(host);
     doReturn(true).when(fetcher).setupConnection(any(MapHost.class), any(Collection.class));
 
-    URL url = ShuffleUtils.constructInputURL(host.getBaseUrl(), srcAttempts, false);
+    URL url = ShuffleUtils.constructInputURL("http://" + HOST + ":" + PORT + "/mapOutput?job=job_123&&reduce=1&map=", srcAttempts, false);
     fetcher.httpConnection = new FakeHttpConnection(url, null, "", null);
 
     doAnswer(new Answer<MapOutput>() {
@@ -452,14 +454,13 @@ public class TestFetcher {
     doReturn(new byte[10]).when(jobMgr).computeHash(any(byte[].class));
 
     HttpConnectionParams httpConnectionParams = ShuffleUtils.getHttpConnectionParams(conf);
-    final MapHost host = new MapHost(1, HOST + ":" + PORT,
-        "http://" + HOST + ":" + PORT + "/mapOutput?job=job_123&&reduce=1&map=");
+    final MapHost host = new MapHost(HOST, PORT, 1);
     FetcherOrderedGrouped mockFetcher =
         new FetcherOrderedGrouped(httpConnectionParams, scheduler, merger, metrics, shuffle, jobMgr,
             false, 0,
             null, conf, false, HOST, PORT, "src vertex", host, ioErrsCounter,
             wrongLengthErrsCounter, badIdErrsCounter,
-            wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, true);
+            wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, true, false);
     final FetcherOrderedGrouped fetcher = spy(mockFetcher);
     fetcher.remaining = new LinkedHashMap<String, InputAttemptIdentifier>();
     final List<InputAttemptIdentifier> srcAttempts = Arrays.asList(
@@ -521,12 +522,12 @@ public class TestFetcher {
     MergeManager merger = mock(MergeManager.class);
     ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class);
     Shuffle shuffle = mock(Shuffle.class);
-    MapHost mapHost = new MapHost(0, HOST + ":" + PORT, "baseurl");
+    MapHost mapHost = new MapHost(HOST, PORT, 0);
     FetcherOrderedGrouped fetcher =
         new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0,
             null, conf, false, HOST, PORT, "src vertex", mapHost, ioErrsCounter,
             wrongLengthErrsCounter, badIdErrsCounter,
-            wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, false);
+            wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, false, false);
     fetcher.populateRemainingMap(new LinkedList<InputAttemptIdentifier>(Arrays.asList(srcAttempts)));
     Assert.assertEquals(expectedSrcAttempts.length, fetcher.remaining.size());
     Iterator<Entry<String, InputAttemptIdentifier>> iterator = fetcher.remaining.entrySet().iterator();

http://git-wip-us.apache.org/repos/asf/tez/blob/3f5a7f35/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
index de066fe..26aa298 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
@@ -153,7 +153,7 @@ public class TestShuffleInputEventHandlerOrderedGrouped {
         0,
         "src vertex");
     scheduler = spy(realScheduler);
-    handler = new ShuffleInputEventHandlerOrderedGrouped(inputContext, scheduler, false);
+    handler = new ShuffleInputEventHandlerOrderedGrouped(inputContext, scheduler);
     mergeManager = mock(MergeManager.class);
   }
 
@@ -167,9 +167,8 @@ public class TestShuffleInputEventHandlerOrderedGrouped {
         new InputAttemptIdentifier(inputIdx, attemptNum,
             PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0);
     handler.handleEvents(Collections.singletonList(dme1));
-    String baseUri = handler.getBaseURI(HOST, PORT, attemptNum).toString();
     int partitionId = attemptNum;
-    verify(scheduler).addKnownMapOutput(eq(HOST), eq(PORT), eq(partitionId), eq(baseUri), eq(id1));
+    verify(scheduler).addKnownMapOutput(eq(HOST), eq(PORT), eq(partitionId), eq(id1));
     verify(scheduler).pipelinedShuffleInfoEventsMap.containsKey(id1.getInputIdentifier());
 
     //Send final_update event.
@@ -178,10 +177,9 @@ public class TestShuffleInputEventHandlerOrderedGrouped {
         new InputAttemptIdentifier(inputIdx, attemptNum,
             PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.FINAL_UPDATE, 1);
     handler.handleEvents(Collections.singletonList(dme2));
-    baseUri = handler.getBaseURI(HOST, PORT, attemptNum).toString();
     partitionId = attemptNum;
     assertTrue(scheduler.pipelinedShuffleInfoEventsMap.containsKey(id2.getInputIdentifier()));
-    verify(scheduler).addKnownMapOutput(eq(HOST), eq(PORT), eq(partitionId), eq(baseUri), eq(id2));
+    verify(scheduler).addKnownMapOutput(eq(HOST), eq(PORT), eq(partitionId), eq(id2));
     assertTrue(scheduler.pipelinedShuffleInfoEventsMap.containsKey(id2.getInputIdentifier()));
 
     MapHost host = scheduler.getHost();
@@ -222,7 +220,6 @@ public class TestShuffleInputEventHandlerOrderedGrouped {
     //Process attempt #1 first
     int attemptNum = 1;
     int inputIdx = 1;
-    String baseUri = handler.getBaseURI(HOST, PORT, attemptNum).toString();
 
     Event dme1 = createDataMovementEvent(attemptNum, inputIdx, null, false, true, true, 0, attemptNum);
     handler.handleEvents(Collections.singletonList(dme1));
@@ -231,7 +228,7 @@ public class TestShuffleInputEventHandlerOrderedGrouped {
         new InputAttemptIdentifier(inputIdx, attemptNum,
             PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0);
 
-    verify(scheduler, times(1)).addKnownMapOutput(eq(HOST), eq(PORT), eq(1), eq(baseUri), eq(id1));
+    verify(scheduler, times(1)).addKnownMapOutput(eq(HOST), eq(PORT), eq(1), eq(id1));
     assertTrue("Shuffle info events should not be empty for pipelined shuffle",
         !scheduler.pipelinedShuffleInfoEventsMap.isEmpty());
 
@@ -257,10 +254,9 @@ public class TestShuffleInputEventHandlerOrderedGrouped {
     handler.handleEvents(events);
     InputAttemptIdentifier expectedIdentifier = new InputAttemptIdentifier(targetIdx, 0,
         PATH_COMPONENT);
-    String baseUri = handler.getBaseURI(HOST, PORT, srcIdx).toString();
     int partitionId = srcIdx;
     verify(scheduler).addKnownMapOutput(eq(HOST), eq(PORT), eq(partitionId),
-        eq(baseUri), eq(expectedIdentifier));
+        eq(expectedIdentifier));
     assertTrue("Shuffle info events should be empty for regular shuffle codepath",
         scheduler.pipelinedShuffleInfoEventsMap.isEmpty());
   }
@@ -313,12 +309,10 @@ public class TestShuffleInputEventHandlerOrderedGrouped {
         false);
     events.add(dme);
     handler.handleEvents(events);
-    String baseUri = handler.getBaseURI(HOST, PORT, srcIdx).toString();
     int partitionId = srcIdx;
     InputAttemptIdentifier expectedIdentifier =
         new InputAttemptIdentifier(taskIndex, 0, PATH_COMPONENT);
-    verify(scheduler).addKnownMapOutput(eq(HOST), eq(PORT), eq(partitionId), eq(baseUri),
-        eq(expectedIdentifier));
+    verify(scheduler).addKnownMapOutput(eq(HOST), eq(PORT), eq(partitionId), eq(expectedIdentifier));
   }
 
   private ByteString createEmptyPartitionByteString(int... emptyPartitions) throws IOException {

http://git-wip-us.apache.org/repos/asf/tez/blob/3f5a7f35/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
index f7ef309..15cfa48 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
@@ -89,7 +89,7 @@ public class TestShuffleScheduler {
       for (int i = 0; i < numInputs; i++) {
         InputAttemptIdentifier inputAttemptIdentifier =
             new InputAttemptIdentifier(i, 0, "attempt_");
-        scheduler.addKnownMapOutput("host" + i, 10000, 1, "hostUrl", inputAttemptIdentifier);
+        scheduler.addKnownMapOutput("host" + i, 10000, 1, inputAttemptIdentifier);
         identifiers[i] = inputAttemptIdentifier;
       }
 
@@ -134,7 +134,7 @@ public class TestShuffleScheduler {
       for (int i = 0; i < numInputs; i++) {
         InputAttemptIdentifier inputAttemptIdentifier =
             new InputAttemptIdentifier(i, 0, "attempt_");
-        scheduler.addKnownMapOutput("host" + i, 10000, 1, "hostUrl", inputAttemptIdentifier);
+        scheduler.addKnownMapOutput("host" + i, 10000, 1, inputAttemptIdentifier);
         identifiers[i] = inputAttemptIdentifier;
       }
 
@@ -192,7 +192,7 @@ public class TestShuffleScheduler {
       InputAttemptIdentifier inputAttemptIdentifier =
           new InputAttemptIdentifier(i, 0, "attempt_");
       scheduler.addKnownMapOutput("host" + (i % totalProducerNodes),
-          10000, i, "hostUrl", inputAttemptIdentifier);
+          10000, i, inputAttemptIdentifier);
     }
 
     //100 succeeds
@@ -202,16 +202,16 @@ public class TestShuffleScheduler {
       MapOutput mapOutput = MapOutput
           .createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class),
               100, false);
-      scheduler.copySucceeded(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
-          + ":" + 10000, ""), 100, 200, startTime + (i * 100), mapOutput, false);
+      scheduler.copySucceeded(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
+          10000, i), 100, 200, startTime + (i * 100), mapOutput, false);
     }
 
     //99 fails
     for (int i = 100; i < 199; i++) {
       InputAttemptIdentifier inputAttemptIdentifier =
           new InputAttemptIdentifier(i, 0, "attempt_");
-      scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
-          + ":" + 10000, ""), false, true, false);
+      scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
+          10000, i), false, true, false);
     }
 
 
@@ -219,9 +219,9 @@ public class TestShuffleScheduler {
         new InputAttemptIdentifier(200, 0, "attempt_");
 
     //Should fail here and report exception as reducer is not healthy
-    scheduler.copyFailed(inputAttemptIdentifier, new MapHost(200, "host" + (200 %
-        totalProducerNodes)
-        + ":" + 10000, ""), false, true, false);
+    scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (200 %
+        totalProducerNodes),
+        10000, 200), false, true, false);
 
     int minFailurePerHost = conf.getInt(
         TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MIN_FAILURES_PER_HOST,
@@ -261,7 +261,7 @@ public class TestShuffleScheduler {
       InputAttemptIdentifier inputAttemptIdentifier =
           new InputAttemptIdentifier(i, 0, "attempt_");
       scheduler.addKnownMapOutput("host" + (i % totalProducerNodes),
-          10000, i, "hostUrl", inputAttemptIdentifier);
+          10000, i, inputAttemptIdentifier);
     }
     assertEquals(320, scheduler.remainingMaps.get());
 
@@ -282,8 +282,8 @@ public class TestShuffleScheduler {
       MapOutput mapOutput = MapOutput
           .createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class),
               100, false);
-      scheduler.copySucceeded(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
-          + ":" + 10000, ""), 100, 200, startTime + (i * 100), mapOutput, false);
+      scheduler.copySucceeded(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
+          10000, i), 100, 200, startTime + (i * 100), mapOutput, false);
     }
 
     assertEquals(10, scheduler.remainingMaps.get());
@@ -292,8 +292,8 @@ public class TestShuffleScheduler {
     for (int i = 190; i < 200; i++) {
       InputAttemptIdentifier inputAttemptIdentifier =
           new InputAttemptIdentifier(i, 0, "attempt_");
-      scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
-          + ":" + 10000, ""), false, true, false);
+      scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
+          10000, i), false, true, false);
     }
 
     //Shuffle has not stalled. so no issues.
@@ -304,9 +304,9 @@ public class TestShuffleScheduler {
 
     InputAttemptIdentifier inputAttemptIdentifier =
         new InputAttemptIdentifier(190, 0, "attempt_");
-    scheduler.copyFailed(inputAttemptIdentifier, new MapHost(190, "host" +
-        (190 % totalProducerNodes)
-        + ":" + 10000, ""), false, true, false);
+    scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" +
+        (190 % totalProducerNodes),
+        10000, 190), false, true, false);
 
     //Even when it is stalled, need (320 - 300 = 20) * 3 = 60 failures
     verify(scheduler.reporter, times(0)).reportException(any(Throwable.class));
@@ -317,16 +317,16 @@ public class TestShuffleScheduler {
     for (int i = 190; i < 200; i++) {
       inputAttemptIdentifier =
           new InputAttemptIdentifier(i, 0, "attempt_");
-      scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
-          + ":" + 10000, ""), false, true, false);
-      scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
-          + ":" + 10000, ""), false, true, false);
-      scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
-          + ":" + 10000, ""), false, true, false);
-      scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
-          + ":" + 10000, ""), false, true, false);
-      scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
-          + ":" + 10000, ""), false, true, false);
+      scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
+          10000, i), false, true, false);
+      scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
+          10000, i), false, true, false);
+      scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
+          10000, i), false, true, false);
+      scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
+          10000, i), false, true, false);
+      scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
+          10000, i), false, true, false);
     }
 
     assertEquals(61, scheduler.failedShufflesSinceLastCompletion);
@@ -338,12 +338,12 @@ public class TestShuffleScheduler {
     for (int i = 110; i < 120; i++) {
       inputAttemptIdentifier =
           new InputAttemptIdentifier(i, 0, "attempt_");
-      scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
-          + ":" + 10000, ""), false, true, false);
-      scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
-          + ":" + 10000, ""), false, true, false);
-      scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
-          + ":" + 10000, ""), false, true, false);
+      scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
+          10000, i), false, true, false);
+      scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
+          10000, i), false, true, false);
+      scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
+          10000, i), false, true, false);
     }
 
     // Should fail now due to fetcherHealthy. (stall has already happened and
@@ -377,7 +377,7 @@ public class TestShuffleScheduler {
       InputAttemptIdentifier inputAttemptIdentifier =
           new InputAttemptIdentifier(i, 0, "attempt_");
       scheduler.addKnownMapOutput("host" + (i % totalProducerNodes),
-          10000, i, "hostUrl", inputAttemptIdentifier);
+          10000, i, inputAttemptIdentifier);
     }
 
     //319 succeeds
@@ -387,15 +387,15 @@ public class TestShuffleScheduler {
       MapOutput mapOutput = MapOutput
           .createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class),
               100, false);
-      scheduler.copySucceeded(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
-          + ":" + 10000, ""), 100, 200, startTime + (i * 100), mapOutput, false);
+      scheduler.copySucceeded(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
+          10000, i), 100, 200, startTime + (i * 100), mapOutput, false);
     }
 
     //1 fails (last fetch)
     InputAttemptIdentifier inputAttemptIdentifier =
         new InputAttemptIdentifier(319, 0, "attempt_");
-    scheduler.copyFailed(inputAttemptIdentifier, new MapHost(319, "host" + (319 % totalProducerNodes)
-        + ":" + 10000, ""), false, true, false);
+    scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (319 % totalProducerNodes),
+        10000, 319), false, true, false);
 
     //stall the shuffle
     scheduler.lastProgressTime = System.currentTimeMillis() - 1000000;
@@ -403,15 +403,15 @@ public class TestShuffleScheduler {
     assertEquals(scheduler.remainingMaps.get(), 1);
 
     //Retry for 3 more times
-    scheduler.copyFailed(inputAttemptIdentifier, new MapHost(319, "host" + (319 %
-        totalProducerNodes)
-        + ":" + 10000, ""), false, true, false);
-    scheduler.copyFailed(inputAttemptIdentifier, new MapHost(319, "host" + (319 %
-        totalProducerNodes)
-        + ":" + 10000, ""), false, true, false);
-    scheduler.copyFailed(inputAttemptIdentifier, new MapHost(319, "host" + (319 %
-        totalProducerNodes)
-        + ":" + 10000, ""), false, true, false);
+    scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (319 %
+        totalProducerNodes),
+        10000, 319), false, true, false);
+    scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (319 %
+        totalProducerNodes),
+        10000, 319), false, true, false);
+    scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (319 %
+        totalProducerNodes),
+        10000, 319), false, true, false);
 
     // failedShufflesSinceLastCompletion has crossed the limits. Throw error
     verify(shuffle, times(0)).reportException(any(Throwable.class));
@@ -442,7 +442,7 @@ public class TestShuffleScheduler {
       InputAttemptIdentifier inputAttemptIdentifier =
           new InputAttemptIdentifier(i, 0, "attempt_");
       scheduler.addKnownMapOutput("host" + (i % totalProducerNodes),
-          10000, i, "hostUrl", inputAttemptIdentifier);
+          10000, i, inputAttemptIdentifier);
     }
 
     //Tasks fail in 20% of nodes 3 times, but are able to proceed further
@@ -450,20 +450,20 @@ public class TestShuffleScheduler {
       InputAttemptIdentifier inputAttemptIdentifier =
           new InputAttemptIdentifier(i, 0, "attempt_");
 
-      scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i %
-          totalProducerNodes) + ":" + 10000, ""), false, true, false);
+      scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i %
+          totalProducerNodes), 10000, i), false, true, false);
 
-      scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i %
-          totalProducerNodes) + ":" + 10000, ""), false, true, false);
+      scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i %
+          totalProducerNodes), 10000, i), false, true, false);
 
-      scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i %
-          totalProducerNodes) + ":" + 10000, ""), false, true, false);
+      scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i %
+          totalProducerNodes), 10000, i), false, true, false);
 
       MapOutput mapOutput = MapOutput
           .createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class),
               100, false);
-      scheduler.copySucceeded(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
-          + ":" + 10000, ""), 100, 200, startTime + (i * 100), mapOutput, false);
+      scheduler.copySucceeded(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
+          10000, i), 100, 200, startTime + (i * 100), mapOutput, false);
     }
 
       //319 succeeds
@@ -473,15 +473,15 @@ public class TestShuffleScheduler {
       MapOutput mapOutput = MapOutput
           .createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class),
               100, false);
-      scheduler.copySucceeded(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
-          + ":" + 10000, ""), 100, 200, startTime + (i * 100), mapOutput, false);
+      scheduler.copySucceeded(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
+          10000, i), 100, 200, startTime + (i * 100), mapOutput, false);
     }
 
     //1 fails (last fetch)
     InputAttemptIdentifier inputAttemptIdentifier =
         new InputAttemptIdentifier(319, 0, "attempt_");
-    scheduler.copyFailed(inputAttemptIdentifier, new MapHost(319, "host" + (319 % totalProducerNodes)
-        + ":" + 10000, ""), false, true, false);
+    scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (319 % totalProducerNodes),
+        10000, 319), false, true, false);
 
     //stall the shuffle (but within limits)
     scheduler.lastProgressTime = System.currentTimeMillis() - 100000;
@@ -489,15 +489,15 @@ public class TestShuffleScheduler {
     assertEquals(scheduler.remainingMaps.get(), 1);
 
     //Retry for 3 more times
-    scheduler.copyFailed(inputAttemptIdentifier, new MapHost(319, "host" + (319 %
-        totalProducerNodes)
-        + ":" + 10000, ""), false, true, false);
-    scheduler.copyFailed(inputAttemptIdentifier, new MapHost(319, "host" + (319 %
-        totalProducerNodes)
-        + ":" + 10000, ""), false, true, false);
-    scheduler.copyFailed(inputAttemptIdentifier, new MapHost(319, "host" + (319 %
-        totalProducerNodes)
-        + ":" + 10000, ""), false, true, false);
+    scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (319 %
+        totalProducerNodes),
+        10000, 319), false, true, false);
+    scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (319 %
+        totalProducerNodes),
+        10000, 319), false, true, false);
+    scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (319 %
+        totalProducerNodes),
+        10000, 319), false, true, false);
 
     // failedShufflesSinceLastCompletion has crossed the limits. 20% of other nodes had failures as
     // well. However, it has failed only in one host. So this should proceed
@@ -506,9 +506,9 @@ public class TestShuffleScheduler {
 
     //stall the shuffle (but within limits)
     scheduler.lastProgressTime = System.currentTimeMillis() - 300000;
-    scheduler.copyFailed(inputAttemptIdentifier, new MapHost(319, "host" + (319 %
-        totalProducerNodes)
-        + ":" + 10000, ""), false, true, false);
+    scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (319 %
+        totalProducerNodes),
+        10000, 319), false, true, false);
     verify(shuffle, times(1)).reportException(any(Throwable.class));
 
   }
@@ -537,7 +537,7 @@ public class TestShuffleScheduler {
       InputAttemptIdentifier inputAttemptIdentifier =
           new InputAttemptIdentifier(i, 0, "attempt_");
       scheduler.addKnownMapOutput("host" + (i % totalProducerNodes),
-          10000, i, "hostUrl", inputAttemptIdentifier);
+          10000, i, inputAttemptIdentifier);
     }
 
     //318 succeeds
@@ -547,15 +547,15 @@ public class TestShuffleScheduler {
       MapOutput mapOutput = MapOutput
           .createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class),
               100, false);
-      scheduler.copySucceeded(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
-          + ":" + 10000, ""), 100, 200, startTime + (i * 100), mapOutput, false);
+      scheduler.copySucceeded(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
+          10000, i), 100, 200, startTime + (i * 100), mapOutput, false);
     }
 
     //1 fails (last fetch)
     InputAttemptIdentifier inputAttemptIdentifier =
         new InputAttemptIdentifier(318, 0, "attempt_");
-    scheduler.copyFailed(inputAttemptIdentifier, new MapHost(318, "host" + (318 % totalProducerNodes)
-        + ":" + 10000, ""), false, true, false);
+    scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (318 % totalProducerNodes),
+        10000, 318), false, true, false);
 
     //stall the shuffle
     scheduler.lastProgressTime = System.currentTimeMillis() - 1000000;
@@ -563,15 +563,15 @@ public class TestShuffleScheduler {
     assertEquals(scheduler.remainingMaps.get(), 1);
 
     //Retry for 3 more times
-    scheduler.copyFailed(inputAttemptIdentifier, new MapHost(318, "host" + (318 %
-        totalProducerNodes)
-        + ":" + 10000, ""), false, true, false);
-    scheduler.copyFailed(inputAttemptIdentifier, new MapHost(318, "host" + (318 %
-        totalProducerNodes)
-        + ":" + 10000, ""), false, true, false);
-    scheduler.copyFailed(inputAttemptIdentifier, new MapHost(318, "host" + (318 %
-        totalProducerNodes)
-        + ":" + 10000, ""), false, true, false);
+    scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (318 %
+        totalProducerNodes),
+        10000, 318), false, true, false);
+    scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (318 %
+        totalProducerNodes),
+        10000, 318), false, true, false);
+    scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (318 %
+        totalProducerNodes),
+        10000, 318), false, true, false);
 
     //Shuffle has not received the events completely. So do not bail out yet.
     verify(shuffle, times(0)).reportException(any(Throwable.class));
@@ -616,7 +616,7 @@ public class TestShuffleScheduler {
       InputAttemptIdentifier inputAttemptIdentifier =
           new InputAttemptIdentifier(i, 0, "attempt_");
       scheduler.addKnownMapOutput("host" + (i % totalProducerNodes),
-          10000, i, "hostUrl", inputAttemptIdentifier);
+          10000, i, inputAttemptIdentifier);
     }
 
     //10 succeeds
@@ -626,16 +626,16 @@ public class TestShuffleScheduler {
       MapOutput mapOutput = MapOutput
           .createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class),
               100, false);
-      scheduler.copySucceeded(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
-          + ":" + 10000, ""), 100, 200, startTime + (i * 100), mapOutput, false);
+      scheduler.copySucceeded(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
+          10000, i), 100, 200, startTime + (i * 100), mapOutput, false);
     }
 
     //5 fetches fail once
     for (int i = 10; i < 15; i++) {
       InputAttemptIdentifier inputAttemptIdentifier =
           new InputAttemptIdentifier(i, 0, "attempt_");
-      scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
-          + ":" + 10000, ""), false, true, false);
+      scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
+          10000, i), false, true, false);
     }
 
     assertTrue(scheduler.failureCounts.size() >= 5);
@@ -648,10 +648,10 @@ public class TestShuffleScheduler {
     for (int i = 10; i < 15; i++) {
       InputAttemptIdentifier inputAttemptIdentifier =
           new InputAttemptIdentifier(i, 0, "attempt_");
-      scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
-          + ":" + 10000, ""), false, true, false);
-      scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
-          + ":" + 10000, ""), false, true, false);
+      scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
+          10000, i), false, true, false);
+      scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes),
+          10000, i), false, true, false);
     }
 
     boolean checkFailedFetchSinceLastCompletion = conf.getBoolean
@@ -692,7 +692,7 @@ public class TestShuffleScheduler {
       InputAttemptIdentifier inputAttemptIdentifier =
           new InputAttemptIdentifier(i, 0, "attempt_");
       scheduler.addKnownMapOutput("host" + (i % totalProducerNodes), 10000, i,
-          "hostUrl", inputAttemptIdentifier);
+          inputAttemptIdentifier);
     }
 
     //100 succeeds
@@ -703,7 +703,7 @@ public class TestShuffleScheduler {
           .createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class),
               100, false);
       scheduler.copySucceeded(inputAttemptIdentifier,
-          new MapHost(i, "host" + (i % totalProducerNodes) + ":" + 10000, ""),
+          new MapHost("host" + (i % totalProducerNodes), 10000, i),
           100, 200, startTime + (i * 100), mapOutput, false);
     }
 
@@ -712,16 +712,16 @@ public class TestShuffleScheduler {
       InputAttemptIdentifier inputAttemptIdentifier =
           new InputAttemptIdentifier(i, 0, "attempt_");
       scheduler.copyFailed(inputAttemptIdentifier,
-          new MapHost(i, "host" + (i % totalProducerNodes) + ":" + 10000, ""),
+          new MapHost("host" + (i % totalProducerNodes), 10000, i),
           false, true, false);
       scheduler.copyFailed(inputAttemptIdentifier,
-          new MapHost(i, "host" + (i % totalProducerNodes) + ":" + 10000, ""),
+          new MapHost("host" + (i % totalProducerNodes), 10000, i),
           false, true, false);
       scheduler.copyFailed(inputAttemptIdentifier,
-          new MapHost(i, "host" + (i % totalProducerNodes) + ":" + 10000, ""),
+          new MapHost("host" + (i % totalProducerNodes), 10000, i),
           false, true, false);
       scheduler.copyFailed(inputAttemptIdentifier,
-          new MapHost(i, "host" + (i % totalProducerNodes) + ":" + 10000, ""),
+          new MapHost("host" + (i % totalProducerNodes), 10000, i),
           false, true, false);
     }
 
@@ -754,7 +754,7 @@ public class TestShuffleScheduler {
 
     InputAttemptIdentifier inputAttemptIdentifier =
         new InputAttemptIdentifier(0, 0, "attempt_");
-    scheduler.addKnownMapOutput("host0", 10000, 0, "hostUrl", inputAttemptIdentifier);
+    scheduler.addKnownMapOutput("host0", 10000, 0, inputAttemptIdentifier);
 
     assertTrue(scheduler.pendingHosts.size() == 1);
     assertTrue(scheduler.pendingHosts.iterator().next().getState() == MapHost.State.PENDING);
@@ -765,12 +765,13 @@ public class TestShuffleScheduler {
 
     //Should not get host, as it is added to penalty loop
     MapHost host = scheduler.getHost();
-    assertFalse(host.getIdentifier(), host.getIdentifier().equalsIgnoreCase("host0:10000"));
+    assertFalse("Host identifier mismatch", (host.getHost() + ":" + host.getPort() + ":" + host.getPartitionId()).equalsIgnoreCase("host0:10000"));
+
 
     //Refree thread would release it after INITIAL_PENALTY timeout
     Thread.sleep(ShuffleScheduler.INITIAL_PENALTY + 1000);
     host = scheduler.getHost();
-    assertFalse(host.getIdentifier(), host.getIdentifier().equalsIgnoreCase("host0:10000"));
+    assertFalse("Host identifier mismatch", (host.getHost() + ":" + host.getPort() + ":" + host.getPartitionId()).equalsIgnoreCase("host0:10000"));
   }
 
   @Test(timeout = 5000)
@@ -801,7 +802,7 @@ public class TestShuffleScheduler {
       for (int i = 0; i < numInputs; i++) {
         InputAttemptIdentifier inputAttemptIdentifier =
             new InputAttemptIdentifier(i, 0, "attempt_");
-        scheduler.addKnownMapOutput("host" + i, 10000, 1, "hostUrl", inputAttemptIdentifier);
+        scheduler.addKnownMapOutput("host" + i, 10000, 1, inputAttemptIdentifier);
         identifiers[i] = inputAttemptIdentifier;
       }