You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pekko.apache.org by fa...@apache.org on 2022/10/31 09:34:53 UTC

[incubator-pekko-samples] 01/02: test rolling update latency

This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch wip-28011-sharding-rolling-update-patriknw
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-samples.git

commit 2ac8b4ff3a43395e1ee592c9a1ad1f038b0ba5d1
Author: Patrik Nordwall <pa...@gmail.com>
AuthorDate: Mon Oct 21 15:08:34 2019 +0200

    test rolling update latency
---
 akka-sample-sharding-scala/build.sbt               |  4 +-
 .../src/main/resources/application.conf            | 13 ++++--
 .../src/main/resources/logback.xml                 | 21 ++++++++++
 .../src/main/scala/sample/sharding/Device.scala    | 16 ++++++--
 .../src/main/scala/sample/sharding/Devices.scala   | 47 ++++++++++++++++------
 .../main/scala/sample/sharding/ShardingApp.scala   |  5 ++-
 6 files changed, 85 insertions(+), 21 deletions(-)

diff --git a/akka-sample-sharding-scala/build.sbt b/akka-sample-sharding-scala/build.sbt
index 32145af..73aff9d 100644
--- a/akka-sample-sharding-scala/build.sbt
+++ b/akka-sample-sharding-scala/build.sbt
@@ -1,7 +1,7 @@
 import com.typesafe.sbt.SbtMultiJvm.multiJvmSettings
 import com.typesafe.sbt.SbtMultiJvm.MultiJvmKeys.MultiJvm
 
-val akkaVersion = "2.6.0-M4"
+val akkaVersion = "2.6.0-RC1"
 
 lazy val `akka-sample-sharding-scala` = project
   .in(file("."))
@@ -21,6 +21,8 @@ lazy val `akka-sample-sharding-scala` = project
     libraryDependencies ++= Seq(
       "com.typesafe.akka" %% "akka-cluster-sharding" % akkaVersion,
       "com.typesafe.akka" %% "akka-serialization-jackson" % akkaVersion,
+      "com.typesafe.akka" %% "akka-slf4j" % akkaVersion,
+      "ch.qos.logback" % "logback-classic" % "1.2.3",
       "org.scalatest" %% "scalatest" % "3.0.7" % Test
     ),
     mainClass in (Compile, run) := Some("sample.sharding.ShardingApp"),
diff --git a/akka-sample-sharding-scala/src/main/resources/application.conf b/akka-sample-sharding-scala/src/main/resources/application.conf
index fc64a62..853f3c9 100644
--- a/akka-sample-sharding-scala/src/main/resources/application.conf
+++ b/akka-sample-sharding-scala/src/main/resources/application.conf
@@ -1,5 +1,7 @@
 akka {
-  loglevel = INFO
+  loglevel = DEBUG
+  loggers = ["akka.event.slf4j.Slf4jLogger"]
+  logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
 
   actor {
     provider = cluster
@@ -21,9 +23,12 @@ akka {
       "akka://ShardingSystem@127.0.0.1:2551",
       "akka://ShardingSystem@127.0.0.1:2552"]
 
-    # auto downing is NOT safe for production deployments.
-    # you may want to use it during development, read more about it in the docs.
-    auto-down-unreachable-after = 10s
+
+    min-nr-of-members = 3
+
+    log-info-verbose = on
   }
 
 }
