You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by "Bharath Kumarasubramanian (JIRA)" <ji...@apache.org> on 2019/05/22 22:59:00 UTC

[jira] [Closed] (SAMZA-2172) Async High Level API does not schedule StreamOperatorTasks on separate threads

     [ https://issues.apache.org/jira/browse/SAMZA-2172?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Bharath Kumarasubramanian closed SAMZA-2172.
--------------------------------------------

> Async High Level API does not schedule StreamOperatorTasks on separate threads
> ------------------------------------------------------------------------------
>
>                 Key: SAMZA-2172
>                 URL: https://issues.apache.org/jira/browse/SAMZA-2172
>             Project: Samza
>          Issue Type: Bug
>            Reporter: Prateek Maheshwari
>            Assignee: Bharath Kumarasubramanian
>            Priority: Major
>          Time Spent: 3h 10m
>  Remaining Estimate: 0h
>
> With the Async High Level API changes, we changed StreamOperatorTask from a StreamTask to an AsyncStreamTask. This means that instead of using the AsyncStreamTaskAdapter, its processAsync() is invoked directly on the run loop. This means that the job.container.thread.pool.size has no effect, and the entire DAG (outside of asyncFlatMap) is executed on the run loop, one TaskInstance at a time. 
> {code:java}
> ...
> at org.apache.samza.operators.impl.OperatorImpl.onMessageAsync(OperatorImpl.java:184)
> at org.apache.samza.task.StreamOperatorTask.processAsync(StreamOperatorTask.java:109)
> at org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(TaskInstance.scala:176)
> at org.apache.samza.container.TaskInstanceExceptionHandler.maybeHandle(TaskInstanceExceptionHandler.scala:54)
> at
> at org.apache.samza.container.TaskInstance.process(TaskInstance.scala:174)
> at org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.process(AsyncRunLoop.java:470)
> at org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.run(AsyncRunLoop.java:412)
> at org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.access$300(AsyncRunLoop.java:346)
> at org.apache.samza.task.AsyncRunLoop.runTasks(AsyncRunLoop.java:233)
> at org.apache.samza.task.AsyncRunLoop.run(AsyncRunLoop.java:165)
> at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:763)
> at org.apache.samza.runtime.ContainerLaunchUtil.run(ContainerLaunchUtil.java:150)
> at org.apache.samza.runtime.ContainerLaunchUtil.run(ContainerLaunchUtil.java:78)
> at org.apache.samza.runtime.LocalContainerRunner.main(LocalContainerRunner.java:76){code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)