You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ji...@apache.org on 2021/05/06 01:36:52 UTC

[openwhisk] branch master updated: [New Scheduler] Implement FPCInvokerServer (#5098)

This is an automated email from the ASF dual-hosted git repository.

jiangpengcheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new e036fc9  [New Scheduler] Implement FPCInvokerServer (#5098)
e036fc9 is described below

commit e036fc9823b8e015a88e1a44d3f282698ae62fe7
Author: ningyougang <41...@qq.com>
AuthorDate: Thu May 6 09:36:30 2021 +0800

    [New Scheduler] Implement FPCInvokerServer (#5098)
    
    * Implement FPCInvokerServer
    
    * Remove /memory API
    
    * Add default implementations of enable/disable
---
 ansible/group_vars/all                             |   2 +
 ansible/roles/invoker/tasks/deploy.yml             |   2 +
 .../core/invoker/DefaultInvokerServer.scala        |  65 +++++++++
 .../openwhisk/core/invoker/FPCInvokerServer.scala  |  65 +++++++++
 .../apache/openwhisk/core/invoker/Invoker.scala    |  12 +-
 .../openwhisk/core/invoker/InvokerReactive.scala   |  10 ++
 tests/src/test/scala/common/WhiskProperties.java   |   4 +
 .../invoker/test/DefaultInvokerServerTests.scala   | 145 +++++++++++++++++++++
 .../core/invoker/test/FPCInvokerServerTests.scala  | 144 ++++++++++++++++++++
 9 files changed, 442 insertions(+), 7 deletions(-)

diff --git a/ansible/group_vars/all b/ansible/group_vars/all
index 006d158..6e8a861 100644
--- a/ansible/group_vars/all
+++ b/ansible/group_vars/all
@@ -219,6 +219,8 @@ invoker:
     keystore:
       password: "{{ invoker_keystore_password | default('openwhisk') }}"
       name: "{{ __invoker_ssl_keyPrefix }}openwhisk-keystore.p12"
+  reactiveSpi: "{{ invokerReactive_spi | default('') }}"
+  serverSpi: "{{ invokerServer_spi | default('') }}"
 
 userLogs:
   spi: "{{ userLogs_spi | default('org.apache.openwhisk.core.containerpool.logging.DockerToActivationLogStoreProvider') }}"
diff --git a/ansible/roles/invoker/tasks/deploy.yml b/ansible/roles/invoker/tasks/deploy.yml
index ea4ce48..8c3c027 100644
--- a/ansible/roles/invoker/tasks/deploy.yml
+++ b/ansible/roles/invoker/tasks/deploy.yml
@@ -259,6 +259,8 @@
       "CONFIG_kamon_statsd_hostname": "{{ metrics.kamon.host }}"
       "CONFIG_kamon_statsd_port": "{{ metrics.kamon.port }}"
       "CONFIG_whisk_spi_LogStoreProvider": "{{ userLogs.spi }}"
+      "CONFIG_whisk_spi_InvokerProvider": "{{ invoker.reactiveSpi }}"
+      "CONFIG_whisk_spi_InvokerServerProvider": "{{ invoker.serverSpi }}"
       "CONFIG_logback_log_level": "{{ invoker.loglevel }}"
       "CONFIG_whisk_memory_min": "{{ limit_action_memory_min | default() }}"
       "CONFIG_whisk_memory_max": "{{ limit_action_memory_max | default() }}"
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/DefaultInvokerServer.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/DefaultInvokerServer.scala
new file mode 100644
index 0000000..d372be7
--- /dev/null
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/DefaultInvokerServer.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.invoker
+
+import akka.actor.ActorSystem
+import akka.http.scaladsl.model.StatusCodes
+import akka.http.scaladsl.model.headers.BasicHttpCredentials
+import akka.http.scaladsl.server.Route
+import org.apache.openwhisk.common.{Logging, TransactionId}
+import org.apache.openwhisk.http.BasicRasService
+import org.apache.openwhisk.http.ErrorResponse.terminate
+import spray.json.PrettyPrinter
+
+import scala.concurrent.ExecutionContext
+
+/**
+ * Implements a web server exposing the `enable` and `disable` REST endpoints, protected by HTTP basic authentication.
+ */
+class DefaultInvokerServer(val invoker: InvokerCore, systemUsername: String, systemPassword: String)(
+  implicit val ec: ExecutionContext,
+  val actorSystem: ActorSystem,
+  val logger: Logging)
+    extends BasicRasService {
+
+  /** Pretty print JSON response. */
+  implicit val jsonPrettyResponsePrinter = PrettyPrinter
+
+  override def routes(implicit transid: TransactionId): Route = {
+    super.routes ~ extractCredentials {
+      case Some(BasicHttpCredentials(username, password)) if username == systemUsername && password == systemPassword =>
+        (path("enable") & post) {
+          invoker.enable()
+        } ~ (path("disable") & post) {
+          invoker.disable()
+        }
+      case _ => terminate(StatusCodes.Unauthorized)
+    }
+  }
+}
+
+object DefaultInvokerServer extends InvokerServerProvider {
+
+  // TODO: once FPCInvokerReactive is ready, read these credentials from pureconfig instead of hard-coding them
+  val invokerUsername = "admin"
+  val invokerPassword = "admin"
+
+  override def instance(
+    invoker: InvokerCore)(implicit ec: ExecutionContext, actorSystem: ActorSystem, logger: Logging): BasicRasService =
+    new DefaultInvokerServer(invoker, invokerUsername, invokerPassword)
+}
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerServer.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerServer.scala
new file mode 100644
index 0000000..f1b8e8c
--- /dev/null
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerServer.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.invoker
+
+import akka.actor.ActorSystem
+import akka.http.scaladsl.model.StatusCodes
+import akka.http.scaladsl.model.headers.BasicHttpCredentials
+import akka.http.scaladsl.server.Route
+import org.apache.openwhisk.common.{Logging, TransactionId}
+import org.apache.openwhisk.http.BasicRasService
+import org.apache.openwhisk.http.ErrorResponse.terminate
+import spray.json.PrettyPrinter
+
+import scala.concurrent.ExecutionContext
+
+/**
+ * Implements a web server exposing the `enable` and `disable` REST endpoints, protected by HTTP basic authentication.
+ */
+class FPCInvokerServer(val invoker: InvokerCore, systemUsername: String, systemPassword: String)(
+  implicit val ec: ExecutionContext,
+  val actorSystem: ActorSystem,
+  val logger: Logging)
+    extends BasicRasService {
+
+  /** Pretty print JSON response. */
+  implicit val jsonPrettyResponsePrinter = PrettyPrinter
+
+  override def routes(implicit transid: TransactionId): Route = {
+    super.routes ~ extractCredentials {
+      case Some(BasicHttpCredentials(username, password)) if username == systemUsername && password == systemPassword =>
+        (path("enable") & post) {
+          invoker.enable()
+        } ~ (path("disable") & post) {
+          invoker.disable()
+        }
+      case _ => terminate(StatusCodes.Unauthorized)
+    }
+  }
+}
+
+object FPCInvokerServer extends InvokerServerProvider {
+
+  // TODO: once FPCInvokerReactive is ready, read these credentials from pureconfig instead of hard-coding them
+  val invokerUsername = "admin"
+  val invokerPassword = "admin"
+
+  override def instance(
+    invoker: InvokerCore)(implicit ec: ExecutionContext, actorSystem: ActorSystem, logger: Logging): BasicRasService =
+    new FPCInvokerServer(invoker, invokerUsername, invokerPassword)
+}
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala
index 1b0c8bf..3d9cc46 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala
@@ -19,6 +19,7 @@ package org.apache.openwhisk.core.invoker
 
 import akka.Done
 import akka.actor.{ActorSystem, CoordinatedShutdown}
+import akka.http.scaladsl.server.Route
 import akka.stream.ActorMaterializer
 import com.typesafe.config.ConfigValueFactory
 import kamon.Kamon
@@ -217,7 +218,10 @@ trait InvokerProvider extends Spi {
 }
 
 // this trait can be used to add common implementation
-trait InvokerCore {}
+trait InvokerCore {
+  def enable(): Route
+  def disable(): Route
+}
 
 /**
  * An Spi for providing RestAPI implementation for invoker.
@@ -227,9 +231,3 @@ trait InvokerServerProvider extends Spi {
   def instance(
     invoker: InvokerCore)(implicit ec: ExecutionContext, actorSystem: ActorSystem, logger: Logging): BasicRasService
 }
-
-object DefaultInvokerServer extends InvokerServerProvider {
-  override def instance(
-    invoker: InvokerCore)(implicit ec: ExecutionContext, actorSystem: ActorSystem, logger: Logging): BasicRasService =
-    new BasicRasService {}
-}
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
index 6aa0884..db0dfb4 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
@@ -23,6 +23,8 @@ import java.time.Instant
 import akka.Done
 import akka.actor.{ActorRefFactory, ActorSystem, CoordinatedShutdown, Props}
 import akka.event.Logging.InfoLevel
+import akka.http.scaladsl.server.Directives.complete
+import akka.http.scaladsl.server.Route
 import akka.stream.ActorMaterializer
 import org.apache.openwhisk.common._
 import org.apache.openwhisk.common.tracing.WhiskTracerProvider
@@ -299,4 +301,12 @@ class InvokerReactive(
     }
   })
 
+  override def enable(): Route = {
+    complete("not supported")
+  }
+
+  override def disable(): Route = {
+    complete("not supported")
+  }
+
 }
diff --git a/tests/src/test/scala/common/WhiskProperties.java b/tests/src/test/scala/common/WhiskProperties.java
index 05d8b32..19fc3ed 100644
--- a/tests/src/test/scala/common/WhiskProperties.java
+++ b/tests/src/test/scala/common/WhiskProperties.java
@@ -270,6 +270,10 @@ public class WhiskProperties {
         return getBaseControllerHost() + ":" + getControllerBasePort();
     }
 
+    public static String getBaseInvokerAddress(){
+        return getInvokerHosts()[0] + ":" + whiskProperties.getProperty("invoker.hosts.basePort");
+    }
+
     public static int getMaxActionInvokesPerMinute() {
         String valStr = whiskProperties.getProperty("limits.actions.invokes.perMinute");
         return Integer.parseInt(valStr);
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/DefaultInvokerServerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/DefaultInvokerServerTests.scala
new file mode 100644
index 0000000..153e228
--- /dev/null
+++ b/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/DefaultInvokerServerTests.scala
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.invoker.test
+
+import akka.http.scaladsl.model.StatusCodes.{OK, Unauthorized}
+import akka.http.scaladsl.model.headers.BasicHttpCredentials
+import akka.http.scaladsl.server.Route
+import akka.http.scaladsl.testkit.ScalatestRouteTest
+import common.StreamLogging
+import org.apache.openwhisk.common.TransactionId
+import org.apache.openwhisk.core.invoker.{DefaultInvokerServer, InvokerCore}
+import org.apache.openwhisk.http.BasicHttpService
+import org.junit.runner.RunWith
+import org.scalamock.scalatest.MockFactory
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FlatSpec, Matchers}
+import org.scalatest.junit.JUnitRunner
+
+/**
+ * Tests the DefaultInvokerServer API.
+ */
+@RunWith(classOf[JUnitRunner])
+class DefaultInvokerServerTests
+    extends FlatSpec
+    with BeforeAndAfterEach
+    with BeforeAndAfterAll
+    with ScalatestRouteTest
+    with Matchers
+    with StreamLogging
+    with MockFactory {
+
+  def transid() = TransactionId("tid")
+
+  val systemUsername = "username"
+  val systemPassword = "password"
+
+  val reactive = new TestInvokerReactive
+  val server = new DefaultInvokerServer(reactive, systemUsername, systemPassword)
+
+  override protected def afterEach(): Unit = reactive.reset()
+
+  /** DefaultInvokerServer API tests */
+  behavior of "DefaultInvokerServer API"
+
+  it should "enable invoker" in {
+    implicit val tid = transid()
+    val validCredentials = BasicHttpCredentials(systemUsername, systemPassword)
+    Post(s"/enable") ~> addCredentials(validCredentials) ~> Route.seal(server.routes(tid)) ~> check {
+      status should be(OK)
+      reactive.enableCount shouldBe 1
+      reactive.disableCount shouldBe 0
+    }
+  }
+
+  it should "disable invoker" in {
+    implicit val tid = transid()
+    val validCredentials = BasicHttpCredentials(systemUsername, systemPassword)
+    Post(s"/disable") ~> addCredentials(validCredentials) ~> Route.seal(server.routes(tid)) ~> check {
+      status should be(OK)
+      reactive.enableCount shouldBe 0
+      reactive.disableCount shouldBe 1
+    }
+  }
+
+  it should "not enable invoker with invalid credential" in {
+    implicit val tid = transid()
+    val invalidCredentials = BasicHttpCredentials("invaliduser", "invalidpass")
+    Post(s"/enable") ~> addCredentials(invalidCredentials) ~> Route.seal(server.routes(tid)) ~> check {
+      status should be(Unauthorized)
+      reactive.enableCount shouldBe 0
+      reactive.disableCount shouldBe 0
+    }
+  }
+
+  it should "not disable invoker with invalid credential" in {
+    implicit val tid = transid()
+    val invalidCredentials = BasicHttpCredentials("invaliduser", "invalidpass")
+    Post(s"/disable") ~> addCredentials(invalidCredentials) ~> Route.seal(server.routes(tid)) ~> check {
+      status should be(Unauthorized)
+      reactive.enableCount shouldBe 0
+      reactive.disableCount shouldBe 0
+    }
+  }
+
+  it should "not enable invoker with empty credential" in {
+    implicit val tid = transid()
+    Post(s"/enable") ~> Route.seal(server.routes(tid)) ~> check {
+      status should be(Unauthorized)
+      reactive.enableCount shouldBe 0
+      reactive.disableCount shouldBe 0
+    }
+  }
+
+  it should "not disable invoker with empty credential" in {
+    implicit val tid = transid()
+    Post(s"/disable") ~> Route.seal(server.routes(tid)) ~> check {
+      status should be(Unauthorized)
+      reactive.enableCount shouldBe 0
+      reactive.disableCount shouldBe 0
+    }
+  }
+
+}
+
+class TestInvokerReactive extends InvokerCore with BasicHttpService {
+  var enableCount = 0
+  var disableCount = 0
+
+  override def enable(): Route = {
+    enableCount += 1
+    complete("")
+  }
+
+  override def disable(): Route = {
+    disableCount += 1
+    complete("")
+  }
+
+  def reset(): Unit = {
+    enableCount = 0
+    disableCount = 0
+  }
+
+  /**
+   * Overrides the HTTP service routes; left unimplemented (`???`) in this stub, which only counts enable/disable calls.
+   *
+   * @param transid the id for the transaction (every request is assigned an id)
+   */
+  override def routes(implicit transid: TransactionId): Route = ???
+
+}
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/FPCInvokerServerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/FPCInvokerServerTests.scala
new file mode 100644
index 0000000..8cf8d91
--- /dev/null
+++ b/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/FPCInvokerServerTests.scala
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.invoker.test
+
+import akka.http.scaladsl.model.StatusCodes.{OK, Unauthorized}
+import akka.http.scaladsl.model.headers.BasicHttpCredentials
+import akka.http.scaladsl.server.Route
+import akka.http.scaladsl.testkit.ScalatestRouteTest
+import common.StreamLogging
+import org.apache.openwhisk.common.TransactionId
+import org.apache.openwhisk.core.invoker.{FPCInvokerServer, InvokerCore}
+import org.apache.openwhisk.http.BasicHttpService
+import org.junit.runner.RunWith
+import org.scalamock.scalatest.MockFactory
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FlatSpec, Matchers}
+import org.scalatest.junit.JUnitRunner
+
+/**
+ * Tests the FPCInvokerServer API.
+ */
+@RunWith(classOf[JUnitRunner])
+class FPCInvokerServerTests
+    extends FlatSpec
+    with BeforeAndAfterEach
+    with BeforeAndAfterAll
+    with ScalatestRouteTest
+    with Matchers
+    with StreamLogging
+    with MockFactory {
+
+  def transid() = TransactionId("tid")
+
+  val systemUsername = "username"
+  val systemPassword = "password"
+
+  val reactive = new TestFPCInvokerReactive
+  val server = new FPCInvokerServer(reactive, systemUsername, systemPassword)
+
+  override protected def afterEach(): Unit = reactive.reset()
+
+  /** FPCInvokerServer API tests */
+  behavior of "FPCInvokerServer API"
+
+  it should "enable invoker" in {
+    implicit val tid = transid()
+    val validCredentials = BasicHttpCredentials(systemUsername, systemPassword)
+    Post(s"/enable") ~> addCredentials(validCredentials) ~> Route.seal(server.routes(tid)) ~> check {
+      status should be(OK)
+      reactive.enableCount shouldBe 1
+      reactive.disableCount shouldBe 0
+    }
+  }
+
+  it should "disable invoker" in {
+    implicit val tid = transid()
+    val validCredentials = BasicHttpCredentials(systemUsername, systemPassword)
+    Post(s"/disable") ~> addCredentials(validCredentials) ~> Route.seal(server.routes(tid)) ~> check {
+      status should be(OK)
+      reactive.enableCount shouldBe 0
+      reactive.disableCount shouldBe 1
+    }
+  }
+
+  it should "not enable invoker with invalid credential" in {
+    implicit val tid = transid()
+    val invalidCredentials = BasicHttpCredentials("invaliduser", "invalidpass")
+    Post(s"/enable") ~> addCredentials(invalidCredentials) ~> Route.seal(server.routes(tid)) ~> check {
+      status should be(Unauthorized)
+      reactive.enableCount shouldBe 0
+      reactive.disableCount shouldBe 0
+    }
+  }
+
+  it should "not disable invoker with invalid credential" in {
+    implicit val tid = transid()
+    val invalidCredentials = BasicHttpCredentials("invaliduser", "invalidpass")
+    Post(s"/disable") ~> addCredentials(invalidCredentials) ~> Route.seal(server.routes(tid)) ~> check {
+      status should be(Unauthorized)
+      reactive.enableCount shouldBe 0
+      reactive.disableCount shouldBe 0
+    }
+  }
+
+  it should "not enable invoker with empty credential" in {
+    implicit val tid = transid()
+    Post(s"/enable") ~> Route.seal(server.routes(tid)) ~> check {
+      status should be(Unauthorized)
+      reactive.enableCount shouldBe 0
+      reactive.disableCount shouldBe 0
+    }
+  }
+
+  it should "not disable invoker with empty credential" in {
+    implicit val tid = transid()
+    Post(s"/disable") ~> Route.seal(server.routes(tid)) ~> check {
+      status should be(Unauthorized)
+      reactive.enableCount shouldBe 0
+      reactive.disableCount shouldBe 0
+    }
+  }
+}
+
+class TestFPCInvokerReactive extends InvokerCore with BasicHttpService {
+  var enableCount = 0
+  var disableCount = 0
+
+  override def enable(): Route = {
+    enableCount += 1
+    complete("")
+  }
+
+  override def disable(): Route = {
+    disableCount += 1
+    complete("")
+  }
+
+  def reset(): Unit = {
+    enableCount = 0
+    disableCount = 0
+  }
+
+  /**
+   * Overrides the HTTP service routes; left unimplemented (`???`) in this stub, which only counts enable/disable calls.
+   *
+   * @param transid the id for the transaction (every request is assigned an id)
+   */
+  override def routes(implicit transid: TransactionId): Route = ???
+
+}