You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pekko.apache.org by fa...@apache.org on 2022/10/31 09:36:19 UTC

[incubator-pekko-samples] 01/01: alternate ShoppingCart based on ORMultiMap (#44)

This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch wip-updated-cart
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-samples.git

commit 6244bf715be243c06797c0aa4c0e249600329f5e
Author: gosubpl <gi...@gosub.pl>
AuthorDate: Fri Jun 15 09:00:50 2018 +0200

    alternate ShoppingCart based on ORMultiMap (#44)
---
 .../sample/distributeddata/ShoppingORCart.scala    | 143 +++++++++++++++++++++
 .../distributeddata/ShoppingORCartSpec.scala       | 116 +++++++++++++++++
 2 files changed, 259 insertions(+)

diff --git a/akka-sample-distributed-data-scala/src/main/scala/sample/distributeddata/ShoppingORCart.scala b/akka-sample-distributed-data-scala/src/main/scala/sample/distributeddata/ShoppingORCart.scala
new file mode 100644
index 0000000..aefbf4e
--- /dev/null
+++ b/akka-sample-distributed-data-scala/src/main/scala/sample/distributeddata/ShoppingORCart.scala
@@ -0,0 +1,143 @@
+package sample.distributeddata
+
+import java.util.UUID
+
+import scala.concurrent.duration._
+import akka.actor.Actor
+import akka.actor.ActorRef
+import akka.actor.Props
+import akka.cluster.Cluster
+import akka.cluster.ddata.{DistributedData, ORMultiMap, ORMultiMapKey}
+
+import scala.collection.immutable.HashSet
+
+/**
+ * Companion of the [[ShoppingORCart]] actor: factory, message protocol and the
+ * replication consistency settings shared with the actor.
+ */
+object ShoppingORCart {
+  import akka.cluster.ddata.Replicator._
+
+  /** Creates the `Props` for a cart actor that belongs to `userId`. */
+  def props(userId: String): Props = Props(new ShoppingORCart(userId))
+
+  // ---- commands accepted by the actor ----
+
+  /** Asks for the current cart; the actor answers the sender with a [[Cart]]. */
+  case object GetCart
+
+  /** Sets the quantity of `item.productId` in the cart to `item.quantity`. */
+  final case class ChangeItemQuantity(item: LineItem)
+
+  /** Removes every unit of `productId` from the cart. */
+  final case class RemoveItem(productId: String)
+
+  // ---- data exchanged with clients / stored in the replicated map ----
+
+  /** Reply to [[GetCart]]: one [[LineItem]] per product. */
+  final case class Cart(items: Set[LineItem])
+
+  /** A product together with the number of units of it. */
+  final case class LineItem(productId: String, title: String, quantity: Int)
+
+  /** A single unit of a product as stored in the replicated map; `uid` tells the units apart. */
+  final case class SingleLineItem(productId: String, title: String, uid: String)
+
+  //#read-write-majority-orcart
+  private val consistencyTimeout: FiniteDuration = 3.seconds
+  private val readMajority: ReadMajority = ReadMajority(consistencyTimeout)
+  private val writeMajority: WriteMajority = WriteMajority(consistencyTimeout)
+  //#read-write-majority-orcart
+
+  /**
+   * Builds an identifier for one [[SingleLineItem]]: the string form of this
+   * node's unique address immediately followed by a random UUID.
+   */
+  def getUniqueId(implicit cluster: Cluster): String =
+    s"${cluster.selfUniqueAddress}${UUID.randomUUID()}"
+
+}
+
+/**
+ * Actor holding the shopping cart of one user in a replicated `ORMultiMap`.
+ *
+ * The map is keyed by product id and every unit of a product is stored as its
+ * own [[ShoppingORCart.SingleLineItem]], so the quantity of a product is the
+ * size of the set bound to its id.
+ *
+ * @param userId owner of the cart; used to derive the replicated data key
+ */
+class ShoppingORCart(userId: String) extends Actor {
+  import ShoppingORCart._
+  import akka.cluster.ddata.Replicator._
+
+  val replicator: ActorRef = DistributedData(context.system).replicator
+  implicit val cluster: Cluster = Cluster(context.system)
+
+  // Upper-case on purpose: it is used as a stable identifier in the patterns below.
+  val DataKey = ORMultiMapKey[String, SingleLineItem]("cart-" + userId)
+
+  // receiveGetCart must stay first: it only claims replicator replies whose
+  // request context is an ActorRef, the others fall through to the next handlers.
+  def receive: Receive = receiveGetCart
+    .orElse[Any, Unit](receiveAddItem)
+    .orElse[Any, Unit](receiveRemoveItem)
+    .orElse[Any, Unit](receiveOther)
+
+  //#get-orcart
+  /** Handles [[ShoppingORCart.GetCart]] and the replicator replies that belong to it. */
+  def receiveGetCart: Receive = {
+    case GetCart =>
+      // The original sender travels as request context so the reply can be routed back.
+      replicator ! Get(DataKey, readMajority, Some(sender()))
+
+    case g @ GetSuccess(DataKey, Some(replyTo: ActorRef)) =>
+      // One LineItem per product. Bindings with an empty set are skipped so a
+      // product without units can never fail the read with `head` on an empty set.
+      val lineItems = g.get(DataKey).entries.collect {
+        case (_, units) if units.nonEmpty =>
+          val sample = units.head
+          LineItem(sample.productId, sample.title, units.size)
+      }.toSet
+      replyTo ! Cart(lineItems)
+
+    case NotFound(DataKey, Some(replyTo: ActorRef)) =>
+      replyTo ! Cart(Set.empty)
+
+    case GetFailure(DataKey, Some(replyTo: ActorRef)) =>
+      // ReadMajority failure, try again with local read
+      replicator ! Get(DataKey, ReadLocal, Some(replyTo))
+  }
+  //#get-orcart
+
+  //#add-item-orcart
+  /** Handles [[ShoppingORCart.ChangeItemQuantity]] by updating the replicated map. */
+  def receiveAddItem: Receive = {
+    case cmd @ ChangeItemQuantity(item) =>
+      val update = Update(DataKey, ORMultiMap.emptyWithValueDeltas[String, SingleLineItem], writeMajority, Some(cmd)) {
+        cart => updateCart(cart, item)
+      }
+      replicator ! update
+  }
+  //#add-item-orcart
+
+  /**
+   * Returns `data` changed so that `item.productId` is bound to exactly
+   * `item.quantity` units.
+   *
+   * A negative quantity is treated as zero. A product that is not in the cart
+   * and is requested with a quantity of zero (or less) leaves `data` untouched
+   * instead of creating an empty binding.
+   *
+   * @param data current replicated cart
+   * @param item product and the quantity it should end up with
+   * @return the updated cart
+   */
+  def updateCart(data: ORMultiMap[String, SingleLineItem], item: LineItem): ORMultiMap[String, SingleLineItem] = {
+    // Clamp: without this a negative quantity indexes past the existing units.
+    val targetQuantity = math.max(item.quantity, 0)
+
+    // Each unit gets its own uid, so the resulting set has `count` elements.
+    def freshUnits(count: Int): Set[SingleLineItem] =
+      Iterator.fill(count)(SingleLineItem(item.productId, item.title, getUniqueId)).toSet
+
+    data.get(item.productId) match {
+      case Some(units) if units.size < targetQuantity =>
+        freshUnits(targetQuantity - units.size).foldLeft(data) { (cart, single) =>
+          cart.addBinding(single.productId, single)
+        }
+
+      case Some(units) if units.size > targetQuantity =>
+        units.toVector.take(units.size - targetQuantity).foldLeft(data) { (cart, single) =>
+          cart.removeBinding(item.productId, single)
+        }
+
+      case Some(_) =>
+        // quantity already as requested
+        data
+
+      case None if targetQuantity > 0 =>
+        data + (item.productId -> freshUnits(targetQuantity))
+
+      case None =>
+        // nothing to add; do not store a binding with an empty set
+        data
+    }
+  }
+
+  //#remove-item-orcart
+  /** Handles [[ShoppingORCart.RemoveItem]] and the replicator replies that belong to it. */
+  def receiveRemoveItem: Receive = {
+    case cmd: RemoveItem =>
+      // Try to fetch latest from a majority of nodes first, since ORMap
+      // remove must have seen the item to be able to remove it.
+      replicator ! Get(DataKey, readMajority, Some(cmd))
+
+    case GetSuccess(DataKey, Some(RemoveItem(productId))) =>
+      removeFromCart(productId)
+
+    case GetFailure(DataKey, Some(RemoveItem(productId))) =>
+      // ReadMajority failed, fall back to best effort local value
+      removeFromCart(productId)
+
+    case NotFound(DataKey, Some(_: RemoveItem)) =>
+    // nothing to remove
+  }
+
+  /** Sends the update that drops the whole binding of `productId` from the cart. */
+  private def removeFromCart(productId: String): Unit =
+    replicator ! Update(DataKey, ORMultiMap.emptyWithValueDeltas[String, SingleLineItem], writeMajority, None) {
+      _ - productId // be careful, use ORMultiMap.emptyWithValueDeltas for safety, LWWMap and vanilla ORMultiMap can result in merge anomaly
+    }
+  //#remove-item-orcart
+
+  /** Handles the replicator's replies to the `Update` messages sent above. */
+  def receiveOther: Receive = {
+    case _: UpdateSuccess[_] | _: UpdateTimeout[_] =>
+    // UpdateTimeout, will eventually be replicated
+    case e: UpdateFailure[_] => throw new IllegalStateException("Unexpected failure: " + e)
+  }
+
+}
diff --git a/akka-sample-distributed-data-scala/src/multi-jvm/scala/sample/distributeddata/ShoppingORCartSpec.scala b/akka-sample-distributed-data-scala/src/multi-jvm/scala/sample/distributeddata/ShoppingORCartSpec.scala
new file mode 100644
index 0000000..3a7ca2e
--- /dev/null
+++ b/akka-sample-distributed-data-scala/src/multi-jvm/scala/sample/distributeddata/ShoppingORCartSpec.scala
@@ -0,0 +1,116 @@
+package sample.distributeddata
+
+import scala.concurrent.duration._
+import akka.cluster.Cluster
+import akka.cluster.ddata.DistributedData
+import akka.cluster.ddata.Replicator.GetReplicaCount
+import akka.cluster.ddata.Replicator.ReplicaCount
+import akka.remote.testconductor.RoleName
+import akka.remote.testkit.MultiNodeConfig
+import akka.remote.testkit.MultiNodeSpec
+import akka.testkit._
+import com.typesafe.config.ConfigFactory
+
+/**
+ * Multi-node configuration for [[ShoppingORCartSpec]]: declares three roles
+ * and the configuration that all of them share.
+ */
+object ShoppingORCartSpec extends MultiNodeConfig {
+  val node1: RoleName = role("node-1")
+  val node2: RoleName = role("node-2")
+  val node3: RoleName = role("node-3")
+
+  // Settings applied to every role.
+  private val sharedSettings = ConfigFactory.parseString("""
+    akka.loglevel = INFO
+    akka.actor.provider = "cluster"
+    akka.log-dead-letters-during-shutdown = off
+    """)
+
+  commonConfig(sharedSettings)
+
+}
+
+// One concrete spec class per role declared in the ShoppingORCartSpec object.
+// NOTE(review): presumably discovered by the multi-jvm test runner through the
+// `MultiJvmNodeN` name suffix (the file lives under src/multi-jvm) -- confirm.
+class ShoppingORCartSpecMultiJvmNode1 extends ShoppingORCartSpec
+class ShoppingORCartSpecMultiJvmNode2 extends ShoppingORCartSpec
+class ShoppingORCartSpecMultiJvmNode3 extends ShoppingORCartSpec
+
+/**
+ * Multi-node test of [[ShoppingORCart]].
+ *
+ * Every participating node starts its own cart actor for "user-1"; the tests
+ * change the cart from different nodes and then check on every node that the
+ * replicated cart converges to the expected content.
+ */
+class ShoppingORCartSpec extends MultiNodeSpec(ShoppingORCartSpec) with STMultiNodeSpec with ImplicitSender {
+  import ShoppingORCartSpec._
+  import ShoppingORCart._
+
+  override def initialParticipants = roles.size
+
+  val cluster = Cluster(system)
+  val shoppingORCart = system.actorOf(ShoppingORCart.props("user-1"))
+
+  /** Lets node `from` join the cluster via node `to`; all nodes then meet at a barrier. */
+  def join(from: RoleName, to: RoleName): Unit = {
+    runOn(from) {
+      cluster join node(to).address
+    }
+    enterBarrier(from.name + "-joined")
+  }
+
+  /**
+   * Repeatedly asks the local cart actor for the cart until its items equal
+   * `expected`, or the enclosing `within` deadline expires.
+   */
+  private def awaitCart(expected: Set[LineItem]): Unit =
+    awaitAssert {
+      shoppingORCart ! ShoppingORCart.GetCart
+      val cart = expectMsgType[Cart]
+      cart.items should be(expected)
+    }
+
+  "Demo of a replicated shopping cart" must {
+    "join cluster" in within(20.seconds) {
+      join(node1, node1)
+      join(node2, node1)
+      join(node3, node1)
+
+      // Wait until the local replicator reports one replica per role.
+      awaitAssert {
+        DistributedData(system).replicator ! GetReplicaCount
+        expectMsg(ReplicaCount(roles.size))
+      }
+      enterBarrier("after-1")
+    }
+
+    "handle updates directly after start" in within(15.seconds) {
+      runOn(node2) {
+        shoppingORCart ! ShoppingORCart.ChangeItemQuantity(LineItem("1", "Apples", quantity = 2))
+        shoppingORCart ! ShoppingORCart.ChangeItemQuantity(LineItem("2", "Oranges", quantity = 3))
+      }
+      enterBarrier("updates-done")
+
+      awaitCart(Set(LineItem("1", "Apples", quantity = 2), LineItem("2", "Oranges", quantity = 3)))
+
+      enterBarrier("after-2")
+    }
+
+    "handle updates from different nodes" in within(5.seconds) {
+      runOn(node2) {
+        shoppingORCart ! ShoppingORCart.ChangeItemQuantity(LineItem("1", "Apples", quantity = 5))
+        shoppingORCart ! ShoppingORCart.RemoveItem("2")
+      }
+      runOn(node3) {
+        shoppingORCart ! ShoppingORCart.ChangeItemQuantity(LineItem("3", "Bananas", quantity = 4))
+      }
+      enterBarrier("updates-done")
+
+      awaitCart(Set(LineItem("1", "Apples", quantity = 5), LineItem("3", "Bananas", quantity = 4)))
+
+      enterBarrier("after-3")
+    }
+
+    "handle more updates from different nodes" in within(5.seconds) {
+      runOn(node2) {
+        shoppingORCart ! ShoppingORCart.ChangeItemQuantity(LineItem("1", "Apples", quantity = 2))
+      }
+      runOn(node3) {
+        shoppingORCart ! ShoppingORCart.ChangeItemQuantity(LineItem("3", "Bananas", quantity = 6))
+      }
+      enterBarrier("updates-done")
+
+      awaitCart(Set(LineItem("1", "Apples", quantity = 2), LineItem("3", "Bananas", quantity = 6)))
+
+      // Fourth test, so the closing barrier is "after-4" (it used to repeat "after-3").
+      enterBarrier("after-4")
+    }
+
+  }
+
+}
+


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pekko.apache.org
For additional commands, e-mail: commits-help@pekko.apache.org