You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by cn...@apache.org on 2014/03/24 20:08:34 UTC

svn commit: r1581003 - in /hadoop/common/branches/branch-2.4/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/ hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/m...

Author: cnauroth
Date: Mon Mar 24 19:08:33 2014
New Revision: 1581003

URL: http://svn.apache.org/r1581003
Log:
MAPREDUCE-5791. Merging change r1580996 from branch-2 to branch-2.4

Added:
    hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestFadvisedFileRegion.java
      - copied unchanged from r1580996, hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestFadvisedFileRegion.java
Modified:
    hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
    hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedFileRegion.java
    hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java

Modified: hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/CHANGES.txt?rev=1581003&r1=1581002&r2=1581003&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/CHANGES.txt Mon Mar 24 19:08:33 2014
@@ -89,6 +89,10 @@ Release 2.4.0 - UNRELEASED
     override HADOOP_ROOT_LOGGER or HADOOP_CLIENT_OPTS. (Varun Vasudev via
     vinodkv)
 
+    MAPREDUCE-5791. Shuffle phase is slow in Windows -
+    FadvisedFileRegion::transferTo does not read disks efficiently.
+    (Nikola Vujic via cnauroth)
+
 Release 2.3.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml?rev=1581003&r1=1581002&r2=1581003&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (original)
+++ hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml Mon Mar 24 19:08:33 2014
@@ -599,6 +599,30 @@
 </property>
 
 <property>
+  <name>mapreduce.shuffle.transferTo.allowed</name>
+  <value></value>
+  <description>This option enables or disables use of the NIO transferTo
+  method in the shuffle phase. NIO transferTo does not perform well on
+  Windows in the shuffle phase, so this property makes it possible to
+  disable it, in which case a custom buffered transfer method is used. The
+  recommended value is false when running Hadoop on Windows and true when
+  running on Linux. If no value is set, the default is false on Windows
+  and true otherwise.
+  </description>
+</property>
+
+<property>
+  <name>mapreduce.shuffle.transfer.buffer.size</name>
+  <value>131072</value>
+  <description>This property is used only if
+  mapreduce.shuffle.transferTo.allowed is set to false. In that case, it
+  defines the size, in bytes, of the buffer used by the buffer copy code
+  in the shuffle phase. The size of this buffer determines the size of the
+  IO requests.
+  </description>
+</property>
+
+<property>
   <name>mapreduce.reduce.markreset.buffer.percent</name>
   <value>0.0</value>
   <description>The percentage of memory -relative to the maximum heap size- to

Modified: hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedFileRegion.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedFileRegion.java?rev=1581003&r1=1581002&r2=1581003&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedFileRegion.java (original)
+++ hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedFileRegion.java Mon Mar 24 19:08:33 2014
@@ -21,6 +21,8 @@ package org.apache.hadoop.mapred;
 import java.io.FileDescriptor;
 import java.io.IOException;
 import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
 import java.nio.channels.WritableByteChannel;
 
 import org.apache.commons.logging.Log;
@@ -30,6 +32,8 @@ import org.apache.hadoop.io.ReadaheadPoo
 import org.apache.hadoop.io.nativeio.NativeIO;
 import org.jboss.netty.channel.DefaultFileRegion;
 
+import com.google.common.annotations.VisibleForTesting;
+
 public class FadvisedFileRegion extends DefaultFileRegion {
 
   private static final Log LOG = LogFactory.getLog(FadvisedFileRegion.class);
@@ -39,18 +43,29 @@ public class FadvisedFileRegion extends 
   private final ReadaheadPool readaheadPool;
   private final FileDescriptor fd;
   private final String identifier;
-
+  private final long count;
+  private final long position;
+  private final int shuffleBufferSize;
+  private final boolean shuffleTransferToAllowed;
+  private final FileChannel fileChannel;
+  
   private ReadaheadRequest readaheadRequest;
 
   public FadvisedFileRegion(RandomAccessFile file, long position, long count,
       boolean manageOsCache, int readaheadLength, ReadaheadPool readaheadPool,
-      String identifier) throws IOException {
+      String identifier, int shuffleBufferSize, 
+      boolean shuffleTransferToAllowed) throws IOException {
     super(file.getChannel(), position, count);
     this.manageOsCache = manageOsCache;
     this.readaheadLength = readaheadLength;
     this.readaheadPool = readaheadPool;
     this.fd = file.getFD();
     this.identifier = identifier;
+    this.fileChannel = file.getChannel();
+    this.count = count;
+    this.position = position;
+    this.shuffleBufferSize = shuffleBufferSize;
+    this.shuffleTransferToAllowed = shuffleTransferToAllowed;
   }
 
   @Override
@@ -61,9 +76,69 @@ public class FadvisedFileRegion extends 
           getPosition() + position, readaheadLength,
           getPosition() + getCount(), readaheadRequest);
     }
