You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by "Jose Quinteiro (Created) (JIRA)" <ji...@apache.org> on 2012/03/29 21:52:22 UTC
[jira] [Created] (KAFKA-323) Add the ability to use the async
producer in the Log4j appender
Add the ability to use the async producer in the Log4j appender
---------------------------------------------------------------
Key: KAFKA-323
URL: https://issues.apache.org/jira/browse/KAFKA-323
Project: Kafka
Issue Type: Improvement
Components: core
Reporter: Jose Quinteiro
I needed the log4j appender to use the async producer, so I added a couple of configuration methods to the log4j appender. I only added methods for the configuration fields that I needed. There are several in in the various ProducerConfigs that still cannot be set in the appender.
Sample use:
KafkaLog4jAppender kafkaAppender = new KafkaLog4jAppender();
kafkaAppender.setZkConnect( "localhost:2181/kafka" );
kafkaAppender.setTopic( "webapp" );
kafkaAppender.setProducerType( "async" );
kafkaAppender.setEnqueueTimeout( Integer.toString( Integer.MIN_VALUE ) );
kafkaAppender.activateOptions();
--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-323) Add the ability to use the async
producer in the Log4j appender
Posted by "Neha Narkhede (Commented) (JIRA)" <ji...@apache.org>.
[ https://issues.apache.org/jira/browse/KAFKA-323?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13241585#comment-13241585 ]
Neha Narkhede commented on KAFKA-323:
-------------------------------------
Thanks for the patch ! Would you mind attaching it as a file, and granting it to Apache ?
> Add the ability to use the async producer in the Log4j appender
> ---------------------------------------------------------------
>
> Key: KAFKA-323
> URL: https://issues.apache.org/jira/browse/KAFKA-323
> Project: Kafka
> Issue Type: Improvement
> Components: core
> Reporter: Jose Quinteiro
> Labels: appender, log4j
>
> I needed the log4j appender to use the async producer, so I added a couple of configuration methods to the log4j appender. I only added methods for the configuration fields that I needed. There are several in in the various ProducerConfigs that still cannot be set in the appender.
> Sample use:
> KafkaLog4jAppender kafkaAppender = new KafkaLog4jAppender();
> kafkaAppender.setZkConnect( "localhost:2181/kafka" );
> kafkaAppender.setTopic( "webapp" );
> kafkaAppender.setProducerType( "async" );
> kafkaAppender.setEnqueueTimeout( Integer.toString( Integer.MIN_VALUE ) );
> kafkaAppender.activateOptions();
--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Updated] (KAFKA-323) Add the ability to use the async
producer in the Log4j appender
Posted by "Jose Quinteiro (Updated) (JIRA)" <ji...@apache.org>.
[ https://issues.apache.org/jira/browse/KAFKA-323?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jose Quinteiro updated KAFKA-323:
---------------------------------
Attachment: appender.patch
> Add the ability to use the async producer in the Log4j appender
> ---------------------------------------------------------------
>
> Key: KAFKA-323
> URL: https://issues.apache.org/jira/browse/KAFKA-323
> Project: Kafka
> Issue Type: Improvement
> Components: core
> Reporter: Jose Quinteiro
> Labels: appender, log4j
> Attachments: appender.patch
>
>
> I needed the log4j appender to use the async producer, so I added a couple of configuration methods to the log4j appender. I only added methods for the configuration fields that I needed. There are several in in the various ProducerConfigs that still cannot be set in the appender.
> Sample use:
> KafkaLog4jAppender kafkaAppender = new KafkaLog4jAppender();
> kafkaAppender.setZkConnect( "localhost:2181/kafka" );
> kafkaAppender.setTopic( "webapp" );
> kafkaAppender.setProducerType( "async" );
> kafkaAppender.setEnqueueTimeout( Integer.toString( Integer.MIN_VALUE ) );
> kafkaAppender.activateOptions();
--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Updated] (KAFKA-323) Add the ability to use the async
producer in the Log4j appender
Posted by "Jose Quinteiro (Updated) (JIRA)" <ji...@apache.org>.
[ https://issues.apache.org/jira/browse/KAFKA-323?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jose Quinteiro updated KAFKA-323:
---------------------------------
Status: Patch Available (was: Open)
Index: core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
===================================================================
--- core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala (revision 1307067)
+++ core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala (working copy)
@@ -22,9 +22,7 @@
import org.apache.log4j.AppenderSkeleton
import org.apache.log4j.helpers.LogLog
import kafka.utils.Logging
-import kafka.serializer.Encoder
import java.util.{Properties, Date}
-import kafka.message.Message
import scala.collection._
class KafkaLog4jAppender extends AppenderSkeleton with Logging {
@@ -34,7 +32,11 @@
var serializerClass:String = null
var zkConnect:String = null
var brokerList:String = null
-
+ var producerType:String = null
+ var compressionCodec:String = null
+ var enqueueTimeout:String = null
+ var queueSize:String = null
+
private var producer: Producer[String, String] = null
def getTopic:String = topic
@@ -49,6 +51,18 @@
def getSerializerClass:String = serializerClass
def setSerializerClass(serializerClass:String) { this.serializerClass = serializerClass }
+ def getProducerType:String = producerType
+ def setProducerType(producerType:String) { this.producerType = producerType }
+
+ def getCompressionCodec:String = compressionCodec
+ def setCompressionCodec(compressionCodec:String) { this.compressionCodec = compressionCodec }
+
+ def getEnqueueTimeout:String = enqueueTimeout
+ def setEnqueueTimeout(enqueueTimeout:String) { this.enqueueTimeout = enqueueTimeout }
+
+ def getQueueSize:String = queueSize
+ def setQueueSize(queueSize:String) { this.queueSize = queueSize }
+
override def activateOptions() {
val connectDiagnostic : mutable.ListBuffer[String] = mutable.ListBuffer();
// check for config parameter validity
@@ -68,6 +82,11 @@
LogLog.warn("Using default encoder - kafka.serializer.StringEncoder")
}
props.put("serializer.class", serializerClass)
+ //These have default values in ProducerConfig and AsyncProducerConfig. We don't care if they're not specified
+ if(producerType != null) props.put("producer.type", producerType)
+ if(compressionCodec != null) props.put("compression.codec", compressionCodec)
+ if(enqueueTimeout != null) props.put("queue.enqueueTimeout.ms", enqueueTimeout)
+ if(queueSize != null) props.put("queue.size", queueSize)
val config : ProducerConfig = new ProducerConfig(props)
producer = new Producer[String, String](config)
LogLog.debug("Kafka producer connected to " + (if(config.zkConnect == null) config.brokerList else config.zkConnect))
> Add the ability to use the async producer in the Log4j appender
> ---------------------------------------------------------------
>
> Key: KAFKA-323
> URL: https://issues.apache.org/jira/browse/KAFKA-323
> Project: Kafka
> Issue Type: Improvement
> Components: core
> Reporter: Jose Quinteiro
> Labels: appender, log4j
>
> I needed the log4j appender to use the async producer, so I added a couple of configuration methods to the log4j appender. I only added methods for the configuration fields that I needed. There are several in in the various ProducerConfigs that still cannot be set in the appender.
> Sample use:
> KafkaLog4jAppender kafkaAppender = new KafkaLog4jAppender();
> kafkaAppender.setZkConnect( "localhost:2181/kafka" );
> kafkaAppender.setTopic( "webapp" );
> kafkaAppender.setProducerType( "async" );
> kafkaAppender.setEnqueueTimeout( Integer.toString( Integer.MIN_VALUE ) );
> kafkaAppender.activateOptions();
--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Updated] (KAFKA-323) Add the ability to use the async
producer in the Log4j appender
Posted by "Jay Kreps (JIRA)" <ji...@apache.org>.
[ https://issues.apache.org/jira/browse/KAFKA-323?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jay Kreps updated KAFKA-323:
----------------------------
Resolution: Fixed
Fix Version/s: 0.8
Assignee: Jay Kreps
Status: Resolved (was: Patch Available)
> Add the ability to use the async producer in the Log4j appender
> ---------------------------------------------------------------
>
> Key: KAFKA-323
> URL: https://issues.apache.org/jira/browse/KAFKA-323
> Project: Kafka
> Issue Type: Improvement
> Components: core
> Reporter: Jose Quinteiro
> Assignee: Jay Kreps
> Labels: appender, log4j
> Fix For: 0.8
>
> Attachments: appender.patch
>
>
> I needed the log4j appender to use the async producer, so I added a couple of configuration methods to the log4j appender. I only added methods for the configuration fields that I needed. There are several in in the various ProducerConfigs that still cannot be set in the appender.
> Sample use:
> KafkaLog4jAppender kafkaAppender = new KafkaLog4jAppender();
> kafkaAppender.setZkConnect( "localhost:2181/kafka" );
> kafkaAppender.setTopic( "webapp" );
> kafkaAppender.setProducerType( "async" );
> kafkaAppender.setEnqueueTimeout( Integer.toString( Integer.MIN_VALUE ) );
> kafkaAppender.activateOptions();
--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira