You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celeborn.apache.org by zh...@apache.org on 2023/09/06 14:34:13 UTC
[incubator-celeborn] branch main updated: [CELEBORN-627][FLINK][FOLLOWUP] Support split partitions
This is an automated email from the ASF dual-hosted git repository.
zhongqiangchen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new b1e3d661e [CELEBORN-627][FLINK][FOLLOWUP] Support split partitions
b1e3d661e is described below
commit b1e3d661e6e7db255867159b639c3b345566b2e4
Author: zhongqiang.czq <zh...@alibaba-inc.com>
AuthorDate: Wed Sep 6 22:33:56 2023 +0800
[CELEBORN-627][FLINK][FOLLOWUP] Support split partitions
### What changes were proposed in this pull request?
fix duplicated sending commitFiles for MapPartition and fix not sending BufferStreamEnd while opening MapPartition split.
### Why are the changes needed?
After opening a partition split for MapPartition, there are 2 errors.
- ERROR1: The worker doesn't send streamEnd to the client because of a concurrent thread synchronization problem. After the idle timeout, the client closes the channel and throws the exception **"xx is lost, notify related stream xx"**
```java
2023-09-06T04:40:47.7549935Z 23/09/06 04:40:47,753 WARN [Keyed Aggregation -> Map -> Sink: Unnamed (5/8)#0] Task: Keyed Aggregation -> Map -> Sink: Unnamed (5/8)#0 (c1cade728ddb3a32e0bf72acb1d87588_c27dcf7b54ef6bfd6cff02ca8870b681_4_0) switched from RUNNING to FAILED with failure cause:
2023-09-06T04:40:47.7550644Z java.io.IOException: Client localhost/127.0.0.1:38485 is lost, notify related stream 256654410004
2023-09-06T04:40:47.7551219Z at org.apache.celeborn.plugin.flink.RemoteBufferStreamReader.errorReceived(RemoteBufferStreamReader.java:142)
2023-09-06T04:40:47.7551886Z at org.apache.celeborn.plugin.flink.RemoteBufferStreamReader.lambda$new$0(RemoteBufferStreamReader.java:77)
2023-09-06T04:40:47.7552576Z at org.apache.celeborn.plugin.flink.network.ReadClientHandler.processMessageInternal(ReadClientHandler.java:57)
2023-09-06T04:40:47.7553250Z at org.apache.celeborn.plugin.flink.network.ReadClientHandler.lambda$channelInactive$0(ReadClientHandler.java:119)
2023-09-06T04:40:47.7553806Z at java.util.concurrent.ConcurrentHashMap.forEach(ConcurrentHashMap.java:1597)
2023-09-06T04:40:47.7554564Z at org.apache.celeborn.plugin.flink.network.ReadClientHandler.channelInactive(ReadClientHandler.java:110)
2023-09-06T04:40:47.7555270Z at org.apache.celeborn.common.network.server.TransportRequestHandler.channelInactive(TransportRequestHandler.java:71)
2023-09-06T04:40:47.7556005Z at org.apache.celeborn.common.network.server.TransportChannelHandler.channelInactive(TransportChannelHandler.java:136)
2023-09-06T04:40:47.7556710Z at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:305)
2023-09-06T04:40:47.7557370Z at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
2023-09-06T04:40:47.7558172Z at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
2023-09-06T04:40:47.7558803Z at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
2023-09-06T04:40:47.7559368Z at io.netty.handler.timeout.IdleStateHandler.channelInactive(IdleStateHandler.java:277)
2023-09-06T04:40:47.7559954Z at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303)
2023-09-06T04:40:47.7560589Z at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
2023-09-06T04:40:47.7561222Z at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
2023-09-06T04:40:47.7561829Z at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
2023-09-06T04:40:47.7562620Z at org.apache.celeborn.plugin.flink.network.TransportFrameDecoderWithBufferSupplier.channelInactive(TransportFrameDecoderWithBufferSupplier.java:206)
2023-09-06T04:40:47.7563506Z at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:305)
2023-09-06T04:40:47.7564207Z at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
2023-09-06T04:40:47.7564829Z at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
2023-09-06T04:40:47.7565417Z at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405)
2023-09-06T04:40:47.7566014Z at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:301)
2023-09-06T04:40:47.7566654Z at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
2023-09-06T04:40:47.7567317Z at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901)
2023-09-06T04:40:47.7567813Z at io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:813)
2023-09-06T04:40:47.7568297Z at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174)
2023-09-06T04:40:47.7568830Z at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167)
2023-09-06T04:40:47.7569402Z at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
2023-09-06T04:40:47.7569894Z at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:569)
2023-09-06T04:40:47.7570356Z at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
2023-09-06T04:40:47.7570841Z at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
2023-09-06T04:40:47.7571319Z at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
2023-09-06T04:40:47.7571721Z at java.lang.Thread.run(Thread.java:750)
```
- ERROR2: The client sends duplicated commitFiles to the worker. Because of inconsistent unhandledPartitions, both batchCommit and finalCommit send commitFiles.
``` java
2023-09-06T04:36:48.3146773Z 23/09/06 04:36:48,314 WARN [Worker-CommitFiles-1] Controller: Get Partition Location for 1693975002919-61094c8156f918062a5fae12d551bc90-0 0-1 but didn't exist.
```
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
ci
Closes #1881 from zhongqiangczq/fix-split-test.
Authored-by: zhongqiang.czq <zh...@alibaba-inc.com>
Signed-off-by: zhongqiang.czq <zh...@alibaba-inc.com>
---
.../client/commit/MapPartitionCommitHandler.scala | 3 +--
tests/flink-it/src/test/resources/log4j2-test.xml | 4 ++--
.../worker/storage/MapDataPartitionReader.java | 9 ++++----
.../service/deploy/worker/PushDataHandler.scala | 24 ++++++++++++++++++----
4 files changed, 28 insertions(+), 12 deletions(-)
diff --git a/client/src/main/scala/org/apache/celeborn/client/commit/MapPartitionCommitHandler.scala b/client/src/main/scala/org/apache/celeborn/client/commit/MapPartitionCommitHandler.scala
index e75e77bf0..54d05671f 100644
--- a/client/src/main/scala/org/apache/celeborn/client/commit/MapPartitionCommitHandler.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/commit/MapPartitionCommitHandler.scala
@@ -74,7 +74,7 @@ class MapPartitionCommitHandler(
shuffleId: Int,
shuffleCommittedInfo: ShuffleCommittedInfo): mutable.Set[PartitionLocation] = {
shuffleCommittedInfo.unhandledPartitionLocations.asScala.filterNot { partitionLocation =>
- shuffleCommittedInfo.handledPartitionLocations.contains(partitionLocation) &&
+ shuffleCommittedInfo.handledPartitionLocations.contains(partitionLocation) ||
isPartitionInProcess(shuffleId, partitionLocation.getId)
}
}
@@ -199,7 +199,6 @@ class MapPartitionCommitHandler(
recordWorkerFailure(commitFailedWorkers)
}
- inProcessingPartitionIds.remove(partitionId)
if (dataCommitSuccess) {
val resultPartitions =
shuffleSucceedPartitionIds.computeIfAbsent(
diff --git a/tests/flink-it/src/test/resources/log4j2-test.xml b/tests/flink-it/src/test/resources/log4j2-test.xml
index 5fb0cb8ba..607cbb578 100644
--- a/tests/flink-it/src/test/resources/log4j2-test.xml
+++ b/tests/flink-it/src/test/resources/log4j2-test.xml
@@ -16,7 +16,7 @@
~ limitations under the License.
-->
-<Configuration status="INFO">
+<Configuration status="DEBUG">
<Appenders>
<Console name="stdout" target="SYSTEM_OUT">
<PatternLayout pattern="%d{yy/MM/dd HH:mm:ss,SSS} %p [%t] %c{1}: %m%n%ex"/>
@@ -29,7 +29,7 @@
</File>
</Appenders>
<Loggers>
- <Root level="INFO">
+ <Root level="DEBUG">
<AppenderRef ref="stdout"/>
<AppenderRef ref="file"/>
</Root>
diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapDataPartitionReader.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapDataPartitionReader.java
index 06e50c01b..b8f996fe7 100644
--- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapDataPartitionReader.java
+++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapDataPartitionReader.java
@@ -71,7 +71,7 @@ public class MapDataPartitionReader implements Comparable<MapDataPartitionReader
/** Whether all the data has been successfully read or not. */
@GuardedBy("lock")
- private boolean readFinished;
+ private volatile boolean readFinished;
/** Whether this partition reader has been released or not. */
@GuardedBy("lock")
@@ -162,13 +162,14 @@ public class MapDataPartitionReader implements Comparable<MapDataPartitionReader
addBuffer(buffer, bufferRecycler);
++numDataBuffers;
}
- if (numDataBuffers > 0) {
- notifyBacklog(numDataBuffers);
- }
if (!hasRemaining) {
closeReader();
}
+
+ if (numDataBuffers > 0) {
+ notifyBacklog(numDataBuffers);
+ }
}
private void addBuffer(ByteBuf buffer, BufferRecycler bufferRecycler) {
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
index 963736996..36167ad47 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
@@ -940,13 +940,15 @@ class PushDataHandler extends BaseMessageHandler with Logging {
val isPartitionSplitEnabled = fileWriter.asInstanceOf[
MapPartitionFileWriter].getFileInfo.isPartitionSplitEnabled
- if (shutdown.get() && (messageType == Type.REGION_START || messageType == Type.PUSH_DATA_HAND_SHAKE) && isPartitionSplitEnabled) {
+ if (shutdown.get() && (messageType == Type.REGION_START || messageType ==
+ Type.PUSH_DATA_HAND_SHAKE) && isPartitionSplitEnabled) {
logInfo(s"$messageType return HARD_SPLIT for shuffle $shuffleKey since worker shutdown.")
callback.onSuccess(ByteBuffer.wrap(Array[Byte](StatusCode.HARD_SPLIT.getValue)))
return
}
- if (checkSplit && (messageType == Type.REGION_START || messageType == Type.PUSH_DATA_HAND_SHAKE) && isPartitionSplitEnabled && checkDiskFullAndSplit(
+ if (checkSplit && (messageType == Type.REGION_START || messageType ==
+ Type.PUSH_DATA_HAND_SHAKE) && isPartitionSplitEnabled && checkDiskFullAndSplit(
fileWriter,
isPrimary,
null,
@@ -1116,7 +1118,14 @@ class PushDataHandler extends BaseMessageHandler with Logging {
callback: RpcResponseCallback): Boolean = {
val diskFull = checkDiskFull(fileWriter)
logDebug(
- s"CheckDiskFullAndSplit in diskfull: $diskFull, partitionSplitMinimumSize: $partitionSplitMinimumSize, splitThreshold: ${fileWriter.getSplitThreshold()}, filelength: ${fileWriter.getFileInfo.getFileLength}, filename:${fileWriter.getFileInfo.getFilePath}")
+ s"""
+ |CheckDiskFullAndSplit in
+ |diskFull:$diskFull,
+ |partitionSplitMinimumSize: $partitionSplitMinimumSize,
+ |splitThreshold:${fileWriter.getSplitThreshold()},
+ |fileLength:${fileWriter.getFileInfo.getFileLength}
+ |fileName:${fileWriter.getFileInfo.getFilePath}
+ |""".stripMargin)
if (workerPartitionSplitEnabled && ((diskFull && fileWriter.getFileInfo.getFileLength > partitionSplitMinimumSize) ||
(isPrimary && fileWriter.getFileInfo.getFileLength > fileWriter.getSplitThreshold()))) {
if (softSplit != null && fileWriter.getSplitMode == PartitionSplitMode.SOFT &&
@@ -1125,7 +1134,14 @@ class PushDataHandler extends BaseMessageHandler with Logging {
} else {
callback.onSuccess(ByteBuffer.wrap(Array[Byte](StatusCode.HARD_SPLIT.getValue)))
logDebug(
- s"CheckDiskFullAndSplit hardsplit diskfull: $diskFull, partitionSplitMinimumSize: $partitionSplitMinimumSize, splitThreshold: ${fileWriter.getSplitThreshold()}, filelength: ${fileWriter.getFileInfo.getFileLength}, filename:${fileWriter.getFileInfo.getFilePath}")
+ s"""
+ |CheckDiskFullAndSplit hardSplit
+ |diskFull:$diskFull,
+ |partitionSplitMinimumSize:$partitionSplitMinimumSize,
+ |splitThreshold:${fileWriter.getSplitThreshold()},
+ |fileLength:${fileWriter.getFileInfo.getFileLength},
+ |fileName:${fileWriter.getFileInfo.getFilePath}
+ |""".stripMargin)
return true
}
}