You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Joe San <co...@gmail.com> on 2016/02/04 07:11:40 UTC

Producer code to a partition

Kafka users,

The code below is something that I have to write to a Topic!

def publishMessage(tsDataPoints: Seq[DataPoint]): Future[Unit] = {
  Future {
    logger.info(s"Persisting ${tsDataPoints.length} data-points in
Kafka topic ${producerConfig.topic}")
    val dataPoints = DataPoints("kafkaProducer", tsDataPoints)
    val jsonMessage = Json.toJson(dataPoints).toString()
    val recordMetaDataF = producer.send(
      new ProducerRecord[String, String](producerConfig.topic, jsonMessage)
    )
    // if we don't make it to Kafka within 3 seconds, we timeout
    val recordMetaData = recordMetaDataF.get(3, TimeUnit.SECONDS)
    logger.info(
      s"persisted ${tsDataPoints.length} data-points to kafka topic:  " +
        s"${recordMetaData.topic()} partition:
${recordMetaData.partition()} offset: ${recordMetaData.offset()}"
    )
    ()
  }
}


How could I make this code write to a specific partition? currently my
topic does not have partitions, so by default this code write to partition
0 of the topic!

I'm using Kafka 0.9.0.0! Any suggestions?

Regards,
Joe

Re: Producer code to a partition

Posted by Manikumar Reddy <ku...@nmsworks.co.in>.
In kafka, each record can have a key.  This key is used to distribute
records to partitions.
All non-keyed records will be distributed in round-robin fashion.
All keyed records will be distributed based on the hash of the key / or can
write a custom partitioner.
or we can specify partition number for each message using "ProducerRecord"
constructor.

https://kafka.apache.org/documentation.html#theproducer

On Thu, Feb 4, 2016 at 11:53 AM, Joe San <co...@gmail.com> wrote:

> What is the partition key? Why do I need to specify the partition key and a
> partition number?
>
> On Thu, Feb 4, 2016 at 7:17 AM, Manikumar Reddy <manikumar.reddy@gmail.com
> >
> wrote:
>
> > Hi,
> >
> >  You can use ProducerRecord(java.lang.String topic, java.lang.Integer
> > partition, K key, V value) constructor
> >   to pass partition number.
> >
> >
> >
> >
> https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html
> >
> > Kumar
> >
> > On Thu, Feb 4, 2016 at 11:41 AM, Joe San <co...@gmail.com>
> wrote:
> >
> > > Kafka users,
> > >
> > > The code below is something that I have to write to a Topic!
> > >
> > > def publishMessage(tsDataPoints: Seq[DataPoint]): Future[Unit] = {
> > >   Future {
> > >     logger.info(s"Persisting ${tsDataPoints.length} data-points in
> > > Kafka topic ${producerConfig.topic}")
> > >     val dataPoints = DataPoints("kafkaProducer", tsDataPoints)
> > >     val jsonMessage = Json.toJson(dataPoints).toString()
> > >     val recordMetaDataF = producer.send(
> > >       new ProducerRecord[String, String](producerConfig.topic,
> > jsonMessage)
> > >     )
> > >     // if we don't make it to Kafka within 3 seconds, we timeout
> > >     val recordMetaData = recordMetaDataF.get(3, TimeUnit.SECONDS)
> > >     logger.info(
> > >       s"persisted ${tsDataPoints.length} data-points to kafka topic:
> " +
> > >         s"${recordMetaData.topic()} partition:
> > > ${recordMetaData.partition()} offset: ${recordMetaData.offset()}"
> > >     )
> > >     ()
> > >   }
> > > }
> > >
> > >
> > > How could I make this code write to a specific partition? currently my
> > > topic does not have partitions, so by default this code write to
> > partition
> > > 0 of the topic!
> > >
> > > I'm using Kafka 0.9.0.0! Any suggestions?
> > >
> > > Regards,
> > > Joe
> > >
> >
>

Re: Producer code to a partition

Posted by Joe San <co...@gmail.com>.
What is the partition key? Why do I need to specify the partition key and a
partition number?

On Thu, Feb 4, 2016 at 7:17 AM, Manikumar Reddy <ma...@gmail.com>
wrote:

> Hi,
>
>  You can use ProducerRecord(java.lang.String topic, java.lang.Integer
> partition, K key, V value) constructor
>   to pass partition number.
>
>
>
> https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html
>
> Kumar
>
> On Thu, Feb 4, 2016 at 11:41 AM, Joe San <co...@gmail.com> wrote:
>
> > Kafka users,
> >
> > The code below is something that I have to write to a Topic!
> >
> > def publishMessage(tsDataPoints: Seq[DataPoint]): Future[Unit] = {
> >   Future {
> >     logger.info(s"Persisting ${tsDataPoints.length} data-points in
> > Kafka topic ${producerConfig.topic}")
> >     val dataPoints = DataPoints("kafkaProducer", tsDataPoints)
> >     val jsonMessage = Json.toJson(dataPoints).toString()
> >     val recordMetaDataF = producer.send(
> >       new ProducerRecord[String, String](producerConfig.topic,
> jsonMessage)
> >     )
> >     // if we don't make it to Kafka within 3 seconds, we timeout
> >     val recordMetaData = recordMetaDataF.get(3, TimeUnit.SECONDS)
> >     logger.info(
> >       s"persisted ${tsDataPoints.length} data-points to kafka topic:  " +
> >         s"${recordMetaData.topic()} partition:
> > ${recordMetaData.partition()} offset: ${recordMetaData.offset()}"
> >     )
> >     ()
> >   }
> > }
> >
> >
> > How could I make this code write to a specific partition? currently my
> > topic does not have partitions, so by default this code write to
> partition
> > 0 of the topic!
> >
> > I'm using Kafka 0.9.0.0! Any suggestions?
> >
> > Regards,
> > Joe
> >
>

Re: Producer code to a partition

Posted by Manikumar Reddy <ma...@gmail.com>.
Hi,

 You can use ProducerRecord(java.lang.String topic, java.lang.Integer
partition, K key, V value) constructor
  to pass partition number.


https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html

Kumar

On Thu, Feb 4, 2016 at 11:41 AM, Joe San <co...@gmail.com> wrote:

> Kafka users,
>
> The code below is something that I have to write to a Topic!
>
> def publishMessage(tsDataPoints: Seq[DataPoint]): Future[Unit] = {
>   Future {
>     logger.info(s"Persisting ${tsDataPoints.length} data-points in
> Kafka topic ${producerConfig.topic}")
>     val dataPoints = DataPoints("kafkaProducer", tsDataPoints)
>     val jsonMessage = Json.toJson(dataPoints).toString()
>     val recordMetaDataF = producer.send(
>       new ProducerRecord[String, String](producerConfig.topic, jsonMessage)
>     )
>     // if we don't make it to Kafka within 3 seconds, we timeout
>     val recordMetaData = recordMetaDataF.get(3, TimeUnit.SECONDS)
>     logger.info(
>       s"persisted ${tsDataPoints.length} data-points to kafka topic:  " +
>         s"${recordMetaData.topic()} partition:
> ${recordMetaData.partition()} offset: ${recordMetaData.offset()}"
>     )
>     ()
>   }
> }
>
>
> How could I make this code write to a specific partition? currently my
> topic does not have partitions, so by default this code write to partition
> 0 of the topic!
>
> I'm using Kafka 0.9.0.0! Any suggestions?
>
> Regards,
> Joe
>