You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Balaji Rajagopalan <ba...@olacabs.com> on 2016/04/06 07:46:02 UTC

RemoteTransportException when trying to redis in flink code

I am running Flink code on an AWS EMR YARN cluster. In one of the window
apply functions I try to set some values in Redis, and it fails. I have
tried to access the same Redis without any Flink code and get/set works,
but from Flink I run into the exception below. Any input on what might be
going wrong?

org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
Error at remote task manager 'some-ip'.

at
org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.decodeMsg(PartitionRequestClientHandler.java:241)

at
org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.channelRead(PartitionRequestClientHandler.java:164)

at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)

at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)

at
io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)

at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)

at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)

at
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)

at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)

at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)

at
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)

at
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)

at
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)

at
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)

at
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)

at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)

at
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)

at java.lang.Thread.run(Thread.java:745)

Caused by:
org.apache.flink.runtime.io.network.partition.ProducerFailedException

at
org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.writeAndFlushNextMessageIfPossible(PartitionRequestQueue.java:164)

at
org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.userEventTriggered(PartitionRequestQueue.java:96)

at
io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)

at
io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:294)

at
io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)

at
io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)

at
io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:294)

at
io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)

at
io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)

at
io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:294)

at
io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)

at
io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)

at
io.netty.channel.AbstractChannelHandlerContext.access$500(AbstractChannelHandlerContext.java:32)

at
io.netty.channel.AbstractChannelHandlerContext$6.run(AbstractChannelHandlerContext.java:299)

at
io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)

at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)

... 2 more


Caused by: java.lang.RuntimeException: java.net.ConnectException:
Connection refused

        at com.redis.IO$class.connect(IO.scala:37)

        at com.redis.RedisClient.connect(RedisClient.scala:94)

        at com.redis.RedisCommand$class.initialize(RedisClient.scala:71)

        at com.redis.RedisClient.initialize(RedisClient.scala:94)

        at com.redis.RedisClient.<init>(RedisClient.scala:98)

        at com.redis.RedisClientFactory.makeObject(Pool.scala:12)

        at com.redis.RedisClientFactory.makeObject(Pool.scala:7)

        at
org.apache.commons.pool.impl.StackObjectPool.borrowObject(StackObjectPool.java:149)

        at com.redis.RedisClientPool.withClient(Pool.scala:34)

        at
com.olacabs.peakpricing.utils.DriverStreamHelper$.set(DriverStreamHelper.scala:57)

        at
com.olacabs.peakpricing.datastream.TotalMappedFunction.join(TotalMappedFunction.scala:35)

        at
com.olacabs.peakpricing.datastream.TotalMappedFunction.join(TotalMappedFunction.scala:29)

        at
org.apache.flink.streaming.api.datastream.JoinedStreams$JoinCoGroupFunction.coGroup(JoinedStreams.java:312)

        at
org.apache.flink.streaming.api.datastream.CoGroupedStreams$CoGroupWindowFunction.apply(CoGroupedStreams.java:583)

        at
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.apply(InternalIterableWindowFunction.java:44)

        at
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.apply(InternalIterableWindowFunction.java:33)

        at
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processTriggerResult(WindowOperator.java:256)

        at
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processWatermark(WindowOperator.java:287)

        at
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:158)

        at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)

        at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)

        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)

        at java.lang.Thread.run(Thread.java:745)

Caused by: java.net.ConnectException: Connection refused

        at java.net.PlainSocketImpl.socketConnect(Native Method)

        at
java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)

        at
java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)

        at
java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)

        at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)

        at java.net.Socket.connect(Socket.java:589)

        at java.net.Socket.connect(Socket.java:538)

        at java.net.Socket.<init>(Socket.java:434)

        at java.net.Socket.<init>(Socket.java:211)

Re: RemoteTransportException when trying to redis in flink code

Posted by Till Rohrmann <tr...@apache.org>.
Great to hear that you solved your problem :-)

On Wed, Apr 6, 2016 at 2:29 PM, Balaji Rajagopalan <
balaji.rajagopalan@olacabs.com> wrote:

