You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by je...@apache.org on 2017/02/02 21:56:56 UTC

tez git commit: TEZ-3599. Unordered Fetcher can hang if empty partitions are present (Kuhu Shukla via jeagles)

Repository: tez
Updated Branches:
  refs/heads/TEZ-3334 bc3c17a57 -> 0cf1ce26c


TEZ-3599. Unordered Fetcher can hang if empty partitions are present (Kuhu Shukla via jeagles)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/0cf1ce26
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/0cf1ce26
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/0cf1ce26

Branch: refs/heads/TEZ-3334
Commit: 0cf1ce26c96139abd726e8e00cfcfacfafd2b4f8
Parents: bc3c17a
Author: Jonathan Eagles <je...@yahoo-inc.com>
Authored: Thu Feb 2 15:56:43 2017 -0600
Committer: Jonathan Eagles <je...@yahoo-inc.com>
Committed: Thu Feb 2 15:56:43 2017 -0600

----------------------------------------------------------------------
 TEZ-3334-CHANGES.txt                            |   1 +
 .../runtime/library/common/shuffle/Fetcher.java | 114 +++++++++++--------
 .../common/shuffle/impl/ShuffleManager.java     |  28 +++--
 .../library/common/shuffle/TestFetcher.java     |  46 +++++---
 .../common/shuffle/impl/TestShuffleManager.java |   2 +-
 .../shuffle/orderedgrouped/TestFetcher.java     |   4 +-
 6 files changed, 122 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/0cf1ce26/TEZ-3334-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-3334-CHANGES.txt b/TEZ-3334-CHANGES.txt
index 6c9a858..8416cc9 100644
--- a/TEZ-3334-CHANGES.txt
+++ b/TEZ-3334-CHANGES.txt
@@ -4,6 +4,7 @@ Apache Tez Change Log
 INCOMPATIBLE CHANGES:
 
 ALL CHANGES:
+  TEZ-3599. Unordered Fetcher can hang if empty partitions are present
   TEZ-3596. Number of Empty DME logged for Composite fetch is too high
   TEZ-3597. Composite Fetch hangs on certain DME empty events.
   TEZ-3595. Composite Fetch account error for disk direct

