You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ck...@apache.org on 2023/02/22 07:19:58 UTC

[incubator-uniffle] branch master updated: [MINOR] refactor: address unchecked conversions (#623)

This is an automated email from the ASF dual-hosted git repository.

ckj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 8b347a2a [MINOR] refactor: address unchecked conversions (#623)
8b347a2a is described below

commit 8b347a2afe94afa738182180d7ad4be627f9e8a4
Author: Kaijie Chen <ck...@apache.org>
AuthorDate: Wed Feb 22 15:19:51 2023 +0800

    [MINOR] refactor: address unchecked conversions (#623)
    
    ### What changes were proposed in this pull request?
    
    Fix or supress some unchecked conventions.
    
    ### Why are the changes needed?
    
    Improve code quality and make the build log cleaner.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    No tests are changed in this PR, existing tests should cover the change.
---
 .../java/org/apache/spark/shuffle/RssShuffleManager.java   | 12 ++++++------
 .../org/apache/spark/shuffle/reader/RssShuffleReader.java  |  4 ++--
 .../org/apache/spark/shuffle/writer/RssShuffleWriter.java  |  8 ++++----
 .../java/org/apache/spark/shuffle/RssShuffleManager.java   | 14 +++++++-------
 .../org/apache/spark/shuffle/reader/RssShuffleReader.java  |  6 +++---
 .../org/apache/spark/shuffle/writer/RssShuffleWriter.java  | 10 +++++-----
 .../apache/uniffle/client/impl/ShuffleWriteClientImpl.java |  2 +-
 .../org/apache/uniffle/common/config/ConfigOptions.java    |  1 +
 .../java/org/apache/uniffle/common/config/ConfigUtils.java |  4 +++-
 .../main/java/org/apache/uniffle/common/util/RssUtils.java |  1 +
 .../java/org/apache/uniffle/server/ShuffleTaskInfo.java    |  2 +-
 11 files changed, 34 insertions(+), 30 deletions(-)

diff --git a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index c8c11ca6..5778f341 100644
--- a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -93,7 +93,7 @@ public class RssShuffleManager implements ShuffleManager {
   private final String user;
   private final String uuid;
   private ThreadPoolExecutor threadPoolExecutor;
-  private EventLoop eventLoop = new EventLoop<AddBlockEvent>("ShuffleDataQueue") {
+  private EventLoop<AddBlockEvent> eventLoop = new EventLoop<AddBlockEvent>("ShuffleDataQueue") {
 
     @Override
     public void onReceive(AddBlockEvent event) {
@@ -238,7 +238,7 @@ public class RssShuffleManager implements ShuffleManager {
     if (dependency.partitioner().numPartitions() == 0) {
       LOG.info("RegisterShuffle with ShuffleId[" + shuffleId + "], partitionNum is 0, "
           + "return the empty RssShuffleHandle directly");
-      return new RssShuffleHandle(shuffleId,
+      return new RssShuffleHandle<>(shuffleId,
         appId,
         dependency.rdd().getNumPartitions(),
         dependency,
@@ -339,7 +339,7 @@ public class RssShuffleManager implements ShuffleManager {
   public <K, V> ShuffleWriter<K, V> getWriter(ShuffleHandle handle, int mapId,
       TaskContext context) {
     if (handle instanceof RssShuffleHandle) {
-      RssShuffleHandle rssHandle = (RssShuffleHandle) handle;
+      RssShuffleHandle<K, V, ?> rssHandle = (RssShuffleHandle<K, V, ?>) handle;
       appId = rssHandle.getAppId();
 
       int shuffleId = rssHandle.getShuffleId();
@@ -358,7 +358,7 @@ public class RssShuffleManager implements ShuffleManager {
       );
       taskToBufferManager.put(taskId, bufferManager);
 
-      return new RssShuffleWriter(rssHandle.getAppId(), shuffleId, taskId, context.taskAttemptId(), bufferManager,
+      return new RssShuffleWriter<>(rssHandle.getAppId(), shuffleId, taskId, context.taskAttemptId(), bufferManager,
           writeMetrics, this, sparkConf, shuffleWriteClient, rssHandle,
           (Function<String, Boolean>) this::markFailedTask);
     } else {
@@ -374,7 +374,7 @@ public class RssShuffleManager implements ShuffleManager {
     if (handle instanceof RssShuffleHandle) {
       final String storageType = sparkConf.get(RssSparkConfig.RSS_STORAGE_TYPE.key());
       final int indexReadLimit = sparkConf.get(RssSparkConfig.RSS_INDEX_READ_LIMIT);
-      RssShuffleHandle rssShuffleHandle = (RssShuffleHandle) handle;
+      RssShuffleHandle<K, C, ?> rssShuffleHandle = (RssShuffleHandle<K, C, ?>) handle;
       final int partitionNumPerRange = sparkConf.get(RssSparkConfig.RSS_PARTITION_NUM_PER_RANGE);
       final int partitionNum = rssShuffleHandle.getDependency().partitioner().numPartitions();
       long readBufferSize = sparkConf.getSizeAsBytes(RssSparkConfig.RSS_CLIENT_READ_BUFFER_SIZE.key(),
@@ -446,7 +446,7 @@ public class RssShuffleManager implements ShuffleManager {
     throw new RuntimeException("RssShuffleManager.shuffleBlockResolver is not implemented");
   }
 
-  public EventLoop getEventLoop() {
+  public EventLoop<AddBlockEvent> getEventLoop() {
     return eventLoop;
   }
 
diff --git a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
index ff1eac6e..2a8fd81d 100644
--- a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
+++ b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
@@ -77,7 +77,7 @@ public class RssShuffleReader<K, C> implements ShuffleReader<K, C> {
       int startPartition,
       int endPartition,
       TaskContext context,
-      RssShuffleHandle rssShuffleHandle,
+      RssShuffleHandle<K, C, ?> rssShuffleHandle,
       String basePath,
       int indexReadLimit,
       Configuration hadoopConf,
@@ -154,7 +154,7 @@ public class RssShuffleReader<K, C> implements ShuffleReader<K, C> {
 
     if (shuffleDependency.keyOrdering().isDefined()) {
       // Create an ExternalSorter to sort the data
-      ExternalSorter sorter = new ExternalSorter<K, C, C>(context, Option.empty(), Option.empty(),
+      ExternalSorter<K, C, C> sorter = new ExternalSorter<>(context, Option.empty(), Option.empty(),
           shuffleDependency.keyOrdering(), serializer);
       LOG.info("Inserting aggregated records to sorter");
       long startTime = System.currentTimeMillis();
diff --git a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index 0def3b68..bd8c12b8 100644
--- a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++ b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -95,7 +95,7 @@ public class RssShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
       RssShuffleManager shuffleManager,
       SparkConf sparkConf,
       ShuffleWriteClient shuffleWriteClient,
-      RssShuffleHandle rssHandle) {
+      RssShuffleHandle<K, V, C> rssHandle) {
     this(
         appId,
         shuffleId,
@@ -121,7 +121,7 @@ public class RssShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
       RssShuffleManager shuffleManager,
       SparkConf sparkConf,
       ShuffleWriteClient shuffleWriteClient,
-      RssShuffleHandle rssHandle,
+      RssShuffleHandle<K, V, C> rssHandle,
       Function<String, Boolean> taskFailureCallback) {
     this.appId = appId;
     this.bufferManager = bufferManager;
@@ -178,7 +178,7 @@ public class RssShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
       Product2<K, V> record = records.next();
       int partition = getPartition(record._1());
       if (shuffleDependency.mapSideCombine()) {
-        Function1 createCombiner = shuffleDependency.aggregator().get().createCombiner();
+        Function1<V, C> createCombiner = shuffleDependency.aggregator().get().createCombiner();
         Object c = createCombiner.apply(record._2());
         shuffleBlockInfos = bufferManager.addRecord(partition, record._1(), c);
       } else {
@@ -352,7 +352,7 @@ public class RssShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
   }
 
   @VisibleForTesting
-  protected <K> int getPartition(K key) {
+  protected <T> int getPartition(T key) {
     int result = 0;
     if (shouldPartition) {
       result = partitioner.getPartition(key);
diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index 8d17c093..a37453ee 100644
--- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -102,8 +102,8 @@ public class RssShuffleManager implements ShuffleManager {
   private String user;
   private String uuid;
   private Set<String> failedTaskIds = Sets.newConcurrentHashSet();
-  private final EventLoop eventLoop;
-  private final EventLoop defaultEventLoop = new EventLoop<AddBlockEvent>("ShuffleDataQueue") {
+  private final EventLoop<AddBlockEvent> eventLoop;
+  private final EventLoop<AddBlockEvent> defaultEventLoop = new EventLoop<AddBlockEvent>("ShuffleDataQueue") {
 
     @Override
     public void onReceive(AddBlockEvent event) {
@@ -305,7 +305,7 @@ public class RssShuffleManager implements ShuffleManager {
     if (dependency.partitioner().numPartitions() == 0) {
       LOG.info("RegisterShuffle with ShuffleId[" + shuffleId + "], partitionNum is 0, "
           + "return the empty RssShuffleHandle directly");
-      return new RssShuffleHandle(shuffleId,
+      return new RssShuffleHandle<>(shuffleId,
         id.get(),
         dependency.rdd().getNumPartitions(),
         dependency,
@@ -348,7 +348,7 @@ public class RssShuffleManager implements ShuffleManager {
 
     LOG.info("RegisterShuffle with ShuffleId[" + shuffleId + "], partitionNum[" + partitionToServers.size()
         + "], shuffleServerForResult: " + partitionToServers);
-    return new RssShuffleHandle(shuffleId,
+    return new RssShuffleHandle<>(shuffleId,
         id.get(),
         dependency.rdd().getNumPartitions(),
         dependency,
@@ -365,7 +365,7 @@ public class RssShuffleManager implements ShuffleManager {
     if (!(handle instanceof RssShuffleHandle)) {
       throw new RuntimeException("Unexpected ShuffleHandle:" + handle.getClass().getName());
     }
-    RssShuffleHandle rssHandle = (RssShuffleHandle) handle;
+    RssShuffleHandle<K, V, ?> rssHandle = (RssShuffleHandle<K, V, ?>) handle;
     // todo: this implement is tricky, we should refactor it
     if (id.get() == null) {
       id.compareAndSet(null, rssHandle.getAppId());
@@ -385,7 +385,7 @@ public class RssShuffleManager implements ShuffleManager {
         writeMetrics, RssSparkConfig.toRssConf(sparkConf));
     taskToBufferManager.put(taskId, bufferManager);
     LOG.info("RssHandle appId {} shuffleId {} ", rssHandle.getAppId(), rssHandle.getShuffleId());
-    return new RssShuffleWriter(rssHandle.getAppId(), shuffleId, taskId, context.taskAttemptId(), bufferManager,
+    return new RssShuffleWriter<>(rssHandle.getAppId(), shuffleId, taskId, context.taskAttemptId(), bufferManager,
         writeMetrics, this, sparkConf, shuffleWriteClient, rssHandle,
         (Function<String, Boolean>) this::markFailedTask);
   }
@@ -461,7 +461,7 @@ public class RssShuffleManager implements ShuffleManager {
     }
     final String storageType = sparkConf.get(RssSparkConfig.RSS_STORAGE_TYPE.key());
     final int indexReadLimit = sparkConf.get(RssSparkConfig.RSS_INDEX_READ_LIMIT);
-    RssShuffleHandle rssShuffleHandle = (RssShuffleHandle) handle;
+    RssShuffleHandle<K, C, ?> rssShuffleHandle = (RssShuffleHandle<K, C, ?>) handle;
     final int partitionNum = rssShuffleHandle.getDependency().partitioner().numPartitions();
     long readBufferSize = sparkConf.getSizeAsBytes(RssSparkConfig.RSS_CLIENT_READ_BUFFER_SIZE.key(),
         RssSparkConfig.RSS_CLIENT_READ_BUFFER_SIZE.defaultValue().get());
diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
index 1ac1641a..4734b1aa 100644
--- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
+++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
@@ -85,7 +85,7 @@ public class RssShuffleReader<K, C> implements ShuffleReader<K, C> {
       int mapStartIndex,
       int mapEndIndex,
       TaskContext context,
-      RssShuffleHandle rssShuffleHandle,
+      RssShuffleHandle<K, C, ?> rssShuffleHandle,
       String basePath,
       int indexReadLimit,
       Configuration hadoopConf,
@@ -142,7 +142,7 @@ public class RssShuffleReader<K, C> implements ShuffleReader<K, C> {
 
     if (shuffleDependency.keyOrdering().isDefined()) {
       // Create an ExternalSorter to sort the data
-      ExternalSorter sorter = new ExternalSorter<K, C, C>(context, Option.empty(), Option.empty(),
+      ExternalSorter<K, C, C> sorter = new ExternalSorter<>(context, Option.empty(), Option.empty(),
           shuffleDependency.keyOrdering(), serializer);
       LOG.info("Inserting aggregated records to sorter");
       long startTime = System.currentTimeMillis();
@@ -212,7 +212,7 @@ public class RssShuffleReader<K, C> implements ShuffleReader<K, C> {
             1, partitionNum, partitionToExpectBlocks.get(partition), taskIdBitmap, shuffleServerInfoList,
             hadoopConf, dataDistributionType, expectedTaskIdsBitmapFilterEnable);
         ShuffleReadClient shuffleReadClient = ShuffleClientFactory.getInstance().createShuffleReadClient(request);
-        RssShuffleDataIterator iterator = new RssShuffleDataIterator<K, C>(
+        RssShuffleDataIterator<K, C> iterator = new RssShuffleDataIterator<>(
             shuffleDependency.serializer(), shuffleReadClient,
             readMetrics, rssConf);
         CompletionIterator<Product2<K, C>, RssShuffleDataIterator<K, C>> completionIterator =
diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index 79bba0c5..ac69ed38 100644
--- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -80,7 +80,7 @@ public class RssShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
   private final Map<Integer, Set<Long>> partitionToBlockIds;
   private final ShuffleWriteClient shuffleWriteClient;
   private final Map<Integer, List<ShuffleServerInfo>> partitionToServers;
-  private final Set shuffleServersForData;
+  private final Set<ShuffleServerInfo> shuffleServersForData;
   private final long[] partitionLengths;
   private boolean isMemoryShuffleEnabled;
   private final Function<String, Boolean> taskFailureCallback;
@@ -95,7 +95,7 @@ public class RssShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
       RssShuffleManager shuffleManager,
       SparkConf sparkConf,
       ShuffleWriteClient shuffleWriteClient,
-      RssShuffleHandle rssHandle) {
+      RssShuffleHandle<K, V, C> rssHandle) {
     this(
         appId,
         shuffleId,
@@ -121,7 +121,7 @@ public class RssShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
       RssShuffleManager shuffleManager,
       SparkConf sparkConf,
       ShuffleWriteClient shuffleWriteClient,
-      RssShuffleHandle rssHandle,
+      RssShuffleHandle<K, V, C> rssHandle,
       Function<String, Boolean> taskFailureCallback) {
     LOG.warn("RssShuffle start write taskAttemptId data" + taskAttemptId);
     this.shuffleManager = shuffleManager;
@@ -169,7 +169,7 @@ public class RssShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
     List<ShuffleBlockInfo> shuffleBlockInfos;
     Set<Long> blockIds = Sets.newHashSet();
     boolean isCombine = shuffleDependency.mapSideCombine();
-    Function1 createCombiner = null;
+    Function1<V, C> createCombiner = null;
     if (isCombine) {
       createCombiner = shuffleDependency.aggregator().get().createCombiner();
     }
@@ -314,7 +314,7 @@ public class RssShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
   }
 
   @VisibleForTesting
-  protected <K> int getPartition(K key) {
+  protected <T> int getPartition(T key) {
     int result = 0;
     if (shouldPartition) {
       result = partitioner.getPartition(key);
diff --git a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
index 27c7cfdd..b587c127 100644
--- a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
+++ b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
@@ -781,7 +781,7 @@ public class ShuffleWriteClientImpl implements ShuffleWriteClient {
   Set<ShuffleServerInfo> getAllShuffleServers(String appId) {
     Map<Integer, Set<ShuffleServerInfo>> appServerMap = shuffleServerInfoMap.get(appId);
     if (appServerMap == null) {
-      return Collections.EMPTY_SET;
+      return Collections.emptySet();
     }
     Set<ShuffleServerInfo> serverInfos = Sets.newHashSet();
     appServerMap.values().forEach(serverInfos::addAll);
diff --git a/common/src/main/java/org/apache/uniffle/common/config/ConfigOptions.java b/common/src/main/java/org/apache/uniffle/common/config/ConfigOptions.java
index 1baef1d9..bca0da28 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/ConfigOptions.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/ConfigOptions.java
@@ -241,6 +241,7 @@ public class ConfigOptions {
     private final Function<Object, E> atomicConverter;
     private Function<Object, List<E>> asListConverter;
 
+    @SuppressWarnings("unchecked")
     public ListConfigOptionBuilder(String key, Class<E> clazz, Function<Object, E> atomicConverter) {
       this.key = key;
       this.clazz = clazz;
diff --git a/common/src/main/java/org/apache/uniffle/common/config/ConfigUtils.java b/common/src/main/java/org/apache/uniffle/common/config/ConfigUtils.java
index 66df92f7..bc0672eb 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/ConfigUtils.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/ConfigUtils.java
@@ -71,6 +71,7 @@ public class ConfigUtils {
     throw new IllegalArgumentException("Unsupported type: " + clazz);
   }
 
+  @SuppressWarnings("unchecked")
   public static <E extends Enum<?>> E convertToEnum(Object o, Class<E> clazz) {
     if (o.getClass().equals(clazz)) {
       return (E) o;
@@ -184,7 +185,8 @@ public class ConfigUtils {
     return Double.parseDouble(o.toString());
   }
 
-  public static List<ConfigOption<Object>> getAllConfigOptions(Class confClass) {
+  @SuppressWarnings("unchecked")
+  public static List<ConfigOption<Object>> getAllConfigOptions(Class<? extends RssBaseConf> confClass) {
     List<ConfigOption<Object>> configOptionList = Lists.newArrayList();
     try {
       Field[] fields = confClass.getFields();
diff --git a/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java b/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
index c45c4fe1..f391a33e 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
@@ -191,6 +191,7 @@ public class RssUtils {
     return String.join(Constants.KEY_SPLIT_CHAR, appId, String.valueOf(shuffleId), String.valueOf(partition));
   }
 
+  @SuppressWarnings("unchecked")
   public static <T> List<T> loadExtensions(Class<T> extClass, List<String> classes, Object obj) {
     if (classes == null || classes.isEmpty()) {
       throw new RuntimeException("Empty classes");
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
index b5950377..dc76e5f8 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
@@ -62,7 +62,7 @@ public class ShuffleTaskInfo {
   /**
    * shuffleId -> huge partitionIds set
    */
-  private final Map<Integer, Set> hugePartitionTags;
+  private final Map<Integer, Set<Integer>> hugePartitionTags;
   private final AtomicBoolean existHugePartition;
 
   public ShuffleTaskInfo(String appId) {