You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/03/02 21:07:23 UTC
[incubator-pulsar] branch master updated: Introduce a pulsar log4j2
appender (#1316)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 973fa3a Introduce a pulsar log4j2 appender (#1316)
973fa3a is described below
commit 973fa3ac35394e73ca7a950afac4f908633ff23c
Author: Sijie Guo <gu...@gmail.com>
AuthorDate: Fri Mar 2 13:07:21 2018 -0800
Introduce a pulsar log4j2 appender (#1316)
### Motivation
when developing `pulsar-functions`, it would be good to log the log messages generated by functions to pulsar, so the user can easily tail the log messages topic to get all the messages per functions.
### Modifications
This PR is to add a pulsar based appender.
### Result
We are able to use pulsar log4j2 appender to log messages.
---
pom.xml | 25 +++
pulsar-log4j2-appender/pom.xml | 74 +++++++
.../pulsar/log4j2/appender/PulsarAppender.java | 211 ++++++++++++++++++++
.../pulsar/log4j2/appender/PulsarManager.java | 145 ++++++++++++++
.../pulsar/log4j2/appender/PulsarAppenderTest.java | 218 +++++++++++++++++++++
.../builder/ConfigurationAssemblerTest.java | 115 +++++++++++
.../appender/builder/ConfigurationBuilderTest.java | 120 ++++++++++++
.../builder/CustomConfigurationFactory.java | 88 +++++++++
.../src/test/resources/PulsarAppenderTest.xml | 49 +++++
9 files changed, 1045 insertions(+)
diff --git a/pom.xml b/pom.xml
index ada1a71..c8131e6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -99,6 +99,7 @@ flexible messaging model and an intuitive client API.</description>
<module>all</module>
<module>docker</module>
<module>tests</module>
+ <module>pulsar-log4j2-appender</module>
</modules>
<issueManagement>
@@ -126,6 +127,9 @@ flexible messaging model and an intuitive client API.</description>
<jackson.version>2.8.4</jackson.version>
<puppycrawl.checkstyle.version>6.19</puppycrawl.checkstyle.version>
<dockerfile-maven.version>1.3.7</dockerfile-maven.version>
+
+ <!-- test dependencies -->
+ <disruptor.version>3.4.0</disruptor.version>
</properties>
<dependencyManagement>
@@ -363,6 +367,19 @@ flexible messaging model and an intuitive client API.</description>
</dependency>
<dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-api</artifactId>
+ <type>test-jar</type>
+ <version>${log4j2.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-core</artifactId>
+ <type>test-jar</type>
+ <version>${log4j2.version}</version>
+ </dependency>
+
+ <dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.5</version>
@@ -599,6 +616,14 @@ flexible messaging model and an intuitive client API.</description>
<artifactId>bcpkix-jdk15on</artifactId>
<version>${bouncycastle.version}</version>
</dependency>
+
+ <!-- test dependencies -->
+ <dependency>
+ <groupId>com.lmax</groupId>
+ <artifactId>disruptor</artifactId>
+ <version>${disruptor.version}</version>
+ </dependency>
+
</dependencies>
</dependencyManagement>
diff --git a/pulsar-log4j2-appender/pom.xml b/pulsar-log4j2-appender/pom.xml
new file mode 100644
index 0000000..9352457
--- /dev/null
+++ b/pulsar-log4j2-appender/pom.xml
@@ -0,0 +1,74 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar</artifactId>
+ <version>2.0.0-incubating-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>pulsar-log4j2-appender</artifactId>
+ <name>Pulsar Log4j2 Appender</name>
+ <description>Pulsar Log4j2 Appender</description>
+
+ <dependencies>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-client</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-slf4j-impl</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-api</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-core</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <!-- Required for AsyncLoggers -->
+ <dependency>
+ <groupId>com.lmax</groupId>
+ <artifactId>disruptor</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+</project>
diff --git a/pulsar-log4j2-appender/src/main/java/org/apache/pulsar/log4j2/appender/PulsarAppender.java b/pulsar-log4j2-appender/src/main/java/org/apache/pulsar/log4j2/appender/PulsarAppender.java
new file mode 100644
index 0000000..68bfbd5
--- /dev/null
+++ b/pulsar-log4j2-appender/src/main/java/org/apache/pulsar/log4j2/appender/PulsarAppender.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.log4j2.appender;
+
+import java.io.Serializable;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import org.apache.logging.log4j.core.AbstractLifeCycle;
+import org.apache.logging.log4j.core.Appender;
+import org.apache.logging.log4j.core.Filter;
+import org.apache.logging.log4j.core.Layout;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.appender.AbstractAppender;
+import org.apache.logging.log4j.core.config.Node;
+import org.apache.logging.log4j.core.config.Property;
+import org.apache.logging.log4j.core.config.plugins.Plugin;
+import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
+import org.apache.logging.log4j.core.config.plugins.PluginBuilderFactory;
+import org.apache.logging.log4j.core.config.plugins.PluginElement;
+import org.apache.logging.log4j.core.layout.SerializedLayout;
+
+/**
+ * The PulsarAppender logs events to an Apache Pulsar topic.
+ *
+ * <p>Each log event is sent as a Pulsar record.
+ */
+@Plugin(
+ name = "Pulsar",
+ category = Node.CATEGORY,
+ elementType = Appender.ELEMENT_TYPE,
+ printObject = true)
+public final class PulsarAppender extends AbstractAppender {
+
+ /**
+ * Builds PulsarAppender instances.
+ * @param <B> The type to build
+ */
+ public static class Builder<B extends Builder<B>> extends AbstractAppender.Builder<B>
+ implements org.apache.logging.log4j.core.util.Builder<PulsarAppender> {
+
+ @PluginAttribute("key")
+ private String key;
+
+ @PluginAttribute("topic")
+ private String topic;
+
+ @PluginAttribute("serviceUrl")
+ private String serviceUrl;
+
+ @PluginAttribute(value = "avoidRecursive", defaultBoolean = true)
+ private boolean avoidRecursive;
+
+ @PluginAttribute(value = "syncSend", defaultBoolean = false)
+ private boolean syncSend;
+
+ @PluginElement("Properties")
+ private Property[] properties;
+
+ @SuppressWarnings("resource")
+ @Override
+ public PulsarAppender build() {
+ final Layout<? extends Serializable> layout = getLayout();
+ if (layout == null) {
+ AbstractLifeCycle.LOGGER.error("No layout provided for PulsarAppender");
+ return null;
+ }
+ PulsarManager manager = new PulsarManager(
+ getConfiguration().getLoggerContext(),
+ getName(),
+ serviceUrl,
+ topic,
+ syncSend,
+ properties,
+ key);
+ return new PulsarAppender(
+ getName(),
+ layout,
+ getFilter(),
+ isIgnoreExceptions(),
+ avoidRecursive,
+ manager);
+ }
+
+ public String getTopic() {
+ return topic;
+ }
+
+ public boolean isSyncSend() {
+ return syncSend;
+ }
+
+ public Property[] getProperties() {
+ return properties;
+ }
+
+ public B setTopic(final String topic) {
+ this.topic = topic;
+ return asBuilder();
+ }
+
+ public B setSyncSend(final boolean syncSend) {
+ this.syncSend = syncSend;
+ return asBuilder();
+ }
+
+ public B setProperties(final Property[] properties) {
+ this.properties = properties;
+ return asBuilder();
+ }
+ }
+
+ /**
+ * Creates a builder for a PulsarAppender.
+ * @return a builder for a PulsarAppender.
+ */
+ @PluginBuilderFactory
+ public static <B extends Builder<B>> B newBuilder() {
+ return new Builder<B>().asBuilder();
+ }
+
+ private final boolean avoidRecursive;
+ private final PulsarManager manager;
+
+ private PulsarAppender(
+ final String name,
+ final Layout<? extends Serializable> layout,
+ final Filter filter,
+ final boolean ignoreExceptions,
+ final boolean avoidRecursive,
+ final PulsarManager manager) {
+ super(name, filter, layout, ignoreExceptions);
+ this.avoidRecursive = avoidRecursive;
+ this.manager = Objects.requireNonNull(manager, "manager");
+ }
+
+ @Override
+ public void append(final LogEvent event) {
+ if (avoidRecursive
+ && event.getLoggerName() != null
+ && event.getLoggerName().startsWith("org.apache.pulsar")) {
+ LOGGER.warn("Recursive logging from [{}] for appender [{}].", event.getLoggerName(), getName());
+ } else {
+ try {
+ tryAppend(event);
+ } catch (final Exception e) {
+ error("Unable to write to Pulsar in appender [" + getName() + "]", event, e);
+ }
+ }
+ }
+
+ private void tryAppend(final LogEvent event) {
+ final Layout<? extends Serializable> layout = getLayout();
+ byte[] data;
+ if (layout instanceof SerializedLayout) {
+ final byte[] header = layout.getHeader();
+ final byte[] body = layout.toByteArray(event);
+ data = new byte[header.length + body.length];
+ System.arraycopy(header, 0, data, 0, header.length);
+ System.arraycopy(body, 0, data, header.length, body.length);
+ } else {
+ data = layout.toByteArray(event);
+ }
+ manager.send(data);
+ }
+
+ @Override
+ public void start() {
+ super.start();
+ try {
+ manager.startup();
+ } catch (Exception e) {
+ // fail to start the manager
+ }
+ }
+
+ @Override
+ public boolean stop(final long timeout, final TimeUnit timeUnit) {
+ setStopping();
+ boolean stopped = super.stop(timeout, timeUnit, false);
+ stopped &= manager.stop(timeout, timeUnit);
+ setStopped();
+ return stopped;
+ }
+
+ @Override
+ public String toString() {
+ return "PulsarAppender{" +
+ "name=" + getName() +
+ ", state=" + getState() +
+ ", serviceUrl=" + manager.getServiceUrl() +
+ ", topic=" + manager.getTopic() +
+ '}';
+ }
+
+}
diff --git a/pulsar-log4j2-appender/src/main/java/org/apache/pulsar/log4j2/appender/PulsarManager.java b/pulsar-log4j2-appender/src/main/java/org/apache/pulsar/log4j2/appender/PulsarManager.java
new file mode 100644
index 0000000..c52faa6
--- /dev/null
+++ b/pulsar-log4j2-appender/src/main/java/org/apache/pulsar/log4j2/appender/PulsarManager.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.log4j2.appender;
+
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+import java.util.function.Supplier;
+import org.apache.logging.log4j.core.LoggerContext;
+import org.apache.logging.log4j.core.appender.AbstractManager;
+import org.apache.logging.log4j.core.config.Property;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageBuilder;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+public class PulsarManager extends AbstractManager {
+
+ static Supplier<ClientBuilder> PULSAR_CLIENT_BUILDER = () -> PulsarClient.builder();
+
+ static BiFunction<String, byte[], Message<byte[]>> MESSAGE_BUILDER = (key, data) -> {
+ MessageBuilder<byte[]> messageBuilder = MessageBuilder.create()
+ .setContent(data);
+ if (null != key) {
+ messageBuilder = messageBuilder.setKey(key);
+ }
+ return messageBuilder.build();
+ };
+
+ private PulsarClient client;
+ private Producer<byte[]> producer;
+
+ private final String serviceUrl;
+ private final String topic;
+ private final String key;
+ private final boolean syncSend;
+
+ public PulsarManager(final LoggerContext loggerContext,
+ final String name,
+ final String serviceUrl,
+ final String topic,
+ final boolean syncSend,
+ final Property[] properties,
+ final String key) {
+ super(loggerContext, name);
+ this.serviceUrl = Objects.requireNonNull(serviceUrl, "serviceUrl");
+ this.topic = Objects.requireNonNull(topic, "topic");
+ this.syncSend = syncSend;
+ this.key = key;
+ }
+
+ @Override
+ public boolean releaseSub(final long timeout, final TimeUnit timeUnit) {
+ if (producer != null) {
+ try {
+ producer.closeAsync().get(timeout, timeUnit);
+ } catch (Exception e) {
+ // exceptions on closing
+ LOGGER.warn("Failed to close producer within {} milliseconds",
+ timeUnit.toMillis(timeout), e);
+ }
+ }
+ return true;
+ }
+
+ public void send(final byte[] msg) {
+ if (producer != null) {
+ String newKey = null;
+
+ if(key != null && key.contains("${")) {
+ newKey = getLoggerContext().getConfiguration().getStrSubstitutor().replace(key);
+ } else if (key != null) {
+ newKey = key;
+ }
+
+ Message<byte[]> message = MESSAGE_BUILDER.apply(newKey, msg);
+ if (syncSend) {
+ try {
+ producer.send(message);
+ } catch (PulsarClientException e) {
+ LOGGER.error("Unable to write to Pulsar in appender [" + getName() + "]", e);
+ }
+ } else {
+ producer.sendAsync(message)
+ .exceptionally(cause -> {
+ LOGGER.error("Unable to write to Pulsar in appender [" + getName() + "]", cause);
+ return null;
+ });
+ }
+ }
+ }
+
+ public void startup() throws Exception {
+ try {
+ client = PULSAR_CLIENT_BUILDER.get()
+ .serviceUrl(serviceUrl)
+ .build();
+ ProducerBuilder<byte[]> producerBuilder = client.newProducer()
+ .topic(topic)
+ .producerName("pulsar-log4j2-appender-" + topic)
+ .blockIfQueueFull(false);
+ if (syncSend) {
+ // disable batching for sync send
+ producerBuilder = producerBuilder.enableBatching(false);
+ } else {
+ // enable batching in 10 ms for async send
+ producerBuilder = producerBuilder
+ .enableBatching(true)
+ .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS);
+ }
+ producer = producerBuilder.create();
+ } catch (Exception t) {
+ LOGGER.error("Failed to start pulsar manager {}", t);
+ throw t;
+ }
+ }
+
+ public String getServiceUrl() {
+ return serviceUrl;
+ }
+
+ public String getTopic() {
+ return topic;
+ }
+
+}
diff --git a/pulsar-log4j2-appender/src/test/java/org/apache/pulsar/log4j2/appender/PulsarAppenderTest.java b/pulsar-log4j2-appender/src/test/java/org/apache/pulsar/log4j2/appender/PulsarAppenderTest.java
new file mode 100644
index 0000000..4431243
--- /dev/null
+++ b/pulsar-log4j2-appender/src/test/java/org/apache/pulsar/log4j2/appender/PulsarAppenderTest.java
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.log4j2.appender;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.AssertJUnit.assertEquals;
+import static org.testng.AssertJUnit.assertNotNull;
+import static org.testng.AssertJUnit.assertNull;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectInputStream;
+import java.nio.charset.StandardCharsets;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.core.Appender;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.LoggerContext;
+import org.apache.logging.log4j.core.config.Configurator;
+import org.apache.logging.log4j.core.impl.Log4jLogEvent;
+import org.apache.logging.log4j.message.SimpleMessage;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.impl.ClientBuilderImpl;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class PulsarAppenderTest {
+
+ private static final String LOG_MESSAGE = "Hello, world!";
+ private static final String TOPIC_NAME = "pulsar-topic";
+
+ private static Log4jLogEvent createLogEvent() {
+ return Log4jLogEvent.newBuilder()
+ .setLoggerName(PulsarAppenderTest.class.getName())
+ .setLoggerFqcn(PulsarAppenderTest.class.getName())
+ .setLevel(Level.INFO)
+ .setMessage(new SimpleMessage(LOG_MESSAGE))
+ .build();
+ }
+
+ private ClientBuilderImpl clientBuilder;
+ private PulsarClient client;
+ private Producer<byte[]> producer;
+ private List<Message<byte[]>> history;
+
+ private LoggerContext ctx;
+
+ @BeforeMethod
+ public void setUp() throws Exception {
+ history = new LinkedList<>();
+
+ client = mock(PulsarClient.class);
+ producer = mock(Producer.class);
+ clientBuilder = mock(ClientBuilderImpl.class);
+
+ doReturn(client).when(clientBuilder).build();
+ doReturn(clientBuilder).when(clientBuilder).serviceUrl(anyString());
+
+ ProducerBuilder<byte[]> producerBuilder = mock(ProducerBuilder.class);
+ when(client.newProducer()).thenReturn(producerBuilder);
+ doReturn(producerBuilder).when(producerBuilder).topic(anyString());
+ doReturn(producerBuilder).when(producerBuilder).producerName(anyString());
+ doReturn(producerBuilder).when(producerBuilder).enableBatching(anyBoolean());
+ doReturn(producerBuilder).when(producerBuilder).batchingMaxPublishDelay(anyInt(), any(TimeUnit.class));
+ doReturn(producerBuilder).when(producerBuilder).blockIfQueueFull(anyBoolean());
+ doReturn(producer).when(producerBuilder).create();
+
+ when(producer.send(any(Message.class)))
+ .thenAnswer(invocationOnMock -> {
+ Message<byte[]> msg = invocationOnMock.getArgumentAt(0, Message.class);
+ synchronized (history) {
+ history.add(msg);
+ }
+ return null;
+ });
+
+ when(producer.sendAsync(any(Message.class)))
+ .thenAnswer(invocationOnMock -> {
+ Message<byte[]> msg = invocationOnMock.getArgumentAt(0, Message.class);
+ synchronized (history) {
+ history.add(msg);
+ }
+ CompletableFuture<MessageId> future = new CompletableFuture<>();
+ future.complete(mock(MessageId.class));
+ return future;
+ });
+
+ PulsarManager.PULSAR_CLIENT_BUILDER = () -> clientBuilder;
+ PulsarManager.MESSAGE_BUILDER = (key, data) -> {
+ Message<byte[]> msg = mock(Message.class);
+ when(msg.getKey()).thenReturn(key);
+ when(msg.getData()).thenReturn(data);
+ return msg;
+ };
+
+ ctx = Configurator.initialize(
+ "PulsarAppenderTest",
+ getClass().getClassLoader(),
+ getClass().getClassLoader().getResource("PulsarAppenderTest.xml").toURI());
+ }
+
+ @Test
+ public void testAppendWithLayout() throws Exception {
+ final Appender appender = ctx.getConfiguration().getAppender("PulsarAppenderWithLayout");
+ appender.append(createLogEvent());
+ final Message<byte[]> item;
+ synchronized (history) {
+ assertEquals(1, history.size());
+ item = history.get(0);
+ }
+ assertNotNull(item);
+ assertNull(item.getKey());
+ assertEquals("[" + LOG_MESSAGE + "]", new String(item.getData(), StandardCharsets.UTF_8));
+ }
+
+ @Test
+ public void testAppendWithSerializedLayout() throws Exception {
+ final Appender appender = ctx.getConfiguration().getAppender("PulsarAppenderWithSerializedLayout");
+ final LogEvent logEvent = createLogEvent();
+ appender.append(logEvent);
+ final Message<byte[]> item;
+ synchronized (history) {
+ assertEquals(1, history.size());
+ item = history.get(0);
+ }
+ assertNotNull(item);
+ assertNull(item.getKey());
+ assertEquals(LOG_MESSAGE, deserializeLogEvent(item.getData()).getMessage().getFormattedMessage());
+ }
+
+ @Test
+ public void testAsyncAppend() throws Exception {
+ final Appender appender = ctx.getConfiguration().getAppender("AsyncPulsarAppender");
+ appender.append(createLogEvent());
+ final Message<byte[]> item;
+ synchronized (history) {
+ assertEquals(1, history.size());
+ item = history.get(0);
+ }
+ assertNotNull(item);
+ assertNull(item.getKey());
+ assertEquals(LOG_MESSAGE, new String(item.getData(), StandardCharsets.UTF_8));
+ }
+
+ @Test
+ public void testAppendWithKey() throws Exception {
+ final Appender appender = ctx.getConfiguration().getAppender("PulsarAppenderWithKey");
+ final LogEvent logEvent = createLogEvent();
+ appender.append(logEvent);
+ Message<byte[]> item;
+ synchronized (history) {
+ assertEquals(1, history.size());
+ item = history.get(0);
+ }
+ assertNotNull(item);
+ String msgKey = item.getKey();
+ assertEquals(msgKey, "key");
+ assertEquals(LOG_MESSAGE, new String(item.getData(), StandardCharsets.UTF_8));
+ }
+
+ @Test
+ public void testAppendWithKeyLookup() throws Exception {
+ final Appender appender = ctx.getConfiguration().getAppender("PulsarAppenderWithKeyLookup");
+ final LogEvent logEvent = createLogEvent();
+ Date date = new Date();
+ SimpleDateFormat format = new SimpleDateFormat("dd-MM-yyyy");
+ appender.append(logEvent);
+ Message<byte[]> item;
+ synchronized (history) {
+ assertEquals(1, history.size());
+ item = history.get(0);
+ }
+ assertNotNull(item);
+ String keyValue = format.format(date);
+ assertEquals(item.getKey(), keyValue);
+ assertEquals(LOG_MESSAGE, new String(item.getData(), StandardCharsets.UTF_8));
+ }
+
+ private LogEvent deserializeLogEvent(final byte[] data) throws IOException, ClassNotFoundException {
+ final ByteArrayInputStream bis = new ByteArrayInputStream(data);
+ try (ObjectInput ois = new ObjectInputStream(bis)) {
+ return (LogEvent) ois.readObject();
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/pulsar-log4j2-appender/src/test/java/org/apache/pulsar/log4j2/appender/builder/ConfigurationAssemblerTest.java b/pulsar-log4j2-appender/src/test/java/org/apache/pulsar/log4j2/appender/builder/ConfigurationAssemblerTest.java
new file mode 100644
index 0000000..a234741
--- /dev/null
+++ b/pulsar-log4j2-appender/src/test/java/org/apache/pulsar/log4j2/appender/builder/ConfigurationAssemblerTest.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.log4j2.appender.builder;
+
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.AssertJUnit.assertEquals;
+import static org.testng.AssertJUnit.assertTrue;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.Appender;
+import org.apache.logging.log4j.core.Filter;
+import org.apache.logging.log4j.core.LifeCycle;
+import org.apache.logging.log4j.core.LoggerContext;
+import org.apache.logging.log4j.core.config.Configuration;
+import org.apache.logging.log4j.core.config.ConfigurationFactory;
+import org.apache.logging.log4j.core.config.Configurator;
+import org.apache.logging.log4j.core.config.CustomLevelConfig;
+import org.apache.logging.log4j.core.config.LoggerConfig;
+import org.apache.logging.log4j.core.config.builder.api.ConfigurationBuilder;
+import org.apache.logging.log4j.core.config.builder.api.ConfigurationBuilderFactory;
+import org.apache.logging.log4j.core.config.builder.impl.BuiltConfiguration;
+import org.apache.logging.log4j.core.filter.ThresholdFilter;
+import org.apache.logging.log4j.core.layout.GelfLayout;
+import org.apache.logging.log4j.core.util.Constants;
+import org.apache.pulsar.log4j2.appender.PulsarAppender;
+import org.testng.annotations.Test;
+
+public class ConfigurationAssemblerTest {
+
+ @Test
+ public void testBuildConfiguration() throws Exception {
+ try {
+ System.setProperty(Constants.LOG4J_CONTEXT_SELECTOR,
+ "org.apache.logging.log4j.core.async.AsyncLoggerContextSelector");
+ final ConfigurationBuilder<BuiltConfiguration> builder = ConfigurationBuilderFactory
+ .newConfigurationBuilder();
+ CustomConfigurationFactory.addTestFixtures("config name", builder);
+ final Configuration configuration = builder.build();
+ try (LoggerContext ctx = Configurator.initialize(configuration)) {
+ validate(configuration);
+ }
+ } finally {
+ System.getProperties().remove(Constants.LOG4J_CONTEXT_SELECTOR);
+ }
+ }
+
+ @Test
+ public void testCustomConfigurationFactory() throws Exception {
+ try {
+ System.setProperty(ConfigurationFactory.CONFIGURATION_FACTORY_PROPERTY,
+ "org.apache.pulsar.log4j2.appender.builder.CustomConfigurationFactory");
+ System.setProperty(Constants.LOG4J_CONTEXT_SELECTOR,
+ "org.apache.logging.log4j.core.async.AsyncLoggerContextSelector");
+ final Configuration config = ((LoggerContext) LogManager.getContext(false)).getConfiguration();
+ validate(config);
+ } finally {
+ System.getProperties().remove(Constants.LOG4J_CONTEXT_SELECTOR);
+ System.getProperties().remove(ConfigurationFactory.CONFIGURATION_FACTORY_PROPERTY);
+ }
+ }
+
+ private void validate(final Configuration config) {
+ assertNotNull(config);
+ assertNotNull(config.getName());
+ assertFalse(config.getName().isEmpty());
+ assertNotNull(config, "No configuration created");
+ assertEquals("Incorrect State: " + config.getState(), config.getState(), LifeCycle.State.STARTED);
+ final Map<String, Appender> appenders = config.getAppenders();
+ assertNotNull(appenders);
+ assertTrue("Incorrect number of Appenders: " + appenders.size(), appenders.size() == 2);
+ final PulsarAppender pulsarAppender = (PulsarAppender) appenders.get("Pulsar");
+ final GelfLayout gelfLayout = (GelfLayout) pulsarAppender.getLayout();
+ final Map<String, LoggerConfig> loggers = config.getLoggers();
+ assertNotNull(loggers);
+ assertTrue("Incorrect number of LoggerConfigs: " + loggers.size(), loggers.size() == 2);
+ final LoggerConfig rootLoggerConfig = loggers.get("");
+ assertEquals(Level.ERROR, rootLoggerConfig.getLevel());
+ assertFalse(rootLoggerConfig.isIncludeLocation());
+ final LoggerConfig loggerConfig = loggers.get("org.apache.logging.log4j");
+ assertEquals(Level.DEBUG, loggerConfig.getLevel());
+ assertTrue(loggerConfig.isIncludeLocation());
+ final Filter filter = config.getFilter();
+ assertNotNull(filter, "No Filter");
+ assertTrue("Not a Threshold Filter", filter instanceof ThresholdFilter);
+ final List<CustomLevelConfig> customLevels = config.getCustomLevels();
+ assertNotNull(filter, "No CustomLevels");
+ assertEquals(1, customLevels.size());
+ final CustomLevelConfig customLevel = customLevels.get(0);
+ assertEquals("Panic", customLevel.getLevelName());
+ assertEquals(17, customLevel.getIntLevel());
+ final Logger logger = LogManager.getLogger(getClass());
+ logger.info("Welcome to Log4j!");
+ }
+}
diff --git a/pulsar-log4j2-appender/src/test/java/org/apache/pulsar/log4j2/appender/builder/ConfigurationBuilderTest.java b/pulsar-log4j2-appender/src/test/java/org/apache/pulsar/log4j2/appender/builder/ConfigurationBuilderTest.java
new file mode 100644
index 0000000..7de8a3f
--- /dev/null
+++ b/pulsar-log4j2-appender/src/test/java/org/apache/pulsar/log4j2/appender/builder/ConfigurationBuilderTest.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.log4j2.appender.builder;
+
+import static org.testng.Assert.assertEquals;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.core.Filter;
+import org.apache.logging.log4j.core.appender.ConsoleAppender;
+import org.apache.logging.log4j.core.config.builder.api.AppenderComponentBuilder;
+import org.apache.logging.log4j.core.config.builder.api.ConfigurationBuilder;
+import org.apache.logging.log4j.core.config.builder.api.ConfigurationBuilderFactory;
+import org.apache.logging.log4j.core.config.builder.impl.BuiltConfiguration;
+import org.testng.annotations.Test;
+
+public class ConfigurationBuilderTest {
+
+ private static final String INDENT = " ";
+ private static final String EOL = System.lineSeparator();
+
+ private void addTestFixtures(final String name, final ConfigurationBuilder<BuiltConfiguration> builder) {
+ builder.setConfigurationName(name);
+ builder.setStatusLevel(Level.ERROR);
+ builder.setShutdownTimeout(5000, TimeUnit.MILLISECONDS);
+ builder.add(builder.newScriptFile("target/test-classes/scripts/filter.groovy").addIsWatched(true));
+ builder.add(builder.newFilter("ThresholdFilter", Filter.Result.ACCEPT, Filter.Result.NEUTRAL)
+ .addAttribute("level", Level.DEBUG));
+
+ final AppenderComponentBuilder appenderBuilder = builder.newAppender("Stdout", "CONSOLE").addAttribute("target", ConsoleAppender.Target.SYSTEM_OUT);
+ appenderBuilder.add(builder.newLayout("PatternLayout").
+ addAttribute("pattern", "%d [%t] %-5level: %msg%n%throwable"));
+ appenderBuilder.add(builder.newFilter("MarkerFilter", Filter.Result.DENY,
+ Filter.Result.NEUTRAL).addAttribute("marker", "FLOW"));
+ builder.add(appenderBuilder);
+
+ final AppenderComponentBuilder appenderBuilder2 = builder.newAppender("Pulsar", "Pulsar")
+ .addAttribute("serviceUrl", "pulsar://localhost:6650")
+ .addAttribute("topic", "my-topic");
+ appenderBuilder2.add(builder.newLayout("GelfLayout").
+ addAttribute("host", "my-host").
+ addComponent(builder.newKeyValuePair("extraField", "extraValue")));
+ builder.add(appenderBuilder2);
+
+ builder.add(builder.newLogger("org.apache.logging.log4j", Level.DEBUG, true).
+ add(builder.newAppenderRef("Stdout")).
+ addAttribute("additivity", false));
+ builder.add(builder.newLogger("org.apache.logging.log4j.core", Level.DEBUG).
+ add(builder.newAppenderRef("Stdout")));
+ builder.add(builder.newRootLogger(Level.ERROR).add(builder.newAppenderRef("Stdout")));
+
+ builder.addProperty("MyKey", "MyValue");
+ builder.add(builder.newCustomLevel("Panic", 17));
+ builder.setPackages("foo,bar");
+ }
+
+ private final static String expectedXml =
+ "<?xml version=\"1.0\" ?>" + EOL +
+ "<Configuration name=\"config name\" status=\"ERROR\" packages=\"foo,bar\" shutdownTimeout=\"5000\">" + EOL +
+ INDENT + "<Properties>" + EOL +
+ INDENT + INDENT + "<Property name=\"MyKey\">MyValue</Property>" + EOL +
+ INDENT + "</Properties>" + EOL +
+ INDENT + "<Scripts>" + EOL +
+ INDENT + INDENT + "<ScriptFile name=\"target/test-classes/scripts/filter.groovy\" path=\"target/test-classes/scripts/filter.groovy\" isWatched=\"true\"/>" + EOL +
+ INDENT + "</Scripts>" + EOL +
+ INDENT + "<CustomLevels>" + EOL +
+ INDENT + INDENT + "<CustomLevel name=\"Panic\" intLevel=\"17\"/>" + EOL +
+ INDENT + "</CustomLevels>" + EOL +
+ INDENT + "<ThresholdFilter onMatch=\"ACCEPT\" onMisMatch=\"NEUTRAL\" level=\"DEBUG\"/>" + EOL +
+ INDENT + "<Appenders>" + EOL +
+ INDENT + INDENT + "<CONSOLE name=\"Stdout\" target=\"SYSTEM_OUT\">" + EOL +
+ INDENT + INDENT + INDENT + "<PatternLayout pattern=\"%d [%t] %-5level: %msg%n%throwable\"/>" + EOL +
+ INDENT + INDENT + INDENT + "<MarkerFilter onMatch=\"DENY\" onMisMatch=\"NEUTRAL\" marker=\"FLOW\"/>" + EOL +
+ INDENT + INDENT + "</CONSOLE>" + EOL +
+ INDENT + INDENT + "<Pulsar name=\"Pulsar\" serviceUrl=\"pulsar://localhost:6650\" topic=\"my-topic\">" + EOL +
+ INDENT + INDENT + INDENT + "<GelfLayout host=\"my-host\">" + EOL +
+ INDENT + INDENT + INDENT + INDENT + "<KeyValuePair key=\"extraField\" value=\"extraValue\"/>" + EOL +
+ INDENT + INDENT + INDENT + "</GelfLayout>" + EOL +
+ INDENT + INDENT + "</Pulsar>" + EOL +
+ INDENT + "</Appenders>" + EOL +
+ INDENT + "<Loggers>" + EOL +
+ INDENT + INDENT + "<Logger name=\"org.apache.logging.log4j\" level=\"DEBUG\" includeLocation=\"true\" additivity=\"false\">" + EOL +
+ INDENT + INDENT + INDENT + "<AppenderRef ref=\"Stdout\"/>" + EOL +
+ INDENT + INDENT + "</Logger>" + EOL +
+ INDENT + INDENT + "<Logger name=\"org.apache.logging.log4j.core\" level=\"DEBUG\">" + EOL +
+ INDENT + INDENT + INDENT + "<AppenderRef ref=\"Stdout\"/>" + EOL +
+ INDENT + INDENT + "</Logger>" + EOL +
+ INDENT + INDENT + "<Root level=\"ERROR\">" + EOL +
+ INDENT + INDENT + INDENT + "<AppenderRef ref=\"Stdout\"/>" + EOL +
+ INDENT + INDENT + "</Root>" + EOL +
+ INDENT + "</Loggers>" + EOL +
+ "</Configuration>" + EOL;
+
+ // TODO make test run properly on Windows
+ @Test
+ public void testXmlConstructing() throws Exception {
+ //assumeTrue(System.lineSeparator().length() == 1); // Only run test on platforms with single character line endings (such as Linux), not on Windows
+ final ConfigurationBuilder<BuiltConfiguration> builder = ConfigurationBuilderFactory.newConfigurationBuilder();
+ addTestFixtures("config name", builder);
+ final String xmlConfiguration = builder.toXmlConfiguration();
+ assertEquals(expectedXml, xmlConfiguration);
+ }
+
+}
diff --git a/pulsar-log4j2-appender/src/test/java/org/apache/pulsar/log4j2/appender/builder/CustomConfigurationFactory.java b/pulsar-log4j2-appender/src/test/java/org/apache/pulsar/log4j2/appender/builder/CustomConfigurationFactory.java
new file mode 100644
index 0000000..98b26d0
--- /dev/null
+++ b/pulsar-log4j2-appender/src/test/java/org/apache/pulsar/log4j2/appender/builder/CustomConfigurationFactory.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.log4j2.appender.builder;
+
+import java.net.URI;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.core.Filter;
+import org.apache.logging.log4j.core.LoggerContext;
+import org.apache.logging.log4j.core.appender.ConsoleAppender;
+import org.apache.logging.log4j.core.config.Configuration;
+import org.apache.logging.log4j.core.config.ConfigurationFactory;
+import org.apache.logging.log4j.core.config.ConfigurationSource;
+import org.apache.logging.log4j.core.config.builder.api.AppenderComponentBuilder;
+import org.apache.logging.log4j.core.config.builder.api.ConfigurationBuilder;
+import org.apache.logging.log4j.core.config.builder.impl.BuiltConfiguration;
+
+/**
+ * Normally this would be a plugin. However, we don't want it used for everything so it will be defined
+ * via a system property.
+ */
+//@Plugin(name = "CustomConfigurationFactory", category = ConfigurationFactory.CATEGORY)
+//@Order(50)
+public class CustomConfigurationFactory extends ConfigurationFactory {
+
+ static Configuration addTestFixtures(final String name, final ConfigurationBuilder<BuiltConfiguration> builder) {
+ builder.setConfigurationName(name);
+ builder.setStatusLevel(Level.ERROR);
+ builder.add(builder.newScriptFile("target/test-classes/scripts/filter.groovy").addIsWatched(true));
+ builder.add(builder.newFilter("ThresholdFilter", Filter.Result.ACCEPT, Filter.Result.NEUTRAL)
+ .addAttribute("level", Level.DEBUG));
+
+ final AppenderComponentBuilder appenderBuilder = builder.newAppender("Stdout", "CONSOLE").addAttribute("target", ConsoleAppender.Target.SYSTEM_OUT);
+ appenderBuilder.add(builder.newLayout("PatternLayout").
+ addAttribute("pattern", "%d [%t] %-5level: %msg%n%throwable"));
+ appenderBuilder.add(builder.newFilter("MarkerFilter", Filter.Result.DENY,
+ Filter.Result.NEUTRAL).addAttribute("marker", "FLOW"));
+ builder.add(appenderBuilder);
+
+ final AppenderComponentBuilder appenderBuilder2 = builder.newAppender("Pulsar", "Pulsar")
+ .addAttribute("serviceUrl", "pulsar://localhost:6650")
+ .addAttribute("topic", "my-topic");
+ appenderBuilder2.add(builder.newLayout("GelfLayout").
+ addAttribute("host", "my-host").
+ addComponent(builder.newKeyValuePair("extraField", "extraValue")));
+ builder.add(appenderBuilder2);
+
+ builder.add(builder.newLogger("org.apache.logging.log4j", Level.DEBUG, true).
+ add(builder.newAppenderRef("Stdout")).
+ addAttribute("additivity", false));
+ builder.add(builder.newRootLogger(Level.ERROR).add(builder.newAppenderRef("Stdout")));
+
+ builder.add(builder.newCustomLevel("Panic", 17));
+
+ return builder.build();
+ }
+
+ @Override
+ public Configuration getConfiguration(final LoggerContext loggerContext, final ConfigurationSource source) {
+ return getConfiguration(loggerContext, source.toString(), null);
+ }
+
+ @Override
+ public Configuration getConfiguration(final LoggerContext loggerContext, final String name, final URI configLocation) {
+ final ConfigurationBuilder<BuiltConfiguration> builder = newConfigurationBuilder();
+ return addTestFixtures(name, builder);
+ }
+
+ @Override
+ protected String[] getSupportedTypes() {
+ return new String[] {"*"};
+ }
+}
diff --git a/pulsar-log4j2-appender/src/test/resources/PulsarAppenderTest.xml b/pulsar-log4j2-appender/src/test/resources/PulsarAppenderTest.xml
new file mode 100644
index 0000000..a5743fb
--- /dev/null
+++ b/pulsar-log4j2-appender/src/test/resources/PulsarAppenderTest.xml
@@ -0,0 +1,49 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<Configuration name="PulsarAppenderTest" status="OFF">
+ <Appenders>
+ <Pulsar name="PulsarAppenderWithLayout" serviceUrl="pulsar://localhost:6650" topic="persistent://t/c/n/pulsar-topic" avoidRecursive="false">
+ <PatternLayout pattern="[%m]"/>
+ </Pulsar>
+ <Pulsar name="PulsarAppenderWithSerializedLayout" serviceUrl="pulsar://localhost:6650" topic="persistent://t/c/n/pulsar-topic" avoidRecursive="false">
+ <SerializedLayout/>
+ </Pulsar>
+ <Pulsar name="AsyncPulsarAppender" serviceUrl="pulsar://localhost:6650" topic="persistent://t/c/n/pulsar-topic" avoidRecursive="false">
+ <PatternLayout pattern="%m"/>
+ <Property name="syncSend">false</Property>
+ </Pulsar>
+ <Pulsar name="PulsarAppenderWithKey" serviceUrl="pulsar://localhost:6650" topic="persistent://t/c/n/pulsar-topic" key="key" avoidRecursive="false">
+ <PatternLayout pattern="%m"/>
+ </Pulsar>
+ <Pulsar name="PulsarAppenderWithKeyLookup" serviceUrl="pulsar://localhost:6650" topic="persistent://t/c/n/pulsar-topic" key="$${date:dd-MM-yyyy}" avoidRecursive="false">
+ <PatternLayout pattern="%m"/>
+ </Pulsar>
+ </Appenders>
+ <Loggers>
+ <Root level="info">
+ <AppenderRef ref="PulsarAppenderWithLayout"/>
+ <AppenderRef ref="PulsarAppenderWithSerializedLayout"/>
+ <AppenderRef ref="AsyncPulsarAppender"/>
+ <AppenderRef ref="PulsarAppenderWithKey"/>
+ </Root>
+ </Loggers>
+</Configuration>
\ No newline at end of file
--
To stop receiving notification emails like this one, please contact
sijie@apache.org.