You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xy...@apache.org on 2022/07/27 08:36:25 UTC
[pulsar] 03/10: [Function] provide default error handler for function log appender (#15728)
This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 0ea0e2bd0752796ad24f4d377edac3919290814a
Author: Neng Lu <nl...@streamnative.io>
AuthorDate: Sun Jun 12 18:05:25 2022 -0700
[Function] provide default error handler for function log appender (#15728)
(cherry picked from commit f7635ec6d99bd5a13a31c7e9f17640746afec43c)
---
.../pulsar/functions/instance/LogAppender.java | 25 ++++++++++++++++++----
1 file changed, 21 insertions(+), 4 deletions(-)
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/LogAppender.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/LogAppender.java
index bbca3f9efa1..46b90e54381 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/LogAppender.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/LogAppender.java
@@ -19,7 +19,12 @@
package org.apache.pulsar.functions.instance;
import java.nio.charset.StandardCharsets;
-import org.apache.logging.log4j.core.*;
+import java.util.concurrent.TimeUnit;
+import org.apache.logging.log4j.core.Appender;
+import org.apache.logging.log4j.core.ErrorHandler;
+import org.apache.logging.log4j.core.Layout;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.appender.DefaultErrorHandler;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
@@ -32,6 +37,11 @@ import java.util.concurrent.TimeUnit;
* to a log topic.
*/
public class LogAppender implements Appender {
+
+ private static final String LOG_LEVEL = "loglevel";
+ private static final String INSTANCE = "instance";
+ private static final String FQN = "fqn";
+
private PulsarClient pulsarClient;
private String logTopic;
private String fqn;
@@ -45,15 +55,16 @@ public class LogAppender implements Appender {
this.logTopic = logTopic;
this.fqn = fqn;
this.instance = instance;
+ this.errorHandler = new DefaultErrorHandler(this);
}
@Override
public void append(LogEvent logEvent) {
producer.newMessage()
.value(logEvent.getMessage().getFormattedMessage().getBytes(StandardCharsets.UTF_8))
- .property("loglevel", logEvent.getLevel().name())
- .property("instance", instance)
- .property("fqn", fqn)
+ .property(LOG_LEVEL, logEvent.getLevel().name())
+ .property(INSTANCE, instance)
+ .property(FQN, fqn)
.sendAsync();
}
@@ -79,6 +90,12 @@ public class LogAppender implements Appender {
@Override
public void setHandler(ErrorHandler errorHandler) {
+ if (errorHandler == null) {
+ throw new RuntimeException("The log error handler cannot be set to null");
+ }
+ if (isStarted()) {
+ throw new RuntimeException("The log error handler cannot be changed once the appender is started");
+ }
this.errorHandler = errorHandler;
}