You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2012/09/18 07:04:01 UTC

svn commit: r1386987 - in /incubator/kafka/trunk/core/src/main/scala/kafka: consumer/TopicCount.scala utils/Utils.scala

Author: junrao
Date: Tue Sep 18 05:04:00 2012
New Revision: 1386987

URL: http://svn.apache.org/viewvc?rev=1386987&view=rev
Log:
TopicCount.constructTopicCount isn't thread-safe; patched by Jun Rao; reviewed by Joel Koshy; KAFKA-379

Modified:
    incubator/kafka/trunk/core/src/main/scala/kafka/consumer/TopicCount.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/utils/Utils.scala

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/TopicCount.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/TopicCount.scala?rev=1386987&r1=1386986&r2=1386987&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/TopicCount.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/TopicCount.scala Tue Sep 18 05:04:00 2012
@@ -18,10 +18,9 @@
 package kafka.consumer
 
 import scala.collection._
-import scala.util.parsing.json.JSON
 import org.I0Itec.zkclient.ZkClient
 import java.util.regex.Pattern
-import kafka.utils.{ZKGroupDirs, ZkUtils, Logging}
+import kafka.utils.{SyncJSON, ZKGroupDirs, ZkUtils, Logging}
 
 
 private[kafka] trait TopicCount {
@@ -60,9 +59,6 @@ private[kafka] object TopicCount extends
   private val BLACKLIST_PATTERN =
     Pattern.compile("""!(\p{Digit}+)!(.*)""")
 
-  val myConversionFunc = {input : String => input.toInt}
-  JSON.globalNumberParser = myConversionFunc
-
   def constructTopicCount(group: String,
                           consumerId: String,
                           zkClient: ZkClient) : TopicCount = {
@@ -94,7 +90,7 @@ private[kafka] object TopicCount extends
     else {
       var topMap : Map[String,Int] = null
       try {
-        JSON.parseFull(topicCountString) match {
+        SyncJSON.parseFull(topicCountString) match {
           case Some(m) => topMap = m.asInstanceOf[Map[String,Int]]
           case None => throw new RuntimeException("error constructing TopicCount : " + topicCountString)
         }

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/utils/Utils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/utils/Utils.scala?rev=1386987&r1=1386986&r2=1386987&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/utils/Utils.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/utils/Utils.scala Tue Sep 18 05:04:00 2012
@@ -30,7 +30,7 @@ import scala.collection.mutable
 import kafka.message.{NoCompressionCodec, CompressionCodec}
 import org.I0Itec.zkclient.ZkClient
 import joptsimple.{OptionSpec, OptionSet, OptionParser}
-
+import util.parsing.json.JSON
 
 /**
  * Helper functions!
@@ -819,3 +819,23 @@ class SnapshotStats(private val monitorD
     def durationMs: Double = (end.get - start) / (1000.0 * 1000.0)
   }
 }
+
+/**
+ *  A wrapper that synchronizes access to Scala's JSON parser, which is not thread-safe.
+ */
+object SyncJSON extends Logging {
+  val myConversionFunc = {input : String => input.toInt}
+  JSON.globalNumberParser = myConversionFunc
+  val lock = new Object
+
+  def parseFull(input: String): Option[Any] = {
+    lock synchronized {
+      try {
+        JSON.parseFull(input)
+      } catch {
+        case t =>
+          throw new RuntimeException("Can't parse json string: %s".format(input), t)
+      }
+    }
+  }
+}
\ No newline at end of file