Posted to issues@spark.apache.org by "Ranga Reddy (Jira)" <ji...@apache.org> on 2023/03/21 12:34:00 UTC
[jira] [Commented] (SPARK-32893) Structured Streaming and Dynamic Allocation on StandaloneCluster
[ https://issues.apache.org/jira/browse/SPARK-32893?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17703177#comment-17703177 ]
Ranga Reddy commented on SPARK-32893:
-------------------------------------
I see similar behaviour with Spark Structured Streaming on YARN:
{code:java}
2023-03-14 18:17:29 ERROR TransportClient:337 - Failed to send RPC RPC 7955407071046657873 to /127.0.0.1:50040: java.nio.channels.ClosedChannelException
java.nio.channels.ClosedChannelException
at io.netty.channel.AbstractChannel$AbstractUnsafe.newClosedChannelException(AbstractChannel.java:957)
at io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:865)
at io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764)
at io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1104)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748) {code}
> Structured Streaming and Dynamic Allocation on StandaloneCluster
> ----------------------------------------------------------------
>
> Key: SPARK-32893
> URL: https://issues.apache.org/jira/browse/SPARK-32893
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 3.0.1
> Reporter: Duarte Ferreira
> Priority: Major
>
> We are currently using a Spark 3.0.1 standalone cluster to run our Structured Streaming applications.
> We set the following configurations when running the application in cluster mode:
> * spark.dynamicAllocation.enabled = true
> * spark.shuffle.service.enabled = true
> * spark.cores.max = 5
> * spark.executor.memory = 1G
> * spark.executor.cores = 1
> We have also set spark.shuffle.service.enabled = true on each worker. The cluster consists of 1 master and 2 workers.
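For reference, the application-side settings listed above can be passed to spark-submit as "--conf key=value" pairs. The sketch below only assembles that argument list with the JDK and does not call Spark; the class and method names (SubmitConfSketch, reportedConf, toSubmitArgs) are hypothetical. The worker-side shuffle service setting is left out because it belongs in each worker's own configuration, not in the application's.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SubmitConfSketch {
    // Application-side settings as reported in the issue (values copied from the description).
    static Map<String, String> reportedConf() {
        Map<String, String> conf = new LinkedHashMap<>();
        conf.put("spark.dynamicAllocation.enabled", "true");
        conf.put("spark.shuffle.service.enabled", "true");
        conf.put("spark.cores.max", "5");
        conf.put("spark.executor.memory", "1G");
        conf.put("spark.executor.cores", "1");
        return conf;
    }

    // Turns each entry into a "--conf key=value" pair, preserving insertion order.
    static List<String> toSubmitArgs(Map<String, String> conf) {
        List<String> args = new ArrayList<>();
        for (Map.Entry<String, String> e : conf.entrySet()) {
            args.add("--conf");
            args.add(e.getKey() + "=" + e.getValue());
        }
        return args;
    }

    public static void main(String[] args) {
        // Prints the flags that would be appended to a spark-submit command line.
        System.out.println(String.join(" ", toSubmitArgs(reportedConf())));
    }
}
```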
> The application reads data from a Kafka topic (readTopic), as described in [this documentation|https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html], applies some transformations to the Dataset using Spark SQL, and writes the results to another Kafka topic (writeTopic).
> When we start the application, it behaves correctly: it starts with 0 executors and, as we start feeding data to readTopic, it increases the number of executors until it reaches the limit of 5 executors, and all messages are transformed and written to writeTopic.
> When we stop feeding messages to readTopic, the application also behaves as expected: it starts killing executors that are no longer needed, and once we stop sending data completely, it gets down to 0 running executors.
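The scale-up and scale-down pattern described above can be illustrated with a simplified model of how dynamic allocation sizes its executor target. This is a sketch under stated assumptions, not Spark's ExecutorAllocationManager: it assumes the target is the number of executors needed to run all pending and running tasks at once (ceiling of tasks divided by tasks per executor), capped by how many executors fit into spark.cores.max on a standalone cluster. With spark.cores.max = 5, spark.executor.cores = 1 and the default spark.task.cpus = 1, the cap is 5, matching the limit observed; with no tasks the target is 0, and idle executors are then released after spark.dynamicAllocation.executorIdleTimeout (60s by default). The names AllocationSketch and targetExecutors are hypothetical.

```java
public class AllocationSketch {
    // Simplified model (hypothetical): executors needed to run every pending or running task at once,
    // capped by the number of executors that fit into spark.cores.max.
    static int targetExecutors(int pendingOrRunningTasks, int coresMax, int executorCores, int taskCpus) {
        int tasksPerExecutor = executorCores / taskCpus;  // task slots per executor
        int cap = coresMax / executorCores;               // executors that fit into spark.cores.max
        int needed = (pendingOrRunningTasks + tasksPerExecutor - 1) / tasksPerExecutor; // ceiling division
        return Math.min(needed, cap);
    }

    public static void main(String[] args) {
        // Settings from the report: cores.max = 5, executor.cores = 1, task.cpus = 1 (default).
        System.out.println(targetExecutors(0, 5, 1, 1));   // idle: no executors needed
        System.out.println(targetExecutors(3, 5, 1, 1));   // small backlog
        System.out.println(targetExecutors(200, 5, 1, 1)); // large backlog: capped by cores.max
    }
}
```

In this model the three calls print 0, 3 and 5: the target never exceeds 5 however large the backlog, and drops to 0 once the stream goes quiet.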
> If we start sending data again right away, it behaves as expected and increases the number of executors again. But if we leave the application idle at 0 executors for around 10 minutes, we start getting errors like this:
> {noformat}
> 20/09/15 10:41:22 ERROR TransportClient: Failed to send RPC RPC 7570256331800450365 to sparkmaster/10.0.12.231:7077: java.nio.channels.ClosedChannelException
> java.nio.channels.ClosedChannelException
> at io.netty.channel.AbstractChannel$AbstractUnsafe.newClosedChannelException(AbstractChannel.java:957)
> at io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:865)
> at io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
> at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
> at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764)
> at io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1104)
> at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
> at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
> at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
> at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
> at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: Connection reset by peer
> at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
> at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
> at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
> at sun.nio.ch.IOUtil.write(IOUtil.java:65)
> at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:468)
> at org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:148)
> at org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123)
> at io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:362)
> at io.netty.channel.nio.AbstractNioByteChannel.doWriteInternal(AbstractNioByteChannel.java:235)
> at io.netty.channel.nio.AbstractNioByteChannel.doWrite0(AbstractNioByteChannel.java:209)
> at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:400)
> at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:930)
> at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:354)
> at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:897)
> at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1372)
> at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750)
> at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:742)
> at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:728)
> at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:127)
> at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750)
> at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:765)
> ... 8 more
> 20/09/15 10:41:22 WARN ExecutorAllocationManager: Unable to reach the cluster manager to request 1 total executors!
> {noformat}
> Restarting the master node makes everything work again, and so does restarting the Spark application.
> All nodes can ping the master node, and they can start new applications or kill running ones; the problem seems to occur only when increasing the number of executors.
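Note that ping only shows ICMP reachability. Opening a fresh TCP connection to the master's RPC port (7077 in the log above) is a closer check, although, as the ability to start other applications already suggests, it says nothing about whether the driver's existing long-lived connection is still usable. A minimal JDK-only sketch; the class and method names (PortCheckSketch, canConnect) are hypothetical, and the host name is taken from the log.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortCheckSketch {
    // Returns true if a new TCP connection to host:port can be opened within timeoutMs.
    // Unresolvable hosts, refused connections and timeouts all surface as IOException -> false.
    static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Master address as it appears in the log; replace with your own.
        System.out.println(canConnect("sparkmaster", 7077, 2000));
    }
}
```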
> Is this a bug, or are we missing some configuration/timeout?
--
This message was sent by Atlassian Jira
(v8.20.10#820010)