You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by bo...@apache.org on 2012/11/16 01:55:57 UTC

svn commit: r1410132 - in /hadoop/common/branches/branch-2/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/

Author: bobby
Date: Fri Nov 16 00:55:56 2012
New Revision: 1410132

URL: http://svn.apache.org/viewvc?rev=1410132&view=rev
Log:
svn merge -c 1410131 FIXES: MAPREDUCE-4801. ShuffleHandler can generate large logs due to prematurely closed channels (jlowe via bobby)

Modified:
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1410132&r1=1410131&r2=1410132&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Fri Nov 16 00:55:56 2012
@@ -534,6 +534,9 @@ Release 0.23.5 - UNRELEASED
 
     MAPREDUCE-4797. LocalContainerAllocator can loop forever trying to contact
     the RM (jlowe via bobby)
+
+    MAPREDUCE-4801. ShuffleHandler can generate large logs due to prematurely
+    closed channels (jlowe via bobby)
  
 Release 0.23.4
 

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java?rev=1410132&r1=1410131&r2=1410132&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java Fri Nov 16 00:55:56 2012
@@ -37,6 +37,7 @@ import java.io.RandomAccessFile;
 import java.net.InetSocketAddress;
 import java.net.URL;
 import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -45,6 +46,7 @@ import java.util.concurrent.ConcurrentHa
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
 
 import javax.crypto.SecretKey;
 
@@ -120,10 +122,16 @@ public class ShuffleHandler extends Abst
   public static final String SHUFFLE_READAHEAD_BYTES = "mapreduce.shuffle.readahead.bytes";
   public static final int DEFAULT_SHUFFLE_READAHEAD_BYTES = 4 * 1024 * 1024;
 
+  // pattern to identify errors related to the client closing the socket early
+  // idea borrowed from Netty SslHandler
+  private static final Pattern IGNORABLE_ERROR_MESSAGE = Pattern.compile(
+      "^.*(?:connection.*reset|connection.*closed|broken.*pipe).*$",
+      Pattern.CASE_INSENSITIVE);
+
   private int port;
   private ChannelFactory selector;
   private final ChannelGroup accepted = new DefaultChannelGroup();
-  private HttpPipelineFactory pipelineFact;
+  protected HttpPipelineFactory pipelineFact;
   private int sslFileBufferSize;
 
   /**
@@ -319,13 +327,17 @@ public class ShuffleHandler extends Abst
     }
   }
 
+  protected Shuffle getShuffle(Configuration conf) {
+    return new Shuffle(conf);
+  }
+
   class HttpPipelineFactory implements ChannelPipelineFactory {
 
     final Shuffle SHUFFLE;
     private SSLFactory sslFactory;
 
     public HttpPipelineFactory(Configuration conf) throws Exception {
-      SHUFFLE = new Shuffle(conf);
+      SHUFFLE = getShuffle(conf);
       if (conf.getBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY,
                           MRConfig.SHUFFLE_SSL_ENABLED_DEFAULT)) {
         sslFactory = new SSLFactory(SSLFactory.Mode.SERVER, conf);
@@ -465,7 +477,7 @@ public class ShuffleHandler extends Abst
       lastMap.addListener(ChannelFutureListener.CLOSE);
     }
 
-    private void verifyRequest(String appid, ChannelHandlerContext ctx,
+    protected void verifyRequest(String appid, ChannelHandlerContext ctx,
         HttpRequest request, HttpResponse response, URL requestUri)
         throws IOException {
       SecretKey tokenSecret = secretManager.retrieveTokenSecret(appid);
@@ -566,12 +578,12 @@ public class ShuffleHandler extends Abst
       return writeFuture;
     }
 
-    private void sendError(ChannelHandlerContext ctx,
+    protected void sendError(ChannelHandlerContext ctx,
         HttpResponseStatus status) {
       sendError(ctx, "", status);
     }
 
-    private void sendError(ChannelHandlerContext ctx, String message,
+    protected void sendError(ChannelHandlerContext ctx, String message,
         HttpResponseStatus status) {
       HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status);
       response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8");
@@ -590,6 +602,16 @@ public class ShuffleHandler extends Abst
       if (cause instanceof TooLongFrameException) {
         sendError(ctx, BAD_REQUEST);
         return;
+      } else if (cause instanceof IOException) {
+        if (cause instanceof ClosedChannelException) {
+          LOG.debug("Ignoring closed channel error", cause);
+          return;
+        }
+        String message = String.valueOf(cause.getMessage());
+        if (IGNORABLE_ERROR_MESSAGE.matcher(message).matches()) {
+          LOG.debug("Ignoring client socket close", cause);
+          return;
+        }
       }
 
       LOG.error("Shuffle error: ", cause);

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java?rev=1410132&r1=1410131&r2=1410132&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java Fri Nov 16 00:55:56 2012
@@ -17,17 +17,35 @@
  */
 package org.apache.hadoop.mapred;
 