> Till,
>   Found the issue, it was my bad assumption about GlobalConfiguration,
> what I thought was once the configuration is read from the client machine
> GlobalConfiguration params will passed on to the task manager nodes, as
> well, it was not and values from default was getting pickup, which was
> localhost 6379 and there was no redis running in localhost of task manager.
>
> balaji
>
> On Wed, Apr 6, 2016 at 3:29 PM, Till Rohrmann <tr...@apache.org>
> wrote:
>
>> Hmm I'm not a Redis expert, but are you sure that you see a successful
>> ping reply in the logs of the TaskManagers and not only in the client logs?
>>
>> Another thing: Is the redisClient thread safe? Multiple map tasks might
>> be accessing the set and get methods concurrently.
>>
>> Another question: The code of DriverStreamHelper you've just sent is not
>> the code you've used when receiving the stack trace, right? Because in the
>> stack trace it's written that you access a RedisClientPool from the
>> DriverStreamHelper.set method.
>>
>> Cheers,
>> Till
>>
>>
>> On Wed, Apr 6, 2016 at 11:42 AM, Balaji Rajagopalan <
>> balaji.rajagopalan@olacabs.com> wrote:
>>
>>> Till,
>>>   I have checked from all the taskmanager nodes I am able to establish a
>>> connection by installing a redis-cli on those nodes. The thing is in the
>>> constructor I am able to set and get values, also I am getting PONG for the
>>> ping. But once object is initialized when I try to call DriverStreamHelper.get
>>> and DriverStreamHelper.set from map/apply function I get the connection
>>> refused. This may not be related to flink but rather to some security
>>> setting with Amazon AWS EMR, this is assumption now. I have also tried with
>>> 3 different redis libraries to rule out any errors with libraries the same
>>> exception in all.
>>>
>>> object DriverStreamHelper {
>>>
>>>
>>>   implicit val akkaSystem = akka.actor.ActorSystem("flink-actorsystem")
>>>
>>>   val redisClient = RedisClient(host=redisHost, port=redisPort)
>>>
>>>   val p = redisClient.ping()
>>>   p.map{ res => LOG.info(s"Reply from Redis client : $res") }
>>>
>>>
>>>
>>>   val postFix = System.currentTimeMillis()
>>>   val key = "some-key" + postFix
>>>   val value = "some-value" + postFix
>>>   set(key, value, Some(10000L))
>>>   LOG.info(s"Going to get the value from Redis ${get(key)}")
>>>
>>>   def set(k: String, v: String): Unit = {
>>>     redisClient.set(k,v)
>>>   }
>>>
>>>   def set(k: String, v: String, exTime: Option[Long]): Unit = {
>>>       redisClient.set(k,v,exTime)
>>>   }
>>>
>>>
>>> def get(k: String): Option[String] = {
>>> import scala.concurrent.duration._
>>> val f = redisClient.get[String](k)
>>> Await.result(f, 1.seconds) //FIXME - really bad need to return future
>>> here.
>>> }
>>>
>>> }
>>>
>>>
>>> On Wed, Apr 6, 2016 at 2:42 PM, Till Rohrmann <tr...@apache.org>
>>> wrote:
>>>
>>>> Hi Balaji,
>>>>
>>>> from the stack trace it looks as if you cannot open a connection redis.
>>>> Have you checked that you can access redis from all your TaskManager nodes?
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Wed, Apr 6, 2016 at 7:46 AM, Balaji Rajagopalan <
>>>> balaji.rajagopalan@olacabs.com> wrote:
>>>>
>>>>> I am trying to use AWS EMR yarn cluster where the flink code runs, in
>>>>> one of apply window function, I try to set some values in redis it fails. I
>>>>> have tried to access the same redis with no flink code and get/set works,
>>>>> but from the flink I get  into this exception. Any inputs on what might be
>>>>> going wrong.
>>>>>
>>>>> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
>>>>> Error at remote task manager 'some-ip'.
>>>>>
>>>>> at
>>>>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.decodeMsg(PartitionRequestClientHandler.java:241)
>>>>>
>>>>> at
>>>>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.channelRead(PartitionRequestClientHandler.java:164)
>>>>>
>>>>> at
>>>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>>>>>
>>>>> at
>>>>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>>>>>
>>>>> at
>>>>> io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>>>>>
>>>>> at
>>>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>>>>>
>>>>> at
>>>>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>>>>>
>>>>> at
>>>>> io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
>>>>>
>>>>> at
>>>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>>>>>
>>>>> at
>>>>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>>>>>
>>>>> at
>>>>> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
>>>>>
>>>>> at
>>>>> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
>>>>>
>>>>> at
>>>>> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>>>>>
>>>>> at
>>>>> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>>>>>
>>>>> at
>>>>> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>>>>>
>>>>> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>>>>>
>>>>> at
>>>>> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
>>>>>
>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>
>>>>> Caused by:
>>>>> org.apache.flink.runtime.io.network.partition.ProducerFailedException
>>>>>
>>>>> at
>>>>> org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.writeAndFlushNextMessageIfPossible(PartitionRequestQueue.java:164)
>>>>>
>>>>> at
>>>>> org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.userEventTriggered(PartitionRequestQueue.java:96)
>>>>>
>>>>> at
>>>>> io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)
>>>>>
>>>>> at
>>>>> io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:294)
>>>>>
>>>>> at
>>>>> io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
>>>>>
>>>>> at
>>>>> io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)
>>>>>
>>>>> at
>>>>> io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:294)
>>>>>
>>>>> at
>>>>> io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
>>>>>
>>>>> at
>>>>> io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)
>>>>>
>>>>> at
>>>>> io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:294)
>>>>>
>>>>> at
>>>>> io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
>>>>>
>>>>> at
>>>>> io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)
>>>>>
>>>>> at
>>>>> io.netty.channel.AbstractChannelHandlerContext.access$500(AbstractChannelHandlerContext.java:32)
>>>>>
>>>>> at
>>>>> io.netty.channel.AbstractChannelHandlerContext$6.run(AbstractChannelHandlerContext.java:299)
>>>>>
>>>>> at
>>>>> io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
>>>>>
>>>>> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
>>>>>
>>>>> ... 2 more
>>>>>
>>>>>
>>>>> Caused by: java.lang.RuntimeException: java.net.ConnectException:
>>>>> Connection refused
>>>>>
>>>>>         at com.redis.IO$class.connect(IO.scala:37)
>>>>>
>>>>>         at com.redis.RedisClient.connect(RedisClient.scala:94)
>>>>>
>>>>>         at
>>>>> com.redis.RedisCommand$class.initialize(RedisClient.scala:71)
>>>>>
>>>>>         at com.redis.RedisClient.initialize(RedisClient.scala:94)
>>>>>
>>>>>         at com.redis.RedisClient.<init>(RedisClient.scala:98)
>>>>>
>>>>>         at com.redis.RedisClientFactory.makeObject(Pool.scala:12)
>>>>>
>>>>>         at com.redis.RedisClientFactory.makeObject(Pool.scala:7)
>>>>>
>>>>>         at
>>>>> org.apache.commons.pool.impl.StackObjectPool.borrowObject(StackObjectPool.java:149)
>>>>>
>>>>>         at com.redis.RedisClientPool.withClient(Pool.scala:34)
>>>>>
>>>>>         at
>>>>> com.olacabs.peakpricing.utils.DriverStreamHelper$.set(DriverStreamHelper.scala:57)
>>>>>
>>>>>         at
>>>>> com.olacabs.peakpricing.datastream.TotalMappedFunction.join(TotalMappedFunction.scala:35)
>>>>>
>>>>>         at
>>>>> com.olacabs.peakpricing.datastream.TotalMappedFunction.join(TotalMappedFunction.scala:29)
>>>>>
>>>>>         at
>>>>> org.apache.flink.streaming.api.datastream.JoinedStreams$JoinCoGroupFunction.coGroup(JoinedStreams.java:312)
>>>>>
>>>>>         at
>>>>> org.apache.flink.streaming.api.datastream.CoGroupedStreams$CoGroupWindowFunction.apply(CoGroupedStreams.java:583)
>>>>>
>>>>>         at
>>>>> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.apply(InternalIterableWindowFunction.java:44)
>>>>>
>>>>>         at
>>>>> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.apply(InternalIterableWindowFunction.java:33)
>>>>>
>>>>>         at
>>>>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processTriggerResult(WindowOperator.java:256)
>>>>>
>>>>>         at
>>>>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processWatermark(WindowOperator.java:287)
>>>>>
>>>>>         at
>>>>> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:158)
>>>>>
>>>>>         at
>>>>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
>>>>>
>>>>>         at
>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
>>>>>
>>>>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>
>>>>>         at java.lang.Thread.run(Thread.java:745)
>>>>>
>>>>> Caused by: java.net.ConnectException: Connection refused
>>>>>
>>>>>         at java.net.PlainSocketImpl.socketConnect(Native Method)
>>>>>
>>>>>         at
>>>>> java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
>>>>>
>>>>>         at
>>>>> java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
>>>>>
>>>>>         at
>>>>> java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
>>>>>
>>>>>         at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
>>>>>
>>>>>         at java.net.Socket.connect(Socket.java:589)
>>>>>
>>>>>         at java.net.Socket.connect(Socket.java:538)
>>>>>
>>>>>         at java.net.Socket.<init>(Socket.java:434)
>>>>>
>>>>>         at java.net.Socket.<init>(Socket.java:211)
>>>>>
>>>>
>>>>
>>>
>>
>

