Posted to user@flink.apache.org by eric hoffmann <si...@gmail.com> on 2021/03/03 09:47:13 UTC

Re: Processing-time temporal join is not supported yet.

Hi Leonard,
Thanks for your reply.
No problem, I can help with the JIRA ticket.
In my situation, in a pure SQL environment, what would be the best workaround to
enrich a stream of data from a Kafka topic with static data, based on id?
I know how to do it in stream.
eric

On Sat, Feb 27, 2021 at 05:15, Leonard Xu <xb...@gmail.com> wrote:

> Hi Eric,
>
> First, FileSystemTableSource does not implement LookupTableSource, which
> means we cannot directly look up a filesystem table.
>
> In FLINK-19830, we plan to support processing-time temporal joins on any
> table/view by looking up the data in the join operator state, which is
> populated by scanning the filesystem table. But as the issue describes, join
> processing for the left stream doesn't wait for the complete snapshot of the
> temporal table, and this may mislead users in production environments.
> E.g., your S3 table has 1000 records, but the join operator does not know
> when all records have arrived, so the correlation may be incorrect. That is
> why we disabled this feature.
>
> I think we can implement LookupTableSource for FileSystemTableSource
> now. After that, we can directly look up a filesystem table. The
> implementation will be similar to the Hive table, where we cache all data of
> the files and then look up the cache. Could you help create a JIRA ticket for
> this?
>
>
> Best,
> Leonard
>
>
> On Feb 26, 2021, at 23:41, Matthias Pohl <ma...@ververica.com> wrote:
>
> Hi Eric,
> it looks like you ran into FLINK-19830 [1]. I'm gonna add Leonard to the
> thread. Maybe, he has a workaround for your case.
>
> Best,
> Matthias
>
> [1] https://issues.apache.org/jira/browse/FLINK-19830
>
> On Fri, Feb 26, 2021 at 11:40 AM eric hoffmann <si...@gmail.com>
> wrote:
>
>> Hello,
>> Working with Flink 1.12.1, I read in the docs that processing-time temporal
>> join is supported for KV-like joins, but when I try it I get:
>>
>> Exception in thread "main" org.apache.flink.table.api.TableException: Processing-time temporal join is not supported yet.
>>         at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoinToCoProcessTranslator.createJoinOperator(StreamExecTemporalJoin.scala:273)
>>         at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoinToCoProcessTranslator.getJoinOperator(StreamExecTemporalJoin.scala:224)
>>         at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlanInternal(StreamExecTemporalJoin.scala:115)
>>         at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlanInternal(StreamExecTemporalJoin.scala:56)
>>         at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
>>         at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
>>         at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlan(StreamExecTemporalJoin.scala:56)
>>         at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
>>         at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
>>         at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
>>         at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
>>         at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
>>         at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:79)
>>         at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:43)
>>         at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
>>         at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
>>         at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:43)
>>         at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:66)
>>
>> my query:
>>
>> SELECT e.id, r.test
>> FROM kafkaTable AS e
>> JOIN s3Table FOR SYSTEM_TIME AS OF e.proctime AS r
>> ON e.id = r.id
>>
>> my s3 table:
>>
>> CREATE TABLE s3Table(id STRING, test STRING, PRIMARY KEY (id) NOT ENFORCED)
>>       WITH ('connector'='filesystem','path'='s3a://fs/','format'='json')
>>
>> my kafka table:
>>
>> CREATE TABLE kafkaTable(id STRING, foo STRING, bar BIGINT, proctime AS PROCTIME())
>>       WITH ('connector'='kafka','topic'='mytopic','properties.bootstrap.servers'='127.0.0.1:9092','properties.group.id'='mygroup','format'='json','scan.startup.mode'='group-offsets','properties.enable.auto.commit'='false')
>>
>>
>
>

Re: Processing-time temporal join is not supported yet.

Posted by Leonard Xu <xb...@gmail.com>.
Sorry, I meant that you can create a UDTF that caches the data from your files, and then enrich your stream using the LATERAL TABLE syntax.

BTW, you can refer to FileSystemLookupFunction.java [1]. If we add lookup support for the filesystem connector, we should use this function too.
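
As a rough illustration of the caching idea, here is a minimal sketch in plain Java with no Flink dependency. Everything in it is an assumption made for illustration: the class name CachedFileLookup, the loader supplier (which would have to read and parse the S3 JSON files into an id -> test map), and the reload interval. In a real job the same logic would sit inside a class extending org.apache.flink.table.functions.TableFunction, with the results emitted through collect(...) instead of the Consumer used here.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Supplier;

/**
 * Stand-in for the body of a lookup UDTF: caches the whole data set in memory
 * and reloads it once the cache is older than the reload interval.
 * All names here are hypothetical; this is not Flink's FileSystemLookupFunction.
 */
class CachedFileLookup {
    // Hypothetical loader: would read and parse the files into id -> test.
    private final Supplier<Map<String, String>> loader;
    private final long reloadIntervalNanos;

    private Map<String, String> cache = new HashMap<>();
    private long lastLoadNanos;
    private boolean loaded;

    CachedFileLookup(Supplier<Map<String, String>> loader, Duration reloadInterval) {
        this.loader = loader;
        this.reloadIntervalNanos = reloadInterval.toNanos();
    }

    /** Emits the cached value for the given id, if there is one. */
    void eval(String id, Consumer<String> out) {
        long now = System.nanoTime();
        // Load on first use, then again whenever the cache has become stale.
        if (!loaded || now - lastLoadNanos >= reloadIntervalNanos) {
            cache = new HashMap<>(loader.get());
            lastLoadNanos = now;
            loaded = true;
        }
        String value = cache.get(id);
        if (value != null) {
            out.accept(value); // a real UDTF would call collect(...) here
        }
    }
}
```

Once such a function is registered under a name of your choice (s3_lookup is a made-up name), the enrichment query would look something like: SELECT e.id, r.test FROM kafkaTable AS e, LATERAL TABLE(s3_lookup(e.id)) AS r(test). Because the first call loads the full data set before answering, a record is never joined against a partially loaded table.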

Best,
Leonard

[1] https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemLookupFunction.java

 

> On Mar 4, 2021, at 19:26, eric hoffmann <si...@gmail.com> wrote:
> 
> Thanks Leonard,
> By UDF, do you mean a custom table source on S3?
> 
> On Thu, Mar 4, 2021 at 05:31, Leonard Xu <xbjtdcq@gmail.com> wrote:
> Hi Eric,
> 
>> What would be the best workaround to enrich a stream of data from a Kafka topic with static data, based on id?
> Currently, as a workaround, you can put your static data in Hive/JDBC/HBase, which support lookups in a pure Table/SQL environment.
> You can also write a UDF that caches the S3 files and can be used to enrich your stream data.
> 
> Best,
> Leonard
> 


Re: Processing-time temporal join is not supported yet.

Posted by eric hoffmann <si...@gmail.com>.
Thanks Leonard,
By UDF, do you mean a custom table source on S3?

On Thu, Mar 4, 2021 at 05:31, Leonard Xu <xb...@gmail.com> wrote:

> Hi Eric,
>
>> What would be the best workaround to enrich a stream of data from a Kafka
>> topic with static data, based on id?
>
> Currently, as a workaround, you can put your static data in Hive/JDBC/HBase,
> which support lookups in a pure Table/SQL environment.
> You can also write a UDF that caches the S3 files and can be used to
> enrich your stream data.
>
> Best,
> Leonard

Re: Processing-time temporal join is not supported yet.

Posted by Leonard Xu <xb...@gmail.com>.
Hi Eric,

> What would be the best workaround to enrich a stream of data from a Kafka topic with static data, based on id?
Currently, as a workaround, you can put your static data in Hive/JDBC/HBase, which support lookups in a pure Table/SQL environment.
You can also write a UDF that caches the S3 files and can be used to enrich your stream data.

Best,
Leonard
