You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ro...@apache.org on 2022/10/20 03:14:05 UTC
[incubator-uniffle] branch branch-0.6 updated: [ISSUE-257] RssMRUtils#getBlockId change the partitionId of int type to long (#266)
This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/branch-0.6 by this push:
new d468f863 [ISSUE-257] RssMRUtils#getBlockId change the partitionId of int type to long (#266)
d468f863 is described below
commit d468f863a94c71805ba5641553fab039a944afeb
Author: aboy <fp...@users.noreply.github.com>
AuthorDate: Wed Oct 19 10:23:10 2022 +0800
[ISSUE-257] RssMRUtils#getBlockId change the partitionId of int type to long (#266)
### What changes were proposed in this pull request?
In the RssMRUtils#getBlockId method, change the partitionId parameter from int to long to make sure no overflow occurs.
### Why are the changes needed?
1: In the RssMRUtils#getBlockId method, change the partitionId parameter from int to long to make sure no overflow occurs.
### Does this PR introduce _any_ user-facing change?
Fixes #257; see that issue for details.
### How was this patch tested?
org.apache.hadoop.mapreduce.RssMRUtilsTest#partitionIdConvertBlockTest
Co-authored-by: fengpeikun <ka...@vipshop.com>
---
.../apache/hadoop/mapred/SortWriteBufferManager.java | 2 +-
.../java/org/apache/hadoop/mapreduce/RssMRUtils.java | 2 +-
.../org/apache/hadoop/mapreduce/RssMRUtilsTest.java | 17 +++++++++++++++++
3 files changed, 19 insertions(+), 2 deletions(-)
diff --git a/client-mr/src/main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java b/client-mr/src/main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java
index 4995cc1c..aa4da547 100644
--- a/client-mr/src/main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java
+++ b/client-mr/src/main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java
@@ -312,7 +312,7 @@ public class SortWriteBufferManager<K, V> {
final byte[] compressed = RssShuffleUtils.compressData(data);
final long crc32 = ChecksumUtils.getCrc32(compressed);
compressTime += System.currentTimeMillis() - start;
- final long blockId = RssMRUtils.getBlockId(partitionId, taskAttemptId, getNextSeqNo(partitionId));
+ final long blockId = RssMRUtils.getBlockId((long)partitionId, taskAttemptId, getNextSeqNo(partitionId));
uncompressedDataLen += data.length;
// add memory to indicate bytes which will be sent to shuffle server
inSendListBytes.addAndGet(wb.getDataLength());
diff --git a/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java b/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java
index 740de51e..53026bbd 100644
--- a/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java
+++ b/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java
@@ -174,7 +174,7 @@ public class RssMRUtils {
return rssJobConf.get(key, mrJobConf.get(key, defaultValue));
}
- public static long getBlockId(int partitionId, long taskAttemptId, int nextSeqNo) {
+ public static long getBlockId(long partitionId, long taskAttemptId, int nextSeqNo) {
long attemptId = taskAttemptId >> (Constants.PARTITION_ID_MAX_LENGTH + Constants.TASK_ATTEMPT_ID_MAX_LENGTH);
if (attemptId < 0 || attemptId > MAX_ATTEMPT_ID) {
throw new RuntimeException("Can't support attemptId [" + attemptId
diff --git a/client-mr/src/test/java/org/apache/hadoop/mapreduce/RssMRUtilsTest.java b/client-mr/src/test/java/org/apache/hadoop/mapreduce/RssMRUtilsTest.java
index cf3fe781..de89de3b 100644
--- a/client-mr/src/test/java/org/apache/hadoop/mapreduce/RssMRUtilsTest.java
+++ b/client-mr/src/test/java/org/apache/hadoop/mapreduce/RssMRUtilsTest.java
@@ -25,6 +25,7 @@ import org.junit.jupiter.api.Test;
import org.apache.uniffle.client.util.RssClientConfig;
import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.storage.util.StorageType;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -74,6 +75,22 @@ public class RssMRUtilsTest {
assertEquals(taskAttemptId, newTaskAttemptId);
}
+ @Test
+ public void partitionIdConvertBlockTest() {
+ JobID jobID = new JobID();
+ TaskID taskId = new TaskID(jobID, TaskType.MAP, 233);
+ TaskAttemptID taskAttemptID = new TaskAttemptID(taskId, 1);
+ long taskAttemptId = RssMRUtils.convertTaskAttemptIdToLong(taskAttemptID, 1);
+ long mask = (1L << Constants.PARTITION_ID_MAX_LENGTH) - 1;
+ for (int partitionId = 0; partitionId <= 3000; partitionId++) {
+ for (int seqNo = 0; seqNo <= 10; seqNo++) {
+ long blockId = RssMRUtils.getBlockId(Long.valueOf(partitionId), taskAttemptId, seqNo);
+ int newPartitionId = Math.toIntExact((blockId >> Constants.TASK_ATTEMPT_ID_MAX_LENGTH) & mask);
+ assertEquals(partitionId, newPartitionId);
+ }
+ }
+ }
+
@Test
public void applyDynamicClientConfTest() {
final JobConf conf = new JobConf();