You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pekko.apache.org by fa...@apache.org on 2022/10/31 09:36:19 UTC
[incubator-pekko-samples] 01/01: alternate ShoppingCart based on ORMultiMap (#44)
This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch wip-updated-cart
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-samples.git
commit 6244bf715be243c06797c0aa4c0e249600329f5e
Author: gosubpl <gi...@gosub.pl>
AuthorDate: Fri Jun 15 09:00:50 2018 +0200
alternate ShoppingCart based on ORMultiMap (#44)
---
.../sample/distributeddata/ShoppingORCart.scala | 143 +++++++++++++++++++++
.../distributeddata/ShoppingORCartSpec.scala | 116 +++++++++++++++++
2 files changed, 259 insertions(+)
diff --git a/akka-sample-distributed-data-scala/src/main/scala/sample/distributeddata/ShoppingORCart.scala b/akka-sample-distributed-data-scala/src/main/scala/sample/distributeddata/ShoppingORCart.scala
new file mode 100644
index 0000000..aefbf4e
--- /dev/null
+++ b/akka-sample-distributed-data-scala/src/main/scala/sample/distributeddata/ShoppingORCart.scala
@@ -0,0 +1,143 @@
+package sample.distributeddata
+
+import java.util.UUID
+
+import scala.concurrent.duration._
+import akka.actor.Actor
+import akka.actor.ActorRef
+import akka.actor.Props
+import akka.cluster.Cluster
+import akka.cluster.ddata.{DistributedData, ORMultiMap, ORMultiMapKey}
+
+import scala.collection.immutable.HashSet
+
+object ShoppingORCart {
+ import akka.cluster.ddata.Replicator._
+
+ def props(userId: String): Props = Props(new ShoppingORCart(userId))
+
+ case object GetCart
+ final case class ChangeItemQuantity(item: LineItem)
+ final case class RemoveItem(productId: String)
+
+ final case class Cart(items: Set[LineItem])
+ final case class LineItem(productId: String, title: String, quantity: Int)
+ final case class SingleLineItem(productId: String, title: String, uid: String)
+
+ //#read-write-majority-orcart
+ private val timeout = 3.seconds
+ private val readMajority = ReadMajority(timeout)
+ private val writeMajority = WriteMajority(timeout)
+ //#read-write-majority-orcart
+
+ def getUniqueId(implicit cluster: Cluster): String = {
+ cluster.selfUniqueAddress.toString + UUID.randomUUID().toString
+ }
+
+}
+
+class ShoppingORCart(userId: String) extends Actor {
+ import ShoppingORCart._
+ import akka.cluster.ddata.Replicator._
+
+ val replicator = DistributedData(context.system).replicator
+ implicit val cluster = Cluster(context.system)
+
+ val DataKey = ORMultiMapKey[String, SingleLineItem]("cart-" + userId)
+
+ def receive = receiveGetCart
+ .orElse[Any, Unit](receiveAddItem)
+ .orElse[Any, Unit](receiveRemoveItem)
+ .orElse[Any, Unit](receiveOther)
+
+ //#get-orcart
+ def receiveGetCart: Receive = {
+ case GetCart ⇒
+ replicator ! Get(DataKey, readMajority, Some(sender()))
+
+ case g @ GetSuccess(DataKey, Some(replyTo: ActorRef)) ⇒
+ val data = g.get(DataKey)
+ val entrySet = data.entries.groupBy(_._1).map {
+ entry ⇒ LineItem(entry._2.values.head.head.productId, entry._2.values.head.head.title, entry._2.values.head.size)
+ }.toSet
+ val cart = Cart(entrySet)
+ replyTo ! cart
+
+ case NotFound(DataKey, Some(replyTo: ActorRef)) ⇒
+ replyTo ! Cart(Set.empty)
+
+ case GetFailure(DataKey, Some(replyTo: ActorRef)) ⇒
+ // ReadMajority failure, try again with local read
+ replicator ! Get(DataKey, ReadLocal, Some(replyTo))
+ }
+ //#get-orcart
+
+ //#add-item-orcart
+ def receiveAddItem: Receive = {
+ case cmd @ ChangeItemQuantity(item) ⇒
+ val update = Update(DataKey, ORMultiMap.emptyWithValueDeltas[String, SingleLineItem], writeMajority, Some(cmd)) {
+ cart ⇒ updateCart(cart, item)
+ }
+ replicator ! update
+ }
+ //#add-item-orcart
+
+ def updateCart(data: ORMultiMap[String, SingleLineItem], item: LineItem): ORMultiMap[String, SingleLineItem] =
+ data.get(item.productId) match {
+ case Some(entries) ⇒
+ val existingQuantity = entries.size
+ if (existingQuantity == item.quantity) {
+ data
+ } else if (existingQuantity < item.quantity) {
+ var newEntries = HashSet[SingleLineItem]()
+ (1 to (item.quantity - existingQuantity)).foreach { _ ⇒
+ newEntries = newEntries + SingleLineItem(item.productId, item.title, getUniqueId)
+ }
+ val ops = newEntries.foldLeft(data) { case (d, item) ⇒ d.addBinding(item.productId, item) }
+ ops
+ } else {
+ val existingItems = entries.toVector
+ val ops = (1 to (existingQuantity - item.quantity)).foldLeft(data) {
+ case (d, index) ⇒
+ d.removeBinding(item.productId, existingItems(index - 1))
+ }
+ ops
+ }
+ case None ⇒
+ var items: Set[SingleLineItem] = new HashSet[SingleLineItem]()
+ (1 to item.quantity).foreach { _ ⇒
+ items = items + SingleLineItem(item.productId, item.title, getUniqueId)
+ }
+ data + (item.productId → items)
+ }
+
+ //#remove-item-orcart
+ def receiveRemoveItem: Receive = {
+ case cmd @ RemoveItem(productId) ⇒
+ // Try to fetch latest from a majority of nodes first, since ORMap
+ // remove must have seen the item to be able to remove it.
+ replicator ! Get(DataKey, readMajority, Some(cmd))
+
+ case GetSuccess(DataKey, Some(RemoveItem(productId))) ⇒
+ replicator ! Update(DataKey, ORMultiMap.emptyWithValueDeltas[String, SingleLineItem], writeMajority, None) {
+ _ - productId // be careful, use ORMultiMap.emptyWithValueDeltas for safety, LWWMap and vanilla ORMultiMap can result in merge anomaly
+ }
+
+ case GetFailure(DataKey, Some(RemoveItem(productId))) ⇒
+ // ReadMajority failed, fall back to best effort local value
+ replicator ! Update(DataKey, ORMultiMap.emptyWithValueDeltas[String, SingleLineItem], writeMajority, None) {
+ _ - productId
+ }
+
+ case NotFound(DataKey, Some(RemoveItem(productId))) ⇒
+ // nothing to remove
+ }
+ //#remove-item-orcart
+
+ def receiveOther: Receive = {
+ case _: UpdateSuccess[_] | _: UpdateTimeout[_] ⇒
+ // UpdateTimeout, will eventually be replicated
+ case e: UpdateFailure[_] ⇒ throw new IllegalStateException("Unexpected failure: " + e)
+ }
+
+}
diff --git a/akka-sample-distributed-data-scala/src/multi-jvm/scala/sample/distributeddata/ShoppingORCartSpec.scala b/akka-sample-distributed-data-scala/src/multi-jvm/scala/sample/distributeddata/ShoppingORCartSpec.scala
new file mode 100644
index 0000000..3a7ca2e
--- /dev/null
+++ b/akka-sample-distributed-data-scala/src/multi-jvm/scala/sample/distributeddata/ShoppingORCartSpec.scala
@@ -0,0 +1,116 @@
+package sample.distributeddata
+
+import scala.concurrent.duration._
+import akka.cluster.Cluster
+import akka.cluster.ddata.DistributedData
+import akka.cluster.ddata.Replicator.GetReplicaCount
+import akka.cluster.ddata.Replicator.ReplicaCount
+import akka.remote.testconductor.RoleName
+import akka.remote.testkit.MultiNodeConfig
+import akka.remote.testkit.MultiNodeSpec
+import akka.testkit._
+import com.typesafe.config.ConfigFactory
+
+object ShoppingORCartSpec extends MultiNodeConfig {
+ val node1 = role("node-1")
+ val node2 = role("node-2")
+ val node3 = role("node-3")
+
+ commonConfig(ConfigFactory.parseString("""
+ akka.loglevel = INFO
+ akka.actor.provider = "cluster"
+ akka.log-dead-letters-during-shutdown = off
+ """))
+
+}
+
+class ShoppingORCartSpecMultiJvmNode1 extends ShoppingORCartSpec
+class ShoppingORCartSpecMultiJvmNode2 extends ShoppingORCartSpec
+class ShoppingORCartSpecMultiJvmNode3 extends ShoppingORCartSpec
+
+class ShoppingORCartSpec extends MultiNodeSpec(ShoppingORCartSpec) with STMultiNodeSpec with ImplicitSender {
+ import ShoppingORCartSpec._
+ import ShoppingORCart._
+
+ override def initialParticipants = roles.size
+
+ val cluster = Cluster(system)
+ val shoppingORCart = system.actorOf(ShoppingORCart.props("user-1"))
+
+ def join(from: RoleName, to: RoleName): Unit = {
+ runOn(from) {
+ cluster join node(to).address
+ }
+ enterBarrier(from.name + "-joined")
+ }
+
+ "Demo of a replicated shopping cart" must {
+ "join cluster" in within(20.seconds) {
+ join(node1, node1)
+ join(node2, node1)
+ join(node3, node1)
+
+ awaitAssert {
+ DistributedData(system).replicator ! GetReplicaCount
+ expectMsg(ReplicaCount(roles.size))
+ }
+ enterBarrier("after-1")
+ }
+
+ "handle updates directly after start" in within(15.seconds) {
+ runOn(node2) {
+ shoppingORCart ! ShoppingORCart.ChangeItemQuantity(LineItem("1", "Apples", quantity = 2))
+ shoppingORCart ! ShoppingORCart.ChangeItemQuantity(LineItem("2", "Oranges", quantity = 3))
+ }
+ enterBarrier("updates-done")
+
+ awaitAssert {
+ shoppingORCart ! ShoppingORCart.GetCart
+ val cart = expectMsgType[Cart]
+ cart.items should be(Set(LineItem("1", "Apples", quantity = 2), LineItem("2", "Oranges", quantity = 3)))
+ }
+
+ enterBarrier("after-2")
+ }
+
+ "handle updates from different nodes" in within(5.seconds) {
+ runOn(node2) {
+ shoppingORCart ! ShoppingORCart.ChangeItemQuantity(LineItem("1", "Apples", quantity = 5))
+ shoppingORCart ! ShoppingORCart.RemoveItem("2")
+ }
+ runOn(node3) {
+ shoppingORCart ! ShoppingORCart.ChangeItemQuantity(LineItem("3", "Bananas", quantity = 4))
+ }
+ enterBarrier("updates-done")
+
+ awaitAssert {
+ shoppingORCart ! ShoppingORCart.GetCart
+ val cart = expectMsgType[Cart]
+ cart.items should be(Set(LineItem("1", "Apples", quantity = 5), LineItem("3", "Bananas", quantity = 4)))
+ }
+
+ enterBarrier("after-3")
+ }
+
+ "handle more updates from different nodes" in within(5.seconds) {
+ runOn(node2) {
+ shoppingORCart ! ShoppingORCart.ChangeItemQuantity(LineItem("1", "Apples", quantity = 2))
+ }
+ runOn(node3) {
+ shoppingORCart ! ShoppingORCart.ChangeItemQuantity(LineItem("3", "Bananas", quantity = 6))
+ }
+ enterBarrier("updates-done")
+
+ awaitAssert {
+ shoppingORCart ! ShoppingORCart.GetCart
+ val cart = expectMsgType[Cart]
+ cart.items should be(Set(LineItem("1", "Apples", quantity = 2), LineItem("3", "Bananas", quantity = 6)))
+ }
+
+ enterBarrier("after-3")
+ }
+
+ }
+
+}
+
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pekko.apache.org
For additional commands, e-mail: commits-help@pekko.apache.org