You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@logging.apache.org by mi...@apache.org on 2017/05/04 12:48:52 UTC
logging-log4j2 git commit: Improve error handling in KafkaAppender
Repository: logging-log4j2
Updated Branches:
refs/heads/master ef8e9e09a -> 9ce722cdf
Improve error handling in KafkaAppender
Project: http://git-wip-us.apache.org/repos/asf/logging-log4j2/repo
Commit: http://git-wip-us.apache.org/repos/asf/logging-log4j2/commit/9ce722cd
Tree: http://git-wip-us.apache.org/repos/asf/logging-log4j2/tree/9ce722cd
Diff: http://git-wip-us.apache.org/repos/asf/logging-log4j2/diff/9ce722cd
Branch: refs/heads/master
Commit: 9ce722cdf7bbdc9e80824603405705388330258f
Parents: ef8e9e0
Author: Mikael Ståldal <mi...@magine.com>
Authored: Thu May 4 14:48:46 2017 +0200
Committer: Mikael Ståldal <mi...@magine.com>
Committed: Thu May 4 14:48:46 2017 +0200
----------------------------------------------------------------------
.../core/appender/mom/kafka/KafkaAppender.java | 42 +++++++++++---------
.../core/appender/mom/kafka/KafkaManager.java | 2 +-
2 files changed, 24 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/9ce722cd/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppender.java
----------------------------------------------------------------------
diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppender.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppender.java
index 8d37f35..2c495ae 100644
--- a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppender.java
+++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppender.java
@@ -20,14 +20,15 @@ package org.apache.logging.log4j.core.appender.mom.kafka;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.apache.logging.log4j.core.Appender;
import org.apache.logging.log4j.core.Filter;
import org.apache.logging.log4j.core.Layout;
import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.core.appender.AbstractAppender;
-import org.apache.logging.log4j.core.appender.AppenderLoggingException;
import org.apache.logging.log4j.core.config.Configuration;
import org.apache.logging.log4j.core.config.Node;
import org.apache.logging.log4j.core.config.Property;
@@ -133,29 +134,32 @@ public final class KafkaAppender extends AbstractAppender {
LOGGER.warn("Recursive logging from [{}] for appender [{}].", event.getLoggerName(), getName());
} else {
try {
- final Layout<? extends Serializable> layout = getLayout();
- byte[] data;
- if (layout != null) {
- if (layout instanceof SerializedLayout) {
- final byte[] header = layout.getHeader();
- final byte[] body = layout.toByteArray(event);
- data = new byte[header.length + body.length];
- System.arraycopy(header, 0, data, 0, header.length);
- System.arraycopy(body, 0, data, header.length, body.length);
- } else {
- data = layout.toByteArray(event);
- }
- } else {
- data = StringEncoder.toBytes(event.getMessage().getFormattedMessage(), StandardCharsets.UTF_8);
- }
- manager.send(data);
+ tryAppend(event);
} catch (final Exception e) {
- LOGGER.error("Unable to write to Kafka [{}] for appender [{}].", manager.getName(), getName(), e);
- throw new AppenderLoggingException("Unable to write to Kafka in appender: " + e.getMessage(), e);
+ error("Unable to write to Kafka in appender [" + getName() + "]", event, e);
}
}
}
+ private void tryAppend(LogEvent event) throws ExecutionException, InterruptedException, TimeoutException {
+ final Layout<? extends Serializable> layout = getLayout();
+ byte[] data;
+ if (layout != null) {
+ if (layout instanceof SerializedLayout) {
+ final byte[] header = layout.getHeader();
+ final byte[] body = layout.toByteArray(event);
+ data = new byte[header.length + body.length];
+ System.arraycopy(header, 0, data, 0, header.length);
+ System.arraycopy(body, 0, data, header.length, body.length);
+ } else {
+ data = layout.toByteArray(event);
+ }
+ } else {
+ data = StringEncoder.toBytes(event.getMessage().getFormattedMessage(), StandardCharsets.UTF_8);
+ }
+ manager.send(data);
+ }
+
@Override
public void start() {
super.start();
http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/9ce722cd/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java
----------------------------------------------------------------------
diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java
index d74abb4..52d21f4 100644
--- a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java
+++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java
@@ -104,7 +104,7 @@ public class KafkaManager extends AbstractManager {
producer.send(newRecord, new Callback() {
public void onCompletion(RecordMetadata metadata, Exception e) {
if (e != null) {
- LOGGER.error("Unable to write to Kafka [" + getName() + "].", e);
+ LOGGER.error("Unable to write to Kafka in appender [" + getName() + "]", e);
}
}
});