You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Duarte Ferreira (Jira)" <ji...@apache.org> on 2020/09/15 14:11:00 UTC

[jira] [Created] (SPARK-32893) Structured Streaming and Dynamic Allocation on StandaloneCluster

Duarte Ferreira created SPARK-32893:
---------------------------------------

             Summary: Structured Streaming and Dynamic Allocation on StandaloneCluster
                 Key: SPARK-32893
                 URL: https://issues.apache.org/jira/browse/SPARK-32893
             Project: Spark
          Issue Type: Bug
          Components: Structured Streaming
    Affects Versions: 3.0.1
            Reporter: Duarte Ferreira


We are currently using Spark 3.0.1 Standalone cluster to run our Structured streaming applications.

We set the following configurations when running the application in cluster mode:
 * spark.dynamicAllocation.enabled = true
 * spark.shuffle.service.enabled = true
 * spark.cores.max =5
 * spark.executor.memory = 1G
 * spark.executor.cores = 1

We also have the configurations set to enable spark.shuffle.service.enabled on each worker and have a cluster composed of 1 master and 2 slaves.

The application reads data from a kafka Topic (readTopic) using [This documentation, |https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html]applies some transformations on the DataSet using spark SQL and writes data to another Kafka Topic (writeTopic).

When we start the application it behaves correctly, it starts with 0 executors and. as we start feeding data to the readTopic, it starts increasing the number of executors until it reaches the 5 executors limit and all messages are transformed and written to the writeTopic in Kafka.

If we stop feeding messages to the readTopic the application will work as expected and starts killing executors that are not needed anymore until we stop sending data completely and it reach 0 executors running.

If we start sending data again right away, it behaves just as expected it starts increasing the numbers of executors again. But if we leave the application in idle at 0 executors for around 10 minutes we start getting errors like this:
{noformat}
*no*
20/09/15 10:41:22 ERROR TransportClient: Failed to send RPC RPC 7570256331800450365 to sparkmaster/10.0.12.231:7077: java.nio.channels.ClosedChannelException
java.nio.channels.ClosedChannelException
	at io.netty.channel.AbstractChannel$AbstractUnsafe.newClosedChannelException(AbstractChannel.java:957)
	at io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:865)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
	at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
	at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764)
	at io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1104)
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Connection reset by peer
	at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
	at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
	at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
	at sun.nio.ch.IOUtil.write(IOUtil.java:65)
	at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:468)
	at org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:148)
	at org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123)
	at io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:362)
	at io.netty.channel.nio.AbstractNioByteChannel.doWriteInternal(AbstractNioByteChannel.java:235)
	at io.netty.channel.nio.AbstractNioByteChannel.doWrite0(AbstractNioByteChannel.java:209)
	at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:400)
	at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:930)
	at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:354)
	at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:897)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1372)
	at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750)
	at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:742)
	at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:728)
	at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:127)
	at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750)
	at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:765)
	... 8 more
20/09/15 10:41:22 WARN ExecutorAllocationManager: Unable to reach the cluster manager to request 1 total executors!

{noformat}
If we restart the master node, everything works again, if we restart the spark app, everything starts working again.

All nodes can ping the master node and they can start other applications or kill the ones running, there seams to be a problem only when increasing the executors.

Is this a Bug or are we missing some configuration/timeout?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org