Posted to dev@kafka.apache.org by Rakesh Nair <ra...@gmail.com> on 2012/08/29 22:27:06 UTC
SyncProducer on Kafka 0.8
I am trying to use the SyncProducer in Kafka 0.8. Here is a simple program
I wrote to send data:
// Build a message set containing a single message.
String msgString = "testmsg";
Message msg = new Message(msgString.getBytes());
List<Message> allMessages = new ArrayList<Message>();
allMessages.add(msg);
ByteBufferMessageSet messages = new ByteBufferMessageSet(allMessages);
// Target partition 0 of "testTopic".
PartitionData[] pData = new PartitionData[1];
pData[0] = new PartitionData(0, messages.underlying());
TopicData[] tData = new TopicData[1];
tData[0] = new TopicData("testTopic", pData);
short ack = 1;
ProducerRequest pRequest = new ProducerRequest(
    SyncProducerConfig.DefaultCorrelationId(),
    SyncProducerConfig.DefaultClientId(), ack,
    SyncProducerConfig.DefaultAckTimeoutMs(), tData);
// prod is the SyncProducer instance (its construction is not shown here).
ProducerResponse pr = prod.send(pRequest);
However, I am getting the following exception:
ERROR [KafkaApi on Broker 0], Error processing ProducerRequest on testTopic:0 (kafka.server.KafkaApis)
kafka.common.UnknownTopicOrPartitionException: Topic testTopic partition 0 doesn't exist on 0
    at kafka.server.ReplicaManager.getLeaderReplicaIfLocal(ReplicaManager.scala:94)
    at kafka.server.KafkaApis$$anonfun$produceToLocalLog$2$$anonfun$apply$3.apply(KafkaApis.scala:184)
    at kafka.server.KafkaApis$$anonfun$produceToLocalLog$2$$anonfun$apply$3.apply(KafkaApis.scala:179)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
    at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
    ........
Could you tell me what I am doing wrong?
--
Regards
Rakesh Nair
Re: SyncProducer on Kafka 0.8
Posted by Jun Rao <ju...@gmail.com>.
In 0.8, a topic must be created before you can produce to it. Try using
Producer instead, which has the logic to trigger the auto creation of a
topic.
Thanks,
Jun
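
For readers who want to see why the two clients behave differently, here is a
small self-contained sketch. It does not use the Kafka API: ToyBroker,
fetchMetadata, produce, lowLevelSend and highLevelSend are hypothetical names
made up for illustration. It assumes that auto creation is triggered by a
metadata lookup that a higher-level producer performs before sending, while a
raw produce request to a topic the broker does not know about is rejected.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class AutoCreateSketch {

    /** Toy in-memory stand-in for a broker. Hypothetical, NOT a Kafka class. */
    static class ToyBroker {
        private final Map<String, List<String>> logs = new HashMap<>();
        private final boolean autoCreate;

        ToyBroker(boolean autoCreate) {
            this.autoCreate = autoCreate;
        }

        /** Metadata lookup: creates the topic if auto creation is enabled. */
        boolean fetchMetadata(String topic) {
            if (!logs.containsKey(topic) && autoCreate) {
                logs.put(topic, new ArrayList<>());
            }
            return logs.containsKey(topic);
        }

        /** Raw produce: rejects topics that do not exist yet. */
        void produce(String topic, String message) {
            List<String> log = logs.get(topic);
            if (log == null) {
                throw new IllegalStateException("Topic " + topic + " doesn't exist");
            }
            log.add(message);
        }

        int messageCount(String topic) {
            List<String> log = logs.get(topic);
            return log == null ? 0 : log.size();
        }
    }

    /** Mimics a low-level client: sends the produce request directly. */
    static void lowLevelSend(ToyBroker broker, String topic, String message) {
        broker.produce(topic, message);
    }

    /** Mimics a higher-level client: looks up metadata first, then sends. */
    static void highLevelSend(ToyBroker broker, String topic, String message) {
        if (!broker.fetchMetadata(topic)) {
            throw new IllegalStateException("Topic " + topic + " could not be created");
        }
        broker.produce(topic, message);
    }

    public static void main(String[] args) {
        ToyBroker broker = new ToyBroker(true);
        try {
            lowLevelSend(broker, "testTopic", "testmsg");
        } catch (IllegalStateException e) {
            System.out.println("low-level send failed: " + e.getMessage());
        }
        highLevelSend(broker, "testTopic", "testmsg");
        System.out.println("messages stored: " + broker.messageCount("testTopic"));
    }
}
```

In this toy model the direct send fails because nothing has created the topic
yet, and the metadata-first send succeeds. Creating the topic ahead of time
would make the direct send work as well.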