You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celeborn.apache.org by zh...@apache.org on 2023/10/24 14:09:19 UTC
[incubator-celeborn] branch main updated: [CELEBORN-1084] Initialize `workerSource` member to prevent `NullPointerException`
This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 875ad1fa8 [CELEBORN-1084] Initialize `workerSource` member to prevent `NullPointerException`
875ad1fa8 is described below
commit 875ad1fa8e579ca3c42446295f68f39dded66922
Author: Fu Chen <cf...@gmail.com>
AuthorDate: Tue Oct 24 22:09:11 2023 +0800
[CELEBORN-1084] Initialize `workerSource` member to prevent `NullPointerException`
### What changes were proposed in this pull request?
As title
### Why are the changes needed?
This PR addresses an NPE issue that occurs when the `workerSource` member is accessed before it is initialized. To resolve this issue, we initialize the `workerSource` member when the handlers are created.
```
23/10/24 16:27:03,363 ERROR [fetch-server-11-1] TransportChannelHandler: Exception from request handler while channel is active
java.lang.NullPointerException
at org.apache.celeborn.service.deploy.worker.FetchHandler.channelActive(FetchHandler.scala:412)
at org.apache.celeborn.common.network.server.TransportRequestHandler.channelActive(TransportRequestHandler.java:66)
at org.apache.celeborn.common.network.server.TransportChannelHandler.channelActive(TransportChannelHandler.java:120)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:262)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:238)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelActive(AbstractChannelHandlerContext.java:231)
at io.netty.channel.ChannelInboundHandlerAdapter.channelActive(ChannelInboundHandlerAdapter.java:69)
at io.netty.handler.timeout.IdleStateHandler.channelActive(IdleStateHandler.java:271)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:260)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:238)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelActive(AbstractChannelHandlerContext.java:231)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelActive(DefaultChannelPipeline.java:1398)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:258)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:238)
at io.netty.channel.DefaultChannelPipeline.fireChannelActive(DefaultChannelPipeline.java:895)
at io.netty.channel.AbstractChannel$AbstractUnsafe.register0(AbstractChannel.java:522)
at io.netty.channel.AbstractChannel$AbstractUnsafe.access$200(AbstractChannel.java:429)
at io.netty.channel.AbstractChannel$AbstractUnsafe$1.run(AbstractChannel.java:486)
at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:569)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
...
23/10/24 16:27:03,423 INFO [main] Worker: Starting Worker <ip>:<port1>:<port2>:<port3> with {/data1=DiskInfo(maxSlots: 0, committed shuffles 0 shuffleAllocations: Map(), mountPoint: /data1, usableSpace: 250.0 GiB, avgFlushTime: 0 ns, avgFetchTime: 0 ns, activeSlots: 0) status: HEALTHY dirs /data1/celeborn/worker/celeborn-worker/shuffle_data, /data=DiskInfo(maxSlots: 0, committed shuffles 0 shuffleAllocations: Map(), mountPoint: /data, usableSpace: 250.0 GiB, avgFlushTime: 0 ns, avgFet [...]
...
```
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Pass GA
Closes #2034 from cfmcgrady/fix-start-worker-npe.
Authored-by: Fu Chen <cf...@gmail.com>
Signed-off-by: zky.zhoukeyong <zk...@alibaba-inc.com>
---
.../org/apache/celeborn/service/deploy/worker/Controller.scala | 5 ++---
.../org/apache/celeborn/service/deploy/worker/FetchHandler.scala | 7 ++++---
.../apache/celeborn/service/deploy/worker/PushDataHandler.scala | 4 +---
.../scala/org/apache/celeborn/service/deploy/worker/Worker.scala | 8 ++++----
.../apache/celeborn/service/deploy/worker/FetchHandlerSuiteJ.java | 7 +++----
.../celeborn/service/deploy/worker/storage/FileWriterSuiteJ.java | 3 +--
6 files changed, 15 insertions(+), 19 deletions(-)
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
index 464806ade..f65b7f36c 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
@@ -43,10 +43,10 @@ import org.apache.celeborn.service.deploy.worker.storage.{FileWriter, MapPartiti
private[deploy] class Controller(
override val rpcEnv: RpcEnv,
val conf: CelebornConf,
- val metricsSystem: MetricsSystem)
+ val metricsSystem: MetricsSystem,
+ val workerSource: WorkerSource)
extends RpcEndpoint with Logging {
- var workerSource: WorkerSource = _
var storageManager: StorageManager = _
var shuffleMapperAttempts: ConcurrentHashMap[String, AtomicIntegerArray] = _
// shuffleKey -> (epoch -> CommitInfo)
@@ -65,7 +65,6 @@ private[deploy] class Controller(
val testRetryCommitFiles = conf.testRetryCommitFiles
def init(worker: Worker): Unit = {
- workerSource = worker.workerSource
storageManager = worker.storageManager
shufflePartitionType = worker.shufflePartitionType
shufflePushDataTimeout = worker.shufflePushDataTimeout
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
index cbb575e68..38fd353eb 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
@@ -41,7 +41,10 @@ import org.apache.celeborn.common.protocol.{MessageType, PartitionType, PbBuffer
import org.apache.celeborn.common.util.{ExceptionUtils, Utils}
import org.apache.celeborn.service.deploy.worker.storage.{ChunkStreamManager, CreditStreamManager, PartitionFilesSorter, StorageManager}
-class FetchHandler(val conf: CelebornConf, val transportConf: TransportConf)
+class FetchHandler(
+ val conf: CelebornConf,
+ val transportConf: TransportConf,
+ val workerSource: WorkerSource)
extends BaseMessageHandler with Logging {
val chunkStreamManager = new ChunkStreamManager()
@@ -52,13 +55,11 @@ class FetchHandler(val conf: CelebornConf, val transportConf: TransportConf)
conf.partitionReadBuffersMax,
conf.creditStreamThreadsPerMountpoint,
conf.readBuffersToTriggerReadMin)
- var workerSource: WorkerSource = _
var storageManager: StorageManager = _
var partitionsSorter: PartitionFilesSorter = _
var registered: AtomicBoolean = new AtomicBoolean(false)
def init(worker: Worker): Unit = {
- this.workerSource = worker.workerSource
workerSource.addGauge(WorkerSource.CREDIT_STREAM_COUNT) { () =>
creditStreamManager.getStreamsCount
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
index 4cf8456de..86e6b55ca 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
@@ -42,9 +42,8 @@ import org.apache.celeborn.common.util.Utils
import org.apache.celeborn.service.deploy.worker.congestcontrol.CongestionController
import org.apache.celeborn.service.deploy.worker.storage.{FileWriter, HdfsFlusher, LocalFlusher, MapPartitionFileWriter, StorageManager}
-class PushDataHandler extends BaseMessageHandler with Logging {
+class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler with Logging {
- private var workerSource: WorkerSource = _
private var partitionLocationInfo: WorkerPartitionLocationInfo = _
private var shuffleMapperAttempts: ConcurrentHashMap[String, AtomicIntegerArray] = _
private var shufflePartitionType: ConcurrentHashMap[String, PartitionType] = _
@@ -66,7 +65,6 @@ class PushDataHandler extends BaseMessageHandler with Logging {
private var testPushReplicaDataTimeout: Boolean = _
def init(worker: Worker): Unit = {
- workerSource = worker.workerSource
partitionLocationInfo = worker.partitionLocationInfo
shufflePartitionType = worker.shufflePartitionType
shufflePushDataTimeout = worker.shufflePushDataTimeout
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index 1cc85db3d..1c15efb4b 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -131,10 +131,10 @@ private[celeborn] class Worker(
conf.workerCongestionControlUserInactiveIntervalMs)
}
- var controller = new Controller(rpcEnv, conf, metricsSystem)
+ var controller = new Controller(rpcEnv, conf, metricsSystem, workerSource)
rpcEnv.setupEndpoint(RpcNameConstants.WORKER_EP, controller)
- val pushDataHandler = new PushDataHandler()
+ val pushDataHandler = new PushDataHandler(workerSource)
val (pushServer, pushClientFactory) = {
val closeIdleConnections = conf.workerCloseIdleConnections
val numThreads = conf.workerPushIoThreads.getOrElse(storageManager.totalFlusherThread)
@@ -154,7 +154,7 @@ private[celeborn] class Worker(
transportContext.createClientFactory())
}
- val replicateHandler = new PushDataHandler()
+ val replicateHandler = new PushDataHandler(workerSource)
private val replicateServer = {
val closeIdleConnections = conf.workerCloseIdleConnections
val numThreads =
@@ -179,7 +179,7 @@ private[celeborn] class Worker(
val numThreads = conf.workerFetchIoThreads.getOrElse(storageManager.totalFlusherThread)
val transportConf =
Utils.fromCelebornConf(conf, TransportModuleConstants.FETCH_MODULE, numThreads)
- fetchHandler = new FetchHandler(conf, transportConf)
+ fetchHandler = new FetchHandler(conf, transportConf, workerSource)
val transportContext: TransportContext =
new TransportContext(
transportConf,
diff --git a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/FetchHandlerSuiteJ.java b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/FetchHandlerSuiteJ.java
index 7234c21fa..7da20b2ea 100644
--- a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/FetchHandlerSuiteJ.java
+++ b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/FetchHandlerSuiteJ.java
@@ -183,8 +183,7 @@ public class FetchHandlerSuiteJ {
TransportClient client = new TransportClient(channel, mock(TransportResponseHandler.class));
FetchHandler fetchHandler = mockFetchHandler(fileInfo);
- PbStreamHandler streamHandler =
- openStreamAndCheck(client, channel, fetchHandler, 0, Integer.MAX_VALUE);
+ PbStreamHandler streamHandler = openStreamAndCheck(client, channel, fetchHandler, 5, 10);
fetchChunkAndCheck(client, channel, fetchHandler, streamHandler);
} finally {
@@ -257,11 +256,11 @@ public class FetchHandlerSuiteJ {
}
private FetchHandler mockFetchHandler(FileInfo fileInfo) {
+ WorkerSource workerSource = mock(WorkerSource.class);
TransportConf transportConf =
Utils.fromCelebornConf(conf, TransportModuleConstants.FETCH_MODULE, 4);
- FetchHandler fetchHandler0 = new FetchHandler(conf, transportConf);
+ FetchHandler fetchHandler0 = new FetchHandler(conf, transportConf, workerSource);
Worker worker = mock(Worker.class);
- WorkerSource workerSource = mock(WorkerSource.class);
PartitionFilesSorter partitionFilesSorter =
new PartitionFilesSorter(MemoryManager.instance(), conf, workerSource);
diff --git a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/FileWriterSuiteJ.java b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/FileWriterSuiteJ.java
index 5898402bb..ad9cfef1e 100644
--- a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/FileWriterSuiteJ.java
+++ b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/FileWriterSuiteJ.java
@@ -83,7 +83,6 @@ public class FileWriterSuiteJ {
private static final CelebornConf CONF = new CelebornConf();
public static final Long SPLIT_THRESHOLD = 256 * 1024 * 1024L;
public static final PartitionSplitMode splitMode = PartitionSplitMode.HARD;
- public static final PartitionType partitionType = PartitionType.REDUCE;
private static File tempDir = null;
private static LocalFlusher localFlusher = null;
@@ -139,7 +138,7 @@ public class FileWriterSuiteJ {
public static void setupChunkServer(FileInfo info) throws IOException {
FetchHandler handler =
- new FetchHandler(transConf.getCelebornConf(), transConf) {
+ new FetchHandler(transConf.getCelebornConf(), transConf, mock(WorkerSource.class)) {
@Override
public StorageManager storageManager() {
return new StorageManager(CONF, source);