You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by st...@apache.org on 2022/10/05 04:15:01 UTC
[openwhisk] branch master updated: Clean Up Etcd Worker Actor (#5323)
This is an automated email from the ASF dual-hosted git repository.
style95 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 236ca5e4b Clean Up Etcd Worker Actor (#5323)
236ca5e4b is described below
commit 236ca5e4b894e4cc626f685c1d0eba5c3e6077ec
Author: Brendan Doyle <bd...@gmail.com>
AuthorDate: Wed Oct 5 00:14:54 2022 -0400
Clean Up Etcd Worker Actor (#5323)
* clean up etcd worker actor
* revert etcd client local change for unit testing
* fix scala 2.13 compilation
Co-authored-by: Brendan Doyle <br...@qualtrics.com>
---
.../apache/openwhisk/core/etcd/EtcdWorker.scala | 166 +++++++++++++++++++
.../core/service/DataManagementService.scala | 151 +-----------------
.../core/invoker/FPCInvokerReactive.scala | 4 +-
.../openwhisk/core/scheduler/Scheduler.scala | 4 +-
.../openwhisk/common/etcd/EtcdWorkerTests.scala | 176 +++++++++++++++++++++
5 files changed, 347 insertions(+), 154 deletions(-)
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/etcd/EtcdWorker.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/etcd/EtcdWorker.scala
new file mode 100644
index 000000000..82d78e693
--- /dev/null
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/etcd/EtcdWorker.scala
@@ -0,0 +1,166 @@
+package org.apache.openwhisk.core.etcd
+
+import akka.actor.{Actor, ActorRef, ActorSystem, Props, Timers}
+import io.grpc.StatusRuntimeException
+import org.apache.openwhisk.common.Logging
+import org.apache.openwhisk.core.etcd.EtcdWorker.GetLeaseAndRetry
+import org.apache.openwhisk.core.service.DataManagementService.retryInterval
+import org.apache.openwhisk.core.service.{
+ AlreadyExist,
+ Done,
+ ElectLeader,
+ ElectionResult,
+ FinishWork,
+ GetLease,
+ InitialDataStorageResults,
+ Lease,
+ RegisterData,
+ RegisterInitialData,
+ WatcherClosed
+}
+
+import scala.concurrent.ExecutionContext
+import scala.concurrent.duration.FiniteDuration
+import scala.util.Success
+
+class EtcdWorker(etcdClient: EtcdClient, leaseService: ActorRef)(implicit val ec: ExecutionContext,
+ actorSystem: ActorSystem,
+ logging: Logging)
+ extends Actor
+ with Timers {
+
+ private val dataManagementService = context.parent
+ private var lease: Option[Lease] = None
+ leaseService ! GetLease
+
+ override def receive: Receive = {
+ case msg: Lease =>
+ lease = Some(msg)
+ case msg: GetLeaseAndRetry =>
+ logging.warn(this, msg.log)
+ if (!msg.skipLeaseRefresh) {
+ if (msg.clearLease) {
+ lease = None
+ }
+ leaseService ! GetLease
+ }
+ sendMessageToSelfAfter(msg.request, retryInterval)
+ // leader election + endpoint management
+ case request: ElectLeader =>
+ lease match {
+ case Some(l) =>
+ etcdClient
+ .electLeader(request.key, request.value, l)
+ .andThen {
+ case Success(msg) =>
+ request.recipient ! ElectionResult(msg)
+ dataManagementService ! FinishWork(request.key)
+ }
+ .recover {
+ // if there is no lease, reissue it and retry immediately
+ case t: StatusRuntimeException =>
+ self ! GetLeaseAndRetry(request, s"a lease is expired while leader election, reissue it: $t")
+ // it should retry forever until the data is stored
+ case t: Throwable =>
+ self ! GetLeaseAndRetry(
+ request,
+ s"unexpected error happened: $t, retry storing data",
+ skipLeaseRefresh = true)
+ }
+ case None =>
+ self ! GetLeaseAndRetry(request, s"lease not found, retry storing data ${request.key}", clearLease = false)
+ }
+
+ // only endpoint management
+ case request: RegisterData =>
+ lease match {
+ case Some(l) =>
+ etcdClient
+ .put(request.key, request.value, l.id)
+ .andThen {
+ case Success(_) =>
+ dataManagementService ! FinishWork(request.key)
+ }
+ .recover {
+ // if there is no lease, reissue it and retry immediately
+ case t: StatusRuntimeException =>
+ self ! GetLeaseAndRetry(
+ request,
+ s"a lease is expired while registering data ${request.key}, reissue it: $t")
+ // it should retry forever until the data is stored
+ case t: Throwable =>
+ self ! GetLeaseAndRetry(
+ request,
+ s"unexpected error happened: $t, retry storing data ${request.key}",
+ skipLeaseRefresh = true)
+ }
+ case None =>
+ self ! GetLeaseAndRetry(request, s"lease not found, retry storing data ${request.key}", clearLease = false)
+ }
+ // it stores the data only if there is no such one
+ case request: RegisterInitialData =>
+ lease match {
+ case Some(l) =>
+ etcdClient
+ .putTxn(request.key, request.value, 0, l.id)
+ .map { res =>
+ dataManagementService ! FinishWork(request.key)
+ if (res.getSucceeded) {
+ logging.info(this, s"initial data storing succeeds for ${request.key}")
+ request.recipient.map(_ ! InitialDataStorageResults(request.key, Right(Done())))
+ } else {
+ logging.info(this, s"data is already stored for: $request, cancel the initial data storing")
+ request.recipient.map(_ ! InitialDataStorageResults(request.key, Left(AlreadyExist())))
+ }
+ }
+ .recover {
+ // if there is no lease, reissue it and retry immediately
+ case t: StatusRuntimeException =>
+ self ! GetLeaseAndRetry(
+ request,
+ s"a lease is expired while registering an initial data ${request.key}, reissue it: $t")
+ // it should retry forever until the data is stored
+ case t: Throwable =>
+ self ! GetLeaseAndRetry(
+ request,
+ s"unexpected error happened: $t, retry storing data ${request.key}",
+ skipLeaseRefresh = true)
+ }
+ case None =>
+ self ! GetLeaseAndRetry(request, s"lease not found, retry storing data ${request.key}", clearLease = false)
+ }
+
+ case msg: WatcherClosed =>
+ etcdClient
+ .del(msg.key)
+ .andThen {
+ case Success(_) =>
+ dataManagementService ! FinishWork(msg.key)
+ }
+ .recover {
+ // if there is no lease, reissue it and retry immediately
+ case t: StatusRuntimeException =>
+ self ! GetLeaseAndRetry(msg, s"a lease is expired while deleting data ${msg.key}, reissue it: $t")
+ // it should retry forever until the data is stored
+ case t: Throwable =>
+ self ! GetLeaseAndRetry(
+ msg,
+ s"unexpected error happened: $t, retry storing data for ${msg.key}",
+ skipLeaseRefresh = true)
+ }
+ }
+
+ private def sendMessageToSelfAfter(msg: Any, retryInterval: FiniteDuration) = {
+ timers.startSingleTimer(msg, msg, retryInterval)
+ }
+}
+
+object EtcdWorker {
+ case class GetLeaseAndRetry(request: Any, log: String, clearLease: Boolean = true, skipLeaseRefresh: Boolean = false)
+
+ def props(etcdClient: EtcdClient, leaseService: ActorRef)(implicit ec: ExecutionContext,
+ actorSystem: ActorSystem,
+ logging: Logging): Props = {
+ Props(new EtcdWorker(etcdClient, leaseService))
+ }
+}
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/service/DataManagementService.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/service/DataManagementService.scala
index 425832679..c03070bf8 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/service/DataManagementService.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/service/DataManagementService.scala
@@ -19,18 +19,14 @@ package org.apache.openwhisk.core.service
import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Props}
import akka.util.Timeout
-import io.grpc.StatusRuntimeException
import org.apache.openwhisk.common.Logging
import org.apache.openwhisk.core.ConfigKeys
-import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdFollower, EtcdLeader}
-import org.apache.openwhisk.core.service.DataManagementService.retryInterval
+import org.apache.openwhisk.core.etcd.{EtcdFollower, EtcdLeader}
import pureconfig.loadConfigOrThrow
import scala.collection.concurrent.TrieMap
import scala.collection.mutable.{Map, Queue}
-import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
-import scala.util.Success
// messages received by the actor
// it is required to specify a recipient directly for the retryable message processing
@@ -181,148 +177,3 @@ object DataManagementService {
Props(new DataManagementService(watcherService, workerFactory))
}
}
-
-private[service] class EtcdWorker(etcdClient: EtcdClient, leaseService: ActorRef)(implicit val ec: ExecutionContext,
- actorSystem: ActorSystem,
- logging: Logging)
- extends Actor {
-
- private val dataManagementService = context.parent
- private var lease: Option[Lease] = None
- leaseService ! GetLease
-
- override def receive: Receive = {
- case msg: Lease =>
- lease = Some(msg)
-
- // leader election + endpoint management
- case request: ElectLeader =>
- lease match {
- case Some(l) =>
- etcdClient
- .electLeader(request.key, request.value, l)
- .andThen {
- case Success(msg) =>
- request.recipient ! ElectionResult(msg)
- dataManagementService ! FinishWork(request.key)
- }
- .recover {
- // if there is no lease, reissue it and retry immediately
- case t: StatusRuntimeException =>
- logging.warn(this, s"a lease is expired while leader election, reissue it: $t")
- lease = None
- leaseService ! GetLease
- sendMessageToSelfAfter(request, retryInterval)
-
- // it should retry forever until the data is stored
- case t: Throwable =>
- logging.warn(this, s"unexpected error happened: $t, retry storing data")
- sendMessageToSelfAfter(request, retryInterval)
- }
- case None =>
- logging.warn(this, s"lease not found, retry storing data")
- leaseService ! GetLease
- sendMessageToSelfAfter(request, retryInterval)
- }
-
- // only endpoint management
- case request: RegisterData =>
- lease match {
- case Some(l) =>
- etcdClient
- .put(request.key, request.value, l.id)
- .andThen {
- case Success(_) =>
- dataManagementService ! FinishWork(request.key)
- }
- .recover {
- // if there is no lease, reissue it and retry immediately
- case t: StatusRuntimeException =>
- logging.warn(this, s"a lease is expired while registering data ${request.key}, reissue it: $t")
- lease = None
- leaseService ! GetLease
- sendMessageToSelfAfter(request, retryInterval)
-
- // it should retry forever until the data is stored
- case t: Throwable =>
- logging.warn(this, s"unexpected error happened: $t, retry storing data ${request.key}")
- sendMessageToSelfAfter(request, retryInterval)
- }
- case None =>
- logging.warn(this, s"lease not found, retry storing data ${request.key}")
- leaseService ! GetLease
- sendMessageToSelfAfter(request, retryInterval)
- }
-
- // it stores the data iif there is no such one
- case request: RegisterInitialData =>
- lease match {
- case Some(l) =>
- etcdClient
- .putTxn(request.key, request.value, 0, l.id)
- .map { res =>
- dataManagementService ! FinishWork(request.key)
- if (res.getSucceeded) {
- logging.info(this, s"initial data storing succeeds for ${request.key}")
- request.recipient.map(_ ! InitialDataStorageResults(request.key, Right(Done())))
- } else {
- logging.info(this, s"data is already stored for: $request, cancel the initial data storing")
- request.recipient.map(_ ! InitialDataStorageResults(request.key, Left(AlreadyExist())))
- }
- }
- .recover {
- // if there is no lease, reissue it and retry immediately
- case t: StatusRuntimeException =>
- logging.warn(
- this,
- s"a lease is expired while registering an initial data ${request.key}, reissue it: $t")
- lease = None
- leaseService ! GetLease
- sendMessageToSelfAfter(request, retryInterval)
-
- // it should retry forever until the data is stored
- case t: Throwable =>
- logging.warn(this, s"unexpected error happened: $t, retry storing data for ${request.key}")
- sendMessageToSelfAfter(request, retryInterval)
- }
- case None =>
- logging.warn(this, s"lease not found, retry storing data for ${request.key}")
- leaseService ! GetLease
- sendMessageToSelfAfter(request, retryInterval)
- }
-
- case msg: WatcherClosed =>
- etcdClient
- .del(msg.key)
- .andThen {
- case Success(_) =>
- dataManagementService ! FinishWork(msg.key)
- }
- .recover {
- // if there is no lease, reissue it and retry immediately
- case t: StatusRuntimeException =>
- logging.warn(this, s"a lease is expired while deleting data ${msg.key}, reissue it: $t")
- lease = None
- leaseService ! GetLease
- sendMessageToSelfAfter(msg, retryInterval)
-
- // it should retry forever until the data is stored
- case t: Throwable =>
- logging.warn(this, s"unexpected error happened: $t, retry storing data for ${msg.key}")
- sendMessageToSelfAfter(msg, retryInterval)
- }
-
- }
-
- private def sendMessageToSelfAfter(msg: Any, retryInterval: FiniteDuration) = {
- actorSystem.scheduler.scheduleOnce(retryInterval, self, msg)
- }
-}
-
-object EtcdWorker {
- def props(etcdClient: EtcdClient, leaseService: ActorRef)(implicit ec: ExecutionContext,
- actorSystem: ActorSystem,
- logging: Logging): Props = {
- Props(new EtcdWorker(etcdClient, leaseService))
- }
-}
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerReactive.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerReactive.scala
index 327c0deda..67660e8a6 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerReactive.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerReactive.scala
@@ -37,10 +37,10 @@ import org.apache.openwhisk.core.etcd.EtcdKV.ContainerKeys.containerPrefix
import org.apache.openwhisk.core.etcd.EtcdKV.QueueKeys.queue
import org.apache.openwhisk.core.etcd.EtcdKV.{ContainerKeys, SchedulerKeys}
import org.apache.openwhisk.core.etcd.EtcdType._
-import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdConfig}
+import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdConfig, EtcdWorker}
import org.apache.openwhisk.core.invoker.Invoker.InvokerEnabled
import org.apache.openwhisk.core.scheduler.{SchedulerEndpoints, SchedulerStates}
-import org.apache.openwhisk.core.service.{DataManagementService, EtcdWorker, LeaseKeepAliveService, WatcherService}
+import org.apache.openwhisk.core.service.{DataManagementService, LeaseKeepAliveService, WatcherService}
import org.apache.openwhisk.core.{ConfigKeys, WarmUp, WhiskConfig}
import org.apache.openwhisk.grpc.{ActivationServiceClient, FetchRequest}
import org.apache.openwhisk.spi.SpiLoader
diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
index f32520cc7..2038fc1c3 100644
--- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
+++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
@@ -36,11 +36,11 @@ import org.apache.openwhisk.core.database.{ActivationStoreProvider, NoDocumentEx
import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.core.etcd.EtcdKV.{QueueKeys, SchedulerKeys}
import org.apache.openwhisk.core.etcd.EtcdType.ByteStringToString
-import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdConfig}
+import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdConfig, EtcdWorker}
import org.apache.openwhisk.core.scheduler.container.{ContainerManager, CreationJobManager}
import org.apache.openwhisk.core.scheduler.grpc.ActivationServiceImpl
import org.apache.openwhisk.core.scheduler.queue._
-import org.apache.openwhisk.core.service.{DataManagementService, EtcdWorker, LeaseKeepAliveService, WatcherService}
+import org.apache.openwhisk.core.service.{DataManagementService, LeaseKeepAliveService, WatcherService}
import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
import org.apache.openwhisk.grpc.ActivationServiceHandler
import org.apache.openwhisk.http.BasicHttpService
diff --git a/tests/src/test/scala/org/apache/openwhisk/common/etcd/EtcdWorkerTests.scala b/tests/src/test/scala/org/apache/openwhisk/common/etcd/EtcdWorkerTests.scala
new file mode 100644
index 000000000..a4203fc28
--- /dev/null
+++ b/tests/src/test/scala/org/apache/openwhisk/common/etcd/EtcdWorkerTests.scala
@@ -0,0 +1,176 @@
+package org.apache.openwhisk.common.etcd
+
+import akka.actor.{ActorRef, ActorSystem}
+import akka.testkit.{ImplicitSender, TestActor, TestActorRef, TestKit, TestProbe}
+import akka.util.Timeout
+import com.ibm.etcd.api.{DeleteRangeResponse, PutResponse, TxnResponse}
+import common.StreamLogging
+import io.grpc.{Status, StatusRuntimeException}
+import org.apache.openwhisk.core.entity.SchedulerInstanceId
+import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdLeader, EtcdWorker}
+import org.apache.openwhisk.core.service.{
+ AlreadyExist,
+ Done,
+ ElectLeader,
+ ElectionResult,
+ FinishWork,
+ GetLease,
+ InitialDataStorageResults,
+ Lease,
+ RegisterData,
+ RegisterInitialData,
+ WatcherClosed
+}
+import org.junit.runner.RunWith
+import org.scalamock.scalatest.MockFactory
+import org.scalatest.{BeforeAndAfterAll, FlatSpecLike, Matchers}
+import org.scalatest.concurrent.ScalaFutures
+import org.scalatest.junit.JUnitRunner
+
+import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.duration._
+
+@RunWith(classOf[JUnitRunner])
+class EtcdWorkerTests
+ extends TestKit(ActorSystem("EtcdWorker"))
+ with ImplicitSender
+ with FlatSpecLike
+ with ScalaFutures
+ with Matchers
+ with MockFactory
+ with BeforeAndAfterAll
+ with StreamLogging {
+
+ implicit val timeout: Timeout = Timeout(5.seconds)
+ implicit val ec: ExecutionContext = system.dispatcher
+ val leaseService = TestProbe()
+ val leaseId = 10
+ val leaseTtl = 10
+ leaseService.setAutoPilot((sender: ActorRef, msg: Any) =>
+ msg match {
+ case GetLease =>
+ sender ! Lease(leaseId, leaseTtl)
+ TestActor.KeepRunning
+
+ case _ =>
+ TestActor.KeepRunning
+ })
+
+ //val dataManagementService = TestProbe()
+ val schedulerId = SchedulerInstanceId("scheduler0")
+ val instanceId = schedulerId
+
+ behavior of "EtcdWorker"
+
+ it should "elect leader and send completion ack to parent" in {
+ val mockEtcd = mock[EtcdClient]
+
+ val key = "testKey"
+ val value = "testValue"
+ val leader = Right(EtcdLeader(key, value, leaseId))
+ val etcdWorker = TestActorRef(EtcdWorker.props(mockEtcd, leaseService.ref), self)
+
+ (mockEtcd
+ .electLeader(_: String, _: String, _: Lease))
+ .expects(key, value, *)
+ .returns(Future.successful(leader))
+
+ etcdWorker ! ElectLeader(key, value, recipient = self)
+
+ expectMsg(ElectionResult(leader))
+ expectMsg(FinishWork(key))
+ }
+
+ it should "register initial data when it doesn't exist and send completion ack to parent" in {
+ val mockEtcd = mock[EtcdClient]
+
+ val key = "testKey"
+ val value = "testValue"
+ val etcdWorker = TestActorRef(EtcdWorker.props(mockEtcd, leaseService.ref), self)
+
+ (mockEtcd
+ .putTxn(_: String, _: String, _: Long, _: Long))
+ .expects(key, value, *, *)
+ .returns(Future.successful(TxnResponse.newBuilder().setSucceeded(true).build()))
+
+ etcdWorker ! RegisterInitialData(key, value, recipient = Some(self))
+
+ expectMsg(FinishWork(key))
+ expectMsg(InitialDataStorageResults(key, Right(Done())))
+ }
+
+ it should "attempt to register initial data when it exists and send completion ack to parent" in {
+ val mockEtcd = mock[EtcdClient]
+
+ val key = "testKey"
+ val value = "testValue"
+ val etcdWorker = TestActorRef(EtcdWorker.props(mockEtcd, leaseService.ref), self)
+
+ (mockEtcd
+ .putTxn(_: String, _: String, _: Long, _: Long))
+ .expects(key, value, *, *)
+ .returns(Future.successful(TxnResponse.newBuilder().setSucceeded(false).build()))
+
+ etcdWorker ! RegisterInitialData(key, value, recipient = Some(self))
+
+ expectMsg(FinishWork(key))
+ expectMsg(InitialDataStorageResults(key, Left(AlreadyExist())))
+ }
+
+ it should "register data and send completion ack to parent" in {
+ val mockEtcd = mock[EtcdClient]
+
+ val key = "testKey"
+ val value = "testValue"
+ val etcdWorker = TestActorRef(EtcdWorker.props(mockEtcd, leaseService.ref), self)
+
+ (mockEtcd
+ .put(_: String, _: String, _: Long))
+ .expects(key, value, leaseId)
+ .returns(Future.successful(PutResponse.newBuilder().build()))
+
+ etcdWorker ! RegisterData(key, value)
+
+ expectMsg(FinishWork(key))
+ }
+
+ it should "delete data when watcher closed" in {
+ val mockEtcd = mock[EtcdClient]
+
+ val key = "testKey"
+ val etcdWorker = TestActorRef(EtcdWorker.props(mockEtcd, leaseService.ref), self)
+
+ (mockEtcd
+ .del(_: String))
+ .expects(key)
+ .returns(Future.successful(DeleteRangeResponse.newBuilder().build()))
+
+ etcdWorker ! WatcherClosed(key, false)
+
+ expectMsg(FinishWork(key))
+ }
+
+ it should "retry request after failure if lease does not exist" in {
+ val mockEtcd = mock[EtcdClient]
+
+ val key = "testKey"
+ val etcdWorker = TestActorRef(EtcdWorker.props(mockEtcd, leaseService.ref), self)
+ var firstAttempt = true
+ (mockEtcd
+ .del(_: String))
+ .expects(key)
+ .onCall((_: String) => {
+ if (firstAttempt) {
+ firstAttempt = false
+ Future.failed(new StatusRuntimeException(Status.RESOURCE_EXHAUSTED))
+ } else {
+ Future.successful(DeleteRangeResponse.newBuilder().build())
+ }
+ })
+ .twice()
+
+ etcdWorker ! WatcherClosed(key, false)
+
+ expectMsg(FinishWork(key))
+ }
+}