You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Clint Mcneil <cl...@gmail.com> on 2015/03/17 16:04:04 UTC

schema.registry.url = null

Hi

I can't get the Kafka/Avro serializer producer example to work.

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * Minimal example that publishes a single Avro record to Kafka, using
 * Confluent's KafkaAvroSerializer for both the key and the value.
 *
 * Created by clint on 3/17/15.
 */
public class Confluent {

    /**
     * Builds a one-field Avro record and sends it to the "test" topic on the
     * broker at localhost:9092, then closes the producer.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {

        Properties propsKafka = new Properties();
        propsKafka.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "localhost:9092");
        propsKafka.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                io.confluent.kafka.serializers.KafkaAvroSerializer.class);
        propsKafka.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                io.confluent.kafka.serializers.KafkaAvroSerializer.class);
        // Not a producer setting: the same properties are handed on to the
        // serializers, and those are what use the schema registry URL.
        propsKafka.put("schema.registry.url", "http://localhost:8081");

        KafkaProducer<Object, Object> producer =
                new KafkaProducer<Object, Object>(propsKafka);
        try {
            String key = "key1";
            String userSchema = "{\"type\":\"record\"," +
                    "\"name\":\"myrecord\"," +
                    "\"fields\":[{\"name\":\"f1\",\"type\":\"string\"}]}";

            Schema.Parser parser = new Schema.Parser();
            Schema schema = parser.parse(userSchema);

            GenericRecord avroRecord = new GenericData.Record(schema);
            avroRecord.put("f1", "value4");

            ProducerRecord<Object, Object> data =
                    new ProducerRecord<Object, Object>("test", key, avroRecord);
            producer.send(data);
        } finally {
            // send() only queues the record and returns immediately. Without
            // close(), main() can return before anything reaches the broker.
            // close() waits for outstanding sends to finish and releases the
            // producer's resources. To also surface a failed send as an
            // exception, block on the Future returned by send().
            producer.close();
        }

    }
}


The output is:

Mar 17, 2015 5:00:31 PM org.apache.kafka.common.config.AbstractConfig logAll
INFO: ProducerConfig values:
    compression.type = none
    metric.reporters = []
    metadata.max.age.ms = 300000
    metadata.fetch.timeout.ms = 60000
    acks = 1
    batch.size = 16384
    reconnect.backoff.ms = 10
    bootstrap.servers = [localhost:9092]
    receive.buffer.bytes = 32768
    retry.backoff.ms = 100
    buffer.memory = 33554432
    timeout.ms = 30000
    key.serializer = class
io.confluent.kafka.serializers.KafkaAvroSerializer
    retries = 0
    max.request.size = 1048576
    block.on.buffer.full = true
    value.serializer = class
io.confluent.kafka.serializers.KafkaAvroSerializer
    metrics.sample.window.ms = 30000
    send.buffer.bytes = 131072
    max.in.flight.requests.per.connection = 5
    metrics.num.samples = 2
    linger.ms = 0
    client.id =

Mar 17, 2015 5:00:32 PM org.apache.kafka.common.config.AbstractConfig
logUnused
WARNING: The configuration schema.registry.url = null was supplied but
isn't a known config.

Please help

Thanks

Re: schema.registry.url = null

Posted by Ewen Cheslack-Postava <ew...@confluent.io>.
Clint,

Your code looks fine and the output doesn't actually have any errors, but
you're also not waiting for the messages to be published. Try changing

producer.send(data);

to

producer.send(data).get();

to block until the message has been acked. If it runs and exits
cleanly, then you should be able to see it using a consumer, e.g.
kafka-avro-console-consumer.

The warning that you're seeing is due to the KafkaProducer's configuration
class not using the schema.registry.url setting; the same settings are
passed on to the serializers, which do use it. It incorrectly reports the
value as null due to a bug; I filed
https://issues.apache.org/jira/browse/KAFKA-2026 to address that.

By the way, for Confluent stuff that's not part of the Apache Kafka
repository, you might want to ask questions on this list instead:
https://groups.google.com/forum/#!forum/confluent-platform


On Tue, Mar 17, 2015 at 8:04 AM, Clint Mcneil <cl...@gmail.com> wrote:

> Hi
>
> I can't get the Kafka/Avro serializer producer example to work.
>
> import org.apache.avro.Schema;
> import org.apache.avro.generic.GenericData;
> import org.apache.avro.generic.GenericRecord;
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.ProducerConfig;
> import org.apache.kafka.clients.producer.ProducerRecord;
>
> import java.util.Properties;
>
> /**
>  * Created by clint on 3/17/15.
>  */
> public class Confluent {
>
>     public static void  main (String[] args){
>
>         KafkaProducer<Object, Object> producer;
>         Properties propsKafka = new Properties();
>
>         propsKafka.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
> "localhost:9092");
>         propsKafka.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
> io.confluent.kafka.serializers.KafkaAvroSerializer.class);
>         propsKafka.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
> io.confluent.kafka.serializers.KafkaAvroSerializer.class);
>         propsKafka.put("schema.registry.url", "http://localhost:8081");
>         producer = new KafkaProducer<Object, Object>(propsKafka);
>
>         String key = "key1";
>         String userSchema = "{\"type\":\"record\"," +
>                 "\"name\":\"myrecord\"," +
>                 "\"fields\":[{\"name\":\"f1\",\"type\":\"string\"}]}";
>
>         Schema.Parser parser = new Schema.Parser();
>         Schema schema = parser.parse(userSchema);
>
>         GenericRecord avroRecord = new GenericData.Record(schema);
>         avroRecord.put("f1", "value4");
>
>         ProducerRecord<Object, Object> data = new ProducerRecord<Object,
> Object>("test", key , avroRecord);
>         producer.send(data);
>
>     }
> }
>
>
> The output is:
>
> Mar 17, 2015 5:00:31 PM org.apache.kafka.common.config.AbstractConfig
> logAll
> INFO: ProducerConfig values:
>     compression.type = none
>     metric.reporters = []
>     metadata.max.age.ms = 300000
>     metadata.fetch.timeout.ms = 60000
>     acks = 1
>     batch.size = 16384
>     reconnect.backoff.ms = 10
>     bootstrap.servers = [localhost:9092]
>     receive.buffer.bytes = 32768
>     retry.backoff.ms = 100
>     buffer.memory = 33554432
>     timeout.ms = 30000
>     key.serializer = class
> io.confluent.kafka.serializers.KafkaAvroSerializer
>     retries = 0
>     max.request.size = 1048576
>     block.on.buffer.full = true
>     value.serializer = class
> io.confluent.kafka.serializers.KafkaAvroSerializer
>     metrics.sample.window.ms = 30000
>     send.buffer.bytes = 131072
>     max.in.flight.requests.per.connection = 5
>     metrics.num.samples = 2
>     linger.ms = 0
>     client.id =
>
> Mar 17, 2015 5:00:32 PM org.apache.kafka.common.config.AbstractConfig
> logUnused
> WARNING: The configuration schema.registry.url = null was supplied but
> isn't a known config.
>
> Please help
>
> Thanks
>



-- 
Thanks,
Ewen