You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by st...@apache.org on 2022/10/05 04:15:01 UTC

[openwhisk] branch master updated: Clean Up Etcd Worker Actor (#5323)

This is an automated email from the ASF dual-hosted git repository.

style95 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 236ca5e4b Clean Up Etcd Worker Actor (#5323)
236ca5e4b is described below

commit 236ca5e4b894e4cc626f685c1d0eba5c3e6077ec
Author: Brendan Doyle <bd...@gmail.com>
AuthorDate: Wed Oct 5 00:14:54 2022 -0400

    Clean Up Etcd Worker Actor (#5323)
    
    * clean up etcd worker actor
    
    * revert etcd client local change for unit testing
    
    * fix scala 2.13 compilation
    
    Co-authored-by: Brendan Doyle <br...@qualtrics.com>
---
 .../apache/openwhisk/core/etcd/EtcdWorker.scala    | 166 +++++++++++++++++++
 .../core/service/DataManagementService.scala       | 151 +-----------------
 .../core/invoker/FPCInvokerReactive.scala          |   4 +-
 .../openwhisk/core/scheduler/Scheduler.scala       |   4 +-
 .../openwhisk/common/etcd/EtcdWorkerTests.scala    | 176 +++++++++++++++++++++
 5 files changed, 347 insertions(+), 154 deletions(-)

diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/etcd/EtcdWorker.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/etcd/EtcdWorker.scala
new file mode 100644
index 000000000..82d78e693
--- /dev/null
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/etcd/EtcdWorker.scala
@@ -0,0 +1,166 @@
+package org.apache.openwhisk.core.etcd
+
+import akka.actor.{Actor, ActorRef, ActorSystem, Props, Timers}
+import io.grpc.StatusRuntimeException
+import org.apache.openwhisk.common.Logging
+import org.apache.openwhisk.core.etcd.EtcdWorker.GetLeaseAndRetry
+import org.apache.openwhisk.core.service.DataManagementService.retryInterval
+import org.apache.openwhisk.core.service.{
+  AlreadyExist,
+  Done,
+  ElectLeader,
+  ElectionResult,
+  FinishWork,
+  GetLease,
+  InitialDataStorageResults,
+  Lease,
+  RegisterData,
+  RegisterInitialData,
+  WatcherClosed
+}
+
+import scala.concurrent.ExecutionContext
+import scala.concurrent.duration.FiniteDuration
+import scala.util.Success
+
+class EtcdWorker(etcdClient: EtcdClient, leaseService: ActorRef)(implicit val ec: ExecutionContext,
+                                                                 actorSystem: ActorSystem,
+                                                                 logging: Logging)
+    extends Actor
+    with Timers {
+  // Executes etcd requests for its parent actor; a failed request is re-sent to this actor after retryInterval.
+  private val dataManagementService = context.parent // is told FinishWork(key) when a request has completed
+  private var lease: Option[Lease] = None // only assigned inside receive, never from a Future callback
+  leaseService ! GetLease // request a lease as soon as the actor is created
+
+  override def receive: Receive = {
+    case msg: Lease =>
+      lease = Some(msg)
+    case msg: GetLeaseAndRetry => // failure path: log, optionally renew the lease, then schedule the request again
+      logging.warn(this, msg.log)
+      if (!msg.skipLeaseRefresh) {
+        if (msg.clearLease) {
+          lease = None
+        }
+        leaseService ! GetLease
+      }
+      sendMessageToSelfAfter(msg.request, retryInterval)
+    // leader election + endpoint management
+    case request: ElectLeader =>
+      lease match {
+        case Some(l) =>
+          etcdClient
+            .electLeader(request.key, request.value, l)
+            .andThen {
+              case Success(msg) =>
+                request.recipient ! ElectionResult(msg)
+                dataManagementService ! FinishWork(request.key)
+            }
+            .recover {
+              // a gRPC status error is handled as a lost lease: renew it and retry after retryInterval
+              case t: StatusRuntimeException =>
+                self ! GetLeaseAndRetry(request, s"a lease is expired while leader election, reissue it: $t")
+              // any other failure keeps the current lease; the request is retried until it succeeds
+              case t: Throwable =>
+                self ! GetLeaseAndRetry(
+                  request,
+                  s"unexpected error happened: $t, retry storing data",
+                  skipLeaseRefresh = true)
+            }
+        case None =>
+          self ! GetLeaseAndRetry(request, s"lease not found, retry storing data ${request.key}", clearLease = false)
+      }
+
+    // only endpoint management
+    case request: RegisterData =>
+      lease match {
+        case Some(l) =>
+          etcdClient
+            .put(request.key, request.value, l.id)
+            .andThen {
+              case Success(_) =>
+                dataManagementService ! FinishWork(request.key)
+            }
+            .recover {
+              // a gRPC status error is handled as a lost lease: renew it and retry after retryInterval
+              case t: StatusRuntimeException =>
+                self ! GetLeaseAndRetry(
+                  request,
+                  s"a lease is expired while registering data ${request.key}, reissue it: $t")
+              // any other failure keeps the current lease; the request is retried until the data is stored
+              case t: Throwable =>
+                self ! GetLeaseAndRetry(
+                  request,
+                  s"unexpected error happened: $t, retry storing data ${request.key}",
+                  skipLeaseRefresh = true)
+            }
+        case None =>
+          self ! GetLeaseAndRetry(request, s"lease not found, retry storing data ${request.key}", clearLease = false)
+      }
+    // it stores the data only if nothing is stored for the key yet
+    case request: RegisterInitialData =>
+      lease match {
+        case Some(l) =>
+          etcdClient
+            .putTxn(request.key, request.value, 0, l.id)
+            .map { res =>
+              dataManagementService ! FinishWork(request.key)
+              if (res.getSucceeded) {
+                logging.info(this, s"initial data storing succeeds for ${request.key}")
+                request.recipient.foreach(_ ! InitialDataStorageResults(request.key, Right(Done())))
+              } else {
+                logging.info(this, s"data is already stored for: $request, cancel the initial data storing")
+                request.recipient.foreach(_ ! InitialDataStorageResults(request.key, Left(AlreadyExist())))
+              }
+            }
+            .recover {
+              // a gRPC status error is handled as a lost lease: renew it and retry after retryInterval
+              case t: StatusRuntimeException =>
+                self ! GetLeaseAndRetry(
+                  request,
+                  s"a lease is expired while registering an initial data ${request.key}, reissue it: $t")
+              // any other failure keeps the current lease; the request is retried until the data is stored
+              case t: Throwable =>
+                self ! GetLeaseAndRetry(
+                  request,
+                  s"unexpected error happened: $t, retry storing data ${request.key}",
+                  skipLeaseRefresh = true)
+            }
+        case None =>
+          self ! GetLeaseAndRetry(request, s"lease not found, retry storing data ${request.key}", clearLease = false)
+      }
+
+    case msg: WatcherClosed => // deletes the key; unlike the handlers above this one does not look at the lease
+      etcdClient
+        .del(msg.key)
+        .andThen {
+          case Success(_) =>
+            dataManagementService ! FinishWork(msg.key)
+        }
+        .recover {
+          // a gRPC status error is handled as a lost lease: renew it and retry after retryInterval
+          case t: StatusRuntimeException =>
+            self ! GetLeaseAndRetry(msg, s"a lease is expired while deleting data ${msg.key}, reissue it: $t")
+          // any other failure keeps the current lease; the deletion is retried until it succeeds
+          case t: Throwable =>
+            self ! GetLeaseAndRetry(
+              msg,
+              s"unexpected error happened: $t, retry deleting data for ${msg.key}",
+              skipLeaseRefresh = true)
+        }
+  }
+
+  private def sendMessageToSelfAfter(msg: Any, retryInterval: FiniteDuration) = {
+    timers.startSingleTimer(msg, msg, retryInterval) // msg is also the timer key: an equal pending retry is replaced
+  }
+}
+
+object EtcdWorker {
+  // self-message: logs `log`, renews the lease unless disabled, then re-sends `request` after the retry interval
+  case class GetLeaseAndRetry(request: Any, log: String, clearLease: Boolean = true, skipLeaseRefresh: Boolean = false)
+
+  def props(etcdClient: EtcdClient, leaseService: ActorRef)(implicit ec: ExecutionContext,
+                                                            actorSystem: ActorSystem,
+                                                            logging: Logging): Props =
+    Props(new EtcdWorker(etcdClient, leaseService)) // the implicits are forwarded to the constructor
+}
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/service/DataManagementService.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/service/DataManagementService.scala
index 425832679..c03070bf8 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/service/DataManagementService.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/service/DataManagementService.scala
@@ -19,18 +19,14 @@ package org.apache.openwhisk.core.service
 
 import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Props}
 import akka.util.Timeout
-import io.grpc.StatusRuntimeException
 import org.apache.openwhisk.common.Logging
 import org.apache.openwhisk.core.ConfigKeys
-import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdFollower, EtcdLeader}
-import org.apache.openwhisk.core.service.DataManagementService.retryInterval
+import org.apache.openwhisk.core.etcd.{EtcdFollower, EtcdLeader}
 import pureconfig.loadConfigOrThrow
 
 import scala.collection.concurrent.TrieMap
 import scala.collection.mutable.{Map, Queue}
-import scala.concurrent.ExecutionContext
 import scala.concurrent.duration._
-import scala.util.Success
 
 // messages received by the actor
 // it is required to specify a recipient directly for the retryable message processing
@@ -181,148 +177,3 @@ object DataManagementService {
     Props(new DataManagementService(watcherService, workerFactory))
   }
 }
