You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Satyam Shekhar <sa...@gmail.com> on 2020/06/03 06:58:52 UTC

Table Environment for Remote Execution

Hello,

I am running into a very basic problem while working with Table API. I wish
to create a TableEnvironment connected to a remote environment that uses
Blink planner in batch mode. Examples and documentation I have come across
so far recommend the following pattern to create such an environment -

var settings = EnvironmentSettings.newInstance()
                  .useBlinkPlanner()
                  .inBatchMode()
                  .build();
var tEnv = TableEnvironment.create(settings);

The above configuration, however, does not connect to a remote environment.
Tracing code in TableEnvironment.java, I see the following method in
BlinkExecutorFactory.java that appears to relevant -

Executor create(Map<String, String>, StreamExecutionEnvironment);

However, it seems to be only accessible through the Scala bridge. I can't
seem to find a way to instantiate a TableEnvironment that takes
StreamExecutionEnvironment as an argument. How do I achieve that?

Regards,
Satyam

Re: Table Environment for Remote Execution

Posted by Jeff Zhang <zj...@gmail.com>.
Hi Satyam,

I also meet the same issue when I integrate flink with zeppelin. Here's
what I did.

https://github.com/apache/zeppelin/blob/master/flink/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java#L226

If you are interested in flink on zeppelin, you can refer the following
blogs and videos.

Flink on Zeppelin video
https://www.youtube.com/watch?v=YxPo0Fosjjg&list=PL4oy12nnS7FFtg3KV1iS5vDb0pTz12VcX

Flink on Zeppelin tutorial blogs: 1) Get started
https://link.medium.com/oppqD6dIg5
<https://www.youtube.com/redirect?q=https%3A%2F%2Flink.medium.com%2FoppqD6dIg5&redir_token=qt__OhcVRsf8bhBrj4t08HvKm_l8MTU5MTMyNTYwMkAxNTkxMjM5MjAy&v=YxPo0Fosjjg&event=video_description>
2) Batch https://link.medium.com/3qumbwRIg5
<https://www.youtube.com/redirect?q=https%3A%2F%2Flink.medium.com%2F3qumbwRIg5&redir_token=qt__OhcVRsf8bhBrj4t08HvKm_l8MTU5MTMyNTYwMkAxNTkxMjM5MjAy&v=YxPo0Fosjjg&event=video_description>
3) Streaming https://link.medium.com/RBHa2lTIg5
<https://www.youtube.com/redirect?q=https%3A%2F%2Flink.medium.com%2FRBHa2lTIg5&redir_token=qt__OhcVRsf8bhBrj4t08HvKm_l8MTU5MTMyNTYwMkAxNTkxMjM5MjAy&v=YxPo0Fosjjg&event=video_description>
4) Advanced usage https://link.medium.com/CAekyoXIg5
<https://www.youtube.com/redirect?q=https%3A%2F%2Flink.medium.com%2FCAekyoXIg5&redir_token=qt__OhcVRsf8bhBrj4t08HvKm_l8MTU5MTMyNTYwMkAxNTkxMjM5MjAy&v=YxPo0Fosjjg&event=video_description>



Satyam Shekhar <sa...@gmail.com> 于2020年6月4日周四 上午2:27写道:

