You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2012/09/18 07:04:01 UTC
svn commit: r1386987 - in /incubator/kafka/trunk/core/src/main/scala/kafka:
consumer/TopicCount.scala utils/Utils.scala
Author: junrao
Date: Tue Sep 18 05:04:00 2012
New Revision: 1386987
URL: http://svn.apache.org/viewvc?rev=1386987&view=rev
Log:
TopicCount.constructTopicCount isn't thread-safe; patched by Jun Rao; reviewed by Joel Koshy; KAFKA-379
Modified:
incubator/kafka/trunk/core/src/main/scala/kafka/consumer/TopicCount.scala
incubator/kafka/trunk/core/src/main/scala/kafka/utils/Utils.scala
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/TopicCount.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/TopicCount.scala?rev=1386987&r1=1386986&r2=1386987&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/TopicCount.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/TopicCount.scala Tue Sep 18 05:04:00 2012
@@ -18,10 +18,9 @@
package kafka.consumer
import scala.collection._
-import scala.util.parsing.json.JSON
import org.I0Itec.zkclient.ZkClient
import java.util.regex.Pattern
-import kafka.utils.{ZKGroupDirs, ZkUtils, Logging}
+import kafka.utils.{SyncJSON, ZKGroupDirs, ZkUtils, Logging}
private[kafka] trait TopicCount {
@@ -60,9 +59,6 @@ private[kafka] object TopicCount extends
private val BLACKLIST_PATTERN =
Pattern.compile("""!(\p{Digit}+)!(.*)""")
- val myConversionFunc = {input : String => input.toInt}
- JSON.globalNumberParser = myConversionFunc
-
def constructTopicCount(group: String,
consumerId: String,
zkClient: ZkClient) : TopicCount = {
@@ -94,7 +90,7 @@ private[kafka] object TopicCount extends
else {
var topMap : Map[String,Int] = null
try {
- JSON.parseFull(topicCountString) match {
+ SyncJSON.parseFull(topicCountString) match {
case Some(m) => topMap = m.asInstanceOf[Map[String,Int]]
case None => throw new RuntimeException("error constructing TopicCount : " + topicCountString)
}
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/utils/Utils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/utils/Utils.scala?rev=1386987&r1=1386986&r2=1386987&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/utils/Utils.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/utils/Utils.scala Tue Sep 18 05:04:00 2012
@@ -30,7 +30,7 @@ import scala.collection.mutable
import kafka.message.{NoCompressionCodec, CompressionCodec}
import org.I0Itec.zkclient.ZkClient
import joptsimple.{OptionSpec, OptionSet, OptionParser}
-
+import util.parsing.json.JSON
/**
* Helper functions!
@@ -819,3 +819,23 @@ class SnapshotStats(private val monitorD
def durationMs: Double = (end.get - start) / (1000.0 * 1000.0)
}
}
+
+/**
+ * A wrapper that synchronizes JSON in scala, which is not threadsafe.
+ */
+object SyncJSON extends Logging {
+ val myConversionFunc = {input : String => input.toInt}
+ JSON.globalNumberParser = myConversionFunc
+ val lock = new Object
+
+ def parseFull(input: String): Option[Any] = {
+ lock synchronized {
+ try {
+ JSON.parseFull(input)
+ } catch {
+ case t =>
+ throw new RuntimeException("Can't parse json string: %s".format(input), t)
+ }
+ }
+ }
+}
\ No newline at end of file