Posted to commits@samza.apache.org by "Thomas Chow (JIRA)" <ji...@apache.org> on 2015/10/22 01:25:27 UTC

[jira] [Created] (SAMZA-797) "task.broadcast.inputs" does not support stream names with periods in them

Thomas Chow created SAMZA-797:
---------------------------------

             Summary: "task.broadcast.inputs" does not support stream names with periods in them
                 Key: SAMZA-797
                 URL: https://issues.apache.org/jira/browse/SAMZA-797
             Project: Samza
          Issue Type: Bug
            Reporter: Thomas Chow


TaskConfigJava.java, which parses the values of the task.broadcast.inputs property, throws an exception if a value matches neither of the following regex patterns:

BROADCAST_STREAM_PATTERN = "[^#\\.]+\\.[^#\\.]+#[\\d]+"
BROADCAST_STREAM_RANGE_PATTERN = "[^#\\.]+\\.[^#\\.]+#\\[[\\d]+\\-[\\d]+\\]+"

Valid examples: system.stream#0 or system.stream#[0-5].

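However, both patterns forbid periods in the stream part (the [^#\\.]+ that follows the first period), so if the stream name itself contains periods, the value matches neither pattern and an IllegalArgumentException is thrown.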

Example:

Exception in thread "main" java.lang.IllegalArgumentException: incorrect format in databus.com.linkedin.events.gals.WhitelistedIps#0. Broadcast stream names should be in the form 'system.stream#partitionId' or 'system.stream#[partitionN-partitionM]'
        at org.apache.samza.config.TaskConfigJava.getBroadcastSystemStreamPartitions(TaskConfigJava.java:78)
        at org.apache.samza.container.grouper.stream.GroupByPartition.<init>(GroupByPartition.java:50)
        at org.apache.samza.container.grouper.stream.GroupByPartitionFactory.getSystemStreamPartitionGrouper(GroupByPartitionFactory.java:27)
        at org.apache.samza.coordinator.JobCoordinator$.getSystemStreamPartitionGrouper(JobCoordinator.scala:138)
        at org.apache.samza.coordinator.JobCoordinator$.initializeJobModel(JobCoordinator.scala:154)
        at org.apache.samza.coordinator.JobCoordinator$.getJobCoordinator(JobCoordinator.scala:107)
        at org.apache.samza.coordinator.JobCoordinator$.apply(JobCoordinator.scala:91)
        at org.apache.samza.coordinator.JobCoordinator$.apply(JobCoordinator.scala:97)
        at org.apache.samza.job.local.ProcessJobFactory.getJob(ProcessJobFactory.scala:33)
        at org.apache.samza.job.JobRunner.run(JobRunner.scala:129)
        at org.apache.samza.job.JobRunner$.main(JobRunner.scala:66)
        at org.apache.samza.job.JobRunner.main(JobRunner.scala)

Here, system=databus and stream=com.linkedin.events.gals.WhitelistedIps.

In general, any value of the form system.stream.name#0 throws this exception.
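The standalone sketch below reproduces the validation outside Samza, assuming each value must match one of the two patterns in full (as with Java's Pattern.matches). The pattern strings are copied verbatim from the description; the class BroadcastPatternCheck and its isValid method are hypothetical and are not TaskConfigJava code.

```java
import java.util.regex.Pattern;

public class BroadcastPatternCheck {
    // Pattern strings copied from the issue description.
    static final String BROADCAST_STREAM_PATTERN = "[^#\\.]+\\.[^#\\.]+#[\\d]+";
    static final String BROADCAST_STREAM_RANGE_PATTERN = "[^#\\.]+\\.[^#\\.]+#\\[[\\d]+\\-[\\d]+\\]+";

    // Hypothetical check: true if the whole value matches either pattern.
    static boolean isValid(String value) {
        return Pattern.matches(BROADCAST_STREAM_PATTERN, value)
            || Pattern.matches(BROADCAST_STREAM_RANGE_PATTERN, value);
    }

    public static void main(String[] args) {
        String[] values = {
            "system.stream#0",
            "system.stream#[0-5]",
            "system.stream.name#0",
            "databus.com.linkedin.events.gals.WhitelistedIps#0"
        };
        for (String v : values) {
            System.out.println(v + " -> " + (isValid(v) ? "accepted" : "rejected"));
        }
    }
}
```

Running it prints "accepted" for the first two values and "rejected" for the last two: after the first period, [^#\\.]+ stops at the next period, and the pattern then requires '#' where another '.' appears.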


Stream names are sometimes fully qualified namespaces, with periods delimiting each level. The broadcast stream property therefore cannot be used with streams that follow this naming convention. By contrast, "task.inputs" accepts such names because it does not validate them against a regex pattern, which appears to be an inconsistency in Samza's config parsing.
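Assuming system names never contain periods, a dotted value can still be split unambiguously at the first period, leaving every remaining period in the stream name. The sketch below illustrates that idea with a hypothetical helper, splitAtFirstPeriod; it is not Samza's task.inputs parsing code.

```java
public class SystemStreamSplit {
    // Hypothetical helper: splits "system.stream" at the first period only,
    // so any further periods stay in the stream name.
    static String[] splitAtFirstPeriod(String systemStream) {
        int idx = systemStream.indexOf('.');
        if (idx <= 0 || idx == systemStream.length() - 1) {
            throw new IllegalArgumentException("expected 'system.stream' but got: " + systemStream);
        }
        return new String[] {
            systemStream.substring(0, idx),
            systemStream.substring(idx + 1)
        };
    }

    public static void main(String[] args) {
        String[] parts = splitAtFirstPeriod("databus.com.linkedin.events.gals.WhitelistedIps");
        System.out.println("system=" + parts[0]); // system=databus
        System.out.println("stream=" + parts[1]); // stream=com.linkedin.events.gals.WhitelistedIps
    }
}
```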

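A fix could involve relaxing BROADCAST_STREAM_PATTERN so that only the system name is forbidden from containing periods, changing it from "[^#\\.]+\\.[^#\\.]+#[\\d]+" to "[^#\\.]+\\.[^#]+#[\\d]+". BROADCAST_STREAM_RANGE_PATTERN would need the same change to its stream part, since a value such as system.stream.name#[0-5] fails it for the same reason.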
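A minimal sketch of the relaxed patterns, with capture groups added so that the system, stream, and partition ids can be pulled out of a value. The class BroadcastInputParser, its Parsed holder, and the parse method are hypothetical. The range pattern applies the same relaxation to BROADCAST_STREAM_RANGE_PATTERN; that is an assumption, and its trailing \\]+ is kept as quoted in the issue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class BroadcastInputParser {
    // Relaxed single-partition pattern with capture groups:
    // group 1 = system (no periods), group 2 = stream (periods allowed), group 3 = partition.
    static final Pattern SINGLE = Pattern.compile("([^#\\.]+)\\.([^#]+)#([\\d]+)");
    // Assumed analogous relaxation of the range pattern; groups 3 and 4 are the bounds.
    static final Pattern RANGE = Pattern.compile("([^#\\.]+)\\.([^#]+)#\\[([\\d]+)\\-([\\d]+)\\]+");

    // Hypothetical holder for a parsed broadcast input.
    static final class Parsed {
        final String system;
        final String stream;
        final List<Integer> partitions;

        Parsed(String system, String stream, List<Integer> partitions) {
            this.system = system;
            this.stream = stream;
            this.partitions = partitions;
        }

        @Override
        public String toString() {
            return "system=" + system + " stream=" + stream + " partitions=" + partitions;
        }
    }

    static Parsed parse(String value) {
        Matcher m = SINGLE.matcher(value);
        if (m.matches()) {
            List<Integer> one = new ArrayList<>();
            one.add(Integer.parseInt(m.group(3)));
            return new Parsed(m.group(1), m.group(2), one);
        }
        m = RANGE.matcher(value);
        if (m.matches()) {
            int from = Integer.parseInt(m.group(3));
            int to = Integer.parseInt(m.group(4));
            if (from > to) {
                throw new IllegalArgumentException("empty partition range in " + value);
            }
            List<Integer> range = new ArrayList<>();
            for (int p = from; p <= to; p++) {
                range.add(p);
            }
            return new Parsed(m.group(1), m.group(2), range);
        }
        throw new IllegalArgumentException("incorrect format in " + value);
    }

    public static void main(String[] args) {
        // system=databus stream=com.linkedin.events.gals.WhitelistedIps partitions=[0]
        System.out.println(parse("databus.com.linkedin.events.gals.WhitelistedIps#0"));
        // system=system stream=stream.name partitions=[0, 1, 2, 3, 4, 5]
        System.out.println(parse("system.stream.name#[0-5]"));
    }
}
```

Because the system group cannot contain a period, the regex engine always splits at the first period, which keeps the parse unambiguous while allowing fully qualified stream names.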



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)