You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ro...@apache.org on 2023/05/19 08:23:17 UTC

[incubator-uniffle] branch master updated: [#872] feat(tez): Add the common and utils class (#890)

This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 19f95668 [#872] feat(tez): Add the common and utils class  (#890)
19f95668 is described below

commit 19f9566817cd8b9e7c110a9080328a176206a49a
Author: Qing <11...@qq.com>
AuthorDate: Fri May 19 16:23:11 2023 +0800

    [#872] feat(tez): Add the common and utils class  (#890)
    
    ### What changes were proposed in this pull request?
    
    Tez Shuffle Read supporting Remote Shuffle related common and utils
    
    
    ### Why are the changes needed?
    
    Fix:  https://github.com/apache/incubator-uniffle/issues/872
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    UT.
---
 client-tez/pom.xml                                 |  98 ++---
 .../java/org/apache/tez/common/RssTezConfig.java   | 172 +++++++++
 .../java/org/apache/tez/common/RssTezUtils.java    | 404 +++++++++++++++++++++
 .../java/org/apache/tez/common/TezIdHelper.java    |  29 ++
 .../org/apache/tez/common/RssTezUtilsTest.java     | 160 ++++++++
 .../org/apache/tez/common/TezIdHelperTest.java     |  33 ++
 pom.xml                                            |  62 +++-
 7 files changed, 909 insertions(+), 49 deletions(-)

diff --git a/client-tez/pom.xml b/client-tez/pom.xml
index fcc8931b..b3a19054 100644
--- a/client-tez/pom.xml
+++ b/client-tez/pom.xml
@@ -15,6 +15,7 @@
   ~ See the License for the specific language governing permissions and
   ~ limitations under the License.
   -->
+
 <project xmlns="http://maven.apache.org/POM/4.0.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
@@ -33,52 +34,65 @@
     <name>Apache Uniffle Client (Tez)</name>
 
     <dependencies>
-        <dependency>
-            <groupId>org.apache.uniffle</groupId>
-            <artifactId>rss-client</artifactId>
-        </dependency>
+      <dependency>
+        <groupId>org.apache.tez</groupId>
+        <artifactId>tez-common</artifactId>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.tez</groupId>
+        <artifactId>tez-runtime-library</artifactId>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.tez</groupId>
+        <artifactId>tez-runtime-internals</artifactId>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.tez</groupId>
+        <artifactId>tez-dag</artifactId>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.tez</groupId>
+        <artifactId>tez-api</artifactId>
+        <exclusions>
+          <exclusion>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-auth</artifactId>
+          </exclusion>
+        </exclusions>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.uniffle</groupId>
+        <artifactId>rss-client</artifactId>
+      </dependency>
 
-        <dependency>
-            <groupId>org.apache.commons</groupId>
-            <artifactId>commons-lang3</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>net.jpountz.lz4</groupId>
-            <artifactId>lz4</artifactId>
-        </dependency>
+      <dependency>
+        <groupId>org.apache.commons</groupId>
+        <artifactId>commons-lang3</artifactId>
+      </dependency>
+      <dependency>
+        <groupId>net.jpountz.lz4</groupId>
+        <artifactId>lz4</artifactId>
+      </dependency>
+      <dependency>
+        <groupId>com.google.protobuf</groupId>
+        <artifactId>protobuf-java</artifactId>
+      </dependency>
 
-        <dependency>
-            <groupId>com.google.guava</groupId>
-            <artifactId>guava</artifactId>
-            <version>${guava.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>io.grpc</groupId>
-            <artifactId>grpc-netty-shaded</artifactId>
-            <version>${grpc.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>io.netty</groupId>
-            <artifactId>netty-all</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>io.grpc</groupId>
-            <artifactId>grpc-protobuf</artifactId>
-        </dependency>
-        <dependency>
+      <dependency>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-common</artifactId>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-mapreduce-client-core</artifactId>
+        <exclusions>
+          <exclusion>
             <groupId>com.google.protobuf</groupId>
             <artifactId>protobuf-java</artifactId>
-            <version>${protobuf.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.mockito</groupId>
-            <artifactId>mockito-core</artifactId>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>com.github.luben</groupId>
-            <artifactId>zstd-jni</artifactId>
-        </dependency>
+          </exclusion>
+        </exclusions>
+      </dependency>
     </dependencies>
 
     <build>
diff --git a/client-tez/src/main/java/org/apache/tez/common/RssTezConfig.java b/client-tez/src/main/java/org/apache/tez/common/RssTezConfig.java
new file mode 100644
index 00000000..f544773f
--- /dev/null
+++ b/client-tez/src/main/java/org/apache/tez/common/RssTezConfig.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.common;
+
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.uniffle.client.util.RssClientConfig;
+import org.apache.uniffle.common.config.RssConf;
+
+public class RssTezConfig {
+
+  public static final String TEZ_RSS_CONFIG_PREFIX = "tez.";
+  public static final String RSS_CLIENT_HEARTBEAT_THREAD_NUM =
+          TEZ_RSS_CONFIG_PREFIX + "rss.client.heartBeat.threadNum";
+  public static final int RSS_CLIENT_HEARTBEAT_THREAD_NUM_DEFAULT_VALUE = 4;
+  public static final String RSS_CLIENT_TYPE = TEZ_RSS_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_TYPE;
+  public static final String RSS_CLIENT_TYPE_DEFAULT_VALUE = RssClientConfig.RSS_CLIENT_TYPE_DEFAULT_VALUE;
+  public static final String RSS_CLIENT_RETRY_MAX = TEZ_RSS_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_RETRY_MAX;
+  public static final int RSS_CLIENT_RETRY_MAX_DEFAULT_VALUE = RssClientConfig.RSS_CLIENT_RETRY_MAX_DEFAULT_VALUE;
+  public static final String RSS_CLIENT_RETRY_INTERVAL_MAX =
+          TEZ_RSS_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_RETRY_INTERVAL_MAX;
+  public static final long RSS_CLIENT_RETRY_INTERVAL_MAX_DEFAULT_VALUE =
+          RssClientConfig.RSS_CLIENT_RETRY_INTERVAL_MAX_DEFAULT_VALUE;
+  public static final String RSS_COORDINATOR_QUORUM =
+          TEZ_RSS_CONFIG_PREFIX + RssClientConfig.RSS_COORDINATOR_QUORUM;
+  public static final String RSS_DATA_REPLICA = TEZ_RSS_CONFIG_PREFIX + RssClientConfig.RSS_DATA_REPLICA;
+  public static final int RSS_DATA_REPLICA_DEFAULT_VALUE = RssClientConfig.RSS_DATA_REPLICA_DEFAULT_VALUE;
+  public static final String RSS_DATA_REPLICA_WRITE =
+          TEZ_RSS_CONFIG_PREFIX + RssClientConfig.RSS_DATA_REPLICA_WRITE;
+  public static final int RSS_DATA_REPLICA_WRITE_DEFAULT_VALUE =
+          RssClientConfig.RSS_DATA_REPLICA_WRITE_DEFAULT_VALUE;
+  public static final String RSS_DATA_REPLICA_READ =
+          TEZ_RSS_CONFIG_PREFIX + RssClientConfig.RSS_DATA_REPLICA_READ;
+  public static final int RSS_DATA_REPLICA_READ_DEFAULT_VALUE =
+          RssClientConfig.RSS_DATA_REPLICA_READ_DEFAULT_VALUE;
+  public static final String RSS_DATA_REPLICA_SKIP_ENABLED =
+          TEZ_RSS_CONFIG_PREFIX + RssClientConfig.RSS_DATA_REPLICA_SKIP_ENABLED;
+  public static final String RSS_DATA_TRANSFER_POOL_SIZE =
+          TEZ_RSS_CONFIG_PREFIX + RssClientConfig.RSS_DATA_TRANSFER_POOL_SIZE;
+  public static final int RSS_DATA_TRANSFER_POOL_SIZE_DEFAULT_VALUE =
+          RssClientConfig.RSS_DATA_TRANFER_POOL_SIZE_DEFAULT_VALUE;
+  public static final String RSS_DATA_COMMIT_POOL_SIZE =
+          TEZ_RSS_CONFIG_PREFIX + RssClientConfig.RSS_DATA_COMMIT_POOL_SIZE;
+  public static final int RSS_DATA_COMMIT_POOL_SIZE_DEFAULT_VALUE =
+          RssClientConfig.RSS_DATA_COMMIT_POOL_SIZE_DEFAULT_VALUE;
+
+  public static final boolean RSS_DATA_REPLICA_SKIP_ENABLED_DEFAULT_VALUE =
+          RssClientConfig.RSS_DATA_REPLICA_SKIP_ENABLED_DEFAULT_VALUE;
+  public static final String RSS_HEARTBEAT_INTERVAL =
+          TEZ_RSS_CONFIG_PREFIX + RssClientConfig.RSS_HEARTBEAT_INTERVAL;
+  public static final long RSS_HEARTBEAT_INTERVAL_DEFAULT_VALUE =
+          RssClientConfig.RSS_HEARTBEAT_INTERVAL_DEFAULT_VALUE;
+  public static final String RSS_HEARTBEAT_TIMEOUT =
+          TEZ_RSS_CONFIG_PREFIX + RssClientConfig.RSS_HEARTBEAT_TIMEOUT;
+  public static final long RSS_CLIENT_SEND_CHECK_TIMEOUT_MS_DEFAULT_VALUE =
+          RssClientConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS_DEFAULT_VALUE;
+
+  // output:
+  public static final String RSS_RUNTIME_IO_SORT_MB = TEZ_RSS_CONFIG_PREFIX + "rss.runtime.io.sort.mb";
+  public static final int RSS_DEFAULT_RUNTIME_IO_SORT_MB = 100;
+  public static final String RSS_CLIENT_SORT_MEMORY_USE_THRESHOLD = TEZ_RSS_CONFIG_PREFIX
+          + "rss.client.sort.memory.use.threshold";
+  public static final double RSS_CLIENT_DEFAULT_SORT_MEMORY_USE_THRESHOLD = 0.9f;
+  public static final String RSS_CLIENT_MAX_BUFFER_SIZE = TEZ_RSS_CONFIG_PREFIX + "rss.client.max.buffer.size";
+  public static final long RSS_CLIENT_DEFAULT_MAX_BUFFER_SIZE = 3 * 1024;
+  public static final String RSS_WRITER_BUFFER_SIZE = TEZ_RSS_CONFIG_PREFIX + "rss.writer.buffer.size";
+  public static final long RSS_DEFAULT_WRITER_BUFFER_SIZE = 1024 * 1024 * 14;
+  public static final String RSS_CLIENT_MEMORY_THRESHOLD = TEZ_RSS_CONFIG_PREFIX + "rss.client.memory.threshold";
+  public static final double RSS_CLIENT_DEFAULT_MEMORY_THRESHOLD = 0.8f;
+  public static final String RSS_CLIENT_SEND_THRESHOLD = TEZ_RSS_CONFIG_PREFIX + "rss.client.send.threshold";
+  public static final double RSS_CLIENT_DEFAULT_SEND_THRESHOLD = 0.2f;
+  public static final String RSS_CLIENT_BATCH_TRIGGER_NUM = TEZ_RSS_CONFIG_PREFIX + "rss.client.batch.trigger.num";
+  public static final int RSS_CLIENT_DEFAULT_BATCH_TRIGGER_NUM = 50;
+  public static final String RSS_DEFAULT_STORAGE_TYPE = "MEMORY";
+  public static final String RSS_CLIENT_SEND_CHECK_INTERVAL_MS = TEZ_RSS_CONFIG_PREFIX
+          + "rss.client.send.check.interval.ms";
+  public static final long RSS_CLIENT_DEFAULT_SEND_CHECK_INTERVAL_MS = 500L;
+  public static final String RSS_CLIENT_SEND_CHECK_TIMEOUT_MS = TEZ_RSS_CONFIG_PREFIX
+          + "rss.client.send.check.timeout.ms";
+  public static final long RSS_CLIENT_DEFAULT_SEND_CHECK_TIMEOUT_MS = 60 * 1000 * 10L;
+  public static final String RSS_CLIENT_BITMAP_NUM = TEZ_RSS_CONFIG_PREFIX + "rss.client.bitmap.num";
+  public static final int RSS_CLIENT_DEFAULT_BITMAP_NUM = 1;
+  public static final String HIVE_TEZ_LOG_LEVEL = "hive.tez.log.level";
+  public static final String DEFAULT_HIVE_TEZ_LOG_LEVEL = "INFO";
+  public static final String DEBUG_HIVE_TEZ_LOG_LEVEL = "debug";
+
+  public static final String RSS_STORAGE_TYPE = TEZ_RSS_CONFIG_PREFIX + RssClientConfig.RSS_STORAGE_TYPE;
+
+  public static final String RSS_DYNAMIC_CLIENT_CONF_ENABLED =
+          TEZ_RSS_CONFIG_PREFIX + RssClientConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED;
+  public static final boolean RSS_DYNAMIC_CLIENT_CONF_ENABLED_DEFAULT_VALUE =
+          RssClientConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED_DEFAULT_VALUE;
+
+  public static final String RSS_CLIENT_ASSIGNMENT_TAGS =
+          TEZ_RSS_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_ASSIGNMENT_TAGS;
+
+  public static final String RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER =
+          RssClientConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER;
+  public static final int RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER_DEFAULT_VALUE =
+          RssClientConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER_DEFAULT_VALUE;
+
+  public static final String RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL =
+          TEZ_RSS_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL;
+  public static final long RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL_DEFAULT_VALUE =
+          RssClientConfig.RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL_DEFAULT_VALUE;
+  public static final String RSS_CLIENT_ASSIGNMENT_RETRY_TIMES =
+          TEZ_RSS_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_ASSIGNMENT_RETRY_TIMES;
+  public static final int RSS_CLIENT_ASSIGNMENT_RETRY_TIMES_DEFAULT_VALUE =
+          RssClientConfig.RSS_CLIENT_ASSIGNMENT_RETRY_TIMES_DEFAULT_VALUE;
+
+  public static final String RSS_ESTIMATE_SERVER_ASSIGNMENT_ENABLED =
+          TEZ_RSS_CONFIG_PREFIX + RssClientConfig.RSS_ESTIMATE_SERVER_ASSIGNMENT_ENABLED;
+  public static final boolean RSS_ESTIMATE_SERVER_ASSIGNMENT_ENABLED_DEFAULT_VALUE =
+          RssClientConfig.RSS_ESTIMATE_SERVER_ASSIGNMENT_ENABLED_DEFAULT_VALUE;
+
+  public static final String RSS_ESTIMATE_TASK_CONCURRENCY_DYNAMIC_FACTOR =
+          TEZ_RSS_CONFIG_PREFIX + RssClientConfig.RSS_ESTIMATE_TASK_CONCURRENCY_DYNAMIC_FACTOR;
+
+  public static final double RSS_ESTIMATE_TASK_CONCURRENCY_DYNAMIC_FACTOR_DEFAULT_VALUE
+          = RssClientConfig.RSS_ESTIMATE_TASK_CONCURRENCY_DYNAMIC_FACTOR_DEFAULT_VALUE;
+
+  public static final String RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER =
+          TEZ_RSS_CONFIG_PREFIX + RssClientConfig.RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER;
+  public static final int RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER_DEFAULT_VALUE =
+          RssClientConfig.RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER_DEFAULT_VALUE;
+
+  public static final String RSS_CONF_FILE = "rss_conf.xml";
+
+  public static final String RSS_REMOTE_STORAGE_PATH =
+      TEZ_RSS_CONFIG_PREFIX + RssClientConfig.RSS_REMOTE_STORAGE_PATH;
+
+  // Whether enable test mode for the MR Client
+  public static final String RSS_TEST_MODE_ENABLE = TEZ_RSS_CONFIG_PREFIX + RssClientConfig.RSS_TEST_MODE_ENABLE;
+
+  public static final String RSS_AM_SHUFFLE_MANAGER_ADDRESS = TEZ_RSS_CONFIG_PREFIX + "rss.am.shuffle.manager.address";
+  public static final String RSS_AM_SHUFFLE_MANAGER_PORT = TEZ_RSS_CONFIG_PREFIX + "rss.am.shuffle.manager.port";
+  public static final String RSS_AM_SHUFFLE_MANAGER_DEBUG = TEZ_RSS_CONFIG_PREFIX + "rss.am.shuffle.manager.debug";
+  public static final String RSS_AM_SLOW_START_ENABLE = TEZ_RSS_CONFIG_PREFIX + "rss.am.slow.start.enable";
+  public static final Boolean RSS_AM_SLOW_START_ENABLE_DEFAULT = false;
+
+  public static final String RSS_REDUCE_INITIAL_MEMORY = TEZ_RSS_CONFIG_PREFIX + "rss.reduce.initial.memory";
+
+  public static RssConf toRssConf(Configuration jobConf) {
+    RssConf rssConf = new RssConf();
+    for (Map.Entry<String, String> entry : jobConf) {
+      String key = entry.getKey();
+      if (!key.startsWith(TEZ_RSS_CONFIG_PREFIX)) {
+        continue;
+      }
+      key = key.substring(TEZ_RSS_CONFIG_PREFIX.length());
+      rssConf.setString(key, entry.getValue());
+    }
+    return rssConf;
+  }
+}
diff --git a/client-tez/src/main/java/org/apache/tez/common/RssTezUtils.java b/client-tez/src/main/java/org/apache/tez/common/RssTezUtils.java
new file mode 100644
index 00000000..694147da
--- /dev/null
+++ b/client-tez/src/main/java/org/apache/tez/common/RssTezUtils.java
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.common;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.api.InputContext;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+import org.roaringbitmap.longlong.Roaring64NavigableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.client.api.ShuffleWriteClient;
+import org.apache.uniffle.client.factory.ShuffleClientFactory;
+import org.apache.uniffle.common.ShuffleServerInfo;
+import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.util.Constants;
+
+public class RssTezUtils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RssTezUtils.class);
+
+  private static final int MAX_ATTEMPT_LENGTH = 6;
+  private static final long MAX_ATTEMPT_ID = (1 << MAX_ATTEMPT_LENGTH) - 1;
+
+  public static final String HOST_NAME = "hostname";
+
+  public static final String PLUS_DELIMITER = "+";
+  public static final String UNDERLINE_DELIMITER = "_";
+  public static final String COLON_DELIMITER = ":";
+  public static final String COMMA_DELIMITER = ",";
+
+  // constant to compute shuffle id
+  private static final int VERTEX_ID_MAPPING_MAX_ID = 500;
+  private static final String VERTEX_ID_MAPPING_MAP = "Map";
+  private static final String VERTEX_ID_MAPPING_REDUCER = "Reducer";
+  private static final int VERTEX_ID_MAPPING_MAGIC = 600;
+  private static final int SHUFFLE_ID_MAGIC = 1000;
+
+
+
+  private RssTezUtils() {
+  }
+
+  public static ShuffleWriteClient createShuffleClient(Configuration conf) {
+    int heartBeatThreadNum = conf.getInt(RssTezConfig.RSS_CLIENT_HEARTBEAT_THREAD_NUM,
+        RssTezConfig.RSS_CLIENT_HEARTBEAT_THREAD_NUM_DEFAULT_VALUE);
+    int retryMax = conf.getInt(RssTezConfig.RSS_CLIENT_RETRY_MAX,
+        RssTezConfig.RSS_CLIENT_RETRY_MAX_DEFAULT_VALUE);
+    long retryIntervalMax = conf.getLong(RssTezConfig.RSS_CLIENT_RETRY_INTERVAL_MAX,
+        RssTezConfig.RSS_CLIENT_RETRY_INTERVAL_MAX_DEFAULT_VALUE);
+    String clientType = conf.get(RssTezConfig.RSS_CLIENT_TYPE,
+        RssTezConfig.RSS_CLIENT_TYPE_DEFAULT_VALUE);
+    int replicaWrite = conf.getInt(RssTezConfig.RSS_DATA_REPLICA_WRITE,
+        RssTezConfig.RSS_DATA_REPLICA_WRITE_DEFAULT_VALUE);
+    int replicaRead = conf.getInt(RssTezConfig.RSS_DATA_REPLICA_READ,
+        RssTezConfig.RSS_DATA_REPLICA_READ_DEFAULT_VALUE);
+    int replica = conf.getInt(RssTezConfig.RSS_DATA_REPLICA,
+        RssTezConfig.RSS_DATA_REPLICA_DEFAULT_VALUE);
+    boolean replicaSkipEnabled = conf.getBoolean(RssTezConfig.RSS_DATA_REPLICA_SKIP_ENABLED,
+        RssTezConfig.RSS_DATA_REPLICA_SKIP_ENABLED_DEFAULT_VALUE);
+    int dataTransferPoolSize = conf.getInt(RssTezConfig.RSS_DATA_TRANSFER_POOL_SIZE,
+        RssTezConfig.RSS_DATA_TRANSFER_POOL_SIZE_DEFAULT_VALUE);
+    int dataCommitPoolSize = conf.getInt(RssTezConfig.RSS_DATA_COMMIT_POOL_SIZE,
+        RssTezConfig.RSS_DATA_COMMIT_POOL_SIZE_DEFAULT_VALUE);
+    ShuffleWriteClient client = ShuffleClientFactory
+        .getInstance()
+        .createShuffleWriteClient(clientType, retryMax, retryIntervalMax,
+            heartBeatThreadNum, replica, replicaWrite, replicaRead, replicaSkipEnabled,
+            dataTransferPoolSize, dataCommitPoolSize);
+    return client;
+  }
+
+  public static long getInitialMemoryRequirement(Configuration conf, long maxAvailableTaskMemory) {
+    long initialMemRequestMb = conf.getLong(RssTezConfig.RSS_RUNTIME_IO_SORT_MB,
+        RssTezConfig.RSS_DEFAULT_RUNTIME_IO_SORT_MB);
+    LOG.info("InitialMemRequestMb is {}", initialMemRequestMb);
+    LOG.info("MaxAvailableTaskMemory is {}", maxAvailableTaskMemory);
+    long reqBytes = initialMemRequestMb << 20;
+    Preconditions.checkArgument(initialMemRequestMb > 0 && reqBytes < maxAvailableTaskMemory,
+            RssTezConfig.RSS_RUNTIME_IO_SORT_MB + initialMemRequestMb
+                    + " should be " + "larger than 0 and should be less than the available task memory (MB):"
+                + (maxAvailableTaskMemory >> 20));
+    LOG.info("Requested BufferSize (" + TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_OUTPUT_BUFFER_SIZE_MB
+            + ") : " + initialMemRequestMb);
+    return reqBytes;
+  }
+
+  public static String uniformPartitionHostInfo(Map<Integer, List<ShuffleServerInfo>> map) {
+    List<String> res = new ArrayList<>();
+    String tmp;
+    Set<Integer> pidSet = map.keySet();
+    for (Integer pid : pidSet) {
+      for (ShuffleServerInfo shuffleServerInfo : map.get(pid)) {
+        tmp = pid + UNDERLINE_DELIMITER + shuffleServerInfo.getHost() + COLON_DELIMITER + shuffleServerInfo.getNettyPort();
+        res.add(tmp);
+      }
+    }
+    return org.apache.commons.lang.StringUtils.join(res, COMMA_DELIMITER);
+  }
+
+  public static Map<String, List<String>> uniformServerToPartitions(String partitionToServers) {
+    Map<String, List<String>> serverToPartitions = new HashMap<>();
+    List<String> list;
+
+    String[] pidWithWorkerInfos = partitionToServers.split(COMMA_DELIMITER);
+    for (String pidWithWorkerInfo : pidWithWorkerInfos) {
+      String[] pidUnderLineWorkerInfo = pidWithWorkerInfo.split(UNDERLINE_DELIMITER);
+      if (serverToPartitions.containsKey(pidUnderLineWorkerInfo[1])) {
+        list = serverToPartitions.get(pidUnderLineWorkerInfo[1]);
+        list.add(pidUnderLineWorkerInfo[0]);
+      } else {
+        list = new ArrayList<>();
+        list.add(pidUnderLineWorkerInfo[0]);
+        serverToPartitions.put(pidUnderLineWorkerInfo[1], list);
+      }
+    }
+
+    return serverToPartitions;
+  }
+
+  public static String uniformServerToPartitions(Map<String, List<String>> map) {
+    List<String> res = new ArrayList<>();
+    Set<String> keySet = map.keySet();
+    for (String s : keySet) {
+      String join = org.apache.commons.lang.StringUtils.join(map.get(s), UNDERLINE_DELIMITER);
+      res.add(s + PLUS_DELIMITER + join);
+    }
+
+    return org.apache.commons.lang.StringUtils.join(res,COMMA_DELIMITER);
+  }
+
+  public static ApplicationAttemptId getApplicationAttemptId() {
+    String containerIdStr = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name());
+    ContainerId containerId = ContainerId.fromString(containerIdStr);
+    return containerId.getApplicationAttemptId();
+  }
+
+  public static String uniqueIdentifierToAttemptId(String uniqueIdentifier) {
+    if (uniqueIdentifier == null) {
+      throw new RuntimeException("uniqueIdentifier should not be null");
+    }
+    String[] ids = uniqueIdentifier.split("_");
+    return StringUtils.join(ids, "_", 0, 7);
+  }
+
+  public static long getBlockId(long partitionId, long taskAttemptId, int nextSeqNo) {
+    long attemptId = taskAttemptId >> (Constants.PARTITION_ID_MAX_LENGTH + Constants.TASK_ATTEMPT_ID_MAX_LENGTH);
+    if (attemptId < 0 || attemptId > MAX_ATTEMPT_ID) {
+      throw new RuntimeException("Can't support attemptId [" + attemptId
+          + "], the max value should be " + MAX_ATTEMPT_ID);
+    }
+    long atomicInt = (nextSeqNo << MAX_ATTEMPT_LENGTH) + attemptId;
+    if (atomicInt < 0 || atomicInt > Constants.MAX_SEQUENCE_NO) {
+      throw new RuntimeException("Can't support sequence [" + atomicInt
+          + "], the max value should be " + Constants.MAX_SEQUENCE_NO);
+    }
+    if (partitionId < 0 || partitionId > Constants.MAX_PARTITION_ID) {
+      throw new RuntimeException("Can't support partitionId["
+          + partitionId + "], the max value should be " + Constants.MAX_PARTITION_ID);
+    }
+    long taskId = taskAttemptId - (attemptId
+        << (Constants.PARTITION_ID_MAX_LENGTH + Constants.TASK_ATTEMPT_ID_MAX_LENGTH));
+    if (taskId < 0 ||  taskId > Constants.MAX_TASK_ATTEMPT_ID) {
+      throw new RuntimeException("Can't support taskId["
+          + taskId + "], the max value should be " + Constants.MAX_TASK_ATTEMPT_ID);
+    }
+    return (atomicInt << (Constants.PARTITION_ID_MAX_LENGTH + Constants.TASK_ATTEMPT_ID_MAX_LENGTH))
+        + (partitionId << Constants.TASK_ATTEMPT_ID_MAX_LENGTH) + taskId;
+  }
+
+  public static long getTaskAttemptId(long blockId) {
+    long mapId = blockId & Constants.MAX_TASK_ATTEMPT_ID;
+    long attemptId = (blockId >> (Constants.TASK_ATTEMPT_ID_MAX_LENGTH + Constants.PARTITION_ID_MAX_LENGTH))
+        & MAX_ATTEMPT_ID;
+    return (attemptId << (Constants.TASK_ATTEMPT_ID_MAX_LENGTH + Constants.PARTITION_ID_MAX_LENGTH)) + mapId;
+  }
+
+  public static int estimateTaskConcurrency(Configuration jobConf, int mapNum, int reduceNum) {
+    double dynamicFactor = jobConf.getDouble(RssTezConfig.RSS_ESTIMATE_TASK_CONCURRENCY_DYNAMIC_FACTOR,
+        RssTezConfig.RSS_ESTIMATE_TASK_CONCURRENCY_DYNAMIC_FACTOR_DEFAULT_VALUE);
+    double slowStart = jobConf.getDouble(Constants.MR_SLOW_START, Constants.MR_SLOW_START_DEFAULT_VALUE);
+    int mapLimit = jobConf.getInt(Constants.MR_MAP_LIMIT, Constants.MR_MAP_LIMIT_DEFAULT_VALUE);
+    int reduceLimit = jobConf.getInt(Constants.MR_REDUCE_LIMIT, Constants.MR_REDUCE_LIMIT_DEFAULT_VALUE);
+
+    int estimateMapNum = mapLimit > 0 ? Math.min(mapNum, mapLimit) : mapNum;
+    int estimateReduceNum = reduceLimit > 0 ? Math.min(reduceNum, reduceLimit) : reduceNum;
+    if (slowStart == 1) {
+      return (int) (Math.max(estimateMapNum, estimateReduceNum) * dynamicFactor);
+    } else {
+      return (int) (((1 - slowStart) * estimateMapNum + estimateReduceNum) * dynamicFactor);
+    }
+  }
+
+  public static int getRequiredShuffleServerNumber(Configuration jobConf, int mapNum, int reduceNum) {
+    int requiredShuffleServerNumber = jobConf.getInt(
+        RssTezConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER,
+        RssTezConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER_DEFAULT_VALUE
+    );
+    boolean enabledEstimateServer = jobConf.getBoolean(
+        RssTezConfig.RSS_ESTIMATE_SERVER_ASSIGNMENT_ENABLED,
+        RssTezConfig.RSS_ESTIMATE_SERVER_ASSIGNMENT_ENABLED_DEFAULT_VALUE
+    );
+    if (!enabledEstimateServer || requiredShuffleServerNumber > 0) {
+      return requiredShuffleServerNumber;
+    }
+    int taskConcurrency = estimateTaskConcurrency(jobConf, mapNum, reduceNum);
+    int taskConcurrencyPerServer = jobConf.getInt(RssTezConfig.RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER,
+        RssTezConfig.RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER_DEFAULT_VALUE);
+    return (int) Math.ceil(taskConcurrency * 1.0 / taskConcurrencyPerServer);
+  }
+
+  // compute shuffle id using InputContext
+  public static int computeShuffleId(InputContext inputContext) {
+    int dagIdentifier = inputContext.getDagIdentifier();
+    String sourceVertexName = inputContext.getSourceVertexName();
+    String taskVertexName  = inputContext.getTaskVertexName();
+    return RssTezUtils.computeShuffleId(dagIdentifier, sourceVertexName, taskVertexName);
+  }
+
+  /**
+   *
+   * @param tezDagID Get from tez InputContext, represent dag id.
+   * @param upVertexName Up stream vertex name of the task, like "Map 1" or "Reducer 2".
+   * @param downVertexName The vertex name of task, like "Map 1" or "Reducer 2".
+   * @return The shuffle id. First convert upVertexName of String type to int, by invoke mapVertexId() method,
+   * Then convert downVertexName of String type to int, by invoke mapVertexId() method.
+   * Finally compute shuffle id by pass tezDagID, upVertexId, downVertexId and invoke computeShuffleId() method.
+   * By map vertex name of String type to int type, we can compute shuffle id.
+   */
+  public static int computeShuffleId(int tezDagID, String upVertexName, String downVertexName) {
+    int upVertexId = mapVertexId(upVertexName);
+    int downVertexId = mapVertexId(downVertexName);
+    int shuffleId = computeShuffleId(tezDagID, upVertexId, downVertexId);
+    LOG.info("Compute Shuffle Id, upVertexName:{}, id:{}, downVertexName:{}, id:{}, shuffleId:{}",
+        upVertexName, upVertexId, downVertexName, downVertexId, shuffleId);
+    return shuffleId;
+  }
+
+  private static int computeShuffleId(int tezDagID, int upTezVertexID, int downTezVertexID) {
+    return tezDagID * (SHUFFLE_ID_MAGIC * SHUFFLE_ID_MAGIC)  + upTezVertexID * SHUFFLE_ID_MAGIC + downTezVertexID;
+  }
+
+  /**
+   *
+   * @param vertexName: vertex name, like "Map 1" or "Reducer 2"
+   * @return Map vertex name of String type to int type.
+   * Split vertex name, get vertex type and vertex id number, if it's map vertex, then return vertex id number,
+   * else if it's reducer vertex, then add VERTEX_ID_MAPPING_MAGIC and vertex id number finally return it.
+   */
+  private static int mapVertexId(String vertexName) {
+    String[] ss = vertexName.split("\\s+");
+    if (Integer.parseInt(ss[1]) > VERTEX_ID_MAPPING_MAX_ID) {
+      throw new RssException("Too large vertex name to id mapping, vertexName:" + vertexName);
+    }
+    if (VERTEX_ID_MAPPING_MAP.equals(ss[0])) {
+      return Integer.parseInt(ss[1]);
+    } else if (VERTEX_ID_MAPPING_REDUCER.equals(ss[0])) {
+      return VERTEX_ID_MAPPING_MAGIC + Integer.parseInt(ss[1]);
+    } else {
+      throw new RssException("Wrong vertex name to id mapping, vertexName:" + vertexName);
+    }
+  }
+
+  public static long convertTaskAttemptIdToLong(TezTaskAttemptID taskAttemptID, int appAttemptId) {
+    long lowBytes = taskAttemptID.getTaskID().getId();
+    if (lowBytes > Constants.MAX_TASK_ATTEMPT_ID) {
+      throw new RssException("TaskAttempt " + taskAttemptID + " low bytes " + lowBytes + " exceed");
+    }
+    if (appAttemptId < 1) {
+      throw new RssException("appAttemptId  " + appAttemptId + " is wrong");
+    }
+    long highBytes = (long)taskAttemptID.getId() - (appAttemptId - 1) * 1000;
+    if (highBytes > MAX_ATTEMPT_ID || highBytes < 0) {
+      throw new RssException("TaskAttempt " + taskAttemptID + " high bytes " + highBytes
+          + " exceed, appAttemptId:" + appAttemptId);
+    }
+
+    long id = (highBytes << (Constants.TASK_ATTEMPT_ID_MAX_LENGTH + Constants.PARTITION_ID_MAX_LENGTH)) + lowBytes;
+    LOG.info("ConvertTaskAttemptIdToLong id is {}", id);
+    LOG.info("LowBytes id is {}", lowBytes);
+    LOG.info("HighBytes id is {}", highBytes);
+    return id;
+  }
+
+  public static Roaring64NavigableMap fetchAllRssTaskIds(
+          Set<InputAttemptIdentifier> successMapTaskAttempts,
+          Integer totalMapsCount,
+          Integer appAttemptId) {
+    Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf();
+    Roaring64NavigableMap mapIndexBitmap = Roaring64NavigableMap.bitmapOf();
+    String errMsg = "TaskAttemptIDs are inconsistent with map tasks";
+    LOG.info("FetchAllRssTaskIds successMapTaskAttempts:{}", successMapTaskAttempts);
+    LOG.info("FetchAllRssTaskIds totalMapsCount:{}, appAttemptId:{}", totalMapsCount, appAttemptId);
+
+    for (InputAttemptIdentifier inputAttemptIdentifier: successMapTaskAttempts) {
+      int mapIndex = inputAttemptIdentifier.getInputIdentifier();
+      String pathComponent = inputAttemptIdentifier.getPathComponent();
+      int taskId = RssTezUtils.taskIdStrToTaskId(pathComponent);
+      // There can be multiple successful attempts on same map task.
+      // So we only need to accept one of them.
+      LOG.info("FetchAllRssTaskIds, taskId:{}, is contains:{}", taskId, mapIndexBitmap.contains(taskId));
+      if (!mapIndexBitmap.contains(taskId)) {
+        taskIdBitmap.addLong(taskId);
+
+        LOG.info("FetchAllRssTaskIds, successMapTaskAttempts:{}, totalMapsCount:{}, appAttemptId:{}, mapIndex:{}, "
+                + "totalMapsCount:{}, taskId: {} ",
+            successMapTaskAttempts.size(), totalMapsCount, appAttemptId, mapIndex, totalMapsCount, taskId);
+
+        if (mapIndex < totalMapsCount) {  // up-stream map task index should < total task number(including failed task)
+          mapIndexBitmap.addLong(taskId);
+        } else {
+
+          LOG.error("SuccessMapTaskAttempts:{}, totalMapsCount:{}, appAttemptId:{}, mapIndex:{}, totalMapsCount:{} ",
+              successMapTaskAttempts, totalMapsCount, appAttemptId, mapIndex, totalMapsCount);
+
+          LOG.error(inputAttemptIdentifier + " has overflowed mapIndex");
+          throw new IllegalStateException(errMsg);
+        }
+      } else {
+        LOG.warn(inputAttemptIdentifier + " is redundant on index: " + mapIndex);
+      }
+    }
+    // each map should have only one success attempt
+    if (mapIndexBitmap.getLongCardinality() != taskIdBitmap.getLongCardinality()) {
+      throw new IllegalStateException(errMsg);
+    }
+    return taskIdBitmap;
+  }
+
+  public static int taskIdStrToTaskId(String taskIdStr) {
+    try {
+      int pos1 = taskIdStr.indexOf(UNDERLINE_DELIMITER);
+      int pos2 = taskIdStr.indexOf(UNDERLINE_DELIMITER, pos1 + 1);
+      int pos3 = taskIdStr.indexOf(UNDERLINE_DELIMITER, pos2 + 1);
+      int pos4 = taskIdStr.indexOf(UNDERLINE_DELIMITER, pos3 + 1);
+      int pos5 = taskIdStr.indexOf(UNDERLINE_DELIMITER, pos4 + 1);
+      int pos6 = taskIdStr.indexOf(UNDERLINE_DELIMITER, pos5 + 1);
+      return Integer.parseInt(taskIdStr.substring(pos5 + 1, pos6));
+    } catch (Exception e) {
+      e.printStackTrace();
+      LOG.error("Failed to get VertexId, taskId:{}.",taskIdStr, e);
+      throw e;
+    }
+  }
+
+  // multiHostInfo is like:
+  // 0_172.19.193.152:19999,1_172.19.193.152:19999
+  private static void parseRssWorkerFromHostInfo(Map<Integer, Set<ShuffleServerInfo>> rssWorker, String multiHostInfo) {
+    for (String hostInfo : multiHostInfo.split(",")) {
+      LOG.info("ParseRssWorker, hostInfo:{}", hostInfo);
+      String[] info = hostInfo.split("_|:");
+      int partitionId = Integer.parseInt(info[0]);
+      ShuffleServerInfo serverInfo = new ShuffleServerInfo(info[1], Integer.parseInt(info[2]));
+      rssWorker.computeIfAbsent(partitionId, k -> new HashSet<>());
+      rssWorker.get(partitionId).add(serverInfo);
+      LOG.info("Parse Rss Worker, add partition:{}, serverInfo:{}", partitionId, serverInfo);
+    }
+  }
+
+  // hostnameInfo is like:
+  // Map 1=0_172.19.193.152:19999,0_172.19.193.152:19999;Map 2=0_172.19.193.152:19999,1_17
+  public static void parseRssWorker(Map<Integer, Set<ShuffleServerInfo>> rssWorker, int shuffleId,
+                                    String hostnameInfo) {
+    LOG.info("ParseRssWorker, hostnameInfo:{}", hostnameInfo);
+    for (String toVertex: hostnameInfo.split(";")) {
+      LOG.info("ParseRssWorker, hostnameInffdafdso:{}", toVertex);
+      String[] splits = toVertex.split("=");
+      if (splits.length == 2 && String.valueOf(shuffleId).equals(splits[0])) {
+        String workerStr = toVertex.split("=")[1];
+        LOG.info("ParseRssWorker, workerStr:{}", workerStr);
+        parseRssWorkerFromHostInfo(rssWorker, workerStr);
+      }
+    }
+  }
+}
diff --git a/client-tez/src/main/java/org/apache/tez/common/TezIdHelper.java b/client-tez/src/main/java/org/apache/tez/common/TezIdHelper.java
new file mode 100644
index 00000000..ed661e5d
--- /dev/null
+++ b/client-tez/src/main/java/org/apache/tez/common/TezIdHelper.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.common;
+
+
+import org.apache.uniffle.common.util.IdHelper;
+
+public class TezIdHelper implements IdHelper {
+
+  @Override
+  public long getTaskAttemptId(long blockId) {
+    return RssTezUtils.getTaskAttemptId(blockId);
+  }
+}
diff --git a/client-tez/src/test/java/org/apache/tez/common/RssTezUtilsTest.java b/client-tez/src/test/java/org/apache/tez/common/RssTezUtilsTest.java
new file mode 100644
index 00000000..faa27ab3
--- /dev/null
+++ b/client-tez/src/test/java/org/apache/tez/common/RssTezUtilsTest.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.common;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.util.Constants;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class RssTezUtilsTest {
+
+  private static final String EXPECTED_EXCEPTION_MESSAGE = "Exception should be thrown";
+
+  @Test
+  public void baskAttemptIdTest() {
+    long taskAttemptId = 0x1000ad12;
+    ApplicationId appId = ApplicationId.newInstance(9999, 72);
+    TezDAGID dagId = TezDAGID.getInstance(appId, 1);
+    TezVertexID vId = TezVertexID.getInstance(dagId, 35);
+    TezTaskID taskId = TezTaskID.getInstance(vId, (int)taskAttemptId);
+    TezTaskAttemptID tezTaskAttemptId = TezTaskAttemptID.getInstance(taskId, 3);
+
+    boolean isException = false;
+    try {
+      RssTezUtils.convertTaskAttemptIdToLong(tezTaskAttemptId, 1);
+    } catch (RssException e) {
+      isException = true;
+    }
+    assertTrue(isException);
+
+    taskId = TezTaskID.getInstance(vId, (int)(1 << 21));
+    tezTaskAttemptId = TezTaskAttemptID.getInstance(taskId, 2);
+    isException = false;
+    try {
+      RssTezUtils.convertTaskAttemptIdToLong(tezTaskAttemptId, 1);
+    } catch (RssException e) {
+      isException = true;
+    }
+    assertTrue(isException);
+  }
+
+  @Test
+  public void blockConvertTest() {
+    ApplicationId appId = ApplicationId.newInstance(9999, 72);
+    TezDAGID dagId = TezDAGID.getInstance(appId, 1);
+    TezVertexID vId = TezVertexID.getInstance(dagId, 35);
+    TezTaskID tId = TezTaskID.getInstance(vId, 389);
+    TezTaskAttemptID tezTaskAttemptId = TezTaskAttemptID.getInstance(tId, 2);
+    long taskAttemptId = RssTezUtils.convertTaskAttemptIdToLong(tezTaskAttemptId, 1);
+    long blockId = RssTezUtils.getBlockId(1, taskAttemptId, 0);
+    long newTaskAttemptId = RssTezUtils.getTaskAttemptId(blockId);
+    assertEquals(taskAttemptId, newTaskAttemptId);
+    blockId = RssTezUtils.getBlockId(2, taskAttemptId, 2);
+    newTaskAttemptId = RssTezUtils.getTaskAttemptId(blockId);
+    assertEquals(taskAttemptId, newTaskAttemptId);
+  }
+
+  @Test
+  public void testPartitionIdConvertBlock() {
+    ApplicationId appId = ApplicationId.newInstance(9999, 72);
+    TezDAGID dagId = TezDAGID.getInstance(appId, 1);
+    TezVertexID vId = TezVertexID.getInstance(dagId, 35);
+    TezTaskID tId = TezTaskID.getInstance(vId, 389);
+    TezTaskAttemptID tezTaskAttemptId = TezTaskAttemptID.getInstance(tId, 2);
+    long taskAttemptId = RssTezUtils.convertTaskAttemptIdToLong(tezTaskAttemptId, 1);
+    long mask = (1L << Constants.PARTITION_ID_MAX_LENGTH) - 1;
+    for (int partitionId = 0; partitionId <= 3000; partitionId++) {
+      for (int seqNo = 0; seqNo <= 10; seqNo++) {
+        long blockId = RssTezUtils.getBlockId(Long.valueOf(partitionId), taskAttemptId, seqNo);
+        int newPartitionId = Math.toIntExact((blockId >> Constants.TASK_ATTEMPT_ID_MAX_LENGTH) & mask);
+        assertEquals(partitionId, newPartitionId);
+      }
+    }
+  }
+
+  @Test
+  public void testEstimateTaskConcurrency() {
+    Configuration jobConf = new Configuration();
+    int mapNum = 500;
+    int reduceNum = 20;
+    assertEquals(495, RssTezUtils.estimateTaskConcurrency(jobConf, mapNum, reduceNum));
+
+    jobConf.setDouble(Constants.MR_SLOW_START, 1.0);
+    assertEquals(500, RssTezUtils.estimateTaskConcurrency(jobConf, mapNum, reduceNum));
+    jobConf.setInt(Constants.MR_MAP_LIMIT, 200);
+    jobConf.setInt(Constants.MR_REDUCE_LIMIT, 200);
+    assertEquals(200, RssTezUtils.estimateTaskConcurrency(jobConf, mapNum, reduceNum));
+
+    jobConf.setDouble("mapreduce.rss.estimate.task.concurrency.dynamic.factor", 0.5);
+    assertEquals(200, RssTezUtils.estimateTaskConcurrency(jobConf,mapNum, reduceNum));
+  }
+
+  @Test
+  public void testGetRequiredShuffleServerNumber() {
+    Configuration jobConf = new Configuration();
+    int mapNum = 500;
+    int reduceNum = 20;
+    jobConf.setInt(RssTezConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER, 10);
+    assertEquals(10, RssTezUtils.getRequiredShuffleServerNumber(jobConf, mapNum, reduceNum));
+    jobConf.setBoolean(RssTezConfig.RSS_ESTIMATE_SERVER_ASSIGNMENT_ENABLED, true);
+    assertEquals(10, RssTezUtils.getRequiredShuffleServerNumber(jobConf, mapNum, reduceNum));
+    jobConf.unset(RssTezConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER);
+    assertEquals(7, RssTezUtils.getRequiredShuffleServerNumber(jobConf, mapNum, reduceNum));
+    jobConf.setDouble(Constants.MR_SLOW_START, 1.0);
+    assertEquals(7, RssTezUtils.getRequiredShuffleServerNumber(jobConf, mapNum, reduceNum));
+    jobConf.setInt(Constants.MR_MAP_LIMIT, 200);
+    jobConf.setInt(Constants.MR_REDUCE_LIMIT, 200);
+    assertEquals(3, RssTezUtils.getRequiredShuffleServerNumber(jobConf, mapNum, reduceNum));
+    jobConf.setDouble("mapreduce.rss.estimate.task.concurrency.dynamic.factor", 0.5);
+    assertEquals(3, RssTezUtils.getRequiredShuffleServerNumber(jobConf, mapNum, reduceNum));
+  }
+
+
+  @Test
+  public void testComputeShuffleId() {
+    int dagId = 1;
+    String upVertexName = "Map 1";
+    String downVertexName = "Reducer 2";
+    assertEquals(1001602, RssTezUtils.computeShuffleId(dagId, upVertexName, downVertexName));
+  }
+
+  @Test
+  public void testTaskIdStrToTaskId() {
+    assertEquals(0, RssTezUtils.taskIdStrToTaskId("attempt_1680867852986_0012_1_01_000000_0_10003"));
+  }
+
+  @Test
+  public void attemptTaskIdTest() {
+    String tezTaskAttemptId = "attempt_1677051234358_0091_1_00_000000_0";
+    TezTaskAttemptID originalTezTaskAttemptID = TezTaskAttemptID.fromString(tezTaskAttemptId);
+    String uniqueIdentifier = String.format("%s_%05d", tezTaskAttemptId, 3);
+    String uniqueIdentifierToAttemptId = RssTezUtils.uniqueIdentifierToAttemptId(uniqueIdentifier);
+    assertEquals(tezTaskAttemptId, uniqueIdentifierToAttemptId);
+    TezTaskAttemptID tezTaskAttemptID = TezTaskAttemptID.fromString(uniqueIdentifierToAttemptId);
+    assertEquals(originalTezTaskAttemptID, tezTaskAttemptID);
+  }
+}
diff --git a/client-tez/src/test/java/org/apache/tez/common/TezIdHelperTest.java b/client-tez/src/test/java/org/apache/tez/common/TezIdHelperTest.java
new file mode 100644
index 00000000..60da1e3a
--- /dev/null
+++ b/client-tez/src/test/java/org/apache/tez/common/TezIdHelperTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.common;
+
+import org.junit.jupiter.api.Test;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TezIdHelperTest {
+
+  @Test
+  public void TestTetTaskAttemptId(){
+    TezIdHelper tezIdHelper = new TezIdHelper();
+    assertEquals(0, tezIdHelper.getTaskAttemptId(27262976));
+    assertEquals(1, tezIdHelper.getTaskAttemptId(27262977));
+    assertEquals(0, RssTezUtils.taskIdStrToTaskId("attempt_1680867852986_0012_1_01_000000_0_10003"));
+    assertEquals(tezIdHelper.getTaskAttemptId(27262976), RssTezUtils.taskIdStrToTaskId("attempt_1680867852986_0012_1_01_000000_0_10003"));
+  }
+}
diff --git a/pom.xml b/pom.xml
index 98cf3ed2..582a44e4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1628,19 +1628,67 @@
       <modules>
         <module>client-tez</module>
       </modules>
+      <properties>
+        <tez.version>0.9.1</tez.version>
+      </properties>
       <dependencyManagement>
         <dependencies>
           <dependency>
-            <groupId>org.apache.uniffle</groupId>
-            <artifactId>rss-integration-common-test</artifactId>
-            <version>${project.version}</version>
-            <type>test-jar</type>
-            <scope>test</scope>
+            <groupId>org.apache.tez</groupId>
+            <artifactId>tez-common</artifactId>
+            <version>${tez.version}</version>
+          </dependency>
+          <dependency>
+            <groupId>org.apache.tez</groupId>
+            <artifactId>tez-runtime-library</artifactId>
+            <version>${tez.version}</version>
+          </dependency>
+          <dependency>
+            <groupId>org.apache.tez</groupId>
+            <artifactId>tez-runtime-internals</artifactId>
+            <version>${tez.version}</version>
+          </dependency>
+          <dependency>
+            <groupId>org.apache.tez</groupId>
+            <artifactId>tez-dag</artifactId>
+            <version>${tez.version}</version>
+          </dependency>
+          <dependency>
+            <groupId>org.apache.tez</groupId>
+            <artifactId>tez-api</artifactId>
+            <version>${tez.version}</version>
+            <exclusions>
+              <exclusion>
+                <groupId>org.apache.hadoop</groupId>
+                <artifactId>hadoop-auth</artifactId>
+              </exclusion>
+            </exclusions>
+          </dependency>
+          <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <version>${hadoop.version}</version>
+          </dependency>
+          <dependency>
+            <groupId>com.google.protobuf</groupId>
+            <artifactId>protobuf-java</artifactId>
+            <version>${protobuf.version}</version>
+          </dependency>
+          <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-mapreduce-client-core</artifactId>
+            <version>${hadoop.version}</version>
+            <exclusions>
+              <exclusion>
+                <groupId>com.google.protobuf</groupId>
+                <artifactId>protobuf-java</artifactId>
+              </exclusion>
+            </exclusions>
           </dependency>
         </dependencies>
       </dependencyManagement>
-    </profile>
-    <profile>
+      </profile>
+      <profile>
       <id>kubernetes</id>
       <modules>
         <module>deploy/kubernetes</module>