Posted to user@flink.apache.org by Martijn Visser <ma...@apache.org> on 2023/02/08 10:36:01 UTC

Re: Need help how to use Table API to join two Kafka streams

Moving the dev mailing list to BCC and adding the user mailing list to this thread.

On Wed, Feb 8, 2023 at 8:08 AM Amir Hossein Sharifzadeh <
amirsharifzadeh@gmail.com> wrote:

> Thanks. If you look at the code, I am defining/creating the table as:
>
> create_kafka_source_ddl = """
>         CREATE TABLE payment_msg(
>             createTime VARCHAR,
>             orderId BIGINT,
>             payAmount DOUBLE,
>             payPlatform INT,
>             provinceId INT
>         ) WITH (
>           'connector' = 'kafka',
>           'topic' = 'payment_msg',
>           'properties.bootstrap.servers' = 'kafka:9092',
>           'properties.group.id' = 'test_3',
>           'scan.startup.mode' = 'latest-offset',
>           'format' = 'json'
>         )
>         """
>
> t_env.execute_sql(create_kafka_source_ddl)
>
> Is this enough to satisfy the sink table?
>
> Can you show me an example of the query output to the sink table?
>
> Thanks.
>
> Best,
>
> Amir
>
>
> On Wed, Feb 8, 2023 at 1:48 AM Leonard Xu <xb...@gmail.com> wrote:
>
> >
> > > 1) *First*: In the *payment_msg_proccessing.py
> > > <https://apache.googlesource.com/flink-playgrounds/+/HEAD/pyflink-walkthrough/payment_msg_proccessing.py>*
> > > code, I want to run a simple query on the Kafka stream (payment_msg table)
> > > without inserting data into the sink table (es_sink here) and do some data
> > > processing. (In my project, I won't insert any data.) So, is it possible
> > > to run the query (queries) on sources (streams) *without inserting data
> > > into other tables*?
> > No, you need to define at least one sink table to receive the query
> > result.
> >
> >
> > > 2) *Second*: How can I iterate over the results and print the data in the
> > > output? For example, I wrote this simple query: *table_result =
> > > t_env.execute_sql("select provinceId, payAmount from payment_msg")* and
> > > then:
> > >
> > > with table_result.collect() as results:
> > >    for result in results:
> > >        print(result)
> >
> > The result here is the representation of the statement execution result,
> > NOT the query output.
> > You can define a print connector table[1] as your sink table, and insert
> > the query output into the sink table to achieve your goal.
> >
> > Btw, please send user questions to the user mailing list; the dev mailing
> > list is for discussing community development.
> >
> > Best,
> > Leonard
>
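
For reference, a minimal sketch of the suggestion above: a print connector table used as the sink, with the query output inserted into it. The payment_msg table quoted earlier is the Kafka source, so the sink is a separate table. The sink name print_sink and the helper run_job are assumptions made for illustration and do not come from the walkthrough code. The sketch also assumes a recent PyFlink release and that the Kafka SQL connector JAR is available to the job.

```python
# Source table: the Kafka DDL quoted earlier in this thread.
SOURCE_DDL = """
    CREATE TABLE payment_msg (
        createTime VARCHAR,
        orderId BIGINT,
        payAmount DOUBLE,
        payPlatform INT,
        provinceId INT
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'payment_msg',
        'properties.bootstrap.servers' = 'kafka:9092',
        'properties.group.id' = 'test_3',
        'scan.startup.mode' = 'latest-offset',
        'format' = 'json'
    )
"""

# Sink table: 'print_sink' is a hypothetical name. Its columns must match
# the columns selected by the INSERT statement below.
SINK_DDL = """
    CREATE TABLE print_sink (
        provinceId INT,
        payAmount DOUBLE
    ) WITH (
        'connector' = 'print'
    )
"""

# The query output is written to the sink instead of being collected.
INSERT_SQL = """
    INSERT INTO print_sink
    SELECT provinceId, payAmount FROM payment_msg
"""


def run_job():
    """Register both tables and submit the INSERT as a streaming job."""
    # Imported here so the SQL above can be inspected without PyFlink installed.
    from pyflink.table import EnvironmentSettings, TableEnvironment

    t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
    t_env.execute_sql(SOURCE_DDL)
    t_env.execute_sql(SINK_DDL)
    # wait() blocks while the job runs; an unbounded Kafka source keeps it running.
    t_env.execute_sql(INSERT_SQL).wait()
```

Calling run_job() submits the job. With the print connector, each row should appear on standard output when running locally, or in the task manager's output log when running on a cluster.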