Posted to commits@kafka.apache.org by ju...@apache.org on 2015/01/13 06:36:44 UTC

kafka git commit: kafka-1797; (missed parameterized types in a few files) add the serializer/deserializer api to the new java client; patched by Jun Rao

Repository: kafka
Updated Branches:
  refs/heads/trunk 14779dddb -> 6f4dea9db


kafka-1797; (missed parameterized types in a few files) add the serializer/deserializer api to the new java client; patched by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6f4dea9d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6f4dea9d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6f4dea9d

Branch: refs/heads/trunk
Commit: 6f4dea9dbce5cc9f69a917182981b41a56a98a85
Parents: 14779dd
Author: Jun Rao <ju...@gmail.com>
Authored: Mon Jan 12 18:45:45 2015 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Mon Jan 12 21:29:40 2015 -0800

----------------------------------------------------------------------
 .../org/apache/kafka/clients/producer/KafkaProducer.java    | 2 +-
 .../org/apache/kafka/clients/producer/MockProducerTest.java | 9 +++------
 core/src/main/scala/kafka/tools/MirrorMaker.scala           | 2 +-
 3 files changed, 5 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
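
For context, every hunk below makes the same kind of fix: a raw ProducerRecord is replaced with its parameterized form (ProducerRecord<byte[], byte[]> in Java, ProducerRecord[Array[Byte], Array[Byte]] in Scala), matching the typed producer introduced by the serializer/deserializer API. The following is a minimal, self-contained sketch of the end-to-end pattern, not code from this commit: the broker address and topic name are placeholders, and the key.serializer/value.serializer settings with ByteArraySerializer follow the new producer's serializer API rather than anything shown in the diff.

  import java.util.Properties;
  import java.util.concurrent.Future;

  import org.apache.kafka.clients.producer.KafkaProducer;
  import org.apache.kafka.clients.producer.ProducerRecord;
  import org.apache.kafka.clients.producer.RecordMetadata;

  public class SerializerApiSketch {
      public static void main(String[] args) throws Exception {
          Properties props = new Properties();
          props.put("bootstrap.servers", "localhost:9092");   // placeholder broker
          props.put("key.serializer",
                    "org.apache.kafka.common.serialization.ByteArraySerializer");
          props.put("value.serializer",
                    "org.apache.kafka.common.serialization.ByteArraySerializer");

          // Producer typed to byte[] keys and values, as in MirrorMaker below.
          KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(props);

          // Before this commit a few call sites used the raw type:
          //   ProducerRecord record = new ProducerRecord<byte[], byte[]>(...);
          // which compiles only with an "unchecked" warning and drops type checking
          // at send(). The parameterized form keeps the record's key/value types
          // consistent with the producer:
          ProducerRecord<byte[], byte[]> record =
              new ProducerRecord<byte[], byte[]>("my-topic", "key".getBytes(), "value".getBytes());

          Future<RecordMetadata> metadata = producer.send(record);
          metadata.get();      // block until the send completes
          producer.close();
      }
  }
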


http://git-wip-us.apache.org/repos/asf/kafka/blob/6f4dea9d/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index a61c56c..d3abeb1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -320,7 +320,7 @@ public class KafkaProducer<K,V> implements Producer<K,V> {
                         " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                         " specified in value.serializer");
             }
-            ProducerRecord serializedRecord = new ProducerRecord<byte[], byte[]>(record.topic(), record.partition(), serializedKey, serializedValue);
+            ProducerRecord<byte[], byte[]> serializedRecord = new ProducerRecord<byte[], byte[]>(record.topic(), record.partition(), serializedKey, serializedValue);
             int partition = partitioner.partition(serializedRecord, metadata.fetch());
             int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);
             ensureValidRecordSize(serializedSize);

http://git-wip-us.apache.org/repos/asf/kafka/blob/6f4dea9d/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
index 1e2ca03..3676b05 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
@@ -25,9 +25,6 @@ import static org.junit.Assert.fail;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
-import org.apache.kafka.clients.producer.MockProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
 import org.junit.Test;
 
 public class MockProducerTest {
@@ -37,7 +34,7 @@ public class MockProducerTest {
     @Test
     public void testAutoCompleteMock() throws Exception {
         MockProducer producer = new MockProducer(true);
-        ProducerRecord record = new ProducerRecord<byte[], byte[]>(topic, "key".getBytes(), "value".getBytes());
+        ProducerRecord<byte[], byte[]> record = new ProducerRecord<byte[], byte[]>(topic, "key".getBytes(), "value".getBytes());
         Future<RecordMetadata> metadata = producer.send(record);
         assertTrue("Send should be immediately complete", metadata.isDone());
         assertFalse("Send should be successful", isError(metadata));
@@ -51,8 +48,8 @@ public class MockProducerTest {
     @Test
     public void testManualCompletion() throws Exception {
         MockProducer producer = new MockProducer(false);
-        ProducerRecord record1 = new ProducerRecord<byte[], byte[]>("topic", "key1".getBytes(), "value1".getBytes());
-        ProducerRecord record2 = new ProducerRecord<byte[], byte[]>("topic", "key2".getBytes(), "value2".getBytes());
+        ProducerRecord<byte[], byte[]> record1 = new ProducerRecord<byte[], byte[]>("topic", "key1".getBytes(), "value1".getBytes());
+        ProducerRecord<byte[], byte[]> record2 = new ProducerRecord<byte[], byte[]>("topic", "key2".getBytes(), "value2".getBytes());
         Future<RecordMetadata> md1 = producer.send(record1);
         assertFalse("Send shouldn't have completed", md1.isDone());
         Future<RecordMetadata> md2 = producer.send(record2);

http://git-wip-us.apache.org/repos/asf/kafka/blob/6f4dea9d/core/src/main/scala/kafka/tools/MirrorMaker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index e302523..5cbc810 100644
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -534,7 +534,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
     val producer = new KafkaProducer[Array[Byte], Array[Byte]](producerProps)
 
     override def send(sourceTopicPartition: TopicAndPartition, sourceOffset: Long, key: Array[Byte], value: Array[Byte]) {
-      val record = new ProducerRecord(sourceTopicPartition.topic, key, value)
+      val record = new ProducerRecord[Array[Byte], Array[Byte]](sourceTopicPartition.topic, key, value)
       if(sync) {
         this.producer.send(record).get()
         unackedOffsetsMap.getAndMaybePut(sourceTopicPartition).maybeUpdateMaxOffsetSeen(sourceOffset)