You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2012/11/30 18:49:51 UTC

svn commit: r1415765 - in /kafka/branches/0.8/core/src: main/scala/kafka/consumer/ main/scala/kafka/producer/ main/scala/kafka/server/ test/scala/unit/kafka/utils/

Author: nehanarkhede
Date: Fri Nov 30 17:49:49 2012
New Revision: 1415765

URL: http://svn.apache.org/viewvc?rev=1415765&view=rev
Log:
KAFKA-640 kafka.common.InvalidClientIdException in broker log4j messages; patched by Swapnil; reviewed by Neha Narkhede

Modified:
    kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
    kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala
    kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
    kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
    kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/ClientIdTest.scala

Modified: kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala?rev=1415765&r1=1415764&r2=1415765&view=diff
==============================================================================
--- kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala (original)
+++ kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala Fri Nov 30 17:49:49 2012
@@ -81,7 +81,7 @@ class SimpleConsumer(val host: String,
   ClientId.validate(clientId)
   private val lock = new Object()
   private val blockingChannel = new BlockingChannel(host, port, bufferSize, BlockingChannel.UseDefaultBufferSize, soTimeout)
-  private val fetchRequestAndResponseStats = new FetchRequestAndResponseStats(clientId, "host_" + host + "-port_" + port)
+  private val fetchRequestAndResponseStats = new FetchRequestAndResponseStats(clientId + "-host_%s-port_%s".format(host, port))
 
   private def connect(): BlockingChannel = {
     close
@@ -169,7 +169,7 @@ class SimpleConsumer(val host: String,
   }
 }
 
-class FetchRequestAndResponseStats(clientId: String, brokerInfo: String) extends KafkaMetricsGroup {
-  val requestTimer = new KafkaTimer(newTimer(clientId + "-" + brokerInfo + "-FetchRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
-  val respondSizeHist = newHistogram(clientId + "-" + brokerInfo + "-FetchResponseSize")
+class FetchRequestAndResponseStats(clientId: String) extends KafkaMetricsGroup {
+  val requestTimer = new KafkaTimer(newTimer(clientId + "-FetchRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
+  val respondSizeHist = newHistogram(clientId + "-FetchResponseSize")
 }

Modified: kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala?rev=1415765&r1=1415764&r2=1415765&view=diff
==============================================================================
--- kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala (original)
+++ kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala Fri Nov 30 17:49:49 2012
@@ -39,7 +39,7 @@ class SyncProducer(val config: SyncProdu
   @volatile private var shutdown: Boolean = false
   private val blockingChannel = new BlockingChannel(config.host, config.port, BlockingChannel.UseDefaultBufferSize,
     config.bufferSize, config.requestTimeoutMs)
-  val producerRequestStats = new ProducerRequestStats(config.clientId, "host_" + config.host + "-port_" + config.port)
+  val producerRequestStats = new ProducerRequestStats(config.clientId + "-host_%s-port_%s".format(config.host, config.port))
 
   trace("Instantiating Scala Sync Producer")
 
@@ -150,7 +150,7 @@ class SyncProducer(val config: SyncProdu
   }
 }
 
-class ProducerRequestStats(clientId: String, brokerInfo: String) extends KafkaMetricsGroup {
-  val requestTimer = new KafkaTimer(newTimer(clientId + "-" + brokerInfo + "-ProduceRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
-  val requestSizeHist = newHistogram(clientId + "-" + brokerInfo + "-ProducerRequestSize")
+class ProducerRequestStats(clientId: String) extends KafkaMetricsGroup {
+  val requestTimer = new KafkaTimer(newTimer(clientId + "-ProduceRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
+  val requestSizeHist = newHistogram(clientId + "-ProducerRequestSize")
 }

Modified: kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala?rev=1415765&r1=1415764&r2=1415765&view=diff
==============================================================================
--- kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala (original)
+++ kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala Fri Nov 30 17:49:49 2012
@@ -42,9 +42,10 @@ abstract class AbstractFetcherThread(nam
   private val partitionMapLock = new ReentrantLock
   private val partitionMapCond = partitionMapLock.newCondition()
   val simpleConsumer = new SimpleConsumer(sourceBroker.host, sourceBroker.port, socketTimeout, socketBufferSize, clientId)
-  val fetcherStats = new FetcherStats(clientId)
+  private val brokerInfo = "host_%s-port_%s".format(sourceBroker.host, sourceBroker.port)
+  val fetcherStats = new FetcherStats(clientId + "-" + brokerInfo)
   val fetcherMetrics = fetcherStats.getFetcherStats(name + "-" + sourceBroker.id)
-  val fetcherLagStats = new FetcherLagStats(clientId)
+  val fetcherLagStats = new FetcherLagStats(clientId + "-" + brokerInfo)
 
   /* callbacks to be defined in subclass */
 
@@ -65,7 +66,7 @@ abstract class AbstractFetcherThread(nam
 
   override def doWork() {
     val fetchRequestuilder = new FetchRequestBuilder().
-            clientId(clientId).
+            clientId(clientId + "-" + brokerInfo).
             replicaId(fetcherBrokerId).
             maxWait(maxWait).
             minBytes(minBytes)

Modified: kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala?rev=1415765&r1=1415764&r2=1415765&view=diff
==============================================================================
--- kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala (original)
+++ kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala Fri Nov 30 17:49:49 2012
@@ -28,7 +28,7 @@ class ReplicaFetcherThread(name:String,
                            brokerConfig: KafkaConfig,
                            replicaMgr: ReplicaManager)
   extends AbstractFetcherThread(name = name,
-                                clientId = FetchRequest.ReplicaFetcherClientId + "-host_%s-port_%d".format(sourceBroker.host, sourceBroker.port),
+                                clientId = FetchRequest.ReplicaFetcherClientId,
                                 sourceBroker = sourceBroker,
                                 socketTimeout = brokerConfig.replicaSocketTimeoutMs,
                                 socketBufferSize = brokerConfig.replicaSocketBufferSize,

Modified: kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/ClientIdTest.scala
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/ClientIdTest.scala?rev=1415765&r1=1415764&r2=1415765&view=diff
==============================================================================
--- kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/ClientIdTest.scala (original)
+++ kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/ClientIdTest.scala Fri Nov 30 17:49:49 2012
@@ -27,7 +27,6 @@ class ClientIdTest {
   @Test
   def testInvalidClientIds() {
     val invalidclientIds = new ArrayBuffer[String]()
-    invalidclientIds += (".", "..")
     var longName = "ATCG"
     for (i <- 1 to 6)
       longName += longName