Re: RemoteTransportException when trying to redis in flink code

Posted by Balaji Rajagopalan <ba...@olacabs.com>.
Till,
  Found the issue: it was my bad assumption about GlobalConfiguration. I
thought that once the configuration is read on the client machine, the
GlobalConfiguration params would be passed on to the task manager nodes as
well. They were not, so the default values were getting picked up, which were
localhost:6379, and there was no redis running on localhost of the task manager.

balaji

On Wed, Apr 6, 2016 at 3:29 PM, Till Rohrmann <tr...@apache.org> wrote:

> Hmm I'm not a Redis expert, but are you sure that you see a successful
> ping reply in the logs of the TaskManagers and not only in the client logs?
>
> Another thing: Is the redisClient thread safe? Multiple map tasks might be
> accessing the set and get methods concurrently.
>
> Another question: The code of DriverStreamHelper you've just sent is not
> the code you've used when receiving the stack trace, right? Because in the
> stack trace it's written that you access a RedisClientPool from the
> DriverStreamHelper.set method.
>
> Cheers,
> Till
>
>
> On Wed, Apr 6, 2016 at 11:42 AM, Balaji Rajagopalan <
> balaji.rajagopalan@olacabs.com> wrote:
>
>> Till,
>>   I have checked from all the taskmanager nodes I am able to establish a
>> connection by installing a redis-cli on those nodes. The thing is in the
>> constructor I am able to set and get values, also I am getting PONG for the
>> ping. But once object is initialized when I try to call DriverStreamHelper.get
>> and DriverStreamHelper.set from map/apply function I get the connection
>> refused. This may not be related to flink but rather to some security
>> setting with Amazon AWS EMR, this is assumption now. I have also tried with
>> 3 different redis libraries to rule out any errors with libraries the same
>> exception in all.
>>
>> object DriverStreamHelper {
>>
>>
>>   implicit val akkaSystem = akka.actor.ActorSystem("flink-actorsystem")
>>
>>   val redisClient = RedisClient(host=redisHost, port=redisPort)
>>
>>   val p = redisClient.ping()
>>   p.map{ res => LOG.info(s"Reply from Redis client : $res") }
>>
>>
>>
>>   val postFix = System.currentTimeMillis()
>>   val key = "some-key" + postFix
>>   val value = "some-value" + postFix
>>   set(key, value, Some(10000L))
>>   LOG.info(s"Going to get the value from Redis ${get(key)}")
>>
>>   def set(k: String, v: String): Unit = {
>>     redisClient.set(k,v)
>>   }
>>
>>   def set(k: String, v: String, exTime: Option[Long]): Unit = {
>>       redisClient.set(k,v,exTime)
>>   }
>>
>>
>> def get(k: String): Option[String] = {
>> import scala.concurrent.duration._
>> val f = redisClient.get[String](k)
>> Await.result(f, 1.seconds) //FIXME - really bad need to return future
>> here.
>> }
>>
>> }
>>
>>
>> On Wed, Apr 6, 2016 at 2:42 PM, Till Rohrmann <tr...@apache.org>
>> wrote:
>>
>>> Hi Balaji,
>>>
>>> from the stack trace it looks as if you cannot open a connection redis.
>>> Have you checked that you can access redis from all your TaskManager nodes?
>>>
>>> Cheers,
>>> Till
>>>
>>> On Wed, Apr 6, 2016 at 7:46 AM, Balaji Rajagopalan <
>>> balaji.rajagopalan@olacabs.com> wrote:
>>>
>>>> I am trying to use AWS EMR yarn cluster where the flink code runs, in
>>>> one of apply window function, I try to set some values in redis it fails. I
>>>> have tried to access the same redis with no flink code and get/set works,
>>>> but from the flink I get  into this exception. Any inputs on what might be
>>>> going wrong.
>>>>
>>>> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
>>>> Error at remote task manager 'some-ip'.
>>>>
>>>> at
>>>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.decodeMsg(PartitionRequestClientHandler.java:241)
>>>>
>>>> at
>>>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.channelRead(PartitionRequestClientHandler.java:164)
>>>>
>>>> at
>>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>>>>
>>>> at
>>>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>>>>
>>>> at
>>>> io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>>>>
>>>> at
>>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>>>>
>>>> at
>>>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>>>>
>>>> at
>>>> io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
>>>>
>>>> at
>>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>>>>
>>>> at
>>>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>>>>
>>>> at
>>>> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
>>>>
>>>> at
>>>> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
>>>>
>>>> at
>>>> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>>>>
>>>> at
>>>> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>>>>
>>>> at
>>>> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>>>>
>>>> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>>>>
>>>> at
>>>> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
>>>>
>>>> at java.lang.Thread.run(Thread.java:745)
>>>>
>>>> Caused by:
>>>> org.apache.flink.runtime.io.network.partition.ProducerFailedException
>>>>
>>>> at
>>>> org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.writeAndFlushNextMessageIfPossible(PartitionRequestQueue.java:164)
>>>>
>>>> at
>>>> org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.userEventTriggered(PartitionRequestQueue.java:96)
>>>>
>>>> at
>>>> io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)
>>>>
>>>> at
>>>> io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:294)
>>>>
>>>> at
>>>> io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
>>>>
>>>> at
>>>> io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)
>>>>
>>>> at
>>>> io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:294)
>>>>
>>>> at
>>>> io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
>>>>
>>>> at
>>>> io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)
>>>>
>>>> at
>>>> io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:294)
>>>>
>>>> at
>>>> io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
>>>>
>>>> at
>>>> io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)
>>>>
>>>> at
>>>> io.netty.channel.AbstractChannelHandlerContext.access$500(AbstractChannelHandlerContext.java:32)
>>>>
>>>> at
>>>> io.netty.channel.AbstractChannelHandlerContext$6.run(AbstractChannelHandlerContext.java:299)
>>>>
>>>> at
>>>> io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
>>>>
>>>> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
>>>>
>>>> ... 2 more
>>>>
>>>>
>>>> Caused by: java.lang.RuntimeException: java.net.ConnectException:
>>>> Connection refused
>>>>
>>>>         at com.redis.IO$class.connect(IO.scala:37)
>>>>
>>>>         at com.redis.RedisClient.connect(RedisClient.scala:94)
>>>>
>>>>         at com.redis.RedisCommand$class.initialize(RedisClient.scala:71)
>>>>
>>>>         at com.redis.RedisClient.initialize(RedisClient.scala:94)
>>>>
>>>>         at com.redis.RedisClient.<init>(RedisClient.scala:98)
>>>>
>>>>         at com.redis.RedisClientFactory.makeObject(Pool.scala:12)
>>>>
>>>>         at com.redis.RedisClientFactory.makeObject(Pool.scala:7)
>>>>
>>>>         at
>>>> org.apache.commons.pool.impl.StackObjectPool.borrowObject(StackObjectPool.java:149)
>>>>
>>>>         at com.redis.RedisClientPool.withClient(Pool.scala:34)
>>>>
>>>>         at
>>>> com.olacabs.peakpricing.utils.DriverStreamHelper$.set(DriverStreamHelper.scala:57)
>>>>
>>>>         at
>>>> com.olacabs.peakpricing.datastream.TotalMappedFunction.join(TotalMappedFunction.scala:35)
>>>>
>>>>         at
>>>> com.olacabs.peakpricing.datastream.TotalMappedFunction.join(TotalMappedFunction.scala:29)
>>>>
>>>>         at
>>>> org.apache.flink.streaming.api.datastream.JoinedStreams$JoinCoGroupFunction.coGroup(JoinedStreams.java:312)
>>>>
>>>>         at
>>>> org.apache.flink.streaming.api.datastream.CoGroupedStreams$CoGroupWindowFunction.apply(CoGroupedStreams.java:583)
>>>>
>>>>         at
>>>> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.apply(InternalIterableWindowFunction.java:44)
>>>>
>>>>         at
>>>> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.apply(InternalIterableWindowFunction.java:33)
>>>>
>>>>         at
>>>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processTriggerResult(WindowOperator.java:256)
>>>>
>>>>         at
>>>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processWatermark(WindowOperator.java:287)
>>>>
>>>>         at
>>>> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:158)
>>>>
>>>>         at
>>>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
>>>>
>>>>         at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
>>>>
>>>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>
>>>>         at java.lang.Thread.run(Thread.java:745)
>>>>
>>>> Caused by: java.net.ConnectException: Connection refused
>>>>
>>>>         at java.net.PlainSocketImpl.socketConnect(Native Method)
>>>>
>>>>         at
>>>> java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
>>>>
>>>>         at
>>>> java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
>>>>
>>>>         at
>>>> java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
>>>>
>>>>         at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
>>>>
>>>>         at java.net.Socket.connect(Socket.java:589)
>>>>
>>>>         at java.net.Socket.connect(Socket.java:538)
>>>>
>>>>         at java.net.Socket.<init>(Socket.java:434)
>>>>
>>>>         at java.net.Socket.<init>(Socket.java:211)
>>>>
>>>
>>>
>>
>

