You are viewing a plain-text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by rb...@apache.org on 2015/08/04 02:25:44 UTC

tez git commit: TEZ-2172. FetcherOrderedGrouped using List to store InputAttemptIdentifier can lead to some inefficiency during remove() operation (Saikat via rbalamohan)

Repository: tez
Updated Branches:
  refs/heads/master 4d381d778 -> 73da831e8


TEZ-2172. FetcherOrderedGrouped using List to store InputAttemptIdentifier can lead to some inefficiency during remove() operation (Saikat via rbalamohan)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/73da831e
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/73da831e
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/73da831e

Branch: refs/heads/master
Commit: 73da831e8e5acab5e2e044c74a29384bcda4cbc6
Parents: 4d381d7
Author: Rajesh Balamohan <rb...@apache.org>
Authored: Tue Aug 4 05:59:12 2015 +0530
Committer: Rajesh Balamohan <rb...@apache.org>
Committed: Tue Aug 4 05:59:12 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../orderedgrouped/FetcherOrderedGrouped.java   | 52 +++++++-------
 .../shuffle/orderedgrouped/TestFetcher.java     | 74 ++++++++++++++++++--
 3 files changed, 96 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/73da831e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 303171a..59307b7 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -11,6 +11,7 @@ INCOMPATIBLE CHANGES
   TEZ-2646. Add scheduling casual dependency for attempts
 
 ALL CHANGES:
+  TEZ-2172. FetcherOrderedGrouped using List to store InputAttemptIdentifier can lead to some inefficiency during remove() operation.
   TEZ-2613. Fetcher(unordered) using List to store InputAttemptIdentifier can lead to some inefficiency during remove() operation.
   TEZ-2645. Provide standard analyzers for job analysis.
   TEZ-2627. Support for Tez Job Priorities.

http://git-wip-us.apache.org/repos/asf/tez/blob/73da831e/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
index a4d38ce..d8be8dd 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
@@ -22,9 +22,11 @@ import java.io.IOException;
 import java.net.SocketTimeoutException;
 import java.net.URL;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Iterator;
-import java.util.LinkedList;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.tez.http.BaseHttpConnection;
@@ -87,8 +89,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
   private final boolean ifileReadAhead;
   private final int ifileReadAheadLength;
   @VisibleForTesting
-  LinkedList<InputAttemptIdentifier> remaining;
-
+  Map<String, InputAttemptIdentifier> remaining;
   volatile DataInputStream input;
 
   volatile BaseHttpConnection httpConnection;
@@ -228,25 +229,19 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
     retryStartTime = 0;
     // Get completed maps on 'host'
     List<InputAttemptIdentifier> srcAttempts = scheduler.getMapsForHost(host);
-
     // Sanity check to catch hosts with only 'OBSOLETE' maps, 
     // especially at the tail of large jobs
     if (srcAttempts.size() == 0) {
       return;
     }
-    
     if(LOG.isDebugEnabled()) {
       LOG.debug("Fetcher " + id + " going to fetch from " + host + " for: "
         + srcAttempts + ", partitionId: " + currentPartition);
     }
-    
-    // List of maps to be fetched yet
-    remaining = new LinkedList<InputAttemptIdentifier>(srcAttempts);
-    
+    populateRemainingMap(srcAttempts);
     // Construct the url and connect
