You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/03/27 18:58:43 UTC

[incubator-pulsar] branch master updated: Send Log statements to log Topic for Java Functions (#1447)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 86b9546  Send Log statements to log Topic for Java Functions (#1447)
86b9546 is described below

commit 86b95461602b1f813e41f382eac41a6b2a31ee63
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Tue Mar 27 11:58:41 2018 -0700

    Send Log statements to log Topic for Java Functions (#1447)
    
    * Added LogTopic support to Java functions
    
    * UseLogAppender instead of PulsarAppender
    
    * Reverted changes to PulsarAppender
    
    * No need to take dep on pulsar-log4j plugin
    
    * Start the appender
    
    * Fixed comments
    
    * Reverted back logging function change
---
 .../functions/instance/JavaInstanceRunnable.java   |  38 +++++++
 .../pulsar/functions/instance/LogAppender.java     | 121 +++++++++++++++++++++
 2 files changed, 159 insertions(+)

diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 3d1934c..510608f 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -45,6 +45,9 @@ import org.apache.bookkeeper.clients.exceptions.StreamNotFoundException;
 import org.apache.bookkeeper.clients.utils.NetUtils;
 import org.apache.bookkeeper.stream.proto.NamespaceConfiguration;
 import org.apache.logging.log4j.ThreadContext;
+import org.apache.logging.log4j.core.LoggerContext;
+import org.apache.logging.log4j.core.config.Configuration;
+import org.apache.logging.log4j.core.config.LoggerConfig;
 import org.apache.pulsar.client.api.*;
 import org.apache.pulsar.client.api.PulsarClientException.ProducerBusyException;
 import org.apache.pulsar.client.impl.MessageIdImpl;
@@ -85,6 +88,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable, ConsumerEv
     @Getter(AccessLevel.PACKAGE)
     private final Map<String, Consumer> inputConsumers;
     private LinkedList<String> inputTopicsToResubscribe = null;
+    private LogAppender logAppender;
 
     // provide tables for storing states
     private final String stateStorageServiceUrl;
@@ -188,6 +192,8 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable, ConsumerEv
         startOutputProducer();
         // start the input consumer
         startInputConsumer();
+        // start any log topic handler
+        setupLogHandler();
 
         return new JavaInstance(instanceConfig, object, clsLoader, client, inputConsumers);
     }
@@ -262,10 +268,12 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable, ConsumerEv
                 }
                 long processAt = System.currentTimeMillis();
                 stats.incrementProcessed(processAt);
+                addLogTopicHandler();
                 result = javaInstance.handleMessage(
                         msg.getActualMessage().getMessageId(),
                         msg.getTopicName(),
                         input);
+                removeLogTopicHandler();
 
                 long doneProcessing = System.currentTimeMillis();
                 log.debug("Got result: {}", result.getResult());
@@ -755,4 +763,34 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable, ConsumerEv
             }
         }
     }
+
+    private void setupLogHandler() {
+        if (instanceConfig.getFunctionConfig().getLogTopic() != null &&
+                !instanceConfig.getFunctionConfig().getLogTopic().isEmpty()) {
+            logAppender = new LogAppender(client, instanceConfig.getFunctionConfig().getLogTopic(),
+                    FunctionConfigUtils.getFullyQualifiedName(instanceConfig.getFunctionConfig()));
+            logAppender.start();
+        }
+    }
+
+    private void addLogTopicHandler() {
+        if (logAppender == null) return;
+        LoggerContext context = LoggerContext.getContext(false);
+        Configuration config = context.getConfiguration();
+        config.addAppender(logAppender);
+        for (final LoggerConfig loggerConfig : config.getLoggers().values()) {
+            loggerConfig.addAppender(logAppender, null, null);
+        }
+        config.getRootLogger().addAppender(logAppender, null, null);
+    }
+
+    private void removeLogTopicHandler() {
+        if (logAppender == null) return;
+        LoggerContext context = LoggerContext.getContext(false);
+        Configuration config = context.getConfiguration();
+        for (final LoggerConfig loggerConfig : config.getLoggers().values()) {
+            loggerConfig.removeAppender(logAppender.getName());
+        }
+        config.getRootLogger().removeAppender(logAppender.getName());
+    }
 }
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/LogAppender.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/LogAppender.java
new file mode 100644
index 0000000..3dd71a2
--- /dev/null
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/LogAppender.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.instance;
+
+import org.apache.logging.log4j.core.*;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+
+import java.io.Serializable;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * LogAppender class that is used to send log statements from Pulsar Functions logger
+ * to a log topic.
+ */
+public class LogAppender implements Appender {
+    private PulsarClient pulsarClient;
+    private String logTopic;
+    private String fqn;
+    private State state;
+    private ErrorHandler errorHandler;
+    private Producer<byte[]> producer;
+
+    public LogAppender(PulsarClient pulsarClient, String logTopic, String fqn) {
+        this.pulsarClient = pulsarClient;
+        this.logTopic = logTopic;
+        this.fqn = fqn;
+    }
+
+    @Override
+    public void append(LogEvent logEvent) {
+        producer.sendAsync(logEvent.getMessage().getFormattedMessage().getBytes());
+    }
+
+    @Override
+    public String getName() {
+        return fqn;
+    }
+
+    @Override
+    public Layout<? extends Serializable> getLayout() {
+        return null;
+    }
+
+    @Override
+    public boolean ignoreExceptions() {
+        return false;
+    }
+
+    @Override
+    public ErrorHandler getHandler() {
+        return errorHandler;
+    }
+
+    @Override
+    public void setHandler(ErrorHandler errorHandler) {
+        this.errorHandler = errorHandler;
+    }
+
+    @Override
+    public State getState() {
+        return state;
+    }
+
+    @Override
+    public void initialize() {
+        this.state = State.INITIALIZED;
+    }
+
+    @Override
+    public void start() {
+        this.state = State.STARTING;
+        try {
+            ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer()
+                    .topic(logTopic)
+                    .producerName(fqn)
+                    .blockIfQueueFull(false)
+                    .enableBatching(true)
+                    .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS);
+            producer = producerBuilder.create();
+        } catch (Exception e) {
+            throw new RuntimeException("Error starting LogTopic Producer", e);
+        }
+        this.state = State.STARTED;
+    }
+
+    @Override
+    public void stop() {
+        this.state = State.STOPPING;
+        producer.closeAsync();
+        producer = null;
+        this.state = State.STOPPED;
+    }
+
+    @Override
+    public boolean isStarted() {
+        return state == State.STARTED;
+    }
+
+    @Override
+    public boolean isStopped() {
+        return state == State.STOPPED;
+    }
+}

-- 
To stop receiving notification emails like this one, please contact
sijie@apache.org.