You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/01/30 08:54:36 UTC

[pulsar] 03/04: [fix #7814] fix java function logging appender not added to java function logger (#9299)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 3b5dd9727de19d21470f000d8ab817a79d3e6602
Author: Rui Fu <fr...@users.noreply.github.com>
AuthorDate: Fri Jan 29 14:09:59 2021 +0800

    [fix #7814] fix java function logging appender not added to java function logger (#9299)
    
    Fixes #7814
    
    `JavaInstanceRunnable` create an instance of logger named `"function-" + instanceConfig.getFunctionDetails().getName()` and pass it to `Function Context`, and the logger can be used to send user defined content to function's log-topic if `--log-topic` defined.
    
    as issue #7814 mentioned, the logger is not working as expected since user cannot consume any self defined log content from `log-topic`.
    
    this happens in process runtime with created functions, but not noticed with other situation such as `localrun` function.
    
    Through debug to the created function, the logger in `Function Context` is different from the logger in `JavaInstanceRunnable`, such as the `contextName` as images shown below. In addition, the `LogAppender` set in `JavaInstanceRunnable` is not shown in `Function Context`'s logger as well.
    
    ![Imgur](https://i.imgur.com/39DMH6R.png)
    ^^^^ from JavaInstanceRunnable
    
    ![img](https://i.imgur.com/UDw5Lzt.png)
    ^^^^ from Function Context
    
    After some tests, I find out that when get `LoggerContext` by `LoggerContext.getContext()`, the context's logAppender can be take effect to `Function Context`, and the `Function Context`'s logger works great.
    
    Add `LogAppender` to the single context from `LoggerContext.getContext()`.
    
    (cherry picked from commit 81f1bed626fc750c68ab6740d8a80a6b2821b542)
---
 .../functions/instance/JavaInstanceRunnable.java   |  22 ++-
 .../integration/functions/PulsarFunctionsTest.java | 152 +++++++++++++++++++++
 .../functions/PulsarFunctionsTestBase.java         |   3 +
 3 files changed, 174 insertions(+), 3 deletions(-)

diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index fa68656..4d8c862 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -219,7 +219,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
     }
 
     ContextImpl setupContext() {
-        Logger instanceLog = LoggerFactory.getLogger(
+        Logger instanceLog = LoggerFactory.getILoggerFactory().getLogger(
                 "function-" + instanceConfig.getFunctionDetails().getName());
         return new ContextImpl(instanceConfig, instanceLog, client, secretsProvider,
                 collectorRegistry, metricsLabels, this.componentType, this.stats, stateManager);
@@ -464,6 +464,13 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
             log.info("Unloading JAR files for function {}", instanceConfig);
             instanceCache = null;
         }
+
+        if (logAppender != null) {
+            removeLogTopicAppender(LoggerContext.getContext());
+            removeLogTopicAppender(LoggerContext.getContext(false));
+            logAppender.stop();
+            logAppender = null;
+        }
     }
 
     synchronized public String getStatsAsString() throws IOException {
@@ -563,28 +570,37 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
             logAppender = new LogAppender(client, instanceConfig.getFunctionDetails().getLogTopic(),
                     FunctionCommon.getFullyQualifiedName(instanceConfig.getFunctionDetails()));
             logAppender.start();
+            setupLogTopicAppender(LoggerContext.getContext());
         }
     }
 
     private void addLogTopicHandler() {
         if (logAppender == null) return;
-        LoggerContext context = LoggerContext.getContext(false);
+        setupLogTopicAppender(LoggerContext.getContext(false));
+    }
+
+    private void setupLogTopicAppender(LoggerContext context) {
         Configuration config = context.getConfiguration();
         config.addAppender(logAppender);
         for (final LoggerConfig loggerConfig : config.getLoggers().values()) {
             loggerConfig.addAppender(logAppender, null, null);
         }
         config.getRootLogger().addAppender(logAppender, null, null);
+        context.updateLoggers();
     }
 
     private void removeLogTopicHandler() {
         if (logAppender == null) return;
-        LoggerContext context = LoggerContext.getContext(false);
+        removeLogTopicAppender(LoggerContext.getContext(false));
+    }
+
+    private void removeLogTopicAppender(LoggerContext context) {
         Configuration config = context.getConfiguration();
         for (final LoggerConfig loggerConfig : config.getLoggers().values()) {
             loggerConfig.removeAppender(logAppender.getName());
         }
         config.getRootLogger().removeAppender(logAppender.getName());
+        context.updateLoggers();
     }
 
     private void setupInput(ContextImpl contextImpl) throws Exception {
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index d95d1d3..4e6ff98 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -1972,6 +1972,21 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
         }
     }
 
+    private static void checkPublisherCleanup(String topic) throws Exception {
+        try {
+            ContainerExecResult result = pulsarCluster.getAnyBroker().execCmd(
+                    PulsarCluster.ADMIN_SCRIPT,
+                    "topics",
+                    "stats",
+                    topic);
+            TopicStats topicStats = new Gson().fromJson(result.getStdout(), TopicStats.class);
+            assertEquals(topicStats.publishers.size(), 0);
+
+        } catch (ContainerExecException e) {
+            fail("Command should have exited with non-zero");
+        }
+    }
+
     private static void getFunctionStatus(String functionName, int numMessages, boolean checkRestarts) throws Exception {
         getFunctionStatus(functionName, numMessages, checkRestarts, 1);
     }
@@ -2623,4 +2638,141 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
         }
     }
 
