You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by "1129656513@qq.com" <11...@qq.com> on 2020/07/22 06:13:01 UTC

回复: Re: flinksql1.11中主键声明的问题

您好:

       非常感谢您的建议,我已经成功解决了这个问题,但是我又发现了一个新的问题,我这里设置的超时时间是一分钟或者超时行数是5000行,
我在这期间更新了维表数据,但是我发现已经超过了超时时间,输出结果仍然没有被更新,是我理解的有问题么?
我尝试了停止输入流数据直到达到超时时间后仍然没有更新维表,除非停止整个程序,否则我的维表数据都不会被更新。
请问这个问题有解决的办法么?

def register_mysql_source(st_env):
    source_ddl = \
    """
    CREATE TABLE dim_mysql (
    id int,  -- 
    type varchar -- 
    ) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://localhost:3390/test',
    'table-name' = 'flink_test',
    'driver' = 'com.mysql.cj.jdbc.Driver',
    'username' = '****',
    'password' = '****',
    'lookup.cache.max-rows' = '5000',
    'lookup.cache.ttl' = '1s',
    'lookup.max-retries' = '3'
    )
    """    
    st_env.sql_update(source_ddl)
  


                                      感谢!




琴师

 
发件人: Leonard Xu
发送时间: 2020-07-22 10:54
收件人: user-zh
主题: Re: flinksql1.11中主键声明的问题
Hi,
 
你这还是connector的with参数里不是新 connector的写法[1],会走到老代码,老代码不支持声明PK的。
在老代码里,PK是通过query推导的,你用inner join替换left join后,应该是能够推断出PK了,所以没有报错。
我理解你把connector的with参数更新成新的就解决问题了。
 
