You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2015/01/06 21:10:23 UTC

kafka git commit: KAFKA-1797; (delta follow-up patch) add the serializer/deserializer API to the new Java client; patched by Jun Rao; reviewed by Neha Narkhede

Repository: kafka
Updated Branches:
  refs/heads/trunk 50b734690 -> 517503db2


KAFKA-1797; (delta follow-up patch) add the serializer/deserializer API to the new Java client; patched by Jun Rao; reviewed by Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/517503db
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/517503db
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/517503db

Branch: refs/heads/trunk
Commit: 517503db2616531b08ee4d08d39c0e1c0bd19e97
Parents: 50b7346
Author: Jun Rao <ju...@gmail.com>
Authored: Tue Jan 6 12:10:04 2015 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Tue Jan 6 12:10:04 2015 -0800

----------------------------------------------------------------------
 .../org/apache/kafka/clients/producer/KafkaProducer.java     | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/517503db/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 91c672d..a61c56c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -76,6 +76,7 @@ public class KafkaProducer<K,V> implements Producer<K,V> {
     private final Time time;
     private final Serializer<K> keySerializer;
     private final Serializer<V> valueSerializer;
+    private final ProducerConfig producerConfig;
 
     /**
      * A producer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings
@@ -152,6 +153,7 @@ public class KafkaProducer<K,V> implements Producer<K,V> {
 
     private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
         log.trace("Starting the Kafka producer");
+        this.producerConfig = config;
         this.time = new SystemTime();
         MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG))
                                                       .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG),
@@ -307,14 +309,16 @@ public class KafkaProducer<K,V> implements Producer<K,V> {
                 serializedKey = keySerializer.serialize(record.topic(), record.key());
             } catch (ClassCastException cce) {
                 throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
-                        " to the one specified in key.serializer");
+                        " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
+                        " specified in key.serializer");
             }
             byte[] serializedValue;
             try {
                 serializedValue = valueSerializer.serialize(record.topic(), record.value());
             } catch (ClassCastException cce) {
                 throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
-                        " to the one specified in value.serializer");
+                        " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
+                        " specified in value.serializer");
             }
             ProducerRecord serializedRecord = new ProducerRecord<byte[], byte[]>(record.topic(), record.partition(), serializedKey, serializedValue);
             int partition = partitioner.partition(serializedRecord, metadata.fetch());