You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pekko.apache.org by fa...@apache.org on 2022/10/31 09:34:40 UTC

[incubator-pekko-samples] branch wip-rolling-update-typed-patriknw created (now dc25a53)

This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a change to branch wip-rolling-update-typed-patriknw
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-samples.git


      at dc25a53  roll 3 - java

This branch includes the following new commits:

     new 4344cf2  Akka 2.6.0
     new 6ee737c  test ask - scala
     new 359e08d  roll 1 - scala
     new 0ceac95  roll 2 - scala
     new b27976b  roll 3 - scala
     new ba48a34  test ask - java
     new 47bf16a  roll 1 - java
     new b415647  roll 2 - java
     new dc25a53  roll 3 - java

The 9 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pekko.apache.org
For additional commands, e-mail: commits-help@pekko.apache.org


[incubator-pekko-samples] 09/09: roll 3 - java

Posted by fa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch wip-rolling-update-typed-patriknw
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-samples.git

commit dc25a53d14c9db3edf088abcd6c9b8d0b0a320d5
Author: Patrik Nordwall <pa...@gmail.com>
AuthorDate: Fri Jul 12 09:52:38 2019 +0200

    roll 3 - java
    
    * custom ShardingMessageExtractor can be removed
---
 .../src/main/java/sample/sharding/Device.java      | 44 +++-------------------
 1 file changed, 5 insertions(+), 39 deletions(-)

diff --git a/akka-sample-sharding-typed-java/src/main/java/sample/sharding/Device.java b/akka-sample-sharding-typed-java/src/main/java/sample/sharding/Device.java
index 0310987..4a7a867 100644
--- a/akka-sample-sharding-typed-java/src/main/java/sample/sharding/Device.java
+++ b/akka-sample-sharding-typed-java/src/main/java/sample/sharding/Device.java
@@ -10,9 +10,6 @@ import akka.actor.typed.javadsl.AbstractBehavior;
 import akka.actor.typed.javadsl.ActorContext;
 import akka.actor.typed.javadsl.Behaviors;
 import akka.actor.typed.javadsl.Receive;
-import akka.cluster.sharding.typed.HashCodeMessageExtractor;
-import akka.cluster.sharding.typed.ShardingEnvelope;
-import akka.cluster.sharding.typed.ShardingMessageExtractor;
 import akka.cluster.sharding.typed.javadsl.ClusterSharding;
 import akka.cluster.sharding.typed.javadsl.Entity;
 import akka.cluster.sharding.typed.javadsl.EntityTypeKey;