http://git-wip-us.apache.org/repos/asf/tez/blob/0cf1ce26/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
index 7b5ca17..a083daa 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
@@ -584,6 +584,8 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
     // yet_to_be_fetched list and marking the failed tasks.
     InputAttemptIdentifier[] failedInputs = null;
     while (!srcAttemptsRemaining.isEmpty() && failedInputs == null) {
+      InputAttemptIdentifier inputAttemptIdentifier =
+          srcAttemptsRemaining.entrySet().iterator().next().getValue();
       if (isShutDown.get()) {
         shutdownInternal(true);
         if (isDebugEnabled) {
@@ -595,6 +597,9 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
       }
       try {
         failedInputs = fetchInputs(input, callback);
+        if(failedInputs == null || failedInputs.length == 0) {
+         srcAttemptsRemaining.remove(inputAttemptIdentifier.toString());
+        }
       } catch (FetcherReadTimeoutException e) {
         //clean up connection
         shutdownInternal(true);
@@ -635,6 +640,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
 
     Iterator<Entry<String, InputAttemptIdentifier>> iterator = srcAttemptsRemaining.entrySet().iterator();
     while (iterator.hasNext()) {
+      boolean hasFailures = false;
       if (isShutDown.get()) {
         if (isDebugEnabled) {
           LOG.debug(
@@ -643,54 +649,67 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
         break;
       }
       InputAttemptIdentifier srcAttemptId = iterator.next().getValue();
-      long startTime = System.currentTimeMillis();
+      for (int curPartition = 0; curPartition < partitionCount; curPartition++) {
+        int reduceId = curPartition + partition;
+        srcAttemptId = pathToAttemptMap.get(new PathPartition(srcAttemptId.getPathComponent(), reduceId));
+        long startTime = System.currentTimeMillis();
 
-      FetchedInput fetchedInput = null;
-      try {
-        TezIndexRecord idxRecord;
-        // for missing files, this will throw an exception
-        idxRecord = getTezIndexRecord(srcAttemptId);
-
-        fetchedInput = new LocalDiskFetchedInput(idxRecord.getStartOffset(),
-            idxRecord.getRawLength(), idxRecord.getPartLength(), srcAttemptId,
-            getShuffleInputFileName(srcAttemptId.getPathComponent(), null), conf,
-            new FetchedInputCallback() {
-              @Override
-              public void fetchComplete(FetchedInput fetchedInput) {}
-
-              @Override
-              public void fetchFailed(FetchedInput fetchedInput) {}
-
-              @Override
-              public void freeResources(FetchedInput fetchedInput) {}
-            });
-        if (isDebugEnabled) {
-          LOG.debug("fetcher" + " about to shuffle output of srcAttempt (direct disk)" + srcAttemptId
-              + " decomp: " + idxRecord.getRawLength() + " len: " + idxRecord.getPartLength()
-              + " to " + fetchedInput.getType());
-        }
-
-        long endTime = System.currentTimeMillis();
-        fetcherCallback.fetchSucceeded(host, srcAttemptId, fetchedInput, idxRecord.getPartLength(),
-            idxRecord.getRawLength(), (endTime - startTime));
-        iterator.remove();
-      } catch (IOException e) {
-        cleanupFetchedInput(fetchedInput);
-        if (isShutDown.get()) {
+        FetchedInput fetchedInput = null;
+        try {
+          TezIndexRecord idxRecord;
+          // for missing files, this will throw an exception
+          idxRecord = getTezIndexRecord(srcAttemptId, reduceId);
+
+          fetchedInput = new LocalDiskFetchedInput(idxRecord.getStartOffset(),
+              idxRecord.getRawLength(), idxRecord.getPartLength(), srcAttemptId,
+              getShuffleInputFileName(srcAttemptId.getPathComponent(), null),
+              conf,
+              new FetchedInputCallback() {
+                @Override
+                public void fetchComplete(FetchedInput fetchedInput) {
+                }
+
+                @Override
+                public void fetchFailed(FetchedInput fetchedInput) {
+                }
+
+                @Override
+                public void freeResources(FetchedInput fetchedInput) {
+                }
+              });
           if (isDebugEnabled) {
-            LOG.debug(
-                "Already shutdown. Ignoring Local Fetch Failure for " + srcAttemptId +
-                    " from host " +
-                    host + " : " + e.getClass().getName() + ", message=" + e.getMessage());
+            LOG.debug("fetcher" + " about to shuffle output of srcAttempt (direct disk)" + srcAttemptId
+                + " decomp: " + idxRecord.getRawLength() + " len: " + idxRecord.getPartLength()
+                + " to " + fetchedInput.getType());
+          }
+
+          long endTime = System.currentTimeMillis();
+          fetcherCallback.fetchSucceeded(host, srcAttemptId, fetchedInput, idxRecord.getPartLength(),
+              idxRecord.getRawLength(), (endTime - startTime));
+        } catch (IOException e) {
+          hasFailures = true;
+          cleanupFetchedInput(fetchedInput);
+          if (isShutDown.get()) {
+            if (isDebugEnabled) {
+              LOG.debug(
+                  "Already shutdown. Ignoring Local Fetch Failure for " +
+                      srcAttemptId +
+                      " from host " +
+                      host + " : " + e.getClass().getName() + ", message=" + e.getMessage());
+            }
+            break;
+          }
+          if (failMissing) {
+            LOG.warn(
+                "Failed to shuffle output of " + srcAttemptId + " from " +
+                    host + "(local fetch)",
+                e);
           }
-          break;
-        }
-        if (failMissing) {
-          LOG.warn(
-              "Failed to shuffle output of " + srcAttemptId + " from " + host + "(local fetch)",
-              e);
         }
       }
+      if(!hasFailures) {
+        iterator.remove();
+      }
     }
 
     InputAttemptIdentifier[] failedFetches = null;
@@ -713,7 +732,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
   }
 
   @VisibleForTesting
-  protected TezIndexRecord getTezIndexRecord(InputAttemptIdentifier srcAttemptId) throws
+  protected TezIndexRecord getTezIndexRecord(InputAttemptIdentifier srcAttemptId, int partition) throws
       IOException {
     TezIndexRecord idxRecord;
     Path indexFile = getShuffleInputFileName(srcAttemptId.getPathComponent(),
@@ -745,6 +764,11 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
     return localDirAllocator.getLocalPathToRead(pathFromLocalDir, conf);
   }
 
+  @VisibleForTesting
+  public Map<PathPartition, InputAttemptIdentifier> getPathToAttemptMap() {
+    return pathToAttemptMap;
+  }
+
   static class HostFetchResult {
     private final FetchResult fetchResult;
     private final InputAttemptIdentifier[] failedInputs;
@@ -946,8 +970,6 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
             compressedLength, decompressedLength, (endTime - startTime));
 
         // Note successful shuffle
-        srcAttemptsRemaining.remove(srcAttemptId.toString());
-
         // metrics.successFetch();
       }
     } catch (IOException ioe) {

http://git-wip-us.apache.org/repos/asf/tez/blob/0cf1ce26/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
index 3964431..a23ce72 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
@@ -117,7 +117,7 @@ public class ShuffleManager implements FetcherCallback {
   private final BlockingQueue<FetchedInput> completedInputs;
   private final AtomicBoolean inputReadyNotificationSent = new AtomicBoolean(false);
   @VisibleForTesting
-  final Set<Integer> completedInputSet;
+  final BitSet completedInputSet;
   private final ConcurrentMap<HostPort, InputHost> knownSrcHosts;
   private final BlockingQueue<InputHost> pendingHosts;
   private final Set<InputAttemptIdentifier> obsoletedInputs;
@@ -217,7 +217,7 @@ public class ShuffleManager implements FetcherCallback {
     
     this.srcNameTrimmed = TezUtilsInternal.cleanVertexName(inputContext.getSourceVertexName());
   
-    completedInputSet = Collections.newSetFromMap(new ConcurrentHashMap<Integer, Boolean>(numInputs));
+    completedInputSet = new BitSet(numInputs);
     /**
      * In case of pipelined shuffle, it is possible to get multiple FetchedInput per attempt.
      * We do not know upfront the number of spills from source.
@@ -445,9 +445,17 @@ public class ShuffleManager implements FetcherCallback {
       }
 
       // Avoid adding attempts which have already completed.
-      if (completedInputSet.contains(input.getInputIdentifier())) {
-        inputIter.remove();
-        continue;
+      if(input instanceof CompositeInputAttemptIdentifier) {
+        if (completedInputSet.nextClearBit(input.getInputIdentifier()) >=
+            input.getInputIdentifier() + ((CompositeInputAttemptIdentifier) input).getInputIdentifierCount()) {
+          inputIter.remove();
+          continue;
+        }
+      } else {
+        if (completedInputSet.get(input.getInputIdentifier())) {
+          inputIter.remove();
+          continue;
+        }
       }
       // Avoid adding attempts which have been marked as OBSOLETE 
       if (obsoletedInputs.contains(input)) {
@@ -531,9 +539,9 @@ public class ShuffleManager implements FetcherCallback {
       LOG.debug("No input data exists for SrcTask: " + inputIdentifier + ". Marking as complete.");
     }
     
-    if (!completedInputSet.contains(inputIdentifier)) {
+    if (!completedInputSet.get(inputIdentifier)) {
       synchronized (completedInputSet) {
-        if (!completedInputSet.contains(inputIdentifier)) {
+        if (!completedInputSet.get(inputIdentifier)) {
           NullFetchedInput fetchedInput = new NullFetchedInput(srcAttemptIdentifier);
           if (!srcAttemptIdentifier.canRetrieveInputInChunks()) {
             registerCompletedInput(fetchedInput);
@@ -631,9 +639,9 @@ public class ShuffleManager implements FetcherCallback {
     
     inputContext.notifyProgress();
     boolean committed = false;
-    if (!completedInputSet.contains(inputIdentifier)) {
+    if (!completedInputSet.get(inputIdentifier)) {
       synchronized (completedInputSet) {
-        if (!completedInputSet.contains(inputIdentifier)) {
+        if (!completedInputSet.get(inputIdentifier)) {
           fetchedInput.commit();
           committed = true;
           ShuffleUtils.logIndividualFetchComplete(LOG, copyDuration,
@@ -710,7 +718,7 @@ public class ShuffleManager implements FetcherCallback {
   private void adjustCompletedInputs(FetchedInput fetchedInput) {
     lock.lock();
     try {
-      completedInputSet.add(fetchedInput.getInputAttemptIdentifier().getInputIdentifier());
+      completedInputSet.set(fetchedInput.getInputAttemptIdentifier().getInputIdentifier());
 
       int numComplete = numCompletedInputs.incrementAndGet();
       if (numComplete == numInputs) {

http://git-wip-us.apache.org/repos/asf/tez/blob/0cf1ce26/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
index 17a065c..b031154 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
@@ -18,6 +18,8 @@
 
 package org.apache.tez.runtime.library.common.shuffle;
 
+import org.apache.tez.runtime.library.common.CompositeInputAttemptIdentifier;
+import static org.mockito.Matchers.anyInt;
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
@@ -31,6 +33,7 @@ import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -138,13 +141,13 @@ public class TestFetcher {
 
   @Test(timeout = 3000)
   public void testSetupLocalDiskFetch() throws Exception {
-
-    InputAttemptIdentifier[] srcAttempts = {
-        new InputAttemptIdentifier(0, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0"),
-        new InputAttemptIdentifier(1, 2, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1"),
-        new InputAttemptIdentifier(2, 3, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_2"),
-        new InputAttemptIdentifier(3, 4, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3"),
-        new InputAttemptIdentifier(4, 5, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_4")
+    
+    CompositeInputAttemptIdentifier[] srcAttempts = {
+        new CompositeInputAttemptIdentifier(0, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0", 1),
+        new CompositeInputAttemptIdentifier(1, 2, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1", 1),
+        new CompositeInputAttemptIdentifier(2, 3, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_2", 1),
+        new CompositeInputAttemptIdentifier(3, 4, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3", 1),
+        new CompositeInputAttemptIdentifier(4, 5, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_4", 1)
     };
     final int FIRST_FAILED_ATTEMPT_IDX = 2;
     final int SECOND_FAILED_ATTEMPT_IDX = 4;
@@ -156,10 +159,25 @@ public class TestFetcher {
     FetcherCallback callback = mock(FetcherCallback.class);
     Fetcher.FetcherBuilder builder = new Fetcher.FetcherBuilder(callback, null, null,
         ApplicationId.newInstance(0, 1), 1, null, "fetcherTest", conf, true, HOST, PORT,
-        false, true, false);
-    builder.assignWork(HOST, PORT, partition, 1, Arrays.asList(srcAttempts));
+        false, true, true);
+    ArrayList<InputAttemptIdentifier> inputAttemptIdentifiers = new ArrayList<>();
+    for(CompositeInputAttemptIdentifier compositeInputAttemptIdentifier : srcAttempts) {
+      for(int i=0;i<compositeInputAttemptIdentifier.getInputIdentifierCount();i++) {
+        inputAttemptIdentifiers.add(compositeInputAttemptIdentifier.expand(i));
+      }
+    }
+    ArrayList<InputAttemptIdentifier> list = new ArrayList<InputAttemptIdentifier>();
+    list.addAll(Arrays.asList(srcAttempts));
+    builder.assignWork(HOST, PORT, partition, 1, list);
     Fetcher fetcher = spy(builder.build());
-
+    for(CompositeInputAttemptIdentifier compositeInputAttemptIdentifier : srcAttempts) {
+      for(int i=0;i<compositeInputAttemptIdentifier.getInputIdentifierCount();i++) {
+        inputAttemptIdentifiers.add(compositeInputAttemptIdentifier.expand(i));
+        Fetcher.PathPartition pathPartition =
+            new Fetcher.PathPartition(compositeInputAttemptIdentifier.getPathComponent(),partition + i);
+        fetcher.getPathToAttemptMap().put(pathPartition, compositeInputAttemptIdentifier.expand(i));
+      }
+    }
     doAnswer(new Answer<Path>() {
       @Override
       public Path answer(InvocationOnMock invocation) throws Throwable {
@@ -183,7 +201,7 @@ public class TestFetcher {
         // match with params for copySucceeded below.
         return new TezIndexRecord(p * 10, p * 1000, p * 100);
       }
-    }).when(fetcher).getTezIndexRecord(any(InputAttemptIdentifier.class));
+    }).when(fetcher).getTezIndexRecord(any(InputAttemptIdentifier.class), anyInt());
 
     doNothing().when(fetcher).shutdown();
     doNothing().when(callback).fetchSucceeded(anyString(), any(InputAttemptIdentifier.class),
@@ -214,14 +232,14 @@ public class TestFetcher {
         srcAttempts[SECOND_FAILED_ATTEMPT_IDX]);
   }
 
-  protected void verifyFetchSucceeded(FetcherCallback callback, InputAttemptIdentifier srcAttempId, Configuration conf) throws IOException {
+  protected void verifyFetchSucceeded(FetcherCallback callback, CompositeInputAttemptIdentifier srcAttempId, Configuration conf) throws IOException {
     String pathComponent = srcAttempId.getPathComponent();
     int len = pathComponent.length();
     long p = Long.valueOf(pathComponent.substring(len - 1, len));
     ArgumentCaptor<LocalDiskFetchedInput> capturedFetchedInput =
         ArgumentCaptor.forClass(LocalDiskFetchedInput.class);
     verify(callback)
-        .fetchSucceeded(eq(HOST), eq(srcAttempId), capturedFetchedInput.capture(), eq(p * 100),
+        .fetchSucceeded(eq(HOST), eq(srcAttempId.expand(0)), capturedFetchedInput.capture(), eq(p * 100),
             eq(p * 1000), anyLong());
     LocalDiskFetchedInput f = capturedFetchedInput.getValue();
     Assert.assertEquals("success callback filename", f.getInputFile().toString(),
@@ -230,7 +248,7 @@ public class TestFetcher {
     Assert.assertEquals("success callback filesystem", f.getStartOffset(), p * 10);
     Assert.assertEquals("success callback raw size", f.getActualSize(), p * 1000);
     Assert.assertEquals("success callback compressed size", f.getCompressedSize(), p * 100);
-    Assert.assertEquals("success callback input id", f.getInputAttemptIdentifier(), srcAttempId);
+    Assert.assertEquals("success callback input id", f.getInputAttemptIdentifier(), srcAttempId.expand(0));
     Assert.assertEquals("success callback type", f.getType(), FetchedInput.Type.DISK_DIRECT);
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/0cf1ce26/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java
index f026cb2..b3b8688 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java
@@ -220,7 +220,7 @@ public class TestShuffleManager {
     }
 
     public int getNumOfCompletedInputs() {
-      return completedInputSet.size();
+      return completedInputSet.cardinality();
     }
 
     boolean isFetcherExecutorShutdown() {

http://git-wip-us.apache.org/repos/asf/tez/blob/0cf1ce26/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
index 3686d17..54b0279 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
@@ -107,7 +107,7 @@ public class TestFetcher {
 
   static final Logger LOG = LoggerFactory.getLogger(TestFetcher.class);
 
-  @Test (timeout = 5000)
+  @Test(timeout = 5000)
   public void testInputsReturnedOnConnectionException() throws Exception {
     Configuration conf = new TezConfiguration();
     ShuffleScheduler scheduler = mock(ShuffleScheduler.class);
@@ -333,7 +333,7 @@ public class TestFetcher {
     verify(scheduler).putBackKnownMapOutput(host, srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX));
   }
 
-  @Test(timeout = 5000000)
+  @Test(timeout = 5000)
   public void testSetupLocalDiskFetchAutoReduce() throws Exception {
     Configuration conf = new TezConfiguration();
     ShuffleScheduler scheduler = mock(ShuffleScheduler.class);