Best
Leonard Xu
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#connector-options <https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#connector-options>
> 
> def register_rides_source(st_env):
>    source_ddl = \
>    """
>    create table source1(
>     id int,
>     time1 varchar ,
>     type string
>     ) with (
>    'connector.type' = 'kafka',
>    'connector.topic' = 'tp1',
>    'connector.startup-mode' = 'latest-offset',
>    'connector.properties.bootstrap.servers' = 'localhost:9092',
>    'connector.properties.zookeeper.connect' = 'localhost:2181',
>    'format.type' = 'json',
>    'connector.version' = 'universal',
>    'update-mode' = 'append'
>     )
>    “"" 

回复: Re: flinksql1.11中主键声明的问题

Posted by 琴师 <11...@qq.com>.
你好:
下面是我的代码,我用的版本是1.11.0,数据库是TIDB,我跑的是demo数据,维表只有两行。

我的输入流如下,每秒新增一条写入到kafka
 topic = 'tp1'
    for i  in  range(1,10000) :
        stime=datetime.datetime.now().strftime('%Y%m%d%H%M%S')
        msg = {}
        msg['id']= i
        msg['time1']= stime
        msg['type']=1
        print(msg)
        send_msg(topic, msg)
        time.sleep(1)

{'id': 1, 'time1': '20200722140624', 'type': 1}
{'id': 2, 'time1': '20200722140625', 'type': 1}
{'id': 3, 'time1': '20200722140626', 'type': 1}
{'id': 4, 'time1': '20200722140627', 'type': 1}
{'id': 5, 'time1': '20200722140628', 'type': 1}
{'id': 6, 'time1': '20200722140629', 'type': 1}
{'id': 7, 'time1': '20200722140631', 'type': 1}
{'id': 8, 'time1': '20200722140632', 'type': 1}

维表数据如下
id    type
2 err
1 err

我在程序正常期间更新了维表,但是后续输出的结果显示维表还是之前的缓存数据,事实上已经远远大于超时时间了,甚至我停下输入流,直到达到超时时间后再次输入,新的结果还是输出旧的维表数据


from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.table import StreamTableEnvironment, DataTypes, EnvironmentSettings,DataTypes, CsvTableSource, CsvTableSink
from pyflink.table.descriptors import Schema, Kafka, Json, Rowtime
from pyflink.table.window import Tumble 


def from_kafka_to_kafka_demo():

    # use blink table planner
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
    env_settings = EnvironmentSettings.Builder().use_blink_planner().build()
    st_env = StreamTableEnvironment.create(stream_execution_environment=env,environment_settings=env_settings)

    # register source and sink
    register_rides_source(st_env)
    register_rides_sink(st_env)
    register_mysql_source(st_env)
  

    st_env.sql_update("insert into flink_result select  cast(t1.id as int) as id,cast(t2.type as varchar),cast( t1.time1 as bigint) as rowtime from source1 t1 left join dim_mysql t2 on t1.type=cast(t2.id as varchar) ")
    st_env.execute("2-from_kafka_to_kafka")
    


def register_rides_source(st_env):
    source_ddl = \
    """
    create table source1(
     id int,
     time1 varchar ,
     type string
     ) with (
    'connector' = 'kafka',
    'topic' = 'tp1',
    'scan.startup.mode' = 'latest-offset',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json'
     )
    """
    st_env.sql_update(source_ddl)

def register_mysql_source(st_env):
    source_ddl = \
    """
    CREATE TABLE dim_mysql (
    id int,  -- 
    type varchar -- 
    ) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://localhost:3390/test',
    'table-name' = 'flink_test',
    'driver' = 'com.mysql.cj.jdbc.Driver',
    'username' = '***',
    'password' = '***',
    'lookup.cache.max-rows' = '5000',
    'lookup.cache.ttl' = '1s',
    'lookup.max-retries' = '3'
    )
    """    
    st_env.sql_update(source_ddl)

def register_rides_sink(st_env):
    sink_ddl = \
    """
    CREATE TABLE flink_result (
    id int,   
    type varchar,
    rtime bigint,
    primary key(id)  NOT ENFORCED
    ) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://localhost:3390/test',
    'table-name' = 'flink_result',
    'driver' = 'com.mysql.cj.jdbc.Driver',
    'username' = '***',
    'password' = '***',
    'sink.buffer-flush.max-rows' = '5000', 
    'sink.buffer-flush.interval' = '2s', 
    'sink.max-retries' = '3'
    )
    """
    st_env.sql_update(sink_ddl)


if __name__ == '__main__':
    from_kafka_to_kafka_demo()



初学者
PyFlink爱好者
琴师

 
发件人: Leonard Xu
发送时间: 2020-07-22 15:05
收件人: user-zh
主题: Re: flinksql1.11中主键声明的问题
Hi,
 
  我试了下应该是会更新缓存的,你有能复现的例子吗?
 
祝好
> 在 2020年7月22日,14:50,奇怪的不朽琴师 <11...@qq.com> 写道:
> 
> 你好:
> 
> 
> 可能是我描述的不清楚,我了解这个机制,我的意思维表更新后,即便已经达到了超时的时间,新的输出结果还是用维表历史缓存数据,
> 我感觉上是维表没有刷新缓存,但是我不知道这为什么。
> 
> 
> 谢谢
> 
> 
> ------------------&nbsp;原始邮件&nbsp;------------------
> 发件人:                                                                                                                        "user-zh"                                                                                    <xbjtdcq@gmail.com&gt;;
> 发送时间:&nbsp;2020年7月22日(星期三) 下午2:42
> 收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;
> 
> 主题:&nbsp;Re: flinksql1.11中主键声明的问题
> 
> 
> 
> Hello
> 你说的输出结果更新,是指之前关联的维表时老数据,过了一段时间,这个数据变,之前输出的历史也希望更新吗?维表join的实现,只有事实表中才会有retract消息才会更新,才会传递到下游,维表的数据是事实表
> 去look up该表时的数据,维表的更新是不会retract之前的历史记录的。
> 
> 祝好
> Leonard Xu
> 
> 
> &gt; 在 2020年7月22日,14:13,1129656513@qq.com 写道:
> &gt; 
> &gt; 输出结果仍然没有被更新
 

Re: flinksql1.11中主键声明的问题

Posted by Leonard Xu <xb...@gmail.com>.
Hi,

  我试了下应该是会更新缓存的,你有能复现的例子吗?

祝好
> 在 2020年7月22日,14:50,奇怪的不朽琴师 <11...@qq.com> 写道:
> 
> 你好:
> 
> 
> 可能是我描述的不清楚,我了解这个机制,我的意思维表更新后,即便已经达到了超时的时间,新的输出结果还是用维表历史缓存数据,
> 我感觉上是维表没有刷新缓存,但是我不知道这为什么。
> 
> 
> 谢谢
> 
> 
> ------------------&nbsp;原始邮件&nbsp;------------------
> 发件人:                                                                                                                        "user-zh"                                                                                    <xbjtdcq@gmail.com&gt;;
> 发送时间:&nbsp;2020年7月22日(星期三) 下午2:42
> 收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;
> 
> 主题:&nbsp;Re: flinksql1.11中主键声明的问题
> 
> 
> 
> Hello
> 你说的输出结果更新,是指之前关联的维表时老数据,过了一段时间,这个数据变,之前输出的历史也希望更新吗?维表join的实现,只有事实表中才会有retract消息才会更新,才会传递到下游,维表的数据是事实表
> 去look up该表时的数据,维表的更新是不会retract之前的历史记录的。
> 
> 祝好
> Leonard Xu
> 
> 
> &gt; 在 2020年7月22日,14:13,1129656513@qq.com 写道:
> &gt; 
> &gt; 输出结果仍然没有被更新


回复: flinksql1.11中主键声明的问题

Posted by 奇怪的不朽琴师 <11...@qq.com>.
你好:


可能是我描述的不清楚,我了解这个机制,我的意思维表更新后,即便已经达到了超时的时间,新的输出结果还是用维表历史缓存数据,
我感觉上是维表没有刷新缓存,但是我不知道这为什么。


谢谢


------------------&nbsp;原始邮件&nbsp;------------------
发件人:                                                                                                                        "user-zh"                                                                                    <xbjtdcq@gmail.com&gt;;
发送时间:&nbsp;2020年7月22日(星期三) 下午2:42
收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;

主题:&nbsp;Re: flinksql1.11中主键声明的问题



Hello
你说的输出结果更新,是指之前关联的维表时老数据,过了一段时间,这个数据变,之前输出的历史也希望更新吗?维表join的实现,只有事实表中才会有retract消息才会更新,才会传递到下游,维表的数据是事实表
去look up该表时的数据,维表的更新是不会retract之前的历史记录的。

祝好
Leonard Xu


&gt; 在 2020年7月22日,14:13,1129656513@qq.com 写道:
&gt; 
&gt; 输出结果仍然没有被更新

Re: flinksql1.11中主键声明的问题

Posted by Leonard Xu <xb...@gmail.com>.
Hello
你说的输出结果更新,是指之前关联的维表时老数据,过了一段时间,这个数据变,之前输出的历史也希望更新吗?维表join的实现,只有事实表中才会有retract消息才会更新,才会传递到下游,维表的数据是事实表
去look up该表时的数据,维表的更新是不会retract之前的历史记录的。

祝好
Leonard Xu


> 在 2020年7月22日,14:13,1129656513@qq.com 写道:
> 
> 输出结果仍然没有被更新