You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ck...@apache.org on 2023/03/24 02:39:41 UTC
[incubator-uniffle] branch master updated: [MINOR] test: do not wrap test in try-catch block (#746)
This is an automated email from the ASF dual-hosted git repository.
ckj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 567872b8 [MINOR] test: do not wrap test in try-catch block (#746)
567872b8 is described below
commit 567872b8c89a19c16e31c7919ab196fe7568c02a
Author: Kaijie Chen <ck...@apache.org>
AuthorDate: Fri Mar 24 10:39:35 2023 +0800
[MINOR] test: do not wrap test in try-catch block (#746)
### What changes were proposed in this pull request?
Extract test from unnecessary `try-catch` blocks:
```java
@Test
public void test() throws Exception {
try {
// test
} catch (Exception e) {
e.printStackTrace();
fail();
}
}
```
### Why are the changes needed?
It's unnecessary to wrap tests in `try-catch` block.
And it's hard to see outputs of `e.printStackTrace()` in CI.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
CI.
---
.../apache/uniffle/common/util/ExitUtilsTest.java | 32 ++-
.../apache/uniffle/common/web/JettyServerTest.java | 41 ++--
.../uniffle/coordinator/CoordinatorServerTest.java | 69 +++----
.../uniffle/server/ShuffleFlushManagerTest.java | 69 +++----
.../apache/uniffle/server/ShuffleServerTest.java | 58 +++---
.../storage/common/ShuffleFileInfoTest.java | 26 +--
.../handler/impl/HdfsClientReadHandlerTest.java | 168 ++++++++-------
.../handler/impl/HdfsShuffleReadHandlerTest.java | 228 ++++++++++-----------
.../impl/KerberizedHdfsClientReadHandlerTest.java | 4 +-
.../impl/KerberizedHdfsShuffleReadHandlerTest.java | 4 +-
10 files changed, 319 insertions(+), 380 deletions(-)
diff --git a/common/src/test/java/org/apache/uniffle/common/util/ExitUtilsTest.java b/common/src/test/java/org/apache/uniffle/common/util/ExitUtilsTest.java
index 07359834..57654147 100644
--- a/common/src/test/java/org/apache/uniffle/common/util/ExitUtilsTest.java
+++ b/common/src/test/java/org/apache/uniffle/common/util/ExitUtilsTest.java
@@ -27,28 +27,22 @@ import static org.junit.jupiter.api.Assertions.fail;
public class ExitUtilsTest {
@Test
- public void test() {
+ public void test() throws Exception {
+ final int status = -1;
+ final String testExitMessage = "testExitMessage";
try {
- final int status = -1;
- final String testExitMessage = "testExitMessage";
- try {
- ExitUtils.disableSystemExit();
- ExitUtils.terminate(status, testExitMessage, null, null);
- fail();
- } catch (ExitException e) {
- assertEquals(status, e.getStatus());
- assertEquals(testExitMessage, e.getMessage());
- }
-
- final Thread t = new Thread(null, () -> {
- throw new AssertionError("TestUncaughtException");
- }, "testThread");
- t.start();
- t.join();
- } catch (Exception e) {
- e.printStackTrace();
+ ExitUtils.disableSystemExit();
+ ExitUtils.terminate(status, testExitMessage, null, null);
fail();
+ } catch (ExitException e) {
+ assertEquals(status, e.getStatus());
+ assertEquals(testExitMessage, e.getMessage());
}
+ final Thread t = new Thread(null, () -> {
+ throw new AssertionError("TestUncaughtException");
+ }, "testThread");
+ t.start();
+ t.join();
}
}
diff --git a/common/src/test/java/org/apache/uniffle/common/web/JettyServerTest.java b/common/src/test/java/org/apache/uniffle/common/web/JettyServerTest.java
index c2ecb0cf..149e2428 100644
--- a/common/src/test/java/org/apache/uniffle/common/web/JettyServerTest.java
+++ b/common/src/test/java/org/apache/uniffle/common/web/JettyServerTest.java
@@ -32,7 +32,6 @@ import org.apache.uniffle.common.util.ExitUtils.ExitException;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.fail;
public class JettyServerTest {
@@ -60,33 +59,27 @@ public class JettyServerTest {
@Test
public void jettyServerStartTest() throws Exception {
- try {
- RssBaseConf conf = new RssBaseConf();
- conf.setString("rss.jetty.http.port", "9527");
- JettyServer jettyServer1 = new JettyServer(conf);
- JettyServer jettyServer2 = new JettyServer(conf);
- jettyServer1.start();
-
- ExitUtils.disableSystemExit();
- final String expectMessage = "Fail to start jetty http server";
- final int expectStatus = 1;
- try {
- jettyServer2.start();
- } catch (Exception e) {
- assertEquals(expectMessage, e.getMessage());
- assertEquals(expectStatus, ((ExitException) e).getStatus());
- }
+ RssBaseConf conf = new RssBaseConf();
+ conf.setString("rss.jetty.http.port", "9527");
+ JettyServer jettyServer1 = new JettyServer(conf);
+ JettyServer jettyServer2 = new JettyServer(conf);
+ jettyServer1.start();
- final Thread t = new Thread(null, () -> {
- throw new AssertionError("TestUncaughtException");
- }, "testThread");
- t.start();
- t.join();
+ ExitUtils.disableSystemExit();
+ final String expectMessage = "Fail to start jetty http server";
+ final int expectStatus = 1;
+ try {
+ jettyServer2.start();
} catch (Exception e) {
- e.printStackTrace();
- fail();
+ assertEquals(expectMessage, e.getMessage());
+ assertEquals(expectStatus, ((ExitException) e).getStatus());
}
+ final Thread t = new Thread(null, () -> {
+ throw new AssertionError("TestUncaughtException");
+ }, "testThread");
+ t.start();
+ t.join();
}
}
diff --git a/coordinator/src/test/java/org/apache/uniffle/coordinator/CoordinatorServerTest.java b/coordinator/src/test/java/org/apache/uniffle/coordinator/CoordinatorServerTest.java
index 33c9eb47..21a30e41 100644
--- a/coordinator/src/test/java/org/apache/uniffle/coordinator/CoordinatorServerTest.java
+++ b/coordinator/src/test/java/org/apache/uniffle/coordinator/CoordinatorServerTest.java
@@ -23,54 +23,45 @@ import org.apache.uniffle.common.util.ExitUtils;
import org.apache.uniffle.common.util.ExitUtils.ExitException;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.fail;
public class CoordinatorServerTest {
@Test
- public void test() {
- try {
- CoordinatorConf coordinatorConf = new CoordinatorConf();
- coordinatorConf.setInteger("rss.rpc.server.port", 9537);
- coordinatorConf.setInteger("rss.jetty.http.port", 9528);
- coordinatorConf.setInteger("rss.rpc.executor.size", 10);
-
- CoordinatorServer cs1 = new CoordinatorServer(coordinatorConf);
- CoordinatorServer cs2 = new CoordinatorServer(coordinatorConf);
- cs1.start();
-
- ExitUtils.disableSystemExit();
- String expectMessage = "Fail to start jetty http server";
- final int expectStatus = 1;
- try {
- cs2.start();
- } catch (Exception e) {
- assertEquals(expectMessage, e.getMessage());
- assertEquals(expectStatus, ((ExitException) e).getStatus());
- }
-
- coordinatorConf.setInteger("rss.jetty.http.port", 9529);
- cs2 = new CoordinatorServer(coordinatorConf);
- expectMessage = "Fail to start grpc server";
- try {
- cs2.start();
- } catch (Exception e) {
- assertEquals(expectMessage, e.getMessage());
- assertEquals(expectStatus, ((ExitException) e).getStatus());
- }
-
- final Thread t = new Thread(null, () -> {
- throw new AssertionError("TestUncaughtException");
- }, "testThread");
- t.start();
- t.join();
+ public void test() throws Exception {
+ CoordinatorConf coordinatorConf = new CoordinatorConf();
+ coordinatorConf.setInteger("rss.rpc.server.port", 9537);
+ coordinatorConf.setInteger("rss.jetty.http.port", 9528);
+ coordinatorConf.setInteger("rss.rpc.executor.size", 10);
+ CoordinatorServer cs1 = new CoordinatorServer(coordinatorConf);
+ CoordinatorServer cs2 = new CoordinatorServer(coordinatorConf);
+ cs1.start();
+ ExitUtils.disableSystemExit();
+ String expectMessage = "Fail to start jetty http server";
+ final int expectStatus = 1;
+ try {
+ cs2.start();
+ } catch (Exception e) {
+ assertEquals(expectMessage, e.getMessage());
+ assertEquals(expectStatus, ((ExitException) e).getStatus());
+ }
+ coordinatorConf.setInteger("rss.jetty.http.port", 9529);
+ cs2 = new CoordinatorServer(coordinatorConf);
+ expectMessage = "Fail to start grpc server";
+ try {
+ cs2.start();
} catch (Exception e) {
- e.printStackTrace();
- fail();
+ assertEquals(expectMessage, e.getMessage());
+ assertEquals(expectStatus, ((ExitException) e).getStatus());
}
+
+ final Thread t = new Thread(null, () -> {
+ throw new AssertionError("TestUncaughtException");
+ }, "testThread");
+ t.start();
+ t.join();
}
}
diff --git a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
index 497f6ca2..1645b9d1 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
@@ -547,42 +547,37 @@ public class ShuffleFlushManagerTest extends HdfsTestBase {
}
@Test
- public void processPendingEventsTest(@TempDir File tempDir) {
- try {
- shuffleServerConf.set(RssBaseConf.RSS_STORAGE_TYPE, StorageType.LOCALFILE.toString());
- shuffleServerConf.set(RssBaseConf.RSS_STORAGE_BASE_PATH, Arrays.asList(tempDir.getAbsolutePath()));
- shuffleServerConf.set(ShuffleServerConf.DISK_CAPACITY, 100L);
- shuffleServerConf.set(ShuffleServerConf.PENDING_EVENT_TIMEOUT_SEC, 5L);
- StorageManager storageManager =
- StorageManagerFactory.getInstance().createStorageManager(shuffleServerConf);
- ShuffleFlushManager manager =
- new ShuffleFlushManager(shuffleServerConf, "shuffleServerId", mockShuffleServer, storageManager);
- ShuffleDataFlushEvent event = new ShuffleDataFlushEvent(1, "1", 1, 1, 1, 100, null, null, null);
- assertEquals(0, manager.getPendingEventsSize());
- manager.addPendingEvents(event);
- Thread.sleep(1000);
- assertEquals(0, manager.getPendingEventsSize());
- do {
- Thread.sleep(1 * 1000);
- } while (manager.getEventNumInFlush() != 0);
-
- List<ShufflePartitionedBlock> blocks = Lists.newArrayList(new ShufflePartitionedBlock(100, 1000, 1, 1, 1L, null));
- ShuffleDataFlushEvent bigEvent = new ShuffleDataFlushEvent(1, "1", 1, 1, 1, 100, blocks, null, null);
- bigEvent.setUnderStorage(storageManager.selectStorage(event));
- storageManager.updateWriteMetrics(bigEvent, 0);
-
- manager.addPendingEvents(event);
- manager.addPendingEvents(event);
- manager.addPendingEvents(event);
- Thread.sleep(1000);
- assertTrue(2 <= manager.getPendingEventsSize());
- int eventNum = (int) ShuffleServerMetrics.counterTotalDroppedEventNum.get();
- Thread.sleep(6 * 1000);
- assertEquals(eventNum + 3, (int) ShuffleServerMetrics.counterTotalDroppedEventNum.get());
- assertEquals(0, manager.getPendingEventsSize());
- } catch (Exception e) {
- e.printStackTrace();
- fail();
- }
+ public void processPendingEventsTest(@TempDir File tempDir) throws Exception {
+ shuffleServerConf.set(RssBaseConf.RSS_STORAGE_TYPE, StorageType.LOCALFILE.toString());
+ shuffleServerConf.set(RssBaseConf.RSS_STORAGE_BASE_PATH, Arrays.asList(tempDir.getAbsolutePath()));
+ shuffleServerConf.set(ShuffleServerConf.DISK_CAPACITY, 100L);
+ shuffleServerConf.set(ShuffleServerConf.PENDING_EVENT_TIMEOUT_SEC, 5L);
+ StorageManager storageManager =
+ StorageManagerFactory.getInstance().createStorageManager(shuffleServerConf);
+ ShuffleFlushManager manager =
+ new ShuffleFlushManager(shuffleServerConf, "shuffleServerId", mockShuffleServer, storageManager);
+ ShuffleDataFlushEvent event = new ShuffleDataFlushEvent(1, "1", 1, 1, 1, 100, null, null, null);
+ assertEquals(0, manager.getPendingEventsSize());
+ manager.addPendingEvents(event);
+ Thread.sleep(1000);
+ assertEquals(0, manager.getPendingEventsSize());
+ do {
+ Thread.sleep(1 * 1000);
+ } while (manager.getEventNumInFlush() != 0);
+
+ List<ShufflePartitionedBlock> blocks = Lists.newArrayList(new ShufflePartitionedBlock(100, 1000, 1, 1, 1L, null));
+ ShuffleDataFlushEvent bigEvent = new ShuffleDataFlushEvent(1, "1", 1, 1, 1, 100, blocks, null, null);
+ bigEvent.setUnderStorage(storageManager.selectStorage(event));
+ storageManager.updateWriteMetrics(bigEvent, 0);
+
+ manager.addPendingEvents(event);
+ manager.addPendingEvents(event);
+ manager.addPendingEvents(event);
+ Thread.sleep(1000);
+ assertTrue(2 <= manager.getPendingEventsSize());
+ int eventNum = (int) ShuffleServerMetrics.counterTotalDroppedEventNum.get();
+ Thread.sleep(6 * 1000);
+ assertEquals(eventNum + 3, (int) ShuffleServerMetrics.counterTotalDroppedEventNum.get());
+ assertEquals(0, manager.getPendingEventsSize());
}
}
diff --git a/server/src/test/java/org/apache/uniffle/server/ShuffleServerTest.java b/server/src/test/java/org/apache/uniffle/server/ShuffleServerTest.java
index a2cc0895..0dc57338 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleServerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleServerTest.java
@@ -45,43 +45,37 @@ public class ShuffleServerTest {
private File tempDir;
@Test
- public void startTest() {
+ public void startTest() throws Exception {
+ ShuffleServerConf serverConf = createShuffleServerConf();
+ ShuffleServer ss1 = new ShuffleServer(serverConf);
+ ss1.start();
+ ExitUtils.disableSystemExit();
+ ShuffleServer ss2 = new ShuffleServer(serverConf);
+ String expectMessage = "Fail to start jetty http server";
+ final int expectStatus = 1;
try {
- ShuffleServerConf serverConf = createShuffleServerConf();
- ShuffleServer ss1 = new ShuffleServer(serverConf);
- ss1.start();
- ExitUtils.disableSystemExit();
- ShuffleServer ss2 = new ShuffleServer(serverConf);
- String expectMessage = "Fail to start jetty http server";
- final int expectStatus = 1;
- try {
- ss2.start();
- } catch (Exception e) {
- assertEquals(expectMessage, e.getMessage());
- assertEquals(expectStatus, ((ExitException) e).getStatus());
- }
-
- serverConf.setInteger("rss.jetty.http.port", 9529);
- ss2 = new ShuffleServer(serverConf);
- expectMessage = "Fail to start grpc server";
- try {
- ss2.start();
- } catch (Exception e) {
- assertEquals(expectMessage, e.getMessage());
- assertEquals(expectStatus, ((ExitException) e).getStatus());
- }
- ss1.stopServer();
+ ss2.start();
+ } catch (Exception e) {
+ assertEquals(expectMessage, e.getMessage());
+ assertEquals(expectStatus, ((ExitException) e).getStatus());
+ }
- final Thread t = new Thread(null, () -> {
- throw new AssertionError("TestUncaughtException");
- }, "testThread");
- t.start();
- t.join();
+ serverConf.setInteger("rss.jetty.http.port", 9529);
+ ss2 = new ShuffleServer(serverConf);
+ expectMessage = "Fail to start grpc server";
+ try {
+ ss2.start();
} catch (Exception e) {
- e.printStackTrace();
- fail();
+ assertEquals(expectMessage, e.getMessage());
+ assertEquals(expectStatus, ((ExitException) e).getStatus());
}
+ ss1.stopServer();
+ final Thread t = new Thread(null, () -> {
+ throw new AssertionError("TestUncaughtException");
+ }, "testThread");
+ t.start();
+ t.join();
}
@ParameterizedTest
diff --git a/storage/src/test/java/org/apache/uniffle/storage/common/ShuffleFileInfoTest.java b/storage/src/test/java/org/apache/uniffle/storage/common/ShuffleFileInfoTest.java
index 12f09581..cdef65fa 100644
--- a/storage/src/test/java/org/apache/uniffle/storage/common/ShuffleFileInfoTest.java
+++ b/storage/src/test/java/org/apache/uniffle/storage/common/ShuffleFileInfoTest.java
@@ -23,26 +23,20 @@ import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.fail;
public class ShuffleFileInfoTest {
@Test
- public void test() {
- try {
- ShuffleFileInfo shuffleFileInfo = new ShuffleFileInfo();
- shuffleFileInfo.getDataFiles().add(File.createTempFile("dummy-data-file", ".data"));
- shuffleFileInfo.setKey("key");
- assertFalse(shuffleFileInfo.isValid());
+ public void test() throws Exception {
+ ShuffleFileInfo shuffleFileInfo = new ShuffleFileInfo();
+ shuffleFileInfo.getDataFiles().add(File.createTempFile("dummy-data-file", ".data"));
+ shuffleFileInfo.setKey("key");
+ assertFalse(shuffleFileInfo.isValid());
- shuffleFileInfo.getIndexFiles().add(File.createTempFile("dummy-data-file", ".index"));
- shuffleFileInfo.getPartitions().add(12);
- shuffleFileInfo.setSize(1024 * 1024 * 32);
- assertTrue(shuffleFileInfo.isValid());
- assertFalse(shuffleFileInfo.shouldCombine(32));
- } catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ shuffleFileInfo.getIndexFiles().add(File.createTempFile("dummy-data-file", ".index"));
+ shuffleFileInfo.getPartitions().add(12);
+ shuffleFileInfo.setSize(1024 * 1024 * 32);
+ assertTrue(shuffleFileInfo.isValid());
+ assertFalse(shuffleFileInfo.shouldCombine(32));
}
}
diff --git a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HdfsClientReadHandlerTest.java b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HdfsClientReadHandlerTest.java
index 6309392f..2662a61f 100644
--- a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HdfsClientReadHandlerTest.java
+++ b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HdfsClientReadHandlerTest.java
@@ -45,100 +45,96 @@ import static org.junit.jupiter.api.Assertions.fail;
public class HdfsClientReadHandlerTest extends HdfsTestBase {
- public static void createAndRunCases(String clusterPathPrefix, Configuration hadoopConf, String writeUser) {
- try {
- String basePath = clusterPathPrefix + "clientReadTest1";
- HdfsShuffleWriteHandler writeHandler =
- new HdfsShuffleWriteHandler(
- "appId",
- 0,
- 1,
- 1,
- basePath,
- "test",
- hadoopConf,
- writeUser);
-
- Map<Long, byte[]> expectedData = Maps.newHashMap();
- Roaring64NavigableMap expectBlockIds = Roaring64NavigableMap.bitmapOf();
-
- int readBufferSize = 13;
- int total = 0;
- int totalBlockNum = 0;
- int expectTotalBlockNum = 0;
-
- for (int i = 0; i < 5; i++) {
- writeHandler.setFailTimes(i);
- int num = new Random().nextInt(17);
- writeTestData(writeHandler, num, 3, 0, expectedData);
- total += calcExpectedSegmentNum(num, 3, readBufferSize);
- expectTotalBlockNum += num;
- expectedData.forEach((id, block) -> expectBlockIds.addLong(id));
- }
+ public static void createAndRunCases(String clusterPathPrefix, Configuration hadoopConf, String writeUser)
+ throws Exception {
+ String basePath = clusterPathPrefix + "clientReadTest1";
+ HdfsShuffleWriteHandler writeHandler =
+ new HdfsShuffleWriteHandler(
+ "appId",
+ 0,
+ 1,
+ 1,
+ basePath,
+ "test",
+ hadoopConf,
+ writeUser);
+
+ Map<Long, byte[]> expectedData = Maps.newHashMap();
+ Roaring64NavigableMap expectBlockIds = Roaring64NavigableMap.bitmapOf();
+
+ int readBufferSize = 13;
+ int total = 0;
+ int totalBlockNum = 0;
+ int expectTotalBlockNum = 0;
+
+ for (int i = 0; i < 5; i++) {
+ writeHandler.setFailTimes(i);
+ int num = new Random().nextInt(17);
+ writeTestData(writeHandler, num, 3, 0, expectedData);
+ total += calcExpectedSegmentNum(num, 3, readBufferSize);
+ expectTotalBlockNum += num;
+ expectedData.forEach((id, block) -> expectBlockIds.addLong(id));
+ }
- /**
- * This part is to check the fault tolerance of reading HDFS incomplete index file
- */
- String indexFileName = ShuffleStorageUtils.generateIndexFileName("test_0");
- HdfsFileWriter indexWriter = writeHandler.createWriter(indexFileName);
- indexWriter.writeData(ByteBuffer.allocate(4).putInt(169560).array());
- indexWriter.writeData(ByteBuffer.allocate(4).putInt(999).array());
- indexWriter.close();
-
- Roaring64NavigableMap processBlockIds = Roaring64NavigableMap.bitmapOf();
-
- HdfsShuffleReadHandler indexReader = new HdfsShuffleReadHandler(
- "appId", 0, 1, basePath + "/appId/0/1-1/test_0",
- readBufferSize, expectBlockIds, processBlockIds, hadoopConf);
- try {
- ShuffleIndexResult indexResult = indexReader.readShuffleIndex();
- assertEquals(0, indexResult.getIndexData().length % FileBasedShuffleSegment.SEGMENT_SIZE);
- } catch (Exception e) {
- fail();
- }
+ /**
+ * This part is to check the fault tolerance of reading HDFS incomplete index file
+ */
+ String indexFileName = ShuffleStorageUtils.generateIndexFileName("test_0");
+ HdfsFileWriter indexWriter = writeHandler.createWriter(indexFileName);
+ indexWriter.writeData(ByteBuffer.allocate(4).putInt(169560).array());
+ indexWriter.writeData(ByteBuffer.allocate(4).putInt(999).array());
+ indexWriter.close();
- HdfsClientReadHandler handler = new HdfsClientReadHandler(
- "appId",
- 0,
- 1,
- 1024 * 10214,
- 1,
- 10,
- readBufferSize,
- expectBlockIds,
- processBlockIds,
- basePath,
- hadoopConf);
- Set<Long> actualBlockIds = Sets.newHashSet();
-
- for (int i = 0; i < total; ++i) {
- ShuffleDataResult shuffleDataResult = handler.readShuffleData();
- totalBlockNum += shuffleDataResult.getBufferSegments().size();
- checkData(shuffleDataResult, expectedData);
- for (BufferSegment bufferSegment : shuffleDataResult.getBufferSegments()) {
- actualBlockIds.add(bufferSegment.getBlockId());
- }
- }
+ Roaring64NavigableMap processBlockIds = Roaring64NavigableMap.bitmapOf();
- assertTrue(handler.readShuffleData().isEmpty());
- assertEquals(
- total,
- handler.getHdfsShuffleFileReadHandlers()
- .stream()
- .mapToInt(i -> i.getShuffleDataSegments().size())
- .sum());
- assertEquals(expectTotalBlockNum, totalBlockNum);
- assertEquals(expectedData.keySet(), actualBlockIds);
- assertEquals(5, handler.getReadHandlerIndex());
- handler.close();
+ HdfsShuffleReadHandler indexReader = new HdfsShuffleReadHandler(
+ "appId", 0, 1, basePath + "/appId/0/1-1/test_0",
+ readBufferSize, expectBlockIds, processBlockIds, hadoopConf);
+ try {
+ ShuffleIndexResult indexResult = indexReader.readShuffleIndex();
+ assertEquals(0, indexResult.getIndexData().length % FileBasedShuffleSegment.SEGMENT_SIZE);
} catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
+ fail();
}
+
+ HdfsClientReadHandler handler = new HdfsClientReadHandler(
+ "appId",
+ 0,
+ 1,
+ 1024 * 10214,
+ 1,
+ 10,
+ readBufferSize,
+ expectBlockIds,
+ processBlockIds,
+ basePath,
+ hadoopConf);
+ Set<Long> actualBlockIds = Sets.newHashSet();
+
+ for (int i = 0; i < total; ++i) {
+ ShuffleDataResult shuffleDataResult = handler.readShuffleData();
+ totalBlockNum += shuffleDataResult.getBufferSegments().size();
+ checkData(shuffleDataResult, expectedData);
+ for (BufferSegment bufferSegment : shuffleDataResult.getBufferSegments()) {
+ actualBlockIds.add(bufferSegment.getBlockId());
+ }
+ }
+
+ assertTrue(handler.readShuffleData().isEmpty());
+ assertEquals(
+ total,
+ handler.getHdfsShuffleFileReadHandlers()
+ .stream()
+ .mapToInt(i -> i.getShuffleDataSegments().size())
+ .sum());
+ assertEquals(expectTotalBlockNum, totalBlockNum);
+ assertEquals(expectedData.keySet(), actualBlockIds);
+ assertEquals(5, handler.getReadHandlerIndex());
+ handler.close();
}
@Test
- public void test() {
+ public void test() throws Exception {
createAndRunCases(HDFS_URI, conf, StringUtils.EMPTY);
}
}
diff --git a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleReadHandlerTest.java b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleReadHandlerTest.java
index cd2df454..1e12aff9 100644
--- a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleReadHandlerTest.java
+++ b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleReadHandlerTest.java
@@ -45,141 +45,127 @@ import org.apache.uniffle.storage.util.ShuffleStorageUtils;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.fail;
public class HdfsShuffleReadHandlerTest extends HdfsTestBase {
- public static void createAndRunCases(String clusterPathPrefix, Configuration conf, String user) {
- try {
- String basePath = clusterPathPrefix + "HdfsShuffleFileReadHandlerTest";
- HdfsShuffleWriteHandler writeHandler =
- new HdfsShuffleWriteHandler(
- "appId",
- 0,
- 1,
- 1,
- basePath,
- "test",
- conf,
- user);
-
- Map<Long, byte[]> expectedData = Maps.newHashMap();
-
- int readBufferSize = 13;
- int totalBlockNum = 0;
- int expectTotalBlockNum = new Random().nextInt(37);
- int blockSize = new Random().nextInt(7) + 1;
- HdfsShuffleHandlerTestBase.writeTestData(writeHandler, expectTotalBlockNum, blockSize, 0, expectedData);
- int total = HdfsShuffleHandlerTestBase.calcExpectedSegmentNum(expectTotalBlockNum, blockSize, readBufferSize);
- Roaring64NavigableMap expectBlockIds = Roaring64NavigableMap.bitmapOf();
- Roaring64NavigableMap processBlockIds = Roaring64NavigableMap.bitmapOf();
- expectedData.forEach((id, block) -> expectBlockIds.addLong(id));
- String fileNamePrefix = ShuffleStorageUtils.getFullShuffleDataFolder(basePath,
- ShuffleStorageUtils.getShuffleDataPathWithRange("appId",
- 0, 1, 1, 10)) + "/test_0";
- HdfsShuffleReadHandler handler =
- new HdfsShuffleReadHandler("appId", 0, 1, fileNamePrefix,
- readBufferSize, expectBlockIds, processBlockIds, conf);
-
- Set<Long> actualBlockIds = Sets.newHashSet();
- for (int i = 0; i < total; ++i) {
- ShuffleDataResult shuffleDataResult = handler.readShuffleData();
- totalBlockNum += shuffleDataResult.getBufferSegments().size();
- HdfsShuffleHandlerTestBase.checkData(shuffleDataResult, expectedData);
- for (BufferSegment bufferSegment : shuffleDataResult.getBufferSegments()) {
- actualBlockIds.add(bufferSegment.getBlockId());
- }
+ public static void createAndRunCases(String clusterPathPrefix, Configuration conf, String user) throws Exception {
+ String basePath = clusterPathPrefix + "HdfsShuffleFileReadHandlerTest";
+ HdfsShuffleWriteHandler writeHandler =
+ new HdfsShuffleWriteHandler(
+ "appId",
+ 0,
+ 1,
+ 1,
+ basePath,
+ "test",
+ conf,
+ user);
+
+ Map<Long, byte[]> expectedData = Maps.newHashMap();
+
+ int readBufferSize = 13;
+ int totalBlockNum = 0;
+ int expectTotalBlockNum = new Random().nextInt(37);
+ int blockSize = new Random().nextInt(7) + 1;
+ HdfsShuffleHandlerTestBase.writeTestData(writeHandler, expectTotalBlockNum, blockSize, 0, expectedData);
+ int total = HdfsShuffleHandlerTestBase.calcExpectedSegmentNum(expectTotalBlockNum, blockSize, readBufferSize);
+ Roaring64NavigableMap expectBlockIds = Roaring64NavigableMap.bitmapOf();
+ Roaring64NavigableMap processBlockIds = Roaring64NavigableMap.bitmapOf();
+ expectedData.forEach((id, block) -> expectBlockIds.addLong(id));
+ String fileNamePrefix = ShuffleStorageUtils.getFullShuffleDataFolder(basePath,
+ ShuffleStorageUtils.getShuffleDataPathWithRange("appId",
+ 0, 1, 1, 10)) + "/test_0";
+ HdfsShuffleReadHandler handler =
+ new HdfsShuffleReadHandler("appId", 0, 1, fileNamePrefix,
+ readBufferSize, expectBlockIds, processBlockIds, conf);
+
+ Set<Long> actualBlockIds = Sets.newHashSet();
+ for (int i = 0; i < total; ++i) {
+ ShuffleDataResult shuffleDataResult = handler.readShuffleData();
+ totalBlockNum += shuffleDataResult.getBufferSegments().size();
+ HdfsShuffleHandlerTestBase.checkData(shuffleDataResult, expectedData);
+ for (BufferSegment bufferSegment : shuffleDataResult.getBufferSegments()) {
+ actualBlockIds.add(bufferSegment.getBlockId());
}
-
- assertNull(handler.readShuffleData());
- assertEquals(
- total,
- handler.getShuffleDataSegments().size());
- assertEquals(expectTotalBlockNum, totalBlockNum);
- assertEquals(expectedData.keySet(), actualBlockIds);
-
- } catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
}
+
+ assertNull(handler.readShuffleData());
+ assertEquals(
+ total,
+ handler.getShuffleDataSegments().size());
+ assertEquals(expectTotalBlockNum, totalBlockNum);
+ assertEquals(expectedData.keySet(), actualBlockIds);
}
@Test
- public void test() {
+ public void test() throws Exception {
createAndRunCases(HDFS_URI, conf, StringUtils.EMPTY);
}
@Test
- public void testDataInconsistent() {
-
- try {
- String basePath = HDFS_URI + "HdfsShuffleFileReadHandlerTest#testDataInconsistent";
- TestHdfsShuffleWriteHandler writeHandler =
- new TestHdfsShuffleWriteHandler(
- "appId",
- 0,
- 1,
- 1,
- basePath,
- "test",
- conf,
- StringUtils.EMPTY);
-
- Map<Long, byte[]> expectedData = Maps.newHashMap();
- int totalBlockNum = 0;
- int expectTotalBlockNum = 6;
- int blockSize = 7;
- int taskAttemptId = 0;
-
- // write expectTotalBlockNum - 1 complete block
- HdfsShuffleHandlerTestBase.writeTestData(writeHandler, expectTotalBlockNum - 1,
- blockSize, taskAttemptId, expectedData);
-
- // write 1 incomplete block , which only write index file
- List<ShufflePartitionedBlock> blocks = Lists.newArrayList();
- byte[] buf = new byte[blockSize];
- new Random().nextBytes(buf);
- long blockId = (expectTotalBlockNum
- << (Constants.PARTITION_ID_MAX_LENGTH + Constants.TASK_ATTEMPT_ID_MAX_LENGTH))
- + taskAttemptId;
- blocks.add(new ShufflePartitionedBlock(blockSize, blockSize, ChecksumUtils.getCrc32(buf), blockId,
- taskAttemptId, buf));
- writeHandler.writeIndex(blocks);
-
- int readBufferSize = 13;
- int total = HdfsShuffleHandlerTestBase.calcExpectedSegmentNum(expectTotalBlockNum, blockSize, readBufferSize);
- Roaring64NavigableMap expectBlockIds = Roaring64NavigableMap.bitmapOf();
- Roaring64NavigableMap processBlockIds = Roaring64NavigableMap.bitmapOf();
- expectedData.forEach((id, block) -> expectBlockIds.addLong(id));
- String fileNamePrefix = ShuffleStorageUtils.getFullShuffleDataFolder(basePath,
- ShuffleStorageUtils.getShuffleDataPathWithRange("appId",
- 0, 1, 1, 10)) + "/test_0";
- HdfsShuffleReadHandler handler =
- new HdfsShuffleReadHandler("appId", 0, 1, fileNamePrefix,
- readBufferSize, expectBlockIds, processBlockIds, conf);
-
- Set<Long> actualBlockIds = Sets.newHashSet();
- for (int i = 0; i < total; ++i) {
- ShuffleDataResult shuffleDataResult = handler.readShuffleData();
- totalBlockNum += shuffleDataResult.getBufferSegments().size();
- HdfsShuffleHandlerTestBase.checkData(shuffleDataResult, expectedData);
- for (BufferSegment bufferSegment : shuffleDataResult.getBufferSegments()) {
- actualBlockIds.add(bufferSegment.getBlockId());
- }
+ public void testDataInconsistent() throws Exception {
+ String basePath = HDFS_URI + "HdfsShuffleFileReadHandlerTest#testDataInconsistent";
+ TestHdfsShuffleWriteHandler writeHandler =
+ new TestHdfsShuffleWriteHandler(
+ "appId",
+ 0,
+ 1,
+ 1,
+ basePath,
+ "test",
+ conf,
+ StringUtils.EMPTY);
+
+ Map<Long, byte[]> expectedData = Maps.newHashMap();
+ int totalBlockNum = 0;
+ int expectTotalBlockNum = 6;
+ int blockSize = 7;
+ int taskAttemptId = 0;
+
+ // write expectTotalBlockNum - 1 complete block
+ HdfsShuffleHandlerTestBase.writeTestData(writeHandler, expectTotalBlockNum - 1,
+ blockSize, taskAttemptId, expectedData);
+
+ // write 1 incomplete block , which only write index file
+ List<ShufflePartitionedBlock> blocks = Lists.newArrayList();
+ byte[] buf = new byte[blockSize];
+ new Random().nextBytes(buf);
+ long blockId = (expectTotalBlockNum
+ << (Constants.PARTITION_ID_MAX_LENGTH + Constants.TASK_ATTEMPT_ID_MAX_LENGTH))
+ + taskAttemptId;
+ blocks.add(new ShufflePartitionedBlock(blockSize, blockSize, ChecksumUtils.getCrc32(buf), blockId,
+ taskAttemptId, buf));
+ writeHandler.writeIndex(blocks);
+
+ int readBufferSize = 13;
+ int total = HdfsShuffleHandlerTestBase.calcExpectedSegmentNum(expectTotalBlockNum, blockSize, readBufferSize);
+ Roaring64NavigableMap expectBlockIds = Roaring64NavigableMap.bitmapOf();
+ Roaring64NavigableMap processBlockIds = Roaring64NavigableMap.bitmapOf();
+ expectedData.forEach((id, block) -> expectBlockIds.addLong(id));
+ String fileNamePrefix = ShuffleStorageUtils.getFullShuffleDataFolder(basePath,
+ ShuffleStorageUtils.getShuffleDataPathWithRange("appId",
+ 0, 1, 1, 10)) + "/test_0";
+ HdfsShuffleReadHandler handler =
+ new HdfsShuffleReadHandler("appId", 0, 1, fileNamePrefix,
+ readBufferSize, expectBlockIds, processBlockIds, conf);
+
+ Set<Long> actualBlockIds = Sets.newHashSet();
+ for (int i = 0; i < total; ++i) {
+ ShuffleDataResult shuffleDataResult = handler.readShuffleData();
+ totalBlockNum += shuffleDataResult.getBufferSegments().size();
+ HdfsShuffleHandlerTestBase.checkData(shuffleDataResult, expectedData);
+ for (BufferSegment bufferSegment : shuffleDataResult.getBufferSegments()) {
+ actualBlockIds.add(bufferSegment.getBlockId());
}
-
- assertNull(handler.readShuffleData());
- assertEquals(
- total,
- handler.getShuffleDataSegments().size());
- // The last block cannot be read, only the index is generated
- assertEquals(expectTotalBlockNum - 1, totalBlockNum);
- assertEquals(expectedData.keySet(), actualBlockIds);
-
- } catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
}
+
+ assertNull(handler.readShuffleData());
+ assertEquals(
+ total,
+ handler.getShuffleDataSegments().size());
+ // The last block cannot be read, only the index is generated
+ assertEquals(expectTotalBlockNum - 1, totalBlockNum);
+ assertEquals(expectedData.keySet(), actualBlockIds);
}
static class TestHdfsShuffleWriteHandler extends HdfsShuffleWriteHandler {
diff --git a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/KerberizedHdfsClientReadHandlerTest.java b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/KerberizedHdfsClientReadHandlerTest.java
index 682fd927..78d1521f 100644
--- a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/KerberizedHdfsClientReadHandlerTest.java
+++ b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/KerberizedHdfsClientReadHandlerTest.java
@@ -17,8 +17,6 @@
package org.apache.uniffle.storage.handler.impl;
-import java.io.IOException;
-
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -39,7 +37,7 @@ public class KerberizedHdfsClientReadHandlerTest extends KerberizedHdfsBase {
}
@Test
- public void test() throws IOException {
+ public void test() throws Exception {
HdfsClientReadHandlerTest.createAndRunCases(
kerberizedHdfs.getSchemeAndAuthorityPrefix(),
kerberizedHdfs.getConf(),
diff --git a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/KerberizedHdfsShuffleReadHandlerTest.java b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/KerberizedHdfsShuffleReadHandlerTest.java
index f5b9ef74..5e4ea693 100644
--- a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/KerberizedHdfsShuffleReadHandlerTest.java
+++ b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/KerberizedHdfsShuffleReadHandlerTest.java
@@ -17,8 +17,6 @@
package org.apache.uniffle.storage.handler.impl;
-import java.io.IOException;
-
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -39,7 +37,7 @@ public class KerberizedHdfsShuffleReadHandlerTest extends KerberizedHdfsBase {
}
@Test
- public void test() throws IOException {
+ public void test() throws Exception {
HdfsShuffleReadHandlerTest.createAndRunCases(
kerberizedHdfs.getSchemeAndAuthorityPrefix(),
kerberizedHdfs.getConf(),