+    @Test(groups = {"java_function", "function"})
+    public void testJavaLoggingFunction() throws Exception {
+        testLoggingFunction(Runtime.JAVA);
+    }
+
+    private void testLoggingFunction(Runtime runtime) throws Exception {
+        if (functionRuntimeType == FunctionRuntimeType.THREAD && runtime == Runtime.PYTHON) {
+            // python can only run on process mode
+            return;
+        }
+
+        if (functionRuntimeType == FunctionRuntimeType.THREAD && runtime == Runtime.GO) {
+            // go can only run on process mode
+            return;
+        }
+
+        if (pulsarCluster == null) {
+            super.setupCluster();
+            super.setupFunctionWorkers();
+        }
+        
+        Schema<?> schema;
+        if (Runtime.JAVA == runtime) {
+            schema = Schema.STRING;
+        } else {
+            schema = Schema.BYTES;
+        }
+
+        String inputTopicName = "persistent://public/default/test-log-" + runtime + "-input-" + randomName(8);
+        String logTopicName = "test-log-" + runtime + "-log-topic-" + randomName(8);
+        try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(pulsarCluster.getHttpServiceUrl()).build()) {
+            admin.topics().createNonPartitionedTopic(inputTopicName);
+            admin.topics().createNonPartitionedTopic(logTopicName);
+        }
+
+        String functionName = "test-logging-fn-" + randomName(8);
+        final int numMessages = 10;
+
+        // submit the exclamation function
+        submitJavaLoggingFunction(
+                inputTopicName, logTopicName, functionName, schema);
+
+        // get function info
+        getFunctionInfoSuccess(functionName);
+
+        // get function stats
+        getFunctionStatsEmpty(functionName);
+
+        // publish and consume result
+        publishAndConsumeMessages(inputTopicName, logTopicName, numMessages, "-log");
+
+        // get function status
+        getFunctionStatus(functionName, numMessages, true);
+
+        // get function stats
+        getFunctionStats(functionName, numMessages);
+
+        // delete function
+        deleteFunction(functionName);
+
+        // get function info
+        getFunctionInfoNotFound(functionName);
+
+        // make sure subscriptions are cleanup
+        checkSubscriptionsCleanup(inputTopicName);
+        checkPublisherCleanup(logTopicName);
+
+    }
+
+    private static void submitJavaLoggingFunction(String inputTopicName,
+                                                  String logTopicName,
+                                                  String functionName,
+                                                  Schema<?> schema) throws Exception {
+        CommandGenerator generator;
+        log.info("------- INPUT TOPIC: '{}'", inputTopicName);
+        if (inputTopicName.endsWith(".*")) {
+            log.info("----- CREATING TOPIC PATTERN FUNCTION --- ");
+            generator = CommandGenerator.createTopicPatternGenerator(inputTopicName, LOGGING_JAVA_CLASS);
+        } else {
+            log.info("----- CREATING REGULAR FUNCTION --- ");
+            generator = CommandGenerator.createDefaultGenerator(inputTopicName, LOGGING_JAVA_CLASS);
+        }
+        generator.setLogTopic(logTopicName);
+        generator.setFunctionName(functionName);
+        String command = generator.generateCreateFunctionCommand();
+
+        log.info("---------- Function command: {}", command);
+        String[] commands = {
+                "sh", "-c", command
+        };
+        ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(
+                commands);
+        assertTrue(result.getStdout().contains("\"Created successfully\""));
+
+        ensureSubscriptionCreated(inputTopicName, String.format("public/default/%s", functionName), schema);
+    }
+
+    private static void publishAndConsumeMessages(String inputTopic,
+                                                  String outputTopic,
+                                                  int numMessages,
+                                                  String messagePostfix) throws Exception {
+        @Cleanup PulsarClient client = PulsarClient.builder()
+                .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
+                .build();
+
+        @Cleanup Consumer<byte[]> consumer = client.newConsumer()
+                .topic(outputTopic)
+                .subscriptionType(SubscriptionType.Exclusive)
+                .subscriptionName("test-sub")
+                .subscribe();
+
+        @Cleanup Producer<String> producer = client.newProducer(Schema.STRING)
+                .topic(inputTopic)
+                .create();
+
+        for (int i = 0; i < numMessages; i++) {
+            producer.send("message-" + i);
+        }
+
+        Set<String> expectedMessages = new HashSet<>();
+        for (int i = 0; i < numMessages; i++) {
+            expectedMessages.add("message-" + i + messagePostfix);
+        }
+
+        for (int i = 0; i < numMessages; i++) {
+            Message<byte[]> msg = consumer.receive(30, TimeUnit.SECONDS);
+            String logMsg = new String(msg.getValue(), UTF_8);
+            log.info("Received: {}", logMsg);
+            assertTrue(expectedMessages.contains(logMsg));
+            expectedMessages.remove(logMsg);
+        }
+
+        consumer.close();
+        producer.close();
+        client.close();
+    }
+
 }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
index 94bffb3..2538341 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
@@ -106,6 +106,9 @@ public abstract class PulsarFunctionsTestBase extends PulsarTestSuite {
     public static final String EXCLAMATION_GO_FILE = "exclamationFunc";
     public static final String PUBLISH_FUNCTION_GO_FILE = "exclamationFunc";
 
+    public static final String LOGGING_JAVA_CLASS =
+            "org.apache.pulsar.functions.api.examples.LoggingFunction";
+
     protected static String getExclamationClass(Runtime runtime,
                                                 boolean pyZip,
                                                 boolean extraDeps) {