You are viewing a plain-text version of this content; the canonical link to the original message was not preserved in this copy.
Posted to commits@tez.apache.org by ab...@apache.org on 2023/02/28 08:36:45 UTC
[tez] branch master updated: TEZ-4460: Read timed out in shuffle handler - incorrect usage of EMPTY_LAST_CONTENT and channel write (#257) (Laszlo Bodor reviewed by Rajesh Balamohan, Syed Shameerur Rahman)
This is an automated email from the ASF dual-hosted git repository.
abstractdog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tez.git
The following commit(s) were added to refs/heads/master by this push:
new 6bd6f9cdc TEZ-4460: Read timed out in shuffle handler - incorrect usage of EMPTY_LAST_CONTENT and channel write (#257) (Laszlo Bodor reviewed by Rajesh Balamohan, Syed Shameerur Rahman)
6bd6f9cdc is described below
commit 6bd6f9cdcf47176ab78fb5aea555583af50cc2d3
Author: Bodor Laszlo <bo...@gmail.com>
AuthorDate: Tue Feb 28 09:36:40 2023 +0100
TEZ-4460: Read timed out in shuffle handler - incorrect usage of EMPTY_LAST_CONTENT and channel write (#257) (Laszlo Bodor reviewed by Rajesh Balamohan, Syed Shameerur Rahman)
---
.../org/apache/tez/auxservices/ShuffleHandler.java | 31 +--
.../apache/tez/auxservices/TestShuffleHandler.java | 288 +++++++++++++--------
2 files changed, 196 insertions(+), 123 deletions(-)
diff --git a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java
index c6657750f..8b1b1aee0 100644
--- a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java
+++ b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java
@@ -129,7 +129,6 @@ import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.TooLongFrameException;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.DefaultHttpResponse;
-import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpObjectAggregator;
@@ -306,21 +305,28 @@ public class ShuffleHandler extends AuxiliaryService {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
+ Channel ch = future.channel();
if (!future.isSuccess()) {
- future.channel().close();
+ ch.close();
return;
}
int waitCount = this.reduceContext.getMapsToWait().decrementAndGet();
if (waitCount == 0) {
+ LOG.debug("Finished with all map outputs");
+ /*
+ * LastHttpContent.EMPTY_LAST_CONTENT can only be written when there are no remaining maps to send,
+ * this is the only time we can finish the HTTP response.
+ */
+ ch.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
metrics.operationComplete(future);
// Let the idle timer handler close keep-alive connections
if (reduceContext.getKeepAlive()) {
- ChannelPipeline pipeline = future.channel().pipeline();
+ ChannelPipeline pipeline = ch.pipeline();
TimeoutHandler timeoutHandler =
(TimeoutHandler) pipeline.get(TIMEOUT_HANDLER);
timeoutHandler.setEnabledTimeout(true);
} else {
- future.channel().close();
+ ch.close();
}
} else {
SHUFFLE.sendMap(reduceContext);
@@ -993,12 +999,11 @@ public class ShuffleHandler extends AuxiliaryService {
@Override
public void channelRead(ChannelHandlerContext ctx, Object message)
throws Exception {
- FullHttpRequest request = (FullHttpRequest) message;
+ HttpRequest request = (HttpRequest) message;
handleRequest(ctx, request);
- request.release();
}
- private void handleRequest(ChannelHandlerContext ctx, FullHttpRequest request)
+ private void handleRequest(ChannelHandlerContext ctx, HttpRequest request)
throws IOException, Exception {
if (request.getMethod() != GET) {
sendError(ctx, METHOD_NOT_ALLOWED);
@@ -1123,13 +1128,9 @@ public class ShuffleHandler extends AuxiliaryService {
for (int i = 0; i < Math.min(maxSessionOpenFiles, mapIds.size()); i++) {
ChannelFuture nextMap = sendMap(reduceContext);
if(nextMap == null) {
- // by this special message flushed, we can make sure the whole response is finished
- ch.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
return;
}
}
- // by this special message flushed, we can make sure the whole response is finished
- ch.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
}
private boolean isNullOrEmpty(List<String> entries) {
@@ -1496,7 +1497,7 @@ public class ShuffleHandler extends AuxiliaryService {
DataOutputBuffer dobRange = new DataOutputBuffer();
// Indicate how many record to be written
WritableUtils.writeVInt(dobRange, reduceRange.getLast() - reduceRange.getFirst() + 1);
- ch.write(wrappedBuffer(dobRange.getData(), 0, dobRange.getLength()));
+ ch.writeAndFlush(wrappedBuffer(dobRange.getData(), 0, dobRange.getLength()));
for (int reduce = reduceRange.getFirst(); reduce <= reduceRange.getLast(); reduce++) {
TezIndexRecord index = outputInfo.getIndex(reduce);
// Records are only valid if they have a non-zero part length
@@ -1511,7 +1512,7 @@ public class ShuffleHandler extends AuxiliaryService {
DataOutputBuffer dob = new DataOutputBuffer();
header.write(dob);
// Free the memory needed to store the spill and index records
- ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
+ ch.writeAndFlush(wrappedBuffer(dob.getData(), 0, dob.getLength()));
}
outputInfo.finish();
@@ -1531,14 +1532,14 @@ public class ShuffleHandler extends AuxiliaryService {
rangeOffset, rangePartLength, manageOsCache, readaheadLength,
readaheadPool, spillFile.getAbsolutePath(),
shuffleBufferSize, shuffleTransferToAllowed);
- writeFuture = ch.write(partition);
+ writeFuture = ch.writeAndFlush(partition);
} else {
// HTTPS cannot be done with zero copy.
final FadvisedChunkedFile chunk = new FadvisedChunkedFile(spill,
rangeOffset, rangePartLength, sslFileBufferSize,
manageOsCache, readaheadLength, readaheadPool,
spillFile.getAbsolutePath());
- writeFuture = ch.write(chunk);
+ writeFuture = ch.writeAndFlush(chunk);
}
metrics.shuffleConnections.incr();
metrics.shuffleOutputBytes.incr(rangePartLength); // optimistic
diff --git a/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java b/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java
index 40b16857a..d0d0a381e 100644
--- a/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java
+++ b/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java
@@ -48,6 +48,8 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import java.util.zip.Checksum;
import org.apache.hadoop.conf.Configuration;
@@ -109,6 +111,8 @@ public class TestShuffleHandler {
private static final File TEST_DIR = new File(System.getProperty("test.build.data"),
TestShuffleHandler.class.getName()).getAbsoluteFile();
private static final String HADOOP_TMP_DIR = "hadoop.tmp.dir";
+ private static final String TEST_PARTITION_DATA_STRING = "0123456789";
+
class MockShuffleHandler extends org.apache.tez.auxservices.ShuffleHandler {
@Override
protected Shuffle getShuffle(final Configuration conf) {
@@ -283,9 +287,7 @@ public class TestShuffleHandler {
@Test (timeout = 10000)
public void testClientClosesConnection() throws Exception {
final AtomicBoolean failureEncountered = new AtomicBoolean(false);
- Configuration conf = new Configuration();
- conf.set(HADOOP_TMP_DIR, TEST_DIR.getAbsolutePath());
- conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
+ Configuration conf = getInitialConf();
ShuffleHandler shuffleHandler = new ShuffleHandler() {
@Override
protected Shuffle getShuffle(Configuration conf) {
@@ -387,9 +389,7 @@ public class TestShuffleHandler {
@Test(timeout = 10000)
public void testKeepAlive() throws Exception {
final AtomicBoolean failureEncountered = new AtomicBoolean(false);
- Configuration conf = new Configuration();
- conf.set(HADOOP_TMP_DIR, TEST_DIR.getAbsolutePath());
- conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
+ Configuration conf = getInitialConf();
conf.setBoolean(ShuffleHandler.SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED, true);
// try setting to -ve keep alive timeout.
conf.setInt(ShuffleHandler.SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT, -100);
@@ -535,9 +535,7 @@ public class TestShuffleHandler {
@Test
public void testSocketKeepAlive() throws Exception {
- Configuration conf = new Configuration();
- conf.set(HADOOP_TMP_DIR, TEST_DIR.getAbsolutePath());
- conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
+ Configuration conf = getInitialConf();
conf.setBoolean(ShuffleHandler.SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED, true);
// try setting to -ve keep alive timeout.
conf.setInt(ShuffleHandler.SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT, -100);
@@ -579,9 +577,7 @@ public class TestShuffleHandler {
@Test (timeout = 10000)
public void testIncompatibleShuffleVersion() throws Exception {
final int failureNum = 3;
- Configuration conf = new Configuration();
- conf.set(HADOOP_TMP_DIR, TEST_DIR.getAbsolutePath());
- conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
+ Configuration conf = getInitialConf();
ShuffleHandler shuffleHandler = new ShuffleHandler();
shuffleHandler.init(conf);
shuffleHandler.start();
@@ -613,9 +609,7 @@ public class TestShuffleHandler {
@Test (timeout = 10000)
public void testMaxConnections() throws Exception {
- Configuration conf = new Configuration();
- conf.set(HADOOP_TMP_DIR, TEST_DIR.getAbsolutePath());
- conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
+ Configuration conf = getInitialConf();
conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3);
ShuffleHandler shuffleHandler = new ShuffleHandler() {
@Override
@@ -722,9 +716,7 @@ public class TestShuffleHandler {
*/
@Test(timeout = 10000)
public void testRangedFetch() throws IOException {
- Configuration conf = new Configuration();
- conf.set(HADOOP_TMP_DIR, TEST_DIR.getAbsolutePath());
- conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
+ Configuration conf = getInitialConf();
conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3);
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
"simple");
@@ -741,23 +733,7 @@ public class TestShuffleHandler {
List<File> fileMap = new ArrayList<>();
createShuffleHandlerFiles(absLogDir, user, appId.toString(), appAttemptId,
conf, fileMap);
- ShuffleHandler shuffleHandler = new ShuffleHandler() {
-
- @Override
- protected Shuffle getShuffle(Configuration conf) {
- // replace the shuffle handler with one stubbed for testing
- return new Shuffle(conf) {
-
- @Override
- protected void verifyRequest(String appid, ChannelHandlerContext ctx,
- HttpRequest request, HttpResponse response, URL requestUri)
- throws IOException {
- // Do nothing.
- }
-
- };
- }
- };
+ ShuffleHandler shuffleHandler = getShuffleHandlerWithNoVerify();
shuffleHandler.init(conf);
try {
shuffleHandler.start();
@@ -814,6 +790,104 @@ public class TestShuffleHandler {
}
}
+ /**
+ * Validate the ranged fetch works as expected for different amount of map attempts and reduce ranges.
+ */
+ @Test(timeout = 30000)
+ public void testRangedFetchMultipleAttempts() throws IOException {
+ runMultiAttemptMultiRangeShuffleTest(/*attemptRange*/1, /*reduceRange*/1);
+ runMultiAttemptMultiRangeShuffleTest(/*attemptRange*/5, /*reduceRange*/1);
+ runMultiAttemptMultiRangeShuffleTest(/*attemptRange*/10, /*reduceRange*/1);
+ runMultiAttemptMultiRangeShuffleTest(/*attemptRange*/100, /*reduceRange*/1);
+ runMultiAttemptMultiRangeShuffleTest(/*attemptRange*/1, /*reduceRange*/5);
+ runMultiAttemptMultiRangeShuffleTest(/*attemptRange*/5, /*reduceRange*/5);
+ runMultiAttemptMultiRangeShuffleTest(/*attemptRange*/10, /*reduceRange*/5);
+ runMultiAttemptMultiRangeShuffleTest(/*attemptRange*/100, /*reduceRange*/5);
+ runMultiAttemptMultiRangeShuffleTest(/*attemptRange*/1, /*reduceRange*/10);
+ runMultiAttemptMultiRangeShuffleTest(/*attemptRange*/5, /*reduceRange*/10);
+ runMultiAttemptMultiRangeShuffleTest(/*attemptRange*/10, /*reduceRange*/10);
+ runMultiAttemptMultiRangeShuffleTest(/*attemptRange*/100, /*reduceRange*/10);
+ runMultiAttemptMultiRangeShuffleTest(/*attemptRange*/1, /*reduceRange*/100);
+ runMultiAttemptMultiRangeShuffleTest(/*attemptRange*/5, /*reduceRange*/100);
+ runMultiAttemptMultiRangeShuffleTest(/*attemptRange*/10, /*reduceRange*/100);
+ runMultiAttemptMultiRangeShuffleTest(/*attemptRange*/100, /*reduceRange*/100);
+ }
+
+ private void runMultiAttemptMultiRangeShuffleTest(int attemptRange, int reduceRange) throws IOException {
+ Random random = new Random();
+ String user = "randomUser";
+ int firstAttempt = random.nextInt(10);
+ int reducerIdStart = random.nextInt(10);
+ int reducerIdEnd = reducerIdStart + reduceRange - 1;
+
+ Configuration conf = getInitialConf();
+ conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3);
+ conf.setInt(ShuffleHandler.SHUFFLE_MAX_SESSION_OPEN_FILES, 3);
+ conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "simple");
+ UserGroupInformation.setConfiguration(conf);
+ File absLogDir = new File("target", TestShuffleHandler.class.getSimpleName() + "LocDir").getAbsoluteFile();
+ conf.set(YarnConfiguration.NM_LOCAL_DIRS, absLogDir.getAbsolutePath());
+ ApplicationId appId = ApplicationId.newInstance(12345, 1);
+ LOG.info(appId.toString());
+ List<String> attemptIds = IntStream.range(firstAttempt, firstAttempt + attemptRange)
+ .mapToObj(i -> "attempt_12345_1_m_" + i + "_0").collect(Collectors.toList());
+ List<File> fileMap = new ArrayList<>();
+ for (String attemptId : attemptIds) {
+ createShuffleHandlerFiles(absLogDir, user, appId.toString(), attemptId, conf, fileMap, reducerIdStart,
+ reducerIdEnd);
+ }
+ ShuffleHandler shuffleHandler = getShuffleHandlerWithNoVerify();
+ shuffleHandler.init(conf);
+ try {
+ shuffleHandler.start();
+ DataOutputBuffer outputBuffer = new DataOutputBuffer();
+ outputBuffer.reset();
+ Token<JobTokenIdentifier> jt = new Token<JobTokenIdentifier>("identifier".getBytes(), "password".getBytes(),
+ new Text(user), new Text("shuffleService"));
+ jt.write(outputBuffer);
+ shuffleHandler.initializeApplication(new ApplicationInitializationContext(user, appId,
+ ByteBuffer.wrap(outputBuffer.getData(), 0, outputBuffer.getLength())));
+ URL url = new URL("http://127.0.0.1:" + shuffleHandler.getConfig().get(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY)
+ + "/mapOutput?job=job_12345_0001&dag=1&reduce=" + reducerIdStart + "-" + reducerIdEnd + "&map="
+ + String.join(",", attemptIds));
+ LOG.info("Calling shuffle URL: {}", url);
+ HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+ conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME, ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
+ conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION, ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
+ conn.connect();
+ boolean succeeded = false;
+ try {
+ DataInputStream is = new DataInputStream(conn.getInputStream());
+ for (String attempt : attemptIds) {
+ int partitionCount = WritableUtils.readVInt(is);
+ List<ShuffleHeader> headers = new ArrayList<>(partitionCount);
+ for (int i = reducerIdStart; i <= reducerIdEnd; i++) {
+ ShuffleHeader header = new ShuffleHeader();
+ header.readFields(is);
+ Assert.assertEquals("Incorrect map id", attempt, header.getMapId());
+ Assert.assertEquals("Incorrect reduce id", i, header.getPartition());
+ headers.add(header);
+ }
+ for (ShuffleHeader header : headers) {
+ byte[] bytes = new byte[(int) header.getCompressedLength()];
+ is.read(bytes);
+ Assert.assertEquals(TEST_PARTITION_DATA_STRING, new String(bytes));
+ }
+ }
+ succeeded = true;
+ // Read one more byte to force EOF
+ is.readByte();
+ Assert.fail("More fetch bytes that expected in stream");
+ } catch (EOFException e) {
+ Assert.assertTrue("Failed to copy ranged fetch", succeeded);
+ }
+
+ } finally {
+ shuffleHandler.close();
+ FileUtil.fullyDelete(absLogDir);
+ }
+ }
+
/**
* Validate the ownership of the map-output files being pulled in. The
* local-file-system owner of the file should match the user component in the
@@ -824,9 +898,7 @@ public class TestShuffleHandler {
public void testMapFileAccess() throws IOException {
// This will run only in NativeIO is enabled as SecureIOUtils need it
assumeTrue(NativeIO.isAvailable());
- Configuration conf = new Configuration();
- conf.set(HADOOP_TMP_DIR, TEST_DIR.getAbsolutePath());
- conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
+ Configuration conf = getInitialConf();
conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3);
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
"kerberos");
@@ -842,23 +914,7 @@ public class TestShuffleHandler {
List<File> fileMap = new ArrayList<File>();
createShuffleHandlerFiles(absLogDir, user, appId.toString(), appAttemptId,
conf, fileMap);
- ShuffleHandler shuffleHandler = new ShuffleHandler() {
-
- @Override
- protected Shuffle getShuffle(Configuration conf) {
- // replace the shuffle handler with one stubbed for testing
- return new Shuffle(conf) {
-
- @Override
- protected void verifyRequest(String appid, ChannelHandlerContext ctx,
- HttpRequest request, HttpResponse response, URL requestUri)
- throws IOException {
- // Do nothing.
- }
-
- };
- }
- };
+ ShuffleHandler shuffleHandler = getShuffleHandlerWithNoVerify();
shuffleHandler.init(conf);
try {
shuffleHandler.start();
@@ -907,48 +963,55 @@ public class TestShuffleHandler {
}
}
- private static void createShuffleHandlerFiles(File logDir, String user,
- String appId, String appAttemptId, Configuration conf,
- List<File> fileMap) throws IOException {
- String attemptDir =
- StringUtils.join(Path.SEPARATOR,
- new String[] { logDir.getAbsolutePath(),
- ShuffleHandler.USERCACHE, user,
- ShuffleHandler.APPCACHE, appId,"dag_1/" + "output",
- appAttemptId });
+ private static void createShuffleHandlerFiles(File logDir, String user, String appId, String appAttemptId,
+ Configuration conf, List<File> fileMap) throws IOException {
+ createShuffleHandlerFiles(logDir, user, appId, appAttemptId, conf, fileMap, 0, 1);
+ }
+
+ private static void createShuffleHandlerFiles(File logDir, String user, String appId, String appAttemptId,
+ Configuration conf, List<File> fileMap, int reduceStart, int reduceEnd) throws IOException {
+ String attemptDir = StringUtils.join(Path.SEPARATOR, new String[] { logDir.getAbsolutePath(),
+ ShuffleHandler.USERCACHE, user, ShuffleHandler.APPCACHE, appId, "dag_1/" + "output", appAttemptId });
File appAttemptDir = new File(attemptDir);
appAttemptDir.mkdirs();
- System.out.println(appAttemptDir.getAbsolutePath());
+ LOG.info(appAttemptDir.getAbsolutePath());
File indexFile = new File(appAttemptDir, "file.out.index");
fileMap.add(indexFile);
- createIndexFile(indexFile, conf);
+ createIndexFile(indexFile, conf, reduceStart, reduceEnd);
File mapOutputFile = new File(appAttemptDir, "file.out");
fileMap.add(mapOutputFile);
- createMapOutputFile(mapOutputFile, conf);
+ createMapOutputFile(mapOutputFile, conf, reduceEnd - reduceStart + 1);
}
- private static void
- createMapOutputFile(File mapOutputFile, Configuration conf)
- throws IOException {
+ private static void createMapOutputFile(File mapOutputFile, Configuration conf, int partitionCount)
+ throws IOException {
FileOutputStream out = new FileOutputStream(mapOutputFile);
- out.write("Creating new dummy map output file. Used only for testing"
- .getBytes());
+
+ StringBuilder b = new StringBuilder(partitionCount * TEST_PARTITION_DATA_STRING.length());
+ for (int i = 0; i < partitionCount; i++) {
+ b.append(TEST_PARTITION_DATA_STRING);
+ }
+
+ out.write(b.toString().getBytes());
out.flush();
out.close();
}
- private static void createIndexFile(File indexFile, Configuration conf)
+ private static void createIndexFile(File indexFile, Configuration conf, int reduceStart, int reduceEnd)
throws IOException {
if (indexFile.exists()) {
- System.out.println("Deleting existing file");
+ LOG.info("Deleting existing file");
indexFile.delete();
}
Checksum crc = new PureJavaCrc32();
- TezSpillRecord tezSpillRecord = new TezSpillRecord(2);
- tezSpillRecord.putIndex(new TezIndexRecord(0, 10, 10), 0);
- tezSpillRecord.putIndex(new TezIndexRecord(10, 10, 10), 1);
- tezSpillRecord.writeToFile(new Path(indexFile.getAbsolutePath()), conf,
- FileSystem.getLocal(conf).getRaw(), crc);
+ TezSpillRecord tezSpillRecord = new TezSpillRecord(reduceEnd + 1);
+ int offset = 0;
+ for (int i = reduceStart; i <= reduceEnd; i++) {
+ tezSpillRecord.putIndex(
+ new TezIndexRecord(offset, TEST_PARTITION_DATA_STRING.length(), TEST_PARTITION_DATA_STRING.length()), i);
+ offset += TEST_PARTITION_DATA_STRING.length();
+ }
+ tezSpillRecord.writeToFile(new Path(indexFile.getAbsolutePath()), conf, FileSystem.getLocal(conf).getRaw(), crc);
}
@Test
@@ -958,9 +1021,7 @@ public class TestShuffleHandler {
final File tmpDir = new File(System.getProperty("test.build.data",
System.getProperty("java.io.tmpdir")),
TestShuffleHandler.class.getName());
- Configuration conf = new Configuration();
- conf.set(HADOOP_TMP_DIR, TEST_DIR.getAbsolutePath());
- conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
+ Configuration conf = getInitialConf();
conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3);
ShuffleHandler shuffle = new ShuffleHandler();
// emulate aux services startup with recovery enabled
@@ -1026,9 +1087,7 @@ public class TestShuffleHandler {
final File tmpDir = new File(System.getProperty("test.build.data",
System.getProperty("java.io.tmpdir")),
TestShuffleHandler.class.getName());
- Configuration conf = new Configuration();
- conf.set(HADOOP_TMP_DIR, TEST_DIR.getAbsolutePath());
- conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
+ Configuration conf = getInitialConf();
conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3);
ShuffleHandler shuffle = new ShuffleHandler();
// emulate aux services startup with recovery enabled
@@ -1133,9 +1192,7 @@ public class TestShuffleHandler {
@Test(timeout = 100000)
public void testGetMapOutputInfo() throws Exception {
final AtomicBoolean failureEncountered = new AtomicBoolean(false);
- Configuration conf = new Configuration();
- conf.set(HADOOP_TMP_DIR, TEST_DIR.getAbsolutePath());
- conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
+ Configuration conf = getInitialConf();
conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3);
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
"simple");
@@ -1237,10 +1294,8 @@ public class TestShuffleHandler {
@Test(timeout = 5000)
public void testDagDelete() throws Exception {
final AtomicBoolean failureEncountered = new AtomicBoolean(false);
- Configuration conf = new Configuration();
- conf.set(HADOOP_TMP_DIR, TEST_DIR.getAbsolutePath());
+ Configuration conf = getInitialConf();
conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3);
- conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
"simple");
UserGroupInformation.setConfiguration(conf);
@@ -1318,9 +1373,8 @@ public class TestShuffleHandler {
@Test
public void testVertexShuffleDelete() throws Exception {
final ArrayList<Throwable> failures = new ArrayList<Throwable>(1);
- Configuration conf = new Configuration();
+ Configuration conf = getInitialConf();
conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3);
- conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
"simple");
UserGroupInformation.setConfiguration(conf);
@@ -1387,7 +1441,7 @@ public class TestShuffleHandler {
fail("Encountered Exception!" + e.getMessage());
}
} finally {
- shuffleHandler.stop();
+ shuffleHandler.close();
FileUtil.fullyDelete(absLogDir);
}
}
@@ -1395,9 +1449,8 @@ public class TestShuffleHandler {
@Test(timeout = 5000)
public void testFailedTaskAttemptDelete() throws Exception {
final ArrayList<Throwable> failures = new ArrayList<Throwable>(1);
- Configuration conf = new Configuration();
+ Configuration conf = getInitialConf();
conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3);
- conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
"simple");
UserGroupInformation.setConfiguration(conf);
@@ -1469,7 +1522,7 @@ public class TestShuffleHandler {
Assert.assertEquals("sendError called due to shuffle error",
0, failures.size());
} finally {
- shuffleHandler.stop();
+ shuffleHandler.close();
FileUtil.fullyDelete(absLogDir);
}
}
@@ -1500,10 +1553,7 @@ public class TestShuffleHandler {
when(mockCh.writeAndFlush(Object.class)).thenReturn(mockFuture);
final ShuffleHandler sh = new MockShuffleHandler();
- Configuration conf = new Configuration();
- conf.set(HADOOP_TMP_DIR, TEST_DIR.getAbsolutePath());
- // The Shuffle handler port associated with the service is bound to but not used.
- conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
+ Configuration conf = getInitialConf();
sh.init(conf);
sh.start();
int maxOpenFiles =conf.getInt(ShuffleHandler.SHUFFLE_MAX_SESSION_OPEN_FILES,
@@ -1523,8 +1573,7 @@ public class TestShuffleHandler {
@Test
public void testShuffleHandlerSendsDiskError() throws Exception {
- Configuration conf = new Configuration();
- conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
+ Configuration conf = getInitialConf();
DataInputStream input = null;
MockShuffleHandlerWithFatalDiskError shuffleHandler =
@@ -1609,22 +1658,45 @@ public class TestShuffleHandler {
shuffleHandler.serviceStart();
Assert.assertEquals(port, shuffleHandler.getPort());
} finally {
- shuffleHandler.stop();
+ shuffleHandler.close();
}
}
@Test
public void testConfigPortDynamic() throws Exception {
- Configuration conf = new Configuration();
- // 0 as config, should be dynamically chosen by netty
- conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
+ Configuration conf = getInitialConf();
MockShuffleHandler2 shuffleHandler = new MockShuffleHandler2();
shuffleHandler.serviceInit(conf);
try {
shuffleHandler.serviceStart();
Assert.assertTrue("ShuffleHandler should use a random chosen port", shuffleHandler.getPort() > 0);
} finally {
- shuffleHandler.stop();
+ shuffleHandler.close();
}
}
+
+ private Configuration getInitialConf() {
+ Configuration conf = new Configuration();
+ conf.set(HADOOP_TMP_DIR, TEST_DIR.getAbsolutePath());
+ // 0 as config, should be dynamically chosen by netty
+ conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
+ return conf;
+ }
+
+ private ShuffleHandler getShuffleHandlerWithNoVerify() {
+ return new ShuffleHandler() {
+
+ @Override
+ protected Shuffle getShuffle(Configuration conf) {
+ // replace the shuffle handler with one stubbed for testing
+ return new Shuffle(conf) {
+ @Override
+ protected void verifyRequest(String appid, ChannelHandlerContext ctx, HttpRequest request,
+ HttpResponse response, URL requestUri) throws IOException {
+ // Do nothing.
+ }
+ };
+ }
+ };
+ }
}