You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pekko.apache.org by fa...@apache.org on 2022/10/31 09:35:52 UTC
[incubator-pekko-samples] 02/02: Revert "alternate ShoppingCart based on ORMultiMap (#44)"
This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch revert-44-wip/orcart-example
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-samples.git
commit 818193fcc81c6a58bb279fde54f31734a1a58d5f
Author: Johan Andrén <jo...@markatta.com>
AuthorDate: Fri Jun 15 09:03:00 2018 +0200
Revert "alternate ShoppingCart based on ORMultiMap (#44)"
This reverts commit 0744270dd525b0a41ce11cc6a8446fc750d18495.
---
.../sample/distributeddata/ShoppingORCart.scala | 143 ---------------------
.../distributeddata/ShoppingORCartSpec.scala | 116 -----------------
2 files changed, 259 deletions(-)
diff --git a/akka-sample-distributed-data-scala/src/main/scala/sample/distributeddata/ShoppingORCart.scala b/akka-sample-distributed-data-scala/src/main/scala/sample/distributeddata/ShoppingORCart.scala
deleted file mode 100644
index aefbf4e..0000000
--- a/akka-sample-distributed-data-scala/src/main/scala/sample/distributeddata/ShoppingORCart.scala
+++ /dev/null
@@ -1,143 +0,0 @@
-package sample.distributeddata
-
-import java.util.UUID
-
-import scala.concurrent.duration._
-import akka.actor.Actor
-import akka.actor.ActorRef
-import akka.actor.Props
-import akka.cluster.Cluster
-import akka.cluster.ddata.{DistributedData, ORMultiMap, ORMultiMapKey}
-
-import scala.collection.immutable.HashSet
-
-object ShoppingORCart {
- import akka.cluster.ddata.Replicator._
-
- def props(userId: String): Props = Props(new ShoppingORCart(userId))
-
- case object GetCart
- final case class ChangeItemQuantity(item: LineItem)
- final case class RemoveItem(productId: String)
-
- final case class Cart(items: Set[LineItem])
- final case class LineItem(productId: String, title: String, quantity: Int)
- final case class SingleLineItem(productId: String, title: String, uid: String)
-
- //#read-write-majority-orcart
- private val timeout = 3.seconds
- private val readMajority = ReadMajority(timeout)
- private val writeMajority = WriteMajority(timeout)
- //#read-write-majority-orcart
-
- def getUniqueId(implicit cluster: Cluster): String = {
- cluster.selfUniqueAddress.toString + UUID.randomUUID().toString
- }
-
-}
-
-class ShoppingORCart(userId: String) extends Actor {
- import ShoppingORCart._
- import akka.cluster.ddata.Replicator._
-
- val replicator = DistributedData(context.system).replicator
- implicit val cluster = Cluster(context.system)
-
- val DataKey = ORMultiMapKey[String, SingleLineItem]("cart-" + userId)
-
- def receive = receiveGetCart
- .orElse[Any, Unit](receiveAddItem)
- .orElse[Any, Unit](receiveRemoveItem)
- .orElse[Any, Unit](receiveOther)
-
- //#get-orcart
- def receiveGetCart: Receive = {
- case GetCart ⇒
- replicator ! Get(DataKey, readMajority, Some(sender()))
-
- case g @ GetSuccess(DataKey, Some(replyTo: ActorRef)) ⇒
- val data = g.get(DataKey)
- val entrySet = data.entries.groupBy(_._1).map {
- entry ⇒ LineItem(entry._2.values.head.head.productId, entry._2.values.head.head.title, entry._2.values.head.size)
- }.toSet
- val cart = Cart(entrySet)
- replyTo ! cart
-
- case NotFound(DataKey, Some(replyTo: ActorRef)) ⇒
- replyTo ! Cart(Set.empty)
-
- case GetFailure(DataKey, Some(replyTo: ActorRef)) ⇒
- // ReadMajority failure, try again with local read
- replicator ! Get(DataKey, ReadLocal, Some(replyTo))
- }
- //#get-orcart
-
- //#add-item-orcart
- def receiveAddItem: Receive = {
- case cmd @ ChangeItemQuantity(item) ⇒
- val update = Update(DataKey, ORMultiMap.emptyWithValueDeltas[String, SingleLineItem], writeMajority, Some(cmd)) {
- cart ⇒ updateCart(cart, item)
- }
- replicator ! update
- }
- //#add-item-orcart
-
- def updateCart(data: ORMultiMap[String, SingleLineItem], item: LineItem): ORMultiMap[String, SingleLineItem] =
- data.get(item.productId) match {
- case Some(entries) ⇒
- val existingQuantity = entries.size
- if (existingQuantity == item.quantity) {
- data
- } else if (existingQuantity < item.quantity) {
- var newEntries = HashSet[SingleLineItem]()
- (1 to (item.quantity - existingQuantity)).foreach { _ ⇒
- newEntries = newEntries + SingleLineItem(item.productId, item.title, getUniqueId)
- }
- val ops = newEntries.foldLeft(data) { case (d, item) ⇒ d.addBinding(item.productId, item) }
- ops
- } else {
- val existingItems = entries.toVector
- val ops = (1 to (existingQuantity - item.quantity)).foldLeft(data) {
- case (d, index) ⇒
- d.removeBinding(item.productId, existingItems(index - 1))
- }
- ops
- }
- case None ⇒
- var items: Set[SingleLineItem] = new HashSet[SingleLineItem]()
- (1 to item.quantity).foreach { _ ⇒
- items = items + SingleLineItem(item.productId, item.title, getUniqueId)
- }
- data + (item.productId → items)
- }
-
- //#remove-item-orcart
- def receiveRemoveItem: Receive = {
- case cmd @ RemoveItem(productId) ⇒
- // Try to fetch latest from a majority of nodes first, since ORMap
- // remove must have seen the item to be able to remove it.
- replicator ! Get(DataKey, readMajority, Some(cmd))
-
- case GetSuccess(DataKey, Some(RemoveItem(productId))) ⇒
- replicator ! Update(DataKey, ORMultiMap.emptyWithValueDeltas[String, SingleLineItem], writeMajority, None) {
- _ - productId // be careful, use ORMultiMap.emptyWithValueDeltas for safety, LWWMap and vanilla ORMultiMap can result in merge anomaly
- }
-
- case GetFailure(DataKey, Some(RemoveItem(productId))) ⇒
- // ReadMajority failed, fall back to best effort local value
- replicator ! Update(DataKey, ORMultiMap.emptyWithValueDeltas[String, SingleLineItem], writeMajority, None) {
- _ - productId
- }
-
- case NotFound(DataKey, Some(RemoveItem(productId))) ⇒
- // nothing to remove
- }
- //#remove-item-orcart
-
- def receiveOther: Receive = {
- case _: UpdateSuccess[_] | _: UpdateTimeout[_] ⇒
- // UpdateTimeout, will eventually be replicated
- case e: UpdateFailure[_] ⇒ throw new IllegalStateException("Unexpected failure: " + e)
- }
-
-}
diff --git a/akka-sample-distributed-data-scala/src/multi-jvm/scala/sample/distributeddata/ShoppingORCartSpec.scala b/akka-sample-distributed-data-scala/src/multi-jvm/scala/sample/distributeddata/ShoppingORCartSpec.scala
deleted file mode 100644
index 3a7ca2e..0000000
--- a/akka-sample-distributed-data-scala/src/multi-jvm/scala/sample/distributeddata/ShoppingORCartSpec.scala
+++ /dev/null
@@ -1,116 +0,0 @@
-package sample.distributeddata
-
-import scala.concurrent.duration._
-import akka.cluster.Cluster
-import akka.cluster.ddata.DistributedData
-import akka.cluster.ddata.Replicator.GetReplicaCount
-import akka.cluster.ddata.Replicator.ReplicaCount
-import akka.remote.testconductor.RoleName
-import akka.remote.testkit.MultiNodeConfig
-import akka.remote.testkit.MultiNodeSpec
-import akka.testkit._
-import com.typesafe.config.ConfigFactory
-
-object ShoppingORCartSpec extends MultiNodeConfig {
- val node1 = role("node-1")
- val node2 = role("node-2")
- val node3 = role("node-3")
-
- commonConfig(ConfigFactory.parseString("""
- akka.loglevel = INFO
- akka.actor.provider = "cluster"
- akka.log-dead-letters-during-shutdown = off
- """))
-
-}
-
-class ShoppingORCartSpecMultiJvmNode1 extends ShoppingORCartSpec
-class ShoppingORCartSpecMultiJvmNode2 extends ShoppingORCartSpec
-class ShoppingORCartSpecMultiJvmNode3 extends ShoppingORCartSpec
-
-class ShoppingORCartSpec extends MultiNodeSpec(ShoppingORCartSpec) with STMultiNodeSpec with ImplicitSender {
- import ShoppingORCartSpec._
- import ShoppingORCart._
-
- override def initialParticipants = roles.size
-
- val cluster = Cluster(system)
- val shoppingORCart = system.actorOf(ShoppingORCart.props("user-1"))
-
- def join(from: RoleName, to: RoleName): Unit = {
- runOn(from) {
- cluster join node(to).address
- }
- enterBarrier(from.name + "-joined")
- }
-
- "Demo of a replicated shopping cart" must {
- "join cluster" in within(20.seconds) {
- join(node1, node1)
- join(node2, node1)
- join(node3, node1)
-
- awaitAssert {
- DistributedData(system).replicator ! GetReplicaCount
- expectMsg(ReplicaCount(roles.size))
- }
- enterBarrier("after-1")
- }
-
- "handle updates directly after start" in within(15.seconds) {
- runOn(node2) {
- shoppingORCart ! ShoppingORCart.ChangeItemQuantity(LineItem("1", "Apples", quantity = 2))
- shoppingORCart ! ShoppingORCart.ChangeItemQuantity(LineItem("2", "Oranges", quantity = 3))
- }
- enterBarrier("updates-done")
-
- awaitAssert {
- shoppingORCart ! ShoppingORCart.GetCart
- val cart = expectMsgType[Cart]
- cart.items should be(Set(LineItem("1", "Apples", quantity = 2), LineItem("2", "Oranges", quantity = 3)))
- }
-
- enterBarrier("after-2")
- }
-
- "handle updates from different nodes" in within(5.seconds) {
- runOn(node2) {
- shoppingORCart ! ShoppingORCart.ChangeItemQuantity(LineItem("1", "Apples", quantity = 5))
- shoppingORCart ! ShoppingORCart.RemoveItem("2")
- }
- runOn(node3) {
- shoppingORCart ! ShoppingORCart.ChangeItemQuantity(LineItem("3", "Bananas", quantity = 4))
- }
- enterBarrier("updates-done")
-
- awaitAssert {
- shoppingORCart ! ShoppingORCart.GetCart
- val cart = expectMsgType[Cart]
- cart.items should be(Set(LineItem("1", "Apples", quantity = 5), LineItem("3", "Bananas", quantity = 4)))
- }
-
- enterBarrier("after-3")
- }
-
- "handle more updates from different nodes" in within(5.seconds) {
- runOn(node2) {
- shoppingORCart ! ShoppingORCart.ChangeItemQuantity(LineItem("1", "Apples", quantity = 2))
- }
- runOn(node3) {
- shoppingORCart ! ShoppingORCart.ChangeItemQuantity(LineItem("3", "Bananas", quantity = 6))
- }
- enterBarrier("updates-done")
-
- awaitAssert {
- shoppingORCart ! ShoppingORCart.GetCart
- val cart = expectMsgType[Cart]
- cart.items should be(Set(LineItem("1", "Apples", quantity = 2), LineItem("3", "Bananas", quantity = 6)))
- }
-
- enterBarrier("after-3")
- }
-
- }
-
-}
-
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pekko.apache.org
For additional commands, e-mail: commits-help@pekko.apache.org