You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ro...@apache.org on 2022/11/16 11:05:36 UTC

[incubator-uniffle] branch master updated: [ISSUE-329][Bug] Catch NPE in org.apache.uniffle.server.ShuffleTaskManager#addFinishedBlockIds (#331)

This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 9f0610a9 [ISSUE-329][Bug] Catch NPE in org.apache.uniffle.server.ShuffleTaskManager#addFinishedBlockIds (#331)
9f0610a9 is described below

commit 9f0610a9464d859597e09996c9665f286837dc80
Author: xianjingfeng <58...@qq.com>
AuthorDate: Wed Nov 16 19:05:31 2022 +0800

    [ISSUE-329][Bug] Catch NPE in org.apache.uniffle.server.ShuffleTaskManager#addFinishedBlockIds (#331)
    
    ### What changes were proposed in this pull request?
    If the app has expired, throw a runtime exception with a descriptive message.
    
    ### Why are the changes needed?
    Calling `reportShuffleResult` after the app has expired on the shuffle server throws an NPE (see #329).
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Added a unit test (`testAddFinishedBlockIdsWithoutRegister` in `ShuffleTaskManagerTest`).
---
 .../apache/uniffle/server/ShuffleTaskManager.java  |  3 ++
 .../uniffle/server/ShuffleTaskManagerTest.java     | 36 ++++++++++++++++++++++
 2 files changed, 39 insertions(+)

diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index 504b791f..cea1dcb6 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -238,6 +238,9 @@ public class ShuffleTaskManager {
       String appId, Integer shuffleId, Map<Integer, long[]> partitionToBlockIds, int bitmapNum) {
     refreshAppId(appId);
     Map<Integer, Roaring64NavigableMap[]> shuffleIdToPartitions = partitionsToBlockIds.get(appId);
+    if (shuffleIdToPartitions == null) {
+      throw new RuntimeException("appId[" + appId  + "] is expired!");
+    }
     if (!shuffleIdToPartitions.containsKey(shuffleId)) {
       Roaring64NavigableMap[] blockIds = new Roaring64NavigableMap[bitmapNum];
       for (int i = 0; i < bitmapNum; i++) {
diff --git a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
index f557531f..99b79327 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
@@ -597,6 +597,42 @@ public class ShuffleTaskManagerTest extends HdfsTestBase {
     assertEquals(expectedBlockIds, resBlockIds);
   }
 
+
+  @Test
+  public void testAddFinishedBlockIdsWithoutRegister() throws Exception {
+    ShuffleServerConf conf = new ShuffleServerConf();
+    String storageBasePath = HDFS_URI + "rss/test";
+    String appId = "testAddFinishedBlockIdsToExpiredApp";
+    final int shuffleId = 1;
+    final int bitNum = 3;
+    conf.set(ShuffleServerConf.RPC_SERVER_PORT, 1234);
+    conf.set(ShuffleServerConf.RSS_COORDINATOR_QUORUM, "localhost:9527");
+    conf.set(ShuffleServerConf.JETTY_HTTP_PORT, 12345);
+    conf.set(ShuffleServerConf.JETTY_CORE_POOL_SIZE, 64);
+    conf.set(ShuffleServerConf.SERVER_BUFFER_CAPACITY, 128L);
+    conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE, 50.0);
+    conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE, 0.0);
+    conf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList(storageBasePath));
+    conf.set(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.HDFS.name());
+    conf.set(ShuffleServerConf.SERVER_COMMIT_TIMEOUT, 10000L);
+    conf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 2000L);
+    conf.set(ShuffleServerConf.HEALTH_CHECK_ENABLE, false);
+
+    ShuffleServer shuffleServer = new ShuffleServer(conf);
+    ShuffleBufferManager shuffleBufferManager = shuffleServer.getShuffleBufferManager();
+    ShuffleFlushManager shuffleFlushManager = shuffleServer.getShuffleFlushManager();
+    StorageManager storageManager = shuffleServer.getStorageManager();
+    ShuffleTaskManager shuffleTaskManager = new ShuffleTaskManager(conf, shuffleFlushManager,
+        shuffleBufferManager, storageManager);
+    Map<Integer, long[]>  blockIdsToReport = Maps.newHashMap();
+    try {
+      shuffleTaskManager.addFinishedBlockIds(appId, shuffleId, blockIdsToReport, bitNum);
+      fail("Exception should be thrown");
+    } catch (RuntimeException e) {
+      assertTrue(e.getMessage().equals("appId[" + appId + "] is expired!"));
+    }
+  }
+
   // copy from ClientUtils
   private Long getBlockId(long partitionId, long taskAttemptId, long atomicInt) {
     return (atomicInt << (Constants.PARTITION_ID_MAX_LENGTH + Constants.TASK_ATTEMPT_ID_MAX_LENGTH))