You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/06/13 02:42:35 UTC

[pulsar] 01/04: [Function] provide default error handler for function log appender (#15728)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 49087fa4fb763339a40cffe592fbcc774765b1ba
Author: Neng Lu <nl...@streamnative.io>
AuthorDate: Sun Jun 12 18:05:25 2022 -0700

    [Function] provide default error handler for function log appender (#15728)
    
    (cherry picked from commit f7635ec6d99bd5a13a31c7e9f17640746afec43c)
---
 .../apache/pulsar/functions/instance/LogAppender.java | 19 ++++++++++++++++---
 1 file changed, 16 insertions(+), 3 deletions(-)

diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/LogAppender.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/LogAppender.java
index 5250e4c3cef..956717975e5 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/LogAppender.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/LogAppender.java
@@ -23,6 +23,7 @@ import org.apache.logging.log4j.core.Appender;
 import org.apache.logging.log4j.core.ErrorHandler;
 import org.apache.logging.log4j.core.Layout;
 import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.appender.DefaultErrorHandler;
 import org.apache.pulsar.client.api.CompressionType;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
@@ -35,6 +36,11 @@ import java.util.concurrent.TimeUnit;
  * to a log topic.
  */
 public class LogAppender implements Appender {
+
+    private static final String LOG_LEVEL = "loglevel";
+    private static final String INSTANCE = "instance";
+    private static final String FQN = "fqn";
+
     private PulsarClient pulsarClient;
     private String logTopic;
     private String fqn;
@@ -48,15 +54,16 @@ public class LogAppender implements Appender {
         this.logTopic = logTopic;
         this.fqn = fqn;
         this.instance = instance;
+        this.errorHandler = new DefaultErrorHandler(this);
     }
 
     @Override
     public void append(LogEvent logEvent) {
         producer.newMessage()
                 .value(logEvent.getMessage().getFormattedMessage().getBytes(StandardCharsets.UTF_8))
-                .property("loglevel", logEvent.getLevel().name())
-                .property("instance", instance)
-                .property("fqn", fqn)
+                .property(LOG_LEVEL, logEvent.getLevel().name())
+                .property(INSTANCE, instance)
+                .property(FQN, fqn)
                 .sendAsync();
     }
 
@@ -82,6 +89,12 @@ public class LogAppender implements Appender {
 
     @Override
     public void setHandler(ErrorHandler errorHandler) {
+        if (errorHandler == null) {
+            throw new RuntimeException("The log error handler cannot be set to null");
+        }
+        if (isStarted()) {
+            throw new RuntimeException("The log error handler cannot be changed once the appender is started");
+        }
         this.errorHandler = errorHandler;
     }