You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celeborn.apache.org by zh...@apache.org on 2023/10/24 14:09:19 UTC

[incubator-celeborn] branch main updated: [CELEBORN-1084] Initialize `workerSource` member to prevent `NullPointerException`

This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 875ad1fa8 [CELEBORN-1084] Initialize `workerSource` member to prevent `NullPointerException`
875ad1fa8 is described below

commit 875ad1fa8e579ca3c42446295f68f39dded66922
Author: Fu Chen <cf...@gmail.com>
AuthorDate: Tue Oct 24 22:09:11 2023 +0800

    [CELEBORN-1084] Initialize `workerSource` member to prevent `NullPointerException`
    
    ### What changes were proposed in this pull request?
    
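    As the title says: `workerSource` is now passed to `Controller`, `FetchHandler` and `PushDataHandler` as a constructor parameter instead of being assigned later in `init(worker)`.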
    
    ### Why are the changes needed?
    
    This PR fixes an NPE that occurs when the `workerSource` member is accessed before it has been initialized, e.g. when `FetchHandler.channelActive` runs on a connection that arrives before `init` has been called. To resolve this, `workerSource` is initialized when the handlers are constructed.
    
    ```
    23/10/24 16:27:03,363 ERROR [fetch-server-11-1] TransportChannelHandler: Exception from request handler while channel is active
    java.lang.NullPointerException
            at org.apache.celeborn.service.deploy.worker.FetchHandler.channelActive(FetchHandler.scala:412)
            at org.apache.celeborn.common.network.server.TransportRequestHandler.channelActive(TransportRequestHandler.java:66)
            at org.apache.celeborn.common.network.server.TransportChannelHandler.channelActive(TransportChannelHandler.java:120)
            at io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:262)
            at io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:238)
            at io.netty.channel.AbstractChannelHandlerContext.fireChannelActive(AbstractChannelHandlerContext.java:231)
            at io.netty.channel.ChannelInboundHandlerAdapter.channelActive(ChannelInboundHandlerAdapter.java:69)
            at io.netty.handler.timeout.IdleStateHandler.channelActive(IdleStateHandler.java:271)
            at io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:260)
            at io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:238)
            at io.netty.channel.AbstractChannelHandlerContext.fireChannelActive(AbstractChannelHandlerContext.java:231)
            at io.netty.channel.DefaultChannelPipeline$HeadContext.channelActive(DefaultChannelPipeline.java:1398)
            at io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:258)
            at io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:238)
            at io.netty.channel.DefaultChannelPipeline.fireChannelActive(DefaultChannelPipeline.java:895)
            at io.netty.channel.AbstractChannel$AbstractUnsafe.register0(AbstractChannel.java:522)
            at io.netty.channel.AbstractChannel$AbstractUnsafe.access$200(AbstractChannel.java:429)
            at io.netty.channel.AbstractChannel$AbstractUnsafe$1.run(AbstractChannel.java:486)
            at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174)
            at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167)
            at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
            at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:569)
            at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
            at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
            at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
            at java.lang.Thread.run(Thread.java:748)
    ...
    23/10/24 16:27:03,423 INFO [main] Worker: Starting Worker <ip>:<port1>:<port2>:<port3> with {/data1=DiskInfo(maxSlots: 0, committed shuffles 0 shuffleAllocations: Map(), mountPoint: /data1, usableSpace: 250.0 GiB, avgFlushTime: 0 ns, avgFetchTime: 0 ns, activeSlots: 0) status: HEALTHY dirs /data1/celeborn/worker/celeborn-worker/shuffle_data, /data=DiskInfo(maxSlots: 0, committed shuffles 0 shuffleAllocations: Map(), mountPoint: /data, usableSpace: 250.0 GiB, avgFlushTime: 0 ns, avgFet [...]
    ...
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Pass GA
    
    Closes #2034 from cfmcgrady/fix-start-worker-npe.
    
    Authored-by: Fu Chen <cf...@gmail.com>
    Signed-off-by: zky.zhoukeyong <zk...@alibaba-inc.com>
---
 .../org/apache/celeborn/service/deploy/worker/Controller.scala    | 5 ++---
 .../org/apache/celeborn/service/deploy/worker/FetchHandler.scala  | 7 ++++---
 .../apache/celeborn/service/deploy/worker/PushDataHandler.scala   | 4 +---
 .../scala/org/apache/celeborn/service/deploy/worker/Worker.scala  | 8 ++++----
 .../apache/celeborn/service/deploy/worker/FetchHandlerSuiteJ.java | 7 +++----
 .../celeborn/service/deploy/worker/storage/FileWriterSuiteJ.java  | 3 +--
 6 files changed, 15 insertions(+), 19 deletions(-)

diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
index 464806ade..f65b7f36c 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
@@ -43,10 +43,10 @@ import org.apache.celeborn.service.deploy.worker.storage.{FileWriter, MapPartiti
 private[deploy] class Controller(
     override val rpcEnv: RpcEnv,
     val conf: CelebornConf,
-    val metricsSystem: MetricsSystem)
+    val metricsSystem: MetricsSystem,
+    val workerSource: WorkerSource)
   extends RpcEndpoint with Logging {
 
-  var workerSource: WorkerSource = _
   var storageManager: StorageManager = _
   var shuffleMapperAttempts: ConcurrentHashMap[String, AtomicIntegerArray] = _
   // shuffleKey -> (epoch -> CommitInfo)
@@ -65,7 +65,6 @@ private[deploy] class Controller(
   val testRetryCommitFiles = conf.testRetryCommitFiles
 
   def init(worker: Worker): Unit = {
-    workerSource = worker.workerSource
     storageManager = worker.storageManager
     shufflePartitionType = worker.shufflePartitionType
     shufflePushDataTimeout = worker.shufflePushDataTimeout
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
index cbb575e68..38fd353eb 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
@@ -41,7 +41,10 @@ import org.apache.celeborn.common.protocol.{MessageType, PartitionType, PbBuffer
 import org.apache.celeborn.common.util.{ExceptionUtils, Utils}
 import org.apache.celeborn.service.deploy.worker.storage.{ChunkStreamManager, CreditStreamManager, PartitionFilesSorter, StorageManager}
 
-class FetchHandler(val conf: CelebornConf, val transportConf: TransportConf)
+class FetchHandler(
+    val conf: CelebornConf,
+    val transportConf: TransportConf,
+    val workerSource: WorkerSource)
   extends BaseMessageHandler with Logging {
 
   val chunkStreamManager = new ChunkStreamManager()
@@ -52,13 +55,11 @@ class FetchHandler(val conf: CelebornConf, val transportConf: TransportConf)
     conf.partitionReadBuffersMax,
     conf.creditStreamThreadsPerMountpoint,
     conf.readBuffersToTriggerReadMin)
-  var workerSource: WorkerSource = _
   var storageManager: StorageManager = _
   var partitionsSorter: PartitionFilesSorter = _
   var registered: AtomicBoolean = new AtomicBoolean(false)
 
   def init(worker: Worker): Unit = {
-    this.workerSource = worker.workerSource
 
     workerSource.addGauge(WorkerSource.CREDIT_STREAM_COUNT) { () =>
       creditStreamManager.getStreamsCount
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
index 4cf8456de..86e6b55ca 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
@@ -42,9 +42,8 @@ import org.apache.celeborn.common.util.Utils
 import org.apache.celeborn.service.deploy.worker.congestcontrol.CongestionController
 import org.apache.celeborn.service.deploy.worker.storage.{FileWriter, HdfsFlusher, LocalFlusher, MapPartitionFileWriter, StorageManager}
 
-class PushDataHandler extends BaseMessageHandler with Logging {
+class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler with Logging {
 
-  private var workerSource: WorkerSource = _
   private var partitionLocationInfo: WorkerPartitionLocationInfo = _
   private var shuffleMapperAttempts: ConcurrentHashMap[String, AtomicIntegerArray] = _
   private var shufflePartitionType: ConcurrentHashMap[String, PartitionType] = _
@@ -66,7 +65,6 @@ class PushDataHandler extends BaseMessageHandler with Logging {
   private var testPushReplicaDataTimeout: Boolean = _
 
   def init(worker: Worker): Unit = {
-    workerSource = worker.workerSource
     partitionLocationInfo = worker.partitionLocationInfo
     shufflePartitionType = worker.shufflePartitionType
     shufflePushDataTimeout = worker.shufflePushDataTimeout
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index 1cc85db3d..1c15efb4b 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -131,10 +131,10 @@ private[celeborn] class Worker(
       conf.workerCongestionControlUserInactiveIntervalMs)
   }
 
-  var controller = new Controller(rpcEnv, conf, metricsSystem)
+  var controller = new Controller(rpcEnv, conf, metricsSystem, workerSource)
   rpcEnv.setupEndpoint(RpcNameConstants.WORKER_EP, controller)
 
-  val pushDataHandler = new PushDataHandler()
+  val pushDataHandler = new PushDataHandler(workerSource)
   val (pushServer, pushClientFactory) = {
     val closeIdleConnections = conf.workerCloseIdleConnections
     val numThreads = conf.workerPushIoThreads.getOrElse(storageManager.totalFlusherThread)
@@ -154,7 +154,7 @@ private[celeborn] class Worker(
       transportContext.createClientFactory())
   }
 
-  val replicateHandler = new PushDataHandler()
+  val replicateHandler = new PushDataHandler(workerSource)
   private val replicateServer = {
     val closeIdleConnections = conf.workerCloseIdleConnections
     val numThreads =
@@ -179,7 +179,7 @@ private[celeborn] class Worker(
     val numThreads = conf.workerFetchIoThreads.getOrElse(storageManager.totalFlusherThread)
     val transportConf =
       Utils.fromCelebornConf(conf, TransportModuleConstants.FETCH_MODULE, numThreads)
-    fetchHandler = new FetchHandler(conf, transportConf)
+    fetchHandler = new FetchHandler(conf, transportConf, workerSource)
     val transportContext: TransportContext =
       new TransportContext(
         transportConf,
diff --git a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/FetchHandlerSuiteJ.java b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/FetchHandlerSuiteJ.java
index 7234c21fa..7da20b2ea 100644
--- a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/FetchHandlerSuiteJ.java
+++ b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/FetchHandlerSuiteJ.java
@@ -183,8 +183,7 @@ public class FetchHandlerSuiteJ {
       TransportClient client = new TransportClient(channel, mock(TransportResponseHandler.class));
       FetchHandler fetchHandler = mockFetchHandler(fileInfo);
 
-      PbStreamHandler streamHandler =
-          openStreamAndCheck(client, channel, fetchHandler, 0, Integer.MAX_VALUE);
+      PbStreamHandler streamHandler = openStreamAndCheck(client, channel, fetchHandler, 5, 10);
 
       fetchChunkAndCheck(client, channel, fetchHandler, streamHandler);
     } finally {
@@ -257,11 +256,11 @@ public class FetchHandlerSuiteJ {
   }
 
   private FetchHandler mockFetchHandler(FileInfo fileInfo) {
+    WorkerSource workerSource = mock(WorkerSource.class);
     TransportConf transportConf =
         Utils.fromCelebornConf(conf, TransportModuleConstants.FETCH_MODULE, 4);
-    FetchHandler fetchHandler0 = new FetchHandler(conf, transportConf);
+    FetchHandler fetchHandler0 = new FetchHandler(conf, transportConf, workerSource);
     Worker worker = mock(Worker.class);
-    WorkerSource workerSource = mock(WorkerSource.class);
     PartitionFilesSorter partitionFilesSorter =
         new PartitionFilesSorter(MemoryManager.instance(), conf, workerSource);
 
diff --git a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/FileWriterSuiteJ.java b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/FileWriterSuiteJ.java
index 5898402bb..ad9cfef1e 100644
--- a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/FileWriterSuiteJ.java
+++ b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/FileWriterSuiteJ.java
@@ -83,7 +83,6 @@ public class FileWriterSuiteJ {
   private static final CelebornConf CONF = new CelebornConf();
   public static final Long SPLIT_THRESHOLD = 256 * 1024 * 1024L;
   public static final PartitionSplitMode splitMode = PartitionSplitMode.HARD;
-  public static final PartitionType partitionType = PartitionType.REDUCE;
 
   private static File tempDir = null;
   private static LocalFlusher localFlusher = null;
@@ -139,7 +138,7 @@ public class FileWriterSuiteJ {
 
   public static void setupChunkServer(FileInfo info) throws IOException {
     FetchHandler handler =
-        new FetchHandler(transConf.getCelebornConf(), transConf) {
+        new FetchHandler(transConf.getCelebornConf(), transConf, mock(WorkerSource.class)) {
           @Override
           public StorageManager storageManager() {
             return new StorageManager(CONF, source);