Posted to mapreduce-commits@hadoop.apache.org by tg...@apache.org on 2012/07/27 03:48:09 UTC

svn commit: r1366258 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/o...

Author: tgraves
Date: Fri Jul 27 01:48:09 2012
New Revision: 1366258

URL: http://svn.apache.org/viewvc?rev=1366258&view=rev
Log:
 MAPREDUCE-4423. Potential infinite fetching of map output (Robert Evans via tgraves)
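
For context on the fix: before this patch, copyFromHost() drove its fetch
loop with a plain boolean, and in several failure paths (a corrupt shuffle
header, a failed sanity check) copyMapOutput() returned false without ever
telling the scheduler which map attempts had failed. The outputs were simply
put back on the yet-to-be-fetched list, so a persistently bad map output
could be fetched again forever. The patch has copyMapOutput() return the
failed attempt IDs instead, so the caller can report each one to the
scheduler. A condensed sketch of the new loop (taken from the Fetcher.java
diff below, with cleanup and the sanity check elided):

    TaskAttemptID[] failedTasks = null;
    while (!remaining.isEmpty() && failedTasks == null) {
      // null          -> copied one more output, keep going
      // empty array   -> merger said WAIT; stop, but fail nothing
      // non-empty     -> these attempts failed on this host
      failedTasks = copyMapOutput(host, input, remaining);
    }
    if (failedTasks != null && failedTasks.length > 0) {
      for (TaskAttemptID failed : failedTasks) {
        scheduler.copyFailed(failed, host, true);  // count the failure
      }
    }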

Added:
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
Modified:
    hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java

Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1366258&r1=1366257&r2=1366258&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Fri Jul 27 01:48:09 2012
@@ -757,6 +757,9 @@ Release 0.23.3 - UNRELEASED
     MAPREDUCE-4467. IndexCache failures due to missing synchronization
     (Kihwal Lee via tgraves)
 
+    MAPREDUCE-4423. Potential infinite fetching of map output (Robert Evans
+    via tgraves)
+
 Release 0.23.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java?rev=1366258&r1=1366257&r2=1366258&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java Fri Jul 27 01:48:09 2012
@@ -21,11 +21,12 @@ import java.io.DataInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.net.HttpURLConnection;
 import java.net.MalformedURLException;
 import java.net.URL;
-import java.net.HttpURLConnection;
 import java.net.URLConnection;
 import java.security.GeneralSecurityException;
+import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -53,7 +54,8 @@ import org.apache.hadoop.mapreduce.task.
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.ReflectionUtils;
 
-@SuppressWarnings({"deprecation"})
+import com.google.common.annotations.VisibleForTesting;
+
 class Fetcher<K,V> extends Thread {
   
   private static final Log LOG = LogFactory.getLog(Fetcher.class);
@@ -199,6 +201,7 @@ class Fetcher<K,V> extends Thread {
     }
   }
 
+  @VisibleForTesting
   protected HttpURLConnection openConnection(URL url) throws IOException {
     HttpURLConnection conn = (HttpURLConnection) url.openConnection();
     if (sslShuffle) {
@@ -209,17 +212,18 @@ class Fetcher<K,V> extends Thread {
         throw new IOException(ex);
       }
       httpsConn.setHostnameVerifier(sslFactory.getHostnameVerifier());
-      }
-      return conn;
     }
-    
+    return conn;
+  }
+  
   /**
    * The crux of the matter...
    * 
    * @param host {@link MapHost} from which we need to  
    *              shuffle available map-outputs.
    */
-  private void copyFromHost(MapHost host) throws IOException {
+  @VisibleForTesting
+  protected void copyFromHost(MapHost host) throws IOException {
     // Get completed maps on 'host'
     List<TaskAttemptID> maps = scheduler.getMapsForHost(host);
     
@@ -229,9 +233,9 @@ class Fetcher<K,V> extends Thread {
       return;
     }
     
-    LOG.debug("Fetcher " + id + " going to fetch from " + host);
-    for (TaskAttemptID tmp: maps) {
-      LOG.debug(tmp);
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("Fetcher " + id + " going to fetch from " + host + " for: "
+        + maps);
     }
     
     // List of maps to be fetched yet
@@ -304,17 +308,25 @@ class Fetcher<K,V> extends Thread {
     
     try {
       // Loop through available map-outputs and fetch them
-      // On any error, good becomes false and we exit after putting back
-      // the remaining maps to the yet_to_be_fetched list
-      boolean good = true;
-      while (!remaining.isEmpty() && good) {
-        good = copyMapOutput(host, input, remaining);
+      // On any error, failedTasks is not null and we exit
+      // after putting back the remaining maps to the 
+      // yet_to_be_fetched list and marking the failed tasks.
+      TaskAttemptID[] failedTasks = null;
+      while (!remaining.isEmpty() && failedTasks == null) {
+        failedTasks = copyMapOutput(host, input, remaining);
+      }
+      
+      if(failedTasks != null && failedTasks.length > 0) {
+        LOG.warn("copyMapOutput failed for tasks "+Arrays.toString(failedTasks));
+        for(TaskAttemptID left: failedTasks) {
+          scheduler.copyFailed(left, host, true);
+        }
       }
       
       IOUtils.cleanup(LOG, input);
       
       // Sanity check
-      if (good && !remaining.isEmpty()) {
+      if (failedTasks == null && !remaining.isEmpty()) {
         throw new IOException("server didn't return all expected map outputs: "
             + remaining.size() + " left.");
       }
@@ -323,10 +335,11 @@ class Fetcher<K,V> extends Thread {
         scheduler.putBackKnownMapOutput(host, left);
       }
     }
-      
-   }
+  }
   
-  private boolean copyMapOutput(MapHost host,
+  private static TaskAttemptID[] EMPTY_ATTEMPT_ID_ARRAY = new TaskAttemptID[0];
+  
+  private TaskAttemptID[] copyMapOutput(MapHost host,
                                 DataInputStream input,
                                 Set<TaskAttemptID> remaining) {
     MapOutput<K,V> mapOutput = null;
@@ -348,18 +361,21 @@ class Fetcher<K,V> extends Thread {
       } catch (IllegalArgumentException e) {
         badIdErrs.increment(1);
         LOG.warn("Invalid map id ", e);
-        return false;
+        //Don't know which one was bad, so consider all of them as bad
+        return remaining.toArray(new TaskAttemptID[remaining.size()]);
       }
 
  
       // Do some basic sanity verification
       if (!verifySanity(compressedLength, decompressedLength, forReduce,
           remaining, mapId)) {
-        return false;
+        return new TaskAttemptID[] {mapId};
       }
       
-      LOG.debug("header: " + mapId + ", len: " + compressedLength + 
-               ", decomp len: " + decompressedLength);
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("header: " + mapId + ", len: " + compressedLength + 
+            ", decomp len: " + decompressedLength);
+      }
       
       // Get the location for the map output - either in-memory or on-disk
       mapOutput = merger.reserve(mapId, decompressedLength, id);
@@ -367,7 +383,8 @@ class Fetcher<K,V> extends Thread {
       // Check if we can shuffle *now* ...
       if (mapOutput.getType() == Type.WAIT) {
         LOG.info("fetcher#" + id + " - MergerManager returned Status.WAIT ...");
-        return false;
+        //Not an error, but we need to wait before we can process the data.
+        return EMPTY_ATTEMPT_ID_ARRAY;
       } 
       
       // Go!
@@ -389,24 +406,27 @@ class Fetcher<K,V> extends Thread {
       // Note successful shuffle
       remaining.remove(mapId);
       metrics.successFetch();
-      return true;
+      return null;
     } catch (IOException ioe) {
       ioErrs.increment(1);
       if (mapId == null || mapOutput == null) {
         LOG.info("fetcher#" + id + " failed to read map header" + 
                  mapId + " decomp: " + 
                  decompressedLength + ", " + compressedLength, ioe);
-        return false;
+        if(mapId == null) {
+          return remaining.toArray(new TaskAttemptID[remaining.size()]);
+        } else {
+          return new TaskAttemptID[] {mapId};
+        }
       }
       
-      LOG.info("Failed to shuffle output of " + mapId + 
+      LOG.warn("Failed to shuffle output of " + mapId + 
                " from " + host.getHostName(), ioe); 
 
       // Inform the shuffle-scheduler
       mapOutput.abort();
-      scheduler.copyFailed(mapId, host, true);
       metrics.failedFetch();
-      return false;
+      return new TaskAttemptID[] {mapId};
     }
 
   }
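
To make the new copyMapOutput() contract explicit: null means the map output
was copied successfully, the shared EMPTY_ATTEMPT_ID_ARRAY means the merger
asked the fetcher to WAIT (stop fetching, but mark nothing failed), and a
non-empty array names the attempts to mark failed. Below is a self-contained
toy illustration of the same three-state pattern; the class and method names
are invented for the example and are not Hadoop code:

    import java.util.Arrays;

    public class ThreeStateDemo {
      // Shared sentinel, like Fetcher's EMPTY_ATTEMPT_ID_ARRAY.
      private static final String[] EMPTY = new String[0];

      // Mirrors the copyMapOutput() contract: null = success,
      // EMPTY = benign wait, non-empty = these ids failed.
      static String[] fetchOnce(int simulatedOutcome) {
        switch (simulatedOutcome) {
          case 0:  return null;                             // copied one output
          case 1:  return EMPTY;                            // merger said WAIT
          default: return new String[] {"attempt_0_1_m_1_1"};
        }
      }

      public static void main(String[] args) {
        for (int outcome = 0; outcome < 3; outcome++) {
          String[] failed = fetchOnce(outcome);
          if (failed == null) {
            System.out.println("success, keep fetching");
          } else if (failed.length == 0) {
            System.out.println("WAIT, retry later, nothing failed");
          } else {
            System.out.println("failed: " + Arrays.toString(failed));
          }
        }
      }
    }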

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java?rev=1366258&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java Fri Jul 27 01:48:09 2012
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.task.reduce;
+
+import static org.mockito.Matchers.*;
+import static org.mockito.Mockito.*;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.ArrayList;
+
+import javax.crypto.SecretKey;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.junit.Test;
+
+/**
+ * Test that the Fetcher does what we expect it to.
+ */
+public class TestFetcher {
+  private static final Log LOG = LogFactory.getLog(TestFetcher.class);
+
+  public static class FakeFetcher<K,V> extends Fetcher<K,V> {
+
+    private HttpURLConnection connection;
+
+    public FakeFetcher(JobConf job, TaskAttemptID reduceId,
+        ShuffleScheduler<K,V> scheduler, MergeManager<K,V> merger, Reporter reporter,
+        ShuffleClientMetrics metrics, ExceptionReporter exceptionReporter,
+        SecretKey jobTokenSecret, HttpURLConnection connection) {
+      super(job, reduceId, scheduler, merger, reporter, metrics, exceptionReporter,
+          jobTokenSecret);
+      this.connection = connection;
+    }
+    
+    @Override
+    protected HttpURLConnection openConnection(URL url) throws IOException {
+      if(connection != null) {
+        return connection;
+      }
+      return super.openConnection(url);
+    }
+  }
+  
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testCopyFromHostBogusHeader() throws Exception {
+    LOG.info("testCopyFromHostBogusHeader");
+    JobConf job = new JobConf();
+    TaskAttemptID id = TaskAttemptID.forName("attempt_0_1_r_1_1");
+    ShuffleScheduler<Text, Text> ss = mock(ShuffleScheduler.class);
+    MergeManager<Text, Text> mm = mock(MergeManager.class);
+    Reporter r = mock(Reporter.class);
+    ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class);
+    ExceptionReporter except = mock(ExceptionReporter.class);
+    SecretKey key = JobTokenSecretManager.createSecretKey(new byte[]{0,0,0,0});
+    HttpURLConnection connection = mock(HttpURLConnection.class);
+    
+    Counters.Counter allErrs = mock(Counters.Counter.class);
+    when(r.getCounter(anyString(), anyString()))
+      .thenReturn(allErrs);
+    
+    Fetcher<Text,Text> underTest = new FakeFetcher<Text,Text>(job, id, ss, mm,
+        r, metrics, except, key, connection);
+    
+
+    MapHost host = new MapHost("localhost", "http://localhost:8080/");
+    
+    ArrayList<TaskAttemptID> maps = new ArrayList<TaskAttemptID>(1);
+    TaskAttemptID map1ID = TaskAttemptID.forName("attempt_0_1_m_1_1");
+    maps.add(map1ID);
+    TaskAttemptID map2ID = TaskAttemptID.forName("attempt_0_1_m_2_1");
+    maps.add(map2ID);
+    when(ss.getMapsForHost(host)).thenReturn(maps);
+    
+    String encHash = "vFE234EIFCiBgYs2tCXY/SjT8Kg=";
+    String replyHash = SecureShuffleUtils.generateHash(encHash.getBytes(), key);
+    
+    when(connection.getResponseCode()).thenReturn(200);
+    when(connection.getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH))
+      .thenReturn(replyHash);
+    ByteArrayInputStream in = new ByteArrayInputStream(
+        "\u00010 BOGUS DATA\nBOGUS DATA\nBOGUS DATA\n".getBytes());
+    when(connection.getInputStream()).thenReturn(in);
+    
+    underTest.copyFromHost(host);
+    
+    verify(connection)
+      .addRequestProperty(SecureShuffleUtils.HTTP_HEADER_URL_HASH, 
+          encHash);
+    
+    verify(allErrs).increment(1);
+    verify(ss).copyFailed(map1ID, host, true);
+    verify(ss).copyFailed(map2ID, host, true);
+    
+    verify(ss).putBackKnownMapOutput(any(MapHost.class), eq(map1ID));
+    verify(ss).putBackKnownMapOutput(any(MapHost.class), eq(map2ID));
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testCopyFromHostWait() throws Exception {
+    LOG.info("testCopyFromHostWait");
+    JobConf job = new JobConf();
+    TaskAttemptID id = TaskAttemptID.forName("attempt_0_1_r_1_1");
+    ShuffleScheduler<Text, Text> ss = mock(ShuffleScheduler.class);
+    MergeManager<Text, Text> mm = mock(MergeManager.class);
+    Reporter r = mock(Reporter.class);
+    ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class);
+    ExceptionReporter except = mock(ExceptionReporter.class);
+    SecretKey key = JobTokenSecretManager.createSecretKey(new byte[]{0,0,0,0});
+    HttpURLConnection connection = mock(HttpURLConnection.class);
+    
+    Counters.Counter allErrs = mock(Counters.Counter.class);
+    when(r.getCounter(anyString(), anyString()))
+      .thenReturn(allErrs);
+    
+    Fetcher<Text,Text> underTest = new FakeFetcher<Text,Text>(job, id, ss, mm,
+        r, metrics, except, key, connection);
+    
+
+    MapHost host = new MapHost("localhost", "http://localhost:8080/");
+    
+    ArrayList<TaskAttemptID> maps = new ArrayList<TaskAttemptID>(1);
+    TaskAttemptID map1ID = TaskAttemptID.forName("attempt_0_1_m_1_1");
+    maps.add(map1ID);
+    TaskAttemptID map2ID = TaskAttemptID.forName("attempt_0_1_m_2_1");
+    maps.add(map2ID);
+    when(ss.getMapsForHost(host)).thenReturn(maps);
+    
+    String encHash = "vFE234EIFCiBgYs2tCXY/SjT8Kg=";
+    String replyHash = SecureShuffleUtils.generateHash(encHash.getBytes(), key);
+    
+    when(connection.getResponseCode()).thenReturn(200);
+    when(connection.getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH))
+      .thenReturn(replyHash);
+    ShuffleHeader header = new ShuffleHeader(map1ID.toString(), 10, 10, 1);
+    ByteArrayOutputStream bout = new ByteArrayOutputStream();
+    header.write(new DataOutputStream(bout));
+    ByteArrayInputStream in = new ByteArrayInputStream(bout.toByteArray());
+    when(connection.getInputStream()).thenReturn(in);
+    //Defaults to WAIT, which is what we want to test
+    MapOutput<Text,Text> mapOut = new MapOutput<Text, Text>(map1ID);
+    when(mm.reserve(any(TaskAttemptID.class), anyLong(), anyInt()))
+      .thenReturn(mapOut);
+    
+    underTest.copyFromHost(host);
+    
+    verify(connection)
+      .addRequestProperty(SecureShuffleUtils.HTTP_HEADER_URL_HASH, 
+          encHash);
+    verify(allErrs, never()).increment(1);
+    verify(ss, never()).copyFailed(map1ID, host, true);
+    verify(ss, never()).copyFailed(map2ID, host, true);
+    
+    verify(ss).putBackKnownMapOutput(any(MapHost.class), eq(map1ID));
+    verify(ss).putBackKnownMapOutput(any(MapHost.class), eq(map2ID));
+  }
+  
+}
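
The tests above hinge on two seams the patch opens up: openConnection(URL)
is annotated @VisibleForTesting so FakeFetcher can hand back a
Mockito-mocked HttpURLConnection, and copyFromHost() is widened from private
to protected so the test can drive it directly. The same pattern works for
any class that opens its own connections; here is a minimal self-contained
sketch (class names are invented for illustration and are not Hadoop code):

    import static org.mockito.Mockito.*;

    import java.io.IOException;
    import java.net.HttpURLConnection;
    import java.net.URL;

    // Class under test: all network access goes through one
    // overridable factory method, just like Fetcher.openConnection().
    class SmallHttpClient {
      protected HttpURLConnection openConnection(URL url) throws IOException {
        return (HttpURLConnection) url.openConnection();
      }
      int statusOf(URL url) throws IOException {
        return openConnection(url).getResponseCode();
      }
    }

    // Test-only subclass, analogous to FakeFetcher: the factory
    // returns a canned connection, so no real socket is opened.
    class FakeHttpClient extends SmallHttpClient {
      private final HttpURLConnection canned;
      FakeHttpClient(HttpURLConnection canned) { this.canned = canned; }
      @Override
      protected HttpURLConnection openConnection(URL url) {
        return canned;
      }
    }

    public class SeamDemo {
      public static void main(String[] args) throws Exception {
        HttpURLConnection conn = mock(HttpURLConnection.class);
        when(conn.getResponseCode()).thenReturn(200);
        SmallHttpClient client = new FakeHttpClient(conn);
        System.out.println(client.statusOf(new URL("http://example.com/")));
      }
    }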