You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pekko.apache.org by fa...@apache.org on 2022/10/31 09:34:54 UTC

[incubator-pekko-samples] 02/02: Try with Akka 2.5.26

This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch wip-28011-sharding-rolling-update-patriknw
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-samples.git

commit b305b1f43b93da3144840547ade0d31fda237db0
Author: Patrik Nordwall <pa...@gmail.com>
AuthorDate: Tue Oct 29 09:23:24 2019 +0100

    Try with Akka 2.5.26
---
 akka-sample-sharding-scala/build.sbt               |  4 +--
 .../src/main/resources/application.conf            |  7 ++++-
 .../src/main/scala/sample/sharding/Device.scala    |  2 ++
 .../src/main/scala/sample/sharding/Devices.scala   | 30 ++++++++++++++--------
 .../main/scala/sample/sharding/ShardingApp.scala   |  2 +-
 5 files changed, 30 insertions(+), 15 deletions(-)

diff --git a/akka-sample-sharding-scala/build.sbt b/akka-sample-sharding-scala/build.sbt
index 73aff9d..cce8b17 100644
--- a/akka-sample-sharding-scala/build.sbt
+++ b/akka-sample-sharding-scala/build.sbt
@@ -1,7 +1,7 @@
 import com.typesafe.sbt.SbtMultiJvm.multiJvmSettings
 import com.typesafe.sbt.SbtMultiJvm.MultiJvmKeys.MultiJvm
 
-val akkaVersion = "2.6.0-RC1"
+val akkaVersion = "2.5.26"
 
 lazy val `akka-sample-sharding-scala` = project
   .in(file("."))
@@ -20,7 +20,7 @@ lazy val `akka-sample-sharding-scala` = project
     javaOptions in run ++= Seq("-Xms128m", "-Xmx1024m"),
     libraryDependencies ++= Seq(
       "com.typesafe.akka" %% "akka-cluster-sharding" % akkaVersion,
-      "com.typesafe.akka" %% "akka-serialization-jackson" % akkaVersion,
+//      "com.typesafe.akka" %% "akka-serialization-jackson" % akkaVersion,
       "com.typesafe.akka" %% "akka-slf4j" % akkaVersion,
       "ch.qos.logback" % "logback-classic" % "1.2.3",
       "org.scalatest" %% "scalatest" % "3.0.7" % Test
diff --git a/akka-sample-sharding-scala/src/main/resources/application.conf b/akka-sample-sharding-scala/src/main/resources/application.conf
index 853f3c9..bfb5b46 100644
--- a/akka-sample-sharding-scala/src/main/resources/application.conf
+++ b/akka-sample-sharding-scala/src/main/resources/application.conf
@@ -7,13 +7,15 @@ akka {
     provider = cluster
 
     serialization-bindings {
-      "sample.sharding.Message" = jackson-cbor
+      #"sample.sharding.Message" = jackson-cbor
     }
   }
 
   # For the sample, just bind to loopback and do not allow access from the network
   # the port is overridden by the logic in main class
   remote.artery {
+    enabled = on
+    transport = tcp
     canonical.port = 0
     canonical.hostname = 127.0.0.1
   }
@@ -26,6 +28,9 @@ akka {
 
     min-nr-of-members = 3
 
+    gossip-interval = 100ms
+    leader-actions-interval = 100 ms
+
     log-info-verbose = on
   }
 
diff --git a/akka-sample-sharding-scala/src/main/scala/sample/sharding/Device.scala b/akka-sample-sharding-scala/src/main/scala/sample/sharding/Device.scala
index 1e166a2..4451d47 100644
--- a/akka-sample-sharding-scala/src/main/scala/sample/sharding/Device.scala
+++ b/akka-sample-sharding-scala/src/main/scala/sample/sharding/Device.scala
@@ -59,6 +59,8 @@ class Device extends Actor with ActorLogging {
     else values.sum / values.size
 
   override def postStop(): Unit = {
+//    log.info("Stopping Device {}", self.path.name)
+//    Thread.sleep(200) // FIXME slow stopping
     log.info("Stopped Device {}", self.path.name)
   }
 }
diff --git a/akka-sample-sharding-scala/src/main/scala/sample/sharding/Devices.scala b/akka-sample-sharding-scala/src/main/scala/sample/sharding/Devices.scala
index d5a8b22..d16079c 100644
--- a/akka-sample-sharding-scala/src/main/scala/sample/sharding/Devices.scala
+++ b/akka-sample-sharding-scala/src/main/scala/sample/sharding/Devices.scala
@@ -58,26 +58,34 @@ class Devices extends Actor with ActorLogging with Timers {
 
   def receive = {
     case Start =>
-      timers.startTimerWithFixedDelay(UpdateDevice, UpdateDevice, 100.millis)
-      timers.startTimerWithFixedDelay(
+//      timers.startTimerWithFixedDelay(UpdateDevice, UpdateDevice, 100.millis)
+//      timers.startTimerWithFixedDelay(
+//        ReadTemperatures,
+//        ReadTemperatures,
+//        15.seconds
+//      )
+      timers.startPeriodicTimer(UpdateDevice, UpdateDevice, 200.millis)
+      timers.startPeriodicTimer(
         ReadTemperatures,
         ReadTemperatures,
         15.seconds
       )
 
     case UpdateDevice =>
-      seqNr += 1
-      //val deviceId = random.nextInt(numberOfDevices)
-      val deviceId = (seqNr % numberOfDevices).toInt
-      sequenceNumbers = sequenceNumbers.updated(seqNr, deviceId)
-      val temperature = 5 + 30 * random.nextDouble()
-      val msg = Device.RecordTemperature(deviceId, temperature, System.nanoTime(), seqNr)
-      log.info(s"Sending $msg")
-      deviceRegion ! msg
+      (1 to numberOfDevices).foreach {_ =>
+        seqNr += 1
+        //val deviceId = random.nextInt(numberOfDevices)
+        val deviceId = (seqNr % numberOfDevices).toInt
+        sequenceNumbers = sequenceNumbers.updated(seqNr, deviceId)
+        val temperature = 5 + 30 * random.nextDouble()
+        val msg = Device.RecordTemperature(deviceId, temperature, System.nanoTime(), seqNr)
+        log.info(s"Sending $msg")
+        deviceRegion ! msg
+      }
 
     case Device.RecordTemperatureAck(deviceId, startTime, seqNr) =>
       val durationMs = (System.nanoTime() - startTime) / 1000 / 1000
-      if (durationMs > 500)
+      if (durationMs > 300)
         log.info("Delayed ack of device {} seqNr {} after {} ms", deviceId, seqNr, durationMs)
       else
         log.info("Ack of device {} seqNr {} after {} ms", deviceId, seqNr, durationMs)
diff --git a/akka-sample-sharding-scala/src/main/scala/sample/sharding/ShardingApp.scala b/akka-sample-sharding-scala/src/main/scala/sample/sharding/ShardingApp.scala
index fa91ce7..932461c 100644
--- a/akka-sample-sharding-scala/src/main/scala/sample/sharding/ShardingApp.scala
+++ b/akka-sample-sharding-scala/src/main/scala/sample/sharding/ShardingApp.scala
@@ -20,7 +20,7 @@ object ShardingApp {
       val config = ConfigFactory
         .parseString(s"""
         akka.remote.artery.canonical.port=$port
-        sample.sending-temperatures = ${port == "2555"}
+        sample.sending-temperatures = ${port == "2553"}
         """)
         .withFallback(ConfigFactory.load())
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pekko.apache.org
For additional commands, e-mail: commits-help@pekko.apache.org