Posted to issues@spark.apache.org by "Duarte Ferreira (Jira)" <ji...@apache.org> on 2020/09/15 14:11:00 UTC
[jira] [Created] (SPARK-32893) Structured Streaming and Dynamic
Allocation on StandaloneCluster
Duarte Ferreira created SPARK-32893:
---------------------------------------
Summary: Structured Streaming and Dynamic Allocation on StandaloneCluster
Key: SPARK-32893
URL: https://issues.apache.org/jira/browse/SPARK-32893
Project: Spark
Issue Type: Bug
Components: Structured Streaming
Affects Versions: 3.0.1
Reporter: Duarte Ferreira
We are currently running our Structured Streaming applications on a Spark 3.0.1 Standalone cluster.
We set the following configurations when running the application in cluster mode:
* spark.dynamicAllocation.enabled = true
* spark.shuffle.service.enabled = true
* spark.cores.max = 5
* spark.executor.memory = 1G
* spark.executor.cores = 1
We have also enabled spark.shuffle.service.enabled on each worker, and the cluster is composed of 1 master and 2 workers.
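For reference, the settings above could be passed on the command line roughly as follows. This is only a sketch: the helper name build_spark_submit and the jar name streaming-app.jar are hypothetical, and the master URL is assumed from the host and port that appear in the log excerpt in this report.

```python
import shlex

# Settings quoted from the list in this report.
DYNAMIC_ALLOCATION_CONF = {
    "spark.dynamicAllocation.enabled": "true",
    "spark.shuffle.service.enabled": "true",
    "spark.cores.max": "5",
    "spark.executor.memory": "1G",
    "spark.executor.cores": "1",
}


def build_spark_submit(master_url, app_resource, conf):
    """Return the spark-submit argument list for a cluster-mode launch."""
    args = ["spark-submit", "--master", master_url, "--deploy-mode", "cluster"]
    for key, value in conf.items():
        # Each setting becomes one "--conf key=value" pair.
        args += ["--conf", f"{key}={value}"]
    args.append(app_resource)
    return args


# Assumed values: the master host and port come from the log excerpt,
# the jar name is invented for illustration.
command = build_spark_submit(
    "spark://sparkmaster:7077", "streaming-app.jar", DYNAMIC_ALLOCATION_CONF
)
print(shlex.join(command))
```

The same key/value pairs could equally be placed in spark-defaults.conf; the command-line form is used here only because it keeps the whole configuration visible in one place.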
The application reads data from a Kafka topic (readTopic) as described in [this documentation|https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html], applies some transformations on the Dataset using Spark SQL, and writes the data to another Kafka topic (writeTopic).
When we start the application it behaves correctly: it starts with 0 executors and, as we start feeding data to readTopic, it increases the number of executors until it reaches the 5-executor limit, and all messages are transformed and written to writeTopic in Kafka.
When we stop feeding messages to readTopic, the application works as expected: it kills executors that are no longer needed until, once no data is being sent at all, it reaches 0 running executors.
If we start sending data again right away, it behaves as expected and starts increasing the number of executors again. But if we leave the application idle at 0 executors for around 10 minutes, we start getting errors like this:
{noformat}
20/09/15 10:41:22 ERROR TransportClient: Failed to send RPC RPC 7570256331800450365 to sparkmaster/10.0.12.231:7077: java.nio.channels.ClosedChannelException
java.nio.channels.ClosedChannelException
at io.netty.channel.AbstractChannel$AbstractUnsafe.newClosedChannelException(AbstractChannel.java:957)
at io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:865)
at io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764)
at io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1104)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
at sun.nio.ch.IOUtil.write(IOUtil.java:65)
at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:468)
at org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:148)
at org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123)
at io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:362)
at io.netty.channel.nio.AbstractNioByteChannel.doWriteInternal(AbstractNioByteChannel.java:235)
at io.netty.channel.nio.AbstractNioByteChannel.doWrite0(AbstractNioByteChannel.java:209)
at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:400)
at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:930)
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:354)
at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:897)
at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1372)
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750)
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:742)
at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:728)
at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:127)
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750)
at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:765)
... 8 more
20/09/15 10:41:22 WARN ExecutorAllocationManager: Unable to reach the cluster manager to request 1 total executors!
{noformat}
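To see how often the executor request fails, the driver log can be scanned for the ExecutorAllocationManager warning shown above. The following is a minimal sketch, assuming the log lines keep the same timestamp and message format as in the excerpt; the function name find_failed_requests is hypothetical.

```python
import re

# Matches the ExecutorAllocationManager warning quoted in this report.
WARNING_PATTERN = re.compile(
    r"^(?P<timestamp>\d{2}/\d{2}/\d{2} \d{2}:\d{2}:\d{2}) "
    r"WARN ExecutorAllocationManager: "
    r"Unable to reach the cluster manager to request (?P<total>\d+) total executors!"
)


def find_failed_requests(log_lines):
    """Return (timestamp, requested_total) for each failed executor request."""
    hits = []
    for line in log_lines:
        match = WARNING_PATTERN.match(line.strip())
        if match:
            hits.append((match.group("timestamp"), int(match.group("total"))))
    return hits


# Two lines copied from the log excerpt in this report.
sample = [
    "20/09/15 10:41:22 ERROR TransportClient: Failed to send RPC RPC "
    "7570256331800450365 to sparkmaster/10.0.12.231:7077: "
    "java.nio.channels.ClosedChannelException",
    "20/09/15 10:41:22 WARN ExecutorAllocationManager: Unable to reach the "
    "cluster manager to request 1 total executors!",
]
print(find_failed_requests(sample))  # prints [('20/09/15 10:41:22', 1)]
```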
If we restart the master node, everything works again; if we restart the Spark application, everything also starts working again.
All nodes can ping the master node, and they can start other applications or kill running ones; there seems to be a problem only when increasing the number of executors.
Is this a bug, or are we missing some configuration/timeout?
--
This message was sent by Atlassian Jira
(v8.3.4#803005)