Posted to users@kafka.apache.org by Nishanth S <ni...@gmail.com> on 2017/08/16 18:51:42 UTC

Avro With Kafka

Hello,
We are investigating ingesting Avro records to Kafka using the Avro Kafka
serializer. Our schemas are nested and are of type record. Does the current
Avro Kafka serializer support the Avro record type?

If not, is there a way to ingest records and consume them with a consumer
without using the Avro Kafka serializer? Is the Avro serializer component
part of Apache Kafka or Confluent?
I did not see a way to use this serializer without using a schema registry.

Thanks,
Nishanth

Re: Avro With Kafka

Posted by Kidong Lee <my...@gmail.com>.
The Classpath Schema Registry and Consul Schema Registry that I wrote can be
found here:
https://github.com/mykidong/kafka-etl-consumer/tree/master/src/main/java/kafka/etl/deserialize

Classpath Schema Registry can be used like this:

// topic and avro schema classpath properties.
// topic key must be in topic list.
Properties topicAndPathProps = new Properties();
topicAndPathProps.put("item-view-event", "/META-INF/avro/item-view-event.avsc");

AvroDeserializeService avroDeserializeService = new ClasspathAvroDeserializeService(topicAndPathProps);
Schema schema = avroDeserializeService.getSchema("item-view-event");


And Consul Schema Registry:
// topic and avro schema consul key path properties.
// topic key must be in topic list.
Properties topicAndPathProps = new Properties();
topicAndPathProps.put("item-view-event", "avro-schemas/item-view-event");

// consul agent host and port.
String agentHost = "localhost";
int agentPort = 8500;

AvroDeserializeService avroDeserializeService = new ConsulAvroDeserializeService(topicAndPathProps, agentHost, agentPort);
Schema schema = avroDeserializeService.getSchema("item-view-event");
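
For reference, here is a minimal sketch of what the AvroDeserializeService
interface could look like, inferred from the getSchema() and deserializeAvro()
calls used in this thread; the actual definition is in the repository linked above:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;

// a sketch inferred from usage in this thread; see the linked repository
// for the actual interface definition.
public interface AvroDeserializeService {

    // returns the cached avro schema registered for the given topic / event type key.
    Schema getSchema(String eventType);

    // deserializes avro binary bytes to a GenericRecord using that schema.
    GenericRecord deserializeAvro(String eventType, byte[] avroBytes);
}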


- Kidong.


Re: Avro With Kafka

Posted by Stephen Durfey <sj...@gmail.com>.
Yes, the Confluent SerDes support nested Avro records. Under the covers they use the Avro classes (DatumReader and DatumWriter) to carry out those operations, so as long as you're sending valid Avro data to be produced or consumed, the Confluent SerDes will handle it just fine.
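
As an illustration (not from the original thread): a minimal sketch of producing
a nested record with the Confluent Avro serializer. The topic name, broker
address, and schema registry URL below are assumptions for the example:

import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class NestedAvroProducerExample {

    public static void main(String[] args) {
        // a value schema containing a nested record field.
        Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Order\",\"fields\":["
          + "{\"name\":\"id\",\"type\":\"string\"},"
          + "{\"name\":\"customer\",\"type\":{\"type\":\"record\",\"name\":\"Customer\","
          + "\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]}}]}");

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // the confluent serializer registers/looks up schemas in the schema registry.
        props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.put("schema.registry.url", "http://localhost:8081");

        // build the nested record; the serializer handles the nesting.
        GenericRecord customer = new GenericData.Record(schema.getField("customer").schema());
        customer.put("name", "alice");

        GenericRecord order = new GenericData.Record(schema);
        order.put("id", "order-1");
        order.put("customer", customer);

        try (KafkaProducer<String, Object> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("orders", "order-1", order));
        }
    }
}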


Re: Avro With Kafka

Posted by Kidong Lee <my...@gmail.com>.
You can send Avro records to Kafka and consume them without a schema registry.

In my approach, the Avro schema file (.avsc) must be on the classpath on both
the producer and consumer side.

On the producer side, first write a value Avro serializer and set the
key.serializer and value.serializer properties in the Kafka producer configuration.

For instance, the following class is an Avro serializer for the value:

import domain.Event;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.kafka.common.serialization.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Map;


/**
 * Created by mykidong on 2016-05-17.
 */
public class KafkaAvroEventSerializer implements Serializer<Event> {

    private static Logger log = LoggerFactory.getLogger(KafkaAvroEventSerializer.class);

    private Schema schema;

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // get the avro avsc schema path from the kafka configuration.
        String avroSchemaPath = (String) configs.get("event.avro.schema.path");

        // construct the avro schema instance from the classpath.
        Schema.Parser parser = new Schema.Parser();
        try {
            schema = parser.parse(getClass().getResourceAsStream(avroSchemaPath));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public byte[] serialize(String topic, Event event) {
        try {
            // build a generic record from the event fields.
            GenericRecord datum = new GenericData.Record(schema);
            datum.put("eventType", event.getEventType());
            datum.put("value", event.getValue());

            // encode the record as avro binary.
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(schema);
            Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
            writer.write(datum, encoder);
            encoder.flush();

            byte[] avroBytes = out.toByteArray();
            out.close();

            return avroBytes;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void close() {
    }
}
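
The Event class above comes from the author's domain package and is not shown
in the thread; a minimal shape consistent with the getters the serializer calls
might look like this (the field types are guesses):

// a hypothetical minimal Event, matching the getters used by the serializer.
public class Event {

    private String eventType;
    private String value;

    public Event(String eventType, String value) {
        this.eventType = eventType;
        this.value = value;
    }

    public String getEventType() { return eventType; }

    public String getValue() { return value; }
}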


The Kafka producer then sends an Event, which you should replace with your own message type:

Properties kafkaProp = new Properties();
..
kafkaProp.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");

// the value avro serializer written above.
kafkaProp.put("value.serializer", "your.package.KafkaAvroEventSerializer");

// the schema path read by configure(); the value shown here is just an example.
kafkaProp.put("event.avro.schema.path", "/META-INF/avro/event.avsc");
..

KafkaProducer<Integer, Event> producer = new KafkaProducer<>(kafkaProp);

// the event type is used as the topic name here.
producer.send(new ProducerRecord<Integer, Event>(eventType, event));


On the consumer side, the Avro schema instances should be cached, because
every message consumed from Kafka must be deserialized, and re-parsing the
schema for each message adds latency.

The Avro schema instances can be constructed from the classpath and mapped
by event type key like this:

import api.dao.AvroSchemaRegistryDao;
import org.apache.avro.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;

import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;


public class MapAvroSchemaRegistryDao implements AvroSchemaRegistryDao, InitializingBean {

    private static Logger log = LoggerFactory.getLogger(MapAvroSchemaRegistryDao.class);

    private final Object LOCK = new Object();
    private ConcurrentMap<String, Schema> schemaMap = new ConcurrentHashMap<>();

    private Properties eventTypeProps;

    public void setEventTypeProps(Properties eventTypeProps) {
        this.eventTypeProps = eventTypeProps;
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        // parse every schema from the classpath once at startup and cache it.
        for (String eventType : eventTypeProps.stringPropertyNames()) {
            String schemaPath = eventTypeProps.getProperty(eventType);

            Schema.Parser parser = new Schema.Parser();
            try {
                Schema schema = parser.parse(getClass().getResourceAsStream(schemaPath));
                schemaMap.put(eventType, schema);

                log.info("loaded avro schema " + eventType);
            } catch (Exception e) {
                log.error("failed to load avro schema " + eventType);
                throw new RuntimeException(e);
            }
        }
    }

    @Override
    public Schema getSchema(String eventType) {
        return schemaMap.get(eventType);
    }

    @Override
    public void update(String eventType, String jsonSchema) {
        // replace a cached schema at runtime, e.g. after a schema change.
        synchronized (LOCK) {
            Schema.Parser parser = new Schema.Parser();
            try {
                Schema schema = parser.parse(jsonSchema);
                schemaMap.put(eventType, schema);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }
}
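
Although MapAvroSchemaRegistryDao implements Spring's InitializingBean, it can
also be wired up by hand; a sketch, using the same kind of topic-to-schema-path
properties as in the classpath registry example:

Properties eventTypeProps = new Properties();
eventTypeProps.put("item-view-event", "/META-INF/avro/item-view-event.avsc");

MapAvroSchemaRegistryDao schemaRegistryDao = new MapAvroSchemaRegistryDao();
schemaRegistryDao.setEventTypeProps(eventTypeProps);
schemaRegistryDao.afterPropertiesSet(); // loads and caches all schemas from the classpath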

Avro Deserialize Service Implementation:

import ieiot.api.dao.AvroSchemaRegistryDao;
import ieiot.api.service.AvroDeserializeService;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Created by mykidong on 2016-05-19.
 */
public class AvroDeserializeServiceImpl implements AvroDeserializeService {

    private static Logger log = LoggerFactory.getLogger(AvroDeserializeServiceImpl.class);

    // the MapAvroSchemaRegistryDao above is injected here.
    private AvroSchemaRegistryDao avroSchemaRegistryDao;

    public void setAvroSchemaRegistryDao(AvroSchemaRegistryDao avroSchemaRegistryDao) {
        this.avroSchemaRegistryDao = avroSchemaRegistryDao;
    }

    /*
        Kafka message deserialization.
    */
    @Override
    public GenericRecord deserializeAvro(String eventType, byte[] avroBytes) {
        // get the cached avro schema instance from the schema registry dao.
        Schema schema = this.avroSchemaRegistryDao.getSchema(eventType);

        DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
        Decoder decoder = DecoderFactory.get().binaryDecoder(avroBytes, null);

        try {
            return reader.read(null, decoder);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
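
A sketch of wiring the service by hand, injecting the schema registry DAO from
the previous listing:

AvroDeserializeServiceImpl avroDeserializeService = new AvroDeserializeServiceImpl();
avroDeserializeService.setAvroSchemaRegistryDao(schemaRegistryDao);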


The Kafka consumer consumes the Avro bytes from Kafka:

Properties props = new Properties();
props.put("bootstrap.servers", "xxx");
props.put("group.id", "xxx");
...
props.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
...

KafkaConsumer<Integer, byte[]> consumer = new KafkaConsumer<>(props);
consumer.subscribe(topics);
while (true) {
    ConsumerRecords<Integer, byte[]> records = consumer.poll(this.timeout);

    for (ConsumerRecord<Integer, byte[]> record : records) {
        // deserialize the avro bytes to a GenericRecord; the topic name is used
        // as the event type key for the schema lookup.
        GenericRecord genericRecord = avroDeserializeService.deserializeAvro(record.topic(), record.value());
        ...
    }
}

In my case, the Avro bytes consumed from Kafka are deserialized to a GenericRecord,
but you can get your own message type by modifying the deserialize service
implementation.
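
A sketch of that modification: map the GenericRecord fields back to the Event
shape used on the producer side (the field names come from
KafkaAvroEventSerializer above; the Event class itself is assumed):

// convert the deserialized GenericRecord back to a typed Event.
public Event toEvent(GenericRecord genericRecord) {
    // avro strings come back as Utf8, so convert via toString().
    String eventType = genericRecord.get("eventType").toString();
    String value = genericRecord.get("value").toString();

    return new Event(eventType, value);
}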

I also had questions about whether the schema registry from Confluent should
be used or not. Instead, I have written and used the classpath schema registry
shown above, as well as a schema registry backed by Consul.

Good luck.

Cheers,

- Kidong.
