You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by 김동경 <st...@gmail.com> on 2014/03/04 03:26:35 UTC

Producer fails to send data when it is used in log4j appender.

I made a simple log4j Kafka appender.
I copied most of the code from the 0.8.0 Producer example at
"https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example"
to write the "append" function.

I confirmed that the producer example code works in my environment.
But when I use the same logic in a log4j appender, it doesn't work.
It tries to fetch metadata repeatedly, and I get endlessly repeated
"Utils$.swallowError" errors.

I have no idea what swallowError is.
It looks like it fails to fetch metadata from the broker and keeps trying
again and again.
The max retry count is just 3, so I don't know why this happens.

Is there anything that should be done to produce log data into Kafka via a
log4j appender?

---------------------------------------------------------------------------------------------------------------------------------
INFO [main] (Logging.scala:67) - Verifying properties
 INFO [main] (Logging.scala:67) - Property metadata.broker.list is
overridden to kafka01:9092
 WARN [main] (Logging.scala:82) - Property zk.connect is not valid
 INFO [main] (Logging.scala:67) - Property request.required.acks is
overridden to 1
 INFO [main] (Logging.scala:67) - Property partitioner.class is overridden
to com.samsung.rtdp.SimplePartitioner2
 INFO [main] (Logging.scala:67) - Property serializer.class is overridden
to kafka.serializer.StringEncoder
 INFO [main] (HelloWorld.java:14) - Entering application.
 INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
id:0,host:kafka01,port:9092 with correlation id 0 for 1 topic(s)
Set(KafkaAppenderTest)
 INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
id:0,host:kafka01,port:9092 with correlation id 1 for 1 topic(s)
Set(KafkaAppenderTest)
 INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
id:0,host:kafka01,port:9092 with correlation id 2 for 1 topic(s)
Set(KafkaAppenderTest)
 INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
id:0,host:kafka01,port:9092 with correlation id 3 for 1 topic(s)
Set(KafkaAppenderTest)
 INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
id:0,host:kafka01,port:9092 with correlation id 4 for 1 topic(s)
Set(KafkaAppenderTest)
 INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
id:0,host:kafka01,port:9092 with correlation id 5 for 1 topic(s)
Set(KafkaAppenderTest)
 INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
id:0,host:kafka01,port:9092 with correlation id 6 for 1 topic(s)
Set(KafkaAppenderTest)
 INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
id:0,host:kafka01,port:9092 with correlation id 7 for 1 topic(s)
Set(KafkaAppenderTest)
 INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
id:0,host:kafka01,port:9092 with correlation id 8 for 1 topic(s)
Set(KafkaAppenderTest)
 INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
id:0,host:kafka01,port:9092 with correlation id 9 for 1 topic(s)
Set(KafkaAppenderTest)
 INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
id:0,host:kafka01,port:9092 with correlation id 10 for 1 topic(s)
Set(KafkaAppenderTest)
 INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
id:0,host:kafka01,port:9092 with correlation id 11 for 1 topic(s)
Set(KafkaAppenderTest)
 INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
id:0,host:kafka01,port:9092 with correlation id 12 for 1 topic(s)
Set(KafkaAppenderTest)
.
.
.
java.lang.StackOverflowError
at java.lang.StringCoding.deref(StringCoding.java:64)
at java.lang.StringCoding.encode(StringCoding.java:275)
at java.lang.String.getBytes(String.java:954)
at java.io.UnixFileSystem.getBooleanAttributes0(Native Method)
at java.io.UnixFileSystem.getBooleanAttributes(UnixFileSystem.java:243)
at java.io.File.exists(File.java:791)
at sun.misc.URLClassPath$FileLoader.getResource(URLClassPath.java:1014)
at sun.misc.URLClassPath.getResource(URLClassPath.java:189)
at java.net.URLClassLoader$1.run(URLClassLoader.java:209)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
at java.lang.ClassLoader.loadClass(ClassLoader.java:323)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294)
at java.lang.ClassLoader.loadClass(ClassLoader.java:268)
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:643)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:277)
at java.net.URLClassLoader.access$000(URLClassLoader.java:73)
at java.net.URLClassLoader$1.run(URLClassLoader.java:212)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
at java.lang.ClassLoader.loadClass(ClassLoader.java:323)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294)
at java.lang.ClassLoader.loadClass(ClassLoader.java:268)
at
org.apache.log4j.spi.ThrowableInformation.getThrowableStrRep(ThrowableInformation.java:87)
at
org.apache.log4j.spi.LoggingEvent.getThrowableStrRep(LoggingEvent.java:413)
at org.apache.log4j.WriterAppender.subAppend(WriterAppender.java:313)
at org.apache.log4j.WriterAppender.append(WriterAppender.java:162)
at org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251)
at
org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66)
at org.apache.log4j.Category.callAppenders(Category.java:206)
at org.apache.log4j.Category.forcedLog(Category.java:391)
at org.apache.log4j.Category.error(Category.java:322)
at kafka.utils.Logging$$anonfun$swallowError$1.apply(Logging.scala:105)
at kafka.utils.Logging$$anonfun$swallowError$1.apply(Logging.scala:105)
at kafka.utils.Utils$.swallow(Utils.scala:189)
at kafka.utils.Logging$class.swallowError(Logging.scala:105)
at kafka.utils.Utils$.swallowError(Utils.scala:46)
at
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
at kafka.producer.Producer.send(Producer.scala:76)
at kafka.javaapi.producer.Producer.send(Producer.scala:33)
at com.samsung.rtdp.KafkaAppender.append(KafkaAppender.java:121)
at org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251)
at
org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66)
at org.apache.log4j.Category.callAppenders(Category.java:206)
at org.apache.log4j.Category.forcedLog(Category.java:391)
at org.apache.log4j.Category.info(Category.java:666)
at kafka.utils.Logging$class.info(Logging.scala:67)
at kafka.client.ClientUtils$.info(ClientUtils.scala:31)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:51)
at
kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
at
kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67)
at kafka.utils.Utils$.swallow(Utils.scala:187)
at kafka.utils.Logging$class.swallowError(Logging.scala:105)
at kafka.utils.Utils$.swallowError(Utils.scala:46)
at
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
at kafka.producer.Producer.send(Producer.scala:76)
at kafka.javaapi.producer.Producer.send(Producer.scala:33)
at com.samsung.rtdp.KafkaAppender.append(KafkaAppender.java:121)
.
.
.
----------------------------------------------------------------------------------------------------------------------------------


Here is my code.
Since it came from producer example code, it is quite straightforward.
-------------------------------------------------------------------------------------------------------------------------
package com.samsung.rtdp;

import java.io.IOException;
import java.util.Date;
import java.util.Properties;
import java.util.Random;

import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.spi.ErrorCode;
import org.apache.log4j.spi.LoggingEvent;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.Partitioner;
import kafka.producer.ProducerConfig;
import kafka.utils.VerifiableProperties;


public class KafkaAppender extends AppenderSkeleton {

private String brokerList;
private String serializer;
private String partitioner;
private String topic;
private String DEFAULT_REQUIRED_REQUEST_NUACKS="1";

private Properties props;
private ProducerConfig config;
private Producer<String, String> producer;

public void setBrokerList(String brokerList) { this.brokerList =
brokerList; }
public String getBrokerList() { return this.brokerList; }

public void setSerializerClass(String serializer) { this.serializer =
serializer; }
public String getSerializer() { return this.serializer; }

public void setPartitionerClass(String partitioner) { this.partitioner =
partitioner; }
public String getPartitioner() { return this.partitioner; }

public void setTopic(String topic) { this.topic = topic; }
public String getTopic() { return this.topic; }


public void printParameters(){
System.out.println("BrokerList : " + brokerList);
System.out.println("Serializer Class : " + serializer);
System.out.println("Partitioner Class : " + partitioner);
System.out.println("Topic : " + topic);
}

public void activateOptions() {

// printParameters();

props = new Properties();

props.put("metadata.broker.list", brokerList);
props.put("serializer.class", serializer);
props.put("partitioner.class", partitioner);
props.put("request.required.acks", DEFAULT_REQUIRED_REQUEST_NUACKS);
props.put("zk.connect", "kafka01:2181");

config = new ProducerConfig(props);
producer = new Producer<String, String>(config);
}


public void close() {
// TODO Auto-generated method stub
producer.close();

}


public boolean requiresLayout() {
// TODO Auto-generated method stub
return true;
}

@Override
protected void append(LoggingEvent event) {
// TODO Auto-generated method stub


// printParameters();

if( this.layout == null )
{
errorHandler.error("No layout for appender " + name , null,
ErrorCode.MISSING_LAYOUT );
return;
}

String msg = this.layout.format(event);

 KeyedMessage<String, String> data = new KeyedMessage<String,
String>("KafkaAppenderTest", msg, msg);

producer.send(data);
}

}

-------------------------------------------------------------------------------------------------------------------------------------



And this is my log4j.properties
-------------------------------------------------------------------------------------------------------------------------------------
log4j.rootLogger=INFO, stdout, KAFKA
# set the logger for your package to be the KAFKA appender
#log4j.logger.com.samsung.rtdp=INFO, KAFKA

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout

# Pattern to output the caller's file name and line number.
log4j.appender.stdout.layout.ConversionPattern=%5p [%t] (%F:%L) - %m%n

log4j.appender.KAFKA=com.samsung.rtdp.KafkaAppender
log4j.appender.KAFKA.BrokerList=kafka01:9092
log4j.appender.KAFKA.SerializerClass=kafka.serializer.StringEncoder
log4j.appender.KAFKA.PartitionerClass=com.samsung.rtdp.SimplePartitioner2
log4j.appender.KAFKA.Topic=test
log4j.appender.KAFKA.layout=org.apache.log4j.PatternLayout
log4j.appender.KAFKA.layout.ConversionPattern=%5p [%t] (%F:%L) - %m%n
--------------------------------------------------------------------------------------------------------------------------------------

Re: Producer fails to send data when it is used in log4j appender.

Posted by Jun Rao <ju...@gmail.com>.
Just see which log4j property file is included in the java classpath. If
it's not there, you could add one to your classpath.

Thanks,

Jun


On Wed, Mar 5, 2014 at 3:13 AM, 김동경 <st...@gmail.com> wrote:

> Sorry.
> Since I used maven for dependency, Kafka is included as JAR.
> In this case, are there any way to turn off it?
>
> Thanks
> Regards
> Dongkyoung.
>
>
> 2014-03-05 13:21 GMT+09:00 Jun Rao <ju...@gmail.com>:
>
> > Just change config/log4j.properties inside Kafka.
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Tue, Mar 4, 2014 at 4:09 PM, 김동경 <st...@gmail.com> wrote:
> >
> > > I couldn`t find any configuration relevant to turning off the log in
> > > http://kafka.apache.org/documentation.html#configuration.
> > > I included Kafka as Maven dependency.
> > > How could I turn off the Kafka log in the code?
> > >
> > > Thanks
> > > Regards
> > > Dongkyoung
> > >
> > > 2014-03-04 14:40 GMT+09:00 Jun Rao <ju...@gmail.com>:
> > >
> > > > I think it tries to add the logging in Kafka itself back to the
> > > > KafkaAppender.
> > > > This creates an infinite loop. Maybe you could try setting the log
> > level
> > > in
> > > > Kafka package to OFF?
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > >
> > > > On Mon, Mar 3, 2014 at 6:26 PM, 김동경 <st...@gmail.com> wrote:
> > > >
> > > > > I made simple log4j kafka appender.
> > > > > I copied most of the code from 0.8.0 Producer example in "
> > > > >
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example
> > > > "
> > > > > to code "append" function.
> > > > >
> > > > > I confirmed producer example code is working with my environment.
> > > > > But when I use same logic for log4j appender, it didn`t work.
> > > > > It is trying to fetch metadata repeatedly and I am getting infinite
> > > > > "Utils$.swallowError" error.
> > > > >
> > > > > I have no idea on swallowError.
> > > > > It looks it failed to fetch metadata from broker, it is trying
> again
> > > and
> > > > > again.
> > > > > Max retries count is just 3, but I don`t know why it happens.
> > > > >
> > > > > Are there anything that should be done to produce log data into
> Kafka
> > > via
> > > > > log4j Appender?
> > > > >
> > > > >
> > > > >
> > > >
> > >
> >
> ---------------------------------------------------------------------------------------------------------------------------------
> > > > > INFO [main] (Logging.scala:67) - Verifying properties
> > > > >  INFO [main] (Logging.scala:67) - Property metadata.broker.list is
> > > > > overridden to kafka01:9092
> > > > >  WARN [main] (Logging.scala:82) - Property zk.connect is not valid
> > > > >  INFO [main] (Logging.scala:67) - Property request.required.acks is
> > > > > overridden to 1
> > > > >  INFO [main] (Logging.scala:67) - Property partitioner.class is
> > > > overridden
> > > > > to com.samsung.rtdp.SimplePartitioner2
> > > > >  INFO [main] (Logging.scala:67) - Property serializer.class is
> > > overridden
> > > > > to kafka.serializer.StringEncoder
> > > > >  INFO [main] (HelloWorld.java:14) - Entering application.
> > > > >  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
> > > > > id:0,host:kafka01,port:9092 with correlation id 0 for 1 topic(s)
> > > > > Set(KafkaAppenderTest)
> > > > >  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
> > > > > id:0,host:kafka01,port:9092 with correlation id 1 for 1 topic(s)
> > > > > Set(KafkaAppenderTest)
> > > > >  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
> > > > > id:0,host:kafka01,port:9092 with correlation id 2 for 1 topic(s)
> > > > > Set(KafkaAppenderTest)
> > > > >  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
> > > > > id:0,host:kafka01,port:9092 with correlation id 3 for 1 topic(s)
> > > > > Set(KafkaAppenderTest)
> > > > >  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
> > > > > id:0,host:kafka01,port:9092 with correlation id 4 for 1 topic(s)
> > > > > Set(KafkaAppenderTest)
> > > > >  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
> > > > > id:0,host:kafka01,port:9092 with correlation id 5 for 1 topic(s)
> > > > > Set(KafkaAppenderTest)
> > > > >  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
> > > > > id:0,host:kafka01,port:9092 with correlation id 6 for 1 topic(s)
> > > > > Set(KafkaAppenderTest)
> > > > >  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
> > > > > id:0,host:kafka01,port:9092 with correlation id 7 for 1 topic(s)
> > > > > Set(KafkaAppenderTest)
> > > > >  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
> > > > > id:0,host:kafka01,port:9092 with correlation id 8 for 1 topic(s)
> > > > > Set(KafkaAppenderTest)
> > > > >  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
> > > > > id:0,host:kafka01,port:9092 with correlation id 9 for 1 topic(s)
> > > > > Set(KafkaAppenderTest)
> > > > >  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
> > > > > id:0,host:kafka01,port:9092 with correlation id 10 for 1 topic(s)
> > > > > Set(KafkaAppenderTest)
> > > > >  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
> > > > > id:0,host:kafka01,port:9092 with correlation id 11 for 1 topic(s)
> > > > > Set(KafkaAppenderTest)
> > > > >  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
> > > > > id:0,host:kafka01,port:9092 with correlation id 12 for 1 topic(s)
> > > > > Set(KafkaAppenderTest)
> > > > > .
> > > > > .
> > > > > .
> > > > > java.lang.StackOverflowError
> > > > > at java.lang.StringCoding.deref(StringCoding.java:64)
> > > > > at java.lang.StringCoding.encode(StringCoding.java:275)
> > > > > at java.lang.String.getBytes(String.java:954)
> > > > > at java.io.UnixFileSystem.getBooleanAttributes0(Native Method)
> > > > > at
> > java.io.UnixFileSystem.getBooleanAttributes(UnixFileSystem.java:243)
> > > > > at java.io.File.exists(File.java:791)
> > > > > at
> > sun.misc.URLClassPath$FileLoader.getResource(URLClassPath.java:1014)
> > > > > at sun.misc.URLClassPath.getResource(URLClassPath.java:189)
> > > > > at java.net.URLClassLoader$1.run(URLClassLoader.java:209)
> > > > > at java.security.AccessController.doPrivileged(Native Method)
> > > > > at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
> > > > > at java.lang.ClassLoader.loadClass(ClassLoader.java:323)
> > > > > at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294)
> > > > > at java.lang.ClassLoader.loadClass(ClassLoader.java:268)
> > > > > at java.lang.ClassLoader.defineClass1(Native Method)
> > > > > at java.lang.ClassLoader.defineClass(ClassLoader.java:643)
> > > > > at
> > > >
> java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
> > > > > at java.net.URLClassLoader.defineClass(URLClassLoader.java:277)
> > > > > at java.net.URLClassLoader.access$000(URLClassLoader.java:73)
> > > > > at java.net.URLClassLoader$1.run(URLClassLoader.java:212)
> > > > > at java.security.AccessController.doPrivileged(Native Method)
> > > > > at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
> > > > > at java.lang.ClassLoader.loadClass(ClassLoader.java:323)
> > > > > at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294)
> > > > > at java.lang.ClassLoader.loadClass(ClassLoader.java:268)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> org.apache.log4j.spi.ThrowableInformation.getThrowableStrRep(ThrowableInformation.java:87)
> > > > > at
> > > > >
> > > >
> > >
> >
> org.apache.log4j.spi.LoggingEvent.getThrowableStrRep(LoggingEvent.java:413)
> > > > > at
> org.apache.log4j.WriterAppender.subAppend(WriterAppender.java:313)
> > > > > at org.apache.log4j.WriterAppender.append(WriterAppender.java:162)
> > > > > at
> > > org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66)
> > > > > at org.apache.log4j.Category.callAppenders(Category.java:206)
> > > > > at org.apache.log4j.Category.forcedLog(Category.java:391)
> > > > > at org.apache.log4j.Category.error(Category.java:322)
> > > > > at
> > kafka.utils.Logging$$anonfun$swallowError$1.apply(Logging.scala:105)
> > > > > at
> > kafka.utils.Logging$$anonfun$swallowError$1.apply(Logging.scala:105)
> > > > > at kafka.utils.Utils$.swallow(Utils.scala:189)
> > > > > at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> > > > > at kafka.utils.Utils$.swallowError(Utils.scala:46)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
> > > > > at kafka.producer.Producer.send(Producer.scala:76)
> > > > > at kafka.javaapi.producer.Producer.send(Producer.scala:33)
> > > > > at com.samsung.rtdp.KafkaAppender.append(KafkaAppender.java:121)
> > > > > at
> > > org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66)
> > > > > at org.apache.log4j.Category.callAppenders(Category.java:206)
> > > > > at org.apache.log4j.Category.forcedLog(Category.java:391)
> > > > > at org.apache.log4j.Category.info(Category.java:666)
> > > > > at kafka.utils.Logging$class.info(Logging.scala:67)
> > > > > at kafka.client.ClientUtils$.info(ClientUtils.scala:31)
> > > > > at
> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:51)
> > > > > at
> > > > >
> > > >
> > >
> >
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67)
> > > > > at kafka.utils.Utils$.swallow(Utils.scala:187)
> > > > > at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> > > > > at kafka.utils.Utils$.swallowError(Utils.scala:46)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
> > > > > at kafka.producer.Producer.send(Producer.scala:76)
> > > > > at kafka.javaapi.producer.Producer.send(Producer.scala:33)
> > > > > at com.samsung.rtdp.KafkaAppender.append(KafkaAppender.java:121)
> > > > > .
> > > > > .
> > > > > .
> > > > >
> > > > >
> > > >
> > >
> >
> ----------------------------------------------------------------------------------------------------------------------------------
> > > > >
> > > > >
> > > > > Here is my code.
> > > > > Since it came from producer example code, it is quite
> > straightforward.
> > > > >
> > > > >
> > > >
> > >
> >
> -------------------------------------------------------------------------------------------------------------------------
> > > > > package com.samsung.rtdp;
> > > > >
> > > > > import java.io.IOException;
> > > > > import java.util.Date;
> > > > > import java.util.Properties;
> > > > > import java.util.Random;
> > > > >
> > > > > import org.apache.log4j.AppenderSkeleton;
> > > > > import org.apache.log4j.spi.ErrorCode;
> > > > > import org.apache.log4j.spi.LoggingEvent;
> > > > >
> > > > > import kafka.javaapi.producer.Producer;
> > > > > import kafka.producer.KeyedMessage;
> > > > > import kafka.producer.Partitioner;
> > > > > import kafka.producer.ProducerConfig;
> > > > > import kafka.utils.VerifiableProperties;
> > > > >
> > > > >
> > > > > public class KafkaAppender extends AppenderSkeleton {
> > > > >
> > > > > private String brokerList;
> > > > > private String serializer;
> > > > > private String partitioner;
> > > > > private String topic;
> > > > > private String DEFAULT_REQUIRED_REQUEST_NUACKS="1";
> > > > >
> > > > > private Properties props;
> > > > > private ProducerConfig config;
> > > > > private Producer<String, String> producer;
> > > > >
> > > > > public void setBrokerList(String brokerList) { this.brokerList =
> > > > > brokerList; }
> > > > > public String getBrokerList() { return this.brokerList; }
> > > > >
> > > > > public void setSerializerClass(String serializer) {
> this.serializer =
> > > > > serializer; }
> > > > > public String getSerializer() { return this.serializer; }
> > > > >
> > > > > public void setPartitionerClass(String partitioner) {
> > this.partitioner
> > > =
> > > > > partitioner; }
> > > > > public String getPartitioner() { return this.partitioner; }
> > > > >
> > > > > public void setTopic(String topic) { this.topic = topic; }
> > > > > public String getTopic() { return this.topic; }
> > > > >
> > > > >
> > > > > public void printParameters(){
> > > > > System.out.println("BrokerList : " + brokerList);
> > > > > System.out.println("Serializer Class : " + serializer);
> > > > > System.out.println("Partitioner Class : " + partitioner);
> > > > > System.out.println("Topic : " + topic);
> > > > > }
> > > > >
> > > > > public void activateOptions() {
> > > > >
> > > > > // printParameters();
> > > > >
> > > > > props = new Properties();
> > > > >
> > > > > props.put("metadata.broker.list", brokerList);
> > > > > props.put("serializer.class", serializer);
> > > > > props.put("partitioner.class", partitioner);
> > > > > props.put("request.required.acks",
> DEFAULT_REQUIRED_REQUEST_NUACKS);
> > > > > props.put("zk.connect", "kafka01:2181");
> > > > >
> > > > > config = new ProducerConfig(props);
> > > > > producer = new Producer<String, String>(config);
> > > > > }
> > > > >
> > > > >
> > > > > public void close() {
> > > > > // TODO Auto-generated method stub
> > > > > producer.close();
> > > > >
> > > > > }
> > > > >
> > > > >
> > > > > public boolean requiresLayout() {
> > > > > // TODO Auto-generated method stub
> > > > > return true;
> > > > > }
> > > > >
> > > > > @Override
> > > > > protected void append(LoggingEvent event) {
> > > > > // TODO Auto-generated method stub
> > > > >
> > > > >
> > > > > // printParameters();
> > > > >
> > > > > if( this.layout == null )
> > > > > {
> > > > > errorHandler.error("No layout for appender " + name , null,
> > > > > ErrorCode.MISSING_LAYOUT );
> > > > > return;
> > > > > }
> > > > >
> > > > > String msg = this.layout.format(event);
> > > > >
> > > > >  KeyedMessage<String, String> data = new KeyedMessage<String,
> > > > > String>("KafkaAppenderTest", msg, msg);
> > > > >
> > > > > producer.send(data);
> > > > > }
> > > > >
> > > > > }
> > > > >
> > > > >
> > > > >
> > > >
> > >
> >
> -------------------------------------------------------------------------------------------------------------------------------------
> > > > >
> > > > >
> > > > >
> > > > > And this is my log4j.properties
> > > > >
> > > > >
> > > >
> > >
> >
> -------------------------------------------------------------------------------------------------------------------------------------
> > > > > log4j.rootLogger=INFO, stdout, KAFKA
> > > > > # set the logger for your package to be the KAFKA appender
> > > > > #log4j.logger.com.samsung.rtdp=INFO, KAFKA
> > > > >
> > > > > log4j.appender.stdout=org.apache.log4j.ConsoleAppender
> > > > > log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
> > > > >
> > > > > # Pattern to output the caller's file name and line number.
> > > > > log4j.appender.stdout.layout.ConversionPattern=%5p [%t] (%F:%L) -
> > %m%n
> > > > >
> > > > > log4j.appender.KAFKA=com.samsung.rtdp.KafkaAppender
> > > > > log4j.appender.KAFKA.BrokerList=kafka01:9092
> > > > > log4j.appender.KAFKA.SerializerClass=kafka.serializer.StringEncoder
> > > > >
> > >
> log4j.appender.KAFKA.PartitionerClass=com.samsung.rtdp.SimplePartitioner2
> > > > > log4j.appender.KAFKA.Topic=test
> > > > > log4j.appender.KAFKA.layout=org.apache.log4j.PatternLayout
> > > > > log4j.appender.KAFKA.layout.ConversionPattern=%5p [%t] (%F:%L) -
> %m%n
> > > > >
> > > > >
> > > >
> > >
> >
> --------------------------------------------------------------------------------------------------------------------------------------
> > > > >
> > > >
> > >
> >
>

Re: Producer fails to send data when it is used in log4j appender.

Posted by 김동경 <st...@gmail.com>.
Sorry.
Since I used Maven for the dependency, Kafka is included as a JAR.
In this case, is there any way to turn it off?

Thanks
Regards
Dongkyoung.


2014-03-05 13:21 GMT+09:00 Jun Rao <ju...@gmail.com>:

> Just change config/log4j.properties inside Kafka.
>
> Thanks,
>
> Jun
>
>
> On Tue, Mar 4, 2014 at 4:09 PM, 김동경 <st...@gmail.com> wrote:
>
> > I couldn`t find any configuration relevant to turning off the log in
> > http://kafka.apache.org/documentation.html#configuration.
> > I included Kafka as Maven dependency.
> > How could I turn off the Kafka log in the code?
> >
> > Thanks
> > Regards
> > Dongkyoung
> >
> > 2014-03-04 14:40 GMT+09:00 Jun Rao <ju...@gmail.com>:
> >
> > > I think it tries to add the logging in Kafka itself back to the
> > > KafkaAppender.
> > > This creates an infinite loop. Maybe you could try setting the log
> level
> > in
> > > Kafka package to OFF?
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > >
> > > On Mon, Mar 3, 2014 at 6:26 PM, 김동경 <st...@gmail.com> wrote:
> > >
> > > > I made simple log4j kafka appender.
> > > > I copied most of the code from 0.8.0 Producer example in "
> > > >
> > https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example
> > > "
> > > > to code "append" function.
> > > >
> > > > I confirmed producer example code is working with my environment.
> > > > But when I use same logic for log4j appender, it didn`t work.
> > > > It is trying to fetch metadata repeatedly and I am getting infinite
> > > > "Utils$.swallowError" error.
> > > >
> > > > I have no idea on swallowError.
> > > > It looks it failed to fetch metadata from broker, it is trying again
> > and
> > > > again.
> > > > Max retries count is just 3, but I don`t know why it happens.
> > > >
> > > > Are there anything that should be done to produce log data into Kafka
> > via
> > > > log4j Appender?
> > > >
> > > >
> > > >
> > >
> >
> ---------------------------------------------------------------------------------------------------------------------------------
> > > > INFO [main] (Logging.scala:67) - Verifying properties
> > > >  INFO [main] (Logging.scala:67) - Property metadata.broker.list is
> > > > overridden to kafka01:9092
> > > >  WARN [main] (Logging.scala:82) - Property zk.connect is not valid
> > > >  INFO [main] (Logging.scala:67) - Property request.required.acks is
> > > > overridden to 1
> > > >  INFO [main] (Logging.scala:67) - Property partitioner.class is
> > > overridden
> > > > to com.samsung.rtdp.SimplePartitioner2
> > > >  INFO [main] (Logging.scala:67) - Property serializer.class is
> > overridden
> > > > to kafka.serializer.StringEncoder
> > > >  INFO [main] (HelloWorld.java:14) - Entering application.
> > > >  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
> > > > id:0,host:kafka01,port:9092 with correlation id 0 for 1 topic(s)
> > > > Set(KafkaAppenderTest)
> > > >  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
> > > > id:0,host:kafka01,port:9092 with correlation id 1 for 1 topic(s)
> > > > Set(KafkaAppenderTest)
> > > >  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
> > > > id:0,host:kafka01,port:9092 with correlation id 2 for 1 topic(s)
> > > > Set(KafkaAppenderTest)
> > > >  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
> > > > id:0,host:kafka01,port:9092 with correlation id 3 for 1 topic(s)
> > > > Set(KafkaAppenderTest)
> > > >  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
> > > > id:0,host:kafka01,port:9092 with correlation id 4 for 1 topic(s)
> > > > Set(KafkaAppenderTest)
> > > >  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
> > > > id:0,host:kafka01,port:9092 with correlation id 5 for 1 topic(s)
> > > > Set(KafkaAppenderTest)
> > > >  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
> > > > id:0,host:kafka01,port:9092 with correlation id 6 for 1 topic(s)
> > > > Set(KafkaAppenderTest)
> > > >  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
> > > > id:0,host:kafka01,port:9092 with correlation id 7 for 1 topic(s)
> > > > Set(KafkaAppenderTest)
> > > >  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
> > > > id:0,host:kafka01,port:9092 with correlation id 8 for 1 topic(s)
> > > > Set(KafkaAppenderTest)
> > > >  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
> > > > id:0,host:kafka01,port:9092 with correlation id 9 for 1 topic(s)
> > > > Set(KafkaAppenderTest)
> > > >  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
> > > > id:0,host:kafka01,port:9092 with correlation id 10 for 1 topic(s)
> > > > Set(KafkaAppenderTest)
> > > >  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
> > > > id:0,host:kafka01,port:9092 with correlation id 11 for 1 topic(s)
> > > > Set(KafkaAppenderTest)
> > > >  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
> > > > id:0,host:kafka01,port:9092 with correlation id 12 for 1 topic(s)
> > > > Set(KafkaAppenderTest)
> > > > .
> > > > .
> > > > .
> > > > java.lang.StackOverflowError
> > > > at java.lang.StringCoding.deref(StringCoding.java:64)
> > > > at java.lang.StringCoding.encode(StringCoding.java:275)
> > > > at java.lang.String.getBytes(String.java:954)
> > > > at java.io.UnixFileSystem.getBooleanAttributes0(Native Method)
> > > > at
> java.io.UnixFileSystem.getBooleanAttributes(UnixFileSystem.java:243)
> > > > at java.io.File.exists(File.java:791)
> > > > at
> sun.misc.URLClassPath$FileLoader.getResource(URLClassPath.java:1014)
> > > > at sun.misc.URLClassPath.getResource(URLClassPath.java:189)
> > > > at java.net.URLClassLoader$1.run(URLClassLoader.java:209)
> > > > at java.security.AccessController.doPrivileged(Native Method)
> > > > at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
> > > > at java.lang.ClassLoader.loadClass(ClassLoader.java:323)
> > > > at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294)
> > > > at java.lang.ClassLoader.loadClass(ClassLoader.java:268)
> > > > at java.lang.ClassLoader.defineClass1(Native Method)
> > > > at java.lang.ClassLoader.defineClass(ClassLoader.java:643)
> > > > at
> > > java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
> > > > at java.net.URLClassLoader.defineClass(URLClassLoader.java:277)
> > > > at java.net.URLClassLoader.access$000(URLClassLoader.java:73)
> > > > at java.net.URLClassLoader$1.run(URLClassLoader.java:212)
> > > > at java.security.AccessController.doPrivileged(Native Method)
> > > > at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
> > > > at java.lang.ClassLoader.loadClass(ClassLoader.java:323)
> > > > at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294)
> > > > at java.lang.ClassLoader.loadClass(ClassLoader.java:268)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.log4j.spi.ThrowableInformation.getThrowableStrRep(ThrowableInformation.java:87)
> > > > at
> > > >
> > >
> >
> org.apache.log4j.spi.LoggingEvent.getThrowableStrRep(LoggingEvent.java:413)
> > > > at org.apache.log4j.WriterAppender.subAppend(WriterAppender.java:313)
> > > > at org.apache.log4j.WriterAppender.append(WriterAppender.java:162)
> > > > at
> > org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66)
> > > > at org.apache.log4j.Category.callAppenders(Category.java:206)
> > > > at org.apache.log4j.Category.forcedLog(Category.java:391)
> > > > at org.apache.log4j.Category.error(Category.java:322)
> > > > at
> kafka.utils.Logging$$anonfun$swallowError$1.apply(Logging.scala:105)
> > > > at
> kafka.utils.Logging$$anonfun$swallowError$1.apply(Logging.scala:105)
> > > > at kafka.utils.Utils$.swallow(Utils.scala:189)
> > > > at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> > > > at kafka.utils.Utils$.swallowError(Utils.scala:46)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
> > > > at kafka.producer.Producer.send(Producer.scala:76)
> > > > at kafka.javaapi.producer.Producer.send(Producer.scala:33)
> > > > at com.samsung.rtdp.KafkaAppender.append(KafkaAppender.java:121)
> > > > at
> > org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66)
> > > > at org.apache.log4j.Category.callAppenders(Category.java:206)
> > > > at org.apache.log4j.Category.forcedLog(Category.java:391)
> > > > at org.apache.log4j.Category.info(Category.java:666)
> > > > at kafka.utils.Logging$class.info(Logging.scala:67)
> > > > at kafka.client.ClientUtils$.info(ClientUtils.scala:31)
> > > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:51)
> > > > at
> > > >
> > >
> >
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67)
> > > > at kafka.utils.Utils$.swallow(Utils.scala:187)
> > > > at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> > > > at kafka.utils.Utils$.swallowError(Utils.scala:46)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
> > > > at kafka.producer.Producer.send(Producer.scala:76)
> > > > at kafka.javaapi.producer.Producer.send(Producer.scala:33)
> > > > at com.samsung.rtdp.KafkaAppender.append(KafkaAppender.java:121)
> > > > .
> > > > .
> > > > .
> > > >
> > > >
> > >
> >
> ----------------------------------------------------------------------------------------------------------------------------------
> > > >
> > > >
> > > > Here is my code.
> > > > Since it came from producer example code, it is quite
> straightforward.
> > > >
> > > >
> > >
> >
> -------------------------------------------------------------------------------------------------------------------------
> > > > package com.samsung.rtdp;
> > > >
> > > > import java.io.IOException;
> > > > import java.util.Date;
> > > > import java.util.Properties;
> > > > import java.util.Random;
> > > >
> > > > import org.apache.log4j.AppenderSkeleton;
> > > > import org.apache.log4j.spi.ErrorCode;
> > > > import org.apache.log4j.spi.LoggingEvent;
> > > >
> > > > import kafka.javaapi.producer.Producer;
> > > > import kafka.producer.KeyedMessage;
> > > > import kafka.producer.Partitioner;
> > > > import kafka.producer.ProducerConfig;
> > > > import kafka.utils.VerifiableProperties;
> > > >
> > > >
> > > > public class KafkaAppender extends AppenderSkeleton {
> > > >
> > > > private String brokerList;
> > > > private String serializer;
> > > > private String partitioner;
> > > > private String topic;
> > > > private String DEFAULT_REQUIRED_REQUEST_NUACKS="1";
> > > >
> > > > private Properties props;
> > > > private ProducerConfig config;
> > > > private Producer<String, String> producer;
> > > >
> > > > public void setBrokerList(String brokerList) { this.brokerList =
> > > > brokerList; }
> > > > public String getBrokerList() { return this.brokerList; }
> > > >
> > > > public void setSerializerClass(String serializer) { this.serializer =
> > > > serializer; }
> > > > public String getSerializer() { return this.serializer; }
> > > >
> > > > public void setPartitionerClass(String partitioner) {
> this.partitioner
> > =
> > > > partitioner; }
> > > > public String getPartitioner() { return this.partitioner; }
> > > >
> > > > public void setTopic(String topic) { this.topic = topic; }
> > > > public String getTopic() { return this.topic; }
> > > >
> > > >
> > > > public void printParameters(){
> > > > System.out.println("BrokerList : " + brokerList);
> > > > System.out.println("Serializer Class : " + serializer);
> > > > System.out.println("Partitioner Class : " + partitioner);
> > > > System.out.println("Topic : " + topic);
> > > > }
> > > >
> > > > public void activateOptions() {
> > > >
> > > > // printParameters();
> > > >
> > > > props = new Properties();
> > > >
> > > > props.put("metadata.broker.list", brokerList);
> > > > props.put("serializer.class", serializer);
> > > > props.put("partitioner.class", partitioner);
> > > > props.put("request.required.acks", DEFAULT_REQUIRED_REQUEST_NUACKS);
> > > > props.put("zk.connect", "kafka01:2181");
> > > >
> > > > config = new ProducerConfig(props);
> > > > producer = new Producer<String, String>(config);
> > > > }
> > > >
> > > >
> > > > public void close() {
> > > > // TODO Auto-generated method stub
> > > > producer.close();
> > > >
> > > > }
> > > >
> > > >
> > > > public boolean requiresLayout() {
> > > > // TODO Auto-generated method stub
> > > > return true;
> > > > }
> > > >
> > > > @Override
> > > > protected void append(LoggingEvent event) {
> > > > // TODO Auto-generated method stub
> > > >
> > > >
> > > > // printParameters();
> > > >
> > > > if( this.layout == null )
> > > > {
> > > > errorHandler.error("No layout for appender " + name , null,
> > > > ErrorCode.MISSING_LAYOUT );
> > > > return;
> > > > }
> > > >
> > > > String msg = this.layout.format(event);
> > > >
> > > >  KeyedMessage<String, String> data = new KeyedMessage<String,
> > > > String>("KafkaAppenderTest", msg, msg);
> > > >
> > > > producer.send(data);
> > > > }
> > > >
> > > > }
> > > >
> > > >
> > > >
> > >
> >
> -------------------------------------------------------------------------------------------------------------------------------------
> > > >
> > > >
> > > >
> > > > And this is my log4j.properties
> > > >
> > > >
> > >
> >
> -------------------------------------------------------------------------------------------------------------------------------------
> > > > log4j.rootLogger=INFO, stdout, KAFKA
> > > > # set the logger for your package to be the KAFKA appender
> > > > #log4j.logger.com.samsung.rtdp=INFO, KAFKA
> > > >
> > > > log4j.appender.stdout=org.apache.log4j.ConsoleAppender
> > > > log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
> > > >
> > > > # Pattern to output the caller's file name and line number.
> > > > log4j.appender.stdout.layout.ConversionPattern=%5p [%t] (%F:%L) -
> %m%n
> > > >
> > > > log4j.appender.KAFKA=com.samsung.rtdp.KafkaAppender
> > > > log4j.appender.KAFKA.BrokerList=kafka01:9092
> > > > log4j.appender.KAFKA.SerializerClass=kafka.serializer.StringEncoder
> > > >
> > log4j.appender.KAFKA.PartitionerClass=com.samsung.rtdp.SimplePartitioner2
> > > > log4j.appender.KAFKA.Topic=test
> > > > log4j.appender.KAFKA.layout=org.apache.log4j.PatternLayout
> > > > log4j.appender.KAFKA.layout.ConversionPattern=%5p [%t] (%F:%L) - %m%n
> > > >
> > > >
> > >
> >
> --------------------------------------------------------------------------------------------------------------------------------------
> > > >
> > >
> >
>

Re: Producer fails to send data when it is used in log4j appender.

Posted by Jun Rao <ju...@gmail.com>.
Just change config/log4j.properties inside Kafka.

Thanks,

Jun


On Tue, Mar 4, 2014 at 4:09 PM, 김동경 <st...@gmail.com> wrote:

> I couldn`t find any configuration relevant to turning off the log in
> http://kafka.apache.org/documentation.html#configuration.
> I included Kafka as Maven dependency.
> How could I turn off the Kafka log in the code?
>
> Thanks
> Regards
> Dongkyoung
>
> 2014-03-04 14:40 GMT+09:00 Jun Rao <ju...@gmail.com>:
>
> > I think it tries to add the logging in Kafka itself back to the
> > KafkaAppender.
> > This creates an infinite loop. Maybe you could try setting the log level
> in
> > Kafka package to OFF?
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Mon, Mar 3, 2014 at 6:26 PM, 김동경 <st...@gmail.com> wrote:
> >
> > > I made simple log4j kafka appender.
> > > I copied most of the code from 0.8.0 Producer example in "
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example
> > "
> > > to code "append" function.
> > >
> > > I confirmed producer example code is working with my environment.
> > > But when I use same logic for log4j appender, it didn`t work.
> > > It is trying to fetch metadata repeatedly and I am getting infinite
> > > "Utils$.swallowError" error.
> > >
> > > I have no idea on swallowError.
> > > It looks it failed to fetch metadata from broker, it is trying again
> and
> > > again.
> > > Max retries count is just 3, but I don`t know why it happens.
> > >
> > > Are there anything that should be done to produce log data into Kafka
> via
> > > log4j Appender?
> > >
> > >
> > >
> >
> ---------------------------------------------------------------------------------------------------------------------------------
> > > INFO [main] (Logging.scala:67) - Verifying properties
> > >  INFO [main] (Logging.scala:67) - Property metadata.broker.list is
> > > overridden to kafka01:9092
> > >  WARN [main] (Logging.scala:82) - Property zk.connect is not valid
> > >  INFO [main] (Logging.scala:67) - Property request.required.acks is
> > > overridden to 1
> > >  INFO [main] (Logging.scala:67) - Property partitioner.class is
> > overridden
> > > to com.samsung.rtdp.SimplePartitioner2
> > >  INFO [main] (Logging.scala:67) - Property serializer.class is
> overridden
> > > to kafka.serializer.StringEncoder
> > >  INFO [main] (HelloWorld.java:14) - Entering application.
> > >  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
> > > id:0,host:kafka01,port:9092 with correlation id 0 for 1 topic(s)
> > > Set(KafkaAppenderTest)
> > >  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
> > > id:0,host:kafka01,port:9092 with correlation id 1 for 1 topic(s)
> > > Set(KafkaAppenderTest)
> > >  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
> > > id:0,host:kafka01,port:9092 with correlation id 2 for 1 topic(s)
> > > Set(KafkaAppenderTest)
> > >  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
> > > id:0,host:kafka01,port:9092 with correlation id 3 for 1 topic(s)
> > > Set(KafkaAppenderTest)
> > >  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
> > > id:0,host:kafka01,port:9092 with correlation id 4 for 1 topic(s)
> > > Set(KafkaAppenderTest)
> > >  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
> > > id:0,host:kafka01,port:9092 with correlation id 5 for 1 topic(s)
> > > Set(KafkaAppenderTest)
> > >  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
> > > id:0,host:kafka01,port:9092 with correlation id 6 for 1 topic(s)
> > > Set(KafkaAppenderTest)
> > >  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
> > > id:0,host:kafka01,port:9092 with correlation id 7 for 1 topic(s)
> > > Set(KafkaAppenderTest)
> > >  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
> > > id:0,host:kafka01,port:9092 with correlation id 8 for 1 topic(s)
> > > Set(KafkaAppenderTest)
> > >  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
> > > id:0,host:kafka01,port:9092 with correlation id 9 for 1 topic(s)
> > > Set(KafkaAppenderTest)
> > >  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
> > > id:0,host:kafka01,port:9092 with correlation id 10 for 1 topic(s)
> > > Set(KafkaAppenderTest)
> > >  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
> > > id:0,host:kafka01,port:9092 with correlation id 11 for 1 topic(s)
> > > Set(KafkaAppenderTest)
> > >  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
> > > id:0,host:kafka01,port:9092 with correlation id 12 for 1 topic(s)
> > > Set(KafkaAppenderTest)
> > > .
> > > .
> > > .
> > > java.lang.StackOverflowError
> > > at java.lang.StringCoding.deref(StringCoding.java:64)
> > > at java.lang.StringCoding.encode(StringCoding.java:275)
> > > at java.lang.String.getBytes(String.java:954)
> > > at java.io.UnixFileSystem.getBooleanAttributes0(Native Method)
> > > at java.io.UnixFileSystem.getBooleanAttributes(UnixFileSystem.java:243)
> > > at java.io.File.exists(File.java:791)
> > > at sun.misc.URLClassPath$FileLoader.getResource(URLClassPath.java:1014)
> > > at sun.misc.URLClassPath.getResource(URLClassPath.java:189)
> > > at java.net.URLClassLoader$1.run(URLClassLoader.java:209)
> > > at java.security.AccessController.doPrivileged(Native Method)
> > > at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
> > > at java.lang.ClassLoader.loadClass(ClassLoader.java:323)
> > > at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294)
> > > at java.lang.ClassLoader.loadClass(ClassLoader.java:268)
> > > at java.lang.ClassLoader.defineClass1(Native Method)
> > > at java.lang.ClassLoader.defineClass(ClassLoader.java:643)
> > > at
> > java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
> > > at java.net.URLClassLoader.defineClass(URLClassLoader.java:277)
> > > at java.net.URLClassLoader.access$000(URLClassLoader.java:73)
> > > at java.net.URLClassLoader$1.run(URLClassLoader.java:212)
> > > at java.security.AccessController.doPrivileged(Native Method)
> > > at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
> > > at java.lang.ClassLoader.loadClass(ClassLoader.java:323)
> > > at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294)
> > > at java.lang.ClassLoader.loadClass(ClassLoader.java:268)
> > > at
> > >
> > >
> >
> org.apache.log4j.spi.ThrowableInformation.getThrowableStrRep(ThrowableInformation.java:87)
> > > at
> > >
> >
> org.apache.log4j.spi.LoggingEvent.getThrowableStrRep(LoggingEvent.java:413)
> > > at org.apache.log4j.WriterAppender.subAppend(WriterAppender.java:313)
> > > at org.apache.log4j.WriterAppender.append(WriterAppender.java:162)
> > > at
> org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251)
> > > at
> > >
> > >
> >
> org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66)
> > > at org.apache.log4j.Category.callAppenders(Category.java:206)
> > > at org.apache.log4j.Category.forcedLog(Category.java:391)
> > > at org.apache.log4j.Category.error(Category.java:322)
> > > at kafka.utils.Logging$$anonfun$swallowError$1.apply(Logging.scala:105)
> > > at kafka.utils.Logging$$anonfun$swallowError$1.apply(Logging.scala:105)
> > > at kafka.utils.Utils$.swallow(Utils.scala:189)
> > > at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> > > at kafka.utils.Utils$.swallowError(Utils.scala:46)
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
> > > at kafka.producer.Producer.send(Producer.scala:76)
> > > at kafka.javaapi.producer.Producer.send(Producer.scala:33)
> > > at com.samsung.rtdp.KafkaAppender.append(KafkaAppender.java:121)
> > > at
> org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251)
> > > at
> > >
> > >
> >
> org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66)
> > > at org.apache.log4j.Category.callAppenders(Category.java:206)
> > > at org.apache.log4j.Category.forcedLog(Category.java:391)
> > > at org.apache.log4j.Category.info(Category.java:666)
> > > at kafka.utils.Logging$class.info(Logging.scala:67)
> > > at kafka.client.ClientUtils$.info(ClientUtils.scala:31)
> > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:51)
> > > at
> > >
> >
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67)
> > > at kafka.utils.Utils$.swallow(Utils.scala:187)
> > > at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> > > at kafka.utils.Utils$.swallowError(Utils.scala:46)
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
> > > at kafka.producer.Producer.send(Producer.scala:76)
> > > at kafka.javaapi.producer.Producer.send(Producer.scala:33)
> > > at com.samsung.rtdp.KafkaAppender.append(KafkaAppender.java:121)
> > > .
> > > .
> > > .
> > >
> > >
> >
> ----------------------------------------------------------------------------------------------------------------------------------
> > >
> > >
> > > Here is my code.
> > > Since it came from producer example code, it is quite straightforward.
> > >
> > >
> >
> -------------------------------------------------------------------------------------------------------------------------
> > > package com.samsung.rtdp;
> > >
> > > import java.io.IOException;
> > > import java.util.Date;
> > > import java.util.Properties;
> > > import java.util.Random;
> > >
> > > import org.apache.log4j.AppenderSkeleton;
> > > import org.apache.log4j.spi.ErrorCode;
> > > import org.apache.log4j.spi.LoggingEvent;
> > >
> > > import kafka.javaapi.producer.Producer;
> > > import kafka.producer.KeyedMessage;
> > > import kafka.producer.Partitioner;
> > > import kafka.producer.ProducerConfig;
> > > import kafka.utils.VerifiableProperties;
> > >
> > >
> > > public class KafkaAppender extends AppenderSkeleton {
> > >
> > > private String brokerList;
> > > private String serializer;
> > > private String partitioner;
> > > private String topic;
> > > private String DEFAULT_REQUIRED_REQUEST_NUACKS="1";
> > >
> > > private Properties props;
> > > private ProducerConfig config;
> > > private Producer<String, String> producer;
> > >
> > > public void setBrokerList(String brokerList) { this.brokerList =
> > > brokerList; }
> > > public String getBrokerList() { return this.brokerList; }
> > >
> > > public void setSerializerClass(String serializer) { this.serializer =
> > > serializer; }
> > > public String getSerializer() { return this.serializer; }
> > >
> > > public void setPartitionerClass(String partitioner) { this.partitioner
> =
> > > partitioner; }
> > > public String getPartitioner() { return this.partitioner; }
> > >
> > > public void setTopic(String topic) { this.topic = topic; }
> > > public String getTopic() { return this.topic; }
> > >
> > >
> > > public void printParameters(){
> > > System.out.println("BrokerList : " + brokerList);
> > > System.out.println("Serializer Class : " + serializer);
> > > System.out.println("Partitioner Class : " + partitioner);
> > > System.out.println("Topic : " + topic);
> > > }
> > >
> > > public void activateOptions() {
> > >
> > > // printParameters();
> > >
> > > props = new Properties();
> > >
> > > props.put("metadata.broker.list", brokerList);
> > > props.put("serializer.class", serializer);
> > > props.put("partitioner.class", partitioner);
> > > props.put("request.required.acks", DEFAULT_REQUIRED_REQUEST_NUACKS);
> > > props.put("zk.connect", "kafka01:2181");
> > >
> > > config = new ProducerConfig(props);
> > > producer = new Producer<String, String>(config);
> > > }
> > >
> > >
> > > public void close() {
> > > // TODO Auto-generated method stub
> > > producer.close();
> > >
> > > }
> > >
> > >
> > > public boolean requiresLayout() {
> > > // TODO Auto-generated method stub
> > > return true;
> > > }
> > >
> > > @Override
> > > protected void append(LoggingEvent event) {
> > > // TODO Auto-generated method stub
> > >
> > >
> > > // printParameters();
> > >
> > > if( this.layout == null )
> > > {
> > > errorHandler.error("No layout for appender " + name , null,
> > > ErrorCode.MISSING_LAYOUT );
> > > return;
> > > }
> > >
> > > String msg = this.layout.format(event);
> > >
> > >  KeyedMessage<String, String> data = new KeyedMessage<String,
> > > String>("KafkaAppenderTest", msg, msg);
> > >
> > > producer.send(data);
> > > }
> > >
> > > }
> > >
> > >
> > >
> >
> -------------------------------------------------------------------------------------------------------------------------------------
> > >
> > >
> > >
> > > And this is my log4j.properties
> > >
> > >
> >
> -------------------------------------------------------------------------------------------------------------------------------------
> > > log4j.rootLogger=INFO, stdout, KAFKA
> > > # set the logger for your package to be the KAFKA appender
> > > #log4j.logger.com.samsung.rtdp=INFO, KAFKA
> > >
> > > log4j.appender.stdout=org.apache.log4j.ConsoleAppender
> > > log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
> > >
> > > # Pattern to output the caller's file name and line number.
> > > log4j.appender.stdout.layout.ConversionPattern=%5p [%t] (%F:%L) - %m%n
> > >
> > > log4j.appender.KAFKA=com.samsung.rtdp.KafkaAppender
> > > log4j.appender.KAFKA.BrokerList=kafka01:9092
> > > log4j.appender.KAFKA.SerializerClass=kafka.serializer.StringEncoder
> > >
> log4j.appender.KAFKA.PartitionerClass=com.samsung.rtdp.SimplePartitioner2
> > > log4j.appender.KAFKA.Topic=test
> > > log4j.appender.KAFKA.layout=org.apache.log4j.PatternLayout
> > > log4j.appender.KAFKA.layout.ConversionPattern=%5p [%t] (%F:%L) - %m%n
> > >
> > >
> >
> --------------------------------------------------------------------------------------------------------------------------------------
> > >
> >
>

Re: Producer fails to send data when it is used in log4j appender.

Posted by 김동경 <st...@gmail.com>.
I couldn't find any configuration relevant to turning off the log in
http://kafka.apache.org/documentation.html#configuration.
I included Kafka as Maven dependency.
How could I turn off the Kafka log in the code?

Thanks
Regards
Dongkyoung

2014-03-04 14:40 GMT+09:00 Jun Rao <ju...@gmail.com>:

> I think it tries to add the logging in Kafka itself back to the
> KafkaAppender.
> This creates an infinite loop. Maybe you could try setting the log level in
> Kafka package to OFF?
>
> Thanks,
>
> Jun
>
>
> On Mon, Mar 3, 2014 at 6:26 PM, 김동경 <st...@gmail.com> wrote:
>
> > I made simple log4j kafka appender.
> > I copied most of the code from 0.8.0 Producer example in "
> > https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example
> "
> > to code "append" function.
> >
> > I confirmed producer example code is working with my environment.
> > But when I use same logic for log4j appender, it didn`t work.
> > It is trying to fetch metadata repeatedly and I am getting infinite
> > "Utils$.swallowError" error.
> >
> > I have no idea on swallowError.
> > It looks it failed to fetch metadata from broker, it is trying again and
> > again.
> > Max retries count is just 3, but I don`t know why it happens.
> >
> > Are there anything that should be done to produce log data into Kafka via
> > log4j Appender?
> >
> >
> >
> ---------------------------------------------------------------------------------------------------------------------------------
> > INFO [main] (Logging.scala:67) - Verifying properties
> >  INFO [main] (Logging.scala:67) - Property metadata.broker.list is
> > overridden to kafka01:9092
> >  WARN [main] (Logging.scala:82) - Property zk.connect is not valid
> >  INFO [main] (Logging.scala:67) - Property request.required.acks is
> > overridden to 1
> >  INFO [main] (Logging.scala:67) - Property partitioner.class is
> overridden
> > to com.samsung.rtdp.SimplePartitioner2
> >  INFO [main] (Logging.scala:67) - Property serializer.class is overridden
> > to kafka.serializer.StringEncoder
> >  INFO [main] (HelloWorld.java:14) - Entering application.
> >  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
> > id:0,host:kafka01,port:9092 with correlation id 0 for 1 topic(s)
> > Set(KafkaAppenderTest)
> >  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
> > id:0,host:kafka01,port:9092 with correlation id 1 for 1 topic(s)
> > Set(KafkaAppenderTest)
> >  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
> > id:0,host:kafka01,port:9092 with correlation id 2 for 1 topic(s)
> > Set(KafkaAppenderTest)
> >  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
> > id:0,host:kafka01,port:9092 with correlation id 3 for 1 topic(s)
> > Set(KafkaAppenderTest)
> >  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
> > id:0,host:kafka01,port:9092 with correlation id 4 for 1 topic(s)
> > Set(KafkaAppenderTest)
> >  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
> > id:0,host:kafka01,port:9092 with correlation id 5 for 1 topic(s)
> > Set(KafkaAppenderTest)
> >  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
> > id:0,host:kafka01,port:9092 with correlation id 6 for 1 topic(s)
> > Set(KafkaAppenderTest)
> >  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
> > id:0,host:kafka01,port:9092 with correlation id 7 for 1 topic(s)
> > Set(KafkaAppenderTest)
> >  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
> > id:0,host:kafka01,port:9092 with correlation id 8 for 1 topic(s)
> > Set(KafkaAppenderTest)
> >  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
> > id:0,host:kafka01,port:9092 with correlation id 9 for 1 topic(s)
> > Set(KafkaAppenderTest)
> >  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
> > id:0,host:kafka01,port:9092 with correlation id 10 for 1 topic(s)
> > Set(KafkaAppenderTest)
> >  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
> > id:0,host:kafka01,port:9092 with correlation id 11 for 1 topic(s)
> > Set(KafkaAppenderTest)
> >  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
> > id:0,host:kafka01,port:9092 with correlation id 12 for 1 topic(s)
> > Set(KafkaAppenderTest)
> > .
> > .
> > .
> > java.lang.StackOverflowError
> > at java.lang.StringCoding.deref(StringCoding.java:64)
> > at java.lang.StringCoding.encode(StringCoding.java:275)
> > at java.lang.String.getBytes(String.java:954)
> > at java.io.UnixFileSystem.getBooleanAttributes0(Native Method)
> > at java.io.UnixFileSystem.getBooleanAttributes(UnixFileSystem.java:243)
> > at java.io.File.exists(File.java:791)
> > at sun.misc.URLClassPath$FileLoader.getResource(URLClassPath.java:1014)
> > at sun.misc.URLClassPath.getResource(URLClassPath.java:189)
> > at java.net.URLClassLoader$1.run(URLClassLoader.java:209)
> > at java.security.AccessController.doPrivileged(Native Method)
> > at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
> > at java.lang.ClassLoader.loadClass(ClassLoader.java:323)
> > at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294)
> > at java.lang.ClassLoader.loadClass(ClassLoader.java:268)
> > at java.lang.ClassLoader.defineClass1(Native Method)
> > at java.lang.ClassLoader.defineClass(ClassLoader.java:643)
> > at
> java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
> > at java.net.URLClassLoader.defineClass(URLClassLoader.java:277)
> > at java.net.URLClassLoader.access$000(URLClassLoader.java:73)
> > at java.net.URLClassLoader$1.run(URLClassLoader.java:212)
> > at java.security.AccessController.doPrivileged(Native Method)
> > at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
> > at java.lang.ClassLoader.loadClass(ClassLoader.java:323)
> > at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294)
> > at java.lang.ClassLoader.loadClass(ClassLoader.java:268)
> > at
> >
> >
> org.apache.log4j.spi.ThrowableInformation.getThrowableStrRep(ThrowableInformation.java:87)
> > at
> >
> org.apache.log4j.spi.LoggingEvent.getThrowableStrRep(LoggingEvent.java:413)
> > at org.apache.log4j.WriterAppender.subAppend(WriterAppender.java:313)
> > at org.apache.log4j.WriterAppender.append(WriterAppender.java:162)
> > at org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251)
> > at
> >
> >
> org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66)
> > at org.apache.log4j.Category.callAppenders(Category.java:206)
> > at org.apache.log4j.Category.forcedLog(Category.java:391)
> > at org.apache.log4j.Category.error(Category.java:322)
> > at kafka.utils.Logging$$anonfun$swallowError$1.apply(Logging.scala:105)
> > at kafka.utils.Logging$$anonfun$swallowError$1.apply(Logging.scala:105)
> > at kafka.utils.Utils$.swallow(Utils.scala:189)
> > at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> > at kafka.utils.Utils$.swallowError(Utils.scala:46)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
> > at kafka.producer.Producer.send(Producer.scala:76)
> > at kafka.javaapi.producer.Producer.send(Producer.scala:33)
> > at com.samsung.rtdp.KafkaAppender.append(KafkaAppender.java:121)
> > at org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251)
> > at
> >
> >
> org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66)
> > at org.apache.log4j.Category.callAppenders(Category.java:206)
> > at org.apache.log4j.Category.forcedLog(Category.java:391)
> > at org.apache.log4j.Category.info(Category.java:666)
> > at kafka.utils.Logging$class.info(Logging.scala:67)
> > at kafka.client.ClientUtils$.info(ClientUtils.scala:31)
> > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:51)
> > at
> >
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67)
> > at kafka.utils.Utils$.swallow(Utils.scala:187)
> > at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> > at kafka.utils.Utils$.swallowError(Utils.scala:46)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
> > at kafka.producer.Producer.send(Producer.scala:76)
> > at kafka.javaapi.producer.Producer.send(Producer.scala:33)
> > at com.samsung.rtdp.KafkaAppender.append(KafkaAppender.java:121)
> > .
> > .
> > .
> >
> >
> ----------------------------------------------------------------------------------------------------------------------------------
> >
> >
> > Here is my code.
> > Since it came from producer example code, it is quite straightforward.
> >
> >
> -------------------------------------------------------------------------------------------------------------------------
> > package com.samsung.rtdp;
> >
> > import java.io.IOException;
> > import java.util.Date;
> > import java.util.Properties;
> > import java.util.Random;
> >
> > import org.apache.log4j.AppenderSkeleton;
> > import org.apache.log4j.spi.ErrorCode;
> > import org.apache.log4j.spi.LoggingEvent;
> >
> > import kafka.javaapi.producer.Producer;
> > import kafka.producer.KeyedMessage;
> > import kafka.producer.Partitioner;
> > import kafka.producer.ProducerConfig;
> > import kafka.utils.VerifiableProperties;
> >
> >
> > public class KafkaAppender extends AppenderSkeleton {
> >
> > private String brokerList;
> > private String serializer;
> > private String partitioner;
> > private String topic;
> > private String DEFAULT_REQUIRED_REQUEST_NUACKS="1";
> >
> > private Properties props;
> > private ProducerConfig config;
> > private Producer<String, String> producer;
> >
> > public void setBrokerList(String brokerList) { this.brokerList =
> > brokerList; }
> > public String getBrokerList() { return this.brokerList; }
> >
> > public void setSerializerClass(String serializer) { this.serializer =
> > serializer; }
> > public String getSerializer() { return this.serializer; }
> >
> > public void setPartitionerClass(String partitioner) { this.partitioner =
> > partitioner; }
> > public String getPartitioner() { return this.partitioner; }
> >
> > public void setTopic(String topic) { this.topic = topic; }
> > public String getTopic() { return this.topic; }
> >
> >
> > public void printParameters(){
> > System.out.println("BrokerList : " + brokerList);
> > System.out.println("Serializer Class : " + serializer);
> > System.out.println("Partitioner Class : " + partitioner);
> > System.out.println("Topic : " + topic);
> > }
> >
> > public void activateOptions() {
> >
> > // printParameters();
> >
> > props = new Properties();
> >
> > props.put("metadata.broker.list", brokerList);
> > props.put("serializer.class", serializer);
> > props.put("partitioner.class", partitioner);
> > props.put("request.required.acks", DEFAULT_REQUIRED_REQUEST_NUACKS);
> > props.put("zk.connect", "kafka01:2181");
> >
> > config = new ProducerConfig(props);
> > producer = new Producer<String, String>(config);
> > }
> >
> >
> > public void close() {
> > // TODO Auto-generated method stub
> > producer.close();
> >
> > }
> >
> >
> > public boolean requiresLayout() {
> > // TODO Auto-generated method stub
> > return true;
> > }
> >
> > @Override
> > protected void append(LoggingEvent event) {
> > // TODO Auto-generated method stub
> >
> >
> > // printParameters();
> >
> > if( this.layout == null )
> > {
> > errorHandler.error("No layout for appender " + name , null,
> > ErrorCode.MISSING_LAYOUT );
> > return;
> > }
> >
> > String msg = this.layout.format(event);
> >
> >  KeyedMessage<String, String> data = new KeyedMessage<String,
> > String>("KafkaAppenderTest", msg, msg);
> >
> > producer.send(data);
> > }
> >
> > }
> >
> >
> >
> -------------------------------------------------------------------------------------------------------------------------------------
> >
> >
> >
> > And this is my log4j.properties
> >
> >
> -------------------------------------------------------------------------------------------------------------------------------------
> > log4j.rootLogger=INFO, stdout, KAFKA
> > # set the logger for your package to be the KAFKA appender
> > #log4j.logger.com.samsung.rtdp=INFO, KAFKA
> >
> > log4j.appender.stdout=org.apache.log4j.ConsoleAppender
> > log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
> >
> > # Pattern to output the caller's file name and line number.
> > log4j.appender.stdout.layout.ConversionPattern=%5p [%t] (%F:%L) - %m%n
> >
> > log4j.appender.KAFKA=com.samsung.rtdp.KafkaAppender
> > log4j.appender.KAFKA.BrokerList=kafka01:9092
> > log4j.appender.KAFKA.SerializerClass=kafka.serializer.StringEncoder
> > log4j.appender.KAFKA.PartitionerClass=com.samsung.rtdp.SimplePartitioner2
> > log4j.appender.KAFKA.Topic=test
> > log4j.appender.KAFKA.layout=org.apache.log4j.PatternLayout
> > log4j.appender.KAFKA.layout.ConversionPattern=%5p [%t] (%F:%L) - %m%n
> >
> >
> --------------------------------------------------------------------------------------------------------------------------------------
> >
>

Re: Producer fails to send data when it is used in log4j appender.

Posted by Jun Rao <ju...@gmail.com>.
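I think the log messages emitted by the Kafka producer itself are being routed
back to the KafkaAppender, because the appender is attached to the root logger.
Each of those messages triggers another producer.send(), which logs again, so
this creates an infinite loop. Maybe you could try setting the log level of the
kafka package to OFF (e.g. log4j.logger.kafka=OFF)?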

Thanks,

Jun


On Mon, Mar 3, 2014 at 6:26 PM, 김동경 <st...@gmail.com> wrote:

> I made a simple log4j Kafka appender.
> I copied most of the code from 0.8.0 Producer example in "
> https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example"
> to code "append" function.
>
> I confirmed that the producer example code works in my environment.
> But when I use the same logic in a log4j appender, it doesn't work.
> It tries to fetch metadata repeatedly and I am getting endless
> "Utils$.swallowError" errors.
>
> I have no idea what swallowError is.
> It looks like it failed to fetch metadata from the broker and is trying
> again and again.
> The max retry count is just 3, so I don't know why this happens.
>
> Is there anything that should be done to produce log data into Kafka via
> a log4j appender?
>
>
> ---------------------------------------------------------------------------------------------------------------------------------
> INFO [main] (Logging.scala:67) - Verifying properties
>  INFO [main] (Logging.scala:67) - Property metadata.broker.list is
> overridden to kafka01:9092
>  WARN [main] (Logging.scala:82) - Property zk.connect is not valid
>  INFO [main] (Logging.scala:67) - Property request.required.acks is
> overridden to 1
>  INFO [main] (Logging.scala:67) - Property partitioner.class is overridden
> to com.samsung.rtdp.SimplePartitioner2
>  INFO [main] (Logging.scala:67) - Property serializer.class is overridden
> to kafka.serializer.StringEncoder
>  INFO [main] (HelloWorld.java:14) - Entering application.
>  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
> id:0,host:kafka01,port:9092 with correlation id 0 for 1 topic(s)
> Set(KafkaAppenderTest)
>  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
> id:0,host:kafka01,port:9092 with correlation id 1 for 1 topic(s)
> Set(KafkaAppenderTest)
>  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
> id:0,host:kafka01,port:9092 with correlation id 2 for 1 topic(s)
> Set(KafkaAppenderTest)
>  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
> id:0,host:kafka01,port:9092 with correlation id 3 for 1 topic(s)
> Set(KafkaAppenderTest)
>  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
> id:0,host:kafka01,port:9092 with correlation id 4 for 1 topic(s)
> Set(KafkaAppenderTest)
>  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
> id:0,host:kafka01,port:9092 with correlation id 5 for 1 topic(s)
> Set(KafkaAppenderTest)
>  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
> id:0,host:kafka01,port:9092 with correlation id 6 for 1 topic(s)
> Set(KafkaAppenderTest)
>  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
> id:0,host:kafka01,port:9092 with correlation id 7 for 1 topic(s)
> Set(KafkaAppenderTest)
>  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
> id:0,host:kafka01,port:9092 with correlation id 8 for 1 topic(s)
> Set(KafkaAppenderTest)
>  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
> id:0,host:kafka01,port:9092 with correlation id 9 for 1 topic(s)
> Set(KafkaAppenderTest)
>  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
> id:0,host:kafka01,port:9092 with correlation id 10 for 1 topic(s)
> Set(KafkaAppenderTest)
>  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
> id:0,host:kafka01,port:9092 with correlation id 11 for 1 topic(s)
> Set(KafkaAppenderTest)
>  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
> id:0,host:kafka01,port:9092 with correlation id 12 for 1 topic(s)
> Set(KafkaAppenderTest)
> .
> .
> .
> java.lang.StackOverflowError
> at java.lang.StringCoding.deref(StringCoding.java:64)
> at java.lang.StringCoding.encode(StringCoding.java:275)
> at java.lang.String.getBytes(String.java:954)
> at java.io.UnixFileSystem.getBooleanAttributes0(Native Method)
> at java.io.UnixFileSystem.getBooleanAttributes(UnixFileSystem.java:243)
> at java.io.File.exists(File.java:791)
> at sun.misc.URLClassPath$FileLoader.getResource(URLClassPath.java:1014)
> at sun.misc.URLClassPath.getResource(URLClassPath.java:189)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:209)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:323)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:268)
> at java.lang.ClassLoader.defineClass1(Native Method)
> at java.lang.ClassLoader.defineClass(ClassLoader.java:643)
> at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
> at java.net.URLClassLoader.defineClass(URLClassLoader.java:277)
> at java.net.URLClassLoader.access$000(URLClassLoader.java:73)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:212)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:323)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:268)
> at
>
> org.apache.log4j.spi.ThrowableInformation.getThrowableStrRep(ThrowableInformation.java:87)
> at
> org.apache.log4j.spi.LoggingEvent.getThrowableStrRep(LoggingEvent.java:413)
> at org.apache.log4j.WriterAppender.subAppend(WriterAppender.java:313)
> at org.apache.log4j.WriterAppender.append(WriterAppender.java:162)
> at org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251)
> at
>
> org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66)
> at org.apache.log4j.Category.callAppenders(Category.java:206)
> at org.apache.log4j.Category.forcedLog(Category.java:391)
> at org.apache.log4j.Category.error(Category.java:322)
> at kafka.utils.Logging$$anonfun$swallowError$1.apply(Logging.scala:105)
> at kafka.utils.Logging$$anonfun$swallowError$1.apply(Logging.scala:105)
> at kafka.utils.Utils$.swallow(Utils.scala:189)
> at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> at kafka.utils.Utils$.swallowError(Utils.scala:46)
> at
>
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
> at kafka.producer.Producer.send(Producer.scala:76)
> at kafka.javaapi.producer.Producer.send(Producer.scala:33)
> at com.samsung.rtdp.KafkaAppender.append(KafkaAppender.java:121)
> at org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251)
> at
>
> org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66)
> at org.apache.log4j.Category.callAppenders(Category.java:206)
> at org.apache.log4j.Category.forcedLog(Category.java:391)
> at org.apache.log4j.Category.info(Category.java:666)
> at kafka.utils.Logging$class.info(Logging.scala:67)
> at kafka.client.ClientUtils$.info(ClientUtils.scala:31)
> at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:51)
> at
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> at
>
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67)
> at kafka.utils.Utils$.swallow(Utils.scala:187)
> at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> at kafka.utils.Utils$.swallowError(Utils.scala:46)
> at
>
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
> at kafka.producer.Producer.send(Producer.scala:76)
> at kafka.javaapi.producer.Producer.send(Producer.scala:33)
> at com.samsung.rtdp.KafkaAppender.append(KafkaAppender.java:121)
> .
> .
> .
>
> ----------------------------------------------------------------------------------------------------------------------------------
>
>
> Here is my code.
> Since it came from producer example code, it is quite straightforward.
>
> -------------------------------------------------------------------------------------------------------------------------
> package com.samsung.rtdp;
>
> import java.io.IOException;
> import java.util.Date;
> import java.util.Properties;
> import java.util.Random;
>
> import org.apache.log4j.AppenderSkeleton;
> import org.apache.log4j.spi.ErrorCode;
> import org.apache.log4j.spi.LoggingEvent;
>
> import kafka.javaapi.producer.Producer;
> import kafka.producer.KeyedMessage;
> import kafka.producer.Partitioner;
> import kafka.producer.ProducerConfig;
> import kafka.utils.VerifiableProperties;
>
>
> public class KafkaAppender extends AppenderSkeleton {
>
> private String brokerList;
> private String serializer;
> private String partitioner;
> private String topic;
> private String DEFAULT_REQUIRED_REQUEST_NUACKS="1";
>
> private Properties props;
> private ProducerConfig config;
> private Producer<String, String> producer;
>
> public void setBrokerList(String brokerList) { this.brokerList =
> brokerList; }
> public String getBrokerList() { return this.brokerList; }
>
> public void setSerializerClass(String serializer) { this.serializer =
> serializer; }
> public String getSerializer() { return this.serializer; }
>
> public void setPartitionerClass(String partitioner) { this.partitioner =
> partitioner; }
> public String getPartitioner() { return this.partitioner; }
>
> public void setTopic(String topic) { this.topic = topic; }
> public String getTopic() { return this.topic; }
>
>
> public void printParameters(){
> System.out.println("BrokerList : " + brokerList);
> System.out.println("Serializer Class : " + serializer);
> System.out.println("Partitioner Class : " + partitioner);
> System.out.println("Topic : " + topic);
> }
>
> public void activateOptions() {
>
> // printParameters();
>
> props = new Properties();
>
> props.put("metadata.broker.list", brokerList);
> props.put("serializer.class", serializer);
> props.put("partitioner.class", partitioner);
> props.put("request.required.acks", DEFAULT_REQUIRED_REQUEST_NUACKS);
> props.put("zk.connect", "kafka01:2181");
>
> config = new ProducerConfig(props);
> producer = new Producer<String, String>(config);
> }
>
>
> public void close() {
> // TODO Auto-generated method stub
> producer.close();
>
> }
>
>
> public boolean requiresLayout() {
> // TODO Auto-generated method stub
> return true;
> }
>
> @Override
> protected void append(LoggingEvent event) {
> // TODO Auto-generated method stub
>
>
> // printParameters();
>
> if( this.layout == null )
> {
> errorHandler.error("No layout for appender " + name , null,
> ErrorCode.MISSING_LAYOUT );
> return;
> }
>
> String msg = this.layout.format(event);
>
>  KeyedMessage<String, String> data = new KeyedMessage<String,
> String>("KafkaAppenderTest", msg, msg);
>
> producer.send(data);
> }
>
> }
>
>
> -------------------------------------------------------------------------------------------------------------------------------------
>
>
>
> And this is my log4j.properties
>
> -------------------------------------------------------------------------------------------------------------------------------------
> log4j.rootLogger=INFO, stdout, KAFKA
> # set the logger for your package to be the KAFKA appender
> #log4j.logger.com.samsung.rtdp=INFO, KAFKA
>
> log4j.appender.stdout=org.apache.log4j.ConsoleAppender
> log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
>
> # Pattern to output the caller's file name and line number.
> log4j.appender.stdout.layout.ConversionPattern=%5p [%t] (%F:%L) - %m%n
>
> log4j.appender.KAFKA=com.samsung.rtdp.KafkaAppender
> log4j.appender.KAFKA.BrokerList=kafka01:9092
> log4j.appender.KAFKA.SerializerClass=kafka.serializer.StringEncoder
> log4j.appender.KAFKA.PartitionerClass=com.samsung.rtdp.SimplePartitioner2
> log4j.appender.KAFKA.Topic=test
> log4j.appender.KAFKA.layout=org.apache.log4j.PatternLayout
> log4j.appender.KAFKA.layout.ConversionPattern=%5p [%t] (%F:%L) - %m%n
>
> --------------------------------------------------------------------------------------------------------------------------------------
>