-
-private[service] class EtcdWorker(etcdClient: EtcdClient, leaseService: ActorRef)(implicit val ec: ExecutionContext,
-                                                                                  actorSystem: ActorSystem,
-                                                                                  logging: Logging)
-    extends Actor {
-
-  private val dataManagementService = context.parent
-  private var lease: Option[Lease] = None
-  leaseService ! GetLease
-
-  override def receive: Receive = {
-    case msg: Lease =>
-      lease = Some(msg)
-
-    // leader election + endpoint management
-    case request: ElectLeader =>
-      lease match {
-        case Some(l) =>
-          etcdClient
-            .electLeader(request.key, request.value, l)
-            .andThen {
-              case Success(msg) =>
-                request.recipient ! ElectionResult(msg)
-                dataManagementService ! FinishWork(request.key)
-            }
-            .recover {
-              // if there is no lease, reissue it and retry immediately
-              case t: StatusRuntimeException =>
-                logging.warn(this, s"a lease is expired while leader election, reissue it: $t")
-                lease = None
-                leaseService ! GetLease
-                sendMessageToSelfAfter(request, retryInterval)
-
-              // it should retry forever until the data is stored
-              case t: Throwable =>
-                logging.warn(this, s"unexpected error happened: $t, retry storing data")
-                sendMessageToSelfAfter(request, retryInterval)
-            }
-        case None =>
-          logging.warn(this, s"lease not found, retry storing data")
-          leaseService ! GetLease
-          sendMessageToSelfAfter(request, retryInterval)
-      }
-
-    // only endpoint management
-    case request: RegisterData =>
-      lease match {
-        case Some(l) =>
-          etcdClient
-            .put(request.key, request.value, l.id)
-            .andThen {
-              case Success(_) =>
-                dataManagementService ! FinishWork(request.key)
-            }
-            .recover {
-              // if there is no lease, reissue it and retry immediately
-              case t: StatusRuntimeException =>
-                logging.warn(this, s"a lease is expired while registering data ${request.key}, reissue it: $t")
-                lease = None
-                leaseService ! GetLease
-                sendMessageToSelfAfter(request, retryInterval)
-
-              // it should retry forever until the data is stored
-              case t: Throwable =>
-                logging.warn(this, s"unexpected error happened: $t, retry storing data ${request.key}")
-                sendMessageToSelfAfter(request, retryInterval)
-            }
-        case None =>
-          logging.warn(this, s"lease not found, retry storing data ${request.key}")
-          leaseService ! GetLease
-          sendMessageToSelfAfter(request, retryInterval)
-      }
-
-    // it stores the data iif there is no such one
-    case request: RegisterInitialData =>
-      lease match {
-        case Some(l) =>
-          etcdClient
-            .putTxn(request.key, request.value, 0, l.id)
-            .map { res =>
-              dataManagementService ! FinishWork(request.key)
-              if (res.getSucceeded) {
-                logging.info(this, s"initial data storing succeeds for ${request.key}")
-                request.recipient.map(_ ! InitialDataStorageResults(request.key, Right(Done())))
-              } else {
-                logging.info(this, s"data is already stored for: $request, cancel the initial data storing")
-                request.recipient.map(_ ! InitialDataStorageResults(request.key, Left(AlreadyExist())))
-              }
-            }
-            .recover {
-              // if there is no lease, reissue it and retry immediately
-              case t: StatusRuntimeException =>
-                logging.warn(
-                  this,
-                  s"a lease is expired while registering an initial data ${request.key}, reissue it: $t")
-                lease = None
-                leaseService ! GetLease
-                sendMessageToSelfAfter(request, retryInterval)
-
-              // it should retry forever until the data is stored
-              case t: Throwable =>
-                logging.warn(this, s"unexpected error happened: $t, retry storing data for ${request.key}")
-                sendMessageToSelfAfter(request, retryInterval)
-            }
-        case None =>
-          logging.warn(this, s"lease not found, retry storing data for ${request.key}")
-          leaseService ! GetLease
-          sendMessageToSelfAfter(request, retryInterval)
-      }
-
-    case msg: WatcherClosed =>
-      etcdClient
-        .del(msg.key)
-        .andThen {
-          case Success(_) =>
-            dataManagementService ! FinishWork(msg.key)
-        }
-        .recover {
-          // if there is no lease, reissue it and retry immediately
-          case t: StatusRuntimeException =>
-            logging.warn(this, s"a lease is expired while deleting data ${msg.key}, reissue it: $t")
-            lease = None
-            leaseService ! GetLease
-            sendMessageToSelfAfter(msg, retryInterval)
-
-          // it should retry forever until the data is stored
-          case t: Throwable =>
-            logging.warn(this, s"unexpected error happened: $t, retry storing data for ${msg.key}")
-            sendMessageToSelfAfter(msg, retryInterval)
-        }
-
-  }
-
-  private def sendMessageToSelfAfter(msg: Any, retryInterval: FiniteDuration) = {
-    actorSystem.scheduler.scheduleOnce(retryInterval, self, msg)
-  }
-}
-
-object EtcdWorker {
-  def props(etcdClient: EtcdClient, leaseService: ActorRef)(implicit ec: ExecutionContext,
-                                                            actorSystem: ActorSystem,
-                                                            logging: Logging): Props = {
-    Props(new EtcdWorker(etcdClient, leaseService))
-  }
-}
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerReactive.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerReactive.scala
index 327c0deda..67660e8a6 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerReactive.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerReactive.scala
@@ -37,10 +37,10 @@ import org.apache.openwhisk.core.etcd.EtcdKV.ContainerKeys.containerPrefix
 import org.apache.openwhisk.core.etcd.EtcdKV.QueueKeys.queue
 import org.apache.openwhisk.core.etcd.EtcdKV.{ContainerKeys, SchedulerKeys}
 import org.apache.openwhisk.core.etcd.EtcdType._
