You are viewing a plain-text version of this content; the canonical link to the original is not included in this copy.
Posted to commits@celeborn.apache.org by ch...@apache.org on 2023/03/25 15:20:39 UTC
[incubator-celeborn] branch main updated: [MINOR] Fix typo and remove unused code (#1381)
This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new c609c0eba [MINOR] Fix typo and remove unused code (#1381)
c609c0eba is described below
commit c609c0ebaad0826acbe6612a1a81a2c6583688d4
Author: Fei Wang <tu...@foxmail.com>
AuthorDate: Sat Mar 25 23:20:33 2023 +0800
[MINOR] Fix typo and remove unused code (#1381)
* Fix typos
* Remove unused code
---
.../org/apache/celeborn/client/compress/Compressor.java | 1 -
.../org/apache/celeborn/client/compress/Decompressor.java | 1 -
.../org/apache/celeborn/client/compress/RssLz4Trait.java | 2 --
.../apache/celeborn/common/haclient/RssHARetryClient.java | 14 +++++++-------
.../deploy/master/clustermeta/AbstractMetaManager.java | 2 +-
.../org/apache/celeborn/server/common/HttpService.scala | 1 -
.../deploy/worker/storage/ReducePartitionFileWriter.java | 2 +-
.../org/apache/celeborn/service/deploy/worker/Worker.scala | 2 +-
.../celeborn/service/deploy/worker/WorkerArguments.scala | 1 -
.../celeborn/service/deploy/worker/storage/FlushTask.scala | 2 +-
.../service/deploy/worker/storage/StorageManager.scala | 3 +--
11 files changed, 12 insertions(+), 19 deletions(-)
diff --git a/client/src/main/java/org/apache/celeborn/client/compress/Compressor.java b/client/src/main/java/org/apache/celeborn/client/compress/Compressor.java
index 5c7b7b7a9..4efa9dcb0 100644
--- a/client/src/main/java/org/apache/celeborn/client/compress/Compressor.java
+++ b/client/src/main/java/org/apache/celeborn/client/compress/Compressor.java
@@ -19,7 +19,6 @@ package org.apache.celeborn.client.compress;
import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.common.protocol.CompressionCodec;
-import org.apache.celeborn.common.protocol.CompressionCodec.*;
public interface Compressor {
diff --git a/client/src/main/java/org/apache/celeborn/client/compress/Decompressor.java b/client/src/main/java/org/apache/celeborn/client/compress/Decompressor.java
index 95e3b23a5..fdd0dcdff 100644
--- a/client/src/main/java/org/apache/celeborn/client/compress/Decompressor.java
+++ b/client/src/main/java/org/apache/celeborn/client/compress/Decompressor.java
@@ -19,7 +19,6 @@ package org.apache.celeborn.client.compress;
import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.common.protocol.CompressionCodec;
-import org.apache.celeborn.common.protocol.CompressionCodec.*;
public interface Decompressor {
diff --git a/client/src/main/java/org/apache/celeborn/client/compress/RssLz4Trait.java b/client/src/main/java/org/apache/celeborn/client/compress/RssLz4Trait.java
index c8d110823..9cb4d6eda 100644
--- a/client/src/main/java/org/apache/celeborn/client/compress/RssLz4Trait.java
+++ b/client/src/main/java/org/apache/celeborn/client/compress/RssLz4Trait.java
@@ -28,8 +28,6 @@ public abstract class RssLz4Trait {
+ 4 // decompressed length
+ 4; // checksum
- protected static final int COMPRESSION_LEVEL_BASE = 10;
-
protected static final int COMPRESSION_METHOD_RAW = 0x10;
protected static final int COMPRESSION_METHOD_LZ4 = 0x20;
diff --git a/common/src/main/java/org/apache/celeborn/common/haclient/RssHARetryClient.java b/common/src/main/java/org/apache/celeborn/common/haclient/RssHARetryClient.java
index fa2d04240..3b5063a1f 100644
--- a/common/src/main/java/org/apache/celeborn/common/haclient/RssHARetryClient.java
+++ b/common/src/main/java/org/apache/celeborn/common/haclient/RssHARetryClient.java
@@ -69,7 +69,7 @@ public class RssHARetryClient {
this.oneWayMessageSender = ThreadUtils.newDaemonSingleThreadExecutor("One-Way-Message-Sender");
}
- private static final String SPLITER = "#";
+ private static final String SPLITTER = "#";
private static final AtomicLong CALL_ID_COUNTER = new AtomicLong();
static long nextCallId() {
@@ -77,15 +77,15 @@ public class RssHARetryClient {
}
public static Tuple2<String, Long> decodeRequestId(String requestId) {
- if (requestId.contains(SPLITER)) {
- return new Tuple2<>(requestId.split(SPLITER)[0], Long.valueOf(requestId.split(SPLITER)[1]));
+ if (requestId.contains(SPLITTER)) {
+ return new Tuple2<>(requestId.split(SPLITTER)[0], Long.valueOf(requestId.split(SPLITTER)[1]));
} else {
return null;
}
}
public static String encodeRequestId(String uuid, long callId) {
- return String.format("%s%s%d", uuid, SPLITER, callId);
+ return String.format("%s%s%d", uuid, SPLITTER, callId);
}
/**
@@ -184,8 +184,8 @@ public class RssHARetryClient {
}
private void setRpcEndpointRef(String masterEndpoint) {
- // This method should never care newer or old value, we just set the suggest master endpoint.
- // If an error occurs when setting the suggest Master, it means that the Master may be down.
+ // This method should never care newer or old value, we just set the suggested master endpoint.
+ // If an error occurs when setting the suggested Master, it means that the Master may be down.
// At this time, we just set `rpcEndpointRef` to null. Then next time, we will re-select the
// Master and get the correct leader.
rpcEndpointRef.set(setupEndpointRef(masterEndpoint));
@@ -206,7 +206,7 @@ public class RssHARetryClient {
* return directly.
*
* <p>When `rpcEndpointRef` is empty, we need to assign a value to it and return this value; but
- * because it is a multi-threaded environment, we need to ensure that the old value is still empty
+ * because it is a multi-thread environment, we need to ensure that the old value is still empty
* when setting the value of `rpcEndpointRef`, otherwise we should use the new value of
* `rpcEndpointRef`. Only if the setting is successful, update `currentIndex` to ensure that all
* Masters can be used.
diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
index 1b15d5eff..f9cff9235 100644
--- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
+++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
@@ -48,7 +48,7 @@ import org.apache.celeborn.common.util.Utils;
public abstract class AbstractMetaManager implements IMetadataHandler {
private static final Logger LOG = LoggerFactory.getLogger(AbstractMetaManager.class);
- // Meta data for master service
+ // Metadata for master service
public final Set<String> registeredShuffle = ConcurrentHashMap.newKeySet();
public final Set<String> hostnameSet = ConcurrentHashMap.newKeySet();
public final ArrayList<WorkerInfo> workers = new ArrayList<>();
diff --git a/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala b/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
index 312cbb316..259b59d49 100644
--- a/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
+++ b/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
@@ -17,7 +17,6 @@
package org.apache.celeborn.server.common
-import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.server.common.http.{HttpRequestHandler, HttpServer, HttpServerInitializer}
diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/ReducePartitionFileWriter.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/ReducePartitionFileWriter.java
index e214df185..1e86b5dc2 100644
--- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/ReducePartitionFileWriter.java
+++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/ReducePartitionFileWriter.java
@@ -30,7 +30,7 @@ import org.apache.celeborn.common.protocol.PartitionSplitMode;
import org.apache.celeborn.common.protocol.PartitionType;
/*
- * reduce partition file writer, it will create chunkindex
+ * reduce partition file writer, it will create chunk index
*/
public final class ReducePartitionFileWriter extends FileWriter {
private static final Logger logger = LoggerFactory.getLogger(ReducePartitionFileWriter.class);
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index 105c29717..2137bff21 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -46,7 +46,7 @@ import org.apache.celeborn.common.rpc._
import org.apache.celeborn.common.util.{ShutdownHookManager, ThreadUtils, Utils}
import org.apache.celeborn.server.common.{HttpService, Service}
import org.apache.celeborn.service.deploy.worker.congestcontrol.CongestionController
-import org.apache.celeborn.service.deploy.worker.storage.{FileWriter, PartitionFilesSorter, StorageManager}
+import org.apache.celeborn.service.deploy.worker.storage.{PartitionFilesSorter, StorageManager}
private[celeborn] class Worker(
override val conf: CelebornConf,
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerArguments.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerArguments.scala
index 76280132a..674ae44da 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerArguments.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerArguments.scala
@@ -26,7 +26,6 @@ class WorkerArguments(args: Array[String], conf: CelebornConf) {
private var _host: Option[String] = None
private var _port: Option[Int] = None
- // var master: String = null
// for local testing.
private var _master: Option[String] = None
private var _propertiesFile: Option[String] = None
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala
index 340e9e9da..d00a3353a 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala
@@ -20,7 +20,7 @@ package org.apache.celeborn.service.deploy.worker.storage
import java.nio.channels.FileChannel
import io.netty.buffer.{ByteBufUtil, CompositeByteBuf}
-import org.apache.hadoop.fs.{FSDataOutputStream, Path}
+import org.apache.hadoop.fs.Path
abstract private[worker] class FlushTask(
val buffer: CompositeByteBuf,
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index 8bd7693b9..32c097ca4 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -26,7 +26,6 @@ import java.util.concurrent.atomic.AtomicInteger
import java.util.function.IntUnaryOperator
import scala.collection.JavaConverters._
-import scala.collection.mutable
import scala.concurrent.duration._
import com.google.common.util.concurrent.ThreadFactoryBuilder
@@ -50,7 +49,7 @@ import org.apache.celeborn.service.deploy.worker.storage.StorageManager.hadoopFs
final private[worker] class StorageManager(conf: CelebornConf, workerSource: AbstractSource)
extends ShuffleRecoverHelper with DeviceObserver with Logging with MemoryPressureListener {
- // mount point -> filewriter
+ // mount point -> file writer
val workingDirWriters = new ConcurrentHashMap[File, ConcurrentHashMap[String, FileWriter]]()
val (deviceInfos, diskInfos) = {