Posted to dev@samza.apache.org by Roger Hoover <ro...@gmail.com> on 2014/12/15 19:17:55 UTC

Map-side join/broadcast streams

Hi all,

I'd appreciate any advice on how best to do this:

I have a very small dimension table that I want to join with a large-volume
event stream.  Partitioning the event stream for the join seems overly
complicated for this use case.  What I really want is for all tasks of my
stream join job to consume all partitions of the dimension stream.

I see that there's an issue open for this and that it's not supported yet:
https://issues.apache.org/jira/browse/SAMZA-353

As a workaround, I was thinking of creating another topic for the
dimension stream with an equal number of partitions as the event stream and
having each partition of that stream contain a full copy of the dimension
table.  To do this, I think I need a job to copy (fan out) messages from
the single upstream partition of the dimension stream to all partitions of
this new topic.

The issue is that the Samza API for OutgoingMessageEnvelope doesn't let me
specify a partition id for an outbound message, only a partition key.

What's the best way to ensure that a given message goes to a particular
partition id?
1) Take advantage of the Kafka default partitioner using the key object's
hashCode and make the key object hash to the partition id.  Is this too
brittle in relying on the DefaultPartitioner implementation of the old
Kafka producer API?
2) Create a custom Kafka partitioner and enable it using the
"partitioner.class" setting?
3) Is there another way?

Thanks,

Roger

Re: Map-side join/broadcast streams

Posted by Roger Hoover <ro...@gmail.com>.
Ah... I didn't know that the hashCode for boxed Integers is equal to their
value (which makes the hash/mod a no-op for valid partition ids).
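
To illustrate the point, here is a minimal sketch. It assumes the
partitioner behaves like abs(key.hashCode()) % numPartitions;
partitionFor is a hypothetical helper written for this example, not a
Kafka or Samza API.

```java
// Sketch only: models a hash/mod partitioner under the assumption above.
final class HashModDemo {
    // Hypothetical helper: maps a partition key to a partition id.
    static int partitionFor(Object partitionKey, int numPartitions) {
        return Math.abs(partitionKey.hashCode()) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 8;
        for (int id = 0; id < numPartitions; id++) {
            // A boxed Integer's hashCode() is its own int value.
            Integer boxed = Integer.valueOf(id);
            System.out.println("key " + id
                + " hashCode " + boxed.hashCode()
                + " partition " + partitionFor(boxed, numPartitions));
        }
    }
}
```

The identity only holds while the key lies in the range 0 to
numPartitions - 1; a key of 9 with 8 partitions would wrap around to
partition 1.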

Thanks, Chris!

Roger


On Mon, Dec 15, 2014 at 12:59 PM, Chris Riccomini <
criccomini@linkedin.com.invalid> wrote:
>
> Hey Roger,
>
> I believe that the OutgoingMessageEnvelope API allows you to specify a
> partition key independent of the key for the message.
>
>   public OutgoingMessageEnvelope(SystemStream systemStream,
>       String keySerializerName, String messageSerializerName,
>       Object partitionKey, Object key, Object message)
>
>
> This API can be used to directly specify a partition. For example, if you
> want to force the message to go to partition 7, you'd specify partitionKey
> as 7. Kafka's default partitioner does a hash/mod:
>
>
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/producer/DefaultPartitioner.scala
>
> For an Integer partition key between 0 and the partition count minus one,
> the hash/mod is a no-op, so the message should pass directly down to that
> partition. This should allow you to forcibly set a partition, while still
> keeping the message's key separate.
>
> Cheers,
> Chris
>

Re: Map-side join/broadcast streams

Posted by Chris Riccomini <cr...@linkedin.com.INVALID>.
Hey Roger,

I believe that the OutgoingMessageEnvelope API allows you to specify a
partition key independent of the key for the message.

  public OutgoingMessageEnvelope(SystemStream systemStream,
      String keySerializerName, String messageSerializerName,
      Object partitionKey, Object key, Object message)


This API can be used to directly specify a partition. For example, if you
want to force the message to go to partition 7, you'd specify partitionKey
as 7. Kafka's default partitioner does a hash/mod:

https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/producer/DefaultPartitioner.scala

For an Integer partition key between 0 and the partition count minus one,
the hash/mod is a no-op, so the message should pass directly down to that
partition. This should allow you to forcibly set a partition, while still
keeping the message's key separate.
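
As a rough sketch of how the fan-out job could use this, the loop below
emits one copy of each dimension record per target partition, using the
partition id as the partition key. Envelope is a hypothetical stand-in
for OutgoingMessageEnvelope so the example runs without Samza on the
classpath; fanOut and the sample values are likewise assumptions made
for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: not Samza code, just the shape of the fan-out logic.
final class FanOutSketch {
    // Hypothetical stand-in for the (partitionKey, key, message) triple.
    static final class Envelope {
        final Object partitionKey;
        final Object key;
        final Object message;

        Envelope(Object partitionKey, Object key, Object message) {
            this.partitionKey = partitionKey;
            this.key = key;
            this.message = message;
        }
    }

    // Builds one envelope per partition; the message key is left unchanged.
    static List<Envelope> fanOut(Object key, Object message, int numPartitions) {
        List<Envelope> out = new ArrayList<>(numPartitions);
        for (int p = 0; p < numPartitions; p++) {
            out.add(new Envelope(Integer.valueOf(p), key, message));
        }
        return out;
    }

    public static void main(String[] args) {
        for (Envelope e : fanOut("dim-42", "some dimension row", 4)) {
            System.out.println("partitionKey=" + e.partitionKey + " key=" + e.key);
        }
    }
}
```

In a real task, each envelope would presumably be built with the
six-argument constructor shown above and handed to the task's collector,
with the partition count matching that of the event stream.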

Cheers,
Chris
