Posted to user@flink.apache.org by Steve Whelan <sw...@jwplayer.com> on 2020/10/26 15:10:29 UTC

ValidationException using DataTypeHint in Scalar Function

Hi,

I have a column of type *RAW('java.util.Map', ?)* that I want to pass to a
scalar function UDF. I'm using DataTypeHints but hitting an exception. What
would be the proper DataTypeHint and data type param to achieve this?

  @FunctionHint(
          input = {@DataTypeHint("RAW"), @DataTypeHint("STRING")},
          output = @DataTypeHint("STRING")
  )
  public static String eval(final Object map, final String key) {
    // business logic
  }


*Exception:*

Caused by: org.apache.flink.table.api.ValidationException: Invalid input
arguments. Expected signatures are:
MAP_VALUE(RAW('java.lang.Object', '...'), STRING)
at
org.apache.flink.table.types.inference.TypeInferenceUtil.createInvalidInputException(TypeInferenceUtil.java:190)
at
org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypesOrError(TypeInferenceOperandChecker.java:131)
at
org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypes(TypeInferenceOperandChecker.java:89)
... 50 more
Caused by: org.apache.flink.table.api.ValidationException: Invalid argument
type at position 0. Data type RAW('java.lang.Object', '...') expected but
RAW('java.util.Map', ?) passed.
at
org.apache.flink.table.types.inference.TypeInferenceUtil.adaptArguments(TypeInferenceUtil.java:137)
at
org.apache.flink.table.types.inference.TypeInferenceUtil.adaptArguments(TypeInferenceUtil.java:102)
at
org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypesOrError(TypeInferenceOperandChecker.java:126)
... 51 more


Thank you,

Steve

Re: ValidationException using DataTypeHint in Scalar Function

Posted by Jark Wu <im...@gmail.com>.
Hi Steve,

Thanks for reaching out to the Flink community. I am pulling in Timo who
might be able to help you with this question.

Best,
Jark



Re: ValidationException using DataTypeHint in Scalar Function

Posted by Steve Whelan <sw...@jwplayer.com>.
Timo and Dawid,

Registering my UDF via the deprecated *registerFunction()* instead of the
new *createTemporarySystemFunction()* worked. So it would appear there is
some incompatibility between my implementation and the new registration
system. I will wait for FLIP-136 to be completed and retry then. This
solution works for now.

Thanks,

Steve

On Mon, Nov 9, 2020 at 10:12 AM Timo Walther <tw...@apache.org> wrote:

> Sorry for jumping in so late. I think Dawid gave a nice summary.
>
> As he said, the DataStream <> Table integration is still under
> development. Until then I would suggest option 3), which means not
> upgrading the functions and using the old registration method
> `registerFunction`. Everything should work as expected there with the
> old types.
>
> Let me know if you need more input.
>
> Regards,
> Timo
>

Re: ValidationException using DataTypeHint in Scalar Function

Posted by Timo Walther <tw...@apache.org>.
Sorry for jumping in so late. I think Dawid gave a nice summary.

As he said, the DataStream <> Table integration is still under
development. Until then I would suggest option 3), which means not
upgrading the functions and using the old registration method
`registerFunction`. Everything should work as expected there with the
old types.

Let me know if you need more input.

Regards,
Timo


On 09.11.20 10:28, Dawid Wysakowicz wrote:
> Hi Steve,
> 
> Unfortunately the information you posted still does not explain how you 
> ended up with *RAW('java.util.Map', ?)* for your input type. Would be 
> best if you could share an example that I could use to reproduce it.


Re: ValidationException using DataTypeHint in Scalar Function

Posted by Dawid Wysakowicz <dw...@apache.org>.
Hi Steve,

Unfortunately the information you posted still does not explain how you
ended up with *RAW('java.util.Map', ?)* for your input type. Would be
best if you could share an example that I could use to reproduce it.

I tried putting down some potential approaches:

I tested it with a class generated from an avsc:

 {"namespace": "com.ververica.avro.generated",
 "type": "record",
 "name": "Address",
 "fields": [
     {"name": "num", "type": "int"},
     {"name": "street", "type": {
                          "type": "map",
                          "values" : "string",
                          "default": {}
                        }}
  ]
}

which has two fields:

  @Deprecated public int num;
  @Deprecated public java.util.Map<java.lang.String,java.lang.String> street;

1) From the description you posted the UrlParameters (street in my case)
field should have *LEGACY('RAW', 'ANY<java.util.Map>')* type.

root
 |-- num: INT
 |-- street: LEGACY('RAW', 'ANY<java.util.Map>')

2) Using the new type system

A more seamless DataStream <> Table integration is still under
development; see FLIP-136[1]. Until it lands, you would need to adjust
the types in the input DataStream. Bear in mind that this approach
changes how the type is serialized, from Avro-based serialization to
Flink's own POJO serialization.

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        Map<String, TypeInformation<?>> fieldTypes = new HashMap<>();
        fieldTypes.put("num", BasicTypeInfo.INT_TYPE_INFO);
        fieldTypes.put("street", Types.MAP(Types.STRING, Types.STRING));
        SingleOutputStreamOperator<Address> elements = env.fromElements(
                Address.newBuilder()
                        .setNum(1)
                        .setStreet(new HashMap<>())
                        .build()
        )
        .returns(
                Types.POJO(
                        Address.class,
                        fieldTypes
                )
        );
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(
                env,
                EnvironmentSettings.newInstance().useBlinkPlanner().build());

        tEnv.createTemporaryView("test", elements);

        tEnv.from("test").select(call(Func.class, $("street"))).execute().print();
    }

    public static class Func extends ScalarFunction {
        @FunctionHint(
                input = {@DataTypeHint(value = "MAP<STRING, STRING>")},
                output = @DataTypeHint("STRING")
        )
        public String eval(final Map<String, String> map) {
            // business logic
            return "ABC";
        }
    }
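
As a rough illustration of what the `// business logic` placeholder might
hold, here is a minimal sketch of the map lookup described for MAP_VALUE
elsewhere in this thread. It is plain Java with no Flink dependency so it
can be run on its own; the class name `MapValueLogic`, the two-argument
signature, and the null handling are assumptions made for the example,
not something taken from the original job.

```java
import java.util.HashMap;
import java.util.Map;

public class MapValueLogic {

    // Hypothetical helper: null-safe lookup mirroring MAP_VALUE(map, key).
    // Returning null for a null map or key is an assumption of this sketch.
    public static String eval(final Map<String, String> map, final String key) {
        if (map == null || key == null) {
            return null;
        }
        return map.get(key);
    }

    public static void main(String[] args) {
        Map<String, String> urlParameters = new HashMap<>();
        urlParameters.put("some_key", "some_value");

        System.out.println(eval(urlParameters, "some_key")); // prints "some_value"
        System.out.println(eval(urlParameters, "missing"));  // prints "null"
        System.out.println(eval(null, "some_key"));          // prints "null"
    }
}
```

Inside a real ScalarFunction the same body would sit in a non-static
`eval` method; the static form here only keeps the sketch easy to call.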

3) Using the legacy types approach you can query that field like this:

    public static class LegacyFunc extends ScalarFunction {
        public String eval(final Map<String, String> map) {
            // business logic
            return "ABC";
        }
    }

    StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
    SingleOutputStreamOperator<Address> elements = env.fromElements(
            Address.newBuilder()
                    .setNum(1)
                    .setStreet(new HashMap<>())
                    .build()
    );
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(
            env,
            EnvironmentSettings.newInstance().useBlinkPlanner().build());

    tEnv.createTemporaryView("test", elements);
    tEnv.registerFunction("legacyFunc", new LegacyFunc());

    tEnv.from("test").select(call("legacyFunc", $("street"))).execute().print();

Best,

Dawid

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-136%3A++Improve+interoperability+between+DataStream+and+Table+API

On 09/11/2020 02:30, Steve Whelan wrote:
> Hi Dawid,
>
> Just wanted to bump this thread in case you had any thoughts.
>
> Thanks,
>
> Steve
>
> On Thu, Oct 29, 2020 at 2:42 PM Steve Whelan <sw...@jwplayer.com> wrote:
>
>     For some background, I am upgrading from Flink v1.9 to v1.11. So
>     what I am about to describe is our implementation on v1.9, which
>     worked. I am trying to achieve the same functionality on v1.11.
>
>     I have a DataStream whose type is an avro generated POJO, which
>     contains a field *UrlParameters* that is of type *Map<String,
>     String>*. I register this stream as a view so I can perform SQL
>     queries on it. One of the queries contains the UDF I have
>     previously posted. It appears that in the conversion to a view,
>     the type of *UrlParameters* is being converted
>     into *RAW('java.util.Map', ?)*.
>
>
>     *Code on v1.9*
>
>     // pings: a Kafka stream source deserialized into an Avro-generated POJO
>     DataStream pings = ...;
>     tableEnvironment.registerDataStream("myTable", pings);
>     table = tableEnvironment.sqlQuery(
>         "SELECT MAP_VALUE(UrlParameters, 'some_key') FROM myTable");
>     // tablesinks...
>
>
>     /The produced type of my deserializer is:/
>
>     @Override
>     public TypeInformation<Ping> getProducedType() {
>         // Ping.class is an avro generated POJO
>         return TypeInformation.of(Ping.class);
>     }
>
>     /Scalar UDF MAP_VALUE:/
>      
>     public static String eval(final Map<String, String> map, final
>     String key) {
>         return map.get(key);
>     }
>
>
>     I am using a UDF to access fields in the *UrlParameters* map
>     because if I try to access them directly in the SQL (i.e.
>     `*UrlParameters['some_key']*`), I get the exception below. A
>     Stack Overflow answer[1] suggested the UDF as a workaround.
>
>     Caused by: org.apache.flink.table.api.TableException: Type is not
>     supported: ANY
>     at
>     org.apache.flink.table.planner.calcite.FlinkTypeFactory$.toLogicalType(FlinkTypeFactory.scala:551)
>     at
>     org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:478)
>     at
>     org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:53)
>     at org.apache.calcite.rex.RexCall.accept(RexCall.java:288)
>     at
>     org.apache.flink.table.planner.codegen.ExprCodeGenerator.$anonfun$visitCall$1(ExprCodeGenerator.scala:490)
>
>
>     The above implementation worked successfully on v1.9. We use a
>     stream source instead of a table source because we do other
>     non-SQL things with the stream.
>
>
>     *Code on v1.11*
>
>     The following is the implementation on v1.11 which does not work.
>     I was using the Old Planner on v1.9 but have switched to the Blink
>     Planner on v1.11, in case that has any relevance here.
>
>
>     // pings: a Kafka stream source deserialized into an Avro-generated POJO
>     DataStream pings = ...;
>     tableEnvironment.createTemporaryView("myTable", pings);
>     table = tableEnvironment.sqlQuery(
>         "SELECT MAP_VALUE(UrlParameters, 'some_key') FROM myTable");
>     // tablesinks...
>
>
>     The UDF referenced above produced the below error. So I assumed
>     adding DataTypeHints was the way to solve it but I was unable to
>     get that to work. That is what prompted the initial email to the ML.
>
>     Caused by: org.apache.flink.table.api.ValidationException: Invalid
>     input arguments. Expected signatures are:
>     MAP_VALUE(map => MAP<STRING, STRING>, key => STRING)
>     at
>     org.apache.flink.table.types.inference.TypeInferenceUtil.createInvalidInputException(TypeInferenceUtil.java:190)
>     at
>     org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypesOrError(TypeInferenceOperandChecker.java:131)
>     at
>     org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypes(TypeInferenceOperandChecker.java:89)
>     ... 50 more
>     Caused by: org.apache.flink.table.api.ValidationException: Invalid
>     argument type at position 0. Data type MAP<STRING, STRING>
>     expected but RAW('java.util.Map', ?) passed.
>     at
>     org.apache.flink.table.types.inference.TypeInferenceUtil.adaptArguments(TypeInferenceUtil.java:137)
>     at
>     org.apache.flink.table.types.inference.TypeInferenceUtil.adaptArguments(TypeInferenceUtil.java:102)
>     at
>     org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypesOrError(TypeInferenceOperandChecker.java:126)
>     ... 51 more
>
>
>     I can try creating a concrete reproducible example if this
>     explanation isn't enough, though it's quite a bit with the avro
>     POJO and custom deserializer.
>
>
>     Thanks,
>
>     Steve
>
>
>     [1]
>     https://stackoverflow.com/questions/45621542/does-flink-sql-support-java-map-types
>

Re: ValidationException using DataTypeHint in Scalar Function

Posted by Steve Whelan <sw...@jwplayer.com>.
Hi Dawid,

Just wanted to bump this thread in case you had any thoughts.

Thanks,

Steve

On Thu, Oct 29, 2020 at 2:42 PM Steve Whelan <sw...@jwplayer.com> wrote:

> For some background, I am upgrading from Flink v1.9 to v1.11. So what I am
> about to describe is our implementation on v1.9, which worked. I am trying
> to achieve the same functionality on v1.11.
>
> I have a DataStream whose type is an avro generated POJO, which contains a
> field *UrlParameters* that is of type *Map<String, String>*. I register
> this stream as a view so I can perform SQL queries on it. One of the
> queries contains the UDF I have previously posted. It appears that in the
> conversion to a view, the type of *UrlParameters* is being converted into
> *RAW('java.util.Map', ?)*.
>
>
> *Code on v1.9*
>
> DataStream pings = // a Kafka stream source deserialized into an avro generated POJO
> tableEnvironment.registerDataStream("myTable", pings);
> table = tableEnvironment.sqlQuery("SELECT MAP_VALUE(UrlParameters, 'some_key') FROM myTable");
> // tablesinks...
>
>
> *The produced type of my deserializer is:*
>
> @Override
> public TypeInformation<Ping> getProducedType() {
>     // Ping.class is an avro generated POJO
>     return TypeInformation.of(Ping.class);
> }
>
> *Scalar UDF MAP_VALUE:*
>
> public static String eval(final Map<String, String> map, final String key)
> {
>     return map.get(key);
> }
>
>
> I am using a UDF to access fields in the *UrlParameters* map because if I
> try to access them directly in the SQL (i.e. `*UrlParameters['some_key']*`),
> I get the below exception. This Stack Overflow post[1] had suggested the UDF
> as a workaround.
>
> Caused by: org.apache.flink.table.api.TableException: Type is not
> supported: ANY
> at
> org.apache.flink.table.planner.calcite.FlinkTypeFactory$.toLogicalType(FlinkTypeFactory.scala:551)
> at
> org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:478)
> at
> org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:53)
> at org.apache.calcite.rex.RexCall.accept(RexCall.java:288)
> at
> org.apache.flink.table.planner.codegen.ExprCodeGenerator.$anonfun$visitCall$1(ExprCodeGenerator.scala:490)
>
>
> The above implementation worked successfully on v1.9. We use a stream
> source instead of a table source because we do other, non-SQL things with
> the stream.
>
>
> *Code on v1.11*
>
> The following is the implementation on v1.11 which does not work. I was
> using the Old Planner on v1.9 but have switched to the Blink Planner on
> v1.11, in case that has any relevance here.
>
>
> DataStream pings = // a Kafka stream source deserialized into an avro generated POJO object
> tableEnvironment.createTemporaryView("myTable", pings);
> table = tableEnvironment.sqlQuery("SELECT MAP_VALUE(UrlParameters, 'some_key') FROM myTable");
> // tablesinks...
>
>
> The UDF referenced above produced the below error. So I assumed adding
> DataTypeHints was the way to solve it but I was unable to get that to work.
> That is what prompted the initial email to the ML.
>
> Caused by: org.apache.flink.table.api.ValidationException: Invalid input
> arguments. Expected signatures are:
> MAP_VALUE(map => MAP<STRING, STRING>, key => STRING)
> at
> org.apache.flink.table.types.inference.TypeInferenceUtil.createInvalidInputException(TypeInferenceUtil.java:190)
> at
> org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypesOrError(TypeInferenceOperandChecker.java:131)
> at
> org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypes(TypeInferenceOperandChecker.java:89)
> ... 50 more
> Caused by: org.apache.flink.table.api.ValidationException: Invalid
> argument type at position 0. Data type MAP<STRING, STRING> expected but
> RAW('java.util.Map', ?) passed.
> at
> org.apache.flink.table.types.inference.TypeInferenceUtil.adaptArguments(TypeInferenceUtil.java:137)
> at
> org.apache.flink.table.types.inference.TypeInferenceUtil.adaptArguments(TypeInferenceUtil.java:102)
> at
> org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypesOrError(TypeInferenceOperandChecker.java:126)
> ... 51 more
>
>
> I can try creating a concrete reproducible example if this explanation
> isn't enough, though it's quite a bit with the avro POJO and custom
> deserializer.
>
>
> Thanks,
>
> Steve
>
>
> [1]
> https://stackoverflow.com/questions/45621542/does-flink-sql-support-java-map-types
>

Re: ValidationException using DataTypeHint in Scalar Function

Posted by Steve Whelan <sw...@jwplayer.com>.
For some background, I am upgrading from Flink v1.9 to v1.11. So what I am
about to describe is our implementation on v1.9, which worked. I am trying
to achieve the same functionality on v1.11.

I have a DataStream whose type is an avro generated POJO, which contains a
field *UrlParameters* that is of type *Map<String, String>*. I register
this stream as a view so I can perform SQL queries on it. One of the
queries contains the UDF I have previously posted. It appears that in the
conversion to a view, the type of *UrlParameters* is being converted into
*RAW('java.util.Map', ?)*.


*Code on v1.9*

DataStream pings = // a Kafka stream source deserialized into an avro generated POJO
tableEnvironment.registerDataStream("myTable", pings);
table = tableEnvironment.sqlQuery("SELECT MAP_VALUE(UrlParameters, 'some_key') FROM myTable");
// tablesinks...


*The produced type of my deserializer is:*

@Override
public TypeInformation<Ping> getProducedType() {
    // Ping.class is an avro generated POJO
    return TypeInformation.of(Ping.class);
}

*Scalar UDF MAP_VALUE:*

public static String eval(final Map<String, String> map, final String key) {
    return map.get(key);
}


I am using a UDF to access fields in the *UrlParameters* map because if I
try to access them directly in the SQL (i.e. `*UrlParameters['some_key']*`),
I get the below exception. This Stack Overflow post[1] had suggested the UDF
as a workaround.

Caused by: org.apache.flink.table.api.TableException: Type is not
supported: ANY
at
org.apache.flink.table.planner.calcite.FlinkTypeFactory$.toLogicalType(FlinkTypeFactory.scala:551)
at
org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:478)
at
org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:53)
at org.apache.calcite.rex.RexCall.accept(RexCall.java:288)
at
org.apache.flink.table.planner.codegen.ExprCodeGenerator.$anonfun$visitCall$1(ExprCodeGenerator.scala:490)


The above implementation worked successfully on v1.9. We use a stream
source instead of a table source because we do other, non-SQL things with
the stream.


*Code on v1.11*

The following is the implementation on v1.11 which does not work. I was
using the Old Planner on v1.9 but have switched to the Blink Planner on
v1.11, in case that has any relevance here.


DataStream pings = // a Kafka stream source deserialized into an avro generated POJO object
tableEnvironment.createTemporaryView("myTable", pings);
table = tableEnvironment.sqlQuery("SELECT MAP_VALUE(UrlParameters, 'some_key') FROM myTable");
// tablesinks...


The UDF referenced above produced the below error. So I assumed adding
DataTypeHints was the way to solve it but I was unable to get that to work.
That is what prompted the initial email to the ML.

Caused by: org.apache.flink.table.api.ValidationException: Invalid input
arguments. Expected signatures are:
MAP_VALUE(map => MAP<STRING, STRING>, key => STRING)
at
org.apache.flink.table.types.inference.TypeInferenceUtil.createInvalidInputException(TypeInferenceUtil.java:190)
at
org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypesOrError(TypeInferenceOperandChecker.java:131)
at
org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypes(TypeInferenceOperandChecker.java:89)
... 50 more
Caused by: org.apache.flink.table.api.ValidationException: Invalid argument
type at position 0. Data type MAP<STRING, STRING> expected but
RAW('java.util.Map', ?) passed.
at
org.apache.flink.table.types.inference.TypeInferenceUtil.adaptArguments(TypeInferenceUtil.java:137)
at
org.apache.flink.table.types.inference.TypeInferenceUtil.adaptArguments(TypeInferenceUtil.java:102)
at
org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypesOrError(TypeInferenceOperandChecker.java:126)
... 51 more


I can try creating a concrete reproducible example if this explanation
isn't enough, though it's quite a bit with the avro POJO and custom
deserializer.


Thanks,

Steve


[1]
https://stackoverflow.com/questions/45621542/does-flink-sql-support-java-map-types


Re: ValidationException using DataTypeHint in Scalar Function

Posted by Dawid Wysakowicz <dw...@apache.org>.
Sorry for the late reply.

Could you share a complete, reproducible example? I am mostly interested
in where you get the input RAW('java.util.Map', '...') type that you
are passing into your UDF.

Raw types are equal/equivalent only if both the class and the serializer
are equal.
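
To illustrate the rule above, here is a minimal plain-Java sketch. It is
not Flink code: RawTypeModel and the serializer names are hypothetical
stand-ins, and a String id takes the place of a real serializer instance.
It only shows why two RAW types over the same class (java.util.Map) can
still be rejected as different, as in the "expected ... but ... passed"
messages in this thread.

```java
import java.util.Map;
import java.util.Objects;

// Hypothetical model of a RAW type: a class plus a serializer identity.
// This is an illustration only, not Flink's actual implementation.
final class RawTypeModel {
    private final Class<?> clazz;
    private final String serializerId; // stands in for a serializer instance

    RawTypeModel(Class<?> clazz, String serializerId) {
        this.clazz = Objects.requireNonNull(clazz);
        this.serializerId = Objects.requireNonNull(serializerId);
    }

    // Two raw types are equal only if BOTH the class and the serializer match.
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof RawTypeModel)) {
            return false;
        }
        RawTypeModel other = (RawTypeModel) o;
        return clazz.equals(other.clazz) && serializerId.equals(other.serializerId);
    }

    @Override
    public int hashCode() {
        return Objects.hash(clazz, serializerId);
    }

    public static void main(String[] args) {
        // Same class, different (assumed) serializers: not equal.
        RawTypeModel declared = new RawTypeModel(Map.class, "serializer-from-hint");
        RawTypeModel passed = new RawTypeModel(Map.class, "serializer-from-stream");
        System.out.println(declared.equals(passed)); // prints "false"
    }
}
```

In this model, matching only on the class is not enough; the serializer
declared for the function argument would also have to be the one attached
to the column being passed in.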

A side note: Have you tried using the MAP type instead of
RAW('java.util.Map', '...')? Why did you decide to use a RAW type in
your case?

Best,

Dawid

On 28/10/2020 00:23, Steve Whelan wrote:
> Hi Dawid,
>
> I added `/bridgedTo = Map.class/` as you suggested and got a slightly
> different exception. I also tried passing a rawSerializer (an
> implementation similar to MapSerializer[1] with String type key and
> value) but got the same exception as without it. I am using Flink
> v1.11 for reference.
>
>
> @FunctionHint(
>       input = {
>               @DataTypeHint(value="RAW", bridgedTo=Map.class,
> rawSerializer=MyMapSerializer.class),
>               @DataTypeHint("STRING")
>       },
>       output = @DataTypeHint("STRING")
> )
> public static String eval(final Object map, final String key)
>
>
> *Exception:*
>
> Caused by: org.apache.flink.table.api.ValidationException: Invalid
> input arguments. Expected signatures are:
> MAP_VALUE(RAW('java.util.Map', '...'), STRING)
> at
> org.apache.flink.table.types.inference.TypeInferenceUtil.createInvalidInputException(TypeInferenceUtil.java:190)
> at
> org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypesOrError(TypeInferenceOperandChecker.java:131)
> at
> org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypes(TypeInferenceOperandChecker.java:89)
> ... 49 more
> Caused by: org.apache.flink.table.api.ValidationException: Invalid
> argument type at position 0. Data type RAW('java.util.Map', '...')
> expected but RAW('java.util.Map', ?) passed.
> at
> org.apache.flink.table.types.inference.TypeInferenceUtil.adaptArguments(TypeInferenceUtil.java:137)
> at
> org.apache.flink.table.types.inference.TypeInferenceUtil.adaptArguments(TypeInferenceUtil.java:102)
> at
> org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypesOrError(TypeInferenceOperandChecker.java:126)
> ... 50 more
>
>
> [1] https://github.com/apache/flink/blob/release-1.11.0/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java
>
> On Tue, Oct 27, 2020 at 6:13 AM Dawid Wysakowicz <dw...@apache.org> wrote:
>
>     Hey Steve,
>
>     You should be able to do that via the bridgedTo parameter. You can
>     additionally specify the serializer you want to use via the
>     rawSerializer parameter:
>
>             @FunctionHint(
>                     input = {
>                             @DataTypeHint(value = "RAW", bridgedTo =
>     Map.class[, rawSerializer = ... ]),
>                             @DataTypeHint("STRING")},
>                     output = @DataTypeHint("STRING")
>             )
>             public static String eval(final Object map, final String key)
>
>     Best,
>
>     Dawid
>
>     On 26/10/2020 16:10, Steve Whelan wrote:
>>     Hi,
>>
>>     I have a column of type *RAW('java.util.Map', ?)* that I want to
>>     pass to a scalar function UDF. I'm using DataTypeHints but
>>     hitting an exception. What would be the proper DataTypeHint and
>>     data type param to achieve this?
>>
>>       @FunctionHint(
>>               input = {@DataTypeHint("RAW"), @DataTypeHint("STRING")},
>>               output = @DataTypeHint("STRING")
>>       )
>>       public static String eval(final Object map, final String key) {
>>         // business logic
>>       }
>>
>>
>>     *Exception:*
>>
>>     Caused by: org.apache.flink.table.api.ValidationException:
>>     Invalid input arguments. Expected signatures are:
>>     MAP_VALUE(RAW('java.lang.Object', '...'), STRING)
>>     at
>>     org.apache.flink.table.types.inference.TypeInferenceUtil.createInvalidInputException(TypeInferenceUtil.java:190)
>>     at
>>     org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypesOrError(TypeInferenceOperandChecker.java:131)
>>     at
>>     org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypes(TypeInferenceOperandChecker.java:89)
>>     ... 50 more
>>     Caused by: org.apache.flink.table.api.ValidationException:
>>     Invalid argument type at position 0. Data type
>>     RAW('java.lang.Object', '...') expected but RAW('java.util.Map',
>>     ?) passed.
>>     at
>>     org.apache.flink.table.types.inference.TypeInferenceUtil.adaptArguments(TypeInferenceUtil.java:137)
>>     at
>>     org.apache.flink.table.types.inference.TypeInferenceUtil.adaptArguments(TypeInferenceUtil.java:102)
>>     at
>>     org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypesOrError(TypeInferenceOperandChecker.java:126)
>>     ... 51 more
>>
>>
>>     Thank you,
>>
>>     Steve
>

Re: ValidationException using DataTypeHint in Scalar Function

Posted by Steve Whelan <sw...@jwplayer.com>.
Hi Dawid,

I added `*bridgedTo = Map.class*` as you suggested and got a slightly
different exception. I also tried passing a rawSerializer (an
implementation similar to MapSerializer[1] with String type key and value)
but got the same exception as without it. I am using Flink v1.11 for
reference.


@FunctionHint(
      input = {
              @DataTypeHint(value="RAW", bridgedTo=Map.class,
                            rawSerializer=MyMapSerializer.class),
              @DataTypeHint("STRING")
      },
      output = @DataTypeHint("STRING")
)
public static String eval(final Object map, final String key)
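
For reference, the body of an eval that receives the map as a plain
Object could look roughly like the sketch below. This is plain Java with
no Flink dependency; the class name MapValueSketch is made up for
illustration, and in a real job the method would live in a class
extending ScalarFunction and carry the hints shown above. It does not
address the type mismatch itself, only the lookup logic once a value
arrives.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical helper showing the lookup logic only; not a Flink class.
final class MapValueSketch {

    // Accepts the argument as Object (as in the hinted signature above)
    // and returns null when it is not a Map, the key is null, or the key
    // is absent.
    static String eval(final Object map, final String key) {
        if (!(map instanceof Map) || key == null) {
            return null;
        }
        Object value = ((Map<?, ?>) map).get(key);
        return value == null ? null : value.toString();
    }

    public static void main(String[] args) {
        Map<String, String> params = Collections.singletonMap("some_key", "some_value");
        System.out.println(eval(params, "some_key")); // prints "some_value"
    }
}
```

Checking with instanceof before casting keeps the function from failing
with a ClassCastException if the column ever carries something other
than a Map.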


*Exception:*

Caused by: org.apache.flink.table.api.ValidationException: Invalid input
arguments. Expected signatures are:
MAP_VALUE(RAW('java.util.Map', '...'), STRING)
at
org.apache.flink.table.types.inference.TypeInferenceUtil.createInvalidInputException(TypeInferenceUtil.java:190)
at
org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypesOrError(TypeInferenceOperandChecker.java:131)
at
org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypes(TypeInferenceOperandChecker.java:89)
... 49 more
Caused by: org.apache.flink.table.api.ValidationException: Invalid argument
type at position 0. Data type RAW('java.util.Map', '...') expected but
RAW('java.util.Map', ?) passed.
at
org.apache.flink.table.types.inference.TypeInferenceUtil.adaptArguments(TypeInferenceUtil.java:137)
at
org.apache.flink.table.types.inference.TypeInferenceUtil.adaptArguments(TypeInferenceUtil.java:102)
at
org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypesOrError(TypeInferenceOperandChecker.java:126)
... 50 more


[1]
https://github.com/apache/flink/blob/release-1.11.0/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java

On Tue, Oct 27, 2020 at 6:13 AM Dawid Wysakowicz <dw...@apache.org>
wrote:

> Hey Steve,
>
> You should be able to do that via the bridgedTo parameter. You can additionally
> specify the serializer you want to use via the rawSerializer parameter:
>
>         @FunctionHint(
>                 input = {
>                         @DataTypeHint(value = "RAW", bridgedTo =
> Map.class[, rawSerializer = ... ]),
>                         @DataTypeHint("STRING")},
>                 output = @DataTypeHint("STRING")
>         )
>         public static String eval(final Object map, final String key)
>
> Best,
>
> Dawid
> On 26/10/2020 16:10, Steve Whelan wrote:
>
> Hi,
>
> I have a column of type *RAW('java.util.Map', ?)* that I want to pass to
> a scalar function UDF. I'm using DataTypeHints but hitting an exception.
> What would be the proper DataTypeHint and data type param to achieve this?
>
>   @FunctionHint(
>           input = {@DataTypeHint("RAW"), @DataTypeHint("STRING")},
>           output = @DataTypeHint("STRING")
>   )
>   public static String eval(final Object map, final String key) {
>     // business logic
>   }
>
>
> *Exception:*
>
> Caused by: org.apache.flink.table.api.ValidationException: Invalid input
> arguments. Expected signatures are:
> MAP_VALUE(RAW('java.lang.Object', '...'), STRING)
> at
> org.apache.flink.table.types.inference.TypeInferenceUtil.createInvalidInputException(TypeInferenceUtil.java:190)
> at
> org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypesOrError(TypeInferenceOperandChecker.java:131)
> at
> org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypes(TypeInferenceOperandChecker.java:89)
> ... 50 more
> Caused by: org.apache.flink.table.api.ValidationException: Invalid
> argument type at position 0. Data type RAW('java.lang.Object', '...')
> expected but RAW('java.util.Map', ?) passed.
> at
> org.apache.flink.table.types.inference.TypeInferenceUtil.adaptArguments(TypeInferenceUtil.java:137)
> at
> org.apache.flink.table.types.inference.TypeInferenceUtil.adaptArguments(TypeInferenceUtil.java:102)
> at
> org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypesOrError(TypeInferenceOperandChecker.java:126)
> ... 51 more
>
>
> Thank you,
>
> Steve
>
>

Re: ValidationException using DataTypeHint in Scalar Function

Posted by Dawid Wysakowicz <dw...@apache.org>.
Hey Steve,

You should be able to do that via the bridgedTo parameter. You can
additionally specify the serializer you want to use via the rawSerializer
parameter:

        @FunctionHint(
                input = {
                        @DataTypeHint(value = "RAW", bridgedTo = Map.class[, rawSerializer = ... ]),
                        @DataTypeHint("STRING")},
                output = @DataTypeHint("STRING")
        )
        public static String eval(final Object map, final String key)

Best,

Dawid

On 26/10/2020 16:10, Steve Whelan wrote:
> Hi,
>
> I have a column of type *RAW('java.util.Map', ?)* that I want to pass
> to a scalar function UDF. I'm using DataTypeHints but hitting an
> exception. What would be the proper DataTypeHint and data type param
> to achieve this?
>
>   @FunctionHint(
>           input = {@DataTypeHint("RAW"), @DataTypeHint("STRING")},
>           output = @DataTypeHint("STRING")
>   )
>   public static String eval(final Object map, final String key) {
>     // business logic
>   }
>
>
> *Exception:*
>
> Caused by: org.apache.flink.table.api.ValidationException: Invalid
> input arguments. Expected signatures are:
> MAP_VALUE(RAW('java.lang.Object', '...'), STRING)
> at
> org.apache.flink.table.types.inference.TypeInferenceUtil.createInvalidInputException(TypeInferenceUtil.java:190)
> at
> org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypesOrError(TypeInferenceOperandChecker.java:131)
> at
> org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypes(TypeInferenceOperandChecker.java:89)
> ... 50 more
> Caused by: org.apache.flink.table.api.ValidationException: Invalid
> argument type at position 0. Data type RAW('java.lang.Object', '...')
> expected but RAW('java.util.Map', ?) passed.
> at
> org.apache.flink.table.types.inference.TypeInferenceUtil.adaptArguments(TypeInferenceUtil.java:137)
> at
> org.apache.flink.table.types.inference.TypeInferenceUtil.adaptArguments(TypeInferenceUtil.java:102)
> at
> org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypesOrError(TypeInferenceOperandChecker.java:126)
> ... 51 more
>
>
> Thank you,
>
> Steve