Posted to issues@spark.apache.org by "Ranga Reddy (Jira)" <ji...@apache.org> on 2023/03/21 12:34:00 UTC

[jira] [Commented] (SPARK-32893) Structured Streaming and Dynamic Allocation on StandaloneCluster

    [ https://issues.apache.org/jira/browse/SPARK-32893?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17703177#comment-17703177 ] 

Ranga Reddy commented on SPARK-32893:
-------------------------------------

I can see similar behaviour with Spark Structured Streaming on YARN.
{code:java}
2023-03-14 18:17:29 ERROR TransportClient:337 - Failed to send RPC RPC 7955407071046657873 to /127.0.0.1:50040: java.nio.channels.ClosedChannelException
java.nio.channels.ClosedChannelException
        at io.netty.channel.AbstractChannel$AbstractUnsafe.newClosedChannelException(AbstractChannel.java:957)
        at io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:865)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
        at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
        at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764)
        at io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1104)
        at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:748)
{code}
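
To compare how often this failure occurs on YARN and on Standalone, the "Failed to send RPC" lines can be pulled out of the driver logs. The following is a minimal sketch, not part of Spark: the function name `failed_rpcs` and the regular expression are my own, and the pattern assumes only the two log layouts that appear in this issue (IPv4 addresses, one entry per line).

```python
import re

# Covers both layouts quoted in this issue:
#   "2023-03-14 18:17:29 ERROR TransportClient:337 - Failed to send RPC RPC <id> to /127.0.0.1:50040: <cause>"
#   "20/09/15 10:41:22 ERROR TransportClient: Failed to send RPC RPC <id> to sparkmaster/10.0.12.231:7077: <cause>"
FAILED_RPC = re.compile(
    r"^(?P<ts>\S+ \S+) ERROR TransportClient(?::\d+ -|:) "
    r"Failed to send RPC RPC (?P<rpc_id>\d+) to "
    r"(?P<host>[^/\s]*)/(?P<addr>[^:\s]+):(?P<port>\d+): "
    r"(?P<cause>\S+)"
)


def failed_rpcs(lines):
    """Yield one dict per 'Failed to send RPC' log line; all other lines are skipped."""
    for line in lines:
        match = FAILED_RPC.match(line.strip())
        if match:
            hit = match.groupdict()
            hit["port"] = int(hit["port"])  # the port is the only numeric field we convert
            yield hit
```

Passing an open log file to `failed_rpcs` yields the timestamp, RPC id, remote host and port, and exception class for each failure, which makes it easy to see whether the failing endpoint is always the cluster manager.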

> Structured Streaming and Dynamic Allocation on StandaloneCluster
> ----------------------------------------------------------------
>
>                 Key: SPARK-32893
>                 URL: https://issues.apache.org/jira/browse/SPARK-32893
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 3.0.1
>            Reporter: Duarte Ferreira
>            Priority: Major
>
> We are currently using a Spark 3.0.1 Standalone cluster to run our Structured Streaming applications.
> We set the following configurations when running the application in cluster mode:
>  * spark.dynamicAllocation.enabled = true
>  * spark.shuffle.service.enabled = true
>  * spark.cores.max = 5
>  * spark.executor.memory = 1G
>  * spark.executor.cores = 1
> We also set spark.shuffle.service.enabled on each worker, and the cluster is composed of 1 master and 2 slaves.
> The application reads data from a Kafka topic (readTopic) using [this documentation|https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html], applies some transformations on the Dataset using Spark SQL, and writes the data to another Kafka topic (writeTopic).
> When we start the application it behaves correctly: it starts with 0 executors and, as we start feeding data to the readTopic, it increases the number of executors until it reaches the limit of 5 executors, and all messages are transformed and written to the writeTopic in Kafka.
> If we stop feeding messages to the readTopic, the application works as expected and starts killing executors that are no longer needed, until we stop sending data completely and it reaches 0 running executors.
> If we start sending data again right away, it behaves as expected and starts increasing the number of executors again. But if we leave the application idle at 0 executors for around 10 minutes, we start getting errors like this:
> {noformat}
> 20/09/15 10:41:22 ERROR TransportClient: Failed to send RPC RPC 7570256331800450365 to sparkmaster/10.0.12.231:7077: java.nio.channels.ClosedChannelException
> java.nio.channels.ClosedChannelException
> 	at io.netty.channel.AbstractChannel$AbstractUnsafe.newClosedChannelException(AbstractChannel.java:957)
> 	at io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:865)
> 	at io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
> 	at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
> 	at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764)
> 	at io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1104)
> 	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
> 	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
> 	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
> 	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
> 	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
> 	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
> 	at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: Connection reset by peer
> 	at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
> 	at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
> 	at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
> 	at sun.nio.ch.IOUtil.write(IOUtil.java:65)
> 	at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:468)
> 	at org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:148)
> 	at org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123)
> 	at io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:362)
> 	at io.netty.channel.nio.AbstractNioByteChannel.doWriteInternal(AbstractNioByteChannel.java:235)
> 	at io.netty.channel.nio.AbstractNioByteChannel.doWrite0(AbstractNioByteChannel.java:209)
> 	at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:400)
> 	at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:930)
> 	at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:354)
> 	at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:897)
> 	at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1372)
> 	at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750)
> 	at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:742)
> 	at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:728)
> 	at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:127)
> 	at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750)
> 	at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:765)
> 	... 8 more
> 20/09/15 10:41:22 WARN ExecutorAllocationManager: Unable to reach the cluster manager to request 1 total executors!
> {noformat}
> If we restart the master node or the Spark application, everything works again.
> All nodes can ping the master node, and they can start other applications or kill running ones; there seems to be a problem only when increasing the number of executors.
> Is this a bug, or are we missing some configuration/timeout?
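
For anyone trying to reproduce the setup in the description, the reported settings can be turned into a spark-submit command line roughly as follows. This is only a sketch under assumptions: the helper `spark_submit_args` is hypothetical, the master URL is inferred from the `sparkmaster/10.0.12.231:7077` address in the quoted log, and the jar path and class name are placeholders. The worker-side shuffle service setting mentioned in the description still has to be applied on each worker separately.

```python
import shlex

# Settings quoted from the issue description.
REPORTED_CONF = {
    "spark.dynamicAllocation.enabled": "true",
    "spark.shuffle.service.enabled": "true",
    "spark.cores.max": "5",
    "spark.executor.memory": "1G",
    "spark.executor.cores": "1",
}


def spark_submit_args(master, app_jar, main_class, conf=REPORTED_CONF):
    """Build an argv list for spark-submit in cluster deploy mode."""
    args = ["spark-submit", "--master", master,
            "--deploy-mode", "cluster", "--class", main_class]
    for key, value in conf.items():
        args += ["--conf", f"{key}={value}"]
    args.append(app_jar)  # the application jar comes after all options
    return args


# Placeholder values (assumptions, not taken from the reporter's environment).
argv = spark_submit_args(
    "spark://sparkmaster:7077",
    "/path/to/streaming-app.jar",
    "com.example.StreamingApp",
)
print(shlex.join(argv))
```

Keeping the settings in one dictionary makes it straightforward to rerun the same job with a single value changed when looking for the configuration or timeout the reporter asks about.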



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
