Posted to commits@uniffle.apache.org by ck...@apache.org on 2023/02/22 07:19:58 UTC
[incubator-uniffle] branch master updated: [MINOR] refactor: address unchecked conversions (#623)
This is an automated email from the ASF dual-hosted git repository.
ckj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 8b347a2a [MINOR] refactor: address unchecked conversions (#623)
8b347a2a is described below
commit 8b347a2afe94afa738182180d7ad4be627f9e8a4
Author: Kaijie Chen <ck...@apache.org>
AuthorDate: Wed Feb 22 15:19:51 2023 +0800
[MINOR] refactor: address unchecked conversions (#623)
### What changes were proposed in this pull request?
Fix or suppress some unchecked conversions.
### Why are the changes needed?
Improve code quality and make the build log cleaner.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
No tests are changed in this PR; existing tests should cover the change.
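For context, unchecked-conversion warnings come from mixing raw types with generics. Below is a minimal standalone sketch (the class and member names are hypothetical, not taken from this patch) of the two techniques the commit applies: parameterizing raw declarations, and suppressing the warning on the narrowest possible scope when the cast is unavoidable.

    import java.util.ArrayList;
    import java.util.List;

    class UncheckedDemo {
        // Before: a raw type -- every assignment and call through it
        // compiles with an "unchecked" warning, as the raw EventLoop and
        // RssShuffleHandle declarations did.
        //   List servers = new ArrayList();
        // After: parameterize the declaration; the diamond operator infers
        // the type argument, matching fixes like new RssShuffleHandle<>(...).
        List<String> servers = new ArrayList<>();

        // When the compiler cannot prove a cast (e.g. a value obtained
        // through a non-generic API), suppress the warning on the smallest
        // scope rather than leaving the build log noisy.
        @SuppressWarnings("unchecked")
        static <T> List<T> castList(Object o) {
            return (List<T>) o; // caller must guarantee the element type
        }
    }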
---
.../java/org/apache/spark/shuffle/RssShuffleManager.java | 12 ++++++------
.../org/apache/spark/shuffle/reader/RssShuffleReader.java | 4 ++--
.../org/apache/spark/shuffle/writer/RssShuffleWriter.java | 8 ++++----
.../java/org/apache/spark/shuffle/RssShuffleManager.java | 14 +++++++-------
.../org/apache/spark/shuffle/reader/RssShuffleReader.java | 6 +++---
.../org/apache/spark/shuffle/writer/RssShuffleWriter.java | 10 +++++-----
.../apache/uniffle/client/impl/ShuffleWriteClientImpl.java | 2 +-
.../org/apache/uniffle/common/config/ConfigOptions.java | 1 +
.../java/org/apache/uniffle/common/config/ConfigUtils.java | 4 +++-
.../main/java/org/apache/uniffle/common/util/RssUtils.java | 1 +
.../java/org/apache/uniffle/server/ShuffleTaskInfo.java | 2 +-
11 files changed, 34 insertions(+), 30 deletions(-)
diff --git a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index c8c11ca6..5778f341 100644
--- a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -93,7 +93,7 @@ public class RssShuffleManager implements ShuffleManager {
private final String user;
private final String uuid;
private ThreadPoolExecutor threadPoolExecutor;
- private EventLoop eventLoop = new EventLoop<AddBlockEvent>("ShuffleDataQueue") {
+ private EventLoop<AddBlockEvent> eventLoop = new EventLoop<AddBlockEvent>("ShuffleDataQueue") {
@Override
public void onReceive(AddBlockEvent event) {
@@ -238,7 +238,7 @@ public class RssShuffleManager implements ShuffleManager {
if (dependency.partitioner().numPartitions() == 0) {
LOG.info("RegisterShuffle with ShuffleId[" + shuffleId + "], partitionNum is 0, "
+ "return the empty RssShuffleHandle directly");
- return new RssShuffleHandle(shuffleId,
+ return new RssShuffleHandle<>(shuffleId,
appId,
dependency.rdd().getNumPartitions(),
dependency,
@@ -339,7 +339,7 @@ public class RssShuffleManager implements ShuffleManager {
public <K, V> ShuffleWriter<K, V> getWriter(ShuffleHandle handle, int mapId,
TaskContext context) {
if (handle instanceof RssShuffleHandle) {
- RssShuffleHandle rssHandle = (RssShuffleHandle) handle;
+ RssShuffleHandle<K, V, ?> rssHandle = (RssShuffleHandle<K, V, ?>) handle;
appId = rssHandle.getAppId();
int shuffleId = rssHandle.getShuffleId();
@@ -358,7 +358,7 @@ public class RssShuffleManager implements ShuffleManager {
);
taskToBufferManager.put(taskId, bufferManager);
- return new RssShuffleWriter(rssHandle.getAppId(), shuffleId, taskId, context.taskAttemptId(), bufferManager,
+ return new RssShuffleWriter<>(rssHandle.getAppId(), shuffleId, taskId, context.taskAttemptId(), bufferManager,
writeMetrics, this, sparkConf, shuffleWriteClient, rssHandle,
(Function<String, Boolean>) this::markFailedTask);
} else {
@@ -374,7 +374,7 @@ public class RssShuffleManager implements ShuffleManager {
if (handle instanceof RssShuffleHandle) {
final String storageType = sparkConf.get(RssSparkConfig.RSS_STORAGE_TYPE.key());
final int indexReadLimit = sparkConf.get(RssSparkConfig.RSS_INDEX_READ_LIMIT);
- RssShuffleHandle rssShuffleHandle = (RssShuffleHandle) handle;
+ RssShuffleHandle<K, C, ?> rssShuffleHandle = (RssShuffleHandle<K, C, ?>) handle;
final int partitionNumPerRange = sparkConf.get(RssSparkConfig.RSS_PARTITION_NUM_PER_RANGE);
final int partitionNum = rssShuffleHandle.getDependency().partitioner().numPartitions();
long readBufferSize = sparkConf.getSizeAsBytes(RssSparkConfig.RSS_CLIENT_READ_BUFFER_SIZE.key(),
@@ -446,7 +446,7 @@ public class RssShuffleManager implements ShuffleManager {
throw new RuntimeException("RssShuffleManager.shuffleBlockResolver is not implemented");
}
- public EventLoop getEventLoop() {
+ public EventLoop<AddBlockEvent> getEventLoop() {
return eventLoop;
}
diff --git a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
index ff1eac6e..2a8fd81d 100644
--- a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
+++ b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
@@ -77,7 +77,7 @@ public class RssShuffleReader<K, C> implements ShuffleReader<K, C> {
int startPartition,
int endPartition,
TaskContext context,
- RssShuffleHandle rssShuffleHandle,
+ RssShuffleHandle<K, C, ?> rssShuffleHandle,
String basePath,
int indexReadLimit,
Configuration hadoopConf,
@@ -154,7 +154,7 @@ public class RssShuffleReader<K, C> implements ShuffleReader<K, C> {
if (shuffleDependency.keyOrdering().isDefined()) {
// Create an ExternalSorter to sort the data
- ExternalSorter sorter = new ExternalSorter<K, C, C>(context, Option.empty(), Option.empty(),
+ ExternalSorter<K, C, C> sorter = new ExternalSorter<>(context, Option.empty(), Option.empty(),
shuffleDependency.keyOrdering(), serializer);
LOG.info("Inserting aggregated records to sorter");
long startTime = System.currentTimeMillis();
diff --git a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index 0def3b68..bd8c12b8 100644
--- a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++ b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -95,7 +95,7 @@ public class RssShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
RssShuffleManager shuffleManager,
SparkConf sparkConf,
ShuffleWriteClient shuffleWriteClient,
- RssShuffleHandle rssHandle) {
+ RssShuffleHandle<K, V, C> rssHandle) {
this(
appId,
shuffleId,
@@ -121,7 +121,7 @@ public class RssShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
RssShuffleManager shuffleManager,
SparkConf sparkConf,
ShuffleWriteClient shuffleWriteClient,
- RssShuffleHandle rssHandle,
+ RssShuffleHandle<K, V, C> rssHandle,
Function<String, Boolean> taskFailureCallback) {
this.appId = appId;
this.bufferManager = bufferManager;
@@ -178,7 +178,7 @@ public class RssShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
Product2<K, V> record = records.next();
int partition = getPartition(record._1());
if (shuffleDependency.mapSideCombine()) {
- Function1 createCombiner = shuffleDependency.aggregator().get().createCombiner();
+ Function1<V, C> createCombiner = shuffleDependency.aggregator().get().createCombiner();
Object c = createCombiner.apply(record._2());
shuffleBlockInfos = bufferManager.addRecord(partition, record._1(), c);
} else {
@@ -352,7 +352,7 @@ public class RssShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
}
@VisibleForTesting
- protected <K> int getPartition(K key) {
+ protected <T> int getPartition(T key) {
int result = 0;
if (shouldPartition) {
result = partitioner.getPartition(key);
diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index 8d17c093..a37453ee 100644
--- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -102,8 +102,8 @@ public class RssShuffleManager implements ShuffleManager {
private String user;
private String uuid;
private Set<String> failedTaskIds = Sets.newConcurrentHashSet();
- private final EventLoop eventLoop;
- private final EventLoop defaultEventLoop = new EventLoop<AddBlockEvent>("ShuffleDataQueue") {
+ private final EventLoop<AddBlockEvent> eventLoop;
+ private final EventLoop<AddBlockEvent> defaultEventLoop = new EventLoop<AddBlockEvent>("ShuffleDataQueue") {
@Override
public void onReceive(AddBlockEvent event) {
@@ -305,7 +305,7 @@ public class RssShuffleManager implements ShuffleManager {
if (dependency.partitioner().numPartitions() == 0) {
LOG.info("RegisterShuffle with ShuffleId[" + shuffleId + "], partitionNum is 0, "
+ "return the empty RssShuffleHandle directly");
- return new RssShuffleHandle(shuffleId,
+ return new RssShuffleHandle<>(shuffleId,
id.get(),
dependency.rdd().getNumPartitions(),
dependency,
@@ -348,7 +348,7 @@ public class RssShuffleManager implements ShuffleManager {
LOG.info("RegisterShuffle with ShuffleId[" + shuffleId + "], partitionNum[" + partitionToServers.size()
+ "], shuffleServerForResult: " + partitionToServers);
- return new RssShuffleHandle(shuffleId,
+ return new RssShuffleHandle<>(shuffleId,
id.get(),
dependency.rdd().getNumPartitions(),
dependency,
@@ -365,7 +365,7 @@ public class RssShuffleManager implements ShuffleManager {
if (!(handle instanceof RssShuffleHandle)) {
throw new RuntimeException("Unexpected ShuffleHandle:" + handle.getClass().getName());
}
- RssShuffleHandle rssHandle = (RssShuffleHandle) handle;
+ RssShuffleHandle<K, V, ?> rssHandle = (RssShuffleHandle<K, V, ?>) handle;
// todo: this implement is tricky, we should refactor it
if (id.get() == null) {
id.compareAndSet(null, rssHandle.getAppId());
@@ -385,7 +385,7 @@ public class RssShuffleManager implements ShuffleManager {
writeMetrics, RssSparkConfig.toRssConf(sparkConf));
taskToBufferManager.put(taskId, bufferManager);
LOG.info("RssHandle appId {} shuffleId {} ", rssHandle.getAppId(), rssHandle.getShuffleId());
- return new RssShuffleWriter(rssHandle.getAppId(), shuffleId, taskId, context.taskAttemptId(), bufferManager,
+ return new RssShuffleWriter<>(rssHandle.getAppId(), shuffleId, taskId, context.taskAttemptId(), bufferManager,
writeMetrics, this, sparkConf, shuffleWriteClient, rssHandle,
(Function<String, Boolean>) this::markFailedTask);
}
@@ -461,7 +461,7 @@ public class RssShuffleManager implements ShuffleManager {
}
final String storageType = sparkConf.get(RssSparkConfig.RSS_STORAGE_TYPE.key());
final int indexReadLimit = sparkConf.get(RssSparkConfig.RSS_INDEX_READ_LIMIT);
- RssShuffleHandle rssShuffleHandle = (RssShuffleHandle) handle;
+ RssShuffleHandle<K, C, ?> rssShuffleHandle = (RssShuffleHandle<K, C, ?>) handle;
final int partitionNum = rssShuffleHandle.getDependency().partitioner().numPartitions();
long readBufferSize = sparkConf.getSizeAsBytes(RssSparkConfig.RSS_CLIENT_READ_BUFFER_SIZE.key(),
RssSparkConfig.RSS_CLIENT_READ_BUFFER_SIZE.defaultValue().get());
diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
index 1ac1641a..4734b1aa 100644
--- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
+++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
@@ -85,7 +85,7 @@ public class RssShuffleReader<K, C> implements ShuffleReader<K, C> {
int mapStartIndex,
int mapEndIndex,
TaskContext context,
- RssShuffleHandle rssShuffleHandle,
+ RssShuffleHandle<K, C, ?> rssShuffleHandle,
String basePath,
int indexReadLimit,
Configuration hadoopConf,
@@ -142,7 +142,7 @@ public class RssShuffleReader<K, C> implements ShuffleReader<K, C> {
if (shuffleDependency.keyOrdering().isDefined()) {
// Create an ExternalSorter to sort the data
- ExternalSorter sorter = new ExternalSorter<K, C, C>(context, Option.empty(), Option.empty(),
+ ExternalSorter<K, C, C> sorter = new ExternalSorter<>(context, Option.empty(), Option.empty(),
shuffleDependency.keyOrdering(), serializer);
LOG.info("Inserting aggregated records to sorter");
long startTime = System.currentTimeMillis();
@@ -212,7 +212,7 @@ public class RssShuffleReader<K, C> implements ShuffleReader<K, C> {
1, partitionNum, partitionToExpectBlocks.get(partition), taskIdBitmap, shuffleServerInfoList,
hadoopConf, dataDistributionType, expectedTaskIdsBitmapFilterEnable);
ShuffleReadClient shuffleReadClient = ShuffleClientFactory.getInstance().createShuffleReadClient(request);
- RssShuffleDataIterator iterator = new RssShuffleDataIterator<K, C>(
+ RssShuffleDataIterator<K, C> iterator = new RssShuffleDataIterator<>(
shuffleDependency.serializer(), shuffleReadClient,
readMetrics, rssConf);
CompletionIterator<Product2<K, C>, RssShuffleDataIterator<K, C>> completionIterator =
diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index 79bba0c5..ac69ed38 100644
--- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -80,7 +80,7 @@ public class RssShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
private final Map<Integer, Set<Long>> partitionToBlockIds;
private final ShuffleWriteClient shuffleWriteClient;
private final Map<Integer, List<ShuffleServerInfo>> partitionToServers;
- private final Set shuffleServersForData;
+ private final Set<ShuffleServerInfo> shuffleServersForData;
private final long[] partitionLengths;
private boolean isMemoryShuffleEnabled;
private final Function<String, Boolean> taskFailureCallback;
@@ -95,7 +95,7 @@ public class RssShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
RssShuffleManager shuffleManager,
SparkConf sparkConf,
ShuffleWriteClient shuffleWriteClient,
- RssShuffleHandle rssHandle) {
+ RssShuffleHandle<K, V, C> rssHandle) {
this(
appId,
shuffleId,
@@ -121,7 +121,7 @@ public class RssShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
RssShuffleManager shuffleManager,
SparkConf sparkConf,
ShuffleWriteClient shuffleWriteClient,
- RssShuffleHandle rssHandle,
+ RssShuffleHandle<K, V, C> rssHandle,
Function<String, Boolean> taskFailureCallback) {
LOG.warn("RssShuffle start write taskAttemptId data" + taskAttemptId);
this.shuffleManager = shuffleManager;
@@ -169,7 +169,7 @@ public class RssShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
List<ShuffleBlockInfo> shuffleBlockInfos;
Set<Long> blockIds = Sets.newHashSet();
boolean isCombine = shuffleDependency.mapSideCombine();
- Function1 createCombiner = null;
+ Function1<V, C> createCombiner = null;
if (isCombine) {
createCombiner = shuffleDependency.aggregator().get().createCombiner();
}
@@ -314,7 +314,7 @@ public class RssShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
}
@VisibleForTesting
- protected <K> int getPartition(K key) {
+ protected <T> int getPartition(T key) {
int result = 0;
if (shouldPartition) {
result = partitioner.getPartition(key);
diff --git a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
index 27c7cfdd..b587c127 100644
--- a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
+++ b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
@@ -781,7 +781,7 @@ public class ShuffleWriteClientImpl implements ShuffleWriteClient {
Set<ShuffleServerInfo> getAllShuffleServers(String appId) {
Map<Integer, Set<ShuffleServerInfo>> appServerMap = shuffleServerInfoMap.get(appId);
if (appServerMap == null) {
- return Collections.EMPTY_SET;
+ return Collections.emptySet();
}
Set<ShuffleServerInfo> serverInfos = Sets.newHashSet();
appServerMap.values().forEach(serverInfos::addAll);
diff --git a/common/src/main/java/org/apache/uniffle/common/config/ConfigOptions.java b/common/src/main/java/org/apache/uniffle/common/config/ConfigOptions.java
index 1baef1d9..bca0da28 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/ConfigOptions.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/ConfigOptions.java
@@ -241,6 +241,7 @@ public class ConfigOptions {
private final Function<Object, E> atomicConverter;
private Function<Object, List<E>> asListConverter;
+ @SuppressWarnings("unchecked")
public ListConfigOptionBuilder(String key, Class<E> clazz, Function<Object, E> atomicConverter) {
this.key = key;
this.clazz = clazz;
diff --git a/common/src/main/java/org/apache/uniffle/common/config/ConfigUtils.java b/common/src/main/java/org/apache/uniffle/common/config/ConfigUtils.java
index 66df92f7..bc0672eb 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/ConfigUtils.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/ConfigUtils.java
@@ -71,6 +71,7 @@ public class ConfigUtils {
throw new IllegalArgumentException("Unsupported type: " + clazz);
}
+ @SuppressWarnings("unchecked")
public static <E extends Enum<?>> E convertToEnum(Object o, Class<E> clazz) {
if (o.getClass().equals(clazz)) {
return (E) o;
@@ -184,7 +185,8 @@ public class ConfigUtils {
return Double.parseDouble(o.toString());
}
- public static List<ConfigOption<Object>> getAllConfigOptions(Class confClass) {
+ @SuppressWarnings("unchecked")
+ public static List<ConfigOption<Object>> getAllConfigOptions(Class<? extends RssBaseConf> confClass) {
List<ConfigOption<Object>> configOptionList = Lists.newArrayList();
try {
Field[] fields = confClass.getFields();
diff --git a/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java b/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
index c45c4fe1..f391a33e 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
@@ -191,6 +191,7 @@ public class RssUtils {
return String.join(Constants.KEY_SPLIT_CHAR, appId, String.valueOf(shuffleId), String.valueOf(partition));
}
+ @SuppressWarnings("unchecked")
public static <T> List<T> loadExtensions(Class<T> extClass, List<String> classes, Object obj) {
if (classes == null || classes.isEmpty()) {
throw new RuntimeException("Empty classes");
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
index b5950377..dc76e5f8 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
@@ -62,7 +62,7 @@ public class ShuffleTaskInfo {
/**
* shuffleId -> huge partitionIds set
*/
- private final Map<Integer, Set> hugePartitionTags;
+ private final Map<Integer, Set<Integer>> hugePartitionTags;
private final AtomicBoolean existHugePartition;
public ShuffleTaskInfo(String appId) {
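As a closing illustration of the Collections change in ShuffleWriteClientImpl above: Collections.EMPTY_SET is a raw constant, so returning it from a generically typed method is an unchecked conversion, whereas Collections.emptySet() is a generic factory whose type parameter is inferred at the call site. A minimal standalone sketch (ServerInfo here is a hypothetical stand-in, not the project class):

    import java.util.Collections;
    import java.util.Set;

    class EmptySetDemo {
        static class ServerInfo { }

        static Set<ServerInfo> before() {
            // Raw constant: this return is an unchecked conversion and
            // produces a compiler warning.
            return Collections.EMPTY_SET;
        }

        static Set<ServerInfo> after() {
            // Generic factory: Set<ServerInfo> is inferred, the conversion
            // is fully checked, and no warning is emitted. At runtime both
            // methods return the same immutable empty set.
            return Collections.emptySet();
        }
    }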