You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ro...@apache.org on 2022/11/16 11:05:36 UTC
[incubator-uniffle] branch master updated: [ISSUE-329][Bug] Catch NPE in org.apache.uniffle.server.ShuffleTaskManager#addFinishedBlockIds (#331)
This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 9f0610a9 [ISSUE-329][Bug] Catch NPE in org.apache.uniffle.server.ShuffleTaskManager#addFinishedBlockIds (#331)
9f0610a9 is described below
commit 9f0610a9464d859597e09996c9665f286837dc80
Author: xianjingfeng <58...@qq.com>
AuthorDate: Wed Nov 16 19:05:31 2022 +0800
[ISSUE-329][Bug] Catch NPE in org.apache.uniffle.server.ShuffleTaskManager#addFinishedBlockIds (#331)
### What changes were proposed in this pull request?
If app expired, thrown a runtime exception with message
### Why are the changes needed?
`reportShuffleResult` after app expired in shuffle server will throw NPE #329
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added
---
.../apache/uniffle/server/ShuffleTaskManager.java | 3 ++
.../uniffle/server/ShuffleTaskManagerTest.java | 36 ++++++++++++++++++++++
2 files changed, 39 insertions(+)
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index 504b791f..cea1dcb6 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -238,6 +238,9 @@ public class ShuffleTaskManager {
String appId, Integer shuffleId, Map<Integer, long[]> partitionToBlockIds, int bitmapNum) {
refreshAppId(appId);
Map<Integer, Roaring64NavigableMap[]> shuffleIdToPartitions = partitionsToBlockIds.get(appId);
+ if (shuffleIdToPartitions == null) {
+ throw new RuntimeException("appId[" + appId + "] is expired!");
+ }
if (!shuffleIdToPartitions.containsKey(shuffleId)) {
Roaring64NavigableMap[] blockIds = new Roaring64NavigableMap[bitmapNum];
for (int i = 0; i < bitmapNum; i++) {
diff --git a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
index f557531f..99b79327 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
@@ -597,6 +597,42 @@ public class ShuffleTaskManagerTest extends HdfsTestBase {
assertEquals(expectedBlockIds, resBlockIds);
}
+
+ @Test
+ public void testAddFinishedBlockIdsWithoutRegister() throws Exception {
+ ShuffleServerConf conf = new ShuffleServerConf();
+ String storageBasePath = HDFS_URI + "rss/test";
+ String appId = "testAddFinishedBlockIdsToExpiredApp";
+ final int shuffleId = 1;
+ final int bitNum = 3;
+ conf.set(ShuffleServerConf.RPC_SERVER_PORT, 1234);
+ conf.set(ShuffleServerConf.RSS_COORDINATOR_QUORUM, "localhost:9527");
+ conf.set(ShuffleServerConf.JETTY_HTTP_PORT, 12345);
+ conf.set(ShuffleServerConf.JETTY_CORE_POOL_SIZE, 64);
+ conf.set(ShuffleServerConf.SERVER_BUFFER_CAPACITY, 128L);
+ conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE, 50.0);
+ conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE, 0.0);
+ conf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList(storageBasePath));
+ conf.set(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.HDFS.name());
+ conf.set(ShuffleServerConf.SERVER_COMMIT_TIMEOUT, 10000L);
+ conf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 2000L);
+ conf.set(ShuffleServerConf.HEALTH_CHECK_ENABLE, false);
+
+ ShuffleServer shuffleServer = new ShuffleServer(conf);
+ ShuffleBufferManager shuffleBufferManager = shuffleServer.getShuffleBufferManager();
+ ShuffleFlushManager shuffleFlushManager = shuffleServer.getShuffleFlushManager();
+ StorageManager storageManager = shuffleServer.getStorageManager();
+ ShuffleTaskManager shuffleTaskManager = new ShuffleTaskManager(conf, shuffleFlushManager,
+ shuffleBufferManager, storageManager);
+ Map<Integer, long[]> blockIdsToReport = Maps.newHashMap();
+ try {
+ shuffleTaskManager.addFinishedBlockIds(appId, shuffleId, blockIdsToReport, bitNum);
+ fail("Exception should be thrown");
+ } catch (RuntimeException e) {
+ assertTrue(e.getMessage().equals("appId[" + appId + "] is expired!"));
+ }
+ }
+
// copy from ClientUtils
private Long getBlockId(long partitionId, long taskAttemptId, long atomicInt) {
return (atomicInt << (Constants.PARTITION_ID_MAX_LENGTH + Constants.TASK_ATTEMPT_ID_MAX_LENGTH))