You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2012/08/23 19:15:18 UTC
svn commit: r1376598 - in /incubator/kafka/branches/0.8/core/src:
main/scala/kafka/consumer/TopicCount.scala
main/scala/kafka/utils/Utils.scala main/scala/kafka/utils/ZkUtils.scala
test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
Author: junrao
Date: Thu Aug 23 17:15:17 2012
New Revision: 1376598
URL: http://svn.apache.org/viewvc?rev=1376598&view=rev
Log:
TopicCount.constructTopicCount isn't thread-safe; patched by Jun Rao; reviewed by Joel Koshy and Neha Narkhede; kafka-379
Modified:
incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/TopicCount.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/TopicCount.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/TopicCount.scala?rev=1376598&r1=1376597&r2=1376598&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/TopicCount.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/TopicCount.scala Thu Aug 23 17:15:17 2012
@@ -18,11 +18,9 @@
package kafka.consumer
import scala.collection._
-import scala.util.parsing.json.JSON
import org.I0Itec.zkclient.ZkClient
import java.util.regex.Pattern
-import kafka.utils.{ZKGroupDirs, ZkUtils, Logging}
-
+import kafka.utils.{SyncJSON, ZKGroupDirs, ZkUtils, Logging}
private[kafka] trait TopicCount {
def getConsumerThreadIdsPerTopic: Map[String, Set[String]]
@@ -59,9 +57,6 @@ private[kafka] object TopicCount extends
private val BLACKLIST_PATTERN =
Pattern.compile("""!(\p{Digit}+)!(.*)""")
- val myConversionFunc = {input : String => input.toInt}
- JSON.globalNumberParser = myConversionFunc
-
def constructTopicCount(group: String,
consumerId: String,
zkClient: ZkClient) : TopicCount = {
@@ -93,7 +88,7 @@ private[kafka] object TopicCount extends
else {
var topMap : Map[String,Int] = null
try {
- JSON.parseFull(topicCountString) match {
+ SyncJSON.parseFull(topicCountString) match {
case Some(m) => topMap = m.asInstanceOf[Map[String,Int]]
case None => throw new RuntimeException("error constructing TopicCount : " + topicCountString)
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala?rev=1376598&r1=1376597&r2=1376598&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala Thu Aug 23 17:15:17 2012
@@ -32,6 +32,7 @@ import java.util.{Random, Properties}
import joptsimple.{OptionSpec, OptionSet, OptionParser}
import kafka.common.KafkaException
import kafka.cluster.Broker
+import util.parsing.json.JSON
/**
@@ -906,4 +907,24 @@ class SnapshotStats(private val monitorD
def durationMs: Double = (end.get - start) / (1000.0 * 1000.0)
}
+}
+
+/**
+ * A wrapper that synchronizes JSON in scala, which is not threadsafe.
+ */
+object SyncJSON extends Logging {
+ val myConversionFunc = {input : String => input.toInt}
+ JSON.globalNumberParser = myConversionFunc
+ val lock = new Object
+
+ def parseFull(input: String): Option[Any] = {
+ lock synchronized {
+ try {
+ JSON.parseFull(input)
+ } catch {
+ case t =>
+ throw new KafkaException("Can't parse json string: %s".format(input), t)
+ }
+ }
+ }
}
\ No newline at end of file
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala?rev=1376598&r1=1376597&r2=1376598&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala Thu Aug 23 17:15:17 2012
@@ -24,7 +24,6 @@ import org.I0Itec.zkclient.{IZkDataListe
import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException, ZkMarshallingError}
import org.I0Itec.zkclient.serialize.ZkSerializer
import scala.collection._
-import util.parsing.json.JSON
import kafka.api.LeaderAndISR
import kafka.common.NoEpochForPartitionException
import org.apache.zookeeper.data.Stat
@@ -78,7 +77,7 @@ object ZkUtils extends Logging {
val stat = ret._2
if(leaderAndISRStr == null) None
else {
- JSON.parseFull(leaderAndISRStr) match {
+ SyncJSON.parseFull(leaderAndISRStr) match {
case Some(m) =>
val leader = m.asInstanceOf[Map[String, String]].get("leader").get.toInt
val epoch = m.asInstanceOf[Map[String, String]].get("leaderEpoch").get.toInt
@@ -97,7 +96,7 @@ object ZkUtils extends Logging {
val leaderAndISR = readDataMaybeNull(zkClient, getTopicPartitionLeaderAndISRPath(topic, partition))._1
if(leaderAndISR == null) None
else {
- JSON.parseFull(leaderAndISR) match {
+ SyncJSON.parseFull(leaderAndISR) match {
case Some(m) =>
Some(m.asInstanceOf[Map[String, String]].get("leader").get.toInt)
case None => None
@@ -113,7 +112,7 @@ object ZkUtils extends Logging {
def getEpochForPartition(zkClient: ZkClient, topic: String, partition: Int): Int = {
val leaderAndISR = readDataMaybeNull(zkClient, getTopicPartitionLeaderAndISRPath(topic, partition))._1
if(leaderAndISR != null) {
- val epoch = JSON.parseFull(leaderAndISR) match {
+ val epoch = SyncJSON.parseFull(leaderAndISR) match {
case None => throw new NoEpochForPartitionException("No epoch, leaderAndISR data for topic %s partition %d is invalid".format(topic, partition))
case Some(m) =>
m.asInstanceOf[Map[String, String]].get("leaderEpoch").get.toInt
@@ -131,7 +130,7 @@ object ZkUtils extends Logging {
val leaderAndISR = readDataMaybeNull(zkClient, getTopicPartitionLeaderAndISRPath(topic, partition))._1
if(leaderAndISR == null) Seq.empty[Int]
else {
- JSON.parseFull(leaderAndISR) match {
+ SyncJSON.parseFull(leaderAndISR) match {
case Some(m) =>
val ISRString = m.asInstanceOf[Map[String, String]].get("ISR").get
Utils.getCSVList(ISRString).map(r => r.toInt)
@@ -148,7 +147,7 @@ object ZkUtils extends Logging {
val assignedReplicas = if (jsonPartitionMap == null) {
Seq.empty[Int]
} else {
- JSON.parseFull(jsonPartitionMap) match {
+ SyncJSON.parseFull(jsonPartitionMap) match {
case Some(m) => m.asInstanceOf[Map[String, List[String]]].get(partition.toString) match {
case None => Seq.empty[Int]
case Some(seq) => seq.map(_.toInt)
@@ -165,27 +164,6 @@ object ZkUtils extends Logging {
replicas.contains(brokerId.toString)
}
- def incrementEpochForPartition(client: ZkClient, topic: String, partition: Int, leader: Int): Int = {
- // read previous epoch, increment it and write it to the leader path and the ISR path.
- val epoch = try {
- Some(getEpochForPartition(client, topic, partition))
- }catch {
- case e: NoEpochForPartitionException => None
- case e1 => throw e1
- }
-
- val newEpoch = epoch match {
- case Some(partitionEpoch) =>
- debug("Existing epoch for topic %s partition %d is %d".format(topic, partition, partitionEpoch))
- partitionEpoch + 1
- case None =>
- // this is the first time leader is elected for this partition. So set epoch to 1
- debug("First epoch is 1 for topic %s partition %d".format(topic, partition))
- 1
- }
- newEpoch
- }
-
def registerBrokerInZk(zkClient: ZkClient, id: Int, host: String, creator: String, port: Int) {
val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + id
val broker = new Broker(id, creator, host, port)
@@ -424,7 +402,7 @@ object ZkUtils extends Logging {
topics.foreach{ topic =>
val jsonPartitionMap = readDataMaybeNull(zkClient, getTopicPath(topic))._1
if (jsonPartitionMap != null) {
- JSON.parseFull(jsonPartitionMap) match {
+ SyncJSON.parseFull(jsonPartitionMap) match {
case Some(m) =>
val replicaMap = m.asInstanceOf[Map[String, Seq[String]]]
for((partition, replicas) <- replicaMap){
@@ -458,7 +436,7 @@ object ZkUtils extends Logging {
val partitionMap = if (jsonPartitionMap == null) {
Map[Int, Seq[Int]]()
} else {
- JSON.parseFull(jsonPartitionMap) match {
+ SyncJSON.parseFull(jsonPartitionMap) match {
case Some(m) =>
val m1 = m.asInstanceOf[Map[String, Seq[String]]]
m1.map(p => (p._1.toInt, p._2.map(_.toInt)))
@@ -552,31 +530,8 @@ object ZkUtils extends Logging {
if(topics == null) Seq.empty[String]
else topics
}
-
- def incrementEpochForPartition(client: ZkClient, topic: String, partition: Int) = {
- // read previous epoch, increment it and write it to the leader path and the ISR path.
- val epoch = try {
- Some(getEpochForPartition(client, topic, partition))
- }catch {
- case e: NoEpochForPartitionException => None
- case e1 => throw e1
- }
- val newEpoch = epoch match {
- case Some(partitionEpoch) =>
- debug("Existing epoch for topic %s partition %d is %d".format(topic, partition, partitionEpoch))
- partitionEpoch + 1
- case None =>
- // this is the first time leader is elected for this partition. So set epoch to 1
- debug("First epoch is 1 for topic %s partition %d".format(topic, partition))
- LeaderAndISR.initialLeaderEpoch
- }
- newEpoch
- }
}
-
-
-
class LeaderExistsOrChangedListener(topic: String,
partition: Int,
leaderLock: ReentrantLock,
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala?rev=1376598&r1=1376597&r2=1376598&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala Thu Aug 23 17:15:17 2012
@@ -140,7 +140,7 @@ class KafkaLog4jAppenderTest extends JUn
@Test
def testLog4jAppends() {
- PropertyConfigurator.configure(getLog4jConfigWithZkConnect)
+ PropertyConfigurator.configure(getLog4jConfig)
for(i <- 1 to 5)
info("test")
@@ -156,7 +156,7 @@ class KafkaLog4jAppenderTest extends JUn
assertEquals(5, count)
}
- private def getLog4jConfigWithZkConnect: Properties = {
+ private def getLog4jConfig: Properties = {
var props = new Properties()
props.put("log4j.rootLogger", "INFO")
props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")