Posted to user@flink.apache.org by Flavio Pompermaier <po...@okkam.it> on 2020/07/10 09:48:13 UTC

Table API jobs migration to Flink 1.11

Hi to all,
I was trying to update my legacy code to Flink 1.11. Previously I was using a
BatchTableEnv, and now I've tried to use the following:

EnvironmentSettings settings =
    EnvironmentSettings.newInstance().inBatchMode().build();

Unfortunately, the StreamTableEnvironmentImpl code contains this check:

if (!settings.isStreamingMode()) {
    throw new TableException(
        "StreamTableEnvironment can not run in batch mode for now, please use TableEnvironment.");
}

What should I do here?

Thanks in advance,
Flavio

Re: Table API jobs migration to Flink 1.11

Posted by Flavio Pompermaier <po...@okkam.it>.
Thanks, that was definitely helpful!

On Mon, Jul 13, 2020 at 4:39 PM Jark Wu <im...@gmail.com> wrote:

> You can set string-based configuration via
> `tEnv.getConfig.getConfiguration.setString(..)` to replace them.
> You could try `pipeline.default-kryo-serializers` [1].
>
> Best,
> Jark
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#pipeline-default-kryo-serializers
>
> On Mon, 13 Jul 2020 at 21:57, Flavio Pompermaier <po...@okkam.it>
> wrote:
>
>> And what about env.registerTypeWithKryoSerializer?
>> I no longer use the ExecutionEnvironment to create the table environment,
>> so how can I register those serializers?
>> For example, I used to run:
>> env.registerTypeWithKryoSerializer(DateTime.class, JodaDateTimeSerializer.class);
>>
>> Best,
>> Flavio
>>
>> On Mon, Jul 13, 2020 at 3:16 PM Jark Wu <im...@gmail.com> wrote:
>>
>>> Hi Flavio,
>>>
>>> tableEnv.registerTableSource is deprecated in favor of DDL and the new
>>> connector interface (i.e. FLIP-95 [1]).
>>> You may need to implement a `ScanTableSource` whose
>>> `ScanTableSource#getScanRuntimeProvider` returns an `InputFormatProvider`.
>>>
>>> Best,
>>> Jark
>>>
>>> [1]:
>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sourceSinks.html
>>>
>>> On Mon, 13 Jul 2020 at 20:39, Flavio Pompermaier <po...@okkam.it>
>>> wrote:
>>>
>>>> OK, just one last thing: to use my TableSource I use the deprecated
>>>> API registerTableSource:
>>>>
>>>> tableEnv.registerTableSource("MySourceDataset", tableSource);
>>>>
>>>> The javadoc says to use executeSql, but this requires some extra steps
>>>> that are not mentioned in the documentation.
>>>> I have to create a TableFactory, right? How do I register it? Is
>>>> there an example somewhere?
>>>>
>>>> On Mon, Jul 13, 2020 at 2:28 PM Jark Wu <im...@gmail.com> wrote:
>>>>
>>>>> I agree with you, @Flavio Pompermaier <po...@okkam.it>; the
>>>>> exception message should definitely be improved.
>>>>> We created a similar issue a long time ago
>>>>> (https://issues.apache.org/jira/browse/CALCITE-3038), but fixing it
>>>>> might be complicated.
>>>>>
>>>>> Best,
>>>>> Jark
>>>>>
>>>>> On Mon, 13 Jul 2020 at 19:59, Flavio Pompermaier <po...@okkam.it>
>>>>> wrote:
>>>>>
>>>>>> You're right, Jark. Sorry, I didn't see the typo. The backticks are
>>>>>> also mandatory.
>>>>>> Maybe the exception message could be more meaningful and specify the
>>>>>> token that caused the error instead of the generic "SQL parse failed.
>>>>>> Non-query expression encountered in illegal context".
>>>>>>
>>>>>> Thanks a lot for the support,
>>>>>> Flavio
>>>>>>
>>>>>> On Mon, Jul 13, 2020 at 1:41 PM Jark Wu <im...@gmail.com> wrote:
>>>>>>
>>>>>>> Is "INSERTO" a typo? Try this:
>>>>>>>
>>>>>>> tableEnv.executeSql("INSERT INTO `out` SELECT * FROM MySourceDataset");
>>>>>>>
>>>>>>> Best,
>>>>>>> Jark
>>>>>>>
>>>>>>> On Mon, 13 Jul 2020 at 18:25, Flavio Pompermaier <
>>>>>>> pompermaier@okkam.it> wrote:
>>>>>>>
>>>>>>>> Now I'm able to run my code but there's something I don't
>>>>>>>> understand: what is the difference between the following two?
>>>>>>>>
>>>>>>>>      // common code
>>>>>>>>      final CsvTableSink outSink = new CsvTableSink("file:/tmp/test.tsv", "\t", 1, WriteMode.OVERWRITE);
>>>>>>>>      tableEnv.registerTableSink("out", dsFields, myInputformat.getFieldTypes(), outSink);
>>>>>>>>
>>>>>>>>    - tableEnv.from("MySourceDataset").executeInsert("out");   --> this works
>>>>>>>>    - tableEnv.executeSql("INSERTO INTO out SELECT * FROM MySourceDataset");   --> this does not work
>>>>>>>>
>>>>>>>> The second option fails with the following exception:
>>>>>>>>
>>>>>>>> Exception in thread "main" org.apache.flink.table.api.SqlParserException: SQL parse failed. Non-query expression encountered in illegal context
>>>>>>>>     at org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:56)
>>>>>>>>     at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:76)
>>>>>>>>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:678)
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Flavio
>>>>>>>>
>>>>>>>> On Sun, Jul 12, 2020 at 5:04 PM godfrey he <go...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Flavio,
>>>>>>>>>
>>>>>>>>> `BatchTableSource` can only be used with the old planner.
>>>>>>>>> If you want to run a batch job with the Blink planner,
>>>>>>>>> your table source should implement `StreamTableSource`,
>>>>>>>>> and its `isBounded` method should return true.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Godfrey
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Fri, Jul 10, 2020 at 10:32 PM Flavio Pompermaier <po...@okkam.it>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Is it correct to do something like this?
>>>>>>>>>>
>>>>>>>>>> TableSource<Row> myTableSource = new BatchTableSource<Row>() {
>>>>>>>>>>     @Override
>>>>>>>>>>     public TableSchema getTableSchema() {
>>>>>>>>>>         return new TableSchema(dsFields, ft);
>>>>>>>>>>     }
>>>>>>>>>>
>>>>>>>>>>     @Override
>>>>>>>>>>     public DataSet<Row> getDataSet(ExecutionEnvironment execEnv) {
>>>>>>>>>>         return execEnv.createInput(myInputformat);
>>>>>>>>>>     }
>>>>>>>>>> };
>>>>>>>>>>
>>>>>>>>>> On Fri, Jul 10, 2020 at 3:54 PM Flavio Pompermaier <
>>>>>>>>>> pompermaier@okkam.it> wrote:
>>>>>>>>>>
>>>>>>>>>>> How can you reuse an InputFormat to write a TableSource? I think
>>>>>>>>>>> that, at least initially, this could be the simplest way to test the
>>>>>>>>>>> migration. Then I could try to implement the new table source interface.
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Jul 10, 2020 at 3:38 PM godfrey he <go...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Flavio,
>>>>>>>>>>>> Only the old planner supports BatchTableEnvironment (which can
>>>>>>>>>>>> convert to/from DataSet), while the Blink planner in batch mode
>>>>>>>>>>>> only supports TableEnvironment, because the Blink planner
>>>>>>>>>>>> converts batch queries to Transformations (corresponding to
>>>>>>>>>>>> DataStream) instead of DataSet.
>>>>>>>>>>>>
>>>>>>>>>>>> One approach is to migrate them to TableSource instead
>>>>>>>>>>>> (InputFormat can be reused), but TableSource will be deprecated
>>>>>>>>>>>> later. You can try the new table source interface [1].
>>>>>>>>>>>>
>>>>>>>>>>>> [1]
>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sourceSinks.html
>>>>>>>>>>>>
>>>>>>>>>>>> Best,
>>>>>>>>>>>> Godfrey
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Jul 10, 2020 at 8:54 PM Flavio Pompermaier <po...@okkam.it>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks but I still can't understand how to migrate my legacy
>>>>>>>>>>>>> code. The main problem is that I can't create a BatchTableEnv anymore so I
>>>>>>>>>>>>> can't call createInput.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Is there a way to reuse InputFormats? Should I migrate them to
>>>>>>>>>>>>> TableSource instead?
>>>>>>>>>>>>>
>>>>>>>>>>>>> public static void main(String[] args) throws Exception {
>>>>>>>>>>>>>     ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>>>>>>     BatchTableEnvironment btEnv = TableEnvironment.getTableEnvironment(env);
>>>>>>>>>>>>>     MyInputFormat myInputformat = new MyInputFormat(dsFields, ft).finish();
>>>>>>>>>>>>>     DataSet<Row> rows = env.createInput(myInputformat);
>>>>>>>>>>>>>     Table table = btEnv.fromDataSet(rows, String.join(",", dsFields));
>>>>>>>>>>>>>     CsvTableSink outSink = new CsvTableSink("file:/tmp/test.tsv", "\t", 1, WriteMode.OVERWRITE);
>>>>>>>>>>>>>     btEnv.registerTableSink("out", dsFields, ft, outSink);
>>>>>>>>>>>>>     btEnv.insertInto(table, "out", btEnv.queryConfig());
>>>>>>>>>>>>>     env.execute();
>>>>>>>>>>>>> }
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Jul 10, 2020 at 11:56 AM Dawid Wysakowicz <
>>>>>>>>>>>>> dwysakowicz@apache.org> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> You should be fine using the TableEnvironment. The
>>>>>>>>>>>>>> StreamTableEnvironment is needed only if you want to convert
>>>>>>>>>>>>>> to DataStream. We do not support converting batch Table
>>>>>>>>>>>>>> programs to DataStream yet.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The following code should work:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> EnvironmentSettings settings =
>>>>>>>>>>>>>>     EnvironmentSettings.newInstance().inBatchMode().build();
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> TableEnvironment.create(settings);
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Dawid
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On 10/07/2020 11:48, Flavio Pompermaier wrote:
>>>>>>>>>>>>>> > Hi to all,
>>>>>>>>>>>>>> > I was trying to update my legacy code to Flink 1.11. Before
>>>>>>>>>>>>>> I was
>>>>>>>>>>>>>> > using a BatchTableEnv and now I've tried to use the
>>>>>>>>>>>>>> following:
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > EnvironmentSettings settings =
>>>>>>>>>>>>>> > EnvironmentSettings.newInstance().inBatchMode().build();
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > Unfortunately in the StreamTableEnvironmentImpl code
>>>>>>>>>>>>>> there's :
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > if (!settings.isStreamingMode()) {
>>>>>>>>>>>>>> >     throw new TableException(
>>>>>>>>>>>>>> > "StreamTableEnvironment can not run in batch mode for now,
>>>>>>>>>>>>>> please use
>>>>>>>>>>>>>> > TableEnvironment.");
>>>>>>>>>>>>>> > }
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > What should I do here?
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > Thanks in advance,
>>>>>>>>>>>>>> > Flavio
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>
>>>>
>>
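
A note on the backtick point raised in the quoted exchange above: `out` had
to be quoted in the INSERT statement, presumably because it is a reserved word
in the SQL dialect that Flink parses. The following is a minimal Java sketch
of quoting identifiers before building such a statement. SqlIdentifiers,
quote and insertInto are hypothetical names, not Flink APIs, and escaping an
embedded backtick by doubling it is an assumption that should be checked
against the Flink SQL documentation.

```java
// Hypothetical helper (not part of Flink): quotes SQL identifiers with
// backticks so that reserved words such as `out` can be used as table names.
final class SqlIdentifiers {

    private SqlIdentifiers() {}

    // Wraps an identifier in backticks. Doubling an embedded backtick is an
    // assumed escaping rule, not something confirmed in the thread.
    static String quote(String identifier) {
        return "`" + identifier.replace("`", "``") + "`";
    }

    // Builds the INSERT statement from the thread with both names quoted.
    static String insertInto(String sinkTable, String sourceTable) {
        return "INSERT INTO " + quote(sinkTable)
                + " SELECT * FROM " + quote(sourceTable);
    }

    public static void main(String[] args) {
        // The resulting string is what would be passed to tableEnv.executeSql(...).
        System.out.println(insertInto("out", "MySourceDataset"));
    }
}
```

Quoting every identifier, not only the reserved one, avoids having to know
the reserved-word list in advance.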

Re: Table API jobs migration to Flink 1.11

Posted by Jark Wu <im...@gmail.com>.
You can set string-based configuration via
`tEnv.getConfig.getConfiguration.setString(..)` to replace them.
You could try `pipeline.default-kryo-serializers` [1].

Best,
Jark

[1]:
https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#pipeline-default-kryo-serializers
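
As a rough illustration of the suggestion above, the sketch below builds the
string value for `pipeline.default-kryo-serializers`. It assumes the format
described on the linked configuration page, i.e. `class:<type>,serializer:<serializer>`
pairs separated by semicolons. KryoSerializerConfig is a hypothetical helper,
and the fully qualified names used for DateTime and JodaDateTimeSerializer are
assumptions (Joda-Time and the de.javakaffee kryo-serializers library). This
option configures default serializers, which may not behave identically to
registerTypeWithKryoSerializer.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper (not part of Flink): formats type -> serializer pairs
// into the value assumed for `pipeline.default-kryo-serializers`.
final class KryoSerializerConfig {

    private KryoSerializerConfig() {}

    // Joins the pairs as "class:<type>,serializer:<serializer>" entries
    // separated by ';', in the iteration order of the given map.
    static String format(Map<String, String> serializersByType) {
        StringJoiner entries = new StringJoiner(";");
        for (Map.Entry<String, String> e : serializersByType.entrySet()) {
            entries.add("class:" + e.getKey() + ",serializer:" + e.getValue());
        }
        return entries.toString();
    }

    public static void main(String[] args) {
        Map<String, String> serializers = new LinkedHashMap<>();
        // Assumed fully qualified names for the classes mentioned in the thread.
        serializers.put("org.joda.time.DateTime",
                "de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer");
        String value = format(serializers);
        System.out.println(value);
        // With Flink on the classpath, the value would then be applied as:
        // tableEnv.getConfig().getConfiguration()
        //         .setString("pipeline.default-kryo-serializers", value);
    }
}
```

The serializer classes still have to be on the job's classpath; the option
only tells Flink which ones to use.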

Re: Table API jobs migration to Flink 1.11

Posted by Flavio Pompermaier <po...@okkam.it>.
And what about env.registerTypeWithKryoSerializer?
I no longer use the ExecutionEnvironment to create the table environment,
so how can I register those serializers?
For example, I used to run:
env.registerTypeWithKryoSerializer(DateTime.class, JodaDateTimeSerializer.class);

Best,
Flavio

Re: Table API jobs migration to Flink 1.11

Posted by Jark Wu <im...@gmail.com>.
Hi Flavio,

tableEnv.registerTableSource is deprecated in favor of DDL and the new
connector interface (i.e. FLIP-95 [1]).
You may need to implement a `ScanTableSource` whose
`ScanTableSource#getScanRuntimeProvider` returns an `InputFormatProvider`.

Best,
Jark

[1]:
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sourceSinks.html

On Mon, 13 Jul 2020 at 20:39, Flavio Pompermaier <po...@okkam.it>
wrote:

> OK, just one last thing: to use my TableSource I use the deprecated
> API registerTableSource:
>
> tableEnv.registerTableSource("MySourceDataset", tableSource);
>
> The javadoc says to use executeSql, but this requires some extra steps
> that are not mentioned in the documentation.
> I have to create a TableFactory, right? How do I register it? Is there
> an example somewhere?
>
> On Mon, Jul 13, 2020 at 2:28 PM Jark Wu <im...@gmail.com> wrote:
>
>> I agree with you, @Flavio Pompermaier <po...@okkam.it>; the
>> exception message should definitely be improved.
>> We created a similar issue a long time ago
>> (https://issues.apache.org/jira/browse/CALCITE-3038), but fixing it might
>> be complicated.
>>
>> Best,
>> Jark
>>
>> On Mon, 13 Jul 2020 at 19:59, Flavio Pompermaier <po...@okkam.it>
>> wrote:
>>
>>> You're right, Jark. Sorry, I didn't see the typo. The backticks are also
>>> mandatory.
>>> Maybe the exception message could be more meaningful and specify the
>>> token that caused the error instead of the generic "SQL parse failed.
>>> Non-query expression encountered in illegal context".
>>>
>>> Thanks a lot for the support,
>>> Flavio
>>>
>>> On Mon, Jul 13, 2020 at 1:41 PM Jark Wu <im...@gmail.com> wrote:
>>>
>>>> Is "INSERTO" a typo? Try this:
>>>>
>>>> tableEnv.executeSql("INSERT INTO `out` SELECT * FROM MySourceDataset");
>>>>
>>>> Best,
>>>> Jark
>>>>
>>>> On Mon, 13 Jul 2020 at 18:25, Flavio Pompermaier <po...@okkam.it>
>>>> wrote:
>>>>
>>>>> Now I'm able to run my code but there's something I don't understand:
>>>>> what is the difference between the following two?
>>>>>
>>>>>      // common code
>>>>>      final CsvTableSink outSink = new CsvTableSink("file:/tmp/test.tsv", "\t", 1, WriteMode.OVERWRITE);
>>>>>      tableEnv.registerTableSink("out", dsFields, myInputformat.getFieldTypes(), outSink);
>>>>>
>>>>>    - tableEnv.from("MySourceDataset").executeInsert("out");   --> this works
>>>>>    - tableEnv.executeSql("INSERTO INTO out SELECT * FROM MySourceDataset");   --> this does not work
>>>>>
>>>>> The second option fails with the following exception:
>>>>>
>>>>> Exception in thread "main" org.apache.flink.table.api.SqlParserException: SQL parse failed. Non-query expression encountered in illegal context
>>>>>     at org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:56)
>>>>>     at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:76)
>>>>>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:678)
>>>>>
>>>>> Best,
>>>>> Flavio
>>>>>
>>>>> On Sun, Jul 12, 2020 at 5:04 PM godfrey he <go...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Flavio,
>>>>>>
>>>>>> `BatchTableSource` can only be used with the old planner.
>>>>>> If you want to run a batch job with the Blink planner,
>>>>>> your table source should implement `StreamTableSource`,
>>>>>> and its `isBounded` method should return true.
>>>>>>
>>>>>> Best,
>>>>>> Godfrey
>>>>>>
>>>>>>
>>>>>>
>>>>>> Flavio Pompermaier <po...@okkam.it> 于2020年7月10日周五 下午10:32写道:
>>>>>>
>>>>>>> Is it correct to do something like this?
>>>>>>>
>>>>>>> TableSource<Row> myTableSource = new BatchTableSource<Row>() {
>>>>>>>       @Override
>>>>>>>       public TableSchema getTableSchema() {
>>>>>>>         return new TableSchema(dsFields, ft);
>>>>>>>       }
>>>>>>>       @Override
>>>>>>>       public DataSet<Row> getDataSet(ExecutionEnvironment execEnv) {
>>>>>>>         return execEnv.createInput(myInputformat);
>>>>>>>       }
>>>>>>>     };
>>>>>>>
>>>>>>> On Fri, Jul 10, 2020 at 3:54 PM Flavio Pompermaier <
>>>>>>> pompermaier@okkam.it> wrote:
>>>>>>>
>>>>>>>> How can you reuse InputFormat to write a TableSource? I think that
>>>>>>>> at least initially this could be the simplest way to test the
>>>>>>>> migration..then I could try yo implement the new Table Source interface
>>>>>>>>
>>>>>>>> On Fri, Jul 10, 2020 at 3:38 PM godfrey he <go...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> hi Flavio,
>>>>>>>>> Only old planner supports BatchTableEnvironment (which can convert
>>>>>>>>> to/from DataSet),
>>>>>>>>> while Blink planner in batch mode only support TableEnvironment.
>>>>>>>>> Because Blink planner
>>>>>>>>> convert the batch queries to Transformation (corresponding to
>>>>>>>>> DataStream), instead of DataSet.
>>>>>>>>>
>>>>>>>>> one approach is you can migrate them to TableSource instead
>>>>>>>>> (InputFormat can be reused),
>>>>>>>>> but TableSource will be deprecated later. you can try new table
>>>>>>>>> source[1]
>>>>>>>>>
>>>>>>>>> [1]
>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sourceSinks.html
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Godfrey
>>>>>>>>>
>>>>>>>>> Flavio Pompermaier <po...@okkam.it> 于2020年7月10日周五 下午8:54写道:
>>>>>>>>>
>>>>>>>>>> Thanks but I still can't understand how to migrate my legacy
>>>>>>>>>> code. The main problem is that I can't create a BatchTableEnv anymore so I
>>>>>>>>>> can't call createInput.
>>>>>>>>>>
>>>>>>>>>> Is there a way to reuse InputFormats? Should I migrate them to
>>>>>>>>>> TableSource instead?
>>>>>>>>>>
>>>>>>>>>> public static void main(String[] args) throws Exception {
>>>>>>>>>>     ExecutionEnvironment env =
>>>>>>>>>> ExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>>>     BatchTableEnvironment btEnv =
>>>>>>>>>> TableEnvironment.getTableEnvironment(env);
>>>>>>>>>>     MyInputFormat myInputformat =  new MyInputFormat(dsFields,
>>>>>>>>>> ft).finish();
>>>>>>>>>>     DataSet<Row> rows = env.createInput(myInputformat);
>>>>>>>>>>     Table table = btEnv.fromDataSet(rows, String.join(",",
>>>>>>>>>> dsFields));
>>>>>>>>>>     CsvTableSink outSink = new CsvTableSink("file:/tmp/test.tsv",
>>>>>>>>>> "\t", 1, WriteMode.OVERWRITE);
>>>>>>>>>>     btEnv.registerTableSink("out", dsFields, ft, outSink);
>>>>>>>>>>     btEnv.insertInto(table, "out", btEnv.queryConfig());
>>>>>>>>>>     env.execute();
>>>>>>>>>>   }
>>>>>>>>>>
>>>>>>>>>> On Fri, Jul 10, 2020 at 11:56 AM Dawid Wysakowicz <
>>>>>>>>>> dwysakowicz@apache.org> wrote:
>>>>>>>>>>
>>>>>>>>>>> You should be good with using the TableEnvironment. The
>>>>>>>>>>> StreamTableEnvironment is needed only if you want to convert to
>>>>>>>>>>> DataStream. We do not support converting batch Table programs to
>>>>>>>>>>> DataStream yet.
>>>>>>>>>>>
>>>>>>>>>>> A following code should work:
>>>>>>>>>>>
>>>>>>>>>>> EnvironmentSettings settings =
>>>>>>>>>>> EnvironmentSettings.newInstance().inBatchMode().build();
>>>>>>>>>>>
>>>>>>>>>>> TableEnvironment.create(settings);
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>>
>>>>>>>>>>> Dawid
>>>>>>>>>>>
>>>>>>>>>>> On 10/07/2020 11:48, Flavio Pompermaier wrote:
>>>>>>>>>>> > Hi to all,
>>>>>>>>>>> > I was trying to update my legacy code to Flink 1.11. Before I
>>>>>>>>>>> was
>>>>>>>>>>> > using a BatchTableEnv and now I've tried to use the following:
>>>>>>>>>>> >
>>>>>>>>>>> > EnvironmentSettings settings =
>>>>>>>>>>> > EnvironmentSettings.newInstance().inBatchMode().build();
>>>>>>>>>>> >
>>>>>>>>>>> > Unfortunately in the StreamTableEnvironmentImpl code there's :
>>>>>>>>>>> >
>>>>>>>>>>> > if (!settings.isStreamingMode()) {
>>>>>>>>>>> >     throw new TableException(
>>>>>>>>>>> > "StreamTableEnvironment can not run in batch mode for now,
>>>>>>>>>>> please use
>>>>>>>>>>> > TableEnvironment.");
>>>>>>>>>>> > }
>>>>>>>>>>> >
>>>>>>>>>>> > What should I do here?
>>>>>>>>>>> >
>>>>>>>>>>> > Thanks in advance,
>>>>>>>>>>> > Flavio
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>
>

Re: Table API jobs migration to Flink 1.11

Posted by Flavio Pompermaier <po...@okkam.it>.
Ok..just one last thing: to use my TableSource I use the deprecated
API registerTableSource:

tableEnv.registerTableSource("MySourceDataset", tableSource);

The javadoc says to use executeSql instead, but this requires some extra
steps that are not mentioned in the documentation.
I have to create a TableFactory, right? How do I register it? Is there
an example somewhere?
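
For readers following the thread, a minimal sketch of what the DDL route could look like. With executeSql the table is declared through a CREATE TABLE statement whose WITH clause names a connector, and that connector has to be backed by a factory available on the classpath. The helper below only assembles the statement text in plain Java. The class name DdlSketch, the connector identifier 'my-input-format', and the column list are hypothetical and not part of Flink.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

final class DdlSketch {

    /** Builds a CREATE TABLE statement from column definitions and connector options. */
    static String createTableDdl(
            String table, Map<String, String> columns, Map<String, String> options) {
        // Column list: `name` TYPE, `name` TYPE, ...
        StringJoiner cols = new StringJoiner(", ", "(", ")");
        columns.forEach((name, type) -> cols.add("`" + name + "` " + type));

        // Option list: 'key' = 'value', ... (single quotes in values are doubled)
        StringJoiner opts = new StringJoiner(", ", "(", ")");
        options.forEach((k, v) -> opts.add("'" + k + "' = '" + v.replace("'", "''") + "'"));

        return "CREATE TABLE `" + table + "` " + cols + " WITH " + opts;
    }

    public static void main(String[] args) {
        Map<String, String> columns = new LinkedHashMap<>();
        columns.put("id", "INT");
        columns.put("name", "STRING");

        Map<String, String> options = new LinkedHashMap<>();
        // Hypothetical connector identifier; a matching factory would have to exist.
        options.put("connector", "my-input-format");

        System.out.println(createTableDdl("MySourceDataset", columns, options));
    }
}
```

The resulting string would then be passed to tableEnv.executeSql(...) in place of the registerTableSource call.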

On Mon, Jul 13, 2020 at 2:28 PM Jark Wu <im...@gmail.com> wrote:

> I agree with you @Flavio Pompermaier <po...@okkam.it>, the
> exception message definitely should be improved.
> We created a similar issue a long time ago,
> https://issues.apache.org/jira/browse/CALCITE-3038, but fixing it might
> be complicated.
>
> Best,
> Jark

Re: Table API jobs migration to Flink 1.11

Posted by Jark Wu <im...@gmail.com>.
I agree with you @Flavio Pompermaier <po...@okkam.it>, the exception
message definitely should be improved.
We created a similar issue a long time ago,
https://issues.apache.org/jira/browse/CALCITE-3038, but fixing it might be
complicated.

Best,
Jark

On Mon, 13 Jul 2020 at 19:59, Flavio Pompermaier <po...@okkam.it>
wrote:

> You're right Jark..sorry I didn't see the typo. The backticks are also
> mandatory.
> Maybe the exception message could be more meaningful and specify the token
> that caused the error instead of a general "SQL parse failed. Non-query
> expression encountered in illegal context".
>
> Thanks a lot for the support,
> Flavio

Re: Table API jobs migration to Flink 1.11

Posted by Flavio Pompermaier <po...@okkam.it>.
You're right Jark..sorry I didn't see the typo. The backticks are also
mandatory.
Maybe the exception message could be more meaningful and specify the token
that caused the error instead of a general "SQL parse failed. Non-query
expression encountered in illegal context".

Thanks a lot for the support,
Flavio
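
To illustrate the kind of message suggested above, here is a small sketch of a pre-check that names the offending token. It is not how Flink or Calcite parse SQL: the class name StatementPrecheck and the keyword set are assumptions made for the example, and the set is only an illustrative subset of statement keywords.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;

final class StatementPrecheck {

    // Illustrative subset of leading statement keywords; not Flink's actual grammar.
    private static final Set<String> LEADING_KEYWORDS = new HashSet<>(Arrays.asList(
            "SELECT", "INSERT", "CREATE", "DROP", "ALTER",
            "USE", "SHOW", "DESCRIBE", "EXPLAIN", "WITH"));

    /** Returns null if the first token is a known keyword, otherwise a message naming it. */
    static String check(String sql) {
        String trimmed = sql.trim();
        if (trimmed.isEmpty()) {
            return "Empty statement";
        }
        // The first whitespace-delimited word is treated as the statement keyword.
        String first = trimmed.split("\\s+", 2)[0];
        if (LEADING_KEYWORDS.contains(first.toUpperCase(Locale.ROOT))) {
            return null;
        }
        return "Unexpected token \"" + first + "\" at the start of the statement";
    }

    public static void main(String[] args) {
        System.out.println(check("INSERTO INTO out SELECT * FROM MySourceDataset"));
    }
}
```

Running such a check before executeSql would point directly at "INSERTO" instead of the generic parse failure.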

On Mon, Jul 13, 2020 at 1:41 PM Jark Wu <im...@gmail.com> wrote:

> Is "INSERTO" a typo? Try this:
>
> tableEnv.executeSql("INSERT INTO `out` SELECT * FROM MySourceDataset");
>
> Best,
> Jark

Re: Table API jobs migration to Flink 1.11

Posted by Jark Wu <im...@gmail.com>.
Is "INSERTO" a typo? Try this:

tableEnv.executeSql("INSERT INTO `out` SELECT * FROM MySourceDataset");

Best,
Jark
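
A short sketch of why the backticks matter: the sink is named `out`, which most likely collides with the reserved keyword OUT in Flink SQL, so the identifier has to be quoted. The helper below quotes identifiers before assembling the statement. The class name SqlIdentifiers is hypothetical, and escaping an embedded backtick by doubling it is an assumption about the SQL dialect.

```java
final class SqlIdentifiers {

    /** Wraps an identifier in backticks, doubling any backtick it already contains. */
    static String quote(String identifier) {
        return "`" + identifier.replace("`", "``") + "`";
    }

    /** Builds an INSERT INTO ... SELECT * statement with both table names quoted. */
    static String insertInto(String sink, String source) {
        return "INSERT INTO " + quote(sink) + " SELECT * FROM " + quote(source);
    }

    public static void main(String[] args) {
        // Prints: INSERT INTO `out` SELECT * FROM `MySourceDataset`
        System.out.println(insertInto("out", "MySourceDataset"));
    }
}
```

Quoting every identifier is a conservative choice: it costs nothing for ordinary names and avoids surprises with names that happen to be keywords.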

On Mon, 13 Jul 2020 at 18:25, Flavio Pompermaier <po...@okkam.it>
wrote:

> Now I'm able to run my code but there's something I don't understand: what
> is the difference between the following two?
>
>      // common code
>      final CsvTableSink outSink =
>          new CsvTableSink("file:/tmp/test.tsv", "\t", 1, WriteMode.OVERWRITE);
>      tableEnv.registerTableSink(
>          "out", dsFields, myInputformat.getFieldTypes(), outSink);
>
>    - tableEnv.from("MySourceDataset").executeInsert("out");
>      --> this works
>    - tableEnv.executeSql("INSERTO INTO out SELECT * FROM MySourceDataset");
>      --> this does not work
>
> The second option fails with the following exception:
>
> Exception in thread "main" org.apache.flink.table.api.SqlParserException:
> SQL parse failed. Non-query expression encountered in illegal context
>     at org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:56)
>     at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:76)
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:678)
>
> Best,
> Flavio

Re: Table API jobs migration to Flink 1.11

Posted by Flavio Pompermaier <po...@okkam.it>.
Now I'm able to run my code but there's something I don't understand: what
is the difference between the following two?

     // common code
     final CsvTableSink outSink =
         new CsvTableSink("file:/tmp/test.tsv", "\t", 1, WriteMode.OVERWRITE);
     tableEnv.registerTableSink(
         "out", dsFields, myInputformat.getFieldTypes(), outSink);

   - tableEnv.from("MySourceDataset").executeInsert("out");
     --> this works
   - tableEnv.executeSql("INSERTO INTO out SELECT * FROM MySourceDataset");
     --> this does not work

The second option fails with the following exception:

Exception in thread "main" org.apache.flink.table.api.SqlParserException:
SQL parse failed. Non-query expression encountered in illegal context
    at org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:56)
    at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:76)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:678)

Best,
Flavio
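
One thing worth checking in the failing call: the statement text begins
with INSERTO rather than INSERT. INSERTO is not a SQL keyword, so that alone
may well be what the parser is rejecting. Separately, OUT appears to be on
Flink SQL's list of reserved keywords, so a sink named "out" probably needs
to be quoted with backticks once the typo is fixed. The sketch below only
builds the statement string. InsertStatementSketch, quoteIdentifier and
insertAllInto are hypothetical names made up for illustration, not Flink API,
and the escaping rule (doubling an embedded backtick) is an assumption.

```java
// Sketch only: builds the INSERT statement text; no Flink classes are used here.
final class InsertStatementSketch {

    // Wraps an identifier in backticks, doubling any embedded backtick
    // (assumed escaping rule for backtick-quoted identifiers).
    static String quoteIdentifier(String name) {
        return "`" + name.replace("`", "``") + "`";
    }

    // Builds "INSERT INTO <sink> SELECT * FROM <source>" with both names quoted.
    static String insertAllInto(String sink, String source) {
        return "INSERT INTO " + quoteIdentifier(sink)
                + " SELECT * FROM " + quoteIdentifier(source);
    }

    public static void main(String[] args) {
        // Prints the statement text for the sink and source used in this thread.
        System.out.println(insertAllInto("out", "MySourceDataset"));
    }
}
```

The resulting string is what would be handed to tableEnv.executeSql(...) in
place of the "INSERTO INTO out ..." literal above. Whether the quoting is
strictly required for "out" has not been verified here.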

On Sun, Jul 12, 2020 at 5:04 PM godfrey he <go...@gmail.com> wrote:

> Hi Flavio,
>
> `BatchTableSource` can only be used with the old planner.
> If you want to use the Blink planner to run a batch job,
> your table source should implement `StreamTableSource`,
> and its `isBounded` method should return true.
>
> Best,
> Godfrey

Re: Table API jobs migration to Flink 1.11

Posted by godfrey he <go...@gmail.com>.
Hi Flavio,

`BatchTableSource` can only be used with the old planner.
If you want to use the Blink planner to run a batch job,
your table source should implement `StreamTableSource`,
and its `isBounded` method should return true.

Best,
Godfrey



Flavio Pompermaier <po...@okkam.it> wrote on Fri, Jul 10, 2020 at 10:32 PM:

> Is it correct to do something like this?
>
> TableSource<Row> myTableSource = new BatchTableSource<Row>() {
>       @Override
>       public TableSchema getTableSchema() {
>         return new TableSchema(dsFields, ft);
>       }
>       @Override
>       public DataSet<Row> getDataSet(ExecutionEnvironment execEnv) {
>         return execEnv.createInput(myInputformat);
>       }
>     };

Re: Table API jobs migration to Flink 1.11

Posted by Flavio Pompermaier <po...@okkam.it>.
Is it correct to do something like this?

TableSource<Row> myTableSource = new BatchTableSource<Row>() {
      @Override
      public TableSchema getTableSchema() {
        return new TableSchema(dsFields, ft);
      }
      @Override
      public DataSet<Row> getDataSet(ExecutionEnvironment execEnv) {
        return execEnv.createInput(myInputformat);
      }
    };

On Fri, Jul 10, 2020 at 3:54 PM Flavio Pompermaier <po...@okkam.it>
wrote:

> How can you reuse an InputFormat to write a TableSource? I think that, at
> least initially, this could be the simplest way to test the migration. Then
> I could try to implement the new table source interface.

Re: Table API jobs migration to Flink 1.11

Posted by Flavio Pompermaier <po...@okkam.it>.
How can you reuse an InputFormat to write a TableSource? I think that, at
least initially, this could be the simplest way to test the migration. Then I
could try to implement the new table source interface.

On Fri, Jul 10, 2020 at 3:38 PM godfrey he <go...@gmail.com> wrote:

> Hi Flavio,
> Only the old planner supports BatchTableEnvironment (which can convert to/from
> DataSet), while the Blink planner in batch mode supports only TableEnvironment,
> because the Blink planner converts batch queries to Transformations
> (corresponding to DataStream) instead of DataSets.
>
> One approach is to migrate them to TableSource (the InputFormat can be reused),
> but TableSource will be deprecated later. You can try the new table source [1].
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sourceSinks.html
>
> Best,
> Godfrey

Re: Table API jobs migration to Flink 1.11

Posted by godfrey he <go...@gmail.com>.
Hi Flavio,
Only the old planner supports BatchTableEnvironment (which can convert to/from
DataSet), while the Blink planner in batch mode supports only TableEnvironment,
because the Blink planner converts batch queries to Transformations
(corresponding to DataStream) instead of DataSets.

One approach is to migrate them to TableSource (the InputFormat can be reused),
but TableSource will be deprecated later. You can try the new table source [1].

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sourceSinks.html

Best,
Godfrey

Flavio Pompermaier <po...@okkam.it> wrote on Fri, Jul 10, 2020 at 8:54 PM:

> Thanks but I still can't understand how to migrate my legacy code. The
> main problem is that I can't create a BatchTableEnv anymore so I can't
> call createInput.
>
> Is there a way to reuse InputFormats? Should I migrate them to TableSource
> instead?
>
> public static void main(String[] args) throws Exception {
>     ExecutionEnvironment env =
> ExecutionEnvironment.getExecutionEnvironment();
>     BatchTableEnvironment btEnv =
> TableEnvironment.getTableEnvironment(env);
>     MyInputFormat myInputformat =  new MyInputFormat(dsFields,
> ft).finish();
>     DataSet<Row> rows = env.createInput(myInputformat);
>     Table table = btEnv.fromDataSet(rows, String.join(",", dsFields));
>     CsvTableSink outSink = new CsvTableSink("file:/tmp/test.tsv", "\t", 1,
> WriteMode.OVERWRITE);
>     btEnv.registerTableSink("out", dsFields, ft, outSink);
>     btEnv.insertInto(table, "out", btEnv.queryConfig());
>     env.execute();
>   }

Re: Table API jobs migration to Flink 1.11

Posted by Flavio Pompermaier <po...@okkam.it>.
Thanks but I still can't understand how to migrate my legacy code. The main
problem is that I can't create a BatchTableEnv anymore so I can't
call createInput.

Is there a way to reuse InputFormats? Should I migrate them to TableSource
instead?

public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    BatchTableEnvironment btEnv = TableEnvironment.getTableEnvironment(env);
    MyInputFormat myInputformat = new MyInputFormat(dsFields, ft).finish();
    DataSet<Row> rows = env.createInput(myInputformat);
    Table table = btEnv.fromDataSet(rows, String.join(",", dsFields));
    CsvTableSink outSink =
        new CsvTableSink("file:/tmp/test.tsv", "\t", 1, WriteMode.OVERWRITE);
    btEnv.registerTableSink("out", dsFields, ft, outSink);
    btEnv.insertInto(table, "out", btEnv.queryConfig());
    env.execute();
}

On Fri, Jul 10, 2020 at 11:56 AM Dawid Wysakowicz <dw...@apache.org>
wrote:

> You should be good with using the TableEnvironment. The
> StreamTableEnvironment is needed only if you want to convert to
> DataStream. We do not support converting batch Table programs to
> DataStream yet.
>
> The following code should work:
>
> EnvironmentSettings settings =
> EnvironmentSettings.newInstance().inBatchMode().build();
>
> TableEnvironment.create(settings);
>
> Best,
>
> Dawid

-- 
Flavio Pompermaier
Development Department

OKKAM S.r.l.
Tel. +(39) 0461 041809

Re: Table API jobs migration to Flink 1.11

Posted by Dawid Wysakowicz <dw...@apache.org>.
You should be good with using the TableEnvironment. The
StreamTableEnvironment is needed only if you want to convert to
DataStream. We do not support converting batch Table programs to
DataStream yet.

The following code should work:

EnvironmentSettings settings =
EnvironmentSettings.newInstance().inBatchMode().build();

TableEnvironment.create(settings);

Best,

Dawid

On 10/07/2020 11:48, Flavio Pompermaier wrote:
> Hi to all,
> I was trying to update my legacy code to Flink 1.11. Before I was
> using a BatchTableEnv and now I've tried to use the following:
>
> EnvironmentSettings settings =
> EnvironmentSettings.newInstance().inBatchMode().build();
>
> Unfortunately in the StreamTableEnvironmentImpl code there's:
>
> if (!settings.isStreamingMode()) {
>     throw new TableException(
> "StreamTableEnvironment can not run in batch mode for now, please use
> TableEnvironment.");
> }
>
> What should I do here?
>
> Thanks in advance,
> Flavio