Re: RemoteTransportException when trying to redis in flink code

Posted by Till Rohrmann <tr...@apache.org>.
Hmm I'm not a Redis expert, but are you sure that you see a successful ping
reply in the logs of the TaskManagers and not only in the client logs?

Another thing: Is the redisClient thread safe? Multiple map tasks might be
accessing the set and get methods concurrently.

Another question: The code of DriverStreamHelper you've just sent is not
the code you've used when receiving the stack trace, right? Because in the
stack trace it's written that you access a RedisClientPool from the
DriverStreamHelper.set method.

Cheers,
Till


On Wed, Apr 6, 2016 at 11:42 AM, Balaji Rajagopalan <
balaji.rajagopalan@olacabs.com> wrote:

> Till,
>   I have checked from all the taskmanager nodes: I am able to establish a
> connection by installing redis-cli on those nodes. The thing is, in the
> constructor I am able to set and get values, and I am also getting PONG for the
> ping. But once the object is initialized, when I try to call DriverStreamHelper.get
> and DriverStreamHelper.set from a map/apply function I get the connection
> refused error. This may not be related to flink but rather to some security
> setting with Amazon AWS EMR; this is only an assumption for now. I have also tried
> 3 different redis libraries to rule out any errors in the libraries, and I get the
> same exception with all of them.
>
> object DriverStreamHelper {
>
>
>   implicit val akkaSystem = akka.actor.ActorSystem("flink-actorsystem")
>
>   val redisClient = RedisClient(host=redisHost, port=redisPort)
>
>   val p = redisClient.ping()
>   p.map{ res => LOG.info(s"Reply from Redis client : $res") }
>
>
>
>   val postFix = System.currentTimeMillis()
>   val key = "some-key" + postFix
>   val value = "some-value" + postFix
>   set(key, value, Some(10000L))
>   LOG.info(s"Going to get the value from Redis ${get(key)}")
>
>   def set(k: String, v: String): Unit = {
>     redisClient.set(k,v)
>   }
>
>   def set(k: String, v: String, exTime: Option[Long]): Unit = {
>       redisClient.set(k,v,exTime)
>   }
>
>
> def get(k: String): Option[String] = {
> import scala.concurrent.duration._
> val f = redisClient.get[String](k)
> Await.result(f, 1.seconds) //FIXME - really bad need to return future
> here.
> }
>
> }
>
>
> On Wed, Apr 6, 2016 at 2:42 PM, Till Rohrmann <tr...@apache.org>
> wrote:
>
>> Hi Balaji,
>>
>> from the stack trace it looks as if you cannot open a connection redis.
>> Have you checked that you can access redis from all your TaskManager nodes?
>>
>> Cheers,
>> Till
>>
>> On Wed, Apr 6, 2016 at 7:46 AM, Balaji Rajagopalan <
>> balaji.rajagopalan@olacabs.com> wrote:
>>
>>> I am trying to use AWS EMR yarn cluster where the flink code runs, in
>>> one of apply window function, I try to set some values in redis it fails. I
>>> have tried to access the same redis with no flink code and get/set works,
>>> but from the flink I get  into this exception. Any inputs on what might be
>>> going wrong.
>>>
>>> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
>>> Error at remote task manager 'some-ip'.
>>>
>>> at
>>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.decodeMsg(PartitionRequestClientHandler.java:241)
>>>
>>> at
>>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.channelRead(PartitionRequestClientHandler.java:164)
>>>
>>> at
>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>>>
>>> at
>>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>>>
>>> at
>>> io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>>>
>>> at
>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>>>
>>> at
>>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>>>
>>> at
>>> io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
>>>
>>> at
>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>>>
>>> at
>>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>>>
>>> at
>>> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
>>>
>>> at
>>> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
>>>
>>> at
>>> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>>>
>>> at
>>> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>>>
>>> at
>>> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>>>
>>> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>>>
>>> at
>>> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
>>>
>>> at java.lang.Thread.run(Thread.java:745)
>>>
>>> Caused by:
>>> org.apache.flink.runtime.io.network.partition.ProducerFailedException
>>>
>>> at
>>> org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.writeAndFlushNextMessageIfPossible(PartitionRequestQueue.java:164)
>>>
>>> at
>>> org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.userEventTriggered(PartitionRequestQueue.java:96)
>>>
>>> at
>>> io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)
>>>
>>> at
>>> io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:294)
>>>
>>> at
>>> io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
>>>
>>> at
>>> io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)
>>>
>>> at
>>> io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:294)
>>>
>>> at
>>> io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
>>>
>>> at
>>> io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)
>>>
>>> at
>>> io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:294)
>>>
>>> at
>>> io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
>>>
>>> at
>>> io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)
>>>
>>> at
>>> io.netty.channel.AbstractChannelHandlerContext.access$500(AbstractChannelHandlerContext.java:32)
>>>
>>> at
>>> io.netty.channel.AbstractChannelHandlerContext$6.run(AbstractChannelHandlerContext.java:299)
>>>
>>> at
>>> io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
>>>
>>> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
>>>
>>> ... 2 more
>>>
>>>
>>> Caused by: java.lang.RuntimeException: java.net.ConnectException:
>>> Connection refused
>>>
>>>         at com.redis.IO <http://com.redis.io/>
>>> $class.connect(IO.scala:37)
>>>
>>>         at com.redis.RedisClient.connect(RedisClient.scala:94)
>>>
>>>         at com.redis.RedisCommand$class.initialize(RedisClient.scala:71)
>>>
>>>         at com.redis.RedisClient.initialize(RedisClient.scala:94)
>>>
>>>         at com.redis.RedisClient.<init>(RedisClient.scala:98)
>>>
>>>         at com.redis.RedisClientFactory.makeObject(Pool.scala:12)
>>>
>>>         at com.redis.RedisClientFactory.makeObject(Pool.scala:7)
>>>
>>>         at
>>> org.apache.commons.pool.impl.StackObjectPool.borrowObject(StackObjectPool.java:149)
>>>
>>>         at com.redis.RedisClientPool.withClient(Pool.scala:34)
>>>
>>>         at
>>> com.olacabs.peakpricing.utils.DriverStreamHelper$.set(DriverStreamHelper.scala:57)
>>>
>>>         at
>>> com.olacabs.peakpricing.datastream.TotalMappedFunction.join(TotalMappedFunction.scala:35)
>>>
>>>         at
>>> com.olacabs.peakpricing.datastream.TotalMappedFunction.join(TotalMappedFunction.scala:29)
>>>
>>>         at
>>> org.apache.flink.streaming.api.datastream.JoinedStreams$JoinCoGroupFunction.coGroup(JoinedStreams.java:312)
>>>
>>>         at
>>> org.apache.flink.streaming.api.datastream.CoGroupedStreams$CoGroupWindowFunction.apply(CoGroupedStreams.java:583)
>>>
>>>         at
>>> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.apply(InternalIterableWindowFunction.java:44)
>>>
>>>         at
>>> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.apply(InternalIterableWindowFunction.java:33)
>>>
>>>         at
>>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processTriggerResult(WindowOperator.java:256)
>>>
>>>         at
>>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processWatermark(WindowOperator.java:287)
>>>
>>>         at
>>> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:158)
>>>
>>>         at
>>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
>>>
>>>         at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
>>>
>>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>
>>>         at java.lang.Thread.run(Thread.java:745)
>>>
>>> Caused by: java.net.ConnectException: Connection refused
>>>
>>>         at java.net.PlainSocketImpl.socketConnect(Native Method)
>>>
>>>         at
>>> java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
>>>
>>>         at
>>> java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
>>>
>>>         at
>>> java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
>>>
>>>         at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
>>>
>>>         at java.net.Socket.connect(Socket.java:589)
>>>
>>>         at java.net.Socket.connect(Socket.java:538)
>>>
>>>         at java.net.Socket.<init>(Socket.java:434)
>>>
>>>         at java.net.Socket.<init>(Socket.java:211)
>>>
>>
>>
>

