You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Clint Mcneil <cl...@gmail.com> on 2015/03/17 16:04:04 UTC
schema.registry.url = null
Hi
I can't get the Kafka/Avro serializer producer example to work.
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
/**
* Created by clint on 3/17/15.
*/
public class Confluent {
public static void main (String[] args){
KafkaProducer<Object, Object> producer;
Properties propsKafka = new Properties();
propsKafka.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9092");
propsKafka.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
io.confluent.kafka.serializers.KafkaAvroSerializer.class);
propsKafka.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
io.confluent.kafka.serializers.KafkaAvroSerializer.class);
propsKafka.put("schema.registry.url", "http://localhost:8081");
producer = new KafkaProducer<Object, Object>(propsKafka);
String key = "key1";
String userSchema = "{\"type\":\"record\"," +
"\"name\":\"myrecord\"," +
"\"fields\":[{\"name\":\"f1\",\"type\":\"string\"}]}";
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(userSchema);
GenericRecord avroRecord = new GenericData.Record(schema);
avroRecord.put("f1", "value4");
ProducerRecord<Object, Object> data = new ProducerRecord<Object,
Object>("test", key , avroRecord);
producer.send(data);
}
}
The output is:
Mar 17, 2015 5:00:31 PM org.apache.kafka.common.config.AbstractConfig logAll
INFO: ProducerConfig values:
compression.type = none
metric.reporters = []
metadata.max.age.ms = 300000
metadata.fetch.timeout.ms = 60000
acks = 1
batch.size = 16384
reconnect.backoff.ms = 10
bootstrap.servers = [localhost:9092]
receive.buffer.bytes = 32768
retry.backoff.ms = 100
buffer.memory = 33554432
timeout.ms = 30000
key.serializer = class
io.confluent.kafka.serializers.KafkaAvroSerializer
retries = 0
max.request.size = 1048576
block.on.buffer.full = true
value.serializer = class
io.confluent.kafka.serializers.KafkaAvroSerializer
metrics.sample.window.ms = 30000
send.buffer.bytes = 131072
max.in.flight.requests.per.connection = 5
metrics.num.samples = 2
linger.ms = 0
client.id =
Mar 17, 2015 5:00:32 PM org.apache.kafka.common.config.AbstractConfig
logUnused
WARNING: The configuration schema.registry.url = null was supplied but
isn't a known config.
Please help
Thanks
Re: schema.registry.url = null
Posted by Ewen Cheslack-Postava <ew...@confluent.io>.
Clint,
Your code looks fine and the output doesn't actually have any errors, but
you're also not waiting for the messages to be published. Try changing
producer.send(data);
to
producer.send(data).get();
to wait block until the message has been acked. If it runs and exits
cleanly, then you should be able to see it using a consumer, e.g
kafka-avro-console-consumer.
The warning that you're seeing is due to the KafkaProducer's configuration
class not using the schema.registry.url setting; the same settings are
passed on to the serializers which do use it. It incorrectly reports the
value as null due to a bug, I filed
https://issues.apache.org/jira/browse/KAFKA-2026 to address that.
By the way, for Confluent stuff that's not part of the Apache Kafka
repository, you might want to ask questions on this list instead:
https://groups.google.com/forum/#!forum/confluent-platform
On Tue, Mar 17, 2015 at 8:04 AM, Clint Mcneil <cl...@gmail.com> wrote:
> Hi
>
> I can't get the Kafka/Avro serializer producer example to work.
>
> import org.apache.avro.Schema;
> import org.apache.avro.generic.GenericData;
> import org.apache.avro.generic.GenericRecord;
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.ProducerConfig;
> import org.apache.kafka.clients.producer.ProducerRecord;
>
> import java.util.Properties;
>
> /**
> * Created by clint on 3/17/15.
> */
> public class Confluent {
>
> public static void main (String[] args){
>
> KafkaProducer<Object, Object> producer;
> Properties propsKafka = new Properties();
>
> propsKafka.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
> "localhost:9092");
> propsKafka.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
> io.confluent.kafka.serializers.KafkaAvroSerializer.class);
> propsKafka.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
> io.confluent.kafka.serializers.KafkaAvroSerializer.class);
> propsKafka.put("schema.registry.url", "http://localhost:8081");
> producer = new KafkaProducer<Object, Object>(propsKafka);
>
> String key = "key1";
> String userSchema = "{\"type\":\"record\"," +
> "\"name\":\"myrecord\"," +
> "\"fields\":[{\"name\":\"f1\",\"type\":\"string\"}]}";
>
> Schema.Parser parser = new Schema.Parser();
> Schema schema = parser.parse(userSchema);
>
> GenericRecord avroRecord = new GenericData.Record(schema);
> avroRecord.put("f1", "value4");
>
> ProducerRecord<Object, Object> data = new ProducerRecord<Object,
> Object>("test", key , avroRecord);
> producer.send(data);
>
> }
> }
>
>
> The output is:
>
> Mar 17, 2015 5:00:31 PM org.apache.kafka.common.config.AbstractConfig
> logAll
> INFO: ProducerConfig values:
> compression.type = none
> metric.reporters = []
> metadata.max.age.ms = 300000
> metadata.fetch.timeout.ms = 60000
> acks = 1
> batch.size = 16384
> reconnect.backoff.ms = 10
> bootstrap.servers = [localhost:9092]
> receive.buffer.bytes = 32768
> retry.backoff.ms = 100
> buffer.memory = 33554432
> timeout.ms = 30000
> key.serializer = class
> io.confluent.kafka.serializers.KafkaAvroSerializer
> retries = 0
> max.request.size = 1048576
> block.on.buffer.full = true
> value.serializer = class
> io.confluent.kafka.serializers.KafkaAvroSerializer
> metrics.sample.window.ms = 30000
> send.buffer.bytes = 131072
> max.in.flight.requests.per.connection = 5
> metrics.num.samples = 2
> linger.ms = 0
> client.id =
>
> Mar 17, 2015 5:00:32 PM org.apache.kafka.common.config.AbstractConfig
> logUnused
> WARNING: The configuration schema.registry.url = null was supplied but
> isn't a known config.
>
> Please help
>
> Thanks
>
--
Thanks,
Ewen