You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pekko.apache.org by fa...@apache.org on 2022/10/31 09:35:52 UTC

[incubator-pekko-samples] 02/02: Revert "alternate ShoppingCart based on ORMultiMap (#44)"

This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch revert-44-wip/orcart-example
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-samples.git

commit 818193fcc81c6a58bb279fde54f31734a1a58d5f
Author: Johan Andrén <jo...@markatta.com>
AuthorDate: Fri Jun 15 09:03:00 2018 +0200

    Revert "alternate ShoppingCart based on ORMultiMap (#44)"
    
    This reverts commit 0744270dd525b0a41ce11cc6a8446fc750d18495.
---
 .../sample/distributeddata/ShoppingORCart.scala    | 143 ---------------------
 .../distributeddata/ShoppingORCartSpec.scala       | 116 -----------------
 2 files changed, 259 deletions(-)

diff --git a/akka-sample-distributed-data-scala/src/main/scala/sample/distributeddata/ShoppingORCart.scala b/akka-sample-distributed-data-scala/src/main/scala/sample/distributeddata/ShoppingORCart.scala
deleted file mode 100644
index aefbf4e..0000000
--- a/akka-sample-distributed-data-scala/src/main/scala/sample/distributeddata/ShoppingORCart.scala
+++ /dev/null
@@ -1,143 +0,0 @@
-package sample.distributeddata
-
-import java.util.UUID
-
-import scala.concurrent.duration._
-import akka.actor.Actor
-import akka.actor.ActorRef
-import akka.actor.Props
-import akka.cluster.Cluster
-import akka.cluster.ddata.{DistributedData, ORMultiMap, ORMultiMapKey}
-
-import scala.collection.immutable.HashSet
-
-object ShoppingORCart {
-  import akka.cluster.ddata.Replicator._
-
-  def props(userId: String): Props = Props(new ShoppingORCart(userId))
-
-  case object GetCart
-  final case class ChangeItemQuantity(item: LineItem)
-  final case class RemoveItem(productId: String)
-
-  final case class Cart(items: Set[LineItem])
-  final case class LineItem(productId: String, title: String, quantity: Int)
-  final case class SingleLineItem(productId: String, title: String, uid: String)
-
-  //#read-write-majority-orcart
-  private val timeout = 3.seconds
-  private val readMajority = ReadMajority(timeout)
-  private val writeMajority = WriteMajority(timeout)
-  //#read-write-majority-orcart
-
-  def getUniqueId(implicit cluster: Cluster): String = {
-    cluster.selfUniqueAddress.toString + UUID.randomUUID().toString
-  }
-
-}
-
-class ShoppingORCart(userId: String) extends Actor {
-  import ShoppingORCart._
-  import akka.cluster.ddata.Replicator._
-
-  val replicator = DistributedData(context.system).replicator
-  implicit val cluster = Cluster(context.system)
-
-  val DataKey = ORMultiMapKey[String, SingleLineItem]("cart-" + userId)
-
-  def receive = receiveGetCart
-    .orElse[Any, Unit](receiveAddItem)
-    .orElse[Any, Unit](receiveRemoveItem)
-    .orElse[Any, Unit](receiveOther)
-
-  //#get-orcart
-  def receiveGetCart: Receive = {
-    case GetCart ⇒
-      replicator ! Get(DataKey, readMajority, Some(sender()))
-
-    case g @ GetSuccess(DataKey, Some(replyTo: ActorRef)) ⇒
-      val data = g.get(DataKey)
-      val entrySet = data.entries.groupBy(_._1).map {
-        entry ⇒ LineItem(entry._2.values.head.head.productId, entry._2.values.head.head.title, entry._2.values.head.size)
-      }.toSet
-      val cart = Cart(entrySet)
-      replyTo ! cart
-
-    case NotFound(DataKey, Some(replyTo: ActorRef)) ⇒
-      replyTo ! Cart(Set.empty)
-
-    case GetFailure(DataKey, Some(replyTo: ActorRef)) ⇒
-      // ReadMajority failure, try again with local read
-      replicator ! Get(DataKey, ReadLocal, Some(replyTo))
-  }
-  //#get-orcart
-
-  //#add-item-orcart
-  def receiveAddItem: Receive = {
-    case cmd @ ChangeItemQuantity(item) ⇒
-      val update = Update(DataKey, ORMultiMap.emptyWithValueDeltas[String, SingleLineItem], writeMajority, Some(cmd)) {
-        cart ⇒ updateCart(cart, item)
-      }
-      replicator ! update
-  }
-  //#add-item-orcart
-
-  def updateCart(data: ORMultiMap[String, SingleLineItem], item: LineItem): ORMultiMap[String, SingleLineItem] =
-    data.get(item.productId) match {
-      case Some(entries) ⇒
-        val existingQuantity = entries.size
-        if (existingQuantity == item.quantity) {
-          data
-        } else if (existingQuantity < item.quantity) {
-          var newEntries = HashSet[SingleLineItem]()
-          (1 to (item.quantity - existingQuantity)).foreach { _ ⇒
-            newEntries = newEntries + SingleLineItem(item.productId, item.title, getUniqueId)
-          }
-          val ops = newEntries.foldLeft(data) { case (d, item) ⇒ d.addBinding(item.productId, item) }
-          ops
-        } else {
-          val existingItems = entries.toVector
-          val ops = (1 to (existingQuantity - item.quantity)).foldLeft(data) {
-            case (d, index) ⇒
-              d.removeBinding(item.productId, existingItems(index - 1))
-          }
-          ops
-        }
-      case None ⇒
-        var items: Set[SingleLineItem] = new HashSet[SingleLineItem]()
-        (1 to item.quantity).foreach { _ ⇒
-          items = items + SingleLineItem(item.productId, item.title, getUniqueId)
-        }
-        data + (item.productId → items)
-    }
-
-  //#remove-item-orcart
-  def receiveRemoveItem: Receive = {
-    case cmd @ RemoveItem(productId) ⇒
-      // Try to fetch latest from a majority of nodes first, since ORMap
-      // remove must have seen the item to be able to remove it.
-      replicator ! Get(DataKey, readMajority, Some(cmd))
-
-    case GetSuccess(DataKey, Some(RemoveItem(productId))) ⇒
-      replicator ! Update(DataKey, ORMultiMap.emptyWithValueDeltas[String, SingleLineItem], writeMajority, None) {
-        _ - productId // be careful, use ORMultiMap.emptyWithValueDeltas for safety, LWWMap and vanilla ORMultiMap can result in merge anomaly
-      }
-
-    case GetFailure(DataKey, Some(RemoveItem(productId))) ⇒
-      // ReadMajority failed, fall back to best effort local value
-      replicator ! Update(DataKey, ORMultiMap.emptyWithValueDeltas[String, SingleLineItem], writeMajority, None) {
-        _ - productId
-      }
-
-    case NotFound(DataKey, Some(RemoveItem(productId))) ⇒
-    // nothing to remove
-  }
-  //#remove-item-orcart
-
-  def receiveOther: Receive = {
-    case _: UpdateSuccess[_] | _: UpdateTimeout[_] ⇒
-    // UpdateTimeout, will eventually be replicated
-    case e: UpdateFailure[_]                       ⇒ throw new IllegalStateException("Unexpected failure: " + e)
-  }
-
-}
diff --git a/akka-sample-distributed-data-scala/src/multi-jvm/scala/sample/distributeddata/ShoppingORCartSpec.scala b/akka-sample-distributed-data-scala/src/multi-jvm/scala/sample/distributeddata/ShoppingORCartSpec.scala
deleted file mode 100644
index 3a7ca2e..0000000
--- a/akka-sample-distributed-data-scala/src/multi-jvm/scala/sample/distributeddata/ShoppingORCartSpec.scala
+++ /dev/null
@@ -1,116 +0,0 @@
-package sample.distributeddata
-
-import scala.concurrent.duration._
-import akka.cluster.Cluster
-import akka.cluster.ddata.DistributedData
-import akka.cluster.ddata.Replicator.GetReplicaCount
-import akka.cluster.ddata.Replicator.ReplicaCount
-import akka.remote.testconductor.RoleName
-import akka.remote.testkit.MultiNodeConfig
-import akka.remote.testkit.MultiNodeSpec
-import akka.testkit._
-import com.typesafe.config.ConfigFactory
-
-object ShoppingORCartSpec extends MultiNodeConfig {
-  val node1 = role("node-1")
-  val node2 = role("node-2")
-  val node3 = role("node-3")
-
-  commonConfig(ConfigFactory.parseString("""
-    akka.loglevel = INFO
-    akka.actor.provider = "cluster"
-    akka.log-dead-letters-during-shutdown = off
-    """))
-
-}
-
-class ShoppingORCartSpecMultiJvmNode1 extends ShoppingORCartSpec
-class ShoppingORCartSpecMultiJvmNode2 extends ShoppingORCartSpec
-class ShoppingORCartSpecMultiJvmNode3 extends ShoppingORCartSpec
-
-class ShoppingORCartSpec extends MultiNodeSpec(ShoppingORCartSpec) with STMultiNodeSpec with ImplicitSender {
-  import ShoppingORCartSpec._
-  import ShoppingORCart._
-
-  override def initialParticipants = roles.size
-
-  val cluster = Cluster(system)
-  val shoppingORCart = system.actorOf(ShoppingORCart.props("user-1"))
-
-  def join(from: RoleName, to: RoleName): Unit = {
-    runOn(from) {
-      cluster join node(to).address
-    }
-    enterBarrier(from.name + "-joined")
-  }
-
-  "Demo of a replicated shopping cart" must {
-    "join cluster" in within(20.seconds) {
-      join(node1, node1)
-      join(node2, node1)
-      join(node3, node1)
-
-      awaitAssert {
-        DistributedData(system).replicator ! GetReplicaCount
-        expectMsg(ReplicaCount(roles.size))
-      }
-      enterBarrier("after-1")
-    }
-
-    "handle updates directly after start" in within(15.seconds) {
-      runOn(node2) {
-        shoppingORCart ! ShoppingORCart.ChangeItemQuantity(LineItem("1", "Apples", quantity = 2))
-        shoppingORCart ! ShoppingORCart.ChangeItemQuantity(LineItem("2", "Oranges", quantity = 3))
-      }
-      enterBarrier("updates-done")
-
-      awaitAssert {
-        shoppingORCart ! ShoppingORCart.GetCart
-        val cart = expectMsgType[Cart]
-        cart.items should be(Set(LineItem("1", "Apples", quantity = 2), LineItem("2", "Oranges", quantity = 3)))
-      }
-
-      enterBarrier("after-2")
-    }
-
-    "handle updates from different nodes" in within(5.seconds) {
-      runOn(node2) {
-        shoppingORCart ! ShoppingORCart.ChangeItemQuantity(LineItem("1", "Apples", quantity = 5))
-        shoppingORCart ! ShoppingORCart.RemoveItem("2")
-      }
-      runOn(node3) {
-        shoppingORCart ! ShoppingORCart.ChangeItemQuantity(LineItem("3", "Bananas", quantity = 4))
-      }
-      enterBarrier("updates-done")
-
-      awaitAssert {
-        shoppingORCart ! ShoppingORCart.GetCart
-        val cart = expectMsgType[Cart]
-        cart.items should be(Set(LineItem("1", "Apples", quantity = 5), LineItem("3", "Bananas", quantity = 4)))
-      }
-
-      enterBarrier("after-3")
-    }
-
-    "handle more updates from different nodes" in within(5.seconds) {
-      runOn(node2) {
-        shoppingORCart ! ShoppingORCart.ChangeItemQuantity(LineItem("1", "Apples", quantity = 2))
-      }
-      runOn(node3) {
-        shoppingORCart ! ShoppingORCart.ChangeItemQuantity(LineItem("3", "Bananas", quantity = 6))
-      }
-      enterBarrier("updates-done")
-
-      awaitAssert {
-        shoppingORCart ! ShoppingORCart.GetCart
-        val cart = expectMsgType[Cart]
-        cart.items should be(Set(LineItem("1", "Apples", quantity = 2), LineItem("3", "Bananas", quantity = 6)))
-      }
-
-      enterBarrier("after-3")
-    }
-
-  }
-
-}
-


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pekko.apache.org
For additional commands, e-mail: commits-help@pekko.apache.org