You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by bo...@apache.org on 2012/11/16 01:55:57 UTC
svn commit: r1410132 - in
/hadoop/common/branches/branch-2/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/
hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/
Author: bobby
Date: Fri Nov 16 00:55:56 2012
New Revision: 1410132
URL: http://svn.apache.org/viewvc?rev=1410132&view=rev
Log:
svn merge -c 1410131 FIXES: MAPREDUCE-4801. ShuffleHandler can generate large logs due to prematurely closed channels (jlowe via bobby)
Modified:
hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1410132&r1=1410131&r2=1410132&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Fri Nov 16 00:55:56 2012
@@ -534,6 +534,9 @@ Release 0.23.5 - UNRELEASED
MAPREDUCE-4797. LocalContainerAllocator can loop forever trying to contact
the RM (jlowe via bobby)
+
+ MAPREDUCE-4801. ShuffleHandler can generate large logs due to prematurely
+ closed channels (jlowe via bobby)
Release 0.23.4
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java?rev=1410132&r1=1410131&r2=1410132&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java Fri Nov 16 00:55:56 2012
@@ -37,6 +37,7 @@ import java.io.RandomAccessFile;
import java.net.InetSocketAddress;
import java.net.URL;
import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -45,6 +46,7 @@ import java.util.concurrent.ConcurrentHa
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
import javax.crypto.SecretKey;
@@ -120,10 +122,16 @@ public class ShuffleHandler extends Abst
public static final String SHUFFLE_READAHEAD_BYTES = "mapreduce.shuffle.readahead.bytes";
public static final int DEFAULT_SHUFFLE_READAHEAD_BYTES = 4 * 1024 * 1024;
+ // pattern to identify errors related to the client closing the socket early
+ // idea borrowed from Netty SslHandler
+ private static final Pattern IGNORABLE_ERROR_MESSAGE = Pattern.compile(
+ "^.*(?:connection.*reset|connection.*closed|broken.*pipe).*$",
+ Pattern.CASE_INSENSITIVE);
+
private int port;
private ChannelFactory selector;
private final ChannelGroup accepted = new DefaultChannelGroup();
- private HttpPipelineFactory pipelineFact;
+ protected HttpPipelineFactory pipelineFact;
private int sslFileBufferSize;
/**
@@ -319,13 +327,17 @@ public class ShuffleHandler extends Abst
}
}
+ protected Shuffle getShuffle(Configuration conf) {
+ return new Shuffle(conf);
+ }
+
class HttpPipelineFactory implements ChannelPipelineFactory {
final Shuffle SHUFFLE;
private SSLFactory sslFactory;
public HttpPipelineFactory(Configuration conf) throws Exception {
- SHUFFLE = new Shuffle(conf);
+ SHUFFLE = getShuffle(conf);
if (conf.getBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY,
MRConfig.SHUFFLE_SSL_ENABLED_DEFAULT)) {
sslFactory = new SSLFactory(SSLFactory.Mode.SERVER, conf);
@@ -465,7 +477,7 @@ public class ShuffleHandler extends Abst
lastMap.addListener(ChannelFutureListener.CLOSE);
}
- private void verifyRequest(String appid, ChannelHandlerContext ctx,
+ protected void verifyRequest(String appid, ChannelHandlerContext ctx,
HttpRequest request, HttpResponse response, URL requestUri)
throws IOException {
SecretKey tokenSecret = secretManager.retrieveTokenSecret(appid);
@@ -566,12 +578,12 @@ public class ShuffleHandler extends Abst
return writeFuture;
}
- private void sendError(ChannelHandlerContext ctx,
+ protected void sendError(ChannelHandlerContext ctx,
HttpResponseStatus status) {
sendError(ctx, "", status);
}
- private void sendError(ChannelHandlerContext ctx, String message,
+ protected void sendError(ChannelHandlerContext ctx, String message,
HttpResponseStatus status) {
HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status);
response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8");
@@ -590,6 +602,16 @@ public class ShuffleHandler extends Abst
if (cause instanceof TooLongFrameException) {
sendError(ctx, BAD_REQUEST);
return;
+ } else if (cause instanceof IOException) {
+ if (cause instanceof ClosedChannelException) {
+ LOG.debug("Ignoring closed channel error", cause);
+ return;
+ }
+ String message = String.valueOf(cause.getMessage());
+ if (IGNORABLE_ERROR_MESSAGE.matcher(message).matches()) {
+ LOG.debug("Ignoring client socket close", cause);
+ return;
+ }
}
LOG.error("Shuffle error: ", cause);
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java?rev=1410132&r1=1410131&r2=1410132&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java Fri Nov 16 00:55:56 2012
@@ -17,17 +17,35 @@
*/
package org.apache.hadoop.mapred;
+import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
+import static org.apache.hadoop.test.MetricsAsserts.assertGauge;
+import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
+import static org.apache.hadoop.test.MockitoMaker.make;
+import static org.apache.hadoop.test.MockitoMaker.stub;
+import static org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer;
+import static org.junit.Assert.assertEquals;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.ArrayList;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.mapreduce.task.reduce.ShuffleHeader;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.MetricsSource;
-import org.apache.hadoop.metrics2.impl.MetricsSystemImpl;
import org.apache.hadoop.metrics2.MetricsSystem;
-import static org.apache.hadoop.test.MetricsAsserts.*;
-
+import org.apache.hadoop.metrics2.impl.MetricsSystemImpl;
+import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
-
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.http.HttpRequest;
+import org.jboss.netty.handler.codec.http.HttpResponse;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.junit.Assert;
import org.junit.Test;
-import static org.junit.Assert.*;
-import static org.apache.hadoop.test.MockitoMaker.*;
public class TestShuffleHandler {
static final long MiB = 1024 * 1024;
@@ -69,4 +87,76 @@ public class TestShuffleHandler {
assertCounter("ShuffleOutputsOK", succeeded, rb);
assertGauge("ShuffleConnections", connections, rb);
}
+
+ @Test
+ public void testClientClosesConnection() throws Exception {
+ final ArrayList<Throwable> failures = new ArrayList<Throwable>(1);
+ Configuration conf = new Configuration();
+ conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
+ ShuffleHandler shuffleHandler = new ShuffleHandler() {
+ @Override
+ protected Shuffle getShuffle(Configuration conf) {
+ // replace the shuffle handler with one stubbed for testing
+ return new Shuffle(conf) {
+ @Override
+ protected void verifyRequest(String appid, ChannelHandlerContext ctx,
+ HttpRequest request, HttpResponse response, URL requestUri)
+ throws IOException {
+ }
+ @Override
+ protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx,
+ Channel ch, String user, String jobId, String mapId, int reduce)
+ throws IOException {
+ // send a shuffle header and a lot of data down the channel
+ // to trigger a broken pipe
+ ShuffleHeader header =
+ new ShuffleHeader("attempt_12345_1_m_1_0", 5678, 5678, 1);
+ DataOutputBuffer dob = new DataOutputBuffer();
+ header.write(dob);
+ ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
+ dob = new DataOutputBuffer();
+ for (int i=0; i<100000; ++i) {
+ header.write(dob);
+ }
+ return ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
+ }
+ @Override
+ protected void sendError(ChannelHandlerContext ctx,
+ HttpResponseStatus status) {
+ if (failures.size() == 0) {
+ failures.add(new Error());
+ ctx.getChannel().close();
+ }
+ }
+ @Override
+ protected void sendError(ChannelHandlerContext ctx, String message,
+ HttpResponseStatus status) {
+ if (failures.size() == 0) {
+ failures.add(new Error());
+ ctx.getChannel().close();
+ }
+ }
+ };
+ }
+ };
+ shuffleHandler.init(conf);
+ shuffleHandler.start();
+
+ // simulate a reducer that closes early by reading a single shuffle header
+ // then closing the connection
+ URL url = new URL("http://127.0.0.1:"
+ + shuffleHandler.getConfig().get(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY)
+ + "/mapOutput?job=job_12345_1&reduce=1&map=attempt_12345_1_m_1_0");
+ HttpURLConnection conn = (HttpURLConnection)url.openConnection();
+ conn.connect();
+ DataInputStream input = new DataInputStream(conn.getInputStream());
+ Assert.assertEquals(HttpURLConnection.HTTP_OK, conn.getResponseCode());
+ ShuffleHeader header = new ShuffleHeader();
+ header.readFields(input);
+ input.close();
+
+ shuffleHandler.stop();
+ Assert.assertTrue("sendError called when client closed connection",
+ failures.size() == 0);
+ }
}