You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2015/01/13 06:36:44 UTC
kafka git commit: kafka-1797; (missed parametric in a few files) add the serializer/deserializer api to the new java client; patched by Jun Rao
Repository: kafka
Updated Branches:
refs/heads/trunk 14779dddb -> 6f4dea9db
kafka-1797; (missed parametric in a few files) add the serializer/deserializer api to the new java client; patched by Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6f4dea9d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6f4dea9d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6f4dea9d
Branch: refs/heads/trunk
Commit: 6f4dea9dbce5cc9f69a917182981b41a56a98a85
Parents: 14779dd
Author: Jun Rao <ju...@gmail.com>
Authored: Mon Jan 12 18:45:45 2015 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Mon Jan 12 21:29:40 2015 -0800
----------------------------------------------------------------------
.../org/apache/kafka/clients/producer/KafkaProducer.java | 2 +-
.../org/apache/kafka/clients/producer/MockProducerTest.java | 9 +++------
core/src/main/scala/kafka/tools/MirrorMaker.scala | 2 +-
3 files changed, 5 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/6f4dea9d/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index a61c56c..d3abeb1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -320,7 +320,7 @@ public class KafkaProducer<K,V> implements Producer<K,V> {
" to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
" specified in value.serializer");
}
- ProducerRecord serializedRecord = new ProducerRecord<byte[], byte[]>(record.topic(), record.partition(), serializedKey, serializedValue);
+ ProducerRecord<byte[], byte[]> serializedRecord = new ProducerRecord<byte[], byte[]>(record.topic(), record.partition(), serializedKey, serializedValue);
int partition = partitioner.partition(serializedRecord, metadata.fetch());
int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);
ensureValidRecordSize(serializedSize);
http://git-wip-us.apache.org/repos/asf/kafka/blob/6f4dea9d/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
index 1e2ca03..3676b05 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
@@ -25,9 +25,6 @@ import static org.junit.Assert.fail;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
-import org.apache.kafka.clients.producer.MockProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
import org.junit.Test;
public class MockProducerTest {
@@ -37,7 +34,7 @@ public class MockProducerTest {
@Test
public void testAutoCompleteMock() throws Exception {
MockProducer producer = new MockProducer(true);
- ProducerRecord record = new ProducerRecord<byte[], byte[]>(topic, "key".getBytes(), "value".getBytes());
+ ProducerRecord<byte[], byte[]> record = new ProducerRecord<byte[], byte[]>(topic, "key".getBytes(), "value".getBytes());
Future<RecordMetadata> metadata = producer.send(record);
assertTrue("Send should be immediately complete", metadata.isDone());
assertFalse("Send should be successful", isError(metadata));
@@ -51,8 +48,8 @@ public class MockProducerTest {
@Test
public void testManualCompletion() throws Exception {
MockProducer producer = new MockProducer(false);
- ProducerRecord record1 = new ProducerRecord<byte[], byte[]>("topic", "key1".getBytes(), "value1".getBytes());
- ProducerRecord record2 = new ProducerRecord<byte[], byte[]>("topic", "key2".getBytes(), "value2".getBytes());
+ ProducerRecord<byte[], byte[]> record1 = new ProducerRecord<byte[], byte[]>("topic", "key1".getBytes(), "value1".getBytes());
+ ProducerRecord<byte[], byte[]> record2 = new ProducerRecord<byte[], byte[]>("topic", "key2".getBytes(), "value2".getBytes());
Future<RecordMetadata> md1 = producer.send(record1);
assertFalse("Send shouldn't have completed", md1.isDone());
Future<RecordMetadata> md2 = producer.send(record2);
http://git-wip-us.apache.org/repos/asf/kafka/blob/6f4dea9d/core/src/main/scala/kafka/tools/MirrorMaker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index e302523..5cbc810 100644
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -534,7 +534,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
val producer = new KafkaProducer[Array[Byte], Array[Byte]](producerProps)
override def send(sourceTopicPartition: TopicAndPartition, sourceOffset: Long, key: Array[Byte], value: Array[Byte]) {
- val record = new ProducerRecord(sourceTopicPartition.topic, key, value)
+ val record = new ProducerRecord[Array[Byte], Array[Byte]](sourceTopicPartition.topic, key, value)
if(sync) {
this.producer.send(record).get()
unackedOffsetsMap.getAndMaybePut(sourceTopicPartition).maybeUpdateMaxOffsetSeen(sourceOffset)