>
> Thanks, Jark & Godfrey.
>
> The workaround was successful.
>
> I have created the following ticket to track the issue -
> https://issues.apache.org/jira/browse/FLINK-18095
>
> Regards,
> Satyam
>
> On Wed, Jun 3, 2020 at 3:26 AM Jark Wu <im...@gmail.com> wrote:
>
>> Hi Satyam,
>>
>> In the long term, TableEnvironment is the entry point for pure Table/SQL
>> users. So it should have all the ability of StreamExecutionEnvironment.
>> I think remote execution is a reasonable feature, could you create an
>> JIRA issue for this?
>>
>> As a workaround, you can construct `StreamTableEnvironmentImpl` by
>> yourself via constructor, it can support batch mode
>> and StreamExecutionEnvironment.
>>
>> Best,
>> Jark
>>
>>
>> On Wed, 3 Jun 2020 at 16:35, Satyam Shekhar <sa...@gmail.com>
>> wrote:
>>
>>> Thanks for the reply, Godfrey.
>>>
>>> I would also love to learn the reasoning behind that limitation.
>>>
>>> For more context, I am building a Java application that receives some
>>> user input via a GRPC service. The user's input is translated to some SQL
>>> that may be executed in streaming or batch mode based on custom business
>>> logic and submitted it to Flink for execution. In my current setup, I
>>> create an ExecutionEnvironment, register sources, and execute the
>>> corresponding SQL. I was able to achieve the desired behavior with
>>> StreamTableEnvironment but it has limitations around supported SQL in batch
>>> mode.
>>>
>>> While invoking the CLI from java program might be a solution, it doesn't
>>> feel like the most natural solution for the problem. Are there other ways
>>> to address this?
>>>
>>> Regards,
>>> Satyam
>>>
>>> On Wed, Jun 3, 2020 at 12:50 AM godfrey he <go...@gmail.com> wrote:
>>>
>>>> Hi Satyam,
>>>>
>>>> for blink batch mode, only TableEnvironment can be used,
>>>> and TableEnvironment do not take StreamExecutionEnvironment as argument.
>>>> Instead StreamExecutionEnvironment instance is created internally.
>>>>
>>>> back to your requirement, you can build your table program as user jar,
>>>> and submit the job through flink cli [1] to remote environment.
>>>>
>>>> Bests,
>>>> Godfrey
>>>>
>>>> [1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/cli.html
>>>>
>>>>
>>>>
>>>> Satyam Shekhar <sa...@gmail.com> 于2020年6月3日周三 下午2:59写道:
>>>>
>>>>> Hello,
>>>>>
>>>>> I am running into a very basic problem while working with Table API. I
>>>>> wish to create a TableEnvironment connected to a remote environment that
>>>>> uses Blink planner in batch mode. Examples and documentation I have come
>>>>> across so far recommend the following pattern to create such an environment
>>>>> -
>>>>>
>>>>> var settings = EnvironmentSettings.newInstance()
>>>>>                   .useBlinkPlanner()
>>>>>                   .inBatchMode()
>>>>>                   .build();
>>>>> var tEnv = TableEnvironment.create(settings);
>>>>>
>>>>> The above configuration, however, does not connect to a remote
>>>>> environment. Tracing code in TableEnvironment.java, I see the
>>>>> following method in BlinkExecutorFactory.java that appears to
>>>>> relevant -
>>>>>
>>>>> Executor create(Map<String, String>, StreamExecutionEnvironment);
>>>>>
>>>>> However, it seems to be only accessible through the Scala bridge. I
>>>>> can't seem to find a way to instantiate a TableEnvironment that takes
>>>>> StreamExecutionEnvironment as an argument. How do I achieve that?
>>>>>
>>>>> Regards,
>>>>> Satyam
>>>>>
>>>>

-- 
Best Regards

Jeff Zhang

Re: Table Environment for Remote Execution

Posted by Satyam Shekhar <sa...@gmail.com>.
Thanks, Jark & Godfrey.

The workaround was successful.

I have created the following ticket to track the issue -
https://issues.apache.org/jira/browse/FLINK-18095

Regards,
Satyam

On Wed, Jun 3, 2020 at 3:26 AM Jark Wu <im...@gmail.com> wrote:

> Hi Satyam,
>
> In the long term, TableEnvironment is the entry point for pure Table/SQL
> users. So it should have all the ability of StreamExecutionEnvironment.
> I think remote execution is a reasonable feature, could you create an JIRA
> issue for this?
>
> As a workaround, you can construct `StreamTableEnvironmentImpl` by
> yourself via constructor, it can support batch mode
> and StreamExecutionEnvironment.
>
> Best,
> Jark
>
>
> On Wed, 3 Jun 2020 at 16:35, Satyam Shekhar <sa...@gmail.com>
> wrote:
>
>> Thanks for the reply, Godfrey.
>>
>> I would also love to learn the reasoning behind that limitation.
>>
>> For more context, I am building a Java application that receives some
>> user input via a GRPC service. The user's input is translated to some SQL
>> that may be executed in streaming or batch mode based on custom business
>> logic and submitted it to Flink for execution. In my current setup, I
>> create an ExecutionEnvironment, register sources, and execute the
>> corresponding SQL. I was able to achieve the desired behavior with
>> StreamTableEnvironment but it has limitations around supported SQL in batch
>> mode.
>>
>> While invoking the CLI from java program might be a solution, it doesn't
>> feel like the most natural solution for the problem. Are there other ways
>> to address this?
>>
>> Regards,
>> Satyam
>>
>> On Wed, Jun 3, 2020 at 12:50 AM godfrey he <go...@gmail.com> wrote:
>>
>>> Hi Satyam,
>>>
>>> for blink batch mode, only TableEnvironment can be used,
>>> and TableEnvironment do not take StreamExecutionEnvironment as argument.
>>> Instead StreamExecutionEnvironment instance is created internally.
>>>
>>> back to your requirement, you can build your table program as user jar,
>>> and submit the job through flink cli [1] to remote environment.
>>>
>>> Bests,
>>> Godfrey
>>>
>>> [1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/cli.html
>>>
>>>
>>>
>>> Satyam Shekhar <sa...@gmail.com> 于2020年6月3日周三 下午2:59写道:
>>>
>>>> Hello,
>>>>
>>>> I am running into a very basic problem while working with Table API. I
>>>> wish to create a TableEnvironment connected to a remote environment that
>>>> uses Blink planner in batch mode. Examples and documentation I have come
>>>> across so far recommend the following pattern to create such an environment
>>>> -
>>>>
>>>> var settings = EnvironmentSettings.newInstance()
>>>>                   .useBlinkPlanner()
>>>>                   .inBatchMode()
>>>>                   .build();
>>>> var tEnv = TableEnvironment.create(settings);
>>>>
>>>> The above configuration, however, does not connect to a remote
>>>> environment. Tracing code in TableEnvironment.java, I see the
>>>> following method in BlinkExecutorFactory.java that appears to relevant
>>>> -
>>>>
>>>> Executor create(Map<String, String>, StreamExecutionEnvironment);
>>>>
>>>> However, it seems to be only accessible through the Scala bridge. I
>>>> can't seem to find a way to instantiate a TableEnvironment that takes
>>>> StreamExecutionEnvironment as an argument. How do I achieve that?
>>>>
>>>> Regards,
>>>> Satyam
>>>>
>>>

Re: Table Environment for Remote Execution

Posted by Jark Wu <im...@gmail.com>.
Hi Satyam,

In the long term, TableEnvironment is the entry point for pure Table/SQL
users. So it should have all the ability of StreamExecutionEnvironment.
I think remote execution is a reasonable feature, could you create an JIRA
issue for this?

As a workaround, you can construct `StreamTableEnvironmentImpl` by yourself
via constructor, it can support batch mode and StreamExecutionEnvironment.

Best,
Jark


On Wed, 3 Jun 2020 at 16:35, Satyam Shekhar <sa...@gmail.com> wrote:

> Thanks for the reply, Godfrey.
>
> I would also love to learn the reasoning behind that limitation.
>
> For more context, I am building a Java application that receives some user
> input via a GRPC service. The user's input is translated to some SQL that
> may be executed in streaming or batch mode based on custom business logic
> and submitted it to Flink for execution. In my current setup, I create an
> ExecutionEnvironment, register sources, and execute the corresponding SQL.
> I was able to achieve the desired behavior with StreamTableEnvironment but
> it has limitations around supported SQL in batch mode.
>
> While invoking the CLI from java program might be a solution, it doesn't
> feel like the most natural solution for the problem. Are there other ways
> to address this?
>
> Regards,
> Satyam
>
> On Wed, Jun 3, 2020 at 12:50 AM godfrey he <go...@gmail.com> wrote:
>
>> Hi Satyam,
>>
>> for blink batch mode, only TableEnvironment can be used,
>> and TableEnvironment do not take StreamExecutionEnvironment as argument.
>> Instead StreamExecutionEnvironment instance is created internally.
>>
>> back to your requirement, you can build your table program as user jar,
>> and submit the job through flink cli [1] to remote environment.
>>
>> Bests,
>> Godfrey
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/cli.html
>>
>>
>>
>> Satyam Shekhar <sa...@gmail.com> 于2020年6月3日周三 下午2:59写道:
>>
>>> Hello,
>>>
>>> I am running into a very basic problem while working with Table API. I
>>> wish to create a TableEnvironment connected to a remote environment that
>>> uses Blink planner in batch mode. Examples and documentation I have come
>>> across so far recommend the following pattern to create such an environment
>>> -
>>>
>>> var settings = EnvironmentSettings.newInstance()
>>>                   .useBlinkPlanner()
>>>                   .inBatchMode()
>>>                   .build();
>>> var tEnv = TableEnvironment.create(settings);
>>>
>>> The above configuration, however, does not connect to a remote
>>> environment. Tracing code in TableEnvironment.java, I see the following
>>> method in BlinkExecutorFactory.java that appears to relevant -
>>>
>>> Executor create(Map<String, String>, StreamExecutionEnvironment);
>>>
>>> However, it seems to be only accessible through the Scala bridge. I
>>> can't seem to find a way to instantiate a TableEnvironment that takes
>>> StreamExecutionEnvironment as an argument. How do I achieve that?
>>>
>>> Regards,
>>> Satyam
>>>
>>

Re: Table Environment for Remote Execution

Posted by Satyam Shekhar <sa...@gmail.com>.
Thanks for the reply, Godfrey.

I would also love to learn the reasoning behind that limitation.

For more context, I am building a Java application that receives some user
input via a GRPC service. The user's input is translated to some SQL that
may be executed in streaming or batch mode based on custom business logic
and submitted it to Flink for execution. In my current setup, I create an
ExecutionEnvironment, register sources, and execute the corresponding SQL.
I was able to achieve the desired behavior with StreamTableEnvironment but
it has limitations around supported SQL in batch mode.

While invoking the CLI from java program might be a solution, it doesn't
feel like the most natural solution for the problem. Are there other ways
to address this?

Regards,
Satyam

On Wed, Jun 3, 2020 at 12:50 AM godfrey he <go...@gmail.com> wrote:

> Hi Satyam,
>
> for blink batch mode, only TableEnvironment can be used,
> and TableEnvironment do not take StreamExecutionEnvironment as argument.
> Instead StreamExecutionEnvironment instance is created internally.
>
> back to your requirement, you can build your table program as user jar,
> and submit the job through flink cli [1] to remote environment.
>
> Bests,
> Godfrey
>
> [1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/cli.html
>
>
>
> Satyam Shekhar <sa...@gmail.com> 于2020年6月3日周三 下午2:59写道:
>
>> Hello,
>>
>> I am running into a very basic problem while working with Table API. I
>> wish to create a TableEnvironment connected to a remote environment that
>> uses Blink planner in batch mode. Examples and documentation I have come
>> across so far recommend the following pattern to create such an environment
>> -
>>
>> var settings = EnvironmentSettings.newInstance()
>>                   .useBlinkPlanner()
>>                   .inBatchMode()
>>                   .build();
>> var tEnv = TableEnvironment.create(settings);
>>
>> The above configuration, however, does not connect to a remote
>> environment. Tracing code in TableEnvironment.java, I see the following
>> method in BlinkExecutorFactory.java that appears to relevant -
>>
>> Executor create(Map<String, String>, StreamExecutionEnvironment);
>>
>> However, it seems to be only accessible through the Scala bridge. I can't
>> seem to find a way to instantiate a TableEnvironment that takes
>> StreamExecutionEnvironment as an argument. How do I achieve that?
>>
>> Regards,
>> Satyam
>>
>

Re: Table Environment for Remote Execution

Posted by godfrey he <go...@gmail.com>.
Hi Satyam,

for blink batch mode, only TableEnvironment can be used,
and TableEnvironment do not take StreamExecutionEnvironment as argument.
Instead StreamExecutionEnvironment instance is created internally.

back to your requirement, you can build your table program as user jar,
and submit the job through flink cli [1] to remote environment.

Bests,
Godfrey

[1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/cli.html



Satyam Shekhar <sa...@gmail.com> 于2020年6月3日周三 下午2:59写道:

> Hello,
>
> I am running into a very basic problem while working with Table API. I
> wish to create a TableEnvironment connected to a remote environment that
> uses Blink planner in batch mode. Examples and documentation I have come
> across so far recommend the following pattern to create such an environment
> -
>
> var settings = EnvironmentSettings.newInstance()
>                   .useBlinkPlanner()
>                   .inBatchMode()
>                   .build();
> var tEnv = TableEnvironment.create(settings);
>
> The above configuration, however, does not connect to a remote
> environment. Tracing code in TableEnvironment.java, I see the following
> method in BlinkExecutorFactory.java that appears to relevant -
>
> Executor create(Map<String, String>, StreamExecutionEnvironment);
>
> However, it seems to be only accessible through the Scala bridge. I can't
> seem to find a way to instantiate a TableEnvironment that takes
> StreamExecutionEnvironment as an argument. How do I achieve that?
>
> Regards,
> Satyam
>