-    return super.transferTo(target, position);
+    
+    if(this.shuffleTransferToAllowed) {
+      return super.transferTo(target, position);
+    } else {
+      return customShuffleTransfer(target, position);
+    } 
+  }
+
+  /**
+   * This method transfers data using a local buffer. It reads data from the
+   * file into a buffer in memory, and then writes the data from the buffer
+   * to the target. It is used only if transferTo is disallowed in the
+   * configuration. super.transferTo does not perform well on Windows
+   * because it generates small IO requests. customShuffleTransfer controls
+   * the size of the IO requests through the size of the intermediate
+   * buffer.
+   */
+  @VisibleForTesting
+  long customShuffleTransfer(WritableByteChannel target, long position)
+      throws IOException {
+    long actualCount = this.count - position;
+    if (actualCount < 0 || position < 0) {
+      throw new IllegalArgumentException(
+          "position out of range: " + position +
+          " (expected: 0 - " + (this.count - 1) + ')');
+    }
+    if (actualCount == 0) {
+      return 0L;
+    }
+    
+    long trans = actualCount;
+    int readSize;
+    ByteBuffer byteBuffer = ByteBuffer.allocate(this.shuffleBufferSize);
+    
+    while(trans > 0L &&
+        (readSize = fileChannel.read(byteBuffer, this.position+position)) > 0) {
+      //adjust counters and buffer limit
+      if(readSize < trans) {
+        trans -= readSize;
+        position += readSize;
+        byteBuffer.flip();
+      } else {
+        //We can read more than we need if actualCount is not a multiple
+        //of the byteBuffer size and the file is big enough. In that case we
+        //cannot use flip; we must set the buffer limit manually to trans.
+        byteBuffer.limit((int)trans);
+        byteBuffer.position(0);
+        position += trans; 
+        trans = 0;
+      }
+      
+      //write data to the target
+      while(byteBuffer.hasRemaining()) {
+        target.write(byteBuffer);
+      }
+      
+      byteBuffer.clear();
+    }
+    
+    return actualCount - trans;
   }
 
+  
   @Override
   public void releaseExternalResources() {
     if (readaheadRequest != null) {

Modified: hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java?rev=1581003&r1=1581002&r2=1581003&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java (original)
+++ hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java Mon Mar 24 19:08:33 2014
@@ -74,6 +74,7 @@ import org.apache.hadoop.metrics2.lib.Mu
 import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
 import org.apache.hadoop.security.ssl.SSLFactory;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
@@ -144,6 +145,8 @@ public class ShuffleHandler extends Auxi
   private boolean manageOsCache;
   private int readaheadLength;
   private int maxShuffleConnections;
+  private int shuffleBufferSize;
+  private boolean shuffleTransferToAllowed;
   private ReadaheadPool readaheadPool = ReadaheadPool.getInstance();
 
   public static final String MAPREDUCE_SHUFFLE_SERVICEID =
@@ -183,6 +186,17 @@ public class ShuffleHandler extends Auxi
   public static final String MAX_SHUFFLE_THREADS = "mapreduce.shuffle.max.threads";
   // 0 implies Netty default of 2 * number of available processors
   public static final int DEFAULT_MAX_SHUFFLE_THREADS = 0;
+  
+  public static final String SHUFFLE_BUFFER_SIZE = 
+      "mapreduce.shuffle.transfer.buffer.size";
+  public static final int DEFAULT_SHUFFLE_BUFFER_SIZE = 128 * 1024;
+  
+  public static final String  SHUFFLE_TRANSFERTO_ALLOWED = 
+      "mapreduce.shuffle.transferTo.allowed";
+  public static final boolean DEFAULT_SHUFFLE_TRANSFERTO_ALLOWED = true;
+  public static final boolean WINDOWS_DEFAULT_SHUFFLE_TRANSFERTO_ALLOWED = 
+      false;
+
   boolean connectionKeepAliveEnabled = false;
   int connectionKeepAliveTimeOut;
   int mapOutputMetaInfoCacheSize;
@@ -310,6 +324,13 @@ public class ShuffleHandler extends Auxi
     if (maxShuffleThreads == 0) {
       maxShuffleThreads = 2 * Runtime.getRuntime().availableProcessors();
     }
+    
+    shuffleBufferSize = conf.getInt(SHUFFLE_BUFFER_SIZE, 
+                                    DEFAULT_SHUFFLE_BUFFER_SIZE);
+        
+    shuffleTransferToAllowed = conf.getBoolean(SHUFFLE_TRANSFERTO_ALLOWED,
+         (Shell.WINDOWS)?WINDOWS_DEFAULT_SHUFFLE_TRANSFERTO_ALLOWED:
+                         DEFAULT_SHUFFLE_TRANSFERTO_ALLOWED);
 
     ThreadFactory bossFactory = new ThreadFactoryBuilder()
       .setNameFormat("ShuffleHandler Netty Boss #%d")
@@ -746,7 +767,8 @@ public class ShuffleHandler extends Auxi
       if (ch.getPipeline().get(SslHandler.class) == null) {
         final FadvisedFileRegion partition = new FadvisedFileRegion(spill,
             info.startOffset, info.partLength, manageOsCache, readaheadLength,
-            readaheadPool, spillfile.getAbsolutePath());
+            readaheadPool, spillfile.getAbsolutePath(), 
+            shuffleBufferSize, shuffleTransferToAllowed);
         writeFuture = ch.write(partition);
         writeFuture.addListener(new ChannelFutureListener() {
             // TODO error handling; distinguish IO/connection failures,