You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2018/06/29 15:07:59 UTC

[GitHub] chetanmeh closed pull request #2282: Distributed tracing support #2192

chetanmeh closed pull request #2282: Distributed tracing support #2192
URL: https://github.com/apache/incubator-openwhisk/pull/2282
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/common/scala/build.gradle b/common/scala/build.gradle
index 488c94f49d..02e94531b9 100644
--- a/common/scala/build.gradle
+++ b/common/scala/build.gradle
@@ -64,6 +64,14 @@ dependencies {
     compile 'io.kamon:kamon-statsd_2.11:0.6.7'
     //for mesos
     compile 'com.adobe.api.platform.runtime:mesos-actor:0.0.7'
+
+    //tracing support
+    compile 'io.opentracing:opentracing-api:0.31.0'
+    compile 'io.opentracing:opentracing-util:0.31.0'
+    compile 'io.opentracing.brave:brave-opentracing:0.31.0'
+    compile 'io.zipkin.reporter2:zipkin-sender-okhttp3:2.6.1'
+    compile 'io.zipkin.reporter2:zipkin-reporter:2.6.1'
+
     scoverage gradle.scoverage.deps
 }
 
diff --git a/common/scala/src/main/resources/application.conf b/common/scala/src/main/resources/application.conf
index 3ceaab8b77..bbac5f4b64 100644
--- a/common/scala/src/main/resources/application.conf
+++ b/common/scala/src/main/resources/application.conf
@@ -188,4 +188,18 @@ whisk {
         constraint-delimiter = " "//used to parse constraint strings
         teardown-on-exit = true //set to true to disable the mesos framework on system exit; set for false for HA deployments
     }
+
+    # tracing configuration
+    tracing {
+        cache-expiry = 30 seconds #how long to keep spans in cache. Set to appropriate value to trace long running requests
+        #Zipkin configuration. Uncomment following to enable zipkin based tracing
+        #zipkin {
+        #   url = "http://localhost:9411" //url to connect to zipkin server
+             //sample-rate to decide a request is sampled or not.
+             //sample-rate 0.5 equals to sampling 50% of the requests
+             //sample-rate of 1 means 100% sampling.
+             //sample-rate of 0 means no sampling
+        #   sample-rate = "0.01" // sample 1% of requests by default
+        #}
+    }
 }
diff --git a/common/scala/src/main/scala/whisk/common/TransactionId.scala b/common/scala/src/main/scala/whisk/common/TransactionId.scala
index cb16a42fd8..6441629f44 100644
--- a/common/scala/src/main/scala/whisk/common/TransactionId.scala
+++ b/common/scala/src/main/scala/whisk/common/TransactionId.scala
@@ -24,6 +24,8 @@ import akka.http.scaladsl.model.headers.RawHeader
 import pureconfig.loadConfigOrThrow
 import spray.json._
 import whisk.core.ConfigKeys
+import pureconfig._
+import whisk.common.tracing.WhiskTracerProvider
 
 import scala.util.Try
 
@@ -82,6 +84,9 @@ case class TransactionId private (meta: TransactionMetadata) extends AnyVal {
     }
 
     MetricEmitter.emitCounterMetric(marker)
+
+    //tracing support
+    WhiskTracerProvider.tracer.startSpan(marker, this)
     StartMarker(Instant.now, marker)
   }
 
@@ -116,6 +121,9 @@ case class TransactionId private (meta: TransactionMetadata) extends AnyVal {
     }
 
     MetricEmitter.emitHistogramMetric(endMarker, deltaToEnd)