-import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdConfig}
+import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdConfig, EtcdWorker}
 import org.apache.openwhisk.core.invoker.Invoker.InvokerEnabled
 import org.apache.openwhisk.core.scheduler.{SchedulerEndpoints, SchedulerStates}
-import org.apache.openwhisk.core.service.{DataManagementService, EtcdWorker, LeaseKeepAliveService, WatcherService}
+import org.apache.openwhisk.core.service.{DataManagementService, LeaseKeepAliveService, WatcherService}
 import org.apache.openwhisk.core.{ConfigKeys, WarmUp, WhiskConfig}
 import org.apache.openwhisk.grpc.{ActivationServiceClient, FetchRequest}
 import org.apache.openwhisk.spi.SpiLoader
diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
index f32520cc7..2038fc1c3 100644
--- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
+++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
@@ -36,11 +36,11 @@ import org.apache.openwhisk.core.database.{ActivationStoreProvider, NoDocumentEx
 import org.apache.openwhisk.core.entity._
 import org.apache.openwhisk.core.etcd.EtcdKV.{QueueKeys, SchedulerKeys}
 import org.apache.openwhisk.core.etcd.EtcdType.ByteStringToString
-import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdConfig}
+import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdConfig, EtcdWorker}
 import org.apache.openwhisk.core.scheduler.container.{ContainerManager, CreationJobManager}
 import org.apache.openwhisk.core.scheduler.grpc.ActivationServiceImpl
 import org.apache.openwhisk.core.scheduler.queue._
