You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/03/04 05:22:01 UTC
[25/37] git commit: Add missing metrics in 0.8; patched by Swapnil Ghike; reviewed by Jun Rao; kafka-604
Add missing metrics in 0.8; patched by Swapnil Ghike; reviewed by Jun Rao; kafka-604
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9115977b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9115977b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9115977b
Branch: refs/heads/trunk
Commit: 9115977b38ad207028c7c194524a408b9e5789f0
Parents: 828ce83
Author: Jun Rao <ju...@gmail.com>
Authored: Fri Feb 22 18:47:37 2013 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Fri Feb 22 18:47:37 2013 -0800
----------------------------------------------------------------------
core/src/main/scala/kafka/cluster/Partition.scala | 2 +-
.../scala/kafka/consumer/ConsumerTopicStats.scala | 8 ++++----
.../consumer/FetchRequestAndResponseStats.scala | 8 ++++----
core/src/main/scala/kafka/log/Log.scala | 2 +-
core/src/main/scala/kafka/producer/Producer.scala | 6 +++---
.../kafka/producer/ProducerRequestStats.scala | 8 ++++----
.../main/scala/kafka/producer/ProducerStats.scala | 1 -
.../scala/kafka/producer/ProducerTopicStats.scala | 11 ++++++-----
.../kafka/producer/async/DefaultEventHandler.scala | 2 +-
.../kafka/producer/async/ProducerSendThread.scala | 2 +-
core/src/main/scala/kafka/server/KafkaApis.scala | 8 ++++----
.../scala/kafka/server/KafkaRequestHandler.scala | 4 ++--
core/src/main/scala/kafka/server/KafkaServer.scala | 2 +-
.../main/scala/kafka/server/ReplicaManager.scala | 6 ++++++
14 files changed, 38 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/9115977b/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index b55b464..469ac79 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -56,7 +56,7 @@ class Partition(val topic: String,
private def isReplicaLocal(replicaId: Int) : Boolean = (replicaId == localBrokerId)
newGauge(
- topic + "-" + partitionId + "UnderReplicated",
+ topic + "-" + partitionId + "-UnderReplicated",
new Gauge[Int] {
def getValue = {
if (isUnderReplicated) 1 else 0
http://git-wip-us.apache.org/repos/asf/kafka/blob/9115977b/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala b/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala
index bb38f35..ff5f470 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala
@@ -24,8 +24,8 @@ import kafka.common.ClientIdAndTopic
@threadsafe
class ConsumerTopicMetrics(metricId: ClientIdAndTopic) extends KafkaMetricsGroup {
- val messageRate = newMeter(metricId + "-MessagesPerSec", "messages", TimeUnit.SECONDS)
- val byteRate = newMeter(metricId + "-BytesPerSec", "bytes", TimeUnit.SECONDS)
+ val messageRate = newMeter(metricId + "MessagesPerSec", "messages", TimeUnit.SECONDS)
+ val byteRate = newMeter(metricId + "BytesPerSec", "bytes", TimeUnit.SECONDS)
}
/**
@@ -35,12 +35,12 @@ class ConsumerTopicMetrics(metricId: ClientIdAndTopic) extends KafkaMetricsGroup
class ConsumerTopicStats(clientId: String) extends Logging {
private val valueFactory = (k: ClientIdAndTopic) => new ConsumerTopicMetrics(k)
private val stats = new Pool[ClientIdAndTopic, ConsumerTopicMetrics](Some(valueFactory))
- private val allTopicStats = new ConsumerTopicMetrics(new ClientIdAndTopic(clientId, "All.Topics")) // to differentiate from a topic named AllTopics
+ private val allTopicStats = new ConsumerTopicMetrics(new ClientIdAndTopic(clientId, "AllTopics")) // to differentiate from a topic named AllTopics
def getConsumerAllTopicStats(): ConsumerTopicMetrics = allTopicStats
def getConsumerTopicStats(topic: String): ConsumerTopicMetrics = {
- stats.getAndMaybePut(new ClientIdAndTopic(clientId, topic))
+ stats.getAndMaybePut(new ClientIdAndTopic(clientId, topic + "-"))
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9115977b/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala b/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala
index 2cc0f36..875eeeb 100644
--- a/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala
+++ b/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala
@@ -23,8 +23,8 @@ import java.util.concurrent.TimeUnit
import kafka.common.ClientIdAndBroker
class FetchRequestAndResponseMetrics(metricId: ClientIdAndBroker) extends KafkaMetricsGroup {
- val requestTimer = new KafkaTimer(newTimer(metricId + "-FetchRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
- val requestSizeHist = newHistogram(metricId + "-FetchResponseSize")
+ val requestTimer = new KafkaTimer(newTimer(metricId + "FetchRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
+ val requestSizeHist = newHistogram(metricId + "FetchResponseSize")
}
/**
@@ -34,12 +34,12 @@ class FetchRequestAndResponseMetrics(metricId: ClientIdAndBroker) extends KafkaM
class FetchRequestAndResponseStats(clientId: String) {
private val valueFactory = (k: ClientIdAndBroker) => new FetchRequestAndResponseMetrics(k)
private val stats = new Pool[ClientIdAndBroker, FetchRequestAndResponseMetrics](Some(valueFactory))
- private val allBrokersStats = new FetchRequestAndResponseMetrics(new ClientIdAndBroker(clientId, "All.Brokers"))
+ private val allBrokersStats = new FetchRequestAndResponseMetrics(new ClientIdAndBroker(clientId, "AllBrokers"))
def getFetchRequestAndResponseAllBrokersStats(): FetchRequestAndResponseMetrics = allBrokersStats
def getFetchRequestAndResponseStats(brokerInfo: String): FetchRequestAndResponseMetrics = {
- stats.getAndMaybePut(new ClientIdAndBroker(clientId, brokerInfo))
+ stats.getAndMaybePut(new ClientIdAndBroker(clientId, brokerInfo + "-"))
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9115977b/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 8d9a883..9a5f053 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -266,7 +266,7 @@ private[kafka] class Log(val dir: File,
(-1L, -1L)
} else {
BrokerTopicStats.getBrokerTopicStats(topicName).messagesInRate.mark(messageSetInfo.count)
- BrokerTopicStats.getBrokerAllTopicStats.messagesInRate.mark(messageSetInfo.count)
+ BrokerTopicStats.getBrokerAllTopicsStats.messagesInRate.mark(messageSetInfo.count)
// trim any invalid bytes or partial messages before appending it to the on-disk log
var validMessages = trimInvalidBytes(messages)
http://git-wip-us.apache.org/repos/asf/kafka/blob/9115977b/core/src/main/scala/kafka/producer/Producer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/Producer.scala b/core/src/main/scala/kafka/producer/Producer.scala
index 66638f2..764bb02 100644
--- a/core/src/main/scala/kafka/producer/Producer.scala
+++ b/core/src/main/scala/kafka/producer/Producer.scala
@@ -50,7 +50,6 @@ class Producer[K,V](config: ProducerConfig,
producerSendThread.start()
}
- private val producerStats = ProducerStatsRegistry.getProducerStats(config.clientId)
private val producerTopicStats = ProducerTopicStatsRegistry.getProducerTopicStats(config.clientId)
KafkaMetricsReporter.startReporters(config.props)
@@ -81,7 +80,7 @@ class Producer[K,V](config: ProducerConfig,
private def recordStats(messages: Seq[KeyedMessage[K,V]]) {
for (message <- messages) {
producerTopicStats.getProducerTopicStats(message.topic).messageRate.mark()
- producerTopicStats.getProducerAllTopicStats.messageRate.mark()
+ producerTopicStats.getProducerAllTopicsStats.messageRate.mark()
}
}
@@ -106,7 +105,8 @@ class Producer[K,V](config: ProducerConfig,
}
}
if(!added) {
- producerStats.droppedMessageRate.mark()
+ producerTopicStats.getProducerTopicStats(message.topic).droppedMessageRate.mark()
+ producerTopicStats.getProducerAllTopicsStats.droppedMessageRate.mark()
error("Event queue is full of unsent messages, could not send event: " + message.toString)
throw new QueueFullException("Event queue is full of unsent messages, could not send event: " + message.toString)
}else {
http://git-wip-us.apache.org/repos/asf/kafka/blob/9115977b/core/src/main/scala/kafka/producer/ProducerRequestStats.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/ProducerRequestStats.scala b/core/src/main/scala/kafka/producer/ProducerRequestStats.scala
index e29ccad..9694220 100644
--- a/core/src/main/scala/kafka/producer/ProducerRequestStats.scala
+++ b/core/src/main/scala/kafka/producer/ProducerRequestStats.scala
@@ -22,8 +22,8 @@ import kafka.utils.Pool
import kafka.common.ClientIdAndBroker
class ProducerRequestMetrics(metricId: ClientIdAndBroker) extends KafkaMetricsGroup {
- val requestTimer = new KafkaTimer(newTimer(metricId + "-ProducerRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
- val requestSizeHist = newHistogram(metricId + "-ProducerRequestSize")
+ val requestTimer = new KafkaTimer(newTimer(metricId + "ProducerRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
+ val requestSizeHist = newHistogram(metricId + "ProducerRequestSize")
}
/**
@@ -33,12 +33,12 @@ class ProducerRequestMetrics(metricId: ClientIdAndBroker) extends KafkaMetricsGr
class ProducerRequestStats(clientId: String) {
private val valueFactory = (k: ClientIdAndBroker) => new ProducerRequestMetrics(k)
private val stats = new Pool[ClientIdAndBroker, ProducerRequestMetrics](Some(valueFactory))
- private val allBrokersStats = new ProducerRequestMetrics(new ClientIdAndBroker(clientId, "All.Brokers"))
+ private val allBrokersStats = new ProducerRequestMetrics(new ClientIdAndBroker(clientId, "AllBrokers"))
def getProducerRequestAllBrokersStats(): ProducerRequestMetrics = allBrokersStats
def getProducerRequestStats(brokerInfo: String): ProducerRequestMetrics = {
- stats.getAndMaybePut(new ClientIdAndBroker(clientId, brokerInfo))
+ stats.getAndMaybePut(new ClientIdAndBroker(clientId, brokerInfo + "-"))
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9115977b/core/src/main/scala/kafka/producer/ProducerStats.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/ProducerStats.scala b/core/src/main/scala/kafka/producer/ProducerStats.scala
index 62ff544..e1610d3 100644
--- a/core/src/main/scala/kafka/producer/ProducerStats.scala
+++ b/core/src/main/scala/kafka/producer/ProducerStats.scala
@@ -24,7 +24,6 @@ class ProducerStats(clientId: String) extends KafkaMetricsGroup {
val serializationErrorRate = newMeter(clientId + "-SerializationErrorsPerSec", "errors", TimeUnit.SECONDS)
val resendRate = newMeter(clientId + "-ResendsPerSec", "resends", TimeUnit.SECONDS)
val failedSendRate = newMeter(clientId + "-FailedSendsPerSec", "failed sends", TimeUnit.SECONDS)
- val droppedMessageRate = newMeter(clientId + "-DroppedMessagesPerSec", "drops", TimeUnit.SECONDS)
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/9115977b/core/src/main/scala/kafka/producer/ProducerTopicStats.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/ProducerTopicStats.scala b/core/src/main/scala/kafka/producer/ProducerTopicStats.scala
index fd0b44e..ed209f4 100644
--- a/core/src/main/scala/kafka/producer/ProducerTopicStats.scala
+++ b/core/src/main/scala/kafka/producer/ProducerTopicStats.scala
@@ -24,8 +24,9 @@ import java.util.concurrent.TimeUnit
@threadsafe
class ProducerTopicMetrics(metricId: ClientIdAndTopic) extends KafkaMetricsGroup {
- val messageRate = newMeter(metricId + "-MessagesPerSec", "messages", TimeUnit.SECONDS)
- val byteRate = newMeter(metricId + "-BytesPerSec", "bytes", TimeUnit.SECONDS)
+ val messageRate = newMeter(metricId + "MessagesPerSec", "messages", TimeUnit.SECONDS)
+ val byteRate = newMeter(metricId + "BytesPerSec", "bytes", TimeUnit.SECONDS)
+ val droppedMessageRate = newMeter(metricId + "DroppedMessagesPerSec", "drops", TimeUnit.SECONDS)
}
/**
@@ -35,12 +36,12 @@ class ProducerTopicMetrics(metricId: ClientIdAndTopic) extends KafkaMetricsGroup
class ProducerTopicStats(clientId: String) {
private val valueFactory = (k: ClientIdAndTopic) => new ProducerTopicMetrics(k)
private val stats = new Pool[ClientIdAndTopic, ProducerTopicMetrics](Some(valueFactory))
- private val allTopicStats = new ProducerTopicMetrics(new ClientIdAndTopic(clientId, "All.Topics")) // to differentiate from a topic named AllTopics
+ private val allTopicsStats = new ProducerTopicMetrics(new ClientIdAndTopic(clientId, "AllTopics")) // to differentiate from a topic named AllTopics
- def getProducerAllTopicStats(): ProducerTopicMetrics = allTopicStats
+ def getProducerAllTopicsStats(): ProducerTopicMetrics = allTopicsStats
def getProducerTopicStats(topic: String): ProducerTopicMetrics = {
- stats.getAndMaybePut(new ClientIdAndTopic(clientId, topic))
+ stats.getAndMaybePut(new ClientIdAndTopic(clientId, topic + "-"))
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9115977b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
index 5569cc2..0fd733e 100644
--- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -56,7 +56,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
keyed =>
val dataSize = keyed.message.payloadSize
producerTopicStats.getProducerTopicStats(keyed.topic).byteRate.mark(dataSize)
- producerTopicStats.getProducerAllTopicStats.byteRate.mark(dataSize)
+ producerTopicStats.getProducerAllTopicsStats.byteRate.mark(dataSize)
}
var outstandingProduceRequests = serializedData
var remainingRetries = config.messageSendMaxRetries + 1
http://git-wip-us.apache.org/repos/asf/kafka/blob/9115977b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
index 2b39cab..6691147 100644
--- a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
+++ b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
@@ -34,7 +34,7 @@ class ProducerSendThread[K,V](val threadName: String,
private val shutdownLatch = new CountDownLatch(1)
private val shutdownCommand = new KeyedMessage[K,V]("shutdown", null.asInstanceOf[K], null.asInstanceOf[V])
- newGauge(clientId + "-ProducerQueueSize-" + getId,
+ newGauge(clientId + "-ProducerQueueSize",
new Gauge[Int] {
def getValue = queue.size
})
http://git-wip-us.apache.org/repos/asf/kafka/blob/9115977b/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 6df077b..3c84695 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -180,7 +180,7 @@ class KafkaApis(val requestChannel: RequestChannel,
trace("Append [%s] to local log ".format(partitionAndData.toString))
partitionAndData.map {case (topicAndPartition, messages) =>
BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesInRate.mark(messages.sizeInBytes)
- BrokerTopicStats.getBrokerAllTopicStats.bytesInRate.mark(messages.sizeInBytes)
+ BrokerTopicStats.getBrokerAllTopicsStats.bytesInRate.mark(messages.sizeInBytes)
try {
val localReplica = replicaManager.getLeaderReplicaIfLocal(topicAndPartition.topic, topicAndPartition.partition)
@@ -198,7 +198,7 @@ class KafkaApis(val requestChannel: RequestChannel,
null
case e =>
BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).failedProduceRequestRate.mark()
- BrokerTopicStats.getBrokerAllTopicStats.failedProduceRequestRate.mark()
+ BrokerTopicStats.getBrokerAllTopicsStats.failedProduceRequestRate.mark()
error("Error processing ProducerRequest with correlation id %d from client %s on %s:%d"
.format(producerRequest.correlationId, producerRequest.clientId, topicAndPartition.topic, topicAndPartition.partition), e)
new ProduceResult(topicAndPartition, e)
@@ -264,7 +264,7 @@ class KafkaApis(val requestChannel: RequestChannel,
try {
val (messages, highWatermark) = readMessageSet(topic, partition, offset, fetchSize, fetchRequest.replicaId)
BrokerTopicStats.getBrokerTopicStats(topic).bytesOutRate.mark(messages.sizeInBytes)
- BrokerTopicStats.getBrokerAllTopicStats.bytesOutRate.mark(messages.sizeInBytes)
+ BrokerTopicStats.getBrokerAllTopicsStats.bytesOutRate.mark(messages.sizeInBytes)
if (!isFetchFromFollower) {
new FetchResponsePartitionData(ErrorMapping.NoError, highWatermark, messages)
} else {
@@ -275,7 +275,7 @@ class KafkaApis(val requestChannel: RequestChannel,
} catch {
case t: Throwable =>
BrokerTopicStats.getBrokerTopicStats(topic).failedFetchRequestRate.mark()
- BrokerTopicStats.getBrokerAllTopicStats.failedFetchRequestRate.mark()
+ BrokerTopicStats.getBrokerAllTopicsStats.failedFetchRequestRate.mark()
error("error when processing request " + (topic, partition, offset, fetchSize), t)
new FetchResponsePartitionData(ErrorMapping.codeFor(t.getClass.asInstanceOf[Class[Throwable]]), -1L, MessageSet.Empty)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9115977b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
index 69ca058..842dcf3 100644
--- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
+++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
@@ -82,9 +82,9 @@ class BrokerTopicMetrics(name: String) extends KafkaMetricsGroup {
object BrokerTopicStats extends Logging {
private val valueFactory = (k: String) => new BrokerTopicMetrics(k)
private val stats = new Pool[String, BrokerTopicMetrics](Some(valueFactory))
- private val allTopicStats = new BrokerTopicMetrics("AllTopics")
+ private val allTopicsStats = new BrokerTopicMetrics("AllTopics")
- def getBrokerAllTopicStats(): BrokerTopicMetrics = allTopicStats
+ def getBrokerAllTopicsStats(): BrokerTopicMetrics = allTopicsStats
def getBrokerTopicStats(topic: String): BrokerTopicMetrics = {
stats.getAndMaybePut(topic + "-")
http://git-wip-us.apache.org/repos/asf/kafka/blob/9115977b/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 09e261f..e7248c3 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -97,7 +97,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
* Forces some dynamic jmx beans to be registered on server startup.
*/
private def registerStats() {
- BrokerTopicStats.getBrokerAllTopicStats()
+ BrokerTopicStats.getBrokerAllTopicsStats()
ControllerStats.offlinePartitionRate
ControllerStats.uncleanLeaderElectionRate
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9115977b/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 4e6c8ea..f7fe0de 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -58,6 +58,12 @@ class ReplicaManager(val config: KafkaConfig,
}
)
newGauge(
+ "PartitionCount",
+ new Gauge[Int] {
+ def getValue = allPartitions.size
+ }
+ )
+ newGauge(
"UnderReplicatedPartitions",
new Gauge[Int] {
def getValue = {