You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ji...@apache.org on 2021/05/06 01:36:52 UTC
[openwhisk] branch master updated: [New Scheduler]Implement
PFCInvokerServer (#5098)
This is an automated email from the ASF dual-hosted git repository.
jiangpengcheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new e036fc9 [New Scheduler]Implement PFCInvokerServer (#5098)
e036fc9 is described below
commit e036fc9823b8e015a88e1a44d3f282698ae62fe7
Author: ningyougang <41...@qq.com>
AuthorDate: Thu May 6 09:36:30 2021 +0800
[New Scheduler]Implement PFCInvokerServer (#5098)
* Implement PFCInvokerServer
* Remove /memory api
* Default implement enable/disable
---
ansible/group_vars/all | 2 +
ansible/roles/invoker/tasks/deploy.yml | 2 +
.../core/invoker/DefaultInvokerServer.scala | 65 +++++++++
.../openwhisk/core/invoker/FPCInvokerServer.scala | 65 +++++++++
.../apache/openwhisk/core/invoker/Invoker.scala | 12 +-
.../openwhisk/core/invoker/InvokerReactive.scala | 10 ++
tests/src/test/scala/common/WhiskProperties.java | 4 +
.../invoker/test/DefaultInvokerServerTests.scala | 145 +++++++++++++++++++++
.../core/invoker/test/FPCInvokerServerTests.scala | 144 ++++++++++++++++++++
9 files changed, 442 insertions(+), 7 deletions(-)
diff --git a/ansible/group_vars/all b/ansible/group_vars/all
index 006d158..6e8a861 100644
--- a/ansible/group_vars/all
+++ b/ansible/group_vars/all
@@ -219,6 +219,8 @@ invoker:
keystore:
password: "{{ invoker_keystore_password | default('openwhisk') }}"
name: "{{ __invoker_ssl_keyPrefix }}openwhisk-keystore.p12"
+ reactiveSpi: "{{ invokerReactive_spi | default('') }}"
+ serverSpi: "{{ invokerServer_spi | default('') }}"
userLogs:
spi: "{{ userLogs_spi | default('org.apache.openwhisk.core.containerpool.logging.DockerToActivationLogStoreProvider') }}"
diff --git a/ansible/roles/invoker/tasks/deploy.yml b/ansible/roles/invoker/tasks/deploy.yml
index ea4ce48..8c3c027 100644
--- a/ansible/roles/invoker/tasks/deploy.yml
+++ b/ansible/roles/invoker/tasks/deploy.yml
@@ -259,6 +259,8 @@
"CONFIG_kamon_statsd_hostname": "{{ metrics.kamon.host }}"
"CONFIG_kamon_statsd_port": "{{ metrics.kamon.port }}"
"CONFIG_whisk_spi_LogStoreProvider": "{{ userLogs.spi }}"
+ "CONFIG_whisk_spi_InvokerProvider": "{{ invoker.reactiveSpi }}"
+ "CONFIG_whisk_spi_InvokerServerProvider": "{{ invoker.serverSpi }}"
"CONFIG_logback_log_level": "{{ invoker.loglevel }}"
"CONFIG_whisk_memory_min": "{{ limit_action_memory_min | default() }}"
"CONFIG_whisk_memory_max": "{{ limit_action_memory_max | default() }}"
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/DefaultInvokerServer.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/DefaultInvokerServer.scala
new file mode 100644
index 0000000..d372be7
--- /dev/null
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/DefaultInvokerServer.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.invoker
+
+import akka.actor.ActorSystem
+import akka.http.scaladsl.model.StatusCodes
+import akka.http.scaladsl.model.headers.BasicHttpCredentials
+import akka.http.scaladsl.server.Route
+import org.apache.openwhisk.common.{Logging, TransactionId}
+import org.apache.openwhisk.http.BasicRasService
+import org.apache.openwhisk.http.ErrorResponse.terminate
+import spray.json.PrettyPrinter
+
+import scala.concurrent.ExecutionContext
+
+/**
+ * Implements web server to handle certain REST API calls.
+ */
+class DefaultInvokerServer(val invoker: InvokerCore, systemUsername: String, systemPassword: String)(
+ implicit val ec: ExecutionContext,
+ val actorSystem: ActorSystem,
+ val logger: Logging)
+ extends BasicRasService {
+
+ /** Pretty print JSON response. */
+ implicit val jsonPrettyResponsePrinter = PrettyPrinter
+
+ override def routes(implicit transid: TransactionId): Route = {
+ super.routes ~ extractCredentials {
+ case Some(BasicHttpCredentials(username, password)) if username == systemUsername && password == systemPassword =>
+ (path("enable") & post) {
+ invoker.enable()
+ } ~ (path("disable") & post) {
+ invoker.disable()
+ }
+ case _ => terminate(StatusCodes.Unauthorized)
+ }
+ }
+}
+
+object DefaultInvokerServer extends InvokerServerProvider {
+
+ // TODO: TBD, after FPCInvokerReactive is ready, can read the credentials from pureconfig
+ val invokerUsername = "admin"
+ val invokerPassword = "admin"
+
+ override def instance(
+ invoker: InvokerCore)(implicit ec: ExecutionContext, actorSystem: ActorSystem, logger: Logging): BasicRasService =
+ new DefaultInvokerServer(invoker, invokerUsername, invokerPassword)
+}
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerServer.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerServer.scala
new file mode 100644
index 0000000..f1b8e8c
--- /dev/null
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerServer.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.invoker
+
+import akka.actor.ActorSystem
+import akka.http.scaladsl.model.StatusCodes
+import akka.http.scaladsl.model.headers.BasicHttpCredentials
+import akka.http.scaladsl.server.Route
+import org.apache.openwhisk.common.{Logging, TransactionId}
+import org.apache.openwhisk.http.BasicRasService
+import org.apache.openwhisk.http.ErrorResponse.terminate
+import spray.json.PrettyPrinter
+
+import scala.concurrent.ExecutionContext
+
+/**
+ * Implements web server to handle certain REST API calls.
+ */
+class FPCInvokerServer(val invoker: InvokerCore, systemUsername: String, systemPassword: String)(
+ implicit val ec: ExecutionContext,
+ val actorSystem: ActorSystem,
+ val logger: Logging)
+ extends BasicRasService {
+
+ /** Pretty print JSON response. */
+ implicit val jsonPrettyResponsePrinter = PrettyPrinter
+
+ override def routes(implicit transid: TransactionId): Route = {
+ super.routes ~ extractCredentials {
+ case Some(BasicHttpCredentials(username, password)) if username == systemUsername && password == systemPassword =>
+ (path("enable") & post) {
+ invoker.enable()
+ } ~ (path("disable") & post) {
+ invoker.disable()
+ }
+ case _ => terminate(StatusCodes.Unauthorized)
+ }
+ }
+}
+
+object FPCInvokerServer extends InvokerServerProvider {
+
+ // TODO: TBD, after FPCInvokerReactive is ready, can read the credentials from pureconfig
+ val invokerUsername = "admin"
+ val invokerPassword = "admin"
+
+ override def instance(
+ invoker: InvokerCore)(implicit ec: ExecutionContext, actorSystem: ActorSystem, logger: Logging): BasicRasService =
+ new FPCInvokerServer(invoker, invokerUsername, invokerPassword)
+}
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala
index 1b0c8bf..3d9cc46 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala
@@ -19,6 +19,7 @@ package org.apache.openwhisk.core.invoker
import akka.Done
import akka.actor.{ActorSystem, CoordinatedShutdown}
+import akka.http.scaladsl.server.Route
import akka.stream.ActorMaterializer
import com.typesafe.config.ConfigValueFactory
import kamon.Kamon
@@ -217,7 +218,10 @@ trait InvokerProvider extends Spi {
}
// this trait can be used to add common implementation
-trait InvokerCore {}
+trait InvokerCore {
+ def enable(): Route
+ def disable(): Route
+}
/**
* An Spi for providing RestAPI implementation for invoker.
@@ -227,9 +231,3 @@ trait InvokerServerProvider extends Spi {
def instance(
invoker: InvokerCore)(implicit ec: ExecutionContext, actorSystem: ActorSystem, logger: Logging): BasicRasService
}
-
-object DefaultInvokerServer extends InvokerServerProvider {
- override def instance(
- invoker: InvokerCore)(implicit ec: ExecutionContext, actorSystem: ActorSystem, logger: Logging): BasicRasService =
- new BasicRasService {}
-}
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
index 6aa0884..db0dfb4 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
@@ -23,6 +23,8 @@ import java.time.Instant
import akka.Done
import akka.actor.{ActorRefFactory, ActorSystem, CoordinatedShutdown, Props}
import akka.event.Logging.InfoLevel
+import akka.http.scaladsl.server.Directives.complete
+import akka.http.scaladsl.server.Route
import akka.stream.ActorMaterializer
import org.apache.openwhisk.common._
import org.apache.openwhisk.common.tracing.WhiskTracerProvider
@@ -299,4 +301,12 @@ class InvokerReactive(
}
})
+ override def enable(): Route = {
+ complete("not supported")
+ }
+
+ override def disable(): Route = {
+ complete("not supported")
+ }
+
}
diff --git a/tests/src/test/scala/common/WhiskProperties.java b/tests/src/test/scala/common/WhiskProperties.java
index 05d8b32..19fc3ed 100644
--- a/tests/src/test/scala/common/WhiskProperties.java
+++ b/tests/src/test/scala/common/WhiskProperties.java
@@ -270,6 +270,10 @@ public class WhiskProperties {
return getBaseControllerHost() + ":" + getControllerBasePort();
}
+ public static String getBaseInvokerAddress(){
+ return getInvokerHosts()[0] + ":" + whiskProperties.getProperty("invoker.hosts.basePort");
+ }
+
public static int getMaxActionInvokesPerMinute() {
String valStr = whiskProperties.getProperty("limits.actions.invokes.perMinute");
return Integer.parseInt(valStr);
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/DefaultInvokerServerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/DefaultInvokerServerTests.scala
new file mode 100644
index 0000000..153e228
--- /dev/null
+++ b/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/DefaultInvokerServerTests.scala
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.invoker.test
+
+import akka.http.scaladsl.model.StatusCodes.{OK, Unauthorized}
+import akka.http.scaladsl.model.headers.BasicHttpCredentials
+import akka.http.scaladsl.server.Route
+import akka.http.scaladsl.testkit.ScalatestRouteTest
+import common.StreamLogging
+import org.apache.openwhisk.common.TransactionId
+import org.apache.openwhisk.core.invoker.{DefaultInvokerServer, InvokerCore}
+import org.apache.openwhisk.http.BasicHttpService
+import org.junit.runner.RunWith
+import org.scalamock.scalatest.MockFactory
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FlatSpec, Matchers}
+import org.scalatest.junit.JUnitRunner
+
+/**
+ * Tests InvokerServer API.
+ */
+@RunWith(classOf[JUnitRunner])
+class DefaultInvokerServerTests
+ extends FlatSpec
+ with BeforeAndAfterEach
+ with BeforeAndAfterAll
+ with ScalatestRouteTest
+ with Matchers
+ with StreamLogging
+ with MockFactory {
+
+ def transid() = TransactionId("tid")
+
+ val systemUsername = "username"
+ val systemPassword = "password"
+
+ val reactive = new TestInvokerReactive
+ val server = new DefaultInvokerServer(reactive, systemUsername, systemPassword)
+
+ override protected def afterEach(): Unit = reactive.reset()
+
+ /** DefaultInvokerServer API tests */
+ behavior of "DefaultInvokerServer API"
+
+ it should "enable invoker" in {
+ implicit val tid = transid()
+ val validCredentials = BasicHttpCredentials(systemUsername, systemPassword)
+ Post(s"/enable") ~> addCredentials(validCredentials) ~> Route.seal(server.routes(tid)) ~> check {
+ status should be(OK)
+ reactive.enableCount shouldBe 1
+ reactive.disableCount shouldBe 0
+ }
+ }
+
+ it should "disable invoker" in {
+ implicit val tid = transid()
+ val validCredentials = BasicHttpCredentials(systemUsername, systemPassword)
+ Post(s"/disable") ~> addCredentials(validCredentials) ~> Route.seal(server.routes(tid)) ~> check {
+ status should be(OK)
+ reactive.enableCount shouldBe 0
+ reactive.disableCount shouldBe 1
+ }
+ }
+
+ it should "not enable invoker with invalid credential" in {
+ implicit val tid = transid()
+ val invalidCredentials = BasicHttpCredentials("invaliduser", "invalidpass")
+ Post(s"/enable") ~> addCredentials(invalidCredentials) ~> Route.seal(server.routes(tid)) ~> check {
+ status should be(Unauthorized)
+ reactive.enableCount shouldBe 0
+ reactive.disableCount shouldBe 0
+ }
+ }
+
+ it should "not disable invoker with invalid credential" in {
+ implicit val tid = transid()
+ val invalidCredentials = BasicHttpCredentials("invaliduser", "invalidpass")
+ Post(s"/disable") ~> addCredentials(invalidCredentials) ~> Route.seal(server.routes(tid)) ~> check {
+ status should be(Unauthorized)
+ reactive.enableCount shouldBe 0
+ reactive.disableCount shouldBe 0
+ }
+ }
+
+ it should "not enable invoker with empty credential" in {
+ implicit val tid = transid()
+ Post(s"/enable") ~> Route.seal(server.routes(tid)) ~> check {
+ status should be(Unauthorized)
+ reactive.enableCount shouldBe 0
+ reactive.disableCount shouldBe 0
+ }
+ }
+
+ it should "not disable invoker with empty credential" in {
+ implicit val tid = transid()
+ Post(s"/disable") ~> Route.seal(server.routes(tid)) ~> check {
+ status should be(Unauthorized)
+ reactive.enableCount shouldBe 0
+ reactive.disableCount shouldBe 0
+ }
+ }
+
+}
+
+class TestInvokerReactive extends InvokerCore with BasicHttpService {
+ var enableCount = 0
+ var disableCount = 0
+
+ override def enable(): Route = {
+ enableCount += 1
+ complete("")
+ }
+
+ override def disable(): Route = {
+ disableCount += 1
+ complete("")
+ }
+
+ def reset(): Unit = {
+ enableCount = 0
+ disableCount = 0
+ }
+
+ /**
+ * Gets the routes implemented by the HTTP service.
+ *
+ * @param transid the id for the transaction (every request is assigned an id)
+ */
+ override def routes(implicit transid: TransactionId): Route = ???
+
+}
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/FPCInvokerServerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/FPCInvokerServerTests.scala
new file mode 100644
index 0000000..8cf8d91
--- /dev/null
+++ b/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/FPCInvokerServerTests.scala
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.invoker.test
+
+import akka.http.scaladsl.model.StatusCodes.{OK, Unauthorized}
+import akka.http.scaladsl.model.headers.BasicHttpCredentials
+import akka.http.scaladsl.server.Route
+import akka.http.scaladsl.testkit.ScalatestRouteTest
+import common.StreamLogging
+import org.apache.openwhisk.common.TransactionId
+import org.apache.openwhisk.core.invoker.{FPCInvokerServer, InvokerCore}
+import org.apache.openwhisk.http.BasicHttpService
+import org.junit.runner.RunWith
+import org.scalamock.scalatest.MockFactory
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FlatSpec, Matchers}
+import org.scalatest.junit.JUnitRunner
+
+/**
+ * Tests InvokerServerV2 API.
+ */
+@RunWith(classOf[JUnitRunner])
+class FPCInvokerServerTests
+ extends FlatSpec
+ with BeforeAndAfterEach
+ with BeforeAndAfterAll
+ with ScalatestRouteTest
+ with Matchers
+ with StreamLogging
+ with MockFactory {
+
+ def transid() = TransactionId("tid")
+
+ val systemUsername = "username"
+ val systemPassword = "password"
+
+ val reactive = new TestFPCInvokerReactive
+ val server = new FPCInvokerServer(reactive, systemUsername, systemPassword)
+
+ override protected def afterEach(): Unit = reactive.reset()
+
+ /** FPCInvokerServer API tests */
+ behavior of "FPCInvokerServer API"
+
+ it should "enable invoker" in {
+ implicit val tid = transid()
+ val validCredentials = BasicHttpCredentials(systemUsername, systemPassword)
+ Post(s"/enable") ~> addCredentials(validCredentials) ~> Route.seal(server.routes(tid)) ~> check {
+ status should be(OK)
+ reactive.enableCount shouldBe 1
+ reactive.disableCount shouldBe 0
+ }
+ }
+
+ it should "disable invoker" in {
+ implicit val tid = transid()
+ val validCredentials = BasicHttpCredentials(systemUsername, systemPassword)
+ Post(s"/disable") ~> addCredentials(validCredentials) ~> Route.seal(server.routes(tid)) ~> check {
+ status should be(OK)
+ reactive.enableCount shouldBe 0
+ reactive.disableCount shouldBe 1
+ }
+ }
+
+ it should "not enable invoker with invalid credential" in {
+ implicit val tid = transid()
+ val invalidCredentials = BasicHttpCredentials("invaliduser", "invalidpass")
+ Post(s"/enable") ~> addCredentials(invalidCredentials) ~> Route.seal(server.routes(tid)) ~> check {
+ status should be(Unauthorized)
+ reactive.enableCount shouldBe 0
+ reactive.disableCount shouldBe 0
+ }
+ }
+
+ it should "not disable invoker with invalid credential" in {
+ implicit val tid = transid()
+ val invalidCredentials = BasicHttpCredentials("invaliduser", "invalidpass")
+ Post(s"/disable") ~> addCredentials(invalidCredentials) ~> Route.seal(server.routes(tid)) ~> check {
+ status should be(Unauthorized)
+ reactive.enableCount shouldBe 0
+ reactive.disableCount shouldBe 0
+ }
+ }
+
+ it should "not enable invoker with empty credential" in {
+ implicit val tid = transid()
+ Post(s"/enable") ~> Route.seal(server.routes(tid)) ~> check {
+ status should be(Unauthorized)
+ reactive.enableCount shouldBe 0
+ reactive.disableCount shouldBe 0
+ }
+ }
+
+ it should "not disable invoker with empty credential" in {
+ implicit val tid = transid()
+ Post(s"/disable") ~> Route.seal(server.routes(tid)) ~> check {
+ status should be(Unauthorized)
+ reactive.enableCount shouldBe 0
+ reactive.disableCount shouldBe 0
+ }
+ }
+}
+
+class TestFPCInvokerReactive extends InvokerCore with BasicHttpService {
+ var enableCount = 0
+ var disableCount = 0
+
+ override def enable(): Route = {
+ enableCount += 1
+ complete("")
+ }
+
+ override def disable(): Route = {
+ disableCount += 1
+ complete("")
+ }
+
+ def reset(): Unit = {
+ enableCount = 0
+ disableCount = 0
+ }
+
+ /**
+ * Gets the routes implemented by the HTTP service.
+ *
+ * @param transid the id for the transaction (every request is assigned an id)
+ */
+ override def routes(implicit transid: TransactionId): Route = ???
+
+}