You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/05/09 03:42:40 UTC

[02/43] tez git commit: TEZ-2366. Pig tez MiniTezCluster unit tests fail intermittently after TEZ-2333 (pramachandran)

TEZ-2366. Pig tez MiniTezCluster unit tests fail intermittently after TEZ-2333 (pramachandran)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/6a04fa48
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/6a04fa48
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/6a04fa48

Branch: refs/heads/TEZ-2003
Commit: 6a04fa48cb1113faf640115dcbba9b2270e756f3
Parents: 12ef073
Author: Prakash Ramachandran <pr...@hortonworks.com>
Authored: Wed May 6 19:11:06 2015 +0530
Committer: Prakash Ramachandran <pr...@hortonworks.com>
Committed: Wed May 6 19:11:06 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../runtime/library/common/shuffle/Fetcher.java | 17 ++---
 .../common/shuffle/impl/ShuffleManager.java     | 12 +++-
 .../orderedgrouped/FetcherOrderedGrouped.java   | 67 ++++++++++--------
 .../common/shuffle/orderedgrouped/Shuffle.java  | 11 ++-
 .../library/common/shuffle/TestFetcher.java     | 49 +++++++++++--
 .../impl/TestShuffleInputEventHandlerImpl.java  | 14 +++-
 .../shuffle/orderedgrouped/TestFetcher.java     | 74 +++++++++++++++++++-
 8 files changed, 191 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/6a04fa48/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f060a8c..91dd9c4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -15,6 +15,7 @@ INCOMPATIBLE CHANGES
   TEZ-1993. Implement a pluggable InputSizeEstimator for grouping fairly
 
 ALL CHANGES:
+  TEZ-2366. Pig tez MiniTezCluster unit tests fail intermittently after TEZ-2333
   TEZ-2406. Tez UI: Display per-io counter columns in task and attempt pages under vertex
   TEZ-2384. Add warning message in the case of prewarn under non-session mode.
   TEZ-2415. PMC RDF needs to use asfext:pmc, not asfext:PMC.

http://git-wip-us.apache.org/repos/asf/tez/blob/6a04fa48/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
index 48fe0f2..61e0151 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
@@ -33,7 +33,6 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -48,7 +47,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.tez.common.CallableWithNdc;
 import org.apache.tez.common.security.JobTokenSecretManager;
@@ -75,6 +73,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
 
   private static final AtomicInteger fetcherIdGen = new AtomicInteger(0);
   private final Configuration conf;
+  private final int shufflePort;
 
   // Configurable fields.
   private CompressionCodec codec;
@@ -132,7 +131,8 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
       Path lockPath,
       boolean localDiskFetchEnabled,
       boolean sharedFetchEnabled,