+
+    //tracing support
+    WhiskTracerProvider.tracer.finishSpan(this)
   }
 
   /**
@@ -144,6 +152,9 @@ case class TransactionId private (meta: TransactionMetadata) extends AnyVal {
 
     MetricEmitter.emitHistogramMetric(endMarker, deltaToEnd)
     MetricEmitter.emitCounterMetric(endMarker)
+
+    //tracing support
+    WhiskTracerProvider.tracer.error(this)
   }
 
   /**
diff --git a/common/scala/src/main/scala/whisk/common/tracing/OpenTracingProvider.scala b/common/scala/src/main/scala/whisk/common/tracing/OpenTracingProvider.scala
new file mode 100644
index 0000000000..09c105a567
--- /dev/null
+++ b/common/scala/src/main/scala/whisk/common/tracing/OpenTracingProvider.scala
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.common.tracing
+
+import java.util.concurrent.TimeUnit
+
+import brave.Tracing
+import brave.opentracing.BraveTracer
+import brave.sampler.Sampler
+import com.github.benmanes.caffeine.cache.{Caffeine, Ticker}
+import io.opentracing.propagation.{Format, TextMapExtractAdapter, TextMapInjectAdapter}
+import io.opentracing.util.GlobalTracer
+import io.opentracing.{Span, SpanContext, Tracer}
+import pureconfig._
+import whisk.common.{LogMarkerToken, TransactionId}
+import whisk.core.ConfigKeys
+import zipkin2.reporter.okhttp3.OkHttpSender
+import zipkin2.reporter.{AsyncReporter, Sender}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.concurrent.duration.Duration
+
+/**
+ * OpenTracing based implementation of [[WhiskTracer]].
+ *
+ * Open spans are tracked per transaction id in `spanMap` as a stack (most recently started span first).
+ * Span contexts handed over from another component are kept in `contextMap`. Both maps are caches whose
+ * entries expire `tracingConfig.cacheExpiry` after they were last accessed.
+ *
+ * @param tracer the OpenTracing tracer used to build spans and to inject/extract span contexts
+ * @param tracingConfig tracing configuration; this class reads its `cacheExpiry`
+ * @param ticker time source used by the caches to decide on expiry
+ */
+class OpenTracer(val tracer: Tracer, tracingConfig: TracingConfig, ticker: Ticker = SystemTicker) extends WhiskTracer {
+  val spanMap = configureCache[String, List[Span]]()
+  val contextMap = configureCache[String, SpanContext]()
+
+  /**
+   * Start a span named after the action of the given marker and push it on the span stack of the transaction.
+   *
+   * If the transaction already has an open span, the new span becomes its child. Otherwise the span continues
+   * from the context registered through `setTraceContext`, if there is one, or starts a fresh trace.
+   *
+   * @param logMarker marker whose `action` is used as the operation name of the span
+   * @param transactionId transactionId to which the span belongs
+   */
+  override def startSpan(logMarker: LogMarkerToken, transactionId: TransactionId): Unit = {
+    val id = transactionId.meta.id
+
+    //spans which are currently open for this transactionId, newest first
+    val spanList = spanMap.getOrElse(id, Nil)
+
+    val spanBuilder = tracer
+      .buildSpan(logMarker.action)
+      .withTag("transactionId", id)
+
+    //NOTE(review): the Scope returned by startActive is never closed in this class - confirm this is intended
+    val active = spanList match {
+      case Nil =>
+        //Check if any active context then resume from that else create a fresh span
+        contextMap
+          .get(id)
+          .map(spanBuilder.asChildOf)
+          .getOrElse(spanBuilder.ignoreActiveSpan())
+          .startActive(true)
+          .span()
+      case head :: _ =>
+        //Create a child span of current head
+        spanBuilder.asChildOf(head).startActive(true).span()
+    }
+    //add active span to list
+    spanMap.put(id, active :: spanList)
+  }
+
+  /**
+   * Finish the most recently started span of the given transactionId.
+   *
+   * @param transactionId transactionId whose current span is finished
+   */
+  override def finishSpan(transactionId: TransactionId): Unit = {
+    clear(transactionId)
+  }
+
+  /**
+   * Register an error: the most recently started span of the given transactionId is tagged with
+   * `error = true` and then finished.
+   *
+   * @param transactionId transactionId whose current span failed
+   */
+  override def error(transactionId: TransactionId): Unit = {
+    //tag before finishing so that the reported span carries the error marker
+    spanMap
+      .get(transactionId.meta.id)
+      .flatMap(_.headOption)
+      .foreach(_.setTag("error", true))
+    clear(transactionId)
+  }
+
+  /**
+   * Get the current TraceContext which can be used for downstream services
+   *
+   * @param transactionId transactionId whose current span context is exported
+   * @return the context of the most recently started span in text map form, or None if the transaction
+   *         has no open span
+   */
+  override def getTraceContext(transactionId: TransactionId): Option[Map[String, String]] = {
+    spanMap
+      .get(transactionId.meta.id)
+      .flatMap(_.headOption)
+      .map { span =>
+        val carrier = mutable.Map.empty[String, String]
+        tracer.inject(span.context(), Format.Builtin.TEXT_MAP, new TextMapInjectAdapter(carrier.asJava))
+        carrier.toMap
+      }
+  }
+
+  /**
+   * Set the TraceContext received from an upstream service, so that the next root span started for the
+   * given transactionId continues that trace.
+   *
+   * @param transactionId transactionId for which the context is registered
+   * @param context trace context in text map form; nothing is registered for None or when the tracer
+   *                cannot extract a span context from it
+   */
+  override def setTraceContext(transactionId: TransactionId, context: Option[Map[String, String]]): Unit = {
+    context.foreach { scalaMap =>
+      //extract may yield null when the carrier holds no context; the cache does not accept null values
+      Option(tracer.extract(Format.Builtin.TEXT_MAP, new TextMapExtractAdapter(scalaMap.asJava)))
+        .foreach(ctx => contextMap.put(transactionId.meta.id, ctx))
+    }
+  }
+
+  /**
+   * Finish the newest span of the transaction and pop it from the stack. Once the last span is finished
+   * the bookkeeping for the transaction (span stack and registered context) is dropped.
+   */
+  private def clear(transactionId: TransactionId): Unit = {
+    val id = transactionId.meta.id
+    spanMap.get(id).foreach {
+      case head :: Nil =>
+        head.finish()
+        spanMap.remove(id)
+        contextMap.remove(id)
+      case head :: tail =>
+        head.finish()
+        spanMap.put(id, tail)
+      case Nil =>
+    }
+  }
+
+  /** Create a cache, exposed as a concurrent map, whose entries expire `cacheExpiry` after their last access. */
+  private def configureCache[T, R](): collection.concurrent.Map[T, R] =
+    Caffeine
+      .newBuilder()
+      .ticker(ticker)
+      .expireAfterAccess(tracingConfig.cacheExpiry.toSeconds, TimeUnit.SECONDS)
+      .build()
+      .asMap()
+      .asScala
+      .asInstanceOf[collection.concurrent.Map[T, R]]
+}
+
+/**
+ * Tracing operations keyed by [[TransactionId]].
+ *
+ * Every operation defaults to doing nothing, so an implementation only overrides what it supports.
+ */
+trait WhiskTracer {
+
+  /** Start a span for the given marker within the transaction. Does nothing by default. */
+  def startSpan(logMarker: LogMarkerToken, transactionId: TransactionId): Unit = ()
+
+  /** Finish the current span of the transaction. Does nothing by default. */
+  def finishSpan(transactionId: TransactionId): Unit = ()
+
+  /** Register an error for the transaction. Does nothing by default. */
+  def error(transactionId: TransactionId): Unit = ()
+
+  /** Trace context of the transaction to hand over to another service. None by default. */
+  def getTraceContext(transactionId: TransactionId): Option[Map[String, String]] = None
+
+  /** Register a trace context received from another service. Does nothing by default. */
+  def setTraceContext(transactionId: TransactionId, context: Option[Map[String, String]]): Unit = ()
+}
+
+/**
+ * Holds the [[WhiskTracer]] of this process, created once from the configuration at `ConfigKeys.tracing`.
+ */
+object WhiskTracerProvider {
+  val tracingConfig = loadConfigOrThrow[TracingConfig](ConfigKeys.tracing)
+
+  val tracer: WhiskTracer = createTracer(tracingConfig)
+
+  /**
+   * Create the tracer. When zipkin is configured and no global tracer is registered yet, a zipkin backed
+   * tracer is registered first. An [[OpenTracer]] is returned whenever a global tracer is registered,
+   * otherwise the no-op tracer.
+   */
+  private def createTracer(tracingConfig: TracingConfig): WhiskTracer = {
+    tracingConfig.zipkin.foreach { zipkinConfig =>
+      if (!GlobalTracer.isRegistered) registerZipkinTracer(tracingConfig.component, zipkinConfig)
+    }
+
+    if (GlobalTracer.isRegistered) new OpenTracer(GlobalTracer.get(), tracingConfig)
+    else NoopTracer
+  }
+
+  /** Register a Brave tracer reporting to zipkin as the global tracer; the reporter is closed on shutdown. */
+  private def registerZipkinTracer(serviceName: String, zipkinConfig: ZipkinConfig): Unit = {
+    val sender: Sender = OkHttpSender.create(zipkinConfig.generateUrl)
+    val spanReporter = AsyncReporter.create(sender)
+    val sampler = Sampler.create(zipkinConfig.sampleRate.toFloat)
+    val braveTracing = Tracing
+      .newBuilder()
+      .localServiceName(serviceName)
+      .spanReporter(spanReporter)
+      .sampler(sampler)
+      .build()
+
+    //register with OpenTracing
+    GlobalTracer.register(BraveTracer.create(braveTracing))
+
+    sys.addShutdownHook({ spanReporter.close() })
+  }
+}
+
+/**
+ * Tracer which keeps every no-op default of [[WhiskTracer]]; returned by [[WhiskTracerProvider]] when no
+ * global tracer is registered.
+ */
+private object NoopTracer extends WhiskTracer
+/**
+ * Tracing configuration.
+ *
+ * @param component name under which spans are reported (used as the local service name of the zipkin tracer)
+ * @param cacheExpiry how long spans and contexts are kept in the tracer caches after their last access
+ * @param zipkin zipkin settings; when absent no zipkin tracer is registered
+ */
+case class TracingConfig(component: String, cacheExpiry: Duration, zipkin: Option[ZipkinConfig] = None)
+/**
+ * Zipkin settings.
+ *
+ * @param url base url of the zipkin server, e.g. `http://localhost:9411`
+ * @param sampleRate fraction of requests to sample, given as a string (e.g. "0.01")
+ */
+case class ZipkinConfig(url: String, sampleRate: String) {
+
+  /** Url of the zipkin v2 span endpoint. A trailing slash on the configured base url is tolerated. */
+  def generateUrl: String = {
+    val base = url.stripSuffix("/")
+    s"$base/api/v2/spans"
+  }
+}
+/** [[Ticker]] which reads the time from `System.nanoTime`. */
+object SystemTicker extends Ticker {
+  override def read(): Long = System.nanoTime()
+}
diff --git a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
index fb20fda65b..be9f7b0b0a 100644
--- a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
@@ -222,6 +222,9 @@ object ConfigKeys {
   val dockerContainerFactory = s"${docker}.container-factory"
   val runc = "whisk.runc"
   val runcTimeouts = s"$runc.timeouts"
+
+  val tracing = "whisk.tracing"
+
   val containerFactory = "whisk.container-factory"
   val containerArgs = s"$containerFactory.container-args"
   val containerPool = "whisk.container-pool"
diff --git a/common/scala/src/main/scala/whisk/core/connector/Message.scala b/common/scala/src/main/scala/whisk/core/connector/Message.scala
index eb960b3ca5..d540a54abd 100644
--- a/common/scala/src/main/scala/whisk/core/connector/Message.scala
+++ b/common/scala/src/main/scala/whisk/core/connector/Message.scala
@@ -49,7 +49,8 @@ case class ActivationMessage(override val transid: TransactionId,
                              rootControllerIndex: ControllerInstanceId,
                              blocking: Boolean,
                              content: Option[JsObject],
-                             cause: Option[ActivationId] = None)
+                             cause: Option[ActivationId] = None,
+                             traceContext: Option[Map[String, String]] = None)
     extends Message {
 
   override def serialize = ActivationMessage.serdes.write(this).compactPrint
@@ -67,7 +68,7 @@ object ActivationMessage extends DefaultJsonProtocol {
   def parse(msg: String) = Try(serdes.read(msg.parseJson))
 
   private implicit val fqnSerdes = FullyQualifiedEntityName.serdes
-  implicit val serdes = jsonFormat9(ActivationMessage.apply)
+  implicit val serdes = jsonFormat10(ActivationMessage.apply)
 }
 
 /**
diff --git a/core/controller/src/main/resources/application.conf b/core/controller/src/main/resources/application.conf
index a2886368e3..77ce527c6d 100644
--- a/core/controller/src/main/resources/application.conf
+++ b/core/controller/src/main/resources/application.conf
@@ -78,3 +78,13 @@ ssl-config.enabledCipherSuites = [
   "TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256",
   "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256",
 ]
+
+whisk{
+  # tracing configuration
+  tracing {
+    component = "Controller"
+  }
+}
+
+
+
diff --git a/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala b/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala
index 79da2d5236..49d84c2782 100644
--- a/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala
@@ -23,6 +23,7 @@ import akka.actor.ActorSystem
 import akka.event.Logging.InfoLevel
 import spray.json._
 import whisk.common.{Logging, LoggingMarkers, TransactionId}
+import whisk.common.tracing.WhiskTracerProvider
 import whisk.core.connector.ActivationMessage
 import whisk.core.controller.WhiskServices
 import whisk.core.database.NoDocumentException
@@ -146,25 +147,29 @@ protected[actions] trait PrimitiveActions {
 
     // merge package parameters with action (action parameters supersede), then merge in payload
     val args = action.parameters merge payload
+    val activationId = activationIdFactory.make()
+
+    val startActivation = transid.started(
+      this,
+      waitForResponse
+        .map(_ => LoggingMarkers.CONTROLLER_ACTIVATION_BLOCKING)
+        .getOrElse(LoggingMarkers.CONTROLLER_ACTIVATION),
+      logLevel = InfoLevel)
+    val startLoadbalancer =
+      transid.started(this, LoggingMarkers.CONTROLLER_LOADBALANCER, s"action activation id: ${activationId}")
+
     val message = ActivationMessage(
       transid,
       FullyQualifiedEntityName(action.namespace, action.name, Some(action.version)),
       action.rev,
       user,
-      activationIdFactory.make(), // activation id created here
+      activationId, // activation id created here
       activeAckTopicIndex,
       waitForResponse.isDefined,
       args,
-      cause = cause)
+      cause = cause,
+      WhiskTracerProvider.tracer.getTraceContext(transid))
 
-    val startActivation = transid.started(
-      this,
-      waitForResponse
-        .map(_ => LoggingMarkers.CONTROLLER_ACTIVATION_BLOCKING)
-        .getOrElse(LoggingMarkers.CONTROLLER_ACTIVATION),
-      logLevel = InfoLevel)
-    val startLoadbalancer =
-      transid.started(this, LoggingMarkers.CONTROLLER_LOADBALANCER, s"action activation id: ${message.activationId}")
     val postedFuture = loadBalancer.publish(action, message)
 
     postedFuture.flatMap { activeAckResponse =>
diff --git a/core/invoker/src/main/resources/application.conf b/core/invoker/src/main/resources/application.conf
index ebd45e5838..00c33399ab 100644
--- a/core/invoker/src/main/resources/application.conf
+++ b/core/invoker/src/main/resources/application.conf
@@ -66,4 +66,9 @@ whisk {
       pause-grace = 50 milliseconds
     }
   }
+
+  # tracing configuration
+  tracing {
+    component = "Invoker"
+  }
 }
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
index ae4a1ae081..87ca1e8018 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
@@ -26,6 +26,7 @@ import akka.stream.ActorMaterializer
 import org.apache.kafka.common.errors.RecordTooLargeException
 import pureconfig._
 import spray.json._
+import whisk.common.tracing.WhiskTracerProvider
 import whisk.common._
 import whisk.core.{ConfigKeys, WhiskConfig}
 import whisk.core.connector._
@@ -192,6 +193,9 @@ class InvokerReactive(
 
         implicit val transid: TransactionId = msg.transid
 
+        //set trace context to continue tracing
+        WhiskTracerProvider.tracer.setTraceContext(transid, msg.traceContext)
+
         if (!namespaceBlacklist.isBlacklisted(msg.user)) {
           val start = transid.started(this, LoggingMarkers.INVOKER_ACTIVATION, logLevel = InfoLevel)
           val namespace = msg.action.path
diff --git a/tests/build.gradle b/tests/build.gradle
index cf737edbf9..919d27e4b8 100644
--- a/tests/build.gradle
+++ b/tests/build.gradle
@@ -145,12 +145,15 @@ dependencies {
     compile 'com.typesafe.akka:akka-http-testkit_2.11:10.1.1'
     compile 'com.github.java-json-tools:json-schema-validator:2.2.8'
     compile "org.mockito:mockito-core:2.15.0"
+    compile 'io.opentracing:opentracing-mock:0.31.0'
 
     compile project(':common:scala')
     compile project(':core:controller')
     compile project(':core:invoker')
     compile project(':tools:admin')
 
+
+
     scoverage gradle.scoverage.deps
 }
 
diff --git a/tests/src/test/scala/common/WskTracingTests.scala b/tests/src/test/scala/common/WskTracingTests.scala
new file mode 100644
index 0000000000..516f890188
--- /dev/null
+++ b/tests/src/test/scala/common/WskTracingTests.scala
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package common
+
+import io.opentracing.Span
+import io.opentracing.mock.{MockSpan, MockTracer}
+import com.github.benmanes.caffeine.cache.Ticker
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import pureconfig.loadConfigOrThrow
+import whisk.common.{LoggingMarkers, TransactionId}
+import whisk.common.tracing.{OpenTracer, TracingConfig}
+import whisk.core.ConfigKeys
+
+import scala.ref.WeakReference
+import org.scalatest.FlatSpec
+import org.scalatest.Matchers
+
+/**
+ * Tests for [[OpenTracer]] running against a MockTracer. A FakeTicker drives the expiry of the span and
+ * context caches.
+ */
+@RunWith(classOf[JUnitRunner])
+class WskTracingTests extends FlatSpec with TestHelpers with Matchers {
+
+  val tracer: MockTracer = new MockTracer()
+  val tracingConfig = loadConfigOrThrow[TracingConfig](ConfigKeys.tracing)
+  val ticker = new FakeTicker(System.nanoTime())
+  val openTracer = new OpenTracer(tracer, tracingConfig, ticker)
+
+  it should "create span and context and invalidate cache after expiry" in {
+    tracer.reset()
+
+    val transactionId: TransactionId = TransactionId.testing
+
+    openTracer.startSpan(LoggingMarkers.CONTROLLER_ACTIVATION, transactionId)
+    val ctx = openTracer.getTraceContext(transactionId)
+    openTracer.setTraceContext(transactionId, ctx)
+    ctx should be(defined)
+
+    //advance ticker beyond the cache expiry
+    ticker.time = System.nanoTime() + (tracingConfig.cacheExpiry.toNanos + 100)
+    val expiredCtx = openTracer.getTraceContext(transactionId)
+    expiredCtx should not be (defined)
+    openTracer.startSpan(LoggingMarkers.CONTROLLER_KAFKA, transactionId)
+    openTracer.finishSpan(transactionId)
+    val finishedSpans = tracer.finishedSpans()
+    finishedSpans should have size 1
+    //no parent for new span as cache expiry cleared spanMap and contextMap
+    finishedSpans.get(0).parentId() should be(0)
+  }
+
+  it should "create a finished span" in {
+    tracer.reset()
+    val transactionId: TransactionId = TransactionId.testing
+    openTracer.startSpan(LoggingMarkers.CONTROLLER_ACTIVATION, transactionId)
+    openTracer.finishSpan(transactionId)
+    val finishedSpans = tracer.finishedSpans()
+    finishedSpans should have size 1
+
+  }
+
+  it should "create a child span" in {
+    tracer.reset()
+    val transactionId: TransactionId = TransactionId.testing
+    openTracer.startSpan(LoggingMarkers.CONTROLLER_ACTIVATION, transactionId)
+    openTracer.startSpan(LoggingMarkers.CONTROLLER_KAFKA, transactionId)
+    openTracer.finishSpan(transactionId)
+    openTracer.finishSpan(transactionId)
+    val finishedSpans = tracer.finishedSpans()
+    finishedSpans should have size 2
+    //the child is finished first, hence it is reported before its parent
+    val parent: MockSpan = finishedSpans.get(1)
+    val child: MockSpan = finishedSpans.get(0)
+    child.parentId should be(parent.context().spanId)
+
+  }
+
+  it should "create a span with tag" in {
+    tracer.reset()
+    val transactionId: TransactionId = TransactionId.testing
+    openTracer.startSpan(LoggingMarkers.CONTROLLER_ACTIVATION, transactionId)
+    openTracer.finishSpan(transactionId)
+    val finishedSpans = tracer.finishedSpans()
+    finishedSpans should have size 1
+    val mockSpan: MockSpan = finishedSpans.get(0)
+    mockSpan.tags should not be null
+    mockSpan.tags should have size 1
+
+  }
+
+  it should "create a valid trace context and use it" in {
+    tracer.reset()
+    val transactionId: TransactionId = TransactionId.testing
+    openTracer.startSpan(LoggingMarkers.CONTROLLER_ACTIVATION, transactionId)
+    val context = openTracer.getTraceContext(transactionId)
+    openTracer.finishSpan(transactionId)
+    tracer.reset()
+    //use context for new span
+    openTracer.setTraceContext(transactionId, context)
+    openTracer.startSpan(LoggingMarkers.CONTROLLER_KAFKA, transactionId)
+    openTracer.finishSpan(transactionId)
+    val finishedSpans = tracer.finishedSpans()
+    finishedSpans should have size 1
+    val child: MockSpan = finishedSpans.get(0)
+    //This child span should have a parent as we have set trace context
+    child.parentId should be > 0L
+  }
+}
+
+/** [[Ticker]] for tests: reports whatever value `time` currently holds, so a test controls the clock. */
+class FakeTicker(var time: Long) extends Ticker {
+  override def read(): Long = time
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services