You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2021/02/10 23:23:47 UTC

[GitHub] [openwhisk] bdoyle0182 commented on a change in pull request #5063: [New Scheduler] Add DataManagementService

bdoyle0182 commented on a change in pull request #5063:
URL: https://github.com/apache/openwhisk/pull/5063#discussion_r574136574



##########
File path: common/scala/src/main/scala/org/apache/openwhisk/core/service/DataManagementService.scala
##########
@@ -0,0 +1,313 @@
+package org.apache.openwhisk.core.service
+
+import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Props}
+import akka.util.Timeout
+import io.grpc.StatusRuntimeException
+import org.apache.openwhisk.common.Logging
+import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdFollower, EtcdLeader}
+import org.apache.openwhisk.core.service.DataManagementService.retryInterval
+
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable.{Map, Queue}
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.Success
+
+// messages received by the actor
+// it is required to specify a recipient directly for the retryable message processing
+case class ElectLeader(key: String, value: String, recipient: ActorRef, watchEnabled: Boolean = true)
+case class RegisterInitialData(key: String,
+                               value: String,
+                               failoverEnabled: Boolean = true,
+                               recipient: Option[ActorRef] = None)
+
+case class RegisterData(key: String, value: String, failoverEnabled: Boolean = true)
+case class DeRegisterData(key: String)
+case class UpdateDataOnChange(key: String, value: String)
+
+// messages sent by the actor
+case class ElectionResult(leadership: Either[EtcdFollower, EtcdLeader])
+case class FinishWork(key: String)
+case class InitialDataStorageResults(key: String, result: Either[AlreadyExist, Done])
+case class Done()
+case class AlreadyExist()
+
+/**
+ * This service is in charge of storing given data to ETCD.
+ * In case there is any issue occurred while storing data, it keeps trying until the data is stored.
+ * So it guarantees the data is eventually stored.
+ */
+class DataManagementService(watcherService: ActorRef, workerFactory: ActorRefFactory => ActorRef)(
+  implicit logging: Logging,
+  actorSystem: ActorSystem)
+    extends Actor {
+  private implicit val ec = context.dispatcher
+
+  implicit val requestTimeout: Timeout = Timeout(5.seconds)
+  private[service] val dataCache = TrieMap[String, String]()
+  private val operations = Map.empty[String, Queue[Any]]
+  private var inProgressKeys = Set.empty[String]
+  private val watcherName = "data-management-service"
+
+  private val worker = workerFactory(context)
+
+  override def receive: Receive = {
+    case FinishWork(key) =>
+      // send waiting operation to worker if there is any, else update the inProgressKeys
+      val ops = operations.get(key)
+      if (ops.nonEmpty && ops.get.nonEmpty) {
+        val operation = ops.get.dequeue()
+        worker ! operation
+      } else {
+        inProgressKeys = inProgressKeys - key
+        operations.remove(key) // remove empty queue from the map to free memories
+      }
+
+    // normally these messages will be sent when queues are created.
+    case request: ElectLeader =>
+      if (inProgressKeys.contains(request.key)) {
+        logging.info(this, s"save request $request into a buffer")

Review comment:
       do these really need to be info level?

##########
File path: common/scala/src/main/scala/org/apache/openwhisk/core/service/DataManagementService.scala
##########
@@ -0,0 +1,313 @@
+package org.apache.openwhisk.core.service
+
+import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Props}
+import akka.util.Timeout
+import io.grpc.StatusRuntimeException
+import org.apache.openwhisk.common.Logging
+import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdFollower, EtcdLeader}
+import org.apache.openwhisk.core.service.DataManagementService.retryInterval
+
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable.{Map, Queue}
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.Success
+
+// messages received by the actor
+// it is required to specify a recipient directly for the retryable message processing
+case class ElectLeader(key: String, value: String, recipient: ActorRef, watchEnabled: Boolean = true)
+case class RegisterInitialData(key: String,
+                               value: String,
+                               failoverEnabled: Boolean = true,
+                               recipient: Option[ActorRef] = None)
+
+case class RegisterData(key: String, value: String, failoverEnabled: Boolean = true)
+case class DeRegisterData(key: String)
+case class UpdateDataOnChange(key: String, value: String)
+
+// messages sent by the actor
+case class ElectionResult(leadership: Either[EtcdFollower, EtcdLeader])
+case class FinishWork(key: String)
+case class InitialDataStorageResults(key: String, result: Either[AlreadyExist, Done])
+case class Done()
+case class AlreadyExist()
+
+/**
+ * This service is in charge of storing given data to ETCD.
+ * In case there is any issue occurred while storing data, it keeps trying until the data is stored.

Review comment:
       ```suggestion
    * In the event any issue occurs while storing data, the actor keeps trying until the data is stored guaranteeing delivery to ETCD.
   ```

##########
File path: common/scala/src/main/scala/org/apache/openwhisk/core/service/DataManagementService.scala
##########
@@ -0,0 +1,313 @@
+package org.apache.openwhisk.core.service
+
+import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Props}
+import akka.util.Timeout
+import io.grpc.StatusRuntimeException
+import org.apache.openwhisk.common.Logging
+import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdFollower, EtcdLeader}
+import org.apache.openwhisk.core.service.DataManagementService.retryInterval
+
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable.{Map, Queue}
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.Success
+
+// messages received by the actor
+// it is required to specify a recipient directly for the retryable message processing
+case class ElectLeader(key: String, value: String, recipient: ActorRef, watchEnabled: Boolean = true)
+case class RegisterInitialData(key: String,
+                               value: String,
+                               failoverEnabled: Boolean = true,
+                               recipient: Option[ActorRef] = None)
+
+case class RegisterData(key: String, value: String, failoverEnabled: Boolean = true)
+case class DeRegisterData(key: String)
+case class UpdateDataOnChange(key: String, value: String)
+
+// messages sent by the actor
+case class ElectionResult(leadership: Either[EtcdFollower, EtcdLeader])
+case class FinishWork(key: String)
+case class InitialDataStorageResults(key: String, result: Either[AlreadyExist, Done])
+case class Done()
+case class AlreadyExist()
+
+/**
+ * This service is in charge of storing given data to ETCD.
+ * In case there is any issue occurred while storing data, it keeps trying until the data is stored.
+ * So it guarantees the data is eventually stored.
+ */
+class DataManagementService(watcherService: ActorRef, workerFactory: ActorRefFactory => ActorRef)(
+  implicit logging: Logging,
+  actorSystem: ActorSystem)
+    extends Actor {
+  private implicit val ec = context.dispatcher
+
+  implicit val requestTimeout: Timeout = Timeout(5.seconds)
+  private[service] val dataCache = TrieMap[String, String]()
+  private val operations = Map.empty[String, Queue[Any]]
+  private var inProgressKeys = Set.empty[String]
+  private val watcherName = "data-management-service"
+
+  private val worker = workerFactory(context)
+
+  override def receive: Receive = {
+    case FinishWork(key) =>
+      // send waiting operation to worker if there is any, else update the inProgressKeys
+      val ops = operations.get(key)
+      if (ops.nonEmpty && ops.get.nonEmpty) {
+        val operation = ops.get.dequeue()
+        worker ! operation
+      } else {
+        inProgressKeys = inProgressKeys - key
+        operations.remove(key) // remove empty queue from the map to free memories
+      }
+
+    // normally these messages will be sent when queues are created.
+    case request: ElectLeader =>
+      if (inProgressKeys.contains(request.key)) {
+        logging.info(this, s"save request $request into a buffer")
+        operations.getOrElseUpdate(request.key, Queue.empty[Any]).enqueue(request)
+      } else {
+        worker ! request
+        inProgressKeys = inProgressKeys + request.key
+      }
+
+    case request: RegisterInitialData =>
+      // send WatchEndpoint first as the put operation will be retry until success if failed
+      if (request.failoverEnabled)
+        watcherService ! WatchEndpoint(request.key, request.value, isPrefix = false, watcherName, Set(DeleteEvent))
+      if (inProgressKeys.contains(request.key)) {
+        logging.info(this, s"save request $request into a buffer")
+        operations.getOrElseUpdate(request.key, Queue.empty[Any]).enqueue(request)
+      } else {
+        worker ! request
+        inProgressKeys = inProgressKeys + request.key
+      }
+
+    case request: RegisterData =>
+      // send WatchEndpoint first as the put operation will be retry until success if failed
+      if (request.failoverEnabled)
+        watcherService ! WatchEndpoint(request.key, request.value, isPrefix = false, watcherName, Set(DeleteEvent))
+      if (inProgressKeys.contains(request.key)) {
+        // the new put|delete operation will erase influences made by older operations like put&delete
+        // so we can remove these old operations
+        logging.info(this, s"save request $request into a buffer")
+        val queue = operations.getOrElseUpdate(request.key, Queue.empty[Any]).filter { value =>
+          value match {
+            case _: RegisterData | _: WatcherClosed | _: RegisterInitialData => false
+            case _                                                           => true
+          }
+        }
+        queue.enqueue(request)
+        operations.update(request.key, queue)
+      } else {
+        worker ! request
+        inProgressKeys = inProgressKeys + request.key
+      }
+
+    case request: WatcherClosed =>
+      if (inProgressKeys.contains(request.key)) {
+        // the new put|delete operation will erase influences made by older operations like put&delete
+        // so we can remove these old operations
+        logging.info(this, s"save request $request into a buffer")
+        val queue = operations.getOrElseUpdate(request.key, Queue.empty[Any]).filter { value =>
+          value match {
+            case _: RegisterData | _: WatcherClosed | _: RegisterInitialData => false
+            case _                                                           => true
+          }
+        }
+        queue.enqueue(request)
+        operations.update(request.key, queue)
+      } else {
+        worker ! request
+        inProgressKeys = inProgressKeys + request.key
+      }
+
+    // It is required to close the watcher first before deleting etcd data
+    // It is supposed to receive the WatcherClosed message after the watcher is stopped.
+    case msg: DeRegisterData =>
+      watcherService ! UnWatchEndpoint(msg.key, isPrefix = false, watcherName, needFeedback = true)
+
+    case WatchEndpointRemoved(_, key, value, false) =>
+      self ! RegisterInitialData(key, value, failoverEnabled = false) // the watcher is already setup
+
+    // it is supposed not to receive "prefixed" data
+    case WatchEndpointRemoved(_, key, value, true) =>
+      logging.error(this, s"unexpected data received: ${WatchEndpoint(key, value, isPrefix = true, watcherName)}")
+
+    case msg: UpdateDataOnChange =>
+      dataCache.get(msg.key) match {
+        case Some(cached) if cached == msg.value =>
+          logging.debug(this, s"skip publishing data ${msg.key} because the data is not changed.")
+        // do nothing
+
+        case Some(cached) if cached != msg.value =>
+          dataCache.update(msg.key, msg.value)
+          self ! RegisterData(msg.key, msg.value, failoverEnabled = false) // the watcher is already setup
+
+        case None =>
+          dataCache.put(msg.key, msg.value)
+          self ! RegisterData(msg.key, msg.value)
+
+      }
+  }
+}
+
+object DataManagementService {
+  // Todo: Change to configuration
+  val retryInterval: FiniteDuration = 1.second
+
+  def props(watcherService: ActorRef, workerFactory: ActorRefFactory => ActorRef)(implicit logging: Logging,
+                                                                                  actorSystem: ActorSystem): Props = {
+    Props(new DataManagementService(watcherService, workerFactory))
+  }
+}
+
+class EtcdWorker(etcdClient: EtcdClient, leaseService: ActorRef)(implicit val ec: ExecutionContext,
+                                                                 actorSystem: ActorSystem,
+                                                                 logging: Logging)
+    extends Actor {
+
+  private val parent = context.parent
+  private var lease: Option[Lease] = None
+  leaseService ! GetLease
+
+  override def receive: Receive = {
+    case msg: Lease =>
+      lease = Some(msg)
+
+    // leader election + endpoint management
+    case request: ElectLeader =>
+      lease match {
+        case Some(l) =>
+          etcdClient
+            .electLeader(request.key, request.value, l)
+            .andThen {
+              case Success(msg) =>
+                request.recipient ! ElectionResult(msg)
+                parent ! FinishWork(request.key)
+            }
+            .recover {
+              // if there is no lease, reissue it and retry immediately
+              case t: StatusRuntimeException =>
+                logging.warn(this, s"a lease is expired while leader election, reissue it: $t")
+                lease = None
+                leaseService ! GetLease
+                sendMessageToSelfAfter(request, retryInterval)
+
+              // it should retry forever until the data is stored
+              case t: Throwable =>
+                logging.warn(this, s"unexpected error happened: $t, retry storing data")
+                sendMessageToSelfAfter(request, retryInterval)
+            }
+        case None =>
+          logging.warn(this, s"lease not found, retry storing data")
+          leaseService ! GetLease
+          sendMessageToSelfAfter(request, retryInterval)
+      }
+
+    // only endpoint management
+    case request: RegisterData =>
+      lease match {
+        case Some(l) =>
+          etcdClient
+            .put(request.key, request.value, l.id)
+            .andThen {
+              case Success(_) =>
+                parent ! FinishWork(request.key)
+            }
+            .recover {
+              // if there is no lease, reissue it and retry immediately
+              case t: StatusRuntimeException =>
+                logging.warn(this, s"a lease is expired while registering data ${request.key}, reissue it: $t")
+                lease = None
+                leaseService ! GetLease
+                sendMessageToSelfAfter(request, retryInterval)
+
+              // it should retry forever until the data is stored
+              case t: Throwable =>
+                logging.warn(this, s"unexpected error happened: $t, retry storing data ${request.key}")
+                sendMessageToSelfAfter(request, retryInterval)
+            }
+        case None =>
+          logging.warn(this, s"lease not found, retry storing data ${request.key}")
+          leaseService ! GetLease
+          sendMessageToSelfAfter(request, retryInterval)
+      }
+
+    // it stores the data iif there is no such one
+    case request: RegisterInitialData =>
+      lease match {
+        case Some(l) =>
+          etcdClient
+            .putTxn(request.key, request.value, 0, l.id)
+            .map { res =>
+              parent ! FinishWork(request.key)
+              if (res.getSucceeded) {
+                logging.debug(this, s"data is stored.")

Review comment:
       what data is stored?

##########
File path: common/scala/src/main/scala/org/apache/openwhisk/core/service/DataManagementService.scala
##########
@@ -0,0 +1,313 @@
+package org.apache.openwhisk.core.service
+
+import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Props}
+import akka.util.Timeout
+import io.grpc.StatusRuntimeException
+import org.apache.openwhisk.common.Logging
+import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdFollower, EtcdLeader}
+import org.apache.openwhisk.core.service.DataManagementService.retryInterval
+
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable.{Map, Queue}
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.Success
+
+// messages received by the actor
+// it is required to specify a recipient directly for the retryable message processing
+case class ElectLeader(key: String, value: String, recipient: ActorRef, watchEnabled: Boolean = true)
+case class RegisterInitialData(key: String,
+                               value: String,
+                               failoverEnabled: Boolean = true,
+                               recipient: Option[ActorRef] = None)
+
+case class RegisterData(key: String, value: String, failoverEnabled: Boolean = true)
+case class DeRegisterData(key: String)
+case class UpdateDataOnChange(key: String, value: String)
+
+// messages sent by the actor
+case class ElectionResult(leadership: Either[EtcdFollower, EtcdLeader])
+case class FinishWork(key: String)
+case class InitialDataStorageResults(key: String, result: Either[AlreadyExist, Done])
+case class Done()
+case class AlreadyExist()
+
+/**
+ * This service is in charge of storing given data to ETCD.
+ * In case there is any issue occurred while storing data, it keeps trying until the data is stored.
+ * So it guarantees the data is eventually stored.
+ */
+class DataManagementService(watcherService: ActorRef, workerFactory: ActorRefFactory => ActorRef)(
+  implicit logging: Logging,
+  actorSystem: ActorSystem)
+    extends Actor {
+  private implicit val ec = context.dispatcher
+
+  implicit val requestTimeout: Timeout = Timeout(5.seconds)
+  private[service] val dataCache = TrieMap[String, String]()
+  private val operations = Map.empty[String, Queue[Any]]
+  private var inProgressKeys = Set.empty[String]
+  private val watcherName = "data-management-service"
+
+  private val worker = workerFactory(context)
+
+  override def receive: Receive = {
+    case FinishWork(key) =>
+      // send waiting operation to worker if there is any, else update the inProgressKeys
+      val ops = operations.get(key)
+      if (ops.nonEmpty && ops.get.nonEmpty) {
+        val operation = ops.get.dequeue()
+        worker ! operation
+      } else {
+        inProgressKeys = inProgressKeys - key
+        operations.remove(key) // remove empty queue from the map to free memories
+      }
+
+    // normally these messages will be sent when queues are created.
+    case request: ElectLeader =>
+      if (inProgressKeys.contains(request.key)) {
+        logging.info(this, s"save request $request into a buffer")
+        operations.getOrElseUpdate(request.key, Queue.empty[Any]).enqueue(request)
+      } else {
+        worker ! request
+        inProgressKeys = inProgressKeys + request.key
+      }
+
+    case request: RegisterInitialData =>
+      // send WatchEndpoint first as the put operation will be retry until success if failed
+      if (request.failoverEnabled)
+        watcherService ! WatchEndpoint(request.key, request.value, isPrefix = false, watcherName, Set(DeleteEvent))
+      if (inProgressKeys.contains(request.key)) {
+        logging.info(this, s"save request $request into a buffer")
+        operations.getOrElseUpdate(request.key, Queue.empty[Any]).enqueue(request)
+      } else {
+        worker ! request
+        inProgressKeys = inProgressKeys + request.key
+      }
+
+    case request: RegisterData =>
+      // send WatchEndpoint first as the put operation will be retry until success if failed
+      if (request.failoverEnabled)
+        watcherService ! WatchEndpoint(request.key, request.value, isPrefix = false, watcherName, Set(DeleteEvent))
+      if (inProgressKeys.contains(request.key)) {
+        // the new put|delete operation will erase influences made by older operations like put&delete
+        // so we can remove these old operations
+        logging.info(this, s"save request $request into a buffer")
+        val queue = operations.getOrElseUpdate(request.key, Queue.empty[Any]).filter { value =>
+          value match {
+            case _: RegisterData | _: WatcherClosed | _: RegisterInitialData => false
+            case _                                                           => true
+          }
+        }
+        queue.enqueue(request)
+        operations.update(request.key, queue)
+      } else {
+        worker ! request
+        inProgressKeys = inProgressKeys + request.key
+      }
+
+    case request: WatcherClosed =>
+      if (inProgressKeys.contains(request.key)) {
+        // the new put|delete operation will erase influences made by older operations like put&delete
+        // so we can remove these old operations
+        logging.info(this, s"save request $request into a buffer")
+        val queue = operations.getOrElseUpdate(request.key, Queue.empty[Any]).filter { value =>
+          value match {
+            case _: RegisterData | _: WatcherClosed | _: RegisterInitialData => false
+            case _                                                           => true
+          }
+        }
+        queue.enqueue(request)
+        operations.update(request.key, queue)
+      } else {
+        worker ! request
+        inProgressKeys = inProgressKeys + request.key
+      }
+
+    // It is required to close the watcher first before deleting etcd data
+    // It is supposed to receive the WatcherClosed message after the watcher is stopped.
+    case msg: DeRegisterData =>
+      watcherService ! UnWatchEndpoint(msg.key, isPrefix = false, watcherName, needFeedback = true)
+
+    case WatchEndpointRemoved(_, key, value, false) =>
+      self ! RegisterInitialData(key, value, failoverEnabled = false) // the watcher is already setup
+
+    // it is supposed not to receive "prefixed" data
+    case WatchEndpointRemoved(_, key, value, true) =>
+      logging.error(this, s"unexpected data received: ${WatchEndpoint(key, value, isPrefix = true, watcherName)}")
+
+    case msg: UpdateDataOnChange =>
+      dataCache.get(msg.key) match {
+        case Some(cached) if cached == msg.value =>
+          logging.debug(this, s"skip publishing data ${msg.key} because the data is not changed.")
+        // do nothing
+
+        case Some(cached) if cached != msg.value =>
+          dataCache.update(msg.key, msg.value)
+          self ! RegisterData(msg.key, msg.value, failoverEnabled = false) // the watcher is already setup
+
+        case None =>
+          dataCache.put(msg.key, msg.value)
+          self ! RegisterData(msg.key, msg.value)
+
+      }
+  }
+}
+
+object DataManagementService {
+  // Todo: Change to configuration
+  val retryInterval: FiniteDuration = 1.second
+
+  def props(watcherService: ActorRef, workerFactory: ActorRefFactory => ActorRef)(implicit logging: Logging,
+                                                                                  actorSystem: ActorSystem): Props = {
+    Props(new DataManagementService(watcherService, workerFactory))
+  }
+}
+
+class EtcdWorker(etcdClient: EtcdClient, leaseService: ActorRef)(implicit val ec: ExecutionContext,
+                                                                 actorSystem: ActorSystem,
+                                                                 logging: Logging)
+    extends Actor {
+
+  private val parent = context.parent

Review comment:
       Is the parent always `dataManagementService`? If so it would help to be more descriptive than `parent`

##########
File path: common/scala/src/main/scala/org/apache/openwhisk/core/service/WatcherService.scala
##########
@@ -0,0 +1,45 @@
+package org.apache.openwhisk.core.service
+
+// messages received by this actor
+case class WatchEndpoint(key: String,
+                         value: String,
+                         isPrefix: Boolean,
+                         name: String,
+                         listenEvents: Set[EtcdEvent] = Set.empty)
+case class UnWatchEndpoint(watchKey: String, isPrefix: Boolean, watchName: String, needFeedback: Boolean = false)

Review comment:
       ```suggestion
   case class UnwatchEndpoint(watchKey: String, isPrefix: Boolean, watchName: String, needFeedback: Boolean = false)
   ```

##########
File path: common/scala/src/main/scala/org/apache/openwhisk/core/service/DataManagementService.scala
##########
@@ -0,0 +1,313 @@
+package org.apache.openwhisk.core.service
+
+import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Props}
+import akka.util.Timeout
+import io.grpc.StatusRuntimeException
+import org.apache.openwhisk.common.Logging
+import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdFollower, EtcdLeader}
+import org.apache.openwhisk.core.service.DataManagementService.retryInterval
+
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable.{Map, Queue}
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.Success
+
+// messages received by the actor
+// it is required to specify a recipient directly for the retryable message processing
+case class ElectLeader(key: String, value: String, recipient: ActorRef, watchEnabled: Boolean = true)
+case class RegisterInitialData(key: String,
+                               value: String,
+                               failoverEnabled: Boolean = true,
+                               recipient: Option[ActorRef] = None)
+
+case class RegisterData(key: String, value: String, failoverEnabled: Boolean = true)
+case class DeRegisterData(key: String)
+case class UpdateDataOnChange(key: String, value: String)
+
+// messages sent by the actor
+case class ElectionResult(leadership: Either[EtcdFollower, EtcdLeader])
+case class FinishWork(key: String)
+case class InitialDataStorageResults(key: String, result: Either[AlreadyExist, Done])
+case class Done()
+case class AlreadyExist()
+
+/**
+ * This service is in charge of storing given data to ETCD.
+ * In case there is any issue occurred while storing data, it keeps trying until the data is stored.
+ * So it guarantees the data is eventually stored.
+ */
+class DataManagementService(watcherService: ActorRef, workerFactory: ActorRefFactory => ActorRef)(
+  implicit logging: Logging,
+  actorSystem: ActorSystem)
+    extends Actor {
+  private implicit val ec = context.dispatcher
+
+  implicit val requestTimeout: Timeout = Timeout(5.seconds)
+  private[service] val dataCache = TrieMap[String, String]()
+  private val operations = Map.empty[String, Queue[Any]]
+  private var inProgressKeys = Set.empty[String]
+  private val watcherName = "data-management-service"
+
+  private val worker = workerFactory(context)
+
+  override def receive: Receive = {
+    case FinishWork(key) =>
+      // send waiting operation to worker if there is any, else update the inProgressKeys
+      val ops = operations.get(key)
+      if (ops.nonEmpty && ops.get.nonEmpty) {
+        val operation = ops.get.dequeue()
+        worker ! operation
+      } else {
+        inProgressKeys = inProgressKeys - key
+        operations.remove(key) // remove empty queue from the map to free memories

Review comment:
       ```suggestion
           operations.remove(key) // remove empty queue from the map to free memory
   ```

##########
File path: common/scala/src/main/scala/org/apache/openwhisk/core/service/DataManagementService.scala
##########
@@ -0,0 +1,313 @@
+package org.apache.openwhisk.core.service
+
+import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Props}
+import akka.util.Timeout
+import io.grpc.StatusRuntimeException
+import org.apache.openwhisk.common.Logging
+import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdFollower, EtcdLeader}
+import org.apache.openwhisk.core.service.DataManagementService.retryInterval
+
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable.{Map, Queue}
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.Success
+
+// messages received by the actor
+// it is required to specify a recipient directly for the retryable message processing
+case class ElectLeader(key: String, value: String, recipient: ActorRef, watchEnabled: Boolean = true)
+case class RegisterInitialData(key: String,
+                               value: String,
+                               failoverEnabled: Boolean = true,
+                               recipient: Option[ActorRef] = None)
+
+case class RegisterData(key: String, value: String, failoverEnabled: Boolean = true)
+case class DeRegisterData(key: String)
+case class UpdateDataOnChange(key: String, value: String)
+
+// messages sent by the actor
+case class ElectionResult(leadership: Either[EtcdFollower, EtcdLeader])
+case class FinishWork(key: String)
+case class InitialDataStorageResults(key: String, result: Either[AlreadyExist, Done])
+case class Done()
+case class AlreadyExist()
+
+/**
+ * This service is in charge of storing given data to ETCD.
+ * In case there is any issue occurred while storing data, it keeps trying until the data is stored.
+ * So it guarantees the data is eventually stored.
+ */
+class DataManagementService(watcherService: ActorRef, workerFactory: ActorRefFactory => ActorRef)(
+  implicit logging: Logging,
+  actorSystem: ActorSystem)
+    extends Actor {
+  private implicit val ec = context.dispatcher
+
+  implicit val requestTimeout: Timeout = Timeout(5.seconds)
+  private[service] val dataCache = TrieMap[String, String]()
+  private val operations = Map.empty[String, Queue[Any]]
+  private var inProgressKeys = Set.empty[String]
+  private val watcherName = "data-management-service"
+
+  private val worker = workerFactory(context)
+
+  override def receive: Receive = {
+    case FinishWork(key) =>
+      // send waiting operation to worker if there is any, else update the inProgressKeys
+      val ops = operations.get(key)
+      if (ops.nonEmpty && ops.get.nonEmpty) {
+        val operation = ops.get.dequeue()
+        worker ! operation
+      } else {
+        inProgressKeys = inProgressKeys - key
+        operations.remove(key) // remove empty queue from the map to free memories
+      }
+
+    // normally these messages will be sent when queues are created.
+    case request: ElectLeader =>
+      if (inProgressKeys.contains(request.key)) {
+        logging.info(this, s"save request $request into a buffer")
+        operations.getOrElseUpdate(request.key, Queue.empty[Any]).enqueue(request)
+      } else {
+        worker ! request
+        inProgressKeys = inProgressKeys + request.key
+      }
+
+    case request: RegisterInitialData =>
+      // send WatchEndpoint first as the put operation will be retry until success if failed
+      if (request.failoverEnabled)
+        watcherService ! WatchEndpoint(request.key, request.value, isPrefix = false, watcherName, Set(DeleteEvent))
+      if (inProgressKeys.contains(request.key)) {
+        logging.info(this, s"save request $request into a buffer")
+        operations.getOrElseUpdate(request.key, Queue.empty[Any]).enqueue(request)
+      } else {
+        worker ! request
+        inProgressKeys = inProgressKeys + request.key
+      }
+
+    case request: RegisterData =>
+      // send WatchEndpoint first as the put operation will be retry until success if failed
+      if (request.failoverEnabled)
+        watcherService ! WatchEndpoint(request.key, request.value, isPrefix = false, watcherName, Set(DeleteEvent))
+      if (inProgressKeys.contains(request.key)) {
+        // the new put|delete operation will erase influences made by older operations like put&delete
+        // so we can remove these old operations
+        logging.info(this, s"save request $request into a buffer")
+        val queue = operations.getOrElseUpdate(request.key, Queue.empty[Any]).filter { value =>
+          value match {
+            case _: RegisterData | _: WatcherClosed | _: RegisterInitialData => false
+            case _                                                           => true
+          }
+        }
+        queue.enqueue(request)
+        operations.update(request.key, queue)
+      } else {
+        worker ! request
+        inProgressKeys = inProgressKeys + request.key
+      }
+
+    case request: WatcherClosed =>
+      if (inProgressKeys.contains(request.key)) {
+        // the new put|delete operation will erase influences made by older operations like put&delete
+        // so we can remove these old operations
+        logging.info(this, s"save request $request into a buffer")
+        val queue = operations.getOrElseUpdate(request.key, Queue.empty[Any]).filter { value =>
+          value match {
+            case _: RegisterData | _: WatcherClosed | _: RegisterInitialData => false
+            case _                                                           => true
+          }
+        }
+        queue.enqueue(request)
+        operations.update(request.key, queue)
+      } else {
+        worker ! request
+        inProgressKeys = inProgressKeys + request.key
+      }
+
+    // It is required to close the watcher first before deleting etcd data
+    // It is supposed to receive the WatcherClosed message after the watcher is stopped.
+    case msg: DeRegisterData =>

Review comment:
       ```suggestion
       case msg: UnregisterData =>
   ```

##########
File path: common/scala/src/main/scala/org/apache/openwhisk/core/service/DataManagementService.scala
##########
@@ -0,0 +1,313 @@
+package org.apache.openwhisk.core.service
+
+import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Props}
+import akka.util.Timeout
+import io.grpc.StatusRuntimeException
+import org.apache.openwhisk.common.Logging
+import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdFollower, EtcdLeader}
+import org.apache.openwhisk.core.service.DataManagementService.retryInterval
+
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable.{Map, Queue}
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.Success
+
+// messages received by the actor
+// it is required to specify a recipient directly for the retryable message processing
+case class ElectLeader(key: String, value: String, recipient: ActorRef, watchEnabled: Boolean = true)
+case class RegisterInitialData(key: String,
+                               value: String,
+                               failoverEnabled: Boolean = true,
+                               recipient: Option[ActorRef] = None)
+
+case class RegisterData(key: String, value: String, failoverEnabled: Boolean = true)
+case class DeRegisterData(key: String)
+case class UpdateDataOnChange(key: String, value: String)
+
+// messages sent by the actor
+case class ElectionResult(leadership: Either[EtcdFollower, EtcdLeader])
+case class FinishWork(key: String)
+case class InitialDataStorageResults(key: String, result: Either[AlreadyExist, Done])
+case class Done()
+case class AlreadyExist()
+
+/**
+ * This service is in charge of storing given data to ETCD.
+ * In case there is any issue occurred while storing data, it keeps trying until the data is stored.
+ * So it guarantees the data is eventually stored.
+ */
+class DataManagementService(watcherService: ActorRef, workerFactory: ActorRefFactory => ActorRef)(
+  implicit logging: Logging,
+  actorSystem: ActorSystem)
+    extends Actor {
+  private implicit val ec = context.dispatcher
+
+  implicit val requestTimeout: Timeout = Timeout(5.seconds)
+  private[service] val dataCache = TrieMap[String, String]()
+  private val operations = Map.empty[String, Queue[Any]]
+  private var inProgressKeys = Set.empty[String]
+  private val watcherName = "data-management-service"
+
+  private val worker = workerFactory(context)
+
+  override def receive: Receive = {
+    case FinishWork(key) =>
+      // send waiting operation to worker if there is any, else update the inProgressKeys
+      val ops = operations.get(key)
+      if (ops.nonEmpty && ops.get.nonEmpty) {
+        val operation = ops.get.dequeue()
+        worker ! operation
+      } else {
+        inProgressKeys = inProgressKeys - key
+        operations.remove(key) // remove empty queue from the map to free memories
+      }
+
+    // normally these messages will be sent when queues are created.
+    case request: ElectLeader =>
+      if (inProgressKeys.contains(request.key)) {
+        logging.info(this, s"save request $request into a buffer")
+        operations.getOrElseUpdate(request.key, Queue.empty[Any]).enqueue(request)
+      } else {
+        worker ! request
+        inProgressKeys = inProgressKeys + request.key
+      }
+
+    case request: RegisterInitialData =>
+      // send WatchEndpoint first as the put operation will be retry until success if failed
+      if (request.failoverEnabled)
+        watcherService ! WatchEndpoint(request.key, request.value, isPrefix = false, watcherName, Set(DeleteEvent))
+      if (inProgressKeys.contains(request.key)) {
+        logging.info(this, s"save request $request into a buffer")
+        operations.getOrElseUpdate(request.key, Queue.empty[Any]).enqueue(request)
+      } else {
+        worker ! request
+        inProgressKeys = inProgressKeys + request.key
+      }
+
+    case request: RegisterData =>
+      // send WatchEndpoint first as the put operation will be retry until success if failed
+      if (request.failoverEnabled)
+        watcherService ! WatchEndpoint(request.key, request.value, isPrefix = false, watcherName, Set(DeleteEvent))
+      if (inProgressKeys.contains(request.key)) {
+        // the new put|delete operation will erase influences made by older operations like put&delete
+        // so we can remove these old operations
+        logging.info(this, s"save request $request into a buffer")
+        val queue = operations.getOrElseUpdate(request.key, Queue.empty[Any]).filter { value =>
+          value match {
+            case _: RegisterData | _: WatcherClosed | _: RegisterInitialData => false
+            case _                                                           => true
+          }
+        }
+        queue.enqueue(request)
+        operations.update(request.key, queue)
+      } else {
+        worker ! request
+        inProgressKeys = inProgressKeys + request.key
+      }
+
+    case request: WatcherClosed =>
+      if (inProgressKeys.contains(request.key)) {
+        // the new put|delete operation will erase influences made by older operations like put&delete
+        // so we can remove these old operations
+        logging.info(this, s"save request $request into a buffer")
+        val queue = operations.getOrElseUpdate(request.key, Queue.empty[Any]).filter { value =>
+          value match {
+            case _: RegisterData | _: WatcherClosed | _: RegisterInitialData => false
+            case _                                                           => true
+          }
+        }
+        queue.enqueue(request)
+        operations.update(request.key, queue)
+      } else {
+        worker ! request
+        inProgressKeys = inProgressKeys + request.key
+      }
+
+    // It is required to close the watcher first before deleting etcd data
+    // It is supposed to receive the WatcherClosed message after the watcher is stopped.
+    case msg: DeRegisterData =>
+      watcherService ! UnWatchEndpoint(msg.key, isPrefix = false, watcherName, needFeedback = true)
+
+    case WatchEndpointRemoved(_, key, value, false) =>
+      self ! RegisterInitialData(key, value, failoverEnabled = false) // the watcher is already setup
+
+    // it is supposed not to receive "prefixed" data
+    case WatchEndpointRemoved(_, key, value, true) =>
+      logging.error(this, s"unexpected data received: ${WatchEndpoint(key, value, isPrefix = true, watcherName)}")
+
+    case msg: UpdateDataOnChange =>
+      dataCache.get(msg.key) match {
+        case Some(cached) if cached == msg.value =>
+          logging.debug(this, s"skip publishing data ${msg.key} because the data is not changed.")
+        // do nothing

Review comment:
       nit: don't think this comment really adds anything

##########
File path: common/scala/src/main/scala/org/apache/openwhisk/core/service/DataManagementService.scala
##########
@@ -0,0 +1,313 @@
+package org.apache.openwhisk.core.service
+
+import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Props}
+import akka.util.Timeout
+import io.grpc.StatusRuntimeException
+import org.apache.openwhisk.common.Logging
+import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdFollower, EtcdLeader}
+import org.apache.openwhisk.core.service.DataManagementService.retryInterval
+
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable.{Map, Queue}
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.Success
+
+// messages received by the actor
+// it is required to specify a recipient directly for the retryable message processing
+case class ElectLeader(key: String, value: String, recipient: ActorRef, watchEnabled: Boolean = true)
+case class RegisterInitialData(key: String,
+                               value: String,
+                               failoverEnabled: Boolean = true,
+                               recipient: Option[ActorRef] = None)
+
+case class RegisterData(key: String, value: String, failoverEnabled: Boolean = true)
+case class DeRegisterData(key: String)
+case class UpdateDataOnChange(key: String, value: String)
+
+// messages sent by the actor
+case class ElectionResult(leadership: Either[EtcdFollower, EtcdLeader])
+case class FinishWork(key: String)
+case class InitialDataStorageResults(key: String, result: Either[AlreadyExist, Done])
+case class Done()
+case class AlreadyExist()
+
+/**
+ * This service is in charge of storing given data to ETCD.
+ * In case there is any issue occurred while storing data, it keeps trying until the data is stored.
+ * So it guarantees the data is eventually stored.
+ */
+class DataManagementService(watcherService: ActorRef, workerFactory: ActorRefFactory => ActorRef)(
+  implicit logging: Logging,
+  actorSystem: ActorSystem)
+    extends Actor {
+  private implicit val ec = context.dispatcher
+
+  implicit val requestTimeout: Timeout = Timeout(5.seconds)
+  private[service] val dataCache = TrieMap[String, String]()
+  private val operations = Map.empty[String, Queue[Any]]
+  private var inProgressKeys = Set.empty[String]
+  private val watcherName = "data-management-service"
+
+  private val worker = workerFactory(context)
+
+  override def receive: Receive = {
+    case FinishWork(key) =>
+      // send waiting operation to worker if there is any, else update the inProgressKeys
+      val ops = operations.get(key)
+      if (ops.nonEmpty && ops.get.nonEmpty) {
+        val operation = ops.get.dequeue()
+        worker ! operation
+      } else {
+        inProgressKeys = inProgressKeys - key
+        operations.remove(key) // remove empty queue from the map to free memories
+      }
+
+    // normally these messages will be sent when queues are created.
+    case request: ElectLeader =>
+      if (inProgressKeys.contains(request.key)) {
+        logging.info(this, s"save request $request into a buffer")
+        operations.getOrElseUpdate(request.key, Queue.empty[Any]).enqueue(request)
+      } else {
+        worker ! request
+        inProgressKeys = inProgressKeys + request.key
+      }
+
+    case request: RegisterInitialData =>
+      // send WatchEndpoint first as the put operation will be retry until success if failed
+      if (request.failoverEnabled)
+        watcherService ! WatchEndpoint(request.key, request.value, isPrefix = false, watcherName, Set(DeleteEvent))
+      if (inProgressKeys.contains(request.key)) {
+        logging.info(this, s"save request $request into a buffer")
+        operations.getOrElseUpdate(request.key, Queue.empty[Any]).enqueue(request)
+      } else {
+        worker ! request
+        inProgressKeys = inProgressKeys + request.key
+      }
+
+    case request: RegisterData =>
+      // send WatchEndpoint first as the put operation will be retry until success if failed
+      if (request.failoverEnabled)
+        watcherService ! WatchEndpoint(request.key, request.value, isPrefix = false, watcherName, Set(DeleteEvent))
+      if (inProgressKeys.contains(request.key)) {
+        // the new put|delete operation will erase influences made by older operations like put&delete
+        // so we can remove these old operations
+        logging.info(this, s"save request $request into a buffer")
+        val queue = operations.getOrElseUpdate(request.key, Queue.empty[Any]).filter { value =>
+          value match {
+            case _: RegisterData | _: WatcherClosed | _: RegisterInitialData => false
+            case _                                                           => true
+          }
+        }
+        queue.enqueue(request)
+        operations.update(request.key, queue)
+      } else {
+        worker ! request
+        inProgressKeys = inProgressKeys + request.key
+      }
+
+    case request: WatcherClosed =>
+      if (inProgressKeys.contains(request.key)) {
+        // the new put|delete operation will erase influences made by older operations like put&delete
+        // so we can remove these old operations
+        logging.info(this, s"save request $request into a buffer")
+        val queue = operations.getOrElseUpdate(request.key, Queue.empty[Any]).filter { value =>
+          value match {
+            case _: RegisterData | _: WatcherClosed | _: RegisterInitialData => false
+            case _                                                           => true
+          }
+        }
+        queue.enqueue(request)
+        operations.update(request.key, queue)
+      } else {
+        worker ! request
+        inProgressKeys = inProgressKeys + request.key
+      }
+
+    // It is required to close the watcher first before deleting etcd data
+    // It is supposed to receive the WatcherClosed message after the watcher is stopped.
+    case msg: DeRegisterData =>
+      watcherService ! UnWatchEndpoint(msg.key, isPrefix = false, watcherName, needFeedback = true)
+
+    case WatchEndpointRemoved(_, key, value, false) =>
+      self ! RegisterInitialData(key, value, failoverEnabled = false) // the watcher is already setup
+
+    // it is supposed not to receive "prefixed" data

Review comment:
       ```suggestion
       // It should not receive "prefixed" data
   ```

##########
File path: common/scala/src/main/scala/org/apache/openwhisk/core/service/DataManagementService.scala
##########
@@ -0,0 +1,313 @@
+package org.apache.openwhisk.core.service
+
+import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Props}
+import akka.util.Timeout
+import io.grpc.StatusRuntimeException
+import org.apache.openwhisk.common.Logging
+import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdFollower, EtcdLeader}
+import org.apache.openwhisk.core.service.DataManagementService.retryInterval
+
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable.{Map, Queue}
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.Success
+
+// messages received by the actor
+// it is required to specify a recipient directly for the retryable message processing
+case class ElectLeader(key: String, value: String, recipient: ActorRef, watchEnabled: Boolean = true)
+case class RegisterInitialData(key: String,
+                               value: String,
+                               failoverEnabled: Boolean = true,
+                               recipient: Option[ActorRef] = None)
+
+case class RegisterData(key: String, value: String, failoverEnabled: Boolean = true)
+case class DeRegisterData(key: String)
+case class UpdateDataOnChange(key: String, value: String)
+
+// messages sent by the actor
+case class ElectionResult(leadership: Either[EtcdFollower, EtcdLeader])
+case class FinishWork(key: String)
+case class InitialDataStorageResults(key: String, result: Either[AlreadyExist, Done])
+case class Done()
+case class AlreadyExist()
+
+/**
+ * This service is in charge of storing given data to ETCD.
+ * In case there is any issue occurred while storing data, it keeps trying until the data is stored.
+ * So it guarantees the data is eventually stored.
+ */
+class DataManagementService(watcherService: ActorRef, workerFactory: ActorRefFactory => ActorRef)(
+  implicit logging: Logging,
+  actorSystem: ActorSystem)
+    extends Actor {
+  private implicit val ec = context.dispatcher
+
+  implicit val requestTimeout: Timeout = Timeout(5.seconds)
+  private[service] val dataCache = TrieMap[String, String]()
+  private val operations = Map.empty[String, Queue[Any]]
+  private var inProgressKeys = Set.empty[String]
+  private val watcherName = "data-management-service"
+
+  private val worker = workerFactory(context)
+
+  override def receive: Receive = {
+    case FinishWork(key) =>
+      // send waiting operation to worker if there is any, else update the inProgressKeys
+      val ops = operations.get(key)
+      if (ops.nonEmpty && ops.get.nonEmpty) {
+        val operation = ops.get.dequeue()
+        worker ! operation
+      } else {
+        inProgressKeys = inProgressKeys - key
+        operations.remove(key) // remove empty queue from the map to free memories
+      }
+
+    // normally these messages will be sent when queues are created.
+    case request: ElectLeader =>
+      if (inProgressKeys.contains(request.key)) {
+        logging.info(this, s"save request $request into a buffer")
+        operations.getOrElseUpdate(request.key, Queue.empty[Any]).enqueue(request)
+      } else {
+        worker ! request
+        inProgressKeys = inProgressKeys + request.key
+      }
+
+    case request: RegisterInitialData =>
+      // send WatchEndpoint first as the put operation will be retry until success if failed
+      if (request.failoverEnabled)
+        watcherService ! WatchEndpoint(request.key, request.value, isPrefix = false, watcherName, Set(DeleteEvent))
+      if (inProgressKeys.contains(request.key)) {
+        logging.info(this, s"save request $request into a buffer")
+        operations.getOrElseUpdate(request.key, Queue.empty[Any]).enqueue(request)
+      } else {
+        worker ! request
+        inProgressKeys = inProgressKeys + request.key
+      }
+
+    case request: RegisterData =>
+      // send WatchEndpoint first as the put operation will be retry until success if failed

Review comment:
       ```suggestion
         // send WatchEndpoint first as the put operation will be retried until success if failed
   ```

##########
File path: common/scala/src/main/scala/org/apache/openwhisk/core/service/LeaseKeepAliveService.scala
##########
@@ -0,0 +1,16 @@
+package org.apache.openwhisk.core.service
+
+// Data
+sealed trait KeepAliveServiceData
+case object NoData extends KeepAliveServiceData
+case class Lease(id: Long, ttl: Long) extends KeepAliveServiceData
+
+// Events received by the actor
+case object ReGrantLease

Review comment:
       ```suggestion
   case object RegrantLease
   ```

##########
File path: common/scala/src/main/scala/org/apache/openwhisk/core/service/DataManagementService.scala
##########
@@ -0,0 +1,313 @@
+package org.apache.openwhisk.core.service
+
+import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Props}
+import akka.util.Timeout
+import io.grpc.StatusRuntimeException
+import org.apache.openwhisk.common.Logging
+import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdFollower, EtcdLeader}
+import org.apache.openwhisk.core.service.DataManagementService.retryInterval
+
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable.{Map, Queue}
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.Success
+
+// messages received by the actor
+// it is required to specify a recipient directly for the retryable message processing
+case class ElectLeader(key: String, value: String, recipient: ActorRef, watchEnabled: Boolean = true)
+case class RegisterInitialData(key: String,
+                               value: String,
+                               failoverEnabled: Boolean = true,
+                               recipient: Option[ActorRef] = None)
+
+case class RegisterData(key: String, value: String, failoverEnabled: Boolean = true)
+case class DeRegisterData(key: String)
+case class UpdateDataOnChange(key: String, value: String)
+
+// messages sent by the actor
+case class ElectionResult(leadership: Either[EtcdFollower, EtcdLeader])
+case class FinishWork(key: String)
+case class InitialDataStorageResults(key: String, result: Either[AlreadyExist, Done])
+case class Done()
+case class AlreadyExist()
+
+/**
+ * This service is in charge of storing given data to ETCD.
+ * In case there is any issue occurred while storing data, it keeps trying until the data is stored.
+ * So it guarantees the data is eventually stored.
+ */
+class DataManagementService(watcherService: ActorRef, workerFactory: ActorRefFactory => ActorRef)(
+  implicit logging: Logging,
+  actorSystem: ActorSystem)
+    extends Actor {
+  private implicit val ec = context.dispatcher
+
+  implicit val requestTimeout: Timeout = Timeout(5.seconds)
+  private[service] val dataCache = TrieMap[String, String]()
+  private val operations = Map.empty[String, Queue[Any]]
+  private var inProgressKeys = Set.empty[String]
+  private val watcherName = "data-management-service"
+
+  private val worker = workerFactory(context)
+
+  override def receive: Receive = {
+    case FinishWork(key) =>
+      // send waiting operation to worker if there is any, else update the inProgressKeys
+      val ops = operations.get(key)
+      if (ops.nonEmpty && ops.get.nonEmpty) {
+        val operation = ops.get.dequeue()
+        worker ! operation
+      } else {
+        inProgressKeys = inProgressKeys - key
+        operations.remove(key) // remove empty queue from the map to free memories
+      }
+
+    // normally these messages will be sent when queues are created.
+    case request: ElectLeader =>
+      if (inProgressKeys.contains(request.key)) {
+        logging.info(this, s"save request $request into a buffer")
+        operations.getOrElseUpdate(request.key, Queue.empty[Any]).enqueue(request)
+      } else {
+        worker ! request
+        inProgressKeys = inProgressKeys + request.key
+      }
+
+    case request: RegisterInitialData =>
+      // send WatchEndpoint first as the put operation will be retry until success if failed
+      if (request.failoverEnabled)
+        watcherService ! WatchEndpoint(request.key, request.value, isPrefix = false, watcherName, Set(DeleteEvent))
+      if (inProgressKeys.contains(request.key)) {
+        logging.info(this, s"save request $request into a buffer")
+        operations.getOrElseUpdate(request.key, Queue.empty[Any]).enqueue(request)
+      } else {
+        worker ! request
+        inProgressKeys = inProgressKeys + request.key
+      }
+
+    case request: RegisterData =>
+      // send WatchEndpoint first as the put operation will be retry until success if failed
+      if (request.failoverEnabled)
+        watcherService ! WatchEndpoint(request.key, request.value, isPrefix = false, watcherName, Set(DeleteEvent))
+      if (inProgressKeys.contains(request.key)) {
+        // the new put|delete operation will erase influences made by older operations like put&delete
+        // so we can remove these old operations
+        logging.info(this, s"save request $request into a buffer")
+        val queue = operations.getOrElseUpdate(request.key, Queue.empty[Any]).filter { value =>
+          value match {
+            case _: RegisterData | _: WatcherClosed | _: RegisterInitialData => false
+            case _                                                           => true
+          }
+        }
+        queue.enqueue(request)
+        operations.update(request.key, queue)
+      } else {
+        worker ! request
+        inProgressKeys = inProgressKeys + request.key
+      }
+
+    case request: WatcherClosed =>
+      if (inProgressKeys.contains(request.key)) {
+        // the new put|delete operation will erase influences made by older operations like put&delete
+        // so we can remove these old operations
+        logging.info(this, s"save request $request into a buffer")
+        val queue = operations.getOrElseUpdate(request.key, Queue.empty[Any]).filter { value =>
+          value match {
+            case _: RegisterData | _: WatcherClosed | _: RegisterInitialData => false
+            case _                                                           => true
+          }
+        }
+        queue.enqueue(request)
+        operations.update(request.key, queue)
+      } else {
+        worker ! request
+        inProgressKeys = inProgressKeys + request.key
+      }
+
+    // It is required to close the watcher first before deleting etcd data
+    // It is supposed to receive the WatcherClosed message after the watcher is stopped.
+    case msg: DeRegisterData =>
+      watcherService ! UnWatchEndpoint(msg.key, isPrefix = false, watcherName, needFeedback = true)
+
+    case WatchEndpointRemoved(_, key, value, false) =>
+      self ! RegisterInitialData(key, value, failoverEnabled = false) // the watcher is already setup
+
+    // it is supposed not to receive "prefixed" data
+    case WatchEndpointRemoved(_, key, value, true) =>
+      logging.error(this, s"unexpected data received: ${WatchEndpoint(key, value, isPrefix = true, watcherName)}")
+
+    case msg: UpdateDataOnChange =>
+      dataCache.get(msg.key) match {
+        case Some(cached) if cached == msg.value =>
+          logging.debug(this, s"skip publishing data ${msg.key} because the data is not changed.")
+        // do nothing
+
+        case Some(cached) if cached != msg.value =>
+          dataCache.update(msg.key, msg.value)
+          self ! RegisterData(msg.key, msg.value, failoverEnabled = false) // the watcher is already setup
+
+        case None =>
+          dataCache.put(msg.key, msg.value)
+          self ! RegisterData(msg.key, msg.value)
+
+      }
+  }
+}
+
+object DataManagementService {
+  // Todo: Change to configuration
+  val retryInterval: FiniteDuration = 1.second
+
+  def props(watcherService: ActorRef, workerFactory: ActorRefFactory => ActorRef)(implicit logging: Logging,
+                                                                                  actorSystem: ActorSystem): Props = {
+    Props(new DataManagementService(watcherService, workerFactory))
+  }
+}
+
+class EtcdWorker(etcdClient: EtcdClient, leaseService: ActorRef)(implicit val ec: ExecutionContext,
+                                                                 actorSystem: ActorSystem,
+                                                                 logging: Logging)
+    extends Actor {
+
+  private val parent = context.parent
+  private var lease: Option[Lease] = None
+  leaseService ! GetLease
+
+  override def receive: Receive = {
+    case msg: Lease =>
+      lease = Some(msg)
+
+    // leader election + endpoint management
+    case request: ElectLeader =>
+      lease match {
+        case Some(l) =>
+          etcdClient
+            .electLeader(request.key, request.value, l)
+            .andThen {
+              case Success(msg) =>
+                request.recipient ! ElectionResult(msg)
+                parent ! FinishWork(request.key)
+            }
+            .recover {
+              // if there is no lease, reissue it and retry immediately
+              case t: StatusRuntimeException =>
+                logging.warn(this, s"a lease is expired while leader election, reissue it: $t")
+                lease = None
+                leaseService ! GetLease
+                sendMessageToSelfAfter(request, retryInterval)
+
+              // it should retry forever until the data is stored
+              case t: Throwable =>
+                logging.warn(this, s"unexpected error happened: $t, retry storing data")
+                sendMessageToSelfAfter(request, retryInterval)
+            }
+        case None =>
+          logging.warn(this, s"lease not found, retry storing data")
+          leaseService ! GetLease
+          sendMessageToSelfAfter(request, retryInterval)
+      }
+
+    // only endpoint management
+    case request: RegisterData =>
+      lease match {
+        case Some(l) =>
+          etcdClient
+            .put(request.key, request.value, l.id)
+            .andThen {
+              case Success(_) =>
+                parent ! FinishWork(request.key)
+            }
+            .recover {
+              // if there is no lease, reissue it and retry immediately
+              case t: StatusRuntimeException =>
+                logging.warn(this, s"a lease is expired while registering data ${request.key}, reissue it: $t")
+                lease = None
+                leaseService ! GetLease
+                sendMessageToSelfAfter(request, retryInterval)
+
+              // it should retry forever until the data is stored
+              case t: Throwable =>
+                logging.warn(this, s"unexpected error happened: $t, retry storing data ${request.key}")
+                sendMessageToSelfAfter(request, retryInterval)
+            }
+        case None =>
+          logging.warn(this, s"lease not found, retry storing data ${request.key}")
+          leaseService ! GetLease
+          sendMessageToSelfAfter(request, retryInterval)
+      }
+
+    // it stores the data iif there is no such one
+    case request: RegisterInitialData =>
+      lease match {
+        case Some(l) =>
+          etcdClient
+            .putTxn(request.key, request.value, 0, l.id)
+            .map { res =>
+              parent ! FinishWork(request.key)
+              if (res.getSucceeded) {
+                logging.debug(this, s"data is stored.")
+                request.recipient.map(_ ! InitialDataStorageResults(request.key, Right(Done())))
+              } else {
+                logging.debug(this, s"data is already stored for: $request")
+                request.recipient.map(_ ! InitialDataStorageResults(request.key, Left(AlreadyExist())))
+              }
+            }
+            .recover {
+              // if there is no lease, reissue it and retry immediately
+              case t: StatusRuntimeException =>
+                logging.warn(
+                  this,
+                  s"a lease is expired while registering an initial data ${request.key}, reissue it: $t")
+                lease = None
+                leaseService ! GetLease
+                sendMessageToSelfAfter(request, retryInterval)
+
+              // it should retry forever until the data is stored
+              case t: Throwable =>
+                logging.warn(this, s"unexpected error happened: $t, retry storing data for ${request.key}")
+                sendMessageToSelfAfter(request, retryInterval)
+            }
+        case None =>
+          logging.warn(this, s"lease not found, retry storing data for ${request.key}")
+          leaseService ! GetLease
+          sendMessageToSelfAfter(request, retryInterval)
+      }
+
+    case msg: WatcherClosed =>
+      etcdClient
+        .del(msg.key)
+        .andThen {
+          case Success(_) =>
+            parent ! FinishWork(msg.key)
+        }
+        .recover {
+          // if there is no lease, reissue it and retry immediately
+          case t: StatusRuntimeException =>
+            logging.warn(this, s"a lease is expired while deleting data ${msg.key}, reissue it: $t")
+            lease = None
+            leaseService ! GetLease
+            sendMessageToSelfAfter(msg, retryInterval)
+
+          // it should retry forever until the data is stored
+          case t: Throwable =>
+            logging.warn(this, s"unexpected error happened: $t, retry storing data for ${msg.key}")
+            sendMessageToSelfAfter(msg, retryInterval)
+        }
+
+  }
+
+  private def sendMessageToSelfAfter(msg: Any, retryInterval: FiniteDuration): Future[Unit] = {

Review comment:
       akka should have a built in `scheduleOnce` method rather than have your own method right?
   
   i.e. `actorSystem.scheduler.scheduleOnce(50 milliseconds, this, "foo")`

##########
File path: common/scala/src/main/scala/org/apache/openwhisk/core/service/DataManagementService.scala
##########
@@ -0,0 +1,313 @@
+package org.apache.openwhisk.core.service
+
+import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Props}
+import akka.util.Timeout
+import io.grpc.StatusRuntimeException
+import org.apache.openwhisk.common.Logging
+import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdFollower, EtcdLeader}
+import org.apache.openwhisk.core.service.DataManagementService.retryInterval
+
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable.{Map, Queue}
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.Success
+
+// messages received by the actor
+// it is required to specify a recipient directly for the retryable message processing
+case class ElectLeader(key: String, value: String, recipient: ActorRef, watchEnabled: Boolean = true)
+case class RegisterInitialData(key: String,
+                               value: String,
+                               failoverEnabled: Boolean = true,
+                               recipient: Option[ActorRef] = None)
+
+case class RegisterData(key: String, value: String, failoverEnabled: Boolean = true)
+case class DeRegisterData(key: String)
+case class UpdateDataOnChange(key: String, value: String)
+
+// messages sent by the actor
+case class ElectionResult(leadership: Either[EtcdFollower, EtcdLeader])
+case class FinishWork(key: String)
+case class InitialDataStorageResults(key: String, result: Either[AlreadyExist, Done])
+case class Done()
+case class AlreadyExist()
+
+/**
+ * This service is in charge of storing given data to ETCD.
+ * In case there is any issue occurred while storing data, it keeps trying until the data is stored.
+ * So it guarantees the data is eventually stored.
+ */
+class DataManagementService(watcherService: ActorRef, workerFactory: ActorRefFactory => ActorRef)(
+  implicit logging: Logging,
+  actorSystem: ActorSystem)
+    extends Actor {
+  private implicit val ec = context.dispatcher
+
+  implicit val requestTimeout: Timeout = Timeout(5.seconds)
+  private[service] val dataCache = TrieMap[String, String]()
+  private val operations = Map.empty[String, Queue[Any]]
+  private var inProgressKeys = Set.empty[String]
+  private val watcherName = "data-management-service"
+
+  private val worker = workerFactory(context)
+
+  override def receive: Receive = {
+    case FinishWork(key) =>
+      // send waiting operation to worker if there is any, else update the inProgressKeys
+      val ops = operations.get(key)
+      if (ops.nonEmpty && ops.get.nonEmpty) {
+        val operation = ops.get.dequeue()
+        worker ! operation
+      } else {
+        inProgressKeys = inProgressKeys - key
+        operations.remove(key) // remove empty queue from the map to free memories
+      }
+
+    // normally these messages will be sent when queues are created.
+    case request: ElectLeader =>
+      if (inProgressKeys.contains(request.key)) {
+        logging.info(this, s"save request $request into a buffer")
+        operations.getOrElseUpdate(request.key, Queue.empty[Any]).enqueue(request)
+      } else {
+        worker ! request
+        inProgressKeys = inProgressKeys + request.key
+      }
+
+    case request: RegisterInitialData =>
+      // send WatchEndpoint first as the put operation will be retry until success if failed
+      if (request.failoverEnabled)
+        watcherService ! WatchEndpoint(request.key, request.value, isPrefix = false, watcherName, Set(DeleteEvent))
+      if (inProgressKeys.contains(request.key)) {
+        logging.info(this, s"save request $request into a buffer")
+        operations.getOrElseUpdate(request.key, Queue.empty[Any]).enqueue(request)
+      } else {
+        worker ! request
+        inProgressKeys = inProgressKeys + request.key
+      }
+
+    case request: RegisterData =>
+      // send WatchEndpoint first as the put operation will be retry until success if failed
+      if (request.failoverEnabled)
+        watcherService ! WatchEndpoint(request.key, request.value, isPrefix = false, watcherName, Set(DeleteEvent))
+      if (inProgressKeys.contains(request.key)) {
+        // the new put|delete operation will erase influences made by older operations like put&delete
+        // so we can remove these old operations
+        logging.info(this, s"save request $request into a buffer")
+        val queue = operations.getOrElseUpdate(request.key, Queue.empty[Any]).filter { value =>
+          value match {
+            case _: RegisterData | _: WatcherClosed | _: RegisterInitialData => false
+            case _                                                           => true
+          }
+        }
+        queue.enqueue(request)
+        operations.update(request.key, queue)
+      } else {
+        worker ! request
+        inProgressKeys = inProgressKeys + request.key
+      }
+
+    case request: WatcherClosed =>
+      if (inProgressKeys.contains(request.key)) {
+        // the new put|delete operation will erase influences made by older operations like put&delete
+        // so we can remove these old operations
+        logging.info(this, s"save request $request into a buffer")
+        val queue = operations.getOrElseUpdate(request.key, Queue.empty[Any]).filter { value =>
+          value match {
+            case _: RegisterData | _: WatcherClosed | _: RegisterInitialData => false
+            case _                                                           => true
+          }
+        }
+        queue.enqueue(request)
+        operations.update(request.key, queue)
+      } else {
+        worker ! request
+        inProgressKeys = inProgressKeys + request.key
+      }
+
+    // It is required to close the watcher first before deleting etcd data
+    // It is supposed to receive the WatcherClosed message after the watcher is stopped.
+    case msg: DeRegisterData =>
+      watcherService ! UnWatchEndpoint(msg.key, isPrefix = false, watcherName, needFeedback = true)
+
+    case WatchEndpointRemoved(_, key, value, false) =>
+      self ! RegisterInitialData(key, value, failoverEnabled = false) // the watcher is already setup
+
+    // it is supposed not to receive "prefixed" data
+    case WatchEndpointRemoved(_, key, value, true) =>
+      logging.error(this, s"unexpected data received: ${WatchEndpoint(key, value, isPrefix = true, watcherName)}")
+
+    case msg: UpdateDataOnChange =>
+      dataCache.get(msg.key) match {
+        case Some(cached) if cached == msg.value =>
+          logging.debug(this, s"skip publishing data ${msg.key} because the data is not changed.")
+        // do nothing
+
+        case Some(cached) if cached != msg.value =>
+          dataCache.update(msg.key, msg.value)
+          self ! RegisterData(msg.key, msg.value, failoverEnabled = false) // the watcher is already setup
+
+        case None =>
+          dataCache.put(msg.key, msg.value)
+          self ! RegisterData(msg.key, msg.value)
+
+      }
+  }
+}
+
+object DataManagementService {
+  // Todo: Change to configuration
+  val retryInterval: FiniteDuration = 1.second

Review comment:
       Should change to configuration before merging. Is 1 second a good default for this?

##########
File path: common/scala/src/main/scala/org/apache/openwhisk/core/service/DataManagementService.scala
##########
@@ -0,0 +1,313 @@
+package org.apache.openwhisk.core.service
+
+import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Props}
+import akka.util.Timeout
+import io.grpc.StatusRuntimeException
+import org.apache.openwhisk.common.Logging
+import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdFollower, EtcdLeader}
+import org.apache.openwhisk.core.service.DataManagementService.retryInterval
+
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable.{Map, Queue}
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.Success
+
+// messages received by the actor
+// it is required to specify a recipient directly for the retryable message processing
+case class ElectLeader(key: String, value: String, recipient: ActorRef, watchEnabled: Boolean = true)
+case class RegisterInitialData(key: String,
+                               value: String,
+                               failoverEnabled: Boolean = true,
+                               recipient: Option[ActorRef] = None)
+
+case class RegisterData(key: String, value: String, failoverEnabled: Boolean = true)
+case class DeRegisterData(key: String)
+case class UpdateDataOnChange(key: String, value: String)
+
+// messages sent by the actor
+case class ElectionResult(leadership: Either[EtcdFollower, EtcdLeader])
+case class FinishWork(key: String)
+case class InitialDataStorageResults(key: String, result: Either[AlreadyExist, Done])
+case class Done()
+case class AlreadyExist()
+
+/**
+ * This service is in charge of storing given data to ETCD.
+ * In case there is any issue occurred while storing data, it keeps trying until the data is stored.
+ * So it guarantees the data is eventually stored.
+ */
+class DataManagementService(watcherService: ActorRef, workerFactory: ActorRefFactory => ActorRef)(
+  implicit logging: Logging,
+  actorSystem: ActorSystem)
+    extends Actor {
+  private implicit val ec = context.dispatcher
+
+  implicit val requestTimeout: Timeout = Timeout(5.seconds)
+  private[service] val dataCache = TrieMap[String, String]()
+  private val operations = Map.empty[String, Queue[Any]]
+  private var inProgressKeys = Set.empty[String]
+  private val watcherName = "data-management-service"
+
+  private val worker = workerFactory(context)
+
+  override def receive: Receive = {
+    case FinishWork(key) =>
+      // send waiting operation to worker if there is any, else update the inProgressKeys
+      val ops = operations.get(key)
+      if (ops.nonEmpty && ops.get.nonEmpty) {
+        val operation = ops.get.dequeue()
+        worker ! operation
+      } else {
+        inProgressKeys = inProgressKeys - key
+        operations.remove(key) // remove empty queue from the map to free memories
+      }
+
+    // normally these messages will be sent when queues are created.
+    case request: ElectLeader =>
+      if (inProgressKeys.contains(request.key)) {
+        logging.info(this, s"save request $request into a buffer")
+        operations.getOrElseUpdate(request.key, Queue.empty[Any]).enqueue(request)
+      } else {
+        worker ! request
+        inProgressKeys = inProgressKeys + request.key
+      }
+
+    case request: RegisterInitialData =>
+      // send WatchEndpoint first as the put operation will be retry until success if failed
+      if (request.failoverEnabled)
+        watcherService ! WatchEndpoint(request.key, request.value, isPrefix = false, watcherName, Set(DeleteEvent))
+      if (inProgressKeys.contains(request.key)) {
+        logging.info(this, s"save request $request into a buffer")
+        operations.getOrElseUpdate(request.key, Queue.empty[Any]).enqueue(request)
+      } else {
+        worker ! request
+        inProgressKeys = inProgressKeys + request.key
+      }
+
+    case request: RegisterData =>
+      // send WatchEndpoint first as the put operation will be retry until success if failed
+      if (request.failoverEnabled)
+        watcherService ! WatchEndpoint(request.key, request.value, isPrefix = false, watcherName, Set(DeleteEvent))
+      if (inProgressKeys.contains(request.key)) {
+        // the new put|delete operation will erase influences made by older operations like put&delete
+        // so we can remove these old operations
+        logging.info(this, s"save request $request into a buffer")
+        val queue = operations.getOrElseUpdate(request.key, Queue.empty[Any]).filter { value =>
+          value match {
+            case _: RegisterData | _: WatcherClosed | _: RegisterInitialData => false
+            case _                                                           => true
+          }
+        }
+        queue.enqueue(request)
+        operations.update(request.key, queue)
+      } else {
+        worker ! request
+        inProgressKeys = inProgressKeys + request.key
+      }
+
+    case request: WatcherClosed =>
+      if (inProgressKeys.contains(request.key)) {
+        // the new put|delete operation will erase influences made by older operations like put&delete
+        // so we can remove these old operations
+        logging.info(this, s"save request $request into a buffer")
+        val queue = operations.getOrElseUpdate(request.key, Queue.empty[Any]).filter { value =>
+          value match {
+            case _: RegisterData | _: WatcherClosed | _: RegisterInitialData => false
+            case _                                                           => true
+          }
+        }
+        queue.enqueue(request)
+        operations.update(request.key, queue)
+      } else {
+        worker ! request
+        inProgressKeys = inProgressKeys + request.key
+      }
+
+    // It is required to close the watcher first before deleting etcd data
+    // It is supposed to receive the WatcherClosed message after the watcher is stopped.
+    case msg: DeRegisterData =>
+      watcherService ! UnWatchEndpoint(msg.key, isPrefix = false, watcherName, needFeedback = true)
+
+    case WatchEndpointRemoved(_, key, value, false) =>
+      self ! RegisterInitialData(key, value, failoverEnabled = false) // the watcher is already setup
+
+    // it is supposed not to receive "prefixed" data
+    case WatchEndpointRemoved(_, key, value, true) =>
+      logging.error(this, s"unexpected data received: ${WatchEndpoint(key, value, isPrefix = true, watcherName)}")
+
+    case msg: UpdateDataOnChange =>
+      dataCache.get(msg.key) match {
+        case Some(cached) if cached == msg.value =>
+          logging.debug(this, s"skip publishing data ${msg.key} because the data is not changed.")
+        // do nothing
+
+        case Some(cached) if cached != msg.value =>
+          dataCache.update(msg.key, msg.value)
+          self ! RegisterData(msg.key, msg.value, failoverEnabled = false) // the watcher is already setup
+
+        case None =>
+          dataCache.put(msg.key, msg.value)
+          self ! RegisterData(msg.key, msg.value)
+
+      }
+  }
+}
+
+object DataManagementService {
+  // Todo: Change to configuration
+  val retryInterval: FiniteDuration = 1.second
+
+  def props(watcherService: ActorRef, workerFactory: ActorRefFactory => ActorRef)(implicit logging: Logging,
+                                                                                  actorSystem: ActorSystem): Props = {
+    Props(new DataManagementService(watcherService, workerFactory))
+  }
+}
+
+class EtcdWorker(etcdClient: EtcdClient, leaseService: ActorRef)(implicit val ec: ExecutionContext,
+                                                                 actorSystem: ActorSystem,
+                                                                 logging: Logging)
+    extends Actor {
+
+  private val parent = context.parent

Review comment:
       Is the parent always `dataManagementService`? If so it would help to be more descriptive than `parent`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org