Posted to user@flink.apache.org by Flavio Pompermaier <po...@okkam.it> on 2020/07/14 14:48:59 UTC
Flink 1.11 test Parquet sink
Hi to all,
I'm trying to test write to parquet using the following code but I have an
error:
final TableEnvironment tableEnv = DatalinksExecutionEnvironment.getBatchTableEnv();
final Table inputTable = tableEnv.fromValues(//
    DataTypes.ROW(//
        DataTypes.FIELD("col1", DataTypes.STRING()), //
        DataTypes.FIELD("col2", DataTypes.STRING())//
    ), //
    Row.of(1L, "Hello"), //
    Row.of(2L, "Hello"), //
    Row.of(3L, ""), //
    Row.of(4L, "Ciao"));
tableEnv.createTemporaryView("ParquetDataset", inputTable);
tableEnv.executeSql(//
    "CREATE TABLE `out` (\n" + //
        "col1 STRING,\n" + //
        "col2 STRING\n" + //
        ") WITH (\n" + //
        " 'connector' = 'filesystem',\n" + //
        // " 'format' = 'parquet',\n" + //
        " 'update-mode' = 'append',\n" + //
        " 'path' = 'file://" + TEST_FOLDER + "',\n" + //
        " 'sink.shuffle-by-partition.enable' = 'true'\n" + //
        ")");
tableEnv.executeSql("INSERT INTO `out` SELECT * FROM ParquetDataset");
---------------------------------
Exception in thread "main" java.lang.AssertionError: Conversion to
relational algebra failed to preserve datatypes:
validated type:
RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" col1,
VARCHAR(2147483647) CHARACTER SET "UTF-16LE" col2) NOT NULL
converted type:
RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL col1,
VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL col2) NOT NULL
rel:
LogicalProject(col1=[$0], col2=[$1])
LogicalUnion(all=[true])
LogicalProject(col1=[_UTF-16LE'1':VARCHAR(2147483647) CHARACTER SET
"UTF-16LE"], col2=[_UTF-16LE'Hello':VARCHAR(2147483647) CHARACTER SET
"UTF-16LE"])
LogicalValues(tuples=[[{ 0 }]])
LogicalProject(col1=[_UTF-16LE'2':VARCHAR(2147483647) CHARACTER SET
"UTF-16LE"], col2=[_UTF-16LE'Hello':VARCHAR(2147483647) CHARACTER SET
"UTF-16LE"])
LogicalValues(tuples=[[{ 0 }]])
LogicalProject(col1=[_UTF-16LE'3':VARCHAR(2147483647) CHARACTER SET
"UTF-16LE"], col2=[_UTF-16LE'':VARCHAR(2147483647) CHARACTER SET
"UTF-16LE"])
LogicalValues(tuples=[[{ 0 }]])
LogicalProject(col1=[_UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET
"UTF-16LE"], col2=[_UTF-16LE'Ciao':VARCHAR(2147483647) CHARACTER SET
"UTF-16LE"])
LogicalValues(tuples=[[{ 0 }]])
at
org.apache.calcite.sql2rel.SqlToRelConverter.checkConvertedType(SqlToRelConverter.java:465)
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:580)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
at
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
at
org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:773)
at
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:745)
at
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238)
at
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:527)
at
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:204)
at
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:678)
What is wrong with my code?
Re: Flink 1.11 test Parquet sink
Posted by Flavio Pompermaier <po...@okkam.it>.
I've just opened a ticket on JIRA:
https://issues.apache.org/jira/browse/FLINK-18608
On Wed, Jul 15, 2020 at 10:10 AM Dawid Wysakowicz <dw...@apache.org>
wrote:
> Hi,
>
> Unfortunately this is a bug.
>
> The problem is in CustomizedConvertRule#convertCast as it drops the
> requested nullability. It was fixed in master as part of FLINK-13784[1].
> Therefore the example works on master.
>
> Could you create a jira issue for the 1.11 version? We could backport the
> corresponding part of FLINK-13784. As a workaround you can try using the
> values without registering them in the catalog, since the registration
> triggers the type check. (I know this is not perfect):
>
> final Table inputTable = tableEnv.fromValues(//
> DataTypes.ROW(//
> DataTypes.FIELD("col1", DataTypes.STRING()), //
> DataTypes.FIELD("col2", DataTypes.STRING())//
> ), ...);
> tableEnv.executeSql(//
> "CREATE TABLE `out` (\n" + //
> "col1 STRING,\n" + //
> "col2 STRING\n" + //
> ") WITH (\n" + //
> " 'connector' = 'filesystem',\n" + //
> // " 'format' = 'parquet',\n" + //
> " 'update-mode' = 'append',\n" + //
> " 'path' = 'file://" + TEST_FOLDER + "',\n" + //
> " 'sink.shuffle-by-partition.enable' = 'true'\n" + //
> ")");
>
> inputTable.executeInsert("out");
>
> As for the types: SQL has neither a LONG nor a STRING type. Java's long is
> equivalent to SQL's BIGINT. STRING is only an alias for
> VARCHAR(2147483647), i.e. VARCHAR(Integer.MAX_VALUE), added for improved
> usability so that you do not need to type the maximum length manually. For
> the complete list of supported types see the docs[2]
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-13784
>
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/types.html
>
> Best,
>
> Dawid
> On 15/07/2020 09:40, Flavio Pompermaier wrote:
>
> If I use tableEnv.fromValues(Row.of(1L, "Hello"),...) things work if I
> also change the query to "INSERT INTO `out` SELECT CAST(f0 AS STRING), f1
> FROM ParquetDataset".
> If there is still a bug I'll file a proper JIRA ticket with the exact
> description of the problem..
>
> Just to conclude this thread there are 2 strange things I found:
>
> 1) Is LONG really not supported yet? If I declare the output table columns
> as LONG, STRING I get
> Exception in thread "main" java.lang.UnsupportedOperationException:
> class org.apache.calcite.sql.SqlIdentifier: LONG
> at org.apache.calcite.util.Util.needToImplement(Util.java:967)
>
> 2) The new planner translates STRING to VARCHAR(2147483647). Is it correct?
>
> Best,
> Flavio
>
>
> On Wed, Jul 15, 2020 at 5:28 AM Jark Wu <im...@gmail.com> wrote:
>
>> I think this might be a bug in `tableEnv.fromValues`.
>>
>> Could you try to remove the DataType parameter, and let the framework
>> derive the types?
>>
>> final Table inputTable = tableEnv.fromValues(
>> Row.of(1L, "Hello"), //
>> Row.of(2L, "Hello"), //
>> Row.of(3L, ""), //
>> Row.of(4L, "Ciao"));
>>
>> Best,
>> Jark
>>
>>
>> On Wed, 15 Jul 2020 at 11:19, Leonard Xu <xb...@gmail.com> wrote:
>>
>>> Hi, Flavio
>>>
>>> I reproduced your issue, and I think it should be a bug. But I’m not
>>> sure whether it comes from Calcite itself or from the Calcite that the
>>> Flink Table Planner module shades.
>>>
>>> Maybe Danny can help explain more.
>>>
>>> CC: Danny
>>>
>>> Best
>>> Leonard Xu
>>>
>>> 在 2020年7月14日,23:06,Flavio Pompermaier <po...@okkam.it> 写道:
>>>
>>> If I use
>>>
>>> final Table inputTable = tableEnv.fromValues(
>>> DataTypes.ROW(
>>> DataTypes.FIELD("col1", DataTypes.STRING().notNull()),
>>> DataTypes.FIELD("col2", DataTypes.STRING().notNull())
>>> ), ..
>>> tableEnv.executeSql(//
>>> "CREATE TABLE `out` (" +
>>> "col1 STRING," +
>>> "col2 STRING" +
>>> ") WITH (...)
>>>
>>> the job works as expected but this is wrong IMHO
>>> because DataTypes.STRING() = DataTypes.STRING().nullable() by default.
>>> If I have DataTypes.STRING().notNull() the type in the CREATE TABLE
>>> should be "STRING NOT NULL" . Am I correct?
>>>
>>> On Tue, Jul 14, 2020 at 4:50 PM Flavio Pompermaier <po...@okkam.it>
>>> wrote:
>>>
>>>> Sorry, obviously " 'format' = 'parquet'" + is without comment :D
>>>>
>>>> On Tue, Jul 14, 2020 at 4:48 PM Flavio Pompermaier <
>>>> pompermaier@okkam.it> wrote:
>>>>
>>>>> Hi to all,
>>>>> I'm trying to test write to parquet using the following code but I
>>>>> have an error:
>>>>>
>>>>> final TableEnvironment tableEnv =
>>>>> DatalinksExecutionEnvironment.getBatchTableEnv();
>>>>> final Table inputTable = tableEnv.fromValues(//
>>>>> DataTypes.ROW(//
>>>>> DataTypes.FIELD("col1", DataTypes.STRING()), //
>>>>> DataTypes.FIELD("col2", DataTypes.STRING())//
>>>>> ), //
>>>>> Row.of(1L, "Hello"), //
>>>>> Row.of(2L, "Hello"), //
>>>>> Row.of(3L, ""), //
>>>>> Row.of(4L, "Ciao"));
>>>>> tableEnv.createTemporaryView("ParquetDataset", inputTable);
>>>>> tableEnv.executeSql(//
>>>>> "CREATE TABLE `out` (\n" + //
>>>>> "col1 STRING,\n" + //
>>>>> "col2 STRING\n" + //
>>>>> ") WITH (\n" + //
>>>>> " 'connector' = 'filesystem',\n" + //
>>>>> // " 'format' = 'parquet',\n" + //
>>>>> " 'update-mode' = 'append',\n" + //
>>>>> " 'path' = 'file://" + TEST_FOLDER + "',\n" + //
>>>>> " 'sink.shuffle-by-partition.enable' = 'true'\n" + //
>>>>> ")");
>>>>>
>>>>> tableEnv.executeSql("INSERT INTO `out` SELECT * FROM
>>>>> ParquetDataset");
>>>>>
>>>>> ---------------------------------
>>>>>
>>>>> Exception in thread "main" java.lang.AssertionError: Conversion to
>>>>> relational algebra failed to preserve datatypes:
>>>>> validated type:
>>>>> RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" col1,
>>>>> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" col2) NOT NULL
>>>>> converted type:
>>>>> RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL col1,
>>>>> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL col2) NOT NULL
>>>>> rel:
>>>>> LogicalProject(col1=[$0], col2=[$1])
>>>>> LogicalUnion(all=[true])
>>>>> LogicalProject(col1=[_UTF-16LE'1':VARCHAR(2147483647) CHARACTER
>>>>> SET "UTF-16LE"], col2=[_UTF-16LE'Hello':VARCHAR(2147483647) CHARACTER SET
>>>>> "UTF-16LE"])
>>>>> LogicalValues(tuples=[[{ 0 }]])
>>>>> LogicalProject(col1=[_UTF-16LE'2':VARCHAR(2147483647) CHARACTER
>>>>> SET "UTF-16LE"], col2=[_UTF-16LE'Hello':VARCHAR(2147483647) CHARACTER SET
>>>>> "UTF-16LE"])
>>>>> LogicalValues(tuples=[[{ 0 }]])
>>>>> LogicalProject(col1=[_UTF-16LE'3':VARCHAR(2147483647) CHARACTER
>>>>> SET "UTF-16LE"], col2=[_UTF-16LE'':VARCHAR(2147483647) CHARACTER SET
>>>>> "UTF-16LE"])
>>>>> LogicalValues(tuples=[[{ 0 }]])
>>>>> LogicalProject(col1=[_UTF-16LE'4':VARCHAR(2147483647) CHARACTER
>>>>> SET "UTF-16LE"], col2=[_UTF-16LE'Ciao':VARCHAR(2147483647) CHARACTER SET
>>>>> "UTF-16LE"])
>>>>> LogicalValues(tuples=[[{ 0 }]])
>>>>>
>>>>> at
>>>>> org.apache.calcite.sql2rel.SqlToRelConverter.checkConvertedType(SqlToRelConverter.java:465)
>>>>> at
>>>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:580)
>>>>> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
>>>>> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
>>>>> at
>>>>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
>>>>> at
>>>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:773)
>>>>> at
>>>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:745)
>>>>> at
>>>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238)
>>>>> at
>>>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:527)
>>>>> at
>>>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:204)
>>>>> at
>>>>> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
>>>>> at
>>>>> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:678)
>>>>>
>>>>>
>>>>> What is wrong with my code?
>>>>>
>>>>
>>>>
Re: Flink 1.11 test Parquet sink
Posted by Dawid Wysakowicz <dw...@apache.org>.
Hi,
Unfortunately this is a bug.
The problem is in CustomizedConvertRule#convertCast as it drops the
requested nullability. It was fixed in master as part of FLINK-13784[1].
Therefore the example works on master.
Could you create a jira issue for the 1.11 version? We could backport the
corresponding part of FLINK-13784. As a workaround you can try using the
values without registering them in the catalog, since the registration
triggers the type check. (I know this is not perfect):
final Table inputTable = tableEnv.fromValues(//
DataTypes.ROW(//
DataTypes.FIELD("col1", DataTypes.STRING()), //
DataTypes.FIELD("col2", DataTypes.STRING())//
), ...);
tableEnv.executeSql(//
"CREATE TABLE `out` (\n" + //
"col1 STRING,\n" + //
"col2 STRING\n" + //
") WITH (\n" + //
" 'connector' = 'filesystem',\n" + //
// " 'format' = 'parquet',\n" + //
" 'update-mode' = 'append',\n" + //
" 'path' = 'file://" + TEST_FOLDER + "',\n" + //
" 'sink.shuffle-by-partition.enable' = 'true'\n" + //
")");
inputTable.executeInsert("out");
As for the types: SQL has neither a LONG nor a STRING type. Java's long is
equivalent to SQL's BIGINT. STRING is only an alias for VARCHAR(2147483647),
i.e. VARCHAR(Integer.MAX_VALUE), added for improved usability so that you do
not need to type the maximum length manually. For the complete list of
supported types see the docs[2]
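As a quick sanity check on that alias, the length Flink's planner prints for STRING, VARCHAR(2147483647), is exactly Integer.MAX_VALUE, which a standalone snippet (no Flink dependency needed) confirms:

```java
// Standalone check: the max VARCHAR length shown in the planner output,
// 2147483647, is Integer.MAX_VALUE -- the length STRING is an alias for.
public class StringAliasCheck {
    public static void main(String[] args) {
        System.out.println(Integer.MAX_VALUE == 2147483647); // prints "true"
        System.out.println(Integer.MAX_VALUE);               // prints "2147483647"
    }
}
```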
[1] https://issues.apache.org/jira/browse/FLINK-13784
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/types.html
Best,
Dawid
On 15/07/2020 09:40, Flavio Pompermaier wrote:
> If I use tableEnv.fromValues(Row.of(1L, "Hello"),...) things work if
> I also change the query to "INSERT INTO `out` SELECT CAST(f0 AS
> STRING), f1 FROM ParquetDataset".
> If there is still a bug I'll file a proper JIRA ticket with the exact
> description of the problem..
>
> Just to conclude this thread there are 2 strange things I found:
>
> 1) Is LONG really not supported yet? If I declare the output table
> columns as LONG, STRING I get
> Exception in thread "main"
> java.lang.UnsupportedOperationException: class
> org.apache.calcite.sql.SqlIdentifier: LONG
> at org.apache.calcite.util.Util.needToImplement(Util.java:967)
>
> 2) The new planner translates STRING to VARCHAR(2147483647). Is it
> correct?
>
> Best,
> Flavio
>
>
> On Wed, Jul 15, 2020 at 5:28 AM Jark Wu <imjark@gmail.com
> <ma...@gmail.com>> wrote:
>
> I think this might be a bug in `tableEnv.fromValues`.
>
> Could you try to remove the DataType parameter, and let the
> framework derive the types?
>
> final Table inputTable = tableEnv.fromValues(
> Row.of(1L, "Hello"), //
> Row.of(2L, "Hello"), //
> Row.of(3L, ""), //
> Row.of(4L, "Ciao"));
>
> Best,
> Jark
>
>
> On Wed, 15 Jul 2020 at 11:19, Leonard Xu <xbjtdcq@gmail.com
> <ma...@gmail.com>> wrote:
>
> Hi, Flavio
>
> I reproduced your issue, and I think it should be a bug. But
> I’m not sure whether it comes from Calcite itself or from the
> Calcite that the Flink Table Planner module shades.
>
> Maybe Danny can help explain more.
>
> CC: Danny
>
> Best
> Leonard Xu
>
>> 在 2020年7月14日,23:06,Flavio Pompermaier <pompermaier@okkam.it
>> <ma...@okkam.it>> 写道:
>>
>> If I use
>>
>> final Table inputTable = tableEnv.fromValues(
>> DataTypes.ROW(
>> DataTypes.FIELD("col1",
>> DataTypes.STRING().notNull()),
>> DataTypes.FIELD("col2", DataTypes.STRING().notNull())
>> ), ..
>> tableEnv.executeSql(//
>> "CREATE TABLE `out` (" +
>> "col1 STRING," +
>> "col2 STRING" +
>> ") WITH (...)
>>
>> the job works as expected but this is wrong IMHO
>> because DataTypes.STRING() = DataTypes.STRING().nullable() by
>> default.
>> If I have DataTypes.STRING().notNull() the type in the CREATE
>> TABLE should be "STRING NOT NULL" . Am I correct?
>>
>> On Tue, Jul 14, 2020 at 4:50 PM Flavio Pompermaier
>> <pompermaier@okkam.it <ma...@okkam.it>> wrote:
>>
>> Sorry, obviously " 'format' = 'parquet'" + is
>> without comment :D
>>
>> On Tue, Jul 14, 2020 at 4:48 PM Flavio Pompermaier
>> <pompermaier@okkam.it <ma...@okkam.it>> wrote:
>>
>> Hi to all,
>> I'm trying to test write to parquet using the
>> following code but I have an error:
>>
>> final TableEnvironment tableEnv =
>> DatalinksExecutionEnvironment.getBatchTableEnv();
>> final Table inputTable = tableEnv.fromValues(//
>> DataTypes.ROW(//
>> DataTypes.FIELD("col1",
>> DataTypes.STRING()), //
>> DataTypes.FIELD("col2", DataTypes.STRING())//
>> ), //
>> Row.of(1L, "Hello"), //
>> Row.of(2L, "Hello"), //
>> Row.of(3L, ""), //
>> Row.of(4L, "Ciao"));
>> tableEnv.createTemporaryView("ParquetDataset",
>> inputTable);
>> tableEnv.executeSql(//
>> "CREATE TABLE `out` (\n" + //
>> "col1 STRING,\n" + //
>> "col2 STRING\n" + //
>> ") WITH (\n" + //
>> " 'connector' = 'filesystem',\n" + //
>> // " 'format' = 'parquet',\n" + //
>> " 'update-mode' = 'append',\n" + //
>> " 'path' = 'file://" + TEST_FOLDER +
>> "',\n" + //
>> " 'sink.shuffle-by-partition.enable' =
>> 'true'\n" + //
>> ")");
>>
>> tableEnv.executeSql("INSERT INTO `out` SELECT *
>> FROM ParquetDataset");
>>
>> ---------------------------------
>>
>> Exception in thread "main" java.lang.AssertionError:
>> Conversion to relational algebra failed to preserve
>> datatypes:
>> validated type:
>> RecordType(VARCHAR(2147483647) CHARACTER SET
>> "UTF-16LE" col1, VARCHAR(2147483647) CHARACTER SET
>> "UTF-16LE" col2) NOT NULL
>> converted type:
>> RecordType(VARCHAR(2147483647) CHARACTER SET
>> "UTF-16LE" NOT NULL col1, VARCHAR(2147483647)
>> CHARACTER SET "UTF-16LE" NOT NULL col2) NOT NULL
>> rel:
>> LogicalProject(col1=[$0], col2=[$1])
>> LogicalUnion(all=[true])
>>
>> LogicalProject(col1=[_UTF-16LE'1':VARCHAR(2147483647)
>> CHARACTER SET "UTF-16LE"],
>> col2=[_UTF-16LE'Hello':VARCHAR(2147483647) CHARACTER
>> SET "UTF-16LE"])
>> LogicalValues(tuples=[[{ 0 }]])
>>
>> LogicalProject(col1=[_UTF-16LE'2':VARCHAR(2147483647)
>> CHARACTER SET "UTF-16LE"],
>> col2=[_UTF-16LE'Hello':VARCHAR(2147483647) CHARACTER
>> SET "UTF-16LE"])
>> LogicalValues(tuples=[[{ 0 }]])
>>
>> LogicalProject(col1=[_UTF-16LE'3':VARCHAR(2147483647)
>> CHARACTER SET "UTF-16LE"],
>> col2=[_UTF-16LE'':VARCHAR(2147483647) CHARACTER SET
>> "UTF-16LE"])
>> LogicalValues(tuples=[[{ 0 }]])
>>
>> LogicalProject(col1=[_UTF-16LE'4':VARCHAR(2147483647)
>> CHARACTER SET "UTF-16LE"],
>> col2=[_UTF-16LE'Ciao':VARCHAR(2147483647) CHARACTER
>> SET "UTF-16LE"])
>> LogicalValues(tuples=[[{ 0 }]])
>>
>> at
>> org.apache.calcite.sql2rel.SqlToRelConverter.checkConvertedType(SqlToRelConverter.java:465)
>> at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:580)
>> at
>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
>> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
>> at
>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
>> at
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:773)
>> at
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:745)
>> at
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238)
>> at
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:527)
>> at
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:204)
>> at
>> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
>> at
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:678)
>>
>>
>> What is wrong with my code?
>>
>>
>
Re: Flink 1.11 test Parquet sink
Posted by Flavio Pompermaier <po...@okkam.it>.
If I use tableEnv.fromValues(Row.of(1L, "Hello"),...) things work if I also
change the query to "INSERT INTO `out` SELECT CAST(f0 AS STRING), f1 FROM
ParquetDataset".
If there is still a bug I'll file a proper JIRA ticket with the exact
description of the problem..
Just to conclude this thread there are 2 strange things I found:
1) Is LONG really not supported yet? If I declare the output table columns as
LONG, STRING I get
Exception in thread "main" java.lang.UnsupportedOperationException:
class org.apache.calcite.sql.SqlIdentifier: LONG
at org.apache.calcite.util.Util.needToImplement(Util.java:967)
2) The new planner translates STRING to VARCHAR(2147483647). Is it correct?
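For point 1, a hedged sketch of the DDL using BIGINT (the SQL counterpart of Java's long) instead of the unsupported LONG; the connector options are abbreviated from the ones in the thread and the path is a placeholder:

```sql
-- Illustrative sketch only: LONG is not a Flink SQL type, BIGINT is.
CREATE TABLE `out` (
  col1 BIGINT,
  col2 STRING
) WITH (
  'connector' = 'filesystem',
  'format' = 'parquet',
  'path' = 'file:///tmp/parquet-out'  -- placeholder path
);
```

With a BIGINT sink column, the insert could keep f0 as-is instead of casting it to STRING.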
Best,
Flavio
On Wed, Jul 15, 2020 at 5:28 AM Jark Wu <im...@gmail.com> wrote:
> I think this might be a bug in `tableEnv.fromValues`.
>
> Could you try to remove the DataType parameter, and let the framework
> derive the types?
>
> final Table inputTable = tableEnv.fromValues(
> Row.of(1L, "Hello"), //
> Row.of(2L, "Hello"), //
> Row.of(3L, ""), //
> Row.of(4L, "Ciao"));
>
> Best,
> Jark
>
>
> On Wed, 15 Jul 2020 at 11:19, Leonard Xu <xb...@gmail.com> wrote:
>
>> Hi, Flavio
>>
>> I reproduced your issue, and I think it should be a bug. But I’m not sure
>> whether it comes from Calcite itself or from the Calcite that the Flink
>> Table Planner module shades.
>>
>> Maybe Danny can help explain more.
>>
>> CC: Danny
>>
>> Best
>> Leonard Xu
>>
>> 在 2020年7月14日,23:06,Flavio Pompermaier <po...@okkam.it> 写道:
>>
>> If I use
>>
>> final Table inputTable = tableEnv.fromValues(
>> DataTypes.ROW(
>> DataTypes.FIELD("col1", DataTypes.STRING().notNull()),
>> DataTypes.FIELD("col2", DataTypes.STRING().notNull())
>> ), ..
>> tableEnv.executeSql(//
>> "CREATE TABLE `out` (" +
>> "col1 STRING," +
>> "col2 STRING" +
>> ") WITH (...)
>>
>> the job works as expected but this is wrong IMHO
>> because DataTypes.STRING() = DataTypes.STRING().nullable() by default.
>> If I have DataTypes.STRING().notNull() the type in the CREATE TABLE
>> should be "STRING NOT NULL" . Am I correct?
>>
>> On Tue, Jul 14, 2020 at 4:50 PM Flavio Pompermaier <po...@okkam.it>
>> wrote:
>>
>>> Sorry, obviously " 'format' = 'parquet'" + is without comment :D
>>>
>>> On Tue, Jul 14, 2020 at 4:48 PM Flavio Pompermaier <po...@okkam.it>
>>> wrote:
>>>
>>>> Hi to all,
>>>> I'm trying to test write to parquet using the following code but I have
>>>> an error:
>>>>
>>>> final TableEnvironment tableEnv =
>>>> DatalinksExecutionEnvironment.getBatchTableEnv();
>>>> final Table inputTable = tableEnv.fromValues(//
>>>> DataTypes.ROW(//
>>>> DataTypes.FIELD("col1", DataTypes.STRING()), //
>>>> DataTypes.FIELD("col2", DataTypes.STRING())//
>>>> ), //
>>>> Row.of(1L, "Hello"), //
>>>> Row.of(2L, "Hello"), //
>>>> Row.of(3L, ""), //
>>>> Row.of(4L, "Ciao"));
>>>> tableEnv.createTemporaryView("ParquetDataset", inputTable);
>>>> tableEnv.executeSql(//
>>>> "CREATE TABLE `out` (\n" + //
>>>> "col1 STRING,\n" + //
>>>> "col2 STRING\n" + //
>>>> ") WITH (\n" + //
>>>> " 'connector' = 'filesystem',\n" + //
>>>> // " 'format' = 'parquet',\n" + //
>>>> " 'update-mode' = 'append',\n" + //
>>>> " 'path' = 'file://" + TEST_FOLDER + "',\n" + //
>>>> " 'sink.shuffle-by-partition.enable' = 'true'\n" + //
>>>> ")");
>>>>
>>>> tableEnv.executeSql("INSERT INTO `out` SELECT * FROM
>>>> ParquetDataset");
>>>>
>>>> ---------------------------------
>>>>
>>>> Exception in thread "main" java.lang.AssertionError: Conversion to
>>>> relational algebra failed to preserve datatypes:
>>>> validated type:
>>>> RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" col1,
>>>> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" col2) NOT NULL
>>>> converted type:
>>>> RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL col1,
>>>> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL col2) NOT NULL
>>>> rel:
>>>> LogicalProject(col1=[$0], col2=[$1])
>>>> LogicalUnion(all=[true])
>>>> LogicalProject(col1=[_UTF-16LE'1':VARCHAR(2147483647) CHARACTER SET
>>>> "UTF-16LE"], col2=[_UTF-16LE'Hello':VARCHAR(2147483647) CHARACTER SET
>>>> "UTF-16LE"])
>>>> LogicalValues(tuples=[[{ 0 }]])
>>>> LogicalProject(col1=[_UTF-16LE'2':VARCHAR(2147483647) CHARACTER SET
>>>> "UTF-16LE"], col2=[_UTF-16LE'Hello':VARCHAR(2147483647) CHARACTER SET
>>>> "UTF-16LE"])
>>>> LogicalValues(tuples=[[{ 0 }]])
>>>> LogicalProject(col1=[_UTF-16LE'3':VARCHAR(2147483647) CHARACTER SET
>>>> "UTF-16LE"], col2=[_UTF-16LE'':VARCHAR(2147483647) CHARACTER SET
>>>> "UTF-16LE"])
>>>> LogicalValues(tuples=[[{ 0 }]])
>>>> LogicalProject(col1=[_UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET
>>>> "UTF-16LE"], col2=[_UTF-16LE'Ciao':VARCHAR(2147483647) CHARACTER SET
>>>> "UTF-16LE"])
>>>> LogicalValues(tuples=[[{ 0 }]])
>>>>
>>>> at
>>>> org.apache.calcite.sql2rel.SqlToRelConverter.checkConvertedType(SqlToRelConverter.java:465)
>>>> at
>>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:580)
>>>> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
>>>> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
>>>> at
>>>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
>>>> at
>>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:773)
>>>> at
>>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:745)
>>>> at
>>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238)
>>>> at
>>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:527)
>>>> at
>>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:204)
>>>> at
>>>> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
>>>> at
>>>> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:678)
>>>>
>>>>
>>>> What is wrong with my code?
>>>>
>>>
>>>
Re: Flink 1.11 test Parquet sink
Posted by Jark Wu <im...@gmail.com>.
I think this might be a bug in `tableEnv.fromValues`.
Could you try to remove the DataType parameter, and let the framework
derive the types?
final Table inputTable = tableEnv.fromValues(
Row.of(1L, "Hello"), //
Row.of(2L, "Hello"), //
Row.of(3L, ""), //
Row.of(4L, "Ciao"));
Best,
Jark
On Wed, 15 Jul 2020 at 11:19, Leonard Xu <xb...@gmail.com> wrote:
> Hi, Flavio
>
> I reproduced your issue, and I think it should be a bug. But I’m not sure
> whether it comes from Calcite itself or from the Calcite that the Flink
> Table Planner module shades.
>
> Maybe Danny can help explain more.
>
> CC: Danny
>
> Best
> Leonard Xu
>
> 在 2020年7月14日,23:06,Flavio Pompermaier <po...@okkam.it> 写道:
>
> If I use
>
> final Table inputTable = tableEnv.fromValues(
> DataTypes.ROW(
> DataTypes.FIELD("col1", DataTypes.STRING().notNull()),
> DataTypes.FIELD("col2", DataTypes.STRING().notNull())
> ), ..
> tableEnv.executeSql(//
> "CREATE TABLE `out` (" +
> "col1 STRING," +
> "col2 STRING" +
> ") WITH (...)
>
> the job works as expected but this is wrong IMHO
> because DataTypes.STRING() = DataTypes.STRING().nullable() by default.
> If I have DataTypes.STRING().notNull() the type in the CREATE TABLE should
> be "STRING NOT NULL" . Am I correct?
>
> On Tue, Jul 14, 2020 at 4:50 PM Flavio Pompermaier <po...@okkam.it>
> wrote:
>
>> Sorry, obviously " 'format' = 'parquet'" + is without comment :D
>>
>> On Tue, Jul 14, 2020 at 4:48 PM Flavio Pompermaier <po...@okkam.it>
>> wrote:
>>
>>> Hi to all,
>>> I'm trying to test write to parquet using the following code but I have
>>> an error:
>>>
>>> final TableEnvironment tableEnv =
>>> DatalinksExecutionEnvironment.getBatchTableEnv();
>>> final Table inputTable = tableEnv.fromValues(//
>>> DataTypes.ROW(//
>>> DataTypes.FIELD("col1", DataTypes.STRING()), //
>>> DataTypes.FIELD("col2", DataTypes.STRING())//
>>> ), //
>>> Row.of(1L, "Hello"), //
>>> Row.of(2L, "Hello"), //
>>> Row.of(3L, ""), //
>>> Row.of(4L, "Ciao"));
>>> tableEnv.createTemporaryView("ParquetDataset", inputTable);
>>> tableEnv.executeSql(//
>>> "CREATE TABLE `out` (\n" + //
>>> "col1 STRING,\n" + //
>>> "col2 STRING\n" + //
>>> ") WITH (\n" + //
>>> " 'connector' = 'filesystem',\n" + //
>>> // " 'format' = 'parquet',\n" + //
>>> " 'update-mode' = 'append',\n" + //
>>> " 'path' = 'file://" + TEST_FOLDER + "',\n" + //
>>> " 'sink.shuffle-by-partition.enable' = 'true'\n" + //
>>> ")");
>>>
>>> tableEnv.executeSql("INSERT INTO `out` SELECT * FROM
>>> ParquetDataset");
>>>
>>> ---------------------------------
>>>
>>> Exception in thread "main" java.lang.AssertionError: Conversion to
>>> relational algebra failed to preserve datatypes:
>>> validated type:
>>> RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" col1,
>>> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" col2) NOT NULL
>>> converted type:
>>> RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL col1,
>>> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL col2) NOT NULL
>>> rel:
>>> LogicalProject(col1=[$0], col2=[$1])
>>> LogicalUnion(all=[true])
>>> LogicalProject(col1=[_UTF-16LE'1':VARCHAR(2147483647) CHARACTER SET
>>> "UTF-16LE"], col2=[_UTF-16LE'Hello':VARCHAR(2147483647) CHARACTER SET
>>> "UTF-16LE"])
>>> LogicalValues(tuples=[[{ 0 }]])
>>> LogicalProject(col1=[_UTF-16LE'2':VARCHAR(2147483647) CHARACTER SET
>>> "UTF-16LE"], col2=[_UTF-16LE'Hello':VARCHAR(2147483647) CHARACTER SET
>>> "UTF-16LE"])
>>> LogicalValues(tuples=[[{ 0 }]])
>>> LogicalProject(col1=[_UTF-16LE'3':VARCHAR(2147483647) CHARACTER SET
>>> "UTF-16LE"], col2=[_UTF-16LE'':VARCHAR(2147483647) CHARACTER SET
>>> "UTF-16LE"])
>>> LogicalValues(tuples=[[{ 0 }]])
>>> LogicalProject(col1=[_UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET
>>> "UTF-16LE"], col2=[_UTF-16LE'Ciao':VARCHAR(2147483647) CHARACTER SET
>>> "UTF-16LE"])
>>> LogicalValues(tuples=[[{ 0 }]])
>>>
>>> at
>>> org.apache.calcite.sql2rel.SqlToRelConverter.checkConvertedType(SqlToRelConverter.java:465)
>>> at
>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:580)
>>> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
>>> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
>>> at
>>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
>>> at
>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:773)
>>> at
>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:745)
>>> at
>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238)
>>> at
>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:527)
>>> at
>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:204)
>>> at
>>> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
>>> at
>>> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:678)
>>>
>>>
>>> What is wrong with my code?
>>>
>>
>>
>
Re: Flink 1.11 test Parquet sink
Posted by Leonard Xu <xb...@gmail.com>.
Hi, Flavio
I reproduced your issue, and I think it should be a bug. But I’m not sure whether it comes from Calcite itself or from the Calcite that the Flink Table Planner module shades.
Maybe Danny can help explain more.
CC: Danny
Best
Leonard Xu
> On Jul 14, 2020, at 23:06, Flavio Pompermaier <po...@okkam.it> wrote:
> [...]
Re: Flink 1.11 test Parquet sink
Posted by Flavio Pompermaier <po...@okkam.it>.
If I use

final Table inputTable = tableEnv.fromValues(
    DataTypes.ROW(
        DataTypes.FIELD("col1", DataTypes.STRING().notNull()),
        DataTypes.FIELD("col2", DataTypes.STRING().notNull())
    ), ..
tableEnv.executeSql(
    "CREATE TABLE `out` (" +
    "col1 STRING," +
    "col2 STRING" +
    ") WITH (...)

the job works as expected, but IMHO this is wrong because DataTypes.STRING() = DataTypes.STRING().nullable() by default.
If I have DataTypes.STRING().notNull(), the type in the CREATE TABLE should be "STRING NOT NULL". Am I correct?
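If that reasoning holds, the DDL should simply mirror the nullability declared on the view side. A minimal sketch of the matching CREATE TABLE (assuming the notNull() variant of fromValues above; only the options relevant here are shown, and the path is a placeholder, not the original TEST_FOLDER):

```sql
CREATE TABLE `out` (
  col1 STRING NOT NULL,  -- mirrors DataTypes.STRING().notNull()
  col2 STRING NOT NULL   -- mirrors DataTypes.STRING().notNull()
) WITH (
  'connector' = 'filesystem',
  'format' = 'parquet',
  'path' = 'file:///path/to/out'
);
```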
On Tue, Jul 14, 2020 at 4:50 PM Flavio Pompermaier <po...@okkam.it> wrote:
> Sorry, obviously " 'format' = 'parquet'" + is without comment :D
>
> On Tue, Jul 14, 2020 at 4:48 PM Flavio Pompermaier <po...@okkam.it> wrote:
> [...]
Re: Flink 1.11 test Parquet sink
Posted by Flavio Pompermaier <po...@okkam.it>.
Sorry, obviously " 'format' = 'parquet'" + is without comment :D
On Tue, Jul 14, 2020 at 4:48 PM Flavio Pompermaier <po...@okkam.it> wrote:
> Hi to all,
> I'm trying to test write to parquet using the following code but I have an error:
>
> final TableEnvironment tableEnv = DatalinksExecutionEnvironment.getBatchTableEnv();
> final Table inputTable = tableEnv.fromValues(
>     DataTypes.ROW(
>         DataTypes.FIELD("col1", DataTypes.STRING()),
>         DataTypes.FIELD("col2", DataTypes.STRING())
>     ),
>     Row.of(1L, "Hello"),
>     Row.of(2L, "Hello"),
>     Row.of(3L, ""),
>     Row.of(4L, "Ciao"));
> tableEnv.createTemporaryView("ParquetDataset", inputTable);
> tableEnv.executeSql(
>     "CREATE TABLE `out` (\n" +
>     "col1 STRING,\n" +
>     "col2 STRING\n" +
>     ") WITH (\n" +
>     " 'connector' = 'filesystem',\n" +
>     // " 'format' = 'parquet',\n" +
>     " 'update-mode' = 'append',\n" +
>     " 'path' = 'file://" + TEST_FOLDER + "',\n" +
>     " 'sink.shuffle-by-partition.enable' = 'true'\n" +
>     ")");
>
> tableEnv.executeSql("INSERT INTO `out` SELECT * FROM ParquetDataset");
>
> ---------------------------------
>
> Exception in thread "main" java.lang.AssertionError: Conversion to relational algebra failed to preserve datatypes:
> validated type:
> RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" col1, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" col2) NOT NULL
> converted type:
> RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL col1, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL col2) NOT NULL
> rel:
> LogicalProject(col1=[$0], col2=[$1])
>   LogicalUnion(all=[true])
>     LogicalProject(col1=[_UTF-16LE'1':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], col2=[_UTF-16LE'Hello':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"])
>       LogicalValues(tuples=[[{ 0 }]])
>     LogicalProject(col1=[_UTF-16LE'2':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], col2=[_UTF-16LE'Hello':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"])
>       LogicalValues(tuples=[[{ 0 }]])
>     LogicalProject(col1=[_UTF-16LE'3':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], col2=[_UTF-16LE'':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"])
>       LogicalValues(tuples=[[{ 0 }]])
>     LogicalProject(col1=[_UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], col2=[_UTF-16LE'Ciao':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"])
>       LogicalValues(tuples=[[{ 0 }]])
>
> at org.apache.calcite.sql2rel.SqlToRelConverter.checkConvertedType(SqlToRelConverter.java:465)
> at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:580)
> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
> at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:773)
> at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:745)
> at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238)
> at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:527)
> at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:204)
> at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
> at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:678)
>
> What is wrong with my code?