You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2018/04/23 17:38:50 UTC

[GitHub] tysonnorris closed pull request #3563: Skip log collection if the log-limit is set to "0".

tysonnorris closed pull request #3563: Skip log collection if the log-limit is set to "0".
URL: https://github.com/apache/incubator-openwhisk/pull/3563
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/common/scala/src/main/resources/application.conf b/common/scala/src/main/resources/application.conf
index e3fadaf939..c12eb0278e 100644
--- a/common/scala/src/main/resources/application.conf
+++ b/common/scala/src/main/resources/application.conf
@@ -156,6 +156,13 @@ whisk {
         std = 256 m
     }
 
+    # action log-limit configuration
+    log-limit {
+        min = 0 m
+        max = 10 m
+        std = 10 m
+    }
+
     mesos {
         master-url = "http://localhost:5050" //your mesos master
         master-public-url = "http://localhost:5050" // if mesos-link-log-message == true, this link will be included with the static log message (may or may not be different from master-url)
diff --git a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
index 36f74e02fd..1fd0154f86 100644
--- a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
@@ -217,6 +217,7 @@ object ConfigKeys {
 
   val memory = "whisk.memory"
   val timeLimit = "whisk.time-limit"
+  val logLimit = "whisk.log-limit"
   val activation = "whisk.activation"
   val activationPayload = s"$activation.payload"
 
diff --git a/common/scala/src/main/scala/whisk/core/entity/Limits.scala b/common/scala/src/main/scala/whisk/core/entity/Limits.scala
index a2294e0d3d..5937df7c74 100644
--- a/common/scala/src/main/scala/whisk/core/entity/Limits.scala
+++ b/common/scala/src/main/scala/whisk/core/entity/Limits.scala
@@ -45,7 +45,9 @@ protected[entity] abstract class Limits {
  * @param memory the memory limit in megabytes, assured to be non-null because it is a value
  * @param logs the limit for logs written by the container and stored in the activation record, assured to be non-null because it is a value
  */
-protected[core] case class ActionLimits protected[core] (timeout: TimeLimit, memory: MemoryLimit, logs: LogLimit)
+protected[core] case class ActionLimits(timeout: TimeLimit = TimeLimit(),
+                                        memory: MemoryLimit = MemoryLimit(),
+                                        logs: LogLimit = LogLimit())
     extends Limits {
   override protected[entity] def toJson = ActionLimits.serdes.write(this)
 }
@@ -59,9 +61,6 @@ protected[core] case class TriggerLimits protected[core] () extends Limits {
 
 protected[core] object ActionLimits extends ArgNormalizer[ActionLimits] with DefaultJsonProtocol {
 
-  /** Creates a ActionLimits instance with default duration, memory and log limits. */
-  protected[core] def apply(): ActionLimits = ActionLimits(TimeLimit(), MemoryLimit(), LogLimit())
-
   override protected[core] implicit val serdes = new RootJsonFormat[ActionLimits] {
     val helper = jsonFormat3(ActionLimits.apply)
 
diff --git a/common/scala/src/main/scala/whisk/core/entity/LogLimit.scala b/common/scala/src/main/scala/whisk/core/entity/LogLimit.scala
index 0b0d88c227..6f3abef94b 100644
--- a/common/scala/src/main/scala/whisk/core/entity/LogLimit.scala
+++ b/common/scala/src/main/scala/whisk/core/entity/LogLimit.scala
@@ -17,16 +17,17 @@
 
 package whisk.core.entity
 
+import pureconfig.loadConfigOrThrow
+
 import scala.language.postfixOps
 import scala.util.Failure
 import scala.util.Success
 import scala.util.Try
+import spray.json._
+import whisk.core.ConfigKeys
+import whisk.core.entity.size._
 
-import spray.json.JsNumber
-import spray.json.JsValue
-import spray.json.RootJsonFormat
-import spray.json.deserializationError
-import whisk.core.entity.size.SizeInt
+case class LogLimitConfig(min: ByteSize, max: ByteSize, std: ByteSize)
 
 /**
  * LogLimit encapsulates allowed amount of logs written by an action.
@@ -45,12 +46,14 @@ protected[core] class LogLimit private (val megabytes: Int) extends AnyVal {
 }
 
 protected[core] object LogLimit extends ArgNormalizer[LogLimit] {
-  protected[core] val MIN_LOGSIZE = 0 MB
-  protected[core] val MAX_LOGSIZE = 10 MB
-  protected[core] val STD_LOGSIZE = 10 MB
+  private val logLimitConfig = loadConfigOrThrow[MemoryLimitConfig](ConfigKeys.logLimit)
+
+  protected[core] val minLogSize: ByteSize = logLimitConfig.min
+  protected[core] val maxLogSize: ByteSize = logLimitConfig.max
+  protected[core] val stdLogSize: ByteSize = logLimitConfig.std
 
   /** Gets LogLimit with default log limit */
-  protected[core] def apply(): LogLimit = LogLimit(STD_LOGSIZE)
+  protected[core] def apply(): LogLimit = LogLimit(stdLogSize)
 
   /**
    * Creates LogLimit for limit. Only the default limit is allowed currently.
@@ -61,9 +64,9 @@ protected[core] object LogLimit extends ArgNormalizer[LogLimit] {
    */
   @throws[IllegalArgumentException]
   protected[core] def apply(megabytes: ByteSize): LogLimit = {
-    require(megabytes >= MIN_LOGSIZE, s"log size $megabytes below allowed threshold of $MIN_LOGSIZE")
-    require(megabytes <= MAX_LOGSIZE, s"log size $megabytes exceeds allowed threshold of $MAX_LOGSIZE")
-    new LogLimit(megabytes.toMB.toInt);
+    require(megabytes >= minLogSize, s"log size $megabytes below allowed threshold of $minLogSize")
+    require(megabytes <= maxLogSize, s"log size $megabytes exceeds allowed threshold of $maxLogSize")
+    new LogLimit(megabytes.toMB.toInt)
   }
 
   override protected[core] implicit val serdes = new RootJsonFormat[LogLimit] {
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
index dd9885e7db..fddafd56a1 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
@@ -385,19 +385,24 @@ class ContainerProxy(
     // Adds logs to the raw activation.
     val activationWithLogs: Future[Either[ActivationLogReadingError, WhiskActivation]] = activation
       .flatMap { activation =>
-        val start = tid.started(this, LoggingMarkers.INVOKER_COLLECT_LOGS, logLevel = InfoLevel)
-        collectLogs(tid, job.msg.user, activation, container, job.action)
-          .andThen {
-            case Success(_) => tid.finished(this, start)
-            case Failure(t) => tid.failed(this, start, s"reading logs failed: $t")
-          }
-          .map(logs => Right(activation.withLogs(logs)))
-          .recover {
-            case LogCollectingException(logs) =>
-              Left(ActivationLogReadingError(activation.withLogs(logs)))
-            case _ =>
-              Left(ActivationLogReadingError(activation.withLogs(ActivationLogs(Vector(Messages.logFailure)))))
-          }
+        // Skips log collection entirely, if the limit is set to 0
+        if (job.action.limits.logs.asMegaBytes == 0.MB) {
+          Future.successful(Right(activation))
+        } else {
+          val start = tid.started(this, LoggingMarkers.INVOKER_COLLECT_LOGS, logLevel = InfoLevel)
+          collectLogs(tid, job.msg.user, activation, container, job.action)
+            .andThen {
+              case Success(_) => tid.finished(this, start)
+              case Failure(t) => tid.failed(this, start, s"reading logs failed: $t")
+            }
+            .map(logs => Right(activation.withLogs(logs)))
+            .recover {
+              case LogCollectingException(logs) =>
+                Left(ActivationLogReadingError(activation.withLogs(logs)))
+              case _ =>
+                Left(ActivationLogReadingError(activation.withLogs(ActivationLogs(Vector(Messages.logFailure)))))
+            }
+        }
       }
 
     // Storing the record. Entirely asynchronous and not waited upon.
diff --git a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
index 7f2e55319b..0055372f1e 100644
--- a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
@@ -149,11 +149,12 @@ class ContainerProxyTests
   }
 
   /** Creates an inspectable version of the ack method, which records all calls in a buffer */
-  def createAcker = LoggedFunction { (_: TransactionId, activation: WhiskActivation, _: Boolean, _: InstanceId) =>
-    activation.annotations.get("limits") shouldBe Some(action.limits.toJson)
-    activation.annotations.get("path") shouldBe Some(action.fullyQualifiedName(false).toString.toJson)
-    activation.annotations.get("kind") shouldBe Some(action.exec.kind.toJson)
-    Future.successful(())
+  def createAcker(a: ExecutableWhiskAction = action) = LoggedFunction {
+    (_: TransactionId, activation: WhiskActivation, _: Boolean, _: InstanceId) =>
+      activation.annotations.get("limits") shouldBe Some(a.limits.toJson)
+      activation.annotations.get("path") shouldBe Some(a.fullyQualifiedName(false).toString.toJson)
+      activation.annotations.get("kind") shouldBe Some(a.exec.kind.toJson)
+      Future.successful(())
   }
 
   /** Creates an inspectable factory */
@@ -192,7 +193,7 @@ class ContainerProxyTests
         ContainerProxy
           .props(
             factory,
-            createAcker,
+            createAcker(),
             store,
             createCollector(),
             InstanceId(0, Some("myname")),
@@ -212,7 +213,7 @@ class ContainerProxyTests
     timeout) {
     val container = new TestContainer
     val factory = createFactory(Future.successful(container))
-    val acker = createAcker
+    val acker = createAcker()
     val store = createStore
     val collector = createCollector()
 
@@ -248,7 +249,7 @@ class ContainerProxyTests
   it should "run an action and continue with a next run without pausing the container" in within(timeout) {
     val container = new TestContainer
     val factory = createFactory(Future.successful(container))
-    val acker = createAcker
+    val acker = createAcker()
     val store = createStore
     val collector = createCollector()
 
@@ -295,7 +296,7 @@ class ContainerProxyTests
   it should "run an action after pausing the container" in within(timeout) {
     val container = new TestContainer
     val factory = createFactory(Future.successful(container))
-    val acker = createAcker
+    val acker = createAcker()
     val store = createStore
     val collector = createCollector()
 
@@ -333,7 +334,7 @@ class ContainerProxyTests
   it should "successfully run on an uninitialized container" in within(timeout) {
     val container = new TestContainer
     val factory = createFactory(Future.successful(container))
-    val acker = createAcker
+    val acker = createAcker()
     val store = createStore
     val collector = createCollector()
 
@@ -360,13 +361,42 @@ class ContainerProxyTests
     }
   }
 
+  it should "not collect logs if the log-limit is set to 0" in within(timeout) {
+    val noLogsAction = action.copy(limits = ActionLimits(logs = LogLimit(0.MB)))
+
+    val container = new TestContainer
+    val factory = createFactory(Future.successful(container))
+    val acker = createAcker(noLogsAction)
+    val store = createStore
+    val collector = createCollector()
+
+    val machine =
+      childActorOf(
+        ContainerProxy.props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
+    registerCallback(machine)
+
+    machine ! Run(noLogsAction, message)
+    expectMsg(Transition(machine, Uninitialized, Running))
+    expectWarmed(invocationNamespace.name, noLogsAction)
+    expectMsg(Transition(machine, Running, Ready))
+
+    awaitAssert {
+      factory.calls should have size 1
+      container.initializeCount shouldBe 1
+      container.runCount shouldBe 1
+      collector.calls should have size 0
+      acker.calls should have size 1
+      store.calls should have size 1
+    }
+  }
+
   /*
    * ERROR CASES
    */
   it should "complete the transaction and abort if container creation fails" in within(timeout) {
     val container = new TestContainer
     val factory = createFactory(Future.failed(new Exception()))
-    val acker = createAcker
+    val acker = createAcker()
     val store = createStore
     val collector = createCollector()
 
@@ -401,7 +431,7 @@ class ContainerProxyTests
       }
     }
     val factory = createFactory(Future.successful(container))
-    val acker = createAcker
+    val acker = createAcker()
     val store = createStore
     val collector = createCollector()
 
@@ -440,7 +470,7 @@ class ContainerProxyTests
       }
     }
     val factory = createFactory(Future.successful(container))
-    val acker = createAcker
+    val acker = createAcker()
     val store = createStore
     val collector = createCollector()
 
@@ -467,7 +497,7 @@ class ContainerProxyTests
   it should "complete the transaction and destroy the container if log reading failed" in {
     val container = new TestContainer
     val factory = createFactory(Future.successful(container))
-    val acker = createAcker
+    val acker = createAcker()
     val store = createStore
 
     val partialLogs = Vector("this log line made it", Messages.logFailure)
@@ -499,7 +529,7 @@ class ContainerProxyTests
   it should "complete the transaction and destroy the container if log reading failed terminally" in {
     val container = new TestContainer
     val factory = createFactory(Future.successful(container))
-    val acker = createAcker
+    val acker = createAcker()
     val store = createStore
     val collector = createCollector(Future.failed(new Exception))
 
@@ -533,7 +563,7 @@ class ContainerProxyTests
       }
     }
     val factory = createFactory(Future.successful(container))
-    val acker = createAcker
+    val acker = createAcker()
     val store = createStore
 
     val machine =
@@ -568,7 +598,7 @@ class ContainerProxyTests
       }
     }
     val factory = createFactory(Future.successful(container))
-    val acker = createAcker
+    val acker = createAcker()
     val store = createStore
 
     val machine =
@@ -603,7 +633,7 @@ class ContainerProxyTests
       }
     }
     val factory = createFactory(Future.successful(container))
-    val acker = createAcker
+    val acker = createAcker()
     val store = createStore
     val collector = createCollector()
 
@@ -654,7 +684,7 @@ class ContainerProxyTests
       }
     }
     val factory = createFactory(Future.successful(container))
-    val acker = createAcker
+    val acker = createAcker()
     val store = createStore
     val collector = createCollector()
 
diff --git a/tests/src/test/scala/whisk/core/entity/test/SchemaTests.scala b/tests/src/test/scala/whisk/core/entity/test/SchemaTests.scala
index cbf49c8c22..d6111d002e 100644
--- a/tests/src/test/scala/whisk/core/entity/test/SchemaTests.scala
+++ b/tests/src/test/scala/whisk/core/entity/test/SchemaTests.scala
@@ -672,11 +672,11 @@ class SchemaTests extends FlatSpec with BeforeAndAfter with ExecHelpers with Mat
       JsObject(
         "timeout" -> TimeLimit.STD_DURATION.toMillis.toInt.toJson,
         "memory" -> MemoryLimit.stdMemory.toMB.toInt.toJson,
-        "logs" -> LogLimit.STD_LOGSIZE.toMB.toInt.toJson),
+        "logs" -> LogLimit.stdLogSize.toMB.toInt.toJson),
       JsObject(
         "timeout" -> TimeLimit.STD_DURATION.toMillis.toInt.toJson,
         "memory" -> MemoryLimit.stdMemory.toMB.toInt.toJson,
-        "logs" -> LogLimit.STD_LOGSIZE.toMB.toInt.toJson,
+        "logs" -> LogLimit.stdLogSize.toMB.toInt.toJson,
         "foo" -> "bar".toJson),
       JsObject(
         "timeout" -> TimeLimit.STD_DURATION.toMillis.toInt.toJson,
@@ -697,7 +697,7 @@ class SchemaTests extends FlatSpec with BeforeAndAfter with ExecHelpers with Mat
       JsNull,
       JsObject("timeout" -> TimeLimit.STD_DURATION.toMillis.toInt.toJson),
       JsObject("memory" -> MemoryLimit.stdMemory.toMB.toInt.toJson),
-      JsObject("logs" -> (LogLimit.STD_LOGSIZE.toMB.toInt + 1).toJson),
+      JsObject("logs" -> (LogLimit.stdLogSize.toMB.toInt + 1).toJson),
       JsObject(
         "TIMEOUT" -> TimeLimit.STD_DURATION.toMillis.toInt.toJson,
         "MEMORY" -> MemoryLimit.stdMemory.toMB.toInt.toJson),
@@ -749,7 +749,7 @@ class SchemaTests extends FlatSpec with BeforeAndAfter with ExecHelpers with Mat
     an[IllegalArgumentException] should be thrownBy ActionLimits(
       TimeLimit(),
       MemoryLimit(),
-      LogLimit(LogLimit.MIN_LOGSIZE - 1.B))
+      LogLimit(LogLimit.minLogSize - 1.B))
 
     an[IllegalArgumentException] should be thrownBy ActionLimits(
       TimeLimit(TimeLimit.MAX_DURATION + 1.millisecond),
@@ -762,7 +762,7 @@ class SchemaTests extends FlatSpec with BeforeAndAfter with ExecHelpers with Mat
     an[IllegalArgumentException] should be thrownBy ActionLimits(
       TimeLimit(),
       MemoryLimit(),
-      LogLimit(LogLimit.MAX_LOGSIZE + 1.B))
+      LogLimit(LogLimit.maxLogSize + 1.B))
   }
 
   it should "parse activation id as uuid" in {
diff --git a/tests/src/test/scala/whisk/core/limits/ActionLimitsTests.scala b/tests/src/test/scala/whisk/core/limits/ActionLimitsTests.scala
index e820130ff2..aa3caf8bee 100644
--- a/tests/src/test/scala/whisk/core/limits/ActionLimitsTests.scala
+++ b/tests/src/test/scala/whisk/core/limits/ActionLimitsTests.scala
@@ -94,13 +94,13 @@ class ActionLimitsTests extends TestHelpers with WskTestHelpers {
     }
 
     val toLogsString = logs match {
-      case None                                  => "None"
-      case Some(LogLimit.MIN_LOGSIZE)            => s"${LogLimit.MIN_LOGSIZE} (= min)"
-      case Some(LogLimit.STD_LOGSIZE)            => s"${LogLimit.STD_LOGSIZE} (= std)"
-      case Some(LogLimit.MAX_LOGSIZE)            => s"${LogLimit.MAX_LOGSIZE} (= max)"
-      case Some(l) if (l < LogLimit.MIN_LOGSIZE) => s"${l} (< min)"
-      case Some(l) if (l > LogLimit.MAX_LOGSIZE) => s"${l} (> max)"
-      case Some(l)                               => s"${l} (allowed)"
+      case None                                 => "None"
+      case Some(LogLimit.minLogSize)            => s"${LogLimit.minLogSize} (= min)"
+      case Some(LogLimit.stdLogSize)            => s"${LogLimit.stdLogSize} (= std)"
+      case Some(LogLimit.maxLogSize)            => s"${LogLimit.maxLogSize} (= max)"
+      case Some(l) if (l < LogLimit.minLogSize) => s"${l} (< min)"
+      case Some(l) if (l > LogLimit.maxLogSize) => s"${l} (> max)"
+      case Some(l)                              => s"${l} (allowed)"
     }
 
     val toExpectedResultString: String = if (ec == SUCCESS_EXIT) "allow" else "reject"
@@ -110,7 +110,7 @@ class ActionLimitsTests extends TestHelpers with WskTestHelpers {
     for {
       time <- Seq(None, Some(TimeLimit.MIN_DURATION), Some(TimeLimit.MAX_DURATION))
       mem <- Seq(None, Some(MemoryLimit.minMemory), Some(MemoryLimit.maxMemory))
-      log <- Seq(None, Some(LogLimit.MIN_LOGSIZE), Some(LogLimit.MAX_LOGSIZE))
+      log <- Seq(None, Some(LogLimit.minLogSize), Some(LogLimit.maxLogSize))
     } yield PermutationTestParameter(time, mem, log)
   } ++
     // Add variations for negative tests
@@ -121,7 +121,7 @@ class ActionLimitsTests extends TestHelpers with WskTestHelpers {
       PermutationTestParameter(None, Some(0.MB), None, BAD_REQUEST), // memory limit that is lower than allowed
       PermutationTestParameter(None, Some(MemoryLimit.maxMemory + 1.MB), None, BAD_REQUEST), // memory limit that is slightly higher than allowed
       PermutationTestParameter(None, Some((MemoryLimit.maxMemory.toMB * 5).MB), None, BAD_REQUEST), // memory limit that is much higher than allowed
-      PermutationTestParameter(None, None, Some((LogLimit.MAX_LOGSIZE.toMB * 5).MB), BAD_REQUEST)) // log size limit that is much higher than allowed
+      PermutationTestParameter(None, None, Some((LogLimit.maxLogSize.toMB * 5).MB), BAD_REQUEST)) // log size limit that is much higher than allowed
 
   /**
    * Integration test to verify that valid timeout, memory and log size limits are accepted
@@ -141,7 +141,7 @@ class ActionLimitsTests extends TestHelpers with WskTestHelpers {
       val limits = JsObject(
         "timeout" -> parm.timeout.getOrElse(TimeLimit.STD_DURATION).toMillis.toJson,
         "memory" -> parm.memory.getOrElse(MemoryLimit.stdMemory).toMB.toInt.toJson,
-        "logs" -> parm.logs.getOrElse(LogLimit.STD_LOGSIZE).toMB.toInt.toJson)
+        "logs" -> parm.logs.getOrElse(LogLimit.stdLogSize).toMB.toInt.toJson)
 
       val name = "ActionLimitTests-" + Instant.now.toEpochMilli
       val createResult = assetHelper.withCleaner(wsk.action, name, confirmDelete = (parm.ec == SUCCESS_EXIT)) {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services