You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ra...@apache.org on 2018/03/07 14:44:55 UTC

[incubator-openwhisk] branch master updated: Add namespace-blacklist to invoker. (#3391)

This is an automated email from the ASF dual-hosted git repository.

rabbah pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new f3274d2  Add namespace-blacklist to invoker. (#3391)
f3274d2 is described below

commit f3274d2155036dbd025902353f7b77c076ad1f86
Author: Christian Bickel <gi...@cbickel.de>
AuthorDate: Wed Mar 7 15:44:52 2018 +0100

    Add namespace-blacklist to invoker. (#3391)
    
    This commit adds a check to the invoker that verifies whether the current namespace is allowed to execute actions.
    The controller already performs this check, but if an action is already in the queue when its namespace is throttled afterwards, there is no way to get this action out of the queue.
    With this commit, the invoker also checks whether the action is allowed to be executed. If it is not, it is refused immediately, which cleans up the queue very quickly.
---
 ...hrottlings_design_document_for_subjects_db.json |  10 ++
 ansible/roles/invoker/tasks/deploy.yml             |   1 +
 ansible/tasks/initdb.yml                           |   1 +
 .../src/main/scala/whisk/core/WhiskConfig.scala    |   1 +
 .../main/scala/whisk/core/entity/WhiskStore.scala  |   2 +-
 core/invoker/src/main/resources/application.conf   |   5 +-
 .../main/scala/whisk/core/invoker/Invoker.scala    |   9 +-
 .../scala/whisk/core/invoker/InvokerReactive.scala |  61 ++++++----
 .../whisk/core/invoker/NamespaceBlacklist.scala    |  78 +++++++++++++
 .../invoker/test/NamespaceBlacklistTests.scala     | 130 +++++++++++++++++++++
 10 files changed, 264 insertions(+), 34 deletions(-)

diff --git a/ansible/files/namespace_throttlings_design_document_for_subjects_db.json b/ansible/files/namespace_throttlings_design_document_for_subjects_db.json
new file mode 100644
index 0000000..2346381
--- /dev/null
+++ b/ansible/files/namespace_throttlings_design_document_for_subjects_db.json
@@ -0,0 +1,10 @@
+{
+  "_id": "_design/namespaceThrottlings",
+  "views": {
+    "blockedNamespaces": {
+      "map": "function (doc) {\n  if (doc._id.indexOf(\"/limits\") >= 0) {\n    if (doc.concurrentInvocations === 0 || doc.invocationsPerMinute === 0) {\n      var namespace = doc._id.replace(\"/limits\", \"\");\n      emit(namespace, 1);\n    }\n  } else if (doc.subject && doc.namespaces && doc.blocked) {\n    doc.namespaces.forEach(function(namespace) {\n      emit(namespace.name, 1);\n    });\n  }\n}",
+      "reduce": "_sum"
+    }
+  },
+  "language": "javascript"
+}
diff --git a/ansible/roles/invoker/tasks/deploy.yml b/ansible/roles/invoker/tasks/deploy.yml
index bf91a50..c113f54 100644
--- a/ansible/roles/invoker/tasks/deploy.yml
+++ b/ansible/roles/invoker/tasks/deploy.yml
@@ -169,6 +169,7 @@
         -e DB_PASSWORD='{{ db_password }}'
         -e DB_WHISK_ACTIONS='{{ db.whisk.actions }}'
         -e DB_WHISK_ACTIVATIONS='{{ db.whisk.activations }}'
+        -e DB_WHISK_AUTHS='{{ db.whisk.auth }}'
         -e CONFIG_whisk_db_actionsDdoc='{{ db_whisk_actions_ddoc | default() }}'
         -e CONFIG_whisk_db_activationsDdoc='{{ db_whisk_activations_ddoc | default() }}'
         -e CONFIG_whisk_db_activationsFilterDdoc='{{ db_whisk_activations_filter_ddoc | default() }}'
diff --git a/ansible/tasks/initdb.yml b/ansible/tasks/initdb.yml
index 46db9f8..5a8665a 100644
--- a/ansible/tasks/initdb.yml
+++ b/ansible/tasks/initdb.yml
@@ -14,6 +14,7 @@
   with_items:
     - "{{ openwhisk_home }}/ansible/files/auth_index.json"
     - "{{ openwhisk_home }}/ansible/files/filter_design_document.json"
+    - "{{ openwhisk_home }}/ansible/files/namespace_throttlings_design_document_for_subjects_db.json"
 
 - name: create necessary "auth" keys
   include: db/recreateDoc.yml
diff --git a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
index 1ff53fb..a5e1fe1 100644
--- a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
@@ -256,6 +256,7 @@ object ConfigKeys {
   val runcTimeouts = s"$runc.timeouts"
   val containerFactory = "whisk.container-factory"
   val containerArgs = s"$containerFactory.container-args"
+  val blacklist = "whisk.blacklist"
 
   val kubernetes = "whisk.kubernetes"
   val kubernetesTimeouts = s"$kubernetes.timeouts"
diff --git a/common/scala/src/main/scala/whisk/core/entity/WhiskStore.scala b/common/scala/src/main/scala/whisk/core/entity/WhiskStore.scala
index 0a1417e..016363a 100644
--- a/common/scala/src/main/scala/whisk/core/entity/WhiskStore.scala
+++ b/common/scala/src/main/scala/whisk/core/entity/WhiskStore.scala
@@ -155,7 +155,7 @@ object WhiskActivationStore {
  * @param ddoc the design document
  * @param view the view name within the design doc
  */
-protected[core] class View(ddoc: String, view: String) {
+protected[core] case class View(ddoc: String, view: String) {
 
   /** The name of the table to query. */
   val name = s"$ddoc/$view"
diff --git a/core/invoker/src/main/resources/application.conf b/core/invoker/src/main/resources/application.conf
index c63e1b7..7f85eb5 100644
--- a/core/invoker/src/main/resources/application.conf
+++ b/core/invoker/src/main/resources/application.conf
@@ -3,6 +3,10 @@ include "logging"
 include "akka-http-version"
 
 whisk {
+  blacklist {
+    poll-interval: 5 minutes
+  }
+
   # Timeouts for docker commands. Set to "Inf" to disable timeout.
   docker.timeouts {
     run: 1 minute
@@ -40,6 +44,5 @@ whisk {
     network: bridge
     dns-servers: []
     extra-args: {}
-
   }
 }
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
index 4a9f071..d6d2735 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
@@ -22,13 +22,10 @@ import scala.concurrent.duration._
 import scala.concurrent.Future
 import scala.util.Failure
 import scala.util.Try
-
 import kamon.Kamon
-
 import org.apache.curator.retry.RetryUntilElapsed
 import org.apache.curator.framework.CuratorFrameworkFactory
 import org.apache.curator.framework.recipes.shared.SharedCount
-
 import akka.Done
 import akka.actor.ActorSystem
 import akka.actor.CoordinatedShutdown
@@ -39,10 +36,7 @@ import whisk.core.WhiskConfig
 import whisk.core.WhiskConfig._
 import whisk.core.connector.MessagingProvider
 import whisk.core.connector.PingMessage
-import whisk.core.entity.ExecManifest
-import whisk.core.entity.InstanceId
-import whisk.core.entity.WhiskActivationStore
-import whisk.core.entity.WhiskEntityStore
+import whisk.core.entity._
 import whisk.http.BasicHttpService
 import whisk.spi.SpiLoader
 import whisk.utils.ExecutionContextFactory
@@ -60,6 +54,7 @@ object Invoker {
       ExecManifest.requiredProperties ++
       WhiskEntityStore.requiredProperties ++
       WhiskActivationStore.requiredProperties ++
+      WhiskAuthStore.requiredProperties ++
       kafkaHosts ++
       zookeeperHosts ++
       wskApiHost ++ Map(
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
index 0c9cdb4..77c63fa 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
@@ -19,37 +19,27 @@ package whisk.core.invoker
 
 import java.nio.charset.StandardCharsets
 import java.time.Instant
-import scala.concurrent.Future
-import scala.concurrent.duration._
-import scala.util.Failure
-import scala.util.Success
-import org.apache.kafka.common.errors.RecordTooLargeException
-import akka.actor.ActorRefFactory
-import akka.actor.ActorSystem
-import akka.actor.Props
+
+import akka.actor.{ActorRefFactory, ActorSystem, Props}
+import akka.event.Logging.InfoLevel
 import akka.stream.ActorMaterializer
+import org.apache.kafka.common.errors.RecordTooLargeException
+import pureconfig._
 import spray.json._
-import whisk.common.Logging
-import whisk.common.LoggingMarkers
-import whisk.common.TransactionId
-import whisk.core.WhiskConfig
-import whisk.core.connector.ActivationMessage
-import whisk.core.connector.CompletionMessage
-import whisk.core.connector.MessageFeed
-import whisk.core.connector.MessageProducer
-import whisk.core.connector.MessagingProvider
-import whisk.core.containerpool.ContainerFactoryProvider
-import whisk.core.containerpool.ContainerPool
-import whisk.core.containerpool.ContainerProxy
-import whisk.core.containerpool.PrewarmingConfig
-import whisk.core.containerpool.Run
+import whisk.common.{Logging, LoggingMarkers, Scheduler, TransactionId}
+import whisk.core.{ConfigKeys, WhiskConfig}
+import whisk.core.connector._
+import whisk.core.containerpool._
 import whisk.core.containerpool.logging.LogStoreProvider
 import whisk.core.database._
 import whisk.core.entity._
 import whisk.core.entity.size._
 import whisk.http.Messages
 import whisk.spi.SpiLoader
-import akka.event.Logging.InfoLevel
+
+import scala.concurrent.Future
+import scala.concurrent.duration._
+import scala.util.{Failure, Success}
 
 class InvokerReactive(config: WhiskConfig, instance: InstanceId, producer: MessageProducer)(
   implicit actorSystem: ActorSystem,
@@ -87,6 +77,17 @@ class InvokerReactive(config: WhiskConfig, instance: InstanceId, producer: Messa
   /** Initialize needed databases */
   private val entityStore = WhiskEntityStore.datastore(config)
   private val activationStore = WhiskActivationStore.datastore(config)
+  private val authStore = WhiskAuthStore.datastore(config)
+
+  private val namespaceBlacklist = new NamespaceBlacklist(authStore)
+
+  Scheduler.scheduleWaitAtMost(loadConfigOrThrow[NamespaceBlacklistConfig](ConfigKeys.blacklist).pollInterval) { () =>
+    logging.debug(this, "running background job to update blacklist")
+    namespaceBlacklist.refreshBlacklist()(ec, TransactionId.invoker).andThen {
+      case Success(set) => logging.info(this, s"updated blacklist to ${set.size} entries")
+      case Failure(t)   => logging.error(this, s"error on updating the blacklist: ${t.getMessage}")
+    }
+  }
 
   /** Initialize message consumers */
   val topic = s"invoker${instance.toInt}"
@@ -160,7 +161,14 @@ class InvokerReactive(config: WhiskConfig, instance: InstanceId, producer: Messa
   /** Is called when an ActivationMessage is read from Kafka */
   def processActivationMessage(bytes: Array[Byte]): Future[Unit] = {
     Future(ActivationMessage.parse(new String(bytes, StandardCharsets.UTF_8)))
-      .flatMap(Future.fromTry(_))
+      .flatMap(Future.fromTry)
+      .flatMap { msg =>
+        if (!namespaceBlacklist.isBlacklisted(msg.user)) {
+          Future.successful(msg)
+        } else {
+          Future.failed(NamespaceBlacklistedException(msg.user.namespace.name))
+        }
+      }
       .filter(_.action.version.isDefined)
       .flatMap { msg =>
         implicit val transid = msg.transid
@@ -235,7 +243,10 @@ class InvokerReactive(config: WhiskConfig, instance: InstanceId, producer: Messa
           // Iff everything above failed, we have a terminal error at hand. Either the message failed
           // to deserialize, or something threw an error where it is not expected to throw.
           activationFeed ! MessageFeed.Processed
-          logging.error(this, s"terminal failure while processing message: $t")
+          t match {
+            case nse: NamespaceBlacklistedException => logging.warn(this, nse.getMessage)
+            case _                                  => logging.error(this, s"terminal failure while processing message: $t")
+          }
           Future.successful(())
       }
   }
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/NamespaceBlacklist.scala b/core/invoker/src/main/scala/whisk/core/invoker/NamespaceBlacklist.scala
new file mode 100644
index 0000000..9909c82
--- /dev/null
+++ b/core/invoker/src/main/scala/whisk/core/invoker/NamespaceBlacklist.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.invoker
+
+import whisk.common.TransactionId
+import whisk.core.database.StaleParameter
+import whisk.core.entity.{Identity, View}
+import whisk.core.entity.types.AuthStore
+
+import scala.concurrent.{ExecutionContext, Future}
+import spray.json.DefaultJsonProtocol._
+
+import scala.concurrent.duration.FiniteDuration
+
+/**
+ * The namespace blacklist holds all namespaces that are throttled to 0 or blocked, as read from the database.
+ *
+ * The caller is responsible for periodically updating the blacklist with `refreshBlacklist`.
+ *
+ * @param authStore Subjects database with the limit-documents.
+ */
+class NamespaceBlacklist(authStore: AuthStore) {
+  /** Latest snapshot; volatile so that a refresh completing on one thread is visible to readers on others. */
+  @volatile private var blacklist: Set[String] = Set.empty
+
+  /**
+   * Checks whether the namespace of the identity that invoked the activation is in the blacklist.
+   *
+   * @param identity the identity which invoked the action.
+   * @return true iff `identity.namespace.name` is contained in the most recently loaded blacklist
+   */
+  def isBlacklisted(identity: Identity): Boolean = blacklist.contains(identity.namespace.name)
+
+  /** Reloads the blacklist from the database view and returns it; on a failed query the old blacklist is kept. */
+  def refreshBlacklist()(implicit ec: ExecutionContext, tid: TransactionId): Future[Set[String]] = {
+    authStore
+      .query(
+        table = NamespaceBlacklist.view.name,
+        startKey = List.empty,
+        endKey = List.empty,
+        skip = 0,
+        limit = Int.MaxValue,
+        includeDocs = false,
+        descending = true,
+        reduce = false,
+        stale = StaleParameter.UpdateAfter)
+      .map(_.map(_.fields("key").convertTo[String]).toSet)
+      .map { newBlacklist =>
+        blacklist = newBlacklist
+        newBlacklist
+      }
+  }
+}
+
+object NamespaceBlacklist { // companion: names the design document and view read by `refreshBlacklist`
+  val view: View = View("namespaceThrottlings", "blockedNamespaces")
+}
+
+/** Configuration of the namespace blacklist (key `whisk.blacklist`); `pollInterval` is used to schedule refreshes. */
+case class NamespaceBlacklistConfig(pollInterval: FiniteDuration)
+
+/** Indicates the activation was stopped due to a blacklisted identity; `ns` is the name of the blocked namespace. */
+case class NamespaceBlacklistedException(ns: String) extends Exception(s"Namespace $ns was blocked in invoker.")
diff --git a/tests/src/test/scala/whisk/core/invoker/test/NamespaceBlacklistTests.scala b/tests/src/test/scala/whisk/core/invoker/test/NamespaceBlacklistTests.scala
new file mode 100644
index 0000000..89670f1
--- /dev/null
+++ b/tests/src/test/scala/whisk/core/invoker/test/NamespaceBlacklistTests.scala
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.invoker.test
+
+import akka.stream.ActorMaterializer
+import common.{StreamLogging, WhiskProperties, WskActorSystem}
+import org.junit.runner.RunWith
+import org.scalatest.concurrent.{IntegrationPatience, ScalaFutures}
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{FlatSpec, Matchers}
+import spray.json.DefaultJsonProtocol._
+import spray.json._
+import whisk.common.TransactionId
+import whisk.core.WhiskConfig
+import whisk.core.database.test.{DbUtils, ExtendedCouchDbRestClient}
+import whisk.core.entity._
+import whisk.core.invoker.NamespaceBlacklist
+
+import scala.concurrent.duration._
+
+@RunWith(classOf[JUnitRunner])
+class NamespaceBlacklistTests
+    extends FlatSpec
+    with Matchers
+    with DbUtils
+    with ScalaFutures
+    with IntegrationPatience
+    with WskActorSystem
+    with StreamLogging {
+  // Integration test: writes limit and subject documents to the subjects db and checks the resulting blacklist.
+  behavior of "NamespaceBlacklist"
+
+  val config = new WhiskConfig(WhiskAuthStore.requiredProperties)
+
+  implicit val materializer = ActorMaterializer()
+  implicit val tid = TransactionId.testing
+
+  val authStore = WhiskAuthStore.datastore(config)
+  val subjectsDb = new ExtendedCouchDbRestClient(
+    WhiskProperties.getProperty(WhiskConfig.dbProtocol),
+    WhiskProperties.getProperty(WhiskConfig.dbHost),
+    WhiskProperties.getProperty(WhiskConfig.dbPort).toInt,
+    WhiskProperties.getProperty(WhiskConfig.dbUsername),
+    WhiskProperties.getProperty(WhiskConfig.dbPassword),
+    WhiskProperties.getProperty(WhiskConfig.dbAuths))
+
+  /* Limit fixtures: namespaces 1 and 2 have one limit set to 0 (expected blocked), namespace 3 has none at 0 */
+  val identities = Seq(
+    Identity(Subject(), EntityName("testnamespace1"), AuthKey(), Set.empty, UserLimits(invocationsPerMinute = Some(0))),
+    Identity(
+      Subject(),
+      EntityName("testnamespace2"),
+      AuthKey(),
+      Set.empty,
+      UserLimits(concurrentInvocations = Some(0))),
+    Identity(
+      Subject(),
+      EntityName("testnamespace3"),
+      AuthKey(),
+      Set.empty,
+      UserLimits(invocationsPerMinute = Some(1), concurrentInvocations = Some(1))))
+
+  /* Subject with two namespaces; it is stored with "blocked": true so both namespaces are expected blocked */
+  val subject = WhiskAuth(
+    Subject(),
+    Set(WhiskNamespace(EntityName("different1"), AuthKey()), WhiskNamespace(EntityName("different2"), AuthKey())))
+  val blockedSubject = JsObject(subject.toJson.fields + ("blocked" -> true.toJson))
+
+  val blockedNamespacesCount = 2 + subject.namespaces.size
+
+  def authToIdentities(auth: WhiskAuth): Set[Identity] = {
+    auth.namespaces.map { ns =>
+      Identity(auth.subject, ns.name, ns.authkey, Set(), UserLimits())
+    }
+  }
+
+  override def beforeAll() = {
+    val documents = identities.map { i =>
+      (i.namespace.name + "/limits", i.limits.toJson.asJsObject)
+    } :+ (subject.subject.asString, blockedSubject)
+
+    // Add all documents to the subjects database (limits are stored under the id "<namespace>/limits")
+    documents.foreach { case (id, doc) => subjectsDb.putDoc(id, doc).futureValue }
+
+    // Waits until the view shows the 2 zero-limit namespaces + the namespaces of the blocked subject
+    waitOnView(subjectsDb, NamespaceBlacklist.view.ddoc, NamespaceBlacklist.view.view, blockedNamespacesCount)(
+      executionContext,
+      1.minute)
+  }
+
+  override def afterAll() = {
+    val ids = identities.map(_.namespace.name + "/limits") :+ subject.subject.asString
+
+    // Force remove all documents with those ids: fetch each one to learn its current _rev, then delete it
+    ids.foreach { id =>
+      val docE = subjectsDb.getDoc(id).futureValue
+      docE shouldBe 'right
+      val doc = docE.right.get
+      subjectsDb
+        .deleteDoc(doc.fields("_id").convertTo[String], doc.fields("_rev").convertTo[String])
+        .futureValue
+    }
+
+    super.afterAll()
+  }
+
+  it should "mark a namespace as blocked if limit is 0 in database or if one of its subjects is blocked" in {
+    val blacklist = new NamespaceBlacklist(authStore)
+
+    blacklist.refreshBlacklist().futureValue should have size blockedNamespacesCount
+
+    identities.map(blacklist.isBlacklisted) shouldBe Seq(true, true, false)
+    authToIdentities(subject).toSeq.map(blacklist.isBlacklisted) shouldBe Seq(true, true)
+  }
+}

-- 
To stop receiving notification emails like this one, please contact
rabbah@apache.org.