Posted to user@flink.apache.org by Vishal Santoshi <vi...@gmail.com> on 2019/03/26 00:57:25 UTC

Do we have an example of setting up Queryable state ( proxies, client etc ) on k8s ?

I have 2 options:

1. A REST-based (in my case Jetty/REST) QueryableStateClient in a sidecar
container colocated with the JM (it could run on all TMs, but that looks
like overkill).

2. A REST-based (in my case Jetty/REST) QueryableStateClient in a sidecar
container colocated with each TM. The query proxies are on the TMs, so in
essence the communication would stay between containers of the same pod,
and I could load balance (I have yet to test this).

The second alternative seems doable, though it looks like overkill, and I
am not sure how to point a standalone QueryableStateClient at a TM, given
that a TM's pod IP is not known until the pod is launched.

Has anyone had a successful Queryable State setup for Flink on k8s?

Regards,

Re: Do we have an example of setting up Queryable state ( proxies, client etc ) on k8s ?

Posted by Vishal Santoshi <vi...@gmail.com>.
Konstantin,

I take back my reservations. My initial reservation was about having 2
services (one for the TMs and one for the native Queryable Client proxy).
Having established this setup, though, it makes sense. Running the native
Queryable Client proxy as a sidecar couples the query layer tightly to the
TMs and inhibits independent development of the query layer.

Thanks.



On Fri, Mar 29, 2019 at 9:08 AM Vishal Santoshi <vi...@gmail.com>
wrote:

> Thanks Konstantin,
>
> That makes sense. To give you some context, the reason we are gravitating
> towards Queryable State is Prometheus's architectural preference for
> scraping (a pull rather than a push model) and our intent to expose
> aggregations. That said, your idea makes sense. My worry was the IP
> resolution of TMs that the QueryableStateClient does and our wish to avoid
> static IPs. If I understand correctly, you are proposing a proxy "external"
> to the job deployment, as in an external service that discovers the job and
> works off the ingress endpoint that exposes the queryable port of the TMs?
>
> That creates a fragmented architecture that I wanted to avoid, if I
> understood your advice correctly.
>
> Vishal

Re: Do we have an example of setting up Queryable state ( proxies, client etc ) on k8s ?

Posted by Vishal Santoshi <vi...@gmail.com>.
Thanks Konstantin,

That makes sense. To give you some context, the reason we are gravitating
towards Queryable State is Prometheus's architectural preference for
scraping (a pull rather than a push model) and our intent to expose
aggregations. That said, your idea makes sense. My worry was the IP
resolution of TMs that the QueryableStateClient does and our wish to avoid
static IPs. If I understand correctly, you are proposing a proxy "external"
to the job deployment, as in an external service that discovers the job and
works off the ingress endpoint that exposes the queryable port of the TMs?

That creates a fragmented architecture that I wanted to avoid, if I
understood your advice correctly.

Vishal

On Fri, Mar 29, 2019 at 5:42 AM Konstantin Knauf <ko...@ververica.com>
wrote:

> Hi Vishal,
>
> my approach would be a single Kubernetes service, which is backed by all
> Taskmanagers of the job. The Taskmanagers will proxy the request for a
> specific key to the correct Taskmanager. Yes, the Taskmanagers will cache
> the location of the key groups.
>
> In addition to this Kubernetes service, you can of course have a
> Jetty/Jersey REST based server that sends queries to this service.
>
> Please let me know if this works for you.
>
> Hope this helps and cheers,
>
> Konstantin

Re: Do we have an example of setting up Queryable state ( proxies, client etc ) on k8s ?

Posted by Konstantin Knauf <ko...@ververica.com>.
Hi Vishal,

my approach would be a single Kubernetes service, which is backed by all
Taskmanagers of the job. The Taskmanagers will proxy the request for a
specific key to the correct Taskmanager. Yes, the Taskmanagers will cache
the location of the key groups.

In addition to this Kubernetes service, you can of course have a
Jetty/Jersey REST based server that sends queries to this service.
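
A minimal sketch of such a service, not taken from this thread: it assumes
the TaskManager pods carry the labels app=flink and component=taskmanager,
and that the queryable state proxy listens on 9069 (the default of
queryable-state.proxy.ports). The service name is hypothetical.

```yaml
apiVersion: v1
kind: Service
metadata:
  name: flink-taskmanager-query-state   # hypothetical name
spec:
  type: ClusterIP
  selector:
    app: flink                # assumed pod labels, adjust to your deployment
    component: taskmanager
  ports:
    - name: query-state
      port: 9069              # assumed: default queryable state proxy port
      targetPort: 9069
```

A QueryableStateClient, for example one embedded in a separate REST server,
could then be created against the service's DNS name and port 9069 rather
than against an individual pod IP.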

Please let me know if this works for you.

Hope this helps and cheers,

Konstantin


On Thu, Mar 28, 2019 at 12:37 AM Vishal Santoshi <vi...@gmail.com>
wrote:

> I think I got a handle on this. For those who might want to do the same,
> here are the steps (I could share the Jetty/Jersey REST code too, if
> required).
>
> *1.* Create a sidecar container in each pod that has a TM. I wrote a
> simple Jetty/Jersey REST-based server that executes queries against the
> local TM query server.
>
>   .
>   .
>   - name: queryable-state
>     image: _IMAGE_
>     args: ["queryable-state"]
>     env:
>       - name: POD_IP
>         valueFrom:
>           fieldRef:
>             fieldPath: status.podIP
>     ports:
>       - containerPort: 9999
>         name: qstate-client
>     resources:
>       requests:
>         cpu: "0.25"
>         memory: "256Mi"
>
> Note that POD_IP is the IP the REST-based server uses to start the
> QueryableStateClient, and the port is the default port of the TM query
> server (9069, I think) of the colocated TM container.
>
> *2.* Expose the port (in this case 9999) at the k8s service layer.
>
> And that did it.
>
> I am, though, worried about a few things:
>
> *1.* The TM query server will ask the JM, on every request, for the key
> group (and hence the TM) that a key belongs to, and then coordinate the
> communication between the client and that TM. Does Flink do any
> optimization, such as caching the key ranges, and thus the affinity to a
> TM, to reduce stress on the JM? I would imagine that, given a well-known
> distribution function over a well-known hash algorithm, an incoming key
> could be pinned to a TM without visiting the JM more than once.
>
> *2.* We do have use cases where we would want to iterate over all the keys
> in a key group (and, by extension, on a TM) for a job. Is that a
> possibility?
>
> *3.* The overhead of having as many client containers as TMs.
>
> Any advice/ideas on the 3 worry points?
>
>
>
> Regards

-- 

Konstantin Knauf | Solutions Architect


Re: Do we have an example of setting up Queryable state ( proxies, client etc ) on k8s ?

Posted by Vishal Santoshi <vi...@gmail.com>.
I think I got a handle on this. For those who might want to do the same,
here are the steps (I could share the Jetty/Jersey REST code too, if
required).

*1.* Create a sidecar container in each pod that has a TM. I wrote a
simple Jetty/Jersey REST-based server that executes queries against the
local TM query server.

  .
  .
  - name: queryable-state
    image: _IMAGE_
    args: ["queryable-state"]
    env:
      - name: POD_IP
        valueFrom:
          fieldRef:
            fieldPath: status.podIP
    ports:
      - containerPort: 9999
        name: qstate-client
    resources:
      requests:
        cpu: "0.25"
        memory: "256Mi"

Note that POD_IP is the IP the REST-based server uses to start the
QueryableStateClient, and the port is the default port of the TM query
server (9069, I think) of the colocated TM container.
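
For the sidecar to find a query endpoint on the TM at all, queryable state
has to be enabled on the Flink side. A hedged sketch of the relevant
flink-conf.yaml fragment, assuming a Flink version that has the
queryable-state.enable option and that the flink-queryable-state-runtime
jar is present in the TM's lib/ directory (neither is stated in this
thread):

```yaml
# flink-conf.yaml (fragment): assumed settings, not taken from the thread
queryable-state.enable: true          # turns on the queryable state proxy and server
queryable-state.proxy.ports: "9069"   # port the QueryableStateClient connects to (default)
```

Because containers in a pod share one network namespace, the sidecar can
reach this port on the address given by POD_IP.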



*2.* Expose the port (in this case 9999) at the k8s service layer.

And that did it.
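
Step 2 could look roughly like the following. This is a sketch under
assumptions: the service name and the pod labels in the selector are
hypothetical, and only the port number 9999 and the port name
qstate-client come from the container spec above.

```yaml
apiVersion: v1
kind: Service
metadata:
  name: flink-qstate-rest       # hypothetical name
spec:
  selector:
    app: flink                  # assumed labels on the TM pods
    component: taskmanager
  ports:
    - name: qstate-client
      port: 9999
      targetPort: qstate-client # refers to the sidecar's named containerPort
```

Requests to the service are then spread across the sidecars of all TM pods
that match the selector.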








I am, though, worried about a few things:

*1.* The TM query server will ask the JM, on every request, for the key
group (and hence the TM) that a key belongs to, and then coordinate the
communication between the client and that TM. Does Flink do any
optimization, such as caching the key ranges, and thus the affinity to a
TM, to reduce stress on the JM? I would imagine that, given a well-known
distribution function over a well-known hash algorithm, an incoming key
could be pinned to a TM without visiting the JM more than once.

*2.* We do have use cases where we would want to iterate over all the keys
in a key group (and, by extension, on a TM) for a job. Is that a
possibility?

*3.* The overhead of having as many client containers as TMs.

Any advice/ideas on the 3 worry points?



Regards

On Mon, Mar 25, 2019 at 8:57 PM Vishal Santoshi <vi...@gmail.com>
wrote:

> I have 2 options:
>
> 1. A REST-based (in my case Jetty/REST) QueryableStateClient in a sidecar
> container colocated with the JM (it could run on all TMs, but that looks
> like overkill).
>
> 2. A REST-based (in my case Jetty/REST) QueryableStateClient in a sidecar
> container colocated with each TM. The query proxies are on the TMs, so in
> essence the communication would stay between containers of the same pod,
> and I could load balance (I have yet to test this).
>
> The second alternative seems doable, though it looks like overkill, and I
> am not sure how to point a standalone QueryableStateClient at a TM, given
> that a TM's pod IP is not known until the pod is launched.
>
> Has anyone had a successful Queryable State setup for Flink on k8s?
>
> Regards,
>