Posted to mapreduce-commits@hadoop.apache.org by ac...@apache.org on 2013/06/26 07:50:28 UTC

svn commit: r1496743 - in /hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ hadoop-mapreduce-client/hadoop-mapreduce-client-c...

Author: acmurthy
Date: Wed Jun 26 05:50:27 2013
New Revision: 1496743

URL: http://svn.apache.org/r1496743
Log:
Merge -c 1496741 from trunk to branch-2.1-beta to fix MAPREDUCE-5326. Added version to shuffle header. Contributed by Zhijie Shen.
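
In short, the patch has the reduce-side Fetcher stamp two extra HTTP request headers (a shuffle name and a shuffle version) on every map-output fetch, and has both the Fetcher and the ShuffleHandler refuse a peer whose headers do not match exactly. A minimal stand-alone sketch of the client-side half, assuming only java.net.HttpURLConnection and the ShuffleHeader constants added below; the class and method names here are illustrative, not part of the patch:

    import java.io.IOException;
    import java.net.HttpURLConnection;

    import org.apache.hadoop.mapreduce.task.reduce.ShuffleHeader;

    class ShuffleVersionSketch {

      // Client side: advertise the shuffle name/version on the outgoing request,
      // as Fetcher.copyFromHost() now does before connecting.
      static void tagRequest(HttpURLConnection connection) {
        connection.addRequestProperty(ShuffleHeader.HTTP_HEADER_NAME,
            ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
        connection.addRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION,
            ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
      }

      // Client side: reject a response whose echoed headers do not match exactly.
      static void verifyResponse(HttpURLConnection connection) throws IOException {
        if (!ShuffleHeader.DEFAULT_HTTP_HEADER_NAME.equals(
            connection.getHeaderField(ShuffleHeader.HTTP_HEADER_NAME))
            || !ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION.equals(
                connection.getHeaderField(ShuffleHeader.HTTP_HEADER_VERSION))) {
          throw new IOException("Incompatible shuffle response version");
        }
      }
    }

A mismatch on either header is treated like any other fetch failure: the Fetcher reports copyFailed for the affected map outputs and puts them back for retry, which is what the new TestFetcher case below verifies.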

Modified:
    hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
    hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleHeader.java
    hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
    hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
    hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/CHANGES.txt?rev=1496743&r1=1496742&r2=1496743&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/CHANGES.txt Wed Jun 26 05:50:27 2013
@@ -156,6 +156,9 @@ Release 2.1.0-beta - UNRELEASED
 
     MAPREDUCE-5194. Heed interrupts during Fetcher shutdown. (cdouglas)
 
+    MAPREDUCE-5326. Added version to shuffle header. (Zhijie Shen via
+    acmurthy)
+
   OPTIMIZATIONS
 
     MAPREDUCE-4974. Optimising the LineRecordReader initialize() method 

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java?rev=1496743&r1=1496742&r2=1496743&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java Wed Jun 26 05:50:27 2013
@@ -275,6 +275,11 @@ class Fetcher<K,V> extends Thread {
           SecureShuffleUtils.HTTP_HEADER_URL_HASH, encHash);
       // set the read timeout
       connection.setReadTimeout(readTimeout);
+      // put shuffle version into http header
+      connection.addRequestProperty(ShuffleHeader.HTTP_HEADER_NAME,
+          ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
+      connection.addRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION,
+          ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
       connect(connection, connectionTimeout);
       // verify that the thread wasn't stopped during calls to connect
       if (stopped) {
@@ -290,7 +295,13 @@ class Fetcher<K,V> extends Thread {
             "Got invalid response code " + rc + " from " + url +
             ": " + connection.getResponseMessage());
       }
-      
+      // get the shuffle version
+      if (!ShuffleHeader.DEFAULT_HTTP_HEADER_NAME.equals(
+          connection.getHeaderField(ShuffleHeader.HTTP_HEADER_NAME))
+          || !ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION.equals(
+              connection.getHeaderField(ShuffleHeader.HTTP_HEADER_VERSION))) {
+        throw new IOException("Incompatible shuffle response version");
+      }
       // get the replyHash which is HMac of the encHash we sent to the server
       String replyHash = connection.getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH);
       if(replyHash==null) {

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleHeader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleHeader.java?rev=1496743&r1=1496742&r2=1496743&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleHeader.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleHeader.java Wed Jun 26 05:50:27 2013
@@ -36,6 +36,12 @@ import org.apache.hadoop.io.WritableUtil
 @InterfaceStability.Stable
 public class ShuffleHeader implements Writable {
 
+  /** Header info of the shuffle http request/response */
+  public static final String HTTP_HEADER_NAME = "name";
+  public static final String DEFAULT_HTTP_HEADER_NAME = "mapreduce";
+  public static final String HTTP_HEADER_VERSION = "version";
+  public static final String DEFAULT_HTTP_HEADER_VERSION = "1.0.0";
+
   /**
    * The longest possible length of task attempt id that we will accept.
    */

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java?rev=1496743&r1=1496742&r2=1496743&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java Wed Jun 26 05:50:27 2013
@@ -124,9 +124,8 @@ public class TestFetcher {
 
     underTest.copyFromHost(host);
     
-    verify(connection)
-      .addRequestProperty(SecureShuffleUtils.HTTP_HEADER_URL_HASH, 
-          encHash);
+    verify(connection).addRequestProperty(
+        SecureShuffleUtils.HTTP_HEADER_URL_HASH, encHash);
     
     verify(allErrs).increment(1);
     verify(ss).copyFailed(map1ID, host, false, false);
@@ -144,17 +143,20 @@ public class TestFetcher {
     String replyHash = SecureShuffleUtils.generateHash(encHash.getBytes(), key);
     
     when(connection.getResponseCode()).thenReturn(200);
+    when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_NAME))
+        .thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
+    when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_VERSION))
+        .thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
     when(connection.getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH))
-      .thenReturn(replyHash);
+        .thenReturn(replyHash);
     ByteArrayInputStream in = new ByteArrayInputStream(
         "\u00010 BOGUS DATA\nBOGUS DATA\nBOGUS DATA\n".getBytes());
     when(connection.getInputStream()).thenReturn(in);
     
     underTest.copyFromHost(host);
     
-    verify(connection)
-      .addRequestProperty(SecureShuffleUtils.HTTP_HEADER_URL_HASH, 
-          encHash);
+    verify(connection).addRequestProperty(
+        SecureShuffleUtils.HTTP_HEADER_URL_HASH, encHash);
     
     verify(allErrs).increment(1);
     verify(ss).copyFailed(map1ID, host, true, false);
@@ -165,6 +167,37 @@ public class TestFetcher {
   }
 
   @Test
+  public void testCopyFromHostIncompatibleShuffleVersion() throws Exception {
+    String replyHash = SecureShuffleUtils.generateHash(encHash.getBytes(), key);
+    
+    when(connection.getResponseCode()).thenReturn(200);
+    when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_NAME))
+        .thenReturn("mapreduce").thenReturn("other").thenReturn("other");
+    when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_VERSION))
+        .thenReturn("1.0.1").thenReturn("1.0.0").thenReturn("1.0.1");
+    when(connection.getHeaderField(
+        SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH)).thenReturn(replyHash);
+    ByteArrayInputStream in = new ByteArrayInputStream(new byte[0]);
+    when(connection.getInputStream()).thenReturn(in);
+
+    for (int i = 0; i < 3; ++i) {
+      Fetcher<Text,Text> underTest = new FakeFetcher<Text,Text>(job, id, ss, mm,
+          r, metrics, except, key, connection);
+      underTest.copyFromHost(host);
+    }
+    
+    verify(connection, times(3)).addRequestProperty(
+        SecureShuffleUtils.HTTP_HEADER_URL_HASH, encHash);
+    
+    verify(allErrs, times(3)).increment(1);
+    verify(ss, times(3)).copyFailed(map1ID, host, false, false);
+    verify(ss, times(3)).copyFailed(map2ID, host, false, false);
+    
+    verify(ss, times(3)).putBackKnownMapOutput(any(MapHost.class), eq(map1ID));
+    verify(ss, times(3)).putBackKnownMapOutput(any(MapHost.class), eq(map2ID));
+  }
+
+  @Test
   public void testCopyFromHostWait() throws Exception {
     Fetcher<Text,Text> underTest = new FakeFetcher<Text,Text>(job, id, ss, mm,
         r, metrics, except, key, connection);
@@ -173,20 +206,24 @@ public class TestFetcher {
     
     when(connection.getResponseCode()).thenReturn(200);
     when(connection.getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH))
-      .thenReturn(replyHash);
+        .thenReturn(replyHash);
     ShuffleHeader header = new ShuffleHeader(map1ID.toString(), 10, 10, 1);
     ByteArrayOutputStream bout = new ByteArrayOutputStream();
     header.write(new DataOutputStream(bout));
     ByteArrayInputStream in = new ByteArrayInputStream(bout.toByteArray());
     when(connection.getInputStream()).thenReturn(in);
+    when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_NAME))
+        .thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
+    when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_VERSION))
+        .thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
     //Defaults to null, which is what we want to test
     when(mm.reserve(any(TaskAttemptID.class), anyLong(), anyInt()))
-      .thenReturn(null);
+        .thenReturn(null);
     
     underTest.copyFromHost(host);
     
     verify(connection)
-      .addRequestProperty(SecureShuffleUtils.HTTP_HEADER_URL_HASH, 
+        .addRequestProperty(SecureShuffleUtils.HTTP_HEADER_URL_HASH, 
           encHash);
     verify(allErrs, never()).increment(1);
     verify(ss, never()).copyFailed(map1ID, host, true, false);
@@ -208,24 +245,27 @@ public class TestFetcher {
     
     when(connection.getResponseCode()).thenReturn(200);
     when(connection.getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH))
-      .thenReturn(replyHash);
+        .thenReturn(replyHash);
     ShuffleHeader header = new ShuffleHeader(map1ID.toString(), 10, 10, 1);
     ByteArrayOutputStream bout = new ByteArrayOutputStream();
     header.write(new DataOutputStream(bout));
     ByteArrayInputStream in = new ByteArrayInputStream(bout.toByteArray());
     when(connection.getInputStream()).thenReturn(in);
+    when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_NAME))
+        .thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
+    when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_VERSION))
+        .thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
     when(mm.reserve(any(TaskAttemptID.class), anyLong(), anyInt()))
-      .thenReturn(immo);
+        .thenReturn(immo);
     
-    doThrow(new java.lang.InternalError())
-    .when(immo)
-      .shuffle(any(MapHost.class), any(InputStream.class), anyLong(), 
-               anyLong(), any(ShuffleClientMetrics.class), any(Reporter.class));
+    doThrow(new java.lang.InternalError()).when(immo)
+        .shuffle(any(MapHost.class), any(InputStream.class), anyLong(), 
+            anyLong(), any(ShuffleClientMetrics.class), any(Reporter.class));
 
     underTest.copyFromHost(host);
        
     verify(connection)
-      .addRequestProperty(SecureShuffleUtils.HTTP_HEADER_URL_HASH, 
+        .addRequestProperty(SecureShuffleUtils.HTTP_HEADER_URL_HASH, 
           encHash);
     verify(ss, times(1)).copyFailed(map1ID, host, true, false);
   }
@@ -236,19 +276,23 @@ public class TestFetcher {
     InMemoryMapOutput<Text,Text> immo = spy(new InMemoryMapOutput<Text,Text>(
           job, id, mm, 100, null, true));
     when(mm.reserve(any(TaskAttemptID.class), anyLong(), anyInt()))
-      .thenReturn(immo);
+        .thenReturn(immo);
     doNothing().when(mm).waitForResource();
     when(ss.getHost()).thenReturn(host);
 
     String replyHash = SecureShuffleUtils.generateHash(encHash.getBytes(), key);
     when(connection.getResponseCode()).thenReturn(200);
+    when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_NAME))
+        .thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
+    when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_VERSION))
+        .thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
     when(connection.getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH))
-      .thenReturn(replyHash);
+        .thenReturn(replyHash);
     ShuffleHeader header = new ShuffleHeader(map1ID.toString(), 10, 10, 1);
     ByteArrayOutputStream bout = new ByteArrayOutputStream();
     header.write(new DataOutputStream(bout));
     final StuckInputStream in =
-      new StuckInputStream(new ByteArrayInputStream(bout.toByteArray()));
+        new StuckInputStream(new ByteArrayInputStream(bout.toByteArray()));
     when(connection.getInputStream()).thenReturn(in);
     doAnswer(new Answer<Void>() {
       public Void answer(InvocationOnMock ignore) throws IOException {
@@ -280,20 +324,24 @@ public class TestFetcher {
     OnDiskMapOutput<Text,Text> odmo = spy(new OnDiskMapOutput<Text,Text>(map1ID,
         id, mm, 100L, job, mof, FETCHER, true, mFs, p));
     when(mm.reserve(any(TaskAttemptID.class), anyLong(), anyInt()))
-      .thenReturn(odmo);
+        .thenReturn(odmo);
     doNothing().when(mm).waitForResource();
     when(ss.getHost()).thenReturn(host);
 
     String replyHash = SecureShuffleUtils.generateHash(encHash.getBytes(), key);
     when(connection.getResponseCode()).thenReturn(200);
     when(connection.getHeaderField(
-          SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH)).thenReturn(replyHash);
+        SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH)).thenReturn(replyHash);
     ShuffleHeader header = new ShuffleHeader(map1ID.toString(), 10, 10, 1);
     ByteArrayOutputStream bout = new ByteArrayOutputStream();
     header.write(new DataOutputStream(bout));
     final StuckInputStream in =
-      new StuckInputStream(new ByteArrayInputStream(bout.toByteArray()));
+        new StuckInputStream(new ByteArrayInputStream(bout.toByteArray()));
     when(connection.getInputStream()).thenReturn(in);
+    when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_NAME))
+        .thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
+    when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_VERSION))
+        .thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
     doAnswer(new Answer<Void>() {
       public Void answer(InvocationOnMock ignore) throws IOException {
         in.close();

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java?rev=1496743&r1=1496742&r2=1496743&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java Wed Jun 26 05:50:27 2013
@@ -23,6 +23,7 @@ import static org.jboss.netty.handler.co
 import static org.jboss.netty.handler.codec.http.HttpMethod.GET;
 import static org.jboss.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
 import static org.jboss.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN;
+import static org.jboss.netty.handler.codec.http.HttpResponseStatus.HTTP_VERSION_NOT_SUPPORTED;
 import static org.jboss.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
 import static org.jboss.netty.handler.codec.http.HttpResponseStatus.METHOD_NOT_ALLOWED;
 import static org.jboss.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
@@ -438,6 +439,13 @@ public class ShuffleHandler extends Auxi
           sendError(ctx, METHOD_NOT_ALLOWED);
           return;
       }
+      // Check whether the shuffle version is compatible
+      if (!ShuffleHeader.DEFAULT_HTTP_HEADER_NAME.equals(
+          request.getHeader(ShuffleHeader.HTTP_HEADER_NAME))
+          || !ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION.equals(
+              request.getHeader(ShuffleHeader.HTTP_HEADER_VERSION))) {
+        sendError(ctx, "Incompatible shuffle request version", BAD_REQUEST);
+      }
       final Map<String,List<String>> q =
         new QueryStringDecoder(request.getUri()).getParameters();
       final List<String> mapIds = splitMaps(q.get("map"));
@@ -543,6 +551,11 @@ public class ShuffleHandler extends Auxi
         SecureShuffleUtils.generateHash(urlHashStr.getBytes(Charsets.UTF_8), 
             tokenSecret);
       response.setHeader(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH, reply);
+      // Put shuffle version into http header
+      response.setHeader(ShuffleHeader.HTTP_HEADER_NAME,
+          ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
+      response.setHeader(ShuffleHeader.HTTP_HEADER_VERSION,
+          ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
       if (LOG.isDebugEnabled()) {
         int len = reply.length();
         LOG.debug("Fetcher request verfied. enc_str=" + enc_str + ";reply=" +
@@ -627,6 +640,11 @@ public class ShuffleHandler extends Auxi
         HttpResponseStatus status) {
       HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status);
       response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8");
+      // Put shuffle version into http header
+      response.setHeader(ShuffleHeader.HTTP_HEADER_NAME,
+          ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
+      response.setHeader(ShuffleHeader.HTTP_HEADER_VERSION,
+          ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
       response.setContent(
         ChannelBuffers.copiedBuffer(message, CharsetUtil.UTF_8));
 

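On the server side, the same pair of headers is checked on every incoming shuffle request and stamped onto every response, including error responses, so an incompatible fetcher gets a clear BAD_REQUEST instead of a misparsed stream. A condensed sketch of that logic using the Netty 3 HTTP types the handler already uses; the helper names are illustrative, not part of ShuffleHandler:

    import org.apache.hadoop.mapreduce.task.reduce.ShuffleHeader;
    import org.jboss.netty.handler.codec.http.HttpRequest;
    import org.jboss.netty.handler.codec.http.HttpResponse;

    class ShuffleHandlerVersionSketch {

      // Server side: accept only requests that advertise the expected name/version.
      static boolean isCompatible(HttpRequest request) {
        return ShuffleHeader.DEFAULT_HTTP_HEADER_NAME.equals(
            request.getHeader(ShuffleHeader.HTTP_HEADER_NAME))
            && ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION.equals(
                request.getHeader(ShuffleHeader.HTTP_HEADER_VERSION));
      }

      // Server side: echo the name/version so the Fetcher can verify the response;
      // ShuffleHandler does this for normal replies and for sendError() alike.
      static void tagResponse(HttpResponse response) {
        response.setHeader(ShuffleHeader.HTTP_HEADER_NAME,
            ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
        response.setHeader(ShuffleHeader.HTTP_HEADER_VERSION,
            ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
      }
    }

The new TestShuffleHandler case below exercises exactly this path: connections that send the wrong name or version get HttpURLConnection.HTTP_BAD_REQUEST back.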
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java?rev=1496743&r1=1496742&r2=1496743&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java Wed Jun 26 05:50:27 2013
@@ -180,6 +180,10 @@ public class TestShuffleHandler {
       + shuffleHandler.getConfig().get(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY)
       + "/mapOutput?job=job_12345_1&reduce=1&map=attempt_12345_1_m_1_0");
     HttpURLConnection conn = (HttpURLConnection)url.openConnection();
+    conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME,
+        ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
+    conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION,
+        ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
     conn.connect();
     DataInputStream input = new DataInputStream(conn.getInputStream());
     Assert.assertEquals(HttpURLConnection.HTTP_OK, conn.getResponseCode());
@@ -191,6 +195,35 @@ public class TestShuffleHandler {
     Assert.assertTrue("sendError called when client closed connection",
         failures.size() == 0);
   }
+
+  @Test (timeout = 10000)
+  public void testIncompatibleShuffleVersion() throws Exception {
+    final int failureNum = 3;
+    Configuration conf = new Configuration();
+    conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
+    ShuffleHandler shuffleHandler = new ShuffleHandler();
+    shuffleHandler.init(conf);
+    shuffleHandler.start();
+
+    // simulate a reducer that closes early by reading a single shuffle header
+    // then closing the connection
+    URL url = new URL("http://127.0.0.1:"
+      + shuffleHandler.getConfig().get(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY)
+      + "/mapOutput?job=job_12345_1&reduce=1&map=attempt_12345_1_m_1_0");
+    for (int i = 0; i < failureNum; ++i) {
+      HttpURLConnection conn = (HttpURLConnection)url.openConnection();
+      conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME,
+          i == 0 ? "mapreduce" : "other");
+      conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION,
+          i == 1 ? "1.0.0" : "1.0.1");
+      conn.connect();
+      Assert.assertEquals(
+          HttpURLConnection.HTTP_BAD_REQUEST, conn.getResponseCode());
+    }
+
+    shuffleHandler.stop();
+    shuffleHandler.close();
+  }
   
   @Test (timeout = 10000)
   public void testMaxConnections() throws Exception {
@@ -242,6 +275,10 @@ public class TestShuffleHandler {
            + i + "_0";
       URL url = new URL(URLstring);
       conns[i] = (HttpURLConnection)url.openConnection();
+      conns[i].setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME,
+          ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
+      conns[i].setRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION,
+          ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
     }
 
     // Try to open numerous connections
@@ -330,6 +367,10 @@ public class TestShuffleHandler {
                   + "/mapOutput?job=job_12345_0001&reduce=" + reducerId
                   + "&map=attempt_12345_1_m_1_0");
       HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+      conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME,
+          ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
+      conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION,
+          ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
       conn.connect();
       byte[] byteArr = new byte[10000];
       try {