You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by JIEFU GONG <jg...@berkeley.edu> on 2015/07/21 23:26:51 UTC

Implementing a custom partitioner

Hi all,

If I wanted to write my own partitioner, all I would need to do is write a
class that extends Partitioner and override the partition function,
correct? I am currently doing so, at least, with a class in the package
'services', yet when I use:

properties.put("partitioner.class", "services.myPartitioner");

and instantiate my producer, this doesn't work properly. I'm using a simple
switch statement, so I am led to believe that I have not improperly written
my partitioner. After attempting to debug the issue, I
notice that the constructor I'm entering when attempting to instantiate the
producer has the line:

this.partitioner = new Partitioner();

which more or less ignores my input. Any ideas? Help is appreciated!




-- 

Jiefu Gong
University of California, Berkeley | Class of 2017
B.A Computer Science | College of Letters and Sciences

jgong@berkeley.edu <el...@berkeley.edu> | (925) 400-3427

Re: Implementing a custom partitioner

Posted by Sriharsha Chintalapani <ka...@harsha.io>.
If you are using the new producer api from kafka 0.8.2 there is no pluggable partitioner in it for this you need to use the latest trunk. But in 0.8.2 if you are using old producer code you can implement a pluggable partitioner 
https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/producer/ProducerConfig.scala#L69
by implementing this interface
https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/producer/Partitioner.scala

and its get created here https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/producer/Producer.scala#L61

Thanks,
Harsha


On July 21, 2015 at 2:54:05 PM, JIEFU GONG (jgong@berkeley.edu) wrote:

Sriharsha, thanks for your response. I'm using version 0.8.2, and I am implementing kafka.producer.Partitioner. 

I noticed that in the latest trunk the line I specified above is replaced with:

this.partitioner  
=  
config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG,  
Partitioner.class);

does this mean I cannot use my own partitioner in v 0.8.2?


On Tue, Jul 21, 2015 at 2:48 PM, Sriharsha Chintalapani <ka...@harsha.io> wrote:
Hi,
     Are you using the latest trunk for Producer API?.  Did you implement the interface here https://cwiki.apache.org/confluence/display/KAFKA/KIP-+22+-+Expose+a+Partitioner+interface+in+the+new+producer
-- 
Harsha


On July 21, 2015 at 2:27:05 PM, JIEFU GONG (jgong@berkeley.edu) wrote:

Hi all,

If I wanted to write my own partitioner, all I would need to do is write a
class that extends Partitioner and override the partition function,
correct? I am currently doing so, at least, with a class in the package
'services', yet when I use:

properties.put("partitioner.class", "services.myPartitioner");

and instantiate my producer, this doesn't work properly. I'm using a simple
switch statement, so I am led to believe that I have not improperly written
my partitioner. After attempting to debug the issue, I
notice that the constructor I'm entering when attempting to instantiate the
producer has the line:

this.partitioner = new Partitioner();

which more or less ignores my input. Any ideas? Help is appreciated!




--

Jiefu Gong
University of California, Berkeley | Class of 2017
B.A Computer Science | College of Letters and Sciences

jgong@berkeley.edu <el...@berkeley.edu> | (925) 400-3427



--
Jiefu Gong
University of California, Berkeley | Class of 2017
B.A Computer Science | College of Letters and Sciences
jgong@berkeley.edu | (925) 400-3427

Re: Implementing a custom partitioner

Posted by JIEFU GONG <jg...@berkeley.edu>.
Sriharsha, thanks for your response. I'm using version 0.8.2, and I am
implementing kafka.producer.Partitioner.

I noticed that in the latest trunk the line I specified above is replaced
with:
this.partitioner = config.getConfiguredInstance(ProducerConfig.
PARTITIONER_CLASS_CONFIG, Partitioner.class);

does this mean I cannot use my own partitioner in v 0.8.2?


On Tue, Jul 21, 2015 at 2:48 PM, Sriharsha Chintalapani <ka...@harsha.io>
wrote:

> Hi,
>      Are you using the latest trunk for Producer API?.  Did you implement
> the interface here
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-+22+-+Expose+a+Partitioner+interface+in+the+new+producer
> --
> Harsha
>
>
> On July 21, 2015 at 2:27:05 PM, JIEFU GONG (jgong@berkeley.edu) wrote:
>
> Hi all,
>
> If I wanted to write my own partitioner, all I would need to do is write a
> class that extends Partitioner and override the partition function,
> correct? I am currently doing so, at least, with a class in the package
> 'services', yet when I use:
>
> properties.put("partitioner.class", "services.myPartitioner");
>
> and instantiate my producer, this doesn't work properly. I'm using a
> simple
> switch statement, so I am led to believe that I have not improperly
> written
> my partitioner. After attempting to debug the issue, I
> notice that the constructor I'm entering when attempting to instantiate
> the
> producer has the line:
>
> this.partitioner = new Partitioner();
>
> which more or less ignores my input. Any ideas? Help is appreciated!
>
>
>
>
> --
>
> Jiefu Gong
> University of California, Berkeley | Class of 2017
> B.A Computer Science | College of Letters and Sciences
>
> jgong@berkeley.edu <el...@berkeley.edu> | (925) 400-3427
>
>


-- 

Jiefu Gong
University of California, Berkeley | Class of 2017
B.A Computer Science | College of Letters and Sciences

jgong@berkeley.edu <el...@berkeley.edu> | (925) 400-3427

Re: Implementing a custom partitioner

Posted by Sriharsha Chintalapani <ka...@harsha.io>.
Hi,
     Are you using the latest trunk for Producer API?.  Did you implement the interface here https://cwiki.apache.org/confluence/display/KAFKA/KIP-+22+-+Expose+a+Partitioner+interface+in+the+new+producer
-- 
Harsha


On July 21, 2015 at 2:27:05 PM, JIEFU GONG (jgong@berkeley.edu) wrote:

Hi all,  

If I wanted to write my own partitioner, all I would need to do is write a  
class that extends Partitioner and override the partition function,  
correct? I am currently doing so, at least, with a class in the package  
'services', yet when I use:  

properties.put("partitioner.class", "services.myPartitioner");  

and instantiate my producer, this doesn't work properly. I'm using a simple  
switch statement, so I am led to believe that I have not improperly written  
my partitioner. After attempting to debug the issue, I  
notice that the constructor I'm entering when attempting to instantiate the  
producer has the line:  

this.partitioner = new Partitioner();  

which more or less ignores my input. Any ideas? Help is appreciated!  




--  

Jiefu Gong  
University of California, Berkeley | Class of 2017  
B.A Computer Science | College of Letters and Sciences  

jgong@berkeley.edu <el...@berkeley.edu> | (925) 400-3427