Posted to user@spark.apache.org by Sitakant Mishra <si...@gmail.com> on 2017/12/26 19:34:34 UTC

Problem in Spark-Kafka Connector

Hi,

I am trying to connect my Spark cluster to a single Kafka topic, which is
running as a separate process on another machine. While submitting the Spark
application, I get the following error.



17/12/25 16:56:57 ERROR TransportRequestHandler: Error sending result StreamResponse{streamId=/jars/learning-spark-examples-assembly-0.0.1.jar, byteCount=186935315, body=FileSegmentManagedBuffer{file=/s/chopin/a/grad/skmishra/Thesis/spark-pipeline/./target/scala-2.10/learning-spark-examples-assembly-0.0.1.jar, offset=0, length=186935315}} to /129.82.44.156:55168; closing connection
java.nio.channels.ClosedChannelException
    at io.netty.channel.AbstractChannel$AbstractUnsafe.close(...)(Unknown Source)
17/12/25 16:56:57 INFO TaskSetManager: Starting task 21.0 in stage 0.0 (TID 21, 129.82.44.156, executor 9, partition 21, PROCESS_LOCAL, 4706 bytes)
17/12/25 16:56:57 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, 129.82.44.156, executor 9): java.nio.channels.ClosedChannelException
    at org.apache.spark.network.client.StreamInterceptor.channelInactive(StreamInterceptor.java:60)
    at org.apache.spark.network.util.TransportFrameDecoder.channelInactive(TransportFrameDecoder.java:179)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:241)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:227)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:220)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1289)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:241)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:227)
    at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:893)
    at io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:691)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:399)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:446)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
    at java.lang.Thread.run(Thread.java:745)

17/12/25 16:56:57 ERROR TransportRequestHandler: Error sending result StreamResponse{streamId=/jars/learning-spark-examples-assembly-0.0.1.jar, byteCount=186935315, body=FileSegmentManagedBuffer{file=/s/chopin/a/grad/skmishra/Thesis/spark-pipeline/./target/scala-2.10/learning-spark-examples-assembly-0.0.1.jar, offset=0, length=186935315}} to /129.82.44.164:45988; closing connection
java.nio.channels.ClosedChannelException
    at io.netty.channel.AbstractChannel$AbstractUnsafe.close(...)(Unknown Source)
17/12/25 16:56:57 ERROR TransportRequestHandler: Error sending result StreamResponse{streamId=/jars/learning-spark-examples-assembly-0.0.1.jar, byteCount=186935315, body=FileSegmentManagedBuffer{file=/s/chopin/a/grad/skmishra/Thesis/spark-pipeline/./target/scala-2.10/learning-spark-examples-assembly-0.0.1.jar, offset=0, length=186935315}} to /129.82.44.142:56136; closing connection



I looked over the web, and the only relevant discussion I found was
https://stackoverflow.com/questions/29781489/apache-spark-network-errors-between-executors?noredirect=1&lq=1.
I tried the suggestion given there, shown below.


import org.apache.spark.SparkConf

val conf = new SparkConf().setAppName("KafkaInput")
  .set("spark.shuffle.blockTransferService", "nio")


But it still does not work. I am using the "spark-2.2.0-bin-hadoop2.7"
distribution of Spark. Please help me with this issue, and let me know if
you need any other information from my side.
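
In case it helps, the stream is set up roughly like the sketch below. This
is a minimal reconstruction rather than my exact code, using the
spark-streaming-kafka-0-8 direct-stream API; the broker address
"kafka-host:9092", the topic name "my-topic", and the batch interval are
placeholders.

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

// Direct stream against a single Kafka topic; broker and topic names
// are placeholders, not the real ones.
val conf = new SparkConf().setAppName("KafkaInput")
val ssc = new StreamingContext(conf, Seconds(10))
val kafkaParams = Map("metadata.broker.list" -> "kafka-host:9092")
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("my-topic"))
stream.map(_._2).print()  // print the message values for each batch
ssc.start()
ssc.awaitTermination()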



Thanks and Regards,
Sitakanta Mishra

Re: Problem in Spark-Kafka Connector

Posted by Sitakant Mishra <si...@gmail.com>.
Hi,

Could someone kindly help me with this problem? I would be grateful for any pointers.

Thanks and Regards,
Sitakanta Mishra
