You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2020/11/23 05:05:58 UTC

[GitHub] [openwhisk] bdoyle0182 commented on a change in pull request #5031: [New Scheduler] Etcd installation & Implements EtcdClient

bdoyle0182 commented on a change in pull request #5031:
URL: https://github.com/apache/openwhisk/pull/5031#discussion_r528473688



##########
File path: common/scala/src/main/scala/org/apache/openwhisk/core/etcd/EtcdClient.scala
##########
@@ -0,0 +1,262 @@
+package org.apache.openwhisk.core.etcd
+
+import com.google.common.util.concurrent.{FutureCallback, Futures, ListenableFuture}
+import com.ibm.etcd.api._
+import com.ibm.etcd.client.kv.KvClient.Watch
+import com.ibm.etcd.client.kv.{KvClient, WatchUpdate}
+import com.ibm.etcd.client.{EtcdClient => Client}
+import io.grpc.stub.StreamObserver
+import java.util.concurrent.Executors
+
+import org.apache.openwhisk.core.ConfigKeys
+import org.apache.openwhisk.core.etcd.EtcdType._
+import pureconfig.loadConfigOrThrow
+import spray.json.DefaultJsonProtocol
+
+import scala.language.implicitConversions
+import scala.annotation.tailrec
+import scala.concurrent.{ExecutionContextExecutor, Future, Promise}
+
+/**
+ * Value pairing a lease identifier with its ttl.
+ *
+ * NOTE(review): presumably an etcd lease id and its time-to-live; the unit of `ttl`
+ * is not visible in this file — confirm against the lease API that produces it.
+ *
+ * @param id  identifier of the lease
+ * @param ttl ttl value associated with the lease
+ */
+case class Lease(
+  id: Long,
+  ttl: Long
+)
+
+/**
+ * Implicit bridge from Guava's `ListenableFuture` to a Scala `Future`.
+ */
+object RichListenableFuture {
+
+  /**
+   * Adapts `lf` into a Scala `Future` that completes with the same result or failure.
+   *
+   * @param lf  the listenable future to adapt
+   * @param ece executor handed to Guava for running the completion callback
+   * @return a future completed from the callback registered on `lf`
+   */
+  implicit def convertToFuture[T](lf: ListenableFuture[T])(implicit ece: ExecutionContextExecutor): Future[T] = {
+    val promise = Promise[T]()
+    // Relays whichever outcome the listenable future reports into the promise.
+    val relay = new FutureCallback[T] {
+      override def onSuccess(result: T): Unit = promise.success(result)
+      override def onFailure(t: Throwable): Unit = promise.failure(t)
+    }
+    Futures.addCallback(lf, relay, ece)
+    promise.future
+  }
+}
+
+/** Factory methods for [[EtcdClient]]. */
+object EtcdClient {
+
+  /**
+   * Builds a plain-text client for the given endpoints and wraps it.
+   *
+   * hostAndPorts format: {HOST}:{PORT}[,{HOST}:{PORT},{HOST}:{PORT}, ...]
+   *
+   * @param hostAndPorts endpoint list; must not be null
+   * @throws IllegalArgumentException if `hostAndPorts` is null
+   */
+  def apply(hostAndPorts: String)(implicit ece: ExecutionContextExecutor): EtcdClient = {
+    require(hostAndPorts != null)
+    val underlying: Client = Client.forEndpoints(hostAndPorts).withPlainText().build()
+    // Delegate to the Client-based factory so construction lives in one place.
+    apply(underlying)(ece)
+  }
+
+  /** Wraps an already-built client. */
+  def apply(client: Client)(implicit ece: ExecutionContextExecutor): EtcdClient =
+    new EtcdClient(client)(ece)
+}
+
+/**
+ * Wrapper around the underlying etcd client that mixes in the key-value, lease,
+ * watch and leadership API traits.
+ *
+ * @param client the underlying client that every mixed-in API delegates to
+ * @param ece    executor made implicitly available to the mixed-in APIs
+ */
+class EtcdClient(val client: Client)(override implicit val ece: ExecutionContextExecutor)
+    extends EtcdKeyValueApi
+    with EtcdLeaseApi
+    with EtcdWatchApi
+    with EtcdLeadershipApi {
+
+  /**
+   * Closes the wrapped client.
+   *
+   * The result type is declared explicitly so this public, side-effecting method
+   * does not depend on the inferred type of the underlying `close()` call.
+   */
+  def close(): Unit = {
+    client.close()
+  }
+}
+
+/**
+ * Key-value operations implemented on top of the client's `KvClient`.
+ *
+ * Write-style operations (`put`, `del`, `putTxn`) replace a failure with its
+ * innermost cause; the read operations return the client's future as-is.
+ */
+trait EtcdKeyValueApi extends KeyValueStore {
+  import RichListenableFuture._
+  protected[etcd] val client: Client
+
+  /** Fetches the entry stored under `key`. */
+  override def get(key: String): Future[RangeResponse] = {
+    val request = client.getKvClient.get(key)
+    request.async()
+  }
+
+  /** Fetches every entry whose key starts with `prefixKey`. */
+  override def getPrefix(prefixKey: String): Future[RangeResponse] = {
+    val request = client.getKvClient.get(prefixKey).asPrefix()
+    request.async()
+  }
+
+  /** Counts the entries whose key starts with `prefixKey` using a count-only request. */
+  override def getCount(prefixKey: String): Future[Long] = {
+    val response: Future[RangeResponse] = client.getKvClient.get(prefixKey).asPrefix().countOnly().async()
+    response.map(r => r.getCount)
+  }
+
+  /** Stores `value` under `key`. */
+  override def put(key: String, value: String): Future[PutResponse] =
+    failWithRootCause[PutResponse](client.getKvClient.put(key, value).async())
+
+  /** Stores `value` under `key`, passing `leaseId` along with the request. */
+  override def put(key: String, value: String, leaseId: Long): Future[PutResponse] =
+    failWithRootCause[PutResponse](client.getKvClient.put(key, value, leaseId).async())
+
+  /** Stores the string form of a boolean under `key`. */
+  def put(key: String, value: Boolean): Future[PutResponse] =
+    put(key, value.toString)
+
+  /** Stores the string form of a boolean under `key`, passing `leaseId` along with the request. */
+  def put(key: String, value: Boolean, leaseId: Long): Future[PutResponse] =
+    put(key, value.toString, leaseId)
+
+  /** Deletes the entry stored under `key`. */
+  override def del(key: String): Future[DeleteRangeResponse] =
+    failWithRootCause[DeleteRangeResponse](client.getKvClient.delete(key).async())
+
+  /**
+   * Transactional put: the write of `value.toString` under `key` (with `leaseId`) is the
+   * `then` branch of a transaction guarded by an equality comparison of the key's
+   * version against `cmpVersion`.
+   */
+  override def putTxn[T](key: String, value: T, cmpVersion: Long, leaseId: Long): Future[TxnResponse] = {
+    val kv = client.getKvClient
+    val guardedWrite = kv.put(key, value.toString, leaseId).asRequest()
+    failWithRootCause[TxnResponse](
+      kv.txnIf()
+        .cmpEqual(key)
+        .version(cmpVersion)
+        .`then`()
+        .put(guardedWrite)
+        .async())
+  }
+
+  /** Re-fails `response` with the innermost cause of whatever it failed with. */
+  private def failWithRootCause[R](response: Future[R]): Future[R] =
+    response.recoverWith {
+      case t =>
+        Future.failed[R](getNestedException(t))
+    }
+
+  /** Walks the cause chain of `t` and returns the last throwable that has no cause. */
+  @tailrec
+  private def getNestedException(t: Throwable): Throwable =
+    t.getCause match {
+      case null  => t
+      case cause => getNestedException(cause)
+    }
+}
+
+trait KeyValueStore {

Review comment:
       Will this be used as an SPI? If so, should it be more specific than `KeyValueStore` and related to the scheduler with the function templates?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org