Re: RemoteTransportException when trying to redis in flink code

Posted by Balaji Rajagopalan <ba...@olacabs.com>.
Till,
  I have checked from all the TaskManager nodes that I am able to establish a
connection, by installing redis-cli on those nodes. The thing is, in the
constructor I am able to set and get values, and I am also getting PONG for the
ping. But once the object is initialized, when I try to call DriverStreamHelper.get
and DriverStreamHelper.set from a map/apply function, I get the connection
refused error. This may not be related to Flink but rather to some security
setting in Amazon AWS EMR; this is only an assumption for now. I have also tried
3 different Redis libraries to rule out any errors in the libraries, and I get the
same exception with all of them.

object DriverStreamHelper {


  implicit val akkaSystem = akka.actor.ActorSystem("flink-actorsystem")

  val redisClient = RedisClient(host=redisHost, port=redisPort)

  val p = redisClient.ping()
  p.map{ res => LOG.info(s"Reply from Redis client : $res") }



  val postFix = System.currentTimeMillis()
  val key = "some-key" + postFix
  val value = "some-value" + postFix
  set(key, value, Some(10000L))
  LOG.info(s"Going to get the value from Redis ${get(key)}")

  def set(k: String, v: String): Unit = {
    redisClient.set(k,v)
  }

  def set(k: String, v: String, exTime: Option[Long]): Unit = {
      redisClient.set(k,v,exTime)
  }


def get(k: String): Option[String] = {
import scala.concurrent.duration._
val f = redisClient.get[String](k)
Await.result(f, 1.seconds) //FIXME - really bad need to return future here.
}

}


