You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ro...@apache.org on 2023/10/16 01:55:35 UTC
[incubator-uniffle] branch master updated: [#1090] refactor: Refactor the reader code with builder pattern (#1232)
This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new f49b9de0d [#1090] refactor: Refactor the reader code with builder pattern (#1232)
f49b9de0d is described below
commit f49b9de0d8d6504f7d943cea9325f4c137d58ec9
Author: summaryzb <su...@gmail.com>
AuthorDate: Sun Oct 15 20:55:29 2023 -0500
[#1090] refactor: Refactor the reader code with builder pattern (#1232)
### What changes were proposed in this pull request?
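Refactor shuffle read client creation to use the builder pattern: add `ShuffleClientFactory.ReadClientBuilder`, remove `CreateShuffleReadClientRequest`, and replace the multi-argument `ShuffleReadClientImpl` constructors with a single builder-based constructor.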
### Why are the changes needed?
https://github.com/apache/incubator-uniffle/issues/1090
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing unit and integration tests, updated to construct read clients through the new builder.
---
.../hadoop/mapreduce/task/reduce/RssShuffle.java | 33 +-
.../shuffle/reader/RssShuffleDataIteratorTest.java | 32 +-
.../spark/shuffle/reader/RssShuffleReader.java | 32 +-
.../spark/shuffle/reader/RssShuffleReader.java | 33 +-
.../common/shuffle/impl/RssTezFetcherTask.java | 34 +-
.../orderedgrouped/RssShuffleScheduler.java | 34 +-
.../client/factory/ShuffleClientFactory.java | 222 +++++++++-
.../uniffle/client/impl/ShuffleReadClientImpl.java | 213 +++------
.../request/CreateShuffleReadClientRequest.java | 221 ----------
.../client/impl/ShuffleReadClientImplTest.java | 477 ++++++---------------
.../uniffle/test/DiskErrorToleranceTest.java | 62 +--
.../test/HybridStorageFaultToleranceBase.java | 33 +-
.../java/org/apache/uniffle/test/QuorumTest.java | 401 ++++++-----------
.../ShuffleServerConcurrentWriteOfHadoopTest.java | 33 +-
.../uniffle/test/ShuffleServerWithHadoopTest.java | 128 +++---
.../ShuffleServerWithKerberizedHadoopTest.java | 126 +++---
.../uniffle/test/ShuffleWithRssClientTest.java | 59 ++-
.../uniffle/test/SparkClientWithLocalTest.java | 289 ++++---------
18 files changed, 919 insertions(+), 1543 deletions(-)
diff --git a/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssShuffle.java b/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssShuffle.java
index 47d061546..1f7aae942 100644
--- a/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssShuffle.java
+++ b/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssShuffle.java
@@ -42,7 +42,6 @@ import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.apache.uniffle.client.api.ShuffleReadClient;
import org.apache.uniffle.client.api.ShuffleWriteClient;
import org.apache.uniffle.client.factory.ShuffleClientFactory;
-import org.apache.uniffle.client.request.CreateShuffleReadClientRequest;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.hadoop.shim.HadoopShimImpl;
@@ -217,23 +216,23 @@ public class RssShuffle<K, V> implements ShuffleConsumerPlugin<K, V>, ExceptionR
LOG.info("In reduce: " + reduceId + ", Rss MR client starts to fetch blocks from RSS server");
JobConf readerJobConf = getRemoteConf();
boolean expectedTaskIdsBitmapFilterEnable = serverInfoList.size() > 1;
- CreateShuffleReadClientRequest request =
- new CreateShuffleReadClientRequest(
- appId,
- 0,
- reduceId.getTaskID().getId(),
- basePath,
- partitionNumPerRange,
- partitionNum,
- blockIdBitmap,
- taskIdBitmap,
- serverInfoList,
- readerJobConf,
- new MRIdHelper(),
- expectedTaskIdsBitmapFilterEnable,
- RssMRConfig.toRssConf(rssJobConf));
ShuffleReadClient shuffleReadClient =
- ShuffleClientFactory.getInstance().createShuffleReadClient(request);
+ ShuffleClientFactory.getInstance()
+ .createShuffleReadClient(
+ ShuffleClientFactory.newReadBuilder()
+ .appId(appId)
+ .shuffleId(0)
+ .partitionId(reduceId.getTaskID().getId())
+ .basePath(basePath)
+ .partitionNumPerRange(partitionNumPerRange)
+ .partitionNum(partitionNum)
+ .blockIdBitmap(blockIdBitmap)
+ .taskIdBitmap(taskIdBitmap)
+ .shuffleServerInfoList(serverInfoList)
+ .hadoopConf(readerJobConf)
+ .idHelper(new MRIdHelper())
+ .expectedTaskIdsBitmapFilterEnable(expectedTaskIdsBitmapFilterEnable)
+ .rssConf(RssMRConfig.toRssConf(rssJobConf)));
RssFetcher fetcher =
new RssFetcher(
mrJobConf,
diff --git a/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/RssShuffleDataIteratorTest.java b/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/RssShuffleDataIteratorTest.java
index 748a2a58b..0dea95147 100644
--- a/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/RssShuffleDataIteratorTest.java
+++ b/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/RssShuffleDataIteratorTest.java
@@ -24,7 +24,6 @@ import java.util.Map;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.commons.lang3.reflect.FieldUtils;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkConf;
@@ -39,9 +38,9 @@ import org.mockito.Mockito;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.apache.uniffle.client.api.ShuffleReadClient;
+import org.apache.uniffle.client.factory.ShuffleClientFactory;
import org.apache.uniffle.client.impl.ShuffleReadClientImpl;
import org.apache.uniffle.client.util.ClientUtils;
-import org.apache.uniffle.client.util.DefaultIdHelper;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.util.ChecksumUtils;
@@ -114,21 +113,20 @@ public class RssShuffleDataIteratorTest extends AbstractRssReaderTest {
List<ShuffleServerInfo> serverInfos,
boolean compress) {
ShuffleReadClientImpl readClient =
- new ShuffleReadClientImpl(
- StorageType.HDFS.name(),
- "appId",
- 0,
- 1,
- 100,
- 2,
- 10,
- 10000,
- basePath,
- blockIdBitmap,
- taskIdBitmap,
- Lists.newArrayList(serverInfos),
- new Configuration(),
- new DefaultIdHelper());
+ ShuffleClientFactory.newReadBuilder()
+ .storageType(StorageType.HDFS.name())
+ .appId("appId")
+ .shuffleId(0)
+ .partitionId(1)
+ .indexReadLimit(100)
+ .partitionNumPerRange(2)
+ .partitionNum(10)
+ .readBufferSize(10000)
+ .basePath(basePath)
+ .blockIdBitmap(blockIdBitmap)
+ .taskIdBitmap(taskIdBitmap)
+ .shuffleServerInfoList(Lists.newArrayList(serverInfos))
+ .build();
RssConf rc;
if (!compress) {
SparkConf sc = new SparkConf();
diff --git a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
index 6f9f3186d..9130bc587 100644
--- a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
+++ b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
@@ -45,7 +45,6 @@ import org.slf4j.LoggerFactory;
import org.apache.uniffle.client.api.ShuffleReadClient;
import org.apache.uniffle.client.factory.ShuffleClientFactory;
-import org.apache.uniffle.client.request.CreateShuffleReadClientRequest;
import org.apache.uniffle.client.util.RssClientConfig;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.config.RssClientConf;
@@ -108,23 +107,22 @@ public class RssShuffleReader<K, C> implements ShuffleReader<K, C> {
@Override
public Iterator<Product2<K, C>> read() {
LOG.info("Shuffle read started:" + getReadInfo());
-
- CreateShuffleReadClientRequest request =
- new CreateShuffleReadClientRequest(
- appId,
- shuffleId,
- startPartition,
- basePath,
- partitionNumPerRange,
- partitionNum,
- blockIdBitmap,
- taskIdBitmap,
- shuffleServerInfoList,
- hadoopConf,
- expectedTaskIdsBitmapFilterEnable,
- rssConf);
ShuffleReadClient shuffleReadClient =
- ShuffleClientFactory.getInstance().createShuffleReadClient(request);
+ ShuffleClientFactory.getInstance()
+ .createShuffleReadClient(
+ ShuffleClientFactory.newReadBuilder()
+ .appId(appId)
+ .shuffleId(shuffleId)
+ .partitionId(startPartition)
+ .basePath(basePath)
+ .partitionNumPerRange(partitionNumPerRange)
+ .partitionNum(partitionNum)
+ .blockIdBitmap(blockIdBitmap)
+ .taskIdBitmap(taskIdBitmap)
+ .shuffleServerInfoList(shuffleServerInfoList)
+ .hadoopConf(hadoopConf)
+ .expectedTaskIdsBitmapFilterEnable(expectedTaskIdsBitmapFilterEnable)
+ .rssConf(rssConf));
RssShuffleDataIterator rssShuffleDataIterator =
new RssShuffleDataIterator<K, C>(
shuffleDependency.serializer(),
diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
index 4cd4aaa2b..f8ed76f73 100644
--- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
+++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
@@ -49,7 +49,6 @@ import org.slf4j.LoggerFactory;
import org.apache.uniffle.client.api.ShuffleReadClient;
import org.apache.uniffle.client.factory.ShuffleClientFactory;
-import org.apache.uniffle.client.request.CreateShuffleReadClientRequest;
import org.apache.uniffle.client.util.RssClientConfig;
import org.apache.uniffle.common.ShuffleDataDistributionType;
import org.apache.uniffle.common.ShuffleServerInfo;
@@ -246,23 +245,23 @@ public class RssShuffleReader<K, C> implements ShuffleReader<K, C> {
boolean expectedTaskIdsBitmapFilterEnable =
!(mapStartIndex == 0 && mapEndIndex == Integer.MAX_VALUE)
|| shuffleServerInfoList.size() > 1;
- CreateShuffleReadClientRequest request =
- new CreateShuffleReadClientRequest(
- appId,
- shuffleId,
- partition,
- basePath,
- 1,
- partitionNum,
- partitionToExpectBlocks.get(partition),
- taskIdBitmap,
- shuffleServerInfoList,
- hadoopConf,
- dataDistributionType,
- expectedTaskIdsBitmapFilterEnable,
- rssConf);
ShuffleReadClient shuffleReadClient =
- ShuffleClientFactory.getInstance().createShuffleReadClient(request);
+ ShuffleClientFactory.getInstance()
+ .createShuffleReadClient(
+ ShuffleClientFactory.newReadBuilder()
+ .appId(appId)
+ .shuffleId(shuffleId)
+ .partitionId(partition)
+ .basePath(basePath)
+ .partitionNumPerRange(1)
+ .partitionNum(partitionNum)
+ .blockIdBitmap(partitionToExpectBlocks.get(partition))
+ .taskIdBitmap(taskIdBitmap)
+ .shuffleServerInfoList(shuffleServerInfoList)
+ .hadoopConf(hadoopConf)
+ .shuffleDataDistributionType(dataDistributionType)
+ .expectedTaskIdsBitmapFilterEnable(expectedTaskIdsBitmapFilterEnable)
+ .rssConf(rssConf));
RssShuffleDataIterator<K, C> iterator =
new RssShuffleDataIterator<>(
shuffleDependency.serializer(), shuffleReadClient, readMetrics, rssConf);
diff --git a/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/RssTezFetcherTask.java b/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/RssTezFetcherTask.java
index 65ee99b4b..f1dd85b8b 100644
--- a/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/RssTezFetcherTask.java
+++ b/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/RssTezFetcherTask.java
@@ -42,7 +42,6 @@ import org.slf4j.LoggerFactory;
import org.apache.uniffle.client.api.ShuffleReadClient;
import org.apache.uniffle.client.api.ShuffleWriteClient;
import org.apache.uniffle.client.factory.ShuffleClientFactory;
-import org.apache.uniffle.client.request.CreateShuffleReadClientRequest;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.util.UnitConverter;
@@ -179,23 +178,24 @@ public class RssTezFetcherTask extends CallableWithNdc<FetchResult> {
Configuration hadoopConf = getRemoteConf();
LOG.info("RssTezFetcherTask storageType:{}", storageType);
boolean expectedTaskIdsBitmapFilterEnable = serverInfoSet.size() > 1;
- CreateShuffleReadClientRequest request =
- new CreateShuffleReadClientRequest(
- applicationAttemptId.toString(),
- shuffleId,
- partition,
- basePath,
- partitionNumPerRange,
- partitionNum,
- blockIdBitmap,
- taskIdBitmap,
- new ArrayList<>(serverInfoSet),
- hadoopConf,
- new TezIdHelper(),
- expectedTaskIdsBitmapFilterEnable,
- RssTezConfig.toRssConf(this.conf));
+
ShuffleReadClient shuffleReadClient =
- ShuffleClientFactory.getInstance().createShuffleReadClient(request);
+ ShuffleClientFactory.getInstance()
+ .createShuffleReadClient(
+ ShuffleClientFactory.newReadBuilder()
+ .appId(applicationAttemptId.toString())
+ .shuffleId(shuffleId)
+ .partitionId(partition)
+ .basePath(basePath)
+ .partitionNumPerRange(partitionNumPerRange)
+ .partitionNum(partitionNum)
+ .blockIdBitmap(blockIdBitmap)
+ .taskIdBitmap(taskIdBitmap)
+ .shuffleServerInfoList(new ArrayList<>(serverInfoSet))
+ .hadoopConf(hadoopConf)
+ .idHelper(new TezIdHelper())
+ .expectedTaskIdsBitmapFilterEnable(expectedTaskIdsBitmapFilterEnable)
+ .rssConf(RssTezConfig.toRssConf(this.conf)));
RssTezFetcher fetcher =
new RssTezFetcher(
fetcherCallback,
diff --git a/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssShuffleScheduler.java b/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssShuffleScheduler.java
index 89146ae94..0528037d3 100644
--- a/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssShuffleScheduler.java
+++ b/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssShuffleScheduler.java
@@ -95,7 +95,6 @@ import org.slf4j.LoggerFactory;
import org.apache.uniffle.client.api.ShuffleReadClient;
import org.apache.uniffle.client.api.ShuffleWriteClient;
import org.apache.uniffle.client.factory.ShuffleClientFactory;
-import org.apache.uniffle.client.request.CreateShuffleReadClientRequest;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.exception.RssException;
@@ -1851,24 +1850,23 @@ class RssShuffleScheduler extends ShuffleScheduler {
int partitionNum = partitionToServers.size();
boolean expectedTaskIdsBitmapFilterEnable = shuffleServerInfoSet.size() > 1;
- CreateShuffleReadClientRequest request =
- new CreateShuffleReadClientRequest(
- applicationAttemptId.toString(),
- shuffleId,
- mapHost.getPartitionId(),
- basePath,
- partitionNumPerRange,
- partitionNum,
- blockIdBitmap,
- taskIdBitmap,
- shuffleServerInfoList,
- hadoopConf,
- new TezIdHelper(),
- expectedTaskIdsBitmapFilterEnable,
- RssTezConfig.toRssConf(conf));
-
ShuffleReadClient shuffleReadClient =
- ShuffleClientFactory.getInstance().createShuffleReadClient(request);
+ ShuffleClientFactory.getInstance()
+ .createShuffleReadClient(
+ ShuffleClientFactory.newReadBuilder()
+ .appId(applicationAttemptId.toString())
+ .shuffleId(shuffleId)
+ .partitionId(mapHost.getPartitionId())
+ .basePath(basePath)
+ .partitionNumPerRange(partitionNumPerRange)
+ .partitionNum(partitionNum)
+ .blockIdBitmap(blockIdBitmap)
+ .taskIdBitmap(taskIdBitmap)
+ .shuffleServerInfoList(shuffleServerInfoList)
+ .hadoopConf(hadoopConf)
+ .idHelper(new TezIdHelper())
+ .expectedTaskIdsBitmapFilterEnable(expectedTaskIdsBitmapFilterEnable)
+ .rssConf(RssTezConfig.toRssConf(conf)));
RssTezShuffleDataFetcher fetcher =
new RssTezShuffleDataFetcher(
partitionIdToSuccessMapTaskAttempts.get(mapHost.getPartitionId()).iterator().next(),
diff --git a/client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java b/client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java
index cdabf0533..6baccc05b 100644
--- a/client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java
+++ b/client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java
@@ -17,12 +17,19 @@
package org.apache.uniffle.client.factory;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.roaringbitmap.longlong.Roaring64NavigableMap;
+
import org.apache.uniffle.client.api.ShuffleReadClient;
import org.apache.uniffle.client.api.ShuffleWriteClient;
import org.apache.uniffle.client.impl.ShuffleReadClientImpl;
import org.apache.uniffle.client.impl.ShuffleWriteClientImpl;
-import org.apache.uniffle.client.request.CreateShuffleReadClientRequest;
+import org.apache.uniffle.common.ShuffleDataDistributionType;
+import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.config.RssConf;
+import org.apache.uniffle.common.util.IdHelper;
public class ShuffleClientFactory {
@@ -41,22 +48,8 @@ public class ShuffleClientFactory {
return builder.build();
}
- public ShuffleReadClient createShuffleReadClient(CreateShuffleReadClientRequest request) {
- return new ShuffleReadClientImpl(
- request.getAppId(),
- request.getShuffleId(),
- request.getPartitionId(),
- request.getPartitionNumPerRange(),
- request.getPartitionNum(),
- request.getBasePath(),
- request.getBlockIdBitmap(),
- request.getTaskIdBitmap(),
- request.getShuffleServerInfoList(),
- request.getHadoopConf(),
- request.getIdHelper(),
- request.getShuffleDataDistributionType(),
- request.isExpectedTaskIdsBitmapFilterEnable(),
- request.getRssConf());
+ public ShuffleReadClient createShuffleReadClient(ReadClientBuilder builder) {
+ return builder.build();
}
public static class WriteClientBuilder {
@@ -198,7 +191,202 @@ public class ShuffleClientFactory {
}
}
+ public static class ReadClientBuilder {
+ private String appId;
+ private int shuffleId;
+ private int partitionId;
+ private String basePath;
+ private int partitionNumPerRange;
+ private int partitionNum;
+ private Roaring64NavigableMap blockIdBitmap;
+ private Roaring64NavigableMap taskIdBitmap;
+ private List<ShuffleServerInfo> shuffleServerInfoList;
+ private Configuration hadoopConf;
+ private IdHelper idHelper;
+ private ShuffleDataDistributionType shuffleDataDistributionType;
+ private boolean expectedTaskIdsBitmapFilterEnable;
+ private RssConf rssConf;
+ private boolean offHeapEnable;
+ private String storageType;
+ private int indexReadLimit;
+ private long readBufferSize;
+
+ public ReadClientBuilder appId(String appId) {
+ this.appId = appId;
+ return this;
+ }
+
+ public ReadClientBuilder shuffleId(int shuffleId) {
+ this.shuffleId = shuffleId;
+ return this;
+ }
+
+ public ReadClientBuilder partitionId(int partitionId) {
+ this.partitionId = partitionId;
+ return this;
+ }
+
+ public ReadClientBuilder basePath(String basePath) {
+ this.basePath = basePath;
+ return this;
+ }
+
+ public ReadClientBuilder partitionNumPerRange(int partitionNumPerRange) {
+ this.partitionNumPerRange = partitionNumPerRange;
+ return this;
+ }
+
+ public ReadClientBuilder partitionNum(int partitionNum) {
+ this.partitionNum = partitionNum;
+ return this;
+ }
+
+ public ReadClientBuilder blockIdBitmap(Roaring64NavigableMap blockIdBitmap) {
+ this.blockIdBitmap = blockIdBitmap;
+ return this;
+ }
+
+ public ReadClientBuilder taskIdBitmap(Roaring64NavigableMap taskIdBitmap) {
+ this.taskIdBitmap = taskIdBitmap;
+ return this;
+ }
+
+ public ReadClientBuilder shuffleServerInfoList(List<ShuffleServerInfo> shuffleServerInfoList) {
+ this.shuffleServerInfoList = shuffleServerInfoList;
+ return this;
+ }
+
+ public ReadClientBuilder hadoopConf(Configuration hadoopConf) {
+ this.hadoopConf = hadoopConf;
+ return this;
+ }
+
+ public ReadClientBuilder idHelper(IdHelper idHelper) {
+ this.idHelper = idHelper;
+ return this;
+ }
+
+ public ReadClientBuilder shuffleDataDistributionType(
+ ShuffleDataDistributionType shuffleDataDistributionType) {
+ this.shuffleDataDistributionType = shuffleDataDistributionType;
+ return this;
+ }
+
+ public ReadClientBuilder expectedTaskIdsBitmapFilterEnable(
+ boolean expectedTaskIdsBitmapFilterEnable) {
+ this.expectedTaskIdsBitmapFilterEnable = expectedTaskIdsBitmapFilterEnable;
+ return this;
+ }
+
+ public ReadClientBuilder rssConf(RssConf rssConf) {
+ this.rssConf = rssConf;
+ return this;
+ }
+
+ public ReadClientBuilder offHeapEnable(boolean offHeapEnable) {
+ this.offHeapEnable = offHeapEnable;
+ return this;
+ }
+
+ public ReadClientBuilder storageType(String storageType) {
+ this.storageType = storageType;
+ return this;
+ }
+
+ public ReadClientBuilder indexReadLimit(int indexReadLimit) {
+ this.indexReadLimit = indexReadLimit;
+ return this;
+ }
+
+ public ReadClientBuilder readBufferSize(long readBufferSize) {
+ this.readBufferSize = readBufferSize;
+ return this;
+ }
+
+ public ReadClientBuilder() {}
+
+ public String getAppId() {
+ return appId;
+ }
+
+ public int getShuffleId() {
+ return shuffleId;
+ }
+
+ public int getPartitionId() {
+ return partitionId;
+ }
+
+ public int getPartitionNumPerRange() {
+ return partitionNumPerRange;
+ }
+
+ public int getPartitionNum() {
+ return partitionNum;
+ }
+
+ public String getBasePath() {
+ return basePath;
+ }
+
+ public Roaring64NavigableMap getBlockIdBitmap() {
+ return blockIdBitmap;
+ }
+
+ public Roaring64NavigableMap getTaskIdBitmap() {
+ return taskIdBitmap;
+ }
+
+ public List<ShuffleServerInfo> getShuffleServerInfoList() {
+ return shuffleServerInfoList;
+ }
+
+ public Configuration getHadoopConf() {
+ return hadoopConf;
+ }
+
+ public IdHelper getIdHelper() {
+ return idHelper;
+ }
+
+ public ShuffleDataDistributionType getShuffleDataDistributionType() {
+ return shuffleDataDistributionType;
+ }
+
+ public boolean isExpectedTaskIdsBitmapFilterEnable() {
+ return expectedTaskIdsBitmapFilterEnable;
+ }
+
+ public RssConf getRssConf() {
+ return rssConf;
+ }
+
+ public boolean isOffHeapEnable() {
+ return offHeapEnable;
+ }
+
+ public String getStorageType() {
+ return storageType;
+ }
+
+ public int getIndexReadLimit() {
+ return indexReadLimit;
+ }
+
+ public long getReadBufferSize() {
+ return readBufferSize;
+ }
+
+ public ShuffleReadClientImpl build() {
+ return new ShuffleReadClientImpl(this);
+ }
+ }
+
public static WriteClientBuilder newWriteBuilder() {
return new WriteClientBuilder();
}
+
+ public static ReadClientBuilder newReadBuilder() {
+ return new ReadClientBuilder();
+ }
}
diff --git a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
index f7986d574..24fb20609 100644
--- a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
+++ b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
@@ -31,7 +31,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.uniffle.client.api.ShuffleReadClient;
+import org.apache.uniffle.client.factory.ShuffleClientFactory;
import org.apache.uniffle.client.response.CompressedShuffleBlock;
+import org.apache.uniffle.client.util.DefaultIdHelper;
import org.apache.uniffle.common.BufferSegment;
import org.apache.uniffle.common.ShuffleDataDistributionType;
import org.apache.uniffle.common.ShuffleDataResult;
@@ -64,131 +66,82 @@ public class ShuffleReadClientImpl implements ShuffleReadClient {
private ClientReadHandler clientReadHandler;
private IdHelper idHelper;
- public ShuffleReadClientImpl(
- String appId,
- int shuffleId,
- int partitionId,
- int partitionNumPerRange,
- int partitionNum,
- String storageBasePath,
- Roaring64NavigableMap blockIdBitmap,
- Roaring64NavigableMap taskIdBitmap,
- List<ShuffleServerInfo> shuffleServerInfoList,
- Configuration hadoopConf,
- IdHelper idHelper,
- ShuffleDataDistributionType dataDistributionType,
- boolean expectedTaskIdsBitmapFilterEnable,
- RssConf rssConf) {
- final int indexReadLimit = rssConf.get(RssClientConf.RSS_INDEX_READ_LIMIT);
- final String storageType = rssConf.get(RssClientConf.RSS_STORAGE_TYPE);
- long readBufferSize =
- rssConf.getSizeAsBytes(
- RssClientConf.RSS_CLIENT_READ_BUFFER_SIZE.key(),
- RssClientConf.RSS_CLIENT_READ_BUFFER_SIZE.defaultValue());
- if (readBufferSize > Integer.MAX_VALUE) {
- LOG.warn(RssClientConf.RSS_CLIENT_READ_BUFFER_SIZE.key() + " can support 2g as max");
- readBufferSize = Integer.MAX_VALUE;
+ public ShuffleReadClientImpl(ShuffleClientFactory.ReadClientBuilder builder) {
+ // add default value
+ if (builder.getIdHelper() == null) {
+ builder.idHelper(new DefaultIdHelper());
}
- boolean offHeapEnabled = rssConf.get(RssClientConf.OFF_HEAP_MEMORY_ENABLE);
- init(
- storageType,
- appId,
- shuffleId,
- partitionId,
- indexReadLimit,
- partitionNumPerRange,
- partitionNum,
- (int) readBufferSize,
- storageBasePath,
- blockIdBitmap,
- taskIdBitmap,
- shuffleServerInfoList,
- hadoopConf,
- idHelper,
- dataDistributionType,
- expectedTaskIdsBitmapFilterEnable,
- offHeapEnabled,
- rssConf);
- }
+ if (builder.getShuffleDataDistributionType() == null) {
+ builder.shuffleDataDistributionType(ShuffleDataDistributionType.NORMAL);
+ }
+ if (builder.getHadoopConf() == null) {
+ builder.hadoopConf(new Configuration());
+ }
+ if (builder.getRssConf() != null) {
+ final int indexReadLimit = builder.getRssConf().get(RssClientConf.RSS_INDEX_READ_LIMIT);
+ final String storageType = builder.getRssConf().get(RssClientConf.RSS_STORAGE_TYPE);
+ long readBufferSize =
+ builder
+ .getRssConf()
+ .getSizeAsBytes(
+ RssClientConf.RSS_CLIENT_READ_BUFFER_SIZE.key(),
+ RssClientConf.RSS_CLIENT_READ_BUFFER_SIZE.defaultValue());
+ if (readBufferSize > Integer.MAX_VALUE) {
+ LOG.warn(RssClientConf.RSS_CLIENT_READ_BUFFER_SIZE.key() + " can support 2g as max");
+ readBufferSize = Integer.MAX_VALUE;
+ }
+ boolean offHeapEnabled = builder.getRssConf().get(RssClientConf.OFF_HEAP_MEMORY_ENABLE);
+
+ builder.indexReadLimit(indexReadLimit);
+ builder.storageType(storageType);
+ builder.readBufferSize(readBufferSize);
+ builder.offHeapEnable(offHeapEnabled);
+ } else {
+ // mostly for tests: no RssConf was supplied, so build one from the explicitly set builder values
+ RssConf rssConf = new RssConf();
+ rssConf.set(RssClientConf.RSS_STORAGE_TYPE, builder.getStorageType());
+ rssConf.set(RssClientConf.RSS_INDEX_READ_LIMIT, builder.getIndexReadLimit());
+ rssConf.set(
+ RssClientConf.RSS_CLIENT_READ_BUFFER_SIZE, String.valueOf(builder.getReadBufferSize()));
- public ShuffleReadClientImpl(
- String appId,
- int shuffleId,
- int partitionId,
- int partitionNumPerRange,
- int partitionNum,
- String storageBasePath,
- Roaring64NavigableMap blockIdBitmap,
- Roaring64NavigableMap taskIdBitmap,
- List<ShuffleServerInfo> shuffleServerInfoList,
- Configuration hadoopConf,
- IdHelper idHelper,
- RssConf rssConf) {
- this(
- appId,
- shuffleId,
- partitionId,
- partitionNumPerRange,
- partitionNum,
- storageBasePath,
- blockIdBitmap,
- taskIdBitmap,
- shuffleServerInfoList,
- hadoopConf,
- idHelper,
- ShuffleDataDistributionType.NORMAL,
- false,
- rssConf);
+ builder.rssConf(rssConf);
+ builder.offHeapEnable(false);
+ builder.expectedTaskIdsBitmapFilterEnable(false);
+ }
+
+ init(builder);
}
- private void init(
- String storageType,
- String appId,
- int shuffleId,
- int partitionId,
- int indexReadLimit,
- int partitionNumPerRange,
- int partitionNum,
- int readBufferSize,
- String storageBasePath,
- Roaring64NavigableMap blockIdBitmap,
- Roaring64NavigableMap taskIdBitmap,
- List<ShuffleServerInfo> shuffleServerInfoList,
- Configuration hadoopConf,
- IdHelper idHelper,
- ShuffleDataDistributionType dataDistributionType,
- boolean expectedTaskIdsBitmapFilterEnable,
- boolean offHeapEnabled,
- RssConf rssConf) {
- this.shuffleId = shuffleId;
- this.partitionId = partitionId;
- this.blockIdBitmap = blockIdBitmap;
- this.taskIdBitmap = taskIdBitmap;
- this.idHelper = idHelper;
- this.shuffleServerInfoList = shuffleServerInfoList;
+ private void init(ShuffleClientFactory.ReadClientBuilder builder) {
+ this.shuffleId = builder.getShuffleId();
+ this.partitionId = builder.getPartitionId();
+ this.blockIdBitmap = builder.getBlockIdBitmap();
+ this.taskIdBitmap = builder.getTaskIdBitmap();
+ this.idHelper = builder.getIdHelper();
+ this.shuffleServerInfoList = builder.getShuffleServerInfoList();
CreateShuffleReadHandlerRequest request = new CreateShuffleReadHandlerRequest();
- request.setStorageType(storageType);
- request.setAppId(appId);
+ request.setStorageType(builder.getStorageType());
+ request.setAppId(builder.getAppId());
request.setShuffleId(shuffleId);
request.setPartitionId(partitionId);
- request.setIndexReadLimit(indexReadLimit);
- request.setPartitionNumPerRange(partitionNumPerRange);
- request.setPartitionNum(partitionNum);
- request.setReadBufferSize((int) readBufferSize);
- request.setStorageBasePath(storageBasePath);
+ request.setIndexReadLimit(builder.getIndexReadLimit());
+ request.setPartitionNumPerRange(builder.getPartitionNumPerRange());
+ request.setPartitionNum(builder.getPartitionNum());
+ request.setReadBufferSize((int) builder.getReadBufferSize());
+ request.setStorageBasePath(builder.getBasePath());
request.setShuffleServerInfoList(shuffleServerInfoList);
- request.setHadoopConf(hadoopConf);
+ request.setHadoopConf(builder.getHadoopConf());
request.setExpectBlockIds(blockIdBitmap);
request.setProcessBlockIds(processedBlockIds);
- request.setDistributionType(dataDistributionType);
+ request.setDistributionType(builder.getShuffleDataDistributionType());
request.setIdHelper(idHelper);
request.setExpectTaskIds(taskIdBitmap);
- request.setClientConf(rssConf);
- if (expectedTaskIdsBitmapFilterEnable) {
+ request.setClientConf(builder.getRssConf());
+ if (builder.isExpectedTaskIdsBitmapFilterEnable()) {
request.useExpectedTaskIdsBitmapFilter();
}
- if (offHeapEnabled) {
+ if (builder.isOffHeapEnable()) {
request.enableOffHeap();
}
@@ -210,46 +163,6 @@ public class ShuffleReadClientImpl implements ShuffleReadClient {
clientReadHandler = ShuffleHandlerFactory.getInstance().createShuffleReadHandler(request);
}
- public ShuffleReadClientImpl(
- String storageType,
- String appId,
- int shuffleId,
- int partitionId,
- int indexReadLimit,
- int partitionNumPerRange,
- int partitionNum,
- int readBufferSize,
- String storageBasePath,
- Roaring64NavigableMap blockIdBitmap,
- Roaring64NavigableMap taskIdBitmap,
- List<ShuffleServerInfo> shuffleServerInfoList,
- Configuration hadoopConf,
- IdHelper idHelper) {
- RssConf rssConf = new RssConf();
- rssConf.set(RssClientConf.RSS_STORAGE_TYPE, storageType);
- rssConf.set(RssClientConf.RSS_INDEX_READ_LIMIT, indexReadLimit);
- rssConf.set(RssClientConf.RSS_CLIENT_READ_BUFFER_SIZE, String.valueOf(readBufferSize));
- init(
- storageType,
- appId,
- shuffleId,
- partitionId,
- indexReadLimit,
- partitionNumPerRange,
- partitionNum,
- readBufferSize,
- storageBasePath,
- blockIdBitmap,
- taskIdBitmap,
- shuffleServerInfoList,
- hadoopConf,
- idHelper,
- ShuffleDataDistributionType.NORMAL,
- false,
- false,
- rssConf);
- }
-
@Override
public CompressedShuffleBlock readShuffleBlockData() {
// empty data expected, just return null
diff --git a/client/src/main/java/org/apache/uniffle/client/request/CreateShuffleReadClientRequest.java b/client/src/main/java/org/apache/uniffle/client/request/CreateShuffleReadClientRequest.java
deleted file mode 100644
index 97a568179..000000000
--- a/client/src/main/java/org/apache/uniffle/client/request/CreateShuffleReadClientRequest.java
+++ /dev/null
@@ -1,221 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.uniffle.client.request;
-
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.roaringbitmap.longlong.Roaring64NavigableMap;
-
-import org.apache.uniffle.client.util.DefaultIdHelper;
-import org.apache.uniffle.common.ShuffleDataDistributionType;
-import org.apache.uniffle.common.ShuffleServerInfo;
-import org.apache.uniffle.common.config.RssConf;
-import org.apache.uniffle.common.util.IdHelper;
-
-public class CreateShuffleReadClientRequest {
-
- private String appId;
- private int shuffleId;
- private int partitionId;
- private String basePath;
- private int partitionNumPerRange;
- private int partitionNum;
- private Roaring64NavigableMap blockIdBitmap;
- private Roaring64NavigableMap taskIdBitmap;
- private List<ShuffleServerInfo> shuffleServerInfoList;
- private Configuration hadoopConf;
- private IdHelper idHelper;
- private ShuffleDataDistributionType shuffleDataDistributionType =
- ShuffleDataDistributionType.NORMAL;
- private boolean expectedTaskIdsBitmapFilterEnable = false;
- private RssConf rssConf;
-
- public CreateShuffleReadClientRequest(
- String appId,
- int shuffleId,
- int partitionId,
- String basePath,
- int partitionNumPerRange,
- int partitionNum,
- Roaring64NavigableMap blockIdBitmap,
- Roaring64NavigableMap taskIdBitmap,
- List<ShuffleServerInfo> shuffleServerInfoList,
- Configuration hadoopConf,
- ShuffleDataDistributionType dataDistributionType,
- boolean expectedTaskIdsBitmapFilterEnable,
- RssConf rssConf) {
- this(
- appId,
- shuffleId,
- partitionId,
- basePath,
- partitionNumPerRange,
- partitionNum,
- blockIdBitmap,
- taskIdBitmap,
- shuffleServerInfoList,
- hadoopConf,
- new DefaultIdHelper(),
- expectedTaskIdsBitmapFilterEnable,
- rssConf);
- this.shuffleDataDistributionType = dataDistributionType;
- }
-
- public CreateShuffleReadClientRequest(
- String appId,
- int shuffleId,
- int partitionId,
- String basePath,
- int partitionNumPerRange,
- int partitionNum,
- Roaring64NavigableMap blockIdBitmap,
- Roaring64NavigableMap taskIdBitmap,
- List<ShuffleServerInfo> shuffleServerInfoList,
- Configuration hadoopConf,
- IdHelper idHelper,
- boolean expectedTaskIdsBitmapFilterEnable,
- RssConf rssConf) {
- this(
- appId,
- shuffleId,
- partitionId,
- basePath,
- partitionNumPerRange,
- partitionNum,
- blockIdBitmap,
- taskIdBitmap,
- shuffleServerInfoList,
- hadoopConf,
- idHelper,
- expectedTaskIdsBitmapFilterEnable);
- this.rssConf = rssConf;
- }
-
- public CreateShuffleReadClientRequest(
- String appId,
- int shuffleId,
- int partitionId,
- String basePath,
- int partitionNumPerRange,
- int partitionNum,
- Roaring64NavigableMap blockIdBitmap,
- Roaring64NavigableMap taskIdBitmap,
- List<ShuffleServerInfo> shuffleServerInfoList,
- Configuration hadoopConf,
- IdHelper idHelper,
- boolean expectedTaskIdsBitmapFilterEnable) {
- this.appId = appId;
- this.shuffleId = shuffleId;
- this.partitionId = partitionId;
- this.basePath = basePath;
- this.partitionNumPerRange = partitionNumPerRange;
- this.partitionNum = partitionNum;
- this.blockIdBitmap = blockIdBitmap;
- this.taskIdBitmap = taskIdBitmap;
- this.shuffleServerInfoList = shuffleServerInfoList;
- this.hadoopConf = hadoopConf;
- this.idHelper = idHelper;
- this.expectedTaskIdsBitmapFilterEnable = expectedTaskIdsBitmapFilterEnable;
- }
-
- public CreateShuffleReadClientRequest(
- String appId,
- int shuffleId,
- int partitionId,
- String basePath,
- int partitionNumPerRange,
- int partitionNum,
- Roaring64NavigableMap blockIdBitmap,
- Roaring64NavigableMap taskIdBitmap,
- List<ShuffleServerInfo> shuffleServerInfoList,
- Configuration hadoopConf,
- boolean expectedTaskIdsBitmapFilterEnable,
- RssConf rssConf) {
- this(
- appId,
- shuffleId,
- partitionId,
- basePath,
- partitionNumPerRange,
- partitionNum,
- blockIdBitmap,
- taskIdBitmap,
- shuffleServerInfoList,
- hadoopConf,
- new DefaultIdHelper(),
- expectedTaskIdsBitmapFilterEnable);
- this.rssConf = rssConf;
- }
-
- public String getAppId() {
- return appId;
- }
-
- public int getShuffleId() {
- return shuffleId;
- }
-
- public int getPartitionId() {
- return partitionId;
- }
-
- public int getPartitionNumPerRange() {
- return partitionNumPerRange;
- }
-
- public int getPartitionNum() {
- return partitionNum;
- }
-
- public String getBasePath() {
- return basePath;
- }
-
- public Roaring64NavigableMap getBlockIdBitmap() {
- return blockIdBitmap;
- }
-
- public Roaring64NavigableMap getTaskIdBitmap() {
- return taskIdBitmap;
- }
-
- public List<ShuffleServerInfo> getShuffleServerInfoList() {
- return shuffleServerInfoList;
- }
-
- public Configuration getHadoopConf() {
- return hadoopConf;
- }
-
- public IdHelper getIdHelper() {
- return idHelper;
- }
-
- public ShuffleDataDistributionType getShuffleDataDistributionType() {
- return shuffleDataDistributionType;
- }
-
- public boolean isExpectedTaskIdsBitmapFilterEnable() {
- return expectedTaskIdsBitmapFilterEnable;
- }
-
- public RssConf getRssConf() {
- return rssConf;
- }
-}
diff --git a/client/src/test/java/org/apache/uniffle/client/impl/ShuffleReadClientImplTest.java b/client/src/test/java/org/apache/uniffle/client/impl/ShuffleReadClientImplTest.java
index 8ee0e9407..1b19ef084 100644
--- a/client/src/test/java/org/apache/uniffle/client/impl/ShuffleReadClientImplTest.java
+++ b/client/src/test/java/org/apache/uniffle/client/impl/ShuffleReadClientImplTest.java
@@ -25,7 +25,6 @@ import java.util.concurrent.atomic.AtomicLong;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.junit.jupiter.api.Test;
@@ -35,8 +34,8 @@ import org.roaringbitmap.longlong.LongIterator;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.apache.uniffle.client.TestUtils;
+import org.apache.uniffle.client.factory.ShuffleClientFactory;
import org.apache.uniffle.client.response.CompressedShuffleBlock;
-import org.apache.uniffle.client.util.DefaultIdHelper;
import org.apache.uniffle.common.ShufflePartitionedBlock;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.util.ChecksumUtils;
@@ -59,6 +58,19 @@ public class ShuffleReadClientImplTest extends HadoopTestBase {
private ShuffleServerInfo ssi1 = new ShuffleServerInfo("host1-0", "host1", 0);
private ShuffleServerInfo ssi2 = new ShuffleServerInfo("host2-0", "host2", 0);
+ private ShuffleClientFactory.ReadClientBuilder baseReadBuilder() {
+ return ShuffleClientFactory.newReadBuilder()
+ .storageType(StorageType.HDFS.name())
+ .appId("appId")
+ .shuffleId(0)
+ .partitionId(1)
+ .indexReadLimit(100)
+ .partitionNumPerRange(1)
+ .partitionNum(10)
+ .readBufferSize(1000)
+ .shuffleServerInfoList(Lists.newArrayList(ssi1));
+ }
+
@Test
public void readTest1() throws Exception {
String basePath = HDFS_URI + "clientReadTest1";
@@ -69,24 +81,12 @@ public class ShuffleReadClientImplTest extends HadoopTestBase {
Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
writeTestData(writeHandler, 2, 30, 0, expectedData, blockIdBitmap);
-
ShuffleReadClientImpl readClient =
- new ShuffleReadClientImpl(
- StorageType.HDFS.name(),
- "appId",
- 0,
- 1,
- 100,
- 1,
- 10,
- 1000,
- basePath,
- blockIdBitmap,
- taskIdBitmap,
- Lists.newArrayList(ssi1),
- new Configuration(),
- new DefaultIdHelper());
-
+ baseReadBuilder()
+ .basePath(basePath)
+ .blockIdBitmap(blockIdBitmap)
+ .taskIdBitmap(taskIdBitmap)
+ .build();
TestUtils.validateResult(readClient, expectedData);
readClient.checkProcessedBlockIds();
readClient.close();
@@ -94,21 +94,14 @@ public class ShuffleReadClientImplTest extends HadoopTestBase {
blockIdBitmap.addLong(Constants.MAX_TASK_ATTEMPT_ID - 1);
taskIdBitmap.addLong(Constants.MAX_TASK_ATTEMPT_ID - 1);
readClient =
- new ShuffleReadClientImpl(
- StorageType.HDFS.name(),
- "appId",
- 0,
- 1,
- 100,
- 1,
- 10,
- 1000,
- basePath,
- blockIdBitmap,
- taskIdBitmap,
- Lists.newArrayList(ssi1),
- new Configuration(),
- new DefaultIdHelper());
+ baseReadBuilder()
+ .basePath(basePath)
+ .blockIdBitmap(blockIdBitmap)
+ .taskIdBitmap(taskIdBitmap)
+ .basePath(basePath)
+ .blockIdBitmap(blockIdBitmap)
+ .taskIdBitmap(taskIdBitmap)
+ .build();
TestUtils.validateResult(readClient, expectedData);
try {
// can't find all expected block id, data loss
@@ -136,22 +129,13 @@ public class ShuffleReadClientImplTest extends HadoopTestBase {
writeTestData(writeHandler2, 2, 30, 0, expectedData, blockIdBitmap);
ShuffleReadClientImpl readClient =
- new ShuffleReadClientImpl(
- StorageType.HDFS.name(),
- "appId",
- 0,
- 1,
- 100,
- 2,
- 10,
- 1000,
- basePath,
- blockIdBitmap,
- taskIdBitmap,
- Lists.newArrayList(ssi1, ssi2),
- new Configuration(),
- new DefaultIdHelper());
-
+ baseReadBuilder()
+ .partitionNumPerRange(2)
+ .basePath(basePath)
+ .blockIdBitmap(blockIdBitmap)
+ .taskIdBitmap(taskIdBitmap)
+ .shuffleServerInfoList(Lists.newArrayList(ssi1, ssi2))
+ .build();
TestUtils.validateResult(readClient, expectedData);
readClient.checkProcessedBlockIds();
readClient.close();
@@ -203,22 +187,13 @@ public class ShuffleReadClientImplTest extends HadoopTestBase {
conf);
ShuffleReadClientImpl readClient =
- new ShuffleReadClientImpl(
- StorageType.HDFS.name(),
- "appId",
- 0,
- 1,
- 100,
- 2,
- 10,
- 1000,
- basePath,
- blockIdBitmap,
- taskIdBitmap,
- Lists.newArrayList(ssi1, ssi2),
- new Configuration(),
- new DefaultIdHelper());
-
+ baseReadBuilder()
+ .partitionNumPerRange(2)
+ .basePath(basePath)
+ .blockIdBitmap(blockIdBitmap)
+ .taskIdBitmap(taskIdBitmap)
+ .shuffleServerInfoList(Lists.newArrayList(ssi1, ssi2))
+ .build();
TestUtils.validateResult(readClient, expectedData);
readClient.checkProcessedBlockIds();
readClient.close();
@@ -236,21 +211,12 @@ public class ShuffleReadClientImplTest extends HadoopTestBase {
writeTestData(writeHandler, 2, 30, 0, expectedData, blockIdBitmap);
ShuffleReadClientImpl readClient =
- new ShuffleReadClientImpl(
- StorageType.HDFS.name(),
- "appId",
- 0,
- 1,
- 100,
- 2,
- 10,
- 1000,
- basePath,
- blockIdBitmap,
- taskIdBitmap,
- Lists.newArrayList(ssi1),
- new Configuration(),
- new DefaultIdHelper());
+ baseReadBuilder()
+ .partitionNumPerRange(2)
+ .basePath(basePath)
+ .blockIdBitmap(blockIdBitmap)
+ .taskIdBitmap(taskIdBitmap)
+ .build();
Path dataFile = new Path(basePath + "/appId/0/0-1/" + ssi1.getId() + "_0.data");
// data file is deleted after readClient checkExpectedBlockIds
fs.delete(dataFile, true);
@@ -282,23 +248,13 @@ public class ShuffleReadClientImplTest extends HadoopTestBase {
Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
writeTestData(writeHandler, 2, 30, 0, expectedData, blockIdBitmap);
-
ShuffleReadClientImpl readClient =
- new ShuffleReadClientImpl(
- StorageType.HDFS.name(),
- "appId",
- 0,
- 1,
- 100,
- 2,
- 10,
- 1000,
- basePath,
- blockIdBitmap,
- taskIdBitmap,
- Lists.newArrayList(ssi1),
- new Configuration(),
- new DefaultIdHelper());
+ baseReadBuilder()
+ .partitionNumPerRange(2)
+ .basePath(basePath)
+ .blockIdBitmap(blockIdBitmap)
+ .taskIdBitmap(taskIdBitmap)
+ .build();
// index file is deleted after iterator initialization, it should be ok, all index infos are
// read already
Path indexFile = new Path(basePath + "/appId/0/0-1/" + ssi1.getId() + "_0.index");
@@ -324,39 +280,22 @@ public class ShuffleReadClientImplTest extends HadoopTestBase {
writeTestData(writeHandler, 10, 30, 0, expectedData2, blockIdBitmap2);
writeTestData(writeHandler, 10, 30, 0, expectedData1, blockIdBitmap1);
-
ShuffleReadClientImpl readClient1 =
- new ShuffleReadClientImpl(
- StorageType.HDFS.name(),
- "appId",
- 0,
- 0,
- 100,
- 2,
- 10,
- 100,
- basePath,
- blockIdBitmap1,
- taskIdBitmap,
- Lists.newArrayList(ssi1),
- new Configuration(),
- new DefaultIdHelper());
+ baseReadBuilder()
+ .partitionId(0)
+ .partitionNumPerRange(2)
+ .basePath(basePath)
+ .blockIdBitmap(blockIdBitmap1)
+ .taskIdBitmap(taskIdBitmap)
+ .build();
+
final ShuffleReadClientImpl readClient2 =
- new ShuffleReadClientImpl(
- StorageType.HDFS.name(),
- "appId",
- 0,
- 1,
- 100,
- 2,
- 10,
- 100,
- basePath,
- blockIdBitmap2,
- taskIdBitmap,
- Lists.newArrayList(ssi1),
- new Configuration(),
- new DefaultIdHelper());
+ baseReadBuilder()
+ .partitionNumPerRange(2)
+ .basePath(basePath)
+ .blockIdBitmap(blockIdBitmap2)
+ .taskIdBitmap(taskIdBitmap)
+ .build();
TestUtils.validateResult(readClient1, expectedData1);
readClient1.checkProcessedBlockIds();
readClient1.close();
@@ -376,39 +315,21 @@ public class ShuffleReadClientImplTest extends HadoopTestBase {
Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
writeTestData(writeHandler, 2, 30, 0, expectedData, blockIdBitmap);
-
ShuffleReadClientImpl readClient =
- new ShuffleReadClientImpl(
- StorageType.HDFS.name(),
- "appId",
- 0,
- 1,
- 100,
- 2,
- 10,
- 1000,
- basePath,
- blockIdBitmap,
- taskIdBitmap,
- Lists.newArrayList(ssi1),
- new Configuration(),
- new DefaultIdHelper());
+ baseReadBuilder()
+ .partitionNumPerRange(2)
+ .basePath(basePath)
+ .blockIdBitmap(blockIdBitmap)
+ .taskIdBitmap(taskIdBitmap)
+ .build();
ShuffleReadClientImpl readClient2 =
- new ShuffleReadClientImpl(
- StorageType.HDFS.name(),
- "appId",
- 0,
- 1,
- 100,
- 2,
- 10,
- 1000,
- basePath,
- blockIdBitmap,
- taskIdBitmap,
- Lists.newArrayList(ssi1, ssi2),
- new Configuration(),
- new DefaultIdHelper());
+ baseReadBuilder()
+ .partitionNumPerRange(2)
+ .basePath(basePath)
+ .blockIdBitmap(blockIdBitmap)
+ .taskIdBitmap(taskIdBitmap)
+ .shuffleServerInfoList(Lists.newArrayList(ssi1, ssi2))
+ .build();
// crc32 is incorrect
try (MockedStatic<ChecksumUtils> checksumUtilsMock = Mockito.mockStatic(ChecksumUtils.class)) {
checksumUtilsMock.when(() -> ChecksumUtils.getCrc32((ByteBuffer) any())).thenReturn(-1L);
@@ -433,21 +354,12 @@ public class ShuffleReadClientImplTest extends HadoopTestBase {
public void readTest9() {
// empty data
ShuffleReadClientImpl readClient =
- new ShuffleReadClientImpl(
- StorageType.HDFS.name(),
- "appId",
- 0,
- 1,
- 100,
- 2,
- 10,
- 1000,
- "basePath",
- Roaring64NavigableMap.bitmapOf(),
- Roaring64NavigableMap.bitmapOf(),
- Lists.newArrayList(ssi1),
- new Configuration(),
- new DefaultIdHelper());
+ baseReadBuilder()
+ .partitionNumPerRange(2)
+ .basePath("basePath")
+ .blockIdBitmap(Roaring64NavigableMap.bitmapOf())
+ .taskIdBitmap(Roaring64NavigableMap.bitmapOf())
+ .build();
assertNull(readClient.readShuffleBlockData());
readClient.checkProcessedBlockIds();
}
@@ -469,21 +381,13 @@ public class ShuffleReadClientImplTest extends HadoopTestBase {
}
ShuffleReadClientImpl readClient =
- new ShuffleReadClientImpl(
- StorageType.HDFS.name(),
- "appId",
- 0,
- 0,
- 100,
- 2,
- 10,
- 100,
- basePath,
- wrongBlockIdBitmap,
- taskIdBitmap,
- Lists.newArrayList(ssi1),
- new Configuration(),
- new DefaultIdHelper());
+ baseReadBuilder()
+ .partitionId(0)
+ .partitionNumPerRange(2)
+ .basePath(basePath)
+ .blockIdBitmap(wrongBlockIdBitmap)
+ .taskIdBitmap(taskIdBitmap)
+ .build();
assertNull(readClient.readShuffleBlockData());
try {
readClient.checkProcessedBlockIds();
@@ -503,109 +407,58 @@ public class ShuffleReadClientImplTest extends HadoopTestBase {
Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
writeTestData(writeHandler, 10, 30, 0, expectedData, blockIdBitmap);
-
// test with different indexReadLimit to validate result
ShuffleReadClientImpl readClient =
- new ShuffleReadClientImpl(
- StorageType.HDFS.name(),
- "appId",
- 0,
- 1,
- 1,
- 1,
- 10,
- 1000,
- basePath,
- blockIdBitmap,
- taskIdBitmap,
- Lists.newArrayList(ssi1),
- new Configuration(),
- new DefaultIdHelper());
-
+ baseReadBuilder()
+ .indexReadLimit(1)
+ .basePath(basePath)
+ .blockIdBitmap(blockIdBitmap)
+ .taskIdBitmap(taskIdBitmap)
+ .build();
TestUtils.validateResult(readClient, expectedData);
readClient.checkProcessedBlockIds();
readClient.close();
readClient =
- new ShuffleReadClientImpl(
- StorageType.HDFS.name(),
- "appId",
- 0,
- 1,
- 2,
- 1,
- 10,
- 1000,
- basePath,
- blockIdBitmap,
- taskIdBitmap,
- Lists.newArrayList(ssi1),
- new Configuration(),
- new DefaultIdHelper());
-
+ baseReadBuilder()
+ .indexReadLimit(2)
+ .basePath(basePath)
+ .blockIdBitmap(blockIdBitmap)
+ .taskIdBitmap(taskIdBitmap)
+ .build();
TestUtils.validateResult(readClient, expectedData);
readClient.checkProcessedBlockIds();
readClient.close();
readClient =
- new ShuffleReadClientImpl(
- StorageType.HDFS.name(),
- "appId",
- 0,
- 1,
- 3,
- 1,
- 10,
- 1000,
- basePath,
- blockIdBitmap,
- taskIdBitmap,
- Lists.newArrayList(ssi1),
- new Configuration(),
- new DefaultIdHelper());
-
+ baseReadBuilder()
+ .indexReadLimit(3)
+ .basePath(basePath)
+ .blockIdBitmap(blockIdBitmap)
+ .taskIdBitmap(taskIdBitmap)
+ .build();
TestUtils.validateResult(readClient, expectedData);
readClient.checkProcessedBlockIds();
readClient.close();
readClient =
- new ShuffleReadClientImpl(
- StorageType.HDFS.name(),
- "appId",
- 0,
- 1,
- 10,
- 1,
- 10,
- 1000,
- basePath,
- blockIdBitmap,
- taskIdBitmap,
- Lists.newArrayList(ssi1),
- new Configuration(),
- new DefaultIdHelper());
-
+ baseReadBuilder()
+ .indexReadLimit(10)
+ .basePath(basePath)
+ .blockIdBitmap(blockIdBitmap)
+ .taskIdBitmap(taskIdBitmap)
+ .build();
TestUtils.validateResult(readClient, expectedData);
readClient.checkProcessedBlockIds();
readClient.close();
readClient =
- new ShuffleReadClientImpl(
- StorageType.HDFS.name(),
- "appId",
- 0,
- 1,
- 11,
- 1,
- 10,
- 1000,
- basePath,
- blockIdBitmap,
- taskIdBitmap,
- Lists.newArrayList(ssi1),
- new Configuration(),
- new DefaultIdHelper());
-
+ baseReadBuilder()
+ .indexReadLimit(11)
+ .basePath(basePath)
+ .blockIdBitmap(blockIdBitmap)
+ .taskIdBitmap(taskIdBitmap)
+ .build();
TestUtils.validateResult(readClient, expectedData);
readClient.checkProcessedBlockIds();
readClient.close();
@@ -626,22 +479,11 @@ public class ShuffleReadClientImplTest extends HadoopTestBase {
// unexpected taskAttemptId should be filtered
ShuffleReadClientImpl readClient =
- new ShuffleReadClientImpl(
- StorageType.HDFS.name(),
- "appId",
- 0,
- 1,
- 100,
- 1,
- 10,
- 1000,
- basePath,
- blockIdBitmap,
- taskIdBitmap,
- Lists.newArrayList(ssi1),
- new Configuration(),
- new DefaultIdHelper());
-
+ baseReadBuilder()
+ .basePath(basePath)
+ .blockIdBitmap(blockIdBitmap)
+ .taskIdBitmap(taskIdBitmap)
+ .build();
TestUtils.validateResult(readClient, expectedData);
assertEquals(15, readClient.getProcessedBlockIds().getLongCardinality());
readClient.checkProcessedBlockIds();
@@ -666,22 +508,11 @@ public class ShuffleReadClientImplTest extends HadoopTestBase {
// unexpected taskAttemptId should be filtered
ShuffleReadClientImpl readClient =
- new ShuffleReadClientImpl(
- StorageType.HDFS.name(),
- "appId",
- 0,
- 1,
- 100,
- 1,
- 10,
- 1000,
- basePath,
- blockIdBitmap,
- taskIdBitmap,
- Lists.newArrayList(ssi1),
- new Configuration(),
- new DefaultIdHelper());
-
+ baseReadBuilder()
+ .basePath(basePath)
+ .blockIdBitmap(blockIdBitmap)
+ .taskIdBitmap(taskIdBitmap)
+ .build();
TestUtils.validateResult(readClient, expectedData);
assertEquals(20, readClient.getProcessedBlockIds().getLongCardinality());
readClient.checkProcessedBlockIds();
@@ -703,22 +534,11 @@ public class ShuffleReadClientImplTest extends HadoopTestBase {
// unexpected taskAttemptId should be filtered
ShuffleReadClientImpl readClient =
- new ShuffleReadClientImpl(
- StorageType.HDFS.name(),
- "appId",
- 0,
- 1,
- 100,
- 1,
- 10,
- 1000,
- basePath,
- blockIdBitmap,
- taskIdBitmap,
- Lists.newArrayList(ssi1),
- new Configuration(),
- new DefaultIdHelper());
-
+ baseReadBuilder()
+ .basePath(basePath)
+ .blockIdBitmap(blockIdBitmap)
+ .taskIdBitmap(taskIdBitmap)
+ .build();
TestUtils.validateResult(readClient, expectedData);
assertEquals(15, readClient.getProcessedBlockIds().getLongCardinality());
readClient.checkProcessedBlockIds();
@@ -742,22 +562,11 @@ public class ShuffleReadClientImplTest extends HadoopTestBase {
writeTestData(writeHandler, 5, 30, 0, Maps.newHashMap(), Roaring64NavigableMap.bitmapOf());
// unexpected taskAttemptId should be filtered
ShuffleReadClientImpl readClient =
- new ShuffleReadClientImpl(
- StorageType.HDFS.name(),
- "appId",
- 0,
- 1,
- 100,
- 1,
- 10,
- 1000,
- basePath,
- blockIdBitmap,
- taskIdBitmap,
- Lists.newArrayList(ssi1),
- new Configuration(),
- new DefaultIdHelper());
-
+ baseReadBuilder()
+ .basePath(basePath)
+ .blockIdBitmap(blockIdBitmap)
+ .taskIdBitmap(taskIdBitmap)
+ .build();
TestUtils.validateResult(readClient, expectedData);
assertEquals(25, readClient.getProcessedBlockIds().getLongCardinality());
readClient.checkProcessedBlockIds();
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/DiskErrorToleranceTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/DiskErrorToleranceTest.java
index 70697d511..d55317cf7 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/DiskErrorToleranceTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/DiskErrorToleranceTest.java
@@ -36,13 +36,13 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
+import org.apache.uniffle.client.factory.ShuffleClientFactory;
import org.apache.uniffle.client.impl.ShuffleReadClientImpl;
import org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcClient;
import org.apache.uniffle.client.request.RssFinishShuffleRequest;
import org.apache.uniffle.client.request.RssRegisterShuffleRequest;
import org.apache.uniffle.client.request.RssSendCommitRequest;
import org.apache.uniffle.client.request.RssSendShuffleDataRequest;
-import org.apache.uniffle.client.util.DefaultIdHelper;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleServerInfo;
@@ -112,21 +112,21 @@ public class DiskErrorToleranceTest extends ShuffleReadWriteBase {
RssFinishShuffleRequest rf1 = new RssFinishShuffleRequest(appId, 0);
shuffleServerClient.finishShuffle(rf1);
ShuffleReadClientImpl readClient =
- new ShuffleReadClientImpl(
- StorageType.LOCALFILE.name(),
- appId,
- 0,
- 0,
- 100,
- 1,
- 10,
- 1000,
- null,
- blockIdBitmap1,
- Roaring64NavigableMap.bitmapOf(1),
- shuffleServerInfo,
- conf,
- new DefaultIdHelper());
+ ShuffleClientFactory.newReadBuilder()
+ .storageType(StorageType.LOCALFILE.name())
+ .appId(appId)
+ .shuffleId(0)
+ .partitionId(0)
+ .indexReadLimit(100)
+ .partitionNumPerRange(1)
+ .partitionNum(10)
+ .readBufferSize(1000)
+ .basePath(null)
+ .blockIdBitmap(blockIdBitmap1)
+ .taskIdBitmap(Roaring64NavigableMap.bitmapOf(1))
+ .shuffleServerInfoList(shuffleServerInfo)
+ .hadoopConf(conf)
+ .build();
validateResult(readClient, expectedData);
File shuffleData = new File(data2, appId);
@@ -153,21 +153,21 @@ public class DiskErrorToleranceTest extends ShuffleReadWriteBase {
shuffleServerClient.finishShuffle(rf1);
readClient =
- new ShuffleReadClientImpl(
- StorageType.LOCALFILE.name(),
- appId,
- 0,
- 0,
- 100,
- 1,
- 10,
- 1000,
- null,
- blockIdBitmap2,
- Roaring64NavigableMap.bitmapOf(2),
- shuffleServerInfo,
- conf,
- new DefaultIdHelper());
+ ShuffleClientFactory.newReadBuilder()
+ .storageType(StorageType.LOCALFILE.name())
+ .appId(appId)
+ .shuffleId(0)
+ .partitionId(0)
+ .indexReadLimit(100)
+ .partitionNumPerRange(1)
+ .partitionNum(10)
+ .readBufferSize(1000)
+ .basePath(null)
+ .blockIdBitmap(blockIdBitmap2)
+ .taskIdBitmap(Roaring64NavigableMap.bitmapOf(2))
+ .shuffleServerInfoList(shuffleServerInfo)
+ .hadoopConf(conf)
+ .build();
validateResult(readClient, expectedData);
shuffleData = new File(data1, appId);
assertTrue(shuffleData.exists());
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/HybridStorageFaultToleranceBase.java b/integration-test/common/src/test/java/org/apache/uniffle/test/HybridStorageFaultToleranceBase.java
index bde7222d2..d2b27b952 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/HybridStorageFaultToleranceBase.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/HybridStorageFaultToleranceBase.java
@@ -30,6 +30,7 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
+import org.apache.uniffle.client.factory.ShuffleClientFactory;
import org.apache.uniffle.client.factory.ShuffleServerClientFactory;
import org.apache.uniffle.client.impl.ShuffleReadClientImpl;
import org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcClient;
@@ -39,7 +40,6 @@ import org.apache.uniffle.client.request.RssReportShuffleResultRequest;
import org.apache.uniffle.client.request.RssSendCommitRequest;
import org.apache.uniffle.client.request.RssSendShuffleDataRequest;
import org.apache.uniffle.client.response.CompressedShuffleBlock;
-import org.apache.uniffle.client.util.DefaultIdHelper;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleServerInfo;
@@ -122,21 +122,22 @@ public abstract class HybridStorageFaultToleranceBase extends ShuffleReadWriteBa
Roaring64NavigableMap taskBitmap,
Map<Long, byte[]> expectedData) {
ShuffleReadClientImpl readClient =
- new ShuffleReadClientImpl(
- StorageType.LOCALFILE_HDFS.name(),
- appId,
- shuffleId,
- partitionId,
- 100,
- 1,
- 10,
- 1000,
- REMOTE_STORAGE,
- blockBitmap,
- taskBitmap,
- Lists.newArrayList(new ShuffleServerInfo(LOCALHOST, SHUFFLE_SERVER_PORT)),
- conf,
- new DefaultIdHelper());
+ ShuffleClientFactory.newReadBuilder()
+ .storageType(StorageType.LOCALFILE_HDFS.name())
+ .appId(appId)
+ .shuffleId(shuffleId)
+ .partitionId(partitionId)
+ .indexReadLimit(100)
+ .partitionNumPerRange(1)
+ .partitionNum(10)
+ .readBufferSize(1000)
+ .basePath(REMOTE_STORAGE)
+ .blockIdBitmap(blockBitmap)
+ .taskIdBitmap(taskBitmap)
+ .shuffleServerInfoList(
+ Lists.newArrayList(new ShuffleServerInfo(LOCALHOST, SHUFFLE_SERVER_PORT)))
+ .hadoopConf(conf)
+ .build();
CompressedShuffleBlock csb = readClient.readShuffleBlockData();
Roaring64NavigableMap matched = Roaring64NavigableMap.bitmapOf();
while (csb != null && csb.getByteBuffer() != null) {
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java
index f653103fd..03acbc74f 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java
@@ -37,7 +37,6 @@ import org.apache.uniffle.client.impl.ShuffleWriteClientImpl;
import org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcClient;
import org.apache.uniffle.client.response.CompressedShuffleBlock;
import org.apache.uniffle.client.response.SendShuffleDataResult;
-import org.apache.uniffle.client.util.DefaultIdHelper;
import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.RemoteStorageInfo;
@@ -72,6 +71,17 @@ public class QuorumTest extends ShuffleReadWriteBase {
private static ShuffleServerInfo fakedShuffleServerInfo4;
private MockedShuffleWriteClientImpl shuffleWriteClientImpl;
+ private ShuffleClientFactory.ReadClientBuilder baseReadBuilder() {
+ return ShuffleClientFactory.newReadBuilder()
+ .storageType(StorageType.MEMORY_LOCALFILE.name())
+ .shuffleId(0)
+ .partitionId(0)
+ .indexReadLimit(100)
+ .partitionNumPerRange(1)
+ .partitionNum(10)
+ .readBufferSize(1000);
+ }
+
public static MockedShuffleServer createServer(int id, File tmpDir) throws Exception {
ShuffleServerConf shuffleServerConf = getShuffleServerConf();
shuffleServerConf.setLong("rss.server.app.expired.withoutHeartbeat", 8000);
@@ -244,22 +254,15 @@ public class QuorumTest extends ShuffleReadWriteBase {
assertEquals(blockIdBitmap, succBlockIdBitmap);
Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
+
ShuffleReadClientImpl readClient =
- new ShuffleReadClientImpl(
- StorageType.MEMORY_LOCALFILE.name(),
- testAppId,
- 0,
- 0,
- 100,
- 1,
- 10,
- 1000,
- "",
- blockIdBitmap,
- taskIdBitmap,
- Lists.newArrayList(shuffleServerInfo0, shuffleServerInfo1, fakedShuffleServerInfo2),
- null,
- new DefaultIdHelper());
+ baseReadBuilder()
+ .appId(testAppId)
+ .blockIdBitmap(blockIdBitmap)
+ .taskIdBitmap(taskIdBitmap)
+ .shuffleServerInfoList(
+ Lists.newArrayList(shuffleServerInfo0, shuffleServerInfo1, fakedShuffleServerInfo2))
+ .build();
// The data should be read
validateResult(readClient, expectedData);
@@ -398,21 +401,13 @@ public class QuorumTest extends ShuffleReadWriteBase {
Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
ShuffleReadClientImpl readClient =
- new ShuffleReadClientImpl(
- StorageType.MEMORY_LOCALFILE.name(),
- testAppId,
- 0,
- 0,
- 100,
- 1,
- 10,
- 1000,
- "",
- blockIdBitmap,
- taskIdBitmap,
- Lists.newArrayList(shuffleServerInfo0, shuffleServerInfo1, shuffleServerInfo2),
- null,
- new DefaultIdHelper());
+ baseReadBuilder()
+ .appId(testAppId)
+ .blockIdBitmap(blockIdBitmap)
+ .taskIdBitmap(taskIdBitmap)
+ .shuffleServerInfoList(
+ Lists.newArrayList(shuffleServerInfo0, shuffleServerInfo1, shuffleServerInfo2))
+ .build();
validateResult(readClient, expectedData);
}
@@ -427,7 +422,6 @@ public class QuorumTest extends ShuffleReadWriteBase {
// When 2 servers are timeout, the block sending should fail
enableTimeout((MockedShuffleServer) shuffleServers.get(1), 500);
enableTimeout((MockedShuffleServer) shuffleServers.get(2), 500);
-
List<ShuffleBlockInfo> blocks =
createShuffleBlockList(
0,
@@ -579,21 +573,13 @@ public class QuorumTest extends ShuffleReadWriteBase {
}
ShuffleReadClientImpl readClient =
- new ShuffleReadClientImpl(
- StorageType.MEMORY_LOCALFILE.name(),
- testAppId,
- 0,
- 0,
- 100,
- 1,
- 10,
- 1000,
- "",
- blockIdBitmap,
- taskIdBitmap,
- Lists.newArrayList(shuffleServerInfo0, shuffleServerInfo1, shuffleServerInfo2),
- null,
- new DefaultIdHelper());
+ baseReadBuilder()
+ .appId(testAppId)
+ .blockIdBitmap(blockIdBitmap)
+ .taskIdBitmap(taskIdBitmap)
+ .shuffleServerInfoList(
+ Lists.newArrayList(shuffleServerInfo0, shuffleServerInfo1, shuffleServerInfo2))
+ .build();
validateResult(readClient, expectedData);
}
@@ -768,59 +754,33 @@ public class QuorumTest extends ShuffleReadWriteBase {
// we cannot read any blocks from server 2
ShuffleReadClientImpl readClient =
- new ShuffleReadClientImpl(
- StorageType.MEMORY_LOCALFILE.name(),
- testAppId,
- 0,
- 0,
- 100,
- 1,
- 10,
- 1000,
- "",
- blockIdBitmap,
- taskIdBitmap,
- Lists.newArrayList(shuffleServerInfo2),
- null,
- new DefaultIdHelper());
+ baseReadBuilder()
+ .appId(testAppId)
+ .blockIdBitmap(blockIdBitmap)
+ .taskIdBitmap(taskIdBitmap)
+ .shuffleServerInfoList(Lists.newArrayList(shuffleServerInfo2))
+ .build();
assertTrue(readClient.readShuffleBlockData() == null);
// we can read blocks from server 0,1
readClient =
- new ShuffleReadClientImpl(
- StorageType.MEMORY_LOCALFILE.name(),
- testAppId,
- 0,
- 0,
- 100,
- 1,
- 10,
- 1000,
- "",
- blockIdBitmap,
- taskIdBitmap,
- Lists.newArrayList(shuffleServerInfo0, shuffleServerInfo1),
- null,
- new DefaultIdHelper());
+ baseReadBuilder()
+ .appId(testAppId)
+ .blockIdBitmap(blockIdBitmap)
+ .taskIdBitmap(taskIdBitmap)
+ .shuffleServerInfoList(Lists.newArrayList(shuffleServerInfo0, shuffleServerInfo1))
+ .build();
validateResult(readClient, expectedData);
// we can also read blocks from server 0,1,2
readClient =
- new ShuffleReadClientImpl(
- StorageType.MEMORY_LOCALFILE.name(),
- testAppId,
- 0,
- 0,
- 100,
- 1,
- 10,
- 1000,
- "",
- blockIdBitmap,
- taskIdBitmap,
- Lists.newArrayList(shuffleServerInfo0, shuffleServerInfo1, shuffleServerInfo2),
- null,
- new DefaultIdHelper());
+ baseReadBuilder()
+ .appId(testAppId)
+ .blockIdBitmap(blockIdBitmap)
+ .taskIdBitmap(taskIdBitmap)
+ .shuffleServerInfoList(
+ Lists.newArrayList(shuffleServerInfo0, shuffleServerInfo1, shuffleServerInfo2))
+ .build();
validateResult(readClient, expectedData);
}
@@ -854,59 +814,33 @@ public class QuorumTest extends ShuffleReadWriteBase {
// we cannot read any blocks from server 1
ShuffleReadClientImpl readClient =
- new ShuffleReadClientImpl(
- StorageType.MEMORY_LOCALFILE.name(),
- testAppId,
- 0,
- 0,
- 100,
- 1,
- 10,
- 1000,
- "",
- blockIdBitmap,
- taskIdBitmap,
- Lists.newArrayList(shuffleServerInfo1),
- null,
- new DefaultIdHelper());
+ baseReadBuilder()
+ .appId(testAppId)
+ .blockIdBitmap(blockIdBitmap)
+ .taskIdBitmap(taskIdBitmap)
+ .shuffleServerInfoList(Lists.newArrayList(shuffleServerInfo1))
+ .build();
assertTrue(readClient.readShuffleBlockData() == null);
// we can read blocks from server 2, which is sent in to secondary round
readClient =
- new ShuffleReadClientImpl(
- StorageType.MEMORY_LOCALFILE.name(),
- testAppId,
- 0,
- 0,
- 100,
- 1,
- 10,
- 1000,
- "",
- blockIdBitmap,
- taskIdBitmap,
- Lists.newArrayList(shuffleServerInfo2),
- null,
- new DefaultIdHelper());
+ baseReadBuilder()
+ .appId(testAppId)
+ .blockIdBitmap(blockIdBitmap)
+ .taskIdBitmap(taskIdBitmap)
+ .shuffleServerInfoList(Lists.newArrayList(shuffleServerInfo2))
+ .build();
validateResult(readClient, expectedData);
// we can read blocks from server 0,1,2
readClient =
- new ShuffleReadClientImpl(
- StorageType.MEMORY_LOCALFILE.name(),
- testAppId,
- 0,
- 0,
- 100,
- 1,
- 10,
- 1000,
- "",
- blockIdBitmap,
- taskIdBitmap,
- Lists.newArrayList(shuffleServerInfo0, fakedShuffleServerInfo1, shuffleServerInfo2),
- null,
- new DefaultIdHelper());
+ baseReadBuilder()
+ .appId(testAppId)
+ .blockIdBitmap(blockIdBitmap)
+ .taskIdBitmap(taskIdBitmap)
+ .shuffleServerInfoList(
+ Lists.newArrayList(shuffleServerInfo0, fakedShuffleServerInfo1, shuffleServerInfo2))
+ .build();
validateResult(readClient, expectedData);
}
@@ -944,40 +878,23 @@ public class QuorumTest extends ShuffleReadWriteBase {
// we cannot read any blocks from server 3, 4
ShuffleReadClientImpl readClient =
- new ShuffleReadClientImpl(
- StorageType.MEMORY_LOCALFILE.name(),
- testAppId,
- 0,
- 0,
- 100,
- 1,
- 10,
- 1000,
- "",
- blockIdBitmap,
- taskIdBitmap,
- Lists.newArrayList(shuffleServerInfo3, shuffleServerInfo4),
- null,
- new DefaultIdHelper());
+ baseReadBuilder()
+ .appId(testAppId)
+ .blockIdBitmap(blockIdBitmap)
+ .taskIdBitmap(taskIdBitmap)
+ .shuffleServerInfoList(Lists.newArrayList(shuffleServerInfo3, shuffleServerInfo4))
+ .build();
assertTrue(readClient.readShuffleBlockData() == null);
// we can also read blocks from server 0,1,2
readClient =
- new ShuffleReadClientImpl(
- StorageType.MEMORY_LOCALFILE.name(),
- testAppId,
- 0,
- 0,
- 100,
- 1,
- 10,
- 1000,
- "",
- blockIdBitmap,
- taskIdBitmap,
- Lists.newArrayList(shuffleServerInfo0, shuffleServerInfo1, shuffleServerInfo2),
- null,
- new DefaultIdHelper());
+ baseReadBuilder()
+ .appId(testAppId)
+ .blockIdBitmap(blockIdBitmap)
+ .taskIdBitmap(taskIdBitmap)
+ .shuffleServerInfoList(
+ Lists.newArrayList(shuffleServerInfo0, shuffleServerInfo1, shuffleServerInfo2))
+ .build();
validateResult(readClient, expectedData);
}
@@ -1016,40 +933,22 @@ public class QuorumTest extends ShuffleReadWriteBase {
// we cannot read any blocks from server 1 due to failures
ShuffleReadClientImpl readClient =
- new ShuffleReadClientImpl(
- StorageType.MEMORY_LOCALFILE.name(),
- testAppId,
- 0,
- 0,
- 100,
- 1,
- 10,
- 1000,
- "",
- blockIdBitmap,
- taskIdBitmap,
- Lists.newArrayList(shuffleServerInfo1),
- null,
- new DefaultIdHelper());
+ baseReadBuilder()
+ .appId(testAppId)
+ .blockIdBitmap(blockIdBitmap)
+ .taskIdBitmap(taskIdBitmap)
+ .shuffleServerInfoList(Lists.newArrayList(shuffleServerInfo1))
+ .build();
assertTrue(readClient.readShuffleBlockData() == null);
// we can also read blocks from server 3,4
readClient =
- new ShuffleReadClientImpl(
- StorageType.MEMORY_LOCALFILE.name(),
- testAppId,
- 0,
- 0,
- 100,
- 1,
- 10,
- 1000,
- "",
- blockIdBitmap,
- taskIdBitmap,
- Lists.newArrayList(shuffleServerInfo3, shuffleServerInfo4),
- null,
- new DefaultIdHelper());
+ baseReadBuilder()
+ .appId(testAppId)
+ .blockIdBitmap(blockIdBitmap)
+ .taskIdBitmap(taskIdBitmap)
+ .shuffleServerInfoList(Lists.newArrayList(shuffleServerInfo3, shuffleServerInfo4))
+ .build();
validateResult(readClient, expectedData);
}
@@ -1087,41 +986,24 @@ public class QuorumTest extends ShuffleReadWriteBase {
// we cannot read any blocks from server 4 because the secondary round is skipped
ShuffleReadClientImpl readClient =
- new ShuffleReadClientImpl(
- StorageType.MEMORY_LOCALFILE.name(),
- testAppId,
- 0,
- 0,
- 100,
- 1,
- 10,
- 1000,
- "",
- blockIdBitmap,
- taskIdBitmap,
- Lists.newArrayList(shuffleServerInfo4),
- null,
- new DefaultIdHelper());
+ baseReadBuilder()
+ .appId(testAppId)
+ .blockIdBitmap(blockIdBitmap)
+ .taskIdBitmap(taskIdBitmap)
+ .shuffleServerInfoList(Lists.newArrayList(shuffleServerInfo4))
+ .build();
assertTrue(readClient.readShuffleBlockData() == null);
// we can read blocks from server 0,1,2,3
readClient =
- new ShuffleReadClientImpl(
- StorageType.MEMORY_LOCALFILE.name(),
- testAppId,
- 0,
- 0,
- 100,
- 1,
- 10,
- 1000,
- "",
- blockIdBitmap,
- taskIdBitmap,
- Lists.newArrayList(
- shuffleServerInfo0, shuffleServerInfo1, shuffleServerInfo2, shuffleServerInfo3),
- null,
- new DefaultIdHelper());
+ baseReadBuilder()
+ .appId(testAppId)
+ .blockIdBitmap(blockIdBitmap)
+ .taskIdBitmap(taskIdBitmap)
+ .shuffleServerInfoList(
+ Lists.newArrayList(
+ shuffleServerInfo0, shuffleServerInfo1, shuffleServerInfo2, shuffleServerInfo3))
+ .build();
validateResult(readClient, expectedData);
}
@@ -1153,59 +1035,32 @@ public class QuorumTest extends ShuffleReadWriteBase {
// we can read blocks from server 0
ShuffleReadClientImpl readClient =
- new ShuffleReadClientImpl(
- StorageType.MEMORY_LOCALFILE.name(),
- testAppId,
- 0,
- 0,
- 100,
- 1,
- 10,
- 1000,
- "",
- blockIdBitmap,
- taskIdBitmap,
- Lists.newArrayList(shuffleServerInfo0),
- null,
- new DefaultIdHelper());
+ baseReadBuilder()
+ .appId(testAppId)
+ .blockIdBitmap(blockIdBitmap)
+ .taskIdBitmap(taskIdBitmap)
+ .shuffleServerInfoList(Lists.newArrayList(shuffleServerInfo0))
+ .build();
validateResult(readClient, expectedData);
// we can also read blocks from server 1
readClient =
- new ShuffleReadClientImpl(
- StorageType.MEMORY_LOCALFILE.name(),
- testAppId,
- 0,
- 0,
- 100,
- 1,
- 10,
- 1000,
- "",
- blockIdBitmap,
- taskIdBitmap,
- Lists.newArrayList(shuffleServerInfo1),
- null,
- new DefaultIdHelper());
+ baseReadBuilder()
+ .appId(testAppId)
+ .blockIdBitmap(blockIdBitmap)
+ .taskIdBitmap(taskIdBitmap)
+ .shuffleServerInfoList(Lists.newArrayList(shuffleServerInfo1))
+ .build();
validateResult(readClient, expectedData);
// we can also read blocks from server 2
readClient =
- new ShuffleReadClientImpl(
- StorageType.MEMORY_LOCALFILE.name(),
- testAppId,
- 0,
- 0,
- 100,
- 1,
- 10,
- 1000,
- "",
- blockIdBitmap,
- taskIdBitmap,
- Lists.newArrayList(shuffleServerInfo2),
- null,
- new DefaultIdHelper());
+ baseReadBuilder()
+ .appId(testAppId)
+ .blockIdBitmap(blockIdBitmap)
+ .taskIdBitmap(taskIdBitmap)
+ .shuffleServerInfoList(Lists.newArrayList(shuffleServerInfo2))
+ .build();
validateResult(readClient, expectedData);
}
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerConcurrentWriteOfHadoopTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerConcurrentWriteOfHadoopTest.java
index e46e87447..eee24a3a2 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerConcurrentWriteOfHadoopTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerConcurrentWriteOfHadoopTest.java
@@ -30,7 +30,6 @@ import java.util.stream.Stream;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.junit.jupiter.api.BeforeAll;
@@ -39,12 +38,12 @@ import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
+import org.apache.uniffle.client.factory.ShuffleClientFactory;
import org.apache.uniffle.client.impl.ShuffleReadClientImpl;
import org.apache.uniffle.client.request.RssFinishShuffleRequest;
import org.apache.uniffle.client.request.RssRegisterShuffleRequest;
import org.apache.uniffle.client.request.RssSendCommitRequest;
import org.apache.uniffle.client.request.RssSendShuffleDataRequest;
-import org.apache.uniffle.client.util.DefaultIdHelper;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ShuffleBlockInfo;
@@ -144,23 +143,21 @@ public class ShuffleServerConcurrentWriteOfHadoopTest extends ShuffleServerWithH
blocksBitmap.add(iterator.next());
}
});
-
ShuffleReadClientImpl readClient =
- new ShuffleReadClientImpl(
- StorageType.HDFS.name(),
- appId,
- 0,
- 0,
- 100,
- 2,
- 10,
- 1000,
- dataBasePath,
- blocksBitmap,
- Roaring64NavigableMap.bitmapOf(0),
- Lists.newArrayList(ssi),
- new Configuration(),
- new DefaultIdHelper());
+ ShuffleClientFactory.newReadBuilder()
+ .storageType(StorageType.HDFS.name())
+ .appId(appId)
+ .shuffleId(0)
+ .partitionId(0)
+ .indexReadLimit(100)
+ .partitionNumPerRange(2)
+ .partitionNum(10)
+ .readBufferSize(1000)
+ .basePath(dataBasePath)
+ .blockIdBitmap(blocksBitmap)
+ .taskIdBitmap(Roaring64NavigableMap.bitmapOf(0))
+ .shuffleServerInfoList(Lists.newArrayList(ssi))
+ .build();
validateResult(readClient, expectedDataList, blocksBitmap);
}
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithHadoopTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithHadoopTest.java
index 4ae09ba1e..590355ab1 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithHadoopTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithHadoopTest.java
@@ -23,13 +23,13 @@ import java.util.Map.Entry;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import org.apache.hadoop.conf.Configuration;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
+import org.apache.uniffle.client.factory.ShuffleClientFactory;
import org.apache.uniffle.client.impl.ShuffleReadClientImpl;
import org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcClient;
import org.apache.uniffle.client.request.RssFinishShuffleRequest;
@@ -37,7 +37,6 @@ import org.apache.uniffle.client.request.RssRegisterShuffleRequest;
import org.apache.uniffle.client.request.RssSendCommitRequest;
import org.apache.uniffle.client.request.RssSendShuffleDataRequest;
import org.apache.uniffle.client.response.CompressedShuffleBlock;
-import org.apache.uniffle.client.util.DefaultIdHelper;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleServerInfo;
@@ -73,6 +72,17 @@ public class ShuffleServerWithHadoopTest extends ShuffleReadWriteBase {
shuffleServerClient.close();
}
+ private ShuffleClientFactory.ReadClientBuilder baseReadBuilder() { // shared HDFS read-client settings for the tests in this class
+ return ShuffleClientFactory.newReadBuilder()
+ .storageType(StorageType.HDFS.name())
+ .shuffleId(0)
+ .partitionId(0) // default partition; some call sites set another partitionId before build()
+ .indexReadLimit(100)
+ .partitionNumPerRange(2)
+ .partitionNum(10)
+ .readBufferSize(1000);
+ } // call sites add appId, basePath, the block/task bitmaps and the server list, then call build()
+
@Test
public void hadoopWriteReadTest() {
String appId = "app_hdfs_read_write";
@@ -106,22 +116,15 @@ public class ShuffleServerWithHadoopTest extends ShuffleReadWriteBase {
RssFinishShuffleRequest rfsr = new RssFinishShuffleRequest(appId, 0);
ShuffleServerInfo ssi = new ShuffleServerInfo(LOCALHOST, SHUFFLE_SERVER_PORT);
+
ShuffleReadClientImpl readClient =
- new ShuffleReadClientImpl(
- StorageType.HDFS.name(),
- appId,
- 0,
- 0,
- 100,
- 2,
- 10,
- 1000,
- dataBasePath,
- bitmaps[0],
- Roaring64NavigableMap.bitmapOf(0),
- Lists.newArrayList(ssi),
- new Configuration(),
- new DefaultIdHelper());
+ baseReadBuilder()
+ .appId(appId)
+ .basePath(dataBasePath)
+ .blockIdBitmap(bitmaps[0])
+ .taskIdBitmap(Roaring64NavigableMap.bitmapOf(0))
+ .shuffleServerInfoList(Lists.newArrayList(ssi))
+ .build();
assertNull(readClient.readShuffleBlockData());
shuffleServerClient.finishShuffle(rfsr);
@@ -149,75 +152,46 @@ public class ShuffleServerWithHadoopTest extends ShuffleReadWriteBase {
shuffleServerClient.finishShuffle(rfsr);
readClient =
- new ShuffleReadClientImpl(
- StorageType.HDFS.name(),
- appId,
- 0,
- 0,
- 100,
- 2,
- 10,
- 1000,
- dataBasePath,
- bitmaps[0],
- Roaring64NavigableMap.bitmapOf(0),
- Lists.newArrayList(ssi),
- new Configuration(),
- new DefaultIdHelper());
+ baseReadBuilder()
+ .appId(appId)
+ .basePath(dataBasePath)
+ .blockIdBitmap(bitmaps[0])
+ .taskIdBitmap(Roaring64NavigableMap.bitmapOf(0))
+ .shuffleServerInfoList(Lists.newArrayList(ssi))
+ .build();
validateResult(readClient, expectedData, bitmaps[0]);
readClient =
- new ShuffleReadClientImpl(
- StorageType.HDFS.name(),
- appId,
- 0,
- 1,
- 100,
- 2,
- 10,
- 1000,
- dataBasePath,
- bitmaps[1],
- Roaring64NavigableMap.bitmapOf(1),
- Lists.newArrayList(ssi),
- new Configuration(),
- new DefaultIdHelper());
+ baseReadBuilder()
+ .appId(appId)
+ .partitionId(1)
+ .basePath(dataBasePath)
+ .blockIdBitmap(bitmaps[1])
+ .taskIdBitmap(Roaring64NavigableMap.bitmapOf(1))
+ .shuffleServerInfoList(Lists.newArrayList(ssi))
+ .build();
validateResult(readClient, expectedData, bitmaps[1]);
readClient =
- new ShuffleReadClientImpl(
- StorageType.HDFS.name(),
- appId,
- 0,
- 2,
- 100,
- 2,
- 10,
- 1000,
- dataBasePath,
- bitmaps[2],
- Roaring64NavigableMap.bitmapOf(2),
- Lists.newArrayList(ssi),
- new Configuration(),
- new DefaultIdHelper());
+ baseReadBuilder()
+ .appId(appId)
+ .partitionId(2)
+ .basePath(dataBasePath)
+ .blockIdBitmap(bitmaps[2])
+ .taskIdBitmap(Roaring64NavigableMap.bitmapOf(2))
+ .shuffleServerInfoList(Lists.newArrayList(ssi))
+ .build();
validateResult(readClient, expectedData, bitmaps[2]);
readClient =
- new ShuffleReadClientImpl(
- StorageType.HDFS.name(),
- appId,
- 0,
- 3,
- 100,
- 2,
- 10,
- 1000,
- dataBasePath,
- bitmaps[3],
- Roaring64NavigableMap.bitmapOf(3),
- Lists.newArrayList(ssi),
- new Configuration(),
- new DefaultIdHelper());
+ baseReadBuilder()
+ .appId(appId)
+ .partitionId(3)
+ .basePath(dataBasePath)
+ .blockIdBitmap(bitmaps[3])
+ .taskIdBitmap(Roaring64NavigableMap.bitmapOf(3))
+ .shuffleServerInfoList(Lists.newArrayList(ssi))
+ .build();
validateResult(readClient, expectedData, bitmaps[3]);
}
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithKerberizedHadoopTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithKerberizedHadoopTest.java
index 60ad05358..69c11db53 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithKerberizedHadoopTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithKerberizedHadoopTest.java
@@ -34,6 +34,7 @@ import org.junit.jupiter.api.io.TempDir;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.apache.uniffle.client.TestUtils;
+import org.apache.uniffle.client.factory.ShuffleClientFactory;
import org.apache.uniffle.client.impl.ShuffleReadClientImpl;
import org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcClient;
import org.apache.uniffle.client.request.RssFinishShuffleRequest;
@@ -41,7 +42,6 @@ import org.apache.uniffle.client.request.RssRegisterShuffleRequest;
import org.apache.uniffle.client.request.RssSendCommitRequest;
import org.apache.uniffle.client.request.RssSendShuffleDataRequest;
import org.apache.uniffle.client.response.CompressedShuffleBlock;
-import org.apache.uniffle.client.util.DefaultIdHelper;
import org.apache.uniffle.common.KerberizedHadoopBase;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.RemoteStorageInfo;
@@ -177,6 +177,17 @@ public class ShuffleServerWithKerberizedHadoopTest extends KerberizedHadoopBase
return partitionToBlocks;
}
+ private ShuffleClientFactory.ReadClientBuilder baseReadBuilder() { // shared HDFS read-client settings for the tests in this class
+ return ShuffleClientFactory.newReadBuilder()
+ .storageType(StorageType.HDFS.name())
+ .shuffleId(0)
+ .partitionId(0) // default partition; some call sites set another partitionId before build()
+ .indexReadLimit(100)
+ .partitionNumPerRange(2)
+ .partitionNum(10)
+ .readBufferSize(1000);
+ } // call sites add appId, basePath, the block/task bitmaps and the server list, then call build()
+
@Test
public void hadoopWriteReadTest() throws Exception {
String alexDir = kerberizedHadoop.getSchemeAndAuthorityPrefix() + "/alex/";
@@ -229,21 +240,13 @@ public class ShuffleServerWithKerberizedHadoopTest extends KerberizedHadoopBase
ShuffleServerInfo ssi = new ShuffleServerInfo(LOCALHOST, SHUFFLE_SERVER_PORT);
ShuffleReadClientImpl readClient =
- new ShuffleReadClientImpl(
- StorageType.HDFS.name(),
- appId,
- 0,
- 0,
- 100,
- 2,
- 10,
- 1000,
- dataBasePath,
- bitmaps[0],
- Roaring64NavigableMap.bitmapOf(0),
- Lists.newArrayList(ssi),
- new Configuration(),
- new DefaultIdHelper());
+ baseReadBuilder()
+ .appId(appId)
+ .basePath(dataBasePath)
+ .blockIdBitmap(bitmaps[0])
+ .taskIdBitmap(Roaring64NavigableMap.bitmapOf(0))
+ .shuffleServerInfoList(Lists.newArrayList(ssi))
+ .build();
assertNull(readClient.readShuffleBlockData());
shuffleServerClient.finishShuffle(rfsr);
@@ -271,75 +274,46 @@ public class ShuffleServerWithKerberizedHadoopTest extends KerberizedHadoopBase
shuffleServerClient.finishShuffle(rfsr);
readClient =
- new ShuffleReadClientImpl(
- StorageType.HDFS.name(),
- appId,
- 0,
- 0,
- 100,
- 2,
- 10,
- 1000,
- dataBasePath,
- bitmaps[0],
- Roaring64NavigableMap.bitmapOf(0),
- Lists.newArrayList(ssi),
- new Configuration(),
- new DefaultIdHelper());
+ baseReadBuilder()
+ .appId(appId)
+ .basePath(dataBasePath)
+ .blockIdBitmap(bitmaps[0])
+ .taskIdBitmap(Roaring64NavigableMap.bitmapOf(0))
+ .shuffleServerInfoList(Lists.newArrayList(ssi))
+ .build();
validateResult(readClient, expectedData, bitmaps[0]);
readClient =
- new ShuffleReadClientImpl(
- StorageType.HDFS.name(),
- appId,
- 0,
- 1,
- 100,
- 2,
- 10,
- 1000,
- dataBasePath,
- bitmaps[1],
- Roaring64NavigableMap.bitmapOf(1),
- Lists.newArrayList(ssi),
- new Configuration(),
- new DefaultIdHelper());
+ baseReadBuilder()
+ .appId(appId)
+ .partitionId(1)
+ .basePath(dataBasePath)
+ .blockIdBitmap(bitmaps[1])
+ .taskIdBitmap(Roaring64NavigableMap.bitmapOf(1))
+ .shuffleServerInfoList(Lists.newArrayList(ssi))
+ .build();
validateResult(readClient, expectedData, bitmaps[1]);
readClient =
- new ShuffleReadClientImpl(
- StorageType.HDFS.name(),
- appId,
- 0,
- 2,
- 100,
- 2,
- 10,
- 1000,
- dataBasePath,
- bitmaps[2],
- Roaring64NavigableMap.bitmapOf(2),
- Lists.newArrayList(ssi),
- new Configuration(),
- new DefaultIdHelper());
+ baseReadBuilder()
+ .appId(appId)
+ .partitionId(2)
+ .basePath(dataBasePath)
+ .blockIdBitmap(bitmaps[2])
+ .taskIdBitmap(Roaring64NavigableMap.bitmapOf(2))
+ .shuffleServerInfoList(Lists.newArrayList(ssi))
+ .build();
validateResult(readClient, expectedData, bitmaps[2]);
readClient =
- new ShuffleReadClientImpl(
- StorageType.HDFS.name(),
- appId,
- 0,
- 3,
- 100,
- 2,
- 10,
- 1000,
- dataBasePath,
- bitmaps[3],
- Roaring64NavigableMap.bitmapOf(3),
- Lists.newArrayList(ssi),
- new Configuration(),
- new DefaultIdHelper());
+ baseReadBuilder()
+ .appId(appId)
+ .partitionId(3)
+ .basePath(dataBasePath)
+ .blockIdBitmap(bitmaps[3])
+ .taskIdBitmap(Roaring64NavigableMap.bitmapOf(3))
+ .shuffleServerInfoList(Lists.newArrayList(ssi))
+ .build();
validateResult(readClient, expectedData, bitmaps[3]);
}
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java
index 2ad1e0492..eee83cb34 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java
@@ -37,7 +37,6 @@ import org.apache.uniffle.client.impl.ShuffleReadClientImpl;
import org.apache.uniffle.client.impl.ShuffleWriteClientImpl;
import org.apache.uniffle.client.response.SendShuffleDataResult;
import org.apache.uniffle.client.util.ClientUtils;
-import org.apache.uniffle.client.util.DefaultIdHelper;
import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.RemoteStorageInfo;
@@ -344,21 +343,20 @@ public class ShuffleWithRssClientTest extends ShuffleReadWriteBase {
assertTrue(commitResult);
ShuffleReadClientImpl readClient =
- new ShuffleReadClientImpl(
- StorageType.LOCALFILE.name(),
- testAppId,
- 0,
- 0,
- 100,
- 1,
- 10,
- 1000,
- "",
- blockIdBitmap,
- taskIdBitmap,
- Lists.newArrayList(shuffleServerInfo1, shuffleServerInfo2),
- null,
- new DefaultIdHelper());
+ ShuffleClientFactory.newReadBuilder()
+ .storageType(StorageType.LOCALFILE.name())
+ .appId(testAppId)
+ .shuffleId(0)
+ .partitionId(0)
+ .indexReadLimit(100)
+ .partitionNumPerRange(1)
+ .partitionNum(10)
+ .readBufferSize(1000)
+ .basePath("")
+ .blockIdBitmap(blockIdBitmap)
+ .taskIdBitmap(taskIdBitmap)
+ .shuffleServerInfoList(Lists.newArrayList(shuffleServerInfo1, shuffleServerInfo2))
+ .build();
assertNull(readClient.readShuffleBlockData());
readClient.close();
@@ -368,21 +366,20 @@ public class ShuffleWithRssClientTest extends ShuffleReadWriteBase {
Sets.newHashSet(shuffleServerInfo1, shuffleServerInfo2), testAppId, 0, 2);
assertTrue(commitResult);
readClient =
- new ShuffleReadClientImpl(
- StorageType.LOCALFILE.name(),
- testAppId,
- 0,
- 0,
- 100,
- 1,
- 10,
- 1000,
- "",
- blockIdBitmap,
- taskIdBitmap,
- Lists.newArrayList(shuffleServerInfo1, shuffleServerInfo2),
- null,
- new DefaultIdHelper());
+ ShuffleClientFactory.newReadBuilder()
+ .storageType(StorageType.LOCALFILE.name())
+ .appId(testAppId)
+ .shuffleId(0)
+ .partitionId(0)
+ .indexReadLimit(100)
+ .partitionNumPerRange(1)
+ .partitionNum(10)
+ .readBufferSize(1000)
+ .basePath("")
+ .blockIdBitmap(blockIdBitmap)
+ .taskIdBitmap(taskIdBitmap)
+ .shuffleServerInfoList(Lists.newArrayList(shuffleServerInfo1, shuffleServerInfo2))
+ .build();
validateResult(readClient, expectedData);
readClient.checkProcessedBlockIds();
readClient.close();
diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalTest.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalTest.java
index c876c937d..2eb2ffad7 100644
--- a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalTest.java
+++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalTest.java
@@ -32,13 +32,13 @@ import org.junit.jupiter.api.io.TempDir;
import org.roaringbitmap.longlong.LongIterator;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
+import org.apache.uniffle.client.factory.ShuffleClientFactory;
import org.apache.uniffle.client.impl.ShuffleReadClientImpl;
import org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcClient;
import org.apache.uniffle.client.request.RssFinishShuffleRequest;
import org.apache.uniffle.client.request.RssRegisterShuffleRequest;
import org.apache.uniffle.client.request.RssSendCommitRequest;
import org.apache.uniffle.client.request.RssSendShuffleDataRequest;
-import org.apache.uniffle.client.util.DefaultIdHelper;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleServerInfo;
@@ -85,6 +85,18 @@ public class SparkClientWithLocalTest extends ShuffleReadWriteBase {
shuffleServerClient.close();
}
+ private ShuffleClientFactory.ReadClientBuilder baseReadBuilder() { // shared LOCALFILE read-client settings for the tests in this class
+ return ShuffleClientFactory.newReadBuilder()
+ .storageType(StorageType.LOCALFILE.name())
+ .shuffleId(0)
+ .partitionId(0) // default partition; some call sites set another partitionId before build()
+ .indexReadLimit(100)
+ .partitionNumPerRange(1) // some call sites override this with 2
+ .partitionNum(10)
+ .readBufferSize(1000)
+ .shuffleServerInfoList(shuffleServerInfo); // server list comes from the test class field
+ } // NOTE(review): the replaced constructor calls passed "" as base path; none is set here - confirm the builder default is equivalent
+
@Test
public void readTest1() {
String testAppId = "localReadTest1";
@@ -96,21 +108,11 @@ public class SparkClientWithLocalTest extends ShuffleReadWriteBase {
blockIdBitmap.addLong((1 << Constants.TASK_ATTEMPT_ID_MAX_LENGTH));
ShuffleReadClientImpl readClient;
readClient =
- new ShuffleReadClientImpl(
- StorageType.LOCALFILE.name(),
- testAppId,
- 0,
- 0,
- 100,
- 1,
- 10,
- 1000,
- "",
- blockIdBitmap,
- taskIdBitmap,
- shuffleServerInfo,
- null,
- new DefaultIdHelper());
+ baseReadBuilder()
+ .appId(testAppId)
+ .blockIdBitmap(blockIdBitmap)
+ .taskIdBitmap(taskIdBitmap)
+ .build();
validateResult(readClient, expectedData);
try {
// can't find all expected block id, data loss
@@ -138,21 +140,11 @@ public class SparkClientWithLocalTest extends ShuffleReadWriteBase {
sendTestData(testAppId, blocks);
ShuffleReadClientImpl readClient =
- new ShuffleReadClientImpl(
- StorageType.LOCALFILE.name(),
- testAppId,
- 0,
- 0,
- 100,
- 1,
- 10,
- 1000,
- "",
- blockIdBitmap,
- taskIdBitmap,
- shuffleServerInfo,
- null,
- new DefaultIdHelper());
+ baseReadBuilder()
+ .appId(testAppId)
+ .blockIdBitmap(blockIdBitmap)
+ .taskIdBitmap(taskIdBitmap)
+ .build();
validateResult(readClient, expectedData);
readClient.checkProcessedBlockIds();
@@ -177,21 +169,11 @@ public class SparkClientWithLocalTest extends ShuffleReadWriteBase {
Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
ShuffleReadClientImpl readClient =
- new ShuffleReadClientImpl(
- StorageType.LOCALFILE.name(),
- testAppId,
- 0,
- 0,
- 100,
- 1,
- 10,
- 1000,
- "",
- blockIdBitmap,
- taskIdBitmap,
- shuffleServerInfo,
- null,
- new DefaultIdHelper());
+ baseReadBuilder()
+ .appId(testAppId)
+ .blockIdBitmap(blockIdBitmap)
+ .taskIdBitmap(taskIdBitmap)
+ .build();
assertNull(readClient.readShuffleBlockData());
readClient.close();
}
@@ -217,37 +199,20 @@ public class SparkClientWithLocalTest extends ShuffleReadWriteBase {
sendTestData(testAppId, blocks);
ShuffleReadClientImpl readClient1 =
- new ShuffleReadClientImpl(
- StorageType.LOCALFILE.name(),
- testAppId,
- 0,
- 0,
- 100,
- 2,
- 10,
- 100,
- "",
- blockIdBitmap1,
- taskIdBitmap,
- shuffleServerInfo,
- null,
- new DefaultIdHelper());
+ baseReadBuilder()
+ .appId(testAppId)
+ .partitionNumPerRange(2)
+ .readBufferSize(100) // keep the 100-byte read buffer of the replaced constructor call; the helper default is 1000
+ .blockIdBitmap(blockIdBitmap1)
+ .taskIdBitmap(taskIdBitmap)
+ .build();
final ShuffleReadClientImpl readClient2 =
- new ShuffleReadClientImpl(
- StorageType.LOCALFILE.name(),
- testAppId,
- 0,
- 1,
- 100,
- 2,
- 10,
- 100,
- "",
- blockIdBitmap2,
- taskIdBitmap,
- shuffleServerInfo,
- null,
- new DefaultIdHelper());
+ baseReadBuilder()
+ .appId(testAppId)
+ .partitionId(1)
+ .partitionNumPerRange(2)
+ .readBufferSize(100) // keep the 100-byte read buffer of the replaced constructor call; the helper default is 1000
+ .blockIdBitmap(blockIdBitmap2)
+ .taskIdBitmap(taskIdBitmap)
+ .build();
validateResult(readClient1, expectedData1);
readClient1.checkProcessedBlockIds();
readClient1.close();
@@ -261,21 +226,13 @@ public class SparkClientWithLocalTest extends ShuffleReadWriteBase {
public void readTest5() {
String testAppId = "localReadTest5";
ShuffleReadClientImpl readClient =
- new ShuffleReadClientImpl(
- StorageType.LOCALFILE.name(),
- testAppId,
- 0,
- 1,
- 100,
- 2,
- 10,
- 1000,
- "",
- Roaring64NavigableMap.bitmapOf(),
- Roaring64NavigableMap.bitmapOf(),
- shuffleServerInfo,
- null,
- new DefaultIdHelper());
+ baseReadBuilder()
+ .appId(testAppId)
+ .partitionId(1)
+ .partitionNumPerRange(2)
+ .blockIdBitmap(Roaring64NavigableMap.bitmapOf())
+ .taskIdBitmap(Roaring64NavigableMap.bitmapOf())
+ .build();
assertNull(readClient.readShuffleBlockData());
readClient.checkProcessedBlockIds();
}
@@ -299,21 +256,11 @@ public class SparkClientWithLocalTest extends ShuffleReadWriteBase {
}
ShuffleReadClientImpl readClient =
- new ShuffleReadClientImpl(
- StorageType.LOCALFILE.name(),
- testAppId,
- 0,
- 0,
- 100,
- 1,
- 10,
- 100,
- "",
- wrongBlockIdBitmap,
- taskIdBitmap,
- shuffleServerInfo,
- null,
- new DefaultIdHelper());
+ baseReadBuilder()
+ .appId(testAppId)
+ .readBufferSize(100) // keep the 100-byte read buffer of the replaced constructor call; the helper default is 1000
+ .blockIdBitmap(wrongBlockIdBitmap)
+ .taskIdBitmap(taskIdBitmap)
+ .build();
assertNull(readClient.readShuffleBlockData());
try {
readClient.checkProcessedBlockIds();
@@ -344,21 +291,11 @@ public class SparkClientWithLocalTest extends ShuffleReadWriteBase {
// unexpected taskAttemptId should be filtered
ShuffleReadClientImpl readClient =
- new ShuffleReadClientImpl(
- StorageType.LOCALFILE.name(),
- testAppId,
- 0,
- 0,
- 100,
- 1,
- 10,
- 1000,
- "",
- blockIdBitmap,
- taskIdBitmap,
- shuffleServerInfo,
- null,
- new DefaultIdHelper());
+ baseReadBuilder()
+ .appId(testAppId)
+ .blockIdBitmap(blockIdBitmap)
+ .taskIdBitmap(taskIdBitmap)
+ .build();
validateResult(readClient, expectedData);
readClient.checkProcessedBlockIds();
@@ -393,21 +330,11 @@ public class SparkClientWithLocalTest extends ShuffleReadWriteBase {
// unexpected taskAttemptId should be filtered
ShuffleReadClientImpl readClient =
- new ShuffleReadClientImpl(
- StorageType.LOCALFILE.name(),
- testAppId,
- 0,
- 0,
- 100,
- 1,
- 10,
- 1000,
- "",
- blockIdBitmap,
- taskIdBitmap,
- shuffleServerInfo,
- null,
- new DefaultIdHelper());
+ baseReadBuilder()
+ .appId(testAppId)
+ .blockIdBitmap(blockIdBitmap)
+ .taskIdBitmap(taskIdBitmap)
+ .build();
validateResult(readClient, expectedData);
readClient.checkProcessedBlockIds();
@@ -431,43 +358,28 @@ public class SparkClientWithLocalTest extends ShuffleReadWriteBase {
sendTestData(testAppId, blocks);
// test with un-changed expected blockId
ShuffleReadClientImpl readClient;
readClient =
- new ShuffleReadClientImpl(
- StorageType.LOCALFILE.name(),
- testAppId,
- 0,
- 0,
- 100,
- 1,
- 10,
- 1000,
- "",
- beforeAdded,
- taskIdBitmap,
- shuffleServerInfo,
- null,
- new DefaultIdHelper());
+ baseReadBuilder()
+ .appId(testAppId)
+ .blockIdBitmap(beforeAdded)
+ .taskIdBitmap(taskIdBitmap)
+ .build();
validateResult(readClient, expectedData);
readClient.checkProcessedBlockIds();
readClient.close();
// test with changed expected blockId
readClient =
- new ShuffleReadClientImpl(
- StorageType.LOCALFILE.name(),
- testAppId,
- 0,
- 0,
- 100,
- 1,
- 10,
- 1000,
- "",
- blockIdBitmap,
- taskIdBitmap,
- shuffleServerInfo,
- null,
- new DefaultIdHelper());
+ baseReadBuilder()
+ .appId(testAppId)
+ .blockIdBitmap(blockIdBitmap)
+ .taskIdBitmap(taskIdBitmap)
+ .build();
validateResult(readClient, expectedData);
readClient.checkProcessedBlockIds();
readClient.close();
@@ -492,22 +404,17 @@ public class SparkClientWithLocalTest extends ShuffleReadWriteBase {
// send some expected data
blocks = createShuffleBlockList(0, 0, 1, 2, 30, expectedBlockIds, expectedData, mockSSI);
sendTestData(testAppId, blocks);
ShuffleReadClientImpl readClient =
- new ShuffleReadClientImpl(
- StorageType.LOCALFILE.name(),
- testAppId,
- 0,
- 0,
- 100,
- 1,
- 10,
- 1000,
- "",
- expectedBlockIds,
- taskIdBitmap,
- shuffleServerInfo,
- null,
- new DefaultIdHelper());
+ baseReadBuilder()
+ .appId(testAppId)
+ .blockIdBitmap(expectedBlockIds)
+ .taskIdBitmap(taskIdBitmap)
+ .build();
validateResult(readClient, expectedData);
readClient.checkProcessedBlockIds();
@@ -546,21 +453,11 @@ public class SparkClientWithLocalTest extends ShuffleReadWriteBase {
sendTestData(testAppId, blocks);
ShuffleReadClientImpl readClient =
- new ShuffleReadClientImpl(
- StorageType.LOCALFILE.name(),
- testAppId,
- 0,
- 0,
- 100,
- 1,
- 10,
- 1000,
- "",
- blockIdBitmap,
- taskIdBitmap,
- shuffleServerInfo,
- null,
- new DefaultIdHelper());
+ baseReadBuilder()
+ .appId(testAppId)
+ .blockIdBitmap(blockIdBitmap)
+ .taskIdBitmap(taskIdBitmap)
+ .build();
validateResult(readClient, expectedData);
readClient.checkProcessedBlockIds();
readClient.close();