Posted to users@apex.apache.org by "Raja.Aravapalli" <Ra...@target.com> on 2016/06/06 16:40:09 UTC

avro deserialization fails when using kafka

Hi,

I am trying to read data from Kafka, and the input in Kafka is Avro messages.

So I am using the class "KafkaSinglePortByteArrayInputOperator" to emit records from Kafka, and in the next operator I read the input as byte[] and deserialize the message.

But tuple deserialization is failing with the error below in the log.

Can someone please share your thoughts and help me fix this?



Caused by: com.esotericsoftware.kryo.KryoException: Class cannot be created (missing no-arg constructor): io.confluent.kafka.serializers.KafkaAvroDecoder
Serialization trace:
decoder (com.tgt.mighty.apexapps.AvroBytesConversionOperator)
        at com.esotericsoftware.kryo.Kryo$DefaultInstantiatorStrategy.newInstantiatorOf(Kryo.java:1228)
        at com.esotericsoftware.kryo.Kryo.newInstantiator(Kryo.java:1049)
        at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1058)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:547)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:523)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)



Code FYR:


Application.java file:

public void populateDAG(DAG dag, Configuration conf)
{
  //KafkaSinglePortStringInputOperator kafkaInput =  dag.addOperator("Kafka_Input", KafkaSinglePortStringInputOperator.class);

  KafkaSinglePortByteArrayInputOperator kafkaInput =  dag.addOperator("Kafka_Input", new KafkaSinglePortByteArrayInputOperator());

  AvroBytesConversionOperator avroConversion = dag.addOperator("Avro_Convert", new AvroBytesConversionOperator("schemaRegURL"));

  HDFSWrite hdfs = dag.addOperator("To_HDFS", HDFSWrite.class);

  //dag.addStream("Kafka_To_Hdfs_Ingestion", kafkaInput.outputPort, hdfs.input);
  dag.addStream("Kafka_Avro_Msg_Byte_Stream", kafkaInput.outputPort, avroConversion.input);
  dag.addStream("Avro_To_String_Stream", avroConversion.output, hdfs.input);

}


Operator Code:

public class AvroBytesConversionOperator extends BaseOperator {

    private String schemaRegURL;
    private KafkaAvroDecoder decoder;

    public AvroBytesConversionOperator(){

    }

    public AvroBytesConversionOperator(String schemaRegURL){
        this.schemaRegURL = schemaRegURL;
    }

    /**
     * Defines Input Port - DefaultInputPort
     * Accepts data from the upstream operator
     * Type byte[]
     */
    public transient DefaultInputPort<byte[]> input = new DefaultInputPort<byte[]>() {
        @Override
        public void process(byte[] tuple)
        {
            processTuple(tuple);
        }
    };


    /**
     * Defines Output Port - DefaultOutputPort
     * Sends data to the downstream operator which can consume this data
     * Type String
     */
    public transient DefaultOutputPort<String> output = new DefaultOutputPort<String>();


    /**
     * Setup call
     */
    @Override
    public void setup(OperatorContext context)
    {
        Properties props = new Properties();
        props.setProperty("schema.registry.url", this.schemaRegURL);
        this.decoder = new KafkaAvroDecoder(new VerifiableProperties(props));
    }

    /**
     * Begin window call for the operator.
     * @param windowId
     */
    @Override
    public void beginWindow(long windowId)
    {

    }

    /**
     * Defines what should be done with each incoming tuple
     */
    protected void processTuple(byte[] tuple)
    {
        GenericRecord record = (GenericRecord) decoder.fromBytes(tuple);
        output.emit(record.toString());
    }

    /**
     * End window call for the operator
     * If sending per window, emit the updated counts here.
     */
    @Override
    public void endWindow()
    {

    }

}

Re: avro deserialization fails when using kafka

Posted by "hsy541@gmail.com" <hs...@gmail.com>.
Hey Raja,

By default, the Kafka input operator and the conversion operator will run
in different containers. You can set the stream locality to THREAD_LOCAL or
CONTAINER_LOCAL (a sketch follows the code example below). The input operator
is IO intensive and your conversion operator could be CPU intensive; correct
me if I am wrong.
Another option is to extend AbstractKafkaSinglePortInputOperator and override
the getTuple method. Here is an example:

// Imports (package names assumed for the Kafka 0.8-era Malhar operator
// and Confluent's schema-registry client):
import java.nio.ByteBuffer;
import java.util.Properties;

import org.apache.avro.generic.GenericRecord;

import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.contrib.kafka.AbstractKafkaSinglePortInputOperator;

import io.confluent.kafka.serializers.KafkaAvroDecoder;

import kafka.message.Message;
import kafka.utils.VerifiableProperties;

public class AvroKafkaInputOperator extends AbstractKafkaSinglePortInputOperator<GenericRecord> {

    private String schemaRegURL;

    // transient: created in setup(), so Kryo does not try to checkpoint it
    private transient KafkaAvroDecoder decoder;

    public AvroKafkaInputOperator() {
    }

    public AvroKafkaInputOperator(String schemaRegURL) {
        this.schemaRegURL = schemaRegURL;
    }

    @Override
    public void setup(OperatorContext context)
    {
        super.setup(context); // keep the Kafka consumer initialization
        Properties props = new Properties();
        props.setProperty("schema.registry.url", this.schemaRegURL);
        this.decoder = new KafkaAvroDecoder(new VerifiableProperties(props));
    }

    @Override
    public GenericRecord getTuple(Message msg)
    {
        // Copy the payload out of the ByteBuffer before decoding.
        ByteBuffer payload = msg.payload();
        byte[] bytes = new byte[payload.remaining()];
        payload.get(bytes);
        return (GenericRecord) decoder.fromBytes(bytes);
    }
}
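
For the locality suggestion above, a minimal sketch (untested, against the populateDAG from the original post):

    dag.addStream("Kafka_Avro_Msg_Byte_Stream", kafkaInput.outputPort, avroConversion.input)
       .setLocality(DAG.Locality.CONTAINER_LOCAL); // or DAG.Locality.THREAD_LOCAL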





Re: avro deserialization fails when using kafka

Posted by "Raja.Aravapalli" <Ra...@target.com>.
After making the variable transient, it worked fine.

Raja.

From: "Raja.Aravapalli" <Ra...@target.com>>
Date: Monday, June 6, 2016 at 1:52 PM
To: "users@apex.apache.org<ma...@apex.apache.org>" <us...@apex.apache.org>>
Subject: Re: avrò deserialization fails when using kafka


Thanks a lot Thomas & Ramanath.

Your suggestions helped!! My issue is fixed. Thank you.


Regards,
Raja.

From: Thomas Weise <th...@gmail.com>>
Reply-To: "users@apex.apache.org<ma...@apex.apache.org>" <us...@apex.apache.org>>
Date: Monday, June 6, 2016 at 12:21 PM
To: "users@apex.apache.org<ma...@apex.apache.org>" <us...@apex.apache.org>>
Subject: Re: avrò deserialization fails when using kafka

Since you are creating the decoder in setup(), please mark the property transient. No need to checkpoint it.

Thanks,
Thomas


On Mon, Jun 6, 2016 at 10:06 AM, Munagala Ramanath <ra...@datatorrent.com>> wrote:
http://docs.datatorrent.com/troubleshooting/#application-throwing-following-kryo-exception

Please try the suggestions at the above link.

It appears from
  https://github.com/uber/confluent-schema-registry/blob/master/avro-serializer/src/main/java/io/confluent/kafka/serializers/KafkaAvroDecoder.java
that the class does not have a default constructor.

Ram

On Mon, Jun 6, 2016 at 9:40 AM, Raja.Aravapalli <Ra...@target.com>> wrote:

Hi,

I am trying to read data from kafka, and my input in kafka is avro messages.

So I am using class “KafkaSinglePortByteArrayInputOperator” to emit records from kafka.. And in the next operator I am reading input as "byte[]” and deserializing the message!!

But the tuple deserialization is failing with below error in the log…

Can someone pls share your thoughts and help me fix this?



Caused by: com.esotericsoftware.kryo.KryoException: Class cannot be created (missing no-arg constructor): io.confluent.kafka.serializers.KafkaAvroDecoder
Serialization trace:
decoder (com.tgt.mighty.apexapps.AvroBytesConversionOperator)
        at com.esotericsoftware.kryo.Kryo$DefaultInstantiatorStrategy.newInstantiatorOf(Kryo.java:1228)
        at com.esotericsoftware.kryo.Kryo.newInstantiator(Kryo.java:1049)
        at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1058)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:547)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:523)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)



Code FYR:


Application.java file:

public void populateDAG(DAG dag, Configuration conf)
{
  //KafkaSinglePortStringInputOperator kafkaInput =  dag.addOperator("Kafka_Input", KafkaSinglePortStringInputOperator.class);

  KafkaSinglePortByteArrayInputOperator kafkaInput =  dag.addOperator("Kafka_Input", new KafkaSinglePortByteArrayInputOperator());

  AvroBytesConversionOperator avroConversion = dag.addOperator("Avro_Convert", new AvroBytesConversionOperator(“schemaRegURL"));

  HDFSWrite hdfs = dag.addOperator("To_HDFS", HDFSWrite.class);

  //dag.addStream("Kafka_To_Hdfs_Ingestion", kafkaInput.outputPort, hdfs.input);
  dag.addStream("Kafka_Avro_Msg_Byte_Stream", kafkaInput.outputPort, avroConversion.input);
  dag.addStream("Avro_To_String_Stream", avroConversion.output, hdfs.input);

}


Operator Code:

public class AvroBytesConversionOperator extends BaseOperator {

    private String schemaRegURL;
    private KafkaAvroDecoder decoder;

    public AvroBytesConversionOperator(){

    }

    public AvroBytesConversionOperator(String schemaRegURL){
        this.schemaRegURL = schemaRegURL;
    }

    /**
     * Defines Input Port - DefaultInputPort
     * Accepts data from the upstream operator
     * Type byte[]
     */
    public transient DefaultInputPort<byte[]> input = new DefaultInputPort<byte[]>() {
        @Override
        public void process(byte[] tuple)
        {
            processTuple(tuple);
        }
    };


    /**
     * Defines Output Port - DefaultOutputPort
     * Sends data to the down stream operator which can consume this data
     * Type String
     */
    public transient DefaultOutputPort<String> output = new DefaultOutputPort<String>();


    /**
     * Setup call
     */
    @Override
    public void setup(OperatorContext context)
    {
        Properties props = new Properties();
        props.setProperty("schema.registry.url", this.schemaRegURL);
        this.decoder = new KafkaAvroDecoder(new VerifiableProperties(props));
    }

    /**
     * Begin window call for the operator.
     * @param windowId
     */
    public void beginWindow(long windowId)
    {

    }

    /**
     * Defines what should be done with each incoming tuple
     */
    protected void processTuple(byte[] tuple)
    {
        GenericRecord record = (GenericRecord) decoder.fromBytes(tuple);
        output.emit(record.toString());
    }

    /**
     * End window call for the operator
     * If sending per window, emit the updated counts here.
     */
    @Override
    public void endWindow()
    {

    }

}



Re: avro deserialization fails when using kafka

Posted by "Raja.Aravapalli" <Ra...@target.com>.
Thanks a lot, Thomas & Ramanath.

Your suggestions helped; my issue is fixed. Thank you.


Regards,
Raja.


Re: avro deserialization fails when using kafka

Posted by Thomas Weise <th...@gmail.com>.
Since you are creating the decoder in setup(), please mark the property
transient. No need to checkpoint it.
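
For example, a sketch against the operator code above:

    // transient: rebuilt in setup() on deploy/recovery, so Kryo does not
    // attempt to checkpoint it (KafkaAvroDecoder has no no-arg constructor)
    private transient KafkaAvroDecoder decoder;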

Thanks,
Thomas



Re: avro deserialization fails when using kafka

Posted by Munagala Ramanath <ra...@datatorrent.com>.
http://docs.datatorrent.com/troubleshooting/#application-throwing-following-kryo-exception

Please try the suggestions at the above link.

It appears from

https://github.com/uber/confluent-schema-registry/blob/master/avro-serializer/src/main/java/io/confluent/kafka/serializers/KafkaAvroDecoder.java
that the class does not have a default constructor.

Ram
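
As background, Kryo's FieldSerializer skips transient fields by default, which
is why the transient fix suggested above works; a sketch against the operator
from the original post:

    private String schemaRegURL;                // checkpointed as usual
    private transient KafkaAvroDecoder decoder; // skipped by Kryo; left null on
                                                // restore and rebuilt in setup()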
