You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2012/11/30 18:49:51 UTC
svn commit: r1415765 - in /kafka/branches/0.8/core/src:
main/scala/kafka/consumer/ main/scala/kafka/producer/
main/scala/kafka/server/ test/scala/unit/kafka/utils/
Author: nehanarkhede
Date: Fri Nov 30 17:49:49 2012
New Revision: 1415765
URL: http://svn.apache.org/viewvc?rev=1415765&view=rev
Log:
KAFKA-640 kafka.common.InvalidClientIdException in broker log4j messages; patched by Swapnil; reviewed by Neha Narkhede
Modified:
kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala
kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/ClientIdTest.scala
Modified: kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala?rev=1415765&r1=1415764&r2=1415765&view=diff
==============================================================================
--- kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala (original)
+++ kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala Fri Nov 30 17:49:49 2012
@@ -81,7 +81,7 @@ class SimpleConsumer(val host: String,
ClientId.validate(clientId)
private val lock = new Object()
private val blockingChannel = new BlockingChannel(host, port, bufferSize, BlockingChannel.UseDefaultBufferSize, soTimeout)
- private val fetchRequestAndResponseStats = new FetchRequestAndResponseStats(clientId, "host_" + host + "-port_" + port)
+ private val fetchRequestAndResponseStats = new FetchRequestAndResponseStats(clientId + "-host_%s-port_%s".format(host, port))
private def connect(): BlockingChannel = {
close
@@ -169,7 +169,7 @@ class SimpleConsumer(val host: String,
}
}
-class FetchRequestAndResponseStats(clientId: String, brokerInfo: String) extends KafkaMetricsGroup {
- val requestTimer = new KafkaTimer(newTimer(clientId + "-" + brokerInfo + "-FetchRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
- val respondSizeHist = newHistogram(clientId + "-" + brokerInfo + "-FetchResponseSize")
+class FetchRequestAndResponseStats(clientId: String) extends KafkaMetricsGroup {
+ val requestTimer = new KafkaTimer(newTimer(clientId + "-FetchRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
+ val respondSizeHist = newHistogram(clientId + "-FetchResponseSize")
}
Modified: kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala?rev=1415765&r1=1415764&r2=1415765&view=diff
==============================================================================
--- kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala (original)
+++ kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala Fri Nov 30 17:49:49 2012
@@ -39,7 +39,7 @@ class SyncProducer(val config: SyncProdu
@volatile private var shutdown: Boolean = false
private val blockingChannel = new BlockingChannel(config.host, config.port, BlockingChannel.UseDefaultBufferSize,
config.bufferSize, config.requestTimeoutMs)
- val producerRequestStats = new ProducerRequestStats(config.clientId, "host_" + config.host + "-port_" + config.port)
+ val producerRequestStats = new ProducerRequestStats(config.clientId + "-host_%s-port_%s".format(config.host, config.port))
trace("Instantiating Scala Sync Producer")
@@ -150,7 +150,7 @@ class SyncProducer(val config: SyncProdu
}
}
-class ProducerRequestStats(clientId: String, brokerInfo: String) extends KafkaMetricsGroup {
- val requestTimer = new KafkaTimer(newTimer(clientId + "-" + brokerInfo + "-ProduceRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
- val requestSizeHist = newHistogram(clientId + "-" + brokerInfo + "-ProducerRequestSize")
+class ProducerRequestStats(clientId: String) extends KafkaMetricsGroup {
+ val requestTimer = new KafkaTimer(newTimer(clientId + "-ProduceRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
+ val requestSizeHist = newHistogram(clientId + "-ProducerRequestSize")
}
Modified: kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala?rev=1415765&r1=1415764&r2=1415765&view=diff
==============================================================================
--- kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala (original)
+++ kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala Fri Nov 30 17:49:49 2012
@@ -42,9 +42,10 @@ abstract class AbstractFetcherThread(nam
private val partitionMapLock = new ReentrantLock
private val partitionMapCond = partitionMapLock.newCondition()
val simpleConsumer = new SimpleConsumer(sourceBroker.host, sourceBroker.port, socketTimeout, socketBufferSize, clientId)
- val fetcherStats = new FetcherStats(clientId)
+ private val brokerInfo = "host_%s-port_%s".format(sourceBroker.host, sourceBroker.port)
+ val fetcherStats = new FetcherStats(clientId + "-" + brokerInfo)
val fetcherMetrics = fetcherStats.getFetcherStats(name + "-" + sourceBroker.id)
- val fetcherLagStats = new FetcherLagStats(clientId)
+ val fetcherLagStats = new FetcherLagStats(clientId + "-" + brokerInfo)
/* callbacks to be defined in subclass */
@@ -65,7 +66,7 @@ abstract class AbstractFetcherThread(nam
override def doWork() {
val fetchRequestuilder = new FetchRequestBuilder().
- clientId(clientId).
+ clientId(clientId + "-" + brokerInfo).
replicaId(fetcherBrokerId).
maxWait(maxWait).
minBytes(minBytes)
Modified: kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala?rev=1415765&r1=1415764&r2=1415765&view=diff
==============================================================================
--- kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala (original)
+++ kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala Fri Nov 30 17:49:49 2012
@@ -28,7 +28,7 @@ class ReplicaFetcherThread(name:String,
brokerConfig: KafkaConfig,
replicaMgr: ReplicaManager)
extends AbstractFetcherThread(name = name,
- clientId = FetchRequest.ReplicaFetcherClientId + "-host_%s-port_%d".format(sourceBroker.host, sourceBroker.port),
+ clientId = FetchRequest.ReplicaFetcherClientId,
sourceBroker = sourceBroker,
socketTimeout = brokerConfig.replicaSocketTimeoutMs,
socketBufferSize = brokerConfig.replicaSocketBufferSize,
Modified: kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/ClientIdTest.scala
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/ClientIdTest.scala?rev=1415765&r1=1415764&r2=1415765&view=diff
==============================================================================
--- kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/ClientIdTest.scala (original)
+++ kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/ClientIdTest.scala Fri Nov 30 17:49:49 2012
@@ -27,7 +27,6 @@ class ClientIdTest {
@Test
def testInvalidClientIds() {
val invalidclientIds = new ArrayBuffer[String]()
- invalidclientIds += (".", "..")
var longName = "ATCG"
for (i <- 1 to 6)
longName += longName