You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@apex.apache.org by "Raja.Aravapalli" <Ra...@target.com> on 2016/06/06 16:40:09 UTC
avrò deserialization fails when using kafka
Hi,
I am trying to read data from kafka, and my input in kafka is avro messages.
So I am using class “KafkaSinglePortByteArrayInputOperator” to emit records from kafka.. And in the next operator I am reading input as "byte[]” and deserializing the message!!
But the tuple deserialization is failing with below error in the log…
Can someone pls share your thoughts and help me fix this?
Caused by: com.esotericsoftware.kryo.KryoException: Class cannot be created (missing no-arg constructor): io.confluent.kafka.serializers.KafkaAvroDecoder
Serialization trace:
decoder (com.tgt.mighty.apexapps.AvroBytesConversionOperator)
at com.esotericsoftware.kryo.Kryo$DefaultInstantiatorStrategy.newInstantiatorOf(Kryo.java:1228)
at com.esotericsoftware.kryo.Kryo.newInstantiator(Kryo.java:1049)
at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1058)
at com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:547)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:523)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
Code FYR:
Application.java file:
public void populateDAG(DAG dag, Configuration conf)
{
//KafkaSinglePortStringInputOperator kafkaInput = dag.addOperator("Kafka_Input", KafkaSinglePortStringInputOperator.class);
KafkaSinglePortByteArrayInputOperator kafkaInput = dag.addOperator("Kafka_Input", new KafkaSinglePortByteArrayInputOperator());
AvroBytesConversionOperator avroConversion = dag.addOperator("Avro_Convert", new AvroBytesConversionOperator(“schemaRegURL"));
HDFSWrite hdfs = dag.addOperator("To_HDFS", HDFSWrite.class);
//dag.addStream("Kafka_To_Hdfs_Ingestion", kafkaInput.outputPort, hdfs.input);
dag.addStream("Kafka_Avro_Msg_Byte_Stream", kafkaInput.outputPort, avroConversion.input);
dag.addStream("Avro_To_String_Stream", avroConversion.output, hdfs.input);
}
Operator Code:
public class AvroBytesConversionOperator extends BaseOperator {
private String schemaRegURL;
private KafkaAvroDecoder decoder;
public AvroBytesConversionOperator(){
}
public AvroBytesConversionOperator(String schemaRegURL){
this.schemaRegURL = schemaRegURL;
}
/**
* Defines Input Port - DefaultInputPort
* Accepts data from the upstream operator
* Type byte[]
*/
public transient DefaultInputPort<byte[]> input = new DefaultInputPort<byte[]>() {
@Override
public void process(byte[] tuple)
{
processTuple(tuple);
}
};
/**
* Defines Output Port - DefaultOutputPort
* Sends data to the down stream operator which can consume this data
* Type String
*/
public transient DefaultOutputPort<String> output = new DefaultOutputPort<String>();
/**
* Setup call
*/
@Override
public void setup(OperatorContext context)
{
Properties props = new Properties();
props.setProperty("schema.registry.url", this.schemaRegURL);
this.decoder = new KafkaAvroDecoder(new VerifiableProperties(props));
}
/**
* Begin window call for the operator.
* @param windowId
*/
public void beginWindow(long windowId)
{
}
/**
* Defines what should be done with each incoming tuple
*/
protected void processTuple(byte[] tuple)
{
GenericRecord record = (GenericRecord) decoder.fromBytes(tuple);
output.emit(record.toString());
}
/**
* End window call for the operator
* If sending per window, emit the updated counts here.
*/
@Override
public void endWindow()
{
}
}
Re: avrò deserialization fails when using kafka
Posted by "hsy541@gmail.com" <hs...@gmail.com>.
Hey Raja,
By default, the kafka input operator and the conversion operator will run
in different containers. You can set the stream locality to thread_local or
container_local. The input operator is io intensive and the your conversion
operator could be cpu intensive, correct me if I am wrong.
Another practice is you can extend the AbstractKafkaSinglePortInputOperator
and override the getTuple method
Here is an example:
public class AvroKafkaInputOperator extends
AbstractKafkaSinglePortInputOperator<GenericRecord>{
private String schemaRegURL;
private KafkaAvroDecoder decoder;
public AvroKafkaInputOperator(){
}
public AvroKafkaInputOperator(String schemaRegURL){
this.schemaRegURL = schemaRegURL;
}
/**
* Setup call
*/
@Override
public void setup(OperatorContext context)
{
Properties props = new Properties();
props.setProperty("schema.registry.url", this.schemaRegURL);
this.decoder = new KafkaAvroDecoder(new VerifiableProperties(props));
}
public GenericRecord getTuple(Message msg)
{
return decoder.fromBytes(msg.payload().toArray());
}
}
On Mon, Jun 6, 2016 at 11:54 AM, Raja.Aravapalli <Raja.Aravapalli@target.com
> wrote:
>
> After making the variable transient, it worked fine.
>
> Raja.
>
> From: "Raja.Aravapalli" <Ra...@target.com>
> Date: Monday, June 6, 2016 at 1:52 PM
>
> To: "users@apex.apache.org" <us...@apex.apache.org>
> Subject: Re: avrò deserialization fails when using kafka
>
>
> Thanks a lot Thomas & Ramanath.
>
> Your suggestions helped!! My issue is fixed. Thank you.
>
>
> Regards,
> Raja.
>
> From: Thomas Weise <th...@gmail.com>
> Reply-To: "users@apex.apache.org" <us...@apex.apache.org>
> Date: Monday, June 6, 2016 at 12:21 PM
> To: "users@apex.apache.org" <us...@apex.apache.org>
> Subject: Re: avrò deserialization fails when using kafka
>
> Since you are creating the decoder in setup(), please mark the property
> transient. No need to checkpoint it.
>
> Thanks,
> Thomas
>
>
> On Mon, Jun 6, 2016 at 10:06 AM, Munagala Ramanath <ra...@datatorrent.com>
> wrote:
>
>>
>> http://docs.datatorrent.com/troubleshooting/#application-throwing-following-kryo-exception
>>
>> Please try the suggestions at the above link.
>>
>> It appears from
>>
>> https://github.com/uber/confluent-schema-registry/blob/master/avro-serializer/src/main/java/io/confluent/kafka/serializers/KafkaAvroDecoder.java
>> that the class does not have a default constructor.
>>
>> Ram
>>
>> On Mon, Jun 6, 2016 at 9:40 AM, Raja.Aravapalli <
>> Raja.Aravapalli@target.com> wrote:
>>
>>>
>>> Hi,
>>>
>>> I am trying to read data from kafka, and my input in kafka is avro
>>> messages.
>>>
>>> So I am using class “KafkaSinglePortByteArrayInputOperator” to emit
>>> records from kafka.. And in the next operator I am reading input as
>>> "byte[]” and deserializing the message!!
>>>
>>> But the tuple deserialization is failing with below error in the log…
>>>
>>> Can someone pls share your thoughts and help me fix this?
>>>
>>>
>>> Caused by: com.esotericsoftware.kryo.KryoException: Class cannot be created (missing no-arg constructor): io.confluent.kafka.serializers.KafkaAvroDecoder
>>> Serialization trace:
>>> decoder (com.tgt.mighty.apexapps.AvroBytesConversionOperator)
>>> at com.esotericsoftware.kryo.Kryo$DefaultInstantiatorStrategy.newInstantiatorOf(Kryo.java:1228)
>>> at com.esotericsoftware.kryo.Kryo.newInstantiator(Kryo.java:1049)
>>> at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1058)
>>> at com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:547)
>>> at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:523)
>>> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>>>
>>>
>>>
>>> *Code FYR:*
>>>
>>>
>>> *Application.java* file:
>>>
>>> public void populateDAG(DAG dag, Configuration conf)
>>> {
>>> //KafkaSinglePortStringInputOperator kafkaInput = dag.addOperator("Kafka_Input", KafkaSinglePortStringInputOperator.class);
>>>
>>> KafkaSinglePortByteArrayInputOperator kafkaInput = dag.addOperator("Kafka_Input", new KafkaSinglePortByteArrayInputOperator());
>>>
>>> AvroBytesConversionOperator avroConversion = dag.addOperator("Avro_Convert", new AvroBytesConversionOperator(*“*schemaRegURL"));
>>>
>>> HDFSWrite hdfs = dag.addOperator("To_HDFS", HDFSWrite.class);
>>>
>>> //dag.addStream("Kafka_To_Hdfs_Ingestion", kafkaInput.outputPort, hdfs.input);
>>> dag.addStream("Kafka_Avro_Msg_Byte_Stream", kafkaInput.outputPort, avroConversion.input);
>>> dag.addStream("Avro_To_String_Stream", avroConversion.output, hdfs.input);
>>>
>>> }
>>>
>>>
>>> *Operator Code:*
>>>
>>> public class AvroBytesConversionOperator extends BaseOperator {
>>>
>>> private String schemaRegURL;
>>> private KafkaAvroDecoder decoder;
>>>
>>> public AvroBytesConversionOperator(){
>>>
>>> }
>>>
>>> public AvroBytesConversionOperator(String schemaRegURL){
>>> this.schemaRegURL = schemaRegURL;
>>> }
>>>
>>> /**
>>> * Defines Input Port - DefaultInputPort
>>> * Accepts data from the upstream operator
>>> * Type byte[]
>>> */
>>> public transient DefaultInputPort<byte[]> input = new DefaultInputPort<byte[]>() {
>>> @Override
>>> public void process(byte[] tuple)
>>> {
>>> processTuple(tuple);
>>> }
>>> };
>>>
>>>
>>> /**
>>> * Defines Output Port - DefaultOutputPort
>>> * Sends data to the down stream operator which can consume this data
>>> * Type String
>>> */
>>> public transient DefaultOutputPort<String> output = new DefaultOutputPort<String>();
>>>
>>>
>>> /**
>>> * Setup call
>>> */
>>> @Override
>>> public void setup(OperatorContext context)
>>> {
>>> Properties props = new Properties();
>>> props.setProperty("schema.registry.url", this.schemaRegURL);
>>> this.decoder = new KafkaAvroDecoder(new VerifiableProperties(props));
>>> }
>>>
>>> /**
>>> * Begin window call for the operator.
>>> * @param windowId
>>> */
>>> public void beginWindow(long windowId)
>>> {
>>>
>>> }
>>>
>>> /**
>>> * Defines what should be done with each incoming tuple
>>> */
>>> protected void processTuple(byte[] tuple)
>>> {
>>> GenericRecord record = (GenericRecord) decoder.fromBytes(tuple);
>>> output.emit(record.toString());
>>> }
>>>
>>> /**
>>> * End window call for the operator
>>> * If sending per window, emit the updated counts here.
>>> */
>>> @Override
>>> public void endWindow()
>>> {
>>>
>>> }
>>>
>>> }
>>>
>>>
>>
>
Re: avrò deserialization fails when using kafka
Posted by "Raja.Aravapalli" <Ra...@target.com>.
After making the variable transient, it worked fine.
Raja.
From: "Raja.Aravapalli" <Ra...@target.com>>
Date: Monday, June 6, 2016 at 1:52 PM
To: "users@apex.apache.org<ma...@apex.apache.org>" <us...@apex.apache.org>>
Subject: Re: avrò deserialization fails when using kafka
Thanks a lot Thomas & Ramanath.
Your suggestions helped!! My issue is fixed. Thank you.
Regards,
Raja.
From: Thomas Weise <th...@gmail.com>>
Reply-To: "users@apex.apache.org<ma...@apex.apache.org>" <us...@apex.apache.org>>
Date: Monday, June 6, 2016 at 12:21 PM
To: "users@apex.apache.org<ma...@apex.apache.org>" <us...@apex.apache.org>>
Subject: Re: avrò deserialization fails when using kafka
Since you are creating the decoder in setup(), please mark the property transient. No need to checkpoint it.
Thanks,
Thomas
On Mon, Jun 6, 2016 at 10:06 AM, Munagala Ramanath <ra...@datatorrent.com>> wrote:
http://docs.datatorrent.com/troubleshooting/#application-throwing-following-kryo-exception
Please try the suggestions at the above link.
It appears from
https://github.com/uber/confluent-schema-registry/blob/master/avro-serializer/src/main/java/io/confluent/kafka/serializers/KafkaAvroDecoder.java
that the class does not have a default constructor.
Ram
On Mon, Jun 6, 2016 at 9:40 AM, Raja.Aravapalli <Ra...@target.com>> wrote:
Hi,
I am trying to read data from kafka, and my input in kafka is avro messages.
So I am using class “KafkaSinglePortByteArrayInputOperator” to emit records from kafka.. And in the next operator I am reading input as "byte[]” and deserializing the message!!
But the tuple deserialization is failing with below error in the log…
Can someone pls share your thoughts and help me fix this?
Caused by: com.esotericsoftware.kryo.KryoException: Class cannot be created (missing no-arg constructor): io.confluent.kafka.serializers.KafkaAvroDecoder
Serialization trace:
decoder (com.tgt.mighty.apexapps.AvroBytesConversionOperator)
at com.esotericsoftware.kryo.Kryo$DefaultInstantiatorStrategy.newInstantiatorOf(Kryo.java:1228)
at com.esotericsoftware.kryo.Kryo.newInstantiator(Kryo.java:1049)
at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1058)
at com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:547)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:523)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
Code FYR:
Application.java file:
public void populateDAG(DAG dag, Configuration conf)
{
//KafkaSinglePortStringInputOperator kafkaInput = dag.addOperator("Kafka_Input", KafkaSinglePortStringInputOperator.class);
KafkaSinglePortByteArrayInputOperator kafkaInput = dag.addOperator("Kafka_Input", new KafkaSinglePortByteArrayInputOperator());
AvroBytesConversionOperator avroConversion = dag.addOperator("Avro_Convert", new AvroBytesConversionOperator(“schemaRegURL"));
HDFSWrite hdfs = dag.addOperator("To_HDFS", HDFSWrite.class);
//dag.addStream("Kafka_To_Hdfs_Ingestion", kafkaInput.outputPort, hdfs.input);
dag.addStream("Kafka_Avro_Msg_Byte_Stream", kafkaInput.outputPort, avroConversion.input);
dag.addStream("Avro_To_String_Stream", avroConversion.output, hdfs.input);
}
Operator Code:
public class AvroBytesConversionOperator extends BaseOperator {
private String schemaRegURL;
private KafkaAvroDecoder decoder;
public AvroBytesConversionOperator(){
}
public AvroBytesConversionOperator(String schemaRegURL){
this.schemaRegURL = schemaRegURL;
}
/**
* Defines Input Port - DefaultInputPort
* Accepts data from the upstream operator
* Type byte[]
*/
public transient DefaultInputPort<byte[]> input = new DefaultInputPort<byte[]>() {
@Override
public void process(byte[] tuple)
{
processTuple(tuple);
}
};
/**
* Defines Output Port - DefaultOutputPort
* Sends data to the down stream operator which can consume this data
* Type String
*/
public transient DefaultOutputPort<String> output = new DefaultOutputPort<String>();
/**
* Setup call
*/
@Override
public void setup(OperatorContext context)
{
Properties props = new Properties();
props.setProperty("schema.registry.url", this.schemaRegURL);
this.decoder = new KafkaAvroDecoder(new VerifiableProperties(props));
}
/**
* Begin window call for the operator.
* @param windowId
*/
public void beginWindow(long windowId)
{
}
/**
* Defines what should be done with each incoming tuple
*/
protected void processTuple(byte[] tuple)
{
GenericRecord record = (GenericRecord) decoder.fromBytes(tuple);
output.emit(record.toString());
}
/**
* End window call for the operator
* If sending per window, emit the updated counts here.
*/
@Override
public void endWindow()
{
}
}
Re: avrò deserialization fails when using kafka
Posted by "Raja.Aravapalli" <Ra...@target.com>.
Thanks a lot Thomas & Ramanath.
Your suggestions helped!! My issue is fixed. Thank you.
Regards,
Raja.
From: Thomas Weise <th...@gmail.com>>
Reply-To: "users@apex.apache.org<ma...@apex.apache.org>" <us...@apex.apache.org>>
Date: Monday, June 6, 2016 at 12:21 PM
To: "users@apex.apache.org<ma...@apex.apache.org>" <us...@apex.apache.org>>
Subject: Re: avrò deserialization fails when using kafka
Since you are creating the decoder in setup(), please mark the property transient. No need to checkpoint it.
Thanks,
Thomas
On Mon, Jun 6, 2016 at 10:06 AM, Munagala Ramanath <ra...@datatorrent.com>> wrote:
http://docs.datatorrent.com/troubleshooting/#application-throwing-following-kryo-exception
Please try the suggestions at the above link.
It appears from
https://github.com/uber/confluent-schema-registry/blob/master/avro-serializer/src/main/java/io/confluent/kafka/serializers/KafkaAvroDecoder.java
that the class does not have a default constructor.
Ram
On Mon, Jun 6, 2016 at 9:40 AM, Raja.Aravapalli <Ra...@target.com>> wrote:
Hi,
I am trying to read data from kafka, and my input in kafka is avro messages.
So I am using class “KafkaSinglePortByteArrayInputOperator” to emit records from kafka.. And in the next operator I am reading input as "byte[]” and deserializing the message!!
But the tuple deserialization is failing with below error in the log…
Can someone pls share your thoughts and help me fix this?
Caused by: com.esotericsoftware.kryo.KryoException: Class cannot be created (missing no-arg constructor): io.confluent.kafka.serializers.KafkaAvroDecoder
Serialization trace:
decoder (com.tgt.mighty.apexapps.AvroBytesConversionOperator)
at com.esotericsoftware.kryo.Kryo$DefaultInstantiatorStrategy.newInstantiatorOf(Kryo.java:1228)
at com.esotericsoftware.kryo.Kryo.newInstantiator(Kryo.java:1049)
at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1058)
at com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:547)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:523)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
Code FYR:
Application.java file:
public void populateDAG(DAG dag, Configuration conf)
{
//KafkaSinglePortStringInputOperator kafkaInput = dag.addOperator("Kafka_Input", KafkaSinglePortStringInputOperator.class);
KafkaSinglePortByteArrayInputOperator kafkaInput = dag.addOperator("Kafka_Input", new KafkaSinglePortByteArrayInputOperator());
AvroBytesConversionOperator avroConversion = dag.addOperator("Avro_Convert", new AvroBytesConversionOperator(“schemaRegURL"));
HDFSWrite hdfs = dag.addOperator("To_HDFS", HDFSWrite.class);
//dag.addStream("Kafka_To_Hdfs_Ingestion", kafkaInput.outputPort, hdfs.input);
dag.addStream("Kafka_Avro_Msg_Byte_Stream", kafkaInput.outputPort, avroConversion.input);
dag.addStream("Avro_To_String_Stream", avroConversion.output, hdfs.input);
}
Operator Code:
public class AvroBytesConversionOperator extends BaseOperator {
private String schemaRegURL;
private KafkaAvroDecoder decoder;
public AvroBytesConversionOperator(){
}
public AvroBytesConversionOperator(String schemaRegURL){
this.schemaRegURL = schemaRegURL;
}
/**
* Defines Input Port - DefaultInputPort
* Accepts data from the upstream operator
* Type byte[]
*/
public transient DefaultInputPort<byte[]> input = new DefaultInputPort<byte[]>() {
@Override
public void process(byte[] tuple)
{
processTuple(tuple);
}
};
/**
* Defines Output Port - DefaultOutputPort
* Sends data to the down stream operator which can consume this data
* Type String
*/
public transient DefaultOutputPort<String> output = new DefaultOutputPort<String>();
/**
* Setup call
*/
@Override
public void setup(OperatorContext context)
{
Properties props = new Properties();
props.setProperty("schema.registry.url", this.schemaRegURL);
this.decoder = new KafkaAvroDecoder(new VerifiableProperties(props));
}
/**
* Begin window call for the operator.
* @param windowId
*/
public void beginWindow(long windowId)
{
}
/**
* Defines what should be done with each incoming tuple
*/
protected void processTuple(byte[] tuple)
{
GenericRecord record = (GenericRecord) decoder.fromBytes(tuple);
output.emit(record.toString());
}
/**
* End window call for the operator
* If sending per window, emit the updated counts here.
*/
@Override
public void endWindow()
{
}
}
Re: avrò deserialization fails when using kafka
Posted by Thomas Weise <th...@gmail.com>.
Since you are creating the decoder in setup(), please mark the property
transient. No need to checkpoint it.
Thanks,
Thomas
On Mon, Jun 6, 2016 at 10:06 AM, Munagala Ramanath <ra...@datatorrent.com>
wrote:
>
> http://docs.datatorrent.com/troubleshooting/#application-throwing-following-kryo-exception
>
> Please try the suggestions at the above link.
>
> It appears from
>
> https://github.com/uber/confluent-schema-registry/blob/master/avro-serializer/src/main/java/io/confluent/kafka/serializers/KafkaAvroDecoder.java
> that the class does not have a default constructor.
>
> Ram
>
> On Mon, Jun 6, 2016 at 9:40 AM, Raja.Aravapalli <
> Raja.Aravapalli@target.com> wrote:
>
>>
>> Hi,
>>
>> I am trying to read data from kafka, and my input in kafka is avro
>> messages.
>>
>> So I am using class “KafkaSinglePortByteArrayInputOperator” to emit
>> records from kafka.. And in the next operator I am reading input as
>> "byte[]” and deserializing the message!!
>>
>> But the tuple deserialization is failing with below error in the log…
>>
>> Can someone pls share your thoughts and help me fix this?
>>
>>
>> Caused by: com.esotericsoftware.kryo.KryoException: Class cannot be created (missing no-arg constructor): io.confluent.kafka.serializers.KafkaAvroDecoder
>> Serialization trace:
>> decoder (com.tgt.mighty.apexapps.AvroBytesConversionOperator)
>> at com.esotericsoftware.kryo.Kryo$DefaultInstantiatorStrategy.newInstantiatorOf(Kryo.java:1228)
>> at com.esotericsoftware.kryo.Kryo.newInstantiator(Kryo.java:1049)
>> at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1058)
>> at com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:547)
>> at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:523)
>> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>>
>>
>>
>> *Code FYR:*
>>
>>
>> *Application.java* file:
>>
>> public void populateDAG(DAG dag, Configuration conf)
>> {
>> //KafkaSinglePortStringInputOperator kafkaInput = dag.addOperator("Kafka_Input", KafkaSinglePortStringInputOperator.class);
>>
>> KafkaSinglePortByteArrayInputOperator kafkaInput = dag.addOperator("Kafka_Input", new KafkaSinglePortByteArrayInputOperator());
>>
>> AvroBytesConversionOperator avroConversion = dag.addOperator("Avro_Convert", new AvroBytesConversionOperator(*“*schemaRegURL"));
>>
>> HDFSWrite hdfs = dag.addOperator("To_HDFS", HDFSWrite.class);
>>
>> //dag.addStream("Kafka_To_Hdfs_Ingestion", kafkaInput.outputPort, hdfs.input);
>> dag.addStream("Kafka_Avro_Msg_Byte_Stream", kafkaInput.outputPort, avroConversion.input);
>> dag.addStream("Avro_To_String_Stream", avroConversion.output, hdfs.input);
>>
>> }
>>
>>
>> *Operator Code:*
>>
>> public class AvroBytesConversionOperator extends BaseOperator {
>>
>> private String schemaRegURL;
>> private KafkaAvroDecoder decoder;
>>
>> public AvroBytesConversionOperator(){
>>
>> }
>>
>> public AvroBytesConversionOperator(String schemaRegURL){
>> this.schemaRegURL = schemaRegURL;
>> }
>>
>> /**
>> * Defines Input Port - DefaultInputPort
>> * Accepts data from the upstream operator
>> * Type byte[]
>> */
>> public transient DefaultInputPort<byte[]> input = new DefaultInputPort<byte[]>() {
>> @Override
>> public void process(byte[] tuple)
>> {
>> processTuple(tuple);
>> }
>> };
>>
>>
>> /**
>> * Defines Output Port - DefaultOutputPort
>> * Sends data to the down stream operator which can consume this data
>> * Type String
>> */
>> public transient DefaultOutputPort<String> output = new DefaultOutputPort<String>();
>>
>>
>> /**
>> * Setup call
>> */
>> @Override
>> public void setup(OperatorContext context)
>> {
>> Properties props = new Properties();
>> props.setProperty("schema.registry.url", this.schemaRegURL);
>> this.decoder = new KafkaAvroDecoder(new VerifiableProperties(props));
>> }
>>
>> /**
>> * Begin window call for the operator.
>> * @param windowId
>> */
>> public void beginWindow(long windowId)
>> {
>>
>> }
>>
>> /**
>> * Defines what should be done with each incoming tuple
>> */
>> protected void processTuple(byte[] tuple)
>> {
>> GenericRecord record = (GenericRecord) decoder.fromBytes(tuple);
>> output.emit(record.toString());
>> }
>>
>> /**
>> * End window call for the operator
>> * If sending per window, emit the updated counts here.
>> */
>> @Override
>> public void endWindow()
>> {
>>
>> }
>>
>> }
>>
>>
>
Re: avrò deserialization fails when using kafka
Posted by Munagala Ramanath <ra...@datatorrent.com>.
http://docs.datatorrent.com/troubleshooting/#application-throwing-following-kryo-exception
Please try the suggestions at the above link.
It appears from
https://github.com/uber/confluent-schema-registry/blob/master/avro-serializer/src/main/java/io/confluent/kafka/serializers/KafkaAvroDecoder.java
that the class does not have a default constructor.
Ram
On Mon, Jun 6, 2016 at 9:40 AM, Raja.Aravapalli <Ra...@target.com>
wrote:
>
> Hi,
>
> I am trying to read data from kafka, and my input in kafka is avro
> messages.
>
> So I am using class “KafkaSinglePortByteArrayInputOperator” to emit
> records from kafka.. And in the next operator I am reading input as
> "byte[]” and deserializing the message!!
>
> But the tuple deserialization is failing with below error in the log…
>
> Can someone pls share your thoughts and help me fix this?
>
>
> Caused by: com.esotericsoftware.kryo.KryoException: Class cannot be created (missing no-arg constructor): io.confluent.kafka.serializers.KafkaAvroDecoder
> Serialization trace:
> decoder (com.tgt.mighty.apexapps.AvroBytesConversionOperator)
> at com.esotericsoftware.kryo.Kryo$DefaultInstantiatorStrategy.newInstantiatorOf(Kryo.java:1228)
> at com.esotericsoftware.kryo.Kryo.newInstantiator(Kryo.java:1049)
> at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1058)
> at com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:547)
> at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:523)
> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>
>
>
> *Code FYR:*
>
>
> *Application.java* file:
>
> public void populateDAG(DAG dag, Configuration conf)
> {
> //KafkaSinglePortStringInputOperator kafkaInput = dag.addOperator("Kafka_Input", KafkaSinglePortStringInputOperator.class);
>
> KafkaSinglePortByteArrayInputOperator kafkaInput = dag.addOperator("Kafka_Input", new KafkaSinglePortByteArrayInputOperator());
>
> AvroBytesConversionOperator avroConversion = dag.addOperator("Avro_Convert", new AvroBytesConversionOperator(*“*schemaRegURL"));
>
> HDFSWrite hdfs = dag.addOperator("To_HDFS", HDFSWrite.class);
>
> //dag.addStream("Kafka_To_Hdfs_Ingestion", kafkaInput.outputPort, hdfs.input);
> dag.addStream("Kafka_Avro_Msg_Byte_Stream", kafkaInput.outputPort, avroConversion.input);
> dag.addStream("Avro_To_String_Stream", avroConversion.output, hdfs.input);
>
> }
>
>
> *Operator Code:*
>
> public class AvroBytesConversionOperator extends BaseOperator {
>
> private String schemaRegURL;
> private KafkaAvroDecoder decoder;
>
> public AvroBytesConversionOperator(){
>
> }
>
> public AvroBytesConversionOperator(String schemaRegURL){
> this.schemaRegURL = schemaRegURL;
> }
>
> /**
> * Defines Input Port - DefaultInputPort
> * Accepts data from the upstream operator
> * Type byte[]
> */
> public transient DefaultInputPort<byte[]> input = new DefaultInputPort<byte[]>() {
> @Override
> public void process(byte[] tuple)
> {
> processTuple(tuple);
> }
> };
>
>
> /**
> * Defines Output Port - DefaultOutputPort
> * Sends data to the down stream operator which can consume this data
> * Type String
> */
> public transient DefaultOutputPort<String> output = new DefaultOutputPort<String>();
>
>
> /**
> * Setup call
> */
> @Override
> public void setup(OperatorContext context)
> {
> Properties props = new Properties();
> props.setProperty("schema.registry.url", this.schemaRegURL);
> this.decoder = new KafkaAvroDecoder(new VerifiableProperties(props));
> }
>
> /**
> * Begin window call for the operator.
> * @param windowId
> */
> public void beginWindow(long windowId)
> {
>
> }
>
> /**
> * Defines what should be done with each incoming tuple
> */
> protected void processTuple(byte[] tuple)
> {
> GenericRecord record = (GenericRecord) decoder.fromBytes(tuple);
> output.emit(record.toString());
> }
>
> /**
> * End window call for the operator
> * If sending per window, emit the updated counts here.
> */
> @Override
> public void endWindow()
> {
>
> }
>
> }
>
>