On Wed, Apr 6, 2016 at 2:42 PM, Till Rohrmann <tr...@apache.org> wrote:

> Hi Balaji,
>
> from the stack trace it looks as if you cannot open a connection redis.
> Have you checked that you can access redis from all your TaskManager nodes?
>
> Cheers,
> Till
>
> On Wed, Apr 6, 2016 at 7:46 AM, Balaji Rajagopalan <
> balaji.rajagopalan@olacabs.com> wrote:
>
>> I am trying to use AWS EMR yarn cluster where the flink code runs, in one
>> of apply window function, I try to set some values in redis it fails. I
>> have tried to access the same redis with no flink code and get/set works,
>> but from the flink I get  into this exception. Any inputs on what might be
>> going wrong.
>>
>> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
>> Error at remote task manager 'some-ip'.
>>
>> at
>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.decodeMsg(PartitionRequestClientHandler.java:241)
>>
>> at
>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.channelRead(PartitionRequestClientHandler.java:164)
>>
>> at
>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>>
>> at
>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>>
>> at
>> io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>>
>> at
>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>>
>> at
>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>>
>> at
>> io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
>>
>> at
>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>>
>> at
>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>>
>> at
>> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
>>
>> at
>> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
>>
>> at
>> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>>
>> at
>> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>>
>> at
>> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>>
>> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>>
>> at
>> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
>>
>> at java.lang.Thread.run(Thread.java:745)
>>
>> Caused by:
>> org.apache.flink.runtime.io.network.partition.ProducerFailedException
>>
>> at
>> org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.writeAndFlushNextMessageIfPossible(PartitionRequestQueue.java:164)
>>
>> at
>> org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.userEventTriggered(PartitionRequestQueue.java:96)
>>
>> at
>> io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)
>>
>> at
>> io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:294)
>>
>> at
>> io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
>>
>> at
>> io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)
>>
>> at
>> io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:294)
>>
>> at
>> io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
>>
>> at
>> io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)
>>
>> at
>> io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:294)
>>
>> at
>> io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
>>
>> at
>> io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)
>>
>> at
>> io.netty.channel.AbstractChannelHandlerContext.access$500(AbstractChannelHandlerContext.java:32)
>>
>> at
>> io.netty.channel.AbstractChannelHandlerContext$6.run(AbstractChannelHandlerContext.java:299)
>>
>> at
>> io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
>>
>> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
>>
>> ... 2 more
>>
>>
>> Caused by: java.lang.RuntimeException: java.net.ConnectException:
>> Connection refused
>>
>>         at com.redis.IO <http://com.redis.io/>$class.connect(IO.scala:37)
>>
>>         at com.redis.RedisClient.connect(RedisClient.scala:94)
>>
>>         at com.redis.RedisCommand$class.initialize(RedisClient.scala:71)
>>
>>         at com.redis.RedisClient.initialize(RedisClient.scala:94)
>>
>>         at com.redis.RedisClient.<init>(RedisClient.scala:98)
>>
>>         at com.redis.RedisClientFactory.makeObject(Pool.scala:12)
>>
>>         at com.redis.RedisClientFactory.makeObject(Pool.scala:7)
>>
>>         at
>> org.apache.commons.pool.impl.StackObjectPool.borrowObject(StackObjectPool.java:149)
>>
>>         at com.redis.RedisClientPool.withClient(Pool.scala:34)
>>
>>         at
>> com.olacabs.peakpricing.utils.DriverStreamHelper$.set(DriverStreamHelper.scala:57)
>>
>>         at
>> com.olacabs.peakpricing.datastream.TotalMappedFunction.join(TotalMappedFunction.scala:35)
>>
>>         at
>> com.olacabs.peakpricing.datastream.TotalMappedFunction.join(TotalMappedFunction.scala:29)
>>
>>         at
>> org.apache.flink.streaming.api.datastream.JoinedStreams$JoinCoGroupFunction.coGroup(JoinedStreams.java:312)
>>
>>         at
>> org.apache.flink.streaming.api.datastream.CoGroupedStreams$CoGroupWindowFunction.apply(CoGroupedStreams.java:583)
>>
>>         at
>> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.apply(InternalIterableWindowFunction.java:44)
>>
>>         at
>> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.apply(InternalIterableWindowFunction.java:33)
>>
>>         at
>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processTriggerResult(WindowOperator.java:256)
>>
>>         at
>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processWatermark(WindowOperator.java:287)
>>
>>         at
>> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:158)
>>
>>         at
>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
>>
>>         at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
>>
>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>
>>         at java.lang.Thread.run(Thread.java:745)
>>
>> Caused by: java.net.ConnectException: Connection refused
>>
>>         at java.net.PlainSocketImpl.socketConnect(Native Method)
>>
>>         at
>> java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
>>
>>         at
>> java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
>>
>>         at
>> java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
>>
>>         at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
>>
>>         at java.net.Socket.connect(Socket.java:589)
>>
>>         at java.net.Socket.connect(Socket.java:538)
>>
>>         at java.net.Socket.<init>(Socket.java:434)
>>
>>         at java.net.Socket.<init>(Socket.java:211)
>>
>
>

