Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2021/02/10 04:10:15 UTC

[GitHub] [openwhisk] style95 opened a new pull request #5063: [New Scheduler] Add DataManagementService

style95 opened a new pull request #5063:
URL: https://github.com/apache/openwhisk/pull/5063


   ## Description
   This component is responsible for storing data in ETCD.
   It provides eventual consistency:
   if a write fails for any reason, it keeps retrying until the data is stored.
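
The retry-until-stored behaviour described above could be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the PR's implementation: the `Put` type, `storeEventually`, and `FlakyStore` are hypothetical names introduced here, the store is an in-memory stand-in rather than the OpenWhisk `EtcdClient`, and the blocking `Thread.sleep` replaces the actor-based rescheduling that the quoted diff performs with `retryInterval`.

```scala
import scala.annotation.tailrec
import scala.collection.mutable
import scala.util.{Failure, Success, Try}

object RetryUntilStored {
  // Hypothetical stand-in for a write to ETCD; not the OpenWhisk EtcdClient API.
  type Put = (String, String) => Try[Unit]

  /** Retries `put` until it succeeds and returns the number of attempts made. */
  @tailrec
  def storeEventually(put: Put, key: String, value: String, retryIntervalMs: Long, attempts: Int = 1): Int =
    put(key, value) match {
      case Success(_) => attempts
      case Failure(_) =>
        // Wait before the next attempt; the loop never gives up.
        Thread.sleep(retryIntervalMs)
        storeEventually(put, key, value, retryIntervalMs, attempts + 1)
    }

  /** A fake store that rejects the first `failures` writes, for demonstration only. */
  final class FlakyStore(failures: Int) {
    private var remaining = failures
    val data: mutable.Map[String, String] = mutable.Map.empty[String, String]

    def put(key: String, value: String): Try[Unit] =
      if (remaining > 0) {
        remaining -= 1
        Failure(new RuntimeException("store unavailable"))
      } else {
        data.update(key, value)
        Success(())
      }
  }
}
```

Because the loop has no upper bound on attempts, the caller only gets a result once the write has landed, which is the "eventually stored" guarantee in its simplest form. A non-blocking variant would reschedule the request instead of sleeping.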
   
   ## Related issue and scope
   - [ ] I opened an issue to propose and discuss this change (#????)
   
   ## My changes affect the following components
   - [ ] API
   - [ ] Controller
   - [ ] Message Bus (e.g., Kafka)
   - [ ] Loadbalancer
   - [ ] Invoker
   - [ ] Intrinsic actions (e.g., sequences, conductors)
   - [ ] Data stores (e.g., CouchDB)
   - [ ] Tests
   - [ ] Deployment
   - [ ] CLI
   - [ ] General tooling
   - [ ] Documentation
   
   ## Types of changes
   - [ ] Bug fix (generally a non-breaking change which closes an issue).
   - [x] Enhancement or new feature (adds new functionality).
   - [ ] Breaking change (a bug fix or enhancement which changes existing behavior).
   
   ## Checklist:
   
   - [x] I signed an [Apache CLA](https://github.com/apache/openwhisk/blob/master/CONTRIBUTING.md).
   - [x] I reviewed the [style guides](https://github.com/apache/openwhisk/wiki/Contributing:-Git-guidelines#code-readiness) and followed the recommendations (Travis CI will check :).
   - [x] I added tests to cover my changes.
   - [ ] My changes require further changes to the documentation.
   - [ ] I updated the documentation where necessary.
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [openwhisk] style95 commented on pull request #5063: [New Scheduler] Add DataManagementService

Posted by GitBox <gi...@apache.org>.
style95 commented on pull request #5063:
URL: https://github.com/apache/openwhisk/pull/5063#issuecomment-799145396


   It's ready to merge.





[GitHub] [openwhisk] codecov-io commented on pull request #5063: [New Scheduler] Add DataManagementService

Posted by GitBox <gi...@apache.org>.
codecov-io commented on pull request #5063:
URL: https://github.com/apache/openwhisk/pull/5063#issuecomment-796587203


   # [Codecov](https://codecov.io/gh/apache/openwhisk/pull/5063?src=pr&el=h1) Report
   > Merging [#5063](https://codecov.io/gh/apache/openwhisk/pull/5063?src=pr&el=desc) (c7ad646) into [master](https://codecov.io/gh/apache/openwhisk/commit/e05aa44b0cab519c82cf84a8171671a21d779562?el=desc) (e05aa44) will **decrease** coverage by `6.97%`.
   > The diff coverage is `44.69%`.
   
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #5063      +/-   ##
   ==========================================
   - Coverage   81.63%   74.66%   -6.98%     
   ==========================================
     Files         205      207       +2     
     Lines       10013    10183     +170     
     Branches      442      467      +25     
   ==========================================
   - Hits         8174     7603     -571     
   - Misses       1839     2580     +741     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/openwhisk/pull/5063?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...la/org/apache/openwhisk/core/etcd/EtcdClient.scala](https://codecov.io/gh/apache/openwhisk/pull/5063/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvZXRjZC9FdGNkQ2xpZW50LnNjYWxh) | `26.22% <ø> (ø)` | |
   | [...apache/openwhisk/core/service/WatcherService.scala](https://codecov.io/gh/apache/openwhisk/pull/5063/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvc2VydmljZS9XYXRjaGVyU2VydmljZS5zY2FsYQ==) | `91.66% <ø> (+2.08%)` | :arrow_up: |
   | [...openwhisk/core/service/DataManagementService.scala](https://codecov.io/gh/apache/openwhisk/pull/5063/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvc2VydmljZS9EYXRhTWFuYWdlbWVudFNlcnZpY2Uuc2NhbGE=) | `44.27% <44.27%> (ø)` | |
   | [.../scala/org/apache/openwhisk/core/WhiskConfig.scala](https://codecov.io/gh/apache/openwhisk/pull/5063/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvV2hpc2tDb25maWcuc2NhbGE=) | `95.48% <100.00%> (+0.02%)` | :arrow_up: |
   | [...core/database/cosmosdb/RxObservableImplicits.scala](https://codecov.io/gh/apache/openwhisk/pull/5063/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvZGF0YWJhc2UvY29zbW9zZGIvUnhPYnNlcnZhYmxlSW1wbGljaXRzLnNjYWxh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...ore/database/cosmosdb/cache/CacheInvalidator.scala](https://codecov.io/gh/apache/openwhisk/pull/5063/diff?src=pr&el=tree#diff-Y29yZS9jb3Ntb3NkYi9jYWNoZS1pbnZhbGlkYXRvci9zcmMvbWFpbi9zY2FsYS9vcmcvYXBhY2hlL29wZW53aGlzay9jb3JlL2RhdGFiYXNlL2Nvc21vc2RiL2NhY2hlL0NhY2hlSW52YWxpZGF0b3Iuc2NhbGE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...e/database/cosmosdb/cache/ChangeFeedConsumer.scala](https://codecov.io/gh/apache/openwhisk/pull/5063/diff?src=pr&el=tree#diff-Y29yZS9jb3Ntb3NkYi9jYWNoZS1pbnZhbGlkYXRvci9zcmMvbWFpbi9zY2FsYS9vcmcvYXBhY2hlL29wZW53aGlzay9jb3JlL2RhdGFiYXNlL2Nvc21vc2RiL2NhY2hlL0NoYW5nZUZlZWRDb25zdW1lci5zY2FsYQ==) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...core/database/cosmosdb/CosmosDBArtifactStore.scala](https://codecov.io/gh/apache/openwhisk/pull/5063/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvZGF0YWJhc2UvY29zbW9zZGIvQ29zbW9zREJBcnRpZmFjdFN0b3JlLnNjYWxh) | `0.00% <0.00%> (-95.85%)` | :arrow_down: |
   | [...sk/core/database/cosmosdb/CosmosDBViewMapper.scala](https://codecov.io/gh/apache/openwhisk/pull/5063/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvZGF0YWJhc2UvY29zbW9zZGIvQ29zbW9zREJWaWV3TWFwcGVyLnNjYWxh) | `0.00% <0.00%> (-93.90%)` | :arrow_down: |
   | [...tabase/cosmosdb/cache/CacheInvalidatorConfig.scala](https://codecov.io/gh/apache/openwhisk/pull/5063/diff?src=pr&el=tree#diff-Y29yZS9jb3Ntb3NkYi9jYWNoZS1pbnZhbGlkYXRvci9zcmMvbWFpbi9zY2FsYS9vcmcvYXBhY2hlL29wZW53aGlzay9jb3JlL2RhdGFiYXNlL2Nvc21vc2RiL2NhY2hlL0NhY2hlSW52YWxpZGF0b3JDb25maWcuc2NhbGE=) | `0.00% <0.00%> (-92.31%)` | :arrow_down: |
   | ... and [22 more](https://codecov.io/gh/apache/openwhisk/pull/5063/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/openwhisk/pull/5063?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/openwhisk/pull/5063?src=pr&el=footer). Last update [e05aa44...c7ad646](https://codecov.io/gh/apache/openwhisk/pull/5063?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [openwhisk] bdoyle0182 commented on pull request #5063: [New Scheduler] Add DataManagementService

Posted by GitBox <gi...@apache.org>.
bdoyle0182 commented on pull request #5063:
URL: https://github.com/apache/openwhisk/pull/5063#issuecomment-802137852


   Just one comment. LGTM





[GitHub] [openwhisk] style95 merged pull request #5063: [New Scheduler] Add DataManagementService

Posted by GitBox <gi...@apache.org>.
style95 merged pull request #5063:
URL: https://github.com/apache/openwhisk/pull/5063


   





[GitHub] [openwhisk] style95 commented on a change in pull request #5063: [WIP][New Scheduler] Add DataManagementService

Posted by GitBox <gi...@apache.org>.
style95 commented on a change in pull request #5063:
URL: https://github.com/apache/openwhisk/pull/5063#discussion_r574208971



##########
File path: common/scala/src/main/scala/org/apache/openwhisk/core/service/DataManagementService.scala
##########
@@ -0,0 +1,311 @@
+package org.apache.openwhisk.core.service
+
+import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Props}
+import akka.util.Timeout
+import io.grpc.StatusRuntimeException
+import org.apache.openwhisk.common.Logging
+import org.apache.openwhisk.core.ConfigKeys
+import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdFollower, EtcdLeader}
+import org.apache.openwhisk.core.service.DataManagementService.retryInterval
+import pureconfig.loadConfigOrThrow
+
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable.{Map, Queue}
+import scala.concurrent.ExecutionContext
+import scala.concurrent.duration._
+import scala.util.Success
+
+// messages received by the actor
+// it is required to specify a recipient directly for the retryable message processing
+case class ElectLeader(key: String, value: String, recipient: ActorRef, watchEnabled: Boolean = true)
+case class RegisterInitialData(key: String,
+                               value: String,
+                               failoverEnabled: Boolean = true,
+                               recipient: Option[ActorRef] = None)
+
+case class RegisterData(key: String, value: String, failoverEnabled: Boolean = true)
+case class UnregisterData(key: String)
+case class UpdateDataOnChange(key: String, value: String)
+
+// messages sent by the actor
+case class ElectionResult(leadership: Either[EtcdFollower, EtcdLeader])
+case class FinishWork(key: String)
+case class InitialDataStorageResults(key: String, result: Either[AlreadyExist, Done])
+case class Done()
+case class AlreadyExist()
+
+/**
+ * This service is in charge of storing given data to ETCD.
+ * In the event any issue occurs while storing data, the actor keeps trying until the data is stored guaranteeing delivery to ETCD.
+ * So it guarantees the data is eventually stored.
+ */
+class DataManagementService(watcherService: ActorRef, workerFactory: ActorRefFactory => ActorRef)(

Review comment:
       I will include test cases into this PR after setting up the CI pipeline for scheduler components.







[GitHub] [openwhisk] bdoyle0182 commented on a change in pull request #5063: [WIP][New Scheduler] Add DataManagementService

Posted by GitBox <gi...@apache.org>.
bdoyle0182 commented on a change in pull request #5063:
URL: https://github.com/apache/openwhisk/pull/5063#discussion_r574209680



##########
File path: common/scala/src/main/scala/org/apache/openwhisk/core/service/DataManagementService.scala
##########
@@ -0,0 +1,313 @@
+package org.apache.openwhisk.core.service
+
+import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Props}
+import akka.util.Timeout
+import io.grpc.StatusRuntimeException
+import org.apache.openwhisk.common.Logging
+import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdFollower, EtcdLeader}
+import org.apache.openwhisk.core.service.DataManagementService.retryInterval
+
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable.{Map, Queue}
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.Success
+
+// messages received by the actor
+// it is required to specify a recipient directly for the retryable message processing
+case class ElectLeader(key: String, value: String, recipient: ActorRef, watchEnabled: Boolean = true)
+case class RegisterInitialData(key: String,
+                               value: String,
+                               failoverEnabled: Boolean = true,
+                               recipient: Option[ActorRef] = None)
+
+case class RegisterData(key: String, value: String, failoverEnabled: Boolean = true)
+case class DeRegisterData(key: String)
+case class UpdateDataOnChange(key: String, value: String)
+
+// messages sent by the actor
+case class ElectionResult(leadership: Either[EtcdFollower, EtcdLeader])
+case class FinishWork(key: String)
+case class InitialDataStorageResults(key: String, result: Either[AlreadyExist, Done])
+case class Done()
+case class AlreadyExist()
+
+/**
+ * This service is in charge of storing given data to ETCD.
+ * In case there is any issue occurred while storing data, it keeps trying until the data is stored.
+ * So it guarantees the data is eventually stored.
+ */
+class DataManagementService(watcherService: ActorRef, workerFactory: ActorRefFactory => ActorRef)(
+  implicit logging: Logging,
+  actorSystem: ActorSystem)
+    extends Actor {
+  private implicit val ec = context.dispatcher
+
+  implicit val requestTimeout: Timeout = Timeout(5.seconds)
+  private[service] val dataCache = TrieMap[String, String]()
+  private val operations = Map.empty[String, Queue[Any]]
+  private var inProgressKeys = Set.empty[String]
+  private val watcherName = "data-management-service"
+
+  private val worker = workerFactory(context)
+
+  override def receive: Receive = {
+    case FinishWork(key) =>
+      // send waiting operation to worker if there is any, else update the inProgressKeys
+      val ops = operations.get(key)
+      if (ops.nonEmpty && ops.get.nonEmpty) {
+        val operation = ops.get.dequeue()
+        worker ! operation
+      } else {
+        inProgressKeys = inProgressKeys - key
+        operations.remove(key) // remove empty queue from the map to free memories
+      }
+
+    // normally these messages will be sent when queues are created.
+    case request: ElectLeader =>
+      if (inProgressKeys.contains(request.key)) {
+        logging.info(this, s"save request $request into a buffer")

Review comment:
       I'm still learning what this is doing, but what does each request involve? Is it every activation, or some sort of metadata setup? If it's every activation, it would seem spammy to me; otherwise I think it's fine.
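
For readers following the quoted diff, the per-key buffering it performs with `inProgressKeys` and `operations` can be sketched without Akka as below. This is a simplified illustration, not the PR's code: `PerKeyBuffer`, `submit`, `finish`, and `isBusy` are hypothetical names, and `dispatch` stands in for sending a message to the worker actor. It does not say how often such requests arrive in practice.

```scala
import scala.collection.mutable

/** Serializes operations per key: one in-flight operation per key, the rest wait in FIFO order. */
final class PerKeyBuffer[Op](dispatch: Op => Unit) {
  private val waiting = mutable.Map.empty[String, mutable.Queue[Op]]
  private var inProgress = Set.empty[String]

  /** Dispatches immediately when the key is idle; otherwise buffers the operation. */
  def submit(key: String, op: Op): Unit =
    if (inProgress.contains(key)) {
      waiting.getOrElseUpdate(key, mutable.Queue.empty[Op]).enqueue(op)
    } else {
      inProgress += key
      dispatch(op)
    }

  /** Called when the in-flight operation for `key` completes (the role FinishWork plays in the diff). */
  def finish(key: String): Unit =
    waiting.get(key).filter(_.nonEmpty) match {
      case Some(queue) =>
        // Hand the next waiting operation to the worker; the key stays busy.
        dispatch(queue.dequeue())
      case None =>
        // Nothing left to do: mark the key idle and drop its empty queue.
        inProgress -= key
        waiting.remove(key)
    }

  def isBusy(key: String): Boolean = inProgress.contains(key)
}
```

With this shape, a second request for a busy key is only held in memory until the first one finishes, so the buffering message is logged once per contended request rather than once per write.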







[GitHub] [openwhisk] style95 commented on pull request #5063: [WIP][New Scheduler] Add DataManagementService

Posted by GitBox <gi...@apache.org>.
style95 commented on pull request #5063:
URL: https://github.com/apache/openwhisk/pull/5063#issuecomment-792575959


   Waiting for this PR (https://github.com/apache/openwhisk/pull/5067) to be merged.
   





[GitHub] [openwhisk] bdoyle0182 commented on a change in pull request #5063: [New Scheduler] Add DataManagementService

Posted by GitBox <gi...@apache.org>.
bdoyle0182 commented on a change in pull request #5063:
URL: https://github.com/apache/openwhisk/pull/5063#discussion_r574136574



##########
File path: common/scala/src/main/scala/org/apache/openwhisk/core/service/DataManagementService.scala
##########
@@ -0,0 +1,313 @@
+package org.apache.openwhisk.core.service
+
+import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Props}
+import akka.util.Timeout
+import io.grpc.StatusRuntimeException
+import org.apache.openwhisk.common.Logging
+import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdFollower, EtcdLeader}
+import org.apache.openwhisk.core.service.DataManagementService.retryInterval
+
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable.{Map, Queue}
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.Success
+
+// messages received by the actor
+// it is required to specify a recipient directly for the retryable message processing
+case class ElectLeader(key: String, value: String, recipient: ActorRef, watchEnabled: Boolean = true)
+case class RegisterInitialData(key: String,
+                               value: String,
+                               failoverEnabled: Boolean = true,
+                               recipient: Option[ActorRef] = None)
+
+case class RegisterData(key: String, value: String, failoverEnabled: Boolean = true)
+case class DeRegisterData(key: String)
+case class UpdateDataOnChange(key: String, value: String)
+
+// messages sent by the actor
+case class ElectionResult(leadership: Either[EtcdFollower, EtcdLeader])
+case class FinishWork(key: String)
+case class InitialDataStorageResults(key: String, result: Either[AlreadyExist, Done])
+case class Done()
+case class AlreadyExist()
+
+/**
+ * This service is in charge of storing given data to ETCD.
+ * In case there is any issue occurred while storing data, it keeps trying until the data is stored.
+ * So it guarantees the data is eventually stored.
+ */
+class DataManagementService(watcherService: ActorRef, workerFactory: ActorRefFactory => ActorRef)(
+  implicit logging: Logging,
+  actorSystem: ActorSystem)
+    extends Actor {
+  private implicit val ec = context.dispatcher
+
+  implicit val requestTimeout: Timeout = Timeout(5.seconds)
+  private[service] val dataCache = TrieMap[String, String]()
+  private val operations = Map.empty[String, Queue[Any]]
+  private var inProgressKeys = Set.empty[String]
+  private val watcherName = "data-management-service"
+
+  private val worker = workerFactory(context)
+
+  override def receive: Receive = {
+    case FinishWork(key) =>
+      // send waiting operation to worker if there is any, else update the inProgressKeys
+      val ops = operations.get(key)
+      if (ops.nonEmpty && ops.get.nonEmpty) {
+        val operation = ops.get.dequeue()
+        worker ! operation
+      } else {
+        inProgressKeys = inProgressKeys - key
+        operations.remove(key) // remove empty queue from the map to free memories
+      }
+
+    // normally these messages will be sent when queues are created.
+    case request: ElectLeader =>
+      if (inProgressKeys.contains(request.key)) {
+        logging.info(this, s"save request $request into a buffer")

Review comment:
       do these really need to be info level?

##########
File path: common/scala/src/main/scala/org/apache/openwhisk/core/service/DataManagementService.scala
##########
@@ -0,0 +1,313 @@
+package org.apache.openwhisk.core.service
+
+import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Props}
+import akka.util.Timeout
+import io.grpc.StatusRuntimeException
+import org.apache.openwhisk.common.Logging
+import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdFollower, EtcdLeader}
+import org.apache.openwhisk.core.service.DataManagementService.retryInterval
+
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable.{Map, Queue}
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.Success
+
+// messages received by the actor
+// it is required to specify a recipient directly for the retryable message processing
+case class ElectLeader(key: String, value: String, recipient: ActorRef, watchEnabled: Boolean = true)
+case class RegisterInitialData(key: String,
+                               value: String,
+                               failoverEnabled: Boolean = true,
+                               recipient: Option[ActorRef] = None)
+
+case class RegisterData(key: String, value: String, failoverEnabled: Boolean = true)
+case class DeRegisterData(key: String)
+case class UpdateDataOnChange(key: String, value: String)
+
+// messages sent by the actor
+case class ElectionResult(leadership: Either[EtcdFollower, EtcdLeader])
+case class FinishWork(key: String)
+case class InitialDataStorageResults(key: String, result: Either[AlreadyExist, Done])
+case class Done()
+case class AlreadyExist()
+
+/**
+ * This service is in charge of storing given data to ETCD.
+ * In case there is any issue occurred while storing data, it keeps trying until the data is stored.

Review comment:
       ```suggestion
    * In the event any issue occurs while storing data, the actor keeps trying until the data is stored guaranteeing delivery to ETCD.
   ```

##########
File path: common/scala/src/main/scala/org/apache/openwhisk/core/service/DataManagementService.scala
##########
@@ -0,0 +1,313 @@
+package org.apache.openwhisk.core.service
+
+import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Props}
+import akka.util.Timeout
+import io.grpc.StatusRuntimeException
+import org.apache.openwhisk.common.Logging
+import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdFollower, EtcdLeader}
+import org.apache.openwhisk.core.service.DataManagementService.retryInterval
+
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable.{Map, Queue}
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.Success
+
+// messages received by the actor
+// it is required to specify a recipient directly for the retryable message processing
+case class ElectLeader(key: String, value: String, recipient: ActorRef, watchEnabled: Boolean = true)
+case class RegisterInitialData(key: String,
+                               value: String,
+                               failoverEnabled: Boolean = true,
+                               recipient: Option[ActorRef] = None)
+
+case class RegisterData(key: String, value: String, failoverEnabled: Boolean = true)
+case class DeRegisterData(key: String)
+case class UpdateDataOnChange(key: String, value: String)
+
+// messages sent by the actor
+case class ElectionResult(leadership: Either[EtcdFollower, EtcdLeader])
+case class FinishWork(key: String)
+case class InitialDataStorageResults(key: String, result: Either[AlreadyExist, Done])
+case class Done()
+case class AlreadyExist()
+
+/**
+ * This service is in charge of storing given data to ETCD.
+ * In case there is any issue occurred while storing data, it keeps trying until the data is stored.
+ * So it guarantees the data is eventually stored.
+ */
+class DataManagementService(watcherService: ActorRef, workerFactory: ActorRefFactory => ActorRef)(
+  implicit logging: Logging,
+  actorSystem: ActorSystem)
+    extends Actor {
+  private implicit val ec = context.dispatcher
+
+  implicit val requestTimeout: Timeout = Timeout(5.seconds)
+  private[service] val dataCache = TrieMap[String, String]()
+  private val operations = Map.empty[String, Queue[Any]]
+  private var inProgressKeys = Set.empty[String]
+  private val watcherName = "data-management-service"
+
+  private val worker = workerFactory(context)
+
+  override def receive: Receive = {
+    case FinishWork(key) =>
+      // send waiting operation to worker if there is any, else update the inProgressKeys
+      val ops = operations.get(key)
+      if (ops.nonEmpty && ops.get.nonEmpty) {
+        val operation = ops.get.dequeue()
+        worker ! operation
+      } else {
+        inProgressKeys = inProgressKeys - key
+        operations.remove(key) // remove empty queue from the map to free memories
+      }
+
+    // normally these messages will be sent when queues are created.
+    case request: ElectLeader =>
+      if (inProgressKeys.contains(request.key)) {
+        logging.info(this, s"save request $request into a buffer")
+        operations.getOrElseUpdate(request.key, Queue.empty[Any]).enqueue(request)
+      } else {
+        worker ! request
+        inProgressKeys = inProgressKeys + request.key
+      }
+
+    case request: RegisterInitialData =>
+      // send WatchEndpoint first as the put operation will be retry until success if failed
+      if (request.failoverEnabled)
+        watcherService ! WatchEndpoint(request.key, request.value, isPrefix = false, watcherName, Set(DeleteEvent))
+      if (inProgressKeys.contains(request.key)) {
+        logging.info(this, s"save request $request into a buffer")
+        operations.getOrElseUpdate(request.key, Queue.empty[Any]).enqueue(request)
+      } else {
+        worker ! request
+        inProgressKeys = inProgressKeys + request.key
+      }
+
+    case request: RegisterData =>
+      // send WatchEndpoint first as the put operation will be retry until success if failed
+      if (request.failoverEnabled)
+        watcherService ! WatchEndpoint(request.key, request.value, isPrefix = false, watcherName, Set(DeleteEvent))
+      if (inProgressKeys.contains(request.key)) {
+        // the new put|delete operation will erase influences made by older operations like put&delete
+        // so we can remove these old operations
+        logging.info(this, s"save request $request into a buffer")
+        val queue = operations.getOrElseUpdate(request.key, Queue.empty[Any]).filter { value =>
+          value match {
+            case _: RegisterData | _: WatcherClosed | _: RegisterInitialData => false
+            case _                                                           => true
+          }
+        }
+        queue.enqueue(request)
+        operations.update(request.key, queue)
+      } else {
+        worker ! request
+        inProgressKeys = inProgressKeys + request.key
+      }
+
+    case request: WatcherClosed =>
+      if (inProgressKeys.contains(request.key)) {
+        // the new put|delete operation will erase influences made by older operations like put&delete
+        // so we can remove these old operations
+        logging.info(this, s"save request $request into a buffer")
+        val queue = operations.getOrElseUpdate(request.key, Queue.empty[Any]).filter { value =>
+          value match {
+            case _: RegisterData | _: WatcherClosed | _: RegisterInitialData => false
+            case _                                                           => true
+          }
+        }
+        queue.enqueue(request)
+        operations.update(request.key, queue)
+      } else {
+        worker ! request
+        inProgressKeys = inProgressKeys + request.key
+      }
+
+    // It is required to close the watcher first before deleting etcd data
+    // It is supposed to receive the WatcherClosed message after the watcher is stopped.
+    case msg: DeRegisterData =>
+      watcherService ! UnWatchEndpoint(msg.key, isPrefix = false, watcherName, needFeedback = true)
+
+    case WatchEndpointRemoved(_, key, value, false) =>
+      self ! RegisterInitialData(key, value, failoverEnabled = false) // the watcher is already setup
+
+    // it is supposed not to receive "prefixed" data
+    case WatchEndpointRemoved(_, key, value, true) =>
+      logging.error(this, s"unexpected data received: ${WatchEndpoint(key, value, isPrefix = true, watcherName)}")
+
+    case msg: UpdateDataOnChange =>
+      dataCache.get(msg.key) match {
+        case Some(cached) if cached == msg.value =>
+          logging.debug(this, s"skip publishing data ${msg.key} because the data is not changed.")
+        // do nothing
+
+        case Some(cached) if cached != msg.value =>
+          dataCache.update(msg.key, msg.value)
+          self ! RegisterData(msg.key, msg.value, failoverEnabled = false) // the watcher is already setup
+
+        case None =>
+          dataCache.put(msg.key, msg.value)
+          self ! RegisterData(msg.key, msg.value)
+
+      }
+  }
+}
+
+object DataManagementService {
+  // Todo: Change to configuration
+  val retryInterval: FiniteDuration = 1.second
+
+  def props(watcherService: ActorRef, workerFactory: ActorRefFactory => ActorRef)(implicit logging: Logging,
+                                                                                  actorSystem: ActorSystem): Props = {
+    Props(new DataManagementService(watcherService, workerFactory))
+  }
+}
+
+class EtcdWorker(etcdClient: EtcdClient, leaseService: ActorRef)(implicit val ec: ExecutionContext,
+                                                                 actorSystem: ActorSystem,
+                                                                 logging: Logging)
+    extends Actor {
+
+  private val parent = context.parent
+  private var lease: Option[Lease] = None
+  leaseService ! GetLease
+
+  override def receive: Receive = {
+    case msg: Lease =>
+      lease = Some(msg)
+
+    // leader election + endpoint management
+    case request: ElectLeader =>
+      lease match {
+        case Some(l) =>
+          etcdClient
+            .electLeader(request.key, request.value, l)
+            .andThen {
+              case Success(msg) =>
+                request.recipient ! ElectionResult(msg)
+                parent ! FinishWork(request.key)
+            }
+            .recover {
+              // if there is no lease, reissue it and retry immediately
+              case t: StatusRuntimeException =>
+                logging.warn(this, s"a lease is expired while leader election, reissue it: $t")
+                lease = None
+                leaseService ! GetLease
+                sendMessageToSelfAfter(request, retryInterval)
+
+              // it should retry forever until the data is stored
+              case t: Throwable =>
+                logging.warn(this, s"unexpected error happened: $t, retry storing data")
+                sendMessageToSelfAfter(request, retryInterval)
+            }
+        case None =>
+          logging.warn(this, s"lease not found, retry storing data")
+          leaseService ! GetLease
+          sendMessageToSelfAfter(request, retryInterval)
+      }
+
+    // only endpoint management
+    case request: RegisterData =>
+      lease match {
+        case Some(l) =>
+          etcdClient
+            .put(request.key, request.value, l.id)
+            .andThen {
+              case Success(_) =>
+                parent ! FinishWork(request.key)
+            }
+            .recover {
+              // if there is no lease, reissue it and retry immediately
+              case t: StatusRuntimeException =>
+                logging.warn(this, s"a lease is expired while registering data ${request.key}, reissue it: $t")
+                lease = None
+                leaseService ! GetLease
+                sendMessageToSelfAfter(request, retryInterval)
+
+              // it should retry forever until the data is stored
+              case t: Throwable =>
+                logging.warn(this, s"unexpected error happened: $t, retry storing data ${request.key}")
+                sendMessageToSelfAfter(request, retryInterval)
+            }
+        case None =>
+          logging.warn(this, s"lease not found, retry storing data ${request.key}")
+          leaseService ! GetLease
+          sendMessageToSelfAfter(request, retryInterval)
+      }
+
+    // it stores the data iif there is no such one
+    case request: RegisterInitialData =>
+      lease match {
+        case Some(l) =>
+          etcdClient
+            .putTxn(request.key, request.value, 0, l.id)
+            .map { res =>
+              parent ! FinishWork(request.key)
+              if (res.getSucceeded) {
+                logging.debug(this, s"data is stored.")

Review comment:
       What data is stored? The message does not say which key was written.
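
One way to answer that in the log line itself: the other branches of this worker already interpolate `${request.key}`, so the success message could do the same. A minimal sketch, assuming the key alone is enough to identify the entry; the object and the helper `storedMessage` are hypothetical and not part of the PR.

```scala
// Hypothetical helper, not part of the PR: builds a success message that names the stored key.
object StoredLogExample {
  def storedMessage(key: String): String =
    s"initial data is stored for key $key"

  def main(args: Array[String]): Unit =
    // In the worker this string would be passed to logging.debug(this, ...).
    println(storedMessage("sample/key"))
}
```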

##########
File path: common/scala/src/main/scala/org/apache/openwhisk/core/service/DataManagementService.scala
##########
@@ -0,0 +1,313 @@
+package org.apache.openwhisk.core.service
+
+import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Props}
+import akka.util.Timeout
+import io.grpc.StatusRuntimeException
+import org.apache.openwhisk.common.Logging
+import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdFollower, EtcdLeader}
+import org.apache.openwhisk.core.service.DataManagementService.retryInterval
+
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable.{Map, Queue}
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.Success
+
+// messages received by the actor
+// it is required to specify a recipient directly for the retryable message processing
+case class ElectLeader(key: String, value: String, recipient: ActorRef, watchEnabled: Boolean = true)
+case class RegisterInitialData(key: String,
+                               value: String,
+                               failoverEnabled: Boolean = true,
+                               recipient: Option[ActorRef] = None)
+
+case class RegisterData(key: String, value: String, failoverEnabled: Boolean = true)
+case class DeRegisterData(key: String)
+case class UpdateDataOnChange(key: String, value: String)
+
+// messages sent by the actor
+case class ElectionResult(leadership: Either[EtcdFollower, EtcdLeader])
+case class FinishWork(key: String)
+case class InitialDataStorageResults(key: String, result: Either[AlreadyExist, Done])
+case class Done()
+case class AlreadyExist()
+
+/**
+ * This service is in charge of storing given data to ETCD.
+ * In case there is any issue occurred while storing data, it keeps trying until the data is stored.
+ * So it guarantees the data is eventually stored.
+ */
+class DataManagementService(watcherService: ActorRef, workerFactory: ActorRefFactory => ActorRef)(
+  implicit logging: Logging,
+  actorSystem: ActorSystem)
+    extends Actor {
+  private implicit val ec = context.dispatcher
+
+  implicit val requestTimeout: Timeout = Timeout(5.seconds)
+  private[service] val dataCache = TrieMap[String, String]()
+  private val operations = Map.empty[String, Queue[Any]]
+  private var inProgressKeys = Set.empty[String]
+  private val watcherName = "data-management-service"
+
+  private val worker = workerFactory(context)
+
+  override def receive: Receive = {
+    case FinishWork(key) =>
+      // send waiting operation to worker if there is any, else update the inProgressKeys
+      val ops = operations.get(key)
+      if (ops.nonEmpty && ops.get.nonEmpty) {
+        val operation = ops.get.dequeue()
+        worker ! operation
+      } else {
+        inProgressKeys = inProgressKeys - key
+        operations.remove(key) // remove empty queue from the map to free memories
+      }
+
+    // normally these messages will be sent when queues are created.
+    case request: ElectLeader =>
+      if (inProgressKeys.contains(request.key)) {
+        logging.info(this, s"save request $request into a buffer")
+        operations.getOrElseUpdate(request.key, Queue.empty[Any]).enqueue(request)
+      } else {
+        worker ! request
+        inProgressKeys = inProgressKeys + request.key
+      }
+
+    case request: RegisterInitialData =>
+      // send WatchEndpoint first as the put operation will be retry until success if failed
+      if (request.failoverEnabled)
+        watcherService ! WatchEndpoint(request.key, request.value, isPrefix = false, watcherName, Set(DeleteEvent))
+      if (inProgressKeys.contains(request.key)) {
+        logging.info(this, s"save request $request into a buffer")
+        operations.getOrElseUpdate(request.key, Queue.empty[Any]).enqueue(request)
+      } else {
+        worker ! request
+        inProgressKeys = inProgressKeys + request.key
+      }
+
+    case request: RegisterData =>
+      // send WatchEndpoint first as the put operation will be retry until success if failed
+      if (request.failoverEnabled)
+        watcherService ! WatchEndpoint(request.key, request.value, isPrefix = false, watcherName, Set(DeleteEvent))
+      if (inProgressKeys.contains(request.key)) {
+        // the new put|delete operation will erase influences made by older operations like put&delete
+        // so we can remove these old operations
+        logging.info(this, s"save request $request into a buffer")
+        val queue = operations.getOrElseUpdate(request.key, Queue.empty[Any]).filter { value =>
+          value match {
+            case _: RegisterData | _: WatcherClosed | _: RegisterInitialData => false
+            case _                                                           => true
+          }
+        }
+        queue.enqueue(request)
+        operations.update(request.key, queue)
+      } else {
+        worker ! request
+        inProgressKeys = inProgressKeys + request.key
+      }
+
+    case request: WatcherClosed =>
+      if (inProgressKeys.contains(request.key)) {
+        // the new put|delete operation will erase influences made by older operations like put&delete
+        // so we can remove these old operations
+        logging.info(this, s"save request $request into a buffer")
+        val queue = operations.getOrElseUpdate(request.key, Queue.empty[Any]).filter { value =>
+          value match {
+            case _: RegisterData | _: WatcherClosed | _: RegisterInitialData => false
+            case _                                                           => true
+          }
+        }
+        queue.enqueue(request)
+        operations.update(request.key, queue)
+      } else {
+        worker ! request
+        inProgressKeys = inProgressKeys + request.key
+      }
+
+    // It is required to close the watcher first before deleting etcd data
+    // It is supposed to receive the WatcherClosed message after the watcher is stopped.
+    case msg: DeRegisterData =>
+      watcherService ! UnWatchEndpoint(msg.key, isPrefix = false, watcherName, needFeedback = true)
+
+    case WatchEndpointRemoved(_, key, value, false) =>
+      self ! RegisterInitialData(key, value, failoverEnabled = false) // the watcher is already setup
+
+    // it is supposed not to receive "prefixed" data
+    case WatchEndpointRemoved(_, key, value, true) =>
+      logging.error(this, s"unexpected data received: ${WatchEndpoint(key, value, isPrefix = true, watcherName)}")
+
+    case msg: UpdateDataOnChange =>
+      dataCache.get(msg.key) match {
+        case Some(cached) if cached == msg.value =>
+          logging.debug(this, s"skip publishing data ${msg.key} because the data is not changed.")
+        // do nothing
+
+        case Some(cached) if cached != msg.value =>
+          dataCache.update(msg.key, msg.value)
+          self ! RegisterData(msg.key, msg.value, failoverEnabled = false) // the watcher is already setup
+
+        case None =>
+          dataCache.put(msg.key, msg.value)
+          self ! RegisterData(msg.key, msg.value)
+
+      }
+  }
+}
+
+object DataManagementService {
+  // Todo: Change to configuration
+  val retryInterval: FiniteDuration = 1.second
+
+  def props(watcherService: ActorRef, workerFactory: ActorRefFactory => ActorRef)(implicit logging: Logging,
+                                                                                  actorSystem: ActorSystem): Props = {
+    Props(new DataManagementService(watcherService, workerFactory))
+  }
+}
+
+class EtcdWorker(etcdClient: EtcdClient, leaseService: ActorRef)(implicit val ec: ExecutionContext,
+                                                                 actorSystem: ActorSystem,
+                                                                 logging: Logging)
+    extends Actor {
+
+  private val parent = context.parent

Review comment:
       Is the parent always the `DataManagementService`? If so, a more descriptive name than `parent` would help.

##########
File path: common/scala/src/main/scala/org/apache/openwhisk/core/service/WatcherService.scala
##########
@@ -0,0 +1,45 @@
+package org.apache.openwhisk.core.service
+
+// messages received by this actor
+case class WatchEndpoint(key: String,
+                         value: String,
+                         isPrefix: Boolean,
+                         name: String,
+                         listenEvents: Set[EtcdEvent] = Set.empty)
+case class UnWatchEndpoint(watchKey: String, isPrefix: Boolean, watchName: String, needFeedback: Boolean = false)

Review comment:
       ```suggestion
   case class UnwatchEndpoint(watchKey: String, isPrefix: Boolean, watchName: String, needFeedback: Boolean = false)
   ```

##########
File path: common/scala/src/main/scala/org/apache/openwhisk/core/service/DataManagementService.scala
##########
@@ -0,0 +1,313 @@
+package org.apache.openwhisk.core.service
+
+import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Props}
+import akka.util.Timeout
+import io.grpc.StatusRuntimeException
+import org.apache.openwhisk.common.Logging
+import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdFollower, EtcdLeader}
+import org.apache.openwhisk.core.service.DataManagementService.retryInterval
+
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable.{Map, Queue}
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.Success
+
+// messages received by the actor
+// it is required to specify a recipient directly for the retryable message processing
+case class ElectLeader(key: String, value: String, recipient: ActorRef, watchEnabled: Boolean = true)
+case class RegisterInitialData(key: String,
+                               value: String,
+                               failoverEnabled: Boolean = true,
+                               recipient: Option[ActorRef] = None)
+
+case class RegisterData(key: String, value: String, failoverEnabled: Boolean = true)
+case class DeRegisterData(key: String)
+case class UpdateDataOnChange(key: String, value: String)
+
+// messages sent by the actor
+case class ElectionResult(leadership: Either[EtcdFollower, EtcdLeader])
+case class FinishWork(key: String)
+case class InitialDataStorageResults(key: String, result: Either[AlreadyExist, Done])
+case class Done()
+case class AlreadyExist()
+
+/**
+ * This service is in charge of storing given data to ETCD.
+ * In case there is any issue occurred while storing data, it keeps trying until the data is stored.
+ * So it guarantees the data is eventually stored.
+ */
+class DataManagementService(watcherService: ActorRef, workerFactory: ActorRefFactory => ActorRef)(
+  implicit logging: Logging,
+  actorSystem: ActorSystem)
+    extends Actor {
+  private implicit val ec = context.dispatcher
+
+  implicit val requestTimeout: Timeout = Timeout(5.seconds)
+  private[service] val dataCache = TrieMap[String, String]()
+  private val operations = Map.empty[String, Queue[Any]]
+  private var inProgressKeys = Set.empty[String]
+  private val watcherName = "data-management-service"
+
+  private val worker = workerFactory(context)
+
+  override def receive: Receive = {
+    case FinishWork(key) =>
+      // send waiting operation to worker if there is any, else update the inProgressKeys
+      val ops = operations.get(key)
+      if (ops.nonEmpty && ops.get.nonEmpty) {
+        val operation = ops.get.dequeue()
+        worker ! operation
+      } else {
+        inProgressKeys = inProgressKeys - key
+        operations.remove(key) // remove empty queue from the map to free memories

Review comment:
       ```suggestion
           operations.remove(key) // remove empty queue from the map to free memory
   ```
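
For context on why the empty queue is removed at all, the bookkeeping around `FinishWork` can be modelled without Akka. This is a rough sketch under stated assumptions, not the PR's code: `operations` and `inProgressKeys` mirror the fields in the diff, while `submit`, `finishWork`, and the `sent` buffer (standing in for `worker ! operation`) are names invented here.

```scala
import scala.collection.mutable

// Akka-free model of the per-key serialization in DataManagementService.
// `sent` records what would have been forwarded to the worker (assumption of this sketch).
object FinishWorkModel {
  val operations = mutable.Map.empty[String, mutable.Queue[Any]]
  var inProgressKeys = Set.empty[String]
  val sent = mutable.ListBuffer.empty[Any]

  // Forward the operation if the key is idle, otherwise buffer it behind the running one.
  def submit(key: String, op: Any): Unit =
    if (inProgressKeys.contains(key)) {
      operations.getOrElseUpdate(key, mutable.Queue.empty[Any]).enqueue(op)
    } else {
      sent += op
      inProgressKeys += key
    }

  // Called when the worker reports completion for a key.
  def finishWork(key: String): Unit =
    operations.get(key).filter(_.nonEmpty) match {
      case Some(queue) =>
        sent += queue.dequeue() // the next buffered operation goes to the worker
      case None =>
        inProgressKeys -= key
        operations.remove(key) // drop the empty queue so the map does not keep growing
    }
}
```

In this model, as in the diff, a queue that has just been drained stays in the map until the following `finishWork` call for that key removes it.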

##########
File path: common/scala/src/main/scala/org/apache/openwhisk/core/service/DataManagementService.scala
##########
@@ -0,0 +1,313 @@
+package org.apache.openwhisk.core.service
+
+import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Props}
+import akka.util.Timeout
+import io.grpc.StatusRuntimeException
+import org.apache.openwhisk.common.Logging
+import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdFollower, EtcdLeader}
+import org.apache.openwhisk.core.service.DataManagementService.retryInterval
+
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable.{Map, Queue}
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.Success
+
+// messages received by the actor
+// it is required to specify a recipient directly for the retryable message processing
+case class ElectLeader(key: String, value: String, recipient: ActorRef, watchEnabled: Boolean = true)
+case class RegisterInitialData(key: String,
+                               value: String,
+                               failoverEnabled: Boolean = true,
+                               recipient: Option[ActorRef] = None)
+
+case class RegisterData(key: String, value: String, failoverEnabled: Boolean = true)
+case class DeRegisterData(key: String)
+case class UpdateDataOnChange(key: String, value: String)
+
+// messages sent by the actor
+case class ElectionResult(leadership: Either[EtcdFollower, EtcdLeader])
+case class FinishWork(key: String)
+case class InitialDataStorageResults(key: String, result: Either[AlreadyExist, Done])
+case class Done()
+case class AlreadyExist()
+
+/**
+ * This service is in charge of storing given data to ETCD.
+ * In case there is any issue occurred while storing data, it keeps trying until the data is stored.
+ * So it guarantees the data is eventually stored.
+ */
+class DataManagementService(watcherService: ActorRef, workerFactory: ActorRefFactory => ActorRef)(
+  implicit logging: Logging,
+  actorSystem: ActorSystem)
+    extends Actor {
+  private implicit val ec = context.dispatcher
+
+  implicit val requestTimeout: Timeout = Timeout(5.seconds)
+  private[service] val dataCache = TrieMap[String, String]()
+  private val operations = Map.empty[String, Queue[Any]]
+  private var inProgressKeys = Set.empty[String]
+  private val watcherName = "data-management-service"
+
+  private val worker = workerFactory(context)
+
+  override def receive: Receive = {
+    case FinishWork(key) =>
+      // send waiting operation to worker if there is any, else update the inProgressKeys
+      val ops = operations.get(key)
+      if (ops.nonEmpty && ops.get.nonEmpty) {
+        val operation = ops.get.dequeue()
+        worker ! operation
+      } else {
+        inProgressKeys = inProgressKeys - key
+        operations.remove(key) // remove empty queue from the map to free memories
+      }
+
+    // normally these messages will be sent when queues are created.
+    case request: ElectLeader =>
+      if (inProgressKeys.contains(request.key)) {
+        logging.info(this, s"save request $request into a buffer")
+        operations.getOrElseUpdate(request.key, Queue.empty[Any]).enqueue(request)
+      } else {
+        worker ! request
+        inProgressKeys = inProgressKeys + request.key
+      }
+
+    case request: RegisterInitialData =>
+      // send WatchEndpoint first as the put operation will be retry until success if failed
+      if (request.failoverEnabled)
+        watcherService ! WatchEndpoint(request.key, request.value, isPrefix = false, watcherName, Set(DeleteEvent))
+      if (inProgressKeys.contains(request.key)) {
+        logging.info(this, s"save request $request into a buffer")
+        operations.getOrElseUpdate(request.key, Queue.empty[Any]).enqueue(request)
+      } else {
+        worker ! request
+        inProgressKeys = inProgressKeys + request.key
+      }
+
+    case request: RegisterData =>
+      // send WatchEndpoint first as the put operation will be retry until success if failed
+      if (request.failoverEnabled)
+        watcherService ! WatchEndpoint(request.key, request.value, isPrefix = false, watcherName, Set(DeleteEvent))
+      if (inProgressKeys.contains(request.key)) {
+        // the new put|delete operation will erase influences made by older operations like put&delete
+        // so we can remove these old operations
+        logging.info(this, s"save request $request into a buffer")
+        val queue = operations.getOrElseUpdate(request.key, Queue.empty[Any]).filter { value =>
+          value match {
+            case _: RegisterData | _: WatcherClosed | _: RegisterInitialData => false
+            case _                                                           => true
+          }
+        }
+        queue.enqueue(request)
+        operations.update(request.key, queue)
+      } else {
+        worker ! request
+        inProgressKeys = inProgressKeys + request.key
+      }
+
+    case request: WatcherClosed =>
+      if (inProgressKeys.contains(request.key)) {
+        // the new put|delete operation will erase influences made by older operations like put&delete
+        // so we can remove these old operations
+        logging.info(this, s"save request $request into a buffer")
+        val queue = operations.getOrElseUpdate(request.key, Queue.empty[Any]).filter { value =>
+          value match {
+            case _: RegisterData | _: WatcherClosed | _: RegisterInitialData => false
+            case _                                                           => true
+          }
+        }
+        queue.enqueue(request)
+        operations.update(request.key, queue)
+      } else {
+        worker ! request
+        inProgressKeys = inProgressKeys + request.key
+      }
+
+    // It is required to close the watcher first before deleting etcd data
+    // It is supposed to receive the WatcherClosed message after the watcher is stopped.
+    case msg: DeRegisterData =>

Review comment:
       ```suggestion
       case msg: UnregisterData =>
   ```
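
Note that the suggestion only touches the pattern match; the message definition `case class DeRegisterData(key: String)` earlier in the file would have to be renamed in the same change for this to compile. A minimal sketch of the two pieces together, where `RenameExample.handle` is a hypothetical stand-in for the actor's `receive` block and returns a description instead of sending `UnWatchEndpoint`:

```scala
// Sketch only: the renamed message plus a stand-in for the matching case in receive.
final case class UnregisterData(key: String)

object RenameExample {
  // Stand-in for `watcherService ! UnWatchEndpoint(...)`; it describes the action instead of sending.
  def handle(msg: Any): String = msg match {
    case UnregisterData(key) => s"unwatch $key, then delete it once WatcherClosed arrives"
    case other               => s"unhandled: $other"
  }
}
```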

##########
File path: common/scala/src/main/scala/org/apache/openwhisk/core/service/DataManagementService.scala
##########
@@ -0,0 +1,313 @@
+package org.apache.openwhisk.core.service
+
+import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Props}
+import akka.util.Timeout
+import io.grpc.StatusRuntimeException
+import org.apache.openwhisk.common.Logging
+import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdFollower, EtcdLeader}
+import org.apache.openwhisk.core.service.DataManagementService.retryInterval
+
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable.{Map, Queue}
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.Success
+
+// messages received by the actor
+// it is required to specify a recipient directly for the retryable message processing
+case class ElectLeader(key: String, value: String, recipient: ActorRef, watchEnabled: Boolean = true)
+case class RegisterInitialData(key: String,
+                               value: String,
+                               failoverEnabled: Boolean = true,
+                               recipient: Option[ActorRef] = None)
+
+case class RegisterData(key: String, value: String, failoverEnabled: Boolean = true)
+case class DeRegisterData(key: String)
+case class UpdateDataOnChange(key: String, value: String)
+
+// messages sent by the actor
+case class ElectionResult(leadership: Either[EtcdFollower, EtcdLeader])
+case class FinishWork(key: String)
+case class InitialDataStorageResults(key: String, result: Either[AlreadyExist, Done])
+case class Done()
+case class AlreadyExist()
+
+/**
+ * This service is in charge of storing given data to ETCD.
+ * In case there is any issue occurred while storing data, it keeps trying until the data is stored.
+ * So it guarantees the data is eventually stored.
+ */
+class DataManagementService(watcherService: ActorRef, workerFactory: ActorRefFactory => ActorRef)(
+  implicit logging: Logging,
+  actorSystem: ActorSystem)
+    extends Actor {
+  private implicit val ec = context.dispatcher
+
+  implicit val requestTimeout: Timeout = Timeout(5.seconds)
+  private[service] val dataCache = TrieMap[String, String]()
+  private val operations = Map.empty[String, Queue[Any]]
+  private var inProgressKeys = Set.empty[String]
+  private val watcherName = "data-management-service"
+
+  private val worker = workerFactory(context)
+
+  override def receive: Receive = {
+    case FinishWork(key) =>
+      // send waiting operation to worker if there is any, else update the inProgressKeys
+      val ops = operations.get(key)
+      if (ops.nonEmpty && ops.get.nonEmpty) {
+        val operation = ops.get.dequeue()
+        worker ! operation
+      } else {
+        inProgressKeys = inProgressKeys - key
+        operations.remove(key) // remove empty queue from the map to free memories
+      }
+
+    // normally these messages will be sent when queues are created.
+    case request: ElectLeader =>
+      if (inProgressKeys.contains(request.key)) {
+        logging.info(this, s"save request $request into a buffer")
+        operations.getOrElseUpdate(request.key, Queue.empty[Any]).enqueue(request)
+      } else {
+        worker ! request
+        inProgressKeys = inProgressKeys + request.key
+      }
+
+    case request: RegisterInitialData =>
+      // send WatchEndpoint first as the put operation will be retry until success if failed
+      if (request.failoverEnabled)
+        watcherService ! WatchEndpoint(request.key, request.value, isPrefix = false, watcherName, Set(DeleteEvent))
+      if (inProgressKeys.contains(request.key)) {
+        logging.info(this, s"save request $request into a buffer")
+        operations.getOrElseUpdate(request.key, Queue.empty[Any]).enqueue(request)
+      } else {
+        worker ! request
+        inProgressKeys = inProgressKeys + request.key
+      }
+
+    case request: RegisterData =>
+      // send WatchEndpoint first as the put operation will be retry until success if failed
+      if (request.failoverEnabled)
+        watcherService ! WatchEndpoint(request.key, request.value, isPrefix = false, watcherName, Set(DeleteEvent))
+      if (inProgressKeys.contains(request.key)) {
+        // the new put|delete operation will erase influences made by older operations like put&delete
+        // so we can remove these old operations
+        logging.info(this, s"save request $request into a buffer")
+        val queue = operations.getOrElseUpdate(request.key, Queue.empty[Any]).filter { value =>
+          value match {
+            case _: RegisterData | _: WatcherClosed | _: RegisterInitialData => false
+            case _                                                           => true
+          }
+        }
+        queue.enqueue(request)
+        operations.update(request.key, queue)
+      } else {
+        worker ! request
+        inProgressKeys = inProgressKeys + request.key
+      }
+
+    case request: WatcherClosed =>
+      if (inProgressKeys.contains(request.key)) {
+        // the new put|delete operation will erase influences made by older operations like put&delete
+        // so we can remove these old operations
+        logging.info(this, s"save request $request into a buffer")
+        val queue = operations.getOrElseUpdate(request.key, Queue.empty[Any]).filter { value =>
+          value match {
+            case _: RegisterData | _: WatcherClosed | _: RegisterInitialData => false
+            case _                                                           => true
+          }
+        }
+        queue.enqueue(request)
+        operations.update(request.key, queue)
+      } else {
+        worker ! request
+        inProgressKeys = inProgressKeys + request.key
+      }
+
+    // It is required to close the watcher first before deleting etcd data
+    // It is supposed to receive the WatcherClosed message after the watcher is stopped.
+    case msg: DeRegisterData =>
+      watcherService ! UnWatchEndpoint(msg.key, isPrefix = false, watcherName, needFeedback = true)
+
+    case WatchEndpointRemoved(_, key, value, false) =>
+      self ! RegisterInitialData(key, value, failoverEnabled = false) // the watcher is already setup
+
+    // it is supposed not to receive "prefixed" data
+    case WatchEndpointRemoved(_, key, value, true) =>
+      logging.error(this, s"unexpected data received: ${WatchEndpoint(key, value, isPrefix = true, watcherName)}")
+
+    case msg: UpdateDataOnChange =>
+      dataCache.get(msg.key) match {
+        case Some(cached) if cached == msg.value =>
+          logging.debug(this, s"skip publishing data ${msg.key} because the data is not changed.")
+        // do nothing

Review comment:
       nit: I don't think the `// do nothing` comment really adds anything.
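
Without that comment the three branches have to explain themselves. One possible shape, sketched without Akka: it differs from the diff in that it calls `put` up front and matches on the previous value, and `UpdateOnChangeModel.update` with its `Option[Boolean]` result is an invention of this sketch (the Boolean stands for the `failoverEnabled` flag of the `RegisterData` that would be sent).

```scala
import scala.collection.concurrent.TrieMap

// Akka-free model of the UpdateDataOnChange handler.
object UpdateOnChangeModel {
  val dataCache = TrieMap.empty[String, String]

  // Some(failoverEnabled) when a RegisterData would be sent, None when the update is skipped.
  def update(key: String, value: String): Option[Boolean] =
    dataCache.put(key, value) match {
      case Some(cached) if cached == value => None        // unchanged: nothing to publish
      case Some(_)                         => Some(false) // changed: the watcher is already set up
      case None                            => Some(true)  // first update for this key: also start watching
    }
}
```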

##########
File path: common/scala/src/main/scala/org/apache/openwhisk/core/service/DataManagementService.scala
##########
@@ -0,0 +1,313 @@
+package org.apache.openwhisk.core.service
+
+import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Props}
+import akka.util.Timeout
+import io.grpc.StatusRuntimeException
+import org.apache.openwhisk.common.Logging
+import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdFollower, EtcdLeader}
+import org.apache.openwhisk.core.service.DataManagementService.retryInterval
+
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable.{Map, Queue}
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.Success
+
+// messages received by the actor
+// it is required to specify a recipient directly for the retryable message processing
+case class ElectLeader(key: String, value: String, recipient: ActorRef, watchEnabled: Boolean = true)
+case class RegisterInitialData(key: String,
+                               value: String,
+                               failoverEnabled: Boolean = true,
+                               recipient: Option[ActorRef] = None)
+
+case class RegisterData(key: String, value: String, failoverEnabled: Boolean = true)
+case class DeRegisterData(key: String)
+case class UpdateDataOnChange(key: String, value: String)
+
+// messages sent by the actor
+case class ElectionResult(leadership: Either[EtcdFollower, EtcdLeader])
+case class FinishWork(key: String)
+case class InitialDataStorageResults(key: String, result: Either[AlreadyExist, Done])
+case class Done()
+case class AlreadyExist()
+
+/**
+ * This service is in charge of storing given data to ETCD.
+ * In case there is any issue occurred while storing data, it keeps trying until the data is stored.
+ * So it guarantees the data is eventually stored.
+ */
+class DataManagementService(watcherService: ActorRef, workerFactory: ActorRefFactory => ActorRef)(
+  implicit logging: Logging,
+  actorSystem: ActorSystem)
+    extends Actor {
+  private implicit val ec = context.dispatcher
+
+  implicit val requestTimeout: Timeout = Timeout(5.seconds)
+  private[service] val dataCache = TrieMap[String, String]()
+  private val operations = Map.empty[String, Queue[Any]]
+  private var inProgressKeys = Set.empty[String]
+  private val watcherName = "data-management-service"
+
+  private val worker = workerFactory(context)
+
+  override def receive: Receive = {
+    case FinishWork(key) =>
+      // send waiting operation to worker if there is any, else update the inProgressKeys
+      val ops = operations.get(key)
+      if (ops.nonEmpty && ops.get.nonEmpty) {
+        val operation = ops.get.dequeue()
+        worker ! operation
+      } else {
+        inProgressKeys = inProgressKeys - key
+        operations.remove(key) // remove empty queue from the map to free memories
+      }
+
+    // normally these messages will be sent when queues are created.
+    case request: ElectLeader =>
+      if (inProgressKeys.contains(request.key)) {
+        logging.info(this, s"save request $request into a buffer")
+        operations.getOrElseUpdate(request.key, Queue.empty[Any]).enqueue(request)
+      } else {
+        worker ! request
+        inProgressKeys = inProgressKeys + request.key
+      }
+
+    case request: RegisterInitialData =>
+      // send WatchEndpoint first as the put operation will be retry until success if failed
+      if (request.failoverEnabled)
+        watcherService ! WatchEndpoint(request.key, request.value, isPrefix = false, watcherName, Set(DeleteEvent))
+      if (inProgressKeys.contains(request.key)) {
+        logging.info(this, s"save request $request into a buffer")
+        operations.getOrElseUpdate(request.key, Queue.empty[Any]).enqueue(request)
+      } else {
+        worker ! request
+        inProgressKeys = inProgressKeys + request.key
+      }
+
+    case request: RegisterData =>
+      // send WatchEndpoint first as the put operation will be retry until success if failed
+      if (request.failoverEnabled)
+        watcherService ! WatchEndpoint(request.key, request.value, isPrefix = false, watcherName, Set(DeleteEvent))
+      if (inProgressKeys.contains(request.key)) {
+        // the new put|delete operation will erase influences made by older operations like put&delete
+        // so we can remove these old operations
+        logging.info(this, s"save request $request into a buffer")
+        val queue = operations.getOrElseUpdate(request.key, Queue.empty[Any]).filter { value =>
+          value match {
+            case _: RegisterData | _: WatcherClosed | _: RegisterInitialData => false
+            case _                                                           => true
+          }
+        }
+        queue.enqueue(request)
+        operations.update(request.key, queue)
+      } else {
+        worker ! request
+        inProgressKeys = inProgressKeys + request.key
+      }
+
+    case request: WatcherClosed =>
+      if (inProgressKeys.contains(request.key)) {
+        // the new put|delete operation will erase influences made by older operations like put&delete
+        // so we can remove these old operations
+        logging.info(this, s"save request $request into a buffer")
+        val queue = operations.getOrElseUpdate(request.key, Queue.empty[Any]).filter { value =>
+          value match {
+            case _: RegisterData | _: WatcherClosed | _: RegisterInitialData => false
+            case _                                                           => true
+          }
+        }
+        queue.enqueue(request)
+        operations.update(request.key, queue)
+      } else {
+        worker ! request
+        inProgressKeys = inProgressKeys + request.key
+      }
+
+    // It is required to close the watcher first before deleting etcd data
+    // It is supposed to receive the WatcherClosed message after the watcher is stopped.
+    case msg: DeRegisterData =>
+      watcherService ! UnWatchEndpoint(msg.key, isPrefix = false, watcherName, needFeedback = true)
+
+    case WatchEndpointRemoved(_, key, value, false) =>
+      self ! RegisterInitialData(key, value, failoverEnabled = false) // the watcher is already setup
+
+    // it is supposed not to receive "prefixed" data

Review comment:
       ```suggestion
       // It should not receive "prefixed" data
   ```

##########
File path: common/scala/src/main/scala/org/apache/openwhisk/core/service/DataManagementService.scala
##########
@@ -0,0 +1,313 @@
+package org.apache.openwhisk.core.service
+
+import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Props}
+import akka.util.Timeout
+import io.grpc.StatusRuntimeException
+import org.apache.openwhisk.common.Logging
+import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdFollower, EtcdLeader}
+import org.apache.openwhisk.core.service.DataManagementService.retryInterval
+
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable.{Map, Queue}
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.Success
+
+// messages received by the actor
+// it is required to specify a recipient directly for the retryable message processing
+case class ElectLeader(key: String, value: String, recipient: ActorRef, watchEnabled: Boolean = true)
+case class RegisterInitialData(key: String,
+                               value: String,
+                               failoverEnabled: Boolean = true,
+                               recipient: Option[ActorRef] = None)
+
+case class RegisterData(key: String, value: String, failoverEnabled: Boolean = true)
+case class DeRegisterData(key: String)
+case class UpdateDataOnChange(key: String, value: String)
+
+// messages sent by the actor
+case class ElectionResult(leadership: Either[EtcdFollower, EtcdLeader])
+case class FinishWork(key: String)
+case class InitialDataStorageResults(key: String, result: Either[AlreadyExist, Done])
+case class Done()
+case class AlreadyExist()
+
+/**
+ * This service is in charge of storing given data to ETCD.
+ * In case there is any issue occurred while storing data, it keeps trying until the data is stored.
+ * So it guarantees the data is eventually stored.
+ */
+class DataManagementService(watcherService: ActorRef, workerFactory: ActorRefFactory => ActorRef)(
+  implicit logging: Logging,
+  actorSystem: ActorSystem)
+    extends Actor {
+  private implicit val ec = context.dispatcher
+
+  implicit val requestTimeout: Timeout = Timeout(5.seconds)
+  private[service] val dataCache = TrieMap[String, String]()
+  private val operations = Map.empty[String, Queue[Any]]
+  private var inProgressKeys = Set.empty[String]
+  private val watcherName = "data-management-service"
+
+  private val worker = workerFactory(context)
+
+  override def receive: Receive = {
+    case FinishWork(key) =>
+      // send waiting operation to worker if there is any, else update the inProgressKeys
+      val ops = operations.get(key)
+      if (ops.nonEmpty && ops.get.nonEmpty) {
+        val operation = ops.get.dequeue()
+        worker ! operation
+      } else {
+        inProgressKeys = inProgressKeys - key
+        operations.remove(key) // remove empty queue from the map to free memories
+      }
+
+    // normally these messages will be sent when queues are created.
+    case request: ElectLeader =>
+      if (inProgressKeys.contains(request.key)) {
+        logging.info(this, s"save request $request into a buffer")
+        operations.getOrElseUpdate(request.key, Queue.empty[Any]).enqueue(request)
+      } else {
+        worker ! request
+        inProgressKeys = inProgressKeys + request.key
+      }
+
+    case request: RegisterInitialData =>
+      // send WatchEndpoint first as the put operation will be retry until success if failed
+      if (request.failoverEnabled)
+        watcherService ! WatchEndpoint(request.key, request.value, isPrefix = false, watcherName, Set(DeleteEvent))
+      if (inProgressKeys.contains(request.key)) {
+        logging.info(this, s"save request $request into a buffer")
+        operations.getOrElseUpdate(request.key, Queue.empty[Any]).enqueue(request)
+      } else {
+        worker ! request
+        inProgressKeys = inProgressKeys + request.key
+      }
+
+    case request: RegisterData =>
+      // send WatchEndpoint first as the put operation will be retry until success if failed

Review comment:
       ```suggestion
         // send WatchEndpoint first as the put operation will be retried until success if failed
   ```

##########
File path: common/scala/src/main/scala/org/apache/openwhisk/core/service/LeaseKeepAliveService.scala
##########
@@ -0,0 +1,16 @@
+package org.apache.openwhisk.core.service
+
+// Data
+sealed trait KeepAliveServiceData
+case object NoData extends KeepAliveServiceData
+case class Lease(id: Long, ttl: Long) extends KeepAliveServiceData
+
+// Events received by the actor
+case object ReGrantLease

Review comment:
       ```suggestion
   case object RegrantLease
   ```

##########
File path: common/scala/src/main/scala/org/apache/openwhisk/core/service/DataManagementService.scala
##########
@@ -0,0 +1,313 @@
+package org.apache.openwhisk.core.service
+
+import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Props}
+import akka.util.Timeout
+import io.grpc.StatusRuntimeException
+import org.apache.openwhisk.common.Logging
+import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdFollower, EtcdLeader}
+import org.apache.openwhisk.core.service.DataManagementService.retryInterval
+
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable.{Map, Queue}
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.Success
+
+// messages received by the actor
+// it is required to specify a recipient directly for the retryable message processing
+case class ElectLeader(key: String, value: String, recipient: ActorRef, watchEnabled: Boolean = true)
+case class RegisterInitialData(key: String,
+                               value: String,
+                               failoverEnabled: Boolean = true,
+                               recipient: Option[ActorRef] = None)
+
+case class RegisterData(key: String, value: String, failoverEnabled: Boolean = true)
+case class DeRegisterData(key: String)
+case class UpdateDataOnChange(key: String, value: String)
+
+// messages sent by the actor
+case class ElectionResult(leadership: Either[EtcdFollower, EtcdLeader])
+case class FinishWork(key: String)
+case class InitialDataStorageResults(key: String, result: Either[AlreadyExist, Done])
+case class Done()
+case class AlreadyExist()
+
+/**
+ * This service is in charge of storing given data to ETCD.
+ * In case there is any issue occurred while storing data, it keeps trying until the data is stored.
+ * So it guarantees the data is eventually stored.
+ */
+class DataManagementService(watcherService: ActorRef, workerFactory: ActorRefFactory => ActorRef)(
+  implicit logging: Logging,
+  actorSystem: ActorSystem)
+    extends Actor {
+  private implicit val ec = context.dispatcher
+
+  implicit val requestTimeout: Timeout = Timeout(5.seconds)
+  private[service] val dataCache = TrieMap[String, String]()
+  private val operations = Map.empty[String, Queue[Any]]
+  private var inProgressKeys = Set.empty[String]
+  private val watcherName = "data-management-service"
+
+  private val worker = workerFactory(context)
+
+  override def receive: Receive = {
+    case FinishWork(key) =>
+      // send waiting operation to worker if there is any, else update the inProgressKeys
+      val ops = operations.get(key)
+      if (ops.nonEmpty && ops.get.nonEmpty) {
+        val operation = ops.get.dequeue()
+        worker ! operation
+      } else {
+        inProgressKeys = inProgressKeys - key
+        operations.remove(key) // remove empty queue from the map to free memories
+      }
+
+    // normally these messages will be sent when queues are created.
+    case request: ElectLeader =>
+      if (inProgressKeys.contains(request.key)) {
+        logging.info(this, s"save request $request into a buffer")
+        operations.getOrElseUpdate(request.key, Queue.empty[Any]).enqueue(request)
+      } else {
+        worker ! request
+        inProgressKeys = inProgressKeys + request.key
+      }
+
+    case request: RegisterInitialData =>
+      // send WatchEndpoint first as the put operation will be retry until success if failed
+      if (request.failoverEnabled)
+        watcherService ! WatchEndpoint(request.key, request.value, isPrefix = false, watcherName, Set(DeleteEvent))
+      if (inProgressKeys.contains(request.key)) {
+        logging.info(this, s"save request $request into a buffer")
+        operations.getOrElseUpdate(request.key, Queue.empty[Any]).enqueue(request)
+      } else {
+        worker ! request
+        inProgressKeys = inProgressKeys + request.key
+      }
+
+    case request: RegisterData =>
+      // send WatchEndpoint first as the put operation will be retry until success if failed
+      if (request.failoverEnabled)
+        watcherService ! WatchEndpoint(request.key, request.value, isPrefix = false, watcherName, Set(DeleteEvent))
+      if (inProgressKeys.contains(request.key)) {
+        // the new put|delete operation will erase influences made by older operations like put&delete
+        // so we can remove these old operations
+        logging.info(this, s"save request $request into a buffer")
+        val queue = operations.getOrElseUpdate(request.key, Queue.empty[Any]).filter { value =>
+          value match {
+            case _: RegisterData | _: WatcherClosed | _: RegisterInitialData => false
+            case _                                                           => true
+          }
+        }
+        queue.enqueue(request)
+        operations.update(request.key, queue)
+      } else {
+        worker ! request
+        inProgressKeys = inProgressKeys + request.key
+      }
+
+    case request: WatcherClosed =>
+      if (inProgressKeys.contains(request.key)) {
+        // the new put|delete operation will erase influences made by older operations like put&delete
+        // so we can remove these old operations
+        logging.info(this, s"save request $request into a buffer")
+        val queue = operations.getOrElseUpdate(request.key, Queue.empty[Any]).filter { value =>
+          value match {
+            case _: RegisterData | _: WatcherClosed | _: RegisterInitialData => false
+            case _                                                           => true
+          }
+        }
+        queue.enqueue(request)
+        operations.update(request.key, queue)
+      } else {
+        worker ! request
+        inProgressKeys = inProgressKeys + request.key
+      }
+
+    // It is required to close the watcher first before deleting etcd data
+    // It is supposed to receive the WatcherClosed message after the watcher is stopped.
+    case msg: DeRegisterData =>
+      watcherService ! UnWatchEndpoint(msg.key, isPrefix = false, watcherName, needFeedback = true)
+
+    case WatchEndpointRemoved(_, key, value, false) =>
+      self ! RegisterInitialData(key, value, failoverEnabled = false) // the watcher is already setup
+
+    // it is supposed not to receive "prefixed" data
+    case WatchEndpointRemoved(_, key, value, true) =>
+      logging.error(this, s"unexpected data received: ${WatchEndpoint(key, value, isPrefix = true, watcherName)}")
+
+    case msg: UpdateDataOnChange =>
+      dataCache.get(msg.key) match {
+        case Some(cached) if cached == msg.value =>
+          logging.debug(this, s"skip publishing data ${msg.key} because the data is not changed.")
+        // do nothing
+
+        case Some(cached) if cached != msg.value =>
+          dataCache.update(msg.key, msg.value)
+          self ! RegisterData(msg.key, msg.value, failoverEnabled = false) // the watcher is already setup
+
+        case None =>
+          dataCache.put(msg.key, msg.value)
+          self ! RegisterData(msg.key, msg.value)
+
+      }
+  }
+}
+
+object DataManagementService {
+  // Todo: Change to configuration
+  val retryInterval: FiniteDuration = 1.second
+
+  def props(watcherService: ActorRef, workerFactory: ActorRefFactory => ActorRef)(implicit logging: Logging,
+                                                                                  actorSystem: ActorSystem): Props = {
+    Props(new DataManagementService(watcherService, workerFactory))
+  }
+}
+
+class EtcdWorker(etcdClient: EtcdClient, leaseService: ActorRef)(implicit val ec: ExecutionContext,
+                                                                 actorSystem: ActorSystem,
+                                                                 logging: Logging)
+    extends Actor {
+
+  private val parent = context.parent
+  private var lease: Option[Lease] = None
+  leaseService ! GetLease
+
+  override def receive: Receive = {
+    case msg: Lease =>
+      lease = Some(msg)
+
+    // leader election + endpoint management
+    case request: ElectLeader =>
+      lease match {
+        case Some(l) =>
+          etcdClient
+            .electLeader(request.key, request.value, l)
+            .andThen {
+              case Success(msg) =>
+                request.recipient ! ElectionResult(msg)
+                parent ! FinishWork(request.key)
+            }
+            .recover {
+              // if there is no lease, reissue it and retry immediately
+              case t: StatusRuntimeException =>
+                logging.warn(this, s"a lease is expired while leader election, reissue it: $t")
+                lease = None
+                leaseService ! GetLease
+                sendMessageToSelfAfter(request, retryInterval)
+
+              // it should retry forever until the data is stored
+              case t: Throwable =>
+                logging.warn(this, s"unexpected error happened: $t, retry storing data")
+                sendMessageToSelfAfter(request, retryInterval)
+            }
+        case None =>
+          logging.warn(this, s"lease not found, retry storing data")
+          leaseService ! GetLease
+          sendMessageToSelfAfter(request, retryInterval)
+      }
+
+    // only endpoint management
+    case request: RegisterData =>
+      lease match {
+        case Some(l) =>
+          etcdClient
+            .put(request.key, request.value, l.id)
+            .andThen {
+              case Success(_) =>
+                parent ! FinishWork(request.key)
+            }
+            .recover {
+              // if there is no lease, reissue it and retry immediately
+              case t: StatusRuntimeException =>
+                logging.warn(this, s"a lease is expired while registering data ${request.key}, reissue it: $t")
+                lease = None
+                leaseService ! GetLease
+                sendMessageToSelfAfter(request, retryInterval)
+
+              // it should retry forever until the data is stored
+              case t: Throwable =>
+                logging.warn(this, s"unexpected error happened: $t, retry storing data ${request.key}")
+                sendMessageToSelfAfter(request, retryInterval)
+            }
+        case None =>
+          logging.warn(this, s"lease not found, retry storing data ${request.key}")
+          leaseService ! GetLease
+          sendMessageToSelfAfter(request, retryInterval)
+      }
+
+    // it stores the data iif there is no such one
+    case request: RegisterInitialData =>
+      lease match {
+        case Some(l) =>
+          etcdClient
+            .putTxn(request.key, request.value, 0, l.id)
+            .map { res =>
+              parent ! FinishWork(request.key)
+              if (res.getSucceeded) {
+                logging.debug(this, s"data is stored.")
+                request.recipient.map(_ ! InitialDataStorageResults(request.key, Right(Done())))
+              } else {
+                logging.debug(this, s"data is already stored for: $request")
+                request.recipient.map(_ ! InitialDataStorageResults(request.key, Left(AlreadyExist())))
+              }
+            }
+            .recover {
+              // if there is no lease, reissue it and retry immediately
+              case t: StatusRuntimeException =>
+                logging.warn(
+                  this,
+                  s"a lease is expired while registering an initial data ${request.key}, reissue it: $t")
+                lease = None
+                leaseService ! GetLease
+                sendMessageToSelfAfter(request, retryInterval)
+
+              // it should retry forever until the data is stored
+              case t: Throwable =>
+                logging.warn(this, s"unexpected error happened: $t, retry storing data for ${request.key}")
+                sendMessageToSelfAfter(request, retryInterval)
+            }
+        case None =>
+          logging.warn(this, s"lease not found, retry storing data for ${request.key}")
+          leaseService ! GetLease
+          sendMessageToSelfAfter(request, retryInterval)
+      }
+
+    case msg: WatcherClosed =>
+      etcdClient
+        .del(msg.key)
+        .andThen {
+          case Success(_) =>
+            parent ! FinishWork(msg.key)
+        }
+        .recover {
+          // if there is no lease, reissue it and retry immediately
+          case t: StatusRuntimeException =>
+            logging.warn(this, s"a lease is expired while deleting data ${msg.key}, reissue it: $t")
+            lease = None
+            leaseService ! GetLease
+            sendMessageToSelfAfter(msg, retryInterval)
+
+          // it should retry forever until the data is stored
+          case t: Throwable =>
+            logging.warn(this, s"unexpected error happened: $t, retry storing data for ${msg.key}")
+            sendMessageToSelfAfter(msg, retryInterval)
+        }
+
+  }
+
+  private def sendMessageToSelfAfter(msg: Any, retryInterval: FiniteDuration): Future[Unit] = {

Review comment:
       Akka already has a built-in `scheduleOnce` method, so a custom helper should not be needed here, right?
   
   e.g. `actorSystem.scheduler.scheduleOnce(50.milliseconds, self, "foo")`
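
A minimal sketch of what that could look like inside an actor, assuming the classic Akka scheduler API. The body of `sendMessageToSelfAfter` is not visible in this hunk, so this is not a claim about what it does today. `Retry`, `GaveUp`, and `RetryingWorker` are hypothetical names used only for illustration, and the retry count is bounded here (unlike the retry-forever behaviour in the PR) so that the example terminates.

```scala
import akka.actor.{Actor, ActorRef}
import scala.concurrent.duration._

// Hypothetical messages, used only for this illustration.
case class Retry(key: String, attemptsLeft: Int)
case class GaveUp(key: String)

// Hypothetical actor: re-sends a message to itself after `retryInterval`
// by using the scheduler that ships with Akka instead of a custom helper.
class RetryingWorker(reportTo: ActorRef, retryInterval: FiniteDuration) extends Actor {
  import context.dispatcher // implicit ExecutionContext required by the scheduler

  override def receive: Receive = {
    case Retry(key, 0) =>
      // No attempts left: tell the interested party and stop rescheduling.
      reportTo ! GaveUp(key)

    case Retry(key, n) =>
      // Deliver the next attempt to this actor once the delay has elapsed.
      context.system.scheduler.scheduleOnce(retryInterval, self, Retry(key, n - 1))
  }
}
```

`scheduleOnce` returns a `Cancellable`, which could be kept if a pending retry ever needs to be cancelled, for example when the key is deregistered.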

##########
File path: common/scala/src/main/scala/org/apache/openwhisk/core/service/DataManagementService.scala
##########
@@ -0,0 +1,313 @@
+package org.apache.openwhisk.core.service
+
+import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Props}
+import akka.util.Timeout
+import io.grpc.StatusRuntimeException
+import org.apache.openwhisk.common.Logging
+import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdFollower, EtcdLeader}
+import org.apache.openwhisk.core.service.DataManagementService.retryInterval
+
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable.{Map, Queue}
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.Success
+
+// messages received by the actor
+// it is required to specify a recipient directly for the retryable message processing
+case class ElectLeader(key: String, value: String, recipient: ActorRef, watchEnabled: Boolean = true)
+case class RegisterInitialData(key: String,
+                               value: String,
+                               failoverEnabled: Boolean = true,
+                               recipient: Option[ActorRef] = None)
+
+case class RegisterData(key: String, value: String, failoverEnabled: Boolean = true)
+case class DeRegisterData(key: String)
+case class UpdateDataOnChange(key: String, value: String)
+
+// messages sent by the actor
+case class ElectionResult(leadership: Either[EtcdFollower, EtcdLeader])
+case class FinishWork(key: String)
+case class InitialDataStorageResults(key: String, result: Either[AlreadyExist, Done])
+case class Done()
+case class AlreadyExist()
+
+/**
+ * This service is in charge of storing given data to ETCD.
+ * In case there is any issue occurred while storing data, it keeps trying until the data is stored.
+ * So it guarantees the data is eventually stored.
+ */
+class DataManagementService(watcherService: ActorRef, workerFactory: ActorRefFactory => ActorRef)(
+  implicit logging: Logging,
+  actorSystem: ActorSystem)
+    extends Actor {
+  private implicit val ec = context.dispatcher
+
+  implicit val requestTimeout: Timeout = Timeout(5.seconds)
+  private[service] val dataCache = TrieMap[String, String]()
+  private val operations = Map.empty[String, Queue[Any]]
+  private var inProgressKeys = Set.empty[String]
+  private val watcherName = "data-management-service"
+
+  private val worker = workerFactory(context)
+
+  override def receive: Receive = {
+    case FinishWork(key) =>
+      // send waiting operation to worker if there is any, else update the inProgressKeys
+      val ops = operations.get(key)
+      if (ops.nonEmpty && ops.get.nonEmpty) {
+        val operation = ops.get.dequeue()
+        worker ! operation
+      } else {
+        inProgressKeys = inProgressKeys - key
+        operations.remove(key) // remove empty queue from the map to free memories
+      }
+
+    // normally these messages will be sent when queues are created.
+    case request: ElectLeader =>
+      if (inProgressKeys.contains(request.key)) {
+        logging.info(this, s"save request $request into a buffer")
+        operations.getOrElseUpdate(request.key, Queue.empty[Any]).enqueue(request)
+      } else {
+        worker ! request
+        inProgressKeys = inProgressKeys + request.key
+      }
+
+    case request: RegisterInitialData =>
+      // send WatchEndpoint first as the put operation will be retry until success if failed
+      if (request.failoverEnabled)
+        watcherService ! WatchEndpoint(request.key, request.value, isPrefix = false, watcherName, Set(DeleteEvent))
+      if (inProgressKeys.contains(request.key)) {
+        logging.info(this, s"save request $request into a buffer")
+        operations.getOrElseUpdate(request.key, Queue.empty[Any]).enqueue(request)
+      } else {
+        worker ! request
+        inProgressKeys = inProgressKeys + request.key
+      }
+
+    case request: RegisterData =>
+      // send WatchEndpoint first as the put operation will be retry until success if failed
+      if (request.failoverEnabled)
+        watcherService ! WatchEndpoint(request.key, request.value, isPrefix = false, watcherName, Set(DeleteEvent))
+      if (inProgressKeys.contains(request.key)) {
+        // the new put|delete operation will erase influences made by older operations like put&delete
+        // so we can remove these old operations
+        logging.info(this, s"save request $request into a buffer")
+        val queue = operations.getOrElseUpdate(request.key, Queue.empty[Any]).filter { value =>
+          value match {
+            case _: RegisterData | _: WatcherClosed | _: RegisterInitialData => false
+            case _                                                           => true
+          }
+        }
+        queue.enqueue(request)
+        operations.update(request.key, queue)
+      } else {
+        worker ! request
+        inProgressKeys = inProgressKeys + request.key
+      }
+
+    case request: WatcherClosed =>
+      if (inProgressKeys.contains(request.key)) {
+        // the new put|delete operation will erase influences made by older operations like put&delete
+        // so we can remove these old operations
+        logging.info(this, s"save request $request into a buffer")
+        val queue = operations.getOrElseUpdate(request.key, Queue.empty[Any]).filter { value =>
+          value match {
+            case _: RegisterData | _: WatcherClosed | _: RegisterInitialData => false
+            case _                                                           => true
+          }
+        }
+        queue.enqueue(request)
+        operations.update(request.key, queue)
+      } else {
+        worker ! request
+        inProgressKeys = inProgressKeys + request.key
+      }
+
+    // It is required to close the watcher first before deleting etcd data
+    // It is supposed to receive the WatcherClosed message after the watcher is stopped.
+    case msg: DeRegisterData =>
+      watcherService ! UnWatchEndpoint(msg.key, isPrefix = false, watcherName, needFeedback = true)
+
+    case WatchEndpointRemoved(_, key, value, false) =>
+      self ! RegisterInitialData(key, value, failoverEnabled = false) // the watcher is already setup
+
+    // it is supposed not to receive "prefixed" data
+    case WatchEndpointRemoved(_, key, value, true) =>
+      logging.error(this, s"unexpected data received: ${WatchEndpoint(key, value, isPrefix = true, watcherName)}")
+
+    case msg: UpdateDataOnChange =>
+      dataCache.get(msg.key) match {
+        case Some(cached) if cached == msg.value =>
+          logging.debug(this, s"skip publishing data ${msg.key} because the data is not changed.")
+        // do nothing
+
+        case Some(cached) if cached != msg.value =>
+          dataCache.update(msg.key, msg.value)
+          self ! RegisterData(msg.key, msg.value, failoverEnabled = false) // the watcher is already setup
+
+        case None =>
+          dataCache.put(msg.key, msg.value)
+          self ! RegisterData(msg.key, msg.value)
+
+      }
+  }
+}
+
+object DataManagementService {
+  // Todo: Change to configuration
+  val retryInterval: FiniteDuration = 1.second

Review comment:
       This should be moved to configuration before merging. Is 1 second a good default for this?
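
One possible shape for this, sketched with the Typesafe Config library that Akka already depends on. The key `whisk.data-management-service.retry-interval` and the `RetryConfig` object are hypothetical, since the PR does not define either, and the 1 second fallback only mirrors the current hard-coded value. OpenWhisk's own configuration helpers may be a better fit than reading the key directly.

```scala
import com.typesafe.config.Config
import scala.concurrent.duration._

// Hypothetical helper, not part of the PR.
object RetryConfig {
  // Assumed key name, chosen only for this illustration.
  val key = "whisk.data-management-service.retry-interval"

  // Use the configured value when present, else fall back to 1 second.
  def retryInterval(config: Config): FiniteDuration =
    if (config.hasPath(key)) Duration.fromNanos(config.getDuration(key).toNanos)
    else 1.second
}
```

With this, `RetryConfig.retryInterval(ConfigFactory.load())` would pick up an override from `application.conf` when one is set.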

##########
File path: common/scala/src/main/scala/org/apache/openwhisk/core/service/DataManagementService.scala
##########
@@ -0,0 +1,313 @@
+package org.apache.openwhisk.core.service
+
+import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Props}
+import akka.util.Timeout
+import io.grpc.StatusRuntimeException
+import org.apache.openwhisk.common.Logging
+import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdFollower, EtcdLeader}
+import org.apache.openwhisk.core.service.DataManagementService.retryInterval
+
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable.{Map, Queue}
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.Success
+
+// messages received by the actor
+// it is required to specify a recipient directly for the retryable message processing
+case class ElectLeader(key: String, value: String, recipient: ActorRef, watchEnabled: Boolean = true)
+case class RegisterInitialData(key: String,
+                               value: String,
+                               failoverEnabled: Boolean = true,
+                               recipient: Option[ActorRef] = None)
+
+case class RegisterData(key: String, value: String, failoverEnabled: Boolean = true)
+case class DeRegisterData(key: String)
+case class UpdateDataOnChange(key: String, value: String)
+
+// messages sent by the actor
+case class ElectionResult(leadership: Either[EtcdFollower, EtcdLeader])
+case class FinishWork(key: String)
+case class InitialDataStorageResults(key: String, result: Either[AlreadyExist, Done])
+case class Done()
+case class AlreadyExist()
+
+/**
+ * This service is in charge of storing given data to ETCD.
+ * In case there is any issue occurred while storing data, it keeps trying until the data is stored.
+ * So it guarantees the data is eventually stored.
+ */
+class DataManagementService(watcherService: ActorRef, workerFactory: ActorRefFactory => ActorRef)(
+  implicit logging: Logging,
+  actorSystem: ActorSystem)
+    extends Actor {
+  private implicit val ec = context.dispatcher
+
+  implicit val requestTimeout: Timeout = Timeout(5.seconds)
+  private[service] val dataCache = TrieMap[String, String]()
+  private val operations = Map.empty[String, Queue[Any]]
+  private var inProgressKeys = Set.empty[String]
+  private val watcherName = "data-management-service"
+
+  private val worker = workerFactory(context)
+
+  override def receive: Receive = {
+    case FinishWork(key) =>
+      // send waiting operation to worker if there is any, else update the inProgressKeys
+      val ops = operations.get(key)
+      if (ops.nonEmpty && ops.get.nonEmpty) {
+        val operation = ops.get.dequeue()
+        worker ! operation
+      } else {
+        inProgressKeys = inProgressKeys - key
+        operations.remove(key) // remove empty queue from the map to free memories
+      }
+
+    // normally these messages will be sent when queues are created.
+    case request: ElectLeader =>
+      if (inProgressKeys.contains(request.key)) {
+        logging.info(this, s"save request $request into a buffer")
+        operations.getOrElseUpdate(request.key, Queue.empty[Any]).enqueue(request)
+      } else {
+        worker ! request
+        inProgressKeys = inProgressKeys + request.key
+      }
+
+    case request: RegisterInitialData =>
+      // send WatchEndpoint first as the put operation will be retry until success if failed
+      if (request.failoverEnabled)
+        watcherService ! WatchEndpoint(request.key, request.value, isPrefix = false, watcherName, Set(DeleteEvent))
+      if (inProgressKeys.contains(request.key)) {
+        logging.info(this, s"save request $request into a buffer")
+        operations.getOrElseUpdate(request.key, Queue.empty[Any]).enqueue(request)
+      } else {
+        worker ! request
+        inProgressKeys = inProgressKeys + request.key
+      }
+
+    case request: RegisterData =>
+      // send WatchEndpoint first as the put operation will be retry until success if failed
+      if (request.failoverEnabled)
+        watcherService ! WatchEndpoint(request.key, request.value, isPrefix = false, watcherName, Set(DeleteEvent))
+      if (inProgressKeys.contains(request.key)) {
+        // the new put|delete operation will erase influences made by older operations like put&delete
+        // so we can remove these old operations
+        logging.info(this, s"save request $request into a buffer")
+        val queue = operations.getOrElseUpdate(request.key, Queue.empty[Any]).filter { value =>
+          value match {
+            case _: RegisterData | _: WatcherClosed | _: RegisterInitialData => false
+            case _                                                           => true
+          }
+        }
+        queue.enqueue(request)
+        operations.update(request.key, queue)
+      } else {
+        worker ! request
+        inProgressKeys = inProgressKeys + request.key
+      }
+
+    case request: WatcherClosed =>
+      if (inProgressKeys.contains(request.key)) {
+        // the new put|delete operation will erase influences made by older operations like put&delete
+        // so we can remove these old operations
+        logging.info(this, s"save request $request into a buffer")
+        val queue = operations.getOrElseUpdate(request.key, Queue.empty[Any]).filter { value =>
+          value match {
+            case _: RegisterData | _: WatcherClosed | _: RegisterInitialData => false
+            case _                                                           => true
+          }
+        }
+        queue.enqueue(request)
+        operations.update(request.key, queue)
+      } else {
+        worker ! request
+        inProgressKeys = inProgressKeys + request.key
+      }
+
+    // It is required to close the watcher first before deleting etcd data
+    // It is supposed to receive the WatcherClosed message after the watcher is stopped.
+    case msg: DeRegisterData =>
+      watcherService ! UnWatchEndpoint(msg.key, isPrefix = false, watcherName, needFeedback = true)
+
+    case WatchEndpointRemoved(_, key, value, false) =>
+      self ! RegisterInitialData(key, value, failoverEnabled = false) // the watcher is already setup
+
+    // it is supposed not to receive "prefixed" data
+    case WatchEndpointRemoved(_, key, value, true) =>
+      logging.error(this, s"unexpected data received: ${WatchEndpoint(key, value, isPrefix = true, watcherName)}")
+
+    case msg: UpdateDataOnChange =>
+      dataCache.get(msg.key) match {
+        case Some(cached) if cached == msg.value =>
+          logging.debug(this, s"skip publishing data ${msg.key} because the data is not changed.")
+        // do nothing
+
+        case Some(cached) if cached != msg.value =>
+          dataCache.update(msg.key, msg.value)
+          self ! RegisterData(msg.key, msg.value, failoverEnabled = false) // the watcher is already setup
+
+        case None =>
+          dataCache.put(msg.key, msg.value)
+          self ! RegisterData(msg.key, msg.value)
+
+      }
+  }
+}
+
+object DataManagementService {
+  // Todo: Change to configuration
+  val retryInterval: FiniteDuration = 1.second
+
+  def props(watcherService: ActorRef, workerFactory: ActorRefFactory => ActorRef)(implicit logging: Logging,
+                                                                                  actorSystem: ActorSystem): Props = {
+    Props(new DataManagementService(watcherService, workerFactory))
+  }
+}
+
+class EtcdWorker(etcdClient: EtcdClient, leaseService: ActorRef)(implicit val ec: ExecutionContext,
+                                                                 actorSystem: ActorSystem,
+                                                                 logging: Logging)
+    extends Actor {
+
+  private val parent = context.parent

Review comment:
       Is the parent always `dataManagementService`? If so, a more descriptive name than `parent` would help.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [openwhisk] style95 commented on pull request #5063: [WIP][New Scheduler] Add DataManagementService

Posted by GitBox <gi...@apache.org>.
style95 commented on pull request #5063:
URL: https://github.com/apache/openwhisk/pull/5063#issuecomment-777157617


   I think we need to merge a PR that adds CI tests for the scheduler components before merging this PR.
   Since some downstream projects run their CI tests against the upstream core repo, we need to separate the scheduler tests from the others. We need to see whether we can set up another dedicated pipeline for scheduler components in Travis.
   Once all components have been contributed, we can merge that pipeline into the existing unit/system pipelines.
   
   





[GitHub] [openwhisk] style95 commented on a change in pull request #5063: [WIP][New Scheduler] Add DataManagementService

Posted by GitBox <gi...@apache.org>.
style95 commented on a change in pull request #5063:
URL: https://github.com/apache/openwhisk/pull/5063#discussion_r574209393



##########
File path: common/scala/src/main/scala/org/apache/openwhisk/core/service/DataManagementService.scala
##########
@@ -0,0 +1,311 @@
+package org.apache.openwhisk.core.service
+
+import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Props}
+import akka.util.Timeout
+import io.grpc.StatusRuntimeException
+import org.apache.openwhisk.common.Logging
+import org.apache.openwhisk.core.ConfigKeys
+import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdFollower, EtcdLeader}
+import org.apache.openwhisk.core.service.DataManagementService.retryInterval
+import pureconfig.loadConfigOrThrow
+
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable.{Map, Queue}
+import scala.concurrent.ExecutionContext
+import scala.concurrent.duration._
+import scala.util.Success
+
+// messages received by the actor
+// it is required to specify a recipient directly for the retryable message processing
+case class ElectLeader(key: String, value: String, recipient: ActorRef, watchEnabled: Boolean = true)
+case class RegisterInitialData(key: String,
+                               value: String,
+                               failoverEnabled: Boolean = true,
+                               recipient: Option[ActorRef] = None)
+
+case class RegisterData(key: String, value: String, failoverEnabled: Boolean = true)
+case class UnregisterData(key: String)
+case class UpdateDataOnChange(key: String, value: String)
+
+// messages sent by the actor
+case class ElectionResult(leadership: Either[EtcdFollower, EtcdLeader])
+case class FinishWork(key: String)
+case class InitialDataStorageResults(key: String, result: Either[AlreadyExist, Done])
+case class Done()
+case class AlreadyExist()
+
+/**
+ * This service is in charge of storing given data to ETCD.
+ * In the event any issue occurs while storing data, the actor keeps trying until the data is stored guaranteeing delivery to ETCD.
+ * So it guarantees the data is eventually stored.
+ */
+class DataManagementService(watcherService: ActorRef, workerFactory: ActorRefFactory => ActorRef)(
+  implicit logging: Logging,
+  actorSystem: ActorSystem)
+    extends Actor {
+  private implicit val ec = context.dispatcher
+
+  implicit val requestTimeout: Timeout = Timeout(5.seconds)
+  private[service] val dataCache = TrieMap[String, String]()
+  private val operations = Map.empty[String, Queue[Any]]
+  private var inProgressKeys = Set.empty[String]
+  private val watcherName = "data-management-service"
+
+  private val worker = workerFactory(context)
+
+  override def receive: Receive = {
+    case FinishWork(key) =>
+      // send waiting operation to worker if there is any, else update the inProgressKeys
+      val ops = operations.get(key)
+      if (ops.nonEmpty && ops.get.nonEmpty) {
+        val operation = ops.get.dequeue()
+        worker ! operation
+      } else {
+        inProgressKeys = inProgressKeys - key
+        operations.remove(key) // remove empty queue from the map to free memory
+      }
+
+    // normally these messages will be sent when queues are created.
+    case request: ElectLeader =>

Review comment:
       Leader election happens when a queue is created.
   This guarantees that only one scheduler creates a given queue.
   So it happens relatively infrequently.
   
   

##########
File path: common/scala/src/main/scala/org/apache/openwhisk/core/service/DataManagementService.scala
##########
@@ -0,0 +1,311 @@
+package org.apache.openwhisk.core.service
+
+import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Props}
+import akka.util.Timeout
+import io.grpc.StatusRuntimeException
+import org.apache.openwhisk.common.Logging
+import org.apache.openwhisk.core.ConfigKeys
+import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdFollower, EtcdLeader}
+import org.apache.openwhisk.core.service.DataManagementService.retryInterval
+import pureconfig.loadConfigOrThrow
+
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable.{Map, Queue}
+import scala.concurrent.ExecutionContext
+import scala.concurrent.duration._
+import scala.util.Success
+
+// messages received by the actor
+// it is required to specify a recipient directly for the retryable message processing
+case class ElectLeader(key: String, value: String, recipient: ActorRef, watchEnabled: Boolean = true)
+case class RegisterInitialData(key: String,
+                               value: String,
+                               failoverEnabled: Boolean = true,
+                               recipient: Option[ActorRef] = None)
+
+case class RegisterData(key: String, value: String, failoverEnabled: Boolean = true)
+case class UnregisterData(key: String)
+case class UpdateDataOnChange(key: String, value: String)
+
+// messages sent by the actor
+case class ElectionResult(leadership: Either[EtcdFollower, EtcdLeader])
+case class FinishWork(key: String)
+case class InitialDataStorageResults(key: String, result: Either[AlreadyExist, Done])
+case class Done()
+case class AlreadyExist()
+
+/**
+ * This service is in charge of storing given data to ETCD.
+ * In the event any issue occurs while storing data, the actor keeps trying until the data is stored guaranteeing delivery to ETCD.
+ * So it guarantees the data is eventually stored.
+ */
+class DataManagementService(watcherService: ActorRef, workerFactory: ActorRefFactory => ActorRef)(
+  implicit logging: Logging,
+  actorSystem: ActorSystem)
+    extends Actor {
+  private implicit val ec = context.dispatcher
+
+  implicit val requestTimeout: Timeout = Timeout(5.seconds)
+  private[service] val dataCache = TrieMap[String, String]()
+  private val operations = Map.empty[String, Queue[Any]]
+  private var inProgressKeys = Set.empty[String]
+  private val watcherName = "data-management-service"
+
+  private val worker = workerFactory(context)
+
+  override def receive: Receive = {
+    case FinishWork(key) =>
+      // send waiting operation to worker if there is any, else update the inProgressKeys
+      val ops = operations.get(key)
+      if (ops.nonEmpty && ops.get.nonEmpty) {
+        val operation = ops.get.dequeue()
+        worker ! operation
+      } else {
+        inProgressKeys = inProgressKeys - key
+        operations.remove(key) // remove empty queue from the map to free memory
+      }
+
+    // normally these messages will be sent when queues are created.
+    case request: ElectLeader =>
+      if (inProgressKeys.contains(request.key)) {

Review comment:
       Because this component retries failed requests, if an earlier request for the same key is still in progress (possibly being retried), the new request is stored in a buffer.
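
For readers following the buffering logic, here is a minimal sketch of the per-key serialization without Akka. `BufferSketch`, `Request`, `submit`, `finishWork`, and `dispatched` are hypothetical names used only for illustration; in the PR the requests are actor messages and the dispatch target is the worker actor.

```scala
import scala.collection.mutable

object BufferSketch {
  // Hypothetical request type; the PR uses ElectLeader, RegisterData, etc.
  final case class Request(key: String, value: String)

  private val operations = mutable.Map.empty[String, mutable.Queue[Request]]
  private var inProgressKeys = Set.empty[String]

  // Requests handed to the (imaginary) worker, recorded for inspection.
  val dispatched = mutable.ListBuffer.empty[Request]

  // Dispatch immediately if the key is idle, otherwise buffer the request.
  def submit(request: Request): Unit =
    if (inProgressKeys.contains(request.key)) {
      operations.getOrElseUpdate(request.key, mutable.Queue.empty[Request]).enqueue(request)
    } else {
      inProgressKeys += request.key
      dispatched += request
    }

  // Called when the worker finishes: dispatch the next buffered request, if any.
  def finishWork(key: String): Unit =
    operations.get(key).filter(_.nonEmpty) match {
      case Some(queue) => dispatched += queue.dequeue()
      case None =>
        inProgressKeys -= key
        operations.remove(key) // drop the empty queue
    }
}
```

At most one request per key is outstanding at any time, so a retried request can never be overtaken by a newer request for the same key.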

##########
File path: common/scala/src/main/scala/org/apache/openwhisk/core/service/DataManagementService.scala
##########
@@ -0,0 +1,311 @@
+package org.apache.openwhisk.core.service
+
+import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Props}
+import akka.util.Timeout
+import io.grpc.StatusRuntimeException
+import org.apache.openwhisk.common.Logging
+import org.apache.openwhisk.core.ConfigKeys
+import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdFollower, EtcdLeader}
+import org.apache.openwhisk.core.service.DataManagementService.retryInterval
+import pureconfig.loadConfigOrThrow
+
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable.{Map, Queue}
+import scala.concurrent.ExecutionContext
+import scala.concurrent.duration._
+import scala.util.Success
+
+// messages received by the actor
+// it is required to specify a recipient directly for the retryable message processing
+case class ElectLeader(key: String, value: String, recipient: ActorRef, watchEnabled: Boolean = true)
+case class RegisterInitialData(key: String,
+                               value: String,
+                               failoverEnabled: Boolean = true,
+                               recipient: Option[ActorRef] = None)
+
+case class RegisterData(key: String, value: String, failoverEnabled: Boolean = true)
+case class UnregisterData(key: String)
+case class UpdateDataOnChange(key: String, value: String)
+
+// messages sent by the actor
+case class ElectionResult(leadership: Either[EtcdFollower, EtcdLeader])
+case class FinishWork(key: String)
+case class InitialDataStorageResults(key: String, result: Either[AlreadyExist, Done])
+case class Done()
+case class AlreadyExist()
+
+/**
+ * This service is in charge of storing given data to ETCD.
+ * In the event any issue occurs while storing data, the actor keeps trying until the data is stored guaranteeing delivery to ETCD.
+ * So it guarantees the data is eventually stored.
+ */
+class DataManagementService(watcherService: ActorRef, workerFactory: ActorRefFactory => ActorRef)(
+  implicit logging: Logging,
+  actorSystem: ActorSystem)
+    extends Actor {
+  private implicit val ec = context.dispatcher
+
+  implicit val requestTimeout: Timeout = Timeout(5.seconds)
+  private[service] val dataCache = TrieMap[String, String]()
+  private val operations = Map.empty[String, Queue[Any]]
+  private var inProgressKeys = Set.empty[String]
+  private val watcherName = "data-management-service"
+
+  private val worker = workerFactory(context)
+
+  override def receive: Receive = {
+    case FinishWork(key) =>
+      // send waiting operation to worker if there is any, else update the inProgressKeys
+      val ops = operations.get(key)
+      if (ops.nonEmpty && ops.get.nonEmpty) {
+        val operation = ops.get.dequeue()
+        worker ! operation
+      } else {
+        inProgressKeys = inProgressKeys - key
+        operations.remove(key) // remove empty queue from the map to free memory
+      }
+
+    // normally these messages will be sent when queues are created.
+    case request: ElectLeader =>
+      if (inProgressKeys.contains(request.key)) {
+        logging.info(this, s"save a request $request into a buffer")
+        operations.getOrElseUpdate(request.key, Queue.empty[Any]).enqueue(request)
+      } else {
+        worker ! request
+        inProgressKeys = inProgressKeys + request.key
+      }
+
+    case request: RegisterInitialData =>
+      // send WatchEndpoint first as the put operation will be retried until success if failed
+      if (request.failoverEnabled)
+        watcherService ! WatchEndpoint(request.key, request.value, isPrefix = false, watcherName, Set(DeleteEvent))
+      if (inProgressKeys.contains(request.key)) {
+        logging.info(this, s"save request $request into a buffer")
+        operations.getOrElseUpdate(request.key, Queue.empty[Any]).enqueue(request)
+      } else {
+        worker ! request
+        inProgressKeys = inProgressKeys + request.key
+      }
+
+    case request: RegisterData =>
+      // send WatchEndpoint first as the put operation will be retried until success if failed
+      if (request.failoverEnabled)
+        watcherService ! WatchEndpoint(request.key, request.value, isPrefix = false, watcherName, Set(DeleteEvent))
+      if (inProgressKeys.contains(request.key)) {
+        // the new put|delete operation will erase influences made by older operations like put&delete
+        // so we can remove these old operations
+        logging.info(this, s"save request $request into a buffer")
+        val queue = operations.getOrElseUpdate(request.key, Queue.empty[Any]).filter { value =>
+          value match {
+            case _: RegisterData | _: WatcherClosed | _: RegisterInitialData => false
+            case _                                                           => true
+          }
+        }
+        queue.enqueue(request)
+        operations.update(request.key, queue)
+      } else {
+        worker ! request
+        inProgressKeys = inProgressKeys + request.key
+      }
+
+    case request: WatcherClosed =>
+      if (inProgressKeys.contains(request.key)) {
+        // The put|delete operations against the same key will overwrite the previous results.
+        // For example, if we put a value, delete it and put a new value again, the final result will be the new value.
+        // So we can remove these old operations
+        logging.info(this, s"save request $request into a buffer")
+        val queue = operations.getOrElseUpdate(request.key, Queue.empty[Any]).filter { value =>
+          value match {
+            case _: RegisterData | _: WatcherClosed | _: RegisterInitialData => false
+            case _                                                           => true
+          }
+        }
+        queue.enqueue(request)
+        operations.update(request.key, queue)
+      } else {
+        worker ! request
+        inProgressKeys = inProgressKeys + request.key
+      }
+
+    // It is required to close the watcher first before deleting etcd data
+    // It is supposed to receive the WatcherClosed message after the watcher is stopped.
+    case msg: UnregisterData =>
+      watcherService ! UnwatchEndpoint(msg.key, isPrefix = false, watcherName, needFeedback = true)
+
+    case WatchEndpointRemoved(_, key, value, false) =>
+      self ! RegisterInitialData(key, value, failoverEnabled = false) // the watcher is already setup
+
+    // It should not receive "prefixed" data
+    case WatchEndpointRemoved(_, key, value, true) =>
+      logging.error(this, s"unexpected data received: ${WatchEndpoint(key, value, isPrefix = true, watcherName)}")
+
+    case msg: UpdateDataOnChange =>

Review comment:
       To reduce the load on ETCD, it does not store the data if the value has not changed.
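
A small sketch of this change filter, assuming an in-memory cache keyed like `dataCache` in the diff. `ChangeFilterSketch`, `updateOnChange`, and the `writes` counter are hypothetical and only stand in for the actual write to ETCD.

```scala
import scala.collection.concurrent.TrieMap

object ChangeFilterSketch {
  private val dataCache = TrieMap.empty[String, String]

  // Counts how many writes would actually reach ETCD.
  var writes = 0

  // Returns true when the value was new or changed and a write was issued.
  def updateOnChange(key: String, value: String): Boolean =
    dataCache.get(key) match {
      case Some(cached) if cached == value =>
        false // unchanged: skip the write
      case _ =>
        dataCache.update(key, value)
        writes += 1
        true
    }
}
```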

##########
File path: common/scala/src/main/scala/org/apache/openwhisk/core/service/DataManagementService.scala
##########
@@ -0,0 +1,311 @@
+package org.apache.openwhisk.core.service
+
+import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Props}
+import akka.util.Timeout
+import io.grpc.StatusRuntimeException
+import org.apache.openwhisk.common.Logging
+import org.apache.openwhisk.core.ConfigKeys
+import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdFollower, EtcdLeader}
+import org.apache.openwhisk.core.service.DataManagementService.retryInterval
+import pureconfig.loadConfigOrThrow
+
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable.{Map, Queue}
+import scala.concurrent.ExecutionContext
+import scala.concurrent.duration._
+import scala.util.Success
+
+// messages received by the actor
+// it is required to specify a recipient directly for the retryable message processing
+case class ElectLeader(key: String, value: String, recipient: ActorRef, watchEnabled: Boolean = true)
+case class RegisterInitialData(key: String,
+                               value: String,
+                               failoverEnabled: Boolean = true,
+                               recipient: Option[ActorRef] = None)
+
+case class RegisterData(key: String, value: String, failoverEnabled: Boolean = true)
+case class UnregisterData(key: String)
+case class UpdateDataOnChange(key: String, value: String)
+
+// messages sent by the actor
+case class ElectionResult(leadership: Either[EtcdFollower, EtcdLeader])
+case class FinishWork(key: String)
+case class InitialDataStorageResults(key: String, result: Either[AlreadyExist, Done])
+case class Done()
+case class AlreadyExist()
+
+/**
+ * This service is in charge of storing given data to ETCD.
+ * In the event any issue occurs while storing data, the actor keeps trying until the data is stored guaranteeing delivery to ETCD.
+ * So it guarantees the data is eventually stored.
+ */
+class DataManagementService(watcherService: ActorRef, workerFactory: ActorRefFactory => ActorRef)(
+  implicit logging: Logging,
+  actorSystem: ActorSystem)
+    extends Actor {
+  private implicit val ec = context.dispatcher
+
+  implicit val requestTimeout: Timeout = Timeout(5.seconds)
+  private[service] val dataCache = TrieMap[String, String]()
+  private val operations = Map.empty[String, Queue[Any]]
+  private var inProgressKeys = Set.empty[String]
+  private val watcherName = "data-management-service"
+
+  private val worker = workerFactory(context)
+
+  override def receive: Receive = {
+    case FinishWork(key) =>
+      // send waiting operation to worker if there is any, else update the inProgressKeys
+      val ops = operations.get(key)
+      if (ops.nonEmpty && ops.get.nonEmpty) {
+        val operation = ops.get.dequeue()
+        worker ! operation
+      } else {
+        inProgressKeys = inProgressKeys - key
+        operations.remove(key) // remove empty queue from the map to free memory
+      }
+
+    // normally these messages will be sent when queues are created.
+    case request: ElectLeader =>
+      if (inProgressKeys.contains(request.key)) {
+        logging.info(this, s"save a request $request into a buffer")
+        operations.getOrElseUpdate(request.key, Queue.empty[Any]).enqueue(request)
+      } else {
+        worker ! request
+        inProgressKeys = inProgressKeys + request.key
+      }
+
+    case request: RegisterInitialData =>
+      // send WatchEndpoint first as the put operation will be retried until success if failed
+      if (request.failoverEnabled)

Review comment:
       If failover is enabled, it watches the key, and if the key is deleted for some reason, it tries to restore it.
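
The failover behaviour can be sketched roughly as follows. This uses plain in-memory maps as a stand-in for ETCD and its watch mechanism; `FailoverSketch`, `register`, and `externalDelete` are hypothetical names, and the real service receives delete events from the watcher service instead.

```scala
import scala.collection.mutable

object FailoverSketch {
  // In-memory stand-ins for ETCD and for the set of watched keys (both hypothetical).
  private val store = mutable.Map.empty[String, String]
  private val watched = mutable.Map.empty[String, String]

  def register(key: String, value: String, failoverEnabled: Boolean = true): Unit = {
    // Set up the watch first, then store the data.
    if (failoverEnabled) watched.update(key, value)
    store.update(key, value)
  }

  // Simulates an unexpected deletion; a watched key is restored right away.
  def externalDelete(key: String): Unit = {
    store.remove(key)
    // The watch already exists, so the restore does not set it up again.
    watched.get(key).foreach(value => register(key, value, failoverEnabled = false))
  }

  def get(key: String): Option[String] = store.get(key)
}
```

Keys registered with `failoverEnabled = false` are not watched, so they stay deleted after `externalDelete`.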

##########
File path: common/scala/src/main/scala/org/apache/openwhisk/core/service/DataManagementService.scala
##########
@@ -0,0 +1,311 @@
+package org.apache.openwhisk.core.service
+
+import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Props}
+import akka.util.Timeout
+import io.grpc.StatusRuntimeException
+import org.apache.openwhisk.common.Logging
+import org.apache.openwhisk.core.ConfigKeys
+import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdFollower, EtcdLeader}
+import org.apache.openwhisk.core.service.DataManagementService.retryInterval
+import pureconfig.loadConfigOrThrow
+
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable.{Map, Queue}
+import scala.concurrent.ExecutionContext
+import scala.concurrent.duration._
+import scala.util.Success
+
+// messages received by the actor
+// it is required to specify a recipient directly for the retryable message processing
+case class ElectLeader(key: String, value: String, recipient: ActorRef, watchEnabled: Boolean = true)
+case class RegisterInitialData(key: String,
+                               value: String,
+                               failoverEnabled: Boolean = true,
+                               recipient: Option[ActorRef] = None)
+
+case class RegisterData(key: String, value: String, failoverEnabled: Boolean = true)
+case class UnregisterData(key: String)
+case class UpdateDataOnChange(key: String, value: String)
+
+// messages sent by the actor
+case class ElectionResult(leadership: Either[EtcdFollower, EtcdLeader])
+case class FinishWork(key: String)
+case class InitialDataStorageResults(key: String, result: Either[AlreadyExist, Done])
+case class Done()
+case class AlreadyExist()
+
+/**
+ * This service is in charge of storing given data to ETCD.
+ * In the event any issue occurs while storing data, the actor keeps trying until the data is stored guaranteeing delivery to ETCD.
+ * So it guarantees the data is eventually stored.
+ */
+class DataManagementService(watcherService: ActorRef, workerFactory: ActorRefFactory => ActorRef)(
+  implicit logging: Logging,
+  actorSystem: ActorSystem)
+    extends Actor {
+  private implicit val ec = context.dispatcher
+
+  implicit val requestTimeout: Timeout = Timeout(5.seconds)
+  private[service] val dataCache = TrieMap[String, String]()
+  private val operations = Map.empty[String, Queue[Any]]
+  private var inProgressKeys = Set.empty[String]
+  private val watcherName = "data-management-service"
+
+  private val worker = workerFactory(context)
+
+  override def receive: Receive = {
+    case FinishWork(key) =>
+      // send waiting operation to worker if there is any, else update the inProgressKeys
+      val ops = operations.get(key)
+      if (ops.nonEmpty && ops.get.nonEmpty) {
+        val operation = ops.get.dequeue()
+        worker ! operation
+      } else {
+        inProgressKeys = inProgressKeys - key
+        operations.remove(key) // remove empty queue from the map to free memory
+      }
+
+    // normally these messages will be sent when queues are created.
+    case request: ElectLeader =>
+      if (inProgressKeys.contains(request.key)) {
+        logging.info(this, s"save a request $request into a buffer")
+        operations.getOrElseUpdate(request.key, Queue.empty[Any]).enqueue(request)
+      } else {
+        worker ! request
+        inProgressKeys = inProgressKeys + request.key
+      }
+
+    case request: RegisterInitialData =>

Review comment:
       Actions under the same namespace share some data, such as namespace throttling data.
   So the data must be stored if none exists yet, but existing data must not be overwritten.
   This case handles that scenario.
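
The store-if-absent semantics might look like the sketch below. A `TrieMap` stands in for ETCD, and `InitialDataSketch`, `registerInitialData`, and the `Result` type are hypothetical; the PR itself reports the outcome as `Either[AlreadyExist, Done]`.

```scala
import scala.collection.concurrent.TrieMap

object InitialDataSketch {
  sealed trait Result
  case object Done extends Result
  case object AlreadyExist extends Result

  // In-memory stand-in for ETCD.
  private val store = TrieMap.empty[String, String]

  // Store the value only when the key is absent; never overwrite.
  def registerInitialData(key: String, value: String): Result =
    store.putIfAbsent(key, value) match {
      case None    => Done
      case Some(_) => AlreadyExist
    }

  def get(key: String): Option[String] = store.get(key)
}
```

The first action in a namespace initializes the shared entry, and later actions see `AlreadyExist` and leave the existing value untouched.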
   

##########
File path: common/scala/src/main/scala/org/apache/openwhisk/core/service/DataManagementService.scala
##########
@@ -0,0 +1,311 @@
+package org.apache.openwhisk.core.service
+
+import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Props}
+import akka.util.Timeout
+import io.grpc.StatusRuntimeException
+import org.apache.openwhisk.common.Logging
+import org.apache.openwhisk.core.ConfigKeys
+import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdFollower, EtcdLeader}
+import org.apache.openwhisk.core.service.DataManagementService.retryInterval
+import pureconfig.loadConfigOrThrow
+
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable.{Map, Queue}
+import scala.concurrent.ExecutionContext
+import scala.concurrent.duration._
+import scala.util.Success
+
+// messages received by the actor
+// it is required to specify a recipient directly for the retryable message processing
+case class ElectLeader(key: String, value: String, recipient: ActorRef, watchEnabled: Boolean = true)
+case class RegisterInitialData(key: String,
+                               value: String,
+                               failoverEnabled: Boolean = true,
+                               recipient: Option[ActorRef] = None)
+
+case class RegisterData(key: String, value: String, failoverEnabled: Boolean = true)
+case class UnregisterData(key: String)
+case class UpdateDataOnChange(key: String, value: String)
+
+// messages sent by the actor
+case class ElectionResult(leadership: Either[EtcdFollower, EtcdLeader])
+case class FinishWork(key: String)
+case class InitialDataStorageResults(key: String, result: Either[AlreadyExist, Done])
+case class Done()
+case class AlreadyExist()
+
+/**
+ * This service is in charge of storing given data to ETCD.
+ * In the event any issue occurs while storing data, the actor keeps trying until the data is stored guaranteeing delivery to ETCD.
+ * So it guarantees the data is eventually stored.
+ */
+class DataManagementService(watcherService: ActorRef, workerFactory: ActorRefFactory => ActorRef)(
+  implicit logging: Logging,
+  actorSystem: ActorSystem)
+    extends Actor {
+  private implicit val ec = context.dispatcher
+
+  implicit val requestTimeout: Timeout = Timeout(5.seconds)
+  private[service] val dataCache = TrieMap[String, String]()
+  private val operations = Map.empty[String, Queue[Any]]
+  private var inProgressKeys = Set.empty[String]
+  private val watcherName = "data-management-service"
+
+  private val worker = workerFactory(context)
+
+  override def receive: Receive = {
+    case FinishWork(key) =>
+      // send waiting operation to worker if there is any, else update the inProgressKeys
+      val ops = operations.get(key)
+      if (ops.nonEmpty && ops.get.nonEmpty) {
+        val operation = ops.get.dequeue()
+        worker ! operation
+      } else {
+        inProgressKeys = inProgressKeys - key
+        operations.remove(key) // remove empty queue from the map to free memory
+      }
+
+    // normally these messages will be sent when queues are created.
+    case request: ElectLeader =>
+      if (inProgressKeys.contains(request.key)) {
+        logging.info(this, s"save a request $request into a buffer")
+        operations.getOrElseUpdate(request.key, Queue.empty[Any]).enqueue(request)
+      } else {
+        worker ! request

Review comment:
       The actual work is delegated to `EtcdWorker`.
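
To illustrate the "keep trying until the data is stored" behaviour described in the class comment, here is a blocking retry sketch. `RetrySketch` and `retryUntilSuccess` are hypothetical names; the sketch blocks with `Thread.sleep` for simplicity and makes no claim about how the worker actor in the PR waits between attempts.

```scala
import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

object RetrySketch {
  // Retry `op` until it succeeds, pausing between attempts.
  // Returns the result together with the number of attempts made.
  @tailrec
  def retryUntilSuccess[T](intervalMillis: Long, attempt: Int = 1)(op: () => T): (T, Int) =
    Try(op()) match {
      case Success(result) => (result, attempt)
      case Failure(_) =>
        Thread.sleep(intervalMillis)
        retryUntilSuccess(intervalMillis, attempt + 1)(op)
    }
}
```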
   

##########
File path: common/scala/src/main/scala/org/apache/openwhisk/core/service/DataManagementService.scala
##########
@@ -0,0 +1,311 @@
+package org.apache.openwhisk.core.service
+
+import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Props}
+import akka.util.Timeout
+import io.grpc.StatusRuntimeException
+import org.apache.openwhisk.common.Logging
+import org.apache.openwhisk.core.ConfigKeys
+import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdFollower, EtcdLeader}
+import org.apache.openwhisk.core.service.DataManagementService.retryInterval
+import pureconfig.loadConfigOrThrow
+
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable.{Map, Queue}
+import scala.concurrent.ExecutionContext
+import scala.concurrent.duration._
+import scala.util.Success
+
+// messages received by the actor
+// it is required to specify a recipient directly for the retryable message processing
+case class ElectLeader(key: String, value: String, recipient: ActorRef, watchEnabled: Boolean = true)
+case class RegisterInitialData(key: String,
+                               value: String,
+                               failoverEnabled: Boolean = true,
+                               recipient: Option[ActorRef] = None)
+
+case class RegisterData(key: String, value: String, failoverEnabled: Boolean = true)
+case class UnregisterData(key: String)
+case class UpdateDataOnChange(key: String, value: String)
+
+// messages sent by the actor
+case class ElectionResult(leadership: Either[EtcdFollower, EtcdLeader])
+case class FinishWork(key: String)
+case class InitialDataStorageResults(key: String, result: Either[AlreadyExist, Done])
+case class Done()
+case class AlreadyExist()
+
+/**
+ * This service is in charge of storing given data to ETCD.
+ * In the event any issue occurs while storing data, the actor keeps trying until the data is stored guaranteeing delivery to ETCD.
+ * So it guarantees the data is eventually stored.
+ */
+class DataManagementService(watcherService: ActorRef, workerFactory: ActorRefFactory => ActorRef)(
+  implicit logging: Logging,
+  actorSystem: ActorSystem)
+    extends Actor {
+  private implicit val ec = context.dispatcher
+

Review comment:
       This class is used by both schedulers and invokers to store data in ETCD.
   The following kinds of data are stored in ETCD:
   1. Throttling data (action / namespace)
   2. Queue endpoint (where a queue is running)
   3. Scheduler endpoint
   4. Container data (running containers, warmed containers, and data describing how many containers are being created)
   
   Dependent modules include Queue, ContainerProxy, and CreationJobManager.
   
   
   

##########
File path: common/scala/src/main/scala/org/apache/openwhisk/core/service/DataManagementService.scala
##########
@@ -0,0 +1,311 @@
+package org.apache.openwhisk.core.service
+
+import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Props}
+import akka.util.Timeout
+import io.grpc.StatusRuntimeException
+import org.apache.openwhisk.common.Logging
+import org.apache.openwhisk.core.ConfigKeys
+import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdFollower, EtcdLeader}
+import org.apache.openwhisk.core.service.DataManagementService.retryInterval
+import pureconfig.loadConfigOrThrow
+
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable.{Map, Queue}
+import scala.concurrent.ExecutionContext
+import scala.concurrent.duration._
+import scala.util.Success
+
+// messages received by the actor
+// it is required to specify a recipient directly for the retryable message processing
+case class ElectLeader(key: String, value: String, recipient: ActorRef, watchEnabled: Boolean = true)
+case class RegisterInitialData(key: String,
+                               value: String,
+                               failoverEnabled: Boolean = true,
+                               recipient: Option[ActorRef] = None)
+
+case class RegisterData(key: String, value: String, failoverEnabled: Boolean = true)
+case class UnregisterData(key: String)
+case class UpdateDataOnChange(key: String, value: String)
+
+// messages sent by the actor
+case class ElectionResult(leadership: Either[EtcdFollower, EtcdLeader])
+case class FinishWork(key: String)
+case class InitialDataStorageResults(key: String, result: Either[AlreadyExist, Done])
+case class Done()
+case class AlreadyExist()
+
+/**
+ * This service is in charge of storing given data to ETCD.
+ * In the event any issue occurs while storing data, the actor keeps trying until the data is stored guaranteeing delivery to ETCD.
+ * So it guarantees the data is eventually stored.
+ */
+class DataManagementService(watcherService: ActorRef, workerFactory: ActorRefFactory => ActorRef)(
+  implicit logging: Logging,
+  actorSystem: ActorSystem)
+    extends Actor {
+  private implicit val ec = context.dispatcher
+
+  implicit val requestTimeout: Timeout = Timeout(5.seconds)
+  private[service] val dataCache = TrieMap[String, String]()
+  private val operations = Map.empty[String, Queue[Any]]
+  private var inProgressKeys = Set.empty[String]
+  private val watcherName = "data-management-service"
+
+  private val worker = workerFactory(context)
+
+  override def receive: Receive = {
+    case FinishWork(key) =>
+      // send waiting operation to worker if there is any, else update the inProgressKeys
+      val ops = operations.get(key)
+      if (ops.nonEmpty && ops.get.nonEmpty) {
+        val operation = ops.get.dequeue()
+        worker ! operation
+      } else {
+        inProgressKeys = inProgressKeys - key
+        operations.remove(key) // remove empty queue from the map to free memory
+      }
+
+    // normally these messages will be sent when queues are created.
+    case request: ElectLeader =>
+      if (inProgressKeys.contains(request.key)) {
+        logging.info(this, s"save a request $request into a buffer")
+        operations.getOrElseUpdate(request.key, Queue.empty[Any]).enqueue(request)
+      } else {
+        worker ! request
+        inProgressKeys = inProgressKeys + request.key
+      }
+
+    case request: RegisterInitialData =>
+      // send WatchEndpoint first as the put operation will be retried until success if failed
+      if (request.failoverEnabled)
+        watcherService ! WatchEndpoint(request.key, request.value, isPrefix = false, watcherName, Set(DeleteEvent))
+      if (inProgressKeys.contains(request.key)) {
+        logging.info(this, s"save request $request into a buffer")
+        operations.getOrElseUpdate(request.key, Queue.empty[Any]).enqueue(request)
+      } else {
+        worker ! request
+        inProgressKeys = inProgressKeys + request.key
+      }
+
+    case request: RegisterData =>

Review comment:
       This will overwrite the existing data in ETCD.
   Generally, this is used for data that is not shared among actions.
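
To illustrate the overwrite behavior described above, here is a minimal sketch that uses an in-memory `TrieMap` as a stand-in for ETCD. The `StoreSketch` object and its method names are hypothetical and are not part of this PR. The put-if-absent behavior shown for the initial registration is an assumption inferred from the `InitialDataStorageResults(key, Either[AlreadyExist, Done])` message type, not something the comment states.

```scala
import scala.collection.concurrent.TrieMap

// Hypothetical in-memory stand-in for ETCD; it only illustrates the semantics.
object StoreSketch {
  sealed trait InitialResult
  case object Stored extends InitialResult
  case object AlreadyExists extends InitialResult

  private val store = TrieMap.empty[String, String]

  // Overwrite semantics, as the review comment describes for RegisterData.
  def register(key: String, value: String): Unit = store.update(key, value)

  // Assumed put-if-absent semantics for RegisterInitialData.
  def registerInitial(key: String, value: String): InitialResult =
    store.putIfAbsent(key, value) match {
      case None    => Stored        // key was absent, value is now stored
      case Some(_) => AlreadyExists // key was present, existing value is kept
    }

  def get(key: String): Option[String] = store.get(key)
}
```

With this sketch, a second `registerInitial` call on the same key leaves the first value untouched, while `register` replaces it unconditionally.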
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [openwhisk] style95 commented on a change in pull request #5063: [WIP][New Scheduler] Add DataManagementService

Posted by GitBox <gi...@apache.org>.
style95 commented on a change in pull request #5063:
URL: https://github.com/apache/openwhisk/pull/5063#discussion_r574196400



##########
File path: common/scala/src/main/scala/org/apache/openwhisk/core/service/DataManagementService.scala
##########
@@ -0,0 +1,313 @@
+package org.apache.openwhisk.core.service
+
+import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Props}
+import akka.util.Timeout
+import io.grpc.StatusRuntimeException
+import org.apache.openwhisk.common.Logging
+import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdFollower, EtcdLeader}
+import org.apache.openwhisk.core.service.DataManagementService.retryInterval
+
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable.{Map, Queue}
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.Success
+
+// messages received by the actor
+// it is required to specify a recipient directly for the retryable message processing
+case class ElectLeader(key: String, value: String, recipient: ActorRef, watchEnabled: Boolean = true)
+case class RegisterInitialData(key: String,
+                               value: String,
+                               failoverEnabled: Boolean = true,
+                               recipient: Option[ActorRef] = None)
+
+case class RegisterData(key: String, value: String, failoverEnabled: Boolean = true)
+case class DeRegisterData(key: String)
+case class UpdateDataOnChange(key: String, value: String)
+
+// messages sent by the actor
+case class ElectionResult(leadership: Either[EtcdFollower, EtcdLeader])
+case class FinishWork(key: String)
+case class InitialDataStorageResults(key: String, result: Either[AlreadyExist, Done])
+case class Done()
+case class AlreadyExist()
+
+/**
+ * This service is in charge of storing given data to ETCD.
+ * In case there is any issue occurred while storing data, it keeps trying until the data is stored.
+ * So it guarantees the data is eventually stored.
+ */
+class DataManagementService(watcherService: ActorRef, workerFactory: ActorRefFactory => ActorRef)(
+  implicit logging: Logging,
+  actorSystem: ActorSystem)
+    extends Actor {
+  private implicit val ec = context.dispatcher
+
+  implicit val requestTimeout: Timeout = Timeout(5.seconds)
+  private[service] val dataCache = TrieMap[String, String]()
+  private val operations = Map.empty[String, Queue[Any]]
+  private var inProgressKeys = Set.empty[String]
+  private val watcherName = "data-management-service"
+
+  private val worker = workerFactory(context)
+
+  override def receive: Receive = {
+    case FinishWork(key) =>
+      // send waiting operation to worker if there is any, else update the inProgressKeys
+      val ops = operations.get(key)
+      if (ops.nonEmpty && ops.get.nonEmpty) {
+        val operation = ops.get.dequeue()
+        worker ! operation
+      } else {
+        inProgressKeys = inProgressKeys - key
+        operations.remove(key) // remove empty queue from the map to free memories
+      }
+
+    // normally these messages will be sent when queues are created.
+    case request: ElectLeader =>
+      if (inProgressKeys.contains(request.key)) {
+        logging.info(this, s"save request $request into a buffer")

Review comment:
       In this case, the request is stored in a buffer because a preceding request for the same key is still being processed. If an issue occurs, this log lets us know whether the request has been processed or not.
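
The buffering described above can be sketched without Akka as a small per-key serializer. This is an illustration only: `KeySerializer`, `submit`, `finish`, and `isIdle` are hypothetical names, and the real actor uses `inProgressKeys`, `operations`, and the `FinishWork` message instead.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the actor's per-key serialization; not the PR's code.
final class KeySerializer[Op](send: Op => Unit, keyOf: Op => String) {
  private val pending = mutable.Map.empty[String, mutable.Queue[Op]]
  private var inProgress = Set.empty[String]

  // Called when a new operation arrives.
  def submit(op: Op): Unit = {
    val key = keyOf(op)
    if (inProgress.contains(key)) {
      // A preceding operation for this key is still running, so buffer this one.
      pending.getOrElseUpdate(key, mutable.Queue.empty[Op]).enqueue(op)
    } else {
      inProgress += key
      send(op)
    }
  }

  // Called when the worker reports completion for a key (FinishWork in the PR).
  def finish(key: String): Unit =
    pending.get(key).filter(_.nonEmpty) match {
      case Some(queue) => send(queue.dequeue()) // hand over the next buffered operation
      case None =>
        inProgress -= key
        pending.remove(key) // drop the empty queue to free memory
    }

  def isIdle(key: String): Boolean = !inProgress.contains(key)
}
```

At most one operation per key is in flight at any time, so operations on the same key are applied in arrival order, while operations on different keys are not blocked by each other.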







[GitHub] [openwhisk] style95 commented on pull request #5063: [WIP][New Scheduler] Add DataManagementService

Posted by GitBox <gi...@apache.org>.
style95 commented on pull request #5063:
URL: https://github.com/apache/openwhisk/pull/5063#issuecomment-777162098


   It just occurred to me that it would be great to write documentation for each component in the Wiki.





[GitHub] [openwhisk] style95 commented on pull request #5063: [WIP][New Scheduler] Add DataManagementService

Posted by GitBox <gi...@apache.org>.
style95 commented on pull request #5063:
URL: https://github.com/apache/openwhisk/pull/5063#issuecomment-792602815


   I wrote a document about this module: https://cwiki.apache.org/confluence/display/OPENWHISK/DataManagementService
   





[GitHub] [openwhisk] style95 commented on a change in pull request #5063: [New Scheduler] Add DataManagementService

Posted by GitBox <gi...@apache.org>.
style95 commented on a change in pull request #5063:
URL: https://github.com/apache/openwhisk/pull/5063#discussion_r573432876



##########
File path: common/scala/src/main/scala/org/apache/openwhisk/core/service/LeaseKeepAliveService.scala
##########
@@ -0,0 +1,16 @@
+package org.apache.openwhisk.core.service
+
+// Data
+sealed trait KeepAliveServiceData
+case object NoData extends KeepAliveServiceData
+case class Lease(id: Long, ttl: Long) extends KeepAliveServiceData
+
+// Events received by the actor
+case object ReGrantLease
+case object GetLease
+case object GrantLease
+
+// TBD
+class LeaseKeepAliveService {

Review comment:
       I realized this PR also depends on these modules.
   It would be better to merge this PR after modules of this kind are introduced.







[GitHub] [openwhisk] codecov-io edited a comment on pull request #5063: [New Scheduler] Add DataManagementService

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #5063:
URL: https://github.com/apache/openwhisk/pull/5063#issuecomment-796587203


   # [Codecov](https://codecov.io/gh/apache/openwhisk/pull/5063?src=pr&el=h1) Report
   > Merging [#5063](https://codecov.io/gh/apache/openwhisk/pull/5063?src=pr&el=desc) (ec5f93f) into [master](https://codecov.io/gh/apache/openwhisk/commit/e05aa44b0cab519c82cf84a8171671a21d779562?el=desc) (e05aa44) will **decrease** coverage by `6.59%`.
   > The diff coverage is `44.69%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/openwhisk/pull/5063/graphs/tree.svg?width=650&height=150&src=pr&token=l0YmsiSAso)](https://codecov.io/gh/apache/openwhisk/pull/5063?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #5063      +/-   ##
   ==========================================
   - Coverage   81.63%   75.03%   -6.60%     
   ==========================================
     Files         205      214       +9     
     Lines       10013    10448     +435     
     Branches      442      470      +28     
   ==========================================
   - Hits         8174     7840     -334     
   - Misses       1839     2608     +769     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/openwhisk/pull/5063?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...la/org/apache/openwhisk/core/etcd/EtcdClient.scala](https://codecov.io/gh/apache/openwhisk/pull/5063/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvZXRjZC9FdGNkQ2xpZW50LnNjYWxh) | `26.22% <ø> (ø)` | |
   | [...apache/openwhisk/core/service/WatcherService.scala](https://codecov.io/gh/apache/openwhisk/pull/5063/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvc2VydmljZS9XYXRjaGVyU2VydmljZS5zY2FsYQ==) | `91.66% <ø> (+2.08%)` | :arrow_up: |
   | [...openwhisk/core/service/DataManagementService.scala](https://codecov.io/gh/apache/openwhisk/pull/5063/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvc2VydmljZS9EYXRhTWFuYWdlbWVudFNlcnZpY2Uuc2NhbGE=) | `44.27% <44.27%> (ø)` | |
   | [.../scala/org/apache/openwhisk/core/WhiskConfig.scala](https://codecov.io/gh/apache/openwhisk/pull/5063/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvV2hpc2tDb25maWcuc2NhbGE=) | `95.48% <100.00%> (+0.02%)` | :arrow_up: |
   | [...core/database/cosmosdb/RxObservableImplicits.scala](https://codecov.io/gh/apache/openwhisk/pull/5063/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvZGF0YWJhc2UvY29zbW9zZGIvUnhPYnNlcnZhYmxlSW1wbGljaXRzLnNjYWxh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...ore/database/cosmosdb/cache/CacheInvalidator.scala](https://codecov.io/gh/apache/openwhisk/pull/5063/diff?src=pr&el=tree#diff-Y29yZS9jb3Ntb3NkYi9jYWNoZS1pbnZhbGlkYXRvci9zcmMvbWFpbi9zY2FsYS9vcmcvYXBhY2hlL29wZW53aGlzay9jb3JlL2RhdGFiYXNlL2Nvc21vc2RiL2NhY2hlL0NhY2hlSW52YWxpZGF0b3Iuc2NhbGE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...e/database/cosmosdb/cache/ChangeFeedConsumer.scala](https://codecov.io/gh/apache/openwhisk/pull/5063/diff?src=pr&el=tree#diff-Y29yZS9jb3Ntb3NkYi9jYWNoZS1pbnZhbGlkYXRvci9zcmMvbWFpbi9zY2FsYS9vcmcvYXBhY2hlL29wZW53aGlzay9jb3JlL2RhdGFiYXNlL2Nvc21vc2RiL2NhY2hlL0NoYW5nZUZlZWRDb25zdW1lci5zY2FsYQ==) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...core/database/cosmosdb/CosmosDBArtifactStore.scala](https://codecov.io/gh/apache/openwhisk/pull/5063/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvZGF0YWJhc2UvY29zbW9zZGIvQ29zbW9zREJBcnRpZmFjdFN0b3JlLnNjYWxh) | `0.00% <0.00%> (-95.85%)` | :arrow_down: |
   | [...sk/core/database/cosmosdb/CosmosDBViewMapper.scala](https://codecov.io/gh/apache/openwhisk/pull/5063/diff?src=pr&el=tree#diff-Y29tbW9uL3NjYWxhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvb3BlbndoaXNrL2NvcmUvZGF0YWJhc2UvY29zbW9zZGIvQ29zbW9zREJWaWV3TWFwcGVyLnNjYWxh) | `0.00% <0.00%> (-93.90%)` | :arrow_down: |
   | [...tabase/cosmosdb/cache/CacheInvalidatorConfig.scala](https://codecov.io/gh/apache/openwhisk/pull/5063/diff?src=pr&el=tree#diff-Y29yZS9jb3Ntb3NkYi9jYWNoZS1pbnZhbGlkYXRvci9zcmMvbWFpbi9zY2FsYS9vcmcvYXBhY2hlL29wZW53aGlzay9jb3JlL2RhdGFiYXNlL2Nvc21vc2RiL2NhY2hlL0NhY2hlSW52YWxpZGF0b3JDb25maWcuc2NhbGE=) | `0.00% <0.00%> (-92.31%)` | :arrow_down: |
   | ... and [26 more](https://codecov.io/gh/apache/openwhisk/pull/5063/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/openwhisk/pull/5063?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/openwhisk/pull/5063?src=pr&el=footer). Last update [e05aa44...ec5f93f](https://codecov.io/gh/apache/openwhisk/pull/5063?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [openwhisk] style95 commented on pull request #5063: [New Scheduler] Add DataManagementService

Posted by GitBox <gi...@apache.org>.
style95 commented on pull request #5063:
URL: https://github.com/apache/openwhisk/pull/5063#issuecomment-801624474


   @bdoyle0182 Do you have any other comments on this PR?





[GitHub] [openwhisk] style95 commented on a change in pull request #5063: [WIP][New Scheduler] Add DataManagementService

Posted by GitBox <gi...@apache.org>.
style95 commented on a change in pull request #5063:
URL: https://github.com/apache/openwhisk/pull/5063#discussion_r574208971



##########
File path: common/scala/src/main/scala/org/apache/openwhisk/core/service/DataManagementService.scala
##########
@@ -0,0 +1,311 @@
+package org.apache.openwhisk.core.service
+
+import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Props}
+import akka.util.Timeout
+import io.grpc.StatusRuntimeException
+import org.apache.openwhisk.common.Logging
+import org.apache.openwhisk.core.ConfigKeys
+import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdFollower, EtcdLeader}
+import org.apache.openwhisk.core.service.DataManagementService.retryInterval
+import pureconfig.loadConfigOrThrow
+
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable.{Map, Queue}
+import scala.concurrent.ExecutionContext
+import scala.concurrent.duration._
+import scala.util.Success
+
+// messages received by the actor
+// it is required to specify a recipient directly for the retryable message processing
+case class ElectLeader(key: String, value: String, recipient: ActorRef, watchEnabled: Boolean = true)
+case class RegisterInitialData(key: String,
+                               value: String,
+                               failoverEnabled: Boolean = true,
+                               recipient: Option[ActorRef] = None)
+
+case class RegisterData(key: String, value: String, failoverEnabled: Boolean = true)
+case class UnregisterData(key: String)
+case class UpdateDataOnChange(key: String, value: String)
+
+// messages sent by the actor
+case class ElectionResult(leadership: Either[EtcdFollower, EtcdLeader])
+case class FinishWork(key: String)
+case class InitialDataStorageResults(key: String, result: Either[AlreadyExist, Done])
+case class Done()
+case class AlreadyExist()
+
+/**
+ * This service is in charge of storing given data to ETCD.
+ * In the event any issue occurs while storing data, the actor keeps trying until the data is stored guaranteeing delivery to ETCD.
+ * So it guarantees the data is eventually stored.
+ */
+class DataManagementService(watcherService: ActorRef, workerFactory: ActorRefFactory => ActorRef)(

Review comment:
       I will add test cases after setting up the CI pipeline for scheduler components.



