Posted to user@flink.apache.org by Mohit Anchlia <mo...@gmail.com> on 2017/02/24 01:07:33 UTC

Serialization schema

I wrote a key serialization class to write to Kafka; however, I am getting
this error. I'm not sure why, as I've already implemented the interfaces.

Caused by: java.io.NotSerializableException: com.sy.flink.test.Tuple2Serializerr$1
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)

And the class implements the following:

public class Tuple2Serializerr implements
        DeserializationSchema<Tuple2<Integer, Integer>>,
        SerializationSchema<Tuple2<Integer, Integer>> {

And called like this:


FlinkKafkaProducer010<Tuple2<Integer, Integer>> myProducer =
        new FlinkKafkaProducer010<Tuple2<Integer, Integer>>(
                "10.22.4.15:9092",         // broker list
                "my-topic",                // target topic
                new Tuple2Serializerr());  // serialization schema

Re: Serialization schema

Posted by Mohit Anchlia <mo...@gmail.com>.
There was a private member variable that was not serializable and was not
marked transient. Thanks for the pointer.
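
For anyone hitting the same error, here is a minimal sketch of that kind of fix using only the JDK. The class names `BrokenSchema` and `FixedSchema` are hypothetical stand-ins (the actual `Tuple2Serializerr` fields were never posted), and `Charset` is used only as an example of a field type that does not implement `Serializable`. The fixed version marks the field `transient` and re-creates it lazily after deserialization.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

class TransientFieldDemo {
    public static void main(String[] args) throws Exception {
        System.out.println(canSerialize(new BrokenSchema())); // false
        System.out.println(canSerialize(new FixedSchema()));  // true
        FixedSchema copy = roundTrip(new FixedSchema());
        System.out.println(new String(copy.encode(1, 2), StandardCharsets.UTF_8)); // 1,2
    }

    /** Tries Java serialization and reports whether it succeeded. */
    static boolean canSerialize(Object o) throws IOException {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        }
    }

    /** Serializes and deserializes an object, returning the copy. */
    static <T extends Serializable> T roundTrip(T obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            @SuppressWarnings("unchecked")
            T copy = (T) in.readObject();
            return copy;
        }
    }
}

/** Hypothetical schema with a non-serializable, non-transient field. */
class BrokenSchema implements Serializable {
    private static final long serialVersionUID = 1L;
    // Charset does not implement Serializable, so writing this object fails.
    private final Charset charset = StandardCharsets.UTF_8;

    byte[] encode(int a, int b) {
        return (a + "," + b).getBytes(charset);
    }
}

/** Same schema with the field marked transient and rebuilt on demand. */
class FixedSchema implements Serializable {
    private static final long serialVersionUID = 1L;
    // transient fields are skipped by Java serialization and are null after deserialization.
    private transient Charset charset;

    private Charset charset() {
        if (charset == null) {
            charset = StandardCharsets.UTF_8;
        }
        return charset;
    }

    byte[] encode(int a, int b) {
        return (a + "," + b).getBytes(charset());
    }
}
```

The lazy getter matters because a transient field comes back as `null` on the receiving side, so it has to be rebuilt before first use.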

On Thu, Feb 23, 2017 at 11:44 PM, Tzu-Li (Gordon) Tai <tz...@apache.org>
wrote:

> Thanks for clarifying.
>
> From the looks of your exception:
>
> Caused by: java.io.NotSerializableException: com.sy.flink.test.Tuple2Serializerr$1
>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>         at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>
> The `$1` suffix in `com.sy.flink.test.Tuple2Serializerr$1` indicates that an
> anonymous inner class declared in `Tuple2Serializerr` is not serializable.
>
> Could you check if that’s the case?

Re: Serialization schema

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Thanks for clarifying. 

From the looks of your exception:

Caused by: java.io.NotSerializableException: com.sy.flink.test.Tuple2Serializerr$1
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)

The `$1` suffix in `com.sy.flink.test.Tuple2Serializerr$1` indicates that an anonymous inner class declared in `Tuple2Serializerr` is not serializable.

Could you check if that’s the case?
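
As a small illustration of how such a name can arise, the sketch below uses only the JDK. `SchemaWithAnonymousField` and `Joiner` are hypothetical names invented for this example, not part of the code under discussion. The anonymous class assigned to the field is compiled by javac under a `$1` name, and because it does not implement `Serializable`, the exception message names it rather than the outer class.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class AnonymousClassDemo {
    public static void main(String[] args) throws IOException {
        SchemaWithAnonymousField schema = new SchemaWithAnonymousField();
        // With javac, the anonymous class name ends in "$1".
        System.out.println(schema.joiner.getClass().getName());
        // The exception message carries the same class name.
        System.out.println(failingClassName(schema));
    }

    /** Returns the class name reported by NotSerializableException, or null on success. */
    static String failingClassName(Object o) throws IOException {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return null;
        } catch (NotSerializableException e) {
            return e.getMessage();
        }
    }
}

/** Hypothetical helper interface; note that it does not extend Serializable. */
interface Joiner {
    String join(int a, int b);
}

/** Serializable outer class whose field holds an anonymous, non-serializable object. */
class SchemaWithAnonymousField implements Serializable {
    private static final long serialVersionUID = 1L;

    // This anonymous class implements only Joiner, so it is not serializable.
    final Joiner joiner = new Joiner() {
        @Override
        public String join(int a, int b) {
            return a + "," + b;
        }
    };
}
```

So a top-level class can still produce a `$1` error if one of its fields is initialized with an anonymous class.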


On February 24, 2017 at 3:10:58 PM, Mohit Anchlia (mohitanchlia@gmail.com) wrote:

But it is not an inner class.

Re: Serialization schema

Posted by Mohit Anchlia <mo...@gmail.com>.
But it is not an inner class.

On Thu, Feb 23, 2017 at 11:09 PM, Tzu-Li (Gordon) Tai <tz...@apache.org>
wrote:

> Since I don’t have your complete code, I’m guessing this is the problem:
> Is your `Tuple2Serializer` an inner class? If so, you should be able to
> solve the problem by declaring `Tuple2Serializer` as `static`.
>
> This is more of a Java problem:
> a non-static inner class holds an implicit reference to its enclosing outer
> class instance, so serializing it also serializes the outer instance, which
> fails if the outer class is not serializable.

Re: Serialization schema

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Since I don’t have your complete code, I’m guessing this is the problem:
Is your `Tuple2Serializer` an inner class? If so, you should be able to solve the problem by declaring `Tuple2Serializer` as `static`.

This is more of a Java problem:
a non-static inner class holds an implicit reference to its enclosing outer class instance, so serializing it also serializes the outer instance, which fails if the outer class is not serializable.
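
A minimal JDK-only sketch of that difference follows. `Job`, `InnerSchema`, and `StaticSchema` are hypothetical names chosen for the example; `Job` stands in for an enclosing class that is not serializable. The inner class reads a field of the outer instance, so it carries the hidden outer reference, while the static nested class does not.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class NestedClassDemo {
    public static void main(String[] args) throws IOException {
        Job job = new Job();
        System.out.println(canSerialize(job.new InnerSchema()));    // false
        System.out.println(canSerialize(new Job.StaticSchema()));   // true
    }

    /** Tries Java serialization and reports whether it succeeded. */
    static boolean canSerialize(Object o) throws IOException {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        }
    }
}

/** Hypothetical enclosing class; deliberately NOT Serializable. */
class Job {
    final String separator = ",";

    /** Non-static inner class: holds an implicit reference to Job.this. */
    class InnerSchema implements Serializable {
        private static final long serialVersionUID = 1L;

        String join(int a, int b) {
            return a + separator + b; // reads a field of the outer instance
        }
    }

    /** Static nested class: no reference to any Job instance. */
    static class StaticSchema implements Serializable {
        private static final long serialVersionUID = 1L;

        String join(int a, int b) {
            return a + "," + b;
        }
    }
}
```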


On February 24, 2017 at 2:43:38 PM, Mohit Anchlia (mohitanchlia@gmail.com) wrote:

This is at high level what I am doing:

Serialize:

String s = tuple.f0 + "," + tuple.f1;
return s.getBytes(StandardCharsets.UTF_8);

Deserialize:

String s = new String(message, StandardCharsets.UTF_8);
String[] sarr = s.split(",");
Tuple2<Integer, Integer> tuple = new Tuple2<>(Integer.valueOf(sarr[0]), Integer.valueOf(sarr[1]));

return tuple;


Re: Serialization schema

Posted by Mohit Anchlia <mo...@gmail.com>.
This is at high level what I am doing:

Serialize:

String s = tuple.f0 + "," + tuple.f1;
return s.getBytes(StandardCharsets.UTF_8);

Deserialize:

String s = new String(message, StandardCharsets.UTF_8);
String[] sarr = s.split(",");
Tuple2<Integer, Integer> tuple = new Tuple2<>(Integer.valueOf(sarr[0]),
Integer.valueOf(sarr[1]));

return tuple;
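
The same encoding can be tried outside Flink with a small, self-contained sketch. `PairCodec` is a hypothetical helper that uses a plain `int[]` in place of `Tuple2<Integer, Integer>`; it is not the poster's class. It assumes UTF-8 on both sides and rejects messages that do not contain exactly two fields.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class PairCodecDemo {
    public static void main(String[] args) {
        byte[] bytes = PairCodec.serialize(7, -42);
        int[] pair = PairCodec.deserialize(bytes);
        // prints: 7,-42 -> [7, -42]
        System.out.println(new String(bytes, StandardCharsets.UTF_8) + " -> " + Arrays.toString(pair));
    }
}

/** Hypothetical stand-in for the comma-separated schema logic; holds no state. */
final class PairCodec {
    private PairCodec() {
    }

    /** Encodes two ints as "first,second" in UTF-8. */
    static byte[] serialize(int first, int second) {
        return (first + "," + second).getBytes(StandardCharsets.UTF_8);
    }

    /** Decodes "first,second" back into a two-element array. */
    static int[] deserialize(byte[] message) {
        String[] parts = new String(message, StandardCharsets.UTF_8).split(",", -1);
        if (parts.length != 2) {
            throw new IllegalArgumentException("expected 2 fields, got " + parts.length);
        }
        return new int[] {Integer.parseInt(parts[0].trim()), Integer.parseInt(parts[1].trim())};
    }
}
```

Because this logic keeps no fields at all, nothing in it can trigger a `NotSerializableException`; the error in this thread came from class state, not from the conversion itself.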


On Thu, Feb 23, 2017 at 10:22 PM, Tzu-Li (Gordon) Tai <tz...@apache.org>
wrote:

> Hi Mohit,
>
> As 刘彪 pointed out in his reply, the problem is that your
> `Tuple2Serializer` contains fields that are not serializable, so
> `Tuple2Serializer` itself is not serializable.
> Could you perhaps share your `Tuple2Serializer` implementation with us so
> we can pinpoint the problem?
>
> A snippet of the class fields and constructor will do, so you don’t have
> to provide the whole `serialize` / `deserialize` implementation if you
> don’t want to.
>
> Cheers,
> Gordon

Re: Serialization schema

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi Mohit,

As 刘彪 pointed out in his reply, the problem is that your `Tuple2Serializer` contains fields that are not serializable, so `Tuple2Serializer` itself is not serializable.
Could you perhaps share your `Tuple2Serializer` implementation with us so we can pinpoint the problem?

A snippet of the class fields and constructor will do, so you don’t have to provide the whole `serialize` / `deserialize` implementation if you don’t want to.

Cheers,
Gordon


On February 24, 2017 at 11:04:34 AM, Mohit Anchlia (mohitanchlia@gmail.com) wrote:

I am using String inside to convert into bytes.

Re: Serialization schema

Posted by Mohit Anchlia <mo...@gmail.com>.
I am using String inside to convert into bytes.

On Thu, Feb 23, 2017 at 6:50 PM, 刘彪 <mm...@gmail.com> wrote:

> Hi Mohit
> Since you did not share the full code of Tuple2Serializerr, I can only guess:
> the likely reason is that some fields of Tuple2Serializerr hold objects that
> do not implement Serializable.

Re: Serialization schema

Posted by 刘彪 <mm...@gmail.com>.
Hi Mohit
Since you did not share the full code of Tuple2Serializerr, I can only guess:
the likely reason is that some fields of Tuple2Serializerr hold objects that
do not implement Serializable.

2017-02-24 9:07 GMT+08:00 Mohit Anchlia <mo...@gmail.com>:

> I wrote a key serialization class to write to Kafka; however, I am getting
> this error. I'm not sure why, as I've already implemented the interfaces.