You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ro...@apache.org on 2022/10/20 03:14:05 UTC

[incubator-uniffle] branch branch-0.6 updated: [ISSUE-257] RssMRUtils#getBlockId change the partitionId of int type to long (#266)

This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/branch-0.6 by this push:
     new d468f863 [ISSUE-257] RssMRUtils#getBlockId change the partitionId of int type to long (#266)
d468f863 is described below

commit d468f863a94c71805ba5641553fab039a944afeb
Author: aboy <fp...@users.noreply.github.com>
AuthorDate: Wed Oct 19 10:23:10 2022 +0800

    [ISSUE-257] RssMRUtils#getBlockId change the partitionId of int type to long (#266)
    
    ### What changes were proposed in this pull request?
    In the RssMRUtils#getBlockId method, change the partitionId parameter from int to long to make sure there is no overflow
    
    ### Why are the changes needed?
    In the RssMRUtils#getBlockId method, the partitionId parameter is changed from int to long to make sure there is no overflow
    
    
    ### Does this PR introduce _any_ user-facing change?
    Fixes issue #257; see that issue for details
    
    
    ### How was this patch tested?
    org.apache.hadoop.mapreduce.RssMRUtilsTest#partitionIdConvertBlockTest
    
    Co-authored-by: fengpeikun <ka...@vipshop.com>
---
 .../apache/hadoop/mapred/SortWriteBufferManager.java    |  2 +-
 .../java/org/apache/hadoop/mapreduce/RssMRUtils.java    |  2 +-
 .../org/apache/hadoop/mapreduce/RssMRUtilsTest.java     | 17 +++++++++++++++++
 3 files changed, 19 insertions(+), 2 deletions(-)

diff --git a/client-mr/src/main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java b/client-mr/src/main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java
index 4995cc1c..aa4da547 100644
--- a/client-mr/src/main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java
+++ b/client-mr/src/main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java
@@ -312,7 +312,7 @@ public class SortWriteBufferManager<K, V> {
     final byte[] compressed = RssShuffleUtils.compressData(data);
     final long crc32 = ChecksumUtils.getCrc32(compressed);
     compressTime += System.currentTimeMillis() - start;
-    final long blockId = RssMRUtils.getBlockId(partitionId, taskAttemptId, getNextSeqNo(partitionId));
+    final long blockId = RssMRUtils.getBlockId((long)partitionId, taskAttemptId, getNextSeqNo(partitionId));
     uncompressedDataLen += data.length;
     // add memory to indicate bytes which will be sent to shuffle server
     inSendListBytes.addAndGet(wb.getDataLength());
diff --git a/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java b/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java
index 740de51e..53026bbd 100644
--- a/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java
+++ b/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java
@@ -174,7 +174,7 @@ public class RssMRUtils {
     return rssJobConf.get(key, mrJobConf.get(key, defaultValue));
   }
 
-  public static long getBlockId(int partitionId, long taskAttemptId, int nextSeqNo) {
+  public static long getBlockId(long partitionId, long taskAttemptId, int nextSeqNo) {
     long attemptId = taskAttemptId >> (Constants.PARTITION_ID_MAX_LENGTH + Constants.TASK_ATTEMPT_ID_MAX_LENGTH);
     if (attemptId < 0 || attemptId > MAX_ATTEMPT_ID) {
       throw new RuntimeException("Can't support attemptId [" + attemptId
diff --git a/client-mr/src/test/java/org/apache/hadoop/mapreduce/RssMRUtilsTest.java b/client-mr/src/test/java/org/apache/hadoop/mapreduce/RssMRUtilsTest.java
index cf3fe781..de89de3b 100644
--- a/client-mr/src/test/java/org/apache/hadoop/mapreduce/RssMRUtilsTest.java
+++ b/client-mr/src/test/java/org/apache/hadoop/mapreduce/RssMRUtilsTest.java
@@ -25,6 +25,7 @@ import org.junit.jupiter.api.Test;
 
 import org.apache.uniffle.client.util.RssClientConfig;
 import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.util.Constants;
 import org.apache.uniffle.storage.util.StorageType;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -74,6 +75,22 @@ public class RssMRUtilsTest {
     assertEquals(taskAttemptId, newTaskAttemptId);
   }
 
+  @Test
+  public void partitionIdConvertBlockTest() {
+    JobID jobID =  new JobID();
+    TaskID taskId =  new TaskID(jobID, TaskType.MAP, 233);
+    TaskAttemptID taskAttemptID = new TaskAttemptID(taskId, 1);
+    long taskAttemptId = RssMRUtils.convertTaskAttemptIdToLong(taskAttemptID, 1);
+    long mask = (1L << Constants.PARTITION_ID_MAX_LENGTH) - 1;
+    for (int partitionId = 0; partitionId <= 3000; partitionId++) {
+      for (int seqNo = 0; seqNo <= 10; seqNo++) {
+        long blockId = RssMRUtils.getBlockId(Long.valueOf(partitionId), taskAttemptId, seqNo);
+        int newPartitionId = Math.toIntExact((blockId >> Constants.TASK_ATTEMPT_ID_MAX_LENGTH) & mask);
+        assertEquals(partitionId, newPartitionId);
+      }
+    }
+  }
+
   @Test
   public void applyDynamicClientConfTest() {
     final JobConf conf = new JobConf();