+import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
+import static org.apache.hadoop.test.MetricsAsserts.assertGauge;
+import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
+import static org.apache.hadoop.test.MockitoMaker.make;
+import static org.apache.hadoop.test.MockitoMaker.stub;
+import static org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer;
+import static org.junit.Assert.assertEquals;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.ArrayList;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.mapreduce.task.reduce.ShuffleHeader;
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
 import org.apache.hadoop.metrics2.MetricsSource;
-import org.apache.hadoop.metrics2.impl.MetricsSystemImpl;
 import org.apache.hadoop.metrics2.MetricsSystem;
-import static org.apache.hadoop.test.MetricsAsserts.*;
-
+import org.apache.hadoop.metrics2.impl.MetricsSystemImpl;
+import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelFuture;
-
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.http.HttpRequest;
+import org.jboss.netty.handler.codec.http.HttpResponse;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.junit.Assert;
 import org.junit.Test;
-import static org.junit.Assert.*;
-import static org.apache.hadoop.test.MockitoMaker.*;
 
 public class TestShuffleHandler {
   static final long MiB = 1024 * 1024;
@@ -69,4 +87,76 @@ public class TestShuffleHandler {
     assertCounter("ShuffleOutputsOK", succeeded, rb);
     assertGauge("ShuffleConnections", connections, rb);
   }
+
+  @Test
+  public void testClientClosesConnection() throws Exception {
+    final ArrayList<Throwable> failures = new ArrayList<Throwable>(1);
+    Configuration conf = new Configuration();
+    conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
+    ShuffleHandler shuffleHandler = new ShuffleHandler() {
+      @Override
+      protected Shuffle getShuffle(Configuration conf) {
+        // replace the shuffle handler with one stubbed for testing
+        return new Shuffle(conf) {
+          @Override
+          protected void verifyRequest(String appid, ChannelHandlerContext ctx,
+              HttpRequest request, HttpResponse response, URL requestUri)
+                  throws IOException {
+          }
+          @Override
+          protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx,
+              Channel ch, String user, String jobId, String mapId, int reduce)
+                  throws IOException {
+            // send a shuffle header and a lot of data down the channel
+            // to trigger a broken pipe
+            ShuffleHeader header =
+                new ShuffleHeader("attempt_12345_1_m_1_0", 5678, 5678, 1);
+            DataOutputBuffer dob = new DataOutputBuffer();
+            header.write(dob);
+            ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
+            dob = new DataOutputBuffer();
+            for (int i=0; i<100000; ++i) {
+              header.write(dob);
+            }
+            return ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
+          }
+          @Override
+          protected void sendError(ChannelHandlerContext ctx,
+              HttpResponseStatus status) {
+            if (failures.size() == 0) {
+              failures.add(new Error());
+              ctx.getChannel().close();
+            }
+          }
+          @Override
+          protected void sendError(ChannelHandlerContext ctx, String message,
+              HttpResponseStatus status) {
+            if (failures.size() == 0) {
+              failures.add(new Error());
+              ctx.getChannel().close();
+            }
+          }
+        };
+      }
+    };
+    shuffleHandler.init(conf);
+    shuffleHandler.start();
+
+    // simulate a reducer that closes early by reading a single shuffle header
+    // then closing the connection
+    URL url = new URL("http://127.0.0.1:"
+      + shuffleHandler.getConfig().get(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY)
+      + "/mapOutput?job=job_12345_1&reduce=1&map=attempt_12345_1_m_1_0");
+    HttpURLConnection conn = (HttpURLConnection)url.openConnection();
+    conn.connect();
+    DataInputStream input = new DataInputStream(conn.getInputStream());
+    Assert.assertEquals(HttpURLConnection.HTTP_OK, conn.getResponseCode());
+    ShuffleHeader header = new ShuffleHeader();
+    header.readFields(input);
+    input.close();
+
+    shuffleHandler.stop();
+    Assert.assertTrue("sendError called when client closed connection",
+        failures.size() == 0);
+  }
 }