Posted to commits@uniffle.apache.org by ro...@apache.org on 2023/06/01 11:17:53 UTC

[incubator-uniffle] branch master updated: [#872][FOLLOWUP] feat(tez): Modify utils and add test case (#916)

This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 4979ca63 [#872][FOLLOWUP] feat(tez): Modify utils and add test case (#916)
4979ca63 is described below

commit 4979ca633d25c67cd1ca7067995fa373d0017c87
Author: Qing <11...@qq.com>
AuthorDate: Thu Jun 1 19:17:46 2023 +0800

    [#872][FOLLOWUP] feat(tez): Modify utils and add test case (#916)
    
    ### What changes were proposed in this pull request?
    
    Modify utils and add test cases.
    
    ### Why are the changes needed?
    
    Fix: #872
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?

    Unit tests.
---
 .../main/java/org/apache/tez/common/IdUtils.java   |  61 ++++++++
 .../java/org/apache/tez/common/RssTezUtils.java    | 164 ++++++---------------
 .../common/TezRemoteShuffleUmbilicalProtocol.java  |  42 ++++++
 .../java/org/apache/tez/common/IdUtilsTest.java    |  42 ++++++
 .../org/apache/tez/common/RssTezUtilsTest.java     |  38 ++++-
 5 files changed, 224 insertions(+), 123 deletions(-)

diff --git a/client-tez/src/main/java/org/apache/tez/common/IdUtils.java b/client-tez/src/main/java/org/apache/tez/common/IdUtils.java
new file mode 100644
index 00000000..f8d35932
--- /dev/null
+++ b/client-tez/src/main/java/org/apache/tez/common/IdUtils.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.common;
+
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class IdUtils {
+  private static final Logger LOG = LoggerFactory.getLogger(IdUtils.class);
+
+  private IdUtils() {
+  }
+
+  /**
+   * @param pathComponent path component string, e.g. attempt_1681717153064_2768836_2_00_000000_0_10006
+   * @return the TezTaskAttemptID parsed from pathComponent with its last 6 characters removed
+   */
+  public static TezTaskAttemptID convertTezTaskAttemptID(String pathComponent) {
+    LOG.info("convertTezTaskAttemptID, pathComponent:{}", pathComponent);
+    return TezTaskAttemptID.fromString(pathComponent.substring(0, pathComponent.length() - 6));
+  }
+
+  /**
+   * @return the ApplicationAttemptId of the current container, e.g. appattempt_1681717153064_2719964_000001
+   */
+  public static ApplicationAttemptId getApplicationAttemptId() {
+    String containerIdStr = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name());
+    ContainerId containerId = ContainerId.fromString(containerIdStr);
+    return containerId.getApplicationAttemptId();
+  }
+
+  /**
+   * Get the application attempt number.
+   * @return the attempt id of the current ApplicationAttemptId, e.g. 1, 2, 3
+   */
+  public static int getAppAttemptId() {
+    return getApplicationAttemptId().getAttemptId();
+  }
+}
diff --git a/client-tez/src/main/java/org/apache/tez/common/RssTezUtils.java b/client-tez/src/main/java/org/apache/tez/common/RssTezUtils.java
index 5eac59e8..ed4df3e8 100644
--- a/client-tez/src/main/java/org/apache/tez/common/RssTezUtils.java
+++ b/client-tez/src/main/java/org/apache/tez/common/RssTezUtils.java
@@ -17,21 +17,14 @@
 
 package org.apache.tez.common;
 
-import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
 import com.google.common.base.Preconditions;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.runtime.api.InputContext;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
 import org.roaringbitmap.longlong.Roaring64NavigableMap;
@@ -53,11 +46,7 @@ public class RssTezUtils {
 
   public static final String HOST_NAME = "hostname";
 
-  public static final String PLUS_DELIMITER = "+";
   public static final String UNDERLINE_DELIMITER = "_";
-  public static final String COLON_DELIMITER = ":";
-  public static final String COMMA_DELIMITER = ",";
-
   // constant to compute shuffle id
   private static final int VERTEX_ID_MAPPING_MAX_ID = 500;
   private static final String VERTEX_ID_MAPPING_MAP = "Map";
@@ -114,58 +103,6 @@ public class RssTezUtils {
     return reqBytes;
   }
 
-  public static String uniformPartitionHostInfo(Map<Integer, List<ShuffleServerInfo>> map) {
-    List<String> res = new ArrayList<>();
-    String tmp;
-    for (Map.Entry<Integer, List<ShuffleServerInfo>> entry : map.entrySet()) {
-      Integer partitionId = entry.getKey();
-      List<ShuffleServerInfo> shuffleServerInfos = entry.getValue();
-      for (ShuffleServerInfo shuffleServerInfo : shuffleServerInfos) {
-        tmp = partitionId + UNDERLINE_DELIMITER + shuffleServerInfo.getHost() + COLON_DELIMITER
-            + shuffleServerInfo.getNettyPort();
-        res.add(tmp);
-      }
-    }
-    return StringUtils.join(res, COMMA_DELIMITER);
-  }
-
-  public static Map<String, List<String>> uniformServerToPartitions(String partitionToServers) {
-    Map<String, List<String>> serverToPartitions = new HashMap<>();
-    List<String> list;
-
-    String[] pidWithWorkerInfos = partitionToServers.split(COMMA_DELIMITER);
-    for (String pidWithWorkerInfo : pidWithWorkerInfos) {
-      String[] pidUnderLineWorkerInfo = pidWithWorkerInfo.split(UNDERLINE_DELIMITER);
-      if (serverToPartitions.containsKey(pidUnderLineWorkerInfo[1])) {
-        list = serverToPartitions.get(pidUnderLineWorkerInfo[1]);
-        list.add(pidUnderLineWorkerInfo[0]);
-      } else {
-        list = new ArrayList<>();
-        list.add(pidUnderLineWorkerInfo[0]);
-        serverToPartitions.put(pidUnderLineWorkerInfo[1], list);
-      }
-    }
-
-    return serverToPartitions;
-  }
-
-  public static String uniformServerToPartitions(Map<String, List<String>> map) {
-    List<String> res = new ArrayList<>();
-    for (Map.Entry<String, List<String>> entry : map.entrySet()) {
-      String server = entry.getKey();
-      List<String> partitions = entry.getValue();
-      String join = StringUtils.join(partitions, UNDERLINE_DELIMITER);
-      res.add(server + PLUS_DELIMITER + join);
-    }
-    return StringUtils.join(res, COMMA_DELIMITER);
-  }
-
-  public static ApplicationAttemptId getApplicationAttemptId() {
-    String containerIdStr = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name());
-    ContainerId containerId = ContainerId.fromString(containerIdStr);
-    return containerId.getApplicationAttemptId();
-  }
-
   public static String uniqueIdentifierToAttemptId(String uniqueIdentifier) {
     if (uniqueIdentifier == null) {
       throw new RssException("uniqueIdentifier should not be null");
@@ -175,6 +112,7 @@ public class RssTezUtils {
   }
 
   public static long getBlockId(long partitionId, long taskAttemptId, int nextSeqNo) {
+    LOG.info("GetBlockId, partitionId:{}, taskAttemptId:{}, nextSeqNo:{}", partitionId, taskAttemptId, nextSeqNo);
     long attemptId = taskAttemptId >> (Constants.PARTITION_ID_MAX_LENGTH + Constants.TASK_ATTEMPT_ID_MAX_LENGTH);
     if (attemptId < 0 || attemptId > MAX_ATTEMPT_ID) {
       throw new RssException("Can't support attemptId [" + attemptId
@@ -191,6 +129,7 @@ public class RssTezUtils {
     }
     long taskId = taskAttemptId - (attemptId
         << (Constants.PARTITION_ID_MAX_LENGTH + Constants.TASK_ATTEMPT_ID_MAX_LENGTH));
+
     if (taskId < 0 ||  taskId > Constants.MAX_TASK_ATTEMPT_ID) {
       throw new RssException("Can't support taskId["
           + taskId + "], the max value should be " + Constants.MAX_TASK_ATTEMPT_ID);
@@ -240,14 +179,6 @@ public class RssTezUtils {
     return (int) Math.ceil(taskConcurrency * 1.0 / taskConcurrencyPerServer);
   }
 
-  // compute shuffle id using InputContext
-  public static int computeShuffleId(InputContext inputContext) {
-    int dagIdentifier = inputContext.getDagIdentifier();
-    String sourceVertexName = inputContext.getSourceVertexName();
-    String taskVertexName  = inputContext.getTaskVertexName();
-    return RssTezUtils.computeShuffleId(dagIdentifier, sourceVertexName, taskVertexName);
-  }
-
   /**
    *
    * @param tezDagID Get from tez InputContext, represent dag id.
@@ -305,57 +236,43 @@ public class RssTezUtils {
       throw new RssException("TaskAttempt " + taskAttemptID + " high bytes " + highBytes
           + " exceed, appAttemptId:" + appAttemptId);
     }
-
     long id = (highBytes << (Constants.TASK_ATTEMPT_ID_MAX_LENGTH + Constants.PARTITION_ID_MAX_LENGTH)) + lowBytes;
-    LOG.info("ConvertTaskAttemptIdToLong id is {}", id);
-    LOG.info("LowBytes id is {}", lowBytes);
-    LOG.info("HighBytes id is {}", highBytes);
+    LOG.info("ConvertTaskAttemptIdToLong taskAttemptID:{}, id is {}, .", taskAttemptID, id);
     return id;
   }
 
-  public static Roaring64NavigableMap fetchAllRssTaskIds(
-          Set<InputAttemptIdentifier> successMapTaskAttempts,
-          Integer totalMapsCount,
-          Integer appAttemptId) {
-    Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf();
-    Roaring64NavigableMap mapIndexBitmap = Roaring64NavigableMap.bitmapOf();
+  public static Roaring64NavigableMap fetchAllRssTaskIds(Set<InputAttemptIdentifier> successMapTaskAttempts,
+          int totalMapsCount, int appAttemptId) {
     String errMsg = "TaskAttemptIDs are inconsistent with map tasks";
-    LOG.info("FetchAllRssTaskIds successMapTaskAttempts:{}", successMapTaskAttempts);
+    Roaring64NavigableMap rssTaskIdBitmap = Roaring64NavigableMap.bitmapOf();
+    Roaring64NavigableMap mapTaskIdBitmap = Roaring64NavigableMap.bitmapOf();
+    LOG.info("FetchAllRssTaskIds successMapTaskAttempts size:{}", successMapTaskAttempts.size());
     LOG.info("FetchAllRssTaskIds totalMapsCount:{}, appAttemptId:{}", totalMapsCount, appAttemptId);
 
     for (InputAttemptIdentifier inputAttemptIdentifier: successMapTaskAttempts) {
-      int mapIndex = inputAttemptIdentifier.getInputIdentifier();
       String pathComponent = inputAttemptIdentifier.getPathComponent();
-      int taskId = RssTezUtils.taskIdStrToTaskId(pathComponent);
-      // There can be multiple successful attempts on same map task.
-      // So we only need to accept one of them.
-      LOG.info("FetchAllRssTaskIds, taskId:{}, is contains:{}", taskId, mapIndexBitmap.contains(taskId));
-      if (!mapIndexBitmap.contains(taskId)) {
-        taskIdBitmap.addLong(taskId);
-
-        LOG.info("FetchAllRssTaskIds, successMapTaskAttempts:{}, totalMapsCount:{}, appAttemptId:{}, mapIndex:{}, "
-                + "totalMapsCount:{}, taskId: {} ",
-            successMapTaskAttempts.size(), totalMapsCount, appAttemptId, mapIndex, totalMapsCount, taskId);
-
-        if (mapIndex < totalMapsCount) {  // up-stream map task index should < total task number(including failed task)
-          mapIndexBitmap.addLong(taskId);
-        } else {
-
-          LOG.error("SuccessMapTaskAttempts:{}, totalMapsCount:{}, appAttemptId:{}, mapIndex:{}, totalMapsCount:{} ",
-              successMapTaskAttempts, totalMapsCount, appAttemptId, mapIndex, totalMapsCount);
-
-          LOG.error(inputAttemptIdentifier + " has overflowed mapIndex");
-          throw new IllegalStateException(errMsg);
+      TezTaskAttemptID mapTaskAttemptID = IdUtils.convertTezTaskAttemptID(pathComponent);
+      long rssTaskId = RssTezUtils.convertTaskAttemptIdToLong(mapTaskAttemptID, appAttemptId);
+      long mapTaskId = mapTaskAttemptID.getTaskID().getId();
+
+      LOG.info("FetchAllRssTaskIds, pathComponent: {}, mapTaskId:{}, rssTaskId:{}, is contains:{}",
+              pathComponent, mapTaskId, rssTaskId, mapTaskIdBitmap.contains(mapTaskId));
+      if (!mapTaskIdBitmap.contains(mapTaskId)) {
+        rssTaskIdBitmap.addLong(rssTaskId);
+        mapTaskIdBitmap.addLong(mapTaskId);
+        if (mapTaskId >= totalMapsCount) { // upstream map task index should be < total task count (including failed tasks)
+          LOG.warn(inputAttemptIdentifier + " has overflowed mapIndex, pathComponent: " + pathComponent
+              + ", totalMapsCount: " + totalMapsCount);
         }
       } else {
-        LOG.warn(inputAttemptIdentifier + " is redundant on index: " + mapIndex);
+        LOG.warn(inputAttemptIdentifier + " is redundant on index: " + mapTaskId);
       }
     }
     // each map should have only one success attempt
-    if (mapIndexBitmap.getLongCardinality() != taskIdBitmap.getLongCardinality()) {
+    if (mapTaskIdBitmap.getLongCardinality() != rssTaskIdBitmap.getLongCardinality()) {
       throw new IllegalStateException(errMsg);
     }
-    return taskIdBitmap;
+    return rssTaskIdBitmap;
   }
 
   public static int taskIdStrToTaskId(String taskIdStr) {
@@ -375,30 +292,35 @@ public class RssTezUtils {
   }
 
   // multiHostInfo is like:
-  // 0_172.19.193.152:19999,1_172.19.193.152:19999
+  // 172.19.193.247:19999+1_4_7, 172.19.193.55:19999+2_5, 172.19.193.152:19999+0_3_6
   private static void parseRssWorkerFromHostInfo(Map<Integer, Set<ShuffleServerInfo>> rssWorker, String multiHostInfo) {
     for (String hostInfo : multiHostInfo.split(",")) {
-      LOG.info("ParseRssWorker, hostInfo:{}", hostInfo);
-      String[] info = hostInfo.split("_|:");
-      int partitionId = Integer.parseInt(info[0]);
-      ShuffleServerInfo serverInfo = new ShuffleServerInfo(info[1], Integer.parseInt(info[2]));
-      rssWorker.computeIfAbsent(partitionId, k -> new HashSet<>());
-      rssWorker.get(partitionId).add(serverInfo);
-      LOG.info("Parse Rss Worker, add partition:{}, serverInfo:{}", partitionId, serverInfo);
+      // LOG.info("ParseRssWorker, hostInfo:{}", hostInfo);
+      String[] info = hostInfo.split("\\+");
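+      // info[0] is "host:port", info[1] is the '_'-joined list of partition ids served by that host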
+      ShuffleServerInfo serverInfo = new ShuffleServerInfo(info[0].split(":")[0],
+          Integer.parseInt(info[0].split(":")[1]));
+
+      String[] partitions = info[1].split("_");
+      assert (partitions.length > 0);
+      for (String partitionId: partitions) {
+        rssWorker.computeIfAbsent(Integer.parseInt(partitionId), k -> new HashSet<>());
+        rssWorker.get(Integer.parseInt(partitionId)).add(serverInfo);
+      }
     }
   }
 
   // hostnameInfo is like:
-  // Map 1=0_172.19.193.152:19999,0_172.19.193.152:19999;Map 2=0_172.19.193.152:19999,1_17
-  public static void parseRssWorker(Map<Integer, Set<ShuffleServerInfo>> rssWorker, int shuffleId,
-                                    String hostnameInfo) {
-    LOG.info("ParseRssWorker, hostnameInfo:{}", hostnameInfo);
+  // 1001602=172.19.193.247:19999+1_4_7,172.19.193.55:19999+2_5,172.19.193.152:19999+0_3_6
+  public static void parseRssWorker(
+          Map<Integer, Set<ShuffleServerInfo>> rssWorker,
+          int shuffleId,
+          String hostnameInfo) {
+    LOG.info("ParseRssWorker, hostnameInfo length:{}", hostnameInfo.length());
     for (String toVertex: hostnameInfo.split(";")) {
-      LOG.info("ParseRssWorker, hostnameInffdafdso:{}", toVertex);
+      // toVertex is like: 1001602=172.19.193.247:19999+1_4_7,172.19.193.55:19999+2_5,172.19.193.152:19999+0_3_6
       String[] splits = toVertex.split("=");
       if (splits.length == 2 && String.valueOf(shuffleId).equals(splits[0])) {
-        String workerStr = toVertex.split("=")[1];
-        LOG.info("ParseRssWorker, workerStr:{}", workerStr);
+        String workerStr = splits[1];
         parseRssWorkerFromHostInfo(rssWorker, workerStr);
       }
     }
diff --git a/client-tez/src/main/java/org/apache/tez/common/TezRemoteShuffleUmbilicalProtocol.java b/client-tez/src/main/java/org/apache/tez/common/TezRemoteShuffleUmbilicalProtocol.java
new file mode 100644
index 00000000..1bbf0c32
--- /dev/null
+++ b/client-tez/src/main/java/org/apache/tez/common/TezRemoteShuffleUmbilicalProtocol.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.common;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.ipc.VersionedProtocol;
+import org.apache.hadoop.security.token.TokenInfo;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.runtime.common.security.JobTokenSelector;
+
+
+@TokenInfo(JobTokenSelector.class)
+@InterfaceAudience.Private
+@InterfaceStability.Stable
+
+// ProtocolInfo will be required once we move to Hadoop PB RPC
+//@ProtocolInfo(protocolName = "TezRemoteShuffleUmbilicalProtocol", protocolVersion = 1)
+public interface TezRemoteShuffleUmbilicalProtocol  extends VersionedProtocol {
+  @SuppressWarnings("checkstyle:ConstantName")
+  long versionID = 31L;
+
+  GetShuffleServerResponse getShuffleAssignments(GetShuffleServerRequest request)
+          throws IOException, TezException;
+}
diff --git a/client-tez/src/test/java/org/apache/tez/common/IdUtilsTest.java b/client-tez/src/test/java/org/apache/tez/common/IdUtilsTest.java
new file mode 100644
index 00000000..83467695
--- /dev/null
+++ b/client-tez/src/test/java/org/apache/tez/common/IdUtilsTest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.common;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class IdUtilsTest {
+
+  @Test
+  public void testConvertTezTaskAttemptID() {
+    ApplicationId appId = ApplicationId.newInstance(1681717153064L, 2768836);
+    TezDAGID dagId = TezDAGID.getInstance(appId, 3);
+    TezVertexID vId = TezVertexID.getInstance(dagId, 2);
+    TezTaskID taskId = TezTaskID.getInstance(vId, 1);
+    TezTaskAttemptID tezTaskAttemptId = TezTaskAttemptID.getInstance(taskId, 0);
+
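+    // dag 3, vertex 02, task 000001, attempt 0; the trailing "_10006" is stripped by convertTezTaskAttemptID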
+    String testId = "attempt_1681717153064_2768836_3_02_000001_0_10006";
+    assertEquals(tezTaskAttemptId, IdUtils.convertTezTaskAttemptID(testId));
+  }
+}
diff --git a/client-tez/src/test/java/org/apache/tez/common/RssTezUtilsTest.java b/client-tez/src/test/java/org/apache/tez/common/RssTezUtilsTest.java
index 5344e543..abf03be2 100644
--- a/client-tez/src/test/java/org/apache/tez/common/RssTezUtilsTest.java
+++ b/client-tez/src/test/java/org/apache/tez/common/RssTezUtilsTest.java
@@ -17,6 +17,12 @@
 
 package org.apache.tez.common;
 
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.collect.ImmutableSet;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.tez.dag.records.TezDAGID;
@@ -25,16 +31,16 @@ import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
 import org.junit.jupiter.api.Test;
 
+import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.common.util.Constants;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class RssTezUtilsTest {
 
-  private static final String EXPECTED_EXCEPTION_MESSAGE = "Exception should be thrown";
-
   @Test
   public void baskAttemptIdTest() {
     long taskAttemptId = 0x1000ad12;
@@ -158,4 +164,32 @@ public class RssTezUtilsTest {
     TezTaskAttemptID tezTaskAttemptID = TezTaskAttemptID.fromString(uniqueIdentifierToAttemptId);
     assertEquals(originalTezTaskAttemptID, tezTaskAttemptID);
   }
+
+  @Test
+  public void testParseRssWorker() {
+    Map<Integer, Set<ShuffleServerInfo>> rssWorker = new HashMap<>();
+    int shuffleId = 1001602;
+    // "0_1_2_3" and "2_3_4_5" are '_'-joined lists of partition ids.
+    String hostnameInfo = "localhost;1001602=172.19.193.152:19999+0_1_2_3,172.19.193.153:19999+2_3_4_5";
+    RssTezUtils.parseRssWorker(rssWorker, shuffleId, hostnameInfo);
+
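+    // the two servers together cover partitions 0-5; partitions 2 and 3 are served by both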
+    assertEquals(6, rssWorker.size());
+
+    int partitionId = 0;
+    Set<ShuffleServerInfo> shuffleServerInfo = rssWorker.get(partitionId);
+    ShuffleServerInfo server = new ShuffleServerInfo("172.19.193.152", 19999);
+    assertEquals(ImmutableSet.of(server), shuffleServerInfo);
+
+    partitionId = 3;
+    shuffleServerInfo = rssWorker.get(partitionId);
+    ShuffleServerInfo server2 = new ShuffleServerInfo("172.19.193.153", 19999);
+    assertEquals(ImmutableSet.of(server, server2), shuffleServerInfo);
+
+    partitionId = 18;
+    shuffleServerInfo = rssWorker.get(partitionId);
+    assertNull(shuffleServerInfo);
+
+    Integer[] expectPartitionArr = new Integer[]{0, 1, 2, 3, 4, 5};
+    assertTrue(Arrays.equals(expectPartitionArr, rssWorker.keySet().toArray(new Integer[0])));
+  }
 }