You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ro...@apache.org on 2023/06/01 11:17:53 UTC
[incubator-uniffle] branch master updated: [#872][FOLLOWUP] feat(tez): Modify utils and add test case (#916)
This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 4979ca63 [#872][FOLLOWUP] feat(tez): Modify utils and add test case (#916)
4979ca63 is described below
commit 4979ca633d25c67cd1ca7067995fa373d0017c87
Author: Qing <11...@qq.com>
AuthorDate: Thu Jun 1 19:17:46 2023 +0800
[#872][FOLLOWUP] feat(tez): Modify utils and add test case (#916)
### What changes were proposed in this pull request?
modify utils and add test case
### Why are the changes needed?
Fix: #872
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
unit test
---
.../main/java/org/apache/tez/common/IdUtils.java | 61 ++++++++
.../java/org/apache/tez/common/RssTezUtils.java | 164 ++++++---------------
.../common/TezRemoteShuffleUmbilicalProtocol.java | 42 ++++++
.../java/org/apache/tez/common/IdUtilsTest.java | 42 ++++++
.../org/apache/tez/common/RssTezUtilsTest.java | 38 ++++-
5 files changed, 224 insertions(+), 123 deletions(-)
diff --git a/client-tez/src/main/java/org/apache/tez/common/IdUtils.java b/client-tez/src/main/java/org/apache/tez/common/IdUtils.java
new file mode 100644
index 00000000..f8d35932
--- /dev/null
+++ b/client-tez/src/main/java/org/apache/tez/common/IdUtils.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.common;
+
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class IdUtils {
+ private static final Logger LOG = LoggerFactory.getLogger(IdUtils.class);
+
+ private IdUtils() {
+ }
+
+ /**
+ *
+ * @param pathComponent, like: attempt_1681717153064_2768836_2_00_000000_0_10006
+ * @return remove last 6 char, return TezTaskAttemptID
+ */
+ public static TezTaskAttemptID convertTezTaskAttemptID(String pathComponent) {
+ LOG.info("convertTezTaskAttemptID, pathComponent:{}", pathComponent);
+ return TezTaskAttemptID.fromString(pathComponent.substring(0, pathComponent.length() - 6));
+
+ }
+
+
+ /**
+ * @return ApplicationAttemptId, eg: appattempt_1681717153064_2719964_000001
+ */
+ public static ApplicationAttemptId getApplicationAttemptId() {
+ String containerIdStr = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name());
+ ContainerId containerId = ContainerId.fromString(containerIdStr);
+ return containerId.getApplicationAttemptId();
+ }
+
+ /**
+ * Get application id attempt.
+ * @return ApplicationAttemptId attempt id, eg: 1, 2, 3
+ */
+ public static int getAppAttemptId() {
+ return getApplicationAttemptId().getAttemptId();
+ }
+}
diff --git a/client-tez/src/main/java/org/apache/tez/common/RssTezUtils.java b/client-tez/src/main/java/org/apache/tez/common/RssTezUtils.java
index 5eac59e8..ed4df3e8 100644
--- a/client-tez/src/main/java/org/apache/tez/common/RssTezUtils.java
+++ b/client-tez/src/main/java/org/apache/tez/common/RssTezUtils.java
@@ -17,21 +17,14 @@
package org.apache.tez.common;
-import java.util.ArrayList;
-import java.util.HashMap;
import java.util.HashSet;
-import java.util.List;
import java.util.Map;
import java.util.Set;
import com.google.common.base.Preconditions;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.runtime.api.InputContext;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
@@ -53,11 +46,7 @@ public class RssTezUtils {
public static final String HOST_NAME = "hostname";
- public static final String PLUS_DELIMITER = "+";
public static final String UNDERLINE_DELIMITER = "_";
- public static final String COLON_DELIMITER = ":";
- public static final String COMMA_DELIMITER = ",";
-
// constant to compute shuffle id
private static final int VERTEX_ID_MAPPING_MAX_ID = 500;
private static final String VERTEX_ID_MAPPING_MAP = "Map";
@@ -114,58 +103,6 @@ public class RssTezUtils {
return reqBytes;
}
- public static String uniformPartitionHostInfo(Map<Integer, List<ShuffleServerInfo>> map) {
- List<String> res = new ArrayList<>();
- String tmp;
- for (Map.Entry<Integer, List<ShuffleServerInfo>> entry : map.entrySet()) {
- Integer partitionId = entry.getKey();
- List<ShuffleServerInfo> shuffleServerInfos = entry.getValue();
- for (ShuffleServerInfo shuffleServerInfo : shuffleServerInfos) {
- tmp = partitionId + UNDERLINE_DELIMITER + shuffleServerInfo.getHost() + COLON_DELIMITER
- + shuffleServerInfo.getNettyPort();
- res.add(tmp);
- }
- }
- return StringUtils.join(res, COMMA_DELIMITER);
- }
-
- public static Map<String, List<String>> uniformServerToPartitions(String partitionToServers) {
- Map<String, List<String>> serverToPartitions = new HashMap<>();
- List<String> list;
-
- String[] pidWithWorkerInfos = partitionToServers.split(COMMA_DELIMITER);
- for (String pidWithWorkerInfo : pidWithWorkerInfos) {
- String[] pidUnderLineWorkerInfo = pidWithWorkerInfo.split(UNDERLINE_DELIMITER);
- if (serverToPartitions.containsKey(pidUnderLineWorkerInfo[1])) {
- list = serverToPartitions.get(pidUnderLineWorkerInfo[1]);
- list.add(pidUnderLineWorkerInfo[0]);
- } else {
- list = new ArrayList<>();
- list.add(pidUnderLineWorkerInfo[0]);
- serverToPartitions.put(pidUnderLineWorkerInfo[1], list);
- }
- }
-
- return serverToPartitions;
- }
-
- public static String uniformServerToPartitions(Map<String, List<String>> map) {
- List<String> res = new ArrayList<>();
- for (Map.Entry<String, List<String>> entry : map.entrySet()) {
- String server = entry.getKey();
- List<String> partitions = entry.getValue();
- String join = StringUtils.join(partitions, UNDERLINE_DELIMITER);
- res.add(server + PLUS_DELIMITER + join);
- }
- return StringUtils.join(res, COMMA_DELIMITER);
- }
-
- public static ApplicationAttemptId getApplicationAttemptId() {
- String containerIdStr = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name());
- ContainerId containerId = ContainerId.fromString(containerIdStr);
- return containerId.getApplicationAttemptId();
- }
-
public static String uniqueIdentifierToAttemptId(String uniqueIdentifier) {
if (uniqueIdentifier == null) {
throw new RssException("uniqueIdentifier should not be null");
@@ -175,6 +112,7 @@ public class RssTezUtils {
}
public static long getBlockId(long partitionId, long taskAttemptId, int nextSeqNo) {
+ LOG.info("GetBlockId, partitionId:{}, taskAttemptId:{}, nextSeqNo:{}", partitionId, taskAttemptId, nextSeqNo);
long attemptId = taskAttemptId >> (Constants.PARTITION_ID_MAX_LENGTH + Constants.TASK_ATTEMPT_ID_MAX_LENGTH);
if (attemptId < 0 || attemptId > MAX_ATTEMPT_ID) {
throw new RssException("Can't support attemptId [" + attemptId
@@ -191,6 +129,7 @@ public class RssTezUtils {
}
long taskId = taskAttemptId - (attemptId
<< (Constants.PARTITION_ID_MAX_LENGTH + Constants.TASK_ATTEMPT_ID_MAX_LENGTH));
+
if (taskId < 0 || taskId > Constants.MAX_TASK_ATTEMPT_ID) {
throw new RssException("Can't support taskId["
+ taskId + "], the max value should be " + Constants.MAX_TASK_ATTEMPT_ID);
@@ -240,14 +179,6 @@ public class RssTezUtils {
return (int) Math.ceil(taskConcurrency * 1.0 / taskConcurrencyPerServer);
}
- // compute shuffle id using InputContext
- public static int computeShuffleId(InputContext inputContext) {
- int dagIdentifier = inputContext.getDagIdentifier();
- String sourceVertexName = inputContext.getSourceVertexName();
- String taskVertexName = inputContext.getTaskVertexName();
- return RssTezUtils.computeShuffleId(dagIdentifier, sourceVertexName, taskVertexName);
- }
-
/**
*
* @param tezDagID Get from tez InputContext, represent dag id.
@@ -305,57 +236,43 @@ public class RssTezUtils {
throw new RssException("TaskAttempt " + taskAttemptID + " high bytes " + highBytes
+ " exceed, appAttemptId:" + appAttemptId);
}
-
long id = (highBytes << (Constants.TASK_ATTEMPT_ID_MAX_LENGTH + Constants.PARTITION_ID_MAX_LENGTH)) + lowBytes;
- LOG.info("ConvertTaskAttemptIdToLong id is {}", id);
- LOG.info("LowBytes id is {}", lowBytes);
- LOG.info("HighBytes id is {}", highBytes);
+ LOG.info("ConvertTaskAttemptIdToLong taskAttemptID:{}, id is {}, .", taskAttemptID, id);
return id;
}
- public static Roaring64NavigableMap fetchAllRssTaskIds(
- Set<InputAttemptIdentifier> successMapTaskAttempts,
- Integer totalMapsCount,
- Integer appAttemptId) {
- Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf();
- Roaring64NavigableMap mapIndexBitmap = Roaring64NavigableMap.bitmapOf();
+ public static Roaring64NavigableMap fetchAllRssTaskIds(Set<InputAttemptIdentifier> successMapTaskAttempts,
+ int totalMapsCount, int appAttemptId) {
String errMsg = "TaskAttemptIDs are inconsistent with map tasks";
- LOG.info("FetchAllRssTaskIds successMapTaskAttempts:{}", successMapTaskAttempts);
+ Roaring64NavigableMap rssTaskIdBitmap = Roaring64NavigableMap.bitmapOf();
+ Roaring64NavigableMap mapTaskIdBitmap = Roaring64NavigableMap.bitmapOf();
+ LOG.info("FetchAllRssTaskIds successMapTaskAttempts size:{}", successMapTaskAttempts.size());
LOG.info("FetchAllRssTaskIds totalMapsCount:{}, appAttemptId:{}", totalMapsCount, appAttemptId);
for (InputAttemptIdentifier inputAttemptIdentifier: successMapTaskAttempts) {
- int mapIndex = inputAttemptIdentifier.getInputIdentifier();
String pathComponent = inputAttemptIdentifier.getPathComponent();
- int taskId = RssTezUtils.taskIdStrToTaskId(pathComponent);
- // There can be multiple successful attempts on same map task.
- // So we only need to accept one of them.
- LOG.info("FetchAllRssTaskIds, taskId:{}, is contains:{}", taskId, mapIndexBitmap.contains(taskId));
- if (!mapIndexBitmap.contains(taskId)) {
- taskIdBitmap.addLong(taskId);
-
- LOG.info("FetchAllRssTaskIds, successMapTaskAttempts:{}, totalMapsCount:{}, appAttemptId:{}, mapIndex:{}, "
- + "totalMapsCount:{}, taskId: {} ",
- successMapTaskAttempts.size(), totalMapsCount, appAttemptId, mapIndex, totalMapsCount, taskId);
-
- if (mapIndex < totalMapsCount) { // up-stream map task index should < total task number(including failed task)
- mapIndexBitmap.addLong(taskId);
- } else {
-
- LOG.error("SuccessMapTaskAttempts:{}, totalMapsCount:{}, appAttemptId:{}, mapIndex:{}, totalMapsCount:{} ",
- successMapTaskAttempts, totalMapsCount, appAttemptId, mapIndex, totalMapsCount);
-
- LOG.error(inputAttemptIdentifier + " has overflowed mapIndex");
- throw new IllegalStateException(errMsg);
+ TezTaskAttemptID mapTaskAttemptID = IdUtils.convertTezTaskAttemptID(pathComponent);
+ long rssTaskId = RssTezUtils.convertTaskAttemptIdToLong(mapTaskAttemptID, appAttemptId);
+ long mapTaskId = mapTaskAttemptID.getTaskID().getId();
+
+ LOG.info("FetchAllRssTaskIds, pathComponent: {}, mapTaskId:{}, rssTaskId:{}, is contains:{}",
+ pathComponent, mapTaskId, rssTaskId, mapTaskIdBitmap.contains(mapTaskId));
+ if (!mapTaskIdBitmap.contains(mapTaskId)) {
+ rssTaskIdBitmap.addLong(rssTaskId);
+ mapTaskIdBitmap.addLong(mapTaskId);
+ if (mapTaskId >= totalMapsCount) { // up-stream map task index should < total task number(including failed task)
+ LOG.warn(inputAttemptIdentifier + " has overflowed mapIndex, pathComponent: " + pathComponent
+ + ",totalMapsCount: " + totalMapsCount);
}
} else {
- LOG.warn(inputAttemptIdentifier + " is redundant on index: " + mapIndex);
+ LOG.warn(inputAttemptIdentifier + " is redundant on index: " + mapTaskId);
}
}
// each map should have only one success attempt
- if (mapIndexBitmap.getLongCardinality() != taskIdBitmap.getLongCardinality()) {
+ if (mapTaskIdBitmap.getLongCardinality() != rssTaskIdBitmap.getLongCardinality()) {
throw new IllegalStateException(errMsg);
}
- return taskIdBitmap;
+ return rssTaskIdBitmap;
}
public static int taskIdStrToTaskId(String taskIdStr) {
@@ -375,30 +292,35 @@ public class RssTezUtils {
}
// multiHostInfo is like:
- // 0_172.19.193.152:19999,1_172.19.193.152:19999
+ // 172.19.193.247:19999+1_4_7, 172.19.193.55:19999+2_5, 172.19.193.152:19999+0_3_6
private static void parseRssWorkerFromHostInfo(Map<Integer, Set<ShuffleServerInfo>> rssWorker, String multiHostInfo) {
for (String hostInfo : multiHostInfo.split(",")) {
- LOG.info("ParseRssWorker, hostInfo:{}", hostInfo);
- String[] info = hostInfo.split("_|:");
- int partitionId = Integer.parseInt(info[0]);
- ShuffleServerInfo serverInfo = new ShuffleServerInfo(info[1], Integer.parseInt(info[2]));
- rssWorker.computeIfAbsent(partitionId, k -> new HashSet<>());
- rssWorker.get(partitionId).add(serverInfo);
- LOG.info("Parse Rss Worker, add partition:{}, serverInfo:{}", partitionId, serverInfo);
+ // LOG.info("ParseRssWorker, hostInfo:{}", hostInfo);
+ String[] info = hostInfo.split("\\+");
+ ShuffleServerInfo serverInfo = new ShuffleServerInfo(info[0].split(":")[0],
+ Integer.parseInt(info[0].split(":")[1]));
+
+ String[] partitions = info[1].split("_");
+ assert (partitions.length > 0);
+ for (String partitionId: partitions) {
+ rssWorker.computeIfAbsent(Integer.parseInt(partitionId), k -> new HashSet<>());
+ rssWorker.get(Integer.parseInt(partitionId)).add(serverInfo);
+ }
}
}
// hostnameInfo is like:
- // Map 1=0_172.19.193.152:19999,0_172.19.193.152:19999;Map 2=0_172.19.193.152:19999,1_17
- public static void parseRssWorker(Map<Integer, Set<ShuffleServerInfo>> rssWorker, int shuffleId,
- String hostnameInfo) {
- LOG.info("ParseRssWorker, hostnameInfo:{}", hostnameInfo);
+ // 172.19.193.247:19999+1_4_7, 172.19.193.55:19999+2_5,172.19.193.152:19999+0_3_6
+ public static void parseRssWorker(
+ Map<Integer, Set<ShuffleServerInfo>> rssWorker,
+ int shuffleId,
+ String hostnameInfo) {
+ LOG.info("ParseRssWorker, hostnameInfo length:{}", hostnameInfo.length());
for (String toVertex: hostnameInfo.split(";")) {
- LOG.info("ParseRssWorker, hostnameInffdafdso:{}", toVertex);
+ // toVertex is like: 1001602=172.19.193.247:19999+1_4_7,172.19.193.55:19999+2_5,172.19.193.152:19999+0_3_6
String[] splits = toVertex.split("=");
if (splits.length == 2 && String.valueOf(shuffleId).equals(splits[0])) {
- String workerStr = toVertex.split("=")[1];
- LOG.info("ParseRssWorker, workerStr:{}", workerStr);
+ String workerStr = splits[1];
parseRssWorkerFromHostInfo(rssWorker, workerStr);
}
}
diff --git a/client-tez/src/main/java/org/apache/tez/common/TezRemoteShuffleUmbilicalProtocol.java b/client-tez/src/main/java/org/apache/tez/common/TezRemoteShuffleUmbilicalProtocol.java
new file mode 100644
index 00000000..1bbf0c32
--- /dev/null
+++ b/client-tez/src/main/java/org/apache/tez/common/TezRemoteShuffleUmbilicalProtocol.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.common;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.ipc.VersionedProtocol;
+import org.apache.hadoop.security.token.TokenInfo;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.runtime.common.security.JobTokenSelector;
+
+
+@TokenInfo(JobTokenSelector.class)
+@InterfaceAudience.Private
+@InterfaceStability.Stable
+
+// ProtocolInfo will be required once we move to Hadoop PB RPC
+//@ProtocolInfo(protocolName = "TezRemoteShuffleUmbilicalProtocol", protocolVersion = 1)
+public interface TezRemoteShuffleUmbilicalProtocol extends VersionedProtocol {
+ @SuppressWarnings("checkstyle:ConstantName")
+ long versionID = 31L;
+
+ GetShuffleServerResponse getShuffleAssignments(GetShuffleServerRequest request)
+ throws IOException, TezException;
+}
diff --git a/client-tez/src/test/java/org/apache/tez/common/IdUtilsTest.java b/client-tez/src/test/java/org/apache/tez/common/IdUtilsTest.java
new file mode 100644
index 00000000..83467695
--- /dev/null
+++ b/client-tez/src/test/java/org/apache/tez/common/IdUtilsTest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.common;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class IdUtilsTest {
+
+ @Test
+ public void testConvertTezTaskAttemptID() {
+ ApplicationId appId = ApplicationId.newInstance(1681717153064L, 2768836);
+ TezDAGID dagId = TezDAGID.getInstance(appId, 3);
+ TezVertexID vId = TezVertexID.getInstance(dagId, 2);
+ TezTaskID taskId = TezTaskID.getInstance(vId, 1);
+ TezTaskAttemptID tezTaskAttemptId = TezTaskAttemptID.getInstance(taskId, 0);
+
+ String testId = "attempt_1681717153064_2768836_3_02_000001_0_10006";
+ assertEquals(tezTaskAttemptId, IdUtils.convertTezTaskAttemptID(testId));
+ }
+}
diff --git a/client-tez/src/test/java/org/apache/tez/common/RssTezUtilsTest.java b/client-tez/src/test/java/org/apache/tez/common/RssTezUtilsTest.java
index 5344e543..abf03be2 100644
--- a/client-tez/src/test/java/org/apache/tez/common/RssTezUtilsTest.java
+++ b/client-tez/src/test/java/org/apache/tez/common/RssTezUtilsTest.java
@@ -17,6 +17,12 @@
package org.apache.tez.common;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.collect.ImmutableSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.tez.dag.records.TezDAGID;
@@ -25,16 +31,16 @@ import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.junit.jupiter.api.Test;
+import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.util.Constants;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class RssTezUtilsTest {
- private static final String EXPECTED_EXCEPTION_MESSAGE = "Exception should be thrown";
-
@Test
public void baskAttemptIdTest() {
long taskAttemptId = 0x1000ad12;
@@ -158,4 +164,32 @@ public class RssTezUtilsTest {
TezTaskAttemptID tezTaskAttemptID = TezTaskAttemptID.fromString(uniqueIdentifierToAttemptId);
assertEquals(originalTezTaskAttemptID, tezTaskAttemptID);
}
+
+ @Test
+ public void testParseRssWorker() {
+ Map<Integer, Set<ShuffleServerInfo>> rssWorker = new HashMap<>();
+ int shuffleId = 1001602;
+ // 0_1_2_3 is consist of partition id.
+ String hostnameInfo = "localhost;1001602=172.19.193.152:19999+0_1_2_3,172.19.193.153:19999+2_3_4_5";
+ RssTezUtils.parseRssWorker(rssWorker, shuffleId, hostnameInfo);
+
+ assertEquals(6, rssWorker.size());
+
+ int partitionId = 0;
+ Set<ShuffleServerInfo> shuffleServerInfo = rssWorker.get(partitionId);
+ ShuffleServerInfo server = new ShuffleServerInfo("172.19.193.152", 19999);
+ assertEquals(ImmutableSet.of(server), shuffleServerInfo);
+
+ partitionId = 3;
+ shuffleServerInfo = rssWorker.get(partitionId);
+ ShuffleServerInfo server2 = new ShuffleServerInfo("172.19.193.153", 19999);
+ assertEquals(ImmutableSet.of(server, server2), shuffleServerInfo);
+
+ partitionId = 18;
+ shuffleServerInfo = rssWorker.get(partitionId);
+ assertNull(shuffleServerInfo);
+
+ Integer[] expectPartitionArr = new Integer[]{0, 1, 2, 3, 4, 5};
+ assertTrue(Arrays.equals(expectPartitionArr, rssWorker.keySet().toArray(new Integer[0])));
+ }
}