You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by st...@apache.org on 2022/05/10 05:21:19 UTC
[openwhisk] branch master updated: Replace kafka.RecordMetadata with a common ResultMetadata (#5217)
This is an automated email from the ASF dual-hosted git repository.
style95 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new cbdcfe574 Replace kafka.RecordMetadata with a common ResultMetadata (#5217)
cbdcfe574 is described below
commit cbdcfe574984b6509335da64da86f80d592aaa1e
Author: jiangpch <ji...@navercorp.com>
AuthorDate: Tue May 10 13:21:12 2022 +0800
Replace kafka.RecordMetadata with a common ResultMetadata (#5217)
* Replace kafka.RecordMetadata with a common ResultMetadata
* remove unused import
---
.../connector/kafka/KafkaProducerConnector.scala | 11 ++++++-----
.../openwhisk/connector/lean/LeanProducer.scala | 10 ++++------
.../apache/openwhisk/core/connector/Message.scala | 2 ++
.../openwhisk/core/connector/MessageProducer.scala | 4 +---
.../core/loadBalancer/CommonLoadBalancer.scala | 5 ++---
.../core/loadBalancer/FPCPoolBalancer.scala | 5 ++---
.../core/loadBalancer/InvokerPoolFactory.scala | 8 +++-----
.../core/loadBalancer/InvokerSupervision.scala | 7 +++----
.../ShardingContainerPoolBalancer.scala | 3 +--
.../v2/FunctionPullingContainerPool.scala | 10 +++++-----
.../core/invoker/ContainerMessageConsumer.scala | 3 +--
.../core/invoker/FPCInvokerReactive.scala | 3 +--
.../scheduler/container/ContainerManager.scala | 11 ++++-------
.../core/connector/test/TestConnector.scala | 21 +++++++-------------
.../test/FunctionPullingContainerPoolTests.scala | 7 +++----
.../test/ContainerMessageConsumerTests.scala | 3 +--
.../test/InvokerSupervisionTests.scala | 15 ++++++--------
.../test/ShardingContainerPoolBalancerTests.scala | 23 +++++++++++-----------
.../container/test/ContainerManagerTests.scala | 9 +++------
.../queue/test/MemoryQueueTestsFixture.scala | 16 ++++++++-------
20 files changed, 76 insertions(+), 100 deletions(-)
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaProducerConnector.scala b/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaProducerConnector.scala
index e08ad972f..5b616113e 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaProducerConnector.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaProducerConnector.scala
@@ -27,7 +27,7 @@ import pureconfig.generic.auto._
import org.apache.openwhisk.common.{Counter, Logging, TransactionId}
import org.apache.openwhisk.connector.kafka.KafkaConfiguration._
import org.apache.openwhisk.core.ConfigKeys
-import org.apache.openwhisk.core.connector.{Message, MessageProducer}
+import org.apache.openwhisk.core.connector.{Message, MessageProducer, ResultMetadata}
import org.apache.openwhisk.core.entity.{ByteSize, UUIDs}
import org.apache.openwhisk.utils.Exceptions
@@ -49,17 +49,18 @@ class KafkaProducerConnector(
override def sentCount(): Long = sentCounter.cur
/** Sends msg to topic. This is an asynchronous operation. */
- override def send(topic: String, msg: Message, retry: Int = 3): Future[RecordMetadata] = {
+ override def send(topic: String, msg: Message, retry: Int = 3): Future[ResultMetadata] = {
implicit val transid: TransactionId = msg.transid
val record = new ProducerRecord[String, String](topic, "messages", msg.serialize)
- val produced = Promise[RecordMetadata]()
+ val produced = Promise[ResultMetadata]()
Future {
blocking {
try {
producer.send(record, new Callback {
override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
- if (exception == null) produced.trySuccess(metadata)
+ if (exception == null)
+ produced.trySuccess(ResultMetadata(metadata.topic(), metadata.partition(), metadata.offset()))
else produced.tryFailure(exception)
}
})
@@ -72,7 +73,7 @@ class KafkaProducerConnector(
produced.future.andThen {
case Success(status) =>
- logging.debug(this, s"sent message: ${status.topic()}[${status.partition()}][${status.offset()}]")
+ logging.debug(this, s"sent message: ${status.topic}[${status.partition}][${status.offset}]")
sentCounter.next()
case Failure(t) =>
logging.error(this, s"sending message on topic '$topic' failed: ${t.getMessage}")
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/connector/lean/LeanProducer.scala b/common/scala/src/main/scala/org/apache/openwhisk/connector/lean/LeanProducer.scala
index e555e85c2..2463d1410 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/connector/lean/LeanProducer.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/connector/lean/LeanProducer.scala
@@ -18,13 +18,11 @@
package org.apache.openwhisk.connector.lean
import akka.actor.ActorSystem
+
import scala.concurrent.Future
-import org.apache.kafka.clients.producer.RecordMetadata
-import org.apache.kafka.common.TopicPartition
import org.apache.openwhisk.common.Counter
import org.apache.openwhisk.common.Logging
-import org.apache.openwhisk.core.connector.Message
-import org.apache.openwhisk.core.connector.MessageProducer
+import org.apache.openwhisk.core.connector.{Message, MessageProducer, ResultMetadata}
import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue}
import scala.collection.mutable.Map
@@ -39,7 +37,7 @@ class LeanProducer(queues: Map[String, BlockingQueue[Array[Byte]]])(implicit log
override def sentCount(): Long = sentCounter.cur
/** Sends msg to topic. This is an asynchronous operation. */
- override def send(topic: String, msg: Message, retry: Int = 3): Future[RecordMetadata] = {
+ override def send(topic: String, msg: Message, retry: Int = 3): Future[ResultMetadata] = {
implicit val transid = msg.transid
val queue = queues.getOrElseUpdate(topic, new LinkedBlockingQueue[Array[Byte]]())
@@ -47,7 +45,7 @@ class LeanProducer(queues: Map[String, BlockingQueue[Array[Byte]]])(implicit log
Future {
queue.put(msg.serialize.getBytes(StandardCharsets.UTF_8))
sentCounter.next()
- new RecordMetadata(new TopicPartition(topic, 0), -1, -1, System.currentTimeMillis(), null, -1, -1)
+ ResultMetadata(topic, 0, -1)
}
}
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
index e823cf109..9636769b2 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
@@ -48,6 +48,8 @@ trait Message {
override def toString = serialize
}
+case class ResultMetadata(topic: String, partition: Int, offset: Long)
+
case class ActivationMessage(override val transid: TransactionId,
action: FullyQualifiedEntityName,
revision: DocRevision,
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/connector/MessageProducer.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/connector/MessageProducer.scala
index e527f6960..de4e78a93 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/connector/MessageProducer.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/connector/MessageProducer.scala
@@ -19,15 +19,13 @@ package org.apache.openwhisk.core.connector
import scala.concurrent.Future
-import org.apache.kafka.clients.producer.RecordMetadata
-
trait MessageProducer {
/** Count of messages sent. */
def sentCount(): Long
/** Sends msg to topic. This is an asynchronous operation. */
- def send(topic: String, msg: Message, retry: Int = 0): Future[RecordMetadata]
+ def send(topic: String, msg: Message, retry: Int = 0): Future[ResultMetadata]
/** Closes producer. */
def close(): Unit
diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/CommonLoadBalancer.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/CommonLoadBalancer.scala
index 932443062..045828d66 100644
--- a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/CommonLoadBalancer.scala
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/CommonLoadBalancer.scala
@@ -23,7 +23,6 @@ import java.util.concurrent.atomic.LongAdder
import akka.actor.ActorSystem
import akka.event.Logging.InfoLevel
-import org.apache.kafka.clients.producer.RecordMetadata
import pureconfig._
import pureconfig.generic.auto._
import org.apache.openwhisk.common.LoggingMarkers._
@@ -189,7 +188,7 @@ abstract class CommonLoadBalancer(config: WhiskConfig,
/** 3. Send the activation to the invoker */
protected def sendActivationToInvoker(producer: MessageProducer,
msg: ActivationMessage,
- invoker: InvokerInstanceId): Future[RecordMetadata] = {
+ invoker: InvokerInstanceId): Future[ResultMetadata] = {
implicit val transid: TransactionId = msg.transid
val topic = s"${Controller.topicPrefix}invoker${invoker.toInt}"
@@ -206,7 +205,7 @@ abstract class CommonLoadBalancer(config: WhiskConfig,
transid.finished(
this,
start,
- s"posted to ${status.topic()}[${status.partition()}][${status.offset()}]",
+ s"posted to ${status.topic}[${status.partition}][${status.offset}]",
logLevel = InfoLevel)
case Failure(_) => transid.failed(this, start, s"error on posting to topic $topic")
}
diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala
index f606f64f8..361478213 100644
--- a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala
@@ -8,7 +8,6 @@ import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Cancellable, P
import akka.event.Logging.InfoLevel
import akka.pattern.ask
import akka.util.Timeout
-import org.apache.kafka.clients.producer.RecordMetadata
import org.apache.openwhisk.common.InvokerState.{Healthy, Offline, Unhealthy}
import org.apache.openwhisk.common._
import org.apache.openwhisk.core.connector._
@@ -250,7 +249,7 @@ class FPCPoolBalancer(config: WhiskConfig,
/** 3. Send the activation to the kafka */
private def sendActivationToKafka(producer: MessageProducer,
msg: ActivationMessage,
- topic: String): Future[RecordMetadata] = {
+ topic: String): Future[ResultMetadata] = {
implicit val transid: TransactionId = msg.transid
MetricEmitter.emitCounterMetric(LoggingMarkers.LOADBALANCER_ACTIVATION_START)
@@ -261,7 +260,7 @@ class FPCPoolBalancer(config: WhiskConfig,
transid.finished(
this,
start,
- s"posted to ${status.topic()}[${status.partition()}][${status.offset()}]",
+ s"posted to ${status.topic}[${status.partition}][${status.offset}]",
logLevel = InfoLevel)
case Failure(_) => transid.failed(this, start, s"error on posting to topic $topic")
}
diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/InvokerPoolFactory.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/InvokerPoolFactory.scala
index 1511b6890..fd7f9fa4e 100644
--- a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/InvokerPoolFactory.scala
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/InvokerPoolFactory.scala
@@ -18,11 +18,9 @@
package org.apache.openwhisk.core.loadBalancer
import akka.actor.ActorRef
import akka.actor.ActorRefFactory
-import org.apache.kafka.clients.producer.RecordMetadata
-import org.apache.openwhisk.core.connector.ActivationMessage
-import org.apache.openwhisk.core.connector.MessageProducer
-import org.apache.openwhisk.core.connector.MessagingProvider
+import org.apache.openwhisk.core.connector.{ActivationMessage, MessageProducer, MessagingProvider, ResultMetadata}
import org.apache.openwhisk.core.entity.InvokerInstanceId
+
import scala.concurrent.Future
trait InvokerPoolFactory {
@@ -30,6 +28,6 @@ trait InvokerPoolFactory {
actorRefFactory: ActorRefFactory,
messagingProvider: MessagingProvider,
messagingProducer: MessageProducer,
- sendActivationToInvoker: (MessageProducer, ActivationMessage, InvokerInstanceId) => Future[RecordMetadata],
+ sendActivationToInvoker: (MessageProducer, ActivationMessage, InvokerInstanceId) => Future[ResultMetadata],
monitor: Option[ActorRef]): ActorRef
}
diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/InvokerSupervision.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/InvokerSupervision.scala
index f526da0e7..24b0f3369 100644
--- a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/InvokerSupervision.scala
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/InvokerSupervision.scala
@@ -24,7 +24,6 @@ import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.Failure
import scala.util.Success
-import org.apache.kafka.clients.producer.RecordMetadata
import akka.actor.{Actor, ActorRef, ActorRefFactory, FSM, Props}
import akka.actor.FSM.CurrentState
import akka.actor.FSM.SubscribeTransitionCallBack
@@ -76,7 +75,7 @@ final case class InvokerInfo(buffer: RingBuffer[InvocationFinishedResult])
* by the InvokerPool and thus might not be caught by monitoring.
*/
class InvokerPool(childFactory: (ActorRefFactory, InvokerInstanceId) => ActorRef,
- sendActivationToInvoker: (ActivationMessage, InvokerInstanceId) => Future[RecordMetadata],
+ sendActivationToInvoker: (ActivationMessage, InvokerInstanceId) => Future[ResultMetadata],
pingConsumer: MessageConsumer,
monitor: Option[ActorRef])
extends Actor {
@@ -230,7 +229,7 @@ object InvokerPool {
}
def props(f: (ActorRefFactory, InvokerInstanceId) => ActorRef,
- p: (ActivationMessage, InvokerInstanceId) => Future[RecordMetadata],
+ p: (ActivationMessage, InvokerInstanceId) => Future[ResultMetadata],
pc: MessageConsumer,
m: Option[ActorRef] = None): Props = {
Props(new InvokerPool(f, p, pc, m))
@@ -273,7 +272,7 @@ class InvokerActor(invokerInstance: InvokerInstanceId, controllerInstance: Contr
// This is done at this point to not intermingle with the state-machine especially their timeouts.
def customReceive: Receive = {
- case _: RecordMetadata => // Ignores the result of publishing test actions to MessageProducer.
+ case _: ResultMetadata => // Ignores the result of publishing test actions to MessageProducer.
}
override def receive: Receive = customReceive.orElse(super.receive)
diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
index f71b5d8e0..5f7b9f05c 100644
--- a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
@@ -26,7 +26,6 @@ import akka.cluster.ClusterEvent._
import akka.cluster.{Cluster, Member, MemberStatus}
import akka.management.scaladsl.AkkaManagement
import akka.management.cluster.bootstrap.ClusterBootstrap
-import org.apache.kafka.clients.producer.RecordMetadata
import org.apache.openwhisk.common.InvokerState.{Healthy, Offline, Unhealthy, Unresponsive}
import pureconfig._
import pureconfig.generic.auto._
@@ -340,7 +339,7 @@ object ShardingContainerPoolBalancer extends LoadBalancerProvider {
actorRefFactory: ActorRefFactory,
messagingProvider: MessagingProvider,
messagingProducer: MessageProducer,
- sendActivationToInvoker: (MessageProducer, ActivationMessage, InvokerInstanceId) => Future[RecordMetadata],
+ sendActivationToInvoker: (MessageProducer, ActivationMessage, InvokerInstanceId) => Future[ResultMetadata],
monitor: Option[ActorRef]): ActorRef = {
InvokerPool.prepare(instance, WhiskEntityStore.datastore())
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerPool.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerPool.scala
index fbdade750..07029814c 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerPool.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerPool.scala
@@ -18,15 +18,15 @@
package org.apache.openwhisk.core.containerpool.v2
import java.util.concurrent.atomic.AtomicInteger
-
import akka.actor.{Actor, ActorRef, ActorRefFactory, Cancellable, Props}
-import org.apache.kafka.clients.producer.RecordMetadata
+
import org.apache.openwhisk.common._
import org.apache.openwhisk.core.connector.ContainerCreationError._
import org.apache.openwhisk.core.connector.{
ContainerCreationAckMessage,
ContainerCreationMessage,
- ContainerDeletionMessage
+ ContainerDeletionMessage,
+ ResultMetadata
}
import org.apache.openwhisk.core.containerpool.{
AdjustPrewarmedContainer,
@@ -81,7 +81,7 @@ class FunctionPullingContainerPool(
poolConfig: ContainerPoolConfig,
instance: InvokerInstanceId,
prewarmConfig: List[PrewarmingConfig] = List.empty,
- sendAckToScheduler: (SchedulerInstanceId, ContainerCreationAckMessage) => Future[RecordMetadata])(
+ sendAckToScheduler: (SchedulerInstanceId, ContainerCreationAckMessage) => Future[ResultMetadata])(
implicit val logging: Logging)
extends Actor {
import ContainerPoolV2.memoryConsumptionOf
@@ -841,7 +841,7 @@ object ContainerPoolV2 {
poolConfig: ContainerPoolConfig,
instance: InvokerInstanceId,
prewarmConfig: List[PrewarmingConfig] = List.empty,
- sendAckToScheduler: (SchedulerInstanceId, ContainerCreationAckMessage) => Future[RecordMetadata])(
+ sendAckToScheduler: (SchedulerInstanceId, ContainerCreationAckMessage) => Future[ResultMetadata])(
implicit logging: Logging): Props = {
Props(
new FunctionPullingContainerPool(
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/ContainerMessageConsumer.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/ContainerMessageConsumer.scala
index 05cddd838..af66cd485 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/ContainerMessageConsumer.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/ContainerMessageConsumer.scala
@@ -20,7 +20,6 @@ package org.apache.openwhisk.core.invoker
import java.nio.charset.StandardCharsets
import akka.actor.{ActorRef, ActorSystem, Props}
-import org.apache.kafka.clients.producer.RecordMetadata
import org.apache.openwhisk.common.{GracefulShutdown, Logging, TransactionId}
import org.apache.openwhisk.core.WarmUp.isWarmUpAction
import org.apache.openwhisk.core.WhiskConfig
@@ -48,7 +47,7 @@ class ContainerMessageConsumer(
msgProvider: MessagingProvider,
longPollDuration: FiniteDuration,
maxPeek: Int,
- sendAckToScheduler: (SchedulerInstanceId, ContainerCreationAckMessage) => Future[RecordMetadata])(
+ sendAckToScheduler: (SchedulerInstanceId, ContainerCreationAckMessage) => Future[ResultMetadata])(
implicit actorSystem: ActorSystem,
executionContext: ExecutionContext,
logging: Logging) {
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerReactive.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerReactive.scala
index 9087bbd58..154be6933 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerReactive.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerReactive.scala
@@ -25,7 +25,6 @@ import akka.http.scaladsl.server.Route
import com.ibm.etcd.api.Event.EventType
import com.ibm.etcd.client.kv.KvClient.Watch
import com.ibm.etcd.client.kv.WatchUpdate
-import org.apache.kafka.clients.producer.RecordMetadata
import org.apache.openwhisk.common._
import org.apache.openwhisk.core.ack.{ActiveAck, HealthActionAck, MessagingActiveAck, UserEventSender}
import org.apache.openwhisk.core.connector._
@@ -267,7 +266,7 @@ class FPCInvokerReactive(config: WhiskConfig,
}
private def sendAckToScheduler(schedulerInstanceId: SchedulerInstanceId,
- creationAckMessage: ContainerCreationAckMessage): Future[RecordMetadata] = {
+ creationAckMessage: ContainerCreationAckMessage): Future[ResultMetadata] = {
val topic = s"${Invoker.topicPrefix}creationAck${schedulerInstanceId.asString}"
val reschedulable =
creationAckMessage.error.map(ContainerCreationError.whiskErrors.contains(_)).getOrElse(false)
diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/container/ContainerManager.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/container/ContainerManager.scala
index a940f42f8..52cfe69eb 100644
--- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/container/ContainerManager.scala
+++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/container/ContainerManager.scala
@@ -20,7 +20,6 @@ import java.nio.charset.StandardCharsets
import java.util.concurrent.ThreadLocalRandom
import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Props}
import akka.event.Logging.InfoLevel
-import org.apache.kafka.clients.producer.RecordMetadata
import org.apache.openwhisk.common.InvokerState.{Healthy, Offline, Unhealthy}
import org.apache.openwhisk.common.{GracefulShutdown, InvokerHealth, Logging, LoggingMarkers, TransactionId}
import org.apache.openwhisk.core.connector.ContainerCreationError.{
@@ -275,7 +274,7 @@ class ContainerManager(jobManagerFactory: ActorRefFactory => ActorRef,
private def sendCreationContainerToInvoker(producer: MessageProducer,
invoker: Int,
- msg: ContainerCreationMessage): Future[RecordMetadata] = {
+ msg: ContainerCreationMessage): Future[ResultMetadata] = {
implicit val transid: TransactionId = msg.transid
val topic = s"${Scheduler.topicPrefix}invoker$invoker"
@@ -286,8 +285,7 @@ class ContainerManager(jobManagerFactory: ActorRefFactory => ActorRef,
transid.finished(
this,
start,
- s"posted creationId: ${msg.creationId} for ${msg.invocationNamespace}/${msg.action} to ${status
- .topic()}[${status.partition()}][${status.offset()}]",
+ s"posted creationId: ${msg.creationId} for ${msg.invocationNamespace}/${msg.action} to ${status.topic}[${status.partition}][${status.offset}]",
logLevel = InfoLevel)
case Failure(_) =>
logging.error(this, s"Failed to create container for ${msg.action}, error: error on posting to topic $topic")
@@ -297,7 +295,7 @@ class ContainerManager(jobManagerFactory: ActorRefFactory => ActorRef,
private def sendDeletionContainerToInvoker(producer: MessageProducer,
invoker: Int,
- msg: ContainerDeletionMessage): Future[RecordMetadata] = {
+ msg: ContainerDeletionMessage): Future[ResultMetadata] = {
implicit val transid: TransactionId = msg.transid
val topic = s"${Scheduler.topicPrefix}invoker$invoker"
@@ -308,8 +306,7 @@ class ContainerManager(jobManagerFactory: ActorRefFactory => ActorRef,
transid.finished(
this,
start,
- s"posted deletion for ${msg.invocationNamespace}/${msg.action} to ${status
- .topic()}[${status.partition()}][${status.offset()}]",
+ s"posted deletion for ${msg.invocationNamespace}/${msg.action} to ${status.topic}[${status.partition}][${status.offset}]",
logLevel = InfoLevel)
case Failure(_) =>
logging.error(this, s"Failed to delete container for ${msg.action}, error: error on posting to topic $topic")
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/connector/test/TestConnector.scala b/tests/src/test/scala/org/apache/openwhisk/core/connector/test/TestConnector.scala
index 1e0950f45..92319ce99 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/connector/test/TestConnector.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/connector/test/TestConnector.scala
@@ -19,19 +19,12 @@ package org.apache.openwhisk.core.connector.test
import java.util.ArrayList
import java.util.concurrent.LinkedBlockingQueue
-
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.collection.JavaConverters._
-
-import org.apache.kafka.clients.producer.RecordMetadata
-import org.apache.kafka.common.TopicPartition
import common.StreamLogging
-
import org.apache.openwhisk.common.Counter
-import org.apache.openwhisk.core.connector.Message
-import org.apache.openwhisk.core.connector.MessageConsumer
-import org.apache.openwhisk.core.connector.MessageProducer
+import org.apache.openwhisk.core.connector.{Message, MessageConsumer, MessageProducer, ResultMetadata}
class TestConnector(topic: String, override val maxPeek: Int, allowMoreThanMax: Boolean)
extends MessageConsumer
@@ -58,11 +51,11 @@ class TestConnector(topic: String, override val maxPeek: Int, allowMoreThanMax:
def occupancy = queue.size
- def send(msg: Message): Future[RecordMetadata] = {
+ def send(msg: Message): Future[ResultMetadata] = {
producer.send(topic, msg)
}
- def send(msgs: Seq[Message]): Future[RecordMetadata] = {
+ def send(msgs: Seq[Message]): Future[ResultMetadata] = {
import scala.language.reflectiveCalls
producer.sendBulk(topic, msgs)
}
@@ -75,11 +68,11 @@ class TestConnector(topic: String, override val maxPeek: Int, allowMoreThanMax:
def getProducer(): MessageProducer = producer
private val producer = new MessageProducer {
- def send(topic: String, msg: Message, retry: Int = 0): Future[RecordMetadata] = {
+ def send(topic: String, msg: Message, retry: Int = 0): Future[ResultMetadata] = {
queue.synchronized {
if (queue.offer(msg)) {
logging.info(this, s"put: $msg")
- Future.successful(new RecordMetadata(new TopicPartition(topic, 0), 0, queue.size, -1, Long.box(-1L), -1, -1))
+ Future.successful(ResultMetadata(topic, 0, queue.size()))
} else {
logging.error(this, s"put failed: $msg")
Future.failed(new IllegalStateException("failed to write msg"))
@@ -87,11 +80,11 @@ class TestConnector(topic: String, override val maxPeek: Int, allowMoreThanMax:
}
}
- def sendBulk(topic: String, msgs: Seq[Message]): Future[RecordMetadata] = {
+ def sendBulk(topic: String, msgs: Seq[Message]): Future[ResultMetadata] = {
queue.synchronized {
if (queue.addAll(msgs.asJava)) {
logging.info(this, s"put: ${msgs.length} messages")
- Future.successful(new RecordMetadata(new TopicPartition(topic, 0), 0, queue.size, -1, Long.box(-1L), -1, -1))
+ Future.successful(ResultMetadata(topic, 0, queue.size()))
} else {
logging.error(this, s"put failed: ${msgs.length} messages")
Future.failed(new IllegalStateException("failed to write msg"))
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerPoolTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerPoolTests.scala
index 1904d8c11..99fc09f92 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerPoolTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerPoolTests.scala
@@ -19,11 +19,9 @@ package org.apache.openwhisk.core.containerpool.v2.test
import java.time.Instant
import java.util.concurrent.TimeUnit
-
import akka.actor.{ActorRef, ActorRefFactory, ActorSystem, Props}
import akka.testkit.{ImplicitSender, TestActor, TestKit, TestProbe}
import common.StreamLogging
-import org.apache.kafka.clients.producer.RecordMetadata
import org.apache.openwhisk.common.{Enable, GracefulShutdown, TransactionId}
import org.apache.openwhisk.core.WhiskConfig
import org.apache.openwhisk.core.connector.ContainerCreationError._
@@ -32,7 +30,8 @@ import org.apache.openwhisk.core.connector.{
ContainerCreationAckMessage,
ContainerCreationError,
ContainerCreationMessage,
- MessageProducer
+ MessageProducer,
+ ResultMetadata
}
import org.apache.openwhisk.core.containerpool.docker.DockerContainer
import org.apache.openwhisk.core.containerpool.v2._
@@ -196,7 +195,7 @@ class FunctionPullingContainerPoolTests
prewarmContainerCreationConfig)
def sendAckToScheduler(producer: MessageProducer)(schedulerInstanceId: SchedulerInstanceId,
- ackMessage: ContainerCreationAckMessage): Future[RecordMetadata] = {
+ ackMessage: ContainerCreationAckMessage): Future[ResultMetadata] = {
val topic = s"creationAck${schedulerInstanceId.asString}"
producer.send(topic, ackMessage)
}
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/ContainerMessageConsumerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/ContainerMessageConsumerTests.scala
index 1ef1c11d5..b4c83e0e9 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/ContainerMessageConsumerTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/ContainerMessageConsumerTests.scala
@@ -22,7 +22,6 @@ import java.nio.charset.StandardCharsets
import akka.actor.ActorSystem
import akka.testkit.{TestKit, TestProbe}
import common.StreamLogging
-import org.apache.kafka.clients.producer.RecordMetadata
import org.apache.openwhisk.common.{Logging, TransactionId}
import org.apache.openwhisk.core.{WarmUp, WhiskConfig}
import org.apache.openwhisk.core.connector.ContainerCreationError._
@@ -115,7 +114,7 @@ class ContainerMessageConsumerTests
}
def sendAckToScheduler(producer: MessageProducer)(schedulerInstanceId: SchedulerInstanceId,
- ackMessage: ContainerCreationAckMessage): Future[RecordMetadata] = {
+ ackMessage: ContainerCreationAckMessage): Future[ResultMetadata] = {
val topic = s"creationAck${schedulerInstanceId.asString}"
producer.send(topic, ackMessage)
}
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/InvokerSupervisionTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/InvokerSupervisionTests.scala
index 463005d6c..2cf0ef3f3 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/InvokerSupervisionTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/InvokerSupervisionTests.scala
@@ -21,8 +21,6 @@ import scala.collection.mutable
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.concurrent.Future
-import org.apache.kafka.clients.producer.RecordMetadata
-import org.apache.kafka.common.TopicPartition
import org.junit.runner.RunWith
import org.scalamock.scalatest.MockFactory
import org.scalatest.BeforeAndAfterAll
@@ -46,8 +44,7 @@ import common.{LoggedFunction, StreamLogging}
import org.apache.openwhisk.common.InvokerState.{Healthy, Offline, Unhealthy, Unresponsive}
import org.apache.openwhisk.common.{InvokerHealth, InvokerState, TransactionId}
import org.apache.openwhisk.core.WhiskConfig
-import org.apache.openwhisk.core.connector.ActivationMessage
-import org.apache.openwhisk.core.connector.PingMessage
+import org.apache.openwhisk.core.connector.{ActivationMessage, PingMessage, ResultMetadata}
import org.apache.openwhisk.core.entity.ActivationId.ActivationIdGenerator
import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.core.entity.size._
@@ -108,7 +105,7 @@ class InvokerSupervisionTests
val children = mutable.Queue(invoker5.ref, invoker2.ref)
val childFactory = (f: ActorRefFactory, instance: InvokerInstanceId) => children.dequeue()
- val sendActivationToInvoker = stubFunction[ActivationMessage, InvokerInstanceId, Future[RecordMetadata]]
+ val sendActivationToInvoker = stubFunction[ActivationMessage, InvokerInstanceId, Future[ResultMetadata]]
val supervisor = system.actorOf(InvokerPool.props(childFactory, sendActivationToInvoker, pC))
within(timeout.duration) {
@@ -147,7 +144,7 @@ class InvokerSupervisionTests
val invokerInstance = InvokerInstanceId(0, userMemory = defaultUserMemory)
val invokerName = s"invoker${invokerInstance.toInt}"
val childFactory = (f: ActorRefFactory, instance: InvokerInstanceId) => invoker.ref
- val sendActivationToInvoker = stubFunction[ActivationMessage, InvokerInstanceId, Future[RecordMetadata]]
+ val sendActivationToInvoker = stubFunction[ActivationMessage, InvokerInstanceId, Future[ResultMetadata]]
val supervisor = system.actorOf(InvokerPool.props(childFactory, sendActivationToInvoker, pC))
@@ -174,7 +171,7 @@ class InvokerSupervisionTests
val childFactory = (f: ActorRefFactory, instance: InvokerInstanceId) => invoker.ref
val sendActivationToInvoker = LoggedFunction { (a: ActivationMessage, b: InvokerInstanceId) =>
- Future.successful(new RecordMetadata(new TopicPartition(invokerName, 0), 0L, 0L, 0L, Long.box(0L), 0, 0))
+ Future.successful(ResultMetadata(invokerName, 0, 0))
}
val supervisor = system.actorOf(InvokerPool.props(childFactory, sendActivationToInvoker, pC))
@@ -421,7 +418,7 @@ class InvokerSupervisionTests
val children = mutable.Queue(invoker0.ref)
val childFactory = (f: ActorRefFactory, instance: InvokerInstanceId) => children.dequeue()
- val sendActivationToInvoker = stubFunction[ActivationMessage, InvokerInstanceId, Future[RecordMetadata]]
+ val sendActivationToInvoker = stubFunction[ActivationMessage, InvokerInstanceId, Future[ResultMetadata]]
val supervisor = system.actorOf(InvokerPool.props(childFactory, sendActivationToInvoker, pC))
val invokerInstance = InvokerInstanceId(0, Some("10.x.x.x"), Some("invoker-xyz"), userMemory = defaultUserMemory)
@@ -444,7 +441,7 @@ class InvokerSupervisionTests
val children = mutable.Queue(invoker0.ref)
val childFactory = (f: ActorRefFactory, instance: InvokerInstanceId) => children.dequeue()
- val sendActivationToInvoker = stubFunction[ActivationMessage, InvokerInstanceId, Future[RecordMetadata]]
+ val sendActivationToInvoker = stubFunction[ActivationMessage, InvokerInstanceId, Future[ResultMetadata]]
val supervisor = system.actorOf(InvokerPool.props(childFactory, sendActivationToInvoker, pC))
val invokerInstance = InvokerInstanceId(0, Some("10.x.x.x"), Some("invoker-xyz"), userMemory = defaultUserMemory)
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
index 907264c82..2fb70088c 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
@@ -22,10 +22,8 @@ import akka.actor.ActorRefFactory
import akka.actor.ActorSystem
import akka.testkit.TestProbe
import common.{StreamLogging, WhiskProperties}
-import java.nio.charset.StandardCharsets
-import org.apache.kafka.clients.producer.RecordMetadata
-import org.apache.kafka.common.TopicPartition
+import java.nio.charset.StandardCharsets
import org.apache.openwhisk.common.InvokerState.{Healthy, Offline, Unhealthy}
import org.junit.runner.RunWith
import org.scalamock.scalatest.MockFactory
@@ -39,12 +37,15 @@ import scala.concurrent.duration._
import org.apache.openwhisk.common.{InvokerHealth, Logging, NestedSemaphore, TransactionId}
import org.apache.openwhisk.core.entity.FullyQualifiedEntityName
import org.apache.openwhisk.core.WhiskConfig
-import org.apache.openwhisk.core.connector.ActivationMessage
-import org.apache.openwhisk.core.connector.CompletionMessage
-import org.apache.openwhisk.core.connector.Message
-import org.apache.openwhisk.core.connector.MessageConsumer
-import org.apache.openwhisk.core.connector.MessageProducer
-import org.apache.openwhisk.core.connector.MessagingProvider
+import org.apache.openwhisk.core.connector.{
+ ActivationMessage,
+ CompletionMessage,
+ Message,
+ MessageConsumer,
+ MessageProducer,
+ MessagingProvider,
+ ResultMetadata
+}
import org.apache.openwhisk.core.entity.ActivationId
import org.apache.openwhisk.core.entity.BasicAuthenticationAuthKey
import org.apache.openwhisk.core.entity.ControllerInstanceId
@@ -455,7 +456,7 @@ class ShardingContainerPoolBalancerTests
(producer
.send(_: String, _: Message, _: Int))
.when(*, *, *)
- .returns(Future.successful(new RecordMetadata(new TopicPartition("fake", 0), 0, 0, 0l, 0l, 0, 0)))
+ .returns(Future.successful(ResultMetadata("fake", 0, 0)))
messaging
}
@@ -472,7 +473,7 @@ class ShardingContainerPoolBalancerTests
actorRefFactory: ActorRefFactory,
messagingProvider: MessagingProvider,
messagingProducer: MessageProducer,
- sendActivationToInvoker: (MessageProducer, ActivationMessage, InvokerInstanceId) => Future[RecordMetadata],
+ sendActivationToInvoker: (MessageProducer, ActivationMessage, InvokerInstanceId) => Future[ResultMetadata],
monitor: Option[ActorRef]): ActorRef =
TestProbe().testActor
}
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/container/test/ContainerManagerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/container/test/ContainerManagerTests.scala
index a8b747ed3..569a66141 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/container/test/ContainerManagerTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/container/test/ContainerManagerTests.scala
@@ -22,8 +22,6 @@ import akka.actor.{ActorRef, ActorRefFactory, ActorSystem}
import akka.testkit.{ImplicitSender, TestKit, TestProbe}
import com.ibm.etcd.api.{KeyValue, RangeResponse}
import common.{StreamLogging, WskActorSystem}
-import org.apache.kafka.clients.producer.RecordMetadata
-import org.apache.kafka.common.TopicPartition
import org.apache.openwhisk.common.InvokerState.{Healthy, Unhealthy}
import org.apache.openwhisk.common.{GracefulShutdown, InvokerHealth, Logging, TransactionId}
import org.apache.openwhisk.core.connector.ContainerCreationError.{
@@ -150,7 +148,7 @@ class ContainerManagerTests
(producer
.send(_: String, _: Message, _: Int))
.when(*, *, *)
- .returns(Future.successful(new RecordMetadata(new TopicPartition("fake", 0), 0, 0, 0l, 0l, 0, 0)))
+ .returns(Future.successful(ResultMetadata("fake", 0, 0)))
}
messaging
@@ -162,12 +160,11 @@ class ContainerManagerTests
override def sentCount(): Long = 0
/** Sends msg to topic. This is an asynchronous operation. */
- override def send(topic: String, msg: Message, retry: Int): Future[RecordMetadata] = {
+ override def send(topic: String, msg: Message, retry: Int): Future[ResultMetadata] = {
val message = s"$topic-$msg"
receiver ! message
- Future.successful(
- new RecordMetadata(new TopicPartition(topic, 0), -1, -1, System.currentTimeMillis(), null, -1, -1))
+ Future.successful(ResultMetadata(topic, 0, -1))
}
/** Closes producer. */
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTestsFixture.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTestsFixture.scala
index 2d2ead20a..ceaf3dd7b 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTestsFixture.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTestsFixture.scala
@@ -1,7 +1,6 @@
package org.apache.openwhisk.core.scheduler.queue.test
import java.time.Instant
-
import akka.actor.{ActorRef, ActorSystem}
import akka.testkit.{ImplicitSender, TestKit, TestProbe}
import com.sksamuel.elastic4s.http
@@ -10,13 +9,17 @@ import com.sksamuel.elastic4s.http._
import com.sksamuel.elastic4s.http.search.{SearchHits, SearchResponse}
import com.sksamuel.elastic4s.searches.SearchRequest
import common.StreamLogging
-import org.apache.kafka.clients.producer.RecordMetadata
-import org.apache.kafka.common.TopicPartition
import org.apache.openwhisk.common.TransactionId
import org.apache.openwhisk.common.WhiskInstants.InstantImplicits
import org.apache.openwhisk.core.WhiskConfig
import org.apache.openwhisk.core.ack.ActiveAck
-import org.apache.openwhisk.core.connector.{AcknowledegmentMessage, ActivationMessage, Message, MessageProducer}
+import org.apache.openwhisk.core.connector.{
+ AcknowledegmentMessage,
+ ActivationMessage,
+ Message,
+ MessageProducer,
+ ResultMetadata
+}
import org.apache.openwhisk.core.database.UserContext
import org.apache.openwhisk.core.database.elasticsearch.ElasticSearchActivationStore.generateIndex
import org.apache.openwhisk.core.entity.ExecManifest.{ImageName, RuntimeManifest}
@@ -188,11 +191,10 @@ class MemoryQueueTestsFixture
override def sentCount(): Long = 0
/** Sends msg to topic. This is an asynchronous operation. */
- override def send(topic: String, msg: Message, retry: Int): Future[RecordMetadata] = {
+ override def send(topic: String, msg: Message, retry: Int): Future[ResultMetadata] = {
receiver ! s"$topic-${msg}"
- Future.successful(
- new RecordMetadata(new TopicPartition(topic, 0), -1, -1, System.currentTimeMillis(), null, -1, -1))
+ Future.successful(ResultMetadata(topic, 0, -1))
}
/** Closes producer. */