Posted to commits@samza.apache.org by "Chris Riccomini (JIRA)" <ji...@apache.org> on 2013/11/09 05:09:17 UTC

[jira] [Commented] (SAMZA-82) Not use maximum number of partitions when initializing streams

    [ https://issues.apache.org/jira/browse/SAMZA-82?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13817998#comment-13817998 ] 

Chris Riccomini commented on SAMZA-82:
--------------------------------------

Commentary from SAMZA-83:

I'm a bit confused, I think. I took a look at BrokerProxy. You're saying that we're getting exceptions because we're trying to find a partition for topics that might not have that partition. For example, if we have one topic with 16 partitions and another with 8, we'll try to get partitions 8-15 on the second (counting from 0), even though they don't exist?

Based on the code, I see:

{noformat}
        topicPartitionsAndOffsets.map {
{noformat}

This map is iterated over, to get every topic/partition pair from the partition metadata. In turn, this variable is populated by:

{noformat}
    val topicPartitionsAndOffsets = lastReadOffsets.map {
      case (systemStreamPartition, offset) =>
        val topicAndPartition = KafkaSystemConsumer.toTopicAndPartition(systemStreamPartition)
        (topicAndPartition, offset)
    }
{noformat}

The lastReadOffsets, in turn, is populated by the register method, which simply registers every topic/partition pair that it's been given from TaskInstance.

I think the real problem is here, in TaskInstance:

{noformat}
    inputStreams.foreach(stream =>
      consumerMultiplexer.register(
        new SystemStreamPartition(stream, partition),
        offsets.get(stream).getOrElse(null)))
{noformat}

Every task instance registers ALL input streams with its partition. This means, if we have 16 TaskInstances (as we would in the example above), we register 16 partitions for EVERY input stream.
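
To make the over-registration concrete, here is a small stand-alone sketch. It does not use Samza's classes: the object name, `partitionCounts`, `registeredPairs`, and the topic names are hypothetical stand-ins, and partition IDs are assumed to be zero-based.

```scala
object OverRegistrationSketch {
  // Hypothetical partition counts for the two topics in the example above.
  val partitionCounts: Map[String, Int] = Map("topicA" -> 16, "topicB" -> 8)

  // One task per partition ID, up to the largest partition count
  // (the assumption behind using the maximum number of partitions).
  val taskPartitions: Range = 0 until partitionCounts.values.max

  // Mimics the behavior described above: every task registers
  // every input stream with its own partition ID.
  val registeredPairs: Seq[(String, Int)] =
    for {
      partition <- taskPartitions
      stream <- partitionCounts.keys.toSeq.sorted
    } yield (stream, partition)

  // Registered pairs whose partition ID does not exist in the stream.
  val invalidPairs: Seq[(String, Int)] =
    registeredPairs.filter { case (stream, partition) =>
      partition >= partitionCounts(stream)
    }

  def main(args: Array[String]): Unit = {
    println(s"registered: ${registeredPairs.size}, invalid: ${invalidPairs.size}")
    invalidPairs.foreach(println)
  }
}
```

Under these assumptions, 32 pairs get registered and 8 of them (partitions 8 through 15 of the smaller topic) point at partitions that don't exist.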

Recommended fix: I think what we need to do is make TaskInstance only register valid partitions. This should fix the problem. Once this is done, we should go back to failing if we have a missing topic/partition combo (revert SAMZA-83).
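
A rough sketch of what "only register valid partitions" could look like, kept independent of the real TaskInstance code. `StreamPartition`, `validRegistrations`, and the `partitionCounts` map are hypothetical names invented for illustration; the sketch assumes zero-based partition IDs and that the per-stream partition count is already known.

```scala
object ValidPartitionRegistrationSketch {
  // Hypothetical stand-in for Samza's SystemStreamPartition.
  final case class StreamPartition(stream: String, partition: Int)

  // Returns only the stream/partition pairs a task should register.
  // A stream is skipped if its partition count is unknown or if the
  // task's partition ID falls outside the stream's range.
  def validRegistrations(
      inputStreams: Seq[String],
      partitionCounts: Map[String, Int],
      taskPartition: Int): Seq[StreamPartition] =
    inputStreams
      .filter { stream =>
        taskPartition >= 0 &&
          partitionCounts.get(stream).exists(count => taskPartition < count)
      }
      .map(stream => StreamPartition(stream, taskPartition))

  def main(args: Array[String]): Unit = {
    val counts = Map("topicA" -> 16, "topicB" -> 8)
    val streams = Seq("topicA", "topicB")
    // Task 3 registers both topics; task 12 registers only the 16-partition one.
    println(validRegistrations(streams, counts, 3))
    println(validRegistrations(streams, counts, 12))
  }
}
```

With a filter like this in place, a missing topic/partition combination would once again indicate a genuine error, so failing on it would be safe.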

> Not use maximum number of partitions when initializing streams
> --------------------------------------------------------------
>
>                 Key: SAMZA-82
>                 URL: https://issues.apache.org/jira/browse/SAMZA-82
>             Project: Samza
>          Issue Type: Bug
>          Components: kafka
>    Affects Versions: 0.7.0
>            Reporter: Jakob Homan
>            Assignee: Jakob Homan
>             Fix For: 0.7.0
>
>
> Util.scala:
> {code}  /**
>    * Uses config to create SystemAdmin classes for all input stream systems to
>    * get each input stream's partition count, then returns the maximum count.
>    * An input stream with two partitions, and a second input stream with four
>    * partitions would result in this method returning 4.
>    */
>   def getMaxInputStreamPartitions(config: Config) = {
> {code}
> This approach works if all the streams have the same number of partitions, but it is inefficient in other cases and fails when the underlying system gets cranky about being asked about nonexistent partitions. We should eagerly figure out the correct number of partitions for each topic and pass that information along from there.


