You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ro...@apache.org on 2023/05/19 08:23:17 UTC
[incubator-uniffle] branch master updated: [#872] feat(tez): Add the common and utils class (#890)
This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 19f95668 [#872] feat(tez): Add the common and utils class (#890)
19f95668 is described below
commit 19f9566817cd8b9e7c110a9080328a176206a49a
Author: Qing <11...@qq.com>
AuthorDate: Fri May 19 16:23:11 2023 +0800
[#872] feat(tez): Add the common and utils class (#890)
### What changes were proposed in this pull request?
Tez Shuffle Read supporting Remote Shuffle related common and utils
### Why are the changes needed?
Fix: https://github.com/apache/incubator-uniffle/issues/872
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
UT.
---
client-tez/pom.xml | 98 ++---
.../java/org/apache/tez/common/RssTezConfig.java | 172 +++++++++
.../java/org/apache/tez/common/RssTezUtils.java | 404 +++++++++++++++++++++
.../java/org/apache/tez/common/TezIdHelper.java | 29 ++
.../org/apache/tez/common/RssTezUtilsTest.java | 160 ++++++++
.../org/apache/tez/common/TezIdHelperTest.java | 33 ++
pom.xml | 62 +++-
7 files changed, 909 insertions(+), 49 deletions(-)
diff --git a/client-tez/pom.xml b/client-tez/pom.xml
index fcc8931b..b3a19054 100644
--- a/client-tez/pom.xml
+++ b/client-tez/pom.xml
@@ -15,6 +15,7 @@
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
+
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
@@ -33,52 +34,65 @@
<name>Apache Uniffle Client (Tez)</name>
<dependencies>
- <dependency>
- <groupId>org.apache.uniffle</groupId>
- <artifactId>rss-client</artifactId>
- </dependency>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-runtime-library</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-runtime-internals</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-dag</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-api</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-auth</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.uniffle</groupId>
+ <artifactId>rss-client</artifactId>
+ </dependency>
- <dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-lang3</artifactId>
- </dependency>
- <dependency>
- <groupId>net.jpountz.lz4</groupId>
- <artifactId>lz4</artifactId>
- </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>net.jpountz.lz4</groupId>
+ <artifactId>lz4</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ </dependency>
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- <version>${guava.version}</version>
- </dependency>
- <dependency>
- <groupId>io.grpc</groupId>
- <artifactId>grpc-netty-shaded</artifactId>
- <version>${grpc.version}</version>
- </dependency>
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-all</artifactId>
- </dependency>
- <dependency>
- <groupId>io.grpc</groupId>
- <artifactId>grpc-protobuf</artifactId>
- </dependency>
- <dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <exclusions>
+ <exclusion>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
- <version>${protobuf.version}</version>
- </dependency>
- <dependency>
- <groupId>org.mockito</groupId>
- <artifactId>mockito-core</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>com.github.luben</groupId>
- <artifactId>zstd-jni</artifactId>
- </dependency>
+ </exclusion>
+ </exclusions>
+ </dependency>
</dependencies>
<build>
diff --git a/client-tez/src/main/java/org/apache/tez/common/RssTezConfig.java b/client-tez/src/main/java/org/apache/tez/common/RssTezConfig.java
new file mode 100644
index 00000000..f544773f
--- /dev/null
+++ b/client-tez/src/main/java/org/apache/tez/common/RssTezConfig.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.common;
+
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.uniffle.client.util.RssClientConfig;
+import org.apache.uniffle.common.config.RssConf;
+
+public class RssTezConfig {
+
+ public static final String TEZ_RSS_CONFIG_PREFIX = "tez.";
+ public static final String RSS_CLIENT_HEARTBEAT_THREAD_NUM =
+ TEZ_RSS_CONFIG_PREFIX + "rss.client.heartBeat.threadNum";
+ public static final int RSS_CLIENT_HEARTBEAT_THREAD_NUM_DEFAULT_VALUE = 4;
+ public static final String RSS_CLIENT_TYPE = TEZ_RSS_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_TYPE;
+ public static final String RSS_CLIENT_TYPE_DEFAULT_VALUE = RssClientConfig.RSS_CLIENT_TYPE_DEFAULT_VALUE;
+ public static final String RSS_CLIENT_RETRY_MAX = TEZ_RSS_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_RETRY_MAX;
+ public static final int RSS_CLIENT_RETRY_MAX_DEFAULT_VALUE = RssClientConfig.RSS_CLIENT_RETRY_MAX_DEFAULT_VALUE;
+ public static final String RSS_CLIENT_RETRY_INTERVAL_MAX =
+ TEZ_RSS_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_RETRY_INTERVAL_MAX;
+ public static final long RSS_CLIENT_RETRY_INTERVAL_MAX_DEFAULT_VALUE =
+ RssClientConfig.RSS_CLIENT_RETRY_INTERVAL_MAX_DEFAULT_VALUE;
+ public static final String RSS_COORDINATOR_QUORUM =
+ TEZ_RSS_CONFIG_PREFIX + RssClientConfig.RSS_COORDINATOR_QUORUM;
+ public static final String RSS_DATA_REPLICA = TEZ_RSS_CONFIG_PREFIX + RssClientConfig.RSS_DATA_REPLICA;
+ public static final int RSS_DATA_REPLICA_DEFAULT_VALUE = RssClientConfig.RSS_DATA_REPLICA_DEFAULT_VALUE;
+ public static final String RSS_DATA_REPLICA_WRITE =
+ TEZ_RSS_CONFIG_PREFIX + RssClientConfig.RSS_DATA_REPLICA_WRITE;
+ public static final int RSS_DATA_REPLICA_WRITE_DEFAULT_VALUE =
+ RssClientConfig.RSS_DATA_REPLICA_WRITE_DEFAULT_VALUE;
+ public static final String RSS_DATA_REPLICA_READ =
+ TEZ_RSS_CONFIG_PREFIX + RssClientConfig.RSS_DATA_REPLICA_READ;
+ public static final int RSS_DATA_REPLICA_READ_DEFAULT_VALUE =
+ RssClientConfig.RSS_DATA_REPLICA_READ_DEFAULT_VALUE;
+ public static final String RSS_DATA_REPLICA_SKIP_ENABLED =
+ TEZ_RSS_CONFIG_PREFIX + RssClientConfig.RSS_DATA_REPLICA_SKIP_ENABLED;
+ public static final String RSS_DATA_TRANSFER_POOL_SIZE =
+ TEZ_RSS_CONFIG_PREFIX + RssClientConfig.RSS_DATA_TRANSFER_POOL_SIZE;
+ public static final int RSS_DATA_TRANSFER_POOL_SIZE_DEFAULT_VALUE =
+ RssClientConfig.RSS_DATA_TRANFER_POOL_SIZE_DEFAULT_VALUE;
+ public static final String RSS_DATA_COMMIT_POOL_SIZE =
+ TEZ_RSS_CONFIG_PREFIX + RssClientConfig.RSS_DATA_COMMIT_POOL_SIZE;
+ public static final int RSS_DATA_COMMIT_POOL_SIZE_DEFAULT_VALUE =
+ RssClientConfig.RSS_DATA_COMMIT_POOL_SIZE_DEFAULT_VALUE;
+
+ public static final boolean RSS_DATA_REPLICA_SKIP_ENABLED_DEFAULT_VALUE =
+ RssClientConfig.RSS_DATA_REPLICA_SKIP_ENABLED_DEFAULT_VALUE;
+ public static final String RSS_HEARTBEAT_INTERVAL =
+ TEZ_RSS_CONFIG_PREFIX + RssClientConfig.RSS_HEARTBEAT_INTERVAL;
+ public static final long RSS_HEARTBEAT_INTERVAL_DEFAULT_VALUE =
+ RssClientConfig.RSS_HEARTBEAT_INTERVAL_DEFAULT_VALUE;
+ public static final String RSS_HEARTBEAT_TIMEOUT =
+ TEZ_RSS_CONFIG_PREFIX + RssClientConfig.RSS_HEARTBEAT_TIMEOUT;
+ public static final long RSS_CLIENT_SEND_CHECK_TIMEOUT_MS_DEFAULT_VALUE =
+ RssClientConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS_DEFAULT_VALUE;
+
+  // Settings read on the output (write) side.
+  public static final String RSS_RUNTIME_IO_SORT_MB = TEZ_RSS_CONFIG_PREFIX + "rss.runtime.io.sort.mb";
+  public static final int RSS_DEFAULT_RUNTIME_IO_SORT_MB = 100;
+  public static final String RSS_CLIENT_SORT_MEMORY_USE_THRESHOLD =
+      TEZ_RSS_CONFIG_PREFIX + "rss.client.sort.memory.use.threshold";
+  // The ratio defaults below are double literals on purpose: a float literal such as 0.9f widened to
+  // double is stored as 0.8999999761581421, not 0.9.
+  public static final double RSS_CLIENT_DEFAULT_SORT_MEMORY_USE_THRESHOLD = 0.9;
+  public static final String RSS_CLIENT_MAX_BUFFER_SIZE = TEZ_RSS_CONFIG_PREFIX + "rss.client.max.buffer.size";
+  public static final long RSS_CLIENT_DEFAULT_MAX_BUFFER_SIZE = 3 * 1024;
+  public static final String RSS_WRITER_BUFFER_SIZE = TEZ_RSS_CONFIG_PREFIX + "rss.writer.buffer.size";
+  // NOTE(review): presumably bytes (14 MiB) - confirm against the consumer of this default.
+  public static final long RSS_DEFAULT_WRITER_BUFFER_SIZE = 1024 * 1024 * 14;
+  public static final String RSS_CLIENT_MEMORY_THRESHOLD = TEZ_RSS_CONFIG_PREFIX + "rss.client.memory.threshold";
+  public static final double RSS_CLIENT_DEFAULT_MEMORY_THRESHOLD = 0.8;
+  public static final String RSS_CLIENT_SEND_THRESHOLD = TEZ_RSS_CONFIG_PREFIX + "rss.client.send.threshold";
+  public static final double RSS_CLIENT_DEFAULT_SEND_THRESHOLD = 0.2;
+  public static final String RSS_CLIENT_BATCH_TRIGGER_NUM = TEZ_RSS_CONFIG_PREFIX + "rss.client.batch.trigger.num";
+  public static final int RSS_CLIENT_DEFAULT_BATCH_TRIGGER_NUM = 50;
+  public static final String RSS_DEFAULT_STORAGE_TYPE = "MEMORY";
+  public static final String RSS_CLIENT_SEND_CHECK_INTERVAL_MS =
+      TEZ_RSS_CONFIG_PREFIX + "rss.client.send.check.interval.ms";
+  public static final long RSS_CLIENT_DEFAULT_SEND_CHECK_INTERVAL_MS = 500L;
+  public static final String RSS_CLIENT_SEND_CHECK_TIMEOUT_MS =
+      TEZ_RSS_CONFIG_PREFIX + "rss.client.send.check.timeout.ms";
+  // 600000 ms, i.e. ten minutes.
+  public static final long RSS_CLIENT_DEFAULT_SEND_CHECK_TIMEOUT_MS = 60 * 1000 * 10L;
+  public static final String RSS_CLIENT_BITMAP_NUM = TEZ_RSS_CONFIG_PREFIX + "rss.client.bitmap.num";
+  public static final int RSS_CLIENT_DEFAULT_BITMAP_NUM = 1;
+  public static final String HIVE_TEZ_LOG_LEVEL = "hive.tez.log.level";
+  public static final String DEFAULT_HIVE_TEZ_LOG_LEVEL = "INFO";
+  public static final String DEBUG_HIVE_TEZ_LOG_LEVEL = "debug";
+
+ public static final String RSS_STORAGE_TYPE = TEZ_RSS_CONFIG_PREFIX + RssClientConfig.RSS_STORAGE_TYPE;
+
+ public static final String RSS_DYNAMIC_CLIENT_CONF_ENABLED =
+ TEZ_RSS_CONFIG_PREFIX + RssClientConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED;
+ public static final boolean RSS_DYNAMIC_CLIENT_CONF_ENABLED_DEFAULT_VALUE =
+ RssClientConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED_DEFAULT_VALUE;
+
+ public static final String RSS_CLIENT_ASSIGNMENT_TAGS =
+ TEZ_RSS_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_ASSIGNMENT_TAGS;
+
+ public static final String RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER =
+ RssClientConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER;
+ public static final int RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER_DEFAULT_VALUE =
+ RssClientConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER_DEFAULT_VALUE;
+
+ public static final String RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL =
+ TEZ_RSS_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL;
+ public static final long RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL_DEFAULT_VALUE =
+ RssClientConfig.RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL_DEFAULT_VALUE;
+ public static final String RSS_CLIENT_ASSIGNMENT_RETRY_TIMES =
+ TEZ_RSS_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_ASSIGNMENT_RETRY_TIMES;
+ public static final int RSS_CLIENT_ASSIGNMENT_RETRY_TIMES_DEFAULT_VALUE =
+ RssClientConfig.RSS_CLIENT_ASSIGNMENT_RETRY_TIMES_DEFAULT_VALUE;
+
+ public static final String RSS_ESTIMATE_SERVER_ASSIGNMENT_ENABLED =
+ TEZ_RSS_CONFIG_PREFIX + RssClientConfig.RSS_ESTIMATE_SERVER_ASSIGNMENT_ENABLED;
+ public static final boolean RSS_ESTIMATE_SERVER_ASSIGNMENT_ENABLED_DEFAULT_VALUE =
+ RssClientConfig.RSS_ESTIMATE_SERVER_ASSIGNMENT_ENABLED_DEFAULT_VALUE;
+
+ public static final String RSS_ESTIMATE_TASK_CONCURRENCY_DYNAMIC_FACTOR =
+ TEZ_RSS_CONFIG_PREFIX + RssClientConfig.RSS_ESTIMATE_TASK_CONCURRENCY_DYNAMIC_FACTOR;
+
+ public static final double RSS_ESTIMATE_TASK_CONCURRENCY_DYNAMIC_FACTOR_DEFAULT_VALUE
+ = RssClientConfig.RSS_ESTIMATE_TASK_CONCURRENCY_DYNAMIC_FACTOR_DEFAULT_VALUE;
+
+ public static final String RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER =
+ TEZ_RSS_CONFIG_PREFIX + RssClientConfig.RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER;
+ public static final int RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER_DEFAULT_VALUE =
+ RssClientConfig.RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER_DEFAULT_VALUE;
+
+ public static final String RSS_CONF_FILE = "rss_conf.xml";
+
+ public static final String RSS_REMOTE_STORAGE_PATH =
+ TEZ_RSS_CONFIG_PREFIX + RssClientConfig.RSS_REMOTE_STORAGE_PATH;
+
+ // Whether to enable test mode for the Tez client
+ public static final String RSS_TEST_MODE_ENABLE = TEZ_RSS_CONFIG_PREFIX + RssClientConfig.RSS_TEST_MODE_ENABLE;
+
+ public static final String RSS_AM_SHUFFLE_MANAGER_ADDRESS = TEZ_RSS_CONFIG_PREFIX + "rss.am.shuffle.manager.address";
+ public static final String RSS_AM_SHUFFLE_MANAGER_PORT = TEZ_RSS_CONFIG_PREFIX + "rss.am.shuffle.manager.port";
+ public static final String RSS_AM_SHUFFLE_MANAGER_DEBUG = TEZ_RSS_CONFIG_PREFIX + "rss.am.shuffle.manager.debug";
+ public static final String RSS_AM_SLOW_START_ENABLE = TEZ_RSS_CONFIG_PREFIX + "rss.am.slow.start.enable";
+ public static final Boolean RSS_AM_SLOW_START_ENABLE_DEFAULT = false;
+
+ public static final String RSS_REDUCE_INITIAL_MEMORY = TEZ_RSS_CONFIG_PREFIX + "rss.reduce.initial.memory";
+
+  /**
+   * Builds an {@link RssConf} from the entries of {@code jobConf} whose keys start with
+   * {@link #TEZ_RSS_CONFIG_PREFIX}. The prefix is removed from every copied key; entries
+   * without the prefix are ignored.
+   *
+   * @param jobConf the configuration whose entries are scanned
+   * @return a new {@code RssConf} holding the un-prefixed keys and their values
+   */
+  public static RssConf toRssConf(Configuration jobConf) {
+    final int prefixLength = TEZ_RSS_CONFIG_PREFIX.length();
+    RssConf converted = new RssConf();
+    for (Map.Entry<String, String> property : jobConf) {
+      String name = property.getKey();
+      if (name.startsWith(TEZ_RSS_CONFIG_PREFIX)) {
+        converted.setString(name.substring(prefixLength), property.getValue());
+      }
+    }
+    return converted;
+  }
+}
diff --git a/client-tez/src/main/java/org/apache/tez/common/RssTezUtils.java b/client-tez/src/main/java/org/apache/tez/common/RssTezUtils.java
new file mode 100644
index 00000000..694147da
--- /dev/null
+++ b/client-tez/src/main/java/org/apache/tez/common/RssTezUtils.java
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.common;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.api.InputContext;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+import org.roaringbitmap.longlong.Roaring64NavigableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.client.api.ShuffleWriteClient;
+import org.apache.uniffle.client.factory.ShuffleClientFactory;
+import org.apache.uniffle.common.ShuffleServerInfo;
+import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.util.Constants;
+
+public class RssTezUtils {
+
+ private static final Logger LOG = LoggerFactory.getLogger(RssTezUtils.class);
+
+ private static final int MAX_ATTEMPT_LENGTH = 6;
+ private static final long MAX_ATTEMPT_ID = (1 << MAX_ATTEMPT_LENGTH) - 1;
+
+ public static final String HOST_NAME = "hostname";
+
+ public static final String PLUS_DELIMITER = "+";
+ public static final String UNDERLINE_DELIMITER = "_";
+ public static final String COLON_DELIMITER = ":";
+ public static final String COMMA_DELIMITER = ",";
+
+ // constant to compute shuffle id
+ private static final int VERTEX_ID_MAPPING_MAX_ID = 500;
+ private static final String VERTEX_ID_MAPPING_MAP = "Map";
+ private static final String VERTEX_ID_MAPPING_REDUCER = "Reducer";
+ private static final int VERTEX_ID_MAPPING_MAGIC = 600;
+ private static final int SHUFFLE_ID_MAGIC = 1000;
+
+
+
+ private RssTezUtils() {
+ }
+
+  /**
+   * Creates a {@link ShuffleWriteClient} from the RSS settings found in {@code conf}.
+   *
+   * <p>Client type, retry policy, heartbeat thread count, replica settings and pool sizes are read
+   * from {@code conf}, each falling back to the matching {@link RssTezConfig} default, and handed to
+   * {@link ShuffleClientFactory}.
+   *
+   * @param conf the configuration to read the client settings from
+   * @return the client returned by the factory
+   */
+  public static ShuffleWriteClient createShuffleClient(Configuration conf) {
+    // Client type and retry policy.
+    final String type = conf.get(RssTezConfig.RSS_CLIENT_TYPE,
+        RssTezConfig.RSS_CLIENT_TYPE_DEFAULT_VALUE);
+    final int maxRetries = conf.getInt(RssTezConfig.RSS_CLIENT_RETRY_MAX,
+        RssTezConfig.RSS_CLIENT_RETRY_MAX_DEFAULT_VALUE);
+    final long maxRetryInterval = conf.getLong(RssTezConfig.RSS_CLIENT_RETRY_INTERVAL_MAX,
+        RssTezConfig.RSS_CLIENT_RETRY_INTERVAL_MAX_DEFAULT_VALUE);
+    final int heartbeatThreads = conf.getInt(RssTezConfig.RSS_CLIENT_HEARTBEAT_THREAD_NUM,
+        RssTezConfig.RSS_CLIENT_HEARTBEAT_THREAD_NUM_DEFAULT_VALUE);
+
+    // Replication settings.
+    final int replicaCount = conf.getInt(RssTezConfig.RSS_DATA_REPLICA,
+        RssTezConfig.RSS_DATA_REPLICA_DEFAULT_VALUE);
+    final int writeReplicas = conf.getInt(RssTezConfig.RSS_DATA_REPLICA_WRITE,
+        RssTezConfig.RSS_DATA_REPLICA_WRITE_DEFAULT_VALUE);
+    final int readReplicas = conf.getInt(RssTezConfig.RSS_DATA_REPLICA_READ,
+        RssTezConfig.RSS_DATA_REPLICA_READ_DEFAULT_VALUE);
+    final boolean skipReplica = conf.getBoolean(RssTezConfig.RSS_DATA_REPLICA_SKIP_ENABLED,
+        RssTezConfig.RSS_DATA_REPLICA_SKIP_ENABLED_DEFAULT_VALUE);
+
+    // Thread pool sizes.
+    final int transferPoolSize = conf.getInt(RssTezConfig.RSS_DATA_TRANSFER_POOL_SIZE,
+        RssTezConfig.RSS_DATA_TRANSFER_POOL_SIZE_DEFAULT_VALUE);
+    final int commitPoolSize = conf.getInt(RssTezConfig.RSS_DATA_COMMIT_POOL_SIZE,
+        RssTezConfig.RSS_DATA_COMMIT_POOL_SIZE_DEFAULT_VALUE);
+
+    return ShuffleClientFactory.getInstance().createShuffleWriteClient(
+        type, maxRetries, maxRetryInterval, heartbeatThreads,
+        replicaCount, writeReplicas, readReplicas, skipReplica,
+        transferPoolSize, commitPoolSize);
+  }
+
+  /**
+   * Returns the initial memory request, in bytes, derived from
+   * {@link RssTezConfig#RSS_RUNTIME_IO_SORT_MB} (a value in MB).
+   *
+   * @param conf the configuration to read the requested size from
+   * @param maxAvailableTaskMemory the memory available to the task, in bytes
+   * @return the requested size converted to bytes
+   * @throws IllegalArgumentException if the requested size is not positive, cannot be expressed in
+   *     bytes as a {@code long}, or is not smaller than {@code maxAvailableTaskMemory}
+   */
+  public static long getInitialMemoryRequirement(Configuration conf, long maxAvailableTaskMemory) {
+    long initialMemRequestMb = conf.getLong(RssTezConfig.RSS_RUNTIME_IO_SORT_MB,
+        RssTezConfig.RSS_DEFAULT_RUNTIME_IO_SORT_MB);
+    LOG.info("InitialMemRequestMb is {}", initialMemRequestMb);
+    LOG.info("MaxAvailableTaskMemory is {}", maxAvailableTaskMemory);
+    long reqBytes = initialMemRequestMb << 20;
+    // The upper bound on the MB value rejects requests whose byte count would overflow the shift above.
+    boolean valid = initialMemRequestMb > 0
+        && initialMemRequestMb <= (Long.MAX_VALUE >> 20)
+        && reqBytes < maxAvailableTaskMemory;
+    // Plain concatenation keeps the call on the (boolean, Object) overload of checkArgument.
+    Preconditions.checkArgument(valid,
+        RssTezConfig.RSS_RUNTIME_IO_SORT_MB + "=" + initialMemRequestMb
+            + " should be larger than 0 and should be less than the available task memory (MB):"
+            + (maxAvailableTaskMemory >> 20));
+    // NOTE(review): the key printed here is the Tez unordered-output buffer key, while the value was read
+    // from RssTezConfig.RSS_RUNTIME_IO_SORT_MB - confirm which key this log line should name.
+    LOG.info("Requested BufferSize (" + TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_OUTPUT_BUFFER_SIZE_MB
+        + ") : " + initialMemRequestMb);
+    return reqBytes;
+  }
+
+  /**
+   * Flattens a partition-to-servers assignment into one string.
+   *
+   * <p>Every (partition, server) pair becomes {@code <partitionId>_<host>:<nettyPort>}, and the pairs
+   * are joined with commas. An empty map yields an empty string.
+   *
+   * @param map partition id to the servers assigned to that partition
+   * @return the comma-separated representation
+   */
+  public static String uniformPartitionHostInfo(Map<Integer, List<ShuffleServerInfo>> map) {
+    List<String> res = new ArrayList<>();
+    for (Map.Entry<Integer, List<ShuffleServerInfo>> entry : map.entrySet()) {
+      Integer pid = entry.getKey();
+      for (ShuffleServerInfo shuffleServerInfo : entry.getValue()) {
+        res.add(pid + UNDERLINE_DELIMITER + shuffleServerInfo.getHost()
+            + COLON_DELIMITER + shuffleServerInfo.getNettyPort());
+      }
+    }
+    // Uses the commons-lang3 StringUtils this file already imports instead of the commons-lang 2.x class.
+    return StringUtils.join(res, COMMA_DELIMITER);
+  }
+
+  /**
+   * Groups partition ids by server from a comma-separated list of {@code <partitionId>_<server>} items.
+   *
+   * <p>Each item is split on {@code _}; the first piece is taken as the partition id and the second
+   * as the server key. An empty input yields an empty map.
+   *
+   * @param partitionToServers comma-separated {@code <partitionId>_<server>} items
+   * @return server key to the partition ids listed for it, in input order
+   * @throws ArrayIndexOutOfBoundsException if a non-empty item contains no {@code _}
+   */
+  public static Map<String, List<String>> uniformServerToPartitions(String partitionToServers) {
+    Map<String, List<String>> serverToPartitions = new HashMap<>();
+    // "".split(",") yields one empty item, which would fail on the [1] access below.
+    if (partitionToServers.isEmpty()) {
+      return serverToPartitions;
+    }
+
+    for (String pidWithWorkerInfo : partitionToServers.split(COMMA_DELIMITER)) {
+      String[] pidUnderLineWorkerInfo = pidWithWorkerInfo.split(UNDERLINE_DELIMITER);
+      serverToPartitions
+          .computeIfAbsent(pidUnderLineWorkerInfo[1], server -> new ArrayList<>())
+          .add(pidUnderLineWorkerInfo[0]);
+    }
+
+    return serverToPartitions;
+  }
+
+  /**
+   * Serializes a server-to-partitions map into one string.
+   *
+   * <p>Each entry becomes {@code <server>+<p1>_<p2>_...}, and the entries are joined with commas.
+   *
+   * @param map server key to the partition ids assigned to it
+   * @return the comma-separated representation; empty for an empty map
+   */
+  public static String uniformServerToPartitions(Map<String, List<String>> map) {
+    List<String> res = new ArrayList<>();
+    for (Map.Entry<String, List<String>> entry : map.entrySet()) {
+      // Uses the commons-lang3 StringUtils this file already imports instead of the commons-lang 2.x class.
+      String join = StringUtils.join(entry.getValue(), UNDERLINE_DELIMITER);
+      res.add(entry.getKey() + PLUS_DELIMITER + join);
+    }
+
+    return StringUtils.join(res, COMMA_DELIMITER);
+  }
+
+  /**
+   * Resolves the application attempt id of the current container.
+   *
+   * <p>The container id string is taken from the environment variable named by
+   * {@code ApplicationConstants.Environment.CONTAINER_ID} and parsed with {@link ContainerId#fromString}.
+   *
+   * @return the application attempt id carried by the parsed container id
+   */
+  public static ApplicationAttemptId getApplicationAttemptId() {
+    return ContainerId
+        .fromString(System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name()))
+        .getApplicationAttemptId();
+  }
+
+  /**
+   * Reduces a unique identifier to its first seven {@code _}-separated segments.
+   *
+   * @param uniqueIdentifier the identifier to shorten; must not be null
+   * @return segments 0 to 6 joined with {@code _}
+   * @throws RuntimeException if {@code uniqueIdentifier} is null
+   */
+  public static String uniqueIdentifierToAttemptId(String uniqueIdentifier) {
+    if (uniqueIdentifier != null) {
+      // NOTE(review): identifiers with fewer than seven segments are not handled here - confirm
+      // what StringUtils.join does when the end index exceeds the array length.
+      return StringUtils.join(uniqueIdentifier.split("_"), "_", 0, 7);
+    }
+    throw new RuntimeException("uniqueIdentifier should not be null");
+  }
+
+  /**
+   * Packs a partition id, a task attempt id and a sequence number into one block id.
+   *
+   * <p>{@code taskAttemptId} is split into an attempt number (the bits above
+   * {@code PARTITION_ID_MAX_LENGTH + TASK_ATTEMPT_ID_MAX_LENGTH}) and a task id (the remainder).
+   * The sequence field is {@code (nextSeqNo << MAX_ATTEMPT_LENGTH) + attempt}, and the result is
+   * {@code (sequence << (PARTITION_ID_MAX_LENGTH + TASK_ATTEMPT_ID_MAX_LENGTH))
+   * + (partitionId << TASK_ATTEMPT_ID_MAX_LENGTH) + taskId}.
+   *
+   * @param partitionId the partition the block belongs to
+   * @param taskAttemptId the packed attempt/task value, as unpacked by {@link #getTaskAttemptId(long)}
+   * @param nextSeqNo the sequence number of the block
+   * @return the packed block id
+   * @throws RuntimeException if the attempt number, sequence field, partition id or task id is out of range
+   */
+  public static long getBlockId(long partitionId, long taskAttemptId, int nextSeqNo) {
+    final int attemptShift = Constants.PARTITION_ID_MAX_LENGTH + Constants.TASK_ATTEMPT_ID_MAX_LENGTH;
+    long attemptId = taskAttemptId >> attemptShift;
+    if (attemptId < 0 || attemptId > MAX_ATTEMPT_ID) {
+      throw new RuntimeException("Can't support attemptId [" + attemptId
+          + "], the max value should be " + MAX_ATTEMPT_ID);
+    }
+    // Widen before shifting: an int shift can wrap to a small non-negative value and slip past the
+    // range check below.
+    long atomicInt = ((long) nextSeqNo << MAX_ATTEMPT_LENGTH) + attemptId;
+    if (atomicInt < 0 || atomicInt > Constants.MAX_SEQUENCE_NO) {
+      throw new RuntimeException("Can't support sequence [" + atomicInt
+          + "], the max value should be " + Constants.MAX_SEQUENCE_NO);
+    }
+    if (partitionId < 0 || partitionId > Constants.MAX_PARTITION_ID) {
+      throw new RuntimeException("Can't support partitionId["
+          + partitionId + "], the max value should be " + Constants.MAX_PARTITION_ID);
+    }
+    long taskId = taskAttemptId - (attemptId << attemptShift);
+    if (taskId < 0 || taskId > Constants.MAX_TASK_ATTEMPT_ID) {
+      throw new RuntimeException("Can't support taskId["
+          + taskId + "], the max value should be " + Constants.MAX_TASK_ATTEMPT_ID);
+    }
+    return (atomicInt << attemptShift)
+        + (partitionId << Constants.TASK_ATTEMPT_ID_MAX_LENGTH) + taskId;
+  }
+
+  /**
+   * Extracts the packed attempt/task value from a block id.
+   *
+   * <p>The task bits are {@code blockId & MAX_TASK_ATTEMPT_ID}; the attempt bits are read just above
+   * the task and partition fields, masked with {@code MAX_ATTEMPT_ID}, and put back at that position.
+   *
+   * @param blockId a block id
+   * @return the attempt bits at their original position plus the task bits
+   */
+  public static long getTaskAttemptId(long blockId) {
+    final int attemptShift = Constants.TASK_ATTEMPT_ID_MAX_LENGTH + Constants.PARTITION_ID_MAX_LENGTH;
+    long attemptBits = (blockId >> attemptShift) & MAX_ATTEMPT_ID;
+    long taskBits = blockId & Constants.MAX_TASK_ATTEMPT_ID;
+    return (attemptBits << attemptShift) + taskBits;
+  }
+
+  /**
+   * Estimates how many tasks run concurrently.
+   *
+   * <p>Map and reduce counts are first capped by their configured limits (a limit of 0 or less means
+   * no cap). When the slow-start value is exactly 1 the estimate is the larger of the two counts;
+   * otherwise it is {@code (1 - slowStart) * maps + reduces}. The estimate is multiplied by the
+   * dynamic factor and truncated to an int.
+   *
+   * @param jobConf the configuration holding the factor, slow-start and limit settings
+   * @param mapNum the number of map tasks
+   * @param reduceNum the number of reduce tasks
+   * @return the estimated task concurrency
+   */
+  public static int estimateTaskConcurrency(Configuration jobConf, int mapNum, int reduceNum) {
+    final double factor = jobConf.getDouble(RssTezConfig.RSS_ESTIMATE_TASK_CONCURRENCY_DYNAMIC_FACTOR,
+        RssTezConfig.RSS_ESTIMATE_TASK_CONCURRENCY_DYNAMIC_FACTOR_DEFAULT_VALUE);
+    final double slowStart = jobConf.getDouble(Constants.MR_SLOW_START, Constants.MR_SLOW_START_DEFAULT_VALUE);
+    final int mapLimit = jobConf.getInt(Constants.MR_MAP_LIMIT, Constants.MR_MAP_LIMIT_DEFAULT_VALUE);
+    final int reduceLimit = jobConf.getInt(Constants.MR_REDUCE_LIMIT, Constants.MR_REDUCE_LIMIT_DEFAULT_VALUE);
+
+    int cappedMaps = mapNum;
+    if (mapLimit > 0) {
+      cappedMaps = Math.min(mapNum, mapLimit);
+    }
+    int cappedReduces = reduceNum;
+    if (reduceLimit > 0) {
+      cappedReduces = Math.min(reduceNum, reduceLimit);
+    }
+
+    double concurrentTasks = slowStart == 1
+        ? Math.max(cappedMaps, cappedReduces)
+        : (1 - slowStart) * cappedMaps + cappedReduces;
+    return (int) (concurrentTasks * factor);
+  }
+
+  /**
+   * Determines how many shuffle servers to request.
+   *
+   * <p>The configured number is returned as-is when it is positive or when estimation is disabled.
+   * Otherwise the number is estimated as
+   * {@code ceil(estimateTaskConcurrency / taskConcurrencyPerServer)}.
+   *
+   * @param jobConf the configuration holding the assignment and estimation settings
+   * @param mapNum the number of map tasks
+   * @param reduceNum the number of reduce tasks
+   * @return the configured or estimated server count
+   */
+  public static int getRequiredShuffleServerNumber(Configuration jobConf, int mapNum, int reduceNum) {
+    final int configuredNumber = jobConf.getInt(
+        RssTezConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER,
+        RssTezConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER_DEFAULT_VALUE);
+    final boolean estimationEnabled = jobConf.getBoolean(
+        RssTezConfig.RSS_ESTIMATE_SERVER_ASSIGNMENT_ENABLED,
+        RssTezConfig.RSS_ESTIMATE_SERVER_ASSIGNMENT_ENABLED_DEFAULT_VALUE);
+
+    if (estimationEnabled && configuredNumber <= 0) {
+      int concurrency = estimateTaskConcurrency(jobConf, mapNum, reduceNum);
+      int perServer = jobConf.getInt(RssTezConfig.RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER,
+          RssTezConfig.RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER_DEFAULT_VALUE);
+      // Floating-point division so that the quotient is rounded up rather than truncated.
+      return (int) Math.ceil(concurrency * 1.0 / perServer);
+    }
+    return configuredNumber;
+  }
+
+  /**
+   * Computes the shuffle id for an input, using the DAG identifier, the source vertex name and the
+   * task vertex name reported by {@code inputContext}.
+   *
+   * @param inputContext the Tez input context to read the identifiers from
+   * @return the shuffle id
+   */
+  public static int computeShuffleId(InputContext inputContext) {
+    return RssTezUtils.computeShuffleId(
+        inputContext.getDagIdentifier(),
+        inputContext.getSourceVertexName(),
+        inputContext.getTaskVertexName());
+  }
+
+  /**
+   * Computes a shuffle id from a DAG id and the names of the two vertices of an edge.
+   *
+   * <p>Both vertex names are first mapped to ints with {@code mapVertexId()}; the DAG id and the two
+   * vertex ints are then combined by the private int-based {@code computeShuffleId()} overload.
+   *
+   * @param tezDagID the DAG id, as obtained from the Tez InputContext
+   * @param upVertexName the upstream vertex name, like "Map 1" or "Reducer 2"
+   * @param downVertexName the vertex name of the task, like "Map 1" or "Reducer 2"
+   * @return the shuffle id
+   */
+  public static int computeShuffleId(int tezDagID, String upVertexName, String downVertexName) {
+    final int sourceId = mapVertexId(upVertexName);
+    final int targetId = mapVertexId(downVertexName);
+    final int result = computeShuffleId(tezDagID, sourceId, targetId);
+    LOG.info("Compute Shuffle Id, upVertexName:{}, id:{}, downVertexName:{}, id:{}, shuffleId:{}",
+        upVertexName, sourceId, downVertexName, targetId, result);
+    return result;
+  }
+
+  /**
+   * Combines the three ids as
+   * {@code tezDagID * SHUFFLE_ID_MAGIC^2 + upTezVertexID * SHUFFLE_ID_MAGIC + downTezVertexID}.
+   * All arithmetic is done in {@code int}.
+   */
+  private static int computeShuffleId(int tezDagID, int upTezVertexID, int downTezVertexID) {
+    int dagPart = tezDagID * (SHUFFLE_ID_MAGIC * SHUFFLE_ID_MAGIC);
+    int upPart = upTezVertexID * SHUFFLE_ID_MAGIC;
+    return dagPart + upPart + downTezVertexID;
+  }
+
+ /**
+ *
+ * @param vertexName: vertex name, like "Map 1" or "Reducer 2"
+ * @return Map vertex name of String type to int type.
+ * Split vertex name, get vertex type and vertex id number, if it's map vertex, then return vertex id number,
+ * else if it's reducer vertex, then add VERTEX_ID_MAPPING_MAGIC and vertex id number finally return it.
+ */
+ private static int mapVertexId(String vertexName) {
+ String[] ss = vertexName.split("\\s+");
+ if (Integer.parseInt(ss[1]) > VERTEX_ID_MAPPING_MAX_ID) {
+ throw new RssException("Too large vertex name to id mapping, vertexName:" + vertexName);
+ }
+ if (VERTEX_ID_MAPPING_MAP.equals(ss[0])) {
+ return Integer.parseInt(ss[1]);
+ } else if (VERTEX_ID_MAPPING_REDUCER.equals(ss[0])) {
+ return VERTEX_ID_MAPPING_MAGIC + Integer.parseInt(ss[1]);
+ } else {
+ throw new RssException("Wrong vertex name to id mapping, vertexName:" + vertexName);
+ }
+ }
+
+  /**
+   * Packs a Tez task attempt id into a long.
+   *
+   * <p>The low part is the task's id. The high part is the attempt's id minus
+   * {@code (appAttemptId - 1) * 1000}, shifted left by
+   * {@code TASK_ATTEMPT_ID_MAX_LENGTH + PARTITION_ID_MAX_LENGTH}.
+   *
+   * @param taskAttemptID the Tez task attempt id
+   * @param appAttemptId the application attempt number; must be at least 1
+   * @return the packed id
+   * @throws RssException if the task id exceeds {@code Constants.MAX_TASK_ATTEMPT_ID}, if
+   *     {@code appAttemptId} is below 1, or if the high part is outside {@code [0, MAX_ATTEMPT_ID]}
+   */
+  public static long convertTaskAttemptIdToLong(TezTaskAttemptID taskAttemptID, int appAttemptId) {
+    long lowBytes = taskAttemptID.getTaskID().getId();
+    if (lowBytes > Constants.MAX_TASK_ATTEMPT_ID) {
+      throw new RssException("TaskAttempt " + taskAttemptID + " low bytes " + lowBytes + " exceed");
+    }
+    if (appAttemptId < 1) {
+      throw new RssException("appAttemptId " + appAttemptId + " is wrong");
+    }
+    // NOTE(review): presumably each application attempt offsets attempt ids by 1000 - confirm.
+    // The long literal keeps the multiplication from overflowing int before the subtraction.
+    long highBytes = (long) taskAttemptID.getId() - (appAttemptId - 1) * 1000L;
+    if (highBytes > MAX_ATTEMPT_ID || highBytes < 0) {
+      throw new RssException("TaskAttempt " + taskAttemptID + " high bytes " + highBytes
+          + " exceed, appAttemptId:" + appAttemptId);
+    }
+
+    long id = (highBytes << (Constants.TASK_ATTEMPT_ID_MAX_LENGTH + Constants.PARTITION_ID_MAX_LENGTH)) + lowBytes;
+    LOG.info("ConvertTaskAttemptIdToLong id is {}", id);
+    LOG.info("LowBytes id is {}", lowBytes);
+    LOG.info("HighBytes id is {}", highBytes);
+    return id;
+  }
+
+ public static Roaring64NavigableMap fetchAllRssTaskIds(
+ Set<InputAttemptIdentifier> successMapTaskAttempts,
+ Integer totalMapsCount,
+ Integer appAttemptId) {
+ Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf();
+ Roaring64NavigableMap mapIndexBitmap = Roaring64NavigableMap.bitmapOf();
+ String errMsg = "TaskAttemptIDs are inconsistent with map tasks";
+ LOG.info("FetchAllRssTaskIds successMapTaskAttempts:{}", successMapTaskAttempts);
+ LOG.info("FetchAllRssTaskIds totalMapsCount:{}, appAttemptId:{}", totalMapsCount, appAttemptId);
+
+ for (InputAttemptIdentifier inputAttemptIdentifier: successMapTaskAttempts) {
+ int mapIndex = inputAttemptIdentifier.getInputIdentifier();
+ String pathComponent = inputAttemptIdentifier.getPathComponent();
+ int taskId = RssTezUtils.taskIdStrToTaskId(pathComponent);
+ // There can be multiple successful attempts on same map task.
+ // So we only need to accept one of them.
+ LOG.info("FetchAllRssTaskIds, taskId:{}, is contains:{}", taskId, mapIndexBitmap.contains(taskId));
+ if (!mapIndexBitmap.contains(taskId)) {
+ taskIdBitmap.addLong(taskId);
+
+ LOG.info("FetchAllRssTaskIds, successMapTaskAttempts:{}, totalMapsCount:{}, appAttemptId:{}, mapIndex:{}, "
+ + "totalMapsCount:{}, taskId: {} ",
+ successMapTaskAttempts.size(), totalMapsCount, appAttemptId, mapIndex, totalMapsCount, taskId);
+
+ if (mapIndex < totalMapsCount) { // up-stream map task index should < total task number(including failed task)
+ mapIndexBitmap.addLong(taskId);
+ } else {
+
+ LOG.error("SuccessMapTaskAttempts:{}, totalMapsCount:{}, appAttemptId:{}, mapIndex:{}, totalMapsCount:{} ",
+ successMapTaskAttempts, totalMapsCount, appAttemptId, mapIndex, totalMapsCount);
+
+ LOG.error(inputAttemptIdentifier + " has overflowed mapIndex");
+ throw new IllegalStateException(errMsg);
+ }
+ } else {
+ LOG.warn(inputAttemptIdentifier + " is redundant on index: " + mapIndex);
+ }
+ }
+ // each map should have only one success attempt
+ if (mapIndexBitmap.getLongCardinality() != taskIdBitmap.getLongCardinality()) {
+ throw new IllegalStateException(errMsg);
+ }
+ return taskIdBitmap;
+ }
+
+  /**
+   * Parses the int found between the fifth and the sixth {@code _} of {@code taskIdStr}.
+   *
+   * @param taskIdStr an underscore-separated identifier with at least six underscores
+   * @return the parsed int
+   * @throws IllegalArgumentException if fewer than six underscores are present, or (as
+   *     {@link NumberFormatException}) if the selected segment is not an int
+   */
+  public static int taskIdStrToTaskId(String taskIdStr) {
+    try {
+      // After the loop, start and end are the positions of the fifth and sixth underscore.
+      int start = -1;
+      int end = -1;
+      for (int i = 0; i < 6; i++) {
+        start = end;
+        end = taskIdStr.indexOf(UNDERLINE_DELIMITER, end + 1);
+        if (end < 0) {
+          // Without this check the next search would restart from index 0 and pick a wrong segment.
+          throw new IllegalArgumentException("Expect at least 6 '" + UNDERLINE_DELIMITER
+              + "' in task id string: " + taskIdStr);
+        }
+      }
+      return Integer.parseInt(taskIdStr.substring(start + 1, end));
+    } catch (RuntimeException e) {
+      // The logger records the stack trace, so no printStackTrace() is needed.
+      LOG.error("Failed to get taskId, taskIdStr:{}.", taskIdStr, e);
+      throw e;
+    }
+  }
+
+  // multiHostInfo is a comma-separated list of <partitionId>_<host>:<port> items, like:
+  // 0_172.19.193.152:19999,1_172.19.193.152:19999
+  // Every item is added to rssWorker under its partition id.
+  private static void parseRssWorkerFromHostInfo(
+      Map<Integer, Set<ShuffleServerInfo>> rssWorker, String multiHostInfo) {
+    String[] hostInfos = multiHostInfo.split(",");
+    for (int i = 0; i < hostInfos.length; i++) {
+      String hostInfo = hostInfos[i];
+      LOG.info("ParseRssWorker, hostInfo:{}", hostInfo);
+      // Splitting on '_' or ':' yields [partitionId, host, port].
+      String[] fields = hostInfo.split("_|:");
+      int partitionId = Integer.parseInt(fields[0]);
+      ShuffleServerInfo serverInfo = new ShuffleServerInfo(fields[1], Integer.parseInt(fields[2]));
+      rssWorker.computeIfAbsent(partitionId, k -> new HashSet<>()).add(serverInfo);
+      LOG.info("Parse Rss Worker, add partition:{}, serverInfo:{}", partitionId, serverInfo);
+    }
+  }
+
+  /**
+   * Adds to {@code rssWorker} the servers listed for {@code shuffleId} in {@code hostnameInfo}.
+   *
+   * <p>{@code hostnameInfo} is a {@code ;}-separated list of {@code <key>=<workers>} sections. A section
+   * is used only when it splits into exactly two parts on {@code =} and its key equals the decimal
+   * form of {@code shuffleId}; its workers part is a comma-separated list of
+   * {@code <partitionId>_<host>:<port>} items.
+   *
+   * <p>NOTE(review): the comment this replaces gave
+   * "Map 1=0_172.19.193.152:19999,0_172.19.193.152:19999;Map 2=..." as an example, i.e. vertex names
+   * as keys, which never match the shuffle id comparison below - confirm the actual key format.
+   *
+   * @param rssWorker the partition-to-servers map to add to
+   * @param shuffleId the shuffle id whose section is parsed
+   * @param hostnameInfo the serialized sections
+   */
+  public static void parseRssWorker(Map<Integer, Set<ShuffleServerInfo>> rssWorker, int shuffleId,
+      String hostnameInfo) {
+    LOG.info("ParseRssWorker, hostnameInfo:{}", hostnameInfo);
+    final String shuffleIdStr = String.valueOf(shuffleId);
+    for (String toVertex : hostnameInfo.split(";")) {
+      LOG.info("ParseRssWorker, toVertex:{}", toVertex);
+      String[] splits = toVertex.split("=");
+      if (splits.length == 2 && shuffleIdStr.equals(splits[0])) {
+        String workerStr = splits[1];
+        LOG.info("ParseRssWorker, workerStr:{}", workerStr);
+        parseRssWorkerFromHostInfo(rssWorker, workerStr);
+      }
+    }
+  }
+}
diff --git a/client-tez/src/main/java/org/apache/tez/common/TezIdHelper.java b/client-tez/src/main/java/org/apache/tez/common/TezIdHelper.java
new file mode 100644
index 00000000..ed661e5d
--- /dev/null
+++ b/client-tez/src/main/java/org/apache/tez/common/TezIdHelper.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.common;
+
+
+import org.apache.uniffle.common.util.IdHelper;
+
+/**
+ * {@link IdHelper} implementation that resolves the task attempt id of a block by delegating to
+ * {@link RssTezUtils#getTaskAttemptId(long)}.
+ */
+public class TezIdHelper implements IdHelper {
+
+  /**
+   * Returns the task attempt id that {@link RssTezUtils#getTaskAttemptId(long)} derives from the
+   * given block id.
+   *
+   * @param blockId block id to decode
+   * @return the task attempt id obtained from {@code blockId}
+   */
+  @Override
+  public long getTaskAttemptId(long blockId) {
+    final long taskAttemptId = RssTezUtils.getTaskAttemptId(blockId);
+    return taskAttemptId;
+  }
+}
diff --git a/client-tez/src/test/java/org/apache/tez/common/RssTezUtilsTest.java b/client-tez/src/test/java/org/apache/tez/common/RssTezUtilsTest.java
new file mode 100644
index 00000000..faa27ab3
--- /dev/null
+++ b/client-tez/src/test/java/org/apache/tez/common/RssTezUtilsTest.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.common;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.util.Constants;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+/**
+ * Unit tests for {@link RssTezUtils}: task attempt id and block id conversion, task concurrency
+ * and shuffle server estimation, shuffle id computation, and attempt id string parsing.
+ */
+public class RssTezUtilsTest {
+
+  private static final String EXPECTED_EXCEPTION_MESSAGE = "Exception should be thrown";
+
+  /**
+   * {@code convertTaskAttemptIdToLong} must throw {@link RssException} for both attempts below.
+   * NOTE(review): presumably these ids exceed the range the conversion can encode - confirm
+   * against the implementation.
+   */
+  @Test
+  public void baskAttemptIdTest() {
+    long taskAttemptId = 0x1000ad12;
+    ApplicationId appId = ApplicationId.newInstance(9999, 72);
+    TezDAGID dagId = TezDAGID.getInstance(appId, 1);
+    TezVertexID vId = TezVertexID.getInstance(dagId, 35);
+
+    TezTaskID firstTaskId = TezTaskID.getInstance(vId, (int) taskAttemptId);
+    TezTaskAttemptID firstAttemptId = TezTaskAttemptID.getInstance(firstTaskId, 3);
+    assertThrows(RssException.class,
+        () -> RssTezUtils.convertTaskAttemptIdToLong(firstAttemptId, 1), EXPECTED_EXCEPTION_MESSAGE);
+
+    TezTaskID secondTaskId = TezTaskID.getInstance(vId, 1 << 21);
+    TezTaskAttemptID secondAttemptId = TezTaskAttemptID.getInstance(secondTaskId, 2);
+    assertThrows(RssException.class,
+        () -> RssTezUtils.convertTaskAttemptIdToLong(secondAttemptId, 1), EXPECTED_EXCEPTION_MESSAGE);
+  }
+
+  /** A task attempt id packed into a block id must be recovered unchanged by getTaskAttemptId. */
+  @Test
+  public void blockConvertTest() {
+    ApplicationId appId = ApplicationId.newInstance(9999, 72);
+    TezDAGID dagId = TezDAGID.getInstance(appId, 1);
+    TezVertexID vId = TezVertexID.getInstance(dagId, 35);
+    TezTaskID tId = TezTaskID.getInstance(vId, 389);
+    TezTaskAttemptID tezTaskAttemptId = TezTaskAttemptID.getInstance(tId, 2);
+    long taskAttemptId = RssTezUtils.convertTaskAttemptIdToLong(tezTaskAttemptId, 1);
+
+    long blockId = RssTezUtils.getBlockId(1, taskAttemptId, 0);
+    long newTaskAttemptId = RssTezUtils.getTaskAttemptId(blockId);
+    assertEquals(taskAttemptId, newTaskAttemptId);
+
+    blockId = RssTezUtils.getBlockId(2, taskAttemptId, 2);
+    newTaskAttemptId = RssTezUtils.getTaskAttemptId(blockId);
+    assertEquals(taskAttemptId, newTaskAttemptId);
+  }
+
+  /**
+   * The partition id must be recoverable from a block id by shifting out the task attempt id
+   * bits and masking with the partition id width, for partitions 0..3000 and sequence 0..10.
+   */
+  @Test
+  public void testPartitionIdConvertBlock() {
+    ApplicationId appId = ApplicationId.newInstance(9999, 72);
+    TezDAGID dagId = TezDAGID.getInstance(appId, 1);
+    TezVertexID vId = TezVertexID.getInstance(dagId, 35);
+    TezTaskID tId = TezTaskID.getInstance(vId, 389);
+    TezTaskAttemptID tezTaskAttemptId = TezTaskAttemptID.getInstance(tId, 2);
+    long taskAttemptId = RssTezUtils.convertTaskAttemptIdToLong(tezTaskAttemptId, 1);
+    long mask = (1L << Constants.PARTITION_ID_MAX_LENGTH) - 1;
+    for (int partitionId = 0; partitionId <= 3000; partitionId++) {
+      for (int seqNo = 0; seqNo <= 10; seqNo++) {
+        // The int partition id widens to long implicitly; no boxing is required.
+        long blockId = RssTezUtils.getBlockId(partitionId, taskAttemptId, seqNo);
+        int newPartitionId = Math.toIntExact((blockId >> Constants.TASK_ATTEMPT_ID_MAX_LENGTH) & mask);
+        assertEquals(partitionId, newPartitionId);
+      }
+    }
+  }
+
+  /**
+   * Checks the estimated task concurrency with default settings, then after setting slow start
+   * to 1.0, map/reduce limits to 200, and finally the dynamic factor to 0.5.
+   */
+  @Test
+  public void testEstimateTaskConcurrency() {
+    Configuration jobConf = new Configuration();
+    int mapNum = 500;
+    int reduceNum = 20;
+    assertEquals(495, RssTezUtils.estimateTaskConcurrency(jobConf, mapNum, reduceNum));
+
+    jobConf.setDouble(Constants.MR_SLOW_START, 1.0);
+    assertEquals(500, RssTezUtils.estimateTaskConcurrency(jobConf, mapNum, reduceNum));
+    jobConf.setInt(Constants.MR_MAP_LIMIT, 200);
+    jobConf.setInt(Constants.MR_REDUCE_LIMIT, 200);
+    assertEquals(200, RssTezUtils.estimateTaskConcurrency(jobConf, mapNum, reduceNum));
+
+    jobConf.setDouble("mapreduce.rss.estimate.task.concurrency.dynamic.factor", 0.5);
+    assertEquals(200, RssTezUtils.estimateTaskConcurrency(jobConf, mapNum, reduceNum));
+  }
+
+  /**
+   * Checks the required shuffle server number when it is configured explicitly, and when it is
+   * estimated after the explicit setting is removed, under the same sequence of slow start,
+   * limit and dynamic factor settings as the concurrency test.
+   */
+  @Test
+  public void testGetRequiredShuffleServerNumber() {
+    Configuration jobConf = new Configuration();
+    int mapNum = 500;
+    int reduceNum = 20;
+    jobConf.setInt(RssTezConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER, 10);
+    assertEquals(10, RssTezUtils.getRequiredShuffleServerNumber(jobConf, mapNum, reduceNum));
+    jobConf.setBoolean(RssTezConfig.RSS_ESTIMATE_SERVER_ASSIGNMENT_ENABLED, true);
+    assertEquals(10, RssTezUtils.getRequiredShuffleServerNumber(jobConf, mapNum, reduceNum));
+    jobConf.unset(RssTezConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER);
+    assertEquals(7, RssTezUtils.getRequiredShuffleServerNumber(jobConf, mapNum, reduceNum));
+    jobConf.setDouble(Constants.MR_SLOW_START, 1.0);
+    assertEquals(7, RssTezUtils.getRequiredShuffleServerNumber(jobConf, mapNum, reduceNum));
+    jobConf.setInt(Constants.MR_MAP_LIMIT, 200);
+    jobConf.setInt(Constants.MR_REDUCE_LIMIT, 200);
+    assertEquals(3, RssTezUtils.getRequiredShuffleServerNumber(jobConf, mapNum, reduceNum));
+    jobConf.setDouble("mapreduce.rss.estimate.task.concurrency.dynamic.factor", 0.5);
+    assertEquals(3, RssTezUtils.getRequiredShuffleServerNumber(jobConf, mapNum, reduceNum));
+  }
+
+  /** Pins the shuffle id computed for DAG 1 and the edge from "Map 1" to "Reducer 2". */
+  @Test
+  public void testComputeShuffleId() {
+    int dagId = 1;
+    String upVertexName = "Map 1";
+    String downVertexName = "Reducer 2";
+    assertEquals(1001602, RssTezUtils.computeShuffleId(dagId, upVertexName, downVertexName));
+  }
+
+  /** Pins the task id parsed from an attempt id string that carries an extra trailing field. */
+  @Test
+  public void testTaskIdStrToTaskId() {
+    assertEquals(0, RssTezUtils.taskIdStrToTaskId("attempt_1680867852986_0012_1_01_000000_0_10003"));
+  }
+
+  /**
+   * A unique identifier built as {@code <attemptId>_<5-digit number>} must map back to the
+   * original attempt id string, which must in turn parse to an equal {@link TezTaskAttemptID}.
+   */
+  @Test
+  public void attemptTaskIdTest() {
+    String tezTaskAttemptId = "attempt_1677051234358_0091_1_00_000000_0";
+    TezTaskAttemptID originalTezTaskAttemptID = TezTaskAttemptID.fromString(tezTaskAttemptId);
+    String uniqueIdentifier = String.format("%s_%05d", tezTaskAttemptId, 3);
+    String uniqueIdentifierToAttemptId = RssTezUtils.uniqueIdentifierToAttemptId(uniqueIdentifier);
+    assertEquals(tezTaskAttemptId, uniqueIdentifierToAttemptId);
+    TezTaskAttemptID tezTaskAttemptID = TezTaskAttemptID.fromString(uniqueIdentifierToAttemptId);
+    assertEquals(originalTezTaskAttemptID, tezTaskAttemptID);
+  }
+}
diff --git a/client-tez/src/test/java/org/apache/tez/common/TezIdHelperTest.java b/client-tez/src/test/java/org/apache/tez/common/TezIdHelperTest.java
new file mode 100644
index 00000000..60da1e3a
--- /dev/null
+++ b/client-tez/src/test/java/org/apache/tez/common/TezIdHelperTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.common;
+
+import org.junit.jupiter.api.Test;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** Unit test for {@link TezIdHelper}. */
+public class TezIdHelperTest {
+
+  /**
+   * Pins the task attempt ids decoded from two fixed block ids and checks that the first one
+   * agrees with the task id parsed from a fixed attempt id string.
+   */
+  @Test
+  public void TestTetTaskAttemptId() {
+    final String attemptIdStr = "attempt_1680867852986_0012_1_01_000000_0_10003";
+    final TezIdHelper idHelper = new TezIdHelper();
+
+    assertEquals(0, idHelper.getTaskAttemptId(27262976));
+    assertEquals(1, idHelper.getTaskAttemptId(27262977));
+    assertEquals(0, RssTezUtils.taskIdStrToTaskId(attemptIdStr));
+    assertEquals(idHelper.getTaskAttemptId(27262976), RssTezUtils.taskIdStrToTaskId(attemptIdStr));
+  }
+}
diff --git a/pom.xml b/pom.xml
index 98cf3ed2..582a44e4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1628,19 +1628,67 @@
<modules>
<module>client-tez</module>
</modules>
+ <properties>
+ <tez.version>0.9.1</tez.version>
+ </properties>
<dependencyManagement>
<dependencies>
<dependency>
- <groupId>org.apache.uniffle</groupId>
- <artifactId>rss-integration-common-test</artifactId>
- <version>${project.version}</version>
- <type>test-jar</type>
- <scope>test</scope>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-common</artifactId>
+ <version>${tez.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-runtime-library</artifactId>
+ <version>${tez.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-runtime-internals</artifactId>
+ <version>${tez.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-dag</artifactId>
+ <version>${tez.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-api</artifactId>
+ <version>${tez.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-auth</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ <version>${protobuf.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <version>${hadoop.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
</dependencies>
</dependencyManagement>
- </profile>
- <profile>
+ </profile>
+ <profile>
<id>kubernetes</id>
<modules>
<module>deploy/kubernetes</module>