You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/01/30 08:54:36 UTC
[pulsar] 03/04: [fix #7814] fix java function logging appender not
added to java function logger (#9299)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 3b5dd9727de19d21470f000d8ab817a79d3e6602
Author: Rui Fu <fr...@users.noreply.github.com>
AuthorDate: Fri Jan 29 14:09:59 2021 +0800
[fix #7814] fix java function logging appender not added to java function logger (#9299)
Fixes #7814
`JavaInstanceRunnable` create an instance of logger named `"function-" + instanceConfig.getFunctionDetails().getName()` and pass it to `Function Context`, and the logger can be used to send user defined content to function's log-topic if `--log-topic` defined.
as issue #7814 mentioned, the logger is not working as expected since user cannot consume any self defined log content from `log-topic`.
this happens in process runtime with created functions, but not noticed with other situation such as `localrun` function.
Through debug to the created function, the logger in `Function Context` is different from the logger in `JavaInstanceRunnable`, such as the `contextName` as images shown below. In addition, the `LogAppender` set in `JavaInstanceRunnable` is not shown in `Function Context`'s logger as well.
![Imgur](https://i.imgur.com/39DMH6R.png)
^^^^ from JavaInstanceRunnable
![img](https://i.imgur.com/UDw5Lzt.png)
^^^^ from Function Context
After some tests, I find out that when get `LoggerContext` by `LoggerContext.getContext()`, the context's logAppender can be take effect to `Function Context`, and the `Function Context`'s logger works great.
Add `LogAppender` to the single context from `LoggerContext.getContext()`.
(cherry picked from commit 81f1bed626fc750c68ab6740d8a80a6b2821b542)
---
.../functions/instance/JavaInstanceRunnable.java | 22 ++-
.../integration/functions/PulsarFunctionsTest.java | 152 +++++++++++++++++++++
.../functions/PulsarFunctionsTestBase.java | 3 +
3 files changed, 174 insertions(+), 3 deletions(-)
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index fa68656..4d8c862 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -219,7 +219,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
}
ContextImpl setupContext() {
- Logger instanceLog = LoggerFactory.getLogger(
+ Logger instanceLog = LoggerFactory.getILoggerFactory().getLogger(
"function-" + instanceConfig.getFunctionDetails().getName());
return new ContextImpl(instanceConfig, instanceLog, client, secretsProvider,
collectorRegistry, metricsLabels, this.componentType, this.stats, stateManager);
@@ -464,6 +464,13 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
log.info("Unloading JAR files for function {}", instanceConfig);
instanceCache = null;
}
+
+ if (logAppender != null) {
+ removeLogTopicAppender(LoggerContext.getContext());
+ removeLogTopicAppender(LoggerContext.getContext(false));
+ logAppender.stop();
+ logAppender = null;
+ }
}
synchronized public String getStatsAsString() throws IOException {
@@ -563,28 +570,37 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
logAppender = new LogAppender(client, instanceConfig.getFunctionDetails().getLogTopic(),
FunctionCommon.getFullyQualifiedName(instanceConfig.getFunctionDetails()));
logAppender.start();
+ setupLogTopicAppender(LoggerContext.getContext());
}
}
private void addLogTopicHandler() {
if (logAppender == null) return;
- LoggerContext context = LoggerContext.getContext(false);
+ setupLogTopicAppender(LoggerContext.getContext(false));
+ }
+
+ private void setupLogTopicAppender(LoggerContext context) {
Configuration config = context.getConfiguration();
config.addAppender(logAppender);
for (final LoggerConfig loggerConfig : config.getLoggers().values()) {
loggerConfig.addAppender(logAppender, null, null);
}
config.getRootLogger().addAppender(logAppender, null, null);
+ context.updateLoggers();
}
private void removeLogTopicHandler() {
if (logAppender == null) return;
- LoggerContext context = LoggerContext.getContext(false);
+ removeLogTopicAppender(LoggerContext.getContext(false));
+ }
+
+ private void removeLogTopicAppender(LoggerContext context) {
Configuration config = context.getConfiguration();
for (final LoggerConfig loggerConfig : config.getLoggers().values()) {
loggerConfig.removeAppender(logAppender.getName());
}
config.getRootLogger().removeAppender(logAppender.getName());
+ context.updateLoggers();
}
private void setupInput(ContextImpl contextImpl) throws Exception {
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index d95d1d3..4e6ff98 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -1972,6 +1972,21 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
}
}
+ private static void checkPublisherCleanup(String topic) throws Exception {
+ try {
+ ContainerExecResult result = pulsarCluster.getAnyBroker().execCmd(
+ PulsarCluster.ADMIN_SCRIPT,
+ "topics",
+ "stats",
+ topic);
+ TopicStats topicStats = new Gson().fromJson(result.getStdout(), TopicStats.class);
+ assertEquals(topicStats.publishers.size(), 0);
+
+ } catch (ContainerExecException e) {
+ fail("Command should have exited with non-zero");
+ }
+ }
+
private static void getFunctionStatus(String functionName, int numMessages, boolean checkRestarts) throws Exception {
getFunctionStatus(functionName, numMessages, checkRestarts, 1);
}
@@ -2623,4 +2638,141 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
}
}
+ @Test(groups = {"java_function", "function"})
+ public void testJavaLoggingFunction() throws Exception {
+ testLoggingFunction(Runtime.JAVA);
+ }
+
+ private void testLoggingFunction(Runtime runtime) throws Exception {
+ if (functionRuntimeType == FunctionRuntimeType.THREAD && runtime == Runtime.PYTHON) {
+ // python can only run on process mode
+ return;
+ }
+
+ if (functionRuntimeType == FunctionRuntimeType.THREAD && runtime == Runtime.GO) {
+ // go can only run on process mode
+ return;
+ }
+
+ if (pulsarCluster == null) {
+ super.setupCluster();
+ super.setupFunctionWorkers();
+ }
+
+ Schema<?> schema;
+ if (Runtime.JAVA == runtime) {
+ schema = Schema.STRING;
+ } else {
+ schema = Schema.BYTES;
+ }
+
+ String inputTopicName = "persistent://public/default/test-log-" + runtime + "-input-" + randomName(8);
+ String logTopicName = "test-log-" + runtime + "-log-topic-" + randomName(8);
+ try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(pulsarCluster.getHttpServiceUrl()).build()) {
+ admin.topics().createNonPartitionedTopic(inputTopicName);
+ admin.topics().createNonPartitionedTopic(logTopicName);
+ }
+
+ String functionName = "test-logging-fn-" + randomName(8);
+ final int numMessages = 10;
+
+ // submit the exclamation function
+ submitJavaLoggingFunction(
+ inputTopicName, logTopicName, functionName, schema);
+
+ // get function info
+ getFunctionInfoSuccess(functionName);
+
+ // get function stats
+ getFunctionStatsEmpty(functionName);
+
+ // publish and consume result
+ publishAndConsumeMessages(inputTopicName, logTopicName, numMessages, "-log");
+
+ // get function status
+ getFunctionStatus(functionName, numMessages, true);
+
+ // get function stats
+ getFunctionStats(functionName, numMessages);
+
+ // delete function
+ deleteFunction(functionName);
+
+ // get function info
+ getFunctionInfoNotFound(functionName);
+
+ // make sure subscriptions are cleanup
+ checkSubscriptionsCleanup(inputTopicName);
+ checkPublisherCleanup(logTopicName);
+
+ }
+
+ private static void submitJavaLoggingFunction(String inputTopicName,
+ String logTopicName,
+ String functionName,
+ Schema<?> schema) throws Exception {
+ CommandGenerator generator;
+ log.info("------- INPUT TOPIC: '{}'", inputTopicName);
+ if (inputTopicName.endsWith(".*")) {
+ log.info("----- CREATING TOPIC PATTERN FUNCTION --- ");
+ generator = CommandGenerator.createTopicPatternGenerator(inputTopicName, LOGGING_JAVA_CLASS);
+ } else {
+ log.info("----- CREATING REGULAR FUNCTION --- ");
+ generator = CommandGenerator.createDefaultGenerator(inputTopicName, LOGGING_JAVA_CLASS);
+ }
+ generator.setLogTopic(logTopicName);
+ generator.setFunctionName(functionName);
+ String command = generator.generateCreateFunctionCommand();
+
+ log.info("---------- Function command: {}", command);
+ String[] commands = {
+ "sh", "-c", command
+ };
+ ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(
+ commands);
+ assertTrue(result.getStdout().contains("\"Created successfully\""));
+
+ ensureSubscriptionCreated(inputTopicName, String.format("public/default/%s", functionName), schema);
+ }
+
+ private static void publishAndConsumeMessages(String inputTopic,
+ String outputTopic,
+ int numMessages,
+ String messagePostfix) throws Exception {
+ @Cleanup PulsarClient client = PulsarClient.builder()
+ .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
+ .build();
+
+ @Cleanup Consumer<byte[]> consumer = client.newConsumer()
+ .topic(outputTopic)
+ .subscriptionType(SubscriptionType.Exclusive)
+ .subscriptionName("test-sub")
+ .subscribe();
+
+ @Cleanup Producer<String> producer = client.newProducer(Schema.STRING)
+ .topic(inputTopic)
+ .create();
+
+ for (int i = 0; i < numMessages; i++) {
+ producer.send("message-" + i);
+ }
+
+ Set<String> expectedMessages = new HashSet<>();
+ for (int i = 0; i < numMessages; i++) {
+ expectedMessages.add("message-" + i + messagePostfix);
+ }
+
+ for (int i = 0; i < numMessages; i++) {
+ Message<byte[]> msg = consumer.receive(30, TimeUnit.SECONDS);
+ String logMsg = new String(msg.getValue(), UTF_8);
+ log.info("Received: {}", logMsg);
+ assertTrue(expectedMessages.contains(logMsg));
+ expectedMessages.remove(logMsg);
+ }
+
+ consumer.close();
+ producer.close();
+ client.close();
+ }
+
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
index 94bffb3..2538341 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
@@ -106,6 +106,9 @@ public abstract class PulsarFunctionsTestBase extends PulsarTestSuite {
public static final String EXCLAMATION_GO_FILE = "exclamationFunc";
public static final String PUBLISH_FUNCTION_GO_FILE = "exclamationFunc";
+ public static final String LOGGING_JAVA_CLASS =
+ "org.apache.pulsar.functions.api.examples.LoggingFunction";
+
protected static String getExclamationClass(Runtime runtime,
boolean pyZip,
boolean extraDeps) {