Re: RemoteTransportException when trying to redis in flink code

Posted by Till Rohrmann <tr...@apache.org>.
Hi Balaji,

from the stack trace it looks as if you cannot open a connection to Redis.
Have you checked that you can access Redis from all your TaskManager nodes?

Cheers,
Till

On Wed, Apr 6, 2016 at 7:46 AM, Balaji Rajagopalan <
balaji.rajagopalan@olacabs.com> wrote:

> I am trying to use AWS EMR yarn cluster where the flink code runs, in one
> of apply window function, I try to set some values in redis it fails. I
> have tried to access the same redis with no flink code and get/set works,
> but from the flink I get  into this exception. Any inputs on what might be
> going wrong.
>
> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
> Error at remote task manager 'some-ip'.
>
> at
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.decodeMsg(PartitionRequestClientHandler.java:241)
>
> at
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.channelRead(PartitionRequestClientHandler.java:164)
>
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>
> at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>
> at
> io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>
> at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>
> at
> io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
>
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>
> at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>
> at
> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
>
> at
> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
>
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>
> at
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
>
> at java.lang.Thread.run(Thread.java:745)
>
> Caused by:
> org.apache.flink.runtime.io.network.partition.ProducerFailedException
>
> at
> org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.writeAndFlushNextMessageIfPossible(PartitionRequestQueue.java:164)
>
> at
> org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.userEventTriggered(PartitionRequestQueue.java:96)
>
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)
>
> at
> io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:294)
>
> at
> io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
>
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)
>
> at
> io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:294)
>
> at
> io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
>
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)
>
> at
> io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:294)
>
> at
> io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
>
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)
>
> at
> io.netty.channel.AbstractChannelHandlerContext.access$500(AbstractChannelHandlerContext.java:32)
>
> at
> io.netty.channel.AbstractChannelHandlerContext$6.run(AbstractChannelHandlerContext.java:299)
>
> at
> io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
>
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
>
> ... 2 more
>
>
> Caused by: java.lang.RuntimeException: java.net.ConnectException:
> Connection refused
>
>         at com.redis.IO <http://com.redis.io/>$class.connect(IO.scala:37)
>
>         at com.redis.RedisClient.connect(RedisClient.scala:94)
>
>         at com.redis.RedisCommand$class.initialize(RedisClient.scala:71)
>
>         at com.redis.RedisClient.initialize(RedisClient.scala:94)
>
>         at com.redis.RedisClient.<init>(RedisClient.scala:98)
>
>         at com.redis.RedisClientFactory.makeObject(Pool.scala:12)
>
>         at com.redis.RedisClientFactory.makeObject(Pool.scala:7)
>
>         at
> org.apache.commons.pool.impl.StackObjectPool.borrowObject(StackObjectPool.java:149)
>
>         at com.redis.RedisClientPool.withClient(Pool.scala:34)
>
>         at
> com.olacabs.peakpricing.utils.DriverStreamHelper$.set(DriverStreamHelper.scala:57)
>
>         at
> com.olacabs.peakpricing.datastream.TotalMappedFunction.join(TotalMappedFunction.scala:35)
>
>         at
> com.olacabs.peakpricing.datastream.TotalMappedFunction.join(TotalMappedFunction.scala:29)
>
>         at
> org.apache.flink.streaming.api.datastream.JoinedStreams$JoinCoGroupFunction.coGroup(JoinedStreams.java:312)
>
>         at
> org.apache.flink.streaming.api.datastream.CoGroupedStreams$CoGroupWindowFunction.apply(CoGroupedStreams.java:583)
>
>         at
> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.apply(InternalIterableWindowFunction.java:44)
>
>         at
> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.apply(InternalIterableWindowFunction.java:33)
>
>         at
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processTriggerResult(WindowOperator.java:256)
>
>         at
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processWatermark(WindowOperator.java:287)
>
>         at
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:158)
>
>         at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
>
>         at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
>
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>
>         at java.lang.Thread.run(Thread.java:745)
>
> Caused by: java.net.ConnectException: Connection refused
>
>         at java.net.PlainSocketImpl.socketConnect(Native Method)
>
>         at
> java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
>
>         at
> java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
>
>         at
> java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
>
>         at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
>
>         at java.net.Socket.connect(Socket.java:589)
>
>         at java.net.Socket.connect(Socket.java:538)
>
>         at java.net.Socket.<init>(Socket.java:434)
>
>         at java.net.Socket.<init>(Socket.java:211)
>