You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by whh_960101 <wh...@163.com> on 2020/09/09 03:31:58 UTC

pyflink execute_insert问题求解答

您好,我使用pyflink时的代码如下,有如下两个问题:
1.
source  = st_env.from_path('source') #st_env是StreamTableEnvironment,source是kafka源端
main_table = source.select(".......")
sub_table = source.select(".......")
main_table.execute_insert('sink').get_job_client().get_job_execution_result().result()
sub_table.execute_insert('sink').get_job_client().get_job_execution_result().result()


最终发现只insert了main_table,请问遇见这种情况该怎么处理,就是如何插入多个表?


2.
for i in range(1,20):
     sub_table = source.select("...%s...%d...." %(str(i),i))#我的select语句是一个动态拼接的sql,在一个for循环中,获取多个sub_table
     sub_table.execute_insert('sink').get_job_client().get_job_execution_result().result() #这种方式是否合理,因为也是测试无法插入多个表,有没有什么解决方案


以上两个问题希望您们能够给予解答!感谢!






Re: pyflink execute_insert问题求解答

Posted by Dian Fu <di...@gmail.com>.
针对问题1: 你的需求是这样的吗:先获取表中字段'data'的值(第一行的值),根据'data'的值,再构造不同的作业逻辑?

针对问题2:现在join不支持两个表的字段名重复,可以看一下JIRA [1],所以目前必须保证两个表的字段名不重复。

[1] https://issues.apache.org/jira/browse/FLINK-18679 <https://issues.apache.org/jira/browse/FLINK-18679>
> 在 2020年9月9日,下午4:27,whh_960101 <wh...@163.com> 写道:
> 
> 问题1:
> 我已经生成了一个Table对象main_table,我如何才能取其中的字段'data'中的值,加入条件判断语句中呢,没找到有合适的api
> 例如:
> if main_table.to_pandas()['data'].iloc[0] == '': #我现在是通过转成pandas来操作,还有其他好用的方法吗
>   ......
> 
> 
> 
> 
> 问题2:
> full_outer_join(right, join_predicate)[source]¶
> 
> Joins two Table. Similar to a SQL full outer join. The fields of the two joined operations must not overlap, use alias() to rename fields if necessary.
> 
> Note
> 
> 
> 
> Both tables must be bound to the same TableEnvironment and its TableConfig must have null check enabled (default).
> 
> Example:
> 
>>>> left.full_outer_join(right,"a = b").select("a, b, d")
> Parameters
> 
> right (pyflink.table.Table) – Right table.
> 
> join_predicate (str) – The join predicate expression string.
> 
> Returns
> 
> The result table.
> 
> Return type
> 
> pyflink.table.Table
> 
> The fields of the two joined operations must not overlap是什么意思,sql中的full_outer_join例如:
> SELECT Persons.LastName, Persons.FirstName, Orders.OrderNo
> FROM Persons
> FULL JOIN Orders
> ON Persons.Id_P=Orders.Id_P
> 
> #on中的两个表的字段是可以重复的,The fields of the two joined operations must not overlap意思是做匹配的两个字段名不能重复吗
> 
> 在 2020-09-09 15:54:35,"nicholasjiang" <pr...@163.com> 写道:
>> 1.最终发现只insert了main_table,请问遇见这种情况该怎么处理,就是如何插入多个表?
>> 针对Multiple Sink的话推荐通过Statement Set方式:
>> statement_set = TableEnvironment.create_statement_set()
>> main_table = source.select(".......")
>> sub_table = source.select(".......")
>> statement_set.add_insert("main_table", main_table)
>> statement_set.add_insert("sub_table", sub_table)
>> 
>> 2.for i in range(1,20):
>>    sub_table = source.select("...%s...%d...."
>> %(str(i),i))#我的select语句是一个动态拼接的sql,在一个for循环中,获取多个sub_table
>> 
>> sub_table.execute_insert('sink').get_job_client().get_job_execution_result().result()
>> #这种方式是否合理,因为也是测试无法插入多个表,有没有什么解决方案
>> 按照上述方式进行Multiple Sink是可以插入多个表。
>> 
>> 
>> 
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/


Re:Re: pyflink execute_insert问题求解答

