You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by wh...@163.com on 2020/09/03 13:44:42 UTC

回复:pyflink-udf 问题反馈

我的输入是Row类型的(包含字段名a,b,c),比如我的输入是W,我的函数只需要用到其中的第二列(b),input_types按照您的写法
input_types=[DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()] 

在def的时候应该是写def函数名(W):,然后函数内部取第二列写W['b']可以吗
或者正确写法是什么样的,感谢解答!


| |
whh_960101
|
|
邮箱:whh_960101@163.com
|

签名由 网易邮箱大师 定制

在2020年09月03日 21:14,Xingbo Huang 写道:
Hi,
input_types定义的是每一个列的具体类型。
比如udf输入的是三列,类型都是DataTypes.STRING(),这个时候你
正确的写法是

   input_types=[DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()]

针对这个例子,你错误的写法是(根据报错,我猜测你是这么写的)
   input_types=DataTypes.Row([DataTypes.FIELD("a", DataTypes.STRING()),
DataTypes.FIELD("b", DataTypes.STRING()), DataTypes.FIELD("c",
DataTypes.STRING())])

Best,
Xingbo

whh_960101 <wh...@163.com> 于2020年9月3日周四 下午9:03写道:

> 您好,请问我在定义udf的时候input_types为什么不能使用DataTypes.Row(),使用后会报错:Invalid
> input_type:input_type should be DataType but contain RowField(RECID,
> VARCHAR)
> 我的pyflink版本:1.11.1

Re: pyflink execute_insert问题求解答

Posted by Dian Fu <di...@gmail.com>.
针对问题1: 你的需求是这样的吗:先获取表中字段'data'的值(第一行的值),根据'data'的值,再构造不同的作业逻辑?

针对问题2:现在join不支持两个表的字段名重复,可以看一下JIRA [1],所以目前必须保证两个表的字段名不重复。

[1] https://issues.apache.org/jira/browse/FLINK-18679 <https://issues.apache.org/jira/browse/FLINK-18679>
> 在 2020年9月9日,下午4:27,whh_960101 <wh...@163.com> 写道:
> 
> 问题1:
> 我已经生成了一个Table对象main_table,我如何才能取其中的字段'data'中的值,加入条件判断语句中呢,没找到有合适的api
> 例如:
> if main_table.to_pandas()['data'].iloc[0] == '': #我现在是通过转成pandas来操作,还有其他好用的方法吗
>   ......
> 
> 
> 
> 
> 问题2:
> full_outer_join(right, join_predicate)[source]¶
> 
> Joins two Table. Similar to a SQL full outer join. The fields of the two joined operations must not overlap, use alias() to rename fields if necessary.
> 
> Note
> 
> 
> 
> Both tables must be bound to the same TableEnvironment and its TableConfig must have null check enabled (default).
> 
> Example:
> 
>>>> left.full_outer_join(right,"a = b").select("a, b, d")
> Parameters
> 
> right (pyflink.table.Table) – Right table.
> 
> join_predicate (str) – The join predicate expression string.
> 
> Returns
> 
> The result table.
> 
> Return type
> 
> pyflink.table.Table
> 
> The fields of the two joined operations must not overlap是什么意思,sql中的full_outer_join例如:
> SELECT Persons.LastName, Persons.FirstName, Orders.OrderNo
> FROM Persons
> FULL JOIN Orders
> ON Persons.Id_P=Orders.Id_P
> 
> #on中的两个表的字段是可以重复的,The fields of the two joined operations must not overlap意思是做匹配的两个字段名不能重复吗
> 
> 在 2020-09-09 15:54:35,"nicholasjiang" <pr...@163.com> 写道:
>> 1.最终发现只insert了main_table,请问遇见这种情况该怎么处理,就是如何插入多个表?
>> 针对Multiple Sink的话推荐通过Statement Set方式:
>> statement_set = TableEnvironment.create_statement_set()
>> main_table = source.select(".......")
>> sub_table = source.select(".......")
>> statement_set.add_insert("main_table", main_table)
>> statement_set.add_insert("sub_table", sub_table)
>> 
>> 2.for i in range(1,20):
>>    sub_table = source.select("...%s...%d...."
>> %(str(i),i))#我的select语句是一个动态拼接的sql,在一个for循环中,获取多个sub_table
>> 
>> sub_table.execute_insert('sink').get_job_client().get_job_execution_result().result()
>> #这种方式是否合理,因为也是测试无法插入多个表,有没有什么解决方案
>> 按照上述方式进行Multiple Sink是可以插入多个表。
>> 
>> 
>> 
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/


Re:Re: pyflink execute_insert问题求解答

Posted by whh_960101 <wh...@163.com>.
问题1:
我已经生成了一个Table对象main_table,我如何才能取其中的字段'data'中的值,加入条件判断语句中呢,没找到有合适的api
例如:
if main_table.to_pandas()['data'].iloc[0] == '': #我现在是通过转成pandas来操作,还有其他好用的方法吗
   ......




问题2:
full_outer_join(right, join_predicate)[source]¶

Joins two Table. Similar to a SQL full outer join. The fields of the two joined operations must not overlap, use alias() to rename fields if necessary.

Note

 

Both tables must be bound to the same TableEnvironment and its TableConfig must have null check enabled (default).

Example:

>>> left.full_outer_join(right,"a = b").select("a, b, d")
Parameters

right (pyflink.table.Table) – Right table.

join_predicate (str) – The join predicate expression string.

Returns

The result table.

Return type

pyflink.table.Table

The fields of the two joined operations must not overlap是什么意思,sql中的full_outer_join例如:
SELECT Persons.LastName, Persons.FirstName, Orders.OrderNo
FROM Persons
FULL JOIN Orders
ON Persons.Id_P=Orders.Id_P

 #on中的两个表的字段是可以重复的,The fields of the two joined operations must not overlap意思是做匹配的两个字段名不能重复吗

在 2020-09-09 15:54:35,"nicholasjiang" <pr...@163.com> 写道:
>1.最终发现只insert了main_table,请问遇见这种情况该怎么处理,就是如何插入多个表?
>针对Multiple Sink的话推荐通过Statement Set方式:
>statement_set = TableEnvironment.create_statement_set()
>main_table = source.select(".......")
>sub_table = source.select(".......")
>statement_set.add_insert("main_table", main_table)
>statement_set.add_insert("sub_table", sub_table)
>
>2.for i in range(1,20):
>     sub_table = source.select("...%s...%d...."
>%(str(i),i))#我的select语句是一个动态拼接的sql,在一个for循环中,获取多个sub_table
>    
>sub_table.execute_insert('sink').get_job_client().get_job_execution_result().result()
>#这种方式是否合理,因为也是测试无法插入多个表,有没有什么解决方案
>按照上述方式进行Multiple Sink是可以插入多个表。
>
>
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/

Re: pyflink execute_insert问题求解答

Posted by nicholasjiang <pr...@163.com>.
1.最终发现只insert了main_table,请问遇见这种情况该怎么处理,就是如何插入多个表?
针对Multiple Sink的话推荐通过Statement Set方式:
statement_set = TableEnvironment.create_statement_set()
main_table = source.select(".......")
sub_table = source.select(".......")
statement_set.add_insert("main_table", main_table)
statement_set.add_insert("sub_table", sub_table)

2.for i in range(1,20):
     sub_table = source.select("...%s...%d...."
%(str(i),i))#我的select语句是一个动态拼接的sql,在一个for循环中,获取多个sub_table
    
sub_table.execute_insert('sink').get_job_client().get_job_execution_result().result()
#这种方式是否合理,因为也是测试无法插入多个表,有没有什么解决方案
按照上述方式进行Multiple Sink是可以插入多个表。



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re:Re: pyflink sql中select,where都带udf,其中一个udf失效

Posted by whh_960101 <wh...@163.com>.
hi,
我刚才改了一下你的例子[1],通过from_elements构建一个source表
然后使用我的udf
source.where(udf1(msg)=True).select("udf2(msg)").execute_insert('sink').get_job_client().get_job_execution_result().result()
打印出来的结果能够很好的筛选出我想要的数据




但是通过StreamTableEnvironment,连接kafka,取消息队列来构建source表,即之前那种写法
 source  = st_env.from_path('source').where(udf1(msg)=True).select("udf2(msg)").execute_insert('sink').get_job_client().get_job_execution_result().result()

这个where筛选就失效了,最后打印出全部数据


想请问一下这种问题出在哪里?






在 2020-10-15 16:57:39,"Xingbo Huang" <hx...@gmail.com> 写道:
>Hi,
>我们有例子覆盖你这种情况的[1],你需要看下你udf1的逻辑是什么样的
>
>
>[1]
>https://github.com/apache/flink/blob/release-1.11/flink-python/pyflink/table/tests/test_udf.py#L67
>
>Best,
>Xingbo
>
>whh_960101 <wh...@163.com> 于2020年10月15日周四 下午2:30写道:
>
>> 您好,我使用pyflink时的代码如下,有如下问题:
>>
>>
>> source  = st_env.from_path('source')
>> #st_env是StreamTableEnvironment,source是kafka源端
>> #只在where语句中加udf1 (input_types =DataTypes.STRING(), result_type =
>> DataTypes.BOOLEAN())
>> table =
>> source.select("msg").where(udf1(msg)=True).execute_insert('sink').get_job_client().get_job_execution_result().result()
>>
>>
>> 这样打印出来的结果很好的筛选了数据
>>
>>
>> 但如果在select语句中加另一个udf2 (input_types =DataTypes.STRING(), result_type =
>> DataTypes.STRING())(将msg简单处理为新的String)
>> table =
>> source.select("udf2(msg)").where(udf1(msg)=True).execute_insert('sink').get_job_client().get_job_execution_result().result()
>> 这个where筛选就失效了,最后打印出全部数据
>>
>>
>> 如果改成where在前也不行,换成filter也不行
>> table =
>> source.where(udf1(msg)=True).select("udf2(msg)").execute_insert('sink').get_job_client().get_job_execution_result().result()
>>
>>
>> select、where中的udf会冲突吗?这个问题该怎么解决?
>> 希望您们能够给予解答!感谢!
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>

Re:Re: pyflink1.11.0 如果elasticsearch host有访问权限,connector如何写入用户名密码

Posted by whh_960101 <wh...@163.com>.
现在能更新到1.12吗,好像还没有发布,有什么其他的解决办法吗?

















在 2020-10-22 16:34:56,"Yangze Guo" <ka...@gmail.com> 写道:
>1.11版本中尚不支持username和password的设置,这两个配置在1.12中加入了新的es connector[1]
>
>[1] https://issues.apache.org/jira/browse/FLINK-18361
>
>Best,
>Yangze Guo
>
>On Thu, Oct 22, 2020 at 3:47 PM whh_960101 <wh...@163.com> wrote:
>>
>> Hi,各位大佬们:如果要sink数据到elasticsearch host有访问权限,elasticsearch connector如何写入用户名密码我按照官网里的样例格式来写的,没有找到options写入用户名密码,程序报错Unauthorizedhttps://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/elasticsearch.htmlCREATE TABLE myUserTable (
>>   user_id STRING,
>>   user_name STRING
>>   uv BIGINT,
>>   pv BIGINT,
>>   PRIMARY KEY (user_id) NOT ENFORCED
>> ) WITH (
>>   'connector' = 'elasticsearch-7',
>>   'hosts' = 'http://localhost:9200',
>>   'index' = 'users'
>> );Connector Options
>> | Option | Required | Default | Type | Description |
>> |
>> connector
>> | required | (none) | String | Specify what connector to use, valid values are:
>> elasticsearch-6: connect to Elasticsearch 6.x cluster
>> elasticsearch-7: connect to Elasticsearch 7.x and later versions cluster
>> |
>> |
>> hosts
>> | required | (none) | String | One or more Elasticsearch hosts to connect to, e.g. 'http://host_name:9092;http://host_name:9093'. |
>> |
>> index
>> | required | (none) | String | Elasticsearch index for every record. Can be a static index (e.g. 'myIndex') or a dynamic index (e.g. 'index-{log_ts|yyyy-MM-dd}'). See the following Dynamic Indexsection for more details. |
>> |
>> document-type
>> | required in 6.x | (none) | String | Elasticsearch document type. Not necessary anymore in elasticsearch-7. |
>> |
>> document-id.key-delimiter
>> | optional | _ | String | Delimiter for composite keys ("_" by default), e.g., "$" would result in IDs "KEY1$KEY2$KEY3"." |
>> |
>> failure-handler
>> | optional | fail | String | Failure handling strategy in case a request to Elasticsearch fails. Valid strategies are:
>> fail: throws an exception if a request fails and thus causes a job failure.
>> ignore: ignores failures and drops the request.
>> retry_rejected: re-adds requests that have failed due to queue capacity saturation.
>> custom class name: for failure handling with a ActionRequestFailureHandler subclass.
>> |
>> |
>> sink.flush-on-checkpoint
>> | optional | true | Boolean | Flush on checkpoint or not. When disabled, a sink will not wait for all pending action requests to be acknowledged by Elasticsearch on checkpoints. Thus, a sink does NOT provide any strong guarantees for at-least-once delivery of action requests. |
>> |
>> sink.bulk-flush.max-actions
>> | optional | 1000 | Integer | Maximum number of buffered actions per bulk request. Can be set to '0' to disable it. |
>> |
>> sink.bulk-flush.max-size
>> | optional | 2mb | MemorySize | Maximum size in memory of buffered actions per bulk request. Must be in MB granularity. Can be set to '0' to disable it. |
>> |
>> sink.bulk-flush.interval
>> | optional | 1s | Duration | The interval to flush buffered actions. Can be set to '0' to disable it. Note, both 'sink.bulk-flush.max-size' and 'sink.bulk-flush.max-actions' can be set to '0' with the flush interval set allowing for complete async processing of buffered actions. |
>> |
>> sink.bulk-flush.backoff.strategy
>> | optional | DISABLED | String | Specify how to perform retries if any flush actions failed due to a temporary request error. Valid strategies are:
>> DISABLED: no retry performed, i.e. fail after the first request error.
>> CONSTANT: wait for backoff delay between retries.
>> EXPONENTIAL: initially wait for backoff delay and increase exponentially between retries.
>> |
>> |
>> sink.bulk-flush.backoff.max-retries
>> | optional | 8 | Integer | Maximum number of backoff retries. |
>> |
>> sink.bulk-flush.backoff.delay
>> | optional | 50ms | Duration | Delay between each backoff attempt. For CONSTANT backoff, this is simply the delay between each retry. For EXPONENTIAL backoff, this is the initial base delay. |
>> |
>> connection.max-retry-timeout
>> | optional | (none) | Duration | Maximum timeout between retries. |
>> |
>> connection.path-prefix
>> | optional | (none) | String | Prefix string to be added to every REST communication, e.g., '/v1' |
>> |
>> format
>> | optional | json | String | Elasticsearch connector supports to specify a format. The format must produce a valid json document. By default uses built-in 'json' format. Please refer to JSON Format page for more details. |
>>
>>
>>
>>
>>
>>
>>

Re: Re: pyflink1.11.0 如果elasticsearch host有访问权限,connector如何写入用户名密码

Posted by Xingbo Huang <hx...@gmail.com>.
Hi,

从源码编译安装把。可以参考文档[1]

[1]
https://ci.apache.org/projects/flink/flink-docs-master/flinkDev/building.html#build-pyflink

Best,
Xingbo

whh_960101 <wh...@163.com> 于2020年10月22日周四 下午6:47写道:

> 现在能更新到1.12吗,好像还没有发布,有什么其他的解决办法吗?
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-10-22 16:34:56,"Yangze Guo" <ka...@gmail.com> 写道:
> >1.11版本中尚不支持username和password的设置,这两个配置在1.12中加入了新的es connector[1]
> >
> >[1] https://issues.apache.org/jira/browse/FLINK-18361
> >
> >Best,
> >Yangze Guo
> >
> >On Thu, Oct 22, 2020 at 3:47 PM whh_960101 <wh...@163.com> wrote:
> >>
> >> Hi,各位大佬们:如果要sink数据到elasticsearch host有访问权限,elasticsearch
> connector如何写入用户名密码我按照官网里的样例格式来写的,没有找到options写入用户名密码,程序报错Unauthorizedhttps://
> ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/elasticsearch.htmlCREATE
> TABLE myUserTable (
> >>   user_id STRING,
> >>   user_name STRING
> >>   uv BIGINT,
> >>   pv BIGINT,
> >>   PRIMARY KEY (user_id) NOT ENFORCED
> >> ) WITH (
> >>   'connector' = 'elasticsearch-7',
> >>   'hosts' = 'http://localhost:9200',
> >>   'index' = 'users'
> >> );Connector Options
> >> | Option | Required | Default | Type | Description |
> >> |
> >> connector
> >> | required | (none) | String | Specify what connector to use, valid
> values are:
> >> elasticsearch-6: connect to Elasticsearch 6.x cluster
> >> elasticsearch-7: connect to Elasticsearch 7.x and later versions cluster
> >> |
> >> |
> >> hosts
> >> | required | (none) | String | One or more Elasticsearch hosts to
> connect to, e.g. 'http://host_name:9092;http://host_name:9093'. |
> >> |
> >> index
> >> | required | (none) | String | Elasticsearch index for every record.
> Can be a static index (e.g. 'myIndex') or a dynamic index (e.g.
> 'index-{log_ts|yyyy-MM-dd}'). See the following Dynamic Indexsection for
> more details. |
> >> |
> >> document-type
> >> | required in 6.x | (none) | String | Elasticsearch document type. Not
> necessary anymore in elasticsearch-7. |
> >> |
> >> document-id.key-delimiter
> >> | optional | _ | String | Delimiter for composite keys ("_" by
> default), e.g., "$" would result in IDs "KEY1$KEY2$KEY3"." |
> >> |
> >> failure-handler
> >> | optional | fail | String | Failure handling strategy in case a
> request to Elasticsearch fails. Valid strategies are:
> >> fail: throws an exception if a request fails and thus causes a job
> failure.
> >> ignore: ignores failures and drops the request.
> >> retry_rejected: re-adds requests that have failed due to queue capacity
> saturation.
> >> custom class name: for failure handling with a
> ActionRequestFailureHandler subclass.
> >> |
> >> |
> >> sink.flush-on-checkpoint
> >> | optional | true | Boolean | Flush on checkpoint or not. When
> disabled, a sink will not wait for all pending action requests to be
> acknowledged by Elasticsearch on checkpoints. Thus, a sink does NOT provide
> any strong guarantees for at-least-once delivery of action requests. |
> >> |
> >> sink.bulk-flush.max-actions
> >> | optional | 1000 | Integer | Maximum number of buffered actions per
> bulk request. Can be set to '0' to disable it. |
> >> |
> >> sink.bulk-flush.max-size
> >> | optional | 2mb | MemorySize | Maximum size in memory of buffered
> actions per bulk request. Must be in MB granularity. Can be set to '0' to
> disable it. |
> >> |
> >> sink.bulk-flush.interval
> >> | optional | 1s | Duration | The interval to flush buffered actions.
> Can be set to '0' to disable it. Note, both 'sink.bulk-flush.max-size' and
> 'sink.bulk-flush.max-actions' can be set to '0' with the flush interval set
> allowing for complete async processing of buffered actions. |
> >> |
> >> sink.bulk-flush.backoff.strategy
> >> | optional | DISABLED | String | Specify how to perform retries if any
> flush actions failed due to a temporary request error. Valid strategies are:
> >> DISABLED: no retry performed, i.e. fail after the first request error.
> >> CONSTANT: wait for backoff delay between retries.
> >> EXPONENTIAL: initially wait for backoff delay and increase
> exponentially between retries.
> >> |
> >> |
> >> sink.bulk-flush.backoff.max-retries
> >> | optional | 8 | Integer | Maximum number of backoff retries. |
> >> |
> >> sink.bulk-flush.backoff.delay
> >> | optional | 50ms | Duration | Delay between each backoff attempt. For
> CONSTANT backoff, this is simply the delay between each retry. For
> EXPONENTIAL backoff, this is the initial base delay. |
> >> |
> >> connection.max-retry-timeout
> >> | optional | (none) | Duration | Maximum timeout between retries. |
> >> |
> >> connection.path-prefix
> >> | optional | (none) | String | Prefix string to be added to every REST
> communication, e.g., '/v1' |
> >> |
> >> format
> >> | optional | json | String | Elasticsearch connector supports to
> specify a format. The format must produce a valid json document. By default
> uses built-in 'json' format. Please refer to JSON Format page for more
> details. |
> >>
> >>
> >>
> >>
> >>
> >>
> >>
>

Re:Re: pyflink1.11.0 如果elasticsearch host有访问权限,connector如何写入用户名密码

Posted by whh_960101 <wh...@163.com>.
现在能更新到1.12吗,好像还没有发布,有什么其他的解决办法吗?















在 2020-10-22 16:34:56,"Yangze Guo" <ka...@gmail.com> 写道:
>1.11版本中尚不支持username和password的设置,这两个配置在1.12中加入了新的es connector[1]
>
>[1] https://issues.apache.org/jira/browse/FLINK-18361
>
>Best,
>Yangze Guo
>
>On Thu, Oct 22, 2020 at 3:47 PM whh_960101 <wh...@163.com> wrote:
>>
>> Hi,各位大佬们:如果要sink数据到elasticsearch host有访问权限,elasticsearch connector如何写入用户名密码我按照官网里的样例格式来写的,没有找到options写入用户名密码,程序报错Unauthorizedhttps://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/elasticsearch.htmlCREATE TABLE myUserTable (
>>   user_id STRING,
>>   user_name STRING
>>   uv BIGINT,
>>   pv BIGINT,
>>   PRIMARY KEY (user_id) NOT ENFORCED
>> ) WITH (
>>   'connector' = 'elasticsearch-7',
>>   'hosts' = 'http://localhost:9200',
>>   'index' = 'users'
>> );Connector Options
>> | Option | Required | Default | Type | Description |
>> |
>> connector
>> | required | (none) | String | Specify what connector to use, valid values are:
>> elasticsearch-6: connect to Elasticsearch 6.x cluster
>> elasticsearch-7: connect to Elasticsearch 7.x and later versions cluster
>> |
>> |
>> hosts
>> | required | (none) | String | One or more Elasticsearch hosts to connect to, e.g. 'http://host_name:9092;http://host_name:9093'. |
>> |
>> index
>> | required | (none) | String | Elasticsearch index for every record. Can be a static index (e.g. 'myIndex') or a dynamic index (e.g. 'index-{log_ts|yyyy-MM-dd}'). See the following Dynamic Indexsection for more details. |
>> |
>> document-type
>> | required in 6.x | (none) | String | Elasticsearch document type. Not necessary anymore in elasticsearch-7. |
>> |
>> document-id.key-delimiter
>> | optional | _ | String | Delimiter for composite keys ("_" by default), e.g., "$" would result in IDs "KEY1$KEY2$KEY3"." |
>> |
>> failure-handler
>> | optional | fail | String | Failure handling strategy in case a request to Elasticsearch fails. Valid strategies are:
>> fail: throws an exception if a request fails and thus causes a job failure.
>> ignore: ignores failures and drops the request.
>> retry_rejected: re-adds requests that have failed due to queue capacity saturation.
>> custom class name: for failure handling with a ActionRequestFailureHandler subclass.
>> |
>> |
>> sink.flush-on-checkpoint
>> | optional | true | Boolean | Flush on checkpoint or not. When disabled, a sink will not wait for all pending action requests to be acknowledged by Elasticsearch on checkpoints. Thus, a sink does NOT provide any strong guarantees for at-least-once delivery of action requests. |
>> |
>> sink.bulk-flush.max-actions
>> | optional | 1000 | Integer | Maximum number of buffered actions per bulk request. Can be set to '0' to disable it. |
>> |
>> sink.bulk-flush.max-size
>> | optional | 2mb | MemorySize | Maximum size in memory of buffered actions per bulk request. Must be in MB granularity. Can be set to '0' to disable it. |
>> |
>> sink.bulk-flush.interval
>> | optional | 1s | Duration | The interval to flush buffered actions. Can be set to '0' to disable it. Note, both 'sink.bulk-flush.max-size' and 'sink.bulk-flush.max-actions' can be set to '0' with the flush interval set allowing for complete async processing of buffered actions. |
>> |
>> sink.bulk-flush.backoff.strategy
>> | optional | DISABLED | String | Specify how to perform retries if any flush actions failed due to a temporary request error. Valid strategies are:
>> DISABLED: no retry performed, i.e. fail after the first request error.
>> CONSTANT: wait for backoff delay between retries.
>> EXPONENTIAL: initially wait for backoff delay and increase exponentially between retries.
>> |
>> |
>> sink.bulk-flush.backoff.max-retries
>> | optional | 8 | Integer | Maximum number of backoff retries. |
>> |
>> sink.bulk-flush.backoff.delay
>> | optional | 50ms | Duration | Delay between each backoff attempt. For CONSTANT backoff, this is simply the delay between each retry. For EXPONENTIAL backoff, this is the initial base delay. |
>> |
>> connection.max-retry-timeout
>> | optional | (none) | Duration | Maximum timeout between retries. |
>> |
>> connection.path-prefix
>> | optional | (none) | String | Prefix string to be added to every REST communication, e.g., '/v1' |
>> |
>> format
>> | optional | json | String | Elasticsearch connector supports to specify a format. The format must produce a valid json document. By default uses built-in 'json' format. Please refer to JSON Format page for more details. |
>>
>>
>>
>>
>>
>>
>>

Re: Re: pyflink1.11.0 如果elasticsearch host有访问权限,connector如何写入用户名密码

Posted by Xingbo Huang <hx...@gmail.com>.
Hi,

