You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celeborn.apache.org by zh...@apache.org on 2023/09/06 14:34:13 UTC

[incubator-celeborn] branch main updated: [CELEBORN-627][FLINK][FOLLOWUP] Support split partitions

This is an automated email from the ASF dual-hosted git repository.

zhongqiangchen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new b1e3d661e [CELEBORN-627][FLINK][FOLLOWUP] Support split partitions
b1e3d661e is described below

commit b1e3d661e6e7db255867159b639c3b345566b2e4
Author: zhongqiang.czq <zh...@alibaba-inc.com>
AuthorDate: Wed Sep 6 22:33:56 2023 +0800

    [CELEBORN-627][FLINK][FOLLOWUP] Support split partitions
    
    ### What changes were proposed in this pull request?
    Fix duplicate commitFiles requests being sent for MapPartition, and fix BufferStreamEnd not being sent when MapPartition split is enabled.
    
    ### Why are the changes needed?
    After enabling partition split for MapPartition, two errors occur:
    - ERROR1: The worker does not send StreamEnd to the client because of a thread-synchronization race. After the idle timeout, the client closes the channel and throws the exception **" xx is lost, notify related stream xx"**
    ```java
    2023-09-06T04:40:47.7549935Z 23/09/06 04:40:47,753 WARN [Keyed Aggregation -> Map -> Sink: Unnamed (5/8)#0] Task: Keyed Aggregation -> Map -> Sink: Unnamed (5/8)#0 (c1cade728ddb3a32e0bf72acb1d87588_c27dcf7b54ef6bfd6cff02ca8870b681_4_0) switched from RUNNING to FAILED with failure cause:
    2023-09-06T04:40:47.7550644Z java.io.IOException: Client localhost/127.0.0.1:38485 is lost, notify related stream 256654410004
    2023-09-06T04:40:47.7551219Z    at org.apache.celeborn.plugin.flink.RemoteBufferStreamReader.errorReceived(RemoteBufferStreamReader.java:142)
    2023-09-06T04:40:47.7551886Z    at org.apache.celeborn.plugin.flink.RemoteBufferStreamReader.lambda$new$0(RemoteBufferStreamReader.java:77)
    2023-09-06T04:40:47.7552576Z    at org.apache.celeborn.plugin.flink.network.ReadClientHandler.processMessageInternal(ReadClientHandler.java:57)
    2023-09-06T04:40:47.7553250Z    at org.apache.celeborn.plugin.flink.network.ReadClientHandler.lambda$channelInactive$0(ReadClientHandler.java:119)
    2023-09-06T04:40:47.7553806Z    at java.util.concurrent.ConcurrentHashMap.forEach(ConcurrentHashMap.java:1597)
    2023-09-06T04:40:47.7554564Z    at org.apache.celeborn.plugin.flink.network.ReadClientHandler.channelInactive(ReadClientHandler.java:110)
    2023-09-06T04:40:47.7555270Z    at org.apache.celeborn.common.network.server.TransportRequestHandler.channelInactive(TransportRequestHandler.java:71)
    2023-09-06T04:40:47.7556005Z    at org.apache.celeborn.common.network.server.TransportChannelHandler.channelInactive(TransportChannelHandler.java:136)
    2023-09-06T04:40:47.7556710Z    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:305)
    2023-09-06T04:40:47.7557370Z    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
    2023-09-06T04:40:47.7558172Z    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
    2023-09-06T04:40:47.7558803Z    at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
    2023-09-06T04:40:47.7559368Z    at io.netty.handler.timeout.IdleStateHandler.channelInactive(IdleStateHandler.java:277)
    2023-09-06T04:40:47.7559954Z    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303)
    2023-09-06T04:40:47.7560589Z    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
    2023-09-06T04:40:47.7561222Z    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
    2023-09-06T04:40:47.7561829Z    at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
    2023-09-06T04:40:47.7562620Z    at org.apache.celeborn.plugin.flink.network.TransportFrameDecoderWithBufferSupplier.channelInactive(TransportFrameDecoderWithBufferSupplier.java:206)
    2023-09-06T04:40:47.7563506Z    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:305)
    2023-09-06T04:40:47.7564207Z    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
    2023-09-06T04:40:47.7564829Z    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
    2023-09-06T04:40:47.7565417Z    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405)
    2023-09-06T04:40:47.7566014Z    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:301)
    2023-09-06T04:40:47.7566654Z    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
    2023-09-06T04:40:47.7567317Z    at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901)
    2023-09-06T04:40:47.7567813Z    at io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:813)
    2023-09-06T04:40:47.7568297Z    at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174)
    2023-09-06T04:40:47.7568830Z    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167)
    2023-09-06T04:40:47.7569402Z    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
    2023-09-06T04:40:47.7569894Z    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:569)
    2023-09-06T04:40:47.7570356Z    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
    2023-09-06T04:40:47.7570841Z    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    2023-09-06T04:40:47.7571319Z    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    2023-09-06T04:40:47.7571721Z    at java.lang.Thread.run(Thread.java:750)
    ```
    - ERROR2: The client sends duplicate commitFiles requests to the worker. Because unhandled partition locations are filtered inconsistently (unhandledPartitionLocations), both batchCommit and finalCommit send commitFiles for the same partitions.
    ``` java
    2023-09-06T04:36:48.3146773Z 23/09/06 04:36:48,314 WARN [Worker-CommitFiles-1] Controller: Get Partition Location for 1693975002919-61094c8156f918062a5fae12d551bc90-0 0-1 but didn't exist.
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    no
    
    ### How was this patch tested?
    ci
    
    Closes #1881 from zhongqiangczq/fix-split-test.
    
    Authored-by: zhongqiang.czq <zh...@alibaba-inc.com>
    Signed-off-by: zhongqiang.czq <zh...@alibaba-inc.com>
---
 .../client/commit/MapPartitionCommitHandler.scala  |  3 +--
 tests/flink-it/src/test/resources/log4j2-test.xml  |  4 ++--
 .../worker/storage/MapDataPartitionReader.java     |  9 ++++----
 .../service/deploy/worker/PushDataHandler.scala    | 24 ++++++++++++++++++----
 4 files changed, 28 insertions(+), 12 deletions(-)

diff --git a/client/src/main/scala/org/apache/celeborn/client/commit/MapPartitionCommitHandler.scala b/client/src/main/scala/org/apache/celeborn/client/commit/MapPartitionCommitHandler.scala
index e75e77bf0..54d05671f 100644
--- a/client/src/main/scala/org/apache/celeborn/client/commit/MapPartitionCommitHandler.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/commit/MapPartitionCommitHandler.scala
@@ -74,7 +74,7 @@ class MapPartitionCommitHandler(
       shuffleId: Int,
       shuffleCommittedInfo: ShuffleCommittedInfo): mutable.Set[PartitionLocation] = {
     shuffleCommittedInfo.unhandledPartitionLocations.asScala.filterNot { partitionLocation =>
-      shuffleCommittedInfo.handledPartitionLocations.contains(partitionLocation) &&
+      shuffleCommittedInfo.handledPartitionLocations.contains(partitionLocation) ||
       isPartitionInProcess(shuffleId, partitionLocation.getId)
     }
   }
@@ -199,7 +199,6 @@ class MapPartitionCommitHandler(
       recordWorkerFailure(commitFailedWorkers)
     }
 
-    inProcessingPartitionIds.remove(partitionId)
     if (dataCommitSuccess) {
       val resultPartitions =
         shuffleSucceedPartitionIds.computeIfAbsent(
diff --git a/tests/flink-it/src/test/resources/log4j2-test.xml b/tests/flink-it/src/test/resources/log4j2-test.xml
index 5fb0cb8ba..607cbb578 100644
--- a/tests/flink-it/src/test/resources/log4j2-test.xml
+++ b/tests/flink-it/src/test/resources/log4j2-test.xml
@@ -16,7 +16,7 @@
   ~ limitations under the License.
   -->
 
-<Configuration status="INFO">
+<Configuration status="DEBUG">
     <Appenders>
         <Console name="stdout" target="SYSTEM_OUT">
             <PatternLayout pattern="%d{yy/MM/dd HH:mm:ss,SSS} %p [%t] %c{1}: %m%n%ex"/>
@@ -29,7 +29,7 @@
         </File>
     </Appenders>
     <Loggers>
-        <Root level="INFO">
+        <Root level="DEBUG">
             <AppenderRef ref="stdout"/>
             <AppenderRef ref="file"/>
         </Root>
diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapDataPartitionReader.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapDataPartitionReader.java
index 06e50c01b..b8f996fe7 100644
--- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapDataPartitionReader.java
+++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapDataPartitionReader.java
@@ -71,7 +71,7 @@ public class MapDataPartitionReader implements Comparable<MapDataPartitionReader
 
   /** Whether all the data has been successfully read or not. */
   @GuardedBy("lock")
-  private boolean readFinished;
+  private volatile boolean readFinished;
 
   /** Whether this partition reader has been released or not. */
   @GuardedBy("lock")
@@ -162,13 +162,14 @@ public class MapDataPartitionReader implements Comparable<MapDataPartitionReader
       addBuffer(buffer, bufferRecycler);
       ++numDataBuffers;
     }
-    if (numDataBuffers > 0) {
-      notifyBacklog(numDataBuffers);
-    }
 
     if (!hasRemaining) {
       closeReader();
     }
+
+    if (numDataBuffers > 0) {
+      notifyBacklog(numDataBuffers);
+    }
   }
 
   private void addBuffer(ByteBuf buffer, BufferRecycler bufferRecycler) {
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
index 963736996..36167ad47 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
@@ -940,13 +940,15 @@ class PushDataHandler extends BaseMessageHandler with Logging {
     val isPartitionSplitEnabled = fileWriter.asInstanceOf[
       MapPartitionFileWriter].getFileInfo.isPartitionSplitEnabled
 
-    if (shutdown.get() && (messageType == Type.REGION_START || messageType == Type.PUSH_DATA_HAND_SHAKE) && isPartitionSplitEnabled) {
+    if (shutdown.get() && (messageType == Type.REGION_START || messageType ==
+        Type.PUSH_DATA_HAND_SHAKE) && isPartitionSplitEnabled) {
       logInfo(s"$messageType return HARD_SPLIT for shuffle $shuffleKey since worker shutdown.")
       callback.onSuccess(ByteBuffer.wrap(Array[Byte](StatusCode.HARD_SPLIT.getValue)))
       return
     }
 
-    if (checkSplit && (messageType == Type.REGION_START || messageType == Type.PUSH_DATA_HAND_SHAKE) && isPartitionSplitEnabled && checkDiskFullAndSplit(
+    if (checkSplit && (messageType == Type.REGION_START || messageType ==
+        Type.PUSH_DATA_HAND_SHAKE) && isPartitionSplitEnabled && checkDiskFullAndSplit(
         fileWriter,
         isPrimary,
         null,
@@ -1116,7 +1118,14 @@ class PushDataHandler extends BaseMessageHandler with Logging {
       callback: RpcResponseCallback): Boolean = {
     val diskFull = checkDiskFull(fileWriter)
     logDebug(
-      s"CheckDiskFullAndSplit in diskfull: $diskFull, partitionSplitMinimumSize: $partitionSplitMinimumSize, splitThreshold: ${fileWriter.getSplitThreshold()}, filelength: ${fileWriter.getFileInfo.getFileLength}, filename:${fileWriter.getFileInfo.getFilePath}")
+      s"""
+         |CheckDiskFullAndSplit in
+         |diskFull:$diskFull,
+         |partitionSplitMinimumSize: $partitionSplitMinimumSize,
+         |splitThreshold:${fileWriter.getSplitThreshold()},
+         |fileLength:${fileWriter.getFileInfo.getFileLength}
+         |fileName:${fileWriter.getFileInfo.getFilePath}
+         |""".stripMargin)
     if (workerPartitionSplitEnabled && ((diskFull && fileWriter.getFileInfo.getFileLength > partitionSplitMinimumSize) ||
         (isPrimary && fileWriter.getFileInfo.getFileLength > fileWriter.getSplitThreshold()))) {
       if (softSplit != null && fileWriter.getSplitMode == PartitionSplitMode.SOFT &&
@@ -1125,7 +1134,14 @@ class PushDataHandler extends BaseMessageHandler with Logging {
       } else {
         callback.onSuccess(ByteBuffer.wrap(Array[Byte](StatusCode.HARD_SPLIT.getValue)))
         logDebug(
-          s"CheckDiskFullAndSplit hardsplit diskfull: $diskFull, partitionSplitMinimumSize: $partitionSplitMinimumSize, splitThreshold: ${fileWriter.getSplitThreshold()}, filelength: ${fileWriter.getFileInfo.getFileLength}, filename:${fileWriter.getFileInfo.getFilePath}")
+          s"""
+             |CheckDiskFullAndSplit hardSplit
+             |diskFull:$diskFull,
+             |partitionSplitMinimumSize:$partitionSplitMinimumSize,
+             |splitThreshold:${fileWriter.getSplitThreshold()},
+             |fileLength:${fileWriter.getFileInfo.getFileLength},
+             |fileName:${fileWriter.getFileInfo.getFilePath}
+             |""".stripMargin)
         return true
       }
     }