You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ro...@apache.org on 2022/08/24 02:46:34 UTC
[incubator-uniffle] branch master updated: [ISSUE-123] Fix all test code style (#185)
This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 2a026197 [ISSUE-123] Fix all test code style (#185)
2a026197 is described below
commit 2a0261970a27f51f22fb6ea221b738e261a30e74
Author: macroguo-ghy <10...@users.noreply.github.com>
AuthorDate: Wed Aug 24 10:46:30 2022 +0800
[ISSUE-123] Fix all test code style (#185)
What changes were proposed in this pull request?
Resolve the test code style issues in all modules (see PR Tencent/Firestorm#155).
Why are the changes needed?
Improve test code style
Does this PR introduce any user-facing change?
No
How was this patch tested?
All tests and the checkstyle check passed:
https://github.com/macroguo-ghy/incubator-uniffle/runs/7974486008?check_suite_focus=true
However, I'm not sure whether commits by other contributors violate checkstyle.
Co-authored-by: ghy <gu...@pku.edu.cn>
---
.../hadoop/mapred/SortWriteBufferManagerTest.java | 11 +-
.../apache/hadoop/mapred/SortWriteBufferTest.java | 22 +-
.../apache/hadoop/mapreduce/RssMRUtilsTest.java | 6 +-
.../mapreduce/task/reduce/EvenFetcherTest.java | 106 ++++----
.../hadoop/mapreduce/task/reduce/FetcherTest.java | 117 ++++----
.../task/reduce/RssInMemoryRemoteMergerTest.java | 25 +-
.../task/reduce/RssRemoteMergeManagerTest.java | 43 +--
.../spark/shuffle/RssSparkShuffleUtilsTest.java | 5 +-
.../shuffle/reader/AbstractRssReaderTest.java | 25 +-
.../shuffle/reader/RssShuffleDataIteratorTest.java | 45 ++--
.../shuffle/writer/WriteBufferManagerTest.java | 20 +-
.../spark/shuffle/writer/WriteBufferTest.java | 4 +-
.../shuffle/DelegationRssShuffleManagerTest.java | 30 +--
.../spark/shuffle/reader/RssShuffleReaderTest.java | 20 +-
.../spark/shuffle/writer/RssShuffleWriterTest.java | 16 +-
.../shuffle/DelegationRssShuffleManagerTest.java | 27 +-
.../java/org/apache/spark/shuffle/TestUtils.java | 1 -
.../spark/shuffle/reader/RssShuffleReaderTest.java | 6 +-
.../spark/shuffle/writer/RssShuffleWriterTest.java | 12 +-
.../client/impl/ShuffleReadClientImplTest.java | 96 ++++---
.../client/impl/ShuffleWriteClientImplTest.java | 19 +-
.../org/apache/uniffle/common/KerberizedHdfs.java | 2 +-
.../filesystem/HadoopFilesystemProviderTest.java | 2 +-
.../common/security/HadoopSecurityContextTest.java | 2 +-
.../apache/uniffle/common/util/RetryUtilsTest.java | 2 +-
.../coordinator/SimpleClusterManagerTest.java | 1 -
.../test/AccessCandidatesCheckerHdfsTest.java | 5 +-
.../org/apache/uniffle/test/AccessClusterTest.java | 5 +-
.../test/AssignmentServerNodesNumberTest.java | 2 +-
.../uniffle/test/AssignmentWithTagsTest.java | 295 +++++++++++----------
.../uniffle/test/ClientConfManagerHdfsTest.java | 4 +-
.../test/ClientConfManagerKerberlizedHdfsTest.java | 14 +-
.../apache/uniffle/test/CoordinatorGrpcTest.java | 4 +-
.../apache/uniffle/test/CoordinatorTestBase.java | 6 +-
.../uniffle/test/DiskErrorToleranceTest.java | 4 +-
.../apache/uniffle/test/FetchClientConfTest.java | 4 +-
.../test/HealthCheckCoordinatorGrpcTest.java | 23 +-
.../apache/uniffle/test/IntegrationTestBase.java | 2 +-
.../test/MultiStorageFaultToleranceTest.java | 13 +-
.../test/PartitionBalanceCoordinatorGrpcTest.java | 11 +-
.../java/org/apache/uniffle/test/QuorumTest.java | 202 +++++++-------
.../apache/uniffle/test/ShuffleReadWriteBase.java | 37 +--
.../apache/uniffle/test/ShuffleServerGrpcTest.java | 11 +-
.../uniffle/test/ShuffleServerWithHdfsTest.java | 19 +-
.../uniffle/test/ShuffleServerWithLocalTest.java | 11 +-
.../test/ShuffleServerWithMemLocalHdfsTest.java | 45 ++--
.../uniffle/test/ShuffleServerWithMemoryTest.java | 36 +--
.../uniffle/test/ShuffleWithRssClientTest.java | 16 +-
.../org/apache/uniffle/test/FailoverAppMaster.java | 1 +
.../apache/uniffle/test/MRIntegrationTestBase.java | 14 +-
.../org/apache/uniffle/test/WordCountTest.java | 1 +
.../org/apache/uniffle/test/AutoAccessTest.java | 9 +-
.../org/apache/uniffle/test/CombineByKeyTest.java | 1 +
.../uniffle/test/DynamicFetchClientConfTest.java | 2 +-
.../org/apache/uniffle/test/GroupByKeyTest.java | 2 +-
.../org/apache/uniffle/test/RepartitionTest.java | 1 +
.../uniffle/test/RepartitionWithMemoryRssTest.java | 2 +-
.../uniffle/test/SparkClientWithLocalTest.java | 26 +-
.../uniffle/test/SparkIntegrationTestBase.java | 4 +-
.../java/org/apache/uniffle/test/SparkSQLTest.java | 2 +-
.../test/SparkSQLWithDelegationShuffleManager.java | 3 +-
...arkSQLWithDelegationShuffleManagerFallback.java | 3 +-
.../java/org/apache/uniffle/test/TestUtils.java | 9 +-
.../org/apache/uniffle/test/GetReaderTest.java | 9 +-
.../apache/uniffle/test/AQERepartitionTest.java | 11 +-
.../org/apache/uniffle/test/AQESkewedJoinTest.java | 13 +-
.../org/apache/uniffle/test/GetReaderTest.java | 29 +-
pom.xml | 8 +-
.../apache/uniffle/server/HealthyMockChecker.java | 15 +-
.../apache/uniffle/server/MockedGrpcServer.java | 2 +
.../uniffle/server/MockedShuffleServerFactory.java | 1 +
.../server/MockedShuffleServerGrpcService.java | 5 +-
.../ShuffleFlushManagerOnKerberizedHdfsTest.java | 2 +-
.../uniffle/server/ShuffleFlushManagerTest.java | 25 +-
.../uniffle/server/ShuffleServerMetricsTest.java | 1 -
.../uniffle/server/ShuffleTaskManagerTest.java | 17 +-
.../apache/uniffle/server/StorageCheckerTest.java | 16 +-
.../uniffle/server/UnHealthyMockChecker.java | 1 +
.../uniffle/server/buffer/BufferTestBase.java | 2 +-
.../server/buffer/ShuffleBufferManagerTest.java | 8 +-
.../uniffle/server/buffer/ShuffleBufferTest.java | 19 +-
.../server/storage/MultiStorageManagerTest.java | 5 +-
.../storage/handler/impl/HdfsHandlerTest.java | 1 -
83 files changed, 878 insertions(+), 826 deletions(-)
diff --git a/client-mr/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java b/client-mr/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java
index 76138838..3574c9c3 100644
--- a/client-mr/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java
+++ b/client-mr/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java
@@ -24,7 +24,6 @@ import java.util.Set;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.WritableComparator;
@@ -35,6 +34,7 @@ import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.apache.uniffle.client.api.ShuffleWriteClient;
import org.apache.uniffle.client.response.SendShuffleDataResult;
import org.apache.uniffle.common.PartitionRange;
+import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ShuffleAssignmentsInfo;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleServerInfo;
@@ -309,17 +309,20 @@ public class SortWriteBufferManagerTest {
}
@Override
- public void reportShuffleResult(Map<Integer, List<ShuffleServerInfo>> partitionToServers, String appId, int shuffleId, long taskAttemptId, Map<Integer, List<Long>> partitionToBlockIds, int bitmapNum) {
+ public void reportShuffleResult(Map<Integer, List<ShuffleServerInfo>> partitionToServers, String appId,
+ int shuffleId, long taskAttemptId, Map<Integer, List<Long>> partitionToBlockIds, int bitmapNum) {
}
@Override
- public ShuffleAssignmentsInfo getShuffleAssignments(String appId, int shuffleId, int partitionNum, int partitionNumPerRange, Set<String> requiredTags, int assignmentShuffleServerNumber) {
+ public ShuffleAssignmentsInfo getShuffleAssignments(String appId, int shuffleId, int partitionNum,
+ int partitionNumPerRange, Set<String> requiredTags, int assignmentShuffleServerNumber) {
return null;
}
@Override
- public Roaring64NavigableMap getShuffleResult(String clientType, Set<ShuffleServerInfo> shuffleServerInfoSet, String appId, int shuffleId, int partitionId) {
+ public Roaring64NavigableMap getShuffleResult(String clientType, Set<ShuffleServerInfo> shuffleServerInfoSet,
+ String appId, int shuffleId, int partitionId) {
return null;
}
diff --git a/client-mr/src/test/java/org/apache/hadoop/mapred/SortWriteBufferTest.java b/client-mr/src/test/java/org/apache/hadoop/mapred/SortWriteBufferTest.java
index 52fd71cd..b36e3014 100644
--- a/client-mr/src/test/java/org/apache/hadoop/mapred/SortWriteBufferTest.java
+++ b/client-mr/src/test/java/org/apache/hadoop/mapred/SortWriteBufferTest.java
@@ -17,6 +17,12 @@
package org.apache.hadoop.mapred;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Random;
+
import com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.BytesWritable;
@@ -27,13 +33,6 @@ import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.io.serializer.Serializer;
import org.junit.jupiter.api.Test;
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.Map;
-import java.util.Random;
-
import static org.junit.jupiter.api.Assertions.assertEquals;
public class SortWriteBufferTest {
@@ -90,13 +89,13 @@ public class SortWriteBufferTest {
keySerializer.serialize(key);
byte[] valueBytes = new byte[200];
Map<String, BytesWritable> valueMap = Maps.newConcurrentMap();
- Map<String, Long> recordLenMap = Maps.newConcurrentMap();
Random random = new Random();
random.nextBytes(valueBytes);
value = new BytesWritable(valueBytes);
valueMap.putIfAbsent(keyStr, value);
valSerializer.serialize(value);
recordLength = buffer.addRecord(key, value);
+ Map<String, Long> recordLenMap = Maps.newConcurrentMap();
recordLenMap.putIfAbsent(keyStr, recordLength);
keyStr = "key1";
@@ -114,12 +113,11 @@ public class SortWriteBufferTest {
bigKey[1] = 'e';
bigKey[2] = 'y';
bigKey[3] = '4';
- BytesWritable bigWritableKey = new BytesWritable(bigKey);
+ final BytesWritable bigWritableKey = new BytesWritable(bigKey);
valueBytes = new byte[253];
random.nextBytes(valueBytes);
- BytesWritable bigWritableValue = new BytesWritable(valueBytes);
- long bigRecordLength = buffer.addRecord(bigWritableKey, bigWritableValue);
-
+ final BytesWritable bigWritableValue = new BytesWritable(valueBytes);
+ final long bigRecordLength = buffer.addRecord(bigWritableKey, bigWritableValue);
keyStr = "key2";
key = new BytesWritable(keyStr.getBytes());
valueBytes = new byte[3100];
diff --git a/client-mr/src/test/java/org/apache/hadoop/mapreduce/RssMRUtilsTest.java b/client-mr/src/test/java/org/apache/hadoop/mapreduce/RssMRUtilsTest.java
index 6d039312..cf3fe781 100644
--- a/client-mr/src/test/java/org/apache/hadoop/mapreduce/RssMRUtilsTest.java
+++ b/client-mr/src/test/java/org/apache/hadoop/mapreduce/RssMRUtilsTest.java
@@ -33,7 +33,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class RssMRUtilsTest {
@Test
- public void TaskAttemptIdTest() {
+ public void baskAttemptIdTest() {
long taskAttemptId = 0x1000ad12;
JobID jobID = new JobID();
TaskID taskId = new TaskID(jobID, TaskType.MAP, (int) taskAttemptId);
@@ -61,7 +61,7 @@ public class RssMRUtilsTest {
}
@Test
- public void BlockConvertTest() {
+ public void blockConvertTest() {
JobID jobID = new JobID();
TaskID taskId = new TaskID(jobID, TaskType.MAP, 233);
TaskAttemptID taskAttemptID = new TaskAttemptID(taskId, 1);
@@ -76,7 +76,7 @@ public class RssMRUtilsTest {
@Test
public void applyDynamicClientConfTest() {
- JobConf conf = new JobConf();
+ final JobConf conf = new JobConf();
Map<String, String> clientConf = Maps.newHashMap();
String remoteStoragePath = "hdfs://path1";
String mockKey = "mapreduce.mockKey";
diff --git a/client-mr/src/test/java/org/apache/hadoop/mapreduce/task/reduce/EvenFetcherTest.java b/client-mr/src/test/java/org/apache/hadoop/mapreduce/task/reduce/EvenFetcherTest.java
index 530f7fa0..d2a03f39 100644
--- a/client-mr/src/test/java/org/apache/hadoop/mapreduce/task/reduce/EvenFetcherTest.java
+++ b/client-mr/src/test/java/org/apache/hadoop/mapreduce/task/reduce/EvenFetcherTest.java
@@ -23,11 +23,11 @@ import java.util.Set;
import com.google.common.collect.Sets;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
-import org.apache.hadoop.mapred.MapTaskCompletionEventsUpdate;
import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapred.MapTaskCompletionEventsUpdate;
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapred.TaskCompletionEvent;
+import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
import org.apache.hadoop.mapreduce.RssMRUtils;
import org.apache.hadoop.mapreduce.TaskType;
import org.junit.jupiter.api.Test;
@@ -50,11 +50,11 @@ public class EvenFetcherTest {
jobConf.setNumMapTasks(mapTaskNum);
TaskUmbilicalProtocol umbilical = mock(TaskUmbilicalProtocol.class);
when(umbilical.getMapCompletionEvents(any(JobID.class),
- eq(0), eq(MAX_EVENTS_TO_FETCH), eq(tid)))
- .thenReturn(getMockedCompletionEventsUpdate(0, mapTaskNum));
+ eq(0), eq(MAX_EVENTS_TO_FETCH), eq(tid)))
+ .thenReturn(getMockedCompletionEventsUpdate(0, mapTaskNum));
RssEventFetcher ef =
- new RssEventFetcher(1, tid, umbilical, jobConf, MAX_EVENTS_TO_FETCH);
+ new RssEventFetcher(1, tid, umbilical, jobConf, MAX_EVENTS_TO_FETCH);
Roaring64NavigableMap expected = Roaring64NavigableMap.bitmapOf();
for (int mapIndex = 0; mapIndex < mapTaskNum; mapIndex++) {
long rssTaskId = RssMRUtils.convertTaskAttemptIdToLong(
@@ -74,14 +74,14 @@ public class EvenFetcherTest {
jobConf.setNumMapTasks(mapTaskNum);
TaskUmbilicalProtocol umbilical = mock(TaskUmbilicalProtocol.class);
when(umbilical.getMapCompletionEvents(any(JobID.class),
- eq(0), eq(MAX_EVENTS_TO_FETCH), eq(tid)))
- .thenReturn(getMockedCompletionEventsUpdate(0, mapTaskNum,
+ eq(0), eq(MAX_EVENTS_TO_FETCH), eq(tid)))
+ .thenReturn(getMockedCompletionEventsUpdate(0, mapTaskNum,
Sets.newHashSet(70, 80, 90),
Sets.newHashSet(),
Sets.newHashSet()));
RssEventFetcher ef =
- new RssEventFetcher(1, tid, umbilical, jobConf, MAX_EVENTS_TO_FETCH);
+ new RssEventFetcher(1, tid, umbilical, jobConf, MAX_EVENTS_TO_FETCH);
Roaring64NavigableMap expected = Roaring64NavigableMap.bitmapOf();
for (int mapIndex = 0; mapIndex < mapTaskNum; mapIndex++) {
long rssTaskId = RssMRUtils.convertTaskAttemptIdToLong(
@@ -102,18 +102,18 @@ public class EvenFetcherTest {
TaskUmbilicalProtocol umbilical = mock(TaskUmbilicalProtocol.class);
when(umbilical.getMapCompletionEvents(any(JobID.class),
- eq(0), eq(MAX_EVENTS_TO_FETCH), eq(tid)))
- .thenReturn(getMockedCompletionEventsUpdate(0, MAX_EVENTS_TO_FETCH));
+ eq(0), eq(MAX_EVENTS_TO_FETCH), eq(tid)))
+ .thenReturn(getMockedCompletionEventsUpdate(0, MAX_EVENTS_TO_FETCH));
when(umbilical.getMapCompletionEvents(any(JobID.class),
- eq(MAX_EVENTS_TO_FETCH), eq(MAX_EVENTS_TO_FETCH), eq(tid)))
- .thenReturn(getMockedCompletionEventsUpdate(MAX_EVENTS_TO_FETCH,
+ eq(MAX_EVENTS_TO_FETCH), eq(MAX_EVENTS_TO_FETCH), eq(tid)))
+ .thenReturn(getMockedCompletionEventsUpdate(MAX_EVENTS_TO_FETCH,
MAX_EVENTS_TO_FETCH));
when(umbilical.getMapCompletionEvents(any(JobID.class),
- eq(MAX_EVENTS_TO_FETCH * 2), eq(MAX_EVENTS_TO_FETCH), eq(tid)))
- .thenReturn(getMockedCompletionEventsUpdate(MAX_EVENTS_TO_FETCH * 2, 3));
+ eq(MAX_EVENTS_TO_FETCH * 2), eq(MAX_EVENTS_TO_FETCH), eq(tid)))
+ .thenReturn(getMockedCompletionEventsUpdate(MAX_EVENTS_TO_FETCH * 2, 3));
RssEventFetcher ef =
- new RssEventFetcher(1, tid, umbilical, jobConf, MAX_EVENTS_TO_FETCH);
+ new RssEventFetcher(1, tid, umbilical, jobConf, MAX_EVENTS_TO_FETCH);
Roaring64NavigableMap expected = Roaring64NavigableMap.bitmapOf();
for (int mapIndex = 0; mapIndex < mapTaskNum; mapIndex++) {
@@ -134,12 +134,12 @@ public class EvenFetcherTest {
jobConf.setNumMapTasks(mapTaskNum);
TaskUmbilicalProtocol umbilical = mock(TaskUmbilicalProtocol.class);
when(umbilical.getMapCompletionEvents(any(JobID.class),
- eq(0), eq(MAX_EVENTS_TO_FETCH), eq(tid)))
- .thenReturn(getInconsistentCompletionEventsUpdate(0, mapTaskNum,
+ eq(0), eq(MAX_EVENTS_TO_FETCH), eq(tid)))
+ .thenReturn(getInconsistentCompletionEventsUpdate(0, mapTaskNum,
Sets.newHashSet(45, 67), Sets.newHashSet()));
RssEventFetcher ef =
- new RssEventFetcher(1, tid, umbilical, jobConf, MAX_EVENTS_TO_FETCH);
+ new RssEventFetcher(1, tid, umbilical, jobConf, MAX_EVENTS_TO_FETCH);
Roaring64NavigableMap expected = Roaring64NavigableMap.bitmapOf();
for (int mapIndex = 0; mapIndex < mapTaskNum; mapIndex++) {
long rssTaskId = RssMRUtils.convertTaskAttemptIdToLong(
@@ -151,7 +151,7 @@ public class EvenFetcherTest {
ef.fetchAllRssTaskIds();
fail();
} catch (Exception e) {
- assert(e.getMessage()
+ assert (e.getMessage()
.contains("TaskAttemptIDs are inconsistent with map tasks"));
}
}
@@ -164,12 +164,12 @@ public class EvenFetcherTest {
jobConf.setNumMapTasks(mapTaskNum);
TaskUmbilicalProtocol umbilical = mock(TaskUmbilicalProtocol.class);
when(umbilical.getMapCompletionEvents(any(JobID.class),
- eq(0), eq(MAX_EVENTS_TO_FETCH), eq(tid)))
- .thenReturn(getInconsistentCompletionEventsUpdate(0, mapTaskNum,
+ eq(0), eq(MAX_EVENTS_TO_FETCH), eq(tid)))
+ .thenReturn(getInconsistentCompletionEventsUpdate(0, mapTaskNum,
Sets.newHashSet(), Sets.newHashSet(101)));
RssEventFetcher ef =
- new RssEventFetcher(1, tid, umbilical, jobConf, MAX_EVENTS_TO_FETCH);
+ new RssEventFetcher(1, tid, umbilical, jobConf, MAX_EVENTS_TO_FETCH);
Roaring64NavigableMap expected = Roaring64NavigableMap.bitmapOf();
for (int mapIndex = 0; mapIndex < mapTaskNum; mapIndex++) {
long rssTaskId = RssMRUtils.convertTaskAttemptIdToLong(
@@ -181,7 +181,7 @@ public class EvenFetcherTest {
ef.fetchAllRssTaskIds();
fail();
} catch (Exception e) {
- assert(e.getMessage()
+ assert (e.getMessage()
.contains("TaskAttemptIDs are inconsistent with map tasks"));
}
}
@@ -197,14 +197,14 @@ public class EvenFetcherTest {
Set<Integer> tipFailed = Sets.newHashSet(89);
TaskUmbilicalProtocol umbilical = mock(TaskUmbilicalProtocol.class);
when(umbilical.getMapCompletionEvents(any(JobID.class),
- eq(0), eq(MAX_EVENTS_TO_FETCH), eq(tid)))
- .thenReturn(getMockedCompletionEventsUpdate(0, mapTaskNum,
+ eq(0), eq(MAX_EVENTS_TO_FETCH), eq(tid)))
+ .thenReturn(getMockedCompletionEventsUpdate(0, mapTaskNum,
Sets.newHashSet(), obsoleted, tipFailed));
ExceptionReporter reporter = mock(ExceptionReporter.class);
RssEventFetcher ef =
- new RssEventFetcher(1, tid, umbilical, jobConf, MAX_EVENTS_TO_FETCH);
+ new RssEventFetcher(1, tid, umbilical, jobConf, MAX_EVENTS_TO_FETCH);
Roaring64NavigableMap expected = Roaring64NavigableMap.bitmapOf();
for (int mapIndex = 0; mapIndex < mapTaskNum; mapIndex++) {
@@ -227,21 +227,23 @@ public class EvenFetcherTest {
private void validate(Roaring64NavigableMap expected, Roaring64NavigableMap actual) {
assert (expected.getLongCardinality() == actual.getLongCardinality());
- actual.forEach(taskId -> { assert(expected.contains(taskId)); });
+ actual.forEach(taskId -> {
+ assert (expected.contains(taskId));
+ });
}
private MapTaskCompletionEventsUpdate getMockedCompletionEventsUpdate(
- int startIdx, int numEvents) {
+ int startIdx, int numEvents) {
return getMockedCompletionEventsUpdate(startIdx, numEvents,
Sets.newHashSet(), Sets.newHashSet(), Sets.newHashSet());
}
private MapTaskCompletionEventsUpdate getMockedCompletionEventsUpdate(
- int startIdx,
- int numEvents,
- Set<Integer> repeatedSuccEvents,
- Set<Integer> obsoletedEvents,
- Set<Integer> tipFailedEvents) {
+ int startIdx,
+ int numEvents,
+ Set<Integer> repeatedSuccEvents,
+ Set<Integer> obsoletedEvents,
+ Set<Integer> tipFailedEvents) {
ArrayList<TaskCompletionEvent> tceList = new ArrayList<org.apache.hadoop.mapred.TaskCompletionEvent>(numEvents);
for (int i = 0; i < numEvents; ++i) {
int eventIdx = startIdx + i;
@@ -252,34 +254,34 @@ public class EvenFetcherTest {
status = TaskCompletionEvent.Status.TIPFAILED;
}
TaskCompletionEvent tce = new TaskCompletionEvent(eventIdx,
- new TaskAttemptID("12345", 1, TaskType.MAP, eventIdx, 0),
- eventIdx, true, status,
- "http://somehost:8888");
+ new TaskAttemptID("12345", 1, TaskType.MAP, eventIdx, 0),
+ eventIdx, true, status,
+ "http://somehost:8888");
tceList.add(tce);
}
obsoletedEvents.forEach(i -> {
TaskCompletionEvent tce = new TaskCompletionEvent(tceList.size(),
- new TaskAttemptID("12345", 1, TaskType.MAP, i, 0),
- i, true, TaskCompletionEvent.Status.OBSOLETE,
- "http://somehost:8888");
+ new TaskAttemptID("12345", 1, TaskType.MAP, i, 0),
+ i, true, TaskCompletionEvent.Status.OBSOLETE,
+ "http://somehost:8888");
tceList.add(tce);
});
// use new attempt number - 1
repeatedSuccEvents.forEach(i -> {
TaskCompletionEvent tce = new TaskCompletionEvent(tceList.size(),
- new TaskAttemptID("12345", 1, TaskType.MAP, i, 1),
- i, true, TaskCompletionEvent.Status.SUCCEEDED,
- "http://somehost:8888");
+ new TaskAttemptID("12345", 1, TaskType.MAP, i, 1),
+ i, true, TaskCompletionEvent.Status.SUCCEEDED,
+ "http://somehost:8888");
tceList.add(tce);
});
// use new attempt number - 1
obsoletedEvents.forEach(i -> {
TaskCompletionEvent tce = new TaskCompletionEvent(tceList.size(),
- new TaskAttemptID("12345", 1, TaskType.MAP, i, 1),
- i, true, TaskCompletionEvent.Status.SUCCEEDED,
- "http://somehost:8888");
+ new TaskAttemptID("12345", 1, TaskType.MAP, i, 1),
+ i, true, TaskCompletionEvent.Status.SUCCEEDED,
+ "http://somehost:8888");
tceList.add(tce);
});
TaskCompletionEvent[] events = {};
@@ -288,25 +290,25 @@ public class EvenFetcherTest {
private MapTaskCompletionEventsUpdate getInconsistentCompletionEventsUpdate(
- int startIdx, int numEvents, Set<Integer> missEvents, Set<Integer> extraEvents) {
+ int startIdx, int numEvents, Set<Integer> missEvents, Set<Integer> extraEvents) {
ArrayList<TaskCompletionEvent> tceList = new ArrayList<org.apache.hadoop.mapred.TaskCompletionEvent>(numEvents);
for (int i = 0; i < numEvents; ++i) {
int eventIdx = startIdx + i;
if (!missEvents.contains(eventIdx)) {
TaskCompletionEvent.Status status = TaskCompletionEvent.Status.SUCCEEDED;
TaskCompletionEvent tce = new TaskCompletionEvent(eventIdx,
- new TaskAttemptID("12345", 1, TaskType.MAP, eventIdx, 0),
- eventIdx, true, status,
- "http://somehost:8888");
+ new TaskAttemptID("12345", 1, TaskType.MAP, eventIdx, 0),
+ eventIdx, true, status,
+ "http://somehost:8888");
tceList.add(tce);
}
}
extraEvents.forEach(i -> {
TaskCompletionEvent tce = new TaskCompletionEvent(i,
- new TaskAttemptID("12345", 1, TaskType.MAP, i, 1),
- i, true, TaskCompletionEvent.Status.SUCCEEDED,
- "http://somehost:8888");
+ new TaskAttemptID("12345", 1, TaskType.MAP, i, 1),
+ i, true, TaskCompletionEvent.Status.SUCCEEDED,
+ "http://somehost:8888");
tceList.add(tce);
});
TaskCompletionEvent[] events = {};
diff --git a/client-mr/src/test/java/org/apache/hadoop/mapreduce/task/reduce/FetcherTest.java b/client-mr/src/test/java/org/apache/hadoop/mapreduce/task/reduce/FetcherTest.java
index cc622f09..531f5405 100644
--- a/client-mr/src/test/java/org/apache/hadoop/mapreduce/task/reduce/FetcherTest.java
+++ b/client-mr/src/test/java/org/apache/hadoop/mapreduce/task/reduce/FetcherTest.java
@@ -29,15 +29,6 @@ import java.util.TreeMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-import org.apache.uniffle.client.api.ShuffleWriteClient;
-import org.apache.uniffle.client.response.SendShuffleDataResult;
-import org.apache.uniffle.common.PartitionRange;
-import org.apache.uniffle.common.RemoteStorageInfo;
-import org.apache.uniffle.common.RssShuffleUtils;
-import org.apache.uniffle.common.ShuffleAssignmentsInfo;
-import org.apache.uniffle.common.ShuffleBlockInfo;
-import org.apache.uniffle.common.ShuffleServerInfo;
-import org.apache.uniffle.common.exception.RssException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
@@ -51,32 +42,41 @@ import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.IFile;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapOutputFile;
import org.apache.hadoop.mapred.MROutputFiles;
+import org.apache.hadoop.mapred.MapOutputFile;
import org.apache.hadoop.mapred.RawKeyValueIterator;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SortWriteBufferManager;
-import org.apache.hadoop.mapred.TaskStatus;
import org.apache.hadoop.mapred.Task;
+import org.apache.hadoop.mapred.TaskStatus;
import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.RssMRUtils;
-import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.util.Progress;
import org.junit.jupiter.api.Test;
+import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.apache.uniffle.client.api.ShuffleReadClient;
+import org.apache.uniffle.client.api.ShuffleWriteClient;
import org.apache.uniffle.client.response.CompressedShuffleBlock;
-import org.roaringbitmap.longlong.Roaring64NavigableMap;
+import org.apache.uniffle.client.response.SendShuffleDataResult;
+import org.apache.uniffle.common.PartitionRange;
+import org.apache.uniffle.common.RemoteStorageInfo;
+import org.apache.uniffle.common.RssShuffleUtils;
+import org.apache.uniffle.common.ShuffleAssignmentsInfo;
+import org.apache.uniffle.common.ShuffleBlockInfo;
+import org.apache.uniffle.common.ShuffleServerInfo;
+import org.apache.uniffle.common.exception.RssException;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class FetcherTest {
static JobID jobId = new JobID("a", 0);
static TaskAttemptID reduceId1 = new TaskAttemptID(
- new TaskID(jobId, TaskType.REDUCE, 0), 0);
+ new TaskID(jobId, TaskType.REDUCE, 0), 0);
static Configuration conf = new Configuration();
static JobConf jobConf = new JobConf();
static LocalDirAllocator lda = new LocalDirAllocator(MRConfig.LOCAL_DIR);
@@ -97,7 +97,7 @@ public class FetcherTest {
null, null, new Progress(), new MROutputFiles());
ShuffleReadClient shuffleReadClient = new MockedShuffleReadClient(data);
RssFetcher fetcher = new RssFetcher(jobConf, reduceId1, taskStatus, merger, new Progress(),
- reporter, metrics, shuffleReadClient, 3);
+ reporter, metrics, shuffleReadClient, 3);
fetcher.fetchAllRssBlocks();
@@ -107,7 +107,7 @@ public class FetcherTest {
List<String> allValuesExpected = Lists.newArrayList("v11", "v22", "v22", "v33", "v44", "v55", "v55");
List<String> allKeys = Lists.newArrayList();
List<String> allValues = Lists.newArrayList();
- while(iterator.next()){
+ while (iterator.next()) {
byte[] key = new byte[iterator.getKey().getLength()];
byte[] value = new byte[iterator.getValue().getLength()];
System.arraycopy(iterator.getKey().getData(), 0, key, 0, key.length);
@@ -138,7 +138,7 @@ public class FetcherTest {
List<String> allValuesExpected = Lists.newArrayList("v11", "v22", "v22", "v33", "v44", "v55", "v55");
List<String> allKeys = Lists.newArrayList();
List<String> allValues = Lists.newArrayList();
- while(iterator.next()){
+ while (iterator.next()) {
byte[] key = new byte[iterator.getKey().getLength()];
byte[] value = new byte[iterator.getValue().getLength()];
System.arraycopy(iterator.getKey().getData(), 0, key, 0, key.length);
@@ -161,7 +161,7 @@ public class FetcherTest {
null, null, new Progress(), new MROutputFiles(), expectedFails);
ShuffleReadClient shuffleReadClient = new MockedShuffleReadClient(data);
RssFetcher fetcher = new RssFetcher(jobConf, reduceId1, taskStatus, merger, new Progress(),
- reporter, metrics, shuffleReadClient, 3);
+ reporter, metrics, shuffleReadClient, 3);
fetcher.fetchAllRssBlocks();
RawKeyValueIterator iterator = merger.close();
@@ -170,7 +170,7 @@ public class FetcherTest {
List<String> allValuesExpected = Lists.newArrayList("v11", "v22", "v22", "v33", "v44", "v55", "v55");
List<String> allKeys = Lists.newArrayList();
List<String> allValues = Lists.newArrayList();
- while(iterator.next()){
+ while (iterator.next()) {
byte[] key = new byte[iterator.getKey().getLength()];
byte[] value = new byte[iterator.getValue().getLength()];
System.arraycopy(iterator.getKey().getData(), 0, key, 0, key.length);
@@ -181,8 +181,8 @@ public class FetcherTest {
validate(allKeysExpected, allKeys);
validate(allValuesExpected, allValues);
// There will be 2 retries
- assert(fetcher.getRetryCount() == 2);
- assert(((MockMergeManagerImpl)merger).happenedFails.size() == 2);
+ assert (fetcher.getRetryCount() == 2);
+ assert (((MockMergeManagerImpl)merger).happenedFails.size() == 2);
}
public void testCodecIsDuplicated() throws Exception {
@@ -203,9 +203,9 @@ public class FetcherTest {
}
private void validate(List<String> expected, List<String> actual) {
- assert(expected.size() == actual.size());
- for(int i = 0; i < expected.size(); i++) {
- assert(expected.get(i).equals(actual.get(i)));
+ assert (expected.size() == actual.size());
+ for (int i = 0; i < expected.size(); i++) {
+ assert (expected.get(i).equals(actual.get(i)));
}
}
@@ -244,7 +244,7 @@ public class FetcherTest {
}
private static byte[] writeMapOutputRss(Configuration conf, Map<String, String> keysToValues)
- throws IOException, InterruptedException {
+ throws IOException, InterruptedException {
SerializationFactory serializationFactory = new SerializationFactory(jobConf);
MockShuffleWriteClient client = new MockShuffleWriteClient();
client.setMode(2);
@@ -254,26 +254,26 @@ public class FetcherTest {
Counters.Counter mapOutputByteCounter = new Counters.Counter();
Counters.Counter mapOutputRecordCounter = new Counters.Counter();
SortWriteBufferManager<Text, Text> manager = new SortWriteBufferManager(
- 10240,
- 1L,
- 10,
- serializationFactory.getSerializer(Text.class),
- serializationFactory.getSerializer(Text.class),
- WritableComparator.get(Text.class),
- 0.9,
- "test",
- client,
- 500,
- 5 * 1000,
- partitionToServers,
- successBlocks,
- failedBlocks,
- mapOutputByteCounter,
- mapOutputRecordCounter,
- 1,
- 100,
- 2000,
- true,
+ 10240,
+ 1L,
+ 10,
+ serializationFactory.getSerializer(Text.class),
+ serializationFactory.getSerializer(Text.class),
+ WritableComparator.get(Text.class),
+ 0.9,
+ "test",
+ client,
+ 500,
+ 5 * 1000,
+ partitionToServers,
+ successBlocks,
+ failedBlocks,
+ mapOutputByteCounter,
+ mapOutputRecordCounter,
+ 1,
+ 100,
+ 2000,
+ true,
5,
0.2f,
1024000L);
@@ -288,11 +288,11 @@ public class FetcherTest {
private static byte[] writeMapOutput(Configuration conf, Map<String, String> keysToValues)
- throws IOException {
+ throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
FSDataOutputStream fsdos = new FSDataOutputStream(baos, null);
IFile.Writer<Text, Text> writer = new IFile.Writer<Text, Text>(conf, fsdos,
- Text.class, Text.class, null, null);
+ Text.class, Text.class, null, null);
for (String key : keysToValues.keySet()) {
String value = keysToValues.get(key);
writer.append(new Text(key), new Text(value));
@@ -306,6 +306,8 @@ public class FetcherTest {
public Set<Integer> happenedFails = Sets.newHashSet();
private Set<Integer> expectedFails;
private int currentMapout = 0;
+
+ @SuppressWarnings("checkstyle:RedundantModifier")
public MockMergeManagerImpl(TaskAttemptID reduceId, JobConf jobConf,
FileSystem localFS, LocalDirAllocator localDirAllocator,
Reporter reporter, CompressionCodec codec, Class combinerClass,
@@ -314,8 +316,8 @@ public class FetcherTest {
Counters.Counter mergedMapOutputsCounter, ExceptionReporter exceptionReporter,
Progress mergePhase, MapOutputFile mapOutputFile, Set<Integer> expectedFails) {
super(reduceId, jobConf, localFS, localDirAllocator, reporter, codec, combinerClass,
- combineCollector, spilledRecordsCounter, reduceCombineInputCounter, mergedMapOutputsCounter, exceptionReporter,
- mergePhase, mapOutputFile);
+ combineCollector, spilledRecordsCounter, reduceCombineInputCounter,
+ mergedMapOutputsCounter, exceptionReporter, mergePhase, mapOutputFile);
this.expectedFails = expectedFails;
}
@@ -342,6 +344,7 @@ public class FetcherTest {
}
public List<byte[]> data = new LinkedList<>();
+
@Override
public SendShuffleDataResult sendShuffleData(String appId, List<ShuffleBlockInfo> shuffleBlockInfoList) {
if (mode == 0) {
@@ -353,7 +356,7 @@ public class FetcherTest {
for (ShuffleBlockInfo blockInfo : shuffleBlockInfoList) {
successBlockIds.add(blockInfo.getBlockId());
}
- shuffleBlockInfoList.forEach( block -> {
+ shuffleBlockInfoList.forEach(block -> {
data.add(RssShuffleUtils.decompressData(block.getData(), block.getUncompressLength()));
});
return new SendShuffleDataResult(successBlockIds, Sets.newHashSet());
@@ -396,17 +399,20 @@ public class FetcherTest {
}
@Override
- public void reportShuffleResult(Map<Integer, List<ShuffleServerInfo>> partitionToServers, String appId, int shuffleId, long taskAttemptId, Map<Integer, List<Long>> partitionToBlockIds, int bitmapNum) {
+ public void reportShuffleResult(Map<Integer, List<ShuffleServerInfo>> partitionToServers, String appId,
+ int shuffleId, long taskAttemptId, Map<Integer, List<Long>> partitionToBlockIds, int bitmapNum) {
}
@Override
- public ShuffleAssignmentsInfo getShuffleAssignments(String appId, int shuffleId, int partitionNum, int partitionNumPerRange, Set<String> requiredTags, int assignmentShuffleServerNumber) {
+ public ShuffleAssignmentsInfo getShuffleAssignments(String appId, int shuffleId, int partitionNum,
+ int partitionNumPerRange, Set<String> requiredTags, int assignmentShuffleServerNumber) {
return null;
}
@Override
- public Roaring64NavigableMap getShuffleResult(String clientType, Set<ShuffleServerInfo> shuffleServerInfoSet, String appId, int shuffleId, int partitionId) {
+ public Roaring64NavigableMap getShuffleResult(String clientType, Set<ShuffleServerInfo> shuffleServerInfoSet,
+ String appId, int shuffleId, int partitionId) {
return null;
}
@@ -419,9 +425,10 @@ public class FetcherTest {
static class MockedShuffleReadClient implements ShuffleReadClient {
List<CompressedShuffleBlock> blocks;
int index = 0;
+
MockedShuffleReadClient(List<byte[]> data) {
this.blocks = new LinkedList<>();
- data.forEach( bytes -> {
+ data.forEach(bytes -> {
byte[] compressed = RssShuffleUtils.compressData(bytes);
blocks.add(new CompressedShuffleBlock(ByteBuffer.wrap(compressed), bytes.length));
});
diff --git a/client-mr/src/test/java/org/apache/hadoop/mapreduce/task/reduce/RssInMemoryRemoteMergerTest.java b/client-mr/src/test/java/org/apache/hadoop/mapreduce/task/reduce/RssInMemoryRemoteMergerTest.java
index 1fc133bc..51e535fb 100644
--- a/client-mr/src/test/java/org/apache/hadoop/mapreduce/task/reduce/RssInMemoryRemoteMergerTest.java
+++ b/client-mr/src/test/java/org/apache/hadoop/mapreduce/task/reduce/RssInMemoryRemoteMergerTest.java
@@ -17,6 +17,14 @@
package org.apache.hadoop.mapreduce.task.reduce;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
import com.google.common.collect.Lists;
import com.google.common.io.Files;
import org.apache.hadoop.conf.Configuration;
@@ -31,7 +39,6 @@ import org.apache.hadoop.mapred.IFile;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MROutputFiles;
import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.Task;
import org.apache.hadoop.mapreduce.CryptoUtils;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.MRConfig;
@@ -41,14 +48,6 @@ import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.util.Progress;
import org.junit.jupiter.api.Test;
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -62,15 +61,15 @@ public class RssInMemoryRemoteMergerTest {
File tmpDir = Files.createTempDir();
tmpDir.deleteOnExit();
JobID jobId = new JobID("a", 0);
- TaskAttemptID mapId1 = new TaskAttemptID(
+ final TaskAttemptID mapId1 = new TaskAttemptID(
new TaskID(jobId, TaskType.MAP, 1), 0);
- TaskAttemptID mapId2 = new TaskAttemptID(
+ final TaskAttemptID mapId2 = new TaskAttemptID(
new TaskID(jobId, TaskType.MAP, 2), 0);
TaskAttemptID reduceId1 = new TaskAttemptID(
new TaskID(jobId, TaskType.REDUCE, 0), 0);
- RssRemoteMergeManagerImpl<Text, Text> mergeManager = new RssRemoteMergeManagerImpl<Text, Text>(
+ final RssRemoteMergeManagerImpl<Text, Text> mergeManager = new RssRemoteMergeManagerImpl<Text, Text>(
"app", reduceId1, jobConf, tmpDir.toString(), 1,5, fs, lda, Reporter.NULL,
- null, null, null, null, null,
+ null, null, null, null, null,
null, null, new Progress(), new MROutputFiles(), new JobConf());
// write map outputs
diff --git a/client-mr/src/test/java/org/apache/hadoop/mapreduce/task/reduce/RssRemoteMergeManagerTest.java b/client-mr/src/test/java/org/apache/hadoop/mapreduce/task/reduce/RssRemoteMergeManagerTest.java
index 122d91c0..c8baab60 100644
--- a/client-mr/src/test/java/org/apache/hadoop/mapreduce/task/reduce/RssRemoteMergeManagerTest.java
+++ b/client-mr/src/test/java/org/apache/hadoop/mapreduce/task/reduce/RssRemoteMergeManagerTest.java
@@ -17,6 +17,13 @@
package org.apache.hadoop.mapreduce.task.reduce;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
import com.google.common.collect.Lists;
import com.google.common.io.Files;
import org.apache.hadoop.conf.Configuration;
@@ -41,13 +48,6 @@ import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.util.Progress;
import org.junit.jupiter.api.Test;
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-
import static org.junit.jupiter.api.Assertions.assertEquals;
public class RssRemoteMergeManagerTest {
@@ -55,16 +55,17 @@ public class RssRemoteMergeManagerTest {
JobID jobId = new JobID(appId, 0);
TaskAttemptID mapId1 = new TaskAttemptID(
- new TaskID(jobId, TaskType.MAP, 1), 0);
+ new TaskID(jobId, TaskType.MAP, 1), 0);
TaskAttemptID mapId2 = new TaskAttemptID(
- new TaskID(jobId, TaskType.MAP, 2), 0);
+ new TaskID(jobId, TaskType.MAP, 2), 0);
TaskAttemptID reduceId1 = new TaskAttemptID(
- new TaskID(jobId, TaskType.REDUCE, 0), 0);
+ new TaskID(jobId, TaskType.REDUCE, 0), 0);
+
@Test
public void mergerTest() throws Throwable {
JobConf jobConf = new JobConf();
- FileSystem fs = FileSystem.getLocal(jobConf);
- LocalDirAllocator lda = new LocalDirAllocator(MRConfig.LOCAL_DIR);
+ final FileSystem fs = FileSystem.getLocal(jobConf);
+ final LocalDirAllocator lda = new LocalDirAllocator(MRConfig.LOCAL_DIR);
File tmpDir = Files.createTempDir();
tmpDir.deleteOnExit();
@@ -72,10 +73,10 @@ public class RssRemoteMergeManagerTest {
jobConf.set("mapreduce.reduce.shuffle.memory.limit.percent", "0.01");
jobConf.set("mapreduce.reduce.shuffle.merge.percent", "0.1");
- RssRemoteMergeManagerImpl<Text, Text> mergeManager = new RssRemoteMergeManagerImpl<Text, Text>(
- appId, reduceId1, jobConf, tmpDir.toString(), 1, 5, fs, lda, Reporter.NULL,
- null, null, null, null, null,
- null, null, new Progress(), new MROutputFiles(), new JobConf());
+ final RssRemoteMergeManagerImpl<Text, Text> mergeManager = new RssRemoteMergeManagerImpl<Text, Text>(
+ appId, reduceId1, jobConf, tmpDir.toString(), 1, 5, fs, lda, Reporter.NULL,
+ null, null, null, null, null,
+ null, null, new Progress(), new MROutputFiles(), new JobConf());
// write map outputs
Map<String, String> map1 = new TreeMap<String, String>();
@@ -88,9 +89,9 @@ public class RssRemoteMergeManagerTest {
InMemoryMapOutput mapOutput1 = (InMemoryMapOutput)mergeManager.reserve(mapId1, mapOutputBytes1.length, 0);
InMemoryMapOutput mapOutput2 = (InMemoryMapOutput)mergeManager.reserve(mapId2, mapOutputBytes2.length, 0);
System.arraycopy(mapOutputBytes1, 0, mapOutput1.getMemory(), 0,
- mapOutputBytes1.length);
+ mapOutputBytes1.length);
System.arraycopy(mapOutputBytes2, 0, mapOutput2.getMemory(), 0,
- mapOutputBytes2.length);
+ mapOutputBytes2.length);
mapOutput1.commit();
mapOutput2.commit();
@@ -122,11 +123,11 @@ public class RssRemoteMergeManagerTest {
}
private byte[] writeMapOutput(Configuration conf, Map<String, String> keysToValues)
- throws IOException {
+ throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
FSDataOutputStream fsdos = new FSDataOutputStream(baos, null);
IFile.Writer<Text, Text> writer = new IFile.Writer<Text, Text>(conf, fsdos,
- Text.class, Text.class, null, null);
+ Text.class, Text.class, null, null);
for (String key : keysToValues.keySet()) {
String value = keysToValues.get(key);
writer.append(new Text(key), new Text(value));
@@ -140,7 +141,7 @@ public class RssRemoteMergeManagerTest {
FSDataInputStream in = CryptoUtils.wrapIfNecessary(conf, fs.open(path));
IFile.Reader<Text, Text> reader = new IFile.Reader<Text, Text>(conf, in,
- fs.getFileStatus(path).getLen(), null, null);
+ fs.getFileStatus(path).getLen(), null, null);
DataInputBuffer keyBuff = new DataInputBuffer();
DataInputBuffer valueBuff = new DataInputBuffer();
Text key = new Text();
diff --git a/client-spark/common/src/test/java/org/apache/spark/shuffle/RssSparkShuffleUtilsTest.java b/client-spark/common/src/test/java/org/apache/spark/shuffle/RssSparkShuffleUtilsTest.java
index cdd30ff6..09d1e63e 100644
--- a/client-spark/common/src/test/java/org/apache/spark/shuffle/RssSparkShuffleUtilsTest.java
+++ b/client-spark/common/src/test/java/org/apache/spark/shuffle/RssSparkShuffleUtilsTest.java
@@ -22,13 +22,12 @@ import java.util.Map;
import java.util.Set;
import com.google.common.collect.Maps;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
-import org.apache.uniffle.common.util.Constants;
import org.junit.jupiter.api.Test;
import org.apache.uniffle.client.util.RssClientConfig;
+import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.storage.util.StorageType;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -82,7 +81,7 @@ public class RssSparkShuffleUtilsTest {
@Test
public void applyDynamicClientConfTest() {
- SparkConf conf = new SparkConf();
+ final SparkConf conf = new SparkConf();
Map<String, String> clientConf = Maps.newHashMap();
String remoteStoragePath = "hdfs://path1";
String mockKey = "spark.mockKey";
diff --git a/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/AbstractRssReaderTest.java b/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/AbstractRssReaderTest.java
index c02aa1d2..422c6d04 100644
--- a/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/AbstractRssReaderTest.java
+++ b/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/AbstractRssReaderTest.java
@@ -17,21 +17,14 @@
package org.apache.spark.shuffle.reader;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-
-import com.esotericsoftware.kryo.io.Output;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import org.apache.uniffle.client.util.ClientUtils;
-import org.apache.uniffle.common.RssShuffleUtils;
-import org.apache.uniffle.common.ShufflePartitionedBlock;
-import org.apache.uniffle.common.util.ChecksumUtils;
-import org.apache.uniffle.storage.HdfsTestBase;
-import org.apache.uniffle.storage.handler.api.ShuffleWriteHandler;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
+
+import com.esotericsoftware.kryo.io.Output;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
import org.apache.spark.serializer.SerializationStream;
import org.apache.spark.serializer.Serializer;
import org.apache.spark.serializer.SerializerInstance;
@@ -40,6 +33,16 @@ import scala.Product2;
import scala.collection.Iterator;
import scala.reflect.ClassTag$;
+import org.apache.uniffle.client.util.ClientUtils;
+import org.apache.uniffle.common.RssShuffleUtils;
+import org.apache.uniffle.common.ShufflePartitionedBlock;
+import org.apache.uniffle.common.util.ChecksumUtils;
+import org.apache.uniffle.storage.HdfsTestBase;
+import org.apache.uniffle.storage.handler.api.ShuffleWriteHandler;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+
public abstract class AbstractRssReaderTest extends HdfsTestBase {
private AtomicInteger atomicInteger = new AtomicInteger(0);
diff --git a/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/RssShuffleDataIteratorTest.java b/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/RssShuffleDataIteratorTest.java
index 12f5a318..f4f55c18 100644
--- a/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/RssShuffleDataIteratorTest.java
+++ b/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/RssShuffleDataIteratorTest.java
@@ -17,24 +17,11 @@
package org.apache.spark.shuffle.reader;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.fail;
-import static org.mockito.ArgumentMatchers.any;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import org.apache.uniffle.client.api.ShuffleReadClient;
-import org.apache.uniffle.client.impl.ShuffleReadClientImpl;
-import org.apache.uniffle.client.util.ClientUtils;
-import org.apache.uniffle.client.util.DefaultIdHelper;
-import org.apache.uniffle.common.util.ChecksumUtils;
-import org.apache.uniffle.common.util.Constants;
-import org.apache.uniffle.storage.handler.impl.HdfsShuffleWriteHandler;
-import org.apache.uniffle.storage.util.StorageType;
import java.nio.ByteBuffer;
import java.util.Map;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
@@ -47,6 +34,19 @@ import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
+import org.apache.uniffle.client.api.ShuffleReadClient;
+import org.apache.uniffle.client.impl.ShuffleReadClientImpl;
+import org.apache.uniffle.client.util.ClientUtils;
+import org.apache.uniffle.client.util.DefaultIdHelper;
+import org.apache.uniffle.common.util.ChecksumUtils;
+import org.apache.uniffle.common.util.Constants;
+import org.apache.uniffle.storage.handler.impl.HdfsShuffleWriteHandler;
+import org.apache.uniffle.storage.util.StorageType;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
@@ -89,10 +89,12 @@ public class RssShuffleDataIteratorTest extends AbstractRssReaderTest {
assertEquals(10, recNum);
}
- private RssShuffleDataIterator getDataIterator(String basePath, Roaring64NavigableMap blockIdBitmap, Roaring64NavigableMap taskIdBitmap) {
+ private RssShuffleDataIterator getDataIterator(String basePath, Roaring64NavigableMap blockIdBitmap,
+ Roaring64NavigableMap taskIdBitmap) {
ShuffleReadClientImpl readClient = new ShuffleReadClientImpl(
StorageType.HDFS.name(), "appId", 0, 1, 100, 2,
- 10, 10000, basePath, blockIdBitmap, taskIdBitmap, Lists.newArrayList(), new Configuration(), new DefaultIdHelper());
+ 10, 10000, basePath, blockIdBitmap, taskIdBitmap, Lists.newArrayList(),
+ new Configuration(), new DefaultIdHelper());
return new RssShuffleDataIterator(KRYO_SERIALIZER, readClient,
new ShuffleReadMetrics());
}
@@ -131,7 +133,7 @@ public class RssShuffleDataIteratorTest extends AbstractRssReaderTest {
Map<String, String> expectedData = Maps.newHashMap();
Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
- Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
+ final Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
writeTestData(writeHandler1, 2, 5, expectedData,
blockIdBitmap, "key1", KRYO_SERIALIZER, 0);
writeTestData(writeHandler2, 2, 5, expectedData,
@@ -175,6 +177,7 @@ public class RssShuffleDataIteratorTest extends AbstractRssReaderTest {
fs.listStatus(dataFile);
fail("Index file should be deleted");
} catch (Exception e) {
+ // ignore
}
try {
@@ -199,7 +202,7 @@ public class RssShuffleDataIteratorTest extends AbstractRssReaderTest {
writeTestData(writeHandler, 2, 5, expectedData,
blockIdBitmap, "key", KRYO_SERIALIZER, 0);
- RssShuffleDataIterator rssShuffleDataIterator = getDataIterator(basePath, blockIdBitmap, taskIdBitmap);
+ final RssShuffleDataIterator rssShuffleDataIterator = getDataIterator(basePath, blockIdBitmap, taskIdBitmap);
// index file is deleted after iterator initialization, it should be ok, all index infos are read already
Path indexFile = new Path(basePath + "/appId/0/0-1/test.index");
fs.delete(indexFile, true);
@@ -209,6 +212,7 @@ public class RssShuffleDataIteratorTest extends AbstractRssReaderTest {
fs.listStatus(indexFile);
fail("Index file should be deleted");
} catch (Exception e) {
+ // ignore
}
validateResult(rssShuffleDataIterator, expectedData, 10);
}
@@ -245,7 +249,8 @@ public class RssShuffleDataIteratorTest extends AbstractRssReaderTest {
public void cleanup() throws Exception {
ShuffleReadClient mockClient = mock(ShuffleReadClient.class);
doNothing().when(mockClient).close();
- RssShuffleDataIterator dataIterator = new RssShuffleDataIterator(KRYO_SERIALIZER, mockClient, new ShuffleReadMetrics());
+ RssShuffleDataIterator dataIterator =
+ new RssShuffleDataIterator(KRYO_SERIALIZER, mockClient, new ShuffleReadMetrics());
dataIterator.cleanup();
verify(mockClient, times(1)).close();
}
diff --git a/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/WriteBufferManagerTest.java b/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/WriteBufferManagerTest.java
index 1a226334..665f5d2d 100644
--- a/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/WriteBufferManagerTest.java
+++ b/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/WriteBufferManagerTest.java
@@ -17,17 +17,9 @@
package org.apache.spark.shuffle.writer;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.when;
+import java.util.List;
import com.google.common.collect.Maps;
-import org.apache.uniffle.common.ShuffleBlockInfo;
-import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.executor.ShuffleWriteMetrics;
import org.apache.spark.memory.TaskMemoryManager;
@@ -36,6 +28,16 @@ import org.apache.spark.serializer.Serializer;
import org.apache.spark.shuffle.RssSparkConfig;
import org.junit.jupiter.api.Test;
+import org.apache.uniffle.common.ShuffleBlockInfo;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
public class WriteBufferManagerTest {
private WriteBufferManager createManager(SparkConf conf) {
diff --git a/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/WriteBufferTest.java b/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/WriteBufferTest.java
index 5d612768..04a52b43 100644
--- a/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/WriteBufferTest.java
+++ b/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/WriteBufferTest.java
@@ -17,8 +17,6 @@
package org.apache.spark.shuffle.writer;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-
import org.apache.spark.SparkConf;
import org.apache.spark.serializer.KryoSerializer;
import org.apache.spark.serializer.SerializationStream;
@@ -26,6 +24,8 @@ import org.apache.spark.serializer.Serializer;
import org.junit.jupiter.api.Test;
import scala.reflect.ClassTag$;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
public class WriteBufferTest {
private SparkConf conf = new SparkConf(false);
diff --git a/client-spark/spark2/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java b/client-spark/spark2/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java
index dca71bbe..12897489 100644
--- a/client-spark/spark2/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java
+++ b/client-spark/spark2/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java
@@ -19,13 +19,8 @@ package org.apache.spark.shuffle;
import java.util.List;
import java.util.NoSuchElementException;
-import java.util.Set;
import com.google.common.collect.Lists;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.uniffle.client.request.RssAccessClusterRequest;
-import org.apache.uniffle.client.response.ResponseStatusCode;
-import org.apache.uniffle.storage.util.StorageType;
import org.apache.spark.SparkConf;
import org.apache.spark.shuffle.sort.SortShuffleManager;
import org.junit.jupiter.api.AfterAll;
@@ -36,8 +31,7 @@ import org.mockito.Mockito;
import org.apache.uniffle.client.api.CoordinatorClient;
import org.apache.uniffle.client.response.RssAccessClusterResponse;
-import org.apache.uniffle.common.exception.RssException;
-import org.apache.uniffle.common.util.RetryUtils;
+import org.apache.uniffle.storage.util.StorageType;
import static org.apache.uniffle.client.response.ResponseStatusCode.ACCESS_DENIED;
import static org.apache.uniffle.client.response.ResponseStatusCode.SUCCESS;
@@ -165,18 +159,18 @@ public class DelegationRssShuffleManagerTest {
.thenReturn(new RssAccessClusterResponse(ACCESS_DENIED, ""))
.thenReturn(new RssAccessClusterResponse(ACCESS_DENIED, ""))
.thenReturn(new RssAccessClusterResponse(ACCESS_DENIED, ""));
- List<CoordinatorClient> SecondCoordinatorClients = Lists.newArrayList();
- SecondCoordinatorClients.add(mockCoordinatorClient);
+ List<CoordinatorClient> secondCoordinatorClients = Lists.newArrayList();
+ secondCoordinatorClients.add(mockCoordinatorClient);
mockedStaticRssShuffleUtils.when(() ->
- RssSparkShuffleUtils.createCoordinatorClients(any())).thenReturn(SecondCoordinatorClients);
- SparkConf SecondConf = new SparkConf();
- SecondConf.set(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_INTERVAL_MS, 3000L);
- SecondConf.set(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_TIMES, 3);
- SecondConf.set(RssSparkConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED.key(), "false");
- SecondConf.set(RssSparkConfig.RSS_ACCESS_ID.key(), "mockId");
- SecondConf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), "m1:8001,m2:8002");
- SecondConf.set("spark.rss.storage.type", StorageType.LOCALFILE.name());
- assertCreateSortShuffleManager(SecondConf);
+ RssSparkShuffleUtils.createCoordinatorClients(any())).thenReturn(secondCoordinatorClients);
+ SparkConf secondConf = new SparkConf();
+ secondConf.set(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_INTERVAL_MS, 3000L);
+ secondConf.set(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_TIMES, 3);
+ secondConf.set(RssSparkConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED.key(), "false");
+ secondConf.set(RssSparkConfig.RSS_ACCESS_ID.key(), "mockId");
+ secondConf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), "m1:8001,m2:8002");
+ secondConf.set("spark.rss.storage.type", StorageType.LOCALFILE.name());
+ assertCreateSortShuffleManager(secondConf);
}
private DelegationRssShuffleManager assertCreateSortShuffleManager(SparkConf conf) throws Exception {
diff --git a/client-spark/spark2/src/test/java/org/apache/spark/shuffle/reader/RssShuffleReaderTest.java b/client-spark/spark2/src/test/java/org/apache/spark/shuffle/reader/RssShuffleReaderTest.java
index 63308c1e..473ce609 100644
--- a/client-spark/spark2/src/test/java/org/apache/spark/shuffle/reader/RssShuffleReaderTest.java
+++ b/client-spark/spark2/src/test/java/org/apache/spark/shuffle/reader/RssShuffleReaderTest.java
@@ -17,15 +17,9 @@
package org.apache.spark.shuffle.reader;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.when;
+import java.util.Map;
import com.google.common.collect.Maps;
-import org.apache.uniffle.storage.handler.impl.HdfsShuffleWriteHandler;
-import org.apache.uniffle.storage.util.StorageType;
-import java.util.Map;
import org.apache.spark.ShuffleDependency;
import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
@@ -37,6 +31,14 @@ import org.junit.jupiter.api.Test;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
import scala.Option;
+import org.apache.uniffle.storage.handler.impl.HdfsShuffleWriteHandler;
+import org.apache.uniffle.storage.util.StorageType;
+
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
public class RssShuffleReaderTest extends AbstractRssReaderTest {
private static final Serializer KRYO_SERIALIZER = new KryoSerializer(new SparkConf(false));
@@ -50,8 +52,8 @@ public class RssShuffleReaderTest extends AbstractRssReaderTest {
Map<String, String> expectedData = Maps.newHashMap();
Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
- Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
- writeTestData(writeHandler, 2, 5, expectedData,
+ final Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
+ writeTestData(writeHandler, 2, 5, expectedData,
blockIdBitmap, "key", KRYO_SERIALIZER, 0);
TaskContext contextMock = mock(TaskContext.class);
diff --git a/client-spark/spark2/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java b/client-spark/spark2/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
index 995f14e2..084a731c 100644
--- a/client-spark/spark2/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
+++ b/client-spark/spark2/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
@@ -35,9 +35,9 @@ import org.apache.spark.executor.TaskMetrics;
import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.serializer.KryoSerializer;
import org.apache.spark.serializer.Serializer;
-import org.apache.spark.shuffle.RssSparkConfig;
import org.apache.spark.shuffle.RssShuffleHandle;
import org.apache.spark.shuffle.RssShuffleManager;
+import org.apache.spark.shuffle.RssSparkConfig;
import org.apache.spark.util.EventLoop;
import org.junit.jupiter.api.Test;
import scala.Product2;
@@ -51,8 +51,8 @@ import org.apache.uniffle.storage.util.StorageType;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
@@ -74,7 +74,7 @@ public class RssShuffleWriterTest {
.set(RssSparkConfig.RSS_STORAGE_TYPE.key(), StorageType.LOCALFILE.name())
.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), "127.0.0.1:12345,127.0.0.1:12346");
// init SparkContext
- SparkContext sc = SparkContext.getOrCreate(conf);
+ final SparkContext sc = SparkContext.getOrCreate(conf);
RssShuffleManager manager = new RssShuffleManager(conf, false);
Serializer kryoSerializer = new KryoSerializer(conf);
@@ -141,7 +141,7 @@ public class RssShuffleWriterTest {
.set(RssSparkConfig.RSS_STORAGE_TYPE.key(), StorageType.LOCALFILE.name())
.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), "127.0.0.1:12345,127.0.0.1:12346");
// init SparkContext
- SparkContext sc = SparkContext.getOrCreate(conf);
+ final SparkContext sc = SparkContext.getOrCreate(conf);
RssShuffleManager manager = new RssShuffleManager(conf, false);
List<ShuffleBlockInfo> shuffleBlockInfos = Lists.newArrayList();
@@ -163,7 +163,7 @@ public class RssShuffleWriterTest {
Partitioner mockPartitioner = mock(Partitioner.class);
ShuffleDependency mockDependency = mock(ShuffleDependency.class);
- ShuffleWriteClient mockShuffleWriteClient = mock(ShuffleWriteClient.class);
+ final ShuffleWriteClient mockShuffleWriteClient = mock(ShuffleWriteClient.class);
RssShuffleHandle mockHandle = mock(RssShuffleHandle.class);
when(mockHandle.getDependency()).thenReturn(mockDependency);
Serializer kryoSerializer = new KryoSerializer(conf);
@@ -247,11 +247,11 @@ public class RssShuffleWriterTest {
@Test
public void postBlockEventTest() throws Exception {
- WriteBufferManager mockBufferManager = mock(WriteBufferManager.class);
- ShuffleWriteMetrics mockMetrics = mock(ShuffleWriteMetrics.class);
+ final WriteBufferManager mockBufferManager = mock(WriteBufferManager.class);
+ final ShuffleWriteMetrics mockMetrics = mock(ShuffleWriteMetrics.class);
ShuffleDependency mockDependency = mock(ShuffleDependency.class);
Partitioner mockPartitioner = mock(Partitioner.class);
- RssShuffleManager mockShuffleManager = mock(RssShuffleManager.class);
+ final RssShuffleManager mockShuffleManager = mock(RssShuffleManager.class);
when(mockDependency.partitioner()).thenReturn(mockPartitioner);
when(mockPartitioner.numPartitions()).thenReturn(2);
List<AddBlockEvent> events = Lists.newArrayList();
diff --git a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java
index 9682b4b7..222dbc00 100644
--- a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java
+++ b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java
@@ -21,8 +21,6 @@ import java.util.List;
import java.util.NoSuchElementException;
import com.google.common.collect.Lists;
-
-import org.apache.uniffle.storage.util.StorageType;
import org.apache.spark.SparkConf;
import org.apache.spark.shuffle.sort.SortShuffleManager;
import org.junit.jupiter.api.AfterAll;
@@ -33,8 +31,7 @@ import org.mockito.Mockito;
import org.apache.uniffle.client.api.CoordinatorClient;
import org.apache.uniffle.client.response.RssAccessClusterResponse;
-import org.apache.uniffle.common.exception.RssException;
-import org.apache.uniffle.common.util.RetryUtils;
+import org.apache.uniffle.storage.util.StorageType;
import static org.apache.uniffle.client.response.ResponseStatusCode.ACCESS_DENIED;
import static org.apache.uniffle.client.response.ResponseStatusCode.SUCCESS;
@@ -162,18 +159,18 @@ public class DelegationRssShuffleManagerTest {
.thenReturn(new RssAccessClusterResponse(ACCESS_DENIED, ""))
.thenReturn(new RssAccessClusterResponse(ACCESS_DENIED, ""))
.thenReturn(new RssAccessClusterResponse(ACCESS_DENIED, ""));
- List<CoordinatorClient> SecondCoordinatorClients = Lists.newArrayList();
- SecondCoordinatorClients.add(mockCoordinatorClient);
+ List<CoordinatorClient> secondCoordinatorClients = Lists.newArrayList();
+ secondCoordinatorClients.add(mockCoordinatorClient);
mockedStaticRssShuffleUtils.when(() ->
- RssSparkShuffleUtils.createCoordinatorClients(any())).thenReturn(SecondCoordinatorClients);
- SparkConf SecondConf = new SparkConf();
- SecondConf.set(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_INTERVAL_MS, 3000L);
- SecondConf.set(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_TIMES, 3);
- SecondConf.set(RssSparkConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED.key(), "false");
- SecondConf.set(RssSparkConfig.RSS_ACCESS_ID.key(), "mockId");
- SecondConf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), "m1:8001,m2:8002");
- SecondConf.set("spark.rss.storage.type", StorageType.LOCALFILE.name());
- assertCreateSortShuffleManager(SecondConf);
+ RssSparkShuffleUtils.createCoordinatorClients(any())).thenReturn(secondCoordinatorClients);
+ SparkConf secondConf = new SparkConf();
+ secondConf.set(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_INTERVAL_MS, 3000L);
+ secondConf.set(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_TIMES, 3);
+ secondConf.set(RssSparkConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED.key(), "false");
+ secondConf.set(RssSparkConfig.RSS_ACCESS_ID.key(), "mockId");
+ secondConf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), "m1:8001,m2:8002");
+ secondConf.set("spark.rss.storage.type", StorageType.LOCALFILE.name());
+ assertCreateSortShuffleManager(secondConf);
}
private DelegationRssShuffleManager assertCreateSortShuffleManager(SparkConf conf) throws Exception {
diff --git a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/TestUtils.java b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/TestUtils.java
index f0a6e1f6..99619f16 100644
--- a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/TestUtils.java
+++ b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/TestUtils.java
@@ -21,7 +21,6 @@ import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.SystemUtils;
-
import org.apache.spark.SparkConf;
import org.apache.spark.util.EventLoop;
diff --git a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/reader/RssShuffleReaderTest.java b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/reader/RssShuffleReaderTest.java
index ae8ef2f2..70938c88 100644
--- a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/reader/RssShuffleReaderTest.java
+++ b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/reader/RssShuffleReaderTest.java
@@ -51,13 +51,13 @@ public class RssShuffleReaderTest extends AbstractRssReaderTest {
String basePath = HDFS_URI + "readTest1";
HdfsShuffleWriteHandler writeHandler =
new HdfsShuffleWriteHandler("appId", 0, 0, 0, basePath, "test", conf);
- HdfsShuffleWriteHandler writeHandler1 =
+ final HdfsShuffleWriteHandler writeHandler1 =
new HdfsShuffleWriteHandler("appId", 0, 1, 1, basePath, "test", conf);
Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
- Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
+ final Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
Map<String, String> expectedData = Maps.newHashMap();
- Roaring64NavigableMap blockIdBitmap1 = Roaring64NavigableMap.bitmapOf();
+ final Roaring64NavigableMap blockIdBitmap1 = Roaring64NavigableMap.bitmapOf();
writeTestData(writeHandler, 2, 5, expectedData,
blockIdBitmap, "key", KRYO_SERIALIZER, 0);
diff --git a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
index 093f3c0f..1b7afcd9 100644
--- a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
+++ b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
@@ -36,9 +36,9 @@ import org.apache.spark.executor.TaskMetrics;
import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.serializer.KryoSerializer;
import org.apache.spark.serializer.Serializer;
-import org.apache.spark.shuffle.RssSparkConfig;
import org.apache.spark.shuffle.RssShuffleHandle;
import org.apache.spark.shuffle.RssShuffleManager;
+import org.apache.spark.shuffle.RssSparkConfig;
import org.apache.spark.shuffle.TestUtils;
import org.apache.spark.util.EventLoop;
import org.junit.jupiter.api.Test;
@@ -52,8 +52,8 @@ import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.storage.util.StorageType;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
@@ -74,7 +74,7 @@ public class RssShuffleWriterTest {
.set(RssSparkConfig.RSS_STORAGE_TYPE.key(), StorageType.LOCALFILE.name())
.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), "127.0.0.1:12345,127.0.0.1:12346");
// init SparkContext
- SparkContext sc = SparkContext.getOrCreate(conf);
+ final SparkContext sc = SparkContext.getOrCreate(conf);
Map<String, Set<Long>> failBlocks = Maps.newConcurrentMap();
Map<String, Set<Long>> successBlocks = Maps.newConcurrentMap();
Serializer kryoSerializer = new KryoSerializer(conf);
@@ -146,7 +146,7 @@ public class RssShuffleWriterTest {
.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), "127.0.0.1:12345,127.0.0.1:12346");
// init SparkContext
List<ShuffleBlockInfo> shuffleBlockInfos = Lists.newArrayList();
- SparkContext sc = SparkContext.getOrCreate(conf);
+ final SparkContext sc = SparkContext.getOrCreate(conf);
Map<String, Set<Long>> successBlockIds = Maps.newConcurrentMap();
EventLoop<AddBlockEvent> testLoop = new EventLoop<AddBlockEvent>("test") {
@Override
@@ -164,7 +164,7 @@ public class RssShuffleWriterTest {
}
};
- RssShuffleManager manager = TestUtils.createShuffleManager(
+ final RssShuffleManager manager = TestUtils.createShuffleManager(
conf,
false,
testLoop,
@@ -172,7 +172,7 @@ public class RssShuffleWriterTest {
Maps.newConcurrentMap());
Serializer kryoSerializer = new KryoSerializer(conf);
Partitioner mockPartitioner = mock(Partitioner.class);
- ShuffleWriteClient mockShuffleWriteClient = mock(ShuffleWriteClient.class);
+ final ShuffleWriteClient mockShuffleWriteClient = mock(ShuffleWriteClient.class);
ShuffleDependency mockDependency = mock(ShuffleDependency.class);
RssShuffleHandle mockHandle = mock(RssShuffleHandle.class);
when(mockHandle.getDependency()).thenReturn(mockDependency);
diff --git a/client/src/test/java/org/apache/uniffle/client/impl/ShuffleReadClientImplTest.java b/client/src/test/java/org/apache/uniffle/client/impl/ShuffleReadClientImplTest.java
index 319d4b2b..2589c195 100644
--- a/client/src/test/java/org/apache/uniffle/client/impl/ShuffleReadClientImplTest.java
+++ b/client/src/test/java/org/apache/uniffle/client/impl/ShuffleReadClientImplTest.java
@@ -17,27 +17,14 @@
package org.apache.uniffle.client.impl;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.fail;
-import static org.mockito.ArgumentMatchers.any;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import org.apache.uniffle.client.TestUtils;
-import org.apache.uniffle.client.util.DefaultIdHelper;
-import org.apache.uniffle.common.ShufflePartitionedBlock;
-import org.apache.uniffle.common.util.ChecksumUtils;
-import org.apache.uniffle.common.util.Constants;
-import org.apache.uniffle.storage.HdfsTestBase;
-import org.apache.uniffle.storage.handler.impl.HdfsShuffleWriteHandler;
-import org.apache.uniffle.storage.util.StorageType;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
@@ -47,6 +34,21 @@ import org.mockito.Mockito;
import org.roaringbitmap.longlong.LongIterator;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
+import org.apache.uniffle.client.TestUtils;
+import org.apache.uniffle.client.util.DefaultIdHelper;
+import org.apache.uniffle.common.ShufflePartitionedBlock;
+import org.apache.uniffle.common.util.ChecksumUtils;
+import org.apache.uniffle.common.util.Constants;
+import org.apache.uniffle.storage.HdfsTestBase;
+import org.apache.uniffle.storage.handler.impl.HdfsShuffleWriteHandler;
+import org.apache.uniffle.storage.util.StorageType;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.ArgumentMatchers.any;
+
public class ShuffleReadClientImplTest extends HdfsTestBase {
private static final String EXPECTED_EXCEPTION_MESSAGE = "Exception should be thrown";
@@ -65,7 +67,8 @@ public class ShuffleReadClientImplTest extends HdfsTestBase {
blockIdBitmap);
ShuffleReadClientImpl readClient = new ShuffleReadClientImpl(StorageType.HDFS.name(), "appId", 0, 1, 100, 1,
- 10, 1000, basePath, blockIdBitmap, taskIdBitmap, Lists.newArrayList(), new Configuration(), new DefaultIdHelper());
+ 10, 1000, basePath, blockIdBitmap, taskIdBitmap,
+ Lists.newArrayList(), new Configuration(), new DefaultIdHelper());
TestUtils.validateResult(readClient, expectedData);
readClient.checkProcessedBlockIds();
@@ -74,7 +77,8 @@ public class ShuffleReadClientImplTest extends HdfsTestBase {
blockIdBitmap.addLong(Constants.MAX_TASK_ATTEMPT_ID - 1);
taskIdBitmap.addLong(Constants.MAX_TASK_ATTEMPT_ID - 1);
readClient = new ShuffleReadClientImpl(StorageType.HDFS.name(), "appId", 0, 1, 100, 1,
- 10, 1000, basePath, blockIdBitmap, taskIdBitmap, Lists.newArrayList(), new Configuration(), new DefaultIdHelper());
+ 10, 1000, basePath, blockIdBitmap, taskIdBitmap,
+ Lists.newArrayList(), new Configuration(), new DefaultIdHelper());
TestUtils.validateResult(readClient, expectedData);
try {
// can't find all expected block id, data loss
@@ -119,8 +123,8 @@ public class ShuffleReadClientImplTest extends HdfsTestBase {
new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, "test3_2", conf);
Map<Long, byte[]> expectedData = Maps.newHashMap();
- Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
- Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
+ final Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+ final Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
writeTestData(writeHandler1, 2, 30, 0, expectedData, blockIdBitmap);
writeTestData(writeHandler2, 2, 30, 0, expectedData, blockIdBitmap);
@@ -169,6 +173,7 @@ public class ShuffleReadClientImplTest extends HdfsTestBase {
fs.listStatus(dataFile);
fail("Index file should be deleted");
} catch (Exception e) {
+ // ignore
}
try {
@@ -210,8 +215,8 @@ public class ShuffleReadClientImplTest extends HdfsTestBase {
Map<Long, byte[]> expectedData1 = Maps.newHashMap();
Map<Long, byte[]> expectedData2 = Maps.newHashMap();
- Roaring64NavigableMap blockIdBitmap1 = Roaring64NavigableMap.bitmapOf();
- Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
+ final Roaring64NavigableMap blockIdBitmap1 = Roaring64NavigableMap.bitmapOf();
+ final Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
writeTestData(writeHandler, 10, 30, 0, expectedData1, blockIdBitmap1);
Roaring64NavigableMap blockIdBitmap2 = Roaring64NavigableMap.bitmapOf();
@@ -222,7 +227,7 @@ public class ShuffleReadClientImplTest extends HdfsTestBase {
ShuffleReadClientImpl readClient1 = new ShuffleReadClientImpl(StorageType.HDFS.name(),
"appId", 0, 0, 100, 2, 10, 100,
basePath, blockIdBitmap1, taskIdBitmap, Lists.newArrayList(), new Configuration(), new DefaultIdHelper());
- ShuffleReadClientImpl readClient2 = new ShuffleReadClientImpl(StorageType.HDFS.name(),
+ final ShuffleReadClientImpl readClient2 = new ShuffleReadClientImpl(StorageType.HDFS.name(),
"appId", 0, 1, 100, 2, 10, 100,
basePath, blockIdBitmap2, taskIdBitmap, Lists.newArrayList(), new Configuration(), new DefaultIdHelper());
TestUtils.validateResult(readClient1, expectedData1);
@@ -316,35 +321,40 @@ public class ShuffleReadClientImplTest extends HdfsTestBase {
// test with different indexReadLimit to validate result
ShuffleReadClientImpl readClient = new ShuffleReadClientImpl(StorageType.HDFS.name(), "appId", 0, 1, 1, 1,
- 10, 1000, basePath, blockIdBitmap, taskIdBitmap, Lists.newArrayList(), new Configuration(), new DefaultIdHelper());
+ 10, 1000, basePath, blockIdBitmap, taskIdBitmap,
+ Lists.newArrayList(), new Configuration(), new DefaultIdHelper());
TestUtils.validateResult(readClient, expectedData);
readClient.checkProcessedBlockIds();
readClient.close();
readClient = new ShuffleReadClientImpl(StorageType.HDFS.name(), "appId", 0, 1, 2, 1,
- 10, 1000, basePath, blockIdBitmap, taskIdBitmap, Lists.newArrayList(), new Configuration(), new DefaultIdHelper());
+ 10, 1000, basePath, blockIdBitmap, taskIdBitmap,
+ Lists.newArrayList(), new Configuration(), new DefaultIdHelper());
TestUtils.validateResult(readClient, expectedData);
readClient.checkProcessedBlockIds();
readClient.close();
readClient = new ShuffleReadClientImpl(StorageType.HDFS.name(), "appId", 0, 1, 3, 1,
- 10, 1000, basePath, blockIdBitmap, taskIdBitmap, Lists.newArrayList(), new Configuration(), new DefaultIdHelper());
+ 10, 1000, basePath, blockIdBitmap, taskIdBitmap,
+ Lists.newArrayList(), new Configuration(), new DefaultIdHelper());
TestUtils.validateResult(readClient, expectedData);
readClient.checkProcessedBlockIds();
readClient.close();
readClient = new ShuffleReadClientImpl(StorageType.HDFS.name(), "appId", 0, 1, 10, 1,
- 10, 1000, basePath, blockIdBitmap, taskIdBitmap, Lists.newArrayList(), new Configuration(), new DefaultIdHelper());
+ 10, 1000, basePath, blockIdBitmap, taskIdBitmap,
+ Lists.newArrayList(), new Configuration(), new DefaultIdHelper());
TestUtils.validateResult(readClient, expectedData);
readClient.checkProcessedBlockIds();
readClient.close();
readClient = new ShuffleReadClientImpl(StorageType.HDFS.name(), "appId", 0, 1, 11, 1,
- 10, 1000, basePath, blockIdBitmap, taskIdBitmap, Lists.newArrayList(), new Configuration(), new DefaultIdHelper());
+ 10, 1000, basePath, blockIdBitmap, taskIdBitmap,
+ Lists.newArrayList(), new Configuration(), new DefaultIdHelper());
TestUtils.validateResult(readClient, expectedData);
readClient.checkProcessedBlockIds();
@@ -358,15 +368,16 @@ public class ShuffleReadClientImplTest extends HdfsTestBase {
new HdfsShuffleWriteHandler("appId", 0, 1, 1, basePath, "test1", conf);
Map<Long, byte[]> expectedData = Maps.newHashMap();
- Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
- Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0, 1);
+ final Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+ final Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0, 1);
writeTestData(writeHandler, 5, 30, 0, expectedData, blockIdBitmap);
writeTestData(writeHandler, 5, 30, 2, Maps.newHashMap(), blockIdBitmap);
writeTestData(writeHandler, 5, 30, 1, expectedData, blockIdBitmap);
// unexpected taskAttemptId should be filtered
ShuffleReadClientImpl readClient = new ShuffleReadClientImpl(StorageType.HDFS.name(), "appId", 0, 1, 100, 1,
- 10, 1000, basePath, blockIdBitmap, taskIdBitmap, Lists.newArrayList(), new Configuration(), new DefaultIdHelper());
+ 10, 1000, basePath, blockIdBitmap, taskIdBitmap,
+ Lists.newArrayList(), new Configuration(), new DefaultIdHelper());
TestUtils.validateResult(readClient, expectedData);
assertEquals(15, readClient.getProcessedBlockIds().getLongCardinality());
@@ -381,8 +392,8 @@ public class ShuffleReadClientImplTest extends HdfsTestBase {
new HdfsShuffleWriteHandler("appId", 0, 1, 1, basePath, "test1", conf);
Map<Long, byte[]> expectedData = Maps.newHashMap();
- Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
- Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0, 3);
+ final Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+ final Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0, 3);
writeTestData(writeHandler, 5, 30, 0, expectedData, blockIdBitmap);
// test case: data generated by speculation task without report result
writeTestData(writeHandler, 5, 30, 1, Maps.newHashMap(), Roaring64NavigableMap.bitmapOf());
@@ -392,7 +403,8 @@ public class ShuffleReadClientImplTest extends HdfsTestBase {
// unexpected taskAttemptId should be filtered
ShuffleReadClientImpl readClient = new ShuffleReadClientImpl(StorageType.HDFS.name(), "appId", 0, 1, 100, 1,
- 10, 1000, basePath, blockIdBitmap, taskIdBitmap, Lists.newArrayList(), new Configuration(), new DefaultIdHelper());
+ 10, 1000, basePath, blockIdBitmap, taskIdBitmap,
+ Lists.newArrayList(), new Configuration(), new DefaultIdHelper());
TestUtils.validateResult(readClient, expectedData);
assertEquals(20, readClient.getProcessedBlockIds().getLongCardinality());
@@ -407,15 +419,16 @@ public class ShuffleReadClientImplTest extends HdfsTestBase {
new HdfsShuffleWriteHandler("appId", 0, 1, 1, basePath, "test1", conf);
Map<Long, byte[]> expectedData = Maps.newHashMap();
- Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
- Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0, 2);
+ final Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+ final Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0, 2);
writeDuplicatedData(writeHandler, 5, 30, 0, expectedData, blockIdBitmap);
writeTestData(writeHandler, 5, 30, 1, Maps.newHashMap(), Roaring64NavigableMap.bitmapOf());
writeTestData(writeHandler, 5, 30, 2, expectedData, blockIdBitmap);
// unexpected taskAttemptId should be filtered
ShuffleReadClientImpl readClient = new ShuffleReadClientImpl(StorageType.HDFS.name(), "appId", 0, 1, 100, 1,
- 10, 1000, basePath, blockIdBitmap, taskIdBitmap, Lists.newArrayList(), new Configuration(), new DefaultIdHelper());
+ 10, 1000, basePath, blockIdBitmap, taskIdBitmap,
+ Lists.newArrayList(), new Configuration(), new DefaultIdHelper());
TestUtils.validateResult(readClient, expectedData);
assertEquals(15, readClient.getProcessedBlockIds().getLongCardinality());
@@ -427,11 +440,11 @@ public class ShuffleReadClientImplTest extends HdfsTestBase {
public void readTest15() throws Exception {
String basePath = HDFS_URI + "clientReadTest15";
HdfsShuffleWriteHandler writeHandler =
- new HdfsShuffleWriteHandler("appId", 0, 1, 1, basePath, "test1", conf);
+ new HdfsShuffleWriteHandler("appId", 0, 1, 1, basePath, "test1", conf);
Map<Long, byte[]> expectedData = Maps.newHashMap();
- Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
- Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
+ final Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+ final Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
writeTestData(writeHandler, 5, 30, 0, expectedData, blockIdBitmap);
writeTestData(writeHandler, 5, 30, 0, expectedData, blockIdBitmap);
writeTestData(writeHandler, 5, 30, 0, Maps.newHashMap(), Roaring64NavigableMap.bitmapOf());
@@ -440,7 +453,8 @@ public class ShuffleReadClientImplTest extends HdfsTestBase {
writeTestData(writeHandler, 5, 30, 0, Maps.newHashMap(), Roaring64NavigableMap.bitmapOf());
// unexpected taskAttemptId should be filtered
ShuffleReadClientImpl readClient = new ShuffleReadClientImpl(StorageType.HDFS.name(), "appId", 0, 1, 100, 1,
- 10, 1000, basePath, blockIdBitmap, taskIdBitmap, Lists.newArrayList(), new Configuration(), new DefaultIdHelper());
+ 10, 1000, basePath, blockIdBitmap, taskIdBitmap,
+ Lists.newArrayList(), new Configuration(), new DefaultIdHelper());
TestUtils.validateResult(readClient, expectedData);
assertEquals(25, readClient.getProcessedBlockIds().getLongCardinality());
diff --git a/client/src/test/java/org/apache/uniffle/client/impl/ShuffleWriteClientImplTest.java b/client/src/test/java/org/apache/uniffle/client/impl/ShuffleWriteClientImplTest.java
index d7f539ed..79661e76 100644
--- a/client/src/test/java/org/apache/uniffle/client/impl/ShuffleWriteClientImplTest.java
+++ b/client/src/test/java/org/apache/uniffle/client/impl/ShuffleWriteClientImplTest.java
@@ -17,14 +17,12 @@
package org.apache.uniffle.client.impl;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.when;
+import java.util.List;
import com.google.common.collect.Lists;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
import org.apache.uniffle.client.api.ShuffleServerClient;
import org.apache.uniffle.client.response.ResponseStatusCode;
import org.apache.uniffle.client.response.RssSendShuffleDataResponse;
@@ -32,10 +30,11 @@ import org.apache.uniffle.client.response.SendShuffleDataResult;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleServerInfo;
-import java.util.List;
-
-import org.junit.jupiter.api.Test;
-import org.mockito.Mockito;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
public class ShuffleWriteClientImplTest {
diff --git a/common/src/test/java/org/apache/uniffle/common/KerberizedHdfs.java b/common/src/test/java/org/apache/uniffle/common/KerberizedHdfs.java
index 8482490c..bc1fe1e2 100644
--- a/common/src/test/java/org/apache/uniffle/common/KerberizedHdfs.java
+++ b/common/src/test/java/org/apache/uniffle/common/KerberizedHdfs.java
@@ -168,7 +168,7 @@ public class KerberizedHdfs implements Serializable {
UserGroupInformation.setConfiguration(conf);
UserGroupInformation.setShouldRenewImmediatelyForTests(true);
- UserGroupInformation ugi =
+ final UserGroupInformation ugi =
UserGroupInformation.loginUserFromKeytabAndReturnUGI(hdfsPrincipal, hdfsKeytab);
Configuration hdfsConf = createSecureDFSConfig();
diff --git a/common/src/test/java/org/apache/uniffle/common/filesystem/HadoopFilesystemProviderTest.java b/common/src/test/java/org/apache/uniffle/common/filesystem/HadoopFilesystemProviderTest.java
index a787a099..fd3de044 100644
--- a/common/src/test/java/org/apache/uniffle/common/filesystem/HadoopFilesystemProviderTest.java
+++ b/common/src/test/java/org/apache/uniffle/common/filesystem/HadoopFilesystemProviderTest.java
@@ -57,7 +57,7 @@ public class HadoopFilesystemProviderTest extends KerberizedHdfsBase {
FileSystem fileSystem = HadoopFilesystemProvider.getFilesystem(new Path("/hdfs"), kerberizedHdfs.getConf());
fileSystem.mkdirs(new Path("/hdfs/HadoopFilesystemProviderTest"));
} catch (AccessControlException e) {
-
+ // ignore
}
}
diff --git a/common/src/test/java/org/apache/uniffle/common/security/HadoopSecurityContextTest.java b/common/src/test/java/org/apache/uniffle/common/security/HadoopSecurityContextTest.java
index dde0340a..9a737558 100644
--- a/common/src/test/java/org/apache/uniffle/common/security/HadoopSecurityContextTest.java
+++ b/common/src/test/java/org/apache/uniffle/common/security/HadoopSecurityContextTest.java
@@ -55,7 +55,7 @@ public class HadoopSecurityContextTest extends KerberizedHdfsBase {
context.runSecured(StringUtils.EMPTY, (Callable<Void>) () -> null);
fail();
} catch (Exception e) {
-
+ return;
}
// case2: run by the login user, there is no need to wrap proxy action
diff --git a/common/src/test/java/org/apache/uniffle/common/util/RetryUtilsTest.java b/common/src/test/java/org/apache/uniffle/common/util/RetryUtilsTest.java
index 1d1bc130..31e6456c 100644
--- a/common/src/test/java/org/apache/uniffle/common/util/RetryUtilsTest.java
+++ b/common/src/test/java/org/apache/uniffle/common/util/RetryUtilsTest.java
@@ -20,9 +20,9 @@ package org.apache.uniffle.common.util;
import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.collect.Sets;
-import org.apache.uniffle.common.exception.NotRetryException;
import org.junit.jupiter.api.Test;
+import org.apache.uniffle.common.exception.NotRetryException;
import org.apache.uniffle.common.exception.RssException;
import static org.junit.jupiter.api.Assertions.assertEquals;
diff --git a/coordinator/src/test/java/org/apache/uniffle/coordinator/SimpleClusterManagerTest.java b/coordinator/src/test/java/org/apache/uniffle/coordinator/SimpleClusterManagerTest.java
index 706bcde6..8f2e7f4c 100644
--- a/coordinator/src/test/java/org/apache/uniffle/coordinator/SimpleClusterManagerTest.java
+++ b/coordinator/src/test/java/org/apache/uniffle/coordinator/SimpleClusterManagerTest.java
@@ -19,7 +19,6 @@ package org.apache.uniffle.coordinator;
import java.io.File;
import java.io.FileWriter;
-import java.io.IOException;
import java.io.PrintWriter;
import java.net.URI;
import java.util.List;
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/AccessCandidatesCheckerHdfsTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/AccessCandidatesCheckerHdfsTest.java
index 26a860e9..c4097d7d 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/AccessCandidatesCheckerHdfsTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/AccessCandidatesCheckerHdfsTest.java
@@ -64,8 +64,6 @@ public class AccessCandidatesCheckerHdfsTest extends HdfsTestBase {
String candidatesFile,
FileSystem fs,
Configuration hadoopConf) throws Exception {
- Path path = new Path(candidatesFile);
- FSDataOutputStream out = fs.create(path);
CoordinatorConf conf = new CoordinatorConf();
conf.set(CoordinatorConf.COORDINATOR_ACCESS_CANDIDATES_UPDATE_INTERVAL_SEC, 1);
@@ -94,6 +92,9 @@ public class AccessCandidatesCheckerHdfsTest extends HdfsTestBase {
assertTrue(expectedException.getMessage().contains(
"NoSuchMethodException: org.apache.uniffle.coordinator.AccessCandidatesChecker.<init>()"));
+ Path path = new Path(candidatesFile);
+ FSDataOutputStream out = fs.create(path);
+
PrintWriter printWriter = new PrintWriter(new OutputStreamWriter(out));
printWriter.println("9527");
printWriter.println(" 135 ");
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/AccessClusterTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/AccessClusterTest.java
index 149ab88b..e245c256 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/AccessClusterTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/AccessClusterTest.java
@@ -17,10 +17,10 @@
package org.apache.uniffle.test;
+import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
-import java.io.File;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
@@ -126,7 +126,8 @@ public class AccessClusterTest extends CoordinatorTestBase {
coordinatorConf.setString("rss.coordinator.access.candidates.path", cfgFile.getAbsolutePath());
coordinatorConf.setString(
"rss.coordinator.access.checkers",
- "org.apache.uniffle.coordinator.AccessCandidatesChecker,org.apache.uniffle.coordinator.AccessClusterLoadChecker");
+ "org.apache.uniffle.coordinator.AccessCandidatesChecker,"
+ + "org.apache.uniffle.coordinator.AccessClusterLoadChecker");
createCoordinatorServer(coordinatorConf);
ShuffleServerConf shuffleServerConf = getShuffleServerConf();
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/AssignmentServerNodesNumberTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/AssignmentServerNodesNumberTest.java
index 29ff97a0..2ed3e45f 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/AssignmentServerNodesNumberTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/AssignmentServerNodesNumberTest.java
@@ -52,7 +52,7 @@ public class AssignmentServerNodesNumberTest extends CoordinatorTestBase {
coordinatorConf.setInteger(CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX, SHUFFLE_NODES_MAX);
createCoordinatorServer(coordinatorConf);
- for (int i = 0; i < SERVER_NUM; i++){
+ for (int i = 0; i < SERVER_NUM; i++) {
ShuffleServerConf shuffleServerConf = getShuffleServerConf();
File tmpDir = Files.createTempDir();
File dataDir1 = new File(tmpDir, "data1");
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/AssignmentWithTagsTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/AssignmentWithTagsTest.java
index 4ca0c205..c19a911f 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/AssignmentWithTagsTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/AssignmentWithTagsTest.java
@@ -17,10 +17,6 @@
package org.apache.uniffle.test;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.fail;
-
import java.io.File;
import java.io.IOException;
import java.net.ServerSocket;
@@ -31,7 +27,14 @@ import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
+import com.google.common.collect.Sets;
+import com.google.common.io.Files;
import org.apache.commons.lang3.StringUtils;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.uniffle.client.impl.ShuffleWriteClientImpl;
import org.apache.uniffle.client.util.ClientType;
import org.apache.uniffle.common.ShuffleAssignmentsInfo;
@@ -41,172 +44,170 @@ import org.apache.uniffle.coordinator.CoordinatorServer;
import org.apache.uniffle.server.ShuffleServer;
import org.apache.uniffle.server.ShuffleServerConf;
import org.apache.uniffle.storage.util.StorageType;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import com.google.common.collect.Sets;
-import com.google.common.io.Files;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
/**
* This class is to test the conf of {@code org.apache.uniffle.server.ShuffleServerConf.Tags}
* and {@code RssClientConfig.RSS_CLIENT_ASSIGNMENT_TAGS}
*/
public class AssignmentWithTagsTest extends CoordinatorTestBase {
- private static final Logger LOG = LoggerFactory.getLogger(AssignmentWithTagsTest.class);
-
- // KV: tag -> shuffle server id
- private static Map<String, List<Integer>> tagOfShufflePorts = new HashMap<>();
+ private static final Logger LOG = LoggerFactory.getLogger(AssignmentWithTagsTest.class);
- private static List<Integer> findAvailablePorts(int num) throws IOException {
- List<ServerSocket> sockets = new ArrayList<>();
- List<Integer> ports = new ArrayList<>();
+ // KV: tag -> shuffle server id
+ private static Map<String, List<Integer>> tagOfShufflePorts = new HashMap<>();
- for (int i = 0; i < num; i++) {
- ServerSocket socket = new ServerSocket(0);
- ports.add(socket.getLocalPort());
- sockets.add(socket);
- }
+ private static List<Integer> findAvailablePorts(int num) throws IOException {
+ List<ServerSocket> sockets = new ArrayList<>();
+ List<Integer> ports = new ArrayList<>();
- for (ServerSocket socket : sockets) {
- socket.close();
- }
+ for (int i = 0; i < num; i++) {
+ ServerSocket socket = new ServerSocket(0);
+ ports.add(socket.getLocalPort());
+ sockets.add(socket);
+ }
- return ports;
+ for (ServerSocket socket : sockets) {
+ socket.close();
}
- private static void createAndStartShuffleServerWithTags(Set<String> tags) throws Exception {
- ShuffleServerConf shuffleServerConf = getShuffleServerConf();
- shuffleServerConf.setLong("rss.server.app.expired.withoutHeartbeat", 4000);
+ return ports;
+ }
+
+ private static void createAndStartShuffleServerWithTags(Set<String> tags) throws Exception {
+ ShuffleServerConf shuffleServerConf = getShuffleServerConf();
+ shuffleServerConf.setLong("rss.server.app.expired.withoutHeartbeat", 4000);
- File tmpDir = Files.createTempDir();
- tmpDir.deleteOnExit();
+ File tmpDir = Files.createTempDir();
+ tmpDir.deleteOnExit();
- File dataDir1 = new File(tmpDir, "data1");
- File dataDir2 = new File(tmpDir, "data2");
- String basePath = dataDir1.getAbsolutePath() + "," + dataDir2.getAbsolutePath();
+ File dataDir1 = new File(tmpDir, "data1");
+ File dataDir2 = new File(tmpDir, "data2");
+ String basePath = dataDir1.getAbsolutePath() + "," + dataDir2.getAbsolutePath();
- shuffleServerConf.setString("rss.storage.type", StorageType.LOCALFILE.name());
- shuffleServerConf.setString("rss.storage.basePath", basePath);
- shuffleServerConf.setString("rss.server.tags", StringUtils.join(tags, ","));
+ shuffleServerConf.setString("rss.storage.type", StorageType.LOCALFILE.name());
+ shuffleServerConf.setString("rss.storage.basePath", basePath);
+ shuffleServerConf.setString("rss.server.tags", StringUtils.join(tags, ","));
- List<Integer> ports = findAvailablePorts(2);
- shuffleServerConf.setInteger("rss.rpc.server.port", ports.get(0));
- shuffleServerConf.setInteger("rss.jetty.http.port", ports.get(1));
+ List<Integer> ports = findAvailablePorts(2);
+ shuffleServerConf.setInteger("rss.rpc.server.port", ports.get(0));
+ shuffleServerConf.setInteger("rss.jetty.http.port", ports.get(1));
+
+ for (String tag : tags) {
+ tagOfShufflePorts.putIfAbsent(tag, new ArrayList<>());
+ tagOfShufflePorts.get(tag).add(ports.get(0));
+ }
+ tagOfShufflePorts.putIfAbsent(Constants.SHUFFLE_SERVER_VERSION, new ArrayList<>());
+ tagOfShufflePorts.get(Constants.SHUFFLE_SERVER_VERSION).add(ports.get(0));
- for (String tag : tags) {
- tagOfShufflePorts.putIfAbsent(tag, new ArrayList<>());
- tagOfShufflePorts.get(tag).add(ports.get(0));
- }
- tagOfShufflePorts.putIfAbsent(Constants.SHUFFLE_SERVER_VERSION, new ArrayList<>());
- tagOfShufflePorts.get(Constants.SHUFFLE_SERVER_VERSION).add(ports.get(0));
+ LOG.info("Shuffle server data dir: {}, rpc port: {}, http port: {}", dataDir1 + "," + dataDir2,
+ ports.get(0), ports.get(1));
- LOG.info("Shuffle server data dir: {}, rpc port: {}, http port: {}", dataDir1 + "," + dataDir2,
- ports.get(0), ports.get(1));
+ ShuffleServer server = new ShuffleServer(shuffleServerConf);
+ shuffleServers.add(server);
+ server.start();
+ }
- ShuffleServer server = new ShuffleServer(shuffleServerConf);
- shuffleServers.add(server);
- server.start();
+ @BeforeAll
+ public static void setupServers() throws Exception {
+ CoordinatorConf coordinatorConf = getCoordinatorConf();
+ createCoordinatorServer(coordinatorConf);
+
+ for (CoordinatorServer coordinator : coordinators) {
+ coordinator.start();
+ }
+
+ for (int i = 0; i < 2; i++) {
+ createAndStartShuffleServerWithTags(Sets.newHashSet());
+ }
+
+ for (int i = 0; i < 2; i++) {
+ createAndStartShuffleServerWithTags(Sets.newHashSet("fixed"));
+ }
+
+ for (int i = 0; i < 2; i++) {
+ createAndStartShuffleServerWithTags(Sets.newHashSet("elastic"));
}
- @BeforeAll
- public static void setupServers() throws Exception {
- CoordinatorConf coordinatorConf = getCoordinatorConf();
- createCoordinatorServer(coordinatorConf);
-
- for (CoordinatorServer coordinator : coordinators) {
- coordinator.start();
- }
-
- for (int i = 0; i < 2; i ++) {
- createAndStartShuffleServerWithTags(Sets.newHashSet());
- }
-
- for (int i = 0; i < 2; i++) {
- createAndStartShuffleServerWithTags(Sets.newHashSet("fixed"));
- }
-
- for (int i = 0; i < 2; i++) {
- createAndStartShuffleServerWithTags(Sets.newHashSet("elastic"));
- }
-
- // Wait all shuffle servers registering to coordinator
- long startTimeMS = System.currentTimeMillis();
- while (true) {
- int nodeSum = coordinators.get(0).getClusterManager().getNodesNum();
- if (nodeSum == 6) {
- break;
- }
- if (System.currentTimeMillis() - startTimeMS > 1000 * 5) {
- throw new Exception("Timeout of waiting shuffle servers registry, timeout: 5s.");
- }
- }
+ // Wait all shuffle servers registering to coordinator
+ long startTimeMS = System.currentTimeMillis();
+ while (true) {
+ int nodeSum = coordinators.get(0).getClusterManager().getNodesNum();
+ if (nodeSum == 6) {
+ break;
+ }
+ if (System.currentTimeMillis() - startTimeMS > 1000 * 5) {
+ throw new Exception("Timeout of waiting shuffle servers registry, timeout: 5s.");
+ }
+ }
+ }
+
+ @Test
+ public void testTags() throws Exception {
+ ShuffleWriteClientImpl shuffleWriteClient = new ShuffleWriteClientImpl(ClientType.GRPC.name(), 3, 1000, 1,
+ 1, 1, 1, true, 1, 1);
+ shuffleWriteClient.registerCoordinators(COORDINATOR_QUORUM);
+
+ // Case1 : only set the single default shuffle version tag
+ ShuffleAssignmentsInfo assignmentsInfo =
+ shuffleWriteClient.getShuffleAssignments("app-1",
+ 1, 1, 1, Sets.newHashSet(Constants.SHUFFLE_SERVER_VERSION), 1);
+
+ List<Integer> assignedServerPorts = assignmentsInfo
+ .getPartitionToServers()
+ .values()
+ .stream()
+ .flatMap(x -> x.stream())
+ .map(x -> x.getPort())
+ .collect(Collectors.toList());
+ assertEquals(1, assignedServerPorts.size());
+ assertTrue(tagOfShufflePorts.get(Constants.SHUFFLE_SERVER_VERSION).contains(assignedServerPorts.get(0)));
+
+ // Case2: Set the single non-exist shuffle server tag
+ try {
+ assignmentsInfo = shuffleWriteClient.getShuffleAssignments("app-2",
+ 1, 1, 1, Sets.newHashSet("non-exist"), 1);
+ fail();
+ } catch (Exception e) {
+ assertTrue(e.getMessage().startsWith("Error happened when getShuffleAssignments with"));
}
- @Test
- public void testTags() throws Exception {
- ShuffleWriteClientImpl shuffleWriteClient = new ShuffleWriteClientImpl(ClientType.GRPC.name(), 3, 1000, 1,
- 1, 1, 1, true, 1, 1);
- shuffleWriteClient.registerCoordinators(COORDINATOR_QUORUM);
-
- // Case1 : only set the single default shuffle version tag
- ShuffleAssignmentsInfo assignmentsInfo =
- shuffleWriteClient.getShuffleAssignments("app-1",
- 1, 1, 1, Sets.newHashSet(Constants.SHUFFLE_SERVER_VERSION), 1);
-
- List<Integer> assignedServerPorts = assignmentsInfo
- .getPartitionToServers()
- .values()
- .stream()
- .flatMap(x -> x.stream())
- .map(x -> x.getPort())
- .collect(Collectors.toList());
- assertEquals(1, assignedServerPorts.size());
- assertTrue(tagOfShufflePorts.get(Constants.SHUFFLE_SERVER_VERSION).contains(assignedServerPorts.get(0)));
-
- // Case2: Set the single non-exist shuffle server tag
- try {
- assignmentsInfo = shuffleWriteClient.getShuffleAssignments("app-2",
- 1, 1, 1, Sets.newHashSet("non-exist"), 1);
- fail();
- } catch (Exception e) {
- assertTrue(e.getMessage().startsWith("Error happened when getShuffleAssignments with"));
- }
-
- // Case3: Set the single fixed tag
- assignmentsInfo = shuffleWriteClient.getShuffleAssignments("app-3",
- 1, 1, 1, Sets.newHashSet("fixed"), 1);
- assignedServerPorts = assignmentsInfo
- .getPartitionToServers()
- .values()
- .stream()
- .flatMap(x -> x.stream())
- .map(x -> x.getPort())
- .collect(Collectors.toList());
- assertEquals(1, assignedServerPorts.size());
- assertTrue(tagOfShufflePorts.get("fixed").contains(assignedServerPorts.get(0)));
-
- // case4: Set the multiple tags if exists
- assignmentsInfo = shuffleWriteClient.getShuffleAssignments("app-4",
- 1, 1, 1, Sets.newHashSet("fixed", Constants.SHUFFLE_SERVER_VERSION), 1);
- assignedServerPorts = assignmentsInfo
- .getPartitionToServers()
- .values()
- .stream()
- .flatMap(x -> x.stream())
- .map(x -> x.getPort())
- .collect(Collectors.toList());
- assertEquals(1, assignedServerPorts.size());
- assertTrue(tagOfShufflePorts.get("fixed").contains(assignedServerPorts.get(0)));
-
- // case5: Set the multiple tags if non-exist
- try {
- assignmentsInfo = shuffleWriteClient.getShuffleAssignments("app-5",
- 1, 1, 1, Sets.newHashSet("fixed", "elastic", Constants.SHUFFLE_SERVER_VERSION), 1);
- fail();
- } catch (Exception e) {
- assertTrue(e.getMessage().startsWith("Error happened when getShuffleAssignments with"));
- }
+ // Case3: Set the single fixed tag
+ assignmentsInfo = shuffleWriteClient.getShuffleAssignments("app-3",
+ 1, 1, 1, Sets.newHashSet("fixed"), 1);
+ assignedServerPorts = assignmentsInfo
+ .getPartitionToServers()
+ .values()
+ .stream()
+ .flatMap(x -> x.stream())
+ .map(x -> x.getPort())
+ .collect(Collectors.toList());
+ assertEquals(1, assignedServerPorts.size());
+ assertTrue(tagOfShufflePorts.get("fixed").contains(assignedServerPorts.get(0)));
+
+ // case4: Set the multiple tags if exists
+ assignmentsInfo = shuffleWriteClient.getShuffleAssignments("app-4",
+ 1, 1, 1, Sets.newHashSet("fixed", Constants.SHUFFLE_SERVER_VERSION), 1);
+ assignedServerPorts = assignmentsInfo
+ .getPartitionToServers()
+ .values()
+ .stream()
+ .flatMap(x -> x.stream())
+ .map(x -> x.getPort())
+ .collect(Collectors.toList());
+ assertEquals(1, assignedServerPorts.size());
+ assertTrue(tagOfShufflePorts.get("fixed").contains(assignedServerPorts.get(0)));
+
+ // case5: Set the multiple tags if non-exist
+ try {
+ assignmentsInfo = shuffleWriteClient.getShuffleAssignments("app-5",
+ 1, 1, 1, Sets.newHashSet("fixed", "elastic", Constants.SHUFFLE_SERVER_VERSION), 1);
+ fail();
+ } catch (Exception e) {
+ assertTrue(e.getMessage().startsWith("Error happened when getShuffleAssignments with"));
}
+ }
}
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ClientConfManagerHdfsTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ClientConfManagerHdfsTest.java
index 3d2bda14..3354f22c 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/ClientConfManagerHdfsTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ClientConfManagerHdfsTest.java
@@ -51,8 +51,6 @@ public class ClientConfManagerHdfsTest extends HdfsTestBase {
String cfgFile,
FileSystem fileSystem,
Configuration hadoopConf) throws Exception {
- Path path = new Path(cfgFile);
- FSDataOutputStream out = fileSystem.create(path);
CoordinatorConf conf = new CoordinatorConf();
conf.set(CoordinatorConf.COORDINATOR_DYNAMIC_CLIENT_CONF_PATH, clusterPathPrefix);
@@ -69,6 +67,8 @@ public class ClientConfManagerHdfsTest extends HdfsTestBase {
assertNotNull(expectedException);
assertTrue(expectedException.getMessage().endsWith("is not a file."));
+ Path path = new Path(cfgFile);
+ FSDataOutputStream out = fileSystem.create(path);
conf.set(CoordinatorConf.COORDINATOR_DYNAMIC_CLIENT_CONF_PATH, cfgFile);
ClientConfManager clientConfManager = new ClientConfManager(
conf, new Configuration(), new ApplicationManager(conf));
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ClientConfManagerKerberlizedHdfsTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ClientConfManagerKerberlizedHdfsTest.java
index 73a3940a..eff88b8a 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/ClientConfManagerKerberlizedHdfsTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ClientConfManagerKerberlizedHdfsTest.java
@@ -32,12 +32,12 @@ public class ClientConfManagerKerberlizedHdfsTest extends KerberizedHdfsBase {
@Test
public void testConfInHDFS() throws Exception {
- String cfgFile = kerberizedHdfs.getSchemeAndAuthorityPrefix() + "/test/client_conf";
- ClientConfManagerHdfsTest.createAndRunClientConfManagerCases(
- kerberizedHdfs.getSchemeAndAuthorityPrefix(),
- cfgFile,
- kerberizedHdfs.getFileSystem(),
- kerberizedHdfs.getConf()
- );
+ String cfgFile = kerberizedHdfs.getSchemeAndAuthorityPrefix() + "/test/client_conf";
+ ClientConfManagerHdfsTest.createAndRunClientConfManagerCases(
+ kerberizedHdfs.getSchemeAndAuthorityPrefix(),
+ cfgFile,
+ kerberizedHdfs.getFileSystem(),
+ kerberizedHdfs.getConf()
+ );
}
}
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcTest.java
index 8fd896a3..aa4900b5 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcTest.java
@@ -242,8 +242,7 @@ public class CoordinatorGrpcTest extends CoordinatorTestBase {
}
@Test
- public void rpcMetricsTest() throws Exception{
- String appId = "rpcMetricsTest";
+ public void rpcMetricsTest() throws Exception {
double oldValue = coordinators.get(0).getGrpcMetrics().getCounterMap()
.get(CoordinatorGrpcMetrics.HEARTBEAT_METHOD).get();
CoordinatorTestUtils.waitForRegister(coordinatorClient,2);
@@ -254,6 +253,7 @@ public class CoordinatorGrpcTest extends CoordinatorTestBase {
coordinators.get(0).getGrpcMetrics().getGaugeMap()
.get(CoordinatorGrpcMetrics.HEARTBEAT_METHOD).get(), 0.5);
+ String appId = "rpcMetricsTest";
RssGetShuffleAssignmentsRequest request = new RssGetShuffleAssignmentsRequest(
appId, 1, 10, 4, 1,
Sets.newHashSet(Constants.SHUFFLE_SERVER_VERSION));
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorTestBase.java b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorTestBase.java
index 5e56999f..5592a2e7 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorTestBase.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorTestBase.java
@@ -17,15 +17,17 @@
package org.apache.uniffle.test;
-import org.apache.uniffle.client.factory.CoordinatorClientFactory;
-import org.apache.uniffle.client.impl.grpc.CoordinatorGrpcClient;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
+import org.apache.uniffle.client.factory.CoordinatorClientFactory;
+import org.apache.uniffle.client.impl.grpc.CoordinatorGrpcClient;
+
public class CoordinatorTestBase extends IntegrationTestBase {
protected CoordinatorClientFactory factory = new CoordinatorClientFactory("GRPC");
protected CoordinatorGrpcClient coordinatorClient;
+
@BeforeEach
public void createClient() {
coordinatorClient =
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/DiskErrorToleranceTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/DiskErrorToleranceTest.java
index 31a9f907..0704850f 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/DiskErrorToleranceTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/DiskErrorToleranceTest.java
@@ -29,11 +29,10 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.io.Files;
import com.google.common.util.concurrent.Uninterruptibles;
-import org.apache.uniffle.client.util.DefaultIdHelper;
import org.apache.commons.io.FileUtils;
import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
@@ -43,6 +42,7 @@ import org.apache.uniffle.client.request.RssFinishShuffleRequest;
import org.apache.uniffle.client.request.RssRegisterShuffleRequest;
import org.apache.uniffle.client.request.RssSendCommitRequest;
import org.apache.uniffle.client.request.RssSendShuffleDataRequest;
+import org.apache.uniffle.client.util.DefaultIdHelper;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleServerInfo;
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/FetchClientConfTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/FetchClientConfTest.java
index 03eb4c18..c93723c6 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/FetchClientConfTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/FetchClientConfTest.java
@@ -30,12 +30,12 @@ import com.google.common.util.concurrent.Uninterruptibles;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
-import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.client.request.RssFetchClientConfRequest;
import org.apache.uniffle.client.request.RssFetchRemoteStorageRequest;
import org.apache.uniffle.client.response.ResponseStatusCode;
import org.apache.uniffle.client.response.RssFetchClientConfResponse;
import org.apache.uniffle.client.response.RssFetchRemoteStorageResponse;
+import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.coordinator.ApplicationManager;
import org.apache.uniffle.coordinator.CoordinatorConf;
@@ -97,7 +97,6 @@ public class FetchClientConfTest extends CoordinatorTestBase {
@Test
public void testFetchRemoteStorage(@TempDir File tempDir) throws Exception {
String remotePath1 = "hdfs://path1";
- String remotePath2 = "hdfs://path2";
File cfgFile = File.createTempFile("tmp", ".conf", tempDir);
String contItem = "path2,key1=test1,key2=test2";
Map<String, String> dynamicConf = Maps.newHashMap();
@@ -121,6 +120,7 @@ public class FetchClientConfTest extends CoordinatorTestBase {
assertEquals(remotePath1, remoteStorageInfo.getPath());
// update remote storage info
+ String remotePath2 = "hdfs://path2";
dynamicConf.put(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_PATH.key(), remotePath2);
writeRemoteStorageConf(cfgFile, dynamicConf);
waitForUpdate(Sets.newHashSet(remotePath2), coordinators.get(0).getApplicationManager());
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/HealthCheckCoordinatorGrpcTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/HealthCheckCoordinatorGrpcTest.java
index 22c92aba..608e0371 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/HealthCheckCoordinatorGrpcTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/HealthCheckCoordinatorGrpcTest.java
@@ -53,8 +53,6 @@ public class HealthCheckCoordinatorGrpcTest extends CoordinatorTestBase {
@BeforeAll
public static void setupServers() throws Exception {
serverTmpDir.deleteOnExit();
- long totalSize = serverTmpDir.getTotalSpace();
- long usedSize = serverTmpDir.getTotalSpace() - serverTmpDir.getUsableSpace();
File data1 = new File(serverTmpDir, "data1");
data1.mkdirs();
File data2 = new File(serverTmpDir, "data2");
@@ -67,8 +65,10 @@ public class HealthCheckCoordinatorGrpcTest extends CoordinatorTestBase {
} else {
writeDataSize = (int) freeSize / 2;
}
+ long totalSize = serverTmpDir.getTotalSpace();
+ long usedSize = serverTmpDir.getTotalSpace() - serverTmpDir.getUsableSpace();
maxUsage = (writeDataSize * 0.75 + usedSize) * 100.0 / totalSize;
- healthUsage = (writeDataSize * 0.5 + usedSize) * 100.0 /totalSize;
+ healthUsage = (writeDataSize * 0.5 + usedSize) * 100.0 / totalSize;
CoordinatorConf coordinatorConf = getCoordinatorConf();
coordinatorConf.setLong(CoordinatorConf.COORDINATOR_APP_EXPIRED, 2000);
coordinatorConf.setLong(CoordinatorConf.COORDINATOR_HEARTBEAT_TIMEOUT, 3000);
@@ -95,20 +95,21 @@ public class HealthCheckCoordinatorGrpcTest extends CoordinatorTestBase {
@Test
public void healthCheckTest() throws Exception {
+ Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS);
+ assertEquals(2, coordinatorClient.getShuffleServerList().getServersCount());
+ List<ServerNode> nodes = coordinators.get(0).getClusterManager()
+ .getServerList(Sets.newHashSet(Constants.SHUFFLE_SERVER_VERSION));
+ assertEquals(2, coordinatorClient.getShuffleServerList().getServersCount());
+ assertEquals(2, nodes.size());
+
RssGetShuffleAssignmentsRequest request =
- new RssGetShuffleAssignmentsRequest(
+ new RssGetShuffleAssignmentsRequest(
"1",
1,
1,
1,
1,
Sets.newHashSet(Constants.SHUFFLE_SERVER_VERSION));
- Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS);
- assertEquals(2, coordinatorClient.getShuffleServerList().getServersCount());
- List<ServerNode> nodes = coordinators.get(0).getClusterManager()
- .getServerList(Sets.newHashSet(Constants.SHUFFLE_SERVER_VERSION));
- assertEquals(2, coordinatorClient.getShuffleServerList().getServersCount());
- assertEquals(2, nodes.size());
RssGetShuffleAssignmentsResponse response =
coordinatorClient.getShuffleAssignments(request);
assertFalse(response.getPartitionToServers().isEmpty());
@@ -142,7 +143,7 @@ public class HealthCheckCoordinatorGrpcTest extends CoordinatorTestBase {
if (i == 10) {
fail();
}
- } while(nodes.size() != 2);
+ } while (nodes.size() != 2);
for (ServerNode node : nodes) {
assertTrue(node.isHealthy());
}
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/IntegrationTestBase.java b/integration-test/common/src/test/java/org/apache/uniffle/test/IntegrationTestBase.java
index 0237aa76..dff9848b 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/IntegrationTestBase.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/IntegrationTestBase.java
@@ -37,7 +37,7 @@ import org.apache.uniffle.server.ShuffleServerMetrics;
import org.apache.uniffle.storage.HdfsTestBase;
import org.apache.uniffle.storage.util.StorageType;
-abstract public class IntegrationTestBase extends HdfsTestBase {
+public abstract class IntegrationTestBase extends HdfsTestBase {
protected static final int SHUFFLE_SERVER_PORT = 20001;
protected static final String LOCALHOST = "127.0.0.1";
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/MultiStorageFaultToleranceTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/MultiStorageFaultToleranceTest.java
index a0fe6abb..5f76be5b 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/MultiStorageFaultToleranceTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/MultiStorageFaultToleranceTest.java
@@ -26,10 +26,9 @@ import java.util.Set;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-import org.apache.uniffle.client.util.DefaultIdHelper;
import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
@@ -41,6 +40,7 @@ import org.apache.uniffle.client.request.RssReportShuffleResultRequest;
import org.apache.uniffle.client.request.RssSendCommitRequest;
import org.apache.uniffle.client.request.RssSendShuffleDataRequest;
import org.apache.uniffle.client.response.CompressedShuffleBlock;
+import org.apache.uniffle.client.util.DefaultIdHelper;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleServerInfo;
@@ -57,7 +57,7 @@ public class MultiStorageFaultToleranceTest extends ShuffleReadWriteBase {
@BeforeAll
public static void setupServers() throws Exception {
- CoordinatorConf coordinatorConf = getCoordinatorConf();
+ final CoordinatorConf coordinatorConf = getCoordinatorConf();
ShuffleServerConf shuffleServerConf = getShuffleServerConf();
String basePath = generateBasePath();
shuffleServerConf.setDouble(ShuffleServerConf.CLEANUP_THRESHOLD, 0.0);
@@ -91,7 +91,7 @@ public class MultiStorageFaultToleranceTest extends ShuffleReadWriteBase {
map.put(0, Lists.newArrayList(0));
registerShuffle(appId, map);
Roaring64NavigableMap blockBitmap = Roaring64NavigableMap.bitmapOf();
- List<ShuffleBlockInfo> blocks = createShuffleBlockList(
+ final List<ShuffleBlockInfo> blocks = createShuffleBlockList(
0, 0, 0, 40, 10 * 1024 * 1024, blockBitmap, expectedData);
assertEquals(1, cluster.getDataNodes().size());
cluster.stopDataNode(0);
@@ -114,8 +114,6 @@ public class MultiStorageFaultToleranceTest extends ShuffleReadWriteBase {
long taskAttemptId, List<ShuffleBlockInfo> blocks) {
Map<Integer, List<ShuffleBlockInfo>> partitionToBlocks = Maps.newHashMap();
Map<Integer, Map<Integer, List<ShuffleBlockInfo>>> shuffleToBlocks = Maps.newHashMap();
- Map<Integer, List<Long>> partitionToBlockIds = Maps.newHashMap();
- Set<Long> expectBlockIds = getExpectBlockIds(blocks);
partitionToBlocks.put(partition, blocks);
shuffleToBlocks.put(shuffle, partitionToBlocks);
RssSendShuffleDataRequest rs = new RssSendShuffleDataRequest(appId, 3, 1000, shuffleToBlocks);
@@ -124,6 +122,9 @@ public class MultiStorageFaultToleranceTest extends ShuffleReadWriteBase {
shuffleServerClient.sendCommit(rc);
RssFinishShuffleRequest rf = new RssFinishShuffleRequest(appId, shuffle);
shuffleServerClient.finishShuffle(rf);
+
+ Map<Integer, List<Long>> partitionToBlockIds = Maps.newHashMap();
+ Set<Long> expectBlockIds = getExpectBlockIds(blocks);
partitionToBlockIds.put(shuffle, new ArrayList<>(expectBlockIds));
RssReportShuffleResultRequest rrp = new RssReportShuffleResultRequest(
appId, shuffle, taskAttemptId, partitionToBlockIds, 1);
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/PartitionBalanceCoordinatorGrpcTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/PartitionBalanceCoordinatorGrpcTest.java
index 7c9853bc..9e16773b 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/PartitionBalanceCoordinatorGrpcTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/PartitionBalanceCoordinatorGrpcTest.java
@@ -17,18 +17,19 @@
package org.apache.uniffle.test;
+import java.util.List;
+import java.util.Map;
+
import com.google.common.collect.Sets;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
import org.apache.uniffle.client.request.RssGetShuffleAssignmentsRequest;
import org.apache.uniffle.client.response.RssGetShuffleAssignmentsResponse;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.coordinator.CoordinatorConf;
import org.apache.uniffle.server.ShuffleServerConf;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Test;
-
-import java.util.List;
-import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java
index 731c39e9..913f7726 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java
@@ -25,10 +25,9 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.io.Files;
-import org.apache.uniffle.client.util.DefaultIdHelper;
import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
@@ -39,6 +38,7 @@ import org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcClient;
import org.apache.uniffle.client.response.CompressedShuffleBlock;
import org.apache.uniffle.client.response.SendShuffleDataResult;
import org.apache.uniffle.client.util.ClientType;
+import org.apache.uniffle.client.util.DefaultIdHelper;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ShuffleBlockInfo;
@@ -105,15 +105,15 @@ public class QuorumTest extends ShuffleReadWriteBase {
shuffleServers.add(createServer(4));
shuffleServerInfo0 =
- new ShuffleServerInfo("127.0.0.1-20001", shuffleServers.get(0).getIp(), SHUFFLE_SERVER_PORT + 0);
+ new ShuffleServerInfo("127.0.0.1-20001", shuffleServers.get(0).getIp(), SHUFFLE_SERVER_PORT + 0);
shuffleServerInfo1 =
- new ShuffleServerInfo("127.0.0.1-20002", shuffleServers.get(1).getIp(), SHUFFLE_SERVER_PORT + 1);
+ new ShuffleServerInfo("127.0.0.1-20002", shuffleServers.get(1).getIp(), SHUFFLE_SERVER_PORT + 1);
shuffleServerInfo2 =
- new ShuffleServerInfo("127.0.0.1-20003", shuffleServers.get(2).getIp(), SHUFFLE_SERVER_PORT + 2);
+ new ShuffleServerInfo("127.0.0.1-20003", shuffleServers.get(2).getIp(), SHUFFLE_SERVER_PORT + 2);
shuffleServerInfo3 =
- new ShuffleServerInfo("127.0.0.1-20004", shuffleServers.get(3).getIp(), SHUFFLE_SERVER_PORT + 3);
+ new ShuffleServerInfo("127.0.0.1-20004", shuffleServers.get(3).getIp(), SHUFFLE_SERVER_PORT + 3);
shuffleServerInfo4 =
- new ShuffleServerInfo("127.0.0.1-20005", shuffleServers.get(4).getIp(), SHUFFLE_SERVER_PORT + 4);
+ new ShuffleServerInfo("127.0.0.1-20005", shuffleServers.get(4).getIp(), SHUFFLE_SERVER_PORT + 4);
for (CoordinatorServer coordinator : coordinators) {
coordinator.start();
}
@@ -123,15 +123,15 @@ public class QuorumTest extends ShuffleReadWriteBase {
// simulator of failed servers
fakedShuffleServerInfo0 =
- new ShuffleServerInfo("127.0.0.1-20001", shuffleServers.get(0).getIp(), SHUFFLE_SERVER_PORT + 100);
+ new ShuffleServerInfo("127.0.0.1-20001", shuffleServers.get(0).getIp(), SHUFFLE_SERVER_PORT + 100);
fakedShuffleServerInfo1 =
- new ShuffleServerInfo("127.0.0.1-20002", shuffleServers.get(1).getIp(), SHUFFLE_SERVER_PORT + 200);
+ new ShuffleServerInfo("127.0.0.1-20002", shuffleServers.get(1).getIp(), SHUFFLE_SERVER_PORT + 200);
fakedShuffleServerInfo2 =
- new ShuffleServerInfo("127.0.0.1-20003", shuffleServers.get(2).getIp(), SHUFFLE_SERVER_PORT + 300);
+ new ShuffleServerInfo("127.0.0.1-20003", shuffleServers.get(2).getIp(), SHUFFLE_SERVER_PORT + 300);
fakedShuffleServerInfo3 =
- new ShuffleServerInfo("127.0.0.1-20004", shuffleServers.get(2).getIp(), SHUFFLE_SERVER_PORT + 400);
+ new ShuffleServerInfo("127.0.0.1-20004", shuffleServers.get(2).getIp(), SHUFFLE_SERVER_PORT + 400);
fakedShuffleServerInfo4 =
- new ShuffleServerInfo("127.0.0.1-20005", shuffleServers.get(2).getIp(), SHUFFLE_SERVER_PORT + 500);
+ new ShuffleServerInfo("127.0.0.1-20005", shuffleServers.get(2).getIp(), SHUFFLE_SERVER_PORT + 500);
Thread.sleep(2000);
}
@@ -147,7 +147,7 @@ public class QuorumTest extends ShuffleReadWriteBase {
}
@BeforeEach
- public void InitEnv() throws Exception {
+ public void initEnv() throws Exception {
// spark.rss.data.replica=3
// spark.rss.data.replica.write=2
// spark.rss.data.replica.read=2
@@ -177,7 +177,7 @@ public class QuorumTest extends ShuffleReadWriteBase {
@Test
- public void QuorumConfigTest() throws Exception {
+ public void quorumConfigTest() throws Exception {
try {
RssUtils.checkQuorumSetting(3, 1, 1);
fail(EXPECTED_EXCEPTION_MESSAGE);
@@ -207,10 +207,9 @@ public class QuorumTest extends ShuffleReadWriteBase {
// case1: When only 1 server is failed, the block sending should success
List<ShuffleBlockInfo> blocks = createShuffleBlockList(
- 0, 0, 0, 3, 25, blockIdBitmap,
- expectedData, Lists.newArrayList(shuffleServerInfo0, shuffleServerInfo1, fakedShuffleServerInfo2));
+ 0, 0, 0, 3, 25, blockIdBitmap,
+ expectedData, Lists.newArrayList(shuffleServerInfo0, shuffleServerInfo1, fakedShuffleServerInfo2));
- Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
SendShuffleDataResult result = shuffleWriteClientImpl.sendShuffleData(testAppId, blocks);
Roaring64NavigableMap failedBlockIdBitmap = Roaring64NavigableMap.bitmapOf();
Roaring64NavigableMap succBlockIdBitmap = Roaring64NavigableMap.bitmapOf();
@@ -223,10 +222,12 @@ public class QuorumTest extends ShuffleReadWriteBase {
assertEquals(0, failedBlockIdBitmap.getLongCardinality());
assertEquals(blockIdBitmap, succBlockIdBitmap);
+ Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
ShuffleReadClientImpl readClient = new ShuffleReadClientImpl(StorageType.MEMORY_LOCALFILE.name(),
- testAppId, 0, 0, 100, 1,
- 10, 1000, "", blockIdBitmap, taskIdBitmap,
- Lists.newArrayList(shuffleServerInfo0, shuffleServerInfo1, fakedShuffleServerInfo2), null, new DefaultIdHelper());
+ testAppId, 0, 0, 100, 1,
+ 10, 1000, "", blockIdBitmap, taskIdBitmap,
+ Lists.newArrayList(shuffleServerInfo0, shuffleServerInfo1, fakedShuffleServerInfo2),
+ null, new DefaultIdHelper());
// The data should be read
validateResult(readClient, expectedData);
@@ -253,12 +254,12 @@ public class QuorumTest extends ShuffleReadWriteBase {
private void enableTimeout(MockedShuffleServer server, long timeout) {
((MockedGrpcServer)server.getServer()).getService()
- .enableMockedTimeout(timeout);
+ .enableMockedTimeout(timeout);
}
private void disableTimeout(MockedShuffleServer server) {
((MockedGrpcServer)server.getServer()).getService()
- .disableMockedTimeout();
+ .disableMockedTimeout();
}
private void registerShuffleServer(String testAppId,
@@ -270,7 +271,7 @@ public class QuorumTest extends ShuffleReadWriteBase {
List<ShuffleServerInfo> allServers = Lists.newArrayList(shuffleServerInfo0, shuffleServerInfo1,
shuffleServerInfo2, shuffleServerInfo3, shuffleServerInfo4);
- for (int i = 0; i < replica; i ++) {
+ for (int i = 0; i < replica; i++) {
shuffleWriteClientImpl.registerShuffle(allServers.get(i),
testAppId, 0, Lists.newArrayList(new PartitionRange(0, 0)), new RemoteStorageInfo(""));
}
@@ -280,30 +281,28 @@ public class QuorumTest extends ShuffleReadWriteBase {
public void case1() throws Exception {
String testAppId = "case1";
registerShuffleServer(testAppId, 3, 2, 2, true);
- Map<Long, byte[]> expectedData = Maps.newHashMap();
Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
// only 1 server is timout, the block sending should success
enableTimeout((MockedShuffleServer)shuffleServers.get(2), 500);
- List<ShuffleBlockInfo> blocks = createShuffleBlockList(
- 0, 0, 0, 3, 25, blockIdBitmap,
- expectedData, Lists.newArrayList(shuffleServerInfo0, shuffleServerInfo1, shuffleServerInfo2));
-
// report result should success
Map<Integer, List<Long>> partitionToBlockIds = Maps.newHashMap();
partitionToBlockIds.put(0, Lists.newArrayList(blockIdBitmap.stream().iterator()));
Map<Integer, List<ShuffleServerInfo>> partitionToServers = Maps.newHashMap();
partitionToServers.put(0, Lists.newArrayList(shuffleServerInfo0, shuffleServerInfo1, shuffleServerInfo2));
shuffleWriteClientImpl.reportShuffleResult(partitionToServers, testAppId, 0, 0L,
- partitionToBlockIds, 1);
+ partitionToBlockIds, 1);
Roaring64NavigableMap report = shuffleWriteClientImpl.getShuffleResult("GRPC",
- Sets.newHashSet(shuffleServerInfo0, shuffleServerInfo1, shuffleServerInfo2),
- testAppId, 0, 0);
+ Sets.newHashSet(shuffleServerInfo0, shuffleServerInfo1, shuffleServerInfo2),
+ testAppId, 0, 0);
assertEquals(report, blockIdBitmap);
// data read should success
- Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
+ Map<Long, byte[]> expectedData = Maps.newHashMap();
+ List<ShuffleBlockInfo> blocks = createShuffleBlockList(
+ 0, 0, 0, 3, 25, blockIdBitmap,
+ expectedData, Lists.newArrayList(shuffleServerInfo0, shuffleServerInfo1, shuffleServerInfo2));
SendShuffleDataResult result = shuffleWriteClientImpl.sendShuffleData(testAppId, blocks);
Roaring64NavigableMap succBlockIdBitmap = Roaring64NavigableMap.bitmapOf();
for (Long blockId : result.getSuccessBlockIds()) {
@@ -312,10 +311,11 @@ public class QuorumTest extends ShuffleReadWriteBase {
assertEquals(0, result.getFailedBlockIds().size());
assertEquals(blockIdBitmap, succBlockIdBitmap);
+ Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
ShuffleReadClientImpl readClient = new ShuffleReadClientImpl(StorageType.MEMORY_LOCALFILE.name(),
- testAppId, 0, 0, 100, 1,
- 10, 1000, "", blockIdBitmap, taskIdBitmap,
- Lists.newArrayList(shuffleServerInfo0, shuffleServerInfo1, shuffleServerInfo2), null, new DefaultIdHelper());
+ testAppId, 0, 0, 100, 1,
+ 10, 1000, "", blockIdBitmap, taskIdBitmap,
+ Lists.newArrayList(shuffleServerInfo0, shuffleServerInfo1, shuffleServerInfo2), null, new DefaultIdHelper());
validateResult(readClient, expectedData);
}
@@ -332,8 +332,8 @@ public class QuorumTest extends ShuffleReadWriteBase {
enableTimeout((MockedShuffleServer)shuffleServers.get(2), 500);
List<ShuffleBlockInfo> blocks = createShuffleBlockList(
- 0, 0, 0, 3, 25, blockIdBitmap,
- expectedData, Lists.newArrayList(shuffleServerInfo0, shuffleServerInfo1, shuffleServerInfo2));
+ 0, 0, 0, 3, 25, blockIdBitmap,
+ expectedData, Lists.newArrayList(shuffleServerInfo0, shuffleServerInfo1, shuffleServerInfo2));
SendShuffleDataResult result = shuffleWriteClientImpl.sendShuffleData(testAppId, blocks);
Roaring64NavigableMap failedBlockIdBitmap = Roaring64NavigableMap.bitmapOf();
for (Long blockId : result.getFailedBlockIds()) {
@@ -349,16 +349,16 @@ public class QuorumTest extends ShuffleReadWriteBase {
partitionToServers.put(0, Lists.newArrayList(shuffleServerInfo0, shuffleServerInfo1, shuffleServerInfo2));
try {
shuffleWriteClientImpl.reportShuffleResult(partitionToServers, testAppId, 0, 0L,
- partitionToBlockIds, 1);
+ partitionToBlockIds, 1);
fail(EXPECTED_EXCEPTION_MESSAGE);
- } catch (Exception e){
+ } catch (Exception e) {
assertTrue(e.getMessage().startsWith("Quorum check of report shuffle result is failed"));
}
// get result should also fail
try {
shuffleWriteClientImpl.getShuffleResult("GRPC",
- Sets.newHashSet(shuffleServerInfo0, shuffleServerInfo1, shuffleServerInfo2),
- testAppId, 0, 0);
+ Sets.newHashSet(shuffleServerInfo0, shuffleServerInfo1, shuffleServerInfo2),
+ testAppId, 0, 0);
fail(EXPECTED_EXCEPTION_MESSAGE);
} catch (Exception e) {
assertTrue(e.getMessage().startsWith("Get shuffle result is failed"));
@@ -379,8 +379,8 @@ public class QuorumTest extends ShuffleReadWriteBase {
Map<Long, byte[]> expectedData = Maps.newHashMap();
Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
List<ShuffleBlockInfo> blocks = createShuffleBlockList(
- 0, 0, 0, 3, 25, blockIdBitmap,
- expectedData, Lists.newArrayList(shuffleServerInfo0, shuffleServerInfo1, shuffleServerInfo2));
+ 0, 0, 0, 3, 25, blockIdBitmap,
+ expectedData, Lists.newArrayList(shuffleServerInfo0, shuffleServerInfo1, shuffleServerInfo2));
SendShuffleDataResult result = shuffleWriteClientImpl.sendShuffleData(testAppId, blocks);
Roaring64NavigableMap failedBlockIdBitmap = Roaring64NavigableMap.bitmapOf();
Roaring64NavigableMap succBlockIdBitmap = Roaring64NavigableMap.bitmapOf();
@@ -398,11 +398,11 @@ public class QuorumTest extends ShuffleReadWriteBase {
Map<Integer, List<ShuffleServerInfo>> partitionToServers = Maps.newHashMap();
partitionToServers.put(0, Lists.newArrayList(shuffleServerInfo0, shuffleServerInfo1, shuffleServerInfo2));
shuffleWriteClientImpl.reportShuffleResult(partitionToServers, testAppId, 0, 0L,
- partitionToBlockIds, 1);
+ partitionToBlockIds, 1);
Roaring64NavigableMap report = shuffleWriteClientImpl.getShuffleResult("GRPC",
- Sets.newHashSet(shuffleServerInfo0, shuffleServerInfo1, shuffleServerInfo2),
- testAppId, 0, 0);
+ Sets.newHashSet(shuffleServerInfo0, shuffleServerInfo1, shuffleServerInfo2),
+ testAppId, 0, 0);
assertEquals(report, blockIdBitmap);
// let this server be failed, the reading will be also be failed
@@ -436,17 +436,17 @@ public class QuorumTest extends ShuffleReadWriteBase {
Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
for (int i = 0; i < 5; i++) {
List<ShuffleBlockInfo> blocks = createShuffleBlockList(
- 0, 0, 0, 3, 25, blockIdBitmap,
- expectedData, Lists.newArrayList(shuffleServerInfo0, shuffleServerInfo1, shuffleServerInfo2));
+ 0, 0, 0, 3, 25, blockIdBitmap,
+ expectedData, Lists.newArrayList(shuffleServerInfo0, shuffleServerInfo1, shuffleServerInfo2));
SendShuffleDataResult result = shuffleWriteClientImpl.sendShuffleData(testAppId, blocks);
assertTrue(result.getSuccessBlockIds().size() == 3);
assertTrue(result.getFailedBlockIds().size() == 0);
}
ShuffleReadClientImpl readClient = new ShuffleReadClientImpl(StorageType.MEMORY_LOCALFILE.name(),
- testAppId, 0, 0, 100, 1,
- 10, 1000, "", blockIdBitmap, taskIdBitmap,
- Lists.newArrayList(shuffleServerInfo0, shuffleServerInfo1, shuffleServerInfo2), null, new DefaultIdHelper());
+ testAppId, 0, 0, 100, 1,
+ 10, 1000, "", blockIdBitmap, taskIdBitmap,
+ Lists.newArrayList(shuffleServerInfo0, shuffleServerInfo1, shuffleServerInfo2), null, new DefaultIdHelper());
validateResult(readClient, expectedData);
}
@@ -458,9 +458,9 @@ public class QuorumTest extends ShuffleReadWriteBase {
Map<Long, byte[]> expectedData = Maps.newHashMap();
Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
- List<ShuffleBlockInfo> blocks = createShuffleBlockList(
- 0, 0, 0, 3, 25, blockIdBitmap,
- expectedData, Lists.newArrayList(shuffleServerInfo0, shuffleServerInfo1, shuffleServerInfo2));
+ final List<ShuffleBlockInfo> blocks = createShuffleBlockList(
+ 0, 0, 0, 3, 25, blockIdBitmap,
+ expectedData, Lists.newArrayList(shuffleServerInfo0, shuffleServerInfo1, shuffleServerInfo2));
// report result should success
Map<Integer, List<Long>> partitionToBlockIds = Maps.newHashMap();
@@ -468,14 +468,13 @@ public class QuorumTest extends ShuffleReadWriteBase {
Map<Integer, List<ShuffleServerInfo>> partitionToServers = Maps.newHashMap();
partitionToServers.put(0, Lists.newArrayList(shuffleServerInfo0, shuffleServerInfo1, shuffleServerInfo2));
shuffleWriteClientImpl.reportShuffleResult(partitionToServers, testAppId, 0, 0L,
- partitionToBlockIds, 1);
+ partitionToBlockIds, 1);
Roaring64NavigableMap report = shuffleWriteClientImpl.getShuffleResult("GRPC",
- Sets.newHashSet(shuffleServerInfo0, shuffleServerInfo1, shuffleServerInfo2),
- testAppId, 0, 0);
+ Sets.newHashSet(shuffleServerInfo0, shuffleServerInfo1, shuffleServerInfo2),
+ testAppId, 0, 0);
assertEquals(report, blockIdBitmap);
// data read should success
- Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
SendShuffleDataResult result = shuffleWriteClientImpl.sendShuffleData(testAppId, blocks);
Roaring64NavigableMap succBlockIdBitmap = Roaring64NavigableMap.bitmapOf();
for (Long blockId : result.getSuccessBlockIds()) {
@@ -505,7 +504,7 @@ public class QuorumTest extends ShuffleReadWriteBase {
} catch (Exception e) {
assertTrue(e.getMessage().startsWith("Get shuffle result is failed"));
}
- }
+ }
@Test
public void case6() throws Exception {
@@ -517,14 +516,14 @@ public class QuorumTest extends ShuffleReadWriteBase {
Roaring64NavigableMap blockIdBitmap2 = Roaring64NavigableMap.bitmapOf();
List<ShuffleBlockInfo> partition0 = createShuffleBlockList(
- 0, 0, 0, 3, 25, blockIdBitmap0,
- expectedData, Lists.newArrayList(shuffleServerInfo0, shuffleServerInfo1, shuffleServerInfo2));
+ 0, 0, 0, 3, 25, blockIdBitmap0,
+ expectedData, Lists.newArrayList(shuffleServerInfo0, shuffleServerInfo1, shuffleServerInfo2));
List<ShuffleBlockInfo> partition1 = createShuffleBlockList(
- 0, 0, 0, 3, 25, blockIdBitmap1,
- expectedData, Lists.newArrayList(shuffleServerInfo0, shuffleServerInfo1, shuffleServerInfo2));
+ 0, 0, 0, 3, 25, blockIdBitmap1,
+ expectedData, Lists.newArrayList(shuffleServerInfo0, shuffleServerInfo1, shuffleServerInfo2));
List<ShuffleBlockInfo> partition2 = createShuffleBlockList(
- 0, 0, 0, 3, 25, blockIdBitmap2,
- expectedData, Lists.newArrayList(shuffleServerInfo2, shuffleServerInfo3, shuffleServerInfo4));
+ 0, 0, 0, 3, 25, blockIdBitmap2,
+ expectedData, Lists.newArrayList(shuffleServerInfo2, shuffleServerInfo3, shuffleServerInfo4));
// server 0,1,2 are ok, server 3,4 are timout
enableTimeout((MockedShuffleServer)shuffleServers.get(3), 500);
@@ -542,11 +541,11 @@ public class QuorumTest extends ShuffleReadWriteBase {
// report result should fail because partition2 is failed to report server 3,4
try {
- shuffleWriteClientImpl.reportShuffleResult(partitionToServers, testAppId, 0, 0L,
- partitionToBlockIds, 1);
- fail(EXPECTED_EXCEPTION_MESSAGE);
+ shuffleWriteClientImpl.reportShuffleResult(partitionToServers, testAppId, 0, 0L,
+ partitionToBlockIds, 1);
+ fail(EXPECTED_EXCEPTION_MESSAGE);
} catch (Exception e) {
- assertTrue(e.getMessage().startsWith("Quorum check of report shuffle result is failed"));
+ assertTrue(e.getMessage().startsWith("Quorum check of report shuffle result is failed"));
}
}
@@ -562,8 +561,8 @@ public class QuorumTest extends ShuffleReadWriteBase {
// attempt to send data to "all servers", but only the server 0,1 receive data actually
for (int i = 0; i < 5; i++) {
List<ShuffleBlockInfo> blocks = createShuffleBlockList(
- 0, 0, 0, 3, 25, blockIdBitmap,
- expectedData, Lists.newArrayList(shuffleServerInfo0, shuffleServerInfo1, shuffleServerInfo2));
+ 0, 0, 0, 3, 25, blockIdBitmap,
+ expectedData, Lists.newArrayList(shuffleServerInfo0, shuffleServerInfo1, shuffleServerInfo2));
SendShuffleDataResult result = shuffleWriteClientImpl.sendShuffleData(testAppId, blocks);
assertTrue(result.getSuccessBlockIds().size() == 3);
assertTrue(result.getFailedBlockIds().size() == 0);
@@ -571,9 +570,9 @@ public class QuorumTest extends ShuffleReadWriteBase {
// we cannot read any blocks from server 2
ShuffleReadClientImpl readClient = new ShuffleReadClientImpl(StorageType.MEMORY_LOCALFILE.name(),
- testAppId, 0, 0, 100, 1,
- 10, 1000, "", blockIdBitmap, taskIdBitmap,
- Lists.newArrayList(shuffleServerInfo2), null, new DefaultIdHelper());
+ testAppId, 0, 0, 100, 1,
+ 10, 1000, "", blockIdBitmap, taskIdBitmap,
+ Lists.newArrayList(shuffleServerInfo2), null, new DefaultIdHelper());
assertTrue(readClient.readShuffleBlockData() == null);
// we can read blocks from server 0,1
@@ -605,8 +604,8 @@ public class QuorumTest extends ShuffleReadWriteBase {
// secondary round: server 2
for (int i = 0; i < 5; i++) {
List<ShuffleBlockInfo> blocks = createShuffleBlockList(
- 0, 0, 0, 3, 25, blockIdBitmap,
- expectedData, Lists.newArrayList(shuffleServerInfo0, fakedShuffleServerInfo1, shuffleServerInfo2));
+ 0, 0, 0, 3, 25, blockIdBitmap,
+ expectedData, Lists.newArrayList(shuffleServerInfo0, fakedShuffleServerInfo1, shuffleServerInfo2));
SendShuffleDataResult result = shuffleWriteClientImpl.sendShuffleData(testAppId, blocks);
assertTrue(result.getSuccessBlockIds().size() == 3);
assertTrue(result.getFailedBlockIds().size() == 0);
@@ -614,9 +613,9 @@ public class QuorumTest extends ShuffleReadWriteBase {
// we cannot read any blocks from server 1
ShuffleReadClientImpl readClient = new ShuffleReadClientImpl(StorageType.MEMORY_LOCALFILE.name(),
- testAppId, 0, 0, 100, 1,
- 10, 1000, "", blockIdBitmap, taskIdBitmap,
- Lists.newArrayList(shuffleServerInfo1), null, new DefaultIdHelper());
+ testAppId, 0, 0, 100, 1,
+ 10, 1000, "", blockIdBitmap, taskIdBitmap,
+ Lists.newArrayList(shuffleServerInfo1), null, new DefaultIdHelper());
assertTrue(readClient.readShuffleBlockData() == null);
// we can read blocks from server 2, which is sent in to secondary round
@@ -647,8 +646,8 @@ public class QuorumTest extends ShuffleReadWriteBase {
// attempt to send data to "all servers", but only the server 0,1,2 receive data actually
for (int i = 0; i < 5; i++) {
List<ShuffleBlockInfo> blocks = createShuffleBlockList(
- 0, 0, 0, 3, 25, blockIdBitmap,
- expectedData, Lists.newArrayList(shuffleServerInfo0, shuffleServerInfo1, shuffleServerInfo2,
+ 0, 0, 0, 3, 25, blockIdBitmap,
+ expectedData, Lists.newArrayList(shuffleServerInfo0, shuffleServerInfo1, shuffleServerInfo2,
shuffleServerInfo3, shuffleServerInfo4));
SendShuffleDataResult result = shuffleWriteClientImpl.sendShuffleData(testAppId, blocks);
assertTrue(result.getSuccessBlockIds().size() == 3);
@@ -657,9 +656,9 @@ public class QuorumTest extends ShuffleReadWriteBase {
// we cannot read any blocks from server 3, 4
ShuffleReadClientImpl readClient = new ShuffleReadClientImpl(StorageType.MEMORY_LOCALFILE.name(),
- testAppId, 0, 0, 100, 1,
- 10, 1000, "", blockIdBitmap, taskIdBitmap,
- Lists.newArrayList(shuffleServerInfo3, shuffleServerInfo4), null, new DefaultIdHelper());
+ testAppId, 0, 0, 100, 1,
+ 10, 1000, "", blockIdBitmap, taskIdBitmap,
+ Lists.newArrayList(shuffleServerInfo3, shuffleServerInfo4), null, new DefaultIdHelper());
assertTrue(readClient.readShuffleBlockData() == null);
// we can also read blocks from server 0,1,2
@@ -683,8 +682,8 @@ public class QuorumTest extends ShuffleReadWriteBase {
// attempt to send data to "all servers", but the secondary round is activated due to failures in primary round.
for (int i = 0; i < 5; i++) {
List<ShuffleBlockInfo> blocks = createShuffleBlockList(
- 0, 0, 0, 3, 25, blockIdBitmap,
- expectedData, Lists.newArrayList(shuffleServerInfo0, fakedShuffleServerInfo1, shuffleServerInfo2,
+ 0, 0, 0, 3, 25, blockIdBitmap,
+ expectedData, Lists.newArrayList(shuffleServerInfo0, fakedShuffleServerInfo1, shuffleServerInfo2,
shuffleServerInfo3, shuffleServerInfo4));
SendShuffleDataResult result = shuffleWriteClientImpl.sendShuffleData(testAppId, blocks);
assertTrue(result.getSuccessBlockIds().size() == 3);
@@ -693,10 +692,10 @@ public class QuorumTest extends ShuffleReadWriteBase {
// we cannot read any blocks from server 1 due to failures
ShuffleReadClientImpl readClient = new ShuffleReadClientImpl(StorageType.MEMORY_LOCALFILE.name(),
- testAppId, 0, 0, 100, 1,
- 10, 1000, "", blockIdBitmap, taskIdBitmap,
- Lists.newArrayList(shuffleServerInfo1), null, new DefaultIdHelper());
- assertTrue(readClient.readShuffleBlockData() == null);
+ testAppId, 0, 0, 100, 1,
+ 10, 1000, "", blockIdBitmap, taskIdBitmap,
+ Lists.newArrayList(shuffleServerInfo1), null, new DefaultIdHelper());
+ assertTrue(readClient.readShuffleBlockData() == null);
// we can also read blocks from server 3,4
readClient = new ShuffleReadClientImpl(StorageType.MEMORY_LOCALFILE.name(),
@@ -719,8 +718,8 @@ public class QuorumTest extends ShuffleReadWriteBase {
// attempt to send data to "all servers", but only the server 0,1,2 receive data actually
for (int i = 0; i < 5; i++) {
List<ShuffleBlockInfo> blocks = createShuffleBlockList(
- 0, 0, 0, 3, 25, blockIdBitmap,
- expectedData, Lists.newArrayList(shuffleServerInfo0, shuffleServerInfo1, shuffleServerInfo2,
+ 0, 0, 0, 3, 25, blockIdBitmap,
+ expectedData, Lists.newArrayList(shuffleServerInfo0, shuffleServerInfo1, shuffleServerInfo2,
shuffleServerInfo3, shuffleServerInfo4));
SendShuffleDataResult result = shuffleWriteClientImpl.sendShuffleData(testAppId, blocks);
assertTrue(result.getSuccessBlockIds().size() == 3);
@@ -729,16 +728,17 @@ public class QuorumTest extends ShuffleReadWriteBase {
// we cannot read any blocks from server 4 because the secondary round is skipped
ShuffleReadClientImpl readClient = new ShuffleReadClientImpl(StorageType.MEMORY_LOCALFILE.name(),
- testAppId, 0, 0, 100, 1,
- 10, 1000, "", blockIdBitmap, taskIdBitmap,
- Lists.newArrayList(shuffleServerInfo4), null, new DefaultIdHelper());
+ testAppId, 0, 0, 100, 1,
+ 10, 1000, "", blockIdBitmap, taskIdBitmap,
+ Lists.newArrayList(shuffleServerInfo4), null, new DefaultIdHelper());
assertTrue(readClient.readShuffleBlockData() == null);
// we can read blocks from server 0,1,2,3
readClient = new ShuffleReadClientImpl(StorageType.MEMORY_LOCALFILE.name(),
testAppId, 0, 0, 100, 1,
10, 1000, "", blockIdBitmap, taskIdBitmap,
- Lists.newArrayList(shuffleServerInfo0, shuffleServerInfo1, shuffleServerInfo2, shuffleServerInfo3), null, new DefaultIdHelper());
+ Lists.newArrayList(shuffleServerInfo0, shuffleServerInfo1, shuffleServerInfo2, shuffleServerInfo3),
+ null, new DefaultIdHelper());
validateResult(readClient, expectedData);
}
@@ -755,8 +755,8 @@ public class QuorumTest extends ShuffleReadWriteBase {
for (int i = 0; i < 5; i++) {
List<ShuffleBlockInfo> blocks = createShuffleBlockList(
- 0, 0, 0, 3, 25, blockIdBitmap,
- expectedData, Lists.newArrayList(shuffleServerInfo0, shuffleServerInfo1, shuffleServerInfo2));
+ 0, 0, 0, 3, 25, blockIdBitmap,
+ expectedData, Lists.newArrayList(shuffleServerInfo0, shuffleServerInfo1, shuffleServerInfo2));
SendShuffleDataResult result = shuffleWriteClientImpl.sendShuffleData(testAppId, blocks);
assertTrue(result.getSuccessBlockIds().size() == 3);
assertTrue(result.getFailedBlockIds().size() == 0);
@@ -764,9 +764,9 @@ public class QuorumTest extends ShuffleReadWriteBase {
// we can read blocks from server 0
ShuffleReadClientImpl readClient = new ShuffleReadClientImpl(StorageType.MEMORY_LOCALFILE.name(),
- testAppId, 0, 0, 100, 1,
- 10, 1000, "", blockIdBitmap, taskIdBitmap,
- Lists.newArrayList(shuffleServerInfo0), null, new DefaultIdHelper());
+ testAppId, 0, 0, 100, 1,
+ 10, 1000, "", blockIdBitmap, taskIdBitmap,
+ Lists.newArrayList(shuffleServerInfo0), null, new DefaultIdHelper());
validateResult(readClient, expectedData);
// we can also read blocks from server 1
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleReadWriteBase.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleReadWriteBase.java
index 99992c9f..8c0a678c 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleReadWriteBase.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleReadWriteBase.java
@@ -17,9 +17,18 @@
package org.apache.uniffle.test;
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicLong;
+
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.Files;
+import org.roaringbitmap.longlong.Roaring64NavigableMap;
+
import org.apache.uniffle.client.TestUtils;
import org.apache.uniffle.client.api.ShuffleReadClient;
import org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcClient;
@@ -33,14 +42,7 @@ import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.util.ChecksumUtils;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.common.util.RssUtils;
-import org.roaringbitmap.longlong.Roaring64NavigableMap;
-import java.io.File;
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.atomic.AtomicLong;
public abstract class ShuffleReadWriteBase extends IntegrationTestBase {
@@ -57,7 +59,8 @@ public abstract class ShuffleReadWriteBase extends IntegrationTestBase {
new Random().nextBytes(buf);
long seqno = ATOMIC_LONG.getAndIncrement();
- long blockId = (seqno << (Constants.PARTITION_ID_MAX_LENGTH + Constants.TASK_ATTEMPT_ID_MAX_LENGTH)) + taskAttemptId;
+ long blockId = (seqno << (Constants.PARTITION_ID_MAX_LENGTH + Constants.TASK_ATTEMPT_ID_MAX_LENGTH))
+ + taskAttemptId;
blockIdBitmap.addLong(blockId);
dataMap.put(blockId, buf);
shuffleBlockInfoList.add(new ShuffleBlockInfo(
@@ -67,6 +70,16 @@ public abstract class ShuffleReadWriteBase extends IntegrationTestBase {
return shuffleBlockInfoList;
}
+ public static List<ShuffleBlockInfo> createShuffleBlockList(int shuffleId, int partitionId, long taskAttemptId,
+ int blockNum, int length,
+ Roaring64NavigableMap blockIdBitmap,
+ Map<Long, byte[]> dataMap) {
+ List<ShuffleServerInfo> shuffleServerInfoList =
+ Lists.newArrayList(new ShuffleServerInfo("id", "host", 0));
+ return createShuffleBlockList(
+ shuffleId, partitionId, taskAttemptId, blockNum, length, blockIdBitmap, dataMap, shuffleServerInfoList);
+ }
+
public static Map<Integer, List<ShuffleBlockInfo>> createTestData(
Roaring64NavigableMap[] bitmaps,
Map<Long, byte[]> expectedData) {
@@ -89,14 +102,6 @@ public abstract class ShuffleReadWriteBase extends IntegrationTestBase {
return partitionToBlocks;
}
- public static List<ShuffleBlockInfo> createShuffleBlockList(int shuffleId, int partitionId, long taskAttemptId,
- int blockNum, int length, Roaring64NavigableMap blockIdBitmap, Map<Long, byte[]> dataMap) {
- List<ShuffleServerInfo> shuffleServerInfoList =
- Lists.newArrayList(new ShuffleServerInfo("id", "host", 0));
- return createShuffleBlockList(
- shuffleId, partitionId, taskAttemptId, blockNum, length, blockIdBitmap, dataMap, shuffleServerInfoList);
- }
-
public static boolean compareByte(byte[] expected, ByteBuffer buffer) {
return TestUtils.compareByte(expected, buffer);
}
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
index c35e8ac9..280de66a 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
@@ -29,9 +29,8 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.io.Files;
import com.google.protobuf.ByteString;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
@@ -76,6 +75,7 @@ public class ShuffleServerGrpcTest extends IntegrationTestBase {
private ShuffleServerGrpcClient shuffleServerClient;
private AtomicInteger atomicInteger = new AtomicInteger(0);
private static Long EVENT_THRESHOLD_SIZE = 2048L;
+ private static final int GB = 1024 * 1024 * 1024;
@BeforeAll
public static void setupServers() throws Exception {
@@ -511,9 +511,9 @@ public class ShuffleServerGrpcTest extends IntegrationTestBase {
public void rpcMetricsTest() {
String appId = "rpcMetricsTest";
int shuffleId = 0;
- double oldGrpcTotal = shuffleServers.get(0).getGrpcMetrics().getCounterGrpcTotal().get();
- double oldValue = shuffleServers.get(0).getGrpcMetrics().getCounterMap().
- get(ShuffleServerGrpcMetrics.REGISTER_SHUFFLE_METHOD).get();
+ final double oldGrpcTotal = shuffleServers.get(0).getGrpcMetrics().getCounterGrpcTotal().get();
+ double oldValue = shuffleServers.get(0).getGrpcMetrics().getCounterMap()
+ .get(ShuffleServerGrpcMetrics.REGISTER_SHUFFLE_METHOD).get();
shuffleServerClient.registerShuffle(new RssRegisterShuffleRequest(appId, shuffleId,
Lists.newArrayList(new PartitionRange(0, 1)), ""));
double newValue = shuffleServers.get(0).getGrpcMetrics().getCounterMap()
@@ -647,7 +647,6 @@ public class ShuffleServerGrpcTest extends IntegrationTestBase {
assertEquals(0, shuffleServers.get(0).getGrpcMetrics().getGaugeGrpcOpen().get(), 0.5);
oldValue = ShuffleServerMetrics.counterTotalRequireBufferFailed.get();
- int GB = 1024 * 1024 * 1024;
// the next two allocations will fail
assertEquals(shuffleServerClient.requirePreAllocation(GB, 0, 10), -1);
assertEquals(shuffleServerClient.requirePreAllocation(GB, 0, 10), -1);
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithHdfsTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithHdfsTest.java
index ba94944d..41a4f7e4 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithHdfsTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithHdfsTest.java
@@ -23,11 +23,10 @@ import java.util.Map.Entry;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import org.apache.uniffle.client.util.DefaultIdHelper;
import org.apache.hadoop.conf.Configuration;
import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
@@ -38,6 +37,7 @@ import org.apache.uniffle.client.request.RssRegisterShuffleRequest;
import org.apache.uniffle.client.request.RssSendCommitRequest;
import org.apache.uniffle.client.request.RssSendShuffleDataRequest;
import org.apache.uniffle.client.response.CompressedShuffleBlock;
+import org.apache.uniffle.client.util.DefaultIdHelper;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.coordinator.CoordinatorConf;
@@ -102,7 +102,8 @@ public class ShuffleServerWithHdfsTest extends ShuffleReadWriteBase {
ShuffleReadClientImpl readClient = new ShuffleReadClientImpl(StorageType.HDFS.name(),
appId, 0, 0, 100, 2, 10, 1000,
- dataBasePath, bitmaps[0], Roaring64NavigableMap.bitmapOf(0), Lists.newArrayList(), new Configuration(), new DefaultIdHelper());
+ dataBasePath, bitmaps[0], Roaring64NavigableMap.bitmapOf(0), Lists.newArrayList(),
+ new Configuration(), new DefaultIdHelper());
assertNull(readClient.readShuffleBlockData());
shuffleServerClient.finishShuffle(rfsr);
@@ -131,22 +132,26 @@ public class ShuffleServerWithHdfsTest extends ShuffleReadWriteBase {
readClient = new ShuffleReadClientImpl(StorageType.HDFS.name(),
appId, 0, 0, 100, 2, 10, 1000,
- dataBasePath, bitmaps[0], Roaring64NavigableMap.bitmapOf(0), Lists.newArrayList(), new Configuration(), new DefaultIdHelper());
+ dataBasePath, bitmaps[0], Roaring64NavigableMap.bitmapOf(0), Lists.newArrayList(),
+ new Configuration(), new DefaultIdHelper());
validateResult(readClient, expectedData, bitmaps[0]);
readClient = new ShuffleReadClientImpl(StorageType.HDFS.name(),
appId, 0, 1, 100, 2, 10, 1000,
- dataBasePath, bitmaps[1], Roaring64NavigableMap.bitmapOf(1), Lists.newArrayList(), new Configuration(), new DefaultIdHelper());
+ dataBasePath, bitmaps[1], Roaring64NavigableMap.bitmapOf(1), Lists.newArrayList(),
+ new Configuration(), new DefaultIdHelper());
validateResult(readClient, expectedData, bitmaps[1]);
readClient = new ShuffleReadClientImpl(StorageType.HDFS.name(),
appId, 0, 2, 100, 2, 10, 1000,
- dataBasePath, bitmaps[2], Roaring64NavigableMap.bitmapOf(2), Lists.newArrayList(), new Configuration(), new DefaultIdHelper());
+ dataBasePath, bitmaps[2], Roaring64NavigableMap.bitmapOf(2), Lists.newArrayList(),
+ new Configuration(), new DefaultIdHelper());
validateResult(readClient, expectedData, bitmaps[2]);
readClient = new ShuffleReadClientImpl(StorageType.HDFS.name(),
appId, 0, 3, 100, 2, 10, 1000,
- dataBasePath, bitmaps[3], Roaring64NavigableMap.bitmapOf(3), Lists.newArrayList(), new Configuration(), new DefaultIdHelper());
+ dataBasePath, bitmaps[3], Roaring64NavigableMap.bitmapOf(3), Lists.newArrayList(),
+ new Configuration(), new DefaultIdHelper());
validateResult(readClient, expectedData, bitmaps[3]);
}
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalTest.java
index 550903e1..bf07502a 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalTest.java
@@ -28,8 +28,8 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.io.Files;
import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.roaringbitmap.longlong.LongIterator;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
@@ -97,11 +97,6 @@ public class ShuffleServerWithLocalTest extends ShuffleReadWriteBase {
Roaring64NavigableMap[] bitmaps = new Roaring64NavigableMap[4];
Map<Integer, List<ShuffleBlockInfo>> partitionToBlocks = createTestData(bitmaps, expectedData);
- Set<Long> expectedBlockIds1 = transBitmapToSet(bitmaps[0]);
- Set<Long> expectedBlockIds2 = transBitmapToSet(bitmaps[1]);
- Set<Long> expectedBlockIds3 = transBitmapToSet(bitmaps[2]);
- Set<Long> expectedBlockIds4 = transBitmapToSet(bitmaps[3]);
-
Map<Integer, Map<Integer, List<ShuffleBlockInfo>>> shuffleToBlocks = Maps.newHashMap();
shuffleToBlocks.put(0, partitionToBlocks);
@@ -113,6 +108,10 @@ public class ShuffleServerWithLocalTest extends ShuffleReadWriteBase {
RssFinishShuffleRequest rfsr = new RssFinishShuffleRequest(testAppId, 0);
shuffleServerClient.finishShuffle(rfsr);
+ final Set<Long> expectedBlockIds1 = transBitmapToSet(bitmaps[0]);
+ final Set<Long> expectedBlockIds2 = transBitmapToSet(bitmaps[1]);
+ final Set<Long> expectedBlockIds3 = transBitmapToSet(bitmaps[2]);
+ final Set<Long> expectedBlockIds4 = transBitmapToSet(bitmaps[3]);
ShuffleDataResult sdr = readShuffleData(
shuffleServerClient, testAppId, 0, 0, 2,
10, 1000, 0);
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemLocalHdfsTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemLocalHdfsTest.java
index 4898af8d..592c980e 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemLocalHdfsTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemLocalHdfsTest.java
@@ -26,8 +26,8 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.Files;
import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
@@ -94,16 +94,15 @@ public class ShuffleServerWithMemLocalHdfsTest extends ShuffleReadWriteBase {
int shuffleId = 0;
int partitionId = 0;
RssRegisterShuffleRequest rrsr = new RssRegisterShuffleRequest(testAppId, 0,
- Lists.newArrayList(new PartitionRange(0, 0)), REMOTE_STORAGE);
+ Lists.newArrayList(new PartitionRange(0, 0)), REMOTE_STORAGE);
shuffleServerClient.registerShuffle(rrsr);
Roaring64NavigableMap expectBlockIds = Roaring64NavigableMap.bitmapOf();
- Roaring64NavigableMap processBlockIds = Roaring64NavigableMap.bitmapOf();
Map<Long, byte[]> dataMap = Maps.newHashMap();
Roaring64NavigableMap[] bitmaps = new Roaring64NavigableMap[1];
bitmaps[0] = Roaring64NavigableMap.bitmapOf();
List<ShuffleBlockInfo> blocks = createShuffleBlockList(
- shuffleId, partitionId, 0, 3, 25,
- expectBlockIds, dataMap, mockSSI);
+ shuffleId, partitionId, 0, 3, 25,
+ expectBlockIds, dataMap, mockSSI);
Map<Integer, List<ShuffleBlockInfo>> partitionToBlocks = Maps.newHashMap();
partitionToBlocks.put(partitionId, blocks);
Map<Integer, Map<Integer, List<ShuffleBlockInfo>>> shuffleToBlocks = Maps.newHashMap();
@@ -111,29 +110,29 @@ public class ShuffleServerWithMemLocalHdfsTest extends ShuffleReadWriteBase {
// send data to shuffle server's memory
RssSendShuffleDataRequest rssdr = new RssSendShuffleDataRequest(
- testAppId, 3, 1000, shuffleToBlocks);
+ testAppId, 3, 1000, shuffleToBlocks);
shuffleServerClient.sendShuffleData(rssdr);
// read the 1-th segment from memory
MemoryQuorumClientReadHandler memoryQuorumClientReadHandler = new MemoryQuorumClientReadHandler(
- testAppId, shuffleId, partitionId, 150, Lists.newArrayList(shuffleServerClient));
+ testAppId, shuffleId, partitionId, 150, Lists.newArrayList(shuffleServerClient));
+ Roaring64NavigableMap processBlockIds = Roaring64NavigableMap.bitmapOf();
LocalFileQuorumClientReadHandler localFileQuorumClientReadHandler = new LocalFileQuorumClientReadHandler(
- testAppId, shuffleId, partitionId, 0, 1, 3,
- 75, expectBlockIds, processBlockIds, Lists.newArrayList(shuffleServerClient));
- HdfsClientReadHandler hdfsClientReadHandler = new HdfsClientReadHandler(
- testAppId, shuffleId, partitionId, 0, 1, 3,
- 500, expectBlockIds, processBlockIds, REMOTE_STORAGE, conf);
+ testAppId, shuffleId, partitionId, 0, 1, 3,
+ 75, expectBlockIds, processBlockIds, Lists.newArrayList(shuffleServerClient));
+ HdfsClientReadHandler hdfsClientReadHandler = new HdfsClientReadHandler(testAppId, shuffleId, partitionId, 0, 1, 3,
+ 500, expectBlockIds, processBlockIds, REMOTE_STORAGE, conf);
ClientReadHandler[] handlers = new ClientReadHandler[3];
handlers[0] = memoryQuorumClientReadHandler;
handlers[1] = localFileQuorumClientReadHandler;
handlers[2] = hdfsClientReadHandler;
ComposedClientReadHandler composedClientReadHandler = new ComposedClientReadHandler(handlers);
- ShuffleDataResult sdr = composedClientReadHandler.readShuffleData();
Map<Long, byte[]> expectedData = Maps.newHashMap();
expectedData.clear();
expectedData.put(blocks.get(0).getBlockId(), blocks.get(0).getData());
expectedData.put(blocks.get(1).getBlockId(), blocks.get(1).getData());
expectedData.put(blocks.get(2).getBlockId(), blocks.get(1).getData());
+ ShuffleDataResult sdr = composedClientReadHandler.readShuffleData();
validateResult(expectedData, sdr);
processBlockIds.addLong(blocks.get(0).getBlockId());
processBlockIds.addLong(blocks.get(1).getBlockId());
@@ -142,8 +141,8 @@ public class ShuffleServerWithMemLocalHdfsTest extends ShuffleReadWriteBase {
// send data to shuffle server, and wait until flush to LocalFile
List<ShuffleBlockInfo> blocks2 = createShuffleBlockList(
- shuffleId, partitionId, 0, 3, 50,
- expectBlockIds, dataMap, mockSSI);
+ shuffleId, partitionId, 0, 3, 50,
+ expectBlockIds, dataMap, mockSSI);
partitionToBlocks = Maps.newHashMap();
partitionToBlocks.put(partitionId, blocks2);
shuffleToBlocks = Maps.newHashMap();
@@ -174,8 +173,8 @@ public class ShuffleServerWithMemLocalHdfsTest extends ShuffleReadWriteBase {
// send data to shuffle server, and wait until flush to HDFS
List<ShuffleBlockInfo> blocks3 = createShuffleBlockList(
- shuffleId, partitionId, 0, 2, 200,
- expectBlockIds, dataMap, mockSSI);
+ shuffleId, partitionId, 0, 2, 200,
+ expectBlockIds, dataMap, mockSSI);
partitionToBlocks = Maps.newHashMap();
partitionToBlocks.put(partitionId, blocks3);
shuffleToBlocks = Maps.newHashMap();
@@ -199,11 +198,11 @@ public class ShuffleServerWithMemLocalHdfsTest extends ShuffleReadWriteBase {
sdr = composedClientReadHandler.readShuffleData();
assertNull(sdr);
- assert(composedClientReadHandler.getReadBlokNumInfo()
+ assert (composedClientReadHandler.getReadBlokNumInfo()
.contains("Client read 8 blocks [ hot:3 warm:3 cold:2 frozen:0 ]"));
- assert(composedClientReadHandler.getReadLengthInfo()
+ assert (composedClientReadHandler.getReadLengthInfo()
.contains("Client read 625 bytes [ hot:75 warm:150 cold:400 frozen:0 ]"));
- assert(composedClientReadHandler.getReadUncompressLengthInfo()
+ assert (composedClientReadHandler.getReadUncompressLengthInfo()
.contains("Client read 625 uncompressed bytes [ hot:75 warm:150 cold:400 frozen:0 ]"));
}
@@ -214,7 +213,7 @@ public class ShuffleServerWithMemLocalHdfsTest extends ShuffleReadWriteBase {
fail("Timeout for flush data");
}
ShuffleBuffer shuffleBuffer = shuffleServers.get(0).getShuffleBufferManager()
- .getShuffleBuffer(appId, shuffleId, 0);
+ .getShuffleBuffer(appId, shuffleId, 0);
if (shuffleBuffer.getBlocks().size() == 0 && shuffleBuffer.getInFlushBlockMap().size() == 0) {
break;
}
@@ -224,8 +223,8 @@ public class ShuffleServerWithMemLocalHdfsTest extends ShuffleReadWriteBase {
}
protected void validateResult(
- Map<Long, byte[]> expectedData,
- ShuffleDataResult sdr) {
+ Map<Long, byte[]> expectedData,
+ ShuffleDataResult sdr) {
byte[] buffer = sdr.getData();
List<BufferSegment> bufferSegments = sdr.getBufferSegments();
assertEquals(expectedData.size(), bufferSegments.size());
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemoryTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemoryTest.java
index 3532e8ee..7e0123dc 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemoryTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemoryTest.java
@@ -26,8 +26,8 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.Files;
import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
@@ -93,13 +93,12 @@ public class ShuffleServerWithMemoryTest extends ShuffleReadWriteBase {
Lists.newArrayList(new PartitionRange(0, 0)), "");
shuffleServerClient.registerShuffle(rrsr);
Roaring64NavigableMap expectBlockIds = Roaring64NavigableMap.bitmapOf();
- Roaring64NavigableMap processBlockIds = Roaring64NavigableMap.bitmapOf();
Map<Long, byte[]> dataMap = Maps.newHashMap();
Roaring64NavigableMap[] bitmaps = new Roaring64NavigableMap[1];
bitmaps[0] = Roaring64NavigableMap.bitmapOf();
List<ShuffleBlockInfo> blocks = createShuffleBlockList(
shuffleId, partitionId, 0, 3, 25,
- expectBlockIds, dataMap, mockSSI);
+ expectBlockIds, dataMap, mockSSI);
Map<Integer, List<ShuffleBlockInfo>> partitionToBlocks = Maps.newHashMap();
partitionToBlocks.put(partitionId, blocks);
Map<Integer, Map<Integer, List<ShuffleBlockInfo>>> shuffleToBlocks = Maps.newHashMap();
@@ -137,6 +136,7 @@ public class ShuffleServerWithMemoryTest extends ShuffleReadWriteBase {
assertEquals(0, sdr.getBufferSegments().size());
// case: read with ComposedClientReadHandler
+ Roaring64NavigableMap processBlockIds = Roaring64NavigableMap.bitmapOf();
memoryQuorumClientReadHandler = new MemoryQuorumClientReadHandler(
testAppId, shuffleId, partitionId, 50, Lists.newArrayList(shuffleServerClient));
LocalFileQuorumClientReadHandler localFileQuorumClientReadHandler = new LocalFileQuorumClientReadHandler(
@@ -154,12 +154,12 @@ public class ShuffleServerWithMemoryTest extends ShuffleReadWriteBase {
validateResult(expectedData, sdr);
// send data to shuffle server, flush should happen
- List<ShuffleBlockInfo> blocks2 = createShuffleBlockList(
- shuffleId, partitionId, 0, 3, 50,
- expectBlockIds, dataMap, mockSSI);
processBlockIds.addLong(blocks.get(0).getBlockId());
processBlockIds.addLong(blocks.get(1).getBlockId());
+ List<ShuffleBlockInfo> blocks2 = createShuffleBlockList(
+ shuffleId, partitionId, 0, 3, 50,
+ expectBlockIds, dataMap, mockSSI);
partitionToBlocks = Maps.newHashMap();
partitionToBlocks.put(partitionId, blocks2);
shuffleToBlocks = Maps.newHashMap();
@@ -214,16 +214,15 @@ public class ShuffleServerWithMemoryTest extends ShuffleReadWriteBase {
int shuffleId = 0;
int partitionId = 0;
RssRegisterShuffleRequest rrsr = new RssRegisterShuffleRequest(testAppId, 0,
- Lists.newArrayList(new PartitionRange(0, 0)), "");
+ Lists.newArrayList(new PartitionRange(0, 0)), "");
shuffleServerClient.registerShuffle(rrsr);
Roaring64NavigableMap expectBlockIds = Roaring64NavigableMap.bitmapOf();
- Roaring64NavigableMap processBlockIds = Roaring64NavigableMap.bitmapOf();
Map<Long, byte[]> dataMap = Maps.newHashMap();
Roaring64NavigableMap[] bitmaps = new Roaring64NavigableMap[1];
bitmaps[0] = Roaring64NavigableMap.bitmapOf();
List<ShuffleBlockInfo> blocks = createShuffleBlockList(
- shuffleId, partitionId, 0, 3, 25,
- expectBlockIds, dataMap, mockSSI);
+ shuffleId, partitionId, 0, 3, 25,
+ expectBlockIds, dataMap, mockSSI);
Map<Integer, List<ShuffleBlockInfo>> partitionToBlocks = Maps.newHashMap();
partitionToBlocks.put(partitionId, blocks);
Map<Integer, Map<Integer, List<ShuffleBlockInfo>>> shuffleToBlocks = Maps.newHashMap();
@@ -231,25 +230,26 @@ public class ShuffleServerWithMemoryTest extends ShuffleReadWriteBase {
// send data to shuffle server's memory
RssSendShuffleDataRequest rssdr = new RssSendShuffleDataRequest(
- testAppId, 3, 1000, shuffleToBlocks);
+ testAppId, 3, 1000, shuffleToBlocks);
shuffleServerClient.sendShuffleData(rssdr);
// read the 1-th segment from memory
+ Roaring64NavigableMap processBlockIds = Roaring64NavigableMap.bitmapOf();
MemoryQuorumClientReadHandler memoryQuorumClientReadHandler = new MemoryQuorumClientReadHandler(
- testAppId, shuffleId, partitionId, 150, Lists.newArrayList(shuffleServerClient));
+ testAppId, shuffleId, partitionId, 150, Lists.newArrayList(shuffleServerClient));
LocalFileQuorumClientReadHandler localFileQuorumClientReadHandler = new LocalFileQuorumClientReadHandler(
- testAppId, shuffleId, partitionId, 0, 1, 3,
- 75, expectBlockIds, processBlockIds, Lists.newArrayList(shuffleServerClient));
+ testAppId, shuffleId, partitionId, 0, 1, 3,
+ 75, expectBlockIds, processBlockIds, Lists.newArrayList(shuffleServerClient));
ClientReadHandler[] handlers = new ClientReadHandler[2];
handlers[0] = memoryQuorumClientReadHandler;
handlers[1] = localFileQuorumClientReadHandler;
ComposedClientReadHandler composedClientReadHandler = new ComposedClientReadHandler(handlers);
- ShuffleDataResult sdr = composedClientReadHandler.readShuffleData();
Map<Long, byte[]> expectedData = Maps.newHashMap();
expectedData.clear();
expectedData.put(blocks.get(0).getBlockId(), blocks.get(0).getData());
expectedData.put(blocks.get(1).getBlockId(), blocks.get(1).getData());
expectedData.put(blocks.get(2).getBlockId(), blocks.get(1).getData());
+ ShuffleDataResult sdr = composedClientReadHandler.readShuffleData();
validateResult(expectedData, sdr);
processBlockIds.addLong(blocks.get(0).getBlockId());
processBlockIds.addLong(blocks.get(1).getBlockId());
@@ -257,8 +257,8 @@ public class ShuffleServerWithMemoryTest extends ShuffleReadWriteBase {
// send data to shuffle server, and wait until flush finish
List<ShuffleBlockInfo> blocks2 = createShuffleBlockList(
- shuffleId, partitionId, 0, 3, 50,
- expectBlockIds, dataMap, mockSSI);
+ shuffleId, partitionId, 0, 3, 50,
+ expectBlockIds, dataMap, mockSSI);
partitionToBlocks = Maps.newHashMap();
partitionToBlocks.put(partitionId, blocks2);
shuffleToBlocks = Maps.newHashMap();
@@ -273,7 +273,7 @@ public class ShuffleServerWithMemoryTest extends ShuffleReadWriteBase {
fail("Timeout for flush data");
}
ShuffleBuffer shuffleBuffer = shuffleServers.get(0).getShuffleBufferManager()
- .getShuffleBuffer(testAppId, shuffleId, 0);
+ .getShuffleBuffer(testAppId, shuffleId, 0);
if (shuffleBuffer.getBlocks().size() == 0 && shuffleBuffer.getInFlushBlockMap().size() == 0) {
break;
}
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java
index ec7d2277..29532908 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java
@@ -26,12 +26,9 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.io.Files;
-import org.apache.uniffle.client.util.DefaultIdHelper;
-import org.apache.uniffle.common.util.Constants;
-import org.apache.uniffle.common.util.RetryUtils;
import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
@@ -40,11 +37,14 @@ import org.apache.uniffle.client.impl.ShuffleWriteClientImpl;
import org.apache.uniffle.client.response.SendShuffleDataResult;
import org.apache.uniffle.client.util.ClientType;
import org.apache.uniffle.client.util.ClientUtils;
+import org.apache.uniffle.client.util.DefaultIdHelper;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ShuffleAssignmentsInfo;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleServerInfo;
+import org.apache.uniffle.common.util.Constants;
+import org.apache.uniffle.common.util.RetryUtils;
import org.apache.uniffle.coordinator.CoordinatorConf;
import org.apache.uniffle.server.ShuffleServerConf;
import org.apache.uniffle.storage.util.StorageType;
@@ -140,8 +140,8 @@ public class ShuffleWithRssClientTest extends ShuffleReadWriteBase {
shuffleServerInfo1, fakeShuffleServerInfo));
shuffleWriteClientImpl.reportShuffleResult(partitionToServers, testAppId, 0, 0, ptb, 2);
Roaring64NavigableMap report = shuffleWriteClientImpl.getShuffleResult("GRPC",
- Sets.newHashSet(shuffleServerInfo1, fakeShuffleServerInfo),
- testAppId, 0, 0);
+ Sets.newHashSet(shuffleServerInfo1, fakeShuffleServerInfo),
+ testAppId, 0, 0);
assertEquals(blockIdBitmap, report);
}
@@ -160,12 +160,12 @@ public class ShuffleWithRssClientTest extends ShuffleReadWriteBase {
partitionToServers.putIfAbsent(2, Lists.newArrayList(shuffleServerInfo2));
Map<Integer, List<Long>> partitionToBlocks = Maps.newHashMap();
List<Long> blockIds = Lists.newArrayList();
- for (int i = 0; i < 5; i++ ) {
+ for (int i = 0; i < 5; i++) {
blockIds.add(ClientUtils.getBlockId(1, 0, i));
}
partitionToBlocks.put(1, blockIds);
blockIds = Lists.newArrayList();
- for (int i = 0; i < 7; i++ ) {
+ for (int i = 0; i < 7; i++) {
blockIds.add(ClientUtils.getBlockId(2, 0, i));
}
partitionToBlocks.put(2, blockIds);
diff --git a/integration-test/mr/src/test/java/org/apache/uniffle/test/FailoverAppMaster.java b/integration-test/mr/src/test/java/org/apache/uniffle/test/FailoverAppMaster.java
index e4f2242f..464eb469 100644
--- a/integration-test/mr/src/test/java/org/apache/uniffle/test/FailoverAppMaster.java
+++ b/integration-test/mr/src/test/java/org/apache/uniffle/test/FailoverAppMaster.java
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.uniffle.test;
import org.apache.hadoop.mapreduce.RssMRUtils;
diff --git a/integration-test/mr/src/test/java/org/apache/uniffle/test/MRIntegrationTestBase.java b/integration-test/mr/src/test/java/org/apache/uniffle/test/MRIntegrationTestBase.java
index 3170fd35..1ceab307 100644
--- a/integration-test/mr/src/test/java/org/apache/uniffle/test/MRIntegrationTestBase.java
+++ b/integration-test/mr/src/test/java/org/apache/uniffle/test/MRIntegrationTestBase.java
@@ -51,6 +51,7 @@ public class MRIntegrationTestBase extends IntegrationTestBase {
protected static MiniMRYarnCluster mrYarnCluster;
protected static FileSystem localFs;
+
static {
try {
localFs = FileSystem.getLocal(conf);
@@ -62,8 +63,8 @@ public class MRIntegrationTestBase extends IntegrationTestBase {
private static Path TEST_ROOT_DIR = localFs.makeQualified(
new Path("target", TestMRJobs.class.getName() + "-tmpDir"));
static Path APP_JAR = new Path(TEST_ROOT_DIR, "MRAppJar.jar");
- private static final String OUTPUT_ROOT_DIR = "/tmp/" +
- TestMRJobs.class.getSimpleName();
+ private static final String OUTPUT_ROOT_DIR = "/tmp/"
+ + TestMRJobs.class.getSimpleName();
private static final Path TEST_RESOURCES_DIR = new Path(TEST_ROOT_DIR,
"localizedResources");
@@ -95,7 +96,7 @@ public class MRIntegrationTestBase extends IntegrationTestBase {
JobConf appConf = new JobConf(mrYarnCluster.getConfig());
updateCommonConfiguration(appConf);
runOriginApp(appConf);
- String originPath = appConf.get("mapreduce.output.fileoutputformat.outputdir");
+ final String originPath = appConf.get("mapreduce.output.fileoutputformat.outputdir");
appConf = new JobConf(mrYarnCluster.getConfig());
updateCommonConfiguration(appConf);
runRssApp(appConf);
@@ -120,12 +121,13 @@ public class MRIntegrationTestBase extends IntegrationTestBase {
private void runRssApp(Configuration jobConf) throws Exception {
URL url = MRIntegrationTestBase.class.getResource("/");
- String parentPath = new Path(url.getPath()).getParent()
+ final String parentPath = new Path(url.getPath()).getParent()
.getParent().getParent().getParent().toString();
if (System.getenv("JAVA_HOME") == null) {
throw new RuntimeException("We must set JAVA_HOME");
}
- jobConf.set(MRJobConfig.MR_AM_COMMAND_OPTS, "-XX:+TraceClassLoading org.apache.hadoop.mapreduce.v2.app.RssMRAppMaster");
+ jobConf.set(MRJobConfig.MR_AM_COMMAND_OPTS,
+ "-XX:+TraceClassLoading org.apache.hadoop.mapreduce.v2.app.RssMRAppMaster");
jobConf.set(MRJobConfig.REDUCE_JAVA_OPTS, "-XX:+TraceClassLoading -XX:MaxDirectMemorySize=419430400");
jobConf.setInt(MRJobConfig.MAP_MEMORY_MB, 500);
jobConf.setInt(MRJobConfig.REDUCE_MEMORY_MB, 2048);
@@ -149,7 +151,7 @@ public class MRIntegrationTestBase extends IntegrationTestBase {
String[] splittedProps = props.split(":");
for (String prop : splittedProps) {
if (!prop.contains("classes") && !prop.contains("grpc") && !prop.contains("rss-")
- && !prop.contains("shuffle-storage")) {
+ && !prop.contains("shuffle-storage")) {
newProps = newProps + ":" + prop;
} else if (prop.contains("mr") && prop.contains("integration-test")) {
newProps = newProps + ":" + prop;
diff --git a/integration-test/mr/src/test/java/org/apache/uniffle/test/WordCountTest.java b/integration-test/mr/src/test/java/org/apache/uniffle/test/WordCountTest.java
index 25ad3b16..a26b7102 100644
--- a/integration-test/mr/src/test/java/org/apache/uniffle/test/WordCountTest.java
+++ b/integration-test/mr/src/test/java/org/apache/uniffle/test/WordCountTest.java
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.uniffle.test;
import java.util.List;
diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/AutoAccessTest.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/AutoAccessTest.java
index a9d6a34d..c6ad3fd5 100644
--- a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/AutoAccessTest.java
+++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/AutoAccessTest.java
@@ -26,8 +26,8 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkConf;
import org.apache.spark.shuffle.DelegationRssShuffleManager;
-import org.apache.spark.shuffle.RssSparkConfig;
import org.apache.spark.shuffle.RssShuffleManager;
+import org.apache.spark.shuffle.RssSparkConfig;
import org.apache.spark.shuffle.ShuffleManager;
import org.apache.spark.shuffle.sort.SortShuffleManager;
import org.junit.jupiter.api.Test;
@@ -61,7 +61,7 @@ public class AutoAccessTest extends IntegrationTestBase {
printWriter.println(" spark.mock.2 overwrite-conf ");
printWriter.println(" spark.mock.3 true ");
printWriter.println("spark.rss.storage.type " + StorageType.MEMORY_LOCALFILE_HDFS.name());
- printWriter.println(RssSparkConfig.RSS_REMOTE_STORAGE_PATH.key()+ " expectedPath");
+ printWriter.println(RssSparkConfig.RSS_REMOTE_STORAGE_PATH.key() + " expectedPath");
printWriter.flush();
printWriter.close();
@@ -83,7 +83,8 @@ public class AutoAccessTest extends IntegrationTestBase {
coordinatorConf.setString("rss.coordinator.access.candidates.path", candidatesFile);
coordinatorConf.setString(
"rss.coordinator.access.checkers",
- "org.apache.uniffle.coordinator.AccessCandidatesChecker,org.apache.uniffle.coordinator.AccessClusterLoadChecker");
+ "org.apache.uniffle.coordinator.AccessCandidatesChecker,"
+ + "org.apache.uniffle.coordinator.AccessClusterLoadChecker");
createCoordinatorServer(coordinatorConf);
ShuffleServerConf shuffleServerConf = getShuffleServerConf();
@@ -162,7 +163,7 @@ public class AutoAccessTest extends IntegrationTestBase {
printWriter.println(" spark.mock.2 overwrite-conf ");
printWriter.println(" spark.mock.3 false ");
printWriter.println("spark.rss.storage.type " + StorageType.MEMORY_LOCALFILE_HDFS.name());
- printWriter.println(RssSparkConfig.RSS_REMOTE_STORAGE_PATH.key()+ " expectedPathNew");
+ printWriter.println(RssSparkConfig.RSS_REMOTE_STORAGE_PATH.key() + " expectedPathNew");
printWriter.flush();
printWriter.close();
fs.rename(tmpPath, path);
diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/CombineByKeyTest.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/CombineByKeyTest.java
index 818908d0..dfacfa9c 100644
--- a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/CombineByKeyTest.java
+++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/CombineByKeyTest.java
@@ -18,6 +18,7 @@
package org.apache.uniffle.test;
import java.util.Map;
+
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/DynamicFetchClientConfTest.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/DynamicFetchClientConfTest.java
index ee5cde5e..9f259c4f 100644
--- a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/DynamicFetchClientConfTest.java
+++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/DynamicFetchClientConfTest.java
@@ -25,8 +25,8 @@ import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkConf;
-import org.apache.spark.shuffle.RssSparkConfig;
import org.apache.spark.shuffle.RssShuffleManager;
+import org.apache.spark.shuffle.RssSparkConfig;
import org.junit.jupiter.api.Test;
import org.apache.uniffle.coordinator.CoordinatorConf;
diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/GroupByKeyTest.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/GroupByKeyTest.java
index 3b8984f9..fe371755 100644
--- a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/GroupByKeyTest.java
+++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/GroupByKeyTest.java
@@ -17,10 +17,10 @@
package org.apache.uniffle.test;
-import com.google.common.collect.Lists;
import java.util.Map;
+import com.google.common.collect.Lists;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionTest.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionTest.java
index 52dee77e..cb4186dd 100644
--- a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionTest.java
+++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionTest.java
@@ -24,6 +24,7 @@ import java.nio.file.Files;
import java.util.Arrays;
import java.util.Map;
import java.util.Random;
+
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithMemoryRssTest.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithMemoryRssTest.java
index 9061a780..63a641ea 100644
--- a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithMemoryRssTest.java
+++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithMemoryRssTest.java
@@ -59,7 +59,7 @@ public class RepartitionWithMemoryRssTest extends RepartitionTest {
@Test
public void testMemoryRelease() throws Exception {
- String fileName = generateTextFile(10000, 10000);
+ final String fileName = generateTextFile(10000, 10000);
SparkConf sparkConf = createSparkConf();
updateSparkConfWithRss(sparkConf);
sparkConf.set("spark.executor.memory", "500m");
diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalTest.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalTest.java
index 4c4e1905..96ed80e3 100644
--- a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalTest.java
+++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalTest.java
@@ -24,11 +24,10 @@ import java.util.Map;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.Files;
-import org.apache.uniffle.client.util.DefaultIdHelper;
import org.apache.commons.io.FileUtils;
import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.roaringbitmap.longlong.LongIterator;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
@@ -39,6 +38,7 @@ import org.apache.uniffle.client.request.RssFinishShuffleRequest;
import org.apache.uniffle.client.request.RssRegisterShuffleRequest;
import org.apache.uniffle.client.request.RssSendCommitRequest;
import org.apache.uniffle.client.request.RssSendShuffleDataRequest;
+import org.apache.uniffle.client.util.DefaultIdHelper;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleServerInfo;
@@ -118,7 +118,7 @@ public class SparkClientWithLocalTest extends ShuffleReadWriteBase {
Map<Long, byte[]> expectedData = Maps.newHashMap();
Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
- Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
+ final Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
List<ShuffleBlockInfo> blocks = createShuffleBlockList(
0, 0, 0, 2, 30, blockIdBitmap, expectedData, mockSSI);
sendTestData(testAppId, blocks);
@@ -172,7 +172,7 @@ public class SparkClientWithLocalTest extends ShuffleReadWriteBase {
Map<Long, byte[]> expectedData1 = Maps.newHashMap();
Map<Long, byte[]> expectedData2 = Maps.newHashMap();
Roaring64NavigableMap blockIdBitmap1 = Roaring64NavigableMap.bitmapOf();
- Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
+ final Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
List<ShuffleBlockInfo> blocks = createShuffleBlockList(
0, 0, 0, 10, 30, blockIdBitmap1, expectedData1, mockSSI);
sendTestData(testAppId, blocks);
@@ -189,7 +189,7 @@ public class SparkClientWithLocalTest extends ShuffleReadWriteBase {
ShuffleReadClientImpl readClient1 = new ShuffleReadClientImpl(StorageType.LOCALFILE.name(),
testAppId, 0, 0, 100, 2, 10, 100,
"", blockIdBitmap1, taskIdBitmap, shuffleServerInfo, null, new DefaultIdHelper());
- ShuffleReadClientImpl readClient2 = new ShuffleReadClientImpl(StorageType.LOCALFILE.name(),
+ final ShuffleReadClientImpl readClient2 = new ShuffleReadClientImpl(StorageType.LOCALFILE.name(),
testAppId, 0, 1, 100, 2, 10, 100,
"", blockIdBitmap2, taskIdBitmap, shuffleServerInfo, null, new DefaultIdHelper());
validateResult(readClient1, expectedData1);
@@ -249,7 +249,7 @@ public class SparkClientWithLocalTest extends ShuffleReadWriteBase {
Map<Long, byte[]> expectedData = Maps.newHashMap();
Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
- Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0, 1);
+ final Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0, 1);
List<ShuffleBlockInfo> blocks = createShuffleBlockList(
0, 0, 0, 5, 30, blockIdBitmap, expectedData, mockSSI);
@@ -279,7 +279,7 @@ public class SparkClientWithLocalTest extends ShuffleReadWriteBase {
Map<Long, byte[]> expectedData = Maps.newHashMap();
Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
- Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0, 3);
+ final Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0, 3);
List<ShuffleBlockInfo> blocks = createShuffleBlockList(
0, 0, 0, 5, 30, blockIdBitmap, expectedData, mockSSI);
sendTestData(testAppId, blocks);
@@ -315,7 +315,6 @@ public class SparkClientWithLocalTest extends ShuffleReadWriteBase {
Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
List<ShuffleBlockInfo> blocks;
- ShuffleReadClientImpl readClient;
createTestData(testAppId, expectedData, blockIdBitmap, taskIdBitmap);
Roaring64NavigableMap beforeAdded = RssUtils.cloneBitMap(blockIdBitmap);
@@ -324,6 +323,7 @@ public class SparkClientWithLocalTest extends ShuffleReadWriteBase {
0, 0, 1, 3, 25, blockIdBitmap, Maps.newHashMap(), mockSSI);
sendTestData(testAppId, blocks);
// test with un-changed expected blockId
+ ShuffleReadClientImpl readClient;
readClient = new ShuffleReadClientImpl(StorageType.LOCALFILE.name(), testAppId, 0, 0, 100, 1,
10, 1000, "", beforeAdded, taskIdBitmap,
shuffleServerInfo, null, new DefaultIdHelper());
@@ -348,14 +348,14 @@ public class SparkClientWithLocalTest extends ShuffleReadWriteBase {
Map<Long, byte[]> expectedData = Maps.newHashMap();
Roaring64NavigableMap expectedBlockIds = Roaring64NavigableMap.bitmapOf();
Roaring64NavigableMap unexpectedBlockIds = Roaring64NavigableMap.bitmapOf();
- Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0, 1);
+ final Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0, 1);
// send some expected data
List<ShuffleBlockInfo> blocks = createShuffleBlockList(
- 0, 0, 0, 2, 30, expectedBlockIds, expectedData, mockSSI);
+ 0, 0, 0, 2, 30, expectedBlockIds, expectedData, mockSSI);
sendTestData(testAppId, blocks);
// send some unexpected data
blocks = createShuffleBlockList(
- 0, 0, 0, 2, 30, unexpectedBlockIds,
+ 0, 0, 0, 2, 30, unexpectedBlockIds,
Maps.newHashMap(), mockSSI);
sendTestData(testAppId, blocks);
// send some expected data
@@ -363,8 +363,8 @@ public class SparkClientWithLocalTest extends ShuffleReadWriteBase {
0, 0, 1, 2, 30, expectedBlockIds, expectedData, mockSSI);
sendTestData(testAppId, blocks);
ShuffleReadClientImpl readClient = new ShuffleReadClientImpl(StorageType.LOCALFILE.name(),
- testAppId, 0, 0, 100, 1, 10, 1000,
- "", expectedBlockIds, taskIdBitmap, shuffleServerInfo, null, new DefaultIdHelper());
+ testAppId, 0, 0, 100, 1, 10, 1000,
+ "", expectedBlockIds, taskIdBitmap, shuffleServerInfo, null, new DefaultIdHelper());
validateResult(readClient, expectedData);
readClient.checkProcessedBlockIds();
diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkIntegrationTestBase.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkIntegrationTestBase.java
index 3a38efb3..1ea90007 100644
--- a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkIntegrationTestBase.java
+++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkIntegrationTestBase.java
@@ -49,8 +49,8 @@ public abstract class SparkIntegrationTestBase extends IntegrationTestBase {
long start = System.currentTimeMillis();
updateCommonSparkConf(sparkConf);
- Map resultWithoutRss = runSparkApp(sparkConf, fileName);
- long durationWithoutRss = System.currentTimeMillis() - start;
+ final Map resultWithoutRss = runSparkApp(sparkConf, fileName);
+ final long durationWithoutRss = System.currentTimeMillis() - start;
Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
updateSparkConfWithRss(sparkConf);
diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkSQLTest.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkSQLTest.java
index 264d57f7..08eaf44a 100644
--- a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkSQLTest.java
+++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkSQLTest.java
@@ -17,7 +17,6 @@
package org.apache.uniffle.test;
-import com.google.common.collect.Maps;
import java.io.File;
import java.io.FileWriter;
import java.io.PrintWriter;
@@ -25,6 +24,7 @@ import java.nio.file.Files;
import java.util.Map;
import java.util.Random;
+import com.google.common.collect.Maps;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkSQLWithDelegationShuffleManager.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkSQLWithDelegationShuffleManager.java
index 75836010..45d0e774 100644
--- a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkSQLWithDelegationShuffleManager.java
+++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkSQLWithDelegationShuffleManager.java
@@ -43,7 +43,8 @@ public class SparkSQLWithDelegationShuffleManager extends SparkSQLTest {
CoordinatorConf coordinatorConf = getCoordinatorConf();
coordinatorConf.setString(
CoordinatorConf.COORDINATOR_ACCESS_CHECKERS.key(),
- "org.apache.uniffle.coordinator.AccessCandidatesChecker,org.apache.uniffle.coordinator.AccessClusterLoadChecker");
+ "org.apache.uniffle.coordinator.AccessCandidatesChecker,"
+ + "org.apache.uniffle.coordinator.AccessClusterLoadChecker");
coordinatorConf.set(CoordinatorConf.COORDINATOR_ACCESS_CANDIDATES_PATH, candidates);
coordinatorConf.set(CoordinatorConf.COORDINATOR_APP_EXPIRED, 5000L);
coordinatorConf.set(CoordinatorConf.COORDINATOR_ACCESS_LOADCHECKER_SERVER_NUM_THRESHOLD, 1);
diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkSQLWithDelegationShuffleManagerFallback.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkSQLWithDelegationShuffleManagerFallback.java
index eace92a5..44120958 100644
--- a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkSQLWithDelegationShuffleManagerFallback.java
+++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkSQLWithDelegationShuffleManagerFallback.java
@@ -43,7 +43,8 @@ public class SparkSQLWithDelegationShuffleManagerFallback extends SparkSQLTest {
CoordinatorConf coordinatorConf = getCoordinatorConf();
coordinatorConf.setString(
CoordinatorConf.COORDINATOR_ACCESS_CHECKERS.key(),
- "org.apache.uniffle.coordinator.AccessCandidatesChecker,org.apache.uniffle.coordinator.AccessClusterLoadChecker");
+ "org.apache.uniffle.coordinator.AccessCandidatesChecker,"
+ + "org.apache.uniffle.coordinator.AccessClusterLoadChecker");
coordinatorConf.set(CoordinatorConf.COORDINATOR_ACCESS_CANDIDATES_PATH, candidates);
coordinatorConf.set(CoordinatorConf.COORDINATOR_APP_EXPIRED, 5000L);
coordinatorConf.set(CoordinatorConf.COORDINATOR_ACCESS_LOADCHECKER_SERVER_NUM_THRESHOLD, 1);
diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/TestUtils.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/TestUtils.java
index bed223e5..b7bdbfc7 100644
--- a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/TestUtils.java
+++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/TestUtils.java
@@ -28,6 +28,7 @@ public class TestUtils {
private TestUtils() {
}
+
static JavaPairRDD<String, Integer> getRDD(JavaSparkContext jsc) {
JavaPairRDD<String, Integer> javaPairRDD1 = jsc.parallelizePairs(Lists.newArrayList(
new Tuple2<>("cat1", 11), new Tuple2<>("dog", 22),
@@ -39,10 +40,10 @@ public class TestUtils {
static JavaPairRDD<String, Tuple2<Integer, Integer>> combineByKeyRDD(JavaPairRDD javaPairRDD1) {
JavaPairRDD<String, Tuple2<Integer, Integer>> javaPairRDD = javaPairRDD1
.combineByKey((Function<Integer, Tuple2<Integer, Integer>>) i -> new Tuple2<>(1, i),
- (Function2<Tuple2<Integer, Integer>, Integer, Tuple2<Integer, Integer>>) (tuple, i) ->
- new Tuple2<>(tuple._1 + 1, tuple._2 + i),
- (Function2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>) (tuple1, tuple2) ->
- new Tuple2<>(tuple1._1 + tuple2._1, tuple1._2 + tuple2._2)
+ (Function2<Tuple2<Integer, Integer>, Integer, Tuple2<Integer, Integer>>)
+ (tuple, i) -> new Tuple2<>(tuple._1 + 1, tuple._2 + i),
+ (Function2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>)
+ (tuple1, tuple2) -> new Tuple2<>(tuple1._1 + tuple2._1, tuple1._2 + tuple2._2)
);
return javaPairRDD;
}
diff --git a/integration-test/spark2/src/test/java/org/apache/uniffle/test/GetReaderTest.java b/integration-test/spark2/src/test/java/org/apache/uniffle/test/GetReaderTest.java
index 05458859..6acb4f95 100644
--- a/integration-test/spark2/src/test/java/org/apache/uniffle/test/GetReaderTest.java
+++ b/integration-test/spark2/src/test/java/org/apache/uniffle/test/GetReaderTest.java
@@ -21,23 +21,22 @@ import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.util.concurrent.TimeUnit;
-import scala.Tuple2;
-
import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.hadoop.conf.Configuration;
-import org.apache.spark.TaskContextImpl;
-import org.apache.spark.shuffle.reader.RssShuffleReader;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.spark.ShuffleDependency;
import org.apache.spark.SparkConf;
+import org.apache.spark.TaskContextImpl;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.shuffle.RssShuffleHandle;
-import org.apache.spark.shuffle.RssSparkConfig;
import org.apache.spark.shuffle.RssShuffleManager;
+import org.apache.spark.shuffle.RssSparkConfig;
+import org.apache.spark.shuffle.reader.RssShuffleReader;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.Test;
+import scala.Tuple2;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.util.Constants;
diff --git a/integration-test/spark3/src/test/java/org/apache/uniffle/test/AQERepartitionTest.java b/integration-test/spark3/src/test/java/org/apache/uniffle/test/AQERepartitionTest.java
index 316040a9..297a2386 100644
--- a/integration-test/spark3/src/test/java/org/apache/uniffle/test/AQERepartitionTest.java
+++ b/integration-test/spark3/src/test/java/org/apache/uniffle/test/AQERepartitionTest.java
@@ -17,27 +17,26 @@
package org.apache.uniffle.test;
-
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import org.apache.uniffle.coordinator.CoordinatorConf;
-import org.apache.uniffle.server.ShuffleServerConf;
-import org.apache.uniffle.storage.util.StorageType;
-
import org.apache.spark.SparkConf;
import org.apache.spark.shuffle.RssSparkConfig;
import org.apache.spark.sql.Column;
+import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.internal.SQLConf;
-import org.apache.spark.sql.Dataset;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import scala.collection.JavaConverters;
+import org.apache.uniffle.coordinator.CoordinatorConf;
+import org.apache.uniffle.server.ShuffleServerConf;
+import org.apache.uniffle.storage.util.StorageType;
+
import static org.junit.jupiter.api.Assertions.assertTrue;
public class AQERepartitionTest extends SparkIntegrationTestBase {
diff --git a/integration-test/spark3/src/test/java/org/apache/uniffle/test/AQESkewedJoinTest.java b/integration-test/spark3/src/test/java/org/apache/uniffle/test/AQESkewedJoinTest.java
index 60867faf..50e0c27e 100644
--- a/integration-test/spark3/src/test/java/org/apache/uniffle/test/AQESkewedJoinTest.java
+++ b/integration-test/spark3/src/test/java/org/apache/uniffle/test/AQESkewedJoinTest.java
@@ -17,13 +17,12 @@
package org.apache.uniffle.test;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import org.apache.uniffle.coordinator.CoordinatorConf;
-import org.apache.uniffle.server.ShuffleServerConf;
-import org.apache.uniffle.storage.util.StorageType;
-
import org.apache.spark.SparkConf;
import org.apache.spark.shuffle.RssSparkConfig;
import org.apache.spark.sql.Dataset;
@@ -36,9 +35,9 @@ import org.apache.spark.sql.internal.SQLConf;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
+import org.apache.uniffle.coordinator.CoordinatorConf;
+import org.apache.uniffle.server.ShuffleServerConf;
+import org.apache.uniffle.storage.util.StorageType;
import static org.junit.jupiter.api.Assertions.assertTrue;
diff --git a/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetReaderTest.java b/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetReaderTest.java
index 95e2b39e..827e793c 100644
--- a/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetReaderTest.java
+++ b/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetReaderTest.java
@@ -22,35 +22,34 @@ import java.io.PrintWriter;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
-import scala.Option;
-import scala.Tuple2;
-import scala.collection.Seq;
-import scala.collection.immutable.Map;
-
import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.spark.ShuffleDependency;
+import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.executor.TaskMetrics;
import org.apache.spark.executor.TempShuffleReadMetrics;
import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.metrics.source.Source;
import org.apache.spark.resource.ResourceInformation;
import org.apache.spark.shuffle.FetchFailedException;
+import org.apache.spark.shuffle.RssShuffleHandle;
+import org.apache.spark.shuffle.RssShuffleManager;
+import org.apache.spark.shuffle.RssSparkConfig;
import org.apache.spark.shuffle.reader.RssShuffleReader;
+import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.AccumulatorV2;
import org.apache.spark.util.TaskCompletionListener;
import org.apache.spark.util.TaskFailureListener;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.Path;
-import org.apache.spark.ShuffleDependency;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.shuffle.RssShuffleHandle;
-import org.apache.spark.shuffle.RssSparkConfig;
-import org.apache.spark.shuffle.RssShuffleManager;
-import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.Test;
+import scala.Option;
+import scala.Tuple2;
+import scala.collection.Seq;
+import scala.collection.immutable.Map;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.util.Constants;
diff --git a/pom.xml b/pom.xml
index e14359ed..18011c27 100644
--- a/pom.xml
+++ b/pom.xml
@@ -799,18 +799,14 @@
<artifactId>maven-checkstyle-plugin</artifactId>
<version>${maven-checkstyle-plugin.version}</version>
<configuration>
- <sourceDirectories>
- <directory>${basedir}/src/main/java</directory>
- </sourceDirectories>
- <testSourceDirectories>
- <directory>${basedir}/src/test/java</directory>
- </testSourceDirectories>
<configLocation>checkstyle.xml</configLocation>
+ <suppressionsLocation>checkstyle-suppressions.xml</suppressionsLocation>
<encoding>UTF-8</encoding>
<consoleOutput>true</consoleOutput>
<failsOnError>true</failsOnError>
<linkXRef>false</linkXRef>
<failOnViolation>true</failOnViolation>
+ <includeTestSourceDirectory>true</includeTestSourceDirectory>
</configuration>
<dependencies>
<dependency>
diff --git a/server/src/test/java/org/apache/uniffle/server/HealthyMockChecker.java b/server/src/test/java/org/apache/uniffle/server/HealthyMockChecker.java
index 71a16eda..18b16bb8 100644
--- a/server/src/test/java/org/apache/uniffle/server/HealthyMockChecker.java
+++ b/server/src/test/java/org/apache/uniffle/server/HealthyMockChecker.java
@@ -19,12 +19,13 @@ package org.apache.uniffle.server;
class HealthyMockChecker extends Checker {
- public HealthyMockChecker(ShuffleServerConf conf) {
- super(conf);
- }
+ @SuppressWarnings("checkstyle:RedundantModifier")
+ public HealthyMockChecker(ShuffleServerConf conf) {
+ super(conf);
+ }
- @Override
- boolean checkIsHealthy() {
- return true;
- }
+ @Override
+ boolean checkIsHealthy() {
+ return true;
+ }
}
diff --git a/server/src/test/java/org/apache/uniffle/server/MockedGrpcServer.java b/server/src/test/java/org/apache/uniffle/server/MockedGrpcServer.java
index f7e6de8d..fa450345 100644
--- a/server/src/test/java/org/apache/uniffle/server/MockedGrpcServer.java
+++ b/server/src/test/java/org/apache/uniffle/server/MockedGrpcServer.java
@@ -23,11 +23,13 @@ import org.apache.uniffle.common.rpc.GrpcServer;
public class MockedGrpcServer extends GrpcServer {
MockedShuffleServerGrpcService service;
+
public MockedGrpcServer(RssBaseConf conf, MockedShuffleServerGrpcService service,
GRPCMetrics grpcMetrics) {
super(conf, service, grpcMetrics);
this.service = service;
}
+
public MockedShuffleServerGrpcService getService() {
return service;
}
diff --git a/server/src/test/java/org/apache/uniffle/server/MockedShuffleServerFactory.java b/server/src/test/java/org/apache/uniffle/server/MockedShuffleServerFactory.java
index fe04b230..a43a7504 100644
--- a/server/src/test/java/org/apache/uniffle/server/MockedShuffleServerFactory.java
+++ b/server/src/test/java/org/apache/uniffle/server/MockedShuffleServerFactory.java
@@ -24,6 +24,7 @@ import org.apache.uniffle.common.rpc.ServerInterface;
public class MockedShuffleServerFactory extends ShuffleServerFactory {
private static final Logger LOG = LoggerFactory.getLogger(MockedShuffleServerFactory.class);
+
public MockedShuffleServerFactory(MockedShuffleServer shuffleServer) {
super(shuffleServer);
}
diff --git a/server/src/test/java/org/apache/uniffle/server/MockedShuffleServerGrpcService.java b/server/src/test/java/org/apache/uniffle/server/MockedShuffleServerGrpcService.java
index 930f51c0..02b7c29a 100644
--- a/server/src/test/java/org/apache/uniffle/server/MockedShuffleServerGrpcService.java
+++ b/server/src/test/java/org/apache/uniffle/server/MockedShuffleServerGrpcService.java
@@ -17,6 +17,8 @@
package org.apache.uniffle.server;
+import java.util.concurrent.TimeUnit;
+
import com.google.common.util.concurrent.Uninterruptibles;
import io.grpc.stub.StreamObserver;
import org.slf4j.Logger;
@@ -24,7 +26,6 @@ import org.slf4j.LoggerFactory;
import org.apache.uniffle.proto.RssProtos;
-import java.util.concurrent.TimeUnit;
public class MockedShuffleServerGrpcService extends ShuffleServerGrpcService {
@@ -49,7 +50,7 @@ public class MockedShuffleServerGrpcService extends ShuffleServerGrpcService {
StreamObserver<RssProtos.SendShuffleDataResponse> responseObserver) {
if (mockedTimeout > 0) {
LOG.info("Add a mocked timeout on sendShuffleData");
- Uninterruptibles.sleepUninterruptibly(mockedTimeout, TimeUnit.MILLISECONDS);
+ Uninterruptibles.sleepUninterruptibly(mockedTimeout, TimeUnit.MILLISECONDS);
}
super.sendShuffleData(request, responseObserver);
}
diff --git a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerOnKerberizedHdfsTest.java b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerOnKerberizedHdfsTest.java
index 642d7ab3..4800c4d2 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerOnKerberizedHdfsTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerOnKerberizedHdfsTest.java
@@ -118,10 +118,10 @@ public class ShuffleFlushManagerOnKerberizedHdfsTest extends KerberizedHdfsBase
manager.addToFlushQueue(event2);
waitForFlush(manager, appId1, 1, 5);
waitForFlush(manager, appId2, 1, 5);
- AbstractStorage storage = (AbstractStorage) storageManager.selectStorage(event1);
assertEquals(5, manager.getCommittedBlockIds(appId1, 1).getLongCardinality());
assertEquals(5, manager.getCommittedBlockIds(appId2, 1).getLongCardinality());
assertEquals(storageManager.selectStorage(event1), storageManager.selectStorage(event2));
+ AbstractStorage storage = (AbstractStorage) storageManager.selectStorage(event1);
int size = storage.getHandlerSize();
assertEquals(2, size);
diff --git a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
index ba670a22..2ae581e7 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
@@ -63,8 +63,8 @@ import org.apache.uniffle.storage.util.StorageType;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -128,15 +128,15 @@ public class ShuffleFlushManagerTest extends HdfsTestBase {
new ShuffleFlushManager(shuffleServerConf, "shuffleServerId", mockShuffleServer, storageManager);
ShuffleDataFlushEvent event1 =
createShuffleDataFlushEvent(appId, 1, 1, 1, null);
- List<ShufflePartitionedBlock> blocks1 = event1.getShuffleBlocks();
+ final List<ShufflePartitionedBlock> blocks1 = event1.getShuffleBlocks();
manager.addToFlushQueue(event1);
ShuffleDataFlushEvent event21 =
createShuffleDataFlushEvent(appId, 2, 2, 2, null);
- List<ShufflePartitionedBlock> blocks21 = event21.getShuffleBlocks();
+ final List<ShufflePartitionedBlock> blocks21 = event21.getShuffleBlocks();
manager.addToFlushQueue(event21);
ShuffleDataFlushEvent event22 =
createShuffleDataFlushEvent(appId, 2, 2, 2, null);
- List<ShufflePartitionedBlock> blocks22 = event22.getShuffleBlocks();
+ final List<ShufflePartitionedBlock> blocks22 = event22.getShuffleBlocks();
manager.addToFlushQueue(event22);
// wait for write data
waitForFlush(manager, appId, 1, 5);
@@ -170,7 +170,8 @@ public class ShuffleFlushManagerTest extends HdfsTestBase {
List<ShufflePartitionedBlock> expectedBlocks = Lists.newArrayList();
List<ShuffleDataFlushEvent> flushEvents1 = Lists.newArrayList();
List<ShuffleDataFlushEvent> flushEvents2 = Lists.newArrayList();
- ShuffleFlushManager manager = new ShuffleFlushManager(shuffleServerConf, "shuffleServerId", mockShuffleServer, storageManager);
+ ShuffleFlushManager manager = new ShuffleFlushManager(shuffleServerConf, "shuffleServerId",
+ mockShuffleServer, storageManager);
for (int i = 0; i < 30; i++) {
ShuffleDataFlushEvent flushEvent1 = createShuffleDataFlushEvent(appId, 1, 1, 1, null);
ShuffleDataFlushEvent flushEvent2 = createShuffleDataFlushEvent(appId, 1, 1, 1, null);
@@ -217,7 +218,7 @@ public class ShuffleFlushManagerTest extends HdfsTestBase {
manager.addToFlushQueue(event2);
waitForFlush(manager, appId1, 1, 5);
waitForFlush(manager, appId2, 1, 5);
- AbstractStorage storage = (AbstractStorage) storageManager.selectStorage(event1);
+ final AbstractStorage storage = (AbstractStorage) storageManager.selectStorage(event1);
assertEquals(5, manager.getCommittedBlockIds(appId1, 1).getLongCardinality());
assertEquals(5, manager.getCommittedBlockIds(appId2, 1).getLongCardinality());
assertEquals(storageManager.selectStorage(event1), storageManager.selectStorage(event2));
@@ -260,8 +261,8 @@ public class ShuffleFlushManagerTest extends HdfsTestBase {
@Test
public void clearLocalTest(@TempDir File tempDir) throws Exception {
- String appId1 = "clearLocalTest_appId1";
- String appId2 = "clearLocalTest_appId2";
+ final String appId1 = "clearLocalTest_appId1";
+ final String appId2 = "clearLocalTest_appId2";
ShuffleServerConf serverConf = new ShuffleServerConf();
serverConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList(tempDir.getAbsolutePath()));
serverConf.setString(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.LOCALFILE.name());
@@ -277,7 +278,7 @@ public class ShuffleFlushManagerTest extends HdfsTestBase {
createShuffleDataFlushEvent(appId2, 1, 0, 1, null);
manager.addToFlushQueue(event2);
assertEquals(storageManager.selectStorage(event1), storageManager.selectStorage(event2));
- AbstractStorage storage = (AbstractStorage) storageManager.selectStorage(event1);
+ final AbstractStorage storage = (AbstractStorage) storageManager.selectStorage(event1);
waitForFlush(manager, appId1, 1, 5);
waitForFlush(manager, appId2, 1, 5);
assertEquals(5, manager.getCommittedBlockIds(appId1, 1).getLongCardinality());
@@ -289,7 +290,7 @@ public class ShuffleFlushManagerTest extends HdfsTestBase {
manager.removeResources(appId1);
assertFalse(file.exists());
ShuffleDataFlushEvent event3 =
- createShuffleDataFlushEvent(appId1, 1, 0, 1, () -> { return false; });
+ createShuffleDataFlushEvent(appId1, 1, 0, 1, () -> false);
manager.addToFlushQueue(event3);
Thread.sleep(1000);
assertEquals(0, manager.getCommittedBlockIds(appId1, 1).getLongCardinality());
@@ -313,7 +314,7 @@ public class ShuffleFlushManagerTest extends HdfsTestBase {
try {
assertEquals(0, gauge.get(), delta);
match = true;
- } catch(Exception e) {
+ } catch (Exception e) {
// ignore
}
} while (!match);
@@ -422,7 +423,7 @@ public class ShuffleFlushManagerTest extends HdfsTestBase {
assertEquals(0, manager.getPendingEventsSize());
do {
Thread.sleep(1 * 1000);
- } while(manager.getEventNumInFlush() != 0);
+ } while (manager.getEventNumInFlush() != 0);
List<ShufflePartitionedBlock> blocks = Lists.newArrayList(new ShufflePartitionedBlock(100, 1000, 1, 1, 1L, null));
ShuffleDataFlushEvent bigEvent = new ShuffleDataFlushEvent(1, "1", 1, 1, 1, 100, blocks, null, null);
storageManager.updateWriteMetrics(bigEvent, 0);
diff --git a/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java b/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java
index 56cab2ca..404a4fff 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java
@@ -28,7 +28,6 @@ import java.util.concurrent.Future;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
-import org.apache.commons.lang3.StringUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
diff --git a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
index 690ad1d8..571b3c5d 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
@@ -119,9 +119,9 @@ public class ShuffleTaskManagerTest extends HdfsTestBase {
public void writeProcessTest() throws Exception {
String confFile = ClassLoader.getSystemResource("server.conf").getFile();
ShuffleServerConf conf = new ShuffleServerConf(confFile);
- String remoteStorage = HDFS_URI + "rss/test";
- String appId = "testAppId";
- int shuffleId = 1;
+ final String remoteStorage = HDFS_URI + "rss/test";
+ final String appId = "testAppId";
+ final int shuffleId = 1;
conf.set(ShuffleServerConf.SERVER_BUFFER_CAPACITY, 128L);
conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE, 50.0);
conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE, 0.0);
@@ -149,9 +149,9 @@ public class ShuffleTaskManagerTest extends HdfsTestBase {
new RemoteStorageInfo(remoteStorage),
StringUtils.EMPTY
);
- List<ShufflePartitionedBlock> expectedBlocks1 = Lists.newArrayList();
- List<ShufflePartitionedBlock> expectedBlocks2 = Lists.newArrayList();
- Map<Long, PreAllocatedBufferInfo> bufferIds = shuffleTaskManager.getRequireBufferIds();
+ final List<ShufflePartitionedBlock> expectedBlocks1 = Lists.newArrayList();
+ final List<ShufflePartitionedBlock> expectedBlocks2 = Lists.newArrayList();
+ final Map<Long, PreAllocatedBufferInfo> bufferIds = shuffleTaskManager.getRequireBufferIds();
shuffleTaskManager.requireBuffer(10);
shuffleTaskManager.requireBuffer(10);
@@ -253,7 +253,7 @@ public class ShuffleTaskManagerTest extends HdfsTestBase {
public void clearTest() throws Exception {
ShuffleServerConf conf = new ShuffleServerConf();
String storageBasePath = HDFS_URI + "rss/clearTest";
- int shuffleId = 1;
+ final int shuffleId = 1;
conf.set(ShuffleServerConf.RPC_SERVER_PORT, 1234);
conf.set(ShuffleServerConf.RSS_COORDINATOR_QUORUM, "localhost:9527");
conf.set(ShuffleServerConf.JETTY_HTTP_PORT, 12345);
@@ -271,7 +271,8 @@ public class ShuffleTaskManagerTest extends HdfsTestBase {
ShuffleBufferManager shuffleBufferManager = shuffleServer.getShuffleBufferManager();
ShuffleFlushManager shuffleFlushManager = shuffleServer.getShuffleFlushManager();
StorageManager storageManager = shuffleServer.getStorageManager();
- ShuffleTaskManager shuffleTaskManager = new ShuffleTaskManager(conf, shuffleFlushManager, shuffleBufferManager, storageManager);
+ ShuffleTaskManager shuffleTaskManager = new ShuffleTaskManager(conf, shuffleFlushManager,
+ shuffleBufferManager, storageManager);
shuffleTaskManager.registerShuffle(
"clearTest1",
shuffleId,
diff --git a/server/src/test/java/org/apache/uniffle/server/StorageCheckerTest.java b/server/src/test/java/org/apache/uniffle/server/StorageCheckerTest.java
index 53389779..1282d7bd 100644
--- a/server/src/test/java/org/apache/uniffle/server/StorageCheckerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/StorageCheckerTest.java
@@ -17,15 +17,16 @@
package org.apache.uniffle.server;
-import com.google.common.collect.Lists;
-import org.apache.uniffle.storage.common.LocalStorage;
-import org.apache.uniffle.storage.util.StorageType;
-import org.junit.jupiter.api.Test;
-
import java.io.File;
import java.util.Arrays;
import java.util.List;
+import com.google.common.collect.Lists;
+import org.junit.jupiter.api.Test;
+
+import org.apache.uniffle.storage.common.LocalStorage;
+import org.apache.uniffle.storage.util.StorageType;
+
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -66,7 +67,7 @@ public class StorageCheckerTest {
}
private class MockStorageChecker extends LocalStorageChecker {
- public MockStorageChecker(ShuffleServerConf conf, List<LocalStorage> storages) {
+ MockStorageChecker(ShuffleServerConf conf, List<LocalStorage> storages) {
super(conf, storages);
}
@@ -112,6 +113,7 @@ public class StorageCheckerTest {
break;
case 4:
result = 100;
+ break;
default:
break;
}
@@ -128,6 +130,8 @@ public class StorageCheckerTest {
break;
}
break;
+ default:
+ // ignore
}
return result;
}
diff --git a/server/src/test/java/org/apache/uniffle/server/UnHealthyMockChecker.java b/server/src/test/java/org/apache/uniffle/server/UnHealthyMockChecker.java
index 92e32761..1629335e 100644
--- a/server/src/test/java/org/apache/uniffle/server/UnHealthyMockChecker.java
+++ b/server/src/test/java/org/apache/uniffle/server/UnHealthyMockChecker.java
@@ -19,6 +19,7 @@ package org.apache.uniffle.server;
class UnHealthyMockChecker extends Checker {
+ @SuppressWarnings("checkstyle:RedundantModifier")
public UnHealthyMockChecker(ShuffleServerConf conf) {
super(conf);
}
diff --git a/server/src/test/java/org/apache/uniffle/server/buffer/BufferTestBase.java b/server/src/test/java/org/apache/uniffle/server/buffer/BufferTestBase.java
index d132c1cc..bfdd4895 100644
--- a/server/src/test/java/org/apache/uniffle/server/buffer/BufferTestBase.java
+++ b/server/src/test/java/org/apache/uniffle/server/buffer/BufferTestBase.java
@@ -20,13 +20,13 @@ package org.apache.uniffle.server.buffer;
import java.util.Random;
import java.util.concurrent.atomic.AtomicLong;
-import org.apache.uniffle.server.ShuffleServerMetrics;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.apache.uniffle.common.ShufflePartitionedBlock;
import org.apache.uniffle.common.ShufflePartitionedData;
import org.apache.uniffle.common.util.ChecksumUtils;
+import org.apache.uniffle.server.ShuffleServerMetrics;
public abstract class BufferTestBase {
diff --git a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
index 19a4ef3e..50c91f5a 100644
--- a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
@@ -173,7 +173,6 @@ public class ShuffleBufferManagerTest extends BufferTestBase {
ShufflePartitionedData spd2 = createData(0, 68);
ShufflePartitionedData spd3 = createData(0, 68);
ShufflePartitionedData spd4 = createData(0, 68);
- ShufflePartitionedData spd5 = createData(0, 68);
shuffleBufferManager.cacheShuffleData(appId1, 1, false, spd1);
shuffleBufferManager.cacheShuffleData(appId1, 2, false, spd2);
shuffleBufferManager.cacheShuffleData(appId1, 2, false, spd3);
@@ -185,6 +184,7 @@ public class ShuffleBufferManagerTest extends BufferTestBase {
assertEquals(200, shuffleSizeMap.get(appId1).get(2).get());
assertEquals(100, shuffleSizeMap.get(appId2).get(1).get());
+ ShufflePartitionedData spd5 = createData(0, 68);
shuffleBufferManager.cacheShuffleData(appId2, 2, false, spd5);
// flush happen
assertEquals(99, shuffleSizeMap.get(appId1).get(1).get());
@@ -333,7 +333,8 @@ public class ShuffleBufferManagerTest extends BufferTestBase {
public void bufferSizeTest() throws Exception {
ShuffleServer mockShuffleServer = mock(ShuffleServer.class);
StorageManager storageManager = StorageManagerFactory.getInstance().createStorageManager(conf);
- ShuffleFlushManager shuffleFlushManager = new ShuffleFlushManager(conf, "serverId", mockShuffleServer, storageManager);
+ ShuffleFlushManager shuffleFlushManager = new ShuffleFlushManager(conf,
+ "serverId", mockShuffleServer, storageManager);
shuffleBufferManager = new ShuffleBufferManager(conf, shuffleFlushManager);
when(mockShuffleServer
@@ -405,7 +406,8 @@ public class ShuffleBufferManagerTest extends BufferTestBase {
ShuffleServer mockShuffleServer = mock(ShuffleServer.class);
StorageManager storageManager = StorageManagerFactory.getInstance().createStorageManager(shuffleConf);
- ShuffleFlushManager shuffleFlushManager = new ShuffleFlushManager(shuffleConf, "serverId", mockShuffleServer, storageManager);
+ ShuffleFlushManager shuffleFlushManager =
+ new ShuffleFlushManager(shuffleConf, "serverId", mockShuffleServer, storageManager);
shuffleBufferManager = new ShuffleBufferManager(shuffleConf, shuffleFlushManager);
when(mockShuffleServer
diff --git a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferTest.java b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferTest.java
index 32a570ef..bb59f23c 100644
--- a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferTest.java
@@ -17,17 +17,18 @@
package org.apache.uniffle.server.buffer;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
import com.google.common.collect.Lists;
+import org.junit.jupiter.api.Test;
+
import org.apache.uniffle.common.BufferSegment;
import org.apache.uniffle.common.ShuffleDataResult;
import org.apache.uniffle.common.ShufflePartitionedBlock;
import org.apache.uniffle.common.ShufflePartitionedData;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.server.ShuffleDataFlushEvent;
-import org.junit.jupiter.api.Test;
-
-import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -144,9 +145,9 @@ public class ShuffleBufferTest extends BufferTestBase {
ShuffleDataFlushEvent event1 = shuffleBuffer.toFlushEvent("appId", 0, 0, 1, null);
assertEquals(0, shuffleBuffer.getBlocks().size());
sdr = shuffleBuffer.getShuffleData(Constants.INVALID_BLOCK_ID, 20);
- expectedData = getExpectedData(spd1, spd2);
compareBufferSegment(shuffleBuffer.getInFlushBlockMap().get(event1.getEventId()),
sdr.getBufferSegments(), 0, 2);
+ expectedData = getExpectedData(spd1, spd2);
assertArrayEquals(expectedData, sdr.getData());
// case5: flush data only, blockId = lastBlockId
@@ -174,21 +175,21 @@ public class ShuffleBufferTest extends BufferTestBase {
shuffleBuffer.append(spd4);
shuffleBuffer.append(spd5);
shuffleBuffer.append(spd6);
- ShuffleDataFlushEvent event2 = shuffleBuffer.toFlushEvent("appId", 0, 0, 1, null);
+ final ShuffleDataFlushEvent event2 = shuffleBuffer.toFlushEvent("appId", 0, 0, 1, null);
ShufflePartitionedData spd7 = createData(15);
ShufflePartitionedData spd8 = createData(15);
ShufflePartitionedData spd9 = createData(15);
shuffleBuffer.append(spd7);
shuffleBuffer.append(spd8);
shuffleBuffer.append(spd9);
- ShuffleDataFlushEvent event3 = shuffleBuffer.toFlushEvent("appId", 0, 0, 1, null);
+ final ShuffleDataFlushEvent event3 = shuffleBuffer.toFlushEvent("appId", 0, 0, 1, null);
ShufflePartitionedData spd10 = createData(15);
ShufflePartitionedData spd11 = createData(15);
ShufflePartitionedData spd12 = createData(15);
shuffleBuffer.append(spd10);
shuffleBuffer.append(spd11);
shuffleBuffer.append(spd12);
- ShuffleDataFlushEvent event4 = shuffleBuffer.toFlushEvent("appId", 0, 0, 1, null);
+ final ShuffleDataFlushEvent event4 = shuffleBuffer.toFlushEvent("appId", 0, 0, 1, null);
ShufflePartitionedData spd13 = createData(15);
ShufflePartitionedData spd14 = createData(15);
ShufflePartitionedData spd15 = createData(15);
@@ -430,7 +431,7 @@ public class ShuffleBufferTest extends BufferTestBase {
assertArrayEquals(expectedData, sdr.getData());
}
- private byte[] getExpectedData(ShufflePartitionedData... spds ) {
+ private byte[] getExpectedData(ShufflePartitionedData... spds) {
int size = 0;
for (ShufflePartitionedData spd : spds) {
size += spd.getBlockList()[0].getLength();
diff --git a/server/src/test/java/org/apache/uniffle/server/storage/MultiStorageManagerTest.java b/server/src/test/java/org/apache/uniffle/server/storage/MultiStorageManagerTest.java
index 04ef0729..e62fb18c 100644
--- a/server/src/test/java/org/apache/uniffle/server/storage/MultiStorageManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/storage/MultiStorageManagerTest.java
@@ -21,7 +21,6 @@ import java.util.Arrays;
import java.util.List;
import com.google.common.collect.Lists;
-import org.apache.commons.lang3.StringUtils;
import org.junit.jupiter.api.Test;
import org.apache.uniffle.common.RemoteStorageInfo;
@@ -38,14 +37,14 @@ public class MultiStorageManagerTest {
@Test
public void selectStorageManagerTest() {
- String remoteStorage = "test";
- String appId = "selectStorageManagerTest_appId";
ShuffleServerConf conf = new ShuffleServerConf();
conf.setLong(ShuffleServerConf.FLUSH_COLD_STORAGE_THRESHOLD_SIZE, 2000L);
conf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList("test"));
conf.setLong(ShuffleServerConf.DISK_CAPACITY, 1024L * 1024L * 1024L);
conf.setString(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.MEMORY_LOCALFILE_HDFS.name());
MultiStorageManager manager = new MultiStorageManager(conf);
+ String remoteStorage = "test";
+ String appId = "selectStorageManagerTest_appId";
manager.registerRemoteStorage(appId, new RemoteStorageInfo(remoteStorage));
List<ShufflePartitionedBlock> blocks = Lists.newArrayList(new ShufflePartitionedBlock(100, 1000, 1, 1, 1L, null));
ShuffleDataFlushEvent event = new ShuffleDataFlushEvent(
diff --git a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HdfsHandlerTest.java b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HdfsHandlerTest.java
index 9eff9821..3f8ea968 100644
--- a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HdfsHandlerTest.java
+++ b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HdfsHandlerTest.java
@@ -17,7 +17,6 @@
package org.apache.uniffle.storage.handler.impl;
-import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.List;