You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/06/13 02:42:35 UTC
[pulsar] 01/04: [Function] provide default error handler for function log appender (#15728)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 49087fa4fb763339a40cffe592fbcc774765b1ba
Author: Neng Lu <nl...@streamnative.io>
AuthorDate: Sun Jun 12 18:05:25 2022 -0700
[Function] provide default error handler for function log appender (#15728)
(cherry picked from commit f7635ec6d99bd5a13a31c7e9f17640746afec43c)
---
.../apache/pulsar/functions/instance/LogAppender.java | 19 ++++++++++++++++---
1 file changed, 16 insertions(+), 3 deletions(-)
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/LogAppender.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/LogAppender.java
index 5250e4c3cef..956717975e5 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/LogAppender.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/LogAppender.java
@@ -23,6 +23,7 @@ import org.apache.logging.log4j.core.Appender;
import org.apache.logging.log4j.core.ErrorHandler;
import org.apache.logging.log4j.core.Layout;
import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.appender.DefaultErrorHandler;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
@@ -35,6 +36,11 @@ import java.util.concurrent.TimeUnit;
* to a log topic.
*/
public class LogAppender implements Appender {
+
+ private static final String LOG_LEVEL = "loglevel";
+ private static final String INSTANCE = "instance";
+ private static final String FQN = "fqn";
+
private PulsarClient pulsarClient;
private String logTopic;
private String fqn;
@@ -48,15 +54,16 @@ public class LogAppender implements Appender {
this.logTopic = logTopic;
this.fqn = fqn;
this.instance = instance;
+ this.errorHandler = new DefaultErrorHandler(this);
}
@Override
public void append(LogEvent logEvent) {
producer.newMessage()
.value(logEvent.getMessage().getFormattedMessage().getBytes(StandardCharsets.UTF_8))
- .property("loglevel", logEvent.getLevel().name())
- .property("instance", instance)
- .property("fqn", fqn)
+ .property(LOG_LEVEL, logEvent.getLevel().name())
+ .property(INSTANCE, instance)
+ .property(FQN, fqn)
.sendAsync();
}
@@ -82,6 +89,12 @@ public class LogAppender implements Appender {
@Override
public void setHandler(ErrorHandler errorHandler) {
+ if (errorHandler == null) {
+ throw new RuntimeException("The log error handler cannot be set to null");
+ }
+ if (isStarted()) {
+ throw new RuntimeException("The log error handler cannot be changed once the appender is started");
+ }
this.errorHandler = errorHandler;
}