Posted to dev@kafka.apache.org by "nicu marasoiu (JIRA)" <ji...@apache.org> on 2012/09/22 13:13:07 UTC

[jira] [Commented] (KAFKA-244) Improve log4j appender to use kafka.producer.Producer, and support zk.connect|broker.list options

    [ https://issues.apache.org/jira/browse/KAFKA-244?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13461117#comment-13461117 ] 

nicu marasoiu commented on KAFKA-244:
-------------------------------------

Hello,

KafkaLog4jAppender 0.7.1, as committed here, has a critical bug: it can cause a Log4j deadlock. The embedded-server "integration" test does not catch it; let me explain why.

If Log4j initializes for the first time from a log4j.properties that contains Kafka appenders, then activateOptions() is called during Log4j boot. That call initializes the ZooKeeper client and starts its send thread and event thread. The send thread issues a LOG.info. But since Log4j is still starting up, that LOG is not yet created, or not yet bound to the right parent (say, logger.org.apache.zookeeper.client), so it attempts a synchronized(rootLogger). That rootLogger is still under creation, or at least locked by the other thread, which is bringing up the Log4j infrastructure. So it is a plain Log4j deadlock.

The tests don't fail because Log4j is already initialized by that time and is only being reconfigured. Until reconfiguration is complete, it keeps using the old loggers.

So if you put KAFKA on rootLogger, or even elsewhere, the KafkaAppender, which connects to ZK during activateOptions, causes a Log4j deadlock if Log4j is not already initialized. We have written a wrapper that postpones activateOptions until after Log4j bootstrap; it is called in a ServletContextListener.start().
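To make the idea concrete, here is a minimal sketch of the deferred-activation pattern against the Log4j 1.2 API. It is not our actual wrapper: the names DeferredActivationAppender, DeferredActivation, connect, deliver and activateDeferred are hypothetical, and the sketch assumes that dropping events logged before activation is acceptable.

```scala
import java.util.concurrent.atomic.AtomicBoolean

import org.apache.log4j.{AppenderSkeleton, Logger}
import org.apache.log4j.spi.LoggingEvent

// Hypothetical base class (not part of Kafka or Log4j): it separates the cheap
// configuration phase from the expensive connection phase.
abstract class DeferredActivationAppender extends AppenderSkeleton {

  private val started = new AtomicBoolean(false)
  @volatile private var ready = false

  /** Work that must not run during Log4j bootstrap, e.g. connecting to ZooKeeper. */
  protected def connect(): Unit

  /** Delivers one event once the appender is active. */
  protected def deliver(event: LoggingEvent): Unit

  // Log4j calls this while it is still parsing its configuration, so it
  // deliberately starts no threads and opens no connections.
  override def activateOptions(): Unit = ()

  // Call from application code after Log4j has finished initializing.
  // Only the first call connects. The appender's monitor is not held here,
  // so threads started by connect() can log without blocking on this appender.
  def activateDeferred(): Unit =
    if (started.compareAndSet(false, true)) {
      connect()
      ready = true
    }

  // Events logged before activation completes are dropped in this sketch;
  // buffering them would be an alternative.
  override protected def append(event: LoggingEvent): Unit =
    if (ready) deliver(event)

  override def requiresLayout(): Boolean = true

  override def close(): Unit = ()
}

object DeferredActivation {
  // Activates the named appender attached to the given logger, if it is a
  // deferred one. Returns false when no such appender is found.
  def activate(logger: Logger, appenderName: String): Boolean =
    logger.getAppender(appenderName) match {
      case deferred: DeferredActivationAppender =>
        deferred.activateDeferred()
        true
      case _ => false
    }
}
```

With something like this, the servlet listener would call DeferredActivation.activate(Logger.getLogger("org"), "KAFKA") for each logger the appender is attached to, once Log4j bootstrap is over.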

Here is my test class reproducing the issue:



class DeferredActivationLog4jAppenderIntegrationTest extends Logging {

  val ZKHOST: String = "localhost"
  val zookeeperConnect: String = ZKHOST + ":8083"
  var logDirZk: File = null
  var kafServer: KafkaServer = null

  var zkConsumer: SimpleConsumer = null

  val tLogger = Logger.getLogger(getClass())

  private val brokerZk = 0

  private val ports = choosePorts(1)
  private val portZk = ports(0)

  private var zkServer: EmbeddedZookeeper = null

  @BeforeClass
  def before() {
    before(true)
  }

  protected def before(retry: Boolean) {
    System.setProperty(Constants.ZOOKEEPER_RETRY, retry.toString)
    System.setProperty(Constants.ZOOKEEPER_HOSTS, zookeeperConnect)
    zkServer = new EmbeddedZookeeper(zookeeperConnect)

    val propsZk = createBrokerConfig(brokerZk, portZk)
    val logDirZkPath = propsZk.getProperty("log.dir")
    logDirZk = new File(logDirZkPath)
    kafServer = createServer(new KafkaConfig(propsZk))

    Thread.sleep(100)

    zkConsumer = new SimpleConsumer(ZKHOST, portZk, 1000000, 64 * 1024)
  }

  @AfterClass
  def after() {
    zkConsumer.close

    kafServer.shutdown
    Utils.rm(logDirZk)

    Thread.sleep(200)
    zkServer.shutdown
    Thread.sleep(200)
  }

  @Test
  def testZkConnectLog4jAppends() {
    setupLog4j

    for (i <- 1 to 5)
      info("test")

    Thread.sleep(500)

    var offset = 0L
    val messages = zkConsumer.fetch(new FetchRequest("test-topic", 0, offset, 1024 * 1024))

    var count = 0
    for (message <- messages) {
      count = count + 1
    }

    assertEquals(5, count)
  }


  def setupLog4j {
    PropertyConfigurator.configure(getLog4jConfigWithZkConnect)
  }

  private def getLog4jConfigWithZkConnect: Properties = {
    var props = new Properties()
    props.put("log4j.debug", "true")
    props.put("log4j.appender.STDOUT", "org.apache.log4j.ConsoleAppender")
    props.put("log4j.appender.STDOUT.Target", "System.out")
    props.put("log4j.appender.STDOUT.layout", "org.apache.log4j.PatternLayout")
    props.put("log4j.appender.STDOUT.layout.ConversionPattern", "%-5p: %c - %m%n")

    //    props.put("log4j.appender.KAFKA", "com.adobe.pass.commons.kafka.producer.DeferredActivationKafkaLog4jAppender")
    props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")

    props.put("log4j.appender.KAFKA.Topic", "test-topic")

    props.put("log4j.appender.KAFKA.ZkConnect", zookeeperConnect)
    props.put("log4j.appender.KAFKA.layout", "org.apache.log4j.PatternLayout")
    props.put("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c - %m%n")

    props.put("log4j.logger.org", "TRACE, KAFKA")
    props.put("log4j.logger.com", "TRACE, KAFKA")

    props.put("log4j.rootLogger", "INFO")
    props
  }
}

                
> Improve log4j appender to use kafka.producer.Producer, and support zk.connect|broker.list options  
> ---------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-244
>                 URL: https://issues.apache.org/jira/browse/KAFKA-244
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients
>    Affects Versions: 0.7.1
>            Reporter: Stefano Santoro
>            Assignee: Stefano Santoro
>              Labels: log4j, newbie
>             Fix For: 0.7.1
>
>         Attachments: kafka-244-v3.patch, Log4jAppender.patch, Log4jAppenderWithWorkingZkConnect.patch
>
>
> Taken from #kafka IRC session with Neha Narkhede:
> The log4j appender is quite obsolete; there are a few things to change there. Make it use the kafka.producer.Producer instead of SyncProducer. That allows you to use either the broker.list or the zk.connect option.
