Posted to user@storm.apache.org by Zhechao Ma <ma...@gmail.com> on 2016/11/07 08:50:22 UTC

KafkaBolt JSON Serializer Problem

I'm using KafkaBolt to write data to Kafka. The tuple-to-Kafka mapping is
<String, JSONObject>.
I set both *key.serializer* and *value.serializer* to
"org.apache.kafka.common.serialization.StringSerializer" and get the
following exception:

org.apache.kafka.common.errors.SerializationException: Can't convert value
of class org.apache.storm.shade.org.json.simple.JSONObject to class
org.apache.kafka.common.serialization.StringSerializer specified in
value.serializer


I cannot find any serializers for JSON. I'm using Storm 1.0.2 and
Kafka 0.8.1.1.

Could anyone help?

-- 
Thanks
Zhechao Ma

Re: KafkaBolt JSON Serializer Problem

Posted by Zhechao Ma <ma...@gmail.com>.
I changed the JSONObject import to

import org.apache.storm.shade.org.json.simple.JSONObject;

and the problem is resolved.

The missing exception stack trace quite misled me!
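
The following is a minimal sketch of why the stack trace went missing, not the actual Kafka source. The KafkaProducer snippet quoted in this thread catches the ClassCastException and throws a new SerializationException without passing the caught exception along, so the original cause is lost. The SerializationException class, badCast() and wrap() below are hypothetical stand-ins written only for this illustration.

```java
public class CauseChainDemo {
    // Hypothetical stand-in for Kafka's SerializationException (assumption for this sketch).
    static class SerializationException extends RuntimeException {
        SerializationException(String message) { super(message); }
        SerializationException(String message, Throwable cause) { super(message, cause); }
    }

    // Triggers a genuine ClassCastException: a String is cast to Integer.
    static void badCast() {
        Object value = "a String, not an Integer";
        Integer number = (Integer) value;
    }

    // Wraps the ClassCastException either with or without the original cause.
    static SerializationException wrap(boolean keepCause) {
        try {
            badCast();
            throw new IllegalStateException("unreachable: the cast always fails");
        } catch (ClassCastException cce) {
            return keepCause
                    ? new SerializationException("Can't convert value", cce)
                    : new SerializationException("Can't convert value");
        }
    }

    public static void main(String[] args) {
        // Without the cause, nothing points back to the failing cast.
        System.out.println("cause dropped: " + (wrap(false).getCause() == null));
        // With the cause, the original ClassCastException stays in the stack trace.
        System.out.println("cause kept: " + (wrap(true).getCause() instanceof ClassCastException));
    }
}
```

When the cause is passed to the wrapping exception, the log shows a "Caused by:" section naming both classes involved in the failed cast, which is the information that was needed here.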






-- 
Thanks
Zhechao Ma

Re: KafkaBolt JSON Serializer Problem

Posted by Zhechao Ma <ma...@gmail.com>.
I modified the code in KafkaProducer.java to print the exception stack
trace, and finally found the real cause:

java.lang.ClassCastException:
org.apache.storm.shade.org.json.simple.JSONObject cannot be cast to
org.json.simple.JSONObject


So the problem is related to the Maven Shade plugin: the tuple carries
Storm's shaded JSONObject, while my serializer imports the unshaded
org.json.simple.JSONObject.
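
The following is a minimal sketch of how a ClassCastException can appear even though the source contains no explicit cast, and why it is thrown before the body of serialize runs. It uses only stand-in types: the Serializer interface, PlainJson, ShadedJson and PlainJsonSerializer are hypothetical names that mimic Kafka's Serializer<T> and the two unrelated JSONObject classes. Because of type erasure, the compiler generates a bridge method serialize(String, Object) that casts its argument to the declared type parameter before delegating to the real method.

```java
import java.nio.charset.StandardCharsets;

public class BridgeCastDemo {
    // Stand-in for a generic serializer interface (assumption for this sketch).
    interface Serializer<T> {
        byte[] serialize(String topic, T data);
    }

    // Stand-ins for two unrelated classes that share a simple name in real life
    // (the unshaded and the shaded JSONObject).
    static class PlainJson {
        @Override public String toString() { return "{}"; }
    }
    static class ShadedJson {
        @Override public String toString() { return "{}"; }
    }

    // Set to true only if the body of serialize is actually entered.
    static boolean bodyReached = false;

    static class PlainJsonSerializer implements Serializer<PlainJson> {
        @Override
        public byte[] serialize(String topic, PlainJson data) {
            bodyReached = true;
            return data.toString().getBytes(StandardCharsets.UTF_8);
        }
    }

    // Calls the serializer through its raw type, as code that only knows the
    // serializer class by configuration would. The call goes to the
    // compiler-generated bridge method, which performs the hidden cast.
    @SuppressWarnings({"unchecked", "rawtypes"})
    static String trySerialize(Serializer serializer, Object value) {
        try {
            serializer.serialize("topic", value);
            return "ok";
        } catch (ClassCastException cce) {
            return "ClassCastException";
        }
    }

    public static void main(String[] args) {
        System.out.println(trySerialize(new PlainJsonSerializer(), new ShadedJson()));
        System.out.println("serialize body reached: " + bodyReached);
    }
}
```

In this sketch the exception is raised inside the bridge method, so a log statement on the first line of serialize never executes, which matches the symptom described in this thread.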




-- 
Thanks
Zhechao Ma

Re: KafkaBolt JSON Serializer Problem

Posted by Zhechao Ma <ma...@gmail.com>.
Hi Amber,

Here is the code.

Properties prop = new Properties();
prop.put("bootstrap.servers", kafkaBrokers);
// The new producer API names this setting "acks"; an "ack" key is ignored.
prop.put("acks", ack);
prop.put("key.serializer", keySerializer);
prop.put("value.serializer", valueSerializer);

KafkaBolt kafkaBolt = new KafkaBolt<String, JSONObject>()
        .withProducerProperties(prop)
        .withTopicSelector(new DefaultTopicSelector(outputTopic))
        .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("conn", "httpstream"));

builder.setBolt("kafkabolt", kafkaBolt, kafkaBoltParallelism)
        .shuffleGrouping("normalizeLog", "origin");




I have found where the exception is thrown: in KafkaProducer.java. It is
a ClassCastException, which confuses me because there appears to be no
cast in this code. My own serializer was posted in my last mail.

byte[] serializedValue;
try {
    serializedValue = valueSerializer.serialize(record.topic(), record.value());
} catch (ClassCastException cce) {
    throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
            " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
            " specified in value.serializer");
}

Any suggestions?

Thanks.





-- 
Thanks
Zhechao Ma

Re: KafkaBolt JSON Serializer Problem

Posted by Amber Kulkarni <am...@gmail.com>.
Hey,

You want to post a JSON string to Kafka, right?
Also, can you post the code you are using to write to Kafka?




-- 
Regards,
Amber Kulkarni

Re: KafkaBolt JSON Serializer Problem

Posted by Zhechao Ma <ma...@gmail.com>.
Here is the code. I only see log output from the constructor and the
configure method.

import org.json.simple.JSONObject;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JsonSerializer implements Serializer<JSONObject> {
    private static final Logger LOG = LoggerFactory.getLogger(JsonSerializer.class);

    /**
     * Default constructor needed by Kafka
     */
    public JsonSerializer() {
        LOG.info("===> JsonSerializer constructor !!");
    }

    @Override
    public void configure(Map<String, ?> config, boolean isKey) {
        LOG.info("===> JsonSerializer configure");
    }

    @Override
    public byte[] serialize(String topic, JSONObject data) {
        LOG.info("===> JsonSerializer serialize !!");
        if (data == null)
            return null;
        try {
            // Encode the JSON text as UTF-8 bytes.
            return data.toString().getBytes("utf-8");
        } catch (Exception e) {
            LOG.error("===> JsonSerializer serialize EXCEPTION");
            throw new SerializationException("Error serializing JSON message", e);
        }
    }

    @Override
    public void close() {
        LOG.error("===> JsonSerializer close");
    }
}






-- 
Thanks
Zhechao Ma

Re: KafkaBolt JSON Serializer Problem

Posted by Yuwei Mu <mo...@gmail.com>.
unsubscribe


Re: KafkaBolt JSON Serializer Problem

Posted by Andrew Xor <an...@gmail.com>.
Hi,

Since one type can't be cast to the other and you are not getting a
NullPointerException, could you share the implementation of your
serializer so that we can help you better?

Cheers,

A.


Re: KafkaBolt JSON Serializer Problem

Posted by Zhechao Ma <ma...@gmail.com>.
Even when I implement my own JSON serializer, it still throws a similar
exception, with no further details for debugging:

org.apache.kafka.common.errors.SerializationException: Can't convert value
of class org.apache.storm.shade.org.json.simple.JSONObject to xxxxxx
specified in value.serializer

I added a LOG statement to the overridden method byte[] serialize(String
topic, JSONObject data) and found no logs in worker.log. That is to say,
this exception is thrown before the serialize method is called.




-- 
Thanks
Zhechao Ma