Posted to user@flink.apache.org by Navneeth Krishnan <re...@gmail.com> on 2017/09/10 07:04:22 UTC

Queryable State

Hi All,

I'm running a streaming job on Flink 1.3.2 with a few queryable states. There
are 3 task managers and a job manager. I'm getting a timeout exception when
trying to query a state, and there is also a warning message in the job manager log.

*Client:*
final Configuration config = new Configuration();

config.setString(JobManagerOptions.ADDRESS, jobMgrHost);
config.setInteger(JobManagerOptions.PORT, JobManagerOptions.PORT.defaultValue());

final HighAvailabilityServices highAvailabilityServices =
        HighAvailabilityServicesUtils.createHighAvailabilityServices(
                config,
                Executors.newSingleThreadScheduledExecutor(),
                HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);

QueryableStateClient client = new QueryableStateClient(config, highAvailabilityServices);


*Exception:*
Exception in thread "main" io.netty.channel.ConnectTimeoutException: connection timed out: /172.31.18.170:43537
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe$1.run(AbstractNioChannel.java:212)
    at io.netty.util.concurrent.PromiseTask$RunnableAdapter.call(PromiseTask.java:38)
    at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:120)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
    at java.lang.Thread.run(Thread.java:745)

*Job Manager:*
2017-09-10 06:55:41,599 WARN  akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@127.0.0.1:64344] has failed, address is now gated for [5000] ms. Reason: [Disassociated]

Thanks,
Navneeth
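
The ConnectTimeoutException names 172.31.18.170:43537, which a later message in this thread identifies as the task manager holding the state, not the job manager. One hedged reading is that the client resolved the state's location through the job manager but could not open a TCP connection to that task manager's queryable-state server. You can check that hop from the client machine without Flink, using only the JDK, as in the sketch below. On EC2, a connect that times out, rather than being refused, often means a security group or firewall is silently dropping the traffic. That is an assumption to verify, not something the logs prove.

As far as I know, Flink 1.3 binds this server to `query.server.port`, which defaults to 0 (a random free port). That would fit the ephemeral-looking 43537. Setting a fixed port that the security group allows may be worth trying. Please confirm the option name against the 1.3 configuration docs. The class name `KvServerReachability` is hypothetical.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class KvServerReachability {

    // Returns true if a plain TCP connection to host:port can be opened
    // within timeoutMs milliseconds, false otherwise.
    static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Covers SocketTimeoutException (e.g. packets silently dropped by a
            // firewall or security group) and ConnectException (nothing listening).
            return false;
        }
    }

    public static void main(String[] args) {
        if (args.length != 2) {
            System.err.println("usage: KvServerReachability <host> <port>");
            return;
        }
        String host = args[0];
        int port = Integer.parseInt(args[1]);
        System.out.println(host + ":" + port + " reachable: " + canConnect(host, port, 5000));
    }
}
```

For example, run `java KvServerReachability 172.31.18.170 43537` from the host where the query client runs. If this also times out, the problem is network access to the task manager rather than the query code.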

Re: Queryable State

Posted by Navneeth Krishnan <re...@gmail.com>.
Hi,

Any idea on how to solve this issue?

Thanks

On Wed, Sep 13, 2017 at 10:12 AM, Navneeth Krishnan <reachnavneeth2@gmail.com> wrote:

> Hi,
>
> I am sure I have provided the right job manager details, because the IP in
> the connection timeout is that of the task manager where the state is kept.
> I guess the client is able to reach the job manager and find out where the
> state is. Also, if I provide a wrong state name, I receive an unknown state
> exception. I couldn't figure out why there is a timeout and why a warning
> message is logged in the job manager.

Re: Queryable State

Posted by Navneeth Krishnan <re...@gmail.com>.
No, it doesn't work even if I increase the timeout.

The state being fetched is a Map with around 100 entries. I have a single
job manager and 3 task managers with 16 slots each, running on AWS EC2.

final TypeSerializer<String> keySerializer =
        TypeInformation.of(new TypeHint<String>() {}).createSerializer(new ExecutionConfig());
final TypeSerializer<HashMap<String, Object>> valueSerializer =
        TypeInformation.of(new TypeHint<HashMap<String, Object>>() {}).createSerializer(new ExecutionConfig());

final byte[] serializedKey =
        KvStateRequestSerializer.serializeKeyAndNamespace(
                key, keySerializer,
                VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE);

final FiniteDuration duration = new FiniteDuration(5, TimeUnit.MINUTES);


On Fri, Sep 15, 2017 at 6:44 AM, Kostas Kloudas <k.kloudas@data-artisans.com> wrote:

> Hi Navneeth,
>
> Does everything work OK if you increase the timeout?
> I suppose from your config that you are running in standalone mode, right?
>
> Any other information about the job (e.g., the code and/or the size of the
> state being fetched) and the cluster setup that could help us pin down the
> problem would be appreciated.
>
> Thanks,
> Kostas
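
Raising the FiniteDuration not helping fits the stack trace. ConnectTimeoutException is raised by Netty's own connect timer (AbstractNioChannel) when the TCP connection to the task manager cannot be established. That timer is separate from how long the caller is willing to wait for the query result. The JDK-only sketch below models the two timers with CompletableFuture. It needs Java 9+ for `delayedExecutor`. `query` and `connectTimeoutMs` are hypothetical stand-ins, not Flink's QueryableStateClient API.

```java
import java.net.SocketTimeoutException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TwoTimeouts {

    // Hypothetical stand-in for a query whose connect step never succeeds:
    // the returned future fails once the (separate) connect timeout expires.
    static CompletableFuture<String> query(long connectTimeoutMs) {
        CompletableFuture<String> result = new CompletableFuture<>();
        CompletableFuture.delayedExecutor(connectTimeoutMs, TimeUnit.MILLISECONDS)
                .execute(() -> result.completeExceptionally(
                        new SocketTimeoutException("connection timed out")));
        return result;
    }

    // Waits up to awaitMs for the result and reports how the call ended.
    static String await(CompletableFuture<String> future, long awaitMs) {
        try {
            return "value: " + future.get(awaitMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return "await timed out";
        } catch (ExecutionException e) {
            return "failed: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        // A long await (like a 5-minute FiniteDuration) does not help:
        // the future still fails as soon as the connect step gives up.
        System.out.println(await(query(100), 60_000));  // failed: connection timed out
        // Only when the await is shorter than the connect timeout does the caller give up first.
        System.out.println(await(query(2_000), 50));    // await timed out
    }
}
```

In other words, the await timeout only bounds the wait. It cannot make an unreachable task manager reachable.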

Re: Queryable State

Posted by Kostas Kloudas <k....@data-artisans.com>.
Hi Navneeth,

Does everything work OK if you increase the timeout?
I suppose from your config that you are running in standalone mode, right?

Any other information about the job (e.g., the code and/or the size of the state being fetched) and
the cluster setup that could help us pin down the problem would be appreciated.

Thanks,
Kostas

> On Sep 13, 2017, at 7:12 PM, Navneeth Krishnan <re...@gmail.com> wrote:
> 
> Hi,
> 
> I am sure I have provided the right job manager details, because the IP in the connection timeout is that of the task manager where the state is kept. I guess the client is able to reach the job manager and find out where the state is. Also, if I provide a wrong state name, I receive an unknown state exception. I couldn't figure out why there is a timeout and why a warning message is logged in the job manager.


Re: Queryable State

Posted by Navneeth Krishnan <re...@gmail.com>.
Hi,

I am sure I have provided the right job manager details, because the IP in the
connection timeout is that of the task manager where the state is kept. I guess
the client is able to reach the job manager and find out where the state
is. Also, if I provide a wrong state name, I receive an unknown state
exception. I couldn't figure out why there is a timeout and why a warning message
is logged in the job manager.

On Wed, Sep 13, 2017 at 4:20 AM, Biplob Biswas <re...@gmail.com>
wrote:

> Hi,
>
>
> Are you sure your job manager is running and is accessible from the supplied
> hostname and port? If you can open the Flink UI of the job that creates
> your queryable state, it should show the job manager details and the
> port to be used in this queryable-state client job.
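
Navneeth's reasoning matches how queryable state is routed, as far as I understand the 1.3 design. The client first asks the job manager for the KvStateLocation of the named state. It then maps the key's hash code to a key group and connects directly to the task manager's KvState server that owns that key group. That explains why the job manager lookup can succeed (and a wrong name fails fast with an unknown-state error) while the second hop times out.

The sketch below shows only the key-group arithmetic. The hash is modeled on Flink's `MathUtils.murmurHash` from memory, so treat its exact constants as an assumption. `maxParallelism = 128` and `parallelism = 48` (3 x 16 slots) are likewise assumed values, and the key is hypothetical.

```java
public class KeyGroupRouting {

    // Murmur-style scrambling of a hash code; modeled on Flink's MathUtils.murmurHash
    // as I recall it (assumption). Always returns a non-negative int.
    static int murmurHash(int code) {
        code *= 0xcc9e2d51;
        code = Integer.rotateLeft(code, 15);
        code *= 0x1b873593;
        code = Integer.rotateLeft(code, 13);
        code = code * 5 + 0xe6546b64;
        code ^= 4;
        // Final bit mixing.
        code ^= code >>> 16;
        code *= 0x85ebca6b;
        code ^= code >>> 13;
        code *= 0xc2b2ae35;
        code ^= code >>> 16;
        if (code >= 0) return code;
        if (code != Integer.MIN_VALUE) return -code;
        return 0;
    }

    // Key group of a key: scrambled key.hashCode() modulo the max parallelism.
    static int keyGroupFor(Object key, int maxParallelism) {
        return murmurHash(key.hashCode()) % maxParallelism;
    }

    // Index of the parallel subtask that owns a key group.
    static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // assumed
        int parallelism = 48;     // 3 task managers x 16 slots, if all slots are used
        String key = "some-key";  // hypothetical key
        int keyGroup = keyGroupFor(key, maxParallelism);
        System.out.println("key group " + keyGroup + " -> subtask "
                + operatorIndexFor(keyGroup, maxParallelism, parallelism));
    }
}
```

Whichever subtask owns the key group, the client must be able to open a TCP connection to the task manager running it.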

RE: Queryable State

Posted by "Marchant, Hayden " <ha...@citi.com>.
I can see the job running in the FlinkUI for the job, and specifically specified the port for the Job Manager. When I provided a different port, I got an akka exception. Here, it seems that the code is getting further. I think that it might be connected with how I am creating the StateDescriptor. What exactly does it mean when the KvStateLocation can't be found?

-----Original Message-----
From: Biplob Biswas [mailto:revolutionisme@gmail.com] 
Sent: Wednesday, September 13, 2017 2:20 PM
To: user@flink.apache.org
Subject: Re: Queryable State

Hi, 


Are you sure your job manager is running and is accessible from the supplied hostname and port? If you can open the Flink UI of the job that creates your queryable state, it should show the job manager details and the port to be used in this queryable-state client job.



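
On Hayden's question, my understanding of Flink 1.3 (worth checking against the source) is as follows. The job manager keeps a per-job registry of KvStateLocations, keyed by the queryable state name. When a task registers a state that was made queryable (for example via `StateDescriptor.setQueryable(name)`), the job manager records which KvState server holds each key-group range. A lookup for a name that was never registered finds no location. The name might have a typo, the descriptor might not have been made queryable, or the job might not be running. The client then gets an unknown-location error, which I believe surfaces as UnknownKvStateLocation in 1.3.

The toy model below illustrates only that lookup. `ToyKvStateRegistry`, `Location`, `register`, `serverFor`, the state names, and the `tm-1:40001`-style addresses are all hypothetical and are not Flink classes.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class ToyKvStateRegistry {

    // Hypothetical record of where one queryable state lives: one server per key group.
    static final class Location {
        final String[] serverByKeyGroup;

        Location(int numKeyGroups) {
            serverByKeyGroup = new String[numKeyGroups];
        }
    }

    private final Map<String, Location> byQueryableName = new HashMap<>();

    // Called when a task reports that it serves a key-group range for a state name.
    void register(String name, int numKeyGroups, int firstKeyGroup, int lastKeyGroup, String server) {
        Location loc = byQueryableName.computeIfAbsent(name, n -> new Location(numKeyGroups));
        for (int kg = firstKeyGroup; kg <= lastKeyGroup; kg++) {
            loc.serverByKeyGroup[kg] = server;
        }
    }

    // Lookup: a name that was never registered has no location at all.
    Optional<String> serverFor(String name, int keyGroup) {
        Location loc = byQueryableName.get(name);
        if (loc == null) {
            return Optional.empty();
        }
        return Optional.ofNullable(loc.serverByKeyGroup[keyGroup]);
    }

    public static void main(String[] args) {
        ToyKvStateRegistry registry = new ToyKvStateRegistry();
        registry.register("my-state", 4, 0, 1, "tm-1:40001");
        registry.register("my-state", 4, 2, 3, "tm-2:40001");
        System.out.println(registry.serverFor("my-state", 3)); // Optional[tm-2:40001]
        System.out.println(registry.serverFor("my-stat", 3));  // Optional.empty (unknown name)
    }
}
```

In this model, "location not found" means the first lookup (by name) failed, which is a different failure from Navneeth's, where the lookup succeeded but the connection to the returned server timed out.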

Re: Queryable State

Posted by Biplob Biswas <re...@gmail.com>.
Hi, 


Are you sure your job manager is running and is accessible from the supplied
hostname and port? If you can open the Flink UI of the job that creates
your queryable state, it should show the job manager details and the
port to be used in this queryable-state client job.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Queryable State

Posted by Navneeth Krishnan <re...@gmail.com>.
Hi All,

Any suggestions would really be helpful. Thanks
