You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain-text copy.
Posted to commits@pekko.apache.org by fa...@apache.org on 2022/10/31 09:34:52 UTC

[incubator-pekko-samples] branch wip-28011-sharding-rolling-update-patriknw created (now b305b1f)

This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a change to branch wip-28011-sharding-rolling-update-patriknw
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-samples.git


      at b305b1f  Try with Akka 2.5.26

This branch includes the following new commits:

     new 2ac8b4f  test rolling update latency
     new b305b1f  Try with Akka 2.5.26

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pekko.apache.org
For additional commands, e-mail: commits-help@pekko.apache.org


[incubator-pekko-samples] 01/02: test rolling update latency

Posted by fa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch wip-28011-sharding-rolling-update-patriknw
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-samples.git

commit 2ac8b4ff3a43395e1ee592c9a1ad1f038b0ba5d1
Author: Patrik Nordwall <pa...@gmail.com>
AuthorDate: Mon Oct 21 15:08:34 2019 +0200

    test rolling update latency
---
 akka-sample-sharding-scala/build.sbt               |  4 +-
 .../src/main/resources/application.conf            | 13 ++++--
 .../src/main/resources/logback.xml                 | 21 ++++++++++
 .../src/main/scala/sample/sharding/Device.scala    | 16 ++++++--
 .../src/main/scala/sample/sharding/Devices.scala   | 47 ++++++++++++++++------
 .../main/scala/sample/sharding/ShardingApp.scala   |  5 ++-
 6 files changed, 85 insertions(+), 21 deletions(-)

diff --git a/akka-sample-sharding-scala/build.sbt b/akka-sample-sharding-scala/build.sbt
index 32145af..73aff9d 100644
--- a/akka-sample-sharding-scala/build.sbt
+++ b/akka-sample-sharding-scala/build.sbt
@@ -1,7 +1,7 @@
 import com.typesafe.sbt.SbtMultiJvm.multiJvmSettings
 import com.typesafe.sbt.SbtMultiJvm.MultiJvmKeys.MultiJvm
 
-val akkaVersion = "2.6.0-M4"
+val akkaVersion = "2.6.0-RC1"
 
 lazy val `akka-sample-sharding-scala` = project
   .in(file("."))
@@ -21,6 +21,8 @@ lazy val `akka-sample-sharding-scala` = project
     libraryDependencies ++= Seq(
       "com.typesafe.akka" %% "akka-cluster-sharding" % akkaVersion,
       "com.typesafe.akka" %% "akka-serialization-jackson" % akkaVersion,
+      "com.typesafe.akka" %% "akka-slf4j" % akkaVersion,
+      "ch.qos.logback" % "logback-classic" % "1.2.3",
       "org.scalatest" %% "scalatest" % "3.0.7" % Test
     ),
     mainClass in (Compile, run) := Some("sample.sharding.ShardingApp"),
diff --git a/akka-sample-sharding-scala/src/main/resources/application.conf b/akka-sample-sharding-scala/src/main/resources/application.conf
index fc64a62..853f3c9 100644
--- a/akka-sample-sharding-scala/src/main/resources/application.conf
+++ b/akka-sample-sharding-scala/src/main/resources/application.conf
@@ -1,5 +1,7 @@
 akka {
-  loglevel = INFO
+  loglevel = DEBUG
+  loggers = ["akka.event.slf4j.Slf4jLogger"]
+  logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
 
   actor {
     provider = cluster
@@ -21,9 +23,12 @@ akka {
       "akka://ShardingSystem@127.0.0.1:2551",
       "akka://ShardingSystem@127.0.0.1:2552"]
 
-    # auto downing is NOT safe for production deployments.
-    # you may want to use it during development, read more about it in the docs.
-    auto-down-unreachable-after = 10s
+
+    min-nr-of-members = 3
+
+    log-info-verbose = on
   }
 
 }
