Posted to notifications@pekko.apache.org by "jtjeferreira (via GitHub)" <gi...@apache.org> on 2023/08/18 14:54:12 UTC

[GitHub] [incubator-pekko-connectors] jtjeferreira opened a new pull request, #226: kinesis: use stage materializer with IODispatcher instead of injected EC

jtjeferreira opened a new pull request, #226:
URL: https://github.com/apache/incubator-pekko-connectors/pull/226

   The `KinesisSchedulerSourceStage` needs to run the `software.amazon.kinesis.coordinator.Scheduler` on a thread. The scheduler runs a loop that calls `Thread.sleep`, which is a blocking operation and therefore needs care.
   
   Currently, it uses `Future(scheduler.run())`, which runs on the implicit `ExecutionContext` from the constructor. By default, this EC is the Pekko default dispatcher, which is not suitable for blocking. Also, note that the `ActorAttributes.IODispatcher` set in `initialAttributes` has no effect, because we are not using the stage materializer.
   
   This PR removes the EC from the constructor and uses the stage logic's materializer EC (which will be the `IODispatcher`), in line with other custom stages in this project.
   
   Questions:
   * Is it okay to use the `IODispatcher`? AFAICT that dispatcher has 16 threads, so if we launch 16 Schedulers, there will be no more threads available
   * How do I test this?
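
   A minimal sketch of why the injected EC matters, using only the Scala standard library rather than Pekko: `Future(...)` runs its body on whichever implicit `ExecutionContext` is in scope, regardless of any attributes set elsewhere. `ImplicitEcDemo`, `threadUsedBy` and the pool name are hypothetical names chosen for illustration.

   ```scala
   import java.util.concurrent.{Executors, ThreadFactory}
   import scala.concurrent.{Await, ExecutionContext, Future}
   import scala.concurrent.duration._

   object ImplicitEcDemo {
     // Hypothetical helper: builds a single-thread pool whose thread is called
     // `name`, runs a Future on it, and returns the name of the thread that ran it.
     def threadUsedBy(name: String): String = {
       val factory: ThreadFactory = (r: Runnable) => {
         val t = new Thread(r, name)
         t.setDaemon(true)
         t
       }
       val pool = Executors.newSingleThreadExecutor(factory)
       // The Future below picks up this implicit EC, nothing else.
       implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
       try Await.result(Future(Thread.currentThread().getName), 5.seconds)
       finally pool.shutdown()
     }

     def main(args: Array[String]): Unit =
       println(threadUsedBy("my-blocking-pool-1"))
   }
   ```

   The same reasoning suggests that changing a stage's attributes alone would not move `scheduler.run()` to another pool as long as the `Future` is created with the constructor's EC.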


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org
For additional commands, e-mail: notifications-help@pekko.apache.org


Re: [PR] kinesis: use stage materializer with IODispatcher instead of injected EC [incubator-pekko-connectors]

Posted by "jtjeferreira (via GitHub)" <gi...@apache.org>.
jtjeferreira commented on PR #226:
URL: https://github.com/apache/incubator-pekko-connectors/pull/226#issuecomment-1870392498

   > I'm broadly supportive of getting this merged when we branch for pekko-connectors 1.1.0. 
   ok
   
   > On the note of actually providing a solution, having a re-think on this I think that even though the IODispatcher has the deadlocking issue it appears it's the best we can do now
   
   ok. I will revert to that solution




[GitHub] [incubator-pekko-connectors] mdedetrich commented on a diff in pull request #226: kinesis: use stage materializer with IODispatcher instead of injected EC

Posted by "mdedetrich (via GitHub)" <gi...@apache.org>.
mdedetrich commented on code in PR #226:
URL: https://github.com/apache/incubator-pekko-connectors/pull/226#discussion_r1300169039


##########
kinesis/src/main/scala/org/apache/pekko/stream/connectors/kinesis/impl/KinesisSchedulerSourceStage.scala:
##########
@@ -81,8 +81,14 @@ private[kinesis] class KinesisSchedulerSourceStage(
         override def shardRecordProcessor(): ShardRecordProcessor =
           new ShardProcessor(newRecordCallback)
       })
+      //Run the scheduler loop in a separate thread
+      val thread = new Thread(() => {
+        val result = Try {scheduler.run()}
+        callback.invoke(SchedulerShutdown(result))
+      }, s"KinesisSchedulerSource")
+      thread.setDaemon(true)
+      thread.start()
       schedulerOpt = Some(scheduler)
-      Future(scheduler.run()).onComplete(result => callback.invoke(SchedulerShutdown(result)))

Review Comment:
   > That said, I don't think running this "loop" in the default dispatcher even with "blocking" is a good idea, as it would block a thread for a long time...
   
   What "loop" are you talking about? And by "blocking a thread", is your core point that it should be asynchronous rather than waiting for the `scheduler.run` operation to complete?
   
   I am under the impression that the entire point of `Future` is that it makes the enclosing operation asynchronous, and that `blocking` makes that asynchronous operation run on a pool designed to handle blocking operations (usually a CachedThreadPool of some kind; however, the way `blocking` works, it is up to the `ExecutionContext` that is pulled in to determine how best to handle blocking operations).
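
   For reference, a minimal standard-library sketch of the `blocking` construct discussed here (`scala.concurrent.blocking`), assuming the global `ExecutionContext`. `blocking` is only a hint: whether and how the pool compensates depends on the `ExecutionContext` implementation in use. `BlockingHintDemo` and `runSleepers` are hypothetical names.

   ```scala
   import scala.concurrent.{Await, Future, blocking}
   import scala.concurrent.ExecutionContext.Implicits.global
   import scala.concurrent.duration._

   object BlockingHintDemo {
     // Starts `n` Futures that each sleep for `sleepMillis` and then yield their index.
     // Wrapping the sleep in `blocking` tells the ExecutionContext that the
     // current thread is about to block, so it may compensate if it supports that.
     def runSleepers(n: Int, sleepMillis: Long): Int = {
       val tasks = (1 to n).map { i =>
         Future {
           blocking {
             Thread.sleep(sleepMillis)
           }
           i
         }
       }
       // Wait for all of them and add up the indices as a simple result.
       Await.result(Future.sequence(tasks), 30.seconds).sum
     }

     def main(args: Array[String]): Unit =
       println(runSleepers(8, 50))
   }
   ```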





Re: [PR] kinesis: use stage materializer with IODispatcher instead of injected EC [incubator-pekko-connectors]

Posted by "Roiocam (via GitHub)" <gi...@apache.org>.
Roiocam commented on code in PR #226:
URL: https://github.com/apache/incubator-pekko-connectors/pull/226#discussion_r1485587146


##########
kinesis/src/main/scala/org/apache/pekko/stream/connectors/kinesis/impl/KinesisSchedulerSourceStage.scala:
##########
@@ -111,6 +112,22 @@ private[kinesis] class KinesisSchedulerSourceStage(
         failStage(SchedulerUnexpectedShutdown(e))
     }
     override def postStop(): Unit =
-      schedulerOpt.foreach(scheduler => Future(if (!scheduler.shutdownComplete()) scheduler.shutdown()))
+      schedulerOpt.foreach(scheduler =>
+        if (!scheduler.shutdownComplete()) scheduler.shutdown())
+
+    protected def executionContext(attributes: Attributes): ExecutionContext = {
+      val dispatcherId = (attributes.get[ActorAttributes.Dispatcher](ActorAttributes.IODispatcher) match {

Review Comment:
   This is kind of cool, but why do you need two pattern matches? 





[GitHub] [incubator-pekko-connectors] mdedetrich commented on pull request #226: kinesis: use stage materializer with IODispatcher instead of injected EC

Posted by "mdedetrich (via GitHub)" <gi...@apache.org>.
mdedetrich commented on PR #226:
URL: https://github.com/apache/incubator-pekko-connectors/pull/226#issuecomment-1687194081

   Thanks, I will take a deep dive into this, because neither solution seems ideal.




Re: [PR] kinesis: use stage materializer with IODispatcher instead of injected EC [incubator-pekko-connectors]

Posted by "jtjeferreira (via GitHub)" <gi...@apache.org>.
jtjeferreira commented on PR #226:
URL: https://github.com/apache/incubator-pekko-connectors/pull/226#issuecomment-1931991986

   @pjfanning @mdedetrich I included the latest changes from the main branch and did a fix in `postStop`.




Re: [PR] kinesis: use stage materializer with IODispatcher instead of injected EC [incubator-pekko-connectors]

Posted by "mdedetrich (via GitHub)" <gi...@apache.org>.
mdedetrich commented on code in PR #226:
URL: https://github.com/apache/incubator-pekko-connectors/pull/226#discussion_r1484927141


##########
kinesis/src/main/scala/org/apache/pekko/stream/connectors/kinesis/impl/KinesisSchedulerSourceStage.scala:
##########
@@ -111,6 +112,23 @@ private[kinesis] class KinesisSchedulerSourceStage(
         failStage(SchedulerUnexpectedShutdown(e))
     }
     override def postStop(): Unit =
-      schedulerOpt.foreach(scheduler => Future(if (!scheduler.shutdownComplete()) scheduler.shutdown()))
+      schedulerOpt.foreach(scheduler =>
+        if (!scheduler.shutdownComplete()) scheduler.shutdown()
+      )
+
+    protected def executionContext(attributes: Attributes): ExecutionContext = {
+      val dispatcherId = (attributes.get[ActorAttributes.Dispatcher](ActorAttributes.IODispatcher) match {
+        case ActorAttributes.Dispatcher("") =>

Review Comment:
   Hmm, I guess it's okay then, but we should probably make a ticket to have a look at this separately, since it looks like a code smell.





[GitHub] [incubator-pekko-connectors] mdedetrich commented on pull request #226: kinesis: use stage materializer with IODispatcher instead of injected EC

Posted by "mdedetrich (via GitHub)" <gi...@apache.org>.
mdedetrich commented on PR #226:
URL: https://github.com/apache/incubator-pekko-connectors/pull/226#issuecomment-1684120777

   > Is it okay to use the IODispatcher? AFAICT that dispatcher has 16 threads, so if we launch 16 Schedulers, there will be no more threads available
   
   I need to look into this in more detail, but if we **know** that there are blocking calls being made then the `DispatcherSelector.blocking()` is what we should be using




[GitHub] [incubator-pekko-connectors] mdedetrich commented on pull request #226: kinesis: use stage materializer with IODispatcher instead of injected EC

Posted by "mdedetrich (via GitHub)" <gi...@apache.org>.
mdedetrich commented on PR #226:
URL: https://github.com/apache/incubator-pekko-connectors/pull/226#issuecomment-1686584088

   > Then if a user wants to consume from more than 16 Kinesis streams (i.e. creates more than 16 `KinesisSchedulerSource`s), the thread pool is fully utilized, creating a deadlock. That's why I opted for creating a thread just for running the Kinesis `Scheduler`, as in the example [documentation](https://docs.aws.amazon.com/streams/latest/dev/kcl2-standard-consumer-java-example.html) from AWS.
   
   Maybe I misunderstand, but I don't think this would create a deadlock. Deadlocks are in fact caused by the converse situation, i.e. when you don't deal with blocking code correctly (either by not using `Future#blocking` or some `IODispatcher`). Actually, our resident @jrudolph stated as much here: https://discuss.lightbend.com/t/blocking-io-dispatcher-without-a-batchingexecutor/5954/6
   
   The only case I can think of where a legitimate deadlock can occur is if you have multiple blocking IO operators that share the same `IODispatcher` and are combined into a single stream. In this case, if you have more than 16 sources, you can create a deadlock, since the different blocking IO sources will block the `IODispatcher` at exactly the same time. If this is the case, then you just need to use multiple `IODispatchers`. https://stackoverflow.com/a/71268911 is a good explanation of the issue.
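
   The saturation scenario can be reproduced without Kinesis or Pekko. This is a sketch under assumptions: a `java.util.concurrent` fixed pool stands in for a fixed-size dispatcher, and tasks parked on a latch stand in for long-running `scheduler.run()` calls. `FixedPoolStarvationDemo` and its members are hypothetical names. Once every thread is held, an extra task stays queued until one of the holders finishes.

   ```scala
   import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

   object FixedPoolStarvationDemo {
     // Fills a fixed pool with long-running blocking tasks, then submits one more.
     // Returns (extra task started while the pool was full,
     //          extra task started after the blockers were released).
     def extraTaskStarts(poolSize: Int): (Boolean, Boolean) = {
       val pool = Executors.newFixedThreadPool(poolSize)
       val release = new CountDownLatch(1)         // stands in for "the schedulers shut down"
       val occupied = new CountDownLatch(poolSize) // counts pool threads that are now held
       val extraStarted = new CountDownLatch(1)
       try {
         // One blocking task per pool thread (stand-ins for scheduler.run()).
         (1 to poolSize).foreach { _ =>
           pool.execute(() => { occupied.countDown(); release.await() })
         }
         occupied.await()
         // The extra task can only sit in the queue: no thread is free.
         pool.execute(() => extraStarted.countDown())
         val whileFull = extraStarted.await(200, TimeUnit.MILLISECONDS)
         // Free the pool threads; the queued task can now run.
         release.countDown()
         val afterRelease = extraStarted.await(5, TimeUnit.SECONDS)
         (whileFull, afterRelease)
       } finally pool.shutdown()
     }

     def main(args: Array[String]): Unit =
       println(extraTaskStarts(2))
   }
   ```

   In this model nothing is circularly waiting, so it is thread starvation rather than a deadlock in the strict sense; from the caller's side the extra source simply never makes progress while the pool is full.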




Re: [PR] kinesis: use stage materializer with IODispatcher instead of injected EC [incubator-pekko-connectors]

Posted by "jtjeferreira (via GitHub)" <gi...@apache.org>.
jtjeferreira commented on code in PR #226:
URL: https://github.com/apache/incubator-pekko-connectors/pull/226#discussion_r1484871296


##########
kinesis/src/main/scala/org/apache/pekko/stream/connectors/kinesis/impl/KinesisSchedulerSourceStage.scala:
##########
@@ -111,6 +112,23 @@ private[kinesis] class KinesisSchedulerSourceStage(
         failStage(SchedulerUnexpectedShutdown(e))
     }
     override def postStop(): Unit =
-      schedulerOpt.foreach(scheduler => Future(if (!scheduler.shutdownComplete()) scheduler.shutdown()))
+      schedulerOpt.foreach(scheduler =>
+        if (!scheduler.shutdownComplete()) scheduler.shutdown()
+      )
+
+    protected def executionContext(attributes: Attributes): ExecutionContext = {
+      val dispatcherId = (attributes.get[ActorAttributes.Dispatcher](ActorAttributes.IODispatcher) match {
+        case ActorAttributes.Dispatcher("") =>

Review Comment:
   This was copied from `JmsConnector`, as per your [comment](https://github.com/apache/incubator-pekko-connectors/pull/226#issuecomment-1686461378).





[GitHub] [incubator-pekko-connectors] jtjeferreira commented on pull request #226: kinesis: use stage materializer with IODispatcher instead of injected EC

Posted by "jtjeferreira (via GitHub)" <gi...@apache.org>.
jtjeferreira commented on PR #226:
URL: https://github.com/apache/incubator-pekko-connectors/pull/226#issuecomment-1687180966

   > Note that it should be pretty easy to confirm whether a deadlock happens or not when using the `IODispatcher`, you can just materialize 16+ streams at once and see what happens.
   
   I reverted to the `IODispatcher` in 1658b06 and added a unit test with mocks in 6442c27 that works for 15 streams but fails for 16. Then in 1aeeac1 I reverted to the solution with a separate thread, and it works...
   
   




Re: [PR] kinesis: use stage materializer with IODispatcher instead of injected EC [incubator-pekko-connectors]

Posted by "mdedetrich (via GitHub)" <gi...@apache.org>.
mdedetrich commented on PR #226:
URL: https://github.com/apache/incubator-pekko-connectors/pull/226#issuecomment-1931799385

   From my side it's good; it just needs a rebase onto the latest branch so the CI runs.




Re: [PR] kinesis: use stage materializer with IODispatcher instead of injected EC [incubator-pekko-connectors]

Posted by "mdedetrich (via GitHub)" <gi...@apache.org>.
mdedetrich merged PR #226:
URL: https://github.com/apache/incubator-pekko-connectors/pull/226




Re: [PR] kinesis: use stage materializer with IODispatcher instead of injected EC [incubator-pekko-connectors]

Posted by "mdedetrich (via GitHub)" <gi...@apache.org>.
mdedetrich commented on PR #226:
URL: https://github.com/apache/incubator-pekko-connectors/pull/226#issuecomment-1870195310

   On the note of actually providing a solution: having had a re-think, I think that even though the `IODispatcher` has the deadlocking issue, it appears it's the best we can do for now. After all, there is only so much you can do to hide the inherent issues that come with underlying blocking operations.




Re: [PR] kinesis: use stage materializer with IODispatcher instead of injected EC [incubator-pekko-connectors]

Posted by "mdedetrich (via GitHub)" <gi...@apache.org>.
mdedetrich commented on PR #226:
URL: https://github.com/apache/incubator-pekko-connectors/pull/226#issuecomment-1936681246

   @raboof Do you want to have a quick look at this in case anything was missed?




[GitHub] [incubator-pekko-connectors] jtjeferreira commented on a diff in pull request #226: kinesis: use stage materializer with IODispatcher instead of injected EC

Posted by "jtjeferreira (via GitHub)" <gi...@apache.org>.
jtjeferreira commented on code in PR #226:
URL: https://github.com/apache/incubator-pekko-connectors/pull/226#discussion_r1298730541


##########
kinesis/src/main/scala/org/apache/pekko/stream/connectors/kinesis/impl/KinesisSchedulerSourceStage.scala:
##########
@@ -49,7 +49,7 @@ private[kinesis] object KinesisSchedulerSourceStage {
 @InternalApi
 private[kinesis] class KinesisSchedulerSourceStage(
     settings: KinesisSchedulerSourceSettings,
-    schedulerBuilder: ShardRecordProcessorFactory => Scheduler)(implicit ec: ExecutionContext)

Review Comment:
   So I added `println(Thread.currentThread().getName)` to the `Future(scheduler.run())` like this:
   
   ```scala
         Future{
           println(Thread.currentThread().getName)
           scheduler.run()
         }
   ```
   
   and ran the tests and I see `KinesisTests-pekko.actor.default-dispatcher-6` being used
   
   so this PR makes no difference...
   
   If I add the `println` to the `onPull` method I see `KinesisTests-pekko.actor.default-blocking-io-dispatcher-10` being used...





[GitHub] [incubator-pekko-connectors] jtjeferreira commented on pull request #226: kinesis: use stage materializer with IODispatcher instead of injected EC

Posted by "jtjeferreira (via GitHub)" <gi...@apache.org>.
jtjeferreira commented on PR #226:
URL: https://github.com/apache/incubator-pekko-connectors/pull/226#issuecomment-1686541161

   > a proper `executionContext` which is explicitly designed for these IO blocking operations
   
   I agree that using `IODispatcher` would be an improvement from what we have now. However, do I understand correctly that `IODispatcher` is a [ThreadPool with fixed 16 threads](https://github.com/apache/incubator-pekko/blob/580f12c29fb61b65758af9b0c31494af0af17175/actor/src/main/resources/reference.conf#L573-L581)? 
   
   Then if a user wants to consume from more than 16 Kinesis streams (i.e. creates more than 16 `KinesisSchedulerSource`s), the thread pool is fully utilized, creating a deadlock. That's why I opted for creating a thread just for running the Kinesis `Scheduler`, as in the example [documentation](https://docs.aws.amazon.com/streams/latest/dev/kcl2-standard-consumer-java-example.html) from AWS.
   
   That said, if you are not comfortable with this solution, I can implement what you are suggesting with `IODispatcher`.
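
   A standalone sketch of the dedicated-thread approach described above, assuming only the Scala and Java standard libraries: the blocking loop runs on its own daemon thread, and its outcome is handed to a callback as a `Try`. `DedicatedThreadDemo`, `runOnOwnThread` and `outcomeOf` are hypothetical names; `loop` stands in for `scheduler.run()` and `onShutdown` for the stage's async callback.

   ```scala
   import java.util.concurrent.{CountDownLatch, TimeUnit}
   import java.util.concurrent.atomic.AtomicReference
   import scala.util.Try

   object DedicatedThreadDemo {
     // Runs `loop` on its own daemon thread and reports how it ended via `onShutdown`.
     def runOnOwnThread(name: String)(loop: () => Unit)(onShutdown: Try[Unit] => Unit): Thread = {
       val body: Runnable = () => onShutdown(Try(loop()))
       val thread = new Thread(body, name)
       thread.setDaemon(true) // do not keep the JVM alive because of this thread
       thread.start()
       thread
     }

     // Hypothetical driver: waits (up to 5 seconds) for the loop to end and returns its outcome.
     def outcomeOf(loop: () => Unit): Try[Unit] = {
       val done = new CountDownLatch(1)
       val result = new AtomicReference[Try[Unit]]()
       runOnOwnThread("demo-scheduler")(loop) { r =>
         result.set(r)
         done.countDown()
       }
       done.await(5, TimeUnit.SECONDS)
       result.get()
     }

     def main(args: Array[String]): Unit = {
       println(outcomeOf(() => ()))
       println(outcomeOf(() => throw new RuntimeException("boom")))
     }
   }
   ```

   The trade-off in this design is one thread per source with no upper bound, in exchange for never competing with other work for a shared pool.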
   
   
   
   




[GitHub] [incubator-pekko-connectors] pjfanning commented on pull request #226: kinesis: use stage materializer with IODispatcher instead of injected EC

Posted by "pjfanning (via GitHub)" <gi...@apache.org>.
pjfanning commented on PR #226:
URL: https://github.com/apache/incubator-pekko-connectors/pull/226#issuecomment-1690315496

   Since these changes are contentious, I'm marking this as milestone 1.0.1. I'm anxious to get the 1.0.0 release process started.




Re: [PR] kinesis: use stage materializer with IODispatcher instead of injected EC [incubator-pekko-connectors]

Posted by "mdedetrich (via GitHub)" <gi...@apache.org>.
mdedetrich commented on code in PR #226:
URL: https://github.com/apache/incubator-pekko-connectors/pull/226#discussion_r1484856491


##########
kinesis/src/main/scala/org/apache/pekko/stream/connectors/kinesis/impl/KinesisSchedulerSourceStage.scala:
##########
@@ -111,6 +112,23 @@ private[kinesis] class KinesisSchedulerSourceStage(
         failStage(SchedulerUnexpectedShutdown(e))
     }
     override def postStop(): Unit =
-      schedulerOpt.foreach(scheduler => Future(if (!scheduler.shutdownComplete()) scheduler.shutdown()))
+      schedulerOpt.foreach(scheduler =>
+        if (!scheduler.shutdownComplete()) scheduler.shutdown()
+      )
+
+    protected def executionContext(attributes: Attributes): ExecutionContext = {
+      val dispatcherId = (attributes.get[ActorAttributes.Dispatcher](ActorAttributes.IODispatcher) match {
+        case ActorAttributes.Dispatcher("") =>

Review Comment:
   Should this just be any string rather than just `""`?





Re: [PR] kinesis: use stage materializer with IODispatcher instead of injected EC [incubator-pekko-connectors]

Posted by "pjfanning (via GitHub)" <gi...@apache.org>.
pjfanning commented on PR #226:
URL: https://github.com/apache/incubator-pekko-connectors/pull/226#issuecomment-1870186091

   I'm broadly supportive of getting this merged when we branch for pekko-connectors 1.1.0. There should be such a release early in 2024 - the main reason being that slick 3.5.0 will be released and we can add Scala 3.3 support to the pekko-connectors-slick component.




Re: [PR] kinesis: use stage materializer with IODispatcher instead of injected EC [incubator-pekko-connectors]

Posted by "jtjeferreira (via GitHub)" <gi...@apache.org>.
jtjeferreira commented on PR #226:
URL: https://github.com/apache/incubator-pekko-connectors/pull/226#issuecomment-1870180356

   Hi @mdedetrich , @pjfanning 
   
   what shall we do about this?




[GitHub] [incubator-pekko-connectors] pjfanning commented on a diff in pull request #226: kinesis: use stage materializer with IODispatcher instead of injected EC

Posted by "pjfanning (via GitHub)" <gi...@apache.org>.
pjfanning commented on code in PR #226:
URL: https://github.com/apache/incubator-pekko-connectors/pull/226#discussion_r1300070226


##########
kinesis/src/main/scala/org/apache/pekko/stream/connectors/kinesis/impl/KinesisSchedulerSourceStage.scala:
##########
@@ -81,8 +81,14 @@ private[kinesis] class KinesisSchedulerSourceStage(
         override def shardRecordProcessor(): ShardRecordProcessor =
           new ShardProcessor(newRecordCallback)
       })
+      //Run the scheduler loop in a separate thread
+      val thread = new Thread(() => {
+        val result = Try {scheduler.run()}
+        callback.invoke(SchedulerShutdown(result))
+      }, s"KinesisSchedulerSource")
+      thread.setDaemon(true)
+      thread.start()
       schedulerOpt = Some(scheduler)
-      Future(scheduler.run()).onComplete(result => callback.invoke(SchedulerShutdown(result)))

Review Comment:
   @jtjeferreira I'm not dead set against using a short-lived daemon thread like the one in the current version of the PR, but did you check the `Future` with `blocking` approach?





[GitHub] [incubator-pekko-connectors] mdedetrich commented on pull request #226: kinesis: use stage materializer with IODispatcher instead of injected EC

Posted by "mdedetrich (via GitHub)" <gi...@apache.org>.
mdedetrich commented on PR #226:
URL: https://github.com/apache/incubator-pekko-connectors/pull/226#issuecomment-1686461378

   @jtjeferreira I did a bit of digging for prior art, i.e. another connector with a similar problem (wrapping a library that blocks internally), and I came across the JMS connector, specifically
   
   https://github.com/apache/incubator-pekko-connectors/blob/a5796711f4b19f358f976b189581b980da34b2b0/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/impl/JmsConnector.scala#L239-L252
   
   Using the above for inspiration, at https://github.com/apache/incubator-pekko-connectors/blob/c207c292ee9fecec2444637c84d63b9127280422/kinesis/src/main/scala/org/apache/pekko/stream/connectors/kinesis/scaladsl/KinesisSchedulerSource.scala#L49 you can access the actor attributes to create a proper `executionContext`, one explicitly designed for these blocking IO operations, and pass it to the `KinesisSchedulerSourceStage`.




[GitHub] [incubator-pekko-connectors] mdedetrich commented on pull request #226: kinesis: use stage materializer with IODispatcher instead of injected EC

Posted by "mdedetrich (via GitHub)" <gi...@apache.org>.
mdedetrich commented on PR #226:
URL: https://github.com/apache/incubator-pekko-connectors/pull/226#issuecomment-1690335842

   > Since these changes are contentious, I'm marking this as milestone 1.0.1. I'm anxious to get the 1.0.0 release process started.
   
   I would argue that, regardless of what happens with the PR, it should target 1.1.x, since it changes existing underlying behaviour, which could surprise people migrating to connectors.




[GitHub] [incubator-pekko-connectors] pjfanning commented on a diff in pull request #226: kinesis: use stage materializer with IODispatcher instead of injected EC

Posted by "pjfanning (via GitHub)" <gi...@apache.org>.
pjfanning commented on code in PR #226:
URL: https://github.com/apache/incubator-pekko-connectors/pull/226#discussion_r1298573380


##########
kinesis/src/main/scala/org/apache/pekko/stream/connectors/kinesis/impl/KinesisSchedulerSourceStage.scala:
##########
@@ -49,7 +49,7 @@ private[kinesis] object KinesisSchedulerSourceStage {
 @InternalApi
 private[kinesis] class KinesisSchedulerSourceStage(
     settings: KinesisSchedulerSourceSettings,
-    schedulerBuilder: ShardRecordProcessorFactory => Scheduler)(implicit ec: ExecutionContext)

Review Comment:
   Doesn't the existing code pass the materializer executionContext as the implicit value anyway?





Re: [PR] kinesis: use stage materializer with IODispatcher instead of injected EC [incubator-pekko-connectors]

Posted by "mdedetrich (via GitHub)" <gi...@apache.org>.
mdedetrich commented on PR #226:
URL: https://github.com/apache/incubator-pekko-connectors/pull/226#issuecomment-1960859849

   I'm going to go ahead and merge this; if there are any issues, we can always patch them up later (there will be many milestones for connectors).
   
   Many thanks @jtjeferreira




[GitHub] [incubator-pekko-connectors] jtjeferreira commented on a diff in pull request #226: kinesis: use stage materializer with IODispatcher instead of injected EC

Posted by "jtjeferreira (via GitHub)" <gi...@apache.org>.
jtjeferreira commented on code in PR #226:
URL: https://github.com/apache/incubator-pekko-connectors/pull/226#discussion_r1298611691


##########
kinesis/src/main/scala/org/apache/pekko/stream/connectors/kinesis/impl/KinesisSchedulerSourceStage.scala:
##########
@@ -49,7 +49,7 @@ private[kinesis] object KinesisSchedulerSourceStage {
 @InternalApi
 private[kinesis] class KinesisSchedulerSourceStage(
     settings: KinesisSchedulerSourceSettings,
-    schedulerBuilder: ShardRecordProcessorFactory => Scheduler)(implicit ec: ExecutionContext)

Review Comment:
   Yes, but I think the `GraphStageLogic` materializer is different, as it takes the `initialAttributes: Attributes` into consideration.





[GitHub] [incubator-pekko-connectors] mdedetrich commented on a diff in pull request #226: kinesis: use stage materializer with IODispatcher instead of injected EC

Posted by "mdedetrich (via GitHub)" <gi...@apache.org>.
mdedetrich commented on code in PR #226:
URL: https://github.com/apache/incubator-pekko-connectors/pull/226#discussion_r1298843915


##########
kinesis/src/main/scala/org/apache/pekko/stream/connectors/kinesis/impl/KinesisSchedulerSourceStage.scala:
##########
@@ -81,8 +81,14 @@ private[kinesis] class KinesisSchedulerSourceStage(
         override def shardRecordProcessor(): ShardRecordProcessor =
           new ShardProcessor(newRecordCallback)
       })
+      //Run the scheduler loop in a separate thread
+      val thread = new Thread(() => {
+        val result = Try {scheduler.run()}
+        callback.invoke(SchedulerShutdown(result))
+      }, s"KinesisSchedulerSource")
+      thread.setDaemon(true)
+      thread.start()
       schedulerOpt = Some(scheduler)
-      Future(scheduler.run()).onComplete(result => callback.invoke(SchedulerShutdown(result)))

Review Comment:
   There is a `blocking` construct in `scala.concurrent`, used inside a `Future` body, which might be a quick fix/solution?
   
   Should be something like
   
   ```scala
   Future {
     blocking {
       scheduler.run()
     }
   }.onComplete(result => callback.invoke(SchedulerShutdown(result)))
   ```
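   
   For reference, a self-contained sketch of that suggestion using only the Scala standard library. `FakeScheduler` is a hypothetical stand-in for the KCL `Scheduler`, and the global execution context is an assumption made for illustration; the stage itself would use whichever EC it is given. Note that `blocking` is only a hint to the pool that the thread is about to block; it does not move the work to a different pool.
   
   ```scala
   import scala.concurrent.{Await, Future, blocking}
   import scala.concurrent.ExecutionContext.Implicits.global
   import scala.concurrent.duration._
   import scala.util.{Success, Try}
   
   object BlockingSketch {
     // Hypothetical stand-in for the KCL Scheduler: run() blocks until it finishes.
     final class FakeScheduler {
       def run(): Unit = Thread.sleep(50)
     }
   
     // Runs the scheduler inside `blocking` and exposes the outcome as a Try,
     // which is the value an onComplete callback would receive.
     def runBlocking(scheduler: FakeScheduler): Future[Try[Unit]] =
       Future {
         // Signals the pool that this thread will block; the global pool
         // may start a compensating thread in response.
         blocking {
           scheduler.run()
         }
       }.transform(result => Success(result))
   
     def main(args: Array[String]): Unit = {
       val outcome = Await.result(runBlocking(new FakeScheduler), 5.seconds)
       println(s"scheduler finished: $outcome")
     }
   }
   ```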
   





[GitHub] [incubator-pekko-connectors] jtjeferreira commented on a diff in pull request #226: kinesis: use stage materializer with IODispatcher instead of injected EC

Posted by "jtjeferreira (via GitHub)" <gi...@apache.org>.
jtjeferreira commented on code in PR #226:
URL: https://github.com/apache/incubator-pekko-connectors/pull/226#discussion_r1300152995


##########
kinesis/src/main/scala/org/apache/pekko/stream/connectors/kinesis/impl/KinesisSchedulerSourceStage.scala:
##########
@@ -81,8 +81,14 @@ private[kinesis] class KinesisSchedulerSourceStage(
         override def shardRecordProcessor(): ShardRecordProcessor =
           new ShardProcessor(newRecordCallback)
       })
+      //Run the scheduler loop in a separate thread
+      val thread = new Thread(() => {
+        val result = Try {scheduler.run()}
+        callback.invoke(SchedulerShutdown(result))
+      }, s"KinesisSchedulerSource")
+      thread.setDaemon(true)
+      thread.start()
       schedulerOpt = Some(scheduler)
-      Future(scheduler.run()).onComplete(result => callback.invoke(SchedulerShutdown(result)))

Review Comment:
   > using a short lived daemon thread
   
   AFAICT this will not be a short-lived thread, but a long-lived one that keeps running for as long as we want to read records from Kinesis.
   
   That said, I don't think running this "loop" in the default dispatcher, even with `blocking`, is a good idea, as it would occupy one of that dispatcher's threads for a long time.
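   
   A minimal sketch of the alternative being argued for here: giving a long-running loop its own daemon thread so that it never occupies a thread of a shared pool. It uses only the standard library; `LoopingScheduler`, the thread name `scheduler-loop`, and `start` are hypothetical names chosen for illustration, not code from this PR.
   
   ```scala
   import java.util.concurrent.{Executors, ThreadFactory}
   import java.util.concurrent.atomic.AtomicBoolean
   import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}
   import scala.concurrent.duration._
   
   object DedicatedThreadSketch {
     // Hypothetical stand-in for a scheduler whose run() loops until shutdown() is called.
     final class LoopingScheduler {
       private val stopped = new AtomicBoolean(false)
       def run(): Unit = while (!stopped.get()) Thread.sleep(10)
       def shutdown(): Unit = stopped.set(true)
     }
   
     // Creates daemon threads so the loop cannot keep the JVM alive on exit.
     private val daemonFactory: ThreadFactory = new ThreadFactory {
       def newThread(r: Runnable): Thread = {
         val t = new Thread(r, "scheduler-loop")
         t.setDaemon(true)
         t
       }
     }
   
     // Starts the loop on a dedicated single-thread executor. The future completes
     // with the name of the thread that ran the loop once run() returns.
     def start(scheduler: LoopingScheduler): (Future[String], ExecutionContextExecutorService) = {
       val ec = ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor(daemonFactory))
       val done = Future {
         scheduler.run()
         Thread.currentThread().getName
       }(ec)
       (done, ec)
     }
   
     def main(args: Array[String]): Unit = {
       val scheduler = new LoopingScheduler
       val (done, ec) = start(scheduler)
       Thread.sleep(50)       // let the loop spin for a moment
       scheduler.shutdown()   // ask the loop to stop
       println(s"loop ran on: ${Await.result(done, 5.seconds)}")
       ec.shutdown()          // release the dedicated thread
     }
   }
   ```
   
   The trade-off is one extra thread per running scheduler, in exchange for leaving the shared dispatcher's threads untouched.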





[GitHub] [incubator-pekko-connectors] jtjeferreira commented on a diff in pull request #226: kinesis: use stage materializer with IODispatcher instead of injected EC

Posted by "jtjeferreira (via GitHub)" <gi...@apache.org>.
jtjeferreira commented on code in PR #226:
URL: https://github.com/apache/incubator-pekko-connectors/pull/226#discussion_r1300186697


##########
kinesis/src/main/scala/org/apache/pekko/stream/connectors/kinesis/impl/KinesisSchedulerSourceStage.scala:
##########
@@ -81,8 +81,14 @@ private[kinesis] class KinesisSchedulerSourceStage(
         override def shardRecordProcessor(): ShardRecordProcessor =
           new ShardProcessor(newRecordCallback)
       })
+      //Run the scheduler loop in a separate thread
+      val thread = new Thread(() => {
+        val result = Try {scheduler.run()}
+        callback.invoke(SchedulerShutdown(result))
+      }, s"KinesisSchedulerSource")
+      thread.setDaemon(true)
+      thread.start()
       schedulerOpt = Some(scheduler)
-      Future(scheduler.run()).onComplete(result => callback.invoke(SchedulerShutdown(result)))

Review Comment:
   > What "loop" are you talking about? 
   
   I mean this loop:
   https://github.com/awslabs/amazon-kinesis-client/blob/a1731dc49b2d76b8dd33781d5cabc5264b7079f9/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java#L323-L325





Re: [PR] kinesis: use stage materializer with IODispatcher instead of injected EC [incubator-pekko-connectors]

Posted by "pjfanning (via GitHub)" <gi...@apache.org>.
pjfanning commented on PR #226:
URL: https://github.com/apache/incubator-pekko-connectors/pull/226#issuecomment-1931778854

   @jtjeferreira is this something that can be merged to our main branch for a future 1.1.0 release?
   
   @mdedetrich any objections?




Re: [PR] kinesis: use stage materializer with IODispatcher instead of injected EC [incubator-pekko-connectors]

Posted by "mdedetrich (via GitHub)" <gi...@apache.org>.
mdedetrich commented on code in PR #226:
URL: https://github.com/apache/incubator-pekko-connectors/pull/226#discussion_r1484856491


##########
kinesis/src/main/scala/org/apache/pekko/stream/connectors/kinesis/impl/KinesisSchedulerSourceStage.scala:
##########
@@ -111,6 +112,23 @@ private[kinesis] class KinesisSchedulerSourceStage(
         failStage(SchedulerUnexpectedShutdown(e))
     }
     override def postStop(): Unit =
-      schedulerOpt.foreach(scheduler => Future(if (!scheduler.shutdownComplete()) scheduler.shutdown()))
+      schedulerOpt.foreach(scheduler =>
+        if (!scheduler.shutdownComplete()) scheduler.shutdown()
+      )
+
+    protected def executionContext(attributes: Attributes): ExecutionContext = {
+      val dispatcherId = (attributes.get[ActorAttributes.Dispatcher](ActorAttributes.IODispatcher) match {
+        case ActorAttributes.Dispatcher("") =>

Review Comment:
   Should this match against any string rather than just `""`, i.e. is there a case where we can have something other than `""`?
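   
   To make the difference concrete, here is a small standalone sketch of matching on the literal `""` versus a catch-all case. `Dispatcher` below is a hypothetical stand-in case class, not Pekko's `ActorAttributes.Dispatcher`, and `IoDefault` is an assumed fallback value; the sketch does not reproduce what the PR does in the branch shown above.
   
   ```scala
   object DispatcherMatchSketch {
     // Hypothetical stand-in for a dispatcher attribute; not the Pekko class.
     final case class Dispatcher(id: String)
   
     // Assumed fallback used when no dispatcher id was configured.
     val IoDefault: Dispatcher = Dispatcher("io-default")
   
     def resolve(configured: Dispatcher): Dispatcher = configured match {
       case Dispatcher("") => IoDefault // empty id: treated as "not set", so fall back
       case other          => other     // any non-empty id is kept as configured
     }
   
     def main(args: Array[String]): Unit = {
       println(resolve(Dispatcher("")))
       println(resolve(Dispatcher("my-dispatcher")))
     }
   }
   ```
   
   With a literal `""` pattern, only the unset case is rewritten; a pattern binding any string in the first case would also override ids that a user configured explicitly.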


