You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by st...@apache.org on 2022/05/10 05:21:19 UTC

[openwhisk] branch master updated: Replace kafka.RecordMetadata with a common ResultMetadata (#5217)

This is an automated email from the ASF dual-hosted git repository.

style95 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new cbdcfe574 Replace kafka.RecordMetadata with a common ResultMetadata (#5217)
cbdcfe574 is described below

commit cbdcfe574984b6509335da64da86f80d592aaa1e
Author: jiangpch <ji...@navercorp.com>
AuthorDate: Tue May 10 13:21:12 2022 +0800

    Replace kafka.RecordMetadata with a common ResultMetadata (#5217)
    
    * Replace kafka.RecordMetadata with a common ResultMetadata
    
    * remove unused import
---
 .../connector/kafka/KafkaProducerConnector.scala   | 11 ++++++-----
 .../openwhisk/connector/lean/LeanProducer.scala    | 10 ++++------
 .../apache/openwhisk/core/connector/Message.scala  |  2 ++
 .../openwhisk/core/connector/MessageProducer.scala |  4 +---
 .../core/loadBalancer/CommonLoadBalancer.scala     |  5 ++---
 .../core/loadBalancer/FPCPoolBalancer.scala        |  5 ++---
 .../core/loadBalancer/InvokerPoolFactory.scala     |  8 +++-----
 .../core/loadBalancer/InvokerSupervision.scala     |  7 +++----
 .../ShardingContainerPoolBalancer.scala            |  3 +--
 .../v2/FunctionPullingContainerPool.scala          | 10 +++++-----
 .../core/invoker/ContainerMessageConsumer.scala    |  3 +--
 .../core/invoker/FPCInvokerReactive.scala          |  3 +--
 .../scheduler/container/ContainerManager.scala     | 11 ++++-------
 .../core/connector/test/TestConnector.scala        | 21 +++++++-------------
 .../test/FunctionPullingContainerPoolTests.scala   |  7 +++----
 .../test/ContainerMessageConsumerTests.scala       |  3 +--
 .../test/InvokerSupervisionTests.scala             | 15 ++++++--------
 .../test/ShardingContainerPoolBalancerTests.scala  | 23 +++++++++++-----------
 .../container/test/ContainerManagerTests.scala     |  9 +++------
 .../queue/test/MemoryQueueTestsFixture.scala       | 16 ++++++++-------
 20 files changed, 76 insertions(+), 100 deletions(-)

diff --git a/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaProducerConnector.scala b/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaProducerConnector.scala
index e08ad972f..5b616113e 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaProducerConnector.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaProducerConnector.scala
@@ -27,7 +27,7 @@ import pureconfig.generic.auto._
 import org.apache.openwhisk.common.{Counter, Logging, TransactionId}
 import org.apache.openwhisk.connector.kafka.KafkaConfiguration._
 import org.apache.openwhisk.core.ConfigKeys
-import org.apache.openwhisk.core.connector.{Message, MessageProducer}
+import org.apache.openwhisk.core.connector.{Message, MessageProducer, ResultMetadata}
 import org.apache.openwhisk.core.entity.{ByteSize, UUIDs}
 import org.apache.openwhisk.utils.Exceptions
 
@@ -49,17 +49,18 @@ class KafkaProducerConnector(
   override def sentCount(): Long = sentCounter.cur
 
   /** Sends msg to topic. This is an asynchronous operation. */
-  override def send(topic: String, msg: Message, retry: Int = 3): Future[RecordMetadata] = {
+  override def send(topic: String, msg: Message, retry: Int = 3): Future[ResultMetadata] = {
     implicit val transid: TransactionId = msg.transid
     val record = new ProducerRecord[String, String](topic, "messages", msg.serialize)
-    val produced = Promise[RecordMetadata]()
+    val produced = Promise[ResultMetadata]()
 
     Future {
       blocking {
         try {
           producer.send(record, new Callback {
             override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
-              if (exception == null) produced.trySuccess(metadata)
+              if (exception == null)
+                produced.trySuccess(ResultMetadata(metadata.topic(), metadata.partition(), metadata.offset()))
               else produced.tryFailure(exception)
             }
           })
@@ -72,7 +73,7 @@ class KafkaProducerConnector(
 
     produced.future.andThen {
       case Success(status) =>
-        logging.debug(this, s"sent message: ${status.topic()}[${status.partition()}][${status.offset()}]")
+        logging.debug(this, s"sent message: ${status.topic}[${status.partition}][${status.offset}]")
         sentCounter.next()
       case Failure(t) =>
         logging.error(this, s"sending message on topic '$topic' failed: ${t.getMessage}")
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/connector/lean/LeanProducer.scala b/common/scala/src/main/scala/org/apache/openwhisk/connector/lean/LeanProducer.scala
index e555e85c2..2463d1410 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/connector/lean/LeanProducer.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/connector/lean/LeanProducer.scala
@@ -18,13 +18,11 @@
 package org.apache.openwhisk.connector.lean
 
 import akka.actor.ActorSystem
+
 import scala.concurrent.Future
-import org.apache.kafka.clients.producer.RecordMetadata
-import org.apache.kafka.common.TopicPartition
 import org.apache.openwhisk.common.Counter
 import org.apache.openwhisk.common.Logging
-import org.apache.openwhisk.core.connector.Message
-import org.apache.openwhisk.core.connector.MessageProducer
+import org.apache.openwhisk.core.connector.{Message, MessageProducer, ResultMetadata}
 
 import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue}
 import scala.collection.mutable.Map
@@ -39,7 +37,7 @@ class LeanProducer(queues: Map[String, BlockingQueue[Array[Byte]]])(implicit log
   override def sentCount(): Long = sentCounter.cur
 
   /** Sends msg to topic. This is an asynchronous operation. */
-  override def send(topic: String, msg: Message, retry: Int = 3): Future[RecordMetadata] = {
+  override def send(topic: String, msg: Message, retry: Int = 3): Future[ResultMetadata] = {
     implicit val transid = msg.transid
 
     val queue = queues.getOrElseUpdate(topic, new LinkedBlockingQueue[Array[Byte]]())
@@ -47,7 +45,7 @@ class LeanProducer(queues: Map[String, BlockingQueue[Array[Byte]]])(implicit log
     Future {
       queue.put(msg.serialize.getBytes(StandardCharsets.UTF_8))
       sentCounter.next()
-      new RecordMetadata(new TopicPartition(topic, 0), -1, -1, System.currentTimeMillis(), null, -1, -1)
+      ResultMetadata(topic, 0, -1)
     }
   }
 
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
index e823cf109..9636769b2 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
@@ -48,6 +48,8 @@ trait Message {
   override def toString = serialize
 }
 
+case class ResultMetadata(topic: String, partition: Int, offset: Long)
+
 case class ActivationMessage(override val transid: TransactionId,
                              action: FullyQualifiedEntityName,
                              revision: DocRevision,
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/connector/MessageProducer.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/connector/MessageProducer.scala
index e527f6960..de4e78a93 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/connector/MessageProducer.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/connector/MessageProducer.scala
@@ -19,15 +19,13 @@ package org.apache.openwhisk.core.connector
 
 import scala.concurrent.Future
 
-import org.apache.kafka.clients.producer.RecordMetadata
-
 trait MessageProducer {
 
   /** Count of messages sent. */
   def sentCount(): Long
 
   /** Sends msg to topic. This is an asynchronous operation. */
-  def send(topic: String, msg: Message, retry: Int = 0): Future[RecordMetadata]
+  def send(topic: String, msg: Message, retry: Int = 0): Future[ResultMetadata]
 
   /** Closes producer. */
   def close(): Unit
diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/CommonLoadBalancer.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/CommonLoadBalancer.scala
index 932443062..045828d66 100644
--- a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/CommonLoadBalancer.scala
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/CommonLoadBalancer.scala
@@ -23,7 +23,6 @@ import java.util.concurrent.atomic.LongAdder
 
 import akka.actor.ActorSystem
 import akka.event.Logging.InfoLevel
-import org.apache.kafka.clients.producer.RecordMetadata
 import pureconfig._
 import pureconfig.generic.auto._
 import org.apache.openwhisk.common.LoggingMarkers._
@@ -189,7 +188,7 @@ abstract class CommonLoadBalancer(config: WhiskConfig,
   /** 3. Send the activation to the invoker */
   protected def sendActivationToInvoker(producer: MessageProducer,
                                         msg: ActivationMessage,
-                                        invoker: InvokerInstanceId): Future[RecordMetadata] = {
+                                        invoker: InvokerInstanceId): Future[ResultMetadata] = {
     implicit val transid: TransactionId = msg.transid
 
     val topic = s"${Controller.topicPrefix}invoker${invoker.toInt}"
@@ -206,7 +205,7 @@ abstract class CommonLoadBalancer(config: WhiskConfig,
         transid.finished(
           this,
           start,
-          s"posted to ${status.topic()}[${status.partition()}][${status.offset()}]",
+          s"posted to ${status.topic}[${status.partition}][${status.offset}]",
           logLevel = InfoLevel)
       case Failure(_) => transid.failed(this, start, s"error on posting to topic $topic")
     }
diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala
index f606f64f8..361478213 100644
--- a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala
@@ -8,7 +8,6 @@ import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Cancellable, P
 import akka.event.Logging.InfoLevel
 import akka.pattern.ask
 import akka.util.Timeout
-import org.apache.kafka.clients.producer.RecordMetadata
 import org.apache.openwhisk.common.InvokerState.{Healthy, Offline, Unhealthy}
 import org.apache.openwhisk.common._
 import org.apache.openwhisk.core.connector._
@@ -250,7 +249,7 @@ class FPCPoolBalancer(config: WhiskConfig,
   /** 3. Send the activation to the kafka */
   private def sendActivationToKafka(producer: MessageProducer,
                                     msg: ActivationMessage,
-                                    topic: String): Future[RecordMetadata] = {
+                                    topic: String): Future[ResultMetadata] = {
     implicit val transid: TransactionId = msg.transid
 
     MetricEmitter.emitCounterMetric(LoggingMarkers.LOADBALANCER_ACTIVATION_START)
@@ -261,7 +260,7 @@ class FPCPoolBalancer(config: WhiskConfig,
         transid.finished(
           this,
           start,
-          s"posted to ${status.topic()}[${status.partition()}][${status.offset()}]",
+          s"posted to ${status.topic}[${status.partition}][${status.offset}]",
           logLevel = InfoLevel)
       case Failure(_) => transid.failed(this, start, s"error on posting to topic $topic")
     }
diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/InvokerPoolFactory.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/InvokerPoolFactory.scala
index 1511b6890..fd7f9fa4e 100644
--- a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/InvokerPoolFactory.scala
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/InvokerPoolFactory.scala
@@ -18,11 +18,9 @@
 package org.apache.openwhisk.core.loadBalancer
 import akka.actor.ActorRef
 import akka.actor.ActorRefFactory
-import org.apache.kafka.clients.producer.RecordMetadata
-import org.apache.openwhisk.core.connector.ActivationMessage
-import org.apache.openwhisk.core.connector.MessageProducer
-import org.apache.openwhisk.core.connector.MessagingProvider
+import org.apache.openwhisk.core.connector.{ActivationMessage, MessageProducer, MessagingProvider, ResultMetadata}
 import org.apache.openwhisk.core.entity.InvokerInstanceId
+
 import scala.concurrent.Future
 
 trait InvokerPoolFactory {
@@ -30,6 +28,6 @@ trait InvokerPoolFactory {
     actorRefFactory: ActorRefFactory,
     messagingProvider: MessagingProvider,
     messagingProducer: MessageProducer,
-    sendActivationToInvoker: (MessageProducer, ActivationMessage, InvokerInstanceId) => Future[RecordMetadata],
+    sendActivationToInvoker: (MessageProducer, ActivationMessage, InvokerInstanceId) => Future[ResultMetadata],
     monitor: Option[ActorRef]): ActorRef
 }
diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/InvokerSupervision.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/InvokerSupervision.scala
index f526da0e7..24b0f3369 100644
--- a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/InvokerSupervision.scala
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/InvokerSupervision.scala
@@ -24,7 +24,6 @@ import scala.concurrent.{Await, ExecutionContext, Future}
 import scala.concurrent.duration._
 import scala.util.Failure
 import scala.util.Success
-import org.apache.kafka.clients.producer.RecordMetadata
 import akka.actor.{Actor, ActorRef, ActorRefFactory, FSM, Props}
 import akka.actor.FSM.CurrentState
 import akka.actor.FSM.SubscribeTransitionCallBack
@@ -76,7 +75,7 @@ final case class InvokerInfo(buffer: RingBuffer[InvocationFinishedResult])
  * by the InvokerPool and thus might not be caught by monitoring.
  */
 class InvokerPool(childFactory: (ActorRefFactory, InvokerInstanceId) => ActorRef,
-                  sendActivationToInvoker: (ActivationMessage, InvokerInstanceId) => Future[RecordMetadata],
+                  sendActivationToInvoker: (ActivationMessage, InvokerInstanceId) => Future[ResultMetadata],
                   pingConsumer: MessageConsumer,
                   monitor: Option[ActorRef])
     extends Actor {
@@ -230,7 +229,7 @@ object InvokerPool {
   }
 
   def props(f: (ActorRefFactory, InvokerInstanceId) => ActorRef,
-            p: (ActivationMessage, InvokerInstanceId) => Future[RecordMetadata],
+            p: (ActivationMessage, InvokerInstanceId) => Future[ResultMetadata],
             pc: MessageConsumer,
             m: Option[ActorRef] = None): Props = {
     Props(new InvokerPool(f, p, pc, m))
@@ -273,7 +272,7 @@ class InvokerActor(invokerInstance: InvokerInstanceId, controllerInstance: Contr
 
   // This is done at this point to not intermingle with the state-machine especially their timeouts.
   def customReceive: Receive = {
-    case _: RecordMetadata => // Ignores the result of publishing test actions to MessageProducer.
+    case _: ResultMetadata => // Ignores the result of publishing test actions to MessageProducer.
   }
 
   override def receive: Receive = customReceive.orElse(super.receive)
diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
index f71b5d8e0..5f7b9f05c 100644
--- a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
@@ -26,7 +26,6 @@ import akka.cluster.ClusterEvent._
 import akka.cluster.{Cluster, Member, MemberStatus}
 import akka.management.scaladsl.AkkaManagement
 import akka.management.cluster.bootstrap.ClusterBootstrap
-import org.apache.kafka.clients.producer.RecordMetadata
 import org.apache.openwhisk.common.InvokerState.{Healthy, Offline, Unhealthy, Unresponsive}
 import pureconfig._
 import pureconfig.generic.auto._
@@ -340,7 +339,7 @@ object ShardingContainerPoolBalancer extends LoadBalancerProvider {
         actorRefFactory: ActorRefFactory,
         messagingProvider: MessagingProvider,
         messagingProducer: MessageProducer,
-        sendActivationToInvoker: (MessageProducer, ActivationMessage, InvokerInstanceId) => Future[RecordMetadata],
+        sendActivationToInvoker: (MessageProducer, ActivationMessage, InvokerInstanceId) => Future[ResultMetadata],
         monitor: Option[ActorRef]): ActorRef = {
 
         InvokerPool.prepare(instance, WhiskEntityStore.datastore())
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerPool.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerPool.scala
index fbdade750..07029814c 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerPool.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerPool.scala
@@ -18,15 +18,15 @@
 package org.apache.openwhisk.core.containerpool.v2
 
 import java.util.concurrent.atomic.AtomicInteger
-
 import akka.actor.{Actor, ActorRef, ActorRefFactory, Cancellable, Props}
-import org.apache.kafka.clients.producer.RecordMetadata
+
 import org.apache.openwhisk.common._
 import org.apache.openwhisk.core.connector.ContainerCreationError._
 import org.apache.openwhisk.core.connector.{
   ContainerCreationAckMessage,
   ContainerCreationMessage,
-  ContainerDeletionMessage
+  ContainerDeletionMessage,
+  ResultMetadata
 }
 import org.apache.openwhisk.core.containerpool.{
   AdjustPrewarmedContainer,
@@ -81,7 +81,7 @@ class FunctionPullingContainerPool(
   poolConfig: ContainerPoolConfig,
   instance: InvokerInstanceId,
   prewarmConfig: List[PrewarmingConfig] = List.empty,
-  sendAckToScheduler: (SchedulerInstanceId, ContainerCreationAckMessage) => Future[RecordMetadata])(
+  sendAckToScheduler: (SchedulerInstanceId, ContainerCreationAckMessage) => Future[ResultMetadata])(
   implicit val logging: Logging)
     extends Actor {
   import ContainerPoolV2.memoryConsumptionOf
@@ -841,7 +841,7 @@ object ContainerPoolV2 {
             poolConfig: ContainerPoolConfig,
             instance: InvokerInstanceId,
             prewarmConfig: List[PrewarmingConfig] = List.empty,
-            sendAckToScheduler: (SchedulerInstanceId, ContainerCreationAckMessage) => Future[RecordMetadata])(
+            sendAckToScheduler: (SchedulerInstanceId, ContainerCreationAckMessage) => Future[ResultMetadata])(
     implicit logging: Logging): Props = {
     Props(
       new FunctionPullingContainerPool(
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/ContainerMessageConsumer.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/ContainerMessageConsumer.scala
index 05cddd838..af66cd485 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/ContainerMessageConsumer.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/ContainerMessageConsumer.scala
@@ -20,7 +20,6 @@ package org.apache.openwhisk.core.invoker
 import java.nio.charset.StandardCharsets
 
 import akka.actor.{ActorRef, ActorSystem, Props}
-import org.apache.kafka.clients.producer.RecordMetadata
 import org.apache.openwhisk.common.{GracefulShutdown, Logging, TransactionId}
 import org.apache.openwhisk.core.WarmUp.isWarmUpAction
 import org.apache.openwhisk.core.WhiskConfig
@@ -48,7 +47,7 @@ class ContainerMessageConsumer(
   msgProvider: MessagingProvider,
   longPollDuration: FiniteDuration,
   maxPeek: Int,
-  sendAckToScheduler: (SchedulerInstanceId, ContainerCreationAckMessage) => Future[RecordMetadata])(
+  sendAckToScheduler: (SchedulerInstanceId, ContainerCreationAckMessage) => Future[ResultMetadata])(
   implicit actorSystem: ActorSystem,
   executionContext: ExecutionContext,
   logging: Logging) {
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerReactive.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerReactive.scala
index 9087bbd58..154be6933 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerReactive.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerReactive.scala
@@ -25,7 +25,6 @@ import akka.http.scaladsl.server.Route
 import com.ibm.etcd.api.Event.EventType
 import com.ibm.etcd.client.kv.KvClient.Watch
 import com.ibm.etcd.client.kv.WatchUpdate
-import org.apache.kafka.clients.producer.RecordMetadata
 import org.apache.openwhisk.common._
 import org.apache.openwhisk.core.ack.{ActiveAck, HealthActionAck, MessagingActiveAck, UserEventSender}
 import org.apache.openwhisk.core.connector._
@@ -267,7 +266,7 @@ class FPCInvokerReactive(config: WhiskConfig,
   }
 
   private def sendAckToScheduler(schedulerInstanceId: SchedulerInstanceId,
-                                 creationAckMessage: ContainerCreationAckMessage): Future[RecordMetadata] = {
+                                 creationAckMessage: ContainerCreationAckMessage): Future[ResultMetadata] = {
     val topic = s"${Invoker.topicPrefix}creationAck${schedulerInstanceId.asString}"
     val reschedulable =
       creationAckMessage.error.map(ContainerCreationError.whiskErrors.contains(_)).getOrElse(false)
diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/container/ContainerManager.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/container/ContainerManager.scala
index a940f42f8..52cfe69eb 100644
--- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/container/ContainerManager.scala
+++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/container/ContainerManager.scala
@@ -20,7 +20,6 @@ import java.nio.charset.StandardCharsets
 import java.util.concurrent.ThreadLocalRandom
 import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Props}
 import akka.event.Logging.InfoLevel
-import org.apache.kafka.clients.producer.RecordMetadata
 import org.apache.openwhisk.common.InvokerState.{Healthy, Offline, Unhealthy}
 import org.apache.openwhisk.common.{GracefulShutdown, InvokerHealth, Logging, LoggingMarkers, TransactionId}
 import org.apache.openwhisk.core.connector.ContainerCreationError.{
@@ -275,7 +274,7 @@ class ContainerManager(jobManagerFactory: ActorRefFactory => ActorRef,
 
   private def sendCreationContainerToInvoker(producer: MessageProducer,
                                              invoker: Int,
-                                             msg: ContainerCreationMessage): Future[RecordMetadata] = {
+                                             msg: ContainerCreationMessage): Future[ResultMetadata] = {
     implicit val transid: TransactionId = msg.transid
 
     val topic = s"${Scheduler.topicPrefix}invoker$invoker"
@@ -286,8 +285,7 @@ class ContainerManager(jobManagerFactory: ActorRefFactory => ActorRef,
         transid.finished(
           this,
           start,
-          s"posted creationId: ${msg.creationId} for ${msg.invocationNamespace}/${msg.action} to ${status
-            .topic()}[${status.partition()}][${status.offset()}]",
+          s"posted creationId: ${msg.creationId} for ${msg.invocationNamespace}/${msg.action} to ${status.topic}[${status.partition}][${status.offset}]",
           logLevel = InfoLevel)
       case Failure(_) =>
         logging.error(this, s"Failed to create container for ${msg.action}, error: error on posting to topic $topic")
@@ -297,7 +295,7 @@ class ContainerManager(jobManagerFactory: ActorRefFactory => ActorRef,
 
   private def sendDeletionContainerToInvoker(producer: MessageProducer,
                                              invoker: Int,
-                                             msg: ContainerDeletionMessage): Future[RecordMetadata] = {
+                                             msg: ContainerDeletionMessage): Future[ResultMetadata] = {
     implicit val transid: TransactionId = msg.transid
 
     val topic = s"${Scheduler.topicPrefix}invoker$invoker"
@@ -308,8 +306,7 @@ class ContainerManager(jobManagerFactory: ActorRefFactory => ActorRef,
         transid.finished(
           this,
           start,
-          s"posted deletion for ${msg.invocationNamespace}/${msg.action} to ${status
-            .topic()}[${status.partition()}][${status.offset()}]",
+          s"posted deletion for ${msg.invocationNamespace}/${msg.action} to ${status.topic}[${status.partition}][${status.offset}]",
           logLevel = InfoLevel)
       case Failure(_) =>
         logging.error(this, s"Failed to delete container for ${msg.action}, error: error on posting to topic $topic")
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/connector/test/TestConnector.scala b/tests/src/test/scala/org/apache/openwhisk/core/connector/test/TestConnector.scala
index 1e0950f45..92319ce99 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/connector/test/TestConnector.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/connector/test/TestConnector.scala
@@ -19,19 +19,12 @@ package org.apache.openwhisk.core.connector.test
 
 import java.util.ArrayList
 import java.util.concurrent.LinkedBlockingQueue
-
 import scala.concurrent.Future
 import scala.concurrent.duration._
 import scala.collection.JavaConverters._
-
-import org.apache.kafka.clients.producer.RecordMetadata
-import org.apache.kafka.common.TopicPartition
 import common.StreamLogging
-
 import org.apache.openwhisk.common.Counter
-import org.apache.openwhisk.core.connector.Message
-import org.apache.openwhisk.core.connector.MessageConsumer
-import org.apache.openwhisk.core.connector.MessageProducer
+import org.apache.openwhisk.core.connector.{Message, MessageConsumer, MessageProducer, ResultMetadata}
 
 class TestConnector(topic: String, override val maxPeek: Int, allowMoreThanMax: Boolean)
     extends MessageConsumer
@@ -58,11 +51,11 @@ class TestConnector(topic: String, override val maxPeek: Int, allowMoreThanMax:
 
   def occupancy = queue.size
 
-  def send(msg: Message): Future[RecordMetadata] = {
+  def send(msg: Message): Future[ResultMetadata] = {
     producer.send(topic, msg)
   }
 
-  def send(msgs: Seq[Message]): Future[RecordMetadata] = {
+  def send(msgs: Seq[Message]): Future[ResultMetadata] = {
     import scala.language.reflectiveCalls
     producer.sendBulk(topic, msgs)
   }
@@ -75,11 +68,11 @@ class TestConnector(topic: String, override val maxPeek: Int, allowMoreThanMax:
   def getProducer(): MessageProducer = producer
 
   private val producer = new MessageProducer {
-    def send(topic: String, msg: Message, retry: Int = 0): Future[RecordMetadata] = {
+    def send(topic: String, msg: Message, retry: Int = 0): Future[ResultMetadata] = {
       queue.synchronized {
         if (queue.offer(msg)) {
           logging.info(this, s"put: $msg")
-          Future.successful(new RecordMetadata(new TopicPartition(topic, 0), 0, queue.size, -1, Long.box(-1L), -1, -1))
+          Future.successful(ResultMetadata(topic, 0, queue.size()))
         } else {
           logging.error(this, s"put failed: $msg")
           Future.failed(new IllegalStateException("failed to write msg"))
@@ -87,11 +80,11 @@ class TestConnector(topic: String, override val maxPeek: Int, allowMoreThanMax:
       }
     }
 
-    def sendBulk(topic: String, msgs: Seq[Message]): Future[RecordMetadata] = {
+    def sendBulk(topic: String, msgs: Seq[Message]): Future[ResultMetadata] = {
       queue.synchronized {
         if (queue.addAll(msgs.asJava)) {
           logging.info(this, s"put: ${msgs.length} messages")
-          Future.successful(new RecordMetadata(new TopicPartition(topic, 0), 0, queue.size, -1, Long.box(-1L), -1, -1))
+          Future.successful(ResultMetadata(topic, 0, queue.size()))
         } else {
           logging.error(this, s"put failed: ${msgs.length} messages")
           Future.failed(new IllegalStateException("failed to write msg"))
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerPoolTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerPoolTests.scala
index 1904d8c11..99fc09f92 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerPoolTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerPoolTests.scala
@@ -19,11 +19,9 @@ package org.apache.openwhisk.core.containerpool.v2.test
 
 import java.time.Instant
 import java.util.concurrent.TimeUnit
-
 import akka.actor.{ActorRef, ActorRefFactory, ActorSystem, Props}
 import akka.testkit.{ImplicitSender, TestActor, TestKit, TestProbe}
 import common.StreamLogging
-import org.apache.kafka.clients.producer.RecordMetadata
 import org.apache.openwhisk.common.{Enable, GracefulShutdown, TransactionId}
 import org.apache.openwhisk.core.WhiskConfig
 import org.apache.openwhisk.core.connector.ContainerCreationError._
@@ -32,7 +30,8 @@ import org.apache.openwhisk.core.connector.{
   ContainerCreationAckMessage,
   ContainerCreationError,
   ContainerCreationMessage,
-  MessageProducer
+  MessageProducer,
+  ResultMetadata
 }
 import org.apache.openwhisk.core.containerpool.docker.DockerContainer
 import org.apache.openwhisk.core.containerpool.v2._
@@ -196,7 +195,7 @@ class FunctionPullingContainerPoolTests
       prewarmContainerCreationConfig)
 
   def sendAckToScheduler(producer: MessageProducer)(schedulerInstanceId: SchedulerInstanceId,
-                                                    ackMessage: ContainerCreationAckMessage): Future[RecordMetadata] = {
+                                                    ackMessage: ContainerCreationAckMessage): Future[ResultMetadata] = {
     val topic = s"creationAck${schedulerInstanceId.asString}"
     producer.send(topic, ackMessage)
   }
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/ContainerMessageConsumerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/ContainerMessageConsumerTests.scala
index 1ef1c11d5..b4c83e0e9 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/ContainerMessageConsumerTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/ContainerMessageConsumerTests.scala
@@ -22,7 +22,6 @@ import java.nio.charset.StandardCharsets
 import akka.actor.ActorSystem
 import akka.testkit.{TestKit, TestProbe}
 import common.StreamLogging
-import org.apache.kafka.clients.producer.RecordMetadata
 import org.apache.openwhisk.common.{Logging, TransactionId}
 import org.apache.openwhisk.core.{WarmUp, WhiskConfig}
 import org.apache.openwhisk.core.connector.ContainerCreationError._
@@ -115,7 +114,7 @@ class ContainerMessageConsumerTests
   }
 
   def sendAckToScheduler(producer: MessageProducer)(schedulerInstanceId: SchedulerInstanceId,
-                                                    ackMessage: ContainerCreationAckMessage): Future[RecordMetadata] = {
+                                                    ackMessage: ContainerCreationAckMessage): Future[ResultMetadata] = {
     val topic = s"creationAck${schedulerInstanceId.asString}"
     producer.send(topic, ackMessage)
   }
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/InvokerSupervisionTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/InvokerSupervisionTests.scala
index 463005d6c..2cf0ef3f3 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/InvokerSupervisionTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/InvokerSupervisionTests.scala
@@ -21,8 +21,6 @@ import scala.collection.mutable
 import scala.concurrent.Await
 import scala.concurrent.duration._
 import scala.concurrent.Future
-import org.apache.kafka.clients.producer.RecordMetadata
-import org.apache.kafka.common.TopicPartition
 import org.junit.runner.RunWith
 import org.scalamock.scalatest.MockFactory
 import org.scalatest.BeforeAndAfterAll
@@ -46,8 +44,7 @@ import common.{LoggedFunction, StreamLogging}
 import org.apache.openwhisk.common.InvokerState.{Healthy, Offline, Unhealthy, Unresponsive}
 import org.apache.openwhisk.common.{InvokerHealth, InvokerState, TransactionId}
 import org.apache.openwhisk.core.WhiskConfig
-import org.apache.openwhisk.core.connector.ActivationMessage
-import org.apache.openwhisk.core.connector.PingMessage
+import org.apache.openwhisk.core.connector.{ActivationMessage, PingMessage, ResultMetadata}
 import org.apache.openwhisk.core.entity.ActivationId.ActivationIdGenerator
 import org.apache.openwhisk.core.entity._
 import org.apache.openwhisk.core.entity.size._
@@ -108,7 +105,7 @@ class InvokerSupervisionTests
     val children = mutable.Queue(invoker5.ref, invoker2.ref)
     val childFactory = (f: ActorRefFactory, instance: InvokerInstanceId) => children.dequeue()
 
-    val sendActivationToInvoker = stubFunction[ActivationMessage, InvokerInstanceId, Future[RecordMetadata]]
+    val sendActivationToInvoker = stubFunction[ActivationMessage, InvokerInstanceId, Future[ResultMetadata]]
     val supervisor = system.actorOf(InvokerPool.props(childFactory, sendActivationToInvoker, pC))
 
     within(timeout.duration) {
@@ -147,7 +144,7 @@ class InvokerSupervisionTests
     val invokerInstance = InvokerInstanceId(0, userMemory = defaultUserMemory)
     val invokerName = s"invoker${invokerInstance.toInt}"
     val childFactory = (f: ActorRefFactory, instance: InvokerInstanceId) => invoker.ref
-    val sendActivationToInvoker = stubFunction[ActivationMessage, InvokerInstanceId, Future[RecordMetadata]]
+    val sendActivationToInvoker = stubFunction[ActivationMessage, InvokerInstanceId, Future[ResultMetadata]]
 
     val supervisor = system.actorOf(InvokerPool.props(childFactory, sendActivationToInvoker, pC))
 
@@ -174,7 +171,7 @@ class InvokerSupervisionTests
     val childFactory = (f: ActorRefFactory, instance: InvokerInstanceId) => invoker.ref
 
     val sendActivationToInvoker = LoggedFunction { (a: ActivationMessage, b: InvokerInstanceId) =>
-      Future.successful(new RecordMetadata(new TopicPartition(invokerName, 0), 0L, 0L, 0L, Long.box(0L), 0, 0))
+      Future.successful(ResultMetadata(invokerName, 0, 0))
     }
 
     val supervisor = system.actorOf(InvokerPool.props(childFactory, sendActivationToInvoker, pC))
@@ -421,7 +418,7 @@ class InvokerSupervisionTests
     val children = mutable.Queue(invoker0.ref)
     val childFactory = (f: ActorRefFactory, instance: InvokerInstanceId) => children.dequeue()
 
-    val sendActivationToInvoker = stubFunction[ActivationMessage, InvokerInstanceId, Future[RecordMetadata]]
+    val sendActivationToInvoker = stubFunction[ActivationMessage, InvokerInstanceId, Future[ResultMetadata]]
     val supervisor = system.actorOf(InvokerPool.props(childFactory, sendActivationToInvoker, pC))
 
     val invokerInstance = InvokerInstanceId(0, Some("10.x.x.x"), Some("invoker-xyz"), userMemory = defaultUserMemory)
@@ -444,7 +441,7 @@ class InvokerSupervisionTests
     val children = mutable.Queue(invoker0.ref)
     val childFactory = (f: ActorRefFactory, instance: InvokerInstanceId) => children.dequeue()
 
-    val sendActivationToInvoker = stubFunction[ActivationMessage, InvokerInstanceId, Future[RecordMetadata]]
+    val sendActivationToInvoker = stubFunction[ActivationMessage, InvokerInstanceId, Future[ResultMetadata]]
     val supervisor = system.actorOf(InvokerPool.props(childFactory, sendActivationToInvoker, pC))
 
     val invokerInstance = InvokerInstanceId(0, Some("10.x.x.x"), Some("invoker-xyz"), userMemory = defaultUserMemory)
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
index 907264c82..2fb70088c 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
@@ -22,10 +22,8 @@ import akka.actor.ActorRefFactory
 import akka.actor.ActorSystem
 import akka.testkit.TestProbe
 import common.{StreamLogging, WhiskProperties}
-import java.nio.charset.StandardCharsets
 
-import org.apache.kafka.clients.producer.RecordMetadata
-import org.apache.kafka.common.TopicPartition
+import java.nio.charset.StandardCharsets
 import org.apache.openwhisk.common.InvokerState.{Healthy, Offline, Unhealthy}
 import org.junit.runner.RunWith
 import org.scalamock.scalatest.MockFactory
@@ -39,12 +37,15 @@ import scala.concurrent.duration._
 import org.apache.openwhisk.common.{InvokerHealth, Logging, NestedSemaphore, TransactionId}
 import org.apache.openwhisk.core.entity.FullyQualifiedEntityName
 import org.apache.openwhisk.core.WhiskConfig
-import org.apache.openwhisk.core.connector.ActivationMessage
-import org.apache.openwhisk.core.connector.CompletionMessage
-import org.apache.openwhisk.core.connector.Message
-import org.apache.openwhisk.core.connector.MessageConsumer
-import org.apache.openwhisk.core.connector.MessageProducer
-import org.apache.openwhisk.core.connector.MessagingProvider
+import org.apache.openwhisk.core.connector.{
+  ActivationMessage,
+  CompletionMessage,
+  Message,
+  MessageConsumer,
+  MessageProducer,
+  MessagingProvider,
+  ResultMetadata
+}
 import org.apache.openwhisk.core.entity.ActivationId
 import org.apache.openwhisk.core.entity.BasicAuthenticationAuthKey
 import org.apache.openwhisk.core.entity.ControllerInstanceId
@@ -455,7 +456,7 @@ class ShardingContainerPoolBalancerTests
     (producer
       .send(_: String, _: Message, _: Int))
       .when(*, *, *)
-      .returns(Future.successful(new RecordMetadata(new TopicPartition("fake", 0), 0, 0, 0l, 0l, 0, 0)))
+      .returns(Future.successful(ResultMetadata("fake", 0, 0)))
 
     messaging
   }
@@ -472,7 +473,7 @@ class ShardingContainerPoolBalancerTests
         actorRefFactory: ActorRefFactory,
         messagingProvider: MessagingProvider,
         messagingProducer: MessageProducer,
-        sendActivationToInvoker: (MessageProducer, ActivationMessage, InvokerInstanceId) => Future[RecordMetadata],
+        sendActivationToInvoker: (MessageProducer, ActivationMessage, InvokerInstanceId) => Future[ResultMetadata],
         monitor: Option[ActorRef]): ActorRef =
         TestProbe().testActor
     }
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/container/test/ContainerManagerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/container/test/ContainerManagerTests.scala
index a8b747ed3..569a66141 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/container/test/ContainerManagerTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/container/test/ContainerManagerTests.scala
@@ -22,8 +22,6 @@ import akka.actor.{ActorRef, ActorRefFactory, ActorSystem}
 import akka.testkit.{ImplicitSender, TestKit, TestProbe}
 import com.ibm.etcd.api.{KeyValue, RangeResponse}
 import common.{StreamLogging, WskActorSystem}
-import org.apache.kafka.clients.producer.RecordMetadata
-import org.apache.kafka.common.TopicPartition
 import org.apache.openwhisk.common.InvokerState.{Healthy, Unhealthy}
 import org.apache.openwhisk.common.{GracefulShutdown, InvokerHealth, Logging, TransactionId}
 import org.apache.openwhisk.core.connector.ContainerCreationError.{
@@ -150,7 +148,7 @@ class ContainerManagerTests
       (producer
         .send(_: String, _: Message, _: Int))
         .when(*, *, *)
-        .returns(Future.successful(new RecordMetadata(new TopicPartition("fake", 0), 0, 0, 0l, 0l, 0, 0)))
+        .returns(Future.successful(ResultMetadata("fake", 0, 0)))
     }
 
     messaging
@@ -162,12 +160,11 @@ class ContainerManagerTests
     override def sentCount(): Long = 0
 
     /** Sends msg to topic. This is an asynchronous operation. */
-    override def send(topic: String, msg: Message, retry: Int): Future[RecordMetadata] = {
+    override def send(topic: String, msg: Message, retry: Int): Future[ResultMetadata] = {
       val message = s"$topic-$msg"
       receiver ! message
 
-      Future.successful(
-        new RecordMetadata(new TopicPartition(topic, 0), -1, -1, System.currentTimeMillis(), null, -1, -1))
+      Future.successful(ResultMetadata(topic, 0, -1))
     }
 
     /** Closes producer. */
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTestsFixture.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTestsFixture.scala
index 2d2ead20a..ceaf3dd7b 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTestsFixture.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTestsFixture.scala
@@ -1,7 +1,6 @@
 package org.apache.openwhisk.core.scheduler.queue.test
 
 import java.time.Instant
-
 import akka.actor.{ActorRef, ActorSystem}
 import akka.testkit.{ImplicitSender, TestKit, TestProbe}
 import com.sksamuel.elastic4s.http
@@ -10,13 +9,17 @@ import com.sksamuel.elastic4s.http._
 import com.sksamuel.elastic4s.http.search.{SearchHits, SearchResponse}
 import com.sksamuel.elastic4s.searches.SearchRequest
 import common.StreamLogging
-import org.apache.kafka.clients.producer.RecordMetadata
-import org.apache.kafka.common.TopicPartition
 import org.apache.openwhisk.common.TransactionId
 import org.apache.openwhisk.common.WhiskInstants.InstantImplicits
 import org.apache.openwhisk.core.WhiskConfig
 import org.apache.openwhisk.core.ack.ActiveAck
-import org.apache.openwhisk.core.connector.{AcknowledegmentMessage, ActivationMessage, Message, MessageProducer}
+import org.apache.openwhisk.core.connector.{
+  AcknowledegmentMessage,
+  ActivationMessage,
+  Message,
+  MessageProducer,
+  ResultMetadata
+}
 import org.apache.openwhisk.core.database.UserContext
 import org.apache.openwhisk.core.database.elasticsearch.ElasticSearchActivationStore.generateIndex
 import org.apache.openwhisk.core.entity.ExecManifest.{ImageName, RuntimeManifest}
@@ -188,11 +191,10 @@ class MemoryQueueTestsFixture
     override def sentCount(): Long = 0
 
     /** Sends msg to topic. This is an asynchronous operation. */
-    override def send(topic: String, msg: Message, retry: Int): Future[RecordMetadata] = {
+    override def send(topic: String, msg: Message, retry: Int): Future[ResultMetadata] = {
       receiver ! s"$topic-${msg}"
 
-      Future.successful(
-        new RecordMetadata(new TopicPartition(topic, 0), -1, -1, System.currentTimeMillis(), null, -1, -1))
+      Future.successful(ResultMetadata(topic, 0, -1))
     }
 
     /** Closes producer. */