Posted by whh_960101 <wh...@163.com>.
问题1:
我已经生成了一个Table对象main_table,我如何才能取其中的字段'data'中的值,加入条件判断语句中呢,没找到有合适的api
例如:
if main_table.to_pandas()['data'].iloc[0] == '': #我现在是通过转成pandas来操作,还有其他好用的方法吗
   ......




问题2:
full_outer_join(right, join_predicate)[source]¶

Joins two Table. Similar to a SQL full outer join. The fields of the two joined operations must not overlap, use alias() to rename fields if necessary.

Note

 

Both tables must be bound to the same TableEnvironment and its TableConfig must have null check enabled (default).

Example:

>>> left.full_outer_join(right,"a = b").select("a, b, d")
Parameters

right (pyflink.table.Table) – Right table.

join_predicate (str) – The join predicate expression string.

Returns

The result table.

Return type

pyflink.table.Table

The fields of the two joined operations must not overlap是什么意思,sql中的full_outer_join例如:
SELECT Persons.LastName, Persons.FirstName, Orders.OrderNo
FROM Persons
FULL JOIN Orders
ON Persons.Id_P=Orders.Id_P

 #on中的两个表的字段是可以重复的,The fields of the two joined operations must not overlap意思是做匹配的两个字段名不能重复吗

在 2020-09-09 15:54:35,"nicholasjiang" <pr...@163.com> 写道:
>1.最终发现只insert了main_table,请问遇见这种情况该怎么处理,就是如何插入多个表?
>针对Multiple Sink的话推荐通过Statement Set方式:
>statement_set = TableEnvironment.create_statement_set()
>main_table = source.select(".......")
>sub_table = source.select(".......")
>statement_set.add_insert("main_table", main_table)
>statement_set.add_insert("sub_table", sub_table)
>
>2.for i in range(1,20):
>     sub_table = source.select("...%s...%d...."
>%(str(i),i))#我的select语句是一个动态拼接的sql,在一个for循环中,获取多个sub_table
>    
>sub_table.execute_insert('sink').get_job_client().get_job_execution_result().result()
>#这种方式是否合理,因为也是测试无法插入多个表,有没有什么解决方案
>按照上述方式进行Multiple Sink是可以插入多个表。
>
>
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/

Re: pyflink execute_insert问题求解答

Posted by nicholasjiang <pr...@163.com>.
1.最终发现只insert了main_table,请问遇见这种情况该怎么处理,就是如何插入多个表?
针对Multiple Sink的话推荐通过Statement Set方式:
statement_set = TableEnvironment.create_statement_set()
main_table = source.select(".......")
sub_table = source.select(".......")
statement_set.add_insert("main_table", main_table)
statement_set.add_insert("sub_table", sub_table)

2.for i in range(1,20):
     sub_table = source.select("...%s...%d...."
%(str(i),i))#我的select语句是一个动态拼接的sql,在一个for循环中,获取多个sub_table
    
sub_table.execute_insert('sink').get_job_client().get_job_execution_result().result()
#这种方式是否合理,因为也是测试无法插入多个表,有没有什么解决方案
按照上述方式进行Multiple Sink是可以插入多个表。



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: pyflink execute_insert问题求解答

Posted by Dian Fu <di...@gmail.com>.
这两个看起来是同一个问题,1.11是支持的,可以看一下TableEnvironment.create_statement_set(): https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/table_environment.html#executeexplain-jobs

> 在 2020年9月9日,上午11:31,whh_960101 <wh...@163.com> 写道:
> 
> 您好,我使用pyflink时的代码如下,有如下两个问题:
> 1.
> source  = st_env.from_path('source') #st_env是StreamTableEnvironment,source是kafka源端
> main_table = source.select(".......")
> sub_table = source.select(".......")
> main_table.execute_insert('sink').get_job_client().get_job_execution_result().result()
> sub_table.execute_insert('sink').get_job_client().get_job_execution_result().result()
> 
> 
> 最终发现只insert了main_table,请问遇见这种情况该怎么处理,就是如何插入多个表?
> 
> 
> 2.
> for i in range(1,20):
>     sub_table = source.select("...%s...%d...." %(str(i),i))#我的select语句是一个动态拼接的sql,在一个for循环中,获取多个sub_table
>     sub_table.execute_insert('sink').get_job_client().get_job_execution_result().result() #这种方式是否合理,因为也是测试无法插入多个表,有没有什么解决方案
> 
> 
> 以上两个问题希望您们能够给予解答!感谢!
> 
> 
> 
> 
>