You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ty...@apache.org on 2018/04/23 17:38:51 UTC

[incubator-openwhisk] branch master updated: Skip log collection if the log-limit is set to "0". (#3563)

This is an automated email from the ASF dual-hosted git repository.

tysonnorris pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new b15f2dd  Skip log collection if the log-limit is set to "0". (#3563)
b15f2dd is described below

commit b15f2dda5151d35b4ab2e9073570388159af991d
Author: Markus Thömmes <ma...@me.com>
AuthorDate: Mon Apr 23 19:38:48 2018 +0200

    Skip log collection if the log-limit is set to "0". (#3563)
    
    * Skip log collection if the log-limit is set to "0".
    
    Enables the bounds of the per-action log-limit (min, max and std values) to be read from configuration. It also skips the entire log-collecting process if an action's log-limit is set to 0 (no logs allowed).
    
    * Refactor LogLimit constructor, add test
    
    * Review comment.
---
 common/scala/src/main/resources/application.conf   |  7 +++
 .../src/main/scala/whisk/core/WhiskConfig.scala    |  1 +
 .../src/main/scala/whisk/core/entity/Limits.scala  |  7 +--
 .../main/scala/whisk/core/entity/LogLimit.scala    | 27 +++++----
 .../whisk/core/containerpool/ContainerProxy.scala  | 31 +++++-----
 .../containerpool/test/ContainerProxyTests.scala   | 68 ++++++++++++++++------
 .../scala/whisk/core/entity/test/SchemaTests.scala | 10 ++--
 .../whisk/core/limits/ActionLimitsTests.scala      | 20 +++----
 8 files changed, 108 insertions(+), 63 deletions(-)

diff --git a/common/scala/src/main/resources/application.conf b/common/scala/src/main/resources/application.conf
index e3fadaf..c12eb02 100644
--- a/common/scala/src/main/resources/application.conf
+++ b/common/scala/src/main/resources/application.conf
@@ -156,6 +156,13 @@ whisk {
         std = 256 m
     }
 
+    # action log-limit configuration
+    log-limit {
+        min = 0 m
+        max = 10 m
+        std = 10 m
+    }
+
     mesos {
         master-url = "http://localhost:5050" //your mesos master
         master-public-url = "http://localhost:5050" // if mesos-link-log-message == true, this link will be included with the static log message (may or may not be different from master-url)
diff --git a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
index 36f74e0..1fd0154 100644
--- a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
@@ -217,6 +217,7 @@ object ConfigKeys {
 
   val memory = "whisk.memory"
   val timeLimit = "whisk.time-limit"
+  val logLimit = "whisk.log-limit"
   val activation = "whisk.activation"
   val activationPayload = s"$activation.payload"
 
diff --git a/common/scala/src/main/scala/whisk/core/entity/Limits.scala b/common/scala/src/main/scala/whisk/core/entity/Limits.scala
index a2294e0..5937df7 100644
--- a/common/scala/src/main/scala/whisk/core/entity/Limits.scala
+++ b/common/scala/src/main/scala/whisk/core/entity/Limits.scala
@@ -45,7 +45,9 @@ protected[entity] abstract class Limits {
  * @param memory the memory limit in megabytes, assured to be non-null because it is a value
  * @param logs the limit for logs written by the container and stored in the activation record, assured to be non-null because it is a value
  */
-protected[core] case class ActionLimits protected[core] (timeout: TimeLimit, memory: MemoryLimit, logs: LogLimit)
+protected[core] case class ActionLimits(timeout: TimeLimit = TimeLimit(),
+                                        memory: MemoryLimit = MemoryLimit(),
+                                        logs: LogLimit = LogLimit())
     extends Limits {
   override protected[entity] def toJson = ActionLimits.serdes.write(this)
 }
@@ -59,9 +61,6 @@ protected[core] case class TriggerLimits protected[core] () extends Limits {
 
 protected[core] object ActionLimits extends ArgNormalizer[ActionLimits] with DefaultJsonProtocol {
 
-  /** Creates a ActionLimits instance with default duration, memory and log limits. */
-  protected[core] def apply(): ActionLimits = ActionLimits(TimeLimit(), MemoryLimit(), LogLimit())
-
   override protected[core] implicit val serdes = new RootJsonFormat[ActionLimits] {
     val helper = jsonFormat3(ActionLimits.apply)
 
diff --git a/common/scala/src/main/scala/whisk/core/entity/LogLimit.scala b/common/scala/src/main/scala/whisk/core/entity/LogLimit.scala
index 0b0d88c..6f3abef 100644
--- a/common/scala/src/main/scala/whisk/core/entity/LogLimit.scala
+++ b/common/scala/src/main/scala/whisk/core/entity/LogLimit.scala
@@ -17,16 +17,17 @@
 
 package whisk.core.entity
 
+import pureconfig.loadConfigOrThrow
+
 import scala.language.postfixOps
 import scala.util.Failure
 import scala.util.Success
 import scala.util.Try
+import spray.json._
+import whisk.core.ConfigKeys
+import whisk.core.entity.size._
 
-import spray.json.JsNumber
-import spray.json.JsValue
-import spray.json.RootJsonFormat
-import spray.json.deserializationError
-import whisk.core.entity.size.SizeInt
+case class LogLimitConfig(min: ByteSize, max: ByteSize, std: ByteSize)
 
 /**
  * LogLimit encapsulates allowed amount of logs written by an action.
@@ -45,12 +46,14 @@ protected[core] class LogLimit private (val megabytes: Int) extends AnyVal {
 }
 
 protected[core] object LogLimit extends ArgNormalizer[LogLimit] {
-  protected[core] val MIN_LOGSIZE = 0 MB
-  protected[core] val MAX_LOGSIZE = 10 MB
-  protected[core] val STD_LOGSIZE = 10 MB
+  private val logLimitConfig = loadConfigOrThrow[LogLimitConfig](ConfigKeys.logLimit)
+
+  protected[core] val minLogSize: ByteSize = logLimitConfig.min
+  protected[core] val maxLogSize: ByteSize = logLimitConfig.max
+  protected[core] val stdLogSize: ByteSize = logLimitConfig.std
 
   /** Gets LogLimit with default log limit */
-  protected[core] def apply(): LogLimit = LogLimit(STD_LOGSIZE)
+  protected[core] def apply(): LogLimit = LogLimit(stdLogSize)
 
   /**
    * Creates LogLimit for limit. Only the default limit is allowed currently.
@@ -61,9 +64,9 @@ protected[core] object LogLimit extends ArgNormalizer[LogLimit] {
    */
   @throws[IllegalArgumentException]
   protected[core] def apply(megabytes: ByteSize): LogLimit = {
-    require(megabytes >= MIN_LOGSIZE, s"log size $megabytes below allowed threshold of $MIN_LOGSIZE")
-    require(megabytes <= MAX_LOGSIZE, s"log size $megabytes exceeds allowed threshold of $MAX_LOGSIZE")
-    new LogLimit(megabytes.toMB.toInt);
+    require(megabytes >= minLogSize, s"log size $megabytes below allowed threshold of $minLogSize")
+    require(megabytes <= maxLogSize, s"log size $megabytes exceeds allowed threshold of $maxLogSize")
+    new LogLimit(megabytes.toMB.toInt)
   }
 
   override protected[core] implicit val serdes = new RootJsonFormat[LogLimit] {
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
index dd9885e..fddafd5 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
@@ -385,19 +385,24 @@ class ContainerProxy(
     // Adds logs to the raw activation.
     val activationWithLogs: Future[Either[ActivationLogReadingError, WhiskActivation]] = activation
       .flatMap { activation =>
-        val start = tid.started(this, LoggingMarkers.INVOKER_COLLECT_LOGS, logLevel = InfoLevel)
-        collectLogs(tid, job.msg.user, activation, container, job.action)
-          .andThen {
-            case Success(_) => tid.finished(this, start)
-            case Failure(t) => tid.failed(this, start, s"reading logs failed: $t")
-          }
-          .map(logs => Right(activation.withLogs(logs)))
-          .recover {
-            case LogCollectingException(logs) =>
-              Left(ActivationLogReadingError(activation.withLogs(logs)))
-            case _ =>
-              Left(ActivationLogReadingError(activation.withLogs(ActivationLogs(Vector(Messages.logFailure)))))
-          }
+        // Skips log collection entirely, if the limit is set to 0
+        if (job.action.limits.logs.asMegaBytes == 0.MB) {
+          Future.successful(Right(activation))
+        } else {
+          val start = tid.started(this, LoggingMarkers.INVOKER_COLLECT_LOGS, logLevel = InfoLevel)
+          collectLogs(tid, job.msg.user, activation, container, job.action)
+            .andThen {
+              case Success(_) => tid.finished(this, start)
+              case Failure(t) => tid.failed(this, start, s"reading logs failed: $t")
+            }
+            .map(logs => Right(activation.withLogs(logs)))
+            .recover {
+              case LogCollectingException(logs) =>
+                Left(ActivationLogReadingError(activation.withLogs(logs)))
+              case _ =>
+                Left(ActivationLogReadingError(activation.withLogs(ActivationLogs(Vector(Messages.logFailure)))))
+            }
+        }
       }
 
     // Storing the record. Entirely asynchronous and not waited upon.
diff --git a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
index 7f2e553..0055372 100644
--- a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
@@ -149,11 +149,12 @@ class ContainerProxyTests
   }
 
   /** Creates an inspectable version of the ack method, which records all calls in a buffer */
-  def createAcker = LoggedFunction { (_: TransactionId, activation: WhiskActivation, _: Boolean, _: InstanceId) =>
-    activation.annotations.get("limits") shouldBe Some(action.limits.toJson)
-    activation.annotations.get("path") shouldBe Some(action.fullyQualifiedName(false).toString.toJson)
-    activation.annotations.get("kind") shouldBe Some(action.exec.kind.toJson)
-    Future.successful(())
+  def createAcker(a: ExecutableWhiskAction = action) = LoggedFunction {
+    (_: TransactionId, activation: WhiskActivation, _: Boolean, _: InstanceId) =>
+      activation.annotations.get("limits") shouldBe Some(a.limits.toJson)
+      activation.annotations.get("path") shouldBe Some(a.fullyQualifiedName(false).toString.toJson)
+      activation.annotations.get("kind") shouldBe Some(a.exec.kind.toJson)
+      Future.successful(())
   }
 
   /** Creates an inspectable factory */
@@ -192,7 +193,7 @@ class ContainerProxyTests
         ContainerProxy
           .props(
             factory,
-            createAcker,
+            createAcker(),
             store,
             createCollector(),
             InstanceId(0, Some("myname")),
@@ -212,7 +213,7 @@ class ContainerProxyTests
     timeout) {
     val container = new TestContainer
     val factory = createFactory(Future.successful(container))
-    val acker = createAcker
+    val acker = createAcker()
     val store = createStore
     val collector = createCollector()
 
@@ -248,7 +249,7 @@ class ContainerProxyTests
   it should "run an action and continue with a next run without pausing the container" in within(timeout) {
     val container = new TestContainer
     val factory = createFactory(Future.successful(container))
-    val acker = createAcker
+    val acker = createAcker()
     val store = createStore
     val collector = createCollector()
 
@@ -295,7 +296,7 @@ class ContainerProxyTests
   it should "run an action after pausing the container" in within(timeout) {
     val container = new TestContainer
     val factory = createFactory(Future.successful(container))
-    val acker = createAcker
+    val acker = createAcker()
     val store = createStore
     val collector = createCollector()
 
@@ -333,7 +334,7 @@ class ContainerProxyTests
   it should "successfully run on an uninitialized container" in within(timeout) {
     val container = new TestContainer
     val factory = createFactory(Future.successful(container))
-    val acker = createAcker
+    val acker = createAcker()
     val store = createStore
     val collector = createCollector()
 
@@ -360,13 +361,42 @@ class ContainerProxyTests
     }
   }
 
+  it should "not collect logs if the log-limit is set to 0" in within(timeout) {
+    val noLogsAction = action.copy(limits = ActionLimits(logs = LogLimit(0.MB)))
+
+    val container = new TestContainer
+    val factory = createFactory(Future.successful(container))
+    val acker = createAcker(noLogsAction)
+    val store = createStore
+    val collector = createCollector()
+
+    val machine =
+      childActorOf(
+        ContainerProxy.props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
+    registerCallback(machine)
+
+    machine ! Run(noLogsAction, message)
+    expectMsg(Transition(machine, Uninitialized, Running))
+    expectWarmed(invocationNamespace.name, noLogsAction)
+    expectMsg(Transition(machine, Running, Ready))
+
+    awaitAssert {
+      factory.calls should have size 1
+      container.initializeCount shouldBe 1
+      container.runCount shouldBe 1
+      collector.calls should have size 0
+      acker.calls should have size 1
+      store.calls should have size 1
+    }
+  }
+
   /*
    * ERROR CASES
    */
   it should "complete the transaction and abort if container creation fails" in within(timeout) {
     val container = new TestContainer
     val factory = createFactory(Future.failed(new Exception()))
-    val acker = createAcker
+    val acker = createAcker()
     val store = createStore
     val collector = createCollector()
 
@@ -401,7 +431,7 @@ class ContainerProxyTests
       }
     }
     val factory = createFactory(Future.successful(container))
-    val acker = createAcker
+    val acker = createAcker()
     val store = createStore
     val collector = createCollector()
 
@@ -440,7 +470,7 @@ class ContainerProxyTests
       }
     }
     val factory = createFactory(Future.successful(container))
-    val acker = createAcker
+    val acker = createAcker()
     val store = createStore
     val collector = createCollector()
 
@@ -467,7 +497,7 @@ class ContainerProxyTests
   it should "complete the transaction and destroy the container if log reading failed" in {
     val container = new TestContainer
     val factory = createFactory(Future.successful(container))
-    val acker = createAcker
+    val acker = createAcker()
     val store = createStore
 
     val partialLogs = Vector("this log line made it", Messages.logFailure)
@@ -499,7 +529,7 @@ class ContainerProxyTests
   it should "complete the transaction and destroy the container if log reading failed terminally" in {
     val container = new TestContainer
     val factory = createFactory(Future.successful(container))
-    val acker = createAcker
+    val acker = createAcker()
     val store = createStore
     val collector = createCollector(Future.failed(new Exception))
 
@@ -533,7 +563,7 @@ class ContainerProxyTests
       }
     }
     val factory = createFactory(Future.successful(container))
-    val acker = createAcker
+    val acker = createAcker()
     val store = createStore
 
     val machine =
@@ -568,7 +598,7 @@ class ContainerProxyTests
       }
     }
     val factory = createFactory(Future.successful(container))
-    val acker = createAcker
+    val acker = createAcker()
     val store = createStore
 
     val machine =
@@ -603,7 +633,7 @@ class ContainerProxyTests
       }
     }
     val factory = createFactory(Future.successful(container))
-    val acker = createAcker
+    val acker = createAcker()
     val store = createStore
     val collector = createCollector()
 
@@ -654,7 +684,7 @@ class ContainerProxyTests
       }
     }
     val factory = createFactory(Future.successful(container))
-    val acker = createAcker
+    val acker = createAcker()
     val store = createStore
     val collector = createCollector()
 
diff --git a/tests/src/test/scala/whisk/core/entity/test/SchemaTests.scala b/tests/src/test/scala/whisk/core/entity/test/SchemaTests.scala
index cbf49c8..d6111d0 100644
--- a/tests/src/test/scala/whisk/core/entity/test/SchemaTests.scala
+++ b/tests/src/test/scala/whisk/core/entity/test/SchemaTests.scala
@@ -672,11 +672,11 @@ class SchemaTests extends FlatSpec with BeforeAndAfter with ExecHelpers with Mat
       JsObject(
         "timeout" -> TimeLimit.STD_DURATION.toMillis.toInt.toJson,
         "memory" -> MemoryLimit.stdMemory.toMB.toInt.toJson,
-        "logs" -> LogLimit.STD_LOGSIZE.toMB.toInt.toJson),
+        "logs" -> LogLimit.stdLogSize.toMB.toInt.toJson),
       JsObject(
         "timeout" -> TimeLimit.STD_DURATION.toMillis.toInt.toJson,
         "memory" -> MemoryLimit.stdMemory.toMB.toInt.toJson,
-        "logs" -> LogLimit.STD_LOGSIZE.toMB.toInt.toJson,
+        "logs" -> LogLimit.stdLogSize.toMB.toInt.toJson,
         "foo" -> "bar".toJson),
       JsObject(
         "timeout" -> TimeLimit.STD_DURATION.toMillis.toInt.toJson,
@@ -697,7 +697,7 @@ class SchemaTests extends FlatSpec with BeforeAndAfter with ExecHelpers with Mat
       JsNull,
       JsObject("timeout" -> TimeLimit.STD_DURATION.toMillis.toInt.toJson),
       JsObject("memory" -> MemoryLimit.stdMemory.toMB.toInt.toJson),
-      JsObject("logs" -> (LogLimit.STD_LOGSIZE.toMB.toInt + 1).toJson),
+      JsObject("logs" -> (LogLimit.stdLogSize.toMB.toInt + 1).toJson),
       JsObject(
         "TIMEOUT" -> TimeLimit.STD_DURATION.toMillis.toInt.toJson,
         "MEMORY" -> MemoryLimit.stdMemory.toMB.toInt.toJson),
@@ -749,7 +749,7 @@ class SchemaTests extends FlatSpec with BeforeAndAfter with ExecHelpers with Mat
     an[IllegalArgumentException] should be thrownBy ActionLimits(
       TimeLimit(),
       MemoryLimit(),
-      LogLimit(LogLimit.MIN_LOGSIZE - 1.B))
+      LogLimit(LogLimit.minLogSize - 1.B))
 
     an[IllegalArgumentException] should be thrownBy ActionLimits(
       TimeLimit(TimeLimit.MAX_DURATION + 1.millisecond),
@@ -762,7 +762,7 @@ class SchemaTests extends FlatSpec with BeforeAndAfter with ExecHelpers with Mat
     an[IllegalArgumentException] should be thrownBy ActionLimits(
       TimeLimit(),
       MemoryLimit(),
-      LogLimit(LogLimit.MAX_LOGSIZE + 1.B))
+      LogLimit(LogLimit.maxLogSize + 1.B))
   }
 
   it should "parse activation id as uuid" in {
diff --git a/tests/src/test/scala/whisk/core/limits/ActionLimitsTests.scala b/tests/src/test/scala/whisk/core/limits/ActionLimitsTests.scala
index e820130..aa3caf8 100644
--- a/tests/src/test/scala/whisk/core/limits/ActionLimitsTests.scala
+++ b/tests/src/test/scala/whisk/core/limits/ActionLimitsTests.scala
@@ -94,13 +94,13 @@ class ActionLimitsTests extends TestHelpers with WskTestHelpers {
     }
 
     val toLogsString = logs match {
-      case None                                  => "None"
-      case Some(LogLimit.MIN_LOGSIZE)            => s"${LogLimit.MIN_LOGSIZE} (= min)"
-      case Some(LogLimit.STD_LOGSIZE)            => s"${LogLimit.STD_LOGSIZE} (= std)"
-      case Some(LogLimit.MAX_LOGSIZE)            => s"${LogLimit.MAX_LOGSIZE} (= max)"
-      case Some(l) if (l < LogLimit.MIN_LOGSIZE) => s"${l} (< min)"
-      case Some(l) if (l > LogLimit.MAX_LOGSIZE) => s"${l} (> max)"
-      case Some(l)                               => s"${l} (allowed)"
+      case None                                 => "None"
+      case Some(LogLimit.minLogSize)            => s"${LogLimit.minLogSize} (= min)"
+      case Some(LogLimit.stdLogSize)            => s"${LogLimit.stdLogSize} (= std)"
+      case Some(LogLimit.maxLogSize)            => s"${LogLimit.maxLogSize} (= max)"
+      case Some(l) if (l < LogLimit.minLogSize) => s"${l} (< min)"
+      case Some(l) if (l > LogLimit.maxLogSize) => s"${l} (> max)"
+      case Some(l)                              => s"${l} (allowed)"
     }
 
     val toExpectedResultString: String = if (ec == SUCCESS_EXIT) "allow" else "reject"
@@ -110,7 +110,7 @@ class ActionLimitsTests extends TestHelpers with WskTestHelpers {
     for {
       time <- Seq(None, Some(TimeLimit.MIN_DURATION), Some(TimeLimit.MAX_DURATION))
       mem <- Seq(None, Some(MemoryLimit.minMemory), Some(MemoryLimit.maxMemory))
-      log <- Seq(None, Some(LogLimit.MIN_LOGSIZE), Some(LogLimit.MAX_LOGSIZE))
+      log <- Seq(None, Some(LogLimit.minLogSize), Some(LogLimit.maxLogSize))
     } yield PermutationTestParameter(time, mem, log)
   } ++
     // Add variations for negative tests
@@ -121,7 +121,7 @@ class ActionLimitsTests extends TestHelpers with WskTestHelpers {
       PermutationTestParameter(None, Some(0.MB), None, BAD_REQUEST), // memory limit that is lower than allowed
       PermutationTestParameter(None, Some(MemoryLimit.maxMemory + 1.MB), None, BAD_REQUEST), // memory limit that is slightly higher than allowed
       PermutationTestParameter(None, Some((MemoryLimit.maxMemory.toMB * 5).MB), None, BAD_REQUEST), // memory limit that is much higher than allowed
-      PermutationTestParameter(None, None, Some((LogLimit.MAX_LOGSIZE.toMB * 5).MB), BAD_REQUEST)) // log size limit that is much higher than allowed
+      PermutationTestParameter(None, None, Some((LogLimit.maxLogSize.toMB * 5).MB), BAD_REQUEST)) // log size limit that is much higher than allowed
 
   /**
    * Integration test to verify that valid timeout, memory and log size limits are accepted
@@ -141,7 +141,7 @@ class ActionLimitsTests extends TestHelpers with WskTestHelpers {
       val limits = JsObject(
         "timeout" -> parm.timeout.getOrElse(TimeLimit.STD_DURATION).toMillis.toJson,
         "memory" -> parm.memory.getOrElse(MemoryLimit.stdMemory).toMB.toInt.toJson,
-        "logs" -> parm.logs.getOrElse(LogLimit.STD_LOGSIZE).toMB.toInt.toJson)
+        "logs" -> parm.logs.getOrElse(LogLimit.stdLogSize).toMB.toInt.toJson)
 
       val name = "ActionLimitTests-" + Instant.now.toEpochMilli
       val createResult = assetHelper.withCleaner(wsk.action, name, confirmDelete = (parm.ec == SUCCESS_EXIT)) {

-- 
To stop receiving notification emails like this one, please contact
tysonnorris@apache.org.