-import org.apache.openwhisk.core.service.{DataManagementService, EtcdWorker, LeaseKeepAliveService, WatcherService}
+import org.apache.openwhisk.core.service.{DataManagementService, LeaseKeepAliveService, WatcherService}
 import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
 import org.apache.openwhisk.grpc.ActivationServiceHandler
 import org.apache.openwhisk.http.BasicHttpService
diff --git a/tests/src/test/scala/org/apache/openwhisk/common/etcd/EtcdWorkerTests.scala b/tests/src/test/scala/org/apache/openwhisk/common/etcd/EtcdWorkerTests.scala
new file mode 100644
index 000000000..a4203fc28
--- /dev/null
+++ b/tests/src/test/scala/org/apache/openwhisk/common/etcd/EtcdWorkerTests.scala
@@ -0,0 +1,176 @@
+package org.apache.openwhisk.common.etcd
+
+import akka.actor.{ActorRef, ActorSystem}
+import akka.testkit.{ImplicitSender, TestActor, TestActorRef, TestKit, TestProbe}
+import akka.util.Timeout
+import com.ibm.etcd.api.{DeleteRangeResponse, PutResponse, TxnResponse}
+import common.StreamLogging
+import io.grpc.{Status, StatusRuntimeException}
+import org.apache.openwhisk.core.entity.SchedulerInstanceId
+import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdLeader, EtcdWorker}
+import org.apache.openwhisk.core.service.{
+  AlreadyExist,
+  Done,
+  ElectLeader,
+  ElectionResult,
+  FinishWork,
+  GetLease,
+  InitialDataStorageResults,
+  Lease,
+  RegisterData,
+  RegisterInitialData,
+  WatcherClosed
+}
+import org.junit.runner.RunWith
+import org.scalamock.scalatest.MockFactory
+import org.scalatest.{BeforeAndAfterAll, FlatSpecLike, Matchers}
+import org.scalatest.concurrent.ScalaFutures
+import org.scalatest.junit.JUnitRunner
+
+import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.duration._
+
+@RunWith(classOf[JUnitRunner])
+class EtcdWorkerTests
+    extends TestKit(ActorSystem("EtcdWorker"))
+    with ImplicitSender
+    with FlatSpecLike
+    with ScalaFutures
+    with Matchers
+    with MockFactory
+    with BeforeAndAfterAll
+    with StreamLogging {
+  // Tests for EtcdWorker; the test actor (`self`) is used both as the worker's parent and as the reply recipient.
+  implicit val timeout: Timeout = Timeout(5.seconds)
+  implicit val ec: ExecutionContext = system.dispatcher
+  val leaseService = TestProbe()
+  val leaseId = 10
+  val leaseTtl = 10
+  // the fake lease service answers every GetLease with the same lease and ignores all other messages
+  leaseService.setAutoPilot((sender: ActorRef, msg: Any) => {
+    if (msg == GetLease) sender ! Lease(leaseId, leaseTtl)
+    TestActor.KeepRunning
+  })
+
+  val schedulerId = SchedulerInstanceId("scheduler0")
+  val instanceId = schedulerId
+
+  // terminate the ActorSystem created for this suite once all tests have run
+  override def afterAll(): Unit =
+    try TestKit.shutdownActorSystem(system)
+    finally super.afterAll()
+
+  behavior of "EtcdWorker"
+
+  it should "elect leader and send completion ack to parent" in {
+    val mockEtcd = mock[EtcdClient]
+
+    val key = "testKey"
+    val value = "testValue"
+    val leader = Right(EtcdLeader(key, value, leaseId))
+    val etcdWorker = TestActorRef(EtcdWorker.props(mockEtcd, leaseService.ref), self)
+
+    (mockEtcd
+      .electLeader(_: String, _: String, _: Lease))
+      .expects(key, value, *)
+      .returns(Future.successful(leader))
+
+    etcdWorker ! ElectLeader(key, value, recipient = self)
+
+    expectMsg(ElectionResult(leader))
+    expectMsg(FinishWork(key))
+  }
+
+  it should "register initial data when it doesn't exist and send completion ack to parent" in {
+    val mockEtcd = mock[EtcdClient]
+
+    val key = "testKey"
+    val value = "testValue"
+    val etcdWorker = TestActorRef(EtcdWorker.props(mockEtcd, leaseService.ref), self)
+
+    (mockEtcd
+      .putTxn(_: String, _: String, _: Long, _: Long))
+      .expects(key, value, *, *)
+      .returns(Future.successful(TxnResponse.newBuilder().setSucceeded(true).build()))
+
+    etcdWorker ! RegisterInitialData(key, value, recipient = Some(self))
+
+    expectMsg(FinishWork(key))
+    expectMsg(InitialDataStorageResults(key, Right(Done())))
+  }
+
+  it should "attempt to register initial data when exists and send completion ack to parent" in {
+    val mockEtcd = mock[EtcdClient]
+
+    val key = "testKey"
+    val value = "testValue"
+    val etcdWorker = TestActorRef(EtcdWorker.props(mockEtcd, leaseService.ref), self)
+
+    (mockEtcd
+      .putTxn(_: String, _: String, _: Long, _: Long))
+      .expects(key, value, *, *)
+      .returns(Future.successful(TxnResponse.newBuilder().setSucceeded(false).build()))
+
+    etcdWorker ! RegisterInitialData(key, value, recipient = Some(self))
+
+    expectMsg(FinishWork(key))
+    expectMsg(InitialDataStorageResults(key, Left(AlreadyExist())))
+  }
+
+  it should "register data and send completion ack to parent" in {
+    val mockEtcd = mock[EtcdClient]
+
+    val key = "testKey"
+    val value = "testValue"
+    val etcdWorker = TestActorRef(EtcdWorker.props(mockEtcd, leaseService.ref), self)
+
+    (mockEtcd
+      .put(_: String, _: String, _: Long))
+      .expects(key, value, leaseId)
+      .returns(Future.successful(PutResponse.newBuilder().build()))
+
+    etcdWorker ! RegisterData(key, value)
+
+    expectMsg(FinishWork(key))
+  }
+
+  it should "delete data when watcher closed" in {
+    val mockEtcd = mock[EtcdClient]
+
+    val key = "testKey"
+    val etcdWorker = TestActorRef(EtcdWorker.props(mockEtcd, leaseService.ref), self)
+
+    (mockEtcd
+      .del(_: String))
+      .expects(key)
+      .returns(Future.successful(DeleteRangeResponse.newBuilder().build()))
+
+    etcdWorker ! WatcherClosed(key, false)
+
+    expectMsg(FinishWork(key))
+  }
+
+  it should "retry request after failure if lease does not exist" in {
+    val mockEtcd = mock[EtcdClient]
+
+    val key = "testKey"
+    val etcdWorker = TestActorRef(EtcdWorker.props(mockEtcd, leaseService.ref), self)
+    var firstAttempt = true // the first del call fails, the retried one succeeds
+    (mockEtcd
+      .del(_: String))
+      .expects(key)
+      .onCall((_: String) => {
+        if (firstAttempt) {
+          firstAttempt = false
+          Future.failed(new StatusRuntimeException(Status.RESOURCE_EXHAUSTED))
+        } else {
+          Future.successful(DeleteRangeResponse.newBuilder().build())
+        }
+      })
+      .twice()
+
+    etcdWorker ! WatcherClosed(key, false)
+
+    expectMsg(FinishWork(key)) // arrives only after the worker has retried the failed deletion
+  }
+}