+
+sample.sending-temperatures = on
diff --git a/akka-sample-sharding-scala/src/main/resources/logback.xml b/akka-sample-sharding-scala/src/main/resources/logback.xml
new file mode 100644
index 0000000..bc42d63
--- /dev/null
+++ b/akka-sample-sharding-scala/src/main/resources/logback.xml
@@ -0,0 +1,21 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<configuration>
+
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>[%date{ISO8601}] [%level] [%logger] [%thread] [%X{akkaSource}] - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <appender name="ASYNC" class="ch.qos.logback.classic.AsyncAppender">
+        <queueSize>1024</queueSize>
+        <neverBlock>true</neverBlock>
+        <appender-ref ref="STDOUT" />
+    </appender>
+
+    <logger name="akka.cluster.sharding" level="DEBUG" />
+
+    <root level="INFO">
+        <appender-ref ref="ASYNC"/>
+    </root>
+</configuration>
diff --git a/akka-sample-sharding-scala/src/main/scala/sample/sharding/Device.scala b/akka-sample-sharding-scala/src/main/scala/sample/sharding/Device.scala
index 19b0488..1e166a2 100644
--- a/akka-sample-sharding-scala/src/main/scala/sample/sharding/Device.scala
+++ b/akka-sample-sharding-scala/src/main/scala/sample/sharding/Device.scala
@@ -10,9 +10,12 @@ import akka.actor._
 object Device {
   sealed trait Command extends Message
 
-  case class RecordTemperature(deviceId: Int, temperature: Double)
+  case class RecordTemperature(deviceId: Int, temperature: Double, startTime: Long, seqNr: Long)
       extends Command
 
+  case class RecordTemperatureAck(deviceId: Int, startTime: Long, seqNr: Long)
+    extends Command
+
   case class GetTemperature(deviceId: Int) extends Command
 
   case class Temperature(deviceId: Int,
@@ -27,15 +30,18 @@ object Device {
 class Device extends Actor with ActorLogging {
   import Device._
 
+  log.info("Starting Device {}", self.path.name)
+
   override def receive = counting(Vector.empty)
 
   def counting(values: Vector[Double]): Receive = {
-    case RecordTemperature(id, temp) =>
+    case RecordTemperature(id, temp, startTime, seqNr) =>
       val temperatures = values :+ temp
       log.info(
         s"Recording temperature $temp for device $id, average is ${average(temperatures)} after " +
-          s"${temperatures.size} readings"
+          s"${temperatures.size} readings. SeqNr [$seqNr]"
       )
+      sender() ! RecordTemperatureAck(id, startTime, seqNr)
       context.become(counting(temperatures))
 
     case GetTemperature(id) =>
@@ -51,4 +57,8 @@ class Device extends Actor with ActorLogging {
   private def average(values: Vector[Double]): Double =
     if (values.isEmpty) Double.NaN
     else values.sum / values.size
+
+  override def postStop(): Unit = {
+    log.info("Stopped Device {}", self.path.name)
+  }
 }
diff --git a/akka-sample-sharding-scala/src/main/scala/sample/sharding/Devices.scala b/akka-sample-sharding-scala/src/main/scala/sample/sharding/Devices.scala
index 6cbe318..d5a8b22 100644
--- a/akka-sample-sharding-scala/src/main/scala/sample/sharding/Devices.scala
+++ b/akka-sample-sharding-scala/src/main/scala/sample/sharding/Devices.scala
@@ -12,6 +12,8 @@ object Devices {
 
   case object ReadTemperatures
 
+  case object Start
+
   def props(): Props =
     Props(new Devices)
 }
@@ -20,14 +22,14 @@ class Devices extends Actor with ActorLogging with Timers {
   import Devices._
 
   private val extractEntityId: ShardRegion.ExtractEntityId = {
-    case msg @ Device.RecordTemperature(id, _) => (id.toString, msg)
+    case msg @ Device.RecordTemperature(id, _,_, _) => (id.toString, msg)
     case msg @ Device.GetTemperature(id)       => (id.toString, msg)
   }
 
-  private val numberOfShards = 100
+  private val numberOfShards = 10
 
   private val extractShardId: ShardRegion.ExtractShardId = {
-    case Device.RecordTemperature(id, _) =>
+    case Device.RecordTemperature(id, _, _, _) =>
       (math.abs(id.hashCode) % numberOfShards).toString
     case Device.GetTemperature(id) =>
       (math.abs(id.hashCode) % numberOfShards).toString
@@ -45,23 +47,44 @@ class Devices extends Actor with ActorLogging with Timers {
   )
 
   val random = new Random()
-  val numberOfDevices = 50
+  val numberOfDevices = 10
 
-  timers.startTimerWithFixedDelay(UpdateDevice, UpdateDevice, 1.second)
-  timers.startTimerWithFixedDelay(
-    ReadTemperatures,
-    ReadTemperatures,
-    15.seconds
-  )
+  if (context.system.settings.config.getBoolean("sample.sending-temperatures")) {
+    timers.startSingleTimer(Start, Start, 20.seconds)
+  }
+
+  private var seqNr = 0L
+  private var sequenceNumbers = Map.empty[Long, Int]
 
   def receive = {
+    case Start =>
+      timers.startTimerWithFixedDelay(UpdateDevice, UpdateDevice, 100.millis)
+      timers.startTimerWithFixedDelay(
+        ReadTemperatures,
+        ReadTemperatures,
+        15.seconds
+      )
+
     case UpdateDevice =>
-      val deviceId = random.nextInt(numberOfDevices)
+      seqNr += 1
+      //val deviceId = random.nextInt(numberOfDevices)
+      val deviceId = (seqNr % numberOfDevices).toInt
+      sequenceNumbers = sequenceNumbers.updated(seqNr, deviceId)
       val temperature = 5 + 30 * random.nextDouble()
-      val msg = Device.RecordTemperature(deviceId, temperature)
+      val msg = Device.RecordTemperature(deviceId, temperature, System.nanoTime(), seqNr)
       log.info(s"Sending $msg")
       deviceRegion ! msg
 
+    case Device.RecordTemperatureAck(deviceId, startTime, seqNr) =>
+      val durationMs = (System.nanoTime() - startTime) / 1000 / 1000
+      if (durationMs > 500)
+        log.info("Delayed ack of device {} seqNr {} after {} ms", deviceId, seqNr, durationMs)
+      else
+        log.info("Ack of device {} seqNr {} after {} ms", deviceId, seqNr, durationMs)
+
+      sequenceNumbers -= seqNr
+      log.info("Pending sequence numbers: {}", sequenceNumbers)
+
     case ReadTemperatures =>
       (0 to numberOfDevices).foreach { deviceId =>
         deviceRegion ! Device.GetTemperature(deviceId)
diff --git a/akka-sample-sharding-scala/src/main/scala/sample/sharding/ShardingApp.scala b/akka-sample-sharding-scala/src/main/scala/sample/sharding/ShardingApp.scala
index 897ccf5..fa91ce7 100644
--- a/akka-sample-sharding-scala/src/main/scala/sample/sharding/ShardingApp.scala
+++ b/akka-sample-sharding-scala/src/main/scala/sample/sharding/ShardingApp.scala
@@ -18,7 +18,10 @@ object ShardingApp {
     ports foreach { port =>
       // Override the configuration of the port
       val config = ConfigFactory
-        .parseString("akka.remote.artery.canonical.port=" + port)
+        .parseString(s"""
+        akka.remote.artery.canonical.port=$port
+        sample.sending-temperatures = ${port == "2555"}
+        """)
         .withFallback(ConfigFactory.load())
 
       // Create an Akka system


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pekko.apache.org
For additional commands, e-mail: commits-help@pekko.apache.org


[incubator-pekko-samples] 02/02: Try with Akka 2.5.26

Posted by fa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch wip-28011-sharding-rolling-update-patriknw
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-samples.git

commit b305b1f43b93da3144840547ade0d31fda237db0
Author: Patrik Nordwall <pa...@gmail.com>
AuthorDate: Tue Oct 29 09:23:24 2019 +0100

    Try with Akka 2.5.26
---
 akka-sample-sharding-scala/build.sbt               |  4 +--
 .../src/main/resources/application.conf            |  7 ++++-
 .../src/main/scala/sample/sharding/Device.scala    |  2 ++
 .../src/main/scala/sample/sharding/Devices.scala   | 30 ++++++++++++++--------
 .../main/scala/sample/sharding/ShardingApp.scala   |  2 +-
 5 files changed, 30 insertions(+), 15 deletions(-)

diff --git a/akka-sample-sharding-scala/build.sbt b/akka-sample-sharding-scala/build.sbt
index 73aff9d..cce8b17 100644
--- a/akka-sample-sharding-scala/build.sbt
+++ b/akka-sample-sharding-scala/build.sbt
@@ -1,7 +1,7 @@
 import com.typesafe.sbt.SbtMultiJvm.multiJvmSettings
 import com.typesafe.sbt.SbtMultiJvm.MultiJvmKeys.MultiJvm
 
-val akkaVersion = "2.6.0-RC1"
+val akkaVersion = "2.5.26"
 
 lazy val `akka-sample-sharding-scala` = project
   .in(file("."))
@@ -20,7 +20,7 @@ lazy val `akka-sample-sharding-scala` = project
     javaOptions in run ++= Seq("-Xms128m", "-Xmx1024m"),
     libraryDependencies ++= Seq(
       "com.typesafe.akka" %% "akka-cluster-sharding" % akkaVersion,
-      "com.typesafe.akka" %% "akka-serialization-jackson" % akkaVersion,
+//      "com.typesafe.akka" %% "akka-serialization-jackson" % akkaVersion,
       "com.typesafe.akka" %% "akka-slf4j" % akkaVersion,
       "ch.qos.logback" % "logback-classic" % "1.2.3",
       "org.scalatest" %% "scalatest" % "3.0.7" % Test
diff --git a/akka-sample-sharding-scala/src/main/resources/application.conf b/akka-sample-sharding-scala/src/main/resources/application.conf
index 853f3c9..bfb5b46 100644
--- a/akka-sample-sharding-scala/src/main/resources/application.conf
+++ b/akka-sample-sharding-scala/src/main/resources/application.conf
@@ -7,13 +7,15 @@ akka {
     provider = cluster
 
     serialization-bindings {
-      "sample.sharding.Message" = jackson-cbor
+      #"sample.sharding.Message" = jackson-cbor
     }
   }
 
   # For the sample, just bind to loopback and do not allow access from the network
   # the port is overridden by the logic in main class
   remote.artery {
+    enabled = on
+    transport = tcp
     canonical.port = 0
     canonical.hostname = 127.0.0.1
   }
@@ -26,6 +28,9 @@ akka {
 
     min-nr-of-members = 3
 
+    gossip-interval = 100ms
+    leader-actions-interval = 100 ms
+
     log-info-verbose = on
   }
 
diff --git a/akka-sample-sharding-scala/src/main/scala/sample/sharding/Device.scala b/akka-sample-sharding-scala/src/main/scala/sample/sharding/Device.scala
index 1e166a2..4451d47 100644
--- a/akka-sample-sharding-scala/src/main/scala/sample/sharding/Device.scala
+++ b/akka-sample-sharding-scala/src/main/scala/sample/sharding/Device.scala
@@ -59,6 +59,8 @@ class Device extends Actor with ActorLogging {
     else values.sum / values.size
 
   override def postStop(): Unit = {
+//    log.info("Stopping Device {}", self.path.name)
+//    Thread.sleep(200) // FIXME slow stopping
     log.info("Stopped Device {}", self.path.name)
   }
 }
diff --git a/akka-sample-sharding-scala/src/main/scala/sample/sharding/Devices.scala b/akka-sample-sharding-scala/src/main/scala/sample/sharding/Devices.scala
index d5a8b22..d16079c 100644
--- a/akka-sample-sharding-scala/src/main/scala/sample/sharding/Devices.scala
+++ b/akka-sample-sharding-scala/src/main/scala/sample/sharding/Devices.scala
@@ -58,26 +58,34 @@ class Devices extends Actor with ActorLogging with Timers {
 
   def receive = {
     case Start =>
-      timers.startTimerWithFixedDelay(UpdateDevice, UpdateDevice, 100.millis)
-      timers.startTimerWithFixedDelay(
+//      timers.startTimerWithFixedDelay(UpdateDevice, UpdateDevice, 100.millis)
+//      timers.startTimerWithFixedDelay(
+//        ReadTemperatures,
+//        ReadTemperatures,
+//        15.seconds
+//      )
+      timers.startPeriodicTimer(UpdateDevice, UpdateDevice, 200.millis)
+      timers.startPeriodicTimer(
         ReadTemperatures,
         ReadTemperatures,
         15.seconds
       )
 
     case UpdateDevice =>
-      seqNr += 1
-      //val deviceId = random.nextInt(numberOfDevices)
-      val deviceId = (seqNr % numberOfDevices).toInt
-      sequenceNumbers = sequenceNumbers.updated(seqNr, deviceId)
-      val temperature = 5 + 30 * random.nextDouble()
-      val msg = Device.RecordTemperature(deviceId, temperature, System.nanoTime(), seqNr)
-      log.info(s"Sending $msg")
-      deviceRegion ! msg
+      (1 to numberOfDevices).foreach {_ =>
+        seqNr += 1
+        //val deviceId = random.nextInt(numberOfDevices)
+        val deviceId = (seqNr % numberOfDevices).toInt
+        sequenceNumbers = sequenceNumbers.updated(seqNr, deviceId)
+        val temperature = 5 + 30 * random.nextDouble()
+        val msg = Device.RecordTemperature(deviceId, temperature, System.nanoTime(), seqNr)
+        log.info(s"Sending $msg")
+        deviceRegion ! msg
+      }
 
     case Device.RecordTemperatureAck(deviceId, startTime, seqNr) =>
       val durationMs = (System.nanoTime() - startTime) / 1000 / 1000
-      if (durationMs > 500)
+      if (durationMs > 300)
         log.info("Delayed ack of device {} seqNr {} after {} ms", deviceId, seqNr, durationMs)
       else
         log.info("Ack of device {} seqNr {} after {} ms", deviceId, seqNr, durationMs)
diff --git a/akka-sample-sharding-scala/src/main/scala/sample/sharding/ShardingApp.scala b/akka-sample-sharding-scala/src/main/scala/sample/sharding/ShardingApp.scala
index fa91ce7..932461c 100644
--- a/akka-sample-sharding-scala/src/main/scala/sample/sharding/ShardingApp.scala
+++ b/akka-sample-sharding-scala/src/main/scala/sample/sharding/ShardingApp.scala
@@ -20,7 +20,7 @@ object ShardingApp {
       val config = ConfigFactory
         .parseString(s"""
         akka.remote.artery.canonical.port=$port
-        sample.sending-temperatures = ${port == "2555"}
+        sample.sending-temperatures = ${port == "2553"}
         """)
         .withFallback(ConfigFactory.load())
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pekko.apache.org
For additional commands, e-mail: commits-help@pekko.apache.org