You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celeborn.apache.org by ch...@apache.org on 2023/03/25 15:20:39 UTC

[incubator-celeborn] branch main updated: [MINOR] Fix typo and remove unused code (#1381)

This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new c609c0eba [MINOR] Fix typo and remove unused code (#1381)
c609c0eba is described below

commit c609c0ebaad0826acbe6612a1a81a2c6583688d4
Author: Fei Wang <tu...@foxmail.com>
AuthorDate: Sat Mar 25 23:20:33 2023 +0800

    [MINOR] Fix typo and remove unused code (#1381)
    
    * fix typo
    
    * remove unused
---
 .../org/apache/celeborn/client/compress/Compressor.java    |  1 -
 .../org/apache/celeborn/client/compress/Decompressor.java  |  1 -
 .../org/apache/celeborn/client/compress/RssLz4Trait.java   |  2 --
 .../apache/celeborn/common/haclient/RssHARetryClient.java  | 14 +++++++-------
 .../deploy/master/clustermeta/AbstractMetaManager.java     |  2 +-
 .../org/apache/celeborn/server/common/HttpService.scala    |  1 -
 .../deploy/worker/storage/ReducePartitionFileWriter.java   |  2 +-
 .../org/apache/celeborn/service/deploy/worker/Worker.scala |  2 +-
 .../celeborn/service/deploy/worker/WorkerArguments.scala   |  1 -
 .../celeborn/service/deploy/worker/storage/FlushTask.scala |  2 +-
 .../service/deploy/worker/storage/StorageManager.scala     |  3 +--
 11 files changed, 12 insertions(+), 19 deletions(-)

diff --git a/client/src/main/java/org/apache/celeborn/client/compress/Compressor.java b/client/src/main/java/org/apache/celeborn/client/compress/Compressor.java
index 5c7b7b7a9..4efa9dcb0 100644
--- a/client/src/main/java/org/apache/celeborn/client/compress/Compressor.java
+++ b/client/src/main/java/org/apache/celeborn/client/compress/Compressor.java
@@ -19,7 +19,6 @@ package org.apache.celeborn.client.compress;
 
 import org.apache.celeborn.common.CelebornConf;
 import org.apache.celeborn.common.protocol.CompressionCodec;
-import org.apache.celeborn.common.protocol.CompressionCodec.*;
 
 public interface Compressor {
 
diff --git a/client/src/main/java/org/apache/celeborn/client/compress/Decompressor.java b/client/src/main/java/org/apache/celeborn/client/compress/Decompressor.java
index 95e3b23a5..fdd0dcdff 100644
--- a/client/src/main/java/org/apache/celeborn/client/compress/Decompressor.java
+++ b/client/src/main/java/org/apache/celeborn/client/compress/Decompressor.java
@@ -19,7 +19,6 @@ package org.apache.celeborn.client.compress;
 
 import org.apache.celeborn.common.CelebornConf;
 import org.apache.celeborn.common.protocol.CompressionCodec;
-import org.apache.celeborn.common.protocol.CompressionCodec.*;
 
 public interface Decompressor {
 
diff --git a/client/src/main/java/org/apache/celeborn/client/compress/RssLz4Trait.java b/client/src/main/java/org/apache/celeborn/client/compress/RssLz4Trait.java
index c8d110823..9cb4d6eda 100644
--- a/client/src/main/java/org/apache/celeborn/client/compress/RssLz4Trait.java
+++ b/client/src/main/java/org/apache/celeborn/client/compress/RssLz4Trait.java
@@ -28,8 +28,6 @@ public abstract class RssLz4Trait {
           + 4 // decompressed length
           + 4; // checksum
 
-  protected static final int COMPRESSION_LEVEL_BASE = 10;
-
   protected static final int COMPRESSION_METHOD_RAW = 0x10;
   protected static final int COMPRESSION_METHOD_LZ4 = 0x20;
 
diff --git a/common/src/main/java/org/apache/celeborn/common/haclient/RssHARetryClient.java b/common/src/main/java/org/apache/celeborn/common/haclient/RssHARetryClient.java
index fa2d04240..3b5063a1f 100644
--- a/common/src/main/java/org/apache/celeborn/common/haclient/RssHARetryClient.java
+++ b/common/src/main/java/org/apache/celeborn/common/haclient/RssHARetryClient.java
@@ -69,7 +69,7 @@ public class RssHARetryClient {
     this.oneWayMessageSender = ThreadUtils.newDaemonSingleThreadExecutor("One-Way-Message-Sender");
   }
 
-  private static final String SPLITER = "#";
+  private static final String SPLITTER = "#";
   private static final AtomicLong CALL_ID_COUNTER = new AtomicLong();
 
   static long nextCallId() {
@@ -77,15 +77,15 @@ public class RssHARetryClient {
   }
 
   public static Tuple2<String, Long> decodeRequestId(String requestId) {
-    if (requestId.contains(SPLITER)) {
-      return new Tuple2<>(requestId.split(SPLITER)[0], Long.valueOf(requestId.split(SPLITER)[1]));
+    if (requestId.contains(SPLITTER)) {
+      return new Tuple2<>(requestId.split(SPLITTER)[0], Long.valueOf(requestId.split(SPLITTER)[1]));
     } else {
       return null;
     }
   }
 
   public static String encodeRequestId(String uuid, long callId) {
-    return String.format("%s%s%d", uuid, SPLITER, callId);
+    return String.format("%s%s%d", uuid, SPLITTER, callId);
   }
 
   /**
@@ -184,8 +184,8 @@ public class RssHARetryClient {
   }
 
   private void setRpcEndpointRef(String masterEndpoint) {
-    // This method should never care newer or old value, we just set the suggest master endpoint.
-    // If an error occurs when setting the suggest Master, it means that the Master may be down.
+    // This method should never care newer or old value, we just set the suggested master endpoint.
+    // If an error occurs when setting the suggested Master, it means that the Master may be down.
     // At this time, we just set `rpcEndpointRef` to null. Then next time, we will re-select the
     // Master and get the correct leader.
     rpcEndpointRef.set(setupEndpointRef(masterEndpoint));
@@ -206,7 +206,7 @@ public class RssHARetryClient {
    * return directly.
    *
    * <p>When `rpcEndpointRef` is empty, we need to assign a value to it and return this value; but
-   * because it is a multi-threaded environment, we need to ensure that the old value is still empty
+   * because it is a multi-thread environment, we need to ensure that the old value is still empty
    * when setting the value of `rpcEndpointRef`, otherwise we should use the new value of
    * `rpcEndpointRef`. Only if the setting is successful, update `currentIndex` to ensure that all
    * Masters can be used.
diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
index 1b15d5eff..f9cff9235 100644
--- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
+++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
@@ -48,7 +48,7 @@ import org.apache.celeborn.common.util.Utils;
 public abstract class AbstractMetaManager implements IMetadataHandler {
   private static final Logger LOG = LoggerFactory.getLogger(AbstractMetaManager.class);
 
-  // Meta data for master service
+  // Metadata for master service
   public final Set<String> registeredShuffle = ConcurrentHashMap.newKeySet();
   public final Set<String> hostnameSet = ConcurrentHashMap.newKeySet();
   public final ArrayList<WorkerInfo> workers = new ArrayList<>();
diff --git a/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala b/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
index 312cbb316..259b59d49 100644
--- a/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
+++ b/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
@@ -17,7 +17,6 @@
 
 package org.apache.celeborn.server.common
 
-import org.apache.celeborn.common.CelebornConf
 import org.apache.celeborn.common.internal.Logging
 import org.apache.celeborn.server.common.http.{HttpRequestHandler, HttpServer, HttpServerInitializer}
 
diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/ReducePartitionFileWriter.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/ReducePartitionFileWriter.java
index e214df185..1e86b5dc2 100644
--- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/ReducePartitionFileWriter.java
+++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/ReducePartitionFileWriter.java
@@ -30,7 +30,7 @@ import org.apache.celeborn.common.protocol.PartitionSplitMode;
 import org.apache.celeborn.common.protocol.PartitionType;
 
 /*
- * reduce partition file writer, it will create chunkindex
+ * reduce partition file writer, it will create chunk index
  */
 public final class ReducePartitionFileWriter extends FileWriter {
   private static final Logger logger = LoggerFactory.getLogger(ReducePartitionFileWriter.class);
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index 105c29717..2137bff21 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -46,7 +46,7 @@ import org.apache.celeborn.common.rpc._
 import org.apache.celeborn.common.util.{ShutdownHookManager, ThreadUtils, Utils}
 import org.apache.celeborn.server.common.{HttpService, Service}
 import org.apache.celeborn.service.deploy.worker.congestcontrol.CongestionController
-import org.apache.celeborn.service.deploy.worker.storage.{FileWriter, PartitionFilesSorter, StorageManager}
+import org.apache.celeborn.service.deploy.worker.storage.{PartitionFilesSorter, StorageManager}
 
 private[celeborn] class Worker(
     override val conf: CelebornConf,
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerArguments.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerArguments.scala
index 76280132a..674ae44da 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerArguments.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerArguments.scala
@@ -26,7 +26,6 @@ class WorkerArguments(args: Array[String], conf: CelebornConf) {
 
   private var _host: Option[String] = None
   private var _port: Option[Int] = None
-  // var master: String = null
   // for local testing.
   private var _master: Option[String] = None
   private var _propertiesFile: Option[String] = None
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala
index 340e9e9da..d00a3353a 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala
@@ -20,7 +20,7 @@ package org.apache.celeborn.service.deploy.worker.storage
 import java.nio.channels.FileChannel
 
 import io.netty.buffer.{ByteBufUtil, CompositeByteBuf}
-import org.apache.hadoop.fs.{FSDataOutputStream, Path}
+import org.apache.hadoop.fs.Path
 
 abstract private[worker] class FlushTask(
     val buffer: CompositeByteBuf,
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index 8bd7693b9..32c097ca4 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -26,7 +26,6 @@ import java.util.concurrent.atomic.AtomicInteger
 import java.util.function.IntUnaryOperator
 
 import scala.collection.JavaConverters._
-import scala.collection.mutable
 import scala.concurrent.duration._
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder
@@ -50,7 +49,7 @@ import org.apache.celeborn.service.deploy.worker.storage.StorageManager.hadoopFs
 
 final private[worker] class StorageManager(conf: CelebornConf, workerSource: AbstractSource)
   extends ShuffleRecoverHelper with DeviceObserver with Logging with MemoryPressureListener {
-  // mount point -> filewriter
+  // mount point -> file writer
   val workingDirWriters = new ConcurrentHashMap[File, ConcurrentHashMap[String, FileWriter]]()
 
   val (deviceInfos, diskInfos) = {