You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Duarte Ferreira (Jira)" <ji...@apache.org> on 2020/09/15 18:08:00 UTC

[jira] [Updated] (SPARK-32893) Structured Streaming and Dynamic Allocation on StandaloneCluster

     [ https://issues.apache.org/jira/browse/SPARK-32893?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Duarte Ferreira updated SPARK-32893:
------------------------------------
    Priority: Blocker  (was: Major)

> Structured Streaming and Dynamic Allocation on StandaloneCluster
> ----------------------------------------------------------------
>
>                 Key: SPARK-32893
>                 URL: https://issues.apache.org/jira/browse/SPARK-32893
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 3.0.1
>            Reporter: Duarte Ferreira
>            Priority: Blocker
>
> We are currently using Spark 3.0.1 Standalone cluster to run our Structured streaming applications.
> We set the following configurations when running the application in cluster mode:
>  * spark.dynamicAllocation.enabled = true
>  * spark.shuffle.service.enabled = true
>  * spark.cores.max =5
>  * spark.executor.memory = 1G
>  * spark.executor.cores = 1
> We also have the configurations set to enable spark.shuffle.service.enabled on each worker and have a cluster composed of 1 master and 2 slaves.
> The application reads data from a kafka Topic (readTopic) using [This documentation, |https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html]applies some transformations on the DataSet using spark SQL and writes data to another Kafka Topic (writeTopic).
> When we start the application it behaves correctly, it starts with 0 executors and. as we start feeding data to the readTopic, it starts increasing the number of executors until it reaches the 5 executors limit and all messages are transformed and written to the writeTopic in Kafka.
> If we stop feeding messages to the readTopic the application will work as expected and starts killing executors that are not needed anymore until we stop sending data completely and it reach 0 executors running.
> If we start sending data again right away, it behaves just as expected it starts increasing the numbers of executors again. But if we leave the application in idle at 0 executors for around 10 minutes we start getting errors like this:
> {noformat}
> *no*
> 20/09/15 10:41:22 ERROR TransportClient: Failed to send RPC RPC 7570256331800450365 to sparkmaster/10.0.12.231:7077: java.nio.channels.ClosedChannelException
> java.nio.channels.ClosedChannelException
> 	at io.netty.channel.AbstractChannel$AbstractUnsafe.newClosedChannelException(AbstractChannel.java:957)
> 	at io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:865)
> 	at io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
> 	at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
> 	at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764)
> 	at io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1104)
> 	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
> 	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
> 	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
> 	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
> 	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
> 	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
> 	at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: Connection reset by peer
> 	at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
> 	at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
> 	at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
> 	at sun.nio.ch.IOUtil.write(IOUtil.java:65)
> 	at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:468)
> 	at org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:148)
> 	at org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123)
> 	at io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:362)
> 	at io.netty.channel.nio.AbstractNioByteChannel.doWriteInternal(AbstractNioByteChannel.java:235)
> 	at io.netty.channel.nio.AbstractNioByteChannel.doWrite0(AbstractNioByteChannel.java:209)
> 	at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:400)
> 	at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:930)
> 	at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:354)
> 	at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:897)
> 	at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1372)
> 	at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750)
> 	at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:742)
> 	at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:728)
> 	at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:127)
> 	at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750)
> 	at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:765)
> 	... 8 more
> 20/09/15 10:41:22 WARN ExecutorAllocationManager: Unable to reach the cluster manager to request 1 total executors!
> {noformat}
> If we restart the master node, everything works again, if we restart the spark app, everything starts working again.
> All nodes can ping the master node and they can start other applications or kill the ones running, there seams to be a problem only when increasing the executors.
> Is this a Bug or are we missing some configuration/timeout?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org