-      String localHostname) {
+      String localHostname,
+      int shufflePort) {
     this.fetcherCallback = fetcherCallback;
     this.inputManager = inputManager;
     this.jobTokenSecretMgr = jobTokenSecretManager;
@@ -151,6 +151,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
     this.localDirAllocator = localDirAllocator;
     this.lockPath = lockPath;
     this.localHostname = localHostname;
+    this.shufflePort = shufflePort;
 
     try {
       if (this.sharedFetchEnabled) {
@@ -186,7 +187,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
 
     HostFetchResult hostFetchResult;
 
-    if (localDiskFetchEnabled && host.equals(localHostname)) {
+    if (localDiskFetchEnabled && host.equals(localHostname) && port == shufflePort) {
       hostFetchResult = setupLocalDiskFetch();
     } else if (multiplex) {
       hostFetchResult = doSharedFetch();
@@ -902,10 +903,10 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
     public FetcherBuilder(FetcherCallback fetcherCallback,
         HttpConnectionParams params, FetchedInputAllocator inputManager,
         ApplicationId appId, JobTokenSecretManager jobTokenSecretMgr, String srcNameTrimmed,
-        Configuration conf, boolean localDiskFetchEnabled, String localHostname) {
+        Configuration conf, boolean localDiskFetchEnabled, String localHostname, int shufflePort) {
       this.fetcher = new Fetcher(fetcherCallback, params, inputManager, appId,
           jobTokenSecretMgr, srcNameTrimmed, conf, null, null, null, localDiskFetchEnabled,
-          false, localHostname);
+          false, localHostname, shufflePort);
     }
 
     public FetcherBuilder(FetcherCallback fetcherCallback,
@@ -914,10 +915,10 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
         Configuration conf, RawLocalFileSystem localFs,
         LocalDirAllocator localDirAllocator, Path lockPath,
         boolean localDiskFetchEnabled, boolean sharedFetchEnabled,
-        String localHostname) {
+        String localHostname, int shufflePort) {
       this.fetcher = new Fetcher(fetcherCallback, params, inputManager, appId,
           jobTokenSecretMgr, srcNameTrimmed, conf, localFs, localDirAllocator,
-          lockPath, localDiskFetchEnabled, sharedFetchEnabled, localHostname);
+          lockPath, localDiskFetchEnabled, sharedFetchEnabled, localHostname, shufflePort);
     }
 
     public FetcherBuilder setHttpConnectionParameters(HttpConnectionParams httpParams) {

http://git-wip-us.apache.org/repos/asf/tez/blob/6a04fa48/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
index d47e652..ac7caca 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
@@ -21,6 +21,7 @@ package org.apache.tez.runtime.library.common.shuffle.impl;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.nio.ByteBuffer;
 import java.text.DecimalFormat;
 import java.util.Arrays;
 import java.util.BitSet;
@@ -161,7 +162,8 @@ public class ShuffleManager implements FetcherCallback {
   private final LocalDirAllocator localDirAllocator;
   private final RawLocalFileSystem localFs;
   private final Path[] localDisks;
-  private final static String localhostName = NetUtils.getHostname();
+  private final String localhostName;
+  private final int shufflePort;
 
   private final TezCounter shufflePhaseTime;
   private final TezCounter firstEventReceived;
@@ -216,7 +218,7 @@ public class ShuffleManager implements FetcherCallback {
 
     int maxConfiguredFetchers = 
         conf.getInt(
-            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES, 
+            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES,
             TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES_DEFAULT);
     
     this.numFetchers = Math.min(maxConfiguredFetchers, numInputs);
@@ -249,6 +251,10 @@ public class ShuffleManager implements FetcherCallback {
 
     this.localDisks = Iterables.toArray(
         localDirAllocator.getAllLocalPathsToRead(".", conf), Path.class);
+    this.localhostName = inputContext.getExecutionContext().getHostName();
+    final ByteBuffer shuffleMetaData =
+        inputContext.getServiceProviderMetaData(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID);
+    this.shufflePort = ShuffleUtils.deserializeShuffleProviderMetaData(shuffleMetaData);
 
     Arrays.sort(this.localDisks);
 
@@ -390,7 +396,7 @@ public class ShuffleManager implements FetcherCallback {
       httpConnectionParams, inputManager, inputContext.getApplicationId(),
         jobTokenSecretMgr, srcNameTrimmed, conf, localFs, localDirAllocator,
         lockDisk, localDiskFetchEnabled, sharedFetchEnabled,
-        inputContext.getExecutionContext().getHostName());
+        localhostName, shufflePort);
 
     if (codec != null) {
       fetcherBuilder.setCompressionParameters(codec);

http://git-wip-us.apache.org/repos/asf/tez/blob/6a04fa48/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
index fbaabff..60f1c98 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
@@ -56,7 +56,7 @@ class FetcherOrderedGrouped extends Thread {
   private final Configuration conf;
   private final boolean localDiskFetchEnabled;
 
-  private static enum ShuffleErrors{IO_ERROR, WRONG_LENGTH, BAD_ID, WRONG_MAP,
+  private enum ShuffleErrors{IO_ERROR, WRONG_LENGTH, BAD_ID, WRONG_MAP,
                                     CONNECTION, WRONG_REDUCE}
   
   private final static String SHUFFLE_ERR_GRP_NAME = "Shuffle Errors";
@@ -72,7 +72,7 @@ class FetcherOrderedGrouped extends Thread {
   private final Shuffle shuffle;
   private final int id;
   private final String logIdentifier;
-  private final String localHostname;
+  private final String localShuffleHostPort;
   private static int nextId = 0;
   private int currentPartition = -1;
 
@@ -104,7 +104,8 @@ class FetcherOrderedGrouped extends Thread {
                                CompressionCodec codec,
                                InputContext inputContext, Configuration conf,
                                boolean localDiskFetchEnabled,
-                               String localHostname) throws IOException {
+                               String localHostname,
+                               int shufflePort) throws IOException {
     setDaemon(true);
     this.scheduler = scheduler;
     this.merger = merger;
@@ -134,7 +135,7 @@ class FetcherOrderedGrouped extends Thread {
       this.codec = null;
     }
     this.conf = conf;
-    this.localHostname = localHostname;
+    this.localShuffleHostPort = localHostname + ":" + String.valueOf(shufflePort);
 
     this.localDiskFetchEnabled = localDiskFetchEnabled;
 
@@ -144,37 +145,41 @@ class FetcherOrderedGrouped extends Thread {
     setDaemon(true);
   }  
 
-  public void run() {
+  @VisibleForTesting
+  protected void fetchNext() throws InterruptedException, IOException {
+    MapHost host = null;
     try {
-      while (!stopped && !Thread.currentThread().isInterrupted()) {
-        remaining = null; // Safety.
-        MapHost host = null;
-        try {
-          // If merge is on, block
-          merger.waitForInMemoryMerge();
+      // If merge is on, block
+      merger.waitForInMemoryMerge();
 
-          // In case usedMemory > memorylimit, wait until some memory is released
-          merger.waitForShuffleToMergeMemory();
+      // In case usedMemory > memorylimit, wait until some memory is released
+      merger.waitForShuffleToMergeMemory();
 
-          // Get a host to shuffle from
-          host = scheduler.getHost();
-          metrics.threadBusy();
+      // Get a host to shuffle from
+      host = scheduler.getHost();
+      metrics.threadBusy();
 
-          String hostPort = host.getHostIdentifier();
-          String hostname = hostPort.substring(0, hostPort.indexOf(":"));
-          if (localDiskFetchEnabled && hostname.equals(localHostname)) {
-            setupLocalDiskFetch(host);
-          } else {
-            // Shuffle
-            copyFromHost(host);
-          }
-        } finally {
-          cleanupCurrentConnection(false);
-          if (host != null) {
-            scheduler.freeHost(host);
-            metrics.threadFree();
-          }
-        }
+      String hostPort = host.getHostIdentifier();
+      if (localDiskFetchEnabled && hostPort.equals(localShuffleHostPort)) {
+        setupLocalDiskFetch(host);
+      } else {
+        // Shuffle
+        copyFromHost(host);
+      }
+    } finally {
+      cleanupCurrentConnection(false);
+      if (host != null) {
+        scheduler.freeHost(host);
+        metrics.threadFree();
+      }
+    }
+  }
+
+  public void run() {
+    try {
+      while (!stopped && !Thread.currentThread().isInterrupted()) {
+        remaining = null; // Safety.
+        fetchNext();
       }
     } catch (InterruptedException ie) {
       //TODO: might not be respected when fetcher is in progress / server is busy.  TEZ-711

http://git-wip-us.apache.org/repos/asf/tez/blob/6a04fa48/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
index 442f032..ee05378 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
@@ -19,6 +19,7 @@ package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;
 
 import java.io.IOException;
 import java.lang.reflect.UndeclaredThrowableException;
+import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -95,6 +96,8 @@ public class Shuffle implements ExceptionReporter {
   private final int ifileReadAheadLength;
   private final int numFetchers;
   private final boolean localDiskFetchEnabled;
+  private final String localHostname;
+  private final int shufflePort;
   
   private AtomicReference<Throwable> throwable = new AtomicReference<Throwable>();
   private String throwingThreadName = null;
@@ -158,6 +161,11 @@ public class Shuffle implements ExceptionReporter {
     LocalDirAllocator localDirAllocator = 
         new LocalDirAllocator(TezRuntimeFrameworkConfigs.LOCAL_DIRS);
 
+    this.localHostname = inputContext.getExecutionContext().getHostName();
+    final ByteBuffer shuffleMetadata =
+        inputContext.getServiceProviderMetaData(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID);
+    this.shufflePort = ShuffleUtils.deserializeShuffleProviderMetaData(shuffleMetadata);
+
     // TODO TEZ Get rid of Map / Reduce references.
     TezCounter shuffledInputsCounter = 
         inputContext.getCounters().findCounter(TaskCounter.NUM_SHUFFLED_INPUTS);
@@ -336,8 +344,7 @@ public class Shuffle implements ExceptionReporter {
               FetcherOrderedGrouped
                 fetcher = new FetcherOrderedGrouped(httpConnectionParams, scheduler, merger,
                 metrics, Shuffle.this, jobTokenSecretMgr, ifileReadAhead, ifileReadAheadLength,
-                codec, inputContext, conf, localDiskFetchEnabled,
-                inputContext.getExecutionContext().getHostName());
+                codec, inputContext, conf, localDiskFetchEnabled, localHostname, shufflePort);
               fetchers.add(fetcher);
               fetcher.start();
             }

http://git-wip-us.apache.org/repos/asf/tez/blob/6a04fa48/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
index e6f0c4a..4ef187d 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
@@ -31,6 +31,7 @@ import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.List;
 
@@ -38,8 +39,11 @@ import com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.runtime.api.ExecutionContext;
+import org.apache.tez.runtime.api.InputContext;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
 import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
@@ -52,19 +56,21 @@ import org.mockito.stubbing.Answer;
 public class TestFetcher {
   private static final String SHUFFLE_INPUT_FILE_PREFIX = "shuffle_input_file_";
   private static String HOST = "localhost";
-  private static int PORT = 0;
+  private static int PORT = 41;
 
   @Test(timeout = 3000)
   public void testLocalFetchModeSetting() throws Exception {
     TezConfiguration conf = new TezConfiguration();
-    conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, "true");
+    conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true);
     InputAttemptIdentifier[] srcAttempts = {
         new InputAttemptIdentifier(0, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1")
     };
     FetcherCallback fetcherCallback = mock(FetcherCallback.class);
+    final boolean ENABLE_LOCAL_FETCH = true;
+    final boolean DISABLE_LOCAL_FETCH = false;
 
     Fetcher.FetcherBuilder builder = new Fetcher.FetcherBuilder(fetcherCallback, null, null,
-        ApplicationId.newInstance(0, 1), null, "fetcherTest", conf, true, HOST);
+        ApplicationId.newInstance(0, 1), null, "fetcherTest", conf, ENABLE_LOCAL_FETCH, HOST, PORT);
     builder.assignWork(HOST, PORT, 0, Arrays.asList(srcAttempts));
     Fetcher fetcher = spy(builder.build());
 
@@ -79,10 +85,41 @@ public class TestFetcher {
     verify(fetcher).setupLocalDiskFetch();
     verify(fetcher, never()).doHttpFetch();
 
+    // when enabled and hostname does not match use http fetch.
+    builder = new Fetcher.FetcherBuilder(fetcherCallback, null, null,
+        ApplicationId.newInstance(0, 1), null, "fetcherTest", conf, ENABLE_LOCAL_FETCH, HOST,
+        PORT);
+    builder.assignWork(HOST + "_OTHER", PORT, 0, Arrays.asList(srcAttempts));
+    fetcher = spy(builder.build());
+
+    doReturn(null).when(fetcher).setupLocalDiskFetch();
+    doReturn(hfr).when(fetcher).doHttpFetch();
+    doNothing().when(fetcher).shutdown();
+
+    fetcher.call();
+
+    verify(fetcher, never()).setupLocalDiskFetch();
+    verify(fetcher).doHttpFetch();
+
+    // when enabled and port does not match use http fetch.
+    builder = new Fetcher.FetcherBuilder(fetcherCallback, null, null,
+        ApplicationId.newInstance(0, 1), null, "fetcherTest", conf, ENABLE_LOCAL_FETCH, HOST, PORT);
+    builder.assignWork(HOST, PORT + 1, 0, Arrays.asList(srcAttempts));
+    fetcher = spy(builder.build());
+
+    doReturn(null).when(fetcher).setupLocalDiskFetch();
+    doReturn(hfr).when(fetcher).doHttpFetch();
+    doNothing().when(fetcher).shutdown();
+
+    fetcher.call();
+
+    verify(fetcher, never()).setupLocalDiskFetch();
+    verify(fetcher).doHttpFetch();
+
     // When disabled use http fetch
-    conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, "false");
+    conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, false);
     builder = new Fetcher.FetcherBuilder(fetcherCallback, null, null,
-        ApplicationId.newInstance(0, 1), null, "fetcherTest", conf, false, HOST);
+        ApplicationId.newInstance(0, 1), null, "fetcherTest", conf, DISABLE_LOCAL_FETCH, HOST, PORT);
     builder.assignWork(HOST, PORT, 0, Arrays.asList(srcAttempts));
     fetcher = spy(builder.build());
 
@@ -115,7 +152,7 @@ public class TestFetcher {
     int partition = 42;
     FetcherCallback callback = mock(FetcherCallback.class);
     Fetcher.FetcherBuilder builder = new Fetcher.FetcherBuilder(callback, null, null,
-        ApplicationId.newInstance(0, 1), null, "fetcherTest", conf, true, HOST);
+        ApplicationId.newInstance(0, 1), null, "fetcherTest", conf, true, HOST, PORT);
     builder.assignWork(HOST, PORT, partition, Arrays.asList(srcAttempts));
     Fetcher fetcher = spy(builder.build());
 

http://git-wip-us.apache.org/repos/asf/tez/blob/6a04fa48/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java
index 44122a2..c452898 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java
@@ -49,11 +49,13 @@ import org.apache.tez.common.security.JobTokenIdentifier;
 import org.apache.tez.common.security.JobTokenSecretManager;
 import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.ExecutionContext;
 import org.apache.tez.runtime.api.InputContext;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
 import org.apache.tez.runtime.library.common.InputIdentifier;
 import org.apache.tez.runtime.library.common.shuffle.FetchedInputAllocator;
+import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
 import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto;
 import org.junit.Test;
 
@@ -156,10 +158,20 @@ public class TestShuffleInputEventHandlerImpl {
   }
 
 
-  private InputContext createInputContext() {
+  private InputContext createInputContext() throws IOException {
+    DataOutputBuffer port_dob = new DataOutputBuffer();
+    port_dob.writeInt(PORT);
+    final ByteBuffer shuffleMetaData = ByteBuffer.wrap(port_dob.getData(), 0, port_dob.getLength());
+
+    ExecutionContext executionContext = mock(ExecutionContext.class);
+    doReturn(HOST).when(executionContext).getHostName();
+
     InputContext inputContext = mock(InputContext.class);
     doReturn(new TezCounters()).when(inputContext).getCounters();
     doReturn("sourceVertex").when(inputContext).getSourceVertexName();
+    doReturn(shuffleMetaData).when(inputContext)
+        .getServiceProviderMetaData(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID);
+    doReturn(executionContext).when(inputContext).getExecutionContext();
     return inputContext;
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/6a04fa48/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
index 2e826d8..c33905f 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
@@ -29,6 +29,7 @@ import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -39,9 +40,11 @@ import java.io.DataInputStream;
 import java.io.IOException;
 import java.net.HttpURLConnection;
 import java.net.URL;
+import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.List;
 
+import org.apache.hadoop.io.DataOutputBuffer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -68,11 +71,76 @@ public class TestFetcher {
 
   public static final String SHUFFLE_INPUT_FILE_PREFIX = "shuffle_input_file_";
   public static final String HOST = "localhost";
-  public static final int PORT = 0;
+  public static final int PORT = 65;
 
   static final Logger LOG = LoggerFactory.getLogger(TestFetcher.class);
 
   @Test(timeout = 5000)
+  public void testLocalFetchModeSetting1() throws Exception {
+    Configuration conf = new TezConfiguration();
+    ShuffleScheduler scheduler = mock(ShuffleScheduler.class);
+    MergeManager merger = mock(MergeManager.class);
+    ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class);
+    Shuffle shuffle = mock(Shuffle.class);
+
+    InputContext inputContext = mock(InputContext.class);
+    doReturn(new TezCounters()).when(inputContext).getCounters();
+    doReturn("src vertex").when(inputContext).getSourceVertexName();
+
+    final boolean ENABLE_LOCAL_FETCH = true;
+    final boolean DISABLE_LOCAL_FETCH = false;
+    MapHost mapHost = new MapHost(0, HOST + ":" + PORT, "baseurl");
+    FetcherOrderedGrouped
+        fetcher = new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null,
+        false, 0, null, inputContext, conf, ENABLE_LOCAL_FETCH, HOST, PORT);
+
+    // when local mode is enabled and host and port matches use local fetch
+    FetcherOrderedGrouped spyFetcher = spy(fetcher);
+    doNothing().when(spyFetcher).setupLocalDiskFetch(mapHost);
+    doReturn(mapHost).when(scheduler).getHost();
+
+    spyFetcher.fetchNext();
+
+    verify(spyFetcher, times(1)).setupLocalDiskFetch(mapHost);
+    verify(spyFetcher, never()).copyFromHost(any(MapHost.class));
+
+    // if hostname does not match use http
+    spyFetcher = spy(fetcher);
+    mapHost = new MapHost(0, HOST + "_OTHER" + ":" + PORT, "baseurl");
+    doNothing().when(spyFetcher).setupLocalDiskFetch(mapHost);
+    doReturn(mapHost).when(scheduler).getHost();
+
+    spyFetcher.fetchNext();
+
+    verify(spyFetcher, never()).setupLocalDiskFetch(any(MapHost.class));
+    verify(spyFetcher, times(1)).copyFromHost(mapHost);
+
+    // if port does not match use http
+    spyFetcher = spy(fetcher);
+    mapHost = new MapHost(0, HOST + ":" + (PORT + 1), "baseurl");
+    doNothing().when(spyFetcher).setupLocalDiskFetch(mapHost);
+    doReturn(mapHost).when(scheduler).getHost();
+
+    spyFetcher.fetchNext();
+
+    verify(spyFetcher, never()).setupLocalDiskFetch(any(MapHost.class));
+    verify(spyFetcher, times(1)).copyFromHost(mapHost);
+
+    //if local fetch is not enabled
+    mapHost = new MapHost(0, HOST + ":" + PORT, "baseurl");
+    fetcher = new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null,
+        false, 0, null, inputContext, conf, DISABLE_LOCAL_FETCH, HOST, PORT);
+    spyFetcher = spy(fetcher);
+    doNothing().when(spyFetcher).setupLocalDiskFetch(mapHost);
+    doReturn(mapHost).when(scheduler).getHost();
+
+    spyFetcher.fetchNext();
+
+    verify(spyFetcher, never()).setupLocalDiskFetch(any(MapHost.class));
+    verify(spyFetcher, times(1)).copyFromHost(mapHost);
+  }
+
+  @Test(timeout = 5000)
   public void testSetupLocalDiskFetch() throws Exception {
     Configuration conf = new TezConfiguration();
     ShuffleScheduler scheduler = mock(ShuffleScheduler.class);
@@ -85,7 +153,7 @@ public class TestFetcher {
 
     FetcherOrderedGrouped
         fetcher = new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null,
-        false, 0, null, inputContext, conf, true, HOST);
+        false, 0, null, inputContext, conf, true, HOST, PORT);
     FetcherOrderedGrouped spyFetcher = spy(fetcher);
 
     MapHost host = new MapHost(1, HOST + ":" + PORT,
@@ -228,7 +296,7 @@ public class TestFetcher {
         ShuffleUtils.constructHttpShuffleConnectionParams(conf);
     FetcherOrderedGrouped mockFetcher =
         new FetcherOrderedGrouped(httpConnectionParams, scheduler, merger, metrics, shuffle, null,
-            false, 0, null, inputContext, conf, false, HOST);
+            false, 0, null, inputContext, conf, false, HOST, PORT);
     final FetcherOrderedGrouped fetcher = spy(mockFetcher);
 
     final MapHost host = new MapHost(1, HOST + ":" + PORT,