Posted to dev@samza.apache.org by Michal Hariš <mi...@gmail.com> on 2015/12/11 13:46:28 UTC

KafkaSystemProducer partitioning

Hi all, I am not sure if this is the right mailing list, but
users@samza.apache.org doesn't seem to exist.

I am just looking at the code of KafkaSystemProducer and am a bit confused
about how partitioning of Samza output is handled.

Firstly, it seems to be hard-coded: if the envelope has a partitionKey, it
takes its hashCode modulo the number of partitions; otherwise it passes null,
which hands the partitioning decision to the underlying Kafka producer.
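The rule described above can be modelled as a small pure function. This is only a sketch to make the two cases explicit; the object and method names are hypothetical, and `None` stands for the null partition that leaves the choice to the Kafka producer.

```scala
object OutputPartitioning {
  // Hypothetical model of the producer's choice (not Samza's API):
  //   Some(p) -> Samza pins the record to partition p itself,
  //   None    -> a null partition is sent and the Kafka producer decides.
  def explicitPartition(partitionKey: Option[Any], numPartitions: Int): Option[Int] = {
    require(numPartitions > 0, "a topic has at least one partition")
    partitionKey.map(key => math.abs(key.hashCode) % numPartitions)
  }
}
```

For example, `explicitPartition(Some("abc"), 8)` returns `Some(2)`, since `"abc".hashCode` is 96354.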

Now, when I try to override `systems.<..>.producer.partitioner.class`, I see
a warning during initialization that partitioner.class is not a known
config. However, the Samza configuration documentation says that any
configuration available for the Kafka producer can be passed via
`systems.<..>.producer...`. I have checked that both the new and the old
Kafka producer APIs have `partitioner.class` configurable.
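A rough sketch of the pass-through the Samza documentation describes, assuming the job config is a plain `Map[String, String]`. The helper `producerProperties` is hypothetical, not Samza's own config code, and `com.example.MyPartitioner` is a made-up class name. It only shows which property name would be forwarded; it says nothing about whether the producer recognises that name or whether Samza consults it.

```scala
object ProducerConfigPassThrough {
  // Hypothetical helper (not Samza's implementation): keep every
  // "systems.<system>.producer.*" entry and strip the prefix, yielding the
  // property names the Kafka producer itself would be given.
  def producerProperties(config: Map[String, String], system: String): Map[String, String] = {
    val prefix = s"systems.$system.producer."
    config.collect {
      case (key, value) if key.startsWith(prefix) => key.stripPrefix(prefix) -> value
    }
  }
}
```

With `systems.kafka.producer.partitioner.class=com.example.MyPartitioner` in the config, `producerProperties(config, "kafka")` yields `Map("partitioner.class" -> "com.example.MyPartitioner")`; entries for other systems or outside the `producer.` namespace are dropped.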

I think I am missing something; otherwise it means that Samza doesn't allow
custom partitioning strategies for output to Kafka.

Michal,

Re: KafkaSystemProducer partitioning

Posted by Yi Pan <ni...@gmail.com>.
Hi, Michal,

Yeah, unfortunately, based on the current code base, the only short-term
solution would be your proposal. The issue is the following:
1. The Kafka producer only applies the specified partitioner.class to the
ProducerRecord key. Samza allows the user to specify a partition key that
is different from the ProducerRecord key, which the Kafka producer's
partitioner.class does not support.
2. The Kafka producer made a backward-incompatible change to the default
Partitioner between 0.8.1 and 0.8.2.
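Point 1 can be illustrated with a hypothetical envelope that, like Samza's OutgoingMessageEnvelope, carries a partition key separate from the message key. `Envelope`, `keyOnlyPartition` and `partitionKeyPartition` are illustrative names rather than Samza or Kafka APIs, and the hash-to-partition rule used here is arbitrary.

```scala
object KeyVersusPartitionKey {
  // Hypothetical stand-in for an outgoing envelope: the message key and the
  // partition key may differ (e.g. key = user id, partition key = account id).
  final case class Envelope(key: String, partitionKey: String, message: String)

  // A partitioner that is handed only the record key, as described in point 1;
  // it never sees envelope.partitionKey.
  def keyOnlyPartition(recordKey: String, numPartitions: Int): Int =
    math.abs(recordKey.hashCode % numPartitions)

  // Routing on the partition key means computing the partition before the
  // record is built, which is what Samza does itself.
  def partitionKeyPartition(envelope: Envelope, numPartitions: Int): Int =
    math.abs(envelope.partitionKey.hashCode % numPartitions)
}
```

Two envelopes with the same key but partition keys `"a"` and `"b"` land on the same partition under `keyOnlyPartition`, but on partitions 1 and 2 of 4 under `partitionKeyPartition`.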

There is another JIRA ticket (SAMZA-839) open to track this issue. I
would suggest that you join the discussion and keep watching this ticket.

Thanks!

-Yi

On Fri, Dec 18, 2015 at 7:01 AM, Michal Haris <mi...@visualdna.com>
wrote:

> Ah, that's unfortunate. Basically, we have an existing stream-processing
> pipeline which relies on different partitioning schemes, and we are writing
> some upstream Samza jobs. The only way to get it to write in a particular
> partitioning scheme, then, is to write a different KafkaSystemFactory, right?
> Or perhaps patch the existing one? I don't see a reason why it always has
> to use the default partitioning.

Re: KafkaSystemProducer partitioning

Posted by Michal Haris <mi...@visualdna.com>.
Ah, that's unfortunate. Basically, we have an existing stream-processing
pipeline which relies on different partitioning schemes, and we are writing
some upstream Samza jobs. The only way to get it to write in a particular
partitioning scheme, then, is to write a different KafkaSystemFactory, right?
Or perhaps patch the existing one? I don't see a reason why it always has
to use the default partitioning.
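If the producer were patched as suggested above, one possible shape is a pluggable strategy that receives the envelope's partition key and the partition count. Everything here is hypothetical: `OutputPartitioner` and the two implementations are illustrative, not existing Samza classes or config, and the default simply mirrors the `abs(hashCode) % numPartitions` rule used by KafkaSystemProducer.

```scala
object PluggablePartitioning {
  // Hypothetical extension point a patched producer could load from config
  // instead of hard-coding the partition calculation.
  trait OutputPartitioner {
    def partition(partitionKey: Any, numPartitions: Int): Int
  }

  // Mirrors the rule used by KafkaSystemProducer: abs(hashCode) % numPartitions.
  object SamzaDefaultPartitioner extends OutputPartitioner {
    def partition(partitionKey: Any, numPartitions: Int): Int =
      math.abs(partitionKey.hashCode) % numPartitions
  }

  // Illustrative alternative scheme: numeric keys go to key mod numPartitions
  // (always non-negative); anything else falls back to the default rule.
  object NumericModuloPartitioner extends OutputPartitioner {
    def partition(partitionKey: Any, numPartitions: Int): Int = partitionKey match {
      case n: Long => java.lang.Math.floorMod(n, numPartitions.toLong).toInt
      case n: Int  => java.lang.Math.floorMod(n, numPartitions)
      case other   => SamzaDefaultPartitioner.partition(other, numPartitions)
    }
  }
}
```

Keeping the partition calculation behind one interface would let a job pick a scheme that matches an existing downstream pipeline without changing how envelopes are built.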

On 17 December 2015 at 07:59, Yi Pan <ni...@gmail.com> wrote:

> Hi, Michal,
>
> Sorry to reply late. Actually, you are right that the "partitioner.class"
> configuration is not used in Samza to determine the outgoing partition.



-- 
Michal Haris
Technical Architect
direct line: +44 (0) 207 749 0229
www.visualdna.com | t: +44 (0) 207 734 7033
31 Old Nichol Street
London
E2 7HR

Re: KafkaSystemProducer partitioning

Posted by Yi Pan <ni...@gmail.com>.
Hi, Michal,

Sorry to reply late. Actually, you are right that the "partitioner.class"
configuration is not used in Samza to determine the outgoing partition. In
Samza, the partition is determined by the following code sections:
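{code}

// From KafkaSystemProducer (as quoted); the enclosing method and the
// `envelope` and `producer` values are not shown.
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.PartitionInfo

val topicName = envelope.getSystemStream.getStream
val partitions: java.util.List[PartitionInfo] = producer.partitionsFor(topicName)
// With a partition key, Samza computes the partition itself; otherwise the
// partition is null and the Kafka producer chooses one.
val partitionKey: Integer =
  if (envelope.getPartitionKey != null) KafkaUtil.getIntegerPartitionKey(envelope, partitions)
  else null
val record = new ProducerRecord(topicName,
                                partitionKey,
                                envelope.getKey.asInstanceOf[Array[Byte]],
                                envelope.getMessage.asInstanceOf[Array[Byte]])

{code}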

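{code}

import scala.math.abs
import org.apache.kafka.common.PartitionInfo
import org.apache.samza.system.OutgoingMessageEnvelope

// KafkaUtil: hash the envelope's partition key onto one of the topic's partitions.
def getIntegerPartitionKey(envelope: OutgoingMessageEnvelope,
                           partitions: java.util.List[PartitionInfo]): Integer = {
  val numPartitions = partitions.size
  abs(envelope.getPartitionKey.hashCode()) % numPartitions
}

{code}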


Hence, the partitioner.class setting in the producer configuration is not used.
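One property of the `abs(hashCode) % numPartitions` rule quoted above: in Scala, as in Java, `math.abs(Int.MinValue)` is still `Int.MinValue`, so a key whose hash code is `Int.MinValue` produces a negative partition number unless the partition count is a power of two. The sketch below works on precomputed hash codes; `floorMod` is shown only as one possible non-negative alternative, not as something Samza uses.

```scala
object PartitionArithmetic {
  // The rule quoted above, applied to a precomputed hash code.
  def samzaStyle(hash: Int, numPartitions: Int): Int =
    math.abs(hash) % numPartitions

  // Alternative that always lands in [0, numPartitions): floorMod never
  // returns a negative value for a positive divisor.
  def nonNegative(hash: Int, numPartitions: Int): Int =
    java.lang.Math.floorMod(hash, numPartitions)
}
```

Note that `floorMod` maps negative hash codes differently from `abs(...) %` (for example, -1 goes to 3 rather than 1 with 4 partitions), so switching rules would move existing keys to other partitions.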