+
+sample.sending-temperatures = on
diff --git a/akka-sample-sharding-scala/src/main/resources/logback.xml b/akka-sample-sharding-scala/src/main/resources/logback.xml
new file mode 100644
index 0000000..bc42d63
--- /dev/null
+++ b/akka-sample-sharding-scala/src/main/resources/logback.xml
@@ -0,0 +1,21 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<configuration>
+
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>[%date{ISO8601}] [%level] [%logger] [%thread] [%X{akkaSource}] - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <appender name="ASYNC" class="ch.qos.logback.classic.AsyncAppender">
+        <queueSize>1024</queueSize>
+        <neverBlock>true</neverBlock>
+        <appender-ref ref="STDOUT" />
+    </appender>
+
+    <logger name="akka.cluster.sharding" level="DEBUG" />
+
+    <root level="INFO">
+        <appender-ref ref="ASYNC"/>
+    </root>
+</configuration>
diff --git a/akka-sample-sharding-scala/src/main/scala/sample/sharding/Device.scala b/akka-sample-sharding-scala/src/main/scala/sample/sharding/Device.scala
index 19b0488..1e166a2 100644
--- a/akka-sample-sharding-scala/src/main/scala/sample/sharding/Device.scala
+++ b/akka-sample-sharding-scala/src/main/scala/sample/sharding/Device.scala
@@ -10,9 +10,12 @@ import akka.actor._
 object Device {
   sealed trait Command extends Message
 
-  case class RecordTemperature(deviceId: Int, temperature: Double)
+  case class RecordTemperature(deviceId: Int, temperature: Double, startTime: Long, seqNr: Long)
       extends Command
 
+  case class RecordTemperatureAck(deviceId: Int, startTime: Long, seqNr: Long)
+    extends Command
+
   case class GetTemperature(deviceId: Int) extends Command
 
   case class Temperature(deviceId: Int,
@@ -27,15 +30,18 @@ object Device {
 class Device extends Actor with ActorLogging {
   import Device._
 
+  log.info("Starting Device {}", self.path.name)
+
   override def receive = counting(Vector.empty)
 
   def counting(values: Vector[Double]): Receive = {
-    case RecordTemperature(id, temp) =>
+    case RecordTemperature(id, temp, startTime, seqNr) =>
       val temperatures = values :+ temp
       log.info(
         s"Recording temperature $temp for device $id, average is ${average(temperatures)} after " +
-          s"${temperatures.size} readings"
+          s"${temperatures.size} readings. SeqNr [$seqNr]"
       )
+      sender() ! RecordTemperatureAck(id, startTime, seqNr)
       context.become(counting(temperatures))
 
     case GetTemperature(id) =>
@@ -51,4 +57,8 @@ class Device extends Actor with ActorLogging {
   private def average(values: Vector[Double]): Double =
     if (values.isEmpty) Double.NaN
     else values.sum / values.size
+
+  override def postStop(): Unit = {
+    log.info("Stopped Device {}", self.path.name)
+  }
 }
diff --git a/akka-sample-sharding-scala/src/main/scala/sample/sharding/Devices.scala b/akka-sample-sharding-scala/src/main/scala/sample/sharding/Devices.scala
index 6cbe318..d5a8b22 100644
--- a/akka-sample-sharding-scala/src/main/scala/sample/sharding/Devices.scala
+++ b/akka-sample-sharding-scala/src/main/scala/sample/sharding/Devices.scala
@@ -12,6 +12,8 @@ object Devices {
 
   case object ReadTemperatures
 
+  case object Start
+
   def props(): Props =
     Props(new Devices)
 }
@@ -20,14 +22,14 @@ class Devices extends Actor with ActorLogging with Timers {
   import Devices._
 
   private val extractEntityId: ShardRegion.ExtractEntityId = {
-    case msg @ Device.RecordTemperature(id, _) => (id.toString, msg)
+    case msg @ Device.RecordTemperature(id, _,_, _) => (id.toString, msg)
     case msg @ Device.GetTemperature(id)       => (id.toString, msg)
   }
 
-  private val numberOfShards = 100
+  private val numberOfShards = 10
 
   private val extractShardId: ShardRegion.ExtractShardId = {
-    case Device.RecordTemperature(id, _) =>
+    case Device.RecordTemperature(id, _, _, _) =>
       (math.abs(id.hashCode) % numberOfShards).toString
     case Device.GetTemperature(id) =>
       (math.abs(id.hashCode) % numberOfShards).toString
@@ -45,23 +47,44 @@ class Devices extends Actor with ActorLogging with Timers {
   )
 
   val random = new Random()
-  val numberOfDevices = 50
+  val numberOfDevices = 10
 
-  timers.startTimerWithFixedDelay(UpdateDevice, UpdateDevice, 1.second)
-  timers.startTimerWithFixedDelay(
-    ReadTemperatures,
-    ReadTemperatures,
-    15.seconds
-  )
+  if (context.system.settings.config.getBoolean("sample.sending-temperatures")) {
+    timers.startSingleTimer(Start, Start, 20.seconds)
+  }
+
+  private var seqNr = 0L
+  private var sequenceNumbers = Map.empty[Long, Int]
 
   def receive = {
+    case Start =>
+      timers.startTimerWithFixedDelay(UpdateDevice, UpdateDevice, 100.millis)
+      timers.startTimerWithFixedDelay(
+        ReadTemperatures,
+        ReadTemperatures,
+        15.seconds
+      )
+
     case UpdateDevice =>
-      val deviceId = random.nextInt(numberOfDevices)
+      seqNr += 1
+      //val deviceId = random.nextInt(numberOfDevices)
+      val deviceId = (seqNr % numberOfDevices).toInt
+      sequenceNumbers = sequenceNumbers.updated(seqNr, deviceId)
       val temperature = 5 + 30 * random.nextDouble()
-      val msg = Device.RecordTemperature(deviceId, temperature)
+      val msg = Device.RecordTemperature(deviceId, temperature, System.nanoTime(), seqNr)
       log.info(s"Sending $msg")
       deviceRegion ! msg
 
+    case Device.RecordTemperatureAck(deviceId, startTime, seqNr) =>
+      val durationMs = (System.nanoTime() - startTime) / 1000 / 1000
+      if (durationMs > 500)
+        log.info("Delayed ack of device {} seqNr {} after {} ms", deviceId, seqNr, durationMs)
+      else
+        log.info("Ack of device {} seqNr {} after {} ms", deviceId, seqNr, durationMs)
+
+      sequenceNumbers -= seqNr
+      log.info("Pending sequence numbers: {}", sequenceNumbers)
+
     case ReadTemperatures =>
       (0 to numberOfDevices).foreach { deviceId =>
         deviceRegion ! Device.GetTemperature(deviceId)
diff --git a/akka-sample-sharding-scala/src/main/scala/sample/sharding/ShardingApp.scala b/akka-sample-sharding-scala/src/main/scala/sample/sharding/ShardingApp.scala
index 897ccf5..fa91ce7 100644
--- a/akka-sample-sharding-scala/src/main/scala/sample/sharding/ShardingApp.scala
+++ b/akka-sample-sharding-scala/src/main/scala/sample/sharding/ShardingApp.scala
@@ -18,7 +18,10 @@ object ShardingApp {
     ports foreach { port =>
       // Override the configuration of the port
       val config = ConfigFactory
-        .parseString("akka.remote.artery.canonical.port=" + port)
+        .parseString(s"""
+        akka.remote.artery.canonical.port=$port
+        sample.sending-temperatures = ${port == "2555"}
+        """)
         .withFallback(ConfigFactory.load())
 
       // Create an Akka system


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pekko.apache.org
For additional commands, e-mail: commits-help@pekko.apache.org