Posted to user@flink.apache.org by Colin Williams <co...@gmail.com> on 2017/11/21 05:44:55 UTC

Docker and AWS taskmanager configuration

Hi,

We noticed that we couldn't parallelize our Flink Docker containers, and
this looks like an issue that others have experienced. In our environment we
were not setting any hostname in the Flink configuration. This worked for a
single node, but with multiple nodes the taskmanagers hit an exception similar
to the one others have reported:

org.apache.flink.runtime.io.network.partition.PartitionNotFoundException: Partition 2ec10eeac8969a16945b3713b63c0f4f@11052a989c7921423e44653285481e23 not found.
	at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.failPartitionRequest(RemoteInputChannel.java:203)
	at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.retriggerSubpartitionRequest(RemoteInputChannel.java:128)
	at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.retriggerPartitionRequest(SingleInputGate.java:345)
	at org.apache.flink.runtime.taskmanager.Task.onPartitionStateUpdate(Task.java:1286)
	at org.apache.flink.runtime.taskmanager.Task$2.apply(Task.java:1123)
	at org.apache.flink.runtime.taskmanager.Task$2.apply(Task.java:1118)
	at org.apache.flink.runtime.concurrent.impl.FlinkFuture$5.onComplete(FlinkFuture.java:272)
	at akka.dispatch.OnComplete.internal(Future.scala:248)
	at akka.dispatch.OnComplete.internal(Future.scala:245)
	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:175)
	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:172)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
	at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
	at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
	at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
	at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
	at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
	at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

In our AWS environment we run only one container per EC2 instance, and
each instance has a "unique-dns-address" associated with it. The
unique-dns-address looks like ip-XXX-XX-XX-XXX.aws-region-X.

So that we don't have to do any additional DNS configuration, it would
be convenient for the taskmanagers to use this DNS address to talk to
each other.

I verified that each taskmanager was reachable at its unique-dns-address
by telnetting to one of the taskmanager ports, and the connection succeeded.
This made me think that setting taskmanager.hostname to that address would
solve my issue.
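For reference, the telnet check can be scripted with only the JDK so it runs the same way on every instance. This is a minimal sketch, not part of Flink: the class name TcpReachability is hypothetical, and the host and port are whatever you pass on the command line. A successful connect only proves that something is listening at that address; it says nothing about which address Flink advertises to its peers.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class TcpReachability {

    /**
     * Returns true if a TCP connection to host:port can be opened within
     * timeoutMillis, which is roughly what a successful telnet session shows.
     */
    public static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or the host name did not resolve.
            return false;
        }
    }

    public static void main(String[] args) {
        if (args.length != 2) {
            System.err.println("usage: java TcpReachability <host> <port>");
            System.exit(2);
        }
        String host = args[0];
        int port = Integer.parseInt(args[1]);
        System.out.println(host + ":" + port + " reachable: " + canConnect(host, port, 2000));
    }
}
```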

However, when I set taskmanager.hostname: unique-dns-address in
flink-conf.yaml, I ended up with a java.net.BindException: Cannot assign
requested address. I'm not entirely sure why this happened.
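One plausible cause, suggested by how the thread was eventually resolved rather than confirmed in it: the name resolves to an address that is not assigned to any interface in the container's network namespace (on Docker's default bridge network a container has only its loopback and bridge addresses). On Linux, binding to such an address fails with EADDRNOTAVAIL, which Java reports as java.net.BindException: Cannot assign requested address. The hypothetical BindCheck class below reproduces that with the JDK alone; 192.0.2.1 is a reserved documentation address used here only as an example of a non-local IP.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;

public class BindCheck {

    /**
     * Tries to bind a listening socket to the given address on an ephemeral
     * port. Returns null on success, otherwise the exception that was thrown
     * (for example a BindException for an address that is not local).
     */
    public static IOException tryBind(String address) {
        try (ServerSocket server = new ServerSocket()) {
            server.bind(new InetSocketAddress(InetAddress.getByName(address), 0));
            return null;
        } catch (IOException e) {
            // Includes UnknownHostException if the name does not resolve.
            return e;
        }
    }

    public static void main(String[] args) {
        String address = args.length > 0 ? args[0] : "192.0.2.1";
        IOException error = tryBind(address);
        System.out.println(error == null ? "bind OK" : "bind failed: " + error);
    }
}
```

Run inside a bridged container with the instance's private IP or private DNS name, this would be expected to fail the same way, and to succeed when the container uses host networking.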

Looking around, I found another list message that pointed to
https://doc.akka.io/docs/akka/2.4.1/additional/faq.html regarding remote actors.

So I set *akka.remote.netty.tcp.hostname*: unique-dns-address for each
instance. However, I was not certain which taskmanager port to set for
akka.remote.netty.tcp.port, so I left it unset. When I tried again, I got
the same PartitionNotFoundException.

I realize this is a complicated issue that varies with each environment,
but I'd appreciate advice on what else I should try to tackle it.

Furthermore, if I'm on the right track, which taskmanager service port
should correspond to akka.remote.netty.tcp.port?

Re: Docker and AWS taskmanager configuration

Posted by Colin Williams <co...@gmail.com>.
Hi Patrick, "unique-dns-address" is an alias for the private IP. If
XXX-XX-XX-XXX is the private IP, then ip-XXX-XX-XX-XXX.aws-region-X is the
"unique-dns-address". We were using auto scaling groups.
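Because the mapping described above is mechanical, the name each node should advertise can be derived from its private IPv4 address. A minimal sketch, assuming the EC2 private DNS form ip-a-b-c-d.<suffix> shown above; the suffix (the "aws-region-X" part, for example us-west-2.compute.internal) is an assumption the caller supplies, and the class name Ec2PrivateDns is hypothetical.

```java
public class Ec2PrivateDns {

    /**
     * Builds an EC2-style private DNS name ("ip-a-b-c-d.<suffix>") from a
     * dotted IPv4 address. The suffix is supplied by the caller because it
     * depends on the region.
     */
    public static String privateDnsName(String privateIpv4, String suffix) {
        String[] octets = privateIpv4.split("\\.");
        if (octets.length != 4) {
            throw new IllegalArgumentException("not a dotted IPv4 address: " + privateIpv4);
        }
        for (String octet : octets) {
            int value = Integer.parseInt(octet); // NumberFormatException for non-digits
            if (value < 0 || value > 255) {
                throw new IllegalArgumentException("octet out of range: " + octet);
            }
        }
        return "ip-" + String.join("-", octets) + "." + suffix;
    }

    public static void main(String[] args) {
        // Example values only; use the instance's real private IP and region suffix.
        System.out.println(privateDnsName("10.0.1.23", "us-west-2.compute.internal"));
    }
}
```

With the example values, main prints ip-10-0-1-23.us-west-2.compute.internal.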

Given that setup, what worked was running the containers with docker
--net=host. After that, we saw the "unique-dns-address" used in the
taskmanagers' Akka address. Since we are OK with one taskmanager container
per host, this all worked out.
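A way to see what changes with --net=host, assuming the snippet is run inside the container: with host networking the container shares the instance's network stack and hostname, while on the default bridge network Docker sets the hostname to the short container ID and the only non-loopback address is on the bridge. The hypothetical HostReport class prints what the JVM reports for its own host. Flink's own address selection is more involved, so treat this as a diagnostic, not as a description of how Flink picks its Akka address.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

public class HostReport {

    /** Formats the name and address the JVM reports for a host. */
    public static String describe(InetAddress address) {
        return "hostname: " + address.getHostName() + ", address: " + address.getHostAddress();
    }

    public static void main(String[] args) throws UnknownHostException {
        // With docker --net=host this typically shows the EC2 instance's
        // hostname and private IP; on the default bridge network, the
        // container ID and a bridge-network address.
        System.out.println(describe(InetAddress.getLocalHost()));
    }
}
```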

Thanks,

Colin Williams



On Tue, Nov 21, 2017 at 6:14 AM, Patrick Lucas <pa...@data-artisans.com>
wrote:

> Hi Colin,
>
> Is each instance's "unique-dns-address" equal to the hostname of the
> instance or is the hostname something else? If it's different from the
> hostname, you're correct in assuming you need to configure each node to
> advertise its unique-dns-address instead.
>
> Are the unique-dns-addresses aliases for public or private IPs? I.e. in
> your example of a unique-dns-address do the X's map to the private IP of
> the instance or some public IP? If I recall correctly, in AWS (at least
> within a VPC), instances' public IPs are not actually bound to the instance
> itself and are more like a NAT/DMZ address, meaning you can't actually bind
> a port to them. This might work differently in EC2-Classic.
>
> If you ensure that each node advertises a bindable, resolvable name or IP
> address—with jobmanager.rpc.address on the jobmanager and
> taskmanager.hostname on the taskmanager—then they should all be able to
> discover, address, and communicate with each other with no problems.
>
> --
> Patrick Lucas

Re: Docker and AWS taskmanager configuration

Posted by Patrick Lucas <pa...@data-artisans.com>.
Hi Colin,

Is each instance's "unique-dns-address" equal to the hostname of the
instance or is the hostname something else? If it's different from the
hostname, you're correct in assuming you need to configure each node to
advertise its unique-dns-address instead.

Are the unique-dns-addresses aliases for public or private IPs? I.e. in
your example of a unique-dns-address do the X's map to the private IP of
the instance or some public IP? If I recall correctly, in AWS (at least
within a VPC), instances' public IPs are not actually bound to the instance
itself and are more like a NAT/DMZ address, meaning you can't actually bind
a port to them. This might work differently in EC2-Classic.

If you ensure that each node advertises a bindable, resolvable name or IP
address—with jobmanager.rpc.address on the jobmanager and
taskmanager.hostname on the taskmanager—then they should all be able to
discover, address, and communicate with each other with no problems.
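The "bindable, resolvable" criterion can be checked from inside each container before starting Flink. A minimal sketch using only the JDK: the hypothetical isAdvertisable method resolves the name you plan to put in taskmanager.hostname (or jobmanager.rpc.address) and reports whether at least one resulting address is a non-loopback address assigned to a local interface, i.e. one the process could bind to. It does not verify that other nodes resolve the name to the same address.

```java
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.net.UnknownHostException;

public class AdvertisedAddressCheck {

    /**
     * True if the name resolves and at least one of its addresses is a
     * non-loopback address assigned to a local interface in this network
     * namespace, so a server socket could bind to it.
     */
    public static boolean isAdvertisable(String name) {
        try {
            for (InetAddress address : InetAddress.getAllByName(name)) {
                // Loopback and wildcard addresses are bindable but useless to peers.
                boolean usableByPeers = !address.isLoopbackAddress() && !address.isAnyLocalAddress();
                if (usableByPeers && NetworkInterface.getByInetAddress(address) != null) {
                    return true;
                }
            }
            return false;
        } catch (UnknownHostException e) {
            return false; // the name does not resolve at all
        } catch (SocketException e) {
            return false; // local interfaces could not be queried
        }
    }

    public static void main(String[] args) {
        if (args.length != 1) {
            System.err.println("usage: java AdvertisedAddressCheck <hostname-or-ip>");
            System.exit(2);
        }
        boolean ok = isAdvertisable(args[0]);
        System.out.println(args[0] + (ok ? " is" : " is NOT") + " resolvable and bindable here");
    }
}
```

Run with the instance's private DNS name, this would be expected to print "is NOT" in a bridged container and "is" under --net=host.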

--
Patrick Lucas