-
     try {
-      if (!setupConnection(host, srcAttempts)) {
+      if (!setupConnection(host, remaining.values())) {
         if (stopped) {
           cleanupCurrentConnection(true);
         }
@@ -273,7 +268,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
             return;
           }
           // Connect with retry
-          if (!setupConnection(host, new LinkedList<InputAttemptIdentifier>(remaining))) {
+          if (!setupConnection(host, remaining.values())) {
             if (stopped) {
               cleanupCurrentConnection(true);
               LOG.info("Not reporting connection re-establishment failure since fetcher is stopped");
@@ -310,7 +305,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
   }
 
   @VisibleForTesting
-  boolean setupConnection(MapHost host, List<InputAttemptIdentifier> attempts)
+  boolean setupConnection(MapHost host, Collection<InputAttemptIdentifier> attempts)
       throws IOException {
     boolean connectSucceeded = false;
     try {
@@ -347,7 +342,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
       // At this point, either the connection failed, or the initial header verification failed.
       // The error does not relate to any specific Input. Report all of them as failed.
       // This ends up indirectly penalizing the host (multiple failures reported on the single host)
-      for(InputAttemptIdentifier left: remaining) {
+      for (InputAttemptIdentifier left : remaining.values()) {
         // Need to be handling temporary glitches ..
         // Report read error to the AM to trigger source failure heuristics
         scheduler.copyFailed(left, host, connectSucceeded, !connectSucceeded);
@@ -361,7 +356,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
     // Cycle through remaining MapOutputs
     boolean isFirst = true;
     InputAttemptIdentifier first = null;
-    for (InputAttemptIdentifier left : remaining) {
+    for (InputAttemptIdentifier left : remaining.values()) {
       if (isFirst) {
         first = left;
         isFirst = false;
@@ -487,7 +482,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
       scheduler.copySucceeded(srcAttemptId, host, compressedLength, decompressedLength,
                               endTime - startTime, mapOutput);
       // Note successful shuffle
-      remaining.remove(srcAttemptId);
+      remaining.remove(srcAttemptId.toString());
       metrics.successFetch();
       return null;
     } catch (IOException ioe) {
@@ -514,12 +509,11 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
                  srcAttemptId + " decomp: " + 
                  decompressedLength + ", " + compressedLength, ioe);
         if(srcAttemptId == null) {
-          return remaining.toArray(new InputAttemptIdentifier[remaining.size()]);
+          return remaining.values().toArray(new InputAttemptIdentifier[remaining.values().size()]);
         } else {
           return new InputAttemptIdentifier[] {srcAttemptId};
         }
       }
-      
       LOG.warn("Failed to shuffle output of " + srcAttemptId + 
                " from " + host.getHostIdentifier(), ioe); 
 
@@ -528,7 +522,6 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
       metrics.failedFetch();
       return new InputAttemptIdentifier[] {srcAttemptId};
     }
-
   }
 
   /**
@@ -572,7 +565,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
    * @return true/false, based on if the verification succeeded or not
    */
   private boolean verifySanity(long compressedLength, long decompressedLength,
-      int forReduce, List<InputAttemptIdentifier> remaining, InputAttemptIdentifier srcAttemptId) {
+      int forReduce, Map<String, InputAttemptIdentifier> remaining, InputAttemptIdentifier srcAttemptId) {
     if (compressedLength < 0 || decompressedLength < 0) {
       wrongLengthErrs.increment(1);
       LOG.warn(logIdentifier + " invalid lengths in map output header: id: " +
@@ -592,7 +585,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
     }
 
     // Sanity check
-    if (!remaining.contains(srcAttemptId)) {
+    if (remaining.get(srcAttemptId.toString()) == null) {
       wrongMapErrs.increment(1);
       LOG.warn("Invalid map-output! Received output for " + srcAttemptId);
       return false;
@@ -603,7 +596,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
   
   private InputAttemptIdentifier getNextRemainingAttempt() {
     if (remaining.size() > 0) {
-      return remaining.iterator().next();
+      return remaining.values().iterator().next();
     } else {
       return null;
     }
@@ -626,10 +619,10 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
     }
 
     // List of maps to be fetched yet
-    remaining = new LinkedList<InputAttemptIdentifier>(srcAttempts);
+    populateRemainingMap(srcAttempts);
 
     try {
-      final Iterator<InputAttemptIdentifier> iter = remaining.iterator();
+      final Iterator<InputAttemptIdentifier> iter = remaining.values().iterator();
       while (iter.hasNext()) {
         // Avoid fetching more if already stopped
         if (stopped) {
@@ -701,5 +694,16 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
     return MapOutput.createLocalDiskMapOutput(srcAttemptId, allocator, filename,
         indexRecord.getStartOffset(), indexRecord.getPartLength(), true);
   }
+
+  @VisibleForTesting
+  void populateRemainingMap(List<InputAttemptIdentifier> origlist) {
+    if (remaining == null) {
+      remaining = new LinkedHashMap<String, InputAttemptIdentifier>(origlist.size());
+    }
+    for (InputAttemptIdentifier id : origlist) {
+      remaining.put(id.toString(), id);
+    }
+  }
+
 }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/73da831e/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
index 385b7b0..7415570 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
@@ -22,7 +22,6 @@ import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Matchers.anyInt;
-import static org.mockito.Matchers.anyList;
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
@@ -42,9 +41,15 @@ import java.io.IOException;
 import java.net.HttpURLConnection;
 import java.net.URL;
 import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.Map.Entry;
 
 import com.google.common.collect.Lists;
+
 import org.apache.tez.http.HttpConnection;
 import org.apache.tez.http.HttpConnectionParams;
 import org.apache.tez.common.counters.TezCounter;
@@ -373,7 +378,7 @@ public class TestFetcher {
         new InputAttemptIdentifier(3, 4, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3")
     );
     doReturn(srcAttempts).when(scheduler).getMapsForHost(host);
-    doReturn(true).when(fetcher).setupConnection(host, srcAttempts);
+    doReturn(true).when(fetcher).setupConnection(any(MapHost.class), any(Collection.class));
 
     URL url = ShuffleUtils.constructInputURL(host.getBaseUrl(), srcAttempts, false);
     fetcher.httpConnection = new FakeHttpConnection(url, null, "", null);
@@ -394,8 +399,7 @@ public class TestFetcher {
       @Override public Void answer(InvocationOnMock invocation) throws Throwable {
         // Emulate host down for 4 seconds.
         Thread.sleep(4000);
-        doReturn(false).when(fetcher).setupConnection(host, srcAttempts);
-
+        doReturn(false).when(fetcher).setupConnection(any(MapHost.class), any(Collection.class));
         // Throw IOException when fetcher tries to connect again to the same node
         throw new FetcherReadTimeoutException("creating fetcher socket read timeout exception");
       }
@@ -407,7 +411,7 @@ public class TestFetcher {
       //ignore
     }
     //setup connection should be called twice (1 for connect and another for retry)
-    verify(fetcher, times(2)).setupConnection(any(MapHost.class), anyList());
+    verify(fetcher, times(2)).setupConnection(any(MapHost.class), any(Collection.class));
     //since copyMapOutput consistently fails, it should call copyFailed once
     verify(scheduler, times(1)).copyFailed(any(InputAttemptIdentifier.class), any(MapHost.class),
           anyBoolean(), anyBoolean());
@@ -458,8 +462,7 @@ public class TestFetcher {
             wrongLengthErrsCounter, badIdErrsCounter,
             wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, true);
     final FetcherOrderedGrouped fetcher = spy(mockFetcher);
-    fetcher.remaining = Lists.newLinkedList();
-
+    fetcher.remaining = new LinkedHashMap<String, InputAttemptIdentifier>();
     final List<InputAttemptIdentifier> srcAttempts = Arrays.asList(
         new InputAttemptIdentifier(0, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0"),
         new InputAttemptIdentifier(1, 2, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1"),
@@ -477,4 +480,61 @@ public class TestFetcher {
       fail();
     }
   }
+
+  @Test(timeout = 1000)
+  public void testInputAttemptIdentifierMap() {
+    InputAttemptIdentifier[] srcAttempts = {
+      new InputAttemptIdentifier(new InputIdentifier(0), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0",
+          false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0),
+          //duplicate entry
+      new InputAttemptIdentifier(new InputIdentifier(0), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0",
+          false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0),
+      // pipeline shuffle based identifiers, with multiple attempts
+      new InputAttemptIdentifier(new InputIdentifier(1), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1",
+          false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0),
+      new InputAttemptIdentifier(new InputIdentifier(1), 2, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1",
+          false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0),
+      new InputAttemptIdentifier(new InputIdentifier(1), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_2",
+          false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 1),
+      new InputAttemptIdentifier(new InputIdentifier(1), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3",
+          false, InputAttemptIdentifier.SPILL_INFO.FINAL_UPDATE, 2),
+      new InputAttemptIdentifier(new InputIdentifier(2), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3",
+          false, InputAttemptIdentifier.SPILL_INFO.FINAL_MERGE_ENABLED, 0)
+    };
+    InputAttemptIdentifier[] expectedSrcAttempts = {
+      new InputAttemptIdentifier(new InputIdentifier(0), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0",
+          false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0),
+      // pipeline shuffle based identifiers
+      new InputAttemptIdentifier(new InputIdentifier(1), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1",
+          false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0),
+      new InputAttemptIdentifier(new InputIdentifier(1), 2, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1",
+          false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0),
+      new InputAttemptIdentifier(new InputIdentifier(1), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_2",
+          false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 1),
+      new InputAttemptIdentifier(new InputIdentifier(1), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3",
+          false, InputAttemptIdentifier.SPILL_INFO.FINAL_UPDATE, 2),
+      new InputAttemptIdentifier(new InputIdentifier(2), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3",
+          false, InputAttemptIdentifier.SPILL_INFO.FINAL_MERGE_ENABLED, 0)
+    };
+
+    Configuration conf = new TezConfiguration();
+    ShuffleScheduler scheduler = mock(ShuffleScheduler.class);
+    MergeManager merger = mock(MergeManager.class);
+    ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class);
+    Shuffle shuffle = mock(Shuffle.class);
+    MapHost mapHost = new MapHost(0, HOST + ":" + PORT, "baseurl");
+    FetcherOrderedGrouped fetcher =
+        new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0,
+            null, conf, false, HOST, PORT, "src vertex", mapHost, ioErrsCounter,
+            wrongLengthErrsCounter, badIdErrsCounter,
+            wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, false);
+    fetcher.populateRemainingMap(new LinkedList<InputAttemptIdentifier>(Arrays.asList(srcAttempts)));
+    Assert.assertEquals(expectedSrcAttempts.length, fetcher.remaining.size());
+    Iterator<Entry<String, InputAttemptIdentifier>> iterator = fetcher.remaining.entrySet().iterator();
+    int count = 0;
+    while(iterator.hasNext()) {
+      String key = iterator.next().getKey();
+      Assert.assertTrue(expectedSrcAttempts[count++].toString().compareTo(key) == 0);
+    }
+  }
 }