Pyflink 1.11还不支持datastream,1.12才有

Best,
Xingbo

whh_960101 <wh...@163.com> 于2020年10月27日周二 下午2:58写道:

> 有没有其他方式可以写入username和password,我了解java
> flink访问elasticsearch是有username和password入口的,pyflink是调用java来执行,应该是有这个入口的吧,有没有大佬可以指点一下,谢谢啦!
>
>
>
>
>
>
>
> 在 2020-10-22 16:34:56,"Yangze Guo" <ka...@gmail.com> 写道:
> >1.11版本中尚不支持username和password的设置,这两个配置在1.12中加入了新的es connector[1]
> >
> >[1] https://issues.apache.org/jira/browse/FLINK-18361
> >
> >Best,
> >Yangze Guo
> >
> >On Thu, Oct 22, 2020 at 3:47 PM whh_960101 <wh...@163.com> wrote:
> >>
> >> Hi,各位大佬们:如果要sink数据到elasticsearch host有访问权限,elasticsearch
> connector如何写入用户名密码我按照官网里的样例格式来写的,没有找到options写入用户名密码,程序报错Unauthorizedhttps://
> ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/elasticsearch.htmlCREATE
> TABLE myUserTable (
> >>   user_id STRING,
> >>   user_name STRING
> >>   uv BIGINT,
> >>   pv BIGINT,
> >>   PRIMARY KEY (user_id) NOT ENFORCED
> >> ) WITH (
> >>   'connector' = 'elasticsearch-7',
> >>   'hosts' = 'http://localhost:9200',
> >>   'index' = 'users'
> >> );Connector Options
> >> | Option | Required | Default | Type | Description |
> >> |
> >> connector
> >> | required | (none) | String | Specify what connector to use, valid
> values are:
> >> elasticsearch-6: connect to Elasticsearch 6.x cluster
> >> elasticsearch-7: connect to Elasticsearch 7.x and later versions cluster
> >> |
> >> |
> >> hosts
> >> | required | (none) | String | One or more Elasticsearch hosts to
> connect to, e.g. 'http://host_name:9092;http://host_name:9093'. |
> >> |
> >> index
> >> | required | (none) | String | Elasticsearch index for every record.
> Can be a static index (e.g. 'myIndex') or a dynamic index (e.g.
> 'index-{log_ts|yyyy-MM-dd}'). See the following Dynamic Indexsection for
> more details. |
> >> |
> >> document-type
> >> | required in 6.x | (none) | String | Elasticsearch document type. Not
> necessary anymore in elasticsearch-7. |
> >> |
> >> document-id.key-delimiter
> >> | optional | _ | String | Delimiter for composite keys ("_" by
> default), e.g., "$" would result in IDs "KEY1$KEY2$KEY3"." |
> >> |
> >> failure-handler
> >> | optional | fail | String | Failure handling strategy in case a
> request to Elasticsearch fails. Valid strategies are:
> >> fail: throws an exception if a request fails and thus causes a job
> failure.
> >> ignore: ignores failures and drops the request.
> >> retry_rejected: re-adds requests that have failed due to queue capacity
> saturation.
> >> custom class name: for failure handling with a
> ActionRequestFailureHandler subclass.
> >> |
> >> |
> >> sink.flush-on-checkpoint
> >> | optional | true | Boolean | Flush on checkpoint or not. When
> disabled, a sink will not wait for all pending action requests to be
> acknowledged by Elasticsearch on checkpoints. Thus, a sink does NOT provide
> any strong guarantees for at-least-once delivery of action requests. |
> >> |
> >> sink.bulk-flush.max-actions
> >> | optional | 1000 | Integer | Maximum number of buffered actions per
> bulk request. Can be set to '0' to disable it. |
> >> |
> >> sink.bulk-flush.max-size
> >> | optional | 2mb | MemorySize | Maximum size in memory of buffered
> actions per bulk request. Must be in MB granularity. Can be set to '0' to
> disable it. |
> >> |
> >> sink.bulk-flush.interval
> >> | optional | 1s | Duration | The interval to flush buffered actions.
> Can be set to '0' to disable it. Note, both 'sink.bulk-flush.max-size' and
> 'sink.bulk-flush.max-actions' can be set to '0' with the flush interval set
> allowing for complete async processing of buffered actions. |
> >> |
> >> sink.bulk-flush.backoff.strategy
> >> | optional | DISABLED | String | Specify how to perform retries if any
> flush actions failed due to a temporary request error. Valid strategies are:
> >> DISABLED: no retry performed, i.e. fail after the first request error.
> >> CONSTANT: wait for backoff delay between retries.
> >> EXPONENTIAL: initially wait for backoff delay and increase
> exponentially between retries.
> >> |
> >> |
> >> sink.bulk-flush.backoff.max-retries
> >> | optional | 8 | Integer | Maximum number of backoff retries. |
> >> |
> >> sink.bulk-flush.backoff.delay
> >> | optional | 50ms | Duration | Delay between each backoff attempt. For
> CONSTANT backoff, this is simply the delay between each retry. For
> EXPONENTIAL backoff, this is the initial base delay. |
> >> |
> >> connection.max-retry-timeout
> >> | optional | (none) | Duration | Maximum timeout between retries. |
> >> |
> >> connection.path-prefix
> >> | optional | (none) | String | Prefix string to be added to every REST
> communication, e.g., '/v1' |
> >> |
> >> format
> >> | optional | json | String | Elasticsearch connector supports to
> specify a format. The format must produce a valid json document. By default
> uses built-in 'json' format. Please refer to JSON Format page for more
> details. |
> >>
> >>
> >>
> >>
> >>
> >>
> >>
>
>
>
>
>
>

Re:Re: pyflink1.11.0 如果elasticsearch host有访问权限,connector如何写入用户名密码

Posted by whh_960101 <wh...@163.com>.
有没有其他方式可以写入username和password,我了解java flink访问elasticsearch是有username和password入口的,pyflink是调用java来执行,应该是有这个入口的吧,有没有大佬可以指点一下,谢谢啦!







在 2020-10-22 16:34:56,"Yangze Guo" <ka...@gmail.com> 写道:
>1.11版本中尚不支持username和password的设置,这两个配置在1.12中加入了新的es connector[1]
>
>[1] https://issues.apache.org/jira/browse/FLINK-18361
>
>Best,
>Yangze Guo
>
>On Thu, Oct 22, 2020 at 3:47 PM whh_960101 <wh...@163.com> wrote:
>>
>> Hi,各位大佬们:如果要sink数据到elasticsearch host有访问权限,elasticsearch connector如何写入用户名密码我按照官网里的样例格式来写的,没有找到options写入用户名密码,程序报错Unauthorizedhttps://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/elasticsearch.htmlCREATE TABLE myUserTable (
>>   user_id STRING,
>>   user_name STRING
>>   uv BIGINT,
>>   pv BIGINT,
>>   PRIMARY KEY (user_id) NOT ENFORCED
>> ) WITH (
>>   'connector' = 'elasticsearch-7',
>>   'hosts' = 'http://localhost:9200',
>>   'index' = 'users'
>> );Connector Options
>> | Option | Required | Default | Type | Description |
>> |
>> connector
>> | required | (none) | String | Specify what connector to use, valid values are:
>> elasticsearch-6: connect to Elasticsearch 6.x cluster
>> elasticsearch-7: connect to Elasticsearch 7.x and later versions cluster
>> |
>> |
>> hosts
>> | required | (none) | String | One or more Elasticsearch hosts to connect to, e.g. 'http://host_name:9092;http://host_name:9093'. |
>> |
>> index
>> | required | (none) | String | Elasticsearch index for every record. Can be a static index (e.g. 'myIndex') or a dynamic index (e.g. 'index-{log_ts|yyyy-MM-dd}'). See the following Dynamic Indexsection for more details. |
>> |
>> document-type
>> | required in 6.x | (none) | String | Elasticsearch document type. Not necessary anymore in elasticsearch-7. |
>> |
>> document-id.key-delimiter
>> | optional | _ | String | Delimiter for composite keys ("_" by default), e.g., "$" would result in IDs "KEY1$KEY2$KEY3"." |
>> |
>> failure-handler
>> | optional | fail | String | Failure handling strategy in case a request to Elasticsearch fails. Valid strategies are:
>> fail: throws an exception if a request fails and thus causes a job failure.
>> ignore: ignores failures and drops the request.
>> retry_rejected: re-adds requests that have failed due to queue capacity saturation.
>> custom class name: for failure handling with a ActionRequestFailureHandler subclass.
>> |
>> |
>> sink.flush-on-checkpoint
>> | optional | true | Boolean | Flush on checkpoint or not. When disabled, a sink will not wait for all pending action requests to be acknowledged by Elasticsearch on checkpoints. Thus, a sink does NOT provide any strong guarantees for at-least-once delivery of action requests. |
>> |
>> sink.bulk-flush.max-actions
>> | optional | 1000 | Integer | Maximum number of buffered actions per bulk request. Can be set to '0' to disable it. |
>> |
>> sink.bulk-flush.max-size
>> | optional | 2mb | MemorySize | Maximum size in memory of buffered actions per bulk request. Must be in MB granularity. Can be set to '0' to disable it. |
>> |
>> sink.bulk-flush.interval
>> | optional | 1s | Duration | The interval to flush buffered actions. Can be set to '0' to disable it. Note, both 'sink.bulk-flush.max-size' and 'sink.bulk-flush.max-actions' can be set to '0' with the flush interval set allowing for complete async processing of buffered actions. |
>> |
>> sink.bulk-flush.backoff.strategy
>> | optional | DISABLED | String | Specify how to perform retries if any flush actions failed due to a temporary request error. Valid strategies are:
>> DISABLED: no retry performed, i.e. fail after the first request error.
>> CONSTANT: wait for backoff delay between retries.
>> EXPONENTIAL: initially wait for backoff delay and increase exponentially between retries.
>> |
>> |
>> sink.bulk-flush.backoff.max-retries
>> | optional | 8 | Integer | Maximum number of backoff retries. |
>> |
>> sink.bulk-flush.backoff.delay
>> | optional | 50ms | Duration | Delay between each backoff attempt. For CONSTANT backoff, this is simply the delay between each retry. For EXPONENTIAL backoff, this is the initial base delay. |
>> |
>> connection.max-retry-timeout
>> | optional | (none) | Duration | Maximum timeout between retries. |
>> |
>> connection.path-prefix
>> | optional | (none) | String | Prefix string to be added to every REST communication, e.g., '/v1' |
>> |
>> format
>> | optional | json | String | Elasticsearch connector supports to specify a format. The format must produce a valid json document. By default uses built-in 'json' format. Please refer to JSON Format page for more details. |
>>
>>
>>
>>
>>
>>
>>





 

Re:Re: pyflink1.11.0 如果elasticsearch host有访问权限,connector如何写入用户名密码

Posted by whh_960101 <wh...@163.com>.
现在能更新到1.12吗,好像还没有发布,有什么其他的解决办法吗?















在 2020-10-22 16:34:56,"Yangze Guo" <ka...@gmail.com> 写道:
>1.11版本中尚不支持username和password的设置,这两个配置在1.12中加入了新的es connector[1]
>
>[1] https://issues.apache.org/jira/browse/FLINK-18361
>
>Best,
>Yangze Guo
>
>On Thu, Oct 22, 2020 at 3:47 PM whh_960101 <wh...@163.com> wrote:
>>
>> Hi,各位大佬们:如果要sink数据到elasticsearch host有访问权限,elasticsearch connector如何写入用户名密码我按照官网里的样例格式来写的,没有找到options写入用户名密码,程序报错Unauthorizedhttps://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/elasticsearch.htmlCREATE TABLE myUserTable (
>>   user_id STRING,
>>   user_name STRING
>>   uv BIGINT,
>>   pv BIGINT,
>>   PRIMARY KEY (user_id) NOT ENFORCED
>> ) WITH (
>>   'connector' = 'elasticsearch-7',
>>   'hosts' = 'http://localhost:9200',
>>   'index' = 'users'
>> );Connector Options
>> | Option | Required | Default | Type | Description |
>> |
>> connector
>> | required | (none) | String | Specify what connector to use, valid values are:
>> elasticsearch-6: connect to Elasticsearch 6.x cluster
>> elasticsearch-7: connect to Elasticsearch 7.x and later versions cluster
>> |
>> |
>> hosts
>> | required | (none) | String | One or more Elasticsearch hosts to connect to, e.g. 'http://host_name:9092;http://host_name:9093'. |
>> |
>> index
>> | required | (none) | String | Elasticsearch index for every record. Can be a static index (e.g. 'myIndex') or a dynamic index (e.g. 'index-{log_ts|yyyy-MM-dd}'). See the following Dynamic Indexsection for more details. |
>> |
>> document-type
>> | required in 6.x | (none) | String | Elasticsearch document type. Not necessary anymore in elasticsearch-7. |
>> |
>> document-id.key-delimiter
>> | optional | _ | String | Delimiter for composite keys ("_" by default), e.g., "$" would result in IDs "KEY1$KEY2$KEY3"." |
>> |
>> failure-handler
>> | optional | fail | String | Failure handling strategy in case a request to Elasticsearch fails. Valid strategies are:
>> fail: throws an exception if a request fails and thus causes a job failure.
>> ignore: ignores failures and drops the request.
>> retry_rejected: re-adds requests that have failed due to queue capacity saturation.
>> custom class name: for failure handling with a ActionRequestFailureHandler subclass.
>> |
>> |
>> sink.flush-on-checkpoint
>> | optional | true | Boolean | Flush on checkpoint or not. When disabled, a sink will not wait for all pending action requests to be acknowledged by Elasticsearch on checkpoints. Thus, a sink does NOT provide any strong guarantees for at-least-once delivery of action requests. |
>> |
>> sink.bulk-flush.max-actions
>> | optional | 1000 | Integer | Maximum number of buffered actions per bulk request. Can be set to '0' to disable it. |
>> |
>> sink.bulk-flush.max-size
>> | optional | 2mb | MemorySize | Maximum size in memory of buffered actions per bulk request. Must be in MB granularity. Can be set to '0' to disable it. |
>> |
>> sink.bulk-flush.interval
>> | optional | 1s | Duration | The interval to flush buffered actions. Can be set to '0' to disable it. Note, both 'sink.bulk-flush.max-size' and 'sink.bulk-flush.max-actions' can be set to '0' with the flush interval set allowing for complete async processing of buffered actions. |
>> |
>> sink.bulk-flush.backoff.strategy
>> | optional | DISABLED | String | Specify how to perform retries if any flush actions failed due to a temporary request error. Valid strategies are:
>> DISABLED: no retry performed, i.e. fail after the first request error.
>> CONSTANT: wait for backoff delay between retries.
>> EXPONENTIAL: initially wait for backoff delay and increase exponentially between retries.
>> |
>> |
>> sink.bulk-flush.backoff.max-retries
>> | optional | 8 | Integer | Maximum number of backoff retries. |
>> |
>> sink.bulk-flush.backoff.delay
>> | optional | 50ms | Duration | Delay between each backoff attempt. For CONSTANT backoff, this is simply the delay between each retry. For EXPONENTIAL backoff, this is the initial base delay. |
>> |
>> connection.max-retry-timeout
>> | optional | (none) | Duration | Maximum timeout between retries. |
>> |
>> connection.path-prefix
>> | optional | (none) | String | Prefix string to be added to every REST communication, e.g., '/v1' |
>> |
>> format
>> | optional | json | String | Elasticsearch connector supports to specify a format. The format must produce a valid json document. By default uses built-in 'json' format. Please refer to JSON Format page for more details. |
>>
>>
>>
>>
>>
>>
>>

Re: pyflink1.11.0 如果elasticsearch host有访问权限,connector如何写入用户名密码

Posted by Yangze Guo <ka...@gmail.com>.
1.11版本中尚不支持username和password的设置,这两个配置在1.12中加入了新的es connector[1]

[1] https://issues.apache.org/jira/browse/FLINK-18361

Best,
Yangze Guo

On Thu, Oct 22, 2020 at 3:47 PM whh_960101 <wh...@163.com> wrote:
>
> Hi,各位大佬们:如果要sink数据到elasticsearch host有访问权限,elasticsearch connector如何写入用户名密码我按照官网里的样例格式来写的,没有找到options写入用户名密码,程序报错Unauthorizedhttps://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/elasticsearch.htmlCREATE TABLE myUserTable (
>   user_id STRING,
>   user_name STRING
>   uv BIGINT,
>   pv BIGINT,
>   PRIMARY KEY (user_id) NOT ENFORCED
> ) WITH (
>   'connector' = 'elasticsearch-7',
>   'hosts' = 'http://localhost:9200',
>   'index' = 'users'
> );Connector Options
> | Option | Required | Default | Type | Description |
> |
> connector
> | required | (none) | String | Specify what connector to use, valid values are:
> elasticsearch-6: connect to Elasticsearch 6.x cluster
> elasticsearch-7: connect to Elasticsearch 7.x and later versions cluster
> |
> |
> hosts
> | required | (none) | String | One or more Elasticsearch hosts to connect to, e.g. 'http://host_name:9092;http://host_name:9093'. |
> |
> index
> | required | (none) | String | Elasticsearch index for every record. Can be a static index (e.g. 'myIndex') or a dynamic index (e.g. 'index-{log_ts|yyyy-MM-dd}'). See the following Dynamic Indexsection for more details. |
> |
> document-type
> | required in 6.x | (none) | String | Elasticsearch document type. Not necessary anymore in elasticsearch-7. |
> |
> document-id.key-delimiter
> | optional | _ | String | Delimiter for composite keys ("_" by default), e.g., "$" would result in IDs "KEY1$KEY2$KEY3"." |
> |
> failure-handler
> | optional | fail | String | Failure handling strategy in case a request to Elasticsearch fails. Valid strategies are:
> fail: throws an exception if a request fails and thus causes a job failure.
> ignore: ignores failures and drops the request.
> retry_rejected: re-adds requests that have failed due to queue capacity saturation.
> custom class name: for failure handling with a ActionRequestFailureHandler subclass.
> |
> |
> sink.flush-on-checkpoint
> | optional | true | Boolean | Flush on checkpoint or not. When disabled, a sink will not wait for all pending action requests to be acknowledged by Elasticsearch on checkpoints. Thus, a sink does NOT provide any strong guarantees for at-least-once delivery of action requests. |
> |
> sink.bulk-flush.max-actions
> | optional | 1000 | Integer | Maximum number of buffered actions per bulk request. Can be set to '0' to disable it. |
> |
> sink.bulk-flush.max-size
> | optional | 2mb | MemorySize | Maximum size in memory of buffered actions per bulk request. Must be in MB granularity. Can be set to '0' to disable it. |
> |
> sink.bulk-flush.interval
> | optional | 1s | Duration | The interval to flush buffered actions. Can be set to '0' to disable it. Note, both 'sink.bulk-flush.max-size' and 'sink.bulk-flush.max-actions' can be set to '0' with the flush interval set allowing for complete async processing of buffered actions. |
> |
> sink.bulk-flush.backoff.strategy
> | optional | DISABLED | String | Specify how to perform retries if any flush actions failed due to a temporary request error. Valid strategies are:
> DISABLED: no retry performed, i.e. fail after the first request error.
> CONSTANT: wait for backoff delay between retries.
> EXPONENTIAL: initially wait for backoff delay and increase exponentially between retries.
> |
> |
> sink.bulk-flush.backoff.max-retries
> | optional | 8 | Integer | Maximum number of backoff retries. |
> |
> sink.bulk-flush.backoff.delay
> | optional | 50ms | Duration | Delay between each backoff attempt. For CONSTANT backoff, this is simply the delay between each retry. For EXPONENTIAL backoff, this is the initial base delay. |
> |
> connection.max-retry-timeout
> | optional | (none) | Duration | Maximum timeout between retries. |
> |
> connection.path-prefix
> | optional | (none) | String | Prefix string to be added to every REST communication, e.g., '/v1' |
> |
> format
> | optional | json | String | Elasticsearch connector supports to specify a format. The format must produce a valid json document. By default uses built-in 'json' format. Please refer to JSON Format page for more details. |
>
>
>
>
>
>
>

pyflink1.11.0 如果elasticsearch host有访问权限,connector如何写入用户名密码

Posted by whh_960101 <wh...@163.com>.
Hi,各位大佬们:如果要sink数据到elasticsearch host有访问权限,elasticsearch connector如何写入用户名密码我按照官网里的样例格式来写的,没有找到options写入用户名密码,程序报错Unauthorizedhttps://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/elasticsearch.htmlCREATE TABLE myUserTable (
  user_id STRING,
  user_name STRING
  uv BIGINT,
  pv BIGINT,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = 'http://localhost:9200',
  'index' = 'users'
);Connector Options
| Option | Required | Default | Type | Description |
|
connector
| required | (none) | String | Specify what connector to use, valid values are:
elasticsearch-6: connect to Elasticsearch 6.x cluster
elasticsearch-7: connect to Elasticsearch 7.x and later versions cluster
|
|
hosts
| required | (none) | String | One or more Elasticsearch hosts to connect to, e.g. 'http://host_name:9092;http://host_name:9093'. |
|
index
| required | (none) | String | Elasticsearch index for every record. Can be a static index (e.g. 'myIndex') or a dynamic index (e.g. 'index-{log_ts|yyyy-MM-dd}'). See the following Dynamic Indexsection for more details. |
|
document-type
| required in 6.x | (none) | String | Elasticsearch document type. Not necessary anymore in elasticsearch-7. |
|
document-id.key-delimiter
| optional | _ | String | Delimiter for composite keys ("_" by default), e.g., "$" would result in IDs "KEY1$KEY2$KEY3"." |
|
failure-handler
| optional | fail | String | Failure handling strategy in case a request to Elasticsearch fails. Valid strategies are:
fail: throws an exception if a request fails and thus causes a job failure.
ignore: ignores failures and drops the request.
retry_rejected: re-adds requests that have failed due to queue capacity saturation.
custom class name: for failure handling with a ActionRequestFailureHandler subclass.
|
|
sink.flush-on-checkpoint
| optional | true | Boolean | Flush on checkpoint or not. When disabled, a sink will not wait for all pending action requests to be acknowledged by Elasticsearch on checkpoints. Thus, a sink does NOT provide any strong guarantees for at-least-once delivery of action requests. |
|
sink.bulk-flush.max-actions
| optional | 1000 | Integer | Maximum number of buffered actions per bulk request. Can be set to '0' to disable it. |
|
sink.bulk-flush.max-size
| optional | 2mb | MemorySize | Maximum size in memory of buffered actions per bulk request. Must be in MB granularity. Can be set to '0' to disable it. |
|
sink.bulk-flush.interval
| optional | 1s | Duration | The interval to flush buffered actions. Can be set to '0' to disable it. Note, both 'sink.bulk-flush.max-size' and 'sink.bulk-flush.max-actions' can be set to '0' with the flush interval set allowing for complete async processing of buffered actions. |
|
sink.bulk-flush.backoff.strategy
| optional | DISABLED | String | Specify how to perform retries if any flush actions failed due to a temporary request error. Valid strategies are:
DISABLED: no retry performed, i.e. fail after the first request error.
CONSTANT: wait for backoff delay between retries.
EXPONENTIAL: initially wait for backoff delay and increase exponentially between retries.
|
|
sink.bulk-flush.backoff.max-retries
| optional | 8 | Integer | Maximum number of backoff retries. |
|
sink.bulk-flush.backoff.delay
| optional | 50ms | Duration | Delay between each backoff attempt. For CONSTANT backoff, this is simply the delay between each retry. For EXPONENTIAL backoff, this is the initial base delay. |
|
connection.max-retry-timeout
| optional | (none) | Duration | Maximum timeout between retries. |
|
connection.path-prefix
| optional | (none) | String | Prefix string to be added to every REST communication, e.g., '/v1' |
|
format
| optional | json | String | Elasticsearch connector supports to specify a format. The format must produce a valid json document. By default uses built-in 'json' format. Please refer to JSON Format page for more details. |






 

Re: 提交pyflink作业到YARN集群报错

Posted by Dian Fu <di...@gmail.com>.
看一下目录site-packages/pyflink/opt,里面是否有名为flink-python的jar

> 在 2020年10月26日,下午4:38,whh_960101 <wh...@163.com> 写道:
> 
> Hi,各位大佬,     想请教一下,我使用flink run -m yarn-cluster -p 4 -py myjob.py,报错java.lang.RuntimeException: Found 0 flink-python jar.
> at org.apache.flink.client.program.PackagedProgram.getJobJarAndDependencies(PackagedProgram.java:263)
> at org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:140)
> at org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:68)
> at org.apache.flink.client.program.PackagedProgram$Builder.build(PackagedProgram.java:590)
> at org.apache.flink.client.cli.CliFrontend.buildProgram(CliFrontend.java:758)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:250)
> at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:968)
> at org.apache.flink.client.cli.CliFrontend.lambda$main$12(CliFrontend.java:1042)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1876)
> at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1042)
> 
> 没有发现flink-python jar,这个jar在哪里可以找见,site-packages/pyflink里面,还是自己构建,目前本机已经安装pyflink 1.11.1


提交pyflink作业到YARN集群报错

Posted by whh_960101 <wh...@163.com>.
Hi,各位大佬,     想请教一下,我使用flink run -m yarn-cluster -p 4 -py myjob.py,报错java.lang.RuntimeException: Found 0 flink-python jar.
 at org.apache.flink.client.program.PackagedProgram.getJobJarAndDependencies(PackagedProgram.java:263)
 at org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:140)
 at org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:68)
 at org.apache.flink.client.program.PackagedProgram$Builder.build(PackagedProgram.java:590)
 at org.apache.flink.client.cli.CliFrontend.buildProgram(CliFrontend.java:758)
 at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:250)
 at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:968)
 at org.apache.flink.client.cli.CliFrontend.lambda$main$12(CliFrontend.java:1042)
 at java.security.AccessController.doPrivileged(Native Method)
 at javax.security.auth.Subject.doAs(Subject.java:422)
 at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1876)
 at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
 at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1042)

