You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@pekko.apache.org by "pjfanning (via GitHub)" <gi...@apache.org> on 2023/08/04 17:56:56 UTC

[GitHub] [incubator-pekko] pjfanning commented on a diff in pull request #4: !str Logging error instead of failing the `keepAlive` operator.

pjfanning commented on code in PR #4:
URL: https://github.com/apache/incubator-pekko/pull/4#discussion_r1284704564


##########
stream/src/main/scala/org/apache/pekko/stream/impl/Timers.scala:
##########
@@ -268,33 +268,29 @@ import pekko.stream.stage._
             push(out, grab(in))
             if (isClosed(in)) completeStage()
             else pull(in)
-          } else {
-            val now = System.nanoTime()
-            // Idle timeout triggered a while ago and we were just waiting for pull.
-            // In the case of now == deadline, the deadline has not passed strictly, but scheduling another thunk
-            // for that seems wasteful.
-            if (now - nextDeadline >= 0) {
-              nextDeadline = now + timeout.toNanos
-              push(out, inject())
-            } else
-              scheduleOnce(GraphStageLogicTimer, FiniteDuration(nextDeadline - now, TimeUnit.NANOSECONDS))
-          }
+          } else emitInjectedElementOrReschedule(onTimer = false)
         }
 
-        override protected def onTimer(timerKey: Any): Unit = {
+        private def emitInjectedElementOrReschedule(onTimer: Boolean): Unit = {
           val now = System.nanoTime()
-          // Timer is reliably cancelled if a regular element arrives first. Scheduler rather schedules too late
-          // than too early so the deadline must have passed at this time.
-          assert(
-            now - nextDeadline >= 0,
-            s"Timer should have triggered only after deadline but now is $now and deadline was $nextDeadline diff ${now - nextDeadline}.")
-          push(out, inject())
-          nextDeadline = now + timeout.toNanos
+          val diff = now - nextDeadline
+          if (diff < 0) {
+            if (onTimer) {
+              // Clock may be non-monotonic, see https://stackoverflow.com/questions/51344787/in-what-cases-clock-monotonic-might-not-be-available
+              log.info(
+                s"Timer should have triggered only after deadline but now is $now and deadline was $nextDeadline diff $diff. (time running backwards?) Reschedule instead of emitting.")
+            }
+            scheduleOnce(GraphStageLogicTimer, FiniteDuration(-diff, TimeUnit.NANOSECONDS))
+          } else {
+            push(out, inject())
+            nextDeadline = now + timeout.toNanos
+          }
         }
-      }
 
-    override def toString = "IdleTimer"
+        override protected def onTimer(timerKey: Any): Unit = emitInjectedElementOrReschedule(onTimer = true)
+      }
 
+    override def toString = "IdleInject"

Review Comment:
   why change the `toString` ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org
For additional commands, e-mail: notifications-help@pekko.apache.org