You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pekko.apache.org by fa...@apache.org on 2022/10/31 09:34:53 UTC
[incubator-pekko-samples] 01/02: test rolling update latency
This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch wip-28011-sharding-rolling-update-patriknw
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-samples.git
commit 2ac8b4ff3a43395e1ee592c9a1ad1f038b0ba5d1
Author: Patrik Nordwall <pa...@gmail.com>
AuthorDate: Mon Oct 21 15:08:34 2019 +0200
test rolling update latency
---
akka-sample-sharding-scala/build.sbt | 4 +-
.../src/main/resources/application.conf | 13 ++++--
.../src/main/resources/logback.xml | 21 ++++++++++
.../src/main/scala/sample/sharding/Device.scala | 16 ++++++--
.../src/main/scala/sample/sharding/Devices.scala | 47 ++++++++++++++++------
.../main/scala/sample/sharding/ShardingApp.scala | 5 ++-
6 files changed, 85 insertions(+), 21 deletions(-)
diff --git a/akka-sample-sharding-scala/build.sbt b/akka-sample-sharding-scala/build.sbt
index 32145af..73aff9d 100644
--- a/akka-sample-sharding-scala/build.sbt
+++ b/akka-sample-sharding-scala/build.sbt
@@ -1,7 +1,7 @@
import com.typesafe.sbt.SbtMultiJvm.multiJvmSettings
import com.typesafe.sbt.SbtMultiJvm.MultiJvmKeys.MultiJvm
-val akkaVersion = "2.6.0-M4"
+val akkaVersion = "2.6.0-RC1"
lazy val `akka-sample-sharding-scala` = project
.in(file("."))
@@ -21,6 +21,8 @@ lazy val `akka-sample-sharding-scala` = project
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-cluster-sharding" % akkaVersion,
"com.typesafe.akka" %% "akka-serialization-jackson" % akkaVersion,
+ "com.typesafe.akka" %% "akka-slf4j" % akkaVersion,
+ "ch.qos.logback" % "logback-classic" % "1.2.3",
"org.scalatest" %% "scalatest" % "3.0.7" % Test
),
mainClass in (Compile, run) := Some("sample.sharding.ShardingApp"),
diff --git a/akka-sample-sharding-scala/src/main/resources/application.conf b/akka-sample-sharding-scala/src/main/resources/application.conf
index fc64a62..853f3c9 100644
--- a/akka-sample-sharding-scala/src/main/resources/application.conf
+++ b/akka-sample-sharding-scala/src/main/resources/application.conf
@@ -1,5 +1,7 @@
akka {
- loglevel = INFO
+ loglevel = DEBUG
+ loggers = ["akka.event.slf4j.Slf4jLogger"]
+ logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
actor {
provider = cluster
@@ -21,9 +23,12 @@ akka {
"akka://ShardingSystem@127.0.0.1:2551",
"akka://ShardingSystem@127.0.0.1:2552"]
- # auto downing is NOT safe for production deployments.
- # you may want to use it during development, read more about it in the docs.
- auto-down-unreachable-after = 10s
+
+ min-nr-of-members = 3
+
+ log-info-verbose = on
}
}
+
+sample.sending-temperatures = on
diff --git a/akka-sample-sharding-scala/src/main/resources/logback.xml b/akka-sample-sharding-scala/src/main/resources/logback.xml
new file mode 100644
index 0000000..bc42d63
--- /dev/null
+++ b/akka-sample-sharding-scala/src/main/resources/logback.xml
@@ -0,0 +1,21 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<configuration>
+
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>[%date{ISO8601}] [%level] [%logger] [%thread] [%X{akkaSource}] - %msg%n</pattern>
+ </encoder>
+ </appender>
+
+ <appender name="ASYNC" class="ch.qos.logback.classic.AsyncAppender">
+ <queueSize>1024</queueSize>
+ <neverBlock>true</neverBlock>
+ <appender-ref ref="STDOUT" />
+ </appender>
+
+ <logger name="akka.cluster.sharding" level="DEBUG" />
+
+ <root level="INFO">
+ <appender-ref ref="ASYNC"/>
+ </root>
+</configuration>
diff --git a/akka-sample-sharding-scala/src/main/scala/sample/sharding/Device.scala b/akka-sample-sharding-scala/src/main/scala/sample/sharding/Device.scala
index 19b0488..1e166a2 100644
--- a/akka-sample-sharding-scala/src/main/scala/sample/sharding/Device.scala
+++ b/akka-sample-sharding-scala/src/main/scala/sample/sharding/Device.scala
@@ -10,9 +10,12 @@ import akka.actor._
object Device {
sealed trait Command extends Message
- case class RecordTemperature(deviceId: Int, temperature: Double)
+ case class RecordTemperature(deviceId: Int, temperature: Double, startTime: Long, seqNr: Long)
extends Command
+ case class RecordTemperatureAck(deviceId: Int, startTime: Long, seqNr: Long)
+ extends Command
+
case class GetTemperature(deviceId: Int) extends Command
case class Temperature(deviceId: Int,
@@ -27,15 +30,18 @@ object Device {
class Device extends Actor with ActorLogging {
import Device._
+ log.info("Starting Device {}", self.path.name)
+
override def receive = counting(Vector.empty)
def counting(values: Vector[Double]): Receive = {
- case RecordTemperature(id, temp) =>
+ case RecordTemperature(id, temp, startTime, seqNr) =>
val temperatures = values :+ temp
log.info(
s"Recording temperature $temp for device $id, average is ${average(temperatures)} after " +
- s"${temperatures.size} readings"
+ s"${temperatures.size} readings. SeqNr [$seqNr]"
)
+ sender() ! RecordTemperatureAck(id, startTime, seqNr)
context.become(counting(temperatures))
case GetTemperature(id) =>
@@ -51,4 +57,8 @@ class Device extends Actor with ActorLogging {
private def average(values: Vector[Double]): Double =
if (values.isEmpty) Double.NaN
else values.sum / values.size
+
+ override def postStop(): Unit = {
+ log.info("Stopped Device {}", self.path.name)
+ }
}
diff --git a/akka-sample-sharding-scala/src/main/scala/sample/sharding/Devices.scala b/akka-sample-sharding-scala/src/main/scala/sample/sharding/Devices.scala
index 6cbe318..d5a8b22 100644
--- a/akka-sample-sharding-scala/src/main/scala/sample/sharding/Devices.scala
+++ b/akka-sample-sharding-scala/src/main/scala/sample/sharding/Devices.scala
@@ -12,6 +12,8 @@ object Devices {
case object ReadTemperatures
+ case object Start
+
def props(): Props =
Props(new Devices)
}
@@ -20,14 +22,14 @@ class Devices extends Actor with ActorLogging with Timers {
import Devices._
private val extractEntityId: ShardRegion.ExtractEntityId = {
- case msg @ Device.RecordTemperature(id, _) => (id.toString, msg)
+ case msg @ Device.RecordTemperature(id, _,_, _) => (id.toString, msg)
case msg @ Device.GetTemperature(id) => (id.toString, msg)
}
- private val numberOfShards = 100
+ private val numberOfShards = 10
private val extractShardId: ShardRegion.ExtractShardId = {
- case Device.RecordTemperature(id, _) =>
+ case Device.RecordTemperature(id, _, _, _) =>
(math.abs(id.hashCode) % numberOfShards).toString
case Device.GetTemperature(id) =>
(math.abs(id.hashCode) % numberOfShards).toString
@@ -45,23 +47,44 @@ class Devices extends Actor with ActorLogging with Timers {
)
val random = new Random()
- val numberOfDevices = 50
+ val numberOfDevices = 10
- timers.startTimerWithFixedDelay(UpdateDevice, UpdateDevice, 1.second)
- timers.startTimerWithFixedDelay(
- ReadTemperatures,
- ReadTemperatures,
- 15.seconds
- )
+ if (context.system.settings.config.getBoolean("sample.sending-temperatures")) {
+ timers.startSingleTimer(Start, Start, 20.seconds)
+ }
+
+ private var seqNr = 0L
+ private var sequenceNumbers = Map.empty[Long, Int]
def receive = {
+ case Start =>
+ timers.startTimerWithFixedDelay(UpdateDevice, UpdateDevice, 100.millis)
+ timers.startTimerWithFixedDelay(
+ ReadTemperatures,
+ ReadTemperatures,
+ 15.seconds
+ )
+
case UpdateDevice =>
- val deviceId = random.nextInt(numberOfDevices)
+ seqNr += 1
+ //val deviceId = random.nextInt(numberOfDevices)
+ val deviceId = (seqNr % numberOfDevices).toInt
+ sequenceNumbers = sequenceNumbers.updated(seqNr, deviceId)
val temperature = 5 + 30 * random.nextDouble()
- val msg = Device.RecordTemperature(deviceId, temperature)
+ val msg = Device.RecordTemperature(deviceId, temperature, System.nanoTime(), seqNr)
log.info(s"Sending $msg")
deviceRegion ! msg
+ case Device.RecordTemperatureAck(deviceId, startTime, seqNr) =>
+ val durationMs = (System.nanoTime() - startTime) / 1000 / 1000
+ if (durationMs > 500)
+ log.info("Delayed ack of device {} seqNr {} after {} ms", deviceId, seqNr, durationMs)
+ else
+ log.info("Ack of device {} seqNr {} after {} ms", deviceId, seqNr, durationMs)
+
+ sequenceNumbers -= seqNr
+ log.info("Pending sequence numbers: {}", sequenceNumbers)
+
case ReadTemperatures =>
(0 to numberOfDevices).foreach { deviceId =>
deviceRegion ! Device.GetTemperature(deviceId)
diff --git a/akka-sample-sharding-scala/src/main/scala/sample/sharding/ShardingApp.scala b/akka-sample-sharding-scala/src/main/scala/sample/sharding/ShardingApp.scala
index 897ccf5..fa91ce7 100644
--- a/akka-sample-sharding-scala/src/main/scala/sample/sharding/ShardingApp.scala
+++ b/akka-sample-sharding-scala/src/main/scala/sample/sharding/ShardingApp.scala
@@ -18,7 +18,10 @@ object ShardingApp {
ports foreach { port =>
// Override the configuration of the port
val config = ConfigFactory
- .parseString("akka.remote.artery.canonical.port=" + port)
+ .parseString(s"""
+ akka.remote.artery.canonical.port=$port
+ sample.sending-temperatures = ${port == "2555"}
+ """)
.withFallback(ConfigFactory.load())
// Create an Akka system
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pekko.apache.org
For additional commands, e-mail: commits-help@pekko.apache.org