没有发现flink-python jar,这个jar在哪里可以找见,site-packages/pyflink里面,还是自己构建,目前本机已经安装pyflink 1.11.1

Re: pyflink 1.11 运行pyflink作业时报错

Posted by Wei Zhong <we...@gmail.com>.
Hi 你好,

只看目前的报错看不出问题来,请问能贴出出错部分的job源码吗?

> 在 2020年11月17日,16:58,whh_960101 <wh...@163.com> 写道:
> 
> Hi,各位大佬,    pyflink 1.11 将pyflink作业提交到yarn集群运行,作业在将处理后的main_table insert到sink端的kafka时报错File "/home/cdh272705/poc/T24_parse.py", line 179, in from_kafka_to_oracle_demo
>    main_table.execute_insert("sink")#.get_job_client().get_job_execution_result().result()
>  File "/home/cdh272705/.local/lib/python3.6/site-packages/pyflink/table/table.py", line 783, in execute_insert
>    return TableResult(self._j_table.executeInsert(table_path, overwrite))
>  File "/home/cdh272705/.local/lib/python3.6/site-packages/py4j/java_gateway.py", line 1286, in __call__
>    answer, self.gateway_client, self.target_id, self.name)
>  File "/home/cdh272705/.local/lib/python3.6/site-packages/pyflink/util/exceptions.py", line 154, in deco
>    raise exception_mapping[exception](s.split(': ', 1)[1], stack_trace)
> pyflink.util.exceptions.TableException: 'Failed to execute sql'
> org.apache.flink.client.program.ProgramAbortException
> at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:95)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
> at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
> at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
> at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
> at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1876)
> at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)是缺少jar包吗,我在opt、lib目录下都放了flink-sql-client_2.11-1.11.1.jar,'Failed to execute sql 是什么原因
> 
> 
> 
> 
> 
> 
> 
> 
> 


pyflink 1.11 运行pyflink作业时报错

Posted by whh_960101 <wh...@163.com>.
Hi,各位大佬,    pyflink 1.11 将pyflink作业提交到yarn集群运行,作业在将处理后的main_table insert到sink端的kafka时报错File "/home/cdh272705/poc/T24_parse.py", line 179, in from_kafka_to_oracle_demo
    main_table.execute_insert("sink")#.get_job_client().get_job_execution_result().result()
  File "/home/cdh272705/.local/lib/python3.6/site-packages/pyflink/table/table.py", line 783, in execute_insert
    return TableResult(self._j_table.executeInsert(table_path, overwrite))
  File "/home/cdh272705/.local/lib/python3.6/site-packages/py4j/java_gateway.py", line 1286, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/home/cdh272705/.local/lib/python3.6/site-packages/pyflink/util/exceptions.py", line 154, in deco
    raise exception_mapping[exception](s.split(': ', 1)[1], stack_trace)
pyflink.util.exceptions.TableException: 'Failed to execute sql'
org.apache.flink.client.program.ProgramAbortException
 at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:95)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
 at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
 at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
 at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
 at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
 at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
 at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
 at java.security.AccessController.doPrivileged(Native Method)
 at javax.security.auth.Subject.doAs(Subject.java:422)
 at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1876)
 at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
 at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)是缺少jar包吗,我在opt、lib目录下都放了flink-sql-client_2.11-1.11.1.jar,'Failed to execute sql 是什么原因



 





 

Re: pyflink1.11.0 报错JobExecutionException: Job execution failed.

Posted by Dian Fu <di...@gmail.com>.
PyFlink里有Row类型的类, pyflink.table.Row

> 在 2020年10月21日,上午9:05,whh_960101 <wh...@163.com> 写道:
> 
> Row类型的对象在python中是怎么表示的,字典?
> 
> 
> 
> 
> 
> 
> 在 2020-10-20 20:35:22,"Dian Fu" <di...@gmail.com> 写道:
>> 你这两个UDF(error_get和headers_get)实际的返回值类型都是string,但是却标成Row类型。如果确实需要返回Row类型,udf的返回值需要返回一个Row类型的对象。
>> 
>>> 在 2020年10月20日,下午7:56,Dian Fu <di...@gmail.com> 写道:
>>> 
>>> 错误堆栈看着似乎不太完整,有更完整的堆栈吗?
>>> 
>>>> 在 2020年10月20日,下午7:38,whh_960101 <wh...@163.com> 写道:
>>>> 
>>>> Hi,各位大佬,我处理kafka source Table,print 一个嵌套的json结果,报错JobExecutionException: Job execution failed. 这个问题该怎么解决,代码和报错信息如下:@udf(input_types=DataTypes.STRING(), result_type=DataTypes.BOOLEAN())
>>>>  def error_exist(message):
>>>>      if message is None:
>>>>          return False
>>>>      mes_dic = json.loads(message.strip())
>>>>      log = mes_dic.get('log').lower().strip()
>>>>      if 'error' in log:
>>>>          return True
>>>>      else:
>>>>          return False
>>>> 
>>>>  @udf(input_types=DataTypes.STRING(), result_type=DataTypes.ROW([DataTypes.FIELD("content", DataTypes.STRING())]))
>>>>  def error_get(message):  
>>>>      if message is None:
>>>>          return ''
>>>>      mes_dic = json.loads(message.strip())
>>>>      log = mes_dic.get('log')
>>>>      return json.dumps({"content":log.strip()})
>>>> 
>>>>  @udf(input_types=[DataTypes.STRING(),DataTypes.STRING(),DataTypes.STRING()], result_type=DataTypes.ROW([\
>>>>          DataTypes.FIELD("appId", DataTypes.STRING()),DataTypes.FIELD("hostname", DataTypes.STRING()), \
>>>>          DataTypes.FIELD("level", DataTypes.STRING()),DataTypes.FIELD("timeZone", DataTypes.STRING()), \
>>>>          DataTypes.FIELD("timestamp", DataTypes.DOUBLE()),DataTypes.FIELD("container", DataTypes.STRING()), \
>>>>          DataTypes.FIELD("clusterName", DataTypes.STRING()),DataTypes.FIELD("clusterChinese", DataTypes.STRING())]))
>>>>  def headers_get(message,container,clusterName):
>>>>      mes_dic = json.loads(message.strip())
>>>>      tz_utc = mes_dic.get('time')
>>>>      tz_utc = tz_utc[:tz_utc.rfind('.') + 7]
>>>>      from_zone = tz.gettz('UTC')
>>>>      dt_utc = datetime.strptime(tz_utc, '%Y-%m-%dT%H:%M:%S.%f')
>>>>      dt_utc = dt_utc.replace(tzinfo=from_zone)
>>>>      dt_ts = dt_utc.timestamp()
>>>> 
>>>>      map_df = pd.read_csv('cc_log_dev_map.csv')
>>>>      clusterChinese = map_df.loc[map_df.cs_English == clusterName, 'cs_Chinese'].values[0]
>>>> 
>>>>      return json.dumps({'appId':'','hostname':'','level':'ERROR','timeZone':'Asia/Shanghai','timestamp':dt_ts,\
>>>>                         'container':container,'clusterName':clusterName,'clusterChinese':clusterChinese})
>>>> 
>>>> #st_env.execute_sql("""
>>>> #        CREATE TABLE source(
>>>> #           message STRING,
>>>> #           clusterName STRING,
>>>> #           kubernetes ROW<container ROW<name STRING>>
>>>> #        ) WITH(
>>>> #            'connector' = 'kafka',
>>>> #        )
>>>> #    """)
>>>> 
>>>> st_env.execute_sql("""
>>>>      CREATE TABLE sink(
>>>>          body ROW<content STRING>,
>>>>          headers ROW<appId STRING,hostname STRING,level STRING,timeZone STRING,\
>>>>          `timestamp` DOUBLE,container STRING,clusterName STRING,clusterChinese STRING>
>>>>      ) WITH(
>>>>          'connector' = 'print',
>>>>      )
>>>>  """)tmp_table =  st_env.from_path("source") \
>>>>      .select("message,kubernetes.get('container').get('name') as container,clusterName")
>>>> 
>>>>  data_stream = st_env._j_tenv.toAppendStream(tmp_table._j_table,tmp_table._j_table.getSchema().toRowType())
>>>>  table = Table(st_env._j_tenv.fromDataStream(data_stream,"message,clusterName,container"),st_env)
>>>> 
>>>>  sink_table = table \
>>>>      .where("error_exist(message) = true") \
>>>>      .select("error_get(message) as body, headers_get(message,container,clusterName) as headers")
>>>> 
>>>>  sink_table.execute_insert("kafka_sink").get_job_client().get_job_execution_result().result()报错:File "log_parse_kafka.py", line 180, in from_kafka_to_oracle_demo
>>>>  sink_table.execute_insert("kafka_sink").get_job_client().get_job_execution_result().result()
>>>> File "/home/cdh272705/.local/lib/python3.6/site-packages/pyflink/common/completable_future.py", line 78, in result
>>>>  return self._py_class(self._j_completable_future.get())
>>>> File "/home/cdh272705/.local/lib/python3.6/site-packages/py4j/java_gateway.py", line 1286, in __call__
>>>>  answer, self.gateway_client, self.target_id, self.name)
>>>> File "/home/cdh272705/.local/lib/python3.6/site-packages/pyflink/util/exceptions.py", line 147, in deco
>>>>  return f(*a, **kw)
>>>> File "/home/cdh272705/.local/lib/python3.6/site-packages/py4j/protocol.py", line 328, in get_return_value
>>>>  format(target_id, ".", name), value)
>>>> py4j.protocol.Py4JJavaError: An error occurred while calling o150.get.
>>>> : java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>>>> at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>>>> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>> at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>>>> at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>>>> at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>>>> at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>>>> at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>>>> at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>>>> at java.lang.Thread.run(Thread.java:748)
>>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
>>>> at org.apache.flink.client.program.PerJobMiniClusterFactory$PerJobMiniClusterJobClient.lambda$getJobExecutionResult$2(PerJobMiniClusterFactory.java:186)
>>>> at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
>>>> at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
>>>> at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>>>> at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
>>>> at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:229)
>>>> at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
>>>> at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
>>>> at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>>>> at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
>>>> at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:892)
>>>> at akka.dispatch.OnComplete.internal(Future.scala:264)
>>>> at akka.dispatch.OnComplete.internal(Future.scala:261)
>>>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
>>>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>>>> at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
>>>> at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
>>>> at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
>>>> at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
>>>> at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
>>>> at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
>>>> at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
>>>> at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
>>>> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>>>> at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
>>>> at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
>>>> at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>>>> at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>>>> at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
>>>> at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
>>>> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>>>> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
>>>> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>> at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>> at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>> 


Re:Re: pyflink1.11.0 报错JobExecutionException: Job execution failed.

Posted by whh_960101 <wh...@163.com>.
Row类型的对象在python中是怎么表示的,字典?






在 2020-10-20 20:35:22,"Dian Fu" <di...@gmail.com> 写道:
>你这两个UDF(error_get和headers_get)实际的返回值类型都是string,但是却标成Row类型。如果确实需要返回Row类型,udf的返回值需要返回一个Row类型的对象。
>
>> 在 2020年10月20日,下午7:56,Dian Fu <di...@gmail.com> 写道:
>> 
>> 错误堆栈看着似乎不太完整,有更完整的堆栈吗?
>> 
>>> 在 2020年10月20日,下午7:38,whh_960101 <wh...@163.com> 写道:
>>> 
>>> Hi,各位大佬,我处理kafka source Table,print 一个嵌套的json结果,报错JobExecutionException: Job execution failed. 这个问题该怎么解决,代码和报错信息如下:@udf(input_types=DataTypes.STRING(), result_type=DataTypes.BOOLEAN())
>>>   def error_exist(message):
>>>       if message is None:
>>>           return False
>>>       mes_dic = json.loads(message.strip())
>>>       log = mes_dic.get('log').lower().strip()
>>>       if 'error' in log:
>>>           return True
>>>       else:
>>>           return False
>>> 
>>>   @udf(input_types=DataTypes.STRING(), result_type=DataTypes.ROW([DataTypes.FIELD("content", DataTypes.STRING())]))
>>>   def error_get(message):  
>>>       if message is None:
>>>           return ''
>>>       mes_dic = json.loads(message.strip())
>>>       log = mes_dic.get('log')
>>>       return json.dumps({"content":log.strip()})
>>> 
>>>   @udf(input_types=[DataTypes.STRING(),DataTypes.STRING(),DataTypes.STRING()], result_type=DataTypes.ROW([\
>>>           DataTypes.FIELD("appId", DataTypes.STRING()),DataTypes.FIELD("hostname", DataTypes.STRING()), \
>>>           DataTypes.FIELD("level", DataTypes.STRING()),DataTypes.FIELD("timeZone", DataTypes.STRING()), \
>>>           DataTypes.FIELD("timestamp", DataTypes.DOUBLE()),DataTypes.FIELD("container", DataTypes.STRING()), \
>>>           DataTypes.FIELD("clusterName", DataTypes.STRING()),DataTypes.FIELD("clusterChinese", DataTypes.STRING())]))
>>>   def headers_get(message,container,clusterName):
>>>       mes_dic = json.loads(message.strip())
>>>       tz_utc = mes_dic.get('time')
>>>       tz_utc = tz_utc[:tz_utc.rfind('.') + 7]
>>>       from_zone = tz.gettz('UTC')
>>>       dt_utc = datetime.strptime(tz_utc, '%Y-%m-%dT%H:%M:%S.%f')
>>>       dt_utc = dt_utc.replace(tzinfo=from_zone)
>>>       dt_ts = dt_utc.timestamp()
>>> 
>>>       map_df = pd.read_csv('cc_log_dev_map.csv')
>>>       clusterChinese = map_df.loc[map_df.cs_English == clusterName, 'cs_Chinese'].values[0]
>>> 
>>>       return json.dumps({'appId':'','hostname':'','level':'ERROR','timeZone':'Asia/Shanghai','timestamp':dt_ts,\
>>>                          'container':container,'clusterName':clusterName,'clusterChinese':clusterChinese})
>>> 
>>> #st_env.execute_sql("""
>>> #        CREATE TABLE source(
>>> #           message STRING,
>>> #           clusterName STRING,
>>> #           kubernetes ROW<container ROW<name STRING>>
>>> #        ) WITH(
>>> #            'connector' = 'kafka',
>>> #        )
>>> #    """)
>>> 
>>> st_env.execute_sql("""
>>>       CREATE TABLE sink(
>>>           body ROW<content STRING>,
>>>           headers ROW<appId STRING,hostname STRING,level STRING,timeZone STRING,\
>>>           `timestamp` DOUBLE,container STRING,clusterName STRING,clusterChinese STRING>
>>>       ) WITH(
>>>           'connector' = 'print',
>>>       )
>>>   """)tmp_table =  st_env.from_path("source") \
>>>       .select("message,kubernetes.get('container').get('name') as container,clusterName")
>>> 
>>>   data_stream = st_env._j_tenv.toAppendStream(tmp_table._j_table,tmp_table._j_table.getSchema().toRowType())
>>>   table = Table(st_env._j_tenv.fromDataStream(data_stream,"message,clusterName,container"),st_env)
>>> 
>>>   sink_table = table \
>>>       .where("error_exist(message) = true") \
>>>       .select("error_get(message) as body, headers_get(message,container,clusterName) as headers")
>>> 
>>>   sink_table.execute_insert("kafka_sink").get_job_client().get_job_execution_result().result()报错:File "log_parse_kafka.py", line 180, in from_kafka_to_oracle_demo
>>>   sink_table.execute_insert("kafka_sink").get_job_client().get_job_execution_result().result()
>>> File "/home/cdh272705/.local/lib/python3.6/site-packages/pyflink/common/completable_future.py", line 78, in result
>>>   return self._py_class(self._j_completable_future.get())
>>> File "/home/cdh272705/.local/lib/python3.6/site-packages/py4j/java_gateway.py", line 1286, in __call__
>>>   answer, self.gateway_client, self.target_id, self.name)
>>> File "/home/cdh272705/.local/lib/python3.6/site-packages/pyflink/util/exceptions.py", line 147, in deco
>>>   return f(*a, **kw)
>>> File "/home/cdh272705/.local/lib/python3.6/site-packages/py4j/protocol.py", line 328, in get_return_value
>>>   format(target_id, ".", name), value)
>>> py4j.protocol.Py4JJavaError: An error occurred while calling o150.get.
>>> : java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>>> at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>>> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>> at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>>> at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>>> at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>>> at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>>> at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>>> at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>>> at java.lang.Thread.run(Thread.java:748)
>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
>>> at org.apache.flink.client.program.PerJobMiniClusterFactory$PerJobMiniClusterJobClient.lambda$getJobExecutionResult$2(PerJobMiniClusterFactory.java:186)
>>> at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
>>> at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
>>> at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>>> at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
>>> at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:229)
>>> at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
>>> at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
>>> at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>>> at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
>>> at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:892)
>>> at akka.dispatch.OnComplete.internal(Future.scala:264)
>>> at akka.dispatch.OnComplete.internal(Future.scala:261)
>>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
>>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>>> at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
>>> at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
>>> at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
>>> at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
>>> at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
>>> at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
>>> at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
>>> at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
>>> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>>> at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
>>> at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
>>> at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>>> at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>>> at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
>>> at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
>>> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>>> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
>>> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>> at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>> at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> 

Re: pyflink1.11.0 报错JobExecutionException: Job execution failed.

Posted by Dian Fu <di...@gmail.com>.
你这两个UDF(error_get和headers_get)实际的返回值类型都是string,但是却标成Row类型。如果确实需要返回Row类型,udf的返回值需要返回一个Row类型的对象。

> 在 2020年10月20日,下午7:56,Dian Fu <di...@gmail.com> 写道:
> 
> 错误堆栈看着似乎不太完整,有更完整的堆栈吗?
> 
>> 在 2020年10月20日,下午7:38,whh_960101 <wh...@163.com> 写道:
>> 
>> Hi,各位大佬,我处理kafka source Table,print 一个嵌套的json结果,报错JobExecutionException: Job execution failed. 这个问题该怎么解决,代码和报错信息如下:@udf(input_types=DataTypes.STRING(), result_type=DataTypes.BOOLEAN())
>>   def error_exist(message):
>>       if message is None:
>>           return False
>>       mes_dic = json.loads(message.strip())
>>       log = mes_dic.get('log').lower().strip()
>>       if 'error' in log:
>>           return True
>>       else:
>>           return False
>> 
>>   @udf(input_types=DataTypes.STRING(), result_type=DataTypes.ROW([DataTypes.FIELD("content", DataTypes.STRING())]))
>>   def error_get(message):  
>>       if message is None:
>>           return ''
>>       mes_dic = json.loads(message.strip())
>>       log = mes_dic.get('log')
>>       return json.dumps({"content":log.strip()})
>> 
>>   @udf(input_types=[DataTypes.STRING(),DataTypes.STRING(),DataTypes.STRING()], result_type=DataTypes.ROW([\
>>           DataTypes.FIELD("appId", DataTypes.STRING()),DataTypes.FIELD("hostname", DataTypes.STRING()), \
>>           DataTypes.FIELD("level", DataTypes.STRING()),DataTypes.FIELD("timeZone", DataTypes.STRING()), \
>>           DataTypes.FIELD("timestamp", DataTypes.DOUBLE()),DataTypes.FIELD("container", DataTypes.STRING()), \
>>           DataTypes.FIELD("clusterName", DataTypes.STRING()),DataTypes.FIELD("clusterChinese", DataTypes.STRING())]))
>>   def headers_get(message,container,clusterName):
>>       mes_dic = json.loads(message.strip())
>>       tz_utc = mes_dic.get('time')
>>       tz_utc = tz_utc[:tz_utc.rfind('.') + 7]
>>       from_zone = tz.gettz('UTC')
>>       dt_utc = datetime.strptime(tz_utc, '%Y-%m-%dT%H:%M:%S.%f')
>>       dt_utc = dt_utc.replace(tzinfo=from_zone)
>>       dt_ts = dt_utc.timestamp()
>> 
>>       map_df = pd.read_csv('cc_log_dev_map.csv')
>>       clusterChinese = map_df.loc[map_df.cs_English == clusterName, 'cs_Chinese'].values[0]
>> 
>>       return json.dumps({'appId':'','hostname':'','level':'ERROR','timeZone':'Asia/Shanghai','timestamp':dt_ts,\
>>                          'container':container,'clusterName':clusterName,'clusterChinese':clusterChinese})
>> 
>> #st_env.execute_sql("""
>> #        CREATE TABLE source(
>> #           message STRING,
>> #           clusterName STRING,
>> #           kubernetes ROW<container ROW<name STRING>>
>> #        ) WITH(
>> #            'connector' = 'kafka',
>> #        )
>> #    """)
>> 
>> st_env.execute_sql("""
>>       CREATE TABLE sink(
>>           body ROW<content STRING>,
>>           headers ROW<appId STRING,hostname STRING,level STRING,timeZone STRING,\
>>           `timestamp` DOUBLE,container STRING,clusterName STRING,clusterChinese STRING>
>>       ) WITH(
>>           'connector' = 'print',
>>       )
>>   """)tmp_table =  st_env.from_path("source") \
>>       .select("message,kubernetes.get('container').get('name') as container,clusterName")
>> 
>>   data_stream = st_env._j_tenv.toAppendStream(tmp_table._j_table,tmp_table._j_table.getSchema().toRowType())
>>   table = Table(st_env._j_tenv.fromDataStream(data_stream,"message,clusterName,container"),st_env)
>> 
>>   sink_table = table \
>>       .where("error_exist(message) = true") \
>>       .select("error_get(message) as body, headers_get(message,container,clusterName) as headers")
>> 
>>   sink_table.execute_insert("kafka_sink").get_job_client().get_job_execution_result().result()报错:File "log_parse_kafka.py", line 180, in from_kafka_to_oracle_demo
>>   sink_table.execute_insert("kafka_sink").get_job_client().get_job_execution_result().result()
>> File "/home/cdh272705/.local/lib/python3.6/site-packages/pyflink/common/completable_future.py", line 78, in result
>>   return self._py_class(self._j_completable_future.get())
>> File "/home/cdh272705/.local/lib/python3.6/site-packages/py4j/java_gateway.py", line 1286, in __call__
>>   answer, self.gateway_client, self.target_id, self.name)
>> File "/home/cdh272705/.local/lib/python3.6/site-packages/pyflink/util/exceptions.py", line 147, in deco
>>   return f(*a, **kw)
>> File "/home/cdh272705/.local/lib/python3.6/site-packages/py4j/protocol.py", line 328, in get_return_value
>>   format(target_id, ".", name), value)
>> py4j.protocol.Py4JJavaError: An error occurred while calling o150.get.
>> : java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>> at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>> at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>> at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>> at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>> at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>> at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
>> at org.apache.flink.client.program.PerJobMiniClusterFactory$PerJobMiniClusterJobClient.lambda$getJobExecutionResult$2(PerJobMiniClusterFactory.java:186)
>> at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
>> at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
>> at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>> at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
>> at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:229)
>> at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
>> at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
>> at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>> at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
>> at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:892)
>> at akka.dispatch.OnComplete.internal(Future.scala:264)
>> at akka.dispatch.OnComplete.internal(Future.scala:261)
>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>> at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
>> at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
>> at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
>> at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
>> at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
>> at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
>> at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
>> at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
>> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>> at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
>> at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
>> at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>> at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>> at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
>> at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
>> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
>> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 


Re: pyflink1.11.0 报错JobExecutionException: Job execution failed.

Posted by Dian Fu <di...@gmail.com>.
错误堆栈看着似乎不太完整,有更完整的堆栈吗?

