You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ro...@apache.org on 2022/11/14 02:41:32 UTC

[incubator-uniffle] branch master updated: [AQE][LocalOrder] Fix wrong parameter for expectedTaskIds in LocalOrderSegmentSplit (#319)

This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 7aa0117f [AQE][LocalOrder] Fix wrong parameter for expectedTaskIds in LocalOrderSegmentSplit (#319)
7aa0117f is described below

commit 7aa0117f3d0115771eb41602833014e58c77e4fd
Author: Junfan Zhang <zu...@apache.org>
AuthorDate: Mon Nov 14 10:41:26 2022 +0800

    [AQE][LocalOrder] Fix wrong parameter for expectedTaskIds in LocalOrderSegmentSplit (#319)
    
    ### What changes were proposed in this pull request?
    1. Fix wrong param of expectedTaskIds in `LocalOrderSegmentSplitter`
    2. Fix the `LOCAL_ORDER` type invalid when reading
    
    ### Why are the changes needed?
    In current codebase, the reads of `LOCAL_ORDER` is invalid. This PR is to fix it.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Existing UTs of `AQESkewedJoinWithLocalOrderTest` cover this case.
---
 .../uniffle/client/impl/ShuffleReadClientImpl.java | 43 +++++++++++-----------
 .../impl/LocalFileQuorumClientReadHandler.java     |  2 +-
 2 files changed, 22 insertions(+), 23 deletions(-)

diff --git a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
index 28e81f29..99387f59 100644
--- a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
+++ b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
@@ -61,7 +61,6 @@ public class ShuffleReadClientImpl implements ShuffleReadClient {
   private AtomicLong crcCheckTime = new AtomicLong(0);
   private ClientReadHandler clientReadHandler;
   private final IdHelper idHelper;
-  private ShuffleDataDistributionType dataDistributionType = ShuffleDataDistributionType.NORMAL;
 
   public ShuffleReadClientImpl(
       String storageType,
@@ -79,27 +78,6 @@ public class ShuffleReadClientImpl implements ShuffleReadClient {
       Configuration hadoopConf,
       IdHelper idHelper,
       ShuffleDataDistributionType dataDistributionType) {
-    this(storageType, appId, shuffleId, partitionId, indexReadLimit,
-        partitionNumPerRange, partitionNum, readBufferSize, storageBasePath,
-        blockIdBitmap, taskIdBitmap, shuffleServerInfoList, hadoopConf, idHelper);
-    this.dataDistributionType = dataDistributionType;
-  }
-
-  public ShuffleReadClientImpl(
-      String storageType,
-      String appId,
-      int shuffleId,
-      int partitionId,
-      int indexReadLimit,
-      int partitionNumPerRange,
-      int partitionNum,
-      int readBufferSize,
-      String storageBasePath,
-      Roaring64NavigableMap blockIdBitmap,
-      Roaring64NavigableMap taskIdBitmap,
-      List<ShuffleServerInfo> shuffleServerInfoList,
-      Configuration hadoopConf,
-      IdHelper idHelper) {
     this.shuffleId = shuffleId;
     this.partitionId = partitionId;
     this.blockIdBitmap = blockIdBitmap;
@@ -140,6 +118,27 @@ public class ShuffleReadClientImpl implements ShuffleReadClient {
     clientReadHandler = ShuffleHandlerFactory.getInstance().createShuffleReadHandler(request);
   }
 
+  public ShuffleReadClientImpl(
+      String storageType,
+      String appId,
+      int shuffleId,
+      int partitionId,
+      int indexReadLimit,
+      int partitionNumPerRange,
+      int partitionNum,
+      int readBufferSize,
+      String storageBasePath,
+      Roaring64NavigableMap blockIdBitmap,
+      Roaring64NavigableMap taskIdBitmap,
+      List<ShuffleServerInfo> shuffleServerInfoList,
+      Configuration hadoopConf,
+      IdHelper idHelper) {
+    this(storageType, appId, shuffleId, partitionId, indexReadLimit,
+        partitionNumPerRange, partitionNum, readBufferSize, storageBasePath,
+        blockIdBitmap, taskIdBitmap, shuffleServerInfoList, hadoopConf,
+        idHelper, ShuffleDataDistributionType.NORMAL);
+  }
+
   @Override
   public CompressedShuffleBlock readShuffleBlockData() {
     // empty data expected, just return null
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileQuorumClientReadHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileQuorumClientReadHandler.java
index d523cd63..2a6afdbe 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileQuorumClientReadHandler.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileQuorumClientReadHandler.java
@@ -70,7 +70,7 @@ public class LocalFileQuorumClientReadHandler extends AbstractClientReadHandler
           processBlockIds,
           client,
           distributionType,
-          expectBlockIds
+          expectTaskIds
       ));
     }
   }