You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2012/08/23 19:15:18 UTC

svn commit: r1376598 - in /incubator/kafka/branches/0.8/core/src: main/scala/kafka/consumer/TopicCount.scala main/scala/kafka/utils/Utils.scala main/scala/kafka/utils/ZkUtils.scala test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala

Author: junrao
Date: Thu Aug 23 17:15:17 2012
New Revision: 1376598

URL: http://svn.apache.org/viewvc?rev=1376598&view=rev
Log:
TopicCount.constructTopicCount isn't thread-safe; patched by Jun Rao; reviewed by Joel Koshy and Neha Narkhede; kafka-379

Modified:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/TopicCount.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/TopicCount.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/TopicCount.scala?rev=1376598&r1=1376597&r2=1376598&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/TopicCount.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/TopicCount.scala Thu Aug 23 17:15:17 2012
@@ -18,11 +18,9 @@
 package kafka.consumer
 
 import scala.collection._
-import scala.util.parsing.json.JSON
 import org.I0Itec.zkclient.ZkClient
 import java.util.regex.Pattern
-import kafka.utils.{ZKGroupDirs, ZkUtils, Logging}
-
+import kafka.utils.{SyncJSON, ZKGroupDirs, ZkUtils, Logging}
 
 private[kafka] trait TopicCount {
   def getConsumerThreadIdsPerTopic: Map[String, Set[String]]
@@ -59,9 +57,6 @@ private[kafka] object TopicCount extends
   private val BLACKLIST_PATTERN =
     Pattern.compile("""!(\p{Digit}+)!(.*)""")
 
-  val myConversionFunc = {input : String => input.toInt}
-  JSON.globalNumberParser = myConversionFunc
-
   def constructTopicCount(group: String,
                           consumerId: String,
                           zkClient: ZkClient) : TopicCount = {
@@ -93,7 +88,7 @@ private[kafka] object TopicCount extends
     else {
       var topMap : Map[String,Int] = null
       try {
-        JSON.parseFull(topicCountString) match {
+        SyncJSON.parseFull(topicCountString) match {
           case Some(m) => topMap = m.asInstanceOf[Map[String,Int]]
           case None => throw new RuntimeException("error constructing TopicCount : " + topicCountString)
         }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala?rev=1376598&r1=1376597&r2=1376598&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala Thu Aug 23 17:15:17 2012
@@ -32,6 +32,7 @@ import java.util.{Random, Properties}
 import joptsimple.{OptionSpec, OptionSet, OptionParser}
 import kafka.common.KafkaException
 import kafka.cluster.Broker
+import util.parsing.json.JSON
 
 
 /**
@@ -906,4 +907,24 @@ class SnapshotStats(private val monitorD
 
     def durationMs: Double = (end.get - start) / (1000.0 * 1000.0)
   }
+}
+
+/**
+ *  A wrapper that synchronizes JSON in scala, which is not threadsafe.
+ */
+object SyncJSON extends Logging {
+  val myConversionFunc = {input : String => input.toInt}
+  JSON.globalNumberParser = myConversionFunc
+  val lock = new Object
+
+  def parseFull(input: String): Option[Any] = {
+    lock synchronized {
+      try {
+        JSON.parseFull(input)
+      } catch {
+        case t =>
+          throw new KafkaException("Can't parse json string: %s".format(input), t)
+      }
+    }
+  }
 }
\ No newline at end of file

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala?rev=1376598&r1=1376597&r2=1376598&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala Thu Aug 23 17:15:17 2012
@@ -24,7 +24,6 @@ import org.I0Itec.zkclient.{IZkDataListe
 import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException, ZkMarshallingError}
 import org.I0Itec.zkclient.serialize.ZkSerializer
 import scala.collection._
-import util.parsing.json.JSON
 import kafka.api.LeaderAndISR
 import kafka.common.NoEpochForPartitionException
 import org.apache.zookeeper.data.Stat
@@ -78,7 +77,7 @@ object ZkUtils extends Logging {
     val stat = ret._2
     if(leaderAndISRStr == null) None
     else {
-      JSON.parseFull(leaderAndISRStr) match {
+      SyncJSON.parseFull(leaderAndISRStr) match {
         case Some(m) =>
           val leader = m.asInstanceOf[Map[String, String]].get("leader").get.toInt
           val epoch = m.asInstanceOf[Map[String, String]].get("leaderEpoch").get.toInt
@@ -97,7 +96,7 @@ object ZkUtils extends Logging {
     val leaderAndISR = readDataMaybeNull(zkClient, getTopicPartitionLeaderAndISRPath(topic, partition))._1
     if(leaderAndISR == null) None
     else {
-      JSON.parseFull(leaderAndISR) match {
+      SyncJSON.parseFull(leaderAndISR) match {
         case Some(m) =>
           Some(m.asInstanceOf[Map[String, String]].get("leader").get.toInt)
         case None => None
@@ -113,7 +112,7 @@ object ZkUtils extends Logging {
   def getEpochForPartition(zkClient: ZkClient, topic: String, partition: Int): Int = {
     val leaderAndISR = readDataMaybeNull(zkClient, getTopicPartitionLeaderAndISRPath(topic, partition))._1
     if(leaderAndISR != null) {
-      val epoch = JSON.parseFull(leaderAndISR) match {
+      val epoch = SyncJSON.parseFull(leaderAndISR) match {
         case None => throw new NoEpochForPartitionException("No epoch, leaderAndISR data for topic %s partition %d is invalid".format(topic, partition))
         case Some(m) =>
           m.asInstanceOf[Map[String, String]].get("leaderEpoch").get.toInt
@@ -131,7 +130,7 @@ object ZkUtils extends Logging {
     val leaderAndISR = readDataMaybeNull(zkClient, getTopicPartitionLeaderAndISRPath(topic, partition))._1
     if(leaderAndISR == null) Seq.empty[Int]
     else {
-      JSON.parseFull(leaderAndISR) match {
+      SyncJSON.parseFull(leaderAndISR) match {
         case Some(m) =>
           val ISRString = m.asInstanceOf[Map[String, String]].get("ISR").get
           Utils.getCSVList(ISRString).map(r => r.toInt)
@@ -148,7 +147,7 @@ object ZkUtils extends Logging {
     val assignedReplicas = if (jsonPartitionMap == null) {
       Seq.empty[Int]
     } else {
-      JSON.parseFull(jsonPartitionMap) match {
+      SyncJSON.parseFull(jsonPartitionMap) match {
         case Some(m) => m.asInstanceOf[Map[String, List[String]]].get(partition.toString) match {
           case None => Seq.empty[Int]
           case Some(seq) => seq.map(_.toInt)
@@ -165,27 +164,6 @@ object ZkUtils extends Logging {
     replicas.contains(brokerId.toString)
   }
 
-  def incrementEpochForPartition(client: ZkClient, topic: String, partition: Int, leader: Int): Int = {
-    // read previous epoch, increment it and write it to the leader path and the ISR path.
-    val epoch = try {
-      Some(getEpochForPartition(client, topic, partition))
-    }catch {
-      case e: NoEpochForPartitionException => None
-      case e1 => throw e1
-    }
-
-    val newEpoch = epoch match {
-      case Some(partitionEpoch) =>
-        debug("Existing epoch for topic %s partition %d is %d".format(topic, partition, partitionEpoch))
-        partitionEpoch + 1
-      case None =>
-        // this is the first time leader is elected for this partition. So set epoch to 1
-        debug("First epoch is 1 for topic %s partition %d".format(topic, partition))
-        1
-    }
-    newEpoch
-  }
-
   def registerBrokerInZk(zkClient: ZkClient, id: Int, host: String, creator: String, port: Int) {
     val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + id
     val broker = new Broker(id, creator, host, port)
@@ -424,7 +402,7 @@ object ZkUtils extends Logging {
     topics.foreach{ topic =>
       val jsonPartitionMap = readDataMaybeNull(zkClient, getTopicPath(topic))._1
       if (jsonPartitionMap != null) {
-        JSON.parseFull(jsonPartitionMap) match {
+        SyncJSON.parseFull(jsonPartitionMap) match {
           case Some(m) =>
             val replicaMap = m.asInstanceOf[Map[String, Seq[String]]]
             for((partition, replicas) <- replicaMap){
@@ -458,7 +436,7 @@ object ZkUtils extends Logging {
       val partitionMap = if (jsonPartitionMap == null) {
         Map[Int, Seq[Int]]()
       } else {
-        JSON.parseFull(jsonPartitionMap) match {
+        SyncJSON.parseFull(jsonPartitionMap) match {
           case Some(m) =>
             val m1 = m.asInstanceOf[Map[String, Seq[String]]]
             m1.map(p => (p._1.toInt, p._2.map(_.toInt)))
@@ -552,31 +530,8 @@ object ZkUtils extends Logging {
     if(topics == null) Seq.empty[String]
     else topics
   }
-
-  def incrementEpochForPartition(client: ZkClient, topic: String, partition: Int) = {
-    // read previous epoch, increment it and write it to the leader path and the ISR path.
-    val epoch = try {
-      Some(getEpochForPartition(client, topic, partition))
-    }catch {
-      case e: NoEpochForPartitionException => None
-      case e1 => throw e1
-    }
-    val newEpoch = epoch match {
-      case Some(partitionEpoch) =>
-        debug("Existing epoch for topic %s partition %d is %d".format(topic, partition, partitionEpoch))
-        partitionEpoch + 1
-      case None =>
-        // this is the first time leader is elected for this partition. So set epoch to 1
-        debug("First epoch is 1 for topic %s partition %d".format(topic, partition))
-        LeaderAndISR.initialLeaderEpoch
-    }
-    newEpoch
-  }
 }
 
-
-
-
 class LeaderExistsOrChangedListener(topic: String,
                                     partition: Int,
                                     leaderLock: ReentrantLock,

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala?rev=1376598&r1=1376597&r2=1376598&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala Thu Aug 23 17:15:17 2012
@@ -140,7 +140,7 @@ class KafkaLog4jAppenderTest extends JUn
 
   @Test
   def testLog4jAppends() {
-    PropertyConfigurator.configure(getLog4jConfigWithZkConnect)
+    PropertyConfigurator.configure(getLog4jConfig)
 
     for(i <- 1 to 5)
       info("test")
@@ -156,7 +156,7 @@ class KafkaLog4jAppenderTest extends JUn
     assertEquals(5, count)
   }
 
-  private def getLog4jConfigWithZkConnect: Properties = {
+  private def getLog4jConfig: Properties = {
     var props = new Properties()
     props.put("log4j.rootLogger", "INFO")
     props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")