> 在 2020年10月20日,下午7:38,whh_960101 <wh...@163.com> 写道:
> 
> Hi,各位大佬,我处理kafka source Table,print 一个嵌套的json结果,报错JobExecutionException: Job execution failed. 这个问题该怎么解决,代码和报错信息如下:@udf(input_types=DataTypes.STRING(), result_type=DataTypes.BOOLEAN())
>    def error_exist(message):
>        if message is None:
>            return False
>        mes_dic = json.loads(message.strip())
>        log = mes_dic.get('log').lower().strip()
>        if 'error' in log:
>            return True
>        else:
>            return False
> 
>    @udf(input_types=DataTypes.STRING(), result_type=DataTypes.ROW([DataTypes.FIELD("content", DataTypes.STRING())]))
>    def error_get(message):  
>        if message is None:
>            return ''
>        mes_dic = json.loads(message.strip())
>        log = mes_dic.get('log')
>        return json.dumps({"content":log.strip()})
> 
>    @udf(input_types=[DataTypes.STRING(),DataTypes.STRING(),DataTypes.STRING()], result_type=DataTypes.ROW([\
>            DataTypes.FIELD("appId", DataTypes.STRING()),DataTypes.FIELD("hostname", DataTypes.STRING()), \
>            DataTypes.FIELD("level", DataTypes.STRING()),DataTypes.FIELD("timeZone", DataTypes.STRING()), \
>            DataTypes.FIELD("timestamp", DataTypes.DOUBLE()),DataTypes.FIELD("container", DataTypes.STRING()), \
>            DataTypes.FIELD("clusterName", DataTypes.STRING()),DataTypes.FIELD("clusterChinese", DataTypes.STRING())]))
>    def headers_get(message,container,clusterName):
>        mes_dic = json.loads(message.strip())
>        tz_utc = mes_dic.get('time')
>        tz_utc = tz_utc[:tz_utc.rfind('.') + 7]
>        from_zone = tz.gettz('UTC')
>        dt_utc = datetime.strptime(tz_utc, '%Y-%m-%dT%H:%M:%S.%f')
>        dt_utc = dt_utc.replace(tzinfo=from_zone)
>        dt_ts = dt_utc.timestamp()
> 
>        map_df = pd.read_csv('cc_log_dev_map.csv')
>        clusterChinese = map_df.loc[map_df.cs_English == clusterName, 'cs_Chinese'].values[0]
> 
>        return json.dumps({'appId':'','hostname':'','level':'ERROR','timeZone':'Asia/Shanghai','timestamp':dt_ts,\
>                           'container':container,'clusterName':clusterName,'clusterChinese':clusterChinese})
> 
> #st_env.execute_sql("""
> #        CREATE TABLE source(
> #           message STRING,
> #           clusterName STRING,
> #           kubernetes ROW<container ROW<name STRING>>
> #        ) WITH(
> #            'connector' = 'kafka',
> #        )
> #    """)
> 
> st_env.execute_sql("""
>        CREATE TABLE sink(
>            body ROW<content STRING>,
>            headers ROW<appId STRING,hostname STRING,level STRING,timeZone STRING,\
>            `timestamp` DOUBLE,container STRING,clusterName STRING,clusterChinese STRING>
>        ) WITH(
>            'connector' = 'print',
>        )
>    """)tmp_table =  st_env.from_path("source") \
>        .select("message,kubernetes.get('container').get('name') as container,clusterName")
> 
>    data_stream = st_env._j_tenv.toAppendStream(tmp_table._j_table,tmp_table._j_table.getSchema().toRowType())
>    table = Table(st_env._j_tenv.fromDataStream(data_stream,"message,clusterName,container"),st_env)
> 
>    sink_table = table \
>        .where("error_exist(message) = true") \
>        .select("error_get(message) as body, headers_get(message,container,clusterName) as headers")
> 
>    sink_table.execute_insert("kafka_sink").get_job_client().get_job_execution_result().result()报错:File "log_parse_kafka.py", line 180, in from_kafka_to_oracle_demo
>    sink_table.execute_insert("kafka_sink").get_job_client().get_job_execution_result().result()
>  File "/home/cdh272705/.local/lib/python3.6/site-packages/pyflink/common/completable_future.py", line 78, in result
>    return self._py_class(self._j_completable_future.get())
>  File "/home/cdh272705/.local/lib/python3.6/site-packages/py4j/java_gateway.py", line 1286, in __call__
>    answer, self.gateway_client, self.target_id, self.name)
>  File "/home/cdh272705/.local/lib/python3.6/site-packages/pyflink/util/exceptions.py", line 147, in deco
>    return f(*a, **kw)
>  File "/home/cdh272705/.local/lib/python3.6/site-packages/py4j/protocol.py", line 328, in get_return_value
>    format(target_id, ".", name), value)
> py4j.protocol.Py4JJavaError: An error occurred while calling o150.get.
> : java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
> at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
> at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
> at org.apache.flink.client.program.PerJobMiniClusterFactory$PerJobMiniClusterJobClient.lambda$getJobExecutionResult$2(PerJobMiniClusterFactory.java:186)
> at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
> at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
> at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
> at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
> at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:229)
> at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
> at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
> at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
> at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
> at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:892)
> at akka.dispatch.OnComplete.internal(Future.scala:264)
> at akka.dispatch.OnComplete.internal(Future.scala:261)
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
> at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
> at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
> at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
> at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
> at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
> at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
> at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
> at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
> at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
> at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
> at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
> at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
> at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
> at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


pyflink1.11.0 报错JobExecutionException: Job execution failed.

Posted by whh_960101 <wh...@163.com>.
Hi,各位大佬,我处理kafka source Table,print 一个嵌套的json结果,报错JobExecutionException: Job execution failed. 这个问题该怎么解决,代码和报错信息如下:@udf(input_types=DataTypes.STRING(), result_type=DataTypes.BOOLEAN())
    def error_exist(message):
        if message is None:
            return False
        mes_dic = json.loads(message.strip())
        log = mes_dic.get('log').lower().strip()
        if 'error' in log:
            return True
        else:
            return False

    @udf(input_types=DataTypes.STRING(), result_type=DataTypes.ROW([DataTypes.FIELD("content", DataTypes.STRING())]))
    def error_get(message):  
        if message is None:
            return ''
        mes_dic = json.loads(message.strip())
        log = mes_dic.get('log')
        return json.dumps({"content":log.strip()})

    @udf(input_types=[DataTypes.STRING(),DataTypes.STRING(),DataTypes.STRING()], result_type=DataTypes.ROW([\
            DataTypes.FIELD("appId", DataTypes.STRING()),DataTypes.FIELD("hostname", DataTypes.STRING()), \
            DataTypes.FIELD("level", DataTypes.STRING()),DataTypes.FIELD("timeZone", DataTypes.STRING()), \
            DataTypes.FIELD("timestamp", DataTypes.DOUBLE()),DataTypes.FIELD("container", DataTypes.STRING()), \
            DataTypes.FIELD("clusterName", DataTypes.STRING()),DataTypes.FIELD("clusterChinese", DataTypes.STRING())]))
    def headers_get(message,container,clusterName):
        mes_dic = json.loads(message.strip())
        tz_utc = mes_dic.get('time')
        tz_utc = tz_utc[:tz_utc.rfind('.') + 7]
        from_zone = tz.gettz('UTC')
        dt_utc = datetime.strptime(tz_utc, '%Y-%m-%dT%H:%M:%S.%f')
        dt_utc = dt_utc.replace(tzinfo=from_zone)
        dt_ts = dt_utc.timestamp()

        map_df = pd.read_csv('cc_log_dev_map.csv')
        clusterChinese = map_df.loc[map_df.cs_English == clusterName, 'cs_Chinese'].values[0]

        return json.dumps({'appId':'','hostname':'','level':'ERROR','timeZone':'Asia/Shanghai','timestamp':dt_ts,\
                           'container':container,'clusterName':clusterName,'clusterChinese':clusterChinese})

#st_env.execute_sql("""
#        CREATE TABLE source(
#           message STRING,
#           clusterName STRING,
#           kubernetes ROW<container ROW<name STRING>>
#        ) WITH(
#            'connector' = 'kafka',
#        )
#    """)

st_env.execute_sql("""
        CREATE TABLE sink(
            body ROW<content STRING>,
            headers ROW<appId STRING,hostname STRING,level STRING,timeZone STRING,\
            `timestamp` DOUBLE,container STRING,clusterName STRING,clusterChinese STRING>
        ) WITH(
            'connector' = 'print',
        )
    """)tmp_table =  st_env.from_path("source") \
        .select("message,kubernetes.get('container').get('name') as container,clusterName")
 
    data_stream = st_env._j_tenv.toAppendStream(tmp_table._j_table,tmp_table._j_table.getSchema().toRowType())
    table = Table(st_env._j_tenv.fromDataStream(data_stream,"message,clusterName,container"),st_env)

    sink_table = table \
        .where("error_exist(message) = true") \
        .select("error_get(message) as body, headers_get(message,container,clusterName) as headers")
       
    sink_table.execute_insert("kafka_sink").get_job_client().get_job_execution_result().result()报错:File "log_parse_kafka.py", line 180, in from_kafka_to_oracle_demo
    sink_table.execute_insert("kafka_sink").get_job_client().get_job_execution_result().result()
  File "/home/cdh272705/.local/lib/python3.6/site-packages/pyflink/common/completable_future.py", line 78, in result
    return self._py_class(self._j_completable_future.get())
  File "/home/cdh272705/.local/lib/python3.6/site-packages/py4j/java_gateway.py", line 1286, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/home/cdh272705/.local/lib/python3.6/site-packages/pyflink/util/exceptions.py", line 147, in deco
    return f(*a, **kw)
  File "/home/cdh272705/.local/lib/python3.6/site-packages/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o150.get.
: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
 at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
 at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
 at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
 at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
 at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
 at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
 at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
 at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
 at org.apache.flink.client.program.PerJobMiniClusterFactory$PerJobMiniClusterJobClient.lambda$getJobExecutionResult$2(PerJobMiniClusterFactory.java:186)
 at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
 at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
 at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
 at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
 at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:229)
 at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
 at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
 at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
 at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
 at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:892)
 at akka.dispatch.OnComplete.internal(Future.scala:264)
 at akka.dispatch.OnComplete.internal(Future.scala:261)
 at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
 at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
 at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
 at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
 at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
 at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
 at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
 at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
 at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
 at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
 at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
 at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
 at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
 at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
 at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
 at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
 at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
 at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
 at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
 at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Re: pyflink sql中select,where都带udf,其中一个udf失效

Posted by Dian Fu <di...@gmail.com>.
这个问题是一个bug, 我创建了一个JIRA:https://issues.apache.org/jira/browse/FLINK-19675 <https://issues.apache.org/jira/browse/FLINK-19675>

出现的条件:在一个Calc里同时有Python UDF、Where条件、复合列访问。

在没有修复之前, 可以这样work around一下:

tmp_table = st_env.from_path("source")\
    .select("kubernetes.get('container').get('name') as name, message, clusterName, '@timestamp' "
            "as ts")

data_stream = st_env._j_tenv.toAppendStream(tmp_table._j_table, tmp_table._j_table.getSchema().toRowType())
table = Table(st_env._j_tenv.fromDataStream(data_stream, "name, message, clusterName, ts"), st_env)

table\
    .where("error_exist(message) = true")\
    .select("log_get(message),'','','ERROR','Asia/Shanghai', 'ts',name,clusterName") \
    .execute_insert("sink").get_job_client().get_job_execution_result().result()


> 在 2020年10月16日,上午10:47,whh_960101 <wh...@163.com> 写道:
> 
> 我摘取了plan其中一部分
> 在过滤数据这里
> == Abstract Syntax Tree ==
> 
> +- LogicalFilter(condition=[error_exist($1)])
> 
> 
> 
> 
> 
> 
> == Optimized Logical Plan ==
> 
>      +- PythonCalc(select=[message, kubernetes, clusterName, error_exist(message) AS f0]) 
> #感觉应该是这个地方出问题了,这里应该不是select,应该是where或者filter,上面已经有了LogicalFilter(condition=[error_exist($1)])
> 
> 
> 
> 
> == Physical Execution Plan ==
> 
> 
> 
> 
>  Stage 3 : Operator
> 
>   content : StreamExecPythonCalc
> 
>   ship_strategy : FORWARD
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 在 2020-10-15 20:59:12,"Dian Fu" <di...@gmail.com> 写道:
>> 可以把plan打印出来看一下?打印plan可以参考这个:https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/intro_to_table_api.html#explain-tables
>> 
>>> 在 2020年10月15日,下午7:02,whh_960101 <wh...@163.com> 写道:
>>> 
>>> hi,
>>> 我刚才改了一下你的例子[1],通过from_elements构建一个source表
>>> 然后使用我的udf
>>> source.where(udf1(msg)=True).select("udf2(msg)").execute_insert('sink').get_job_client().get_job_execution_result().result()
>>> 打印出来的结果能够很好的筛选出我想要的数据
>>> 
>>> 
>>> 
>>> 
>>> 但是通过StreamTableEnvironment,连接kafka,取消息队列来构建source表,即之前那种写法
>>> source  = st_env.from_path('source')
>>> source.where(udf1(msg)=True).select("udf2(msg)").execute_insert('sink').get_job_client().get_job_execution_result().result()
>>> 
>>> 这个where筛选就失效了,最后打印出全部数据
>>> 
>>> 
>>> 而只在where中使用udf,即
>>> source.where(udf1(msg)=True).select("msg").execute_insert('sink').get_job_client().get_job_execution_result().result()
>>> 打印结果就是经过筛选后的
>>> 
>>> 
>>> 
>>> 
>>> 想请问一下这种问题出在哪里?
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 在 2020-10-15 16:57:39,"Xingbo Huang" <hx...@gmail.com> 写道:
>>>> Hi,
>>>> 我们有例子覆盖你这种情况的[1],你需要看下你udf1的逻辑是什么样的
>>>> 
>>>> 
>>>> [1]
>>>> https://github.com/apache/flink/blob/release-1.11/flink-python/pyflink/table/tests/test_udf.py#L67
>>>> 
>>>> Best,
>>>> Xingbo
>>>> 
>>>> whh_960101 <wh...@163.com> 于2020年10月15日周四 下午2:30写道:
>>>> 
>>>>> 您好,我使用pyflink时的代码如下,有如下问题:
>>>>> 
>>>>> 
>>>>> source  = st_env.from_path('source')
>>>>> #st_env是StreamTableEnvironment,source是kafka源端
>>>>> #只在where语句中加udf1 (input_types =DataTypes.STRING(), result_type =
>>>>> DataTypes.BOOLEAN())
>>>>> table =
>>>>> source.select("msg").where(udf1(msg)=True).execute_insert('sink').get_job_client().get_job_execution_result().result()
>>>>> 
>>>>> 
>>>>> 这样打印出来的结果很好的筛选了数据
>>>>> 
>>>>> 
>>>>> 但如果在select语句中加另一个udf2 (input_types =DataTypes.STRING(), result_type =
>>>>> DataTypes.STRING())(将msg简单处理为新的String)
>>>>> table =
>>>>> source.select("udf2(msg)").where(udf1(msg)=True).execute_insert('sink').get_job_client().get_job_execution_result().result()
>>>>> 这个where筛选就失效了,最后打印出全部数据
>>>>> 
>>>>> 
>>>>> 如果改成where在前也不行,换成filter也不行
>>>>> table =
>>>>> source.where(udf1(msg)=True).select("udf2(msg)").execute_insert('sink').get_job_client().get_job_execution_result().result()
>>>>> 
>>>>> 
>>>>> select、where中的udf会冲突吗?这个问题该怎么解决?
>>>>> 希望您们能够给予解答!感谢!
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>> 
>>> 
>>> 
>>> 
>>> 


Re:Re: pyflink sql中select,where都带udf,其中一个udf失效

Posted by whh_960101 <wh...@163.com>.
我摘取了plan其中一部分
在过滤数据这里
== Abstract Syntax Tree ==

+- LogicalFilter(condition=[error_exist($1)])

 




== Optimized Logical Plan ==

      +- PythonCalc(select=[message, kubernetes, clusterName, error_exist(message) AS f0]) 
#感觉应该是这个地方出问题了,这里应该不是select,应该是where或者filter,上面已经有了LogicalFilter(condition=[error_exist($1)])




== Physical Execution Plan ==




  Stage 3 : Operator

   content : StreamExecPythonCalc

   ship_strategy : FORWARD

















在 2020-10-15 20:59:12,"Dian Fu" <di...@gmail.com> 写道:
>可以把plan打印出来看一下?打印plan可以参考这个:https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/intro_to_table_api.html#explain-tables
>
>> 在 2020年10月15日,下午7:02,whh_960101 <wh...@163.com> 写道:
>> 
>> hi,
>> 我刚才改了一下你的例子[1],通过from_elements构建一个source表
>> 然后使用我的udf
>> source.where(udf1(msg)=True).select("udf2(msg)").execute_insert('sink').get_job_client().get_job_execution_result().result()
>> 打印出来的结果能够很好的筛选出我想要的数据
>> 
>> 
>> 
>> 
>> 但是通过StreamTableEnvironment,连接kafka,取消息队列来构建source表,即之前那种写法
>> source  = st_env.from_path('source')
>> source.where(udf1(msg)=True).select("udf2(msg)").execute_insert('sink').get_job_client().get_job_execution_result().result()
>> 
>> 这个where筛选就失效了,最后打印出全部数据
>> 
>> 
>> 而只在where中使用udf,即
>> source.where(udf1(msg)=True).select("msg").execute_insert('sink').get_job_client().get_job_execution_result().result()
>> 打印结果就是经过筛选后的
>> 
>> 
>> 
>> 
>> 想请问一下这种问题出在哪里?
>> 
>> 
>> 
>> 
>> 
>> 
>> 在 2020-10-15 16:57:39,"Xingbo Huang" <hx...@gmail.com> 写道:
>>> Hi,
>>> 我们有例子覆盖你这种情况的[1],你需要看下你udf1的逻辑是什么样的
>>> 
>>> 
>>> [1]
>>> https://github.com/apache/flink/blob/release-1.11/flink-python/pyflink/table/tests/test_udf.py#L67
>>> 
>>> Best,
>>> Xingbo
>>> 
>>> whh_960101 <wh...@163.com> 于2020年10月15日周四 下午2:30写道:
>>> 
>>>> 您好,我使用pyflink时的代码如下,有如下问题:
>>>> 
>>>> 
>>>> source  = st_env.from_path('source')
>>>> #st_env是StreamTableEnvironment,source是kafka源端
>>>> #只在where语句中加udf1 (input_types =DataTypes.STRING(), result_type =
>>>> DataTypes.BOOLEAN())
>>>> table =
>>>> source.select("msg").where(udf1(msg)=True).execute_insert('sink').get_job_client().get_job_execution_result().result()
>>>> 
>>>> 
>>>> 这样打印出来的结果很好的筛选了数据
>>>> 
>>>> 
>>>> 但如果在select语句中加另一个udf2 (input_types =DataTypes.STRING(), result_type =
>>>> DataTypes.STRING())(将msg简单处理为新的String)
>>>> table =
>>>> source.select("udf2(msg)").where(udf1(msg)=True).execute_insert('sink').get_job_client().get_job_execution_result().result()
>>>> 这个where筛选就失效了,最后打印出全部数据
>>>> 
>>>> 
>>>> 如果改成where在前也不行,换成filter也不行
>>>> table =
>>>> source.where(udf1(msg)=True).select("udf2(msg)").execute_insert('sink').get_job_client().get_job_execution_result().result()
>>>> 
>>>> 
>>>> select、where中的udf会冲突吗?这个问题该怎么解决?
>>>> 希望您们能够给予解答!感谢!
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>> 
>> 
>> 
>> 
>> 

Re:Re: pyflink sql中select,where都带udf,其中一个udf失效

Posted by whh_960101 <wh...@163.com>.
== Abstract Syntax Tree ==

LogicalProject(_c0=[log_get($1)], _c1=[_UTF-16LE''], _c2=[_UTF-16LE''], _c3=[_UTF-16LE'ERROR'], _c4=[_UTF-16LE'Asia/Shanghai'], _c5=[_UTF-16LE'@timestamp'], kubernetes$container$name=[$3.container.name], clusterName=[$2])

+- LogicalFilter(condition=[error_exist($1)])

   +- LogicalTableScan(table=[[default_catalog, default_database, source, source: [KafkaTableSource(@timestamp, message, clusterName, kubernetes)]]])




== Optimized Logical Plan ==

Calc(select=[f00 AS _c0, _UTF-16LE'' AS _c1, _UTF-16LE'' AS _c2, _UTF-16LE'ERROR' AS _c3, _UTF-16LE'Asia/Shanghai' AS _c4, _UTF-16LE'@timestamp' AS _c5, f0 AS kubernetes$container$name, clusterName])

+- PythonCalc(select=[f0, clusterName, log_get(message) AS f00])

   +- Calc(select=[message, clusterName, kubernetes.container.name AS f0])

      +- PythonCalc(select=[message, kubernetes, clusterName, error_exist(message) AS f0])

         +- LegacyTableSourceScan(table=[[default_catalog, default_database, source, source: [KafkaTableSource(@timestamp, message, clusterName, kubernetes)]]], fields=[@timestamp, message, clusterName, kubernetes])




== Physical Execution Plan ==

Stage 1 : Data Source

 content : Source: KafkaTableSource(@timestamp, message, clusterName, kubernetes)




 Stage 2 : Operator

  content : SourceConversion(table=[default_catalog.default_database.source, source: [KafkaTableSource(@timestamp, message, clusterName, kubernetes)]], fields=[@timestamp, message, clusterName, kubernetes])

  ship_strategy : FORWARD




  Stage 3 : Operator

   content : StreamExecPythonCalc

   ship_strategy : FORWARD




   Stage 4 : Operator

    content : Calc(select=[message, clusterName, kubernetes.container.name AS f0])

    ship_strategy : FORWARD




    Stage 5 : Operator

     content : StreamExecPythonCalc

     ship_strategy : FORWARD




     Stage 6 : Operator

      content : Calc(select=[f00 AS _c0, _UTF-16LE'' AS _c1, _UTF-16LE'' AS _c2, _UTF-16LE'ERROR' AS _c3, _UTF-16LE'Asia/Shanghai' AS _c4, _UTF-16LE'@timestamp' AS _c5, f0 AS kubernetes$container$name, clusterName])

      ship_strategy : FORWARD

















在 2020-10-15 20:59:12,"Dian Fu" <di...@gmail.com> 写道:
>可以把plan打印出来看一下?打印plan可以参考这个:https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/intro_to_table_api.html#explain-tables
>
>> 在 2020年10月15日,下午7:02,whh_960101 <wh...@163.com> 写道:
>> 
>> hi,
>> 我刚才改了一下你的例子[1],通过from_elements构建一个source表
>> 然后使用我的udf
>> source.where(udf1(msg)=True).select("udf2(msg)").execute_insert('sink').get_job_client().get_job_execution_result().result()
>> 打印出来的结果能够很好的筛选出我想要的数据
>> 
>> 
>> 
>> 
>> 但是通过StreamTableEnvironment,连接kafka,取消息队列来构建source表,即之前那种写法
>> source  = st_env.from_path('source')
>> source.where(udf1(msg)=True).select("udf2(msg)").execute_insert('sink').get_job_client().get_job_execution_result().result()
>> 
>> 这个where筛选就失效了,最后打印出全部数据
>> 
>> 
>> 而只在where中使用udf,即
>> source.where(udf1(msg)=True).select("msg").execute_insert('sink').get_job_client().get_job_execution_result().result()
>> 打印结果就是经过筛选后的
>> 
>> 
>> 
>> 
>> 想请问一下这种问题出在哪里?
>> 
>> 
>> 
>> 
>> 
>> 
>> 在 2020-10-15 16:57:39,"Xingbo Huang" <hx...@gmail.com> 写道:
>>> Hi,
>>> 我们有例子覆盖你这种情况的[1],你需要看下你udf1的逻辑是什么样的
>>> 
>>> 
>>> [1]
>>> https://github.com/apache/flink/blob/release-1.11/flink-python/pyflink/table/tests/test_udf.py#L67
>>> 
>>> Best,
>>> Xingbo
>>> 
>>> whh_960101 <wh...@163.com> 于2020年10月15日周四 下午2:30写道:
>>> 
>>>> 您好,我使用pyflink时的代码如下,有如下问题:
>>>> 
>>>> 
>>>> source  = st_env.from_path('source')
>>>> #st_env是StreamTableEnvironment,source是kafka源端
>>>> #只在where语句中加udf1 (input_types =DataTypes.STRING(), result_type =
>>>> DataTypes.BOOLEAN())
>>>> table =
>>>> source.select("msg").where(udf1(msg)=True).execute_insert('sink').get_job_client().get_job_execution_result().result()
>>>> 
>>>> 
>>>> 这样打印出来的结果很好的筛选了数据
>>>> 
>>>> 
>>>> 但如果在select语句中加另一个udf2 (input_types =DataTypes.STRING(), result_type =
>>>> DataTypes.STRING())(将msg简单处理为新的String)
>>>> table =
>>>> source.select("udf2(msg)").where(udf1(msg)=True).execute_insert('sink').get_job_client().get_job_execution_result().result()
>>>> 这个where筛选就失效了,最后打印出全部数据
>>>> 
>>>> 
>>>> 如果改成where在前也不行,换成filter也不行
>>>> table =
>>>> source.where(udf1(msg)=True).select("udf2(msg)").execute_insert('sink').get_job_client().get_job_execution_result().result()
>>>> 
>>>> 
>>>> select、where中的udf会冲突吗?这个问题该怎么解决?
>>>> 希望您们能够给予解答!感谢!
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>> 
>> 
>> 
>> 
>> 

pyflink1.11.0 kafka connector如果有访问权限

Posted by whh_960101 <wh...@163.com>.
CREATETABLEkafkaTable(user_idBIGINT,item_idBIGINT,category_idBIGINT,behaviorSTRING,tsTIMESTAMP(3))WITH('connector'='kafka','topic'='user_behavior','properties.bootstrap.servers'='localhost:9092','properties.group.id'='testGroup','format'='csv','scan.startup.mode'='earliest-offset')你好,如果使用sql语句来创建kafkaTable,kafka节点有访问权限,option里面没有设置用户名密码这一项该如何解决?

Re: pyflink 1.11 运行pyflink作业时报错

Posted by Dian Fu <di...@gmail.com>.
看起来是的,找不到JAVA_HOME,显式export一下JAVA_HOME试试?

> 在 2020年11月13日,下午5:06,whh_960101 <wh...@163.com> 写道:
> 
> Hi,各位大佬,    pyflink 1.11 运行pyflink作业时报错pyflink/pyfink_gateway_server.py 193行 lauch_gateway_server_process()217行 return Popen()FileNotFoundError: [Error 2]  No such file or directory: 'java' : 'java'感觉像找不到java路径还是什么意思,java:java没看懂 ,该怎么解决
> 
> 
> 


pyflink 1.11 运行pyflink作业时报错

