You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by "chengyanan1008@foxmail.com" <ch...@foxmail.com> on 2020/07/21 02:12:40 UTC
回复: 回复: pyflink1.11.0window
Hi,因为你的Sink只支持数据的insert插入,请检查insert 语句
关键报错信息是这一句:
“AppendStreamTableSink requires that Table has only insert changes.”
chengyanan1008@foxmail.com
发件人: 奇怪的不朽琴师
发送时间: 2020-07-20 16:23
收件人: user-zh
主题: 回复: pyflink1.11.0window
HI :
我现在有一个新的问题,我在此基础上加了一个关联,再写入kafka时报错,如下
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 147, in deco
return f(*a, **kw)
File "/usr/local/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o5.sqlUpdate.
: org.apache.flink.table.api.TableException: AppendStreamTableSink requires that Table has only insert changes.
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:123)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:685)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "tou.py", line 99, in <module>
from_kafka_to_kafka_demo()
File "tou.py", line 33, in from_kafka_to_kafka_demo
st_env.sql_update("insert into flink_result select id,type,rowtime from final_result2")
File "/usr/local/lib/python3.7/site-packages/pyflink/table/table_environment.py", line 547, in sql_update
self._j_tenv.sqlUpdate(stmt)
File "/usr/local/lib/python3.7/site-packages/py4j/java_gateway.py", line 1286, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/usr/local/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 154, in deco
raise exception_mapping[exception](s.split(': ', 1)[1], stack_trace)
pyflink.util.exceptions.TableException: 'AppendStreamTableSink requires that Table has only insert changes.'
这种应该如何实现,需求大概是一个流表(需要分组汇总)关联一个维表。
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.table import StreamTableEnvironment, DataTypes, EnvironmentSettings,DataTypes, CsvTableSource, CsvTableSink
from pyflink.table.descriptors import Schema, Kafka, Json, Rowtime
from pyflink.table.window import Tumble
def from_kafka_to_kafka_demo():
# use blink table planner
env = StreamExecutionEnvironment.get_execution_environment()
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
env_settings = EnvironmentSettings.Builder().use_blink_planner().build()
st_env = StreamTableEnvironment.create(stream_execution_environment=env,environment_settings=env_settings)
# register source and sink
register_rides_source(st_env)
register_rides_sink(st_env)
register_mysql_source(st_env)
query = """
select cast(sum(t1.id) as int) as id, max(t1.type) as type,cast(tumble_start(t1.time1, interval '4' second) as bigint) as rowtime
from source1 t1
group by tumble(t1.time1, interval '4' second)
"""
count_result = st_env.sql_query(query)
st_env.create_temporary_view('final_result', count_result)
query2 = """
select t1.id,t2.type,t1.rowtime from final_result t1 left join dim_mysql t2 on t1.type=t2.id
"""
count_result2 = st_env.sql_query(query2)
st_env.create_temporary_view('final_result2', count_result2)
st_env.sql_update("insert into flink_result select id,type,rowtime from final_result2")
st_env.execute("2-from_kafka_to_kafka")
def register_rides_source(st_env):
source_ddl = \
"""
create table source1(
id int,
time2 varchar ,
time1 as TO_TIMESTAMP(time2,'yyyyMMddHHmmss'),
type string,
WATERMARK FOR time1 as time1 - INTERVAL '2' SECOND
) with (
'connector.type' = 'kafka',
'connector.topic' = 'tp1',
'connector.startup-mode' = 'latest-offset',
'connector.properties.bootstrap.servers' = 'localhost:9092',
'connector.properties.zookeeper.connect' = 'localhost:2181',
'format.type' = 'json',
'connector.version' = 'universal'
)
"""
st_env.sql_update(source_ddl)
def register_mysql_source(st_env):
source_ddl = \
"""
CREATE TABLE dim_mysql (
id varchar, --
type varchar --
) WITH (
'connector.type' = 'jdbc',
'connector.url' = 'jdbc:mysql://localhost:3390/test',
'connector.table' = 'flink_test',
'connector.driver' = 'com.mysql.jdbc.Driver',
'connector.username' = '****',
'connector.password' = '*****',
'connector.lookup.cache.max-rows' = '5000',
'connector.lookup.cache.ttl' = '10min'
)
"""
st_env.sql_update(source_ddl)
def register_rides_sink(st_env):
sink_ddl = \
"""
CREATE TABLE flink_result (
id int,
type varchar,
rtime bigint,
primary key(id)
) WITH (
with (
'connector.type' = 'kafka',
'connector.topic' = 'tp4',
'connector.startup-mode' = 'latest-offset',
'connector.properties.bootstrap.servers' = 'localhost:9092',
'connector.properties.zookeeper.connect' = 'localhost:2181',
'format.type' = 'json',
'connector.version' = 'universal'
)
)
"""
st_env.sql_update(sink_ddl)
if __name__ == '__main__':
from_kafka_to_kafka_demo()
------------------ 原始邮件 ------------------
发件人: "我自己的邮箱" <1129656513@qq.com>;
发送时间: 2020年7月15日(星期三) 下午5:30
收件人: "user-zh"<user-zh@flink.apache.org>;
主题: 回复: pyflink1.11.0window
非常感谢!
------------------ 原始邮件 ------------------
发件人: "user-zh" <acqua.csq@gmail.com>;
发送时间: 2020年7月15日(星期三) 下午5:23
收件人: "user-zh"<user-zh@flink.apache.org>;
主题: Re: pyflink1.11.0window
下面这个例子从kafka读取json格式的数据, 然后做窗口聚合后写入es, 可以参考下代码结构, 修改相应数据字段。 这份代码你本地应该是不能运行的
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.table import StreamTableEnvironment, DataTypes, EnvironmentSettings
from pyflink.table.udf import udf
@udf(input_types=[DataTypes.INT()], result_type=DataTypes.STRING())
def platform_code_to_name(code):
return "mobile" if code == 0 else "pc"
def log_processing():
env = StreamExecutionEnvironment.get_execution_environment()
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
env_settings = EnvironmentSettings.Builder().use_blink_planner().build()
t_env = StreamTableEnvironment.create(stream_execution_environment=env,
environment_settings=env_settings)
source_ddl = """
CREATE TABLE payment_msg(
createTime VARCHAR,
rt as TO_TIMESTAMP(createTime),
orderId BIGINT,
payAmount DOUBLE,
payPlatform INT,
paySource INT,
WATERMARK FOR rt as rt - INTERVAL '2' SECOND
) WITH (
'connector.type' = 'kafka',
'connector.version' = 'universal',
'connector.topic' = 'payment_msg_2',
'connector.properties.bootstrap.servers' = '0.0.0.0:9092',
'connector.properties.group.id' = 'test_3',
'connector.startup-mode' = 'latest-offset',
'format.type' = 'json'
)
"""
t_env.sql_update(source_ddl)
es_sink_ddl = """
CREATE TABLE es_sink (
platform VARCHAR,
pay_amount DOUBLE,
rowtime TIMESTAMP(3)
) with (
'connector.type' = 'elasticsearch',
'connector.version' = '7',
'connector.hosts' = 'http://localhost:9200',
'connector.index' = 'platform_pay_amount_1',
'connector.document-type' = 'payment',
'update-mode' = 'upsert',
'connector.flush-on-checkpoint' = 'true',
'connector.key-delimiter' = '$',
'connector.key-null-literal' = 'n/a',
'connector.bulk-flush.max-size' = '42mb',
'connector.bulk-flush.max-actions' = '32',
'connector.bulk-flush.interval' = '1000',
'connector.bulk-flush.backoff.delay' = '1000',
'format.type' = 'json'
)
"""
t_env.sql_update(es_sink_ddl)
t_env.register_function('platformcodetoname', platform_code_to_name)
query = """
select platformcodetoname(payPlatform) as platform, sum(payAmount)
as pay_amount, cast(tumble_start(rt, interval '5' seconds) as BIGINT)
as rowtime
from payment_msg
group by tumble(rt, interval '5' seconds), payPlatform
"""
count_result = t_env.sql_query(query)
t_env.create_temporary_view('windowed_values', count_result)
query2 = """
select platform, last_value(pay_amount), rowtime from
windowed_values group by platform, rowtime
"""
final_result = t_env.sql_query(query2)
final_result.execute_insert(table_path='es_sink')
if __name__ == '__main__':
log_processing()
奇怪的不朽琴师 <1129656513@qq.com> 于2020年7月15日周三 下午4:40写道:
> &nbsp;Shuiqiang,你好:
> &nbsp; &nbsp;
> &nbsp;hi,能否请求您贡献一下完整的代码的案例,我是初学者,官网的2-from_kafka_to_kafka.py这个没有窗口,我现在想要一个在此基础上有窗口的demo,尝试编了很久也未能解决。我在给这个demo加上窗口功能后总是有各种各样的问题,十分痛苦,如能帮助,感激不尽。
>
>
> 恳请所有看到此封邮件的大佬!
>
>
> 谢谢!
>
>
>
>
> ------------------&nbsp;原始邮件&nbsp;------------------
> 发件人:
> "user-zh"
> <
> acqua.csq@gmail.com&gt;;
> 发送时间:&nbsp;2020年7月15日(星期三) 中午11:25
> 收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;
>
> 主题:&nbsp;Re: pyflink1.11.0window
>
>
>
> 举个sql例子
> select platformcodetoname(payPlatform) as platform, sum(payAmount) as
> pay_amount, cast(tumble_start(rt, interval '5' seconds) as BIGINT) as
> rowtime
> from payment_msg group by tumble(rt, interval '5' seconds), payPlatform
> 这个query 对每5s的tumble窗口做统计。
>
> 奇怪的不朽琴师 <1129656513@qq.com&gt; 于2020年7月15日周三 上午11:10写道:
>
> &gt; Shuiqiang,你好:
> &gt; &amp;nbsp;
> &amp;nbsp;我的目的是每间隔一段时间做一次汇总统计,比如每两秒做一下汇总,请问这个需求我改如何定义window?
> &gt;
> &gt;
> &gt; ------------------&amp;nbsp;原始邮件&amp;nbsp;------------------
> &gt; 发件人:
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> "user-zh"
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> <
> &gt; acqua.csq@gmail.com&amp;gt;;
> &gt; 发送时间:&amp;nbsp;2020年7月15日(星期三) 上午10:51
> &gt; 收件人:&amp;nbsp;"user-zh"<user-zh@flink.apache.org&amp;gt;;
> &gt;
> &gt; 主题:&amp;nbsp;Re: pyflink1.11.0window
> &gt;
> &gt;
> &gt;
> &gt; 琴师你好,
> &gt; 异常栈信息org.apache.flink.table.api.ValidationException: A tumble window
> &gt; expects a size value literal.
> &gt; 看起来是接下tumble window定义的代码不太正确吧
> &gt;
> &gt; Best,
> &gt; Shuiqiang
> &gt;
> &gt; 奇怪的不朽琴师 <1129656513@qq.com&amp;gt; 于2020年7月15日周三 上午10:27写道:
> &gt;
> &gt; &amp;gt; 你好:
> &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp;
> &gt; &amp;gt;
> &gt;
> &amp;amp;nbsp;我按着你回复的建议改了source但是会报新的错误,请问这个是因为什么?我想调试一个window一直没有成功,请帮帮我,谢谢。
> &gt; &amp;gt; Traceback (most recent call last):
> &gt; &amp;gt; &amp;amp;nbsp; File "tou.py", line 71, in <module&amp;amp;gt;
> &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; from_kafka_to_kafka_demo()
> &gt; &amp;gt; &amp;amp;nbsp; File "tou.py", line 21, in
> from_kafka_to_kafka_demo
> &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; .select(" id,&amp;amp;nbsp;
> time1 , time1 ")\
> &gt; &amp;gt; &amp;amp;nbsp; File
> &gt; &amp;gt;
> "/usr/local/lib/python3.7/site-packages/pyflink/table/table.py", line
> &gt; 907,
> &gt; &amp;gt; in select
> &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; return
> Table(self._j_table.select(fields),
> &gt; self._t_env)
> &gt; &amp;gt; &amp;amp;nbsp; File
> &gt; "/usr/local/lib/python3.7/site-packages/py4j/java_gateway.py",
> &gt; &amp;gt; line 1286, in __call__
> &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; answer, self.gateway_client,
> self.target_id,
> &gt; self.name)
> &gt; &amp;gt; &amp;amp;nbsp; File
> &gt; &amp;gt;
> "/usr/local/lib/python3.7/site-packages/pyflink/util/exceptions.py",
> &gt; line
> &gt; &amp;gt; 147, in deco
> &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; return f(*a, **kw)
> &gt; &amp;gt; &amp;amp;nbsp; File
> &gt; "/usr/local/lib/python3.7/site-packages/py4j/protocol.py",
> &gt; &amp;gt; line 328, in get_return_value
> &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; format(target_id, ".", name),
> value)
> &gt; &amp;gt; py4j.protocol.Py4JJavaError: An error occurred while calling
> &gt; o26.select.
> &gt; &amp;gt; : org.apache.flink.table.api.ValidationException: A tumble
> window
> &gt; expects
> &gt; &amp;gt; a size value literal.
> &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp;
> at
> &gt; &amp;gt;
> &gt;
> org.apache.flink.table.operations.utils.AggregateOperationFactory.getAsValueLiteral(AggregateOperationFactory.java:384)
> &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp;
> at
> &gt; &amp;gt;
> &gt;
> org.apache.flink.table.operations.utils.AggregateOperationFactory.validateAndCreateTumbleWindow(AggregateOperationFactory.java:302)
> &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp;
> at
> &gt; &amp;gt;
> &gt;
> org.apache.flink.table.operations.utils.AggregateOperationFactory.createResolvedWindow(AggregateOperationFactory.java:236)
> &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp;
> at
> &gt; &amp;gt;
> &gt;
> org.apache.flink.table.operations.utils.OperationTreeBuilder.windowAggregate(OperationTreeBuilder.java:250)
> &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp;
> at
> &gt; &amp;gt;
> &gt;
> org.apache.flink.table.api.internal.TableImpl$WindowGroupedTableImpl.select(TableImpl.java:794)
> &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp;
> at
> &gt; &amp;gt;
> &gt;
> org.apache.flink.table.api.internal.TableImpl$WindowGroupedTableImpl.select(TableImpl.java:781)
> &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp;
> at
> &gt; &amp;gt; sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp;
> at
> &gt; &amp;gt;
> &gt;
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp;
> at
> &gt; &amp;gt;
> &gt;
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp;
> at
> &gt; &amp;gt; java.lang.reflect.Method.invoke(Method.java:498)
> &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp;
> at
> &gt; &amp;gt;
> &gt;
> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp;
> at
> &gt; &amp;gt;
> &gt;
> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp;
> at
> &gt; &amp;gt;
> &gt;
> org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
> &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp;
> at
> &gt; &amp;gt;
> &gt;
> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp;
> at
> &gt; &amp;gt;
> &gt;
> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
> &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp;
> at
> &gt; &amp;gt;
> &gt;
> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
> &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp;
> at
> &gt; java.lang.Thread.run(Thread.java:748)
> &gt; &amp;gt;
> &gt; &amp;gt;
> &gt; &amp;gt;
> &gt; &amp;gt;
> &gt; &amp;gt;
> &gt; &amp;gt;
> &gt; &amp;gt;
> &gt; &amp;gt;
> &gt; &amp;gt;
> &gt; &amp;gt;
> &gt; &amp;gt;
> &gt; &amp;gt;
> &gt; &amp;gt;
> &gt; &amp;gt;
> &gt; &amp;gt;
> &gt; &amp;gt;
> &gt; &amp;gt;
> &gt; &amp;gt;
> &gt; &amp;gt;
> &gt; &amp;gt;
> &gt; &amp;gt;
> &gt; &amp;gt;
> &gt; &amp;gt;
> &gt; &amp;gt; def register_rides_source(st_env):
> &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; source_ddl = \
> &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; """
> &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; create table source1(
> &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp;id int,
> &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp;time1 timestamp,
> &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp;type string,
> &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp;WATERMARK FOR
> time1 as time1 -
> &gt; INTERVAL '2' SECOND
> &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp;) with (
> &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; 'connector.type' = 'kafka',
> &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; 'update-mode' = 'append',
> &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; 'connector.topic' = 'tp1',
> &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp;
> 'connector.properties.bootstrap.servers' =
> &gt; 'localhost:9092',
> &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp;
> 'connector.properties.zookeeper.connect' =
> &gt; 'localhost:2181',
> &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; 'format.type' = 'json',
> &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; 'format.derive-schema' =
> 'true',
> &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; 'connector.version' =
> 'universal'
> &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp;)
> &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; """
> &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; st_env.sql_update(source_ddl)
> &gt; &amp;gt;
> &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp;&amp;amp;nbsp;
> &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; s_env =
> &gt; &amp;gt; StreamExecutionEnvironment.get_execution_environment()
> &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; s_env.set_parallelism(1)
> &gt; &amp;gt;
> &gt; &amp;gt;
> &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; st_env =
> StreamTableEnvironment.create(s_env)
> &gt; &amp;gt;
> &gt; &amp;gt;
> &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; register_rides_source(st_env)
> &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; register_rides_sink(st_env)
> &gt; &amp;gt;
> &gt; &amp;gt;
> &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; st_env.from_path("source1")\
> &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp;
> &gt; &amp;gt; .window(Tumble.over("2.secends").on("time1").alias("w")) \
> &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp;
> .group_by("w") \
> &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp;
> .select(" id,&amp;amp;nbsp;
> &gt; time1 , time1 ")\
> &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp;
> .insert_into("sink1")
> &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp;&amp;amp;nbsp;
> &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp;
> st_env.execute("2-from_kafka_to_kafka")
> &gt; &amp;gt;
> &gt; &amp;gt;
> &gt; &amp;gt; 代码如上
> &gt; &amp;gt;
> &gt; &amp;gt;
> &gt; &amp;gt;
> &gt; &amp;gt;
> &gt; &amp;gt;
> &gt; &amp;gt;
> &gt; &amp;gt;
> &gt; &amp;gt;
> &gt; &amp;gt;
> ------------------&amp;amp;nbsp;原始邮件&amp;amp;nbsp;------------------
> &gt; &amp;gt; 发件人:
> &gt;
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> &gt; "user-zh"
> &gt;
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> &gt; <
> &gt; &amp;gt; acqua.csq@gmail.com&amp;amp;gt;;
> &gt; &amp;gt; 发送时间:&amp;amp;nbsp;2020年7月10日(星期五) 上午9:17
> &gt; &amp;gt; 收件人:&amp;amp;nbsp;"user-zh"<user-zh@flink.apache.org
> &amp;amp;gt;;
> &gt; &amp;gt;
> &gt; &amp;gt; 主题:&amp;amp;nbsp;Re: pyflink1.11.0window
> &gt; &amp;gt;
> &gt; &amp;gt;
> &gt; &amp;gt;
> &gt; &amp;gt; 琴师你好,
> &gt; &amp;gt;
> &gt; &amp;gt; 你的source ddl里有指定time1为 time attribute吗?
> &gt; &amp;gt; create table source1(
> &gt; &amp;gt;
> &gt;
> &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
> id
> &gt; int,
> &gt; &amp;gt;
> &gt;
> &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
> &gt; time1 timestamp,
> &gt; &amp;gt;
> &gt;
> &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
> type
> &gt; string,
> &gt; &amp;gt;
> &gt;
> &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
> &gt; WATERMARK FOR time1 as time1 -
> &gt; &amp;gt; INTERVAL '2' SECOND
> &gt; &amp;gt; ) with (...)
> &gt; &amp;gt;
> &gt; &amp;gt; 奇怪的不朽琴师 <1129656513@qq.com&amp;amp;gt; 于2020年7月10日周五
> 上午8:43写道:
> &gt; &amp;gt;
> &gt; &amp;gt; &amp;amp;gt;
> &gt;
> ------------------&amp;amp;amp;nbsp;原始邮件&amp;amp;amp;nbsp;------------------
> &gt; &amp;gt; &amp;amp;gt; 发件人:
> &gt; &amp;gt;
> &gt;
> &amp;amp;gt;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
> &gt; &amp;gt; "奇怪的不朽琴师"
> &gt; &amp;gt;
> &gt;
> &amp;amp;gt;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
> &gt; &amp;gt; <
> &gt; &amp;gt; &amp;amp;gt; 1129656513@qq.com&amp;amp;amp;gt;;
> &gt; &amp;gt; &amp;amp;gt; 发送时间:&amp;amp;amp;nbsp;2020年7月9日(星期四) 下午5:08
> &gt; &amp;gt; &amp;amp;gt; 收件人:&amp;amp;amp;nbsp;"godfrey he"<
> godfreyhe@gmail.com
> &gt; &amp;amp;amp;gt;;
> &gt; &amp;gt; &amp;amp;gt;
> &gt; &amp;gt; &amp;amp;gt; 主题:&amp;amp;amp;nbsp;pyflink1.11.0window
> &gt; &amp;gt; &amp;amp;gt;
> &gt; &amp;gt; &amp;amp;gt;
> &gt; &amp;gt; &amp;amp;gt;
> &gt; &amp;gt; &amp;amp;gt; 你好:
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;nbsp;
> &amp;amp;amp;nbsp;我在使用pyflink1.11版本时,window开窗仍会报错
> &gt; &amp;gt; &amp;amp;gt; :
> org.apache.flink.table.api.ValidationException: A group
> &gt; window
> &gt; &amp;gt; expects a
> &gt; &amp;gt; &amp;amp;gt; time attribute for grouping in a stream
> environment.
> &gt; &amp;gt; &amp;amp;gt;
> &gt; &amp;gt; &amp;amp;gt; 请问这个问题没有修复么?或者是我使用的方式不对,如果是使用不对,能提供一个正确的案例么?
> &gt; &amp;gt; &amp;amp;gt; 代码如下
> &gt; &amp;gt; &amp;amp;gt; 谢谢
> &gt; &amp;gt; &amp;amp;gt;
> &gt; &amp;gt; &amp;amp;gt;
> &gt; &amp;gt; &amp;amp;gt; def from_kafka_to_kafka_demo():
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;nbsp; &amp;amp;amp;nbsp; s_env =
> &gt; &amp;gt; &amp;amp;gt;
> StreamExecutionEnvironment.get_execution_environment()
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;nbsp; &amp;amp;amp;nbsp;
> s_env.set_parallelism(1)
> &gt; &amp;gt; &amp;amp;gt;
> &gt; &amp;gt; &amp;amp;gt;
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;nbsp; &amp;amp;amp;nbsp; # use
> blink table planner
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;nbsp; &amp;amp;amp;nbsp; st_env =
> &gt; StreamTableEnvironment.create(s_env)
> &gt; &amp;gt; &amp;amp;gt;
> &gt; &amp;gt; &amp;amp;gt;
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;nbsp; &amp;amp;amp;nbsp; #
> register source and sink
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;nbsp; &amp;amp;amp;nbsp;
> register_rides_source(st_env)
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;nbsp; &amp;amp;amp;nbsp;
> register_rides_sink(st_env)
> &gt; &amp;gt; &amp;amp;gt;
> &gt; &amp;gt; &amp;amp;gt;
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;nbsp; &amp;amp;amp;nbsp;
> st_env.from_path("source1")\
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;nbsp; &amp;amp;amp;nbsp;
> &amp;amp;amp;nbsp; &amp;amp;amp;nbsp;
> &gt; &amp;gt; &amp;amp;gt;
> .window(Tumble.over("1.secends").on("time1").alias("w")) \
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;nbsp; &amp;amp;amp;nbsp;
> &amp;amp;amp;nbsp; &amp;amp;amp;nbsp;
> &gt; .group_by("w") \
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;nbsp; &amp;amp;amp;nbsp;
> &amp;amp;amp;nbsp; &amp;amp;amp;nbsp;
> &gt; .select(" id,&amp;amp;amp;nbsp;
> &gt; &amp;gt; time1 , time1 ")\
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;nbsp; &amp;amp;amp;nbsp;
> &amp;amp;amp;nbsp; &amp;amp;amp;nbsp;
> &gt; .insert_into("sink1")
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;nbsp; &amp;amp;amp;nbsp;
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;nbsp; &amp;amp;amp;nbsp;
> &gt; st_env.execute("2-from_kafka_to_kafka")
> &gt; &amp;gt; &amp;amp;gt;
> &gt; &amp;gt; &amp;amp;gt;
> &gt; &amp;gt; &amp;amp;gt;
> &gt; &amp;gt; &amp;amp;gt;
> &gt; &amp;gt; &amp;amp;gt; def register_rides_source(st_env):
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;nbsp; &amp;amp;amp;nbsp;
> source_ddl = \
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;nbsp; &amp;amp;amp;nbsp; '''
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;nbsp; &amp;amp;amp;nbsp; create
> table source1(
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;nbsp; &amp;amp;amp;nbsp;
> &amp;amp;amp;nbsp; &amp;amp;amp;nbsp;
> &gt; id int,
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;nbsp; &amp;amp;amp;nbsp;
> &amp;amp;amp;nbsp;time1 timestamp,
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;nbsp; &amp;amp;amp;nbsp;
> &amp;amp;amp;nbsp;type string
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;nbsp; &amp;amp;amp;nbsp;
> &amp;amp;amp;nbsp;) with (
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;nbsp; &amp;amp;amp;nbsp;
> 'connector.type' = 'kafka',
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;nbsp; &amp;amp;amp;nbsp;
> 'update-mode' = 'append',
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;nbsp; &amp;amp;amp;nbsp;
> 'connector.topic' = 'tp1',
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;nbsp; &amp;amp;amp;nbsp;
> &gt; 'connector.properties.bootstrap.servers' =
> &gt; &amp;gt; 'localhost:9092'
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;nbsp; &amp;amp;amp;nbsp;
> &amp;amp;amp;nbsp;)
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;nbsp; &amp;amp;amp;nbsp; '''
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;nbsp; &amp;amp;amp;nbsp;
> st_env.sql_update(source_ddl)
> &gt; &amp;gt; &amp;amp;gt;
> &gt; &amp;gt; &amp;amp;gt;
> &gt; &amp;gt; &amp;amp;gt;
> &gt; &amp;gt; &amp;amp;gt;
> &gt; &amp;gt; &amp;amp;gt; def register_rides_sink(st_env):
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;nbsp; &amp;amp;amp;nbsp; sink_ddl
> = \
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;nbsp; &amp;amp;amp;nbsp; '''
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;nbsp; &amp;amp;amp;nbsp; create
> table sink1(
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;nbsp; &amp;amp;amp;nbsp;
> &amp;amp;amp;nbsp; &amp;amp;amp;nbsp;
> &gt; id int,
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;nbsp; &amp;amp;amp;nbsp;
> &amp;amp;amp;nbsp;time1 timestamp,
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;nbsp; &amp;amp;amp;nbsp;
> &amp;amp;amp;nbsp;time2 timestamp
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;nbsp; &amp;amp;amp;nbsp;
> &amp;amp;amp;nbsp;) with (
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;nbsp; &amp;amp;amp;nbsp;
> 'connector.type' = 'kafka',
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;nbsp; &amp;amp;amp;nbsp;
> 'update-mode' = 'append',
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;nbsp; &amp;amp;amp;nbsp;
> 'connector.topic' = 'tp3',
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;nbsp; &amp;amp;amp;nbsp;
> &gt; 'connector.properties.bootstrap.servers' =
> &gt; &amp;gt; 'localhost:9092'
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;nbsp; &amp;amp;amp;nbsp;
> &amp;amp;amp;nbsp;)
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;nbsp; &amp;amp;amp;nbsp; '''
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;nbsp; &amp;amp;amp;nbsp;
> st_env.sql_update(sink_ddl)
> &gt; &amp;gt; &amp;amp;gt;
> &gt; &amp;gt; &amp;amp;gt;
> &gt; &amp;gt; &amp;amp;gt;
> &gt; &amp;gt; &amp;amp;gt;
> &gt; &amp;gt; &amp;amp;gt; if __name__ == '__main__':
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;nbsp; &amp;amp;amp;nbsp;
> from_kafka_to_kafka_demo()
> &gt; &amp;gt; &amp;amp;gt;
> &gt; &amp;gt; &amp;amp;gt;
> &gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;nbsp; &amp;amp;amp;nbsp;