You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by "Dian Fu (Jira)" <ji...@apache.org> on 2021/08/26 12:34:00 UTC
[jira] [Created] (FLINK-24003) Loopback mode doesn't work when
mixing the Python Table API and the Python DataStream API
Dian Fu created FLINK-24003:
-------------------------------
Summary: Loopback mode doesn't work when mixing the Python Table API and the Python DataStream API
Key: FLINK-24003
URL: https://issues.apache.org/jira/browse/FLINK-24003
Project: Flink
Issue Type: Bug
Components: API / Python
Affects Versions: 1.14.0
Reporter: Dian Fu
Assignee: Huang Xingbo
Fix For: 1.14.0
For the following program:
{code}
import logging
import time
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment, CoMapFunction
from pyflink.table import StreamTableEnvironment, DataTypes, Schema
def test_chaining():
    """Reproduce FLINK-24003: one job mixing the Python Table API and DataStream API.

    The job does the following:

    * reads a generated ``datagen`` table;
    * converts it to a DataStream;
    * fans it out through several Python ``map`` operators and a
      ``CoMapFunction``;
    * converts two derived streams back into Tables;
    * inserts those Tables into two sinks submitted together via one
      statement set.

    The call blocks on ``statement_set.execute().wait()``.

    NOTE(review): the report states that loopback mode does not work for this
    job. The concrete symptom is not described here, so confirm it before
    relying on this as a regression test.
    """
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(stream_execution_environment=env)
    # Explicitly disable Python operator chaining (config key set to False).
    # Presumably part of the bug trigger, since it keeps the Python operators
    # below separate. Confirm.
    t_env.get_config().get_configuration().set_boolean("python.operator-chaining.enabled", False)
    # 1. create source Table
    # `id` is a sequence from 1 to 1000. Presumably this makes the datagen
    # source bounded so that wait() below eventually returns. Confirm.
    t_env.execute_sql("""
CREATE TABLE datagen (
id INT,
data STRING
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1000000',
'fields.id.kind' = 'sequence',
'fields.id.start' = '1',
'fields.id.end' = '1000'
)
""")
    # 2. create sink Table
    # Despite the names "print" / "print_2", both sinks use the 'blackhole'
    # connector, so the rows written to them are discarded.
    t_env.execute_sql("""
CREATE TABLE print (
id BIGINT,
data STRING,
flag STRING
) WITH (
'connector' = 'blackhole'
)
""")
    t_env.execute_sql("""
CREATE TABLE print_2 (
id BIGINT,
data STRING,
flag STRING
) WITH (
'connector' = 'blackhole'
)
""")
    # 3. query from source table and perform calculations
    # create a Table from a Table API query:
    source_table = t_env.from_path("datagen")
    # Table -> DataStream of ROW(INT, STRING).
    ds = t_env.to_append_stream(
        source_table,
        Types.ROW([Types.INT(), Types.STRING()]))
    # Two branches off `ds`: ds1 squares the id, ds2 drops the first two
    # characters of `data`.
    # NOTE(review): neither map passes output_type, unlike most maps below.
    # Presumably this deliberately mixes untyped and typed streams. Confirm.
    ds1 = ds.map(lambda i: (i[0] * i[0], i[1]))
    ds2 = ds.map(lambda i: (i[0], i[1][2:]))

    class MyCoMapFunction(CoMapFunction):
        """Pass-through co-map that prints 'hahah' for each element of either input.

        The prints presumably exist to observe where Python worker output
        ends up when running in loopback mode. Confirm.
        """

        def map1(self, value):
            print('hahah')
            return value

        def map2(self, value):
            print('hahah')
            return value

    # Merge ds1 and ds2 through the co-map; the result is declared as (LONG, STRING).
    ds3 = ds1.connect(ds2).map(MyCoMapFunction(), output_type=Types.TUPLE([Types.LONG(), Types.STRING()]))
    # Branch "left": tag each element in a single typed map.
    ds4 = ds3.map(lambda i: (i[0], i[1], "left"),
                  output_type=Types.TUPLE([Types.LONG(), Types.STRING(), Types.STRING()]))
    # Branch "right": tag in an untyped map, then apply a typed identity map.
    ds5 = ds3.map(lambda i: (i[0], i[1], "right"))\
        .map(lambda i: i,
             output_type=Types.TUPLE([Types.LONG(), Types.STRING(), Types.STRING()]))
    # Column types mirror the (LONG, STRING, STRING) tuples of ds4 / ds5.
    schema = Schema.new_builder() \
        .column("f0", DataTypes.BIGINT()) \
        .column("f1", DataTypes.STRING()) \
        .column("f2", DataTypes.STRING()) \
        .build()
    # DataStream -> Table for each branch. Both INSERTs go into one statement
    # set so they are submitted together by a single execute() call.
    result_table_3 = t_env.from_data_stream(ds4, schema)
    statement_set = t_env.create_statement_set()
    statement_set.add_insert("print", result_table_3)
    result_table_4 = t_env.from_data_stream(ds5, schema)
    statement_set.add_insert("print_2", result_table_4)
    # wait() blocks; presumably until the submitted job finishes. Confirm.
    statement_set.execute().wait()
if __name__ == "__main__":
    # Measure wall-clock time for the whole reproduction. test_chaining()
    # builds the job, submits it, and blocks on statement_set.execute().wait().
    started = time.time()
    test_chaining()
    elapsed = time.time() - started
    print(f"--- {elapsed} seconds ---")
{code}
Loopback mode doesn't work for the program above.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)