@@ -23,42 +20,11 @@ public class Device extends AbstractBehavior<Device.Command> {
   public static final EntityTypeKey<Command> TYPE_KEY = EntityTypeKey.create(Command.class, "Device");
 
   public static void init(ActorSystem<?> system) {
-    ShardingMessageExtractor<Object, Command> messageExtractor = new ShardingMessageExtractor<Object, Command>() {
-      final HashCodeMessageExtractor<Command> delegate = new HashCodeMessageExtractor<>(
-        system.settings().config().getInt("akka.cluster.sharding.number-of-shards"));
-
-      @SuppressWarnings("unchecked")
-      @Override
-      public String entityId(Object message) {
-        if (message instanceof RecordTemperature)
-          return String.valueOf(((RecordTemperature) message).deviceId);
-        else if (message instanceof GetTemperature)
-          return String.valueOf(((GetTemperature) message).deviceId);
-        else if (message instanceof ShardingEnvelope)
-          return delegate.entityId((ShardingEnvelope<Command>) message);
-        else
-          return null;
-      }
-
-      @Override
-      public String shardId(String entityId) {
-        return delegate.shardId(entityId);
-      }
-
-      @SuppressWarnings("unchecked")
-      @Override
-      public Command unwrapMessage(Object message) {
-        if (message instanceof Command)
-          return (Command) message;
-        else if (message instanceof ShardingEnvelope)
-          return delegate.unwrapMessage((ShardingEnvelope<Command>) message);
-        else
-          return null;
-      }
-    };
-
-    ClusterSharding.get(system).init(Entity.of(TYPE_KEY, context -> Device.create())
-      .withMessageExtractor(messageExtractor));
+    // If the original hashing function was using
+    // `(math.abs(id.hashCode) % numberOfShards).toString`
+    // the default HashCodeMessageExtractor in Typed can be used.
+    // That is also compatible with `akka.cluster.sharding.ShardRegion.HashCodeMessageExtractor`.
+    ClusterSharding.get(system).init(Entity.of(TYPE_KEY, context -> Device.create()));
   }
 
   public interface Command extends Message {}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pekko.apache.org
For additional commands, e-mail: commits-help@pekko.apache.org


[incubator-pekko-samples] 04/09: roll 2 - scala

Posted by fa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch wip-rolling-update-typed-patriknw
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-samples.git

commit 0ceac9576baa50b67de176af362656bcbfa0dea4
Author: Patrik Nordwall <pa...@gmail.com>
AuthorDate: Wed Jun 19 11:35:49 2019 +0200

    roll 2 - scala
    
    * convert actors to Typed
    * use custom ShardingMessageExtractor in ClusterSharding.init
    * use same akka.cluster.sharding.number-of-shards config as was in roll 1
---
 akka-sample-sharding-typed-scala/LICENSE           |  10 ++
 akka-sample-sharding-typed-scala/README.md         |  39 +++++++
 akka-sample-sharding-typed-scala/build.sbt         |  33 ++++++
 .../project/build.properties                       |   1 +
 .../project/plugins.sbt                            |   2 +
 .../src/main/resources/application.conf            |  31 ++++++
 .../src/main/scala/sample/sharding/Device.scala    | 113 +++++++++++++++++++++
 .../src/main/scala/sample/sharding/Devices.scala   |  70 +++++++++++++
 .../src/main/scala/sample/sharding/Message.scala   |   9 ++
 .../main/scala/sample/sharding/ShardingApp.scala   |  29 ++++++
 10 files changed, 337 insertions(+)

diff --git a/akka-sample-sharding-typed-scala/LICENSE b/akka-sample-sharding-typed-scala/LICENSE
new file mode 100644
index 0000000..4239f09
--- /dev/null
+++ b/akka-sample-sharding-typed-scala/LICENSE
@@ -0,0 +1,10 @@
+Akka sample by Lightbend
+
+Licensed under Public Domain (CC0)
+
+To the extent possible under law, the person who associated CC0 with
+this Template has waived all copyright and related or neighboring
+rights to this Template.
+
+You should have received a copy of the CC0 legalcode along with this
+work.  If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.
diff --git a/akka-sample-sharding-typed-scala/README.md b/akka-sample-sharding-typed-scala/README.md
new file mode 100644
index 0000000..bbb52b6
--- /dev/null
+++ b/akka-sample-sharding-typed-scala/README.md
@@ -0,0 +1,39 @@
+This tutorial contains a sample illustrating [Akka Cluster Sharding](http://doc.akka.io/docs/akka/current/scala/cluster-sharding.html#an-example).
+
+## Example overview
+
+First of all, make sure the correct settings in [application.conf](src/main/resources/application.conf) are set as described in the akka-sample-cluster tutorial.
+
+Open [ShardingApp.scala](src/main/scala/sample/sharding/ShardingApp.scala).
+
+This small program starts an ActorSystem with Cluster Sharding enabled. It joins the cluster and starts a `Devices` actor. This actor starts the infrastructure to shard `Device` instances and starts sending messages to arbitrary devices.
+
+To run this sample, type `sbt "runMain sample.sharding.ShardingApp"` if it is not already started.
+
+`ShardingApp` starts three actor systems (cluster members) in the same JVM process. It can be more interesting to run them in separate processes. Stop the application and then open three terminal windows.
+
+In the first terminal window, start the first seed node with the following command:
+
+    sbt "runMain sample.sharding.ShardingApp 2551"
+
+2551 corresponds to the port of the first seed-nodes element in the configuration. In the log output you see that the cluster node has been started and changed status to 'Up'.
+
+You'll see a log message when `Devices` sends a message to record the current temperature, and for each of those you'll see a log message from the `Device` showing the action taken and the new average temperature.
+
+In the second terminal window, start the second seed node with the following command:
+
+    sbt "runMain sample.sharding.ShardingApp 2552"
+
+2552 corresponds to the port of the second seed-nodes element in the configuration. In the log output you see that the cluster node has been started and joins the other seed node and becomes a member of the cluster. Its status changed to 'Up'. Switch over to the first terminal window and see in the log output that the member joined.
+
+Some of the devices that were originally on the `ActorSystem` on port 2551 will be migrated to the newly joined `ActorSystem` on port 2552. The migration is straightforward: the old actor is stopped and a fresh actor is started on the newly created `ActorSystem`. Notice this means the average is reset: if you want your state to be persisted you'll need to take care of this yourself. For this reason Cluster Sharding and Akka Persistence are such a popular combination.
+
+Start another node in the third terminal window with the following command:
+
+    sbt "runMain sample.sharding.ShardingApp 0"
+
+Now you don't need to specify the port number, 0 means that it will use a random available port. It joins one of the configured seed nodes. Look at the log output in the different terminal windows.
+
+Start even more nodes in the same way, if you like.
+
+Shut down one of the nodes by pressing 'ctrl-c' in one of the terminal windows. The other nodes will detect the failure after a while, which you can see in the log output in the other terminals.
diff --git a/akka-sample-sharding-typed-scala/build.sbt b/akka-sample-sharding-typed-scala/build.sbt
new file mode 100644
index 0000000..55e116b
--- /dev/null
+++ b/akka-sample-sharding-typed-scala/build.sbt
@@ -0,0 +1,33 @@
+import com.typesafe.sbt.SbtMultiJvm.multiJvmSettings
+import com.typesafe.sbt.SbtMultiJvm.MultiJvmKeys.MultiJvm
+
+val akkaVersion = "2.6.0-M4"
+
+lazy val `akka-sample-sharding-typed-scala` = project
+  .in(file("."))
+  .settings(multiJvmSettings: _*)
+  .settings(
+    organization := "com.typesafe.akka.samples",
+    scalaVersion := "2.12.8",
+    scalacOptions in Compile ++= Seq(
+      "-deprecation",
+      "-feature",
+      "-unchecked",
+      "-Xlog-reflective-calls",
+      "-Xlint"
+    ),
+    javacOptions in Compile ++= Seq("-Xlint:unchecked", "-Xlint:deprecation"),
+    javaOptions in run ++= Seq("-Xms128m", "-Xmx1024m"),
+    libraryDependencies ++= Seq(
+      "com.typesafe.akka" %% "akka-cluster-sharding-typed" % akkaVersion,
+      "com.typesafe.akka" %% "akka-serialization-jackson" % akkaVersion,
+      "org.scalatest" %% "scalatest" % "3.0.7" % Test
+    ),
+    mainClass in (Compile, run) := Some("sample.sharding.ShardingApp"),
+    // disable parallel tests
+    parallelExecution in Test := false,
+    licenses := Seq(
+      ("CC0", url("http://creativecommons.org/publicdomain/zero/1.0"))
+    )
+  )
+  .configs(MultiJvm)
diff --git a/akka-sample-sharding-typed-scala/project/build.properties b/akka-sample-sharding-typed-scala/project/build.properties
new file mode 100644
index 0000000..c0bab04
--- /dev/null
+++ b/akka-sample-sharding-typed-scala/project/build.properties
@@ -0,0 +1 @@
+sbt.version=1.2.8
diff --git a/akka-sample-sharding-typed-scala/project/plugins.sbt b/akka-sample-sharding-typed-scala/project/plugins.sbt
new file mode 100644
index 0000000..2d02635
--- /dev/null
+++ b/akka-sample-sharding-typed-scala/project/plugins.sbt
@@ -0,0 +1,2 @@
+addSbtPlugin("com.typesafe.sbt" % "sbt-multi-jvm" % "0.4.0")
+addSbtPlugin("com.dwijnand" % "sbt-dynver" % "3.0.0")
diff --git a/akka-sample-sharding-typed-scala/src/main/resources/application.conf b/akka-sample-sharding-typed-scala/src/main/resources/application.conf
new file mode 100644
index 0000000..c5e13e4
--- /dev/null
+++ b/akka-sample-sharding-typed-scala/src/main/resources/application.conf
@@ -0,0 +1,31 @@
+akka {
+  loglevel = INFO
+
+  actor {
+    provider = "cluster"
+
+    serialization-bindings {
+      "sample.sharding.Message" = jackson-cbor
+    }
+  }
+
+  # For the sample, just bind to loopback and do not allow access from the network
+  # the port is overridden by the logic in main class
+  remote.artery {
+    canonical.port = 0
+    canonical.hostname = 127.0.0.1
+  }
+
+  cluster {
+    seed-nodes = [
+      "akka://ShardingSystem@127.0.0.1:2551",
+      "akka://ShardingSystem@127.0.0.1:2552"]
+
+    # auto downing is NOT safe for production deployments.
+    # you may want to use it during development, read more about it in the docs.
+    auto-down-unreachable-after = 10s
+
+    sharding.number-of-shards = 100
+  }
+
+}
diff --git a/akka-sample-sharding-typed-scala/src/main/scala/sample/sharding/Device.scala b/akka-sample-sharding-typed-scala/src/main/scala/sample/sharding/Device.scala
new file mode 100644
index 0000000..6854b4d
--- /dev/null
+++ b/akka-sample-sharding-typed-scala/src/main/scala/sample/sharding/Device.scala
@@ -0,0 +1,113 @@
+package sample.sharding
+
+import akka.actor.typed.ActorRef
+import akka.actor.typed.ActorSystem
+import akka.actor.typed.Behavior
+import akka.actor.typed.scaladsl.Behaviors
+import akka.cluster.sharding.typed.HashCodeMessageExtractor
+import akka.cluster.sharding.typed.ShardingEnvelope
+import akka.cluster.sharding.typed.ShardingMessageExtractor
+import akka.cluster.sharding.typed.scaladsl.ClusterSharding
+import akka.cluster.sharding.typed.scaladsl.Entity
+import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
+
+/**
+  * This is just an example: cluster sharding would be overkill for just keeping a small amount of data,
+  * but becomes useful when you have a collection of 'heavy' actors (in terms of processing or state)
+  * so you need to distribute them across several nodes.
+  */
+object Device {
+  val TypeKey = EntityTypeKey[Device.Command]("Device")
+
+  def init(system: ActorSystem[_]): Unit = {
+
+    val messageExtractor =
+      new ShardingMessageExtractor[Any, Command] {
+
+        // Note that `HashCodeMessageExtractor` is using
+        // `(math.abs(id.hashCode) % numberOfShards).toString`.
+        // If the old Untyped nodes were using a different hashing function
+        // this delegate HashCodeMessageExtractor can't be used and
+        // same hashing function as before must be implemented here.
+        // `akka.cluster.sharding.typed.HashCodeMessageExtractor` is compatible
+        // with `akka.cluster.sharding.ShardRegion.HashCodeMessageExtractor`.
+        val delegate = new HashCodeMessageExtractor[Device.Command](
+          system.settings.config
+            .getInt("akka.cluster.sharding.number-of-shards")
+        )
+
+        override def entityId(message: Any): String = {
+          message match {
+            case Device.RecordTemperature(deviceId, _) =>
+              deviceId.toString
+            case Device.GetTemperature(deviceId, _) =>
+              deviceId.toString
+            case env: ShardingEnvelope[Device.Command] =>
+              delegate.entityId(env)
+          }
+        }
+
+        override def shardId(entityId: String): String = {
+          delegate.shardId(entityId)
+        }
+
+        override def unwrapMessage(message: Any): Command = {
+          message match {
+            case m: Device.RecordTemperature => m
+            case m: Device.GetTemperature    => m
+            case env: ShardingEnvelope[Device.RecordTemperature] =>
+              delegate.unwrapMessage(env)
+          }
+        }
+      }
+
+    ClusterSharding(system).init(
+      Entity(TypeKey, _ => Device())
+        .withMessageExtractor(messageExtractor)
+    )
+  }
+
+  sealed trait Command extends Message
+
+  case class RecordTemperature(deviceId: Int, temperature: Double)
+      extends Command
+
+  case class GetTemperature(deviceId: Int, replyTo: ActorRef[Temperature])
+      extends Command
+
+  case class Temperature(deviceId: Int,
+                         average: Double,
+                         latest: Double,
+                         readings: Int)
+      extends Message
+
+  def apply(): Behavior[Command] =
+    counting(Vector.empty)
+
+  private def counting(values: Vector[Double]): Behavior[Command] = {
+    Behaviors.receive { (context, cmd) =>
+      cmd match {
+        case RecordTemperature(id, temp) =>
+          val temperatures = values :+ temp
+          context.log.info(
+            s"Recording temperature $temp for device $id, average is ${average(temperatures)} after " +
+              s"${temperatures.size} readings"
+          )
+          counting(temperatures)
+
+        case GetTemperature(id, replyTo) =>
+          val reply =
+            if (values.isEmpty)
+              Temperature(id, Double.NaN, Double.NaN, 0)
+            else
+              Temperature(id, average(values), values.last, values.size)
+          replyTo ! reply
+          Behaviors.same
+      }
+    }
+  }
+
+  private def average(values: Vector[Double]): Double =
+    if (values.isEmpty) Double.NaN
+    else values.sum / values.size
+}
diff --git a/akka-sample-sharding-typed-scala/src/main/scala/sample/sharding/Devices.scala b/akka-sample-sharding-typed-scala/src/main/scala/sample/sharding/Devices.scala
new file mode 100644
index 0000000..60c353b
--- /dev/null
+++ b/akka-sample-sharding-typed-scala/src/main/scala/sample/sharding/Devices.scala
@@ -0,0 +1,70 @@
+package sample.sharding
+
+import scala.concurrent.duration._
+import scala.util.Random
+
+import akka.actor.typed.Behavior
+import akka.actor.typed.scaladsl.Behaviors
+import akka.cluster.sharding.typed.scaladsl.ClusterSharding
+
+object Devices {
+  sealed trait Command
+
+  private case object UpdateDevice extends Command
+
+  private case object ReadTemperatures extends Command
+
+  private case class GetTemperatureReply(temp: Device.Temperature)
+      extends Command
+
+  def apply(): Behavior[Command] = {
+    Behaviors.setup { context =>
+      Device.init(context.system)
+      val sharding = ClusterSharding(context.system)
+
+      Behaviors.withTimers { timers =>
+        val random = new Random()
+        val numberOfDevices = 50
+
+        timers.startTimerWithFixedDelay(UpdateDevice, UpdateDevice, 1.second)
+        timers.startTimerWithFixedDelay(
+          ReadTemperatures,
+          ReadTemperatures,
+          15.seconds
+        )
+
+        val temperatureAdapter =
+          context.messageAdapter[Device.Temperature](GetTemperatureReply(_))
+
+        Behaviors.receiveMessage {
+          case UpdateDevice =>
+            val deviceId = random.nextInt(numberOfDevices)
+            val temperature = 5 + 30 * random.nextDouble()
+            val msg = Device.RecordTemperature(deviceId, temperature)
+            context.log.info(s"Sending $msg")
+            sharding.entityRefFor(Device.TypeKey, deviceId.toString) ! msg
+            Behaviors.same
+
+          case ReadTemperatures =>
+            (0 to numberOfDevices).foreach { deviceId =>
+              val entityRef =
+                sharding.entityRefFor(Device.TypeKey, deviceId.toString)
+              entityRef ! Device.GetTemperature(deviceId, temperatureAdapter)
+            }
+            Behaviors.same
+
+          case GetTemperatureReply(temp: Device.Temperature) =>
+            if (temp.readings > 0)
+              context.log.info(
+                "Temperature of device {} is {} with average {} after {} readings",
+                temp.deviceId,
+                temp.latest,
+                temp.average,
+                temp.readings
+              )
+            Behaviors.same
+        }
+      }
+    }
+  }
+}
diff --git a/akka-sample-sharding-typed-scala/src/main/scala/sample/sharding/Message.scala b/akka-sample-sharding-typed-scala/src/main/scala/sample/sharding/Message.scala
new file mode 100644
index 0000000..84baedc
--- /dev/null
+++ b/akka-sample-sharding-typed-scala/src/main/scala/sample/sharding/Message.scala
@@ -0,0 +1,9 @@
+/**
+  * Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
+  */
+package sample.sharding
+
+/**
+  * Marker interface for actor messages that are serialized.
+  */
+trait Message
diff --git a/akka-sample-sharding-typed-scala/src/main/scala/sample/sharding/ShardingApp.scala b/akka-sample-sharding-typed-scala/src/main/scala/sample/sharding/ShardingApp.scala
new file mode 100644
index 0000000..cff9352
--- /dev/null
+++ b/akka-sample-sharding-typed-scala/src/main/scala/sample/sharding/ShardingApp.scala
@@ -0,0 +1,29 @@
+package sample.sharding
+
+import akka.actor.typed.ActorSystem
+import com.typesafe.config.ConfigFactory
+
+object ShardingApp {
+  def main(args: Array[String]): Unit = {
+    if (args.isEmpty)
+      startup(Seq("2551", "2552", "0"))
+    else
+      startup(args)
+  }
+
+  def startup(ports: Seq[String]): Unit = {
+    // In a production application you wouldn't typically start multiple ActorSystem instances in the
+    // same JVM, here we do it to easily demonstrate these ActorSytems (which would be in separate JVM's)
+    // talking to each other.
+    ports foreach { port =>
+      // Override the configuration of the port
+      val config = ConfigFactory
+        .parseString("akka.remote.artery.canonical.port=" + port)
+        .withFallback(ConfigFactory.load())
+
+      // Create an Akka system, with Devices actor that starts the sharding and sends random messages
+      ActorSystem(Devices(), "ShardingSystem", config)
+    }
+  }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pekko.apache.org
For additional commands, e-mail: commits-help@pekko.apache.org


[incubator-pekko-samples] 07/09: roll 1 - java

Posted by fa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch wip-rolling-update-typed-patriknw
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-samples.git

commit 47bf16a8acd8743e78d8aca170a8f64f334aa582
Author: Patrik Nordwall <pa...@gmail.com>
AuthorDate: Fri Jul 12 07:37:27 2019 +0200

    roll 1 - java
    
    * update to Akka 2.6.x first
    * and add dependeny `akka-cluster-sharding-typed` although the actors remain untyped in first rollout
    * define akka.cluster.sharding.number-of-shards config
      * same value as you have used previously for number of shards
      * same value as you will use for Typed
      * otherwise Typed node will not be able to join in roll 2
    * add akka.cluster.sharding.typed.ShardingEnvelope to message extractor
    * sender and ask
      * add replyTo ActorRef to the messages that are using sender for the reply
      * change from `akka.pattern.Patterns.ask` to `akka.pattern.Patterns.askWithReply` to populate
        the replyTo field in the messages
      * use the replyTo field instead of getSender() unless it is null
---
 akka-sample-sharding-java/build.sbt                     |  1 +
 .../src/main/java/sample/sharding/Device.java           | 17 ++++++++++++-----
 .../src/main/java/sample/sharding/Devices.java          | 15 ++++++++++++---
 .../src/main/resources/application.conf                 |  3 +++
 4 files changed, 28 insertions(+), 8 deletions(-)

diff --git a/akka-sample-sharding-java/build.sbt b/akka-sample-sharding-java/build.sbt
index e1aa209..5b4f742 100644
--- a/akka-sample-sharding-java/build.sbt
+++ b/akka-sample-sharding-java/build.sbt
@@ -20,6 +20,7 @@ val `akka-sample-sharding-java` = project
     javacOptions in doc in Compile := Seq("-parameters", "-Xdoclint:none"),
     libraryDependencies ++= Seq(
       "com.typesafe.akka" %% "akka-cluster-sharding" % akkaVersion,
+      "com.typesafe.akka" %% "akka-cluster-sharding-typed" % akkaVersion,
       "com.typesafe.akka" %% "akka-serialization-jackson" % akkaVersion,
       "org.scalatest" %% "scalatest" % "3.0.7" % Test
     ),
diff --git a/akka-sample-sharding-java/src/main/java/sample/sharding/Device.java b/akka-sample-sharding-java/src/main/java/sample/sharding/Device.java
index 7d6a717..8d722d8 100644
--- a/akka-sample-sharding-java/src/main/java/sample/sharding/Device.java
+++ b/akka-sample-sharding-java/src/main/java/sample/sharding/Device.java
@@ -32,10 +32,12 @@ public class Device extends AbstractActor {
 
   public static class GetTemperature implements Command {
     public final int deviceId;
+    public final ActorRef replyTo;
 
     @JsonCreator
-    public GetTemperature(int deviceId) {
+    public GetTemperature(int deviceId, ActorRef replyTo) {
       this.deviceId = deviceId;
+      this.replyTo = replyTo;
     }
   }
 
@@ -77,13 +79,18 @@ public class Device extends AbstractActor {
   }
 
   private void receiveGetTemperature(GetTemperature cmd) {
+    Temperature reply;
     if (temperatures.isEmpty()) {
-      getSender().tell(new Temperature(cmd.deviceId, Double.NaN,
-        Double.NaN, 0), getSelf());
+      reply = new Temperature(cmd.deviceId, Double.NaN, Double.NaN, 0);
     } else {
-      getSender().tell(new Temperature(cmd.deviceId, average(temperatures),
-        temperatures.get(temperatures.size() - 1), temperatures.size()), getSelf());
+      reply = new Temperature(cmd.deviceId, average(temperatures),
+        temperatures.get(temperatures.size() - 1), temperatures.size());
     }
+
+    if (cmd.replyTo == null)
+      getSender().tell(reply, getSelf());
+    else
+      cmd.replyTo.tell(reply, getSelf());
   }
 
   private double sum(List<Double> values) {
diff --git a/akka-sample-sharding-java/src/main/java/sample/sharding/Devices.java b/akka-sample-sharding-java/src/main/java/sample/sharding/Devices.java
index 2ada983..72d4e00 100644
--- a/akka-sample-sharding-java/src/main/java/sample/sharding/Devices.java
+++ b/akka-sample-sharding-java/src/main/java/sample/sharding/Devices.java
@@ -14,6 +14,7 @@ import akka.event.LoggingAdapter;
 import akka.cluster.sharding.ClusterSharding;
 import akka.cluster.sharding.ClusterShardingSettings;
 import akka.cluster.sharding.ShardRegion;
+import akka.cluster.sharding.typed.ShardingEnvelope;
 import akka.pattern.Patterns;
 
 public class Devices extends AbstractActorWithTimers {
@@ -33,7 +34,13 @@ public class Devices extends AbstractActorWithTimers {
         return String.valueOf(((Device.RecordTemperature) message).deviceId);
       else if (message instanceof Device.GetTemperature)
         return String.valueOf(((Device.GetTemperature) message).deviceId);
-      else
+      else if (message instanceof ShardingEnvelope) {
+        ShardingEnvelope envelope = (ShardingEnvelope) message;
+        if (envelope.message() instanceof Device.RecordTemperature)
+          return String.valueOf(((Device.RecordTemperature) envelope.message()).deviceId);
+        else
+          return null;
+      } else
         return null;
     }
   };
@@ -88,10 +95,12 @@ public class Devices extends AbstractActorWithTimers {
   private void receiveReadTemperatures() {
     for (int deviceId = 0; deviceId < numberOfDevices; deviceId++) {
       if (deviceId >= 40) {
-        CompletionStage<Object> reply = Patterns.ask(deviceRegion, new Device.GetTemperature(deviceId), Duration.ofSeconds(3));
+        final int id = deviceId;
+        CompletionStage<Object> reply = Patterns.askWithReplyTo(deviceRegion, replyTo ->
+          new Device.GetTemperature(id, replyTo), Duration.ofSeconds(3));
         Patterns.pipe(reply, getContext().getDispatcher()).to(getSelf());
       } else {
-        deviceRegion.tell(new Device.GetTemperature(deviceId), getSelf());
+        deviceRegion.tell(new Device.GetTemperature(deviceId, getSelf()), getSelf());
       }
     }
   }
diff --git a/akka-sample-sharding-java/src/main/resources/application.conf b/akka-sample-sharding-java/src/main/resources/application.conf
index 903f3af..8af5c0b 100644
--- a/akka-sample-sharding-java/src/main/resources/application.conf
+++ b/akka-sample-sharding-java/src/main/resources/application.conf
@@ -27,3 +27,6 @@ akka {
   }
 
 }
+
+akka.cluster.sharding.number-of-shards = 100
+


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pekko.apache.org
For additional commands, e-mail: commits-help@pekko.apache.org


[incubator-pekko-samples] 05/09: roll 3 - scala

Posted by fa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch wip-rolling-update-typed-patriknw
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-samples.git

commit b27976b9454e700bb233cf4924976167a07dad44
Author: Patrik Nordwall <pa...@gmail.com>
AuthorDate: Wed Jun 19 11:46:51 2019 +0200

    roll 3 - scala
    
    * custom ShardingMessageExtractor can be removed
---
 .../src/main/scala/sample/sharding/Device.scala    | 52 +++-------------------
 1 file changed, 5 insertions(+), 47 deletions(-)

diff --git a/akka-sample-sharding-typed-scala/src/main/scala/sample/sharding/Device.scala b/akka-sample-sharding-typed-scala/src/main/scala/sample/sharding/Device.scala
index 6854b4d..162e43e 100644
--- a/akka-sample-sharding-typed-scala/src/main/scala/sample/sharding/Device.scala
+++ b/akka-sample-sharding-typed-scala/src/main/scala/sample/sharding/Device.scala
@@ -4,9 +4,6 @@ import akka.actor.typed.ActorRef
 import akka.actor.typed.ActorSystem
 import akka.actor.typed.Behavior
 import akka.actor.typed.scaladsl.Behaviors
-import akka.cluster.sharding.typed.HashCodeMessageExtractor
-import akka.cluster.sharding.typed.ShardingEnvelope
-import akka.cluster.sharding.typed.ShardingMessageExtractor
 import akka.cluster.sharding.typed.scaladsl.ClusterSharding
 import akka.cluster.sharding.typed.scaladsl.Entity
 import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
@@ -21,50 +18,11 @@ object Device {
 
   def init(system: ActorSystem[_]): Unit = {
 
-    val messageExtractor =
-      new ShardingMessageExtractor[Any, Command] {
-
-        // Note that `HashCodeMessageExtractor` is using
-        // `(math.abs(id.hashCode) % numberOfShards).toString`.
-        // If the old Untyped nodes were using a different hashing function
-        // this delegate HashCodeMessageExtractor can't be used and
-        // same hashing function as before must be implemented here.
-        // `akka.cluster.sharding.typed.HashCodeMessageExtractor` is compatible
-        // with `akka.cluster.sharding.ShardRegion.HashCodeMessageExtractor`.
-        val delegate = new HashCodeMessageExtractor[Device.Command](
-          system.settings.config
-            .getInt("akka.cluster.sharding.number-of-shards")
-        )
-
-        override def entityId(message: Any): String = {
-          message match {
-            case Device.RecordTemperature(deviceId, _) =>
-              deviceId.toString
-            case Device.GetTemperature(deviceId, _) =>
-              deviceId.toString
-            case env: ShardingEnvelope[Device.Command] =>
-              delegate.entityId(env)
-          }
-        }
-
-        override def shardId(entityId: String): String = {
-          delegate.shardId(entityId)
-        }
-
-        override def unwrapMessage(message: Any): Command = {
-          message match {
-            case m: Device.RecordTemperature => m
-            case m: Device.GetTemperature    => m
-            case env: ShardingEnvelope[Device.RecordTemperature] =>
-              delegate.unwrapMessage(env)
-          }
-        }
-      }
-
-    ClusterSharding(system).init(
-      Entity(TypeKey, _ => Device())
-        .withMessageExtractor(messageExtractor)
-    )
+    // If the original hashing function was using
+    // `(math.abs(id.hashCode) % numberOfShards).toString`
+    // the default HashCodeMessageExtractor in Typed can be used.
+    // That is also compatible with `akka.cluster.sharding.ShardRegion.HashCodeMessageExtractor`.
+    ClusterSharding(system).init(Entity(TypeKey, _ => Device()))
   }
 
   sealed trait Command extends Message


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pekko.apache.org
For additional commands, e-mail: commits-help@pekko.apache.org


[incubator-pekko-samples] 01/09: Akka 2.6.0

Posted by fa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch wip-rolling-update-typed-patriknw
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-samples.git

commit 4344cf2134fd340dffe8d294a252e31053b88a6a
Author: Patrik Nordwall <pa...@gmail.com>
AuthorDate: Wed Nov 6 14:02:17 2019 +0100

    Akka 2.6.0
---
 akka-sample-sharding-scala/build.sbt                   |  4 ++--
 .../src/main/resources/application.conf                |  4 ----
 .../src/main/resources/logback.xml                     | 18 ++++++++++++++++++
 3 files changed, 20 insertions(+), 6 deletions(-)

diff --git a/akka-sample-sharding-scala/build.sbt b/akka-sample-sharding-scala/build.sbt
index 32145af..67ecc9d 100644
--- a/akka-sample-sharding-scala/build.sbt
+++ b/akka-sample-sharding-scala/build.sbt
@@ -1,7 +1,7 @@
 import com.typesafe.sbt.SbtMultiJvm.multiJvmSettings
 import com.typesafe.sbt.SbtMultiJvm.MultiJvmKeys.MultiJvm
 
-val akkaVersion = "2.6.0-M4"
+val akkaVersion = "2.6.0"
 
 lazy val `akka-sample-sharding-scala` = project
   .in(file("."))
@@ -21,7 +21,7 @@ lazy val `akka-sample-sharding-scala` = project
     libraryDependencies ++= Seq(
       "com.typesafe.akka" %% "akka-cluster-sharding" % akkaVersion,
       "com.typesafe.akka" %% "akka-serialization-jackson" % akkaVersion,
-      "org.scalatest" %% "scalatest" % "3.0.7" % Test
+      "org.scalatest" %% "scalatest" % "3.0.8" % Test
     ),
     mainClass in (Compile, run) := Some("sample.sharding.ShardingApp"),
     // disable parallel tests
diff --git a/akka-sample-sharding-scala/src/main/resources/application.conf b/akka-sample-sharding-scala/src/main/resources/application.conf
index fc64a62..6cdfff3 100644
--- a/akka-sample-sharding-scala/src/main/resources/application.conf
+++ b/akka-sample-sharding-scala/src/main/resources/application.conf
@@ -20,10 +20,6 @@ akka {
     seed-nodes = [
       "akka://ShardingSystem@127.0.0.1:2551",
       "akka://ShardingSystem@127.0.0.1:2552"]
-
-    # auto downing is NOT safe for production deployments.
-    # you may want to use it during development, read more about it in the docs.
-    auto-down-unreachable-after = 10s
   }
 
 }
diff --git a/akka-sample-sharding-scala/src/main/resources/logback.xml b/akka-sample-sharding-scala/src/main/resources/logback.xml
new file mode 100644
index 0000000..cbf0f47
--- /dev/null
+++ b/akka-sample-sharding-scala/src/main/resources/logback.xml
@@ -0,0 +1,18 @@
+<configuration>
+     <appender name="STDOUT" target="System.out" class="ch.qos.logback.core.ConsoleAppender">
+         <encoder>
+             <pattern>[%date{ISO8601}] [%level] [%logger] [%thread] [%X{akkaSource}] - %msg%n</pattern>
+         </encoder>
+     </appender>
+
+     <appender name="ASYNC" class="ch.qos.logback.classic.AsyncAppender">
+         <queueSize>1024</queueSize>
+         <neverBlock>true</neverBlock>
+         <appender-ref ref="STDOUT" />
+     </appender>
+
+     <root level="INFO">
+         <appender-ref ref="ASYNC"/>
+     </root>
+
+ </configuration> 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pekko.apache.org
For additional commands, e-mail: commits-help@pekko.apache.org


[incubator-pekko-samples] 02/09: test ask - scala

Posted by fa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch wip-rolling-update-typed-patriknw
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-samples.git

commit 6ee737c99a6ff32cb65bf76562c1322e04b95d2a
Author: Patrik Nordwall <pa...@gmail.com>
AuthorDate: Wed Jun 19 18:19:06 2019 +0200

    test ask - scala
    
    * use ask for deviceId >= 40
---
 .../src/main/scala/sample/sharding/Devices.scala               | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)

diff --git a/akka-sample-sharding-scala/src/main/scala/sample/sharding/Devices.scala b/akka-sample-sharding-scala/src/main/scala/sample/sharding/Devices.scala
index 6cbe318..5803a76 100644
--- a/akka-sample-sharding-scala/src/main/scala/sample/sharding/Devices.scala
+++ b/akka-sample-sharding-scala/src/main/scala/sample/sharding/Devices.scala
@@ -5,6 +5,9 @@ import scala.util.Random
 
 import akka.actor._
 import akka.cluster.sharding._
+import akka.pattern.ask
+import akka.pattern.pipe
+import akka.util.Timeout
 
 object Devices {
   // Update a random device
@@ -64,7 +67,12 @@ class Devices extends Actor with ActorLogging with Timers {
 
     case ReadTemperatures =>
       (0 to numberOfDevices).foreach { deviceId =>
-        deviceRegion ! Device.GetTemperature(deviceId)
+        if (deviceId >= 40) {
+          import context.dispatcher
+          implicit val timeout = Timeout(3.seconds)
+          deviceRegion.ask(Device.GetTemperature(deviceId)).pipeTo(self)
+        } else
+          deviceRegion ! Device.GetTemperature(deviceId)
       }
 
     case temp: Device.Temperature =>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pekko.apache.org
For additional commands, e-mail: commits-help@pekko.apache.org


[incubator-pekko-samples] 08/09: roll 2 - java

Posted by fa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch wip-rolling-update-typed-patriknw
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-samples.git

commit b41564795b17d5afdb41ba13be753bed65c05f8f
Author: Patrik Nordwall <pa...@gmail.com>
AuthorDate: Fri Jul 12 09:41:27 2019 +0200

    roll 2 - java
    
    * convert actors to Typed
    * use custom ShardingMessageExtractor in ClusterSharding.init
    * use same akka.cluster.sharding.number-of-shards config as was in roll 1
---
 akka-sample-sharding-typed-java/.gitignore         |  18 +++
 akka-sample-sharding-typed-java/COPYING            | 121 +++++++++++++++
 akka-sample-sharding-typed-java/LICENSE            |  10 ++
 akka-sample-sharding-typed-java/README.md          |  39 +++++
 akka-sample-sharding-typed-java/build.sbt          |  32 ++++
 .../project/build.properties                       |   1 +
 .../project/plugins.sbt                            |   1 +
 .../src/main/java/sample/sharding/Device.java      | 164 +++++++++++++++++++++
 .../src/main/java/sample/sharding/Devices.java     | 106 +++++++++++++
 .../src/main/java/sample/sharding/Message.java     |   7 +
 .../src/main/java/sample/sharding/ShardingApp.java |  27 ++++
 .../src/main/resources/application.conf            |  32 ++++
 12 files changed, 558 insertions(+)

diff --git a/akka-sample-sharding-typed-java/.gitignore b/akka-sample-sharding-typed-java/.gitignore
new file mode 100644
index 0000000..b0814a0
--- /dev/null
+++ b/akka-sample-sharding-typed-java/.gitignore
@@ -0,0 +1,18 @@
+*#
+*.iml
+*.ipr
+*.iws
+*.pyc
+*.tm.epoch
+*.vim
+*-shim.sbt
+.idea/
+/project/plugins/project
+project/boot
+target/
+/logs
+.cache
+.classpath
+.project
+.settings
+native/
diff --git a/akka-sample-sharding-typed-java/COPYING b/akka-sample-sharding-typed-java/COPYING
new file mode 100644
index 0000000..0e259d4
--- /dev/null
+++ b/akka-sample-sharding-typed-java/COPYING
@@ -0,0 +1,121 @@
+Creative Commons Legal Code
+
+CC0 1.0 Universal
+
+    CREATIVE COMMONS CORPORATION IS NOT A LAW FIRM AND DOES NOT PROVIDE
+    LEGAL SERVICES. DISTRIBUTION OF THIS DOCUMENT DOES NOT CREATE AN
+    ATTORNEY-CLIENT RELATIONSHIP. CREATIVE COMMONS PROVIDES THIS
+    INFORMATION ON AN "AS-IS" BASIS. CREATIVE COMMONS MAKES NO WARRANTIES
+    REGARDING THE USE OF THIS DOCUMENT OR THE INFORMATION OR WORKS
+    PROVIDED HEREUNDER, AND DISCLAIMS LIABILITY FOR DAMAGES RESULTING FROM
+    THE USE OF THIS DOCUMENT OR THE INFORMATION OR WORKS PROVIDED
+    HEREUNDER.
+
+Statement of Purpose
+
+The laws of most jurisdictions throughout the world automatically confer
+exclusive Copyright and Related Rights (defined below) upon the creator
+and subsequent owner(s) (each and all, an "owner") of an original work of
+authorship and/or a database (each, a "Work").
+
+Certain owners wish to permanently relinquish those rights to a Work for
+the purpose of contributing to a commons of creative, cultural and
+scientific works ("Commons") that the public can reliably and without fear
+of later claims of infringement build upon, modify, incorporate in other
+works, reuse and redistribute as freely as possible in any form whatsoever
+and for any purposes, including without limitation commercial purposes.
+These owners may contribute to the Commons to promote the ideal of a free
+culture and the further production of creative, cultural and scientific
+works, or to gain reputation or greater distribution for their Work in
+part through the use and efforts of others.
+
+For these and/or other purposes and motivations, and without any
+expectation of additional consideration or compensation, the person
+associating CC0 with a Work (the "Affirmer"), to the extent that he or she
+is an owner of Copyright and Related Rights in the Work, voluntarily
+elects to apply CC0 to the Work and publicly distribute the Work under its
+terms, with knowledge of his or her Copyright and Related Rights in the
+Work and the meaning and intended legal effect of CC0 on those rights.
+
+1. Copyright and Related Rights. A Work made available under CC0 may be
+protected by copyright and related or neighboring rights ("Copyright and
+Related Rights"). Copyright and Related Rights include, but are not
+limited to, the following:
+
+  i. the right to reproduce, adapt, distribute, perform, display,
+     communicate, and translate a Work;
+ ii. moral rights retained by the original author(s) and/or performer(s);
+iii. publicity and privacy rights pertaining to a person's image or
+     likeness depicted in a Work;
+ iv. rights protecting against unfair competition in regards to a Work,
+     subject to the limitations in paragraph 4(a), below;
+  v. rights protecting the extraction, dissemination, use and reuse of data
+     in a Work;
+ vi. database rights (such as those arising under Directive 96/9/EC of the
+     European Parliament and of the Council of 11 March 1996 on the legal
+     protection of databases, and under any national implementation
+     thereof, including any amended or successor version of such
+     directive); and
+vii. other similar, equivalent or corresponding rights throughout the
+     world based on applicable law or treaty, and any national
+     implementations thereof.
+
+2. Waiver. To the greatest extent permitted by, but not in contravention
+of, applicable law, Affirmer hereby overtly, fully, permanently,
+irrevocably and unconditionally waives, abandons, and surrenders all of
+Affirmer's Copyright and Related Rights and associated claims and causes
+of action, whether now known or unknown (including existing as well as
+future claims and causes of action), in the Work (i) in all territories
+worldwide, (ii) for the maximum duration provided by applicable law or
+treaty (including future time extensions), (iii) in any current or future
+medium and for any number of copies, and (iv) for any purpose whatsoever,
+including without limitation commercial, advertising or promotional
+purposes (the "Waiver"). Affirmer makes the Waiver for the benefit of each
+member of the public at large and to the detriment of Affirmer's heirs and
+successors, fully intending that such Waiver shall not be subject to
+revocation, rescission, cancellation, termination, or any other legal or
+equitable action to disrupt the quiet enjoyment of the Work by the public
+as contemplated by Affirmer's express Statement of Purpose.
+
+3. Public License Fallback. Should any part of the Waiver for any reason
+be judged legally invalid or ineffective under applicable law, then the
+Waiver shall be preserved to the maximum extent permitted taking into
+account Affirmer's express Statement of Purpose. In addition, to the
+extent the Waiver is so judged Affirmer hereby grants to each affected
+person a royalty-free, non transferable, non sublicensable, non exclusive,
+irrevocable and unconditional license to exercise Affirmer's Copyright and
+Related Rights in the Work (i) in all territories worldwide, (ii) for the
+maximum duration provided by applicable law or treaty (including future
+time extensions), (iii) in any current or future medium and for any number
+of copies, and (iv) for any purpose whatsoever, including without
+limitation commercial, advertising or promotional purposes (the
+"License"). The License shall be deemed effective as of the date CC0 was
+applied by Affirmer to the Work. Should any part of the License for any
+reason be judged legally invalid or ineffective under applicable law, such
+partial invalidity or ineffectiveness shall not invalidate the remainder
+of the License, and in such case Affirmer hereby affirms that he or she
+will not (i) exercise any of his or her remaining Copyright and Related
+Rights in the Work or (ii) assert any associated claims and causes of
+action with respect to the Work, in either case contrary to Affirmer's
+express Statement of Purpose.
+
+4. Limitations and Disclaimers.
+
+ a. No trademark or patent rights held by Affirmer are waived, abandoned,
+    surrendered, licensed or otherwise affected by this document.
+ b. Affirmer offers the Work as-is and makes no representations or
+    warranties of any kind concerning the Work, express, implied,
+    statutory or otherwise, including without limitation warranties of
+    title, merchantability, fitness for a particular purpose, non
+    infringement, or the absence of latent or other defects, accuracy, or
+    the present or absence of errors, whether or not discoverable, all to
+    the greatest extent permissible under applicable law.
+ c. Affirmer disclaims responsibility for clearing rights of other persons
+    that may apply to the Work or any use thereof, including without
+    limitation any person's Copyright and Related Rights in the Work.
+    Further, Affirmer disclaims responsibility for obtaining any necessary
+    consents, permissions or other rights required for any use of the
+    Work.
+ d. Affirmer understands and acknowledges that Creative Commons is not a
+    party to this document and has no duty or obligation with respect to
+    this CC0 or use of the Work.
diff --git a/akka-sample-sharding-typed-java/LICENSE b/akka-sample-sharding-typed-java/LICENSE
new file mode 100644
index 0000000..4239f09
--- /dev/null
+++ b/akka-sample-sharding-typed-java/LICENSE
@@ -0,0 +1,10 @@
+Akka sample by Lightbend
+
+Licensed under Public Domain (CC0)
+
+To the extent possible under law, the person who associated CC0 with
+this Template has waived all copyright and related or neighboring
+rights to this Template.
+
+You should have received a copy of the CC0 legalcode along with this
+work.  If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.
diff --git a/akka-sample-sharding-typed-java/README.md b/akka-sample-sharding-typed-java/README.md
new file mode 100644
index 0000000..ff10cd6
--- /dev/null
+++ b/akka-sample-sharding-typed-java/README.md
@@ -0,0 +1,39 @@
+This tutorial contains a sample illustrating [Akka Cluster Sharding](http://doc.akka.io/docs/akka/current/java/cluster-sharding.html#an-example).
+
+## Example overview
+
+First of all, make sure the correct settings in [application.conf](src/main/resources/application.conf) are set as described in the akka-sample-cluster tutorial.
+
+Open [ShardingApp.java](src/main/java/sample/sharding/ShardingApp.java).
+
+This small program starts an ActorSystem with Cluster Sharding enabled. It joins the cluster and starts a `Devices` actor. This actor starts the infrastructure to shard `Device` instances and starts sending messages to arbitrary devices.
+
+To run this sample, type `sbt "runMain sample.sharding.ShardingApp"` if it is not already started.
+
+`ShardingApp` starts three actor systems (cluster members) in the same JVM process. It can be more interesting to run them in separate processes. Stop the application and then open three terminal windows.
+
+In the first terminal window, start the first seed node with the following command:
+
+    sbt "runMain sample.sharding.ShardingApp 2551"
+
+2551 corresponds to the port of the first seed-nodes element in the configuration. In the log output you see that the cluster node has been started and changed status to 'Up'.
+
+You'll see a log message when `Devices` sends a message to record the current temperature, and for each of those you'll see a log message from the `Device` showing the action taken and the new average temperature.
+
+In the second terminal window, start the second seed node with the following command:
+
+    sbt "runMain sample.sharding.ShardingApp 2552"
+
+2552 corresponds to the port of the second seed-nodes element in the configuration. In the log output you see that the cluster node has been started and joins the other seed node and becomes a member of the cluster. Its status changed to 'Up'. Switch over to the first terminal window and see in the log output that the member joined.
+
+Some of the devices that were originally on the `ActorSystem` on port 2551 will be migrated to the newly joined `ActorSystem` on port 2552. The migration is straightforward: the old actor is stopped and a fresh actor is started on the newly created `ActorSystem`. Notice this means the average is reset: if you want your state to be persisted you'll need to take care of this yourself. For this reason Cluster Sharding and Akka Persistence are such a popular combination.
+
+Start another node in the third terminal window with the following command:
+
+    sbt "runMain sample.sharding.ShardingApp 0"
+
+Now you don't need to specify the port number, 0 means that it will use a random available port. It joins one of the configured seed nodes. Look at the log output in the different terminal windows.
+
+Start even more nodes in the same way, if you like.
+
+Shut down one of the nodes by pressing 'ctrl-c' in one of the terminal windows. The other nodes will detect the failure after a while, which you can see in the log output in the other terminals.
diff --git a/akka-sample-sharding-typed-java/build.sbt b/akka-sample-sharding-typed-java/build.sbt
new file mode 100644
index 0000000..71085fe
--- /dev/null
+++ b/akka-sample-sharding-typed-java/build.sbt
@@ -0,0 +1,32 @@
+val akkaVersion = "2.6.0-M4"
+
+val `akka-sample-sharding-typed-java` = project
+  .in(file("."))
+  .settings(
+    organization := "com.typesafe.akka.samples",
+    scalaVersion := "2.12.8",
+    scalacOptions in Compile ++= Seq(
+      "-deprecation",
+      "-feature",
+      "-unchecked",
+      "-Xlog-reflective-calls",
+      "-Xlint"
+    ),
+    javacOptions in Compile ++= Seq(
+      "-parameters",
+      "-Xlint:unchecked",
+      "-Xlint:deprecation"
+    ),
+    javacOptions in doc in Compile := Seq("-parameters", "-Xdoclint:none"),
+    libraryDependencies ++= Seq(
+      "com.typesafe.akka" %% "akka-cluster-sharding-typed" % akkaVersion,
+      "com.typesafe.akka" %% "akka-serialization-jackson" % akkaVersion,
+      "org.scalatest" %% "scalatest" % "3.0.7" % Test
+    ),
+    mainClass in (Compile, run) := Some("sample.sharding.ShardingApp"),
+    // disable parallel tests
+    parallelExecution in Test := false,
+    licenses := Seq(
+      ("CC0", url("http://creativecommons.org/publicdomain/zero/1.0"))
+    )
+  )
diff --git a/akka-sample-sharding-typed-java/project/build.properties b/akka-sample-sharding-typed-java/project/build.properties
new file mode 100644
index 0000000..c0bab04
--- /dev/null
+++ b/akka-sample-sharding-typed-java/project/build.properties
@@ -0,0 +1 @@
+sbt.version=1.2.8
diff --git a/akka-sample-sharding-typed-java/project/plugins.sbt b/akka-sample-sharding-typed-java/project/plugins.sbt
new file mode 100644
index 0000000..a7f83b3
--- /dev/null
+++ b/akka-sample-sharding-typed-java/project/plugins.sbt
@@ -0,0 +1 @@
+addSbtPlugin("com.dwijnand" % "sbt-dynver" % "3.0.0")
diff --git a/akka-sample-sharding-typed-java/src/main/java/sample/sharding/Device.java b/akka-sample-sharding-typed-java/src/main/java/sample/sharding/Device.java
new file mode 100644
index 0000000..0310987
--- /dev/null
+++ b/akka-sample-sharding-typed-java/src/main/java/sample/sharding/Device.java
@@ -0,0 +1,164 @@
+package sample.sharding;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import akka.actor.typed.ActorRef;
+import akka.actor.typed.ActorSystem;
+import akka.actor.typed.Behavior;
+import akka.actor.typed.javadsl.AbstractBehavior;
+import akka.actor.typed.javadsl.ActorContext;
+import akka.actor.typed.javadsl.Behaviors;
+import akka.actor.typed.javadsl.Receive;
+import akka.cluster.sharding.typed.HashCodeMessageExtractor;
+import akka.cluster.sharding.typed.ShardingEnvelope;
+import akka.cluster.sharding.typed.ShardingMessageExtractor;
+import akka.cluster.sharding.typed.javadsl.ClusterSharding;
+import akka.cluster.sharding.typed.javadsl.Entity;
+import akka.cluster.sharding.typed.javadsl.EntityTypeKey;
+import com.fasterxml.jackson.annotation.JsonCreator;
+
+public class Device extends AbstractBehavior<Device.Command> {
+
+  public static final EntityTypeKey<Command> TYPE_KEY = EntityTypeKey.create(Command.class, "Device");
+
+  public static void init(ActorSystem<?> system) {
+    ShardingMessageExtractor<Object, Command> messageExtractor = new ShardingMessageExtractor<Object, Command>() {
+      final HashCodeMessageExtractor<Command> delegate = new HashCodeMessageExtractor<>(
+        system.settings().config().getInt("akka.cluster.sharding.number-of-shards"));
+
+      @SuppressWarnings("unchecked")
+      @Override
+      public String entityId(Object message) {
+        if (message instanceof RecordTemperature)
+          return String.valueOf(((RecordTemperature) message).deviceId);
+        else if (message instanceof GetTemperature)
+          return String.valueOf(((GetTemperature) message).deviceId);
+        else if (message instanceof ShardingEnvelope)
+          return delegate.entityId((ShardingEnvelope<Command>) message);
+        else
+          return null;
+      }
+
+      @Override
+      public String shardId(String entityId) {
+        return delegate.shardId(entityId);
+      }
+
+      @SuppressWarnings("unchecked")
+      @Override
+      public Command unwrapMessage(Object message) {
+        if (message instanceof Command)
+          return (Command) message;
+        else if (message instanceof ShardingEnvelope)
+          return delegate.unwrapMessage((ShardingEnvelope<Command>) message);
+        else
+          return null;
+      }
+    };
+
+    ClusterSharding.get(system).init(Entity.of(TYPE_KEY, context -> Device.create())
+      .withMessageExtractor(messageExtractor));
+  }
+
+  public interface Command extends Message {}
+
+  public static class RecordTemperature implements Command {
+    public final int deviceId;
+    public final double temperature;
+
+    public RecordTemperature(int deviceId, double temperature) {
+      this.deviceId = deviceId;
+      this.temperature = temperature;
+    }
+
+    @Override
+    public String toString() {
+      return "RecordTemperature(" + deviceId + ", " + temperature + ")";
+    }
+  }
+
+  public static class GetTemperature implements Command {
+    public final int deviceId;
+    public final ActorRef<Temperature> replyTo;
+
+    @JsonCreator
+    public GetTemperature(int deviceId, ActorRef<Temperature> replyTo) {
+      this.deviceId = deviceId;
+      this.replyTo = replyTo;
+    }
+  }
+
+  public static class Temperature implements Message {
+    public final int deviceId;
+    public final double average;
+    public final double latest;
+    public final int readings;
+
+    public Temperature(int deviceId, double average, double latest, int readings) {
+      this.deviceId = deviceId;
+      this.average = average;
+      this.latest = latest;
+      this.readings = readings;
+    }
+
+    @Override
+    public String toString() {
+      return "Temperature(" + deviceId + ", " + average + ", " + latest+ ", " + readings + ")";
+    }
+  }
+
+  public static Behavior<Command> create() {
+    return Behaviors.setup(Device::new);
+  }
+
+  private final ActorContext<Command> context;
+  private List<Double> temperatures = new ArrayList<Double>();
+
+  private Device(ActorContext<Command> context) {
+    this.context = context;
+  }
+
+  @Override
+  public Receive<Command> createReceive() {
+    return newReceiveBuilder()
+      .onMessage(RecordTemperature.class, this::receiveRecordTemperature)
+      .onMessage(GetTemperature.class, this::receiveGetTemperature)
+      .build();
+  }
+
+  private Behavior<Command> receiveRecordTemperature(RecordTemperature cmd) {
+    temperatures.add(cmd.temperature);
+    context.getLog().info("Recording temperature {} for device {}, average is {} after {} readings",
+      cmd.temperature, cmd.deviceId, average(temperatures), temperatures.size());
+    return this;
+  }
+
+  private Behavior<Command> receiveGetTemperature(GetTemperature cmd) {
+    Temperature reply;
+    if (temperatures.isEmpty()) {
+      reply = new Temperature(cmd.deviceId, Double.NaN, Double.NaN, 0);
+    } else {
+      reply = new Temperature(cmd.deviceId, average(temperatures),
+        temperatures.get(temperatures.size() - 1), temperatures.size());
+    }
+
+    cmd.replyTo.tell(reply);
+    return this;
+  }
+
+  private double sum(List<Double> values) {
+    double result = 0.0;
+    for (double d : values) {
+      result += d;
+    }
+    return result;
+  }
+
+  private double average(List<Double> values) {
+    if (values.isEmpty())
+      return Double.NaN;
+    else
+      return sum(values) / values.size();
+  }
+}
diff --git a/akka-sample-sharding-typed-java/src/main/java/sample/sharding/Devices.java b/akka-sample-sharding-typed-java/src/main/java/sample/sharding/Devices.java
new file mode 100644
index 0000000..5fe32d1
--- /dev/null
+++ b/akka-sample-sharding-typed-java/src/main/java/sample/sharding/Devices.java
@@ -0,0 +1,106 @@
+package sample.sharding;
+
+import akka.actor.typed.ActorRef;
+import akka.actor.typed.Behavior;
+import akka.actor.typed.javadsl.AbstractBehavior;
+import akka.actor.typed.javadsl.ActorContext;
+import akka.actor.typed.javadsl.Behaviors;
+import akka.actor.typed.javadsl.Receive;
+import akka.cluster.sharding.typed.javadsl.ClusterSharding;
+import akka.cluster.sharding.typed.javadsl.EntityRef;
+
+import java.time.Duration;
+import java.util.Random;
+import java.util.concurrent.CompletionStage;
+
+public class Devices extends AbstractBehavior<Devices.Command> {
+
+  public interface Command {}
+
+  public enum UpdateDevice implements Command {
+    INSTANCE
+  }
+
+  public enum ReadTemperatures implements Command {
+    INSTANCE
+  }
+
+  private static class GetTemperatureReply implements Command {
+    final Device.Temperature temp;
+
+    private GetTemperatureReply(Device.Temperature temp) {
+      this.temp = temp;
+    }
+  }
+
+  public static Behavior<Command> create() {
+    return Behaviors.setup(context ->
+        Behaviors.withTimers(timers -> {
+          Device.init(context.getSystem());
+
+          timers.startTimerWithFixedDelay(UpdateDevice.INSTANCE, UpdateDevice.INSTANCE, Duration.ofSeconds(1));
+          timers.startTimerWithFixedDelay(ReadTemperatures.INSTANCE, ReadTemperatures.INSTANCE, Duration.ofSeconds(15));
+
+        return new Devices(context);
+      }));
+  }
+
+  private final ActorContext<Command> context;
+  private final ClusterSharding sharding;
+  private final ActorRef<Device.Temperature> temperatureAdapter;
+  private final Random random = new Random();
+
+  private final int numberOfDevices = 50;
+
+  public Devices(ActorContext<Command> context) {
+    this.context = context;
+
+    this.sharding = ClusterSharding.get(context.getSystem());
+
+    this.temperatureAdapter =
+      context.messageAdapter(Device.Temperature.class, GetTemperatureReply::new);
+  }
+
+  @Override
+  public Receive<Command> createReceive() {
+    return newReceiveBuilder()
+          .onMessage(UpdateDevice.class, m -> receiveUpdateDevice())
+      .onMessage(ReadTemperatures.class, m -> receiveReadTemperatures())
+      .onMessage(GetTemperatureReply.class, this::receiveTemperature)
+      .build();
+  }
+
+  private Behavior<Command> receiveUpdateDevice() {
+    int deviceId = random.nextInt(numberOfDevices);
+    double temperature = 5 + 30 * random.nextDouble();
+    Device.RecordTemperature msg = new Device.RecordTemperature(deviceId, temperature);
+    context.getLog().info("Sending {}", msg);
+    entityRefFor(deviceId).tell(msg);
+    return this;
+  }
+
+  private Behavior<Command> receiveReadTemperatures() {
+    for (int deviceId = 0; deviceId < numberOfDevices; deviceId++) {
+      entityRefFor(deviceId).tell(new Device.GetTemperature(deviceId, temperatureAdapter));
+    }
+    return this;
+  }
+
+  private EntityRef<Device.Command> entityRefFor(int deviceId) {
+    return sharding.entityRefFor(Device.TYPE_KEY, String.valueOf(deviceId));
+  }
+
+  private Behavior<Command> receiveTemperature(GetTemperatureReply reply) {
+    Device.Temperature temp = reply.temp;
+    if (temp.readings > 0)
+      context.getLog().info(
+        "Temperature of device {} is {} with average {} after {} readings",
+        temp.deviceId,
+        temp.latest,
+        temp.average,
+        temp.readings
+      );
+    return this;
+  }
+
+}
diff --git a/akka-sample-sharding-typed-java/src/main/java/sample/sharding/Message.java b/akka-sample-sharding-typed-java/src/main/java/sample/sharding/Message.java
new file mode 100644
index 0000000..e3e9fa7
--- /dev/null
+++ b/akka-sample-sharding-typed-java/src/main/java/sample/sharding/Message.java
@@ -0,0 +1,7 @@
+package sample.sharding;
+
+/**
+ * Marker interface for actor messages that are serialized.
+ */
+public interface Message {
+}
diff --git a/akka-sample-sharding-typed-java/src/main/java/sample/sharding/ShardingApp.java b/akka-sample-sharding-typed-java/src/main/java/sample/sharding/ShardingApp.java
new file mode 100644
index 0000000..3412140
--- /dev/null
+++ b/akka-sample-sharding-typed-java/src/main/java/sample/sharding/ShardingApp.java
@@ -0,0 +1,27 @@
+package sample.sharding;
+
+import akka.actor.typed.ActorSystem;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+public class ShardingApp {
+
+  public static void main(String[] args) {
+    if (args.length == 0)
+      startup(new String[] { "2551", "2552", "0" });
+    else
+      startup(args);
+  }
+
+  public static void startup(String[] ports) {
+    for (String port : ports) {
+      // Override the configuration of the port
+      Config config = ConfigFactory.parseString(
+          "akka.remote.artery.canonical.port=" + port).withFallback(
+          ConfigFactory.load());
+
+      // Create an Akka system
+      ActorSystem.create(Devices.create(), "ShardingSystem", config);
+    }
+  }
+}
diff --git a/akka-sample-sharding-typed-java/src/main/resources/application.conf b/akka-sample-sharding-typed-java/src/main/resources/application.conf
new file mode 100644
index 0000000..0eef996
--- /dev/null
+++ b/akka-sample-sharding-typed-java/src/main/resources/application.conf
@@ -0,0 +1,32 @@
+akka {
+  loglevel = INFO
+
+  actor {
+    provider = "cluster"
+
+    serialization-bindings {
+      "sample.sharding.Message" = jackson-cbor
+    }
+  }
+  
+  # For the sample, just bind to loopback and do not allow access from the network
+  # the port is overridden by the logic in main class
+  remote.artery {
+    canonical.port = 0
+    canonical.hostname = 127.0.0.1
+  }
+
+  cluster {
+    seed-nodes = [
+      "akka://ShardingSystem@127.0.0.1:2551",
+      "akka://ShardingSystem@127.0.0.1:2552"]
+
+    # auto downing is NOT safe for production deployments.
+    # you may want to use it during development, read more about it in the docs.
+    auto-down-unreachable-after = 10s
+  }
+
+  sharding.number-of-shards = 100
+
+}
+


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pekko.apache.org
For additional commands, e-mail: commits-help@pekko.apache.org


[incubator-pekko-samples] 03/09: roll 1 - scala

Posted by fa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch wip-rolling-update-typed-patriknw
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-samples.git

commit 359e08dbafbd7ee8c0ac9acf52925956cf704d55
Author: Patrik Nordwall <pa...@gmail.com>
AuthorDate: Wed Jun 19 11:25:28 2019 +0200

    roll 1 - scala
    
    * update to Akka 2.6.x first
    * and add dependeny `akka-cluster-sharding-typed` although the actors remain untyped in first rollout
    * define akka.cluster.sharding.number-of-shards config
      * same value as you have used previously for number of shards
      * same value as you will use for Typed
      * otherwise Typed node will not be able to join in roll 2
    * add akka.cluster.sharding.typed.ShardingEnvelope to message extractor
    * sender and ask
      * add replyTo ActorRef to the messages that are using sender for the reply
      * change from `akka.pattern.ask` to `akka.pattern.extended.ask` to populate
        the replyTo field in the messages
      * use the replyTo field instead of sender() unless it is null
---
 akka-sample-sharding-scala/build.sbt                  |  1 +
 .../src/main/resources/application.conf               |  3 +++
 .../src/main/scala/sample/sharding/Device.scala       | 10 +++++++---
 .../src/main/scala/sample/sharding/Devices.scala      | 19 ++++++++++++++-----
 4 files changed, 25 insertions(+), 8 deletions(-)

diff --git a/akka-sample-sharding-scala/build.sbt b/akka-sample-sharding-scala/build.sbt
index 67ecc9d..f18acde 100644
--- a/akka-sample-sharding-scala/build.sbt
+++ b/akka-sample-sharding-scala/build.sbt
@@ -20,6 +20,7 @@ lazy val `akka-sample-sharding-scala` = project
     javaOptions in run ++= Seq("-Xms128m", "-Xmx1024m"),
     libraryDependencies ++= Seq(
       "com.typesafe.akka" %% "akka-cluster-sharding" % akkaVersion,
+      "com.typesafe.akka" %% "akka-cluster-sharding-typed" % akkaVersion,
       "com.typesafe.akka" %% "akka-serialization-jackson" % akkaVersion,
       "org.scalatest" %% "scalatest" % "3.0.8" % Test
     ),
diff --git a/akka-sample-sharding-scala/src/main/resources/application.conf b/akka-sample-sharding-scala/src/main/resources/application.conf
index 6cdfff3..193471a 100644
--- a/akka-sample-sharding-scala/src/main/resources/application.conf
+++ b/akka-sample-sharding-scala/src/main/resources/application.conf
@@ -23,3 +23,6 @@ akka {
   }
 
 }
+
+akka.cluster.sharding.number-of-shards = 100
+
diff --git a/akka-sample-sharding-scala/src/main/scala/sample/sharding/Device.scala b/akka-sample-sharding-scala/src/main/scala/sample/sharding/Device.scala
index 19b0488..610f5f4 100644
--- a/akka-sample-sharding-scala/src/main/scala/sample/sharding/Device.scala
+++ b/akka-sample-sharding-scala/src/main/scala/sample/sharding/Device.scala
@@ -13,7 +13,7 @@ object Device {
   case class RecordTemperature(deviceId: Int, temperature: Double)
       extends Command
 
-  case class GetTemperature(deviceId: Int) extends Command
+  case class GetTemperature(deviceId: Int, replyTo: ActorRef) extends Command
 
   case class Temperature(deviceId: Int,
                          average: Double,
@@ -38,13 +38,17 @@ class Device extends Actor with ActorLogging {
       )
       context.become(counting(temperatures))
 
-    case GetTemperature(id) =>
+    case GetTemperature(id, replyTo) =>
       val reply =
         if (values.isEmpty)
           Temperature(id, Double.NaN, Double.NaN, 0)
         else
           Temperature(id, average(values), values.last, values.size)
-      sender() ! reply
+
+      if (replyTo == null)
+        sender() ! reply
+      else
+        replyTo ! reply
 
   }
 
diff --git a/akka-sample-sharding-scala/src/main/scala/sample/sharding/Devices.scala b/akka-sample-sharding-scala/src/main/scala/sample/sharding/Devices.scala
index 5803a76..9d97f92 100644
--- a/akka-sample-sharding-scala/src/main/scala/sample/sharding/Devices.scala
+++ b/akka-sample-sharding-scala/src/main/scala/sample/sharding/Devices.scala
@@ -5,7 +5,8 @@ import scala.util.Random
 
 import akka.actor._
 import akka.cluster.sharding._
-import akka.pattern.ask
+import akka.cluster.sharding.typed.ShardingEnvelope
+import akka.pattern.extended.ask // note extended.ask
 import akka.pattern.pipe
 import akka.util.Timeout
 
@@ -24,7 +25,9 @@ class Devices extends Actor with ActorLogging with Timers {
 
   private val extractEntityId: ShardRegion.ExtractEntityId = {
     case msg @ Device.RecordTemperature(id, _) => (id.toString, msg)
-    case msg @ Device.GetTemperature(id)       => (id.toString, msg)
+    case msg @ Device.GetTemperature(id, _)    => (id.toString, msg)
+    case ShardingEnvelope(_, msg @ Device.RecordTemperature(id, _)) =>
+      (id.toString, msg)
   }
 
   private val numberOfShards = 100
@@ -32,11 +35,15 @@ class Devices extends Actor with ActorLogging with Timers {
   private val extractShardId: ShardRegion.ExtractShardId = {
     case Device.RecordTemperature(id, _) =>
       (math.abs(id.hashCode) % numberOfShards).toString
-    case Device.GetTemperature(id) =>
+    case Device.GetTemperature(id, _) =>
       (math.abs(id.hashCode) % numberOfShards).toString
     // Needed if you want to use 'remember entities':
     case ShardRegion.StartEntity(id) =>
       (math.abs(id.hashCode) % numberOfShards).toString
+    case ShardingEnvelope(_, Device.RecordTemperature(id, _)) =>
+      (math.abs(id.hashCode) % numberOfShards).toString
+    case ShardingEnvelope(_, Device.GetTemperature(id, _)) =>
+      (math.abs(id.hashCode) % numberOfShards).toString
   }
 
   val deviceRegion: ActorRef = ClusterSharding(context.system).start(
@@ -70,9 +77,11 @@ class Devices extends Actor with ActorLogging with Timers {
         if (deviceId >= 40) {
           import context.dispatcher
           implicit val timeout = Timeout(3.seconds)
-          deviceRegion.ask(Device.GetTemperature(deviceId)).pipeTo(self)
+          deviceRegion
+            .ask(replyTo => Device.GetTemperature(deviceId, replyTo))
+            .pipeTo(self)
         } else
-          deviceRegion ! Device.GetTemperature(deviceId)
+          deviceRegion ! Device.GetTemperature(deviceId, self)
       }
 
     case temp: Device.Temperature =>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pekko.apache.org
For additional commands, e-mail: commits-help@pekko.apache.org


[incubator-pekko-samples] 06/09: test ask - java

Posted by fa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch wip-rolling-update-typed-patriknw
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-samples.git

commit ba48a34466ec1899cb2dd74e7b844bd2eed2dd1f
Author: Patrik Nordwall <pa...@gmail.com>
AuthorDate: Fri Jul 12 07:20:36 2019 +0200

    test ask - java
---
 .../src/main/java/sample/sharding/Devices.java                | 11 +++++++++--
 1 file changed, 9 insertions(+), 2 deletions(-)

diff --git a/akka-sample-sharding-java/src/main/java/sample/sharding/Devices.java b/akka-sample-sharding-java/src/main/java/sample/sharding/Devices.java
index aba56b6..2ada983 100644
--- a/akka-sample-sharding-java/src/main/java/sample/sharding/Devices.java
+++ b/akka-sample-sharding-java/src/main/java/sample/sharding/Devices.java
@@ -2,6 +2,7 @@ package sample.sharding;
 
 import java.time.Duration;
 import java.util.Random;
+import java.util.concurrent.CompletionStage;
 
 import akka.actor.AbstractActorWithTimers;
 
@@ -13,6 +14,7 @@ import akka.event.LoggingAdapter;
 import akka.cluster.sharding.ClusterSharding;
 import akka.cluster.sharding.ClusterShardingSettings;
 import akka.cluster.sharding.ShardRegion;
+import akka.pattern.Patterns;
 
 public class Devices extends AbstractActorWithTimers {
 
@@ -84,8 +86,13 @@ public class Devices extends AbstractActorWithTimers {
   }
 
   private void receiveReadTemperatures() {
-    for (int id = 0; id < numberOfDevices; id++) {
-      deviceRegion.tell(new Device.GetTemperature(id), getSelf());
+    for (int deviceId = 0; deviceId < numberOfDevices; deviceId++) {
+      if (deviceId >= 40) {
+        CompletionStage<Object> reply = Patterns.ask(deviceRegion, new Device.GetTemperature(deviceId), Duration.ofSeconds(3));
+        Patterns.pipe(reply, getContext().getDispatcher()).to(getSelf());
+      } else {
+        deviceRegion.tell(new Device.GetTemperature(deviceId), getSelf());
+      }
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pekko.apache.org
For additional commands, e-mail: commits-help@pekko.apache.org