You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Duarte Ferreira (Jira)" <ji...@apache.org> on 2020/09/24 13:40:00 UTC

[jira] [Comment Edited] (SPARK-32893) Structured Streaming and Dynamic Allocation on StandaloneCluster

    [ https://issues.apache.org/jira/browse/SPARK-32893?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17201527#comment-17201527 ] 

Duarte Ferreira edited comment on SPARK-32893 at 9/24/20, 1:39 PM:
-------------------------------------------------------------------

Further testing has shown that it happens even if the minimum number of executors e higher than 0, once it reaches that number for more than 10 minutes it cannot assign new executors even though there are enough resources on the spark worker nodes to start new executors.

 


was (Author: dnferreira):
Further testing has shown that it happens even if the minimum number of executors e higher than 0, once it reaches that number for more than 10 minutos it cannot assign new executors even though there are enough resources on the spark worker nodes to start new executors.

 

> Structured Streaming and Dynamic Allocation on StandaloneCluster
> ----------------------------------------------------------------
>
>                 Key: SPARK-32893
>                 URL: https://issues.apache.org/jira/browse/SPARK-32893
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 3.0.1
>            Reporter: Duarte Ferreira
>            Priority: Major
>
> We are currently using Spark 3.0.1 Standalone cluster to run our Structured streaming applications.
> We set the following configurations when running the application in cluster mode:
>  * spark.dynamicAllocation.enabled = true
>  * spark.shuffle.service.enabled = true
>  * spark.cores.max =5
>  * spark.executor.memory = 1G
>  * spark.executor.cores = 1
> We also have the configurations set to enable spark.shuffle.service.enabled on each worker and have a cluster composed of 1 master and 2 slaves.
> The application reads data from a kafka Topic (readTopic) using [This documentation, |https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html]applies some transformations on the DataSet using spark SQL and writes data to another Kafka Topic (writeTopic).
> When we start the application it behaves correctly, it starts with 0 executors and. as we start feeding data to the readTopic, it starts increasing the number of executors until it reaches the 5 executors limit and all messages are transformed and written to the writeTopic in Kafka.
> If we stop feeding messages to the readTopic the application will work as expected and starts killing executors that are not needed anymore until we stop sending data completely and it reach 0 executors running.
> If we start sending data again right away, it behaves just as expected it starts increasing the numbers of executors again. But if we leave the application in idle at 0 executors for around 10 minutes we start getting errors like this:
> {noformat}
> *no*
> 20/09/15 10:41:22 ERROR TransportClient: Failed to send RPC RPC 7570256331800450365 to sparkmaster/10.0.12.231:7077: java.nio.channels.ClosedChannelException
> java.nio.channels.ClosedChannelException
> 	at io.netty.channel.AbstractChannel$AbstractUnsafe.newClosedChannelException(AbstractChannel.java:957)
> 	at io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:865)
> 	at io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
> 	at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
> 	at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764)
> 	at io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1104)
> 	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
> 	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
> 	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
> 	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
> 	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
> 	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
> 	at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: Connection reset by peer
> 	at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
> 	at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
> 	at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
> 	at sun.nio.ch.IOUtil.write(IOUtil.java:65)
> 	at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:468)
> 	at org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:148)
> 	at org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123)
> 	at io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:362)
> 	at io.netty.channel.nio.AbstractNioByteChannel.doWriteInternal(AbstractNioByteChannel.java:235)
> 	at io.netty.channel.nio.AbstractNioByteChannel.doWrite0(AbstractNioByteChannel.java:209)
> 	at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:400)
> 	at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:930)
> 	at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:354)
> 	at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:897)
> 	at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1372)
> 	at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750)
> 	at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:742)
> 	at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:728)
> 	at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:127)
> 	at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750)
> 	at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:765)
> 	... 8 more
> 20/09/15 10:41:22 WARN ExecutorAllocationManager: Unable to reach the cluster manager to request 1 total executors!
> {noformat}
> If we restart the master node, everything works again, if we restart the spark app, everything starts working again.
> All nodes can ping the master node and they can start other applications or kill the ones running, there seams to be a problem only when increasing the executors.
> Is this a Bug or are we missing some configuration/timeout?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org