You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by AndreaKinn <ki...@hotmail.it> on 2017/08/13 14:31:16 UTC

TypeInformation in Custom Deserializer

Hi,
I'm trying to implement a custom deserialiser to deserialise data from a
kafka sink.
So I'm implementing a KeyedDeserializedSchema<Tuple6&lt;String, String,
Date, String, String, Double>>
which ask me to override the method:

@Override
public TypeInformation<Tuple6&lt;String, String, Date, String, String,
Double>> getProducedType() {
	//to do 
}

Honestly I investigated in  link
<https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/types_serialization.html>  
but I didn't understand what is this method and how to implement it.



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/TypeInformation-in-Custom-Deserializer-tp14861.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.

Re: TypeInformation in Custom Deserializer

Posted by AndreaKinn <ki...@hotmail.it>.
Thank you, I solved in this way


Greg Hogan wrote
> You should be able to implement this using a TypeHint (see the Creating a
> TypeInformation or TypeSerializer section from the linked page):
> 
> 	return TypeInformation.of(new TypeHint&lt;Tuple6&lt;String, String, Date,
> String, String, Double&gt;>(){});
> 
> 
>> On Aug 13, 2017, at 10:31 AM, AndreaKinn &lt;

> kinn6aer@

> &gt; wrote:
>> 
>> Hi,
>> I'm trying to implement a custom deserialiser to deserialise data from a
>> kafka sink.
>> So I'm implementing a KeyedDeserializedSchema&lt;Tuple6&amp;lt;String,
>> String,
> &gt; Date, String, String, Double>>
>> which ask me to override the method:
>> 
>> @Override
>> public TypeInformation&lt;Tuple6&amp;lt;String, String, Date, String,
>> String,
> &gt; Double>> getProducedType() {
>> 	//to do 
>> }
>> 
>> Honestly I investigated in  link
>> &lt;https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/types_serialization.html&gt;  
>> but I didn't understand what is this method and how to implement it.
>> 
>> 
>> 
>> --
>> View this message in context:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/TypeInformation-in-Custom-Deserializer-tp14861.html
>> Sent from the Apache Flink User Mailing List archive. mailing list
>> archive at Nabble.com.





--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/TypeInformation-in-Custom-Deserializer-tp14861p14868.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.

Re: TypeInformation in Custom Deserializer

Posted by Greg Hogan <co...@greghogan.com>.
You should be able to implement this using a TypeHint (see the Creating a TypeInformation or TypeSerializer section from the linked page):

	return TypeInformation.of(new TypeHint<Tuple6<String, String, Date, String, String, Double>>(){});


> On Aug 13, 2017, at 10:31 AM, AndreaKinn <ki...@hotmail.it> wrote:
> 
> Hi,
> I'm trying to implement a custom deserialiser to deserialise data from a
> kafka sink.
> So I'm implementing a KeyedDeserializedSchema<Tuple6&lt;String, String,
> Date, String, String, Double>>
> which ask me to override the method:
> 
> @Override
> public TypeInformation<Tuple6&lt;String, String, Date, String, String,
> Double>> getProducedType() {
> 	//to do 
> }
> 
> Honestly I investigated in  link
> <https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/types_serialization.html>  
> but I didn't understand what is this method and how to implement it.
> 
> 
> 
> --
> View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/TypeInformation-in-Custom-Deserializer-tp14861.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.


Re: TypeInformation in Custom Deserializer

Posted by Ted Yu <yu...@gmail.com>.
Please take a look at the following
in flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormat.java

  public TypeInformation<Tuple2<K, V>> getProducedType() {
    return new TupleTypeInfo<>(TypeExtractor.createTypeInfo(keyClass),
TypeExtractor.createTypeInfo(valueClass));

FYI

On Sun, Aug 13, 2017 at 8:23 AM, AndreaKinn <ki...@hotmail.it> wrote:

> But I'm using Java primitive type like String, Double plus Date types.
> Flink
> doesn't know how to handle them?
>
>
>
> --
> View this message in context: http://apache-flink-user-
> mailing-list-archive.2336050.n4.nabble.com/TypeInformation-
> in-Custom-Deserializer-tp14861p14863.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>

Re: TypeInformation in Custom Deserializer

Posted by AndreaKinn <ki...@hotmail.it>.
But I'm using Java primitive type like String, Double plus Date types. Flink
doesn't know how to handle them?



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/TypeInformation-in-Custom-Deserializer-tp14861p14863.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.

Re: TypeInformation in Custom Deserializer

Posted by Ted Yu <yu...@gmail.com>.
From ResultTypeQueryable :

   * Gets the data type (as a {@link TypeInformation}) produced by this
function or input format.
   *
   * @return The data type produced by this function or input format.
   */
  TypeInformation<T> getProducedType();

You can look at classes which implement this method as examples.

flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseRowInputFormat.java


On Sun, Aug 13, 2017 at 7:31 AM, AndreaKinn <ki...@hotmail.it> wrote:

> Hi,
> I'm trying to implement a custom deserialiser to deserialise data from a
> kafka sink.
> So I'm implementing a KeyedDeserializedSchema<Tuple6&lt;String, String,
> Date, String, String, Double>>
> which ask me to override the method:
>
> @Override
> public TypeInformation<Tuple6&lt;String, String, Date, String, String,
> Double>> getProducedType() {
>         //to do
> }
>
> Honestly I investigated in  link
> <https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/types_
> serialization.html>
> but I didn't understand what is this method and how to implement it.
>
>
>
> --
> View this message in context: http://apache-flink-user-
> mailing-list-archive.2336050.n4.nabble.com/TypeInformation-
> in-Custom-Deserializer-tp14861.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>