Posted to users@kafka.apache.org by Joe San <co...@gmail.com> on 2016/02/04 07:11:40 UTC
Producer code to a partition
Kafka users,
The code below is what I use to write to a topic:
def publishMessage(tsDataPoints: Seq[DataPoint]): Future[Unit] = {
  Future {
    logger.info(s"Persisting ${tsDataPoints.length} data-points in Kafka topic ${producerConfig.topic}")
    val dataPoints = DataPoints("kafkaProducer", tsDataPoints)
    val jsonMessage = Json.toJson(dataPoints).toString()
    val recordMetaDataF = producer.send(
      new ProducerRecord[String, String](producerConfig.topic, jsonMessage)
    )
    // if we don't make it to Kafka within 3 seconds, we timeout
    val recordMetaData = recordMetaDataF.get(3, TimeUnit.SECONDS)
    logger.info(
      s"persisted ${tsDataPoints.length} data-points to kafka topic: " +
        s"${recordMetaData.topic()} partition: ${recordMetaData.partition()} " +
        s"offset: ${recordMetaData.offset()}"
    )
    ()
  }
}
How can I make this code write to a specific partition? Currently my
topic has only the default single partition, so this code writes to
partition 0 of the topic.
I'm using Kafka 0.9.0.0. Any suggestions?
Regards,
Joe
Re: Producer code to a partition
Posted by Manikumar Reddy <ku...@nmsworks.co.in>.
In Kafka, each record can have a key. The producer uses this key to
assign the record to a partition.
Records without a key are distributed across partitions in round-robin fashion.
Records with a key are assigned based on the hash of the key, unless you
configure a custom partitioner.
Alternatively, you can specify the partition number for each message using
the "ProducerRecord" constructor.
https://kafka.apache.org/documentation.html#theproducer
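To make the "hash of the key" point concrete, here is a minimal sketch of how a keyed record ends up on a partition. It assumes kafka-clients is on the classpath and that the key is serialized as UTF-8 (as StringSerializer does by default). The object and method names are hypothetical, and Utils is an internal helper class of the client library rather than a supported API. It illustrates the idea only and does not replace the real partitioner.

```scala
import java.nio.charset.StandardCharsets
import org.apache.kafka.common.utils.Utils

object KeyPartitionSketch {
  // Hash the serialized key with murmur2, clear the sign bit so the result
  // is non-negative, then take it modulo the number of partitions.
  // Assumption: this mirrors the idea behind the default partitioner for
  // keyed records; it is an illustration, not the client's own code.
  def partitionForKey(key: String, numPartitions: Int): Int = {
    require(numPartitions > 0, "numPartitions must be positive")
    val keyBytes = key.getBytes(StandardCharsets.UTF_8)
    (Utils.murmur2(keyBytes) & 0x7fffffff) % numPartitions
  }
}
```

The practical consequence is that records sharing a key always land on the same partition as long as the partition count does not change, which is why a key alone is usually enough and an explicit partition number is optional.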
Re: Producer code to a partition
Posted by Joe San <co...@gmail.com>.
What is the partition key? Why do I need to specify the partition key and a
partition number?
Re: Producer code to a partition
Posted by Manikumar Reddy <ma...@gmail.com>.
Hi,
You can use the ProducerRecord(java.lang.String topic, java.lang.Integer
partition, K key, V value) constructor to pass the partition number.
https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html
Kumar
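As a rough sketch of what that looks like from Scala, the helpers below build records with an explicit partition or with only a key. They assume kafka-clients 0.9.x on the classpath. The object and method names are hypothetical and not part of the original code. The explicit partition must exist on the topic, so a topic with a single partition only accepts partition 0.

```scala
import org.apache.kafka.clients.producer.ProducerRecord

object PartitionedRecords {
  // Explicit partition, no key: uses the (topic, partition, key, value)
  // constructor. Int.box converts the Scala Int to java.lang.Integer.
  def toPartition(topic: String, partition: Int, value: String): ProducerRecord[String, String] =
    new ProducerRecord[String, String](topic, Int.box(partition), null, value)

  // Key only: the partition is left unset (null) so the producer's
  // partitioner picks one from the key when the record is sent.
  def keyed(topic: String, key: String, value: String): ProducerRecord[String, String] =
    new ProducerRecord[String, String](topic, key, value)
}
```

In publishMessage, the record passed to producer.send could then be built with something like PartitionedRecords.toPartition(producerConfig.topic, 0, jsonMessage); the rest of the method would stay the same.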