You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by Rakesh Nair <ra...@gmail.com> on 2012/08/29 22:27:06 UTC

SyncProducer on Kafka 0.8

I am trying to use the SyncProducer in Kafka 0.8. Here is a simple prog
which i wrote to send data.


                String msgString = "testmsg";
 Message msg = new Message(msgString.getBytes());
List<Message> allMessages = new ArrayList<Message>();
 allMessages.add(msg);
 ByteBufferMessageSet messages = new ByteBufferMessageSet(allMessages);
 PartitionData[] pData = new PartitionData[1];
pData[0] = new PartitionData(0, messages.underlying());
 TopicData[] tData = new TopicData[1];
tData[0] = new TopicData("testTopic", pData);
                 short ack = 1;
ProducerRequest pRequest = new
ProducerRequest(SyncProducerConfig.DefaultCorrelationId(),
SyncProducerConfig.DefaultClientId(), ack,
SyncProducerConfig.DefaultAckTimeoutMs(), tData);
ProducerResponse pr = prod.send(pRequest);

However i am getting an exception here
ERROR [KafkaApi on Broker 0], Error processing ProducerRequest on
testTopic:0 (kafka.server.KafkaApis)
kafka.common.UnknownTopicOrPartitionException: Topic testTopic partition 0
doesn't exist on 0
at
kafka.server.ReplicaManager.getLeaderReplicaIfLocal(ReplicaManager.scala:94)
at
kafka.server.KafkaApis$$anonfun$produceToLocalLog$2$$anonfun$apply$3.apply(KafkaApis.scala:184)
at
kafka.server.KafkaApis$$anonfun$produceToLocalLog$2$$anonfun$apply$3.apply(KafkaApis.scala:179)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
        ........


Could you tell me what i am doing wrong.
-- 
Regards
Rakesh Nair

Re: SyncProducer on Kafka 0.8

Posted by Jun Rao <ju...@gmail.com>.
In 0.8, topics need to be created first before you can produce to them. Try
using Producer instead, which has the logic to trigger the auto creation of
a topic.

Thanks,

Jun

On Wed, Aug 29, 2012 at 1:27 PM, Rakesh Nair <ra...@gmail.com> wrote:

> I am trying to use the SyncProducer in Kafka 0.8. Here is a simple prog
> which i wrote to send data.
>
>
>                 String msgString = "testmsg";
>  Message msg = new Message(msgString.getBytes());
> List<Message> allMessages = new ArrayList<Message>();
>  allMessages.add(msg);
>  ByteBufferMessageSet messages = new ByteBufferMessageSet(allMessages);
>  PartitionData[] pData = new PartitionData[1];
> pData[0] = new PartitionData(0, messages.underlying());
>  TopicData[] tData = new TopicData[1];
> tData[0] = new TopicData("testTopic", pData);
>                  short ack = 1;
> ProducerRequest pRequest = new
> ProducerRequest(SyncProducerConfig.DefaultCorrelationId(),
> SyncProducerConfig.DefaultClientId(), ack,
> SyncProducerConfig.DefaultAckTimeoutMs(), tData);
> ProducerResponse pr = prod.send(pRequest);
>
> However i am getting an exception here
> ERROR [KafkaApi on Broker 0], Error processing ProducerRequest on
> testTopic:0 (kafka.server.KafkaApis)
> kafka.common.UnknownTopicOrPartitionException: Topic testTopic partition 0
> doesn't exist on 0
> at
>
> kafka.server.ReplicaManager.getLeaderReplicaIfLocal(ReplicaManager.scala:94)
> at
>
> kafka.server.KafkaApis$$anonfun$produceToLocalLog$2$$anonfun$apply$3.apply(KafkaApis.scala:184)
> at
>
> kafka.server.KafkaApis$$anonfun$produceToLocalLog$2$$anonfun$apply$3.apply(KafkaApis.scala:179)
> at
>
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
> at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
>         ........
>
>
> Could you tell me what i am doing wrong.
> --
> Regards
> Rakesh Nair
>