Posted by whh_960101 <wh...@163.com>.
Hi,各位大佬,    pyflink 1.11 运行pyflink作业时报错pyflink/pyfink_gateway_server.py 193行 lauch_gateway_server_process()217行 return Popen()FileNotFoundError: [Error 2]  No such file or directory: 'java' : 'java'感觉像找不到java路径还是什么意思,java:java没看懂 ,该怎么解决



 

Re: pyflink和flink版本的兼容性问题

Posted by zhisheng <zh...@gmail.com>.
估计可能会有问题,很多变动

whh_960101 <wh...@163.com> 于2020年10月23日周五 上午11:41写道:

> Hi,各位大佬,
>  想请教一下,我的flink的版本是1.10.0,pyflink版本是1.11.1,目前使用pyflink没有兼容性问题,想问一下,马上要更新的flink
> 1.12,如果在正式发布后,我只是pip install --upgrade apache-flink==1.12
> 升级pyflink到1.12.0,flink 1.10.0 版本保持不变,会存在很多兼容性问题吗

pyflink和flink版本的兼容性问题

Posted by whh_960101 <wh...@163.com>.
Hi,各位大佬,     想请教一下,我的flink的版本是1.10.0,pyflink版本是1.11.1,目前使用pyflink没有兼容性问题,想问一下,马上要更新的flink 1.12,如果在正式发布后,我只是pip install --upgrade apache-flink==1.12 升级pyflink到1.12.0,flink 1.10.0 版本保持不变,会存在很多兼容性问题吗

Re: pyflink sql中select,where都带udf,其中一个udf失效

Posted by Dian Fu <di...@gmail.com>.
可以把plan打印出来看一下?打印plan可以参考这个:https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/intro_to_table_api.html#explain-tables

> 在 2020年10月15日,下午7:02,whh_960101 <wh...@163.com> 写道:
> 
> hi,
> 我刚才改了一下你的例子[1],通过from_elements构建一个source表
> 然后使用我的udf
> source.where(udf1(msg)=True).select("udf2(msg)").execute_insert('sink').get_job_client().get_job_execution_result().result()
> 打印出来的结果能够很好的筛选出我想要的数据
> 
> 
> 
> 
> 但是通过StreamTableEnvironment,连接kafka,取消息队列来构建source表,即之前那种写法
> source  = st_env.from_path('source')
> source.where(udf1(msg)=True).select("udf2(msg)").execute_insert('sink').get_job_client().get_job_execution_result().result()
> 
> 这个where筛选就失效了,最后打印出全部数据
> 
> 
> 而只在where中使用udf,即
> source.where(udf1(msg)=True).select("msg").execute_insert('sink').get_job_client().get_job_execution_result().result()
> 打印结果就是经过筛选后的
> 
> 
> 
> 
> 想请问一下这种问题出在哪里?
> 
> 
> 
> 
> 
> 
> 在 2020-10-15 16:57:39,"Xingbo Huang" <hx...@gmail.com> 写道:
>> Hi,
>> 我们有例子覆盖你这种情况的[1],你需要看下你udf1的逻辑是什么样的
>> 
>> 
>> [1]
>> https://github.com/apache/flink/blob/release-1.11/flink-python/pyflink/table/tests/test_udf.py#L67
>> 
>> Best,
>> Xingbo
>> 
>> whh_960101 <wh...@163.com> 于2020年10月15日周四 下午2:30写道:
>> 
>>> 您好,我使用pyflink时的代码如下,有如下问题:
>>> 
>>> 
>>> source  = st_env.from_path('source')
>>> #st_env是StreamTableEnvironment,source是kafka源端
>>> #只在where语句中加udf1 (input_types =DataTypes.STRING(), result_type =
>>> DataTypes.BOOLEAN())
>>> table =
>>> source.select("msg").where(udf1(msg)=True).execute_insert('sink').get_job_client().get_job_execution_result().result()
>>> 
>>> 
>>> 这样打印出来的结果很好的筛选了数据
>>> 
>>> 
>>> 但如果在select语句中加另一个udf2 (input_types =DataTypes.STRING(), result_type =
>>> DataTypes.STRING())(将msg简单处理为新的String)
>>> table =
>>> source.select("udf2(msg)").where(udf1(msg)=True).execute_insert('sink').get_job_client().get_job_execution_result().result()
>>> 这个where筛选就失效了,最后打印出全部数据
>>> 
>>> 
>>> 如果改成where在前也不行,换成filter也不行
>>> table =
>>> source.where(udf1(msg)=True).select("udf2(msg)").execute_insert('sink').get_job_client().get_job_execution_result().result()
>>> 
>>> 
>>> select、where中的udf会冲突吗?这个问题该怎么解决?
>>> 希望您们能够给予解答!感谢!
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
> 
> 
> 
> 
> 


Re:Re: pyflink sql中select,where都带udf,其中一个udf失效

Posted by whh_960101 <wh...@163.com>.
hi,
我刚才改了一下你的例子[1],通过from_elements构建一个source表
然后使用我的udf
source.where(udf1(msg)=True).select("udf2(msg)").execute_insert('sink').get_job_client().get_job_execution_result().result()
打印出来的结果能够很好的筛选出我想要的数据




但是通过StreamTableEnvironment,连接kafka,取消息队列来构建source表,即之前那种写法
 source  = st_env.from_path('source')
 source.where(udf1(msg)=True).select("udf2(msg)").execute_insert('sink').get_job_client().get_job_execution_result().result()

这个where筛选就失效了,最后打印出全部数据


而只在where中使用udf,即
source.where(udf1(msg)=True).select("msg").execute_insert('sink').get_job_client().get_job_execution_result().result()
打印结果就是经过筛选后的




想请问一下这种问题出在哪里?






在 2020-10-15 16:57:39,"Xingbo Huang" <hx...@gmail.com> 写道:
>Hi,
>我们有例子覆盖你这种情况的[1],你需要看下你udf1的逻辑是什么样的
>
>
>[1]
>https://github.com/apache/flink/blob/release-1.11/flink-python/pyflink/table/tests/test_udf.py#L67
>
>Best,
>Xingbo
>
>whh_960101 <wh...@163.com> 于2020年10月15日周四 下午2:30写道:
>
>> 您好,我使用pyflink时的代码如下,有如下问题:
>>
>>
>> source  = st_env.from_path('source')
>> #st_env是StreamTableEnvironment,source是kafka源端
>> #只在where语句中加udf1 (input_types =DataTypes.STRING(), result_type =
>> DataTypes.BOOLEAN())
>> table =
>> source.select("msg").where(udf1(msg)=True).execute_insert('sink').get_job_client().get_job_execution_result().result()
>>
>>
>> 这样打印出来的结果很好的筛选了数据
>>
>>
>> 但如果在select语句中加另一个udf2 (input_types =DataTypes.STRING(), result_type =
>> DataTypes.STRING())(将msg简单处理为新的String)
>> table =
>> source.select("udf2(msg)").where(udf1(msg)=True).execute_insert('sink').get_job_client().get_job_execution_result().result()
>> 这个where筛选就失效了,最后打印出全部数据
>>
>>
>> 如果改成where在前也不行,换成filter也不行
>> table =
>> source.where(udf1(msg)=True).select("udf2(msg)").execute_insert('sink').get_job_client().get_job_execution_result().result()
>>
>>
>> select、where中的udf会冲突吗?这个问题该怎么解决?
>> 希望您们能够给予解答!感谢!
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>





 

Re:Re: pyflink sql中select,where都带udf,其中一个udf失效

Posted by whh_960101 <wh...@163.com>.
@udf(input_types =DataTypes.STRING(), result_type = DataTypes.BOOLEAN())
def udf1(msg): #udf1就是简单的筛选log中的error关键字
      if msg is None:
             return ''
      msg_dic = json.loads(msg.strip())
      log = msg_dic.get('log').lower()
      if 'error' in log or 'fail' in log:
             return True
      else:
             return False
@udf(input_types =DataTypes.STRING(), result_type = DataTypes.STRING())
def udf2(msg): #udf2就是简单的把msg中的log提取出来
if msg is None:
             return ''
      msg_dic = json.loads(msg.strip())
      log = msg_dic.get('log')
      return log
感觉两个udf没有冲突吧?








在 2020-10-15 16:57:39,"Xingbo Huang" <hx...@gmail.com> 写道:
>Hi,
>我们有例子覆盖你这种情况的[1],你需要看下你udf1的逻辑是什么样的
>
>
>[1]
>https://github.com/apache/flink/blob/release-1.11/flink-python/pyflink/table/tests/test_udf.py#L67
>
>Best,
>Xingbo
>
>whh_960101 <wh...@163.com> 于2020年10月15日周四 下午2:30写道:
>
>> 您好,我使用pyflink时的代码如下,有如下问题:
>>
>>
>> source  = st_env.from_path('source')
>> #st_env是StreamTableEnvironment,source是kafka源端
>> #只在where语句中加udf1 (input_types =DataTypes.STRING(), result_type =
>> DataTypes.BOOLEAN())
>> table =
>> source.select("msg").where(udf1(msg)=True).execute_insert('sink').get_job_client().get_job_execution_result().result()
>>
>>
>> 这样打印出来的结果很好的筛选了数据
>>
>>
>> 但如果在select语句中加另一个udf2 (input_types =DataTypes.STRING(), result_type =
>> DataTypes.STRING())(将msg简单处理为新的String)
>> table =
>> source.select("udf2(msg)").where(udf1(msg)=True).execute_insert('sink').get_job_client().get_job_execution_result().result()
>> 这个where筛选就失效了,最后打印出全部数据
>>
>>
>> 如果改成where在前也不行,换成filter也不行
>> table =
>> source.where(udf1(msg)=True).select("udf2(msg)").execute_insert('sink').get_job_client().get_job_execution_result().result()
>>
>>
>> select、where中的udf会冲突吗?这个问题该怎么解决?
>> 希望您们能够给予解答!感谢!
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>

Re: pyflink sql中select,where都带udf,其中一个udf失效

Posted by Xingbo Huang <hx...@gmail.com>.
Hi,
我们有例子覆盖你这种情况的[1],你需要看下你udf1的逻辑是什么样的


[1]
https://github.com/apache/flink/blob/release-1.11/flink-python/pyflink/table/tests/test_udf.py#L67

Best,
Xingbo

whh_960101 <wh...@163.com> 于2020年10月15日周四 下午2:30写道:

> 您好,我使用pyflink时的代码如下,有如下问题:
>
>
> source  = st_env.from_path('source')
> #st_env是StreamTableEnvironment,source是kafka源端
> #只在where语句中加udf1 (input_types =DataTypes.STRING(), result_type =
> DataTypes.BOOLEAN())
> table =
> source.select("msg").where(udf1(msg)=True).execute_insert('sink').get_job_client().get_job_execution_result().result()
>
>
> 这样打印出来的结果很好的筛选了数据
>
>
> 但如果在select语句中加另一个udf2 (input_types =DataTypes.STRING(), result_type =
> DataTypes.STRING())(将msg简单处理为新的String)
> table =
> source.select("udf2(msg)").where(udf1(msg)=True).execute_insert('sink').get_job_client().get_job_execution_result().result()
> 这个where筛选就失效了,最后打印出全部数据
>
>
> 如果改成where在前也不行,换成filter也不行
> table =
> source.where(udf1(msg)=True).select("udf2(msg)").execute_insert('sink').get_job_client().get_job_execution_result().result()
>
>
> select、where中的udf会冲突吗?这个问题该怎么解决?
> 希望您们能够给予解答!感谢!
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>

pyflink sql中select,where都带udf,其中一个udf失效

Posted by whh_960101 <wh...@163.com>.
您好,我使用pyflink时的代码如下,有如下问题:


source  = st_env.from_path('source') #st_env是StreamTableEnvironment,source是kafka源端
#只在where语句中加udf1 (input_types =DataTypes.STRING(), result_type = DataTypes.BOOLEAN())
table = source.select("msg").where(udf1(msg)=True).execute_insert('sink').get_job_client().get_job_execution_result().result()


这样打印出来的结果很好的筛选了数据


但如果在select语句中加另一个udf2 (input_types =DataTypes.STRING(), result_type = DataTypes.STRING())(将msg简单处理为新的String)
table = source.select("udf2(msg)").where(udf1(msg)=True).execute_insert('sink').get_job_client().get_job_execution_result().result()
这个where筛选就失效了,最后打印出全部数据


如果改成where在前也不行,换成filter也不行
table = source.where(udf1(msg)=True).select("udf2(msg)").execute_insert('sink').get_job_client().get_job_execution_result().result()


select、where中的udf会冲突吗?这个问题该怎么解决?
希望您们能够给予解答!感谢!










 





 





 

pyflink sql中select,where都带udf,其中一个udf失效

Posted by whh_960101 <wh...@163.com>.
您好,我使用pyflink时的代码如下,有如下问题:


source  = st_env.from_path('source') #st_env是StreamTableEnvironment,source是kafka源端
#只在where语句中加udf1 (input_types =DataTypes.STRING(), result_type = DataTypes.BOOLEAN())
table = source.select("msg").where(udf1(msg)=True).execute_insert('sink').get_job_client().get_job_execution_result().result()


这样打印出来的结果很好的筛选了数据


但如果在select语句中加另一个udf2 (input_types =DataTypes.STRING(), result_type = DataTypes.STRING())
table = source.select("udf2(msg)").where(udf1(msg)=True).execute_insert('sink').get_job_client().get_job_execution_result().result()
这个where筛选就失效了,最后打印出全部数据


如果改成where在前也不行,换成filter也不行
table = source.where(udf1(msg)=True).select("udf2(msg)").execute_insert('sink').get_job_client().get_job_execution_result().result()


这个问题怎么解决
希望您们能够给予解答!感谢!










 





 

pyflink sql select带特殊符号的字段名

Posted by whh_960101 <wh...@163.com>.
您好,我使用pyflink时的代码如下,有如下问题:
source  = st_env.from_path('source') #st_env是StreamTableEnvironment,source是kafka源端
table = source.select("@timestamp").execute_insert('sink').get_job_client().get_job_execution_result().result()


kafka源端的json队列@timestamp字段名是固定死的,而我需要取这个字段进行处理,@timestamp涉及到@特殊符号和timestamp关键字,按照上面的代码会报解析sql错误,这个地方我该怎么修改,去网上查了加``或者''或者""都不行


希望您们能够给予解答!感谢!










 

Re: pyflink Table object如何打印出其中内容方便调试

Posted by Xingbo Huang <hx...@gmail.com>.
Hi,
你想要输出table的结果,可以有两种方便的方式,
1. table.to_pandas()
2. 使用print connector,可以参考[1]

然后你如果对pyflink感兴趣,可以看看这个doc[2],可以帮助你快速上手

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/print.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/intro_to_table_api.html

Best,
Xingbo

whh_960101 <wh...@163.com> 于2020年10月15日周四 下午4:39写道:

> 您好,我使用pyflink时的代码如下,有如下问题:
>
>
> source  = st_env.from_path('source')
> #st_env是StreamTableEnvironment,source是kafka源端
> #udf1 (input_types =DataTypes.STRING(), result_type = DataTypes.BOOLEAN())
> table = source.select("msg").where(udf1(msg)=True)
>
>
> 这样单步调试print(table)出来的结果是<pyflink.table.table.Table object at
> 0x7f888fb2cef0>
> pyflink有没有将Table转化成可打印格式的方法
> 希望您们能够给予解答!感谢!
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>

pyflink Table object如何打印出其中内容方便调试

Posted by whh_960101 <wh...@163.com>.
您好,我使用pyflink时的代码如下,有如下问题:


source  = st_env.from_path('source') #st_env是StreamTableEnvironment,source是kafka源端
#udf1 (input_types =DataTypes.STRING(), result_type = DataTypes.BOOLEAN())
table = source.select("msg").where(udf1(msg)=True)


这样单步调试print(table)出来的结果是<pyflink.table.table.Table object at 0x7f888fb2cef0>
pyflink有没有将Table转化成可打印格式的方法
希望您们能够给予解答!感谢!










 





 





 





 

Re: pyflink execute_insert问题求解答

Posted by Dian Fu <di...@gmail.com>.
这两个看起来是同一个问题,1.11是支持的,可以看一下TableEnvironment.create_statement_set(): https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/table_environment.html#executeexplain-jobs

> 在 2020年9月9日,上午11:31,whh_960101 <wh...@163.com> 写道:
> 
> 您好,我使用pyflink时的代码如下,有如下两个问题:
> 1.
> source  = st_env.from_path('source') #st_env是StreamTableEnvironment,source是kafka源端
> main_table = source.select(".......")
> sub_table = source.select(".......")
> main_table.execute_insert('sink').get_job_client().get_job_execution_result().result()
> sub_table.execute_insert('sink').get_job_client().get_job_execution_result().result()
> 
> 
> 最终发现只insert了main_table,请问遇见这种情况该怎么处理,就是如何插入多个表?
> 
> 
> 2.
> for i in range(1,20):
>     sub_table = source.select("...%s...%d...." %(str(i),i))#我的select语句是一个动态拼接的sql,在一个for循环中,获取多个sub_table
>     sub_table.execute_insert('sink').get_job_client().get_job_execution_result().result() #这种方式是否合理,因为也是测试无法插入多个表,有没有什么解决方案
> 
> 
> 以上两个问题希望您们能够给予解答!感谢!
> 
> 
> 
> 
> 


pyflink execute_insert问题求解答

Posted by whh_960101 <wh...@163.com>.
您好,我使用pyflink时的代码如下,有如下两个问题:
1.
source  = st_env.from_path('source') #st_env是StreamTableEnvironment,source是kafka源端
main_table = source.select(".......")
sub_table = source.select(".......")
main_table.execute_insert('sink').get_job_client().get_job_execution_result().result()
sub_table.execute_insert('sink').get_job_client().get_job_execution_result().result()


最终发现只insert了main_table,请问遇见这种情况该怎么处理,就是如何插入多个表?


2.
for i in range(1,20):
     sub_table = source.select("...%s...%d...." %(str(i),i))#我的select语句是一个动态拼接的sql,在一个for循环中,获取多个sub_table
     sub_table.execute_insert('sink').get_job_client().get_job_execution_result().result() #这种方式是否合理,因为也是测试无法插入多个表,有没有什么解决方案


以上两个问题希望您们能够给予解答!感谢!






Re: Re: Re: Re: Re: Re: pyflink-udf 问题反馈

Posted by Xingbo Huang <hx...@gmail.com>.
Hi,

你这个图挂了。json, csv这种是format[1] 。filesystem,datagen, print,
kafka等这种都是connector[2] ,用来从外部一个数据源读入数据或者写出数据。

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/
Best,
Xingbo

whh_960101 <wh...@163.com> 于2020年9月7日周一 下午5:14写道:

> 您好,
>
> 图中像datagen和print这样的connector更像是表名,之前听说的只有json、csv、filesystem这种类型的connector,请问connector在使用连接器DDL创建表时的作用是什么
>
>
>
>
>
>
>
> 在 2020-09-07 11:33:06,"Xingbo Huang" <hx...@gmail.com> 写道:
> >Hi,
> >你是想直接读一个python的提供的数据源把。这和udf是两个问题。你那个udf没啥问题,就是udf的返回类型是result_type,不是result_types。
> >
> >你想直接读python的数据结构的话,你可以考虑使用st_env.from_elements(测试用的,可以参考文档[1])或者st_env.from_pandas
> >[2] 来读取一个dataframe。
> >
> >[1]
> >https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/intro_to_table_api.html
> >[2]
> >https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/conversion_of_pandas.html
> >
> >Best,
> >Xingbo
> >
> >whh_960101 <wh...@163.com> 于2020年9月7日周一 上午11:22写道:
> >
> >> 您好,请问在定义UDF时,输入参数是一个我定义好的python字典,比如
> >> dic = {1:'a',2:'b'}
> >> 此时定义udf如下:
> >>
> >> @udf(input_types=[DataTypes.MAP(DataTypes.INT(),DataTypes.STRING()),DataTypes.STRING()],result_types=DataTypes.STRING())
> >> def func(dic,f):
> >>    ......
> >>    return L
> >> st_env.register_function("func", func)
> >> st_env.from_path("source").select("func(dic,t)").insert_into("sink")
> >> #这时我在外部定义好的数据类型dic字典如何作为参数传进来
> >> 这种代码应该怎么写?感谢解答!因为官网没有给出将外部数据类型作为参数传入udf中的样例,所以比较困惑
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> 在 2020-09-04 16:02:56,"Xingbo Huang" <hx...@gmail.com> 写道:
> >> >Hi,
> >> >
> >> >推荐你使用ddl来声明你上下游用的connector
> >> >
> >> >```
> >> >table_env.execute_sql("""
> >> >CREATE TABLE output (
> >> >data STRING ARRAY
> >> >) WITH (
> >> > 'connector' = 'filesystem',           -- required: specify the connector
> >> > 'path' = 'file:///tmp/test.csv',  -- required: path to a directory
> >> > 'format' = 'json',
> >> > 'json.fail-on-missing-field' = 'false',
> >> > 'json.ignore-parse-errors' = 'true'
> >> >)
> >> >""")
> >> >
> >>
> >> >table.select("func(b)").execute_insert("output").get_job_client().get_job_execution_result().result()
> >> >```
> >> >
> >> >Best,
> >> >Xingbo
> >> >
> >> >
> >> >
> >> >whh_960101 <wh...@163.com> 于2020年9月4日周五 下午3:46写道:
> >> >
> >> >> 您好,我是想让输出insert_into到目标表中,具体如下:
> >> >> st_env=StreamExecutionEnvironment.get_execution_environment()
> >> >> st_env.connect了一个source table(table包含a字段),
> >> >> 然后
> >> >> | st_env.connect(FileSystem().path('tmp')) \ |
> >> >> | | .with_format(OldCsv() |
> >> >> | | .field('res', DataTypes.ARRAY(DataTypes.STRING()))) \ |
> >> >> | | .with_schema(Schema() |
> >> >> | | .field('res', DataTypes.ARRAY(DataTypes.STRING()))) \ |
> >> >> | | .create_temporary_table('sink') |
> >> >> connect了一个sink表,format、schema都是DataTypes.ARRAY()
> >> >> 然后我定义了一个udf
> >> >>
> >> >>
> >> @udf(input_types=DataTypes.STRING(),result_type=DataTypes.ARRAY(DataTypes.STRING()))
> >> >> def func(a):
> >> >>     rec_list = a.split(',')
> >> >>     res_arr = np.arrary(rec_list,dtype=str)
> >> >>     return res_arr
> >> >> st_env.register_function("func", func)
> >> >> st_env.from_path("source").select("func(a)").insert_into("sink")
> >> >> 最后得到的tmp文件里面就是[Ljava.lang.String;@f795710这种形式的字符串
> >> ,不是我res_arr里面的内容,如果我单独返回一个值,比如return
> >> >> res_arr[0],tmp文件里面的字符串就是正确。
> >> >> 我想要得到array,该怎么解决?
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >> 在 2020-09-04 15:17:38,"Xingbo Huang" <hx...@gmail.com> 写道:
> >> >> >Hi,
> >> >> >
> >> >> >你是调试的时候想看结果吗?
> >> >> >你可以直接table.to_pandas()来看结果,或者用print connector来看。
> >> >> >
> >> >> >个人觉得to_pandas最简单,比如你可以试试下面的例子
> >> >> >
> >> >> >```
> >> >> >table=table_env.from_elements([(1, 'Hi'), (2, 'Hehe')], ['a', 'b'])
> >> >> >
> >> >> >@udf(input_types=DataTypes.STRING(),
> >> >> >result_type=DataTypes.ARRAY(DataTypes.STRING()))
> >> >> >def func(a):
> >> >> >     return np.array([a, a, a], dtype=str)
> >> >> >
> >> >> >table_env.register_function("func", func)
> >> >> >
> >> >> >table.select("func(b)").to_pandas()
> >> >> >```
> >> >> >然后,你可以看看官方文档[1],让你快速上手PyFlink
> >> >> >
> >> >> >Best,
> >> >> >Xingbo
> >> >> >
> >> >> >[1]
> >> >> >
> >> >>
> >> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/intro_to_table_api.html
> >> >> >
> >> >> >whh_960101 <wh...@163.com> 于2020年9月4日周五 下午2:50写道:
> >> >> >
> >> >> >> 您好,numpy的array和DataTypes.ARRAY()的类型格式是一样的吗
> >> >> >> 我的udf输出了一个numpy.array(dtype = str),
> >> >> >> result_type设的是DataTypes.ARRAY(DataTypes.STRING())
> >> >> >>
> >> >> >>
> >> >>
> >> 把输出insert_into到sink环境中,with_format设为OldCsv().field('res',DataTypes.ARRAY(DataTypes.STRING()))
> >> >> >> 打印出来的结果是[Ljava.lang.String;@f795710这种类型的字符串,不是我np.array里的内容
> >> >> >> 请问这个问题该怎么解决?
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >> 在 2020-09-04 10:35:03,"Xingbo Huang" <hx...@gmail.com> 写道:
> >> >> >> >Hi,
> >> >> >> >
> >> >> >>
> >> >> >>
> >> >>
> >> >你这个场景,你其实可以直接拿到after里面的字段b的,用法是table.select("after.get('b')")。你都不需要使用udf来做这件事
> >> >> >> >
> >> >> >> >Best,
> >> >> >> >Xingbo
> >> >> >> >
> >> >> >> >whh_960101 <wh...@163.com> 于2020年9月4日周五 上午9:26写道:
> >> >> >> >
> >> >> >> >>
> >> >> >> >>
> >> >> >>
> >> >>
> >> 您好,我的问题是:首先我有一个source的环境,with_format是嵌套的json,用两层DataTypes.Row()来定义,其中有一个字段的格式是DataTypes.Row([DataTypes.FIELD("a",DataTypes.STRING()),DataTypes.FIELD("b",DataTypes.STRING())])(字段名为after),现在我要定义一个udf,输入是after,我想从中获取b字段的值
> >> >> >> >> udf定义如下:
> >> >> >> >>
> >> >> >> >>
> >> >> >>
> >> >>
> >> @udf(input_types=[DataTypes.STRING(),DataTypes.STRING()],result_type=DataTypes.STRING())
> >> >> >> >> def fun(data):
> >> >> >> >>      b= data['b'] #我想获取after中XMLRECORD字段的值,这样写会报错
> >> >> >> >>
> >> >> >> >>
> >> >> >> >> 如果通过table.select("after.b")或者table.select('after').select('b')也会报错
> >> >> >> >> 希望您能给我提供好的解决办法,万分感谢!
> >> >> >> >>
> >> >> >> >>
> >> >> >> >>
> >> >> >> >>
> >> >> >> >>
> >> >> >> >>
> >> >> >> >>
> >> >> >> >>
> >> >> >> >>
> >> >> >> >>
> >> >> >> >>
> >> >> >> >>
> >> >> >> >>
> >> >> >> >>
> >> >> >> >>
> >> >> >> >>
> >> >> >> >>
> >> >> >> >> 在 2020-09-03 22:23:28,"Xingbo Huang" <hx...@gmail.com> 写道:
> >> >> >> >> >Hi,
> >> >> >> >> >
> >> >> >> >> >我觉得你从头详细描述一下你的表结构。
> >> >> >> >> >比如,我的表包含三个字段a,b,c,他们的类型都是varchar的,
> >> >> >> >>
> >> >> >然后我定义了一个udf,他的输入是我的表的b字段。那么这个时候我的udf的input_types就是DataTypes.STRING()就行。
> >> >> >> >> >使用就是table.select("udf(b)")。详细的你可以去查阅文档[1]
> >> >> >> >> >
> >> >> >> >> >[1]
> >> >> >> >> >
> >> >> >> >>
> >> >> >>
> >> >>
> >> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/user-guide/table/udfs/python_udfs.html#scalar-functions
> >> >> >> >> >
> >> >> >> >> >Best,
> >> >> >> >> >Xingbo
> >> >> >> >> >
> >> >> >> >> ><wh...@163.com> 于2020年9月3日周四 下午9:45写道:
> >> >> >> >> >
> >> >> >> >> >>
> >> >> 我的输入是Row类型的(包含字段名a,b,c),比如我的输入是W,我的函数只需要用到其中的第二列(b),input_types按照您的写法
> >> >> >> >> >> input_types=[DataTypes.STRING(), DataTypes.STRING(),
> >> >> >> DataTypes.STRING()]
> >> >> >> >> >>
> >> >> >> >> >> 在def的时候应该是写def函数名(W):,然后函数内部取第二列写W['b']可以吗
> >> >> >> >> >> 或者正确写法是什么样的,感谢解答!
> >> >> >> >> >>
> >> >> >> >> >>
> >> >> >> >> >> | |
> >> >> >> >> >> whh_960101
> >> >> >> >> >> |
> >> >> >> >> >> |
> >> >> >> >> >> 邮箱:whh_960101@163.com
> >> >> >> >> >> |
> >> >> >> >> >>
> >> >> >> >> >> 签名由 网易邮箱大师 定制
> >> >> >> >> >>
> >> >> >> >> >> 在2020年09月03日 21:14,Xingbo Huang 写道:
> >> >> >> >> >> Hi,
> >> >> >> >> >> input_types定义的是每一个列的具体类型。
> >> >> >> >> >> 比如udf输入的是三列,类型都是DataTypes.STRING(),这个时候你
> >> >> >> >> >> 正确的写法是
> >> >> >> >> >>
> >> >> >> >> >>    input_types=[DataTypes.STRING(), DataTypes.STRING(),
> >> >> >> >> DataTypes.STRING()]
> >> >> >> >> >>
> >> >> >> >> >> 针对这个例子,你错误的写法是(根据报错,我猜测你是这么写的)
> >> >> >> >> >>    input_types=DataTypes.Row([DataTypes.FIELD("a",
> >> >> >> DataTypes.STRING()),
> >> >> >> >> >> DataTypes.FIELD("b", DataTypes.STRING()), DataTypes.FIELD("c",
> >> >> >> >> >> DataTypes.STRING())])
> >> >> >> >> >>
> >> >> >> >> >> Best,
> >> >> >> >> >> Xingbo
> >> >> >> >> >>
> >> >> >> >> >> whh_960101 <wh...@163.com> 于2020年9月3日周四 下午9:03写道:
> >> >> >> >> >>
> >> >> >> >> >> >
> >> 您好,请问我在定义udf的时候input_types为什么不能使用DataTypes.Row(),使用后会报错:Invalid
> >> >> >> >> >> > input_type:input_type should be DataType but contain
> >> >> >> RowField(RECID,
> >> >> >> >> >> > VARCHAR)
> >> >> >> >> >> > 我的pyflink版本:1.11.1
> >> >> >> >> >>
> >> >> >> >>
> >> >> >>
> >> >>
> >>
>
>
>
>
>

Re:Re: Re: Re: Re: Re: pyflink-udf 问题反馈

Posted by whh_960101 <wh...@163.com>.
您好,
图中像datagen和print这样的connector更像是表名,之前听说的只有json、csv、filesystem这种类型的connector,请问connector在使用连接器DDL创建表时的作用是什么

















在 2020-09-07 11:33:06,"Xingbo Huang" <hx...@gmail.com> 写道:
>Hi,
>你是想直接读一个python的提供的数据源把。这和udf是两个问题。你那个udf没啥问题,就是udf的返回类型是result_type,不是result_types。
>
>你想直接读python的数据结构的话,你可以考虑使用st_env.from_elements(测试用的,可以参考文档[1])或者st_env.from_pandas
>[2] 来读取一个dataframe。
>
>[1]
>https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/intro_to_table_api.html
>[2]
>https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/conversion_of_pandas.html
>
>Best,
>Xingbo
>
>whh_960101 <wh...@163.com> 于2020年9月7日周一 上午11:22写道:
>
>> 您好,请问在定义UDF时,输入参数是一个我定义好的python字典,比如
>> dic = {1:'a',2:'b'}
>> 此时定义udf如下:
>>
>> @udf(input_types=[DataTypes.MAP(DataTypes.INT(),DataTypes.STRING()),DataTypes.STRING()],result_types=DataTypes.STRING())
>> def func(dic,f):
>>    ......
>>    return L
>> st_env.register_function("func", func)
>> st_env.from_path("source").select("func(dic,t)").insert_into("sink")
>> #这时我在外部定义好的数据类型dic字典如何作为参数传进来
>> 这种代码应该怎么写?感谢解答!因为官网没有给出将外部数据类型作为参数传入udf中的样例,所以比较困惑
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> 在 2020-09-04 16:02:56,"Xingbo Huang" <hx...@gmail.com> 写道:
>> >Hi,
>> >
>> >推荐你使用ddl来声明你上下游用的connector
>> >
>> >```
>> >table_env.execute_sql("""
>> >CREATE TABLE output (
>> >data STRING ARRAY
>> >) WITH (
>> > 'connector' = 'filesystem',           -- required: specify the connector
>> > 'path' = 'file:///tmp/test.csv',  -- required: path to a directory
>> > 'format' = 'json',
>> > 'json.fail-on-missing-field' = 'false',
>> > 'json.ignore-parse-errors' = 'true'
>> >)
>> >""")
>> >
>>
>> >table.select("func(b)").execute_insert("output").get_job_client().get_job_execution_result().result()
>> >```
>> >
>> >Best,
>> >Xingbo
>> >
>> >
>> >
>> >whh_960101 <wh...@163.com> 于2020年9月4日周五 下午3:46写道:
>> >
>> >> 您好,我是想让输出insert_into到目标表中,具体如下:
>> >> st_env=StreamExecutionEnvironment.get_execution_environment()
>> >> st_env.connect了一个source table(table包含a字段),
>> >> 然后
>> >> | st_env.connect(FileSystem().path('tmp')) \ |
>> >> | | .with_format(OldCsv() |
>> >> | | .field('res', DataTypes.ARRAY(DataTypes.STRING()))) \ |
>> >> | | .with_schema(Schema() |
>> >> | | .field('res', DataTypes.ARRAY(DataTypes.STRING()))) \ |
>> >> | | .create_temporary_table('sink') |
>> >> connect了一个sink表,format、schema都是DataTypes.ARRAY()
>> >> 然后我定义了一个udf
>> >>
>> >>
>> @udf(input_types=DataTypes.STRING(),result_type=DataTypes.ARRAY(DataTypes.STRING()))
>> >> def func(a):
>> >>     rec_list = a.split(',')
>> >>     res_arr = np.arrary(rec_list,dtype=str)
>> >>     return res_arr
>> >> st_env.register_function("func", func)
>> >> st_env.from_path("source").select("func(a)").insert_into("sink")
>> >> 最后得到的tmp文件里面就是[Ljava.lang.String;@f795710这种形式的字符串
>> ,不是我res_arr里面的内容,如果我单独返回一个值,比如return
>> >> res_arr[0],tmp文件里面的字符串就是正确。
>> >> 我想要得到array,该怎么解决?
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> 在 2020-09-04 15:17:38,"Xingbo Huang" <hx...@gmail.com> 写道:
>> >> >Hi,
>> >> >
>> >> >你是调试的时候想看结果吗?
>> >> >你可以直接table.to_pandas()来看结果,或者用print connector来看。
>> >> >
>> >> >个人觉得to_pandas最简单,比如你可以试试下面的例子
>> >> >
>> >> >```
>> >> >table=table_env.from_elements([(1, 'Hi'), (2, 'Hehe')], ['a', 'b'])
>> >> >
>> >> >@udf(input_types=DataTypes.STRING(),
>> >> >result_type=DataTypes.ARRAY(DataTypes.STRING()))
>> >> >def func(a):
>> >> >     return np.array([a, a, a], dtype=str)
>> >> >
>> >> >table_env.register_function("func", func)
>> >> >
>> >> >table.select("func(b)").to_pandas()
>> >> >```
>> >> >然后,你可以看看官方文档[1],让你快速上手PyFlink
>> >> >
>> >> >Best,
>> >> >Xingbo
>> >> >
>> >> >[1]
>> >> >
>> >>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/intro_to_table_api.html
>> >> >
>> >> >whh_960101 <wh...@163.com> 于2020年9月4日周五 下午2:50写道:
>> >> >
>> >> >> 您好,numpy的array和DataTypes.ARRAY()的类型格式是一样的吗
>> >> >> 我的udf输出了一个numpy.array(dtype = str),
>> >> >> result_type设的是DataTypes.ARRAY(DataTypes.STRING())
>> >> >>
>> >> >>
>> >>
>> 把输出insert_into到sink环境中,with_format设为OldCsv().field('res',DataTypes.ARRAY(DataTypes.STRING()))
>> >> >> 打印出来的结果是[Ljava.lang.String;@f795710这种类型的字符串,不是我np.array里的内容
>> >> >> 请问这个问题该怎么解决?
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >> 在 2020-09-04 10:35:03,"Xingbo Huang" <hx...@gmail.com> 写道:
>> >> >> >Hi,
>> >> >> >
>> >> >>
>> >> >>
>> >>
>> >你这个场景,你其实可以直接拿到after里面的字段b的,用法是table.select("after.get('b')")。你都不需要使用udf来做这件事
>> >> >> >
>> >> >> >Best,
>> >> >> >Xingbo
>> >> >> >
>> >> >> >whh_960101 <wh...@163.com> 于2020年9月4日周五 上午9:26写道:
>> >> >> >
>> >> >> >>
>> >> >> >>
>> >> >>
>> >>
>> 您好,我的问题是:首先我有一个source的环境,with_format是嵌套的json,用两层DataTypes.Row()来定义,其中有一个字段的格式是DataTypes.Row([DataTypes.FIELD("a",DataTypes.STRING()),DataTypes.FIELD("b",DataTypes.STRING())])(字段名为after),现在我要定义一个udf,输入是after,我想从中获取b字段的值
>> >> >> >> udf定义如下:
>> >> >> >>
>> >> >> >>
>> >> >>
>> >>
>> @udf(input_types=[DataTypes.STRING(),DataTypes.STRING()],result_type=DataTypes.STRING())
>> >> >> >> def fun(data):
>> >> >> >>      b= data['b'] #我想获取after中XMLRECORD字段的值,这样写会报错
>> >> >> >>
>> >> >> >>
>> >> >> >> 如果通过table.select("after.b")或者table.select('after').select('b')也会报错
>> >> >> >> 希望您能给我提供好的解决办法,万分感谢!
>> >> >> >>
>> >> >> >>
>> >> >> >>
>> >> >> >>
>> >> >> >>
>> >> >> >>
>> >> >> >>
>> >> >> >>
>> >> >> >>
>> >> >> >>
>> >> >> >>
>> >> >> >>
>> >> >> >>
>> >> >> >>
>> >> >> >>
>> >> >> >>
>> >> >> >>
>> >> >> >> 在 2020-09-03 22:23:28,"Xingbo Huang" <hx...@gmail.com> 写道:
>> >> >> >> >Hi,
>> >> >> >> >
>> >> >> >> >我觉得你从头详细描述一下你的表结构。
>> >> >> >> >比如,我的表包含三个字段a,b,c,他们的类型都是varchar的,
>> >> >> >>
>> >> >然后我定义了一个udf,他的输入是我的表的b字段。那么这个时候我的udf的input_types就是DataTypes.STRING()就行。
>> >> >> >> >使用就是table.select("udf(b)")。详细的你可以去查阅文档[1]
>> >> >> >> >
>> >> >> >> >[1]
>> >> >> >> >
>> >> >> >>
>> >> >>
>> >>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/user-guide/table/udfs/python_udfs.html#scalar-functions
>> >> >> >> >
>> >> >> >> >Best,
>> >> >> >> >Xingbo
>> >> >> >> >
>> >> >> >> ><wh...@163.com> 于2020年9月3日周四 下午9:45写道:
>> >> >> >> >
>> >> >> >> >>
>> >> 我的输入是Row类型的(包含字段名a,b,c),比如我的输入是W,我的函数只需要用到其中的第二列(b),input_types按照您的写法
>> >> >> >> >> input_types=[DataTypes.STRING(), DataTypes.STRING(),
>> >> >> DataTypes.STRING()]
>> >> >> >> >>
>> >> >> >> >> 在def的时候应该是写def函数名(W):,然后函数内部取第二列写W['b']可以吗
>> >> >> >> >> 或者正确写法是什么样的,感谢解答!
>> >> >> >> >>
>> >> >> >> >>
>> >> >> >> >> | |
>> >> >> >> >> whh_960101
>> >> >> >> >> |
>> >> >> >> >> |
>> >> >> >> >> 邮箱:whh_960101@163.com
>> >> >> >> >> |
>> >> >> >> >>
>> >> >> >> >> 签名由 网易邮箱大师 定制
>> >> >> >> >>
>> >> >> >> >> 在2020年09月03日 21:14,Xingbo Huang 写道:
>> >> >> >> >> Hi,
>> >> >> >> >> input_types定义的是每一个列的具体类型。
>> >> >> >> >> 比如udf输入的是三列,类型都是DataTypes.STRING(),这个时候你
>> >> >> >> >> 正确的写法是
>> >> >> >> >>
>> >> >> >> >>    input_types=[DataTypes.STRING(), DataTypes.STRING(),
>> >> >> >> DataTypes.STRING()]
>> >> >> >> >>
>> >> >> >> >> 针对这个例子,你错误的写法是(根据报错,我猜测你是这么写的)
>> >> >> >> >>    input_types=DataTypes.Row([DataTypes.FIELD("a",
>> >> >> DataTypes.STRING()),
>> >> >> >> >> DataTypes.FIELD("b", DataTypes.STRING()), DataTypes.FIELD("c",
>> >> >> >> >> DataTypes.STRING())])
>> >> >> >> >>
>> >> >> >> >> Best,
>> >> >> >> >> Xingbo
>> >> >> >> >>
>> >> >> >> >> whh_960101 <wh...@163.com> 于2020年9月3日周四 下午9:03写道:
>> >> >> >> >>
>> >> >> >> >> >
>> 您好,请问我在定义udf的时候input_types为什么不能使用DataTypes.Row(),使用后会报错:Invalid
>> >> >> >> >> > input_type:input_type should be DataType but contain
>> >> >> RowField(RECID,
>> >> >> >> >> > VARCHAR)
>> >> >> >> >> > 我的pyflink版本:1.11.1
>> >> >> >> >>
>> >> >> >>
>> >> >>
>> >>
>>

Re: Re: Re: Re: Re: pyflink-udf 问题反馈

Posted by Xingbo Huang <hx...@gmail.com>.
Hi,
你是想直接读一个python的提供的数据源把。这和udf是两个问题。你那个udf没啥问题,就是udf的返回类型是result_type,不是result_types。

你想直接读python的数据结构的话,你可以考虑使用st_env.from_elements(测试用的,可以参考文档[1])或者st_env.from_pandas
[2] 来读取一个dataframe。

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/intro_to_table_api.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/conversion_of_pandas.html

Best,
Xingbo

whh_960101 <wh...@163.com> 于2020年9月7日周一 上午11:22写道:

> 您好,请问在定义UDF时,输入参数是一个我定义好的python字典,比如
> dic = {1:'a',2:'b'}
> 此时定义udf如下:
>
> @udf(input_types=[DataTypes.MAP(DataTypes.INT(),DataTypes.STRING()),DataTypes.STRING()],result_types=DataTypes.STRING())
> def func(dic,f):
>    ......
>    return L
> st_env.register_function("func", func)
> st_env.from_path("source").select("func(dic,t)").insert_into("sink")
> #这时我在外部定义好的数据类型dic字典如何作为参数传进来
> 这种代码应该怎么写?感谢解答!因为官网没有给出将外部数据类型作为参数传入udf中的样例,所以比较困惑
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-09-04 16:02:56,"Xingbo Huang" <hx...@gmail.com> 写道:
> >Hi,
> >
> >推荐你使用ddl来声明你上下游用的connector
> >
> >```
> >table_env.execute_sql("""
> >CREATE TABLE output (
> >data STRING ARRAY
> >) WITH (
> > 'connector' = 'filesystem',           -- required: specify the connector
> > 'path' = 'file:///tmp/test.csv',  -- required: path to a directory
> > 'format' = 'json',
> > 'json.fail-on-missing-field' = 'false',
> > 'json.ignore-parse-errors' = 'true'
> >)
> >""")
> >
>
> >table.select("func(b)").execute_insert("output").get_job_client().get_job_execution_result().result()
> >```
> >
> >Best,
> >Xingbo
> >
> >
> >
> >whh_960101 <wh...@163.com> 于2020年9月4日周五 下午3:46写道:
> >
> >> 您好,我是想让输出insert_into到目标表中,具体如下:
> >> st_env=StreamExecutionEnvironment.get_execution_environment()
> >> st_env.connect了一个source table(table包含a字段),
> >> 然后
> >> | st_env.connect(FileSystem().path('tmp')) \ |
> >> | | .with_format(OldCsv() |
> >> | | .field('res', DataTypes.ARRAY(DataTypes.STRING()))) \ |
> >> | | .with_schema(Schema() |
> >> | | .field('res', DataTypes.ARRAY(DataTypes.STRING()))) \ |
> >> | | .create_temporary_table('sink') |
> >> connect了一个sink表,format、schema都是DataTypes.ARRAY()
> >> 然后我定义了一个udf
> >>
> >>
> @udf(input_types=DataTypes.STRING(),result_type=DataTypes.ARRAY(DataTypes.STRING()))
> >> def func(a):
> >>     rec_list = a.split(',')
> >>     res_arr = np.arrary(rec_list,dtype=str)
> >>     return res_arr
> >> st_env.register_function("func", func)
> >> st_env.from_path("source").select("func(a)").insert_into("sink")
> >> 最后得到的tmp文件里面就是[Ljava.lang.String;@f795710这种形式的字符串
> ,不是我res_arr里面的内容,如果我单独返回一个值,比如return
> >> res_arr[0],tmp文件里面的字符串就是正确。
> >> 我想要得到array,该怎么解决?
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> 在 2020-09-04 15:17:38,"Xingbo Huang" <hx...@gmail.com> 写道:
> >> >Hi,
> >> >
> >> >你是调试的时候想看结果吗?
> >> >你可以直接table.to_pandas()来看结果,或者用print connector来看。
> >> >
> >> >个人觉得to_pandas最简单,比如你可以试试下面的例子
> >> >
> >> >```
> >> >table=table_env.from_elements([(1, 'Hi'), (2, 'Hehe')], ['a', 'b'])
> >> >
> >> >@udf(input_types=DataTypes.STRING(),
> >> >result_type=DataTypes.ARRAY(DataTypes.STRING()))
> >> >def func(a):
> >> >     return np.array([a, a, a], dtype=str)
> >> >
> >> >table_env.register_function("func", func)
> >> >
> >> >table.select("func(b)").to_pandas()
> >> >```
> >> >然后,你可以看看官方文档[1],让你快速上手PyFlink
> >> >
> >> >Best,
> >> >Xingbo
> >> >
> >> >[1]
> >> >
> >>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/intro_to_table_api.html
> >> >
> >> >whh_960101 <wh...@163.com> 于2020年9月4日周五 下午2:50写道:
> >> >
> >> >> 您好,numpy的array和DataTypes.ARRAY()的类型格式是一样的吗
> >> >> 我的udf输出了一个numpy.array(dtype = str),
> >> >> result_type设的是DataTypes.ARRAY(DataTypes.STRING())
> >> >>
> >> >>
> >>
> 把输出insert_into到sink环境中,with_format设为OldCsv().field('res',DataTypes.ARRAY(DataTypes.STRING()))
> >> >> 打印出来的结果是[Ljava.lang.String;@f795710这种类型的字符串,不是我np.array里的内容
> >> >> 请问这个问题该怎么解决?
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >> 在 2020-09-04 10:35:03,"Xingbo Huang" <hx...@gmail.com> 写道:
> >> >> >Hi,
> >> >> >
> >> >>
> >> >>
> >>
> >你这个场景,你其实可以直接拿到after里面的字段b的,用法是table.select("after.get('b')")。你都不需要使用udf来做这件事
> >> >> >
> >> >> >Best,
> >> >> >Xingbo
> >> >> >
> >> >> >whh_960101 <wh...@163.com> 于2020年9月4日周五 上午9:26写道:
> >> >> >
> >> >> >>
> >> >> >>
> >> >>
> >>
> 您好,我的问题是:首先我有一个source的环境,with_format是嵌套的json,用两层DataTypes.Row()来定义,其中有一个字段的格式是DataTypes.Row([DataTypes.FIELD("a",DataTypes.STRING()),DataTypes.FIELD("b",DataTypes.STRING())])(字段名为after),现在我要定义一个udf,输入是after,我想从中获取b字段的值
> >> >> >> udf定义如下:
> >> >> >>
> >> >> >>
> >> >>
> >>
> @udf(input_types=[DataTypes.STRING(),DataTypes.STRING()],result_type=DataTypes.STRING())
> >> >> >> def fun(data):
> >> >> >>      b= data['b'] #我想获取after中XMLRECORD字段的值,这样写会报错
> >> >> >>
> >> >> >>
> >> >> >> 如果通过table.select("after.b")或者table.select('after').select('b')也会报错
> >> >> >> 希望您能给我提供好的解决办法,万分感谢!
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >> 在 2020-09-03 22:23:28,"Xingbo Huang" <hx...@gmail.com> 写道:
> >> >> >> >Hi,
> >> >> >> >
> >> >> >> >我觉得你从头详细描述一下你的表结构。
> >> >> >> >比如,我的表包含三个字段a,b,c,他们的类型都是varchar的,
> >> >> >>
> >> >然后我定义了一个udf,他的输入是我的表的b字段。那么这个时候我的udf的input_types就是DataTypes.STRING()就行。
> >> >> >> >使用就是table.select("udf(b)")。详细的你可以去查阅文档[1]
> >> >> >> >
> >> >> >> >[1]
> >> >> >> >
> >> >> >>
> >> >>
> >>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/user-guide/table/udfs/python_udfs.html#scalar-functions
> >> >> >> >
> >> >> >> >Best,
> >> >> >> >Xingbo
> >> >> >> >
> >> >> >> ><wh...@163.com> 于2020年9月3日周四 下午9:45写道:
> >> >> >> >
> >> >> >> >>
> >> 我的输入是Row类型的(包含字段名a,b,c),比如我的输入是W,我的函数只需要用到其中的第二列(b),input_types按照您的写法
> >> >> >> >> input_types=[DataTypes.STRING(), DataTypes.STRING(),
> >> >> DataTypes.STRING()]
> >> >> >> >>
> >> >> >> >> 在def的时候应该是写def函数名(W):,然后函数内部取第二列写W['b']可以吗
> >> >> >> >> 或者正确写法是什么样的,感谢解答!
> >> >> >> >>
> >> >> >> >>
> >> >> >> >> | |
> >> >> >> >> whh_960101
> >> >> >> >> |
> >> >> >> >> |
> >> >> >> >> 邮箱:whh_960101@163.com
> >> >> >> >> |
> >> >> >> >>
> >> >> >> >> 签名由 网易邮箱大师 定制
> >> >> >> >>
> >> >> >> >> 在2020年09月03日 21:14,Xingbo Huang 写道:
> >> >> >> >> Hi,
> >> >> >> >> input_types定义的是每一个列的具体类型。
> >> >> >> >> 比如udf输入的是三列,类型都是DataTypes.STRING(),这个时候你
> >> >> >> >> 正确的写法是
> >> >> >> >>
> >> >> >> >>    input_types=[DataTypes.STRING(), DataTypes.STRING(),
> >> >> >> DataTypes.STRING()]
> >> >> >> >>
> >> >> >> >> 针对这个例子,你错误的写法是(根据报错,我猜测你是这么写的)
> >> >> >> >>    input_types=DataTypes.Row([DataTypes.FIELD("a",
> >> >> DataTypes.STRING()),
> >> >> >> >> DataTypes.FIELD("b", DataTypes.STRING()), DataTypes.FIELD("c",
> >> >> >> >> DataTypes.STRING())])
> >> >> >> >>
> >> >> >> >> Best,
> >> >> >> >> Xingbo
> >> >> >> >>
> >> >> >> >> whh_960101 <wh...@163.com> 于2020年9月3日周四 下午9:03写道:
> >> >> >> >>
> >> >> >> >> >
> 您好,请问我在定义udf的时候input_types为什么不能使用DataTypes.Row(),使用后会报错:Invalid
> >> >> >> >> > input_type:input_type should be DataType but contain
> >> >> RowField(RECID,
> >> >> >> >> > VARCHAR)
> >> >> >> >> > 我的pyflink版本:1.11.1
> >> >> >> >>
> >> >> >>
> >> >>
> >>
>

Re:Re: Re: Re: Re: pyflink-udf 问题反馈

Posted by whh_960101 <wh...@163.com>.
您好,请问在定义UDF时,输入参数是一个我定义好的python字典,比如
dic = {1:'a',2:'b'}
此时定义udf如下:
@udf(input_types=[DataTypes.MAP(DataTypes.INT(),DataTypes.STRING()),DataTypes.STRING()],result_types=DataTypes.STRING())
def func(dic,f):
   ......
   return L
st_env.register_function("func", func)
st_env.from_path("source").select("func(dic,t)").insert_into("sink") #这时我在外部定义好的数据类型dic字典如何作为参数传进来
这种代码应该怎么写?感谢解答!因为官网没有给出将外部数据类型作为参数传入udf中的样例,所以比较困惑











在 2020-09-04 16:02:56,"Xingbo Huang" <hx...@gmail.com> 写道:
>Hi,
>
>推荐你使用ddl来声明你上下游用的connector
>
>```
>table_env.execute_sql("""
>CREATE TABLE output (
>data STRING ARRAY
>) WITH (
> 'connector' = 'filesystem',           -- required: specify the connector
> 'path' = 'file:///tmp/test.csv',  -- required: path to a directory
> 'format' = 'json',
> 'json.fail-on-missing-field' = 'false',
> 'json.ignore-parse-errors' = 'true'
>)
>""")
>
>table.select("func(b)").execute_insert("output").get_job_client().get_job_execution_result().result()
>```
>
>Best,
>Xingbo
>
>
>
>whh_960101 <wh...@163.com> 于2020年9月4日周五 下午3:46写道:
>
>> 您好,我是想让输出insert_into到目标表中,具体如下:
>> st_env=StreamExecutionEnvironment.get_execution_environment()
>> st_env.connect了一个source table(table包含a字段),
>> 然后
>> | st_env.connect(FileSystem().path('tmp')) \ |
>> | | .with_format(OldCsv() |
>> | | .field('res', DataTypes.ARRAY(DataTypes.STRING()))) \ |
>> | | .with_schema(Schema() |
>> | | .field('res', DataTypes.ARRAY(DataTypes.STRING()))) \ |
>> | | .create_temporary_table('sink') |
>> connect了一个sink表,format、schema都是DataTypes.ARRAY()
>> 然后我定义了一个udf
>>
>> @udf(input_types=DataTypes.STRING(),result_type=DataTypes.ARRAY(DataTypes.STRING()))
>> def func(a):
>>     rec_list = a.split(',')
>>     res_arr = np.arrary(rec_list,dtype=str)
>>     return res_arr
>> st_env.register_function("func", func)
>> st_env.from_path("source").select("func(a)").insert_into("sink")
>> 最后得到的tmp文件里面就是[Ljava.lang.String;@f795710这种形式的字符串,不是我res_arr里面的内容,如果我单独返回一个值,比如return
>> res_arr[0],tmp文件里面的字符串就是正确。
>> 我想要得到array,该怎么解决?
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> 在 2020-09-04 15:17:38,"Xingbo Huang" <hx...@gmail.com> 写道:
>> >Hi,
>> >
>> >你是调试的时候想看结果吗?
>> >你可以直接table.to_pandas()来看结果,或者用print connector来看。
>> >
>> >个人觉得to_pandas最简单,比如你可以试试下面的例子
>> >
>> >```
>> >table=table_env.from_elements([(1, 'Hi'), (2, 'Hehe')], ['a', 'b'])
>> >
>> >@udf(input_types=DataTypes.STRING(),
>> >result_type=DataTypes.ARRAY(DataTypes.STRING()))
>> >def func(a):
>> >     return np.array([a, a, a], dtype=str)
>> >
>> >table_env.register_function("func", func)
>> >
>> >table.select("func(b)").to_pandas()
>> >```
>> >然后,你可以看看官方文档[1],让你快速上手PyFlink
>> >
>> >Best,
>> >Xingbo
>> >
>> >[1]
>> >
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/intro_to_table_api.html
>> >
>> >whh_960101 <wh...@163.com> 于2020年9月4日周五 下午2:50写道:
>> >
>> >> 您好,numpy的array和DataTypes.ARRAY()的类型格式是一样的吗
>> >> 我的udf输出了一个numpy.array(dtype = str),
>> >> result_type设的是DataTypes.ARRAY(DataTypes.STRING())
>> >>
>> >>
>> 把输出insert_into到sink环境中,with_format设为OldCsv().field('res',DataTypes.ARRAY(DataTypes.STRING()))
>> >> 打印出来的结果是[Ljava.lang.String;@f795710这种类型的字符串,不是我np.array里的内容
>> >> 请问这个问题该怎么解决?
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> 在 2020-09-04 10:35:03,"Xingbo Huang" <hx...@gmail.com> 写道:
>> >> >Hi,
>> >> >
>> >>
>> >>
>> >你这个场景,你其实可以直接拿到after里面的字段b的,用法是table.select("after.get('b')")。你都不需要使用udf来做这件事
>> >> >
>> >> >Best,
>> >> >Xingbo
>> >> >
>> >> >whh_960101 <wh...@163.com> 于2020年9月4日周五 上午9:26写道:
>> >> >
>> >> >>
>> >> >>
>> >>
>> 您好,我的问题是:首先我有一个source的环境,with_format是嵌套的json,用两层DataTypes.Row()来定义,其中有一个字段的格式是DataTypes.Row([DataTypes.FIELD("a",DataTypes.STRING()),DataTypes.FIELD("b",DataTypes.STRING())])(字段名为after),现在我要定义一个udf,输入是after,我想从中获取b字段的值
>> >> >> udf定义如下:
>> >> >>
>> >> >>
>> >>
>> @udf(input_types=[DataTypes.STRING(),DataTypes.STRING()],result_type=DataTypes.STRING())
>> >> >> def fun(data):
>> >> >>      b= data['b'] #我想获取after中XMLRECORD字段的值,这样写会报错
>> >> >>
>> >> >>
>> >> >> 如果通过table.select("after.b")或者table.select('after').select('b')也会报错
>> >> >> 希望您能给我提供好的解决办法,万分感谢!
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >> 在 2020-09-03 22:23:28,"Xingbo Huang" <hx...@gmail.com> 写道:
>> >> >> >Hi,
>> >> >> >
>> >> >> >我觉得你从头详细描述一下你的表结构。
>> >> >> >比如,我的表包含三个字段a,b,c,他们的类型都是varchar的,
>> >> >>
>> >然后我定义了一个udf,他的输入是我的表的b字段。那么这个时候我的udf的input_types就是DataTypes.STRING()就行。
>> >> >> >使用就是table.select("udf(b)")。详细的你可以去查阅文档[1]
>> >> >> >
>> >> >> >[1]
>> >> >> >
>> >> >>
>> >>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/user-guide/table/udfs/python_udfs.html#scalar-functions
>> >> >> >
>> >> >> >Best,
>> >> >> >Xingbo
>> >> >> >
>> >> >> ><wh...@163.com> 于2020年9月3日周四 下午9:45写道:
>> >> >> >
>> >> >> >>
>> 我的输入是Row类型的(包含字段名a,b,c),比如我的输入是W,我的函数只需要用到其中的第二列(b),input_types按照您的写法
>> >> >> >> input_types=[DataTypes.STRING(), DataTypes.STRING(),
>> >> DataTypes.STRING()]
>> >> >> >>
>> >> >> >> 在def的时候应该是写def函数名(W):,然后函数内部取第二列写W['b']可以吗
>> >> >> >> 或者正确写法是什么样的,感谢解答!
>> >> >> >>
>> >> >> >>
>> >> >> >> | |
>> >> >> >> whh_960101
>> >> >> >> |
>> >> >> >> |
>> >> >> >> 邮箱:whh_960101@163.com
>> >> >> >> |
>> >> >> >>
>> >> >> >> 签名由 网易邮箱大师 定制
>> >> >> >>
>> >> >> >> 在2020年09月03日 21:14,Xingbo Huang 写道:
>> >> >> >> Hi,
>> >> >> >> input_types定义的是每一个列的具体类型。
>> >> >> >> 比如udf输入的是三列,类型都是DataTypes.STRING(),这个时候你
>> >> >> >> 正确的写法是
>> >> >> >>
>> >> >> >>    input_types=[DataTypes.STRING(), DataTypes.STRING(),
>> >> >> DataTypes.STRING()]
>> >> >> >>
>> >> >> >> 针对这个例子,你错误的写法是(根据报错,我猜测你是这么写的)
>> >> >> >>    input_types=DataTypes.Row([DataTypes.FIELD("a",
>> >> DataTypes.STRING()),
>> >> >> >> DataTypes.FIELD("b", DataTypes.STRING()), DataTypes.FIELD("c",
>> >> >> >> DataTypes.STRING())])
>> >> >> >>
>> >> >> >> Best,
>> >> >> >> Xingbo
>> >> >> >>
>> >> >> >> whh_960101 <wh...@163.com> 于2020年9月3日周四 下午9:03写道:
>> >> >> >>
>> >> >> >> > 您好,请问我在定义udf的时候input_types为什么不能使用DataTypes.Row(),使用后会报错:Invalid
>> >> >> >> > input_type:input_type should be DataType but contain
>> >> RowField(RECID,
>> >> >> >> > VARCHAR)
>> >> >> >> > 我的pyflink版本:1.11.1
>> >> >> >>
>> >> >>
>> >>
>>

Re: Re: Re: Re: pyflink-udf 问题反馈

Posted by Xingbo Huang <hx...@gmail.com>.
Hi,

推荐你使用ddl来声明你上下游用的connector

```
table_env.execute_sql("""
CREATE TABLE output (
data STRING ARRAY
) WITH (
 'connector' = 'filesystem',           -- required: specify the connector
 'path' = 'file:///tmp/test.csv',  -- required: path to a directory
 'format' = 'json',
 'json.fail-on-missing-field' = 'false',
 'json.ignore-parse-errors' = 'true'
)
""")

table.select("func(b)").execute_insert("output").get_job_client().get_job_execution_result().result()
```

Best,
Xingbo



whh_960101 <wh...@163.com> 于2020年9月4日周五 下午3:46写道:

> 您好,我是想让输出insert_into到目标表中,具体如下:
> st_env=StreamExecutionEnvironment.get_execution_environment()
> st_env.connect了一个source table(table包含a字段),
> 然后
> | st_env.connect(FileSystem().path('tmp')) \ |
> | | .with_format(OldCsv() |
> | | .field('res', DataTypes.ARRAY(DataTypes.STRING()))) \ |
> | | .with_schema(Schema() |
> | | .field('res', DataTypes.ARRAY(DataTypes.STRING()))) \ |
> | | .create_temporary_table('sink') |
> connect了一个sink表,format、schema都是DataTypes.ARRAY()
> 然后我定义了一个udf
>
> @udf(input_types=DataTypes.STRING(),result_type=DataTypes.ARRAY(DataTypes.STRING()))
> def func(a):
>     rec_list = a.split(',')
>     res_arr = np.arrary(rec_list,dtype=str)
>     return res_arr
> st_env.register_function("func", func)
> st_env.from_path("source").select("func(a)").insert_into("sink")
> 最后得到的tmp文件里面就是[Ljava.lang.String;@f795710这种形式的字符串,不是我res_arr里面的内容,如果我单独返回一个值,比如return
> res_arr[0],tmp文件里面的字符串就是正确。
> 我想要得到array,该怎么解决?
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-09-04 15:17:38,"Xingbo Huang" <hx...@gmail.com> 写道:
> >Hi,
> >
> >你是调试的时候想看结果吗?
> >你可以直接table.to_pandas()来看结果,或者用print connector来看。
> >
> >个人觉得to_pandas最简单,比如你可以试试下面的例子
> >
> >```
> >table=table_env.from_elements([(1, 'Hi'), (2, 'Hehe')], ['a', 'b'])
> >
> >@udf(input_types=DataTypes.STRING(),
> >result_type=DataTypes.ARRAY(DataTypes.STRING()))
> >def func(a):
> >     return np.array([a, a, a], dtype=str)
> >
> >table_env.register_function("func", func)
> >
> >table.select("func(b)").to_pandas()
> >```
> >然后,你可以看看官方文档[1],让你快速上手PyFlink
> >
> >Best,
> >Xingbo
> >
> >[1]
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/intro_to_table_api.html
> >
> >whh_960101 <wh...@163.com> 于2020年9月4日周五 下午2:50写道:
> >
> >> 您好,numpy的array和DataTypes.ARRAY()的类型格式是一样的吗
> >> 我的udf输出了一个numpy.array(dtype = str),
> >> result_type设的是DataTypes.ARRAY(DataTypes.STRING())
> >>
> >>
> 把输出insert_into到sink环境中,with_format设为OldCsv().field('res',DataTypes.ARRAY(DataTypes.STRING()))
> >> 打印出来的结果是[Ljava.lang.String;@f795710这种类型的字符串,不是我np.array里的内容
> >> 请问这个问题该怎么解决?
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> 在 2020-09-04 10:35:03,"Xingbo Huang" <hx...@gmail.com> 写道:
> >> >Hi,
> >> >
> >>
> >>
> >你这个场景,你其实可以直接拿到after里面的字段b的,用法是table.select("after.get('b')")。你都不需要使用udf来做这件事
> >> >
> >> >Best,
> >> >Xingbo
> >> >
> >> >whh_960101 <wh...@163.com> 于2020年9月4日周五 上午9:26写道:
> >> >
> >> >>
> >> >>
> >>
> 您好,我的问题是:首先我有一个source的环境,with_format是嵌套的json,用两层DataTypes.Row()来定义,其中有一个字段的格式是DataTypes.Row([DataTypes.FIELD("a",DataTypes.STRING()),DataTypes.FIELD("b",DataTypes.STRING())])(字段名为after),现在我要定义一个udf,输入是after,我想从中获取b字段的值
> >> >> udf定义如下:
> >> >>
> >> >>
> >>
> @udf(input_types=[DataTypes.STRING(),DataTypes.STRING()],result_type=DataTypes.STRING())
> >> >> def fun(data):
> >> >>      b= data['b'] #我想获取after中XMLRECORD字段的值,这样写会报错
> >> >>
> >> >>
> >> >> 如果通过table.select("after.b")或者table.select('after').select('b')也会报错
> >> >> 希望您能给我提供好的解决办法,万分感谢!
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >> 在 2020-09-03 22:23:28,"Xingbo Huang" <hx...@gmail.com> 写道:
> >> >> >Hi,
> >> >> >
> >> >> >我觉得你从头详细描述一下你的表结构。
> >> >> >比如,我的表包含三个字段a,b,c,他们的类型都是varchar的,
> >> >>
> >然后我定义了一个udf,他的输入是我的表的b字段。那么这个时候我的udf的input_types就是DataTypes.STRING()就行。
> >> >> >使用就是table.select("udf(b)")。详细的你可以去查阅文档[1]
> >> >> >
> >> >> >[1]
> >> >> >
> >> >>
> >>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/user-guide/table/udfs/python_udfs.html#scalar-functions
> >> >> >
> >> >> >Best,
> >> >> >Xingbo
> >> >> >
> >> >> ><wh...@163.com> 于2020年9月3日周四 下午9:45写道:
> >> >> >
> >> >> >>
> 我的输入是Row类型的(包含字段名a,b,c),比如我的输入是W,我的函数只需要用到其中的第二列(b),input_types按照您的写法
> >> >> >> input_types=[DataTypes.STRING(), DataTypes.STRING(),
> >> DataTypes.STRING()]
> >> >> >>
> >> >> >> 在def的时候应该是写def函数名(W):,然后函数内部取第二列写W['b']可以吗
> >> >> >> 或者正确写法是什么样的,感谢解答!
> >> >> >>
> >> >> >>
> >> >> >> | |
> >> >> >> whh_960101
> >> >> >> |
> >> >> >> |
> >> >> >> 邮箱:whh_960101@163.com
> >> >> >> |
> >> >> >>
> >> >> >> 签名由 网易邮箱大师 定制
> >> >> >>
> >> >> >> 在2020年09月03日 21:14,Xingbo Huang 写道:
> >> >> >> Hi,
> >> >> >> input_types定义的是每一个列的具体类型。
> >> >> >> 比如udf输入的是三列,类型都是DataTypes.STRING(),这个时候你
> >> >> >> 正确的写法是
> >> >> >>
> >> >> >>    input_types=[DataTypes.STRING(), DataTypes.STRING(),
> >> >> DataTypes.STRING()]
> >> >> >>
> >> >> >> 针对这个例子,你错误的写法是(根据报错,我猜测你是这么写的)
> >> >> >>    input_types=DataTypes.Row([DataTypes.FIELD("a",
> >> DataTypes.STRING()),
> >> >> >> DataTypes.FIELD("b", DataTypes.STRING()), DataTypes.FIELD("c",
> >> >> >> DataTypes.STRING())])
> >> >> >>
> >> >> >> Best,
> >> >> >> Xingbo
> >> >> >>
> >> >> >> whh_960101 <wh...@163.com> 于2020年9月3日周四 下午9:03写道:
> >> >> >>
> >> >> >> > 您好,请问我在定义udf的时候input_types为什么不能使用DataTypes.Row(),使用后会报错:Invalid
> >> >> >> > input_type:input_type should be DataType but contain
> >> RowField(RECID,
> >> >> >> > VARCHAR)
> >> >> >> > 我的pyflink版本:1.11.1
> >> >> >>
> >> >>
> >>
>

Re:Re: Re: Re: pyflink-udf 问题反馈

Posted by whh_960101 <wh...@163.com>.
您好,我是想让输出insert_into到目标表中,具体如下:
st_env=StreamExecutionEnvironment.get_execution_environment()
st_env.connect了一个source table(table包含a字段),
然后
| st_env.connect(FileSystem().path('tmp')) \ |
| | .with_format(OldCsv() |
| | .field('res', DataTypes.ARRAY(DataTypes.STRING()))) \ |
| | .with_schema(Schema() |
| | .field('res', DataTypes.ARRAY(DataTypes.STRING()))) \ |
| | .create_temporary_table('sink') |
connect了一个sink表,format、schema都是DataTypes.ARRAY()
然后我定义了一个udf
@udf(input_types=DataTypes.STRING(),result_type=DataTypes.ARRAY(DataTypes.STRING()))
def func(a):
    rec_list = a.split(',')
    res_arr = np.arrary(rec_list,dtype=str)
    return res_arr
st_env.register_function("func", func)
st_env.from_path("source").select("func(a)").insert_into("sink")
最后得到的tmp文件里面就是[Ljava.lang.String;@f795710这种形式的字符串,不是我res_arr里面的内容,如果我单独返回一个值,比如return res_arr[0],tmp文件里面的字符串就是正确。
我想要得到array,该怎么解决?



















在 2020-09-04 15:17:38,"Xingbo Huang" <hx...@gmail.com> 写道:
>Hi,
>
>你是调试的时候想看结果吗?
>你可以直接table.to_pandas()来看结果,或者用print connector来看。
>
>个人觉得to_pandas最简单,比如你可以试试下面的例子
>
>```
>table=table_env.from_elements([(1, 'Hi'), (2, 'Hehe')], ['a', 'b'])
>
>@udf(input_types=DataTypes.STRING(),
>result_type=DataTypes.ARRAY(DataTypes.STRING()))
>def func(a):
>     return np.array([a, a, a], dtype=str)
>
>table_env.register_function("func", func)
>
>table.select("func(b)").to_pandas()
>```
>然后,你可以看看官方文档[1],让你快速上手PyFlink
>
>Best,
>Xingbo
>
>[1]
>https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/intro_to_table_api.html
>
>whh_960101 <wh...@163.com> 于2020年9月4日周五 下午2:50写道:
>
>> 您好,numpy的array和DataTypes.ARRAY()的类型格式是一样的吗
>> 我的udf输出了一个numpy.array(dtype = str),
>> result_type设的是DataTypes.ARRAY(DataTypes.STRING())
>>
>> 把输出insert_into到sink环境中,with_format设为OldCsv().field('res',DataTypes.ARRAY(DataTypes.STRING()))
>> 打印出来的结果是[Ljava.lang.String;@f795710这种类型的字符串,不是我np.array里的内容
>> 请问这个问题该怎么解决?
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> 在 2020-09-04 10:35:03,"Xingbo Huang" <hx...@gmail.com> 写道:
>> >Hi,
>> >
>>
>> >你这个场景,你其实可以直接拿到after里面的字段b的,用法是table.select("after.get('b')")。你都不需要使用udf来做这件事
>> >
>> >Best,
>> >Xingbo
>> >
>> >whh_960101 <wh...@163.com> 于2020年9月4日周五 上午9:26写道:
>> >
>> >>
>> >>
>> 您好,我的问题是:首先我有一个source的环境,with_format是嵌套的json,用两层DataTypes.Row()来定义,其中有一个字段的格式是DataTypes.Row([DataTypes.FIELD("a",DataTypes.STRING()),DataTypes.FIELD("b",DataTypes.STRING())])(字段名为after),现在我要定义一个udf,输入是after,我想从中获取b字段的值
>> >> udf定义如下:
>> >>
>> >>
>> @udf(input_types=[DataTypes.STRING(),DataTypes.STRING()],result_type=DataTypes.STRING())
>> >> def fun(data):
>> >>      b= data['b'] #我想获取after中XMLRECORD字段的值,这样写会报错
>> >>
>> >>
>> >> 如果通过table.select("after.b")或者table.select('after').select('b')也会报错
>> >> 希望您能给我提供好的解决办法,万分感谢!
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> 在 2020-09-03 22:23:28,"Xingbo Huang" <hx...@gmail.com> 写道:
>> >> >Hi,
>> >> >
>> >> >我觉得你从头详细描述一下你的表结构。
>> >> >比如,我的表包含三个字段a,b,c,他们的类型都是varchar的,
>> >> >然后我定义了一个udf,他的输入是我的表的b字段。那么这个时候我的udf的input_types就是DataTypes.STRING()就行。
>> >> >使用就是table.select("udf(b)")。详细的你可以去查阅文档[1]
>> >> >
>> >> >[1]
>> >> >
>> >>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/user-guide/table/udfs/python_udfs.html#scalar-functions
>> >> >
>> >> >Best,
>> >> >Xingbo
>> >> >
>> >> ><wh...@163.com> 于2020年9月3日周四 下午9:45写道:
>> >> >
>> >> >> 我的输入是Row类型的(包含字段名a,b,c),比如我的输入是W,我的函数只需要用到其中的第二列(b),input_types按照您的写法
>> >> >> input_types=[DataTypes.STRING(), DataTypes.STRING(),
>> DataTypes.STRING()]
>> >> >>
>> >> >> 在def的时候应该是写def函数名(W):,然后函数内部取第二列写W['b']可以吗
>> >> >> 或者正确写法是什么样的,感谢解答!
>> >> >>
>> >> >>
>> >> >> | |
>> >> >> whh_960101
>> >> >> |
>> >> >> |
>> >> >> 邮箱:whh_960101@163.com
>> >> >> |
>> >> >>
>> >> >> 签名由 网易邮箱大师 定制
>> >> >>
>> >> >> 在2020年09月03日 21:14,Xingbo Huang 写道:
>> >> >> Hi,
>> >> >> input_types定义的是每一个列的具体类型。
>> >> >> 比如udf输入的是三列,类型都是DataTypes.STRING(),这个时候你
>> >> >> 正确的写法是
>> >> >>
>> >> >>    input_types=[DataTypes.STRING(), DataTypes.STRING(),
>> >> DataTypes.STRING()]
>> >> >>
>> >> >> 针对这个例子,你错误的写法是(根据报错,我猜测你是这么写的)
>> >> >>    input_types=DataTypes.Row([DataTypes.FIELD("a",
>> DataTypes.STRING()),
>> >> >> DataTypes.FIELD("b", DataTypes.STRING()), DataTypes.FIELD("c",
>> >> >> DataTypes.STRING())])
>> >> >>
>> >> >> Best,
>> >> >> Xingbo
>> >> >>
>> >> >> whh_960101 <wh...@163.com> 于2020年9月3日周四 下午9:03写道:
>> >> >>
>> >> >> > 您好,请问我在定义udf的时候input_types为什么不能使用DataTypes.Row(),使用后会报错:Invalid
>> >> >> > input_type:input_type should be DataType but contain
>> RowField(RECID,
>> >> >> > VARCHAR)
>> >> >> > 我的pyflink版本:1.11.1
>> >> >>
>> >>
>>

Re:Re: Re: Re: pyflink-udf 问题反馈

Posted by whh_960101 <wh...@163.com>.
您好,我是想让输出insert_into到目标表中,具体如下:
st_env=StreamExecutionEnvironment.get_execution_environment()
st_env.connect了一个source table(table包含a字段),
然后
| st_env.connect(FileSystem().path('tmp')) \ |
| | .with_format(OldCsv() |
| | .field('res', DataTypes.ARRAY(DataTypes.STRING()))) \ |
| | .with_schema(Schema() |
| | .field('res', DataTypes.ARRAY(DataTypes.STRING()))) \ |
| | .create_temporary_table('sink') |
connect了一个sink表,format、schema都是DataTypes.ARRAY()
然后我定义了一个udf
@udf(input_types=DataTypes.STRING(),result_type=DataTypes.ARRAY(DataTypes.STRING()))
def func(a):
    rec_list = a.split(',')
    res_arr = np.arrary(rec_list,dtype=str)
    return res_arr
st_env.register_function("func", func)
st_env.from_path("source").select("func(a)").insert_into("sink")
最后得到的tmp文件里面就是[Ljava.lang.String;@f795710这种形式的字符串,不是我res_arr里面的内容(假如res_arr=['1','2','3']),如果我单独返回一个值,比如return res_arr[0],tmp文件就显示'1'。
我想要得到array,该怎么解决?



















在 2020-09-04 15:17:38,"Xingbo Huang" <hx...@gmail.com> 写道:
>Hi,
>
>你是调试的时候想看结果吗?
>你可以直接table.to_pandas()来看结果,或者用print connector来看。
>
>个人觉得to_pandas最简单,比如你可以试试下面的例子
>
>```
>table=table_env.from_elements([(1, 'Hi'), (2, 'Hehe')], ['a', 'b'])
>
>@udf(input_types=DataTypes.STRING(),
>result_type=DataTypes.ARRAY(DataTypes.STRING()))
>def func(a):
>     return np.array([a, a, a], dtype=str)
>
>table_env.register_function("func", func)
>
>table.select("func(b)").to_pandas()
>```
>然后,你可以看看官方文档[1],让你快速上手PyFlink
>
>Best,
>Xingbo
>
>[1]
>https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/intro_to_table_api.html
>
>whh_960101 <wh...@163.com> 于2020年9月4日周五 下午2:50写道:
>
>> 您好,numpy的array和DataTypes.ARRAY()的类型格式是一样的吗
>> 我的udf输出了一个numpy.array(dtype = str),
>> result_type设的是DataTypes.ARRAY(DataTypes.STRING())
>>
>> 把输出insert_into到sink环境中,with_format设为OldCsv().field('res',DataTypes.ARRAY(DataTypes.STRING()))
>> 打印出来的结果是[Ljava.lang.String;@f795710这种类型的字符串,不是我np.array里的内容
>> 请问这个问题该怎么解决?
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> 在 2020-09-04 10:35:03,"Xingbo Huang" <hx...@gmail.com> 写道:
>> >Hi,
>> >
>>
>> >你这个场景,你其实可以直接拿到after里面的字段b的,用法是table.select("after.get('b')")。你都不需要使用udf来做这件事
>> >
>> >Best,
>> >Xingbo
>> >
>> >whh_960101 <wh...@163.com> 于2020年9月4日周五 上午9:26写道:
>> >
>> >>
>> >>
>> 您好,我的问题是:首先我有一个source的环境,with_format是嵌套的json,用两层DataTypes.Row()来定义,其中有一个字段的格式是DataTypes.Row([DataTypes.FIELD("a",DataTypes.STRING()),DataTypes.FIELD("b",DataTypes.STRING())])(字段名为after),现在我要定义一个udf,输入是after,我想从中获取b字段的值
>> >> udf定义如下:
>> >>
>> >>
>> @udf(input_types=[DataTypes.STRING(),DataTypes.STRING()],result_type=DataTypes.STRING())
>> >> def fun(data):
>> >>      b= data['b'] #我想获取after中XMLRECORD字段的值,这样写会报错
>> >>
>> >>
>> >> 如果通过table.select("after.b")或者table.select('after').select('b')也会报错
>> >> 希望您能给我提供好的解决办法,万分感谢!
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> 在 2020-09-03 22:23:28,"Xingbo Huang" <hx...@gmail.com> 写道:
>> >> >Hi,
>> >> >
>> >> >我觉得你从头详细描述一下你的表结构。
>> >> >比如,我的表包含三个字段a,b,c,他们的类型都是varchar的,
>> >> >然后我定义了一个udf,他的输入是我的表的b字段。那么这个时候我的udf的input_types就是DataTypes.STRING()就行。
>> >> >使用就是table.select("udf(b)")。详细的你可以去查阅文档[1]
>> >> >
>> >> >[1]
>> >> >
>> >>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/user-guide/table/udfs/python_udfs.html#scalar-functions
>> >> >
>> >> >Best,
>> >> >Xingbo
>> >> >
>> >> ><wh...@163.com> 于2020年9月3日周四 下午9:45写道:
>> >> >
>> >> >> 我的输入是Row类型的(包含字段名a,b,c),比如我的输入是W,我的函数只需要用到其中的第二列(b),input_types按照您的写法
>> >> >> input_types=[DataTypes.STRING(), DataTypes.STRING(),
>> DataTypes.STRING()]
>> >> >>
>> >> >> 在def的时候应该是写def函数名(W):,然后函数内部取第二列写W['b']可以吗
>> >> >> 或者正确写法是什么样的,感谢解答!
>> >> >>
>> >> >>
>> >> >> | |
>> >> >> whh_960101
>> >> >> |
>> >> >> |
>> >> >> 邮箱:whh_960101@163.com
>> >> >> |
>> >> >>
>> >> >> 签名由 网易邮箱大师 定制
>> >> >>
>> >> >> 在2020年09月03日 21:14,Xingbo Huang 写道:
>> >> >> Hi,
>> >> >> input_types定义的是每一个列的具体类型。
>> >> >> 比如udf输入的是三列,类型都是DataTypes.STRING(),这个时候你
>> >> >> 正确的写法是
>> >> >>
>> >> >>    input_types=[DataTypes.STRING(), DataTypes.STRING(),
>> >> DataTypes.STRING()]
>> >> >>
>> >> >> 针对这个例子,你错误的写法是(根据报错,我猜测你是这么写的)
>> >> >>    input_types=DataTypes.Row([DataTypes.FIELD("a",
>> DataTypes.STRING()),
>> >> >> DataTypes.FIELD("b", DataTypes.STRING()), DataTypes.FIELD("c",
>> >> >> DataTypes.STRING())])
>> >> >>
>> >> >> Best,
>> >> >> Xingbo
>> >> >>
>> >> >> whh_960101 <wh...@163.com> 于2020年9月3日周四 下午9:03写道:
>> >> >>
>> >> >> > 您好,请问我在定义udf的时候input_types为什么不能使用DataTypes.Row(),使用后会报错:Invalid
>> >> >> > input_type:input_type should be DataType but contain
>> RowField(RECID,
>> >> >> > VARCHAR)
>> >> >> > 我的pyflink版本:1.11.1
>> >> >>
>> >>
>>





 

Re: Re: Re: pyflink-udf 问题反馈

Posted by Xingbo Huang <hx...@gmail.com>.
Hi,

你是调试的时候想看结果吗?
你可以直接table.to_pandas()来看结果,或者用print connector来看。

个人觉得to_pandas最简单,比如你可以试试下面的例子

```
table=table_env.from_elements([(1, 'Hi'), (2, 'Hehe')], ['a', 'b'])

@udf(input_types=DataTypes.STRING(),
result_type=DataTypes.ARRAY(DataTypes.STRING()))
def func(a):
     return np.array([a, a, a], dtype=str)

table_env.register_function("func", func)

table.select("func(b)").to_pandas()
```
然后,你可以看看官方文档[1],让你快速上手PyFlink

Best,
Xingbo

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/intro_to_table_api.html

whh_960101 <wh...@163.com> 于2020年9月4日周五 下午2:50写道:

> 您好,numpy的array和DataTypes.ARRAY()的类型格式是一样的吗
> 我的udf输出了一个numpy.array(dtype = str),
> result_type设的是DataTypes.ARRAY(DataTypes.STRING())
>
> 把输出insert_into到sink环境中,with_format设为OldCsv().field('res',DataTypes.ARRAY(DataTypes.STRING()))
> 打印出来的结果是[Ljava.lang.String;@f795710这种类型的字符串,不是我np.array里的内容
> 请问这个问题该怎么解决?
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-09-04 10:35:03,"Xingbo Huang" <hx...@gmail.com> 写道:
> >Hi,
> >
>
> >你这个场景,你其实可以直接拿到after里面的字段b的,用法是table.select("after.get('b')")。你都不需要使用udf来做这件事
> >
> >Best,
> >Xingbo
> >
> >whh_960101 <wh...@163.com> 于2020年9月4日周五 上午9:26写道:
> >
> >>
> >>
> 您好,我的问题是:首先我有一个source的环境,with_format是嵌套的json,用两层DataTypes.Row()来定义,其中有一个字段的格式是DataTypes.Row([DataTypes.FIELD("a",DataTypes.STRING()),DataTypes.FIELD("b",DataTypes.STRING())])(字段名为after),现在我要定义一个udf,输入是after,我想从中获取b字段的值
> >> udf定义如下:
> >>
> >>
> @udf(input_types=[DataTypes.STRING(),DataTypes.STRING()],result_type=DataTypes.STRING())
> >> def fun(data):
> >>      b= data['b'] #我想获取after中XMLRECORD字段的值,这样写会报错
> >>
> >>
> >> 如果通过table.select("after.b")或者table.select('after').select('b')也会报错
> >> 希望您能给我提供好的解决办法,万分感谢!
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> 在 2020-09-03 22:23:28,"Xingbo Huang" <hx...@gmail.com> 写道:
> >> >Hi,
> >> >
> >> >我觉得你从头详细描述一下你的表结构。
> >> >比如,我的表包含三个字段a,b,c,他们的类型都是varchar的,
> >> >然后我定义了一个udf,他的输入是我的表的b字段。那么这个时候我的udf的input_types就是DataTypes.STRING()就行。
> >> >使用就是table.select("udf(b)")。详细的你可以去查阅文档[1]
> >> >
> >> >[1]
> >> >
> >>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/user-guide/table/udfs/python_udfs.html#scalar-functions
> >> >
> >> >Best,
> >> >Xingbo
> >> >
> >> ><wh...@163.com> 于2020年9月3日周四 下午9:45写道:
> >> >
> >> >> 我的输入是Row类型的(包含字段名a,b,c),比如我的输入是W,我的函数只需要用到其中的第二列(b),input_types按照您的写法
> >> >> input_types=[DataTypes.STRING(), DataTypes.STRING(),
> DataTypes.STRING()]
> >> >>
> >> >> 在def的时候应该是写def函数名(W):,然后函数内部取第二列写W['b']可以吗
> >> >> 或者正确写法是什么样的,感谢解答!
> >> >>
> >> >>
> >> >> | |
> >> >> whh_960101
> >> >> |
> >> >> |
> >> >> 邮箱:whh_960101@163.com
> >> >> |
> >> >>
> >> >> 签名由 网易邮箱大师 定制
> >> >>
> >> >> 在2020年09月03日 21:14,Xingbo Huang 写道:
> >> >> Hi,
> >> >> input_types定义的是每一个列的具体类型。
> >> >> 比如udf输入的是三列,类型都是DataTypes.STRING(),这个时候你
> >> >> 正确的写法是
> >> >>
> >> >>    input_types=[DataTypes.STRING(), DataTypes.STRING(),
> >> DataTypes.STRING()]
> >> >>
> >> >> 针对这个例子,你错误的写法是(根据报错,我猜测你是这么写的)
> >> >>    input_types=DataTypes.Row([DataTypes.FIELD("a",
> DataTypes.STRING()),
> >> >> DataTypes.FIELD("b", DataTypes.STRING()), DataTypes.FIELD("c",
> >> >> DataTypes.STRING())])
> >> >>
> >> >> Best,
> >> >> Xingbo
> >> >>
> >> >> whh_960101 <wh...@163.com> 于2020年9月3日周四 下午9:03写道:
> >> >>
> >> >> > 您好,请问我在定义udf的时候input_types为什么不能使用DataTypes.Row(),使用后会报错:Invalid
> >> >> > input_type:input_type should be DataType but contain
> RowField(RECID,
> >> >> > VARCHAR)
> >> >> > 我的pyflink版本:1.11.1
> >> >>
> >>
>

Re:Re: Re: pyflink-udf 问题反馈

Posted by whh_960101 <wh...@163.com>.
您好,numpy的array和DataTypes.ARRAY()的类型格式是一样的吗
我的udf输出了一个numpy.array(dtype = str),  result_type设的是DataTypes.ARRAY(DataTypes.STRING())
把输出insert_into到sink环境中,with_format设为OldCsv().field('res',DataTypes.ARRAY(DataTypes.STRING()))
打印出来的结果是[Ljava.lang.String;@f795710这种类型的字符串,不是我np.array里的内容
请问这个问题该怎么解决?

















在 2020-09-04 10:35:03,"Xingbo Huang" <hx...@gmail.com> 写道:
>Hi,
>
>你这个场景,你其实可以直接拿到after里面的字段b的,用法是table.select("after.get('b')")。你都不需要使用udf来做这件事
>
>Best,
>Xingbo
>
>whh_960101 <wh...@163.com> 于2020年9月4日周五 上午9:26写道:
>
>>
>> 您好,我的问题是:首先我有一个source的环境,with_format是嵌套的json,用两层DataTypes.Row()来定义,其中有一个字段的格式是DataTypes.Row([DataTypes.FIELD("a",DataTypes.STRING()),DataTypes.FIELD("b",DataTypes.STRING())])(字段名为after),现在我要定义一个udf,输入是after,我想从中获取b字段的值
>> udf定义如下:
>>
>> @udf(input_types=[DataTypes.STRING(),DataTypes.STRING()],result_type=DataTypes.STRING())
>> def fun(data):
>>      b= data['b'] #我想获取after中XMLRECORD字段的值,这样写会报错
>>
>>
>> 如果通过table.select("after.b")或者table.select('after').select('b')也会报错
>> 希望您能给我提供好的解决办法,万分感谢!
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> 在 2020-09-03 22:23:28,"Xingbo Huang" <hx...@gmail.com> 写道:
>> >Hi,
>> >
>> >我觉得你从头详细描述一下你的表结构。
>> >比如,我的表包含三个字段a,b,c,他们的类型都是varchar的,
>> >然后我定义了一个udf,他的输入是我的表的b字段。那么这个时候我的udf的input_types就是DataTypes.STRING()就行。
>> >使用就是table.select("udf(b)")。详细的你可以去查阅文档[1]
>> >
>> >[1]
>> >
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/user-guide/table/udfs/python_udfs.html#scalar-functions
>> >
>> >Best,
>> >Xingbo
>> >
>> ><wh...@163.com> 于2020年9月3日周四 下午9:45写道:
>> >
>> >> 我的输入是Row类型的(包含字段名a,b,c),比如我的输入是W,我的函数只需要用到其中的第二列(b),input_types按照您的写法
>> >> input_types=[DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()]
>> >>
>> >> 在def的时候应该是写def函数名(W):,然后函数内部取第二列写W['b']可以吗
>> >> 或者正确写法是什么样的,感谢解答!
>> >>
>> >>
>> >> | |
>> >> whh_960101
>> >> |
>> >> |
>> >> 邮箱:whh_960101@163.com
>> >> |
>> >>
>> >> 签名由 网易邮箱大师 定制
>> >>
>> >> 在2020年09月03日 21:14,Xingbo Huang 写道:
>> >> Hi,
>> >> input_types定义的是每一个列的具体类型。
>> >> 比如udf输入的是三列,类型都是DataTypes.STRING(),这个时候你
>> >> 正确的写法是
>> >>
>> >>    input_types=[DataTypes.STRING(), DataTypes.STRING(),
>> DataTypes.STRING()]
>> >>
>> >> 针对这个例子,你错误的写法是(根据报错,我猜测你是这么写的)
>> >>    input_types=DataTypes.Row([DataTypes.FIELD("a", DataTypes.STRING()),
>> >> DataTypes.FIELD("b", DataTypes.STRING()), DataTypes.FIELD("c",
>> >> DataTypes.STRING())])
>> >>
>> >> Best,
>> >> Xingbo
>> >>
>> >> whh_960101 <wh...@163.com> 于2020年9月3日周四 下午9:03写道:
>> >>
>> >> > 您好,请问我在定义udf的时候input_types为什么不能使用DataTypes.Row(),使用后会报错:Invalid
>> >> > input_type:input_type should be DataType but contain RowField(RECID,
>> >> > VARCHAR)
>> >> > 我的pyflink版本:1.11.1
>> >>
>>

Re: Re: pyflink-udf 问题反馈

Posted by Xingbo Huang <hx...@gmail.com>.
Hi,

你这个场景,你其实可以直接拿到after里面的字段b的,用法是table.select("after.get('b')")。你都不需要使用udf来做这件事

Best,
Xingbo

whh_960101 <wh...@163.com> 于2020年9月4日周五 上午9:26写道:

>
> 您好,我的问题是:首先我有一个source的环境,with_format是嵌套的json,用两层DataTypes.Row()来定义,其中有一个字段的格式是DataTypes.Row([DataTypes.FIELD("a",DataTypes.STRING()),DataTypes.FIELD("b",DataTypes.STRING())])(字段名为after),现在我要定义一个udf,输入是after,我想从中获取b字段的值
> udf定义如下:
>
> @udf(input_types=[DataTypes.STRING(),DataTypes.STRING()],result_type=DataTypes.STRING())
> def fun(data):
>      b= data['b'] #我想获取after中XMLRECORD字段的值,这样写会报错
>
>
> 如果通过table.select("after.b")或者table.select('after').select('b')也会报错
> 希望您能给我提供好的解决办法,万分感谢!
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-09-03 22:23:28,"Xingbo Huang" <hx...@gmail.com> 写道:
> >Hi,
> >
> >我觉得你从头详细描述一下你的表结构。
> >比如,我的表包含三个字段a,b,c,他们的类型都是varchar的,
> >然后我定义了一个udf,他的输入是我的表的b字段。那么这个时候我的udf的input_types就是DataTypes.STRING()就行。
> >使用就是table.select("udf(b)")。详细的你可以去查阅文档[1]
> >
> >[1]
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/user-guide/table/udfs/python_udfs.html#scalar-functions
> >
> >Best,
> >Xingbo
> >
> ><wh...@163.com> 于2020年9月3日周四 下午9:45写道:
> >
> >> 我的输入是Row类型的(包含字段名a,b,c),比如我的输入是W,我的函数只需要用到其中的第二列(b),input_types按照您的写法
> >> input_types=[DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()]
> >>
> >> 在def的时候应该是写def函数名(W):,然后函数内部取第二列写W['b']可以吗
> >> 或者正确写法是什么样的,感谢解答!
> >>
> >>
> >> | |
> >> whh_960101
> >> |
> >> |
> >> 邮箱:whh_960101@163.com
> >> |
> >>
> >> 签名由 网易邮箱大师 定制
> >>
> >> 在2020年09月03日 21:14,Xingbo Huang 写道:
> >> Hi,
> >> input_types定义的是每一个列的具体类型。
> >> 比如udf输入的是三列,类型都是DataTypes.STRING(),这个时候你
> >> 正确的写法是
> >>
> >>    input_types=[DataTypes.STRING(), DataTypes.STRING(),
> DataTypes.STRING()]
> >>
> >> 针对这个例子,你错误的写法是(根据报错,我猜测你是这么写的)
> >>    input_types=DataTypes.Row([DataTypes.FIELD("a", DataTypes.STRING()),
> >> DataTypes.FIELD("b", DataTypes.STRING()), DataTypes.FIELD("c",
> >> DataTypes.STRING())])
> >>
> >> Best,
> >> Xingbo
> >>
> >> whh_960101 <wh...@163.com> 于2020年9月3日周四 下午9:03写道:
> >>
> >> > 您好,请问我在定义udf的时候input_types为什么不能使用DataTypes.Row(),使用后会报错:Invalid
> >> > input_type:input_type should be DataType but contain RowField(RECID,
> >> > VARCHAR)
> >> > 我的pyflink版本:1.11.1
> >>
>

Re:Re: pyflink-udf 问题反馈

Posted by whh_960101 <wh...@163.com>.
您好,我的问题是:首先我有一个source的环境,with_format是嵌套的json,用两层DataTypes.Row()来定义,其中有一个字段的格式是DataTypes.Row([DataTypes.FIELD("a",DataTypes.STRING()),DataTypes.FIELD("b",DataTypes.STRING())])(字段名为after),现在我要定义一个udf,输入是after,我想从中获取b字段的值
udf定义如下:
@udf(input_types=[DataTypes.STRING(),DataTypes.STRING()],result_type=DataTypes.STRING())
def fun(data):
     b= data['b'] #我想获取after中XMLRECORD字段的值,这样写会报错


如果通过table.select("after.b")或者table.select('after').select('b')也会报错
希望您能给我提供好的解决办法,万分感谢!

















在 2020-09-03 22:23:28,"Xingbo Huang" <hx...@gmail.com> 写道:
>Hi,
>
>我觉得你从头详细描述一下你的表结构。
>比如,我的表包含三个字段a,b,c,他们的类型都是varchar的,
>然后我定义了一个udf,他的输入是我的表的b字段。那么这个时候我的udf的input_types就是DataTypes.STRING()就行。
>使用就是table.select("udf(b)")。详细的你可以去查阅文档[1]
>
>[1]
>https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/user-guide/table/udfs/python_udfs.html#scalar-functions
>
>Best,
>Xingbo
>
><wh...@163.com> 于2020年9月3日周四 下午9:45写道:
>
>> 我的输入是Row类型的(包含字段名a,b,c),比如我的输入是W,我的函数只需要用到其中的第二列(b),input_types按照您的写法
>> input_types=[DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()]
>>
>> 在def的时候应该是写def函数名(W):,然后函数内部取第二列写W['b']可以吗
>> 或者正确写法是什么样的,感谢解答!
>>
>>
>> | |
>> whh_960101
>> |
>> |
>> 邮箱:whh_960101@163.com
>> |
>>
>> 签名由 网易邮箱大师 定制
>>
>> 在2020年09月03日 21:14,Xingbo Huang 写道:
>> Hi,
>> input_types定义的是每一个列的具体类型。
>> 比如udf输入的是三列,类型都是DataTypes.STRING(),这个时候你
>> 正确的写法是
>>
>>    input_types=[DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()]
>>
>> 针对这个例子,你错误的写法是(根据报错,我猜测你是这么写的)
>>    input_types=DataTypes.Row([DataTypes.FIELD("a", DataTypes.STRING()),
>> DataTypes.FIELD("b", DataTypes.STRING()), DataTypes.FIELD("c",
>> DataTypes.STRING())])
>>
>> Best,
>> Xingbo
>>
>> whh_960101 <wh...@163.com> 于2020年9月3日周四 下午9:03写道:
>>
>> > 您好,请问我在定义udf的时候input_types为什么不能使用DataTypes.Row(),使用后会报错:Invalid
>> > input_type:input_type should be DataType but contain RowField(RECID,
>> > VARCHAR)
>> > 我的pyflink版本:1.11.1
>>

Re: pyflink-udf 问题反馈

Posted by Xingbo Huang <hx...@gmail.com>.
Hi,

我觉得你从头详细描述一下你的表结构。
比如,我的表包含三个字段a,b,c,他们的类型都是varchar的,
然后我定义了一个udf,他的输入是我的表的b字段。那么这个时候我的udf的input_types就是DataTypes.STRING()就行。
使用就是table.select("udf(b)")。详细的你可以去查阅文档[1]

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/user-guide/table/udfs/python_udfs.html#scalar-functions

Best,
Xingbo

<wh...@163.com> 于2020年9月3日周四 下午9:45写道:

> 我的输入是Row类型的(包含字段名a,b,c),比如我的输入是W,我的函数只需要用到其中的第二列(b),input_types按照您的写法
> input_types=[DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()]
>
> 在def的时候应该是写def函数名(W):,然后函数内部取第二列写W['b']可以吗
> 或者正确写法是什么样的,感谢解答!
>
>
> | |
> whh_960101
> |
> |
> 邮箱:whh_960101@163.com
> |
>
> 签名由 网易邮箱大师 定制
>
> 在2020年09月03日 21:14,Xingbo Huang 写道:
> Hi,
> input_types定义的是每一个列的具体类型。
> 比如udf输入的是三列,类型都是DataTypes.STRING(),这个时候你
> 正确的写法是
>
>    input_types=[DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()]
>
> 针对这个例子,你错误的写法是(根据报错,我猜测你是这么写的)
>    input_types=DataTypes.Row([DataTypes.FIELD("a", DataTypes.STRING()),
> DataTypes.FIELD("b", DataTypes.STRING()), DataTypes.FIELD("c",
> DataTypes.STRING())])
>
> Best,
> Xingbo
>
> whh_960101 <wh...@163.com> 于2020年9月3日周四 下午9:03写道:
>
> > 您好,请问我在定义udf的时候input_types为什么不能使用DataTypes.Row(),使用后会报错:Invalid
> > input_type:input_type should be DataType but contain RowField(RECID,
> > VARCHAR)
> > 我的pyflink版本:1.11.1
>