Posted to dev@samza.apache.org by Tyson Norris <tn...@adobe.com> on 2014/04/11 21:15:54 UTC

partitioning question

Hi - 
I have a couple of questions about partitioning. I'm trying to run multiple task instances, each processing a separate partition, but it appears that only a single task instance runs and processes all partitions. Or else my partitions are not created properly. This is based on a modified version of hello-samza, so I'm not sure exactly which config and code steps are needed to partition messages across multiple instances of the same task.

To route to a partition I use: messageCollector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, msgKey, partitionKey, outgoingMap));
- Question here: the example in the docs, collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "SomeTopicPartitionedByUserId"), msg.get("user_id"), msg)), seems confusing, because the OutgoingMessageEnvelope constructor has a key AND a partitionKey. I assume the partitionKey should be msg.get("user_id") in this case, but what should key be? Just a unique value for this message?

I tried the 3-parameter constructor as well and had similar problems: my single task instance is used regardless of the partitionKey specified in the OutgoingMessageEnvelope.

Do I need to specify a partition manager and yarn.container.count to get multiple instances of my task servicing separate partitions?

I'm not sure how to tell whether my messages are routed to the correct partition in Kafka, or whether the problem is a partition-handling config in Samza.

Any advice is appreciated!

Thanks
Tyson

Re: partitioning question

Posted by Tyson Norris <tn...@adobe.com>.
Hi -
Yes, that did the trick, thanks!

I see that http://samza.incubator.apache.org/learn/documentation/0.7.0/jobs/configuration-table.html seems a bit out of date.

I created a JIRA issue to add some notes to the docs about creating partitions, just to give a tip to new users like myself:
https://issues.apache.org/jira/browse/SAMZA-239

I had also changed the Kafka producer.properties (partitioner.class=kafka.producer.DefaultPartitioner), and I need to test whether that is still required (if so, it would also be worth mentioning in the config docs).


Thanks
Tyson

On Apr 15, 2014, at 4:56 AM, Martin Kleppmann <mk...@linkedin.com> wrote:

Have you set a serde (serializer/deserializer) for the key? You need to tell Samza how to convert a string to bytes and back. Something like this:

serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
systems.kafka.samza.key.serde=string

Martin

On 15 Apr 2014, at 06:56, Tyson Norris <tn...@adobe.com> wrote:
Hi -
Just following up on this: I ran into a couple of other problems, as I am trying to use a String for the partition key.

Passing String types for either key or partitionKey generated:
KafkaSystemProducer [WARN] Triggering a reconnect for kafka because connection failed: java.lang.ClassCastException: java.lang.String cannot be cast to [B

I was able to configure Kafka with:
partitioner.class=kafka.producer.DefaultPartitioner

In which case I can now use a String for partitionKey and a byte[] for key, which worked (yay!).

However, if I change to passing only key, it still fails with the java.lang.ClassCastException: java.lang.String cannot be cast to [B
I also tried specifying in the Kafka producer.properties:
key.serializer.class=kafka.serializer.StringEncoder

but had the same results.

Is there something special I need to do to use String type for key when using:
public OutgoingMessageEnvelope(SystemStream systemStream,
                             java.lang.Object key,
                             java.lang.Object message)

Thanks
Tyson

On Apr 14, 2014, at 12:58 PM, Chris Riccomini <cr...@linkedin.com> wrote:

Hey Tyson,

Yeah, sorry this is not more clear. Physical Kafka partitions are
per-topic. The default partition count for a newly created topic in Kafka
is defined using the num.partitions setting, as you've discovered. The
default setting that Kafka ships with is 1. This can be overridden on a
per-topic basis by using the kafka-create-topic.sh tool.

Cheers,
Chris

On 4/14/14 12:47 PM, "Tyson Norris" <tn...@adobe.com> wrote:

OK that was wrong as well (my SystemFactory is indeed supposed to use a
single partition, just not the kafka system factory), but I finally got
things working as expected with some kafka config changes.

For now, I set in deploy/kafka/config/server.properties:
num.partitions=5

(although I assume there is a better per-topic value I should set instead
of a default like this)

And now I see multiple task instances created as desired.

Thanks
Tyson

On Apr 14, 2014, at 12:36 PM, Tyson Norris <tn...@adobe.com> wrote:

OK, sorry for the noise.
I stumbled upon another clue: my SystemFactory has (based on
WikipediaSystemFactory):
@Override
public SystemAdmin getAdmin(String systemName, Config config) {
    return new SinglePartitionWithoutOffsetsSystemAdmin();
}

Which I guess is a good reason my system is only using a single
partition. Doh.
I will work on a new SystemFactory impl to test with...

Thanks
Tyson

On Apr 14, 2014, at 12:20 PM, Tyson Norris <tn...@adobe.com> wrote:

I am actually wondering if I’m missing a bit of configuration that
indicates the number of partitions I want to create for various kafka
topics that messages are sent to?

I don’t see where this should be added in the config, and it appears
the partitions are not created automatically when I specify the key for
partitioning.


Thanks
Tyson



On Apr 14, 2014, at 10:35 AM, Tyson Norris <tn...@adobe.com> wrote:

Hi Chris -

On Apr 14, 2014, at 9:13 AM, Chris Riccomini <cr...@linkedin.com> wrote:

Hey Tyson,

"""I'm trying to have multiple tasks instances run, each processing a
separate partition, and it appears that only a single task instance
runs, processing all partitions. Or else my partitions are not created
properly."""

Are you trying to consume a single stream that has multiple partitions,
or are you processing multiple streams that all have one partition? If
it's the latter, all of these messages will get routed to a single task
instance. There is an upcoming patch to allow alternative
partition-to-task-instance mappings (SAMZA-71), which Jakob Homan is
working on currently.

No, I'm trying for the former, although my SystemConsumer is set up
like the latter. That is, I have a system consumer that should generate
messages in a single partition, and a task that takes all messages and
splits them into multiple partitions.

So, in my SystemConsumer I have:
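  SystemStreamPartition systemStreamPartition =
      new SystemStreamPartition(systemName, streamId, new Partition(0));
  try {
      put(systemStreamPartition,
          new IncomingMessageEnvelope(systemStreamPartition, null, null, object));
  } catch (InterruptedException e) {
      // put() declares InterruptedException; restore the interrupt flag
      Thread.currentThread().interrupt();
  }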

which generates messages on the same stream + partition.

Then in my first task I have:
messageCollector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, partitionKey.getBytes(), outgoingMap));

which I am trying to get routed to separate task instances based on
partitionKey.


If you have a single input stream with multiple partitions, you should
end up with one task instance per partition. This partitioning model is
explained in some detail at the 20-minute mark in this talk:

http://www.infoq.com/presentations/samza-linkedin
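A simplified model of the grouping described above, as a Java sketch: input partitions are grouped by partition number, so one stream with three partitions yields three task instances, while three single-partition streams all share partition 0 and collapse into one task instance. The `Ssp` record is a hypothetical stand-in for Samza's SystemStreamPartition, and the grouping logic only illustrates the behavior described in this thread; it is not Samza's actual code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class PartitionGroupingSketch {
    // Hypothetical stand-in for Samza's SystemStreamPartition.
    record Ssp(String system, String stream, int partition) {}

    // One task instance per distinct partition number; every input partition
    // with that number (from any stream) is assigned to the same task instance.
    static Map<Integer, List<Ssp>> groupByPartition(List<Ssp> inputs) {
        Map<Integer, List<Ssp>> tasks = new TreeMap<>();
        for (Ssp ssp : inputs) {
            tasks.computeIfAbsent(ssp.partition(), p -> new ArrayList<>()).add(ssp);
        }
        return tasks;
    }

    public static void main(String[] args) {
        // A single stream with three partitions.
        List<Ssp> oneStream = List.of(
            new Ssp("kafka", "events", 0),
            new Ssp("kafka", "events", 1),
            new Ssp("kafka", "events", 2));
        // Three streams that each have only partition 0.
        List<Ssp> manyStreams = List.of(
            new Ssp("kafka", "a", 0),
            new Ssp("kafka", "b", 0),
            new Ssp("kafka", "c", 0));
        System.out.println(groupByPartition(oneStream).size());   // 3
        System.out.println(groupByPartition(manyStreams).size()); // 1
    }
}
```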

"""the example in docs of collector.send(new OutgoingMessageEnvelope(new
SystemStream("kafka", "SomeTopicPartitionedByUserId"),
msg.get("user_id"), msg)) seems confusing, because the
OutgoingMessageEnvelope constructor has a key AND partitionKey - I
assume the partitionKey should be msg.get("user_id") in this case, but
what should key be? Just a unique value for this message?"""

The OutgoingMessageEnvelope has several constructors. The two you're
referring to are:

public OutgoingMessageEnvelope(SystemStream systemStream, Object partitionKey, Object key, Object message)

public OutgoingMessageEnvelope(SystemStream systemStream, Object key, Object message)


This is, indeed, odd. In general, people only want the second
constructor (systemStream, key, message). The constructor with the
partitionKey has its origins in the Kafka API. With Kafka 0.8, keys are
now stored along with messages in the actual log segments on disk. This
is useful because it means you can get access to the key information
that was sent with the message. It also means that you can use log
compaction to de-duplicate keys in a Kafka topic (a 0.8.1 feature).
There are some cases where you might wish to partition a topic by one
key (say, member ID), but store (or de-duplicate by) a different key
with the message.
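To see why the partition key decides where a message lands, here is a hedged Java sketch of hash-mod partitioning, assuming Kafka's default partitioner works roughly like `abs(key.hashCode()) % numPartitions`. `partitionFor` is a hypothetical helper, not a Kafka API. The sketch also shows a Java detail worth knowing if a partitioner hashes the key object directly and that key is `partitionKey.getBytes()`: arrays do not override `hashCode()`, so two byte arrays with identical contents generally hash differently, whereas `String.hashCode()` and `Arrays.hashCode(byte[])` are content-based.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class HashPartitionSketch {
    // Hypothetical helper: mask off the sign bit so the hash is non-negative,
    // then take the remainder modulo the topic's partition count.
    static int partitionFor(Object partitionKey, int numPartitions) {
        return (partitionKey.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 5; // e.g. num.partitions=5, as set later in this thread

        // Equal Strings have equal hash codes, so a given user id always maps to one partition.
        int first = partitionFor("user-42", numPartitions);
        int second = partitionFor("user-42", numPartitions);
        System.out.println(first == second); // true

        // byte[] uses identity hashCode(); Arrays.hashCode() hashes the contents instead.
        byte[] a = "user-42".getBytes(StandardCharsets.UTF_8);
        byte[] b = "user-42".getBytes(StandardCharsets.UTF_8);
        System.out.println(Arrays.hashCode(a) == Arrays.hashCode(b)); // true
    }
}
```

Masking with `0x7fffffff` rather than calling `Math.abs` avoids the one case where `Math.abs(Integer.MIN_VALUE)` is still negative.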


So in the case where I don’t care about deduplication, is the second
constructor “key” parameter actually used as partition key?




"""Do I need to specify partition manager and yarn.container.count to get
multiple instances of my task working to service separate partitions?"""

This class has been replaced by the KafkaSystemFactory and
KafkaSystemAdmin. As long as you've specified a KafkaSystemFactory, the
partitions will be handled properly by Samza. The yarn.container.count
setting simply specifies how many containers (Java processes) you get to
run your tasks in. If you have only one TaskInstance but specify a
container count of 2, the second container won't have any partitions to
process, and I believe the job will fail. You need to set your container
count <= the partition count of your input topics.
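As a rough illustration of the container-count advice, the Java sketch below spreads one task instance per input partition across containers round-robin. The round-robin policy and the `assign` helper are assumptions made for illustration, not Samza's actual assignment code; the point is only that several task instances can share one container, and that a container count above the partition count leaves a container with nothing to process.

```java
import java.util.ArrayList;
import java.util.List;

public class ContainerAssignmentSketch {
    // Hypothetical helper: one task instance per partition, dealt out to
    // containers round-robin. Returns, per container, the partitions it runs.
    static List<List<Integer>> assign(int partitionCount, int containerCount) {
        List<List<Integer>> containers = new ArrayList<>();
        for (int c = 0; c < containerCount; c++) {
            containers.add(new ArrayList<>());
        }
        for (int p = 0; p < partitionCount; p++) {
            containers.get(p % containerCount).add(p);
        }
        return containers;
    }

    public static void main(String[] args) {
        // 5 partitions in 2 containers: several task instances share each container.
        System.out.println(assign(5, 2)); // [[0, 2, 4], [1, 3]]
        // 1 partition in 2 containers: the second container has nothing to do.
        System.out.println(assign(1, 2)); // [[0], []]
    }
}
```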

OK, so it sounds like I should be able to have multiple task instances
in a single container, if the partitioning works.

Thanks!
Tyson



Cheers,
Chris

On 4/11/14 12:55 PM, "Tyson Norris" <tn...@adobe.com> wrote:

Possibly related - I cannot seem to find the source for
KafkaPartitionManager - can someone point me to it?

Thanks
Tyson

Re: partitioning question

Posted by Martin Kleppmann <mk...@linkedin.com>.
Have you set a serde (serializer/deserializer) for the key? You need to tell Samza how to convert a string to bytes and back. Something like this:

serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
systems.kafka.samza.key.serde=string

Martin
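The `ClassCastException: java.lang.String cannot be cast to [B` reported in this thread suggests the producer expects the key as a `byte[]` (`[B` is the JVM's name for `byte[]`) unless a serde converts it first. The Java sketch below reproduces that cast failure and shows a UTF-8 String serde doing the conversion. `KeySerde` is a hypothetical interface that only mirrors the to-bytes/from-bytes shape of Samza's `org.apache.samza.serializers.Serde`; the sketch does not use Samza itself, and UTF-8 is this sketch's own choice of encoding.

```java
import java.nio.charset.StandardCharsets;

public class StringKeySerdeSketch {
    // Hypothetical interface mirroring the two directions a key serde provides.
    interface KeySerde<T> {
        byte[] toBytes(T value);
        T fromBytes(byte[] bytes);
    }

    // String serde using UTF-8 (an assumption of this sketch).
    static final KeySerde<String> STRING_SERDE = new KeySerde<String>() {
        @Override
        public byte[] toBytes(String value) {
            return value.getBytes(StandardCharsets.UTF_8);
        }

        @Override
        public String fromBytes(byte[] bytes) {
            return new String(bytes, StandardCharsets.UTF_8);
        }
    };

    // Returns true if treating the key as raw bytes fails, as in the reported error.
    static boolean rawCastFails(Object key) {
        try {
            byte[] raw = (byte[]) key; // what a producer expecting byte[] keys effectively does
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Object key = "user-42";
        System.out.println(rawCastFails(key));     // true: a String is not a byte[]
        byte[] encoded = STRING_SERDE.toBytes((String) key);
        System.out.println(rawCastFails(encoded)); // false: the serde produced a byte[]
        System.out.println(STRING_SERDE.fromBytes(encoded)); // user-42
    }
}
```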



Re: partitioning question

Posted by Tyson Norris <tn...@adobe.com>.
Hi - 
Just following up on this: I ran into a couple of other problems, as I am trying to use a String for the partition key.

Passing String types for either key or partitionKey generated:
KafkaSystemProducer [WARN] Triggering a reconnect for kafka because connection failed: java.lang.ClassCastException: java.lang.String cannot be cast to [B

I was able to configure Kafka with:
partitioner.class=kafka.producer.DefaultPartitioner

In which case I can now use a String for partitionKey and a byte[] for key, which worked (yay!).

However, if I change to passing only key, it still fails with the java.lang.ClassCastException: java.lang.String cannot be cast to [B
I also tried specifying in the Kafka producer.properties:
key.serializer.class=kafka.serializer.StringEncoder

but had the same results.

Is there something special I need to do to use String type for key when using:
public OutgoingMessageEnvelope(SystemStream systemStream,
                               java.lang.Object key,
                               java.lang.Object message)

Thanks
Tyson



Re: partitioning question

Posted by Chris Riccomini <cr...@linkedin.com>.
Hey Tyson,

Yeah, sorry this is not more clear. Physical Kafka partitions are
per-topic. The default partition count for a newly created topic in Kafka
is defined using the num.partitions setting, as you've discovered. The
default setting that Kafka ships with is 1. This can be overridden on a
per-topic basis by using the kafka-create-topic.sh tool.
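To make the partition-count rule concrete, here is a minimal Java sketch. It only models the behaviour described above (an explicit per-topic count wins, otherwise the broker-wide num.partitions default applies, and Kafka ships with 1). The class and method names are hypothetical and are not part of Kafka or Samza. For a job that reads only this topic, the resulting count is also the number of task instances it gets.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of how many partitions a new Kafka topic ends up with.
// TopicPartitionCounts and resolve() are illustrative names, not Kafka APIs.
class TopicPartitionCounts {
    // The num.partitions value Kafka ships with, per this thread.
    static final int SHIPPED_DEFAULT = 1;

    // An explicit per-topic count (e.g. set when creating the topic) wins;
    // otherwise the broker-wide num.partitions default applies.
    static int resolve(String topic, Map<String, Integer> perTopic, int brokerDefault) {
        Integer explicit = perTopic.get(topic);
        return explicit != null ? explicit : brokerDefault;
    }

    public static void main(String[] args) {
        Map<String, Integer> perTopic = new HashMap<>();
        perTopic.put("events-by-user", 8); // hypothetical topic created with 8 partitions

        // Auto-created topic with the shipped default: one partition, one task instance.
        System.out.println(resolve("events", perTopic, SHIPPED_DEFAULT)); // 1
        // Same topic after setting num.partitions=5 in server.properties.
        System.out.println(resolve("events", perTopic, 5));               // 5
        // A topic created with its own count ignores the broker default.
        System.out.println(resolve("events-by-user", perTopic, 5));       // 8
    }
}
```

The point of the model is that the broker default only affects topics that get auto-created; a per-topic count avoids changing it for every topic on the broker.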

Cheers,
Chris

On 4/14/14 12:47 PM, "Tyson Norris" <tn...@adobe.com> wrote:

>OK that was wrong as well (my SystemFactory is indeed supposed to use a
>single partition, just not the kafka system factory), but I finally got
>things working as expected with some kafka config changes.
>
>For now, I set in deploy/kafka/config/server.properties:
>num.partitions=5
>
>(although I assume there is a better per-topic value I should set instead
>of a default like this)
>
>And now I see multiple task instances created as desired.
>
>Thanks
>Tyson


Re: partitioning question

Posted by Tyson Norris <tn...@adobe.com>.
OK, that was wrong as well (my SystemFactory is indeed supposed to use a single partition; it's just not the Kafka system factory), but I finally got things working as expected with some Kafka config changes.

For now, I set this in deploy/kafka/config/server.properties:
num.partitions=5

(although I assume there is a better per-topic setting I should use instead of a broker-wide default like this)

And now I see multiple task instances created as desired. 
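A rough illustration of why a partition key only spreads work once the topic has more than one partition: a key-based partitioner typically maps hash(key) mod partitionCount to a partition. The sketch below assumes a simple hashCode()-modulo scheme; the partitioner your Kafka producer actually uses may differ, and HashPartitionSketch/partitionFor are hypothetical names.

```java
import java.util.Map;
import java.util.TreeMap;

// Minimal sketch of key-based partitioning: partition = hash(key) mod partitionCount.
// Illustrative only; not the partitioner Kafka or Samza necessarily uses.
class HashPartitionSketch {
    static int partitionFor(Object partitionKey, int numPartitions) {
        // Math.floorMod keeps the result non-negative even for negative hash codes.
        return Math.floorMod(partitionKey.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 5; // e.g. num.partitions=5 in server.properties
        Map<Integer, Integer> counts = new TreeMap<>();
        for (int userId = 0; userId < 1000; userId++) {
            counts.merge(partitionFor("user-" + userId, numPartitions), 1, Integer::sum);
        }
        // Keys are spread across partitions, so each consuming task instance gets a share.
        System.out.println(counts);
        // The same key always lands on the same partition.
        System.out.println(partitionFor("user-42", numPartitions) == partitionFor("user-42", numPartitions)); // true
    }
}
```

With numPartitions set to 1, partitionFor returns 0 for every key, which matches the single-task-instance symptom from earlier in the thread.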

Thanks
Tyson

On Apr 14, 2014, at 12:36 PM, Tyson Norris <tn...@adobe.com> wrote:

> OK sorry for the noise. 
> I stumbled upon another clue - my SystemFactory has (based on WikipediaSystemFactory) :
>    @Override
>    public SystemAdmin getAdmin(String systemName, Config config) {
>        return new SinglePartitionWithoutOffsetsSystemAdmin();
>    }
> 
> Which I guess is a good reason my system is only using a single partition. Doh.
> I will work on a new SystemFactory impl to test with...
> 
> Thanks
> Tyson


Re: partitioning question

Posted by Tyson Norris <tn...@adobe.com>.
OK, sorry for the noise.
I stumbled upon another clue: my SystemFactory (based on WikipediaSystemFactory) has:
    @Override
    public SystemAdmin getAdmin(String systemName, Config config) {
        return new SinglePartitionWithoutOffsetsSystemAdmin();
    }

Which I guess is a good reason my system is only using a single partition. Doh.
I will work on a new SystemFactory impl to test with...
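Why a single-partition admin leads to a single task instance can be modelled in a few lines. This is a hypothetical sketch of the grouping described in this thread (one task instance per partition number, with task i receiving partition i of every input stream), not Samza's implementation; the class, method, and stream names are made up.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model: one task instance per partition number; task i gets
// partition i of every input stream. Not a Samza API.
class TaskGroupingSketch {
    // Maps partition id -> "stream/partition" inputs handled by that task instance.
    static Map<Integer, List<String>> groupByPartition(Map<String, Integer> partitionCounts) {
        Map<Integer, List<String>> tasks = new TreeMap<>();
        for (Map.Entry<String, Integer> stream : partitionCounts.entrySet()) {
            for (int p = 0; p < stream.getValue(); p++) {
                tasks.computeIfAbsent(p, k -> new ArrayList<>()).add(stream.getKey() + "/" + p);
            }
        }
        return tasks;
    }

    public static void main(String[] args) {
        // An admin reporting one partition per stream: every stream maps to task 0.
        Map<String, Integer> singlePartition = new TreeMap<>();
        singlePartition.put("wikipedia", 1);
        singlePartition.put("other-feed", 1);
        System.out.println(groupByPartition(singlePartition).size()); // 1

        // One input topic with five partitions: five task instances.
        Map<String, Integer> fivePartitions = new TreeMap<>();
        fivePartitions.put("events", 5);
        System.out.println(groupByPartition(fivePartitions).size()); // 5
    }
}
```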

Thanks
Tyson

On Apr 14, 2014, at 12:20 PM, Tyson Norris <tn...@adobe.com> wrote:

> I am actually wondering if I’m missing a bit of configuration that indicates the number of partitions I want to create for various kafka topics that messages are sent to?
> 
> I don’t see where this should be added in the config, and it appears the partitions are not created automatically when I specify the key for partitioning.
> 
> 
> Thanks
> Tyson


Re: partitioning question

Posted by Tyson Norris <tn...@adobe.com>.
I’m wondering whether I’m missing a bit of configuration that specifies the number of partitions to create for the various Kafka topics that messages are sent to.

I don’t see where this should be added in the config, and it appears the partitions are not created automatically when I specify the key for partitioning.


Thanks
Tyson





Re: partitioning question

Posted by Tyson Norris <tn...@adobe.com>.
Hi Chris -

On Apr 14, 2014, at 9:13 AM, Chris Riccomini <cr...@linkedin.com> wrote:

> Hey Tyson,
> 
> """I’m trying to have multiple tasks instances run, each processing a
> separate partition, and it appears that only a single task instance runs,
> processing all partitions. Or else my partitions are not created
> properly."""
> 
> Are you trying to consume a single stream that has multiple partitions, or
> are you processing multiple streams that all have one partition? If it's
> the latter, all of these messages will get routed to a single task
> instance. There is an upcoming patch to allow alternative partition task
> instance mappings (SAMZA-71), which Jakob Homan is working on currently.

No, I’m trying for the former, although my SystemConsumer is set up like the latter. That is, I have a SystemConsumer that should generate messages on a single partition, and a task that takes all messages and splits them into multiple partitions.

So, in my SystemConsumer I have:
        SystemStreamPartition systemStreamPartition = new SystemStreamPartition(systemName, streamId, new Partition(0));
        try {
            put(systemStreamPartition, new IncomingMessageEnvelope(systemStreamPartition, null, null, object));

which generates messages on the same stream + partition.

Then in my first task I have:
messageCollector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, partitionKey.getBytes(), outgoingMap));

which I am trying to get routed to separate task instances based on partitionKey.
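One thing worth checking in the snippet above: partitionKey.getBytes() produces a Java byte[], and arrays inherit Object's identity-based hashCode(). If a key object reaches a partitioner that calls hashCode() on it directly, two arrays with equal contents can hash differently, so equal keys would not reliably land on the same partition. Whether this applies depends on how your Samza version serializes keys before partitioning, which this thread does not settle; the sketch below only demonstrates the Java hashing behaviour, using a hypothetical hash-modulo partitionFor helper.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Shows why a raw byte[] is a risky key for a hashCode()-based partitioner.
// partitionFor is a hypothetical hash-modulo helper used only for illustration.
class ByteArrayKeySketch {
    static int partitionFor(int hash, int numPartitions) {
        return Math.floorMod(hash, numPartitions);
    }

    public static void main(String[] args) {
        String partitionKey = "user-42";
        byte[] a = partitionKey.getBytes(StandardCharsets.UTF_8);
        byte[] b = partitionKey.getBytes(StandardCharsets.UTF_8);

        // Arrays use identity hashing: equal contents, (almost always) different hash codes.
        System.out.println(a.hashCode() == b.hashCode());
        // Content-based hashing is stable for equal keys.
        System.out.println(Arrays.hashCode(a) == Arrays.hashCode(b)); // true
        // A String key hashes by content, so it maps to the same partition every time.
        System.out.println(partitionFor(partitionKey.hashCode(), 5)
                == partitionFor(new String(b, StandardCharsets.UTF_8).hashCode(), 5)); // true
    }
}
```

Passing the String itself (or any key type with content-based equals/hashCode) sidesteps the question, provided the configured serde can handle that type.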

> 
> If you have a single input stream with multiple partitions, you should end
> up with one task instance per partition. This partitioning model is
> explained in some detail at the 20 minute mark in this talk:
> 
>  http://www.infoq.com/presentations/samza-linkedin
> 
> """the example in docs of collector.send(new OutgoingMessageEnvelope(new
> SystemStream("kafka", "SomeTopicPartitionedByUserId"), msg.get("user_id"),
> msg)) seems confusing, because the OutgoingMessageEnvelope constructor has
> a key AND partitionKey - I assume the partitionKey should be
> msg.get(“user_id”) in this case, but what should key be? Just a unique
> value for this message?"""
> 
> The OutgoingMessageEnvelope has several constructors. The two you're
> referring to are:
> 
>  public OutgoingMessageEnvelope(SystemStream systemStream, Object
> partitionKey, Object key, Object message)
> 
>  public OutgoingMessageEnvelope(SystemStream systemStream, Object key,
> Object message)
> 
> 
> This is, indeed, odd. In general, people only want the second constructor
> (systemStream, key, message). The constructor with the partitionKey has
> its origins in the Kafka API. With Kafka 0.8, keys are now stored along
> with messages in the actual log segments on the disk. This is useful
> because it means you can get access to the key information that was sent
> with the message. It also means that you can use log-compaction to
> de-duplicate keys in a Kafka topic (an 0.8.1 feature). There are some
> cases where you might wish to partition a topic by one key (say, member
> ID), but store (or de-duplicate by) a different key with the message.
> 

So, in the case where I don’t care about deduplication, is the “key” parameter of the second constructor actually used as the partition key?




> """Do I need to specify partition manager and yarn.container.count to get
> multiple instances of my task working to service separate partitions?"""
> 
> The partition manager class has been replaced by the KafkaSystemFactory and
> KafkaSystemAdmin. As long as you've specified a KafkaSystemFactory, the
> partitions will be handled properly by Samza. The yarn.container.count
> simply specifies how many containers (Java processes) you get to run your
> tasks in. If you have only one TaskInstance, but specify a container count
> of 2, the second container won't have any partitions to process, and I
> believe the job will fail. You need to set your container count <= the
> partition count of your input topics.

OK, so it sounds like I should be able to have multiple task instances in a single container, if the partitioning works.

Thanks!
Tyson


> 
> Cheers,
> Chris
> 
> On 4/11/14 12:55 PM, "Tyson Norris" <tn...@adobe.com> wrote:
> 
>> Possibly related - I cannot seem to find the source for
>> KafkaPartitionManager - can someone point me to it?
>> 
>> Thanks
>> Tyson
>> 
>> On Apr 11, 2014, at 12:15 PM, Tyson Norris <tn...@adobe.com> wrote:
>> 
>>> Hi - 
>>> I have a couple questions about partitioning - I’m trying to have
>>> multiple task instances run, each processing a separate partition, and
>>> it appears that only a single task instance runs, processing all
>>> partitions. Or else my partitions are not created properly. This is
>>> based on a modified version of hello-samza, so I’m not sure exactly
>>> which config+code steps to take to enable partitioning of messages to
>>> multiple instances of the same task.
>>> 
>>> To route to a partition I use: messageCollector.send(new
>>> OutgoingMessageEnvelope(OUTPUT_STREAM, msgKey, partitionKey,
>>> outgoingMap));
>>> - question here: the example in docs of collector.send(new
>>> OutgoingMessageEnvelope(new SystemStream("kafka",
>>> "SomeTopicPartitionedByUserId"), msg.get("user_id"), msg)) seems
>>> confusing, because the OutgoingMessageEnvelope constructor has a key AND
>>> partitionKey - I assume the partitionKey should be msg.get(“user_id”) in
>>> this case, but what should key be? Just a unique value for this message?
>>> 
>>> I tried the 3 parameter constructor as well and have similar problems,
>>> where my single task instance is used regardless of partitionKey
>>> specified in the OutgoingMessageEnvelope.
>>> 
>>> Do I need to specify partition manager and yarn.container.count to get
>>> multiple instances of my task working to service separate partitions?
>>> 
>>> I’m not sure how to tell if my messages are routed to the correct
>>> partition in Kafka, or whether the problem is a partition-handling
>>> config in Samza.
>>> 
>>> Any advice is appreciated!
>>> 
>>> Thanks
>>> Tyson
> 


Re: partitioning question

Posted by Chris Riccomini <cr...@linkedin.com>.
Hey Tyson,

"""I’m trying to have multiple task instances run, each processing a
separate partition, and it appears that only a single task instance runs,
processing all partitions. Or else my partitions are not created
properly."""

Are you trying to consume a single stream that has multiple partitions, or
are you processing multiple streams that all have one partition? If it's
the latter, all of these messages will get routed to a single task
instance. There is an upcoming patch to allow alternative partition task
instance mappings (SAMZA-71), which Jakob Homan is working on currently.

If you have a single input stream with multiple partitions, you should end
up with one task instance per partition. This partitioning model is
explained in some detail at the 20 minute mark in this talk:

  http://www.infoq.com/presentations/samza-linkedin
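The following is a simplified model of the mapping described above. It assumes input partitions are grouped into task instances by partition number, so partition N of every input stream goes to task instance N. `Ssp` and `groupByPartition` are hypothetical stand-ins, not Samza classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model: one task instance per partition number, with
// partition N of every input stream routed to the same task instance.
final class TaskGroupingSketch {
    // Hypothetical stand-in for a (stream, partition) pair.
    static final class Ssp {
        final String stream;
        final int partition;

        Ssp(String stream, int partition) {
            this.stream = stream;
            this.partition = partition;
        }

        @Override
        public String toString() {
            return stream + "#" + partition;
        }
    }

    // Each map key is one task instance; its value lists the inputs it consumes.
    static Map<Integer, List<String>> groupByPartition(List<Ssp> inputs) {
        Map<Integer, List<String>> tasks = new TreeMap<>();
        for (Ssp ssp : inputs) {
            tasks.computeIfAbsent(ssp.partition, p -> new ArrayList<>()).add(ssp.toString());
        }
        return tasks;
    }

    public static void main(String[] args) {
        // Three streams with one partition each: everything lands in task 0.
        List<Ssp> manyStreams = Arrays.asList(new Ssp("a", 0), new Ssp("b", 0), new Ssp("c", 0));
        System.out.println(groupByPartition(manyStreams)); // {0=[a#0, b#0, c#0]}

        // One stream with three partitions: one task instance per partition.
        List<Ssp> onePartitioned = Arrays.asList(new Ssp("t", 0), new Ssp("t", 1), new Ssp("t", 2));
        System.out.println(groupByPartition(onePartitioned)); // {0=[t#0], 1=[t#1], 2=[t#2]}
    }
}
```

Under this model, a SystemConsumer that puts everything on Partition(0) feeds exactly one task instance. A downstream job that reads a topic with several partitions gets one instance per partition.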

"""the example in docs of collector.send(new OutgoingMessageEnvelope(new
SystemStream("kafka", "SomeTopicPartitionedByUserId"), msg.get("user_id"),
msg)) seems confusing, because the OutgoingMessageEnvelope constructor has
a key AND partitionKey - I assume the partitionKey should be
msg.get(“user_id”) in this case, but what should key be? Just a unique
value for this message?"""

The OutgoingMessageEnvelope has several constructors. The two you're
referring to are:

  public OutgoingMessageEnvelope(SystemStream systemStream, Object
partitionKey, Object key, Object message)

  public OutgoingMessageEnvelope(SystemStream systemStream, Object key,
Object message)

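The stand-in below mirrors the argument order of the two constructors above. It is hypothetical, and it uses a plain String in place of SystemStream so it compiles without Samza. It is meant to show two things. First, in the four-argument form, partitionKey comes before key. The call `new OutgoingMessageEnvelope(OUTPUT_STREAM, msgKey, partitionKey, outgoingMap)` in the original question passes them in the opposite order. Second, the three-argument form is assumed here to reuse key as the partition key. Check the OutgoingMessageEnvelope source for your Samza version before relying on that.

```java
// Hypothetical stand-in mirroring the two OutgoingMessageEnvelope
// constructors quoted above. The 3-argument delegation (key doubles as
// partition key) is an assumption, not confirmed Samza behaviour.
final class EnvelopeSketch {
    final String stream;
    final Object partitionKey;
    final Object key;
    final Object message;

    // Order matters: partitionKey comes BEFORE key.
    EnvelopeSketch(String stream, Object partitionKey, Object key, Object message) {
        this.stream = stream;
        this.partitionKey = partitionKey;
        this.key = key;
        this.message = message;
    }

    // Assumed: the single key is used both for partitioning and as the stored key.
    EnvelopeSketch(String stream, Object key, Object message) {
        this(stream, key, key, message);
    }

    public static void main(String[] args) {
        // Partition by user ID, but store a different key with the message.
        EnvelopeSketch e4 = new EnvelopeSketch("SomeTopicPartitionedByUserId", "user-42", "event-7", "payload");
        // Common case: one key for both partitioning and storage.
        EnvelopeSketch e3 = new EnvelopeSketch("SomeTopicPartitionedByUserId", "user-42", "payload");
        System.out.println(e4.partitionKey + " " + e4.key); // user-42 event-7
        System.out.println(e3.partitionKey + " " + e3.key); // user-42 user-42
    }
}
```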

This is, indeed, odd. In general, people only want the second constructor
(systemStream, key, message). The constructor with the partitionKey has
its origins in the Kafka API. With Kafka 0.8, keys are now stored along
with messages in the actual log segments on the disk. This is useful
because it means you can get access to the key information that was sent
with the message. It also means that you can use log-compaction to
de-duplicate keys in a Kafka topic (an 0.8.1 feature). There are some
cases where you might wish to partition a topic by one key (say, member
ID), but store (or de-duplicate by) a different key with the message.
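The toy model below partitions by one key and compacts by another. It assumes hash-modulo partitioning on partitionKey. It also treats compaction as "keep the latest value per key within each partition". Real Kafka compaction runs in the background over log segments, so this only illustrates the end state. `CompactionSketch` and `Rec` are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model (not Kafka code): records are partitioned by partitionKey and
// compacted by key, keeping the latest value per key within each partition.
final class CompactionSketch {
    static final class Rec {
        final String partitionKey, key, value;

        Rec(String partitionKey, String key, String value) {
            this.partitionKey = partitionKey;
            this.key = key;
            this.value = value;
        }
    }

    static Map<Integer, Map<String, String>> compact(List<Rec> log, int numPartitions) {
        Map<Integer, Map<String, String>> partitions = new TreeMap<>();
        for (Rec r : log) {
            // Partition choice uses partitionKey only (assumed hash-modulo scheme).
            int p = (r.partitionKey.hashCode() & 0x7fffffff) % numPartitions;
            // Within a partition, a later write to the same key replaces the earlier one.
            partitions.computeIfAbsent(p, x -> new LinkedHashMap<>()).put(r.key, r.value);
        }
        return partitions;
    }

    public static void main(String[] args) {
        List<Rec> log = new ArrayList<>();
        log.add(new Rec("member-1", "member-1:email", "old@example.com"));
        log.add(new Rec("member-1", "member-1:email", "new@example.com"));
        log.add(new Rec("member-1", "member-1:phone", "555-0100"));
        // All three share a partition key, so they land in one partition, which
        // compacts to {member-1:email=new@example.com, member-1:phone=555-0100}.
        System.out.println(compact(log, 4));
    }
}
```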

"""Do I need to specify partition manager and yarn.container.count to get
multiple instances of my task working to service separate partitions?"""

The partition manager class has been replaced by the KafkaSystemFactory and
KafkaSystemAdmin. As long as you've specified a KafkaSystemFactory, the
partitions will be handled properly by Samza. The yarn.container.count
simply specifies how many containers (Java processes) you get to run your
tasks in. If you have only one TaskInstance, but specify a container count
of 2, the second container won't have any partitions to process, and I
believe the job will fail. You need to set your container count <= the
partition count of your input topics.
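Here is a rough model of the container-count rule. It assumes one task instance per input partition and a simple round-robin spread of task instances across containers. Samza's actual assignment may differ. The paragraph above only says the job will probably fail, so this sketch simply treats container count > partition count as an error. `ContainerSketch` and `assign` are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model: task instances (one per input partition) are spread
// across containers round-robin. Not Samza's actual assignment code.
final class ContainerSketch {
    static List<List<Integer>> assign(int partitionCount, int containerCount) {
        if (containerCount < 1) {
            throw new IllegalArgumentException("need at least one container");
        }
        if (containerCount > partitionCount) {
            // Mirrors the rule of thumb above: an extra container would have no work.
            throw new IllegalArgumentException(
                "container count " + containerCount + " exceeds partition count " + partitionCount);
        }
        List<List<Integer>> containers = new ArrayList<>();
        for (int c = 0; c < containerCount; c++) {
            containers.add(new ArrayList<>());
        }
        // Task instance for partition p goes to container p mod containerCount.
        for (int p = 0; p < partitionCount; p++) {
            containers.get(p % containerCount).add(p);
        }
        return containers;
    }

    public static void main(String[] args) {
        // 4 partitions, 1 container: all four task instances share one JVM.
        System.out.println(assign(4, 1)); // [[0, 1, 2, 3]]
        // 4 partitions, 2 containers: two task instances per container.
        System.out.println(assign(4, 2)); // [[0, 2], [1, 3]]
    }
}
```

With a single container, every task instance runs in the same JVM.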

Cheers,
Chris

On 4/11/14 12:55 PM, "Tyson Norris" <tn...@adobe.com> wrote:

>Possibly related - I cannot seem to find the source for
>KafkaPartitionManager - can someone point me to it?
>
>Thanks
>Tyson
>
>On Apr 11, 2014, at 12:15 PM, Tyson Norris <tn...@adobe.com> wrote:
>
>> Hi - 
>> I have a couple questions about partitioning - I’m trying to have
>>multiple task instances run, each processing a separate partition, and
>>it appears that only a single task instance runs, processing all
>>partitions. Or else my partitions are not created properly. This is
>>based on a modified version of hello-samza, so I’m not sure exactly
>>which config+code steps to take to enable partitioning of messages to
>>multiple instances of the same task.
>> 
>> To route to a partition I use: messageCollector.send(new
>>OutgoingMessageEnvelope(OUTPUT_STREAM, msgKey, partitionKey,
>>outgoingMap));
>> - question here: the example in docs of collector.send(new
>>OutgoingMessageEnvelope(new SystemStream("kafka",
>>"SomeTopicPartitionedByUserId"), msg.get("user_id"), msg)) seems
>>confusing, because the OutgoingMessageEnvelope constructor has a key AND
>>partitionKey - I assume the partitionKey should be msg.get(“user_id”) in
>>this case, but what should key be? Just a unique value for this message?
>> 
>> I tried the 3 parameter constructor as well and have similar problems,
>>where my single task instance is used regardless of partitionKey
>>specified in the OutgoingMessageEnvelope.
>> 
>> Do I need to specify partition manager and yarn.container.count to get
>>multiple instances of my task working to service separate partitions?
>> 
>> I’m not sure how to tell if my messages are routed to the correct
>>partition in Kafka, or whether the problem is a partition-handling
>>config in Samza.
>> 
>> Any advice is appreciated!
>> 
>> Thanks
>> Tyson


Re: partitioning question

Posted by Tyson Norris <tn...@adobe.com>.
Possibly related - I cannot seem to find the source for KafkaPartitionManager - can someone point me to it?

Thanks
Tyson

On Apr 11, 2014, at 12:15 PM, Tyson Norris <tn...@adobe.com> wrote:

> Hi - 
> I have a couple questions about partitioning - I’m trying to have multiple task instances run, each processing a separate partition, and it appears that only a single task instance runs, processing all partitions. Or else my partitions are not created properly. This is based on a modified version of hello-samza, so I’m not sure exactly which config+code steps to take to enable partitioning of messages to multiple instances of the same task.
> 
> To route to a partition I use: messageCollector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, msgKey, partitionKey, outgoingMap));
> - question here: the example in docs of collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "SomeTopicPartitionedByUserId"), msg.get("user_id"), msg)) seems confusing, because the OutgoingMessageEnvelope constructor has a key AND partitionKey - I assume the partitionKey should be msg.get(“user_id”) in this case, but what should key be? Just a unique value for this message?
> 
> I tried the 3 parameter constructor as well and have similar problems, where my single task instance is used regardless of partitionKey specified in the OutgoingMessageEnvelope.
> 
> Do I need to specify partition manager and yarn.container.count to get multiple instances of my task working to service separate partitions?
> 
> I’m not sure how to tell if my messages are routed to the correct partition in Kafka, or whether the problem is a partition-handling config in Samza.
> 
> Any advice is appreciated!
> 
> Thanks
> Tyson