You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by cn...@apache.org on 2014/03/24 20:08:34 UTC
svn commit: r1581003 - in
/hadoop/common/branches/branch-2.4/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/
hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/m...
Author: cnauroth
Date: Mon Mar 24 19:08:33 2014
New Revision: 1581003
URL: http://svn.apache.org/r1581003
Log:
MAPREDUCE-5791. Merging change r1580996 from branch-2 to branch-2.4
Added:
hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestFadvisedFileRegion.java
- copied unchanged from r1580996, hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestFadvisedFileRegion.java
Modified:
hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedFileRegion.java
hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
Modified: hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/CHANGES.txt?rev=1581003&r1=1581002&r2=1581003&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/CHANGES.txt Mon Mar 24 19:08:33 2014
@@ -89,6 +89,10 @@ Release 2.4.0 - UNRELEASED
override HADOOP_ROOT_LOGGER or HADOOP_CLIENT_OPTS. (Varun Vasudev via
vinodkv)
+ MAPREDUCE-5791. Shuffle phase is slow in Windows -
+ FadviseFileRegion::transferTo does not read disks efficiently.
+ (Nikola Vujic via cnauroth)
+
Release 2.3.1 - UNRELEASED
INCOMPATIBLE CHANGES
Modified: hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml?rev=1581003&r1=1581002&r2=1581003&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (original)
+++ hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml Mon Mar 24 19:08:33 2014
@@ -599,6 +599,30 @@
</property>
<property>
+ <name>mapreduce.shuffle.transferTo.allowed</name>
+ <value></value>
+ <description>This option can enable/disable using nio transferTo method in
+ the shuffle phase. NIO transferTo does not perform well on windows in the
+ shuffle phase. Thus, with this configuration property it is possible to
+ disable it, in which case custom transfer method will be used. Recommended
+ value is false when running Hadoop on Windows. For Linux, it is recommended
+ to set it to true. If nothing is set then the default value is false for
+ Windows, and true for Linux.
+ </description>
+</property>
+
+<property>
+ <name>mapreduce.shuffle.transfer.buffer.size</name>
+ <value>131072</value>
+ <description>This property is used only if
+ mapreduce.shuffle.transferTo.allowed is set to false. In that case,
+ this property defines the size of the buffer used in the buffer copy code
+ for the shuffle phase. The size of this buffer determines the size of the IO
+ requests.
+ </description>
+</property>
+
+<property>
<name>mapreduce.reduce.markreset.buffer.percent</name>
<value>0.0</value>
<description>The percentage of memory -relative to the maximum heap size- to
Modified: hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedFileRegion.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedFileRegion.java?rev=1581003&r1=1581002&r2=1581003&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedFileRegion.java (original)
+++ hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedFileRegion.java Mon Mar 24 19:08:33 2014
@@ -21,6 +21,8 @@ package org.apache.hadoop.mapred;
import java.io.FileDescriptor;
import java.io.IOException;
import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import org.apache.commons.logging.Log;
@@ -30,6 +32,8 @@ import org.apache.hadoop.io.ReadaheadPoo
import org.apache.hadoop.io.nativeio.NativeIO;
import org.jboss.netty.channel.DefaultFileRegion;
+import com.google.common.annotations.VisibleForTesting;
+
public class FadvisedFileRegion extends DefaultFileRegion {
private static final Log LOG = LogFactory.getLog(FadvisedFileRegion.class);
@@ -39,18 +43,29 @@ public class FadvisedFileRegion extends
private final ReadaheadPool readaheadPool;
private final FileDescriptor fd;
private final String identifier;
-
+ private final long count;
+ private final long position;
+ private final int shuffleBufferSize;
+ private final boolean shuffleTransferToAllowed;
+ private final FileChannel fileChannel;
+
private ReadaheadRequest readaheadRequest;
public FadvisedFileRegion(RandomAccessFile file, long position, long count,
boolean manageOsCache, int readaheadLength, ReadaheadPool readaheadPool,
- String identifier) throws IOException {
+ String identifier, int shuffleBufferSize,
+ boolean shuffleTransferToAllowed) throws IOException {
super(file.getChannel(), position, count);
this.manageOsCache = manageOsCache;
this.readaheadLength = readaheadLength;
this.readaheadPool = readaheadPool;
this.fd = file.getFD();
this.identifier = identifier;
+ this.fileChannel = file.getChannel();
+ this.count = count;
+ this.position = position;
+ this.shuffleBufferSize = shuffleBufferSize;
+ this.shuffleTransferToAllowed = shuffleTransferToAllowed;
}
@Override
@@ -61,9 +76,69 @@ public class FadvisedFileRegion extends
getPosition() + position, readaheadLength,
getPosition() + getCount(), readaheadRequest);
}
- return super.transferTo(target, position);
+
+ if(this.shuffleTransferToAllowed) {
+ return super.transferTo(target, position);
+ } else {
+ return customShuffleTransfer(target, position);
+ }
+ }
+
+ /**
+ * This method transfers data using local buffer. It transfers data from
+ * a disk to a local buffer in memory, and then it transfers data from the
+ * buffer to the target. This is used only if transferTo is disallowed in
+ * the configuration file. super.TransferTo does not perform well on Windows
+ * due to a small IO request generated. customShuffleTransfer can control
+ * the size of the IO requests by changing the size of the intermediate
+ * buffer.
+ */
+ @VisibleForTesting
+ long customShuffleTransfer(WritableByteChannel target, long position)
+ throws IOException {
+ long actualCount = this.count - position;
+ if (actualCount < 0 || position < 0) {
+ throw new IllegalArgumentException(
+ "position out of range: " + position +
+ " (expected: 0 - " + (this.count - 1) + ')');
+ }
+ if (actualCount == 0) {
+ return 0L;
+ }
+
+ long trans = actualCount;
+ int readSize;
+ ByteBuffer byteBuffer = ByteBuffer.allocate(this.shuffleBufferSize);
+
+ while(trans > 0L &&
+ (readSize = fileChannel.read(byteBuffer, this.position+position)) > 0) {
+ //adjust counters and buffer limit
+ if(readSize < trans) {
+ trans -= readSize;
+ position += readSize;
+ byteBuffer.flip();
+ } else {
+ //We can read more than we need if the actualCount is not multiple
+ //of the byteBuffer size and file is big enough. In that case we cannot
+ //use flip method but we need to set buffer limit manually to trans.
+ byteBuffer.limit((int)trans);
+ byteBuffer.position(0);
+ position += trans;
+ trans = 0;
+ }
+
+ //write data to the target
+ while(byteBuffer.hasRemaining()) {
+ target.write(byteBuffer);
+ }
+
+ byteBuffer.clear();
+ }
+
+ return actualCount - trans;
}
+
@Override
public void releaseExternalResources() {
if (readaheadRequest != null) {
Modified: hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java?rev=1581003&r1=1581002&r2=1581003&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java (original)
+++ hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java Mon Mar 24 19:08:33 2014
@@ -74,6 +74,7 @@ import org.apache.hadoop.metrics2.lib.Mu
import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
import org.apache.hadoop.security.ssl.SSLFactory;
import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
@@ -144,6 +145,8 @@ public class ShuffleHandler extends Auxi
private boolean manageOsCache;
private int readaheadLength;
private int maxShuffleConnections;
+ private int shuffleBufferSize;
+ private boolean shuffleTransferToAllowed;
private ReadaheadPool readaheadPool = ReadaheadPool.getInstance();
public static final String MAPREDUCE_SHUFFLE_SERVICEID =
@@ -183,6 +186,17 @@ public class ShuffleHandler extends Auxi
public static final String MAX_SHUFFLE_THREADS = "mapreduce.shuffle.max.threads";
// 0 implies Netty default of 2 * number of available processors
public static final int DEFAULT_MAX_SHUFFLE_THREADS = 0;
+
+ public static final String SHUFFLE_BUFFER_SIZE =
+ "mapreduce.shuffle.transfer.buffer.size";
+ public static final int DEFAULT_SHUFFLE_BUFFER_SIZE = 128 * 1024;
+
+ public static final String SHUFFLE_TRANSFERTO_ALLOWED =
+ "mapreduce.shuffle.transferTo.allowed";
+ public static final boolean DEFAULT_SHUFFLE_TRANSFERTO_ALLOWED = true;
+ public static final boolean WINDOWS_DEFAULT_SHUFFLE_TRANSFERTO_ALLOWED =
+ false;
+
boolean connectionKeepAliveEnabled = false;
int connectionKeepAliveTimeOut;
int mapOutputMetaInfoCacheSize;
@@ -310,6 +324,13 @@ public class ShuffleHandler extends Auxi
if (maxShuffleThreads == 0) {
maxShuffleThreads = 2 * Runtime.getRuntime().availableProcessors();
}
+
+ shuffleBufferSize = conf.getInt(SHUFFLE_BUFFER_SIZE,
+ DEFAULT_SHUFFLE_BUFFER_SIZE);
+
+ shuffleTransferToAllowed = conf.getBoolean(SHUFFLE_TRANSFERTO_ALLOWED,
+ (Shell.WINDOWS)?WINDOWS_DEFAULT_SHUFFLE_TRANSFERTO_ALLOWED:
+ DEFAULT_SHUFFLE_TRANSFERTO_ALLOWED);
ThreadFactory bossFactory = new ThreadFactoryBuilder()
.setNameFormat("ShuffleHandler Netty Boss #%d")
@@ -746,7 +767,8 @@ public class ShuffleHandler extends Auxi
if (ch.getPipeline().get(SslHandler.class) == null) {
final FadvisedFileRegion partition = new FadvisedFileRegion(spill,
info.startOffset, info.partLength, manageOsCache, readaheadLength,
- readaheadPool, spillfile.getAbsolutePath());
+ readaheadPool, spillfile.getAbsolutePath(),
+ shuffleBufferSize, shuffleTransferToAllowed);
writeFuture = ch.write(partition);
writeFuture.addListener(new ChannelFutureListener() {
// TODO error handling; distinguish IO/connection failures,