You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ac...@apache.org on 2013/06/26 07:50:23 UTC
svn commit: r1496742 - in
/hadoop/common/branches/branch-2/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/
hadoop-mapreduce-client/hadoop-mapreduce-client-core/src...
Author: acmurthy
Date: Wed Jun 26 05:50:23 2013
New Revision: 1496742
URL: http://svn.apache.org/r1496742
Log:
Merge -c 1496741 from trunk to branch-2 to fix MAPREDUCE-5326. Added version to shuffle header. Contributed by Zhijie Shen.
Modified:
hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleHeader.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1496742&r1=1496741&r2=1496742&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Wed Jun 26 05:50:23 2013
@@ -183,6 +183,9 @@ Release 2.1.0-beta - UNRELEASED
MAPREDUCE-5194. Heed interrupts during Fetcher shutdown. (cdouglas)
+ MAPREDUCE-5326. Added version to shuffle header. (Zhijie Shen via
+ acmurthy)
+
OPTIMIZATIONS
MAPREDUCE-4974. Optimising the LineRecordReader initialize() method
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java?rev=1496742&r1=1496741&r2=1496742&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java Wed Jun 26 05:50:23 2013
@@ -275,6 +275,11 @@ class Fetcher<K,V> extends Thread {
SecureShuffleUtils.HTTP_HEADER_URL_HASH, encHash);
// set the read timeout
connection.setReadTimeout(readTimeout);
+ // put shuffle version into http header
+ connection.addRequestProperty(ShuffleHeader.HTTP_HEADER_NAME,
+ ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
+ connection.addRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION,
+ ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
connect(connection, connectionTimeout);
// verify that the thread wasn't stopped during calls to connect
if (stopped) {
@@ -290,7 +295,13 @@ class Fetcher<K,V> extends Thread {
"Got invalid response code " + rc + " from " + url +
": " + connection.getResponseMessage());
}
-
+ // get the shuffle version
+ if (!ShuffleHeader.DEFAULT_HTTP_HEADER_NAME.equals(
+ connection.getHeaderField(ShuffleHeader.HTTP_HEADER_NAME))
+ || !ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION.equals(
+ connection.getHeaderField(ShuffleHeader.HTTP_HEADER_VERSION))) {
+ throw new IOException("Incompatible shuffle response version");
+ }
// get the replyHash which is HMac of the encHash we sent to the server
String replyHash = connection.getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH);
if(replyHash==null) {
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleHeader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleHeader.java?rev=1496742&r1=1496741&r2=1496742&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleHeader.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleHeader.java Wed Jun 26 05:50:23 2013
@@ -36,6 +36,12 @@ import org.apache.hadoop.io.WritableUtil
@InterfaceStability.Stable
public class ShuffleHeader implements Writable {
+ /** Header info of the shuffle http request/response */
+ public static final String HTTP_HEADER_NAME = "name";
+ public static final String DEFAULT_HTTP_HEADER_NAME = "mapreduce";
+ public static final String HTTP_HEADER_VERSION = "version";
+ public static final String DEFAULT_HTTP_HEADER_VERSION = "1.0.0";
+
/**
* The longest possible length of task attempt id that we will accept.
*/
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java?rev=1496742&r1=1496741&r2=1496742&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java Wed Jun 26 05:50:23 2013
@@ -124,9 +124,8 @@ public class TestFetcher {
underTest.copyFromHost(host);
- verify(connection)
- .addRequestProperty(SecureShuffleUtils.HTTP_HEADER_URL_HASH,
- encHash);
+ verify(connection).addRequestProperty(
+ SecureShuffleUtils.HTTP_HEADER_URL_HASH, encHash);
verify(allErrs).increment(1);
verify(ss).copyFailed(map1ID, host, false, false);
@@ -144,17 +143,20 @@ public class TestFetcher {
String replyHash = SecureShuffleUtils.generateHash(encHash.getBytes(), key);
when(connection.getResponseCode()).thenReturn(200);
+ when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_NAME))
+ .thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
+ when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_VERSION))
+ .thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
when(connection.getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH))
- .thenReturn(replyHash);
+ .thenReturn(replyHash);
ByteArrayInputStream in = new ByteArrayInputStream(
"\u00010 BOGUS DATA\nBOGUS DATA\nBOGUS DATA\n".getBytes());
when(connection.getInputStream()).thenReturn(in);
underTest.copyFromHost(host);
- verify(connection)
- .addRequestProperty(SecureShuffleUtils.HTTP_HEADER_URL_HASH,
- encHash);
+ verify(connection).addRequestProperty(
+ SecureShuffleUtils.HTTP_HEADER_URL_HASH, encHash);
verify(allErrs).increment(1);
verify(ss).copyFailed(map1ID, host, true, false);
@@ -165,6 +167,37 @@ public class TestFetcher {
}
@Test
+ public void testCopyFromHostIncompatibleShuffleVersion() throws Exception {
+ String replyHash = SecureShuffleUtils.generateHash(encHash.getBytes(), key);
+
+ when(connection.getResponseCode()).thenReturn(200);
+ when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_NAME))
+ .thenReturn("mapreduce").thenReturn("other").thenReturn("other");
+ when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_VERSION))
+ .thenReturn("1.0.1").thenReturn("1.0.0").thenReturn("1.0.1");
+ when(connection.getHeaderField(
+ SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH)).thenReturn(replyHash);
+ ByteArrayInputStream in = new ByteArrayInputStream(new byte[0]);
+ when(connection.getInputStream()).thenReturn(in);
+
+ for (int i = 0; i < 3; ++i) {
+ Fetcher<Text,Text> underTest = new FakeFetcher<Text,Text>(job, id, ss, mm,
+ r, metrics, except, key, connection);
+ underTest.copyFromHost(host);
+ }
+
+ verify(connection, times(3)).addRequestProperty(
+ SecureShuffleUtils.HTTP_HEADER_URL_HASH, encHash);
+
+ verify(allErrs, times(3)).increment(1);
+ verify(ss, times(3)).copyFailed(map1ID, host, false, false);
+ verify(ss, times(3)).copyFailed(map2ID, host, false, false);
+
+ verify(ss, times(3)).putBackKnownMapOutput(any(MapHost.class), eq(map1ID));
+ verify(ss, times(3)).putBackKnownMapOutput(any(MapHost.class), eq(map2ID));
+ }
+
+ @Test
public void testCopyFromHostWait() throws Exception {
Fetcher<Text,Text> underTest = new FakeFetcher<Text,Text>(job, id, ss, mm,
r, metrics, except, key, connection);
@@ -173,20 +206,24 @@ public class TestFetcher {
when(connection.getResponseCode()).thenReturn(200);
when(connection.getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH))
- .thenReturn(replyHash);
+ .thenReturn(replyHash);
ShuffleHeader header = new ShuffleHeader(map1ID.toString(), 10, 10, 1);
ByteArrayOutputStream bout = new ByteArrayOutputStream();
header.write(new DataOutputStream(bout));
ByteArrayInputStream in = new ByteArrayInputStream(bout.toByteArray());
when(connection.getInputStream()).thenReturn(in);
+ when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_NAME))
+ .thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
+ when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_VERSION))
+ .thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
//Defaults to null, which is what we want to test
when(mm.reserve(any(TaskAttemptID.class), anyLong(), anyInt()))
- .thenReturn(null);
+ .thenReturn(null);
underTest.copyFromHost(host);
verify(connection)
- .addRequestProperty(SecureShuffleUtils.HTTP_HEADER_URL_HASH,
+ .addRequestProperty(SecureShuffleUtils.HTTP_HEADER_URL_HASH,
encHash);
verify(allErrs, never()).increment(1);
verify(ss, never()).copyFailed(map1ID, host, true, false);
@@ -208,24 +245,27 @@ public class TestFetcher {
when(connection.getResponseCode()).thenReturn(200);
when(connection.getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH))
- .thenReturn(replyHash);
+ .thenReturn(replyHash);
ShuffleHeader header = new ShuffleHeader(map1ID.toString(), 10, 10, 1);
ByteArrayOutputStream bout = new ByteArrayOutputStream();
header.write(new DataOutputStream(bout));
ByteArrayInputStream in = new ByteArrayInputStream(bout.toByteArray());
when(connection.getInputStream()).thenReturn(in);
+ when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_NAME))
+ .thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
+ when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_VERSION))
+ .thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
when(mm.reserve(any(TaskAttemptID.class), anyLong(), anyInt()))
- .thenReturn(immo);
+ .thenReturn(immo);
- doThrow(new java.lang.InternalError())
- .when(immo)
- .shuffle(any(MapHost.class), any(InputStream.class), anyLong(),
- anyLong(), any(ShuffleClientMetrics.class), any(Reporter.class));
+ doThrow(new java.lang.InternalError()).when(immo)
+ .shuffle(any(MapHost.class), any(InputStream.class), anyLong(),
+ anyLong(), any(ShuffleClientMetrics.class), any(Reporter.class));
underTest.copyFromHost(host);
verify(connection)
- .addRequestProperty(SecureShuffleUtils.HTTP_HEADER_URL_HASH,
+ .addRequestProperty(SecureShuffleUtils.HTTP_HEADER_URL_HASH,
encHash);
verify(ss, times(1)).copyFailed(map1ID, host, true, false);
}
@@ -236,19 +276,23 @@ public class TestFetcher {
InMemoryMapOutput<Text,Text> immo = spy(new InMemoryMapOutput<Text,Text>(
job, id, mm, 100, null, true));
when(mm.reserve(any(TaskAttemptID.class), anyLong(), anyInt()))
- .thenReturn(immo);
+ .thenReturn(immo);
doNothing().when(mm).waitForResource();
when(ss.getHost()).thenReturn(host);
String replyHash = SecureShuffleUtils.generateHash(encHash.getBytes(), key);
when(connection.getResponseCode()).thenReturn(200);
+ when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_NAME))
+ .thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
+ when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_VERSION))
+ .thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
when(connection.getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH))
- .thenReturn(replyHash);
+ .thenReturn(replyHash);
ShuffleHeader header = new ShuffleHeader(map1ID.toString(), 10, 10, 1);
ByteArrayOutputStream bout = new ByteArrayOutputStream();
header.write(new DataOutputStream(bout));
final StuckInputStream in =
- new StuckInputStream(new ByteArrayInputStream(bout.toByteArray()));
+ new StuckInputStream(new ByteArrayInputStream(bout.toByteArray()));
when(connection.getInputStream()).thenReturn(in);
doAnswer(new Answer<Void>() {
public Void answer(InvocationOnMock ignore) throws IOException {
@@ -280,20 +324,24 @@ public class TestFetcher {
OnDiskMapOutput<Text,Text> odmo = spy(new OnDiskMapOutput<Text,Text>(map1ID,
id, mm, 100L, job, mof, FETCHER, true, mFs, p));
when(mm.reserve(any(TaskAttemptID.class), anyLong(), anyInt()))
- .thenReturn(odmo);
+ .thenReturn(odmo);
doNothing().when(mm).waitForResource();
when(ss.getHost()).thenReturn(host);
String replyHash = SecureShuffleUtils.generateHash(encHash.getBytes(), key);
when(connection.getResponseCode()).thenReturn(200);
when(connection.getHeaderField(
- SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH)).thenReturn(replyHash);
+ SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH)).thenReturn(replyHash);
ShuffleHeader header = new ShuffleHeader(map1ID.toString(), 10, 10, 1);
ByteArrayOutputStream bout = new ByteArrayOutputStream();
header.write(new DataOutputStream(bout));
final StuckInputStream in =
- new StuckInputStream(new ByteArrayInputStream(bout.toByteArray()));
+ new StuckInputStream(new ByteArrayInputStream(bout.toByteArray()));
when(connection.getInputStream()).thenReturn(in);
+ when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_NAME))
+ .thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
+ when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_VERSION))
+ .thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
doAnswer(new Answer<Void>() {
public Void answer(InvocationOnMock ignore) throws IOException {
in.close();
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java?rev=1496742&r1=1496741&r2=1496742&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java Wed Jun 26 05:50:23 2013
@@ -23,6 +23,7 @@ import static org.jboss.netty.handler.co
import static org.jboss.netty.handler.codec.http.HttpMethod.GET;
import static org.jboss.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
import static org.jboss.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN;
+import static org.jboss.netty.handler.codec.http.HttpResponseStatus.HTTP_VERSION_NOT_SUPPORTED;
import static org.jboss.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
import static org.jboss.netty.handler.codec.http.HttpResponseStatus.METHOD_NOT_ALLOWED;
import static org.jboss.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
@@ -438,6 +439,13 @@ public class ShuffleHandler extends Auxi
sendError(ctx, METHOD_NOT_ALLOWED);
return;
}
+ // Check whether the shuffle version is compatible
+ if (!ShuffleHeader.DEFAULT_HTTP_HEADER_NAME.equals(
+ request.getHeader(ShuffleHeader.HTTP_HEADER_NAME))
+ || !ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION.equals(
+ request.getHeader(ShuffleHeader.HTTP_HEADER_VERSION))) {
+ sendError(ctx, "Incompatible shuffle request version", BAD_REQUEST);
+ }
final Map<String,List<String>> q =
new QueryStringDecoder(request.getUri()).getParameters();
final List<String> mapIds = splitMaps(q.get("map"));
@@ -543,6 +551,11 @@ public class ShuffleHandler extends Auxi
SecureShuffleUtils.generateHash(urlHashStr.getBytes(Charsets.UTF_8),
tokenSecret);
response.setHeader(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH, reply);
+ // Put shuffle version into http header
+ response.setHeader(ShuffleHeader.HTTP_HEADER_NAME,
+ ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
+ response.setHeader(ShuffleHeader.HTTP_HEADER_VERSION,
+ ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
if (LOG.isDebugEnabled()) {
int len = reply.length();
LOG.debug("Fetcher request verfied. enc_str=" + enc_str + ";reply=" +
@@ -627,6 +640,11 @@ public class ShuffleHandler extends Auxi
HttpResponseStatus status) {
HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status);
response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8");
+ // Put shuffle version into http header
+ response.setHeader(ShuffleHeader.HTTP_HEADER_NAME,
+ ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
+ response.setHeader(ShuffleHeader.HTTP_HEADER_VERSION,
+ ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
response.setContent(
ChannelBuffers.copiedBuffer(message, CharsetUtil.UTF_8));
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java?rev=1496742&r1=1496741&r2=1496742&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java Wed Jun 26 05:50:23 2013
@@ -180,6 +180,10 @@ public class TestShuffleHandler {
+ shuffleHandler.getConfig().get(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY)
+ "/mapOutput?job=job_12345_1&reduce=1&map=attempt_12345_1_m_1_0");
HttpURLConnection conn = (HttpURLConnection)url.openConnection();
+ conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME,
+ ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
+ conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION,
+ ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
conn.connect();
DataInputStream input = new DataInputStream(conn.getInputStream());
Assert.assertEquals(HttpURLConnection.HTTP_OK, conn.getResponseCode());
@@ -191,6 +195,35 @@ public class TestShuffleHandler {
Assert.assertTrue("sendError called when client closed connection",
failures.size() == 0);
}
+
+ @Test (timeout = 10000)
+ public void testIncompatibleShuffleVersion() throws Exception {
+ final int failureNum = 3;
+ Configuration conf = new Configuration();
+ conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
+ ShuffleHandler shuffleHandler = new ShuffleHandler();
+ shuffleHandler.init(conf);
+ shuffleHandler.start();
+
+ // simulate a reducer that closes early by reading a single shuffle header
+ // then closing the connection
+ URL url = new URL("http://127.0.0.1:"
+ + shuffleHandler.getConfig().get(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY)
+ + "/mapOutput?job=job_12345_1&reduce=1&map=attempt_12345_1_m_1_0");
+ for (int i = 0; i < failureNum; ++i) {
+ HttpURLConnection conn = (HttpURLConnection)url.openConnection();
+ conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME,
+ i == 0 ? "mapreduce" : "other");
+ conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION,
+ i == 1 ? "1.0.0" : "1.0.1");
+ conn.connect();
+ Assert.assertEquals(
+ HttpURLConnection.HTTP_BAD_REQUEST, conn.getResponseCode());
+ }
+
+ shuffleHandler.stop();
+ shuffleHandler.close();
+ }
@Test (timeout = 10000)
public void testMaxConnections() throws Exception {
@@ -242,6 +275,10 @@ public class TestShuffleHandler {
+ i + "_0";
URL url = new URL(URLstring);
conns[i] = (HttpURLConnection)url.openConnection();
+ conns[i].setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME,
+ ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
+ conns[i].setRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION,
+ ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
}
// Try to open numerous connections
@@ -330,6 +367,10 @@ public class TestShuffleHandler {
+ "/mapOutput?job=job_12345_0001&reduce=" + reducerId
+ "&map=attempt_12345_1_m_1_0");
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+ conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME,
+ ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
+ conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION,
+ ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
conn.connect();
byte[] byteArr = new byte[10000];
try {