You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2015/01/06 21:10:23 UTC
kafka git commit: kafka-1797; (delta follow-up patch) add the serializer/deserializer api to the new java client; patched by Jun Rao; reviewed by Neha Narkhede
Repository: kafka
Updated Branches:
refs/heads/trunk 50b734690 -> 517503db2
kafka-1797; (delta follow-up patch) add the serializer/deserializer api to the new java client; patched by Jun Rao; reviewed by Neha Narkhede
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/517503db
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/517503db
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/517503db
Branch: refs/heads/trunk
Commit: 517503db2616531b08ee4d08d39c0e1c0bd19e97
Parents: 50b7346
Author: Jun Rao <ju...@gmail.com>
Authored: Tue Jan 6 12:10:04 2015 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Tue Jan 6 12:10:04 2015 -0800
----------------------------------------------------------------------
.../org/apache/kafka/clients/producer/KafkaProducer.java | 8 ++++++--
1 file changed, 6 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/517503db/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 91c672d..a61c56c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -76,6 +76,7 @@ public class KafkaProducer<K,V> implements Producer<K,V> {
private final Time time;
private final Serializer<K> keySerializer;
private final Serializer<V> valueSerializer;
+ private final ProducerConfig producerConfig;
/**
* A producer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings
@@ -152,6 +153,7 @@ public class KafkaProducer<K,V> implements Producer<K,V> {
private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
log.trace("Starting the Kafka producer");
+ this.producerConfig = config;
this.time = new SystemTime();
MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG))
.timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG),
@@ -307,14 +309,16 @@ public class KafkaProducer<K,V> implements Producer<K,V> {
serializedKey = keySerializer.serialize(record.topic(), record.key());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
- " to the one specified in key.serializer");
+ " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
+ " specified in key.serializer");
}
byte[] serializedValue;
try {
serializedValue = valueSerializer.serialize(record.topic(), record.value());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
- " to the one specified in value.serializer");
+ " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
+ " specified in value.serializer");
}
ProducerRecord serializedRecord = new ProducerRecord<byte[], byte[]>(record.topic(), record.partition(), serializedKey, serializedValue);
int partition = partitioner.partition(serializedRecord, metadata.fetch());