Posted to commits@samza.apache.org by "Boris Shkolnik (JIRA)" <ji...@apache.org> on 2019/05/23 20:42:00 UTC

[jira] [Updated] (SAMZA-2172) Async High Level API does not schedule StreamOperatorTasks on separate threads

     [ https://issues.apache.org/jira/browse/SAMZA-2172?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Boris Shkolnik updated SAMZA-2172:
----------------------------------
    Fix Version/s: 1.2

> Async High Level API does not schedule StreamOperatorTasks on separate threads
> ------------------------------------------------------------------------------
>
>                 Key: SAMZA-2172
>                 URL: https://issues.apache.org/jira/browse/SAMZA-2172
>             Project: Samza
>          Issue Type: Bug
>            Reporter: Prateek Maheshwari
>            Assignee: Bharath Kumarasubramanian
>            Priority: Major
>             Fix For: 1.2
>
>          Time Spent: 3h 10m
>  Remaining Estimate: 0h
>
> With the Async High Level API changes, we changed StreamOperatorTask from a StreamTask to an AsyncStreamTask. As a result, its processAsync() is now invoked directly on the run loop instead of going through the AsyncStreamTaskAdapter. Consequently, job.container.thread.pool.size has no effect, and the entire DAG (outside of asyncFlatMap) is executed on the run loop, one TaskInstance at a time, as the stack trace below shows.
> {code:java}
> ...
> at org.apache.samza.operators.impl.OperatorImpl.onMessageAsync(OperatorImpl.java:184)
> at org.apache.samza.task.StreamOperatorTask.processAsync(StreamOperatorTask.java:109)
> at org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(TaskInstance.scala:176)
> at org.apache.samza.container.TaskInstanceExceptionHandler.maybeHandle(TaskInstanceExceptionHandler.scala:54)
> at
> at org.apache.samza.container.TaskInstance.process(TaskInstance.scala:174)
> at org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.process(AsyncRunLoop.java:470)
> at org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.run(AsyncRunLoop.java:412)
> at org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.access$300(AsyncRunLoop.java:346)
> at org.apache.samza.task.AsyncRunLoop.runTasks(AsyncRunLoop.java:233)
> at org.apache.samza.task.AsyncRunLoop.run(AsyncRunLoop.java:165)
> at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:763)
> at org.apache.samza.runtime.ContainerLaunchUtil.run(ContainerLaunchUtil.java:150)
> at org.apache.samza.runtime.ContainerLaunchUtil.run(ContainerLaunchUtil.java:78)
> at org.apache.samza.runtime.LocalContainerRunner.main(LocalContainerRunner.java:76){code}
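
The behavior described above can be illustrated with a small sketch that does not depend on Samza. It is only an analogy under stated assumptions: RunLoopSketch, AsyncTask, DIRECT, onPool and threadThatRuns are hypothetical names invented for this example, not Samza APIs, and the wrapper shown is not necessarily the fix applied for this issue. The sketch shows that an "async" task whose work runs synchronously inside processAsync() executes on whichever thread calls it (here a thread named "run-loop"), unless the call is explicitly handed to a separate pool.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class RunLoopSketch {

    /** Hypothetical stand-in for an async task; completes with the name of the thread that did the work. */
    interface AsyncTask {
        CompletableFuture<String> processAsync(String message);
    }

    /** The work runs synchronously inside processAsync, so it executes on the caller's thread. */
    static final AsyncTask DIRECT =
        message -> CompletableFuture.completedFuture(Thread.currentThread().getName());

    /** Hypothetical wrapper that hands each processAsync call to a separate pool. */
    static AsyncTask onPool(AsyncTask task, ExecutorService pool) {
        return message -> CompletableFuture
            .supplyAsync(() -> task.processAsync(message), pool)
            .thenCompose(future -> future);
    }

    /** Invokes the task from a single "run-loop" thread and reports which thread ran the work. */
    static String threadThatRuns(boolean usePool) {
        ExecutorService runLoop = Executors.newSingleThreadExecutor(r -> new Thread(r, "run-loop"));
        ExecutorService pool = Executors.newFixedThreadPool(2, r -> new Thread(r, "task-pool"));
        try {
            AsyncTask task = usePool ? onPool(DIRECT, pool) : DIRECT;
            return CompletableFuture
                .supplyAsync(() -> task.processAsync("msg"), runLoop)
                .thenCompose(future -> future)
                .join();
        } finally {
            runLoop.shutdown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("direct: " + threadThatRuns(false)); // prints "direct: run-loop"
        System.out.println("pooled: " + threadThatRuns(true));  // prints "pooled: task-pool"
    }
}
```

In the direct case the pool exists but is never used, which mirrors the report that the configured thread pool size has no effect when processAsync() is called straight from the run loop.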



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)