You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/03/02 21:07:23 UTC

[GitHub] sijie closed pull request #1316: Introduce a pulsar log4j2 appender

sijie closed pull request #1316: Introduce a pulsar log4j2 appender
URL: https://github.com/apache/incubator-pulsar/pull/1316
 
 
   

This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one
opened from a fork) once it is merged, the diff is reproduced below for
the sake of provenance:

diff --git a/pom.xml b/pom.xml
index ada1a7127..c8131e68e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -99,6 +99,7 @@ flexible messaging model and an intuitive client API.</description>
     <module>all</module>
     <module>docker</module>
     <module>tests</module>
+    <module>pulsar-log4j2-appender</module>
   </modules>
 
   <issueManagement>
@@ -126,6 +127,9 @@ flexible messaging model and an intuitive client API.</description>
     <jackson.version>2.8.4</jackson.version>
     <puppycrawl.checkstyle.version>6.19</puppycrawl.checkstyle.version>
     <dockerfile-maven.version>1.3.7</dockerfile-maven.version>
+
+    <!-- test dependencies -->
+    <disruptor.version>3.4.0</disruptor.version>
   </properties>
 
   <dependencyManagement>
@@ -362,6 +366,19 @@ flexible messaging model and an intuitive client API.</description>
         <version>${log4j2.version}</version>
       </dependency>
 
+      <dependency>
+        <groupId>org.apache.logging.log4j</groupId>
+        <artifactId>log4j-api</artifactId>
+        <type>test-jar</type>
+        <version>${log4j2.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.logging.log4j</groupId>
+        <artifactId>log4j-core</artifactId>
+        <type>test-jar</type>
+        <version>${log4j2.version}</version>
+      </dependency>
+
       <dependency>
         <groupId>commons-io</groupId>
         <artifactId>commons-io</artifactId>
@@ -599,6 +616,14 @@ flexible messaging model and an intuitive client API.</description>
         <artifactId>bcpkix-jdk15on</artifactId>
         <version>${bouncycastle.version}</version>
       </dependency>
+
+      <!-- test dependencies -->
+      <dependency>
+        <groupId>com.lmax</groupId>
+        <artifactId>disruptor</artifactId>
+        <version>${disruptor.version}</version>
+      </dependency>
+
     </dependencies>
   </dependencyManagement>
 
diff --git a/pulsar-log4j2-appender/pom.xml b/pulsar-log4j2-appender/pom.xml
new file mode 100644
index 000000000..93524575b
--- /dev/null
+++ b/pulsar-log4j2-appender/pom.xml
@@ -0,0 +1,74 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+	<parent>
+		<groupId>org.apache.pulsar</groupId>
+		<artifactId>pulsar</artifactId>
+		<version>2.0.0-incubating-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>pulsar-log4j2-appender</artifactId>
+	<name>Pulsar Log4j2 Appender</name>
+	<description>Pulsar Log4j2 Appender</description>
+
+	<dependencies>
+		<dependency>
+			<groupId>${project.groupId}</groupId>
+			<artifactId>pulsar-client</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.logging.log4j</groupId>
+			<artifactId>log4j-api</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.logging.log4j</groupId>
+			<artifactId>log4j-core</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.logging.log4j</groupId>
+			<artifactId>log4j-slf4j-impl</artifactId>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.logging.log4j</groupId>
+			<artifactId>log4j-api</artifactId>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.logging.log4j</groupId>
+			<artifactId>log4j-core</artifactId>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+		<!-- Required for AsyncLoggers -->
+		<dependency>
+			<groupId>com.lmax</groupId>
+			<artifactId>disruptor</artifactId>
+			<scope>test</scope>
+		</dependency>
+	</dependencies>
+
+</project>
diff --git a/pulsar-log4j2-appender/src/main/java/org/apache/pulsar/log4j2/appender/PulsarAppender.java b/pulsar-log4j2-appender/src/main/java/org/apache/pulsar/log4j2/appender/PulsarAppender.java
new file mode 100644
index 000000000..68bfbd51c
--- /dev/null
+++ b/pulsar-log4j2-appender/src/main/java/org/apache/pulsar/log4j2/appender/PulsarAppender.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.log4j2.appender;
+
+import java.io.Serializable;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import org.apache.logging.log4j.core.AbstractLifeCycle;
+import org.apache.logging.log4j.core.Appender;
+import org.apache.logging.log4j.core.Filter;
+import org.apache.logging.log4j.core.Layout;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.appender.AbstractAppender;
+import org.apache.logging.log4j.core.config.Node;
+import org.apache.logging.log4j.core.config.Property;
+import org.apache.logging.log4j.core.config.plugins.Plugin;
+import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
+import org.apache.logging.log4j.core.config.plugins.PluginBuilderFactory;
+import org.apache.logging.log4j.core.config.plugins.PluginElement;
+import org.apache.logging.log4j.core.layout.SerializedLayout;
+
+/**
+ * The PulsarAppender logs events to an Apache Pulsar topic.
+ *
+ * <p>Each log event is sent as a Pulsar record.
+ */
+@Plugin(
+    name = "Pulsar",
+    category = Node.CATEGORY,
+    elementType = Appender.ELEMENT_TYPE,
+    printObject = true)
+public final class PulsarAppender extends AbstractAppender {
+
+    /**
+     * Builds PulsarAppender instances.
+     * @param <B> The type to build
+     */
+    public static class Builder<B extends Builder<B>> extends AbstractAppender.Builder<B>
+            implements org.apache.logging.log4j.core.util.Builder<PulsarAppender> {
+
+        @PluginAttribute("key")
+        private String key;
+
+        @PluginAttribute("topic")
+        private String topic;
+
+        @PluginAttribute("serviceUrl")
+        private String serviceUrl;
+
+        @PluginAttribute(value = "avoidRecursive", defaultBoolean = true)
+        private boolean avoidRecursive;
+
+        @PluginAttribute(value = "syncSend", defaultBoolean = false)
+        private boolean syncSend;
+
+        @PluginElement("Properties")
+        private Property[] properties;
+
+        @SuppressWarnings("resource")
+        @Override
+        public PulsarAppender build() {
+            final Layout<? extends Serializable> layout = getLayout();
+            if (layout == null) {
+                AbstractLifeCycle.LOGGER.error("No layout provided for PulsarAppender");
+                return null;
+            }
+            PulsarManager manager = new PulsarManager(
+                getConfiguration().getLoggerContext(),
+                getName(),
+                serviceUrl,
+                topic,
+                syncSend,
+                properties,
+                key);
+            return new PulsarAppender(
+                getName(),
+                layout,
+                getFilter(),
+                isIgnoreExceptions(),
+                avoidRecursive,
+                manager);
+        }
+
+        public String getTopic() {
+            return topic;
+        }
+
+        public boolean isSyncSend() {
+            return syncSend;
+        }
+
+        public Property[] getProperties() {
+            return properties;
+        }
+
+        public B setTopic(final String topic) {
+            this.topic = topic;
+            return asBuilder();
+        }
+
+        public B setSyncSend(final boolean syncSend) {
+            this.syncSend = syncSend;
+            return asBuilder();
+        }
+
+        public B setProperties(final Property[] properties) {
+            this.properties = properties;
+            return asBuilder();
+        }
+    }
+
+    /**
+     * Creates a builder for a PulsarAppender.
+     * @return a builder for a PulsarAppender.
+     */
+    @PluginBuilderFactory
+    public static <B extends Builder<B>> B newBuilder() {
+        return new Builder<B>().asBuilder();
+    }
+
+    private final boolean avoidRecursive;
+    private final PulsarManager manager;
+
+    private PulsarAppender(
+            final String name,
+            final Layout<? extends Serializable> layout,
+            final Filter filter,
+            final boolean ignoreExceptions,
+            final boolean avoidRecursive,
+            final PulsarManager manager) {
+        super(name, filter, layout, ignoreExceptions);
+        this.avoidRecursive = avoidRecursive;
+        this.manager = Objects.requireNonNull(manager, "manager");
+    }
+
+    @Override
+    public void append(final LogEvent event) { // sends the event to Pulsar unless it is skipped as recursive
+        if (avoidRecursive
+            && event.getLoggerName() != null
+            && event.getLoggerName().startsWith("org.apache.pulsar")) { // event comes from an org.apache.pulsar logger
+            LOGGER.warn("Recursive logging from [{}] for appender [{}].", event.getLoggerName(), getName());
+        } else {
+            try {
+                tryAppend(event);
+            } catch (final Exception e) {
+                error("Unable to write to Pulsar in appender [" + getName() + "]", event, e); // handed to error(...), not rethrown here
+            }
+        }
+    }
+
+    private void tryAppend(final LogEvent event) { // serializes the event with the layout and passes the bytes to the manager
+        final Layout<? extends Serializable> layout = getLayout();
+        byte[] data;
+        if (layout instanceof SerializedLayout) { // for SerializedLayout each payload is header bytes followed by body bytes
+            final byte[] header = layout.getHeader();
+            final byte[] body = layout.toByteArray(event);
+            data = new byte[header.length + body.length];
+            System.arraycopy(header, 0, data, 0, header.length);
+            System.arraycopy(body, 0, data, header.length, body.length);
+        } else { // any other layout: the payload is just the formatted event
+            data = layout.toByteArray(event);
+        }
+        manager.send(data);
+    }
+
+    @Override
+    public void start() { // starts the appender, then asks the manager to create its Pulsar client and producer
+        super.start();
+        try {
+            manager.startup();
+        } catch (Exception e) { // best effort: PulsarManager.startup() has already logged the full cause
+            LOGGER.warn("Appender [{}] could not start its Pulsar manager ({}); log events will be dropped", getName(), e.toString());
+        }
+    }
+
+    @Override
+    public boolean stop(final long timeout, final TimeUnit timeUnit) {
+        setStopping();
+        boolean stopped = super.stop(timeout, timeUnit, false);
+        stopped &= manager.stop(timeout, timeUnit);
+        setStopped();
+        return stopped;
+    }
+
+    @Override
+    public String toString() {
+        return "PulsarAppender{" +
+            "name=" + getName() +
+            ", state=" + getState() +
+            ", serviceUrl=" + manager.getServiceUrl() +
+            ", topic=" + manager.getTopic() +
+            '}';
+    }
+
+}
diff --git a/pulsar-log4j2-appender/src/main/java/org/apache/pulsar/log4j2/appender/PulsarManager.java b/pulsar-log4j2-appender/src/main/java/org/apache/pulsar/log4j2/appender/PulsarManager.java
new file mode 100644
index 000000000..c52faa6cd
--- /dev/null
+++ b/pulsar-log4j2-appender/src/main/java/org/apache/pulsar/log4j2/appender/PulsarManager.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.log4j2.appender;
+
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+import java.util.function.Supplier;
+import org.apache.logging.log4j.core.LoggerContext;
+import org.apache.logging.log4j.core.appender.AbstractManager;
+import org.apache.logging.log4j.core.config.Property;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageBuilder;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+public class PulsarManager extends AbstractManager {
+
+    static Supplier<ClientBuilder> PULSAR_CLIENT_BUILDER = () -> PulsarClient.builder();
+
+    static BiFunction<String, byte[], Message<byte[]>> MESSAGE_BUILDER = (key, data) -> {
+        MessageBuilder<byte[]> messageBuilder = MessageBuilder.create()
+            .setContent(data);
+        if (null != key) {
+            messageBuilder = messageBuilder.setKey(key);
+        }
+        return messageBuilder.build();
+    };
+
+    private PulsarClient client;
+    private Producer<byte[]> producer;
+
+    private final String serviceUrl;
+    private final String topic;
+    private final String key;
+    private final boolean syncSend;
+
+    public PulsarManager(final LoggerContext loggerContext, // stores the settings only; the client is created in startup()
+                         final String name,
+                         final String serviceUrl,
+                         final String topic,
+                         final boolean syncSend,
+                         final Property[] properties, // NOTE(review): accepted but never read in this constructor
+                         final String key) {
+        super(loggerContext, name);
+        this.serviceUrl = Objects.requireNonNull(serviceUrl, "serviceUrl"); // NullPointerException if missing
+        this.topic = Objects.requireNonNull(topic, "topic"); // NullPointerException if missing
+        this.syncSend = syncSend;
+        this.key = key; // may be null; send() then passes a null key to MESSAGE_BUILDER
+    }
+
+    @Override
+    public boolean releaseSub(final long timeout, final TimeUnit timeUnit) { // releases the Pulsar resources created by startup()
+        if (client != null) { // null when startup() was never called or failed before building the client
+            try {
+                // closing the client also closes the producer created from it, so nothing is left open
+                client.closeAsync().get(timeout, timeUnit);
+            } catch (Exception e) {
+                LOGGER.warn("Failed to close Pulsar client within {} milliseconds", timeUnit.toMillis(timeout), e);
+                return false; // report to the caller that resources were not released cleanly
+            }
+        }
+        return true;
+    }
+
+    public void send(final byte[] msg)  { // publishes one payload; does nothing while producer is null
+        if (producer != null) {
+            String newKey = null;
+
+            if(key != null && key.contains("${")) { // key holds a ${...} expression: substitute it for every message
+                newKey = getLoggerContext().getConfiguration().getStrSubstitutor().replace(key);
+            } else if (key != null) { // plain key: used as is
+                newKey = key;
+            }
+
+            Message<byte[]> message = MESSAGE_BUILDER.apply(newKey, msg);
+            if (syncSend) { // synchronous send; a PulsarClientException is logged, not rethrown
+                try {
+                    producer.send(message);
+                } catch (PulsarClientException e) {
+                    LOGGER.error("Unable to write to Pulsar in appender [" + getName() + "]", e);
+                }
+            } else { // asynchronous send; the returned future is not awaited
+                producer.sendAsync(message)
+                    .exceptionally(cause -> {
+                        LOGGER.error("Unable to write to Pulsar in appender [" + getName() + "]", cause);
+                        return null;
+                    });
+            }
+        }
+    }
+
+    public void startup() throws Exception {
+        try {
+            client = PULSAR_CLIENT_BUILDER.get()
+                .serviceUrl(serviceUrl)
+                .build();
+            ProducerBuilder<byte[]> producerBuilder = client.newProducer()
+                .topic(topic)
+                .producerName("pulsar-log4j2-appender-" + topic)
+                .blockIfQueueFull(false);
+            if (syncSend) {
+                // disable batching for sync send
+                producerBuilder = producerBuilder.enableBatching(false);
+            } else {
+                // enable batching in 10 ms for async send
+                producerBuilder = producerBuilder
+                    .enableBatching(true)
+                    .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS);
+            }
+            producer = producerBuilder.create();
+        } catch (Exception t) {
+            LOGGER.error("Failed to start pulsar manager {}", t);
+            throw t;
+        }
+    }
+
+    public String getServiceUrl() {
+        return serviceUrl;
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+}
diff --git a/pulsar-log4j2-appender/src/test/java/org/apache/pulsar/log4j2/appender/PulsarAppenderTest.java b/pulsar-log4j2-appender/src/test/java/org/apache/pulsar/log4j2/appender/PulsarAppenderTest.java
new file mode 100644
index 000000000..4431243db
--- /dev/null
+++ b/pulsar-log4j2-appender/src/test/java/org/apache/pulsar/log4j2/appender/PulsarAppenderTest.java
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.log4j2.appender;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.AssertJUnit.assertEquals;
+import static org.testng.AssertJUnit.assertNotNull;
+import static org.testng.AssertJUnit.assertNull;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectInputStream;
+import java.nio.charset.StandardCharsets;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.core.Appender;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.LoggerContext;
+import org.apache.logging.log4j.core.config.Configurator;
+import org.apache.logging.log4j.core.impl.Log4jLogEvent;
+import org.apache.logging.log4j.message.SimpleMessage;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.impl.ClientBuilderImpl;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class PulsarAppenderTest {
+
+    private static final String LOG_MESSAGE = "Hello, world!";
+    private static final String TOPIC_NAME = "pulsar-topic";
+
+    private static Log4jLogEvent createLogEvent() {
+        return Log4jLogEvent.newBuilder()
+            .setLoggerName(PulsarAppenderTest.class.getName())
+            .setLoggerFqcn(PulsarAppenderTest.class.getName())
+            .setLevel(Level.INFO)
+            .setMessage(new SimpleMessage(LOG_MESSAGE))
+            .build();
+    }
+
+    private ClientBuilderImpl clientBuilder;
+    private PulsarClient client;
+    private Producer<byte[]> producer;
+    private List<Message<byte[]>> history;
+
+    private LoggerContext ctx;
+
+    @BeforeMethod
+    public void setUp() throws Exception {
+        history = new LinkedList<>();
+
+        client = mock(PulsarClient.class);
+        producer = mock(Producer.class);
+        clientBuilder = mock(ClientBuilderImpl.class);
+
+        doReturn(client).when(clientBuilder).build();
+        doReturn(clientBuilder).when(clientBuilder).serviceUrl(anyString());
+
+        ProducerBuilder<byte[]> producerBuilder = mock(ProducerBuilder.class);
+        when(client.newProducer()).thenReturn(producerBuilder);
+        doReturn(producerBuilder).when(producerBuilder).topic(anyString());
+        doReturn(producerBuilder).when(producerBuilder).producerName(anyString());
+        doReturn(producerBuilder).when(producerBuilder).enableBatching(anyBoolean());
+        doReturn(producerBuilder).when(producerBuilder).batchingMaxPublishDelay(anyInt(), any(TimeUnit.class));
+        doReturn(producerBuilder).when(producerBuilder).blockIfQueueFull(anyBoolean());
+        doReturn(producer).when(producerBuilder).create();
+
+        when(producer.send(any(Message.class)))
+            .thenAnswer(invocationOnMock -> {
+                Message<byte[]> msg = invocationOnMock.getArgumentAt(0, Message.class);
+                synchronized (history) {
+                    history.add(msg);
+                }
+                return null;
+            });
+
+        when(producer.sendAsync(any(Message.class)))
+            .thenAnswer(invocationOnMock -> {
+                Message<byte[]> msg = invocationOnMock.getArgumentAt(0, Message.class);
+                synchronized (history) {
+                    history.add(msg);
+                }
+                CompletableFuture<MessageId> future = new CompletableFuture<>();
+                future.complete(mock(MessageId.class));
+                return future;
+            });
+
+        PulsarManager.PULSAR_CLIENT_BUILDER = () -> clientBuilder;
+        PulsarManager.MESSAGE_BUILDER = (key, data) -> {
+            Message<byte[]> msg = mock(Message.class);
+            when(msg.getKey()).thenReturn(key);
+            when(msg.getData()).thenReturn(data);
+            return msg;
+        };
+
+        ctx = Configurator.initialize(
+            "PulsarAppenderTest",
+            getClass().getClassLoader(),
+            getClass().getClassLoader().getResource("PulsarAppenderTest.xml").toURI());
+    }
+
+    @Test
+    public void testAppendWithLayout() throws Exception {
+        final Appender appender = ctx.getConfiguration().getAppender("PulsarAppenderWithLayout");
+        appender.append(createLogEvent());
+        final Message<byte[]> item;
+        synchronized (history) {
+            assertEquals(1, history.size());
+            item = history.get(0);
+        }
+        assertNotNull(item);
+        assertNull(item.getKey());
+        assertEquals("[" + LOG_MESSAGE + "]", new String(item.getData(), StandardCharsets.UTF_8));
+    }
+
+    @Test
+    public void testAppendWithSerializedLayout() throws Exception {
+        final Appender appender = ctx.getConfiguration().getAppender("PulsarAppenderWithSerializedLayout");
+        final LogEvent logEvent = createLogEvent();
+        appender.append(logEvent);
+        final Message<byte[]> item;
+        synchronized (history) {
+            assertEquals(1, history.size());
+            item = history.get(0);
+        }
+        assertNotNull(item);
+        assertNull(item.getKey());
+        assertEquals(LOG_MESSAGE, deserializeLogEvent(item.getData()).getMessage().getFormattedMessage());
+    }
+
+    @Test
+    public void testAsyncAppend() throws Exception {
+        final Appender appender = ctx.getConfiguration().getAppender("AsyncPulsarAppender");
+        appender.append(createLogEvent());
+        final Message<byte[]> item;
+        synchronized (history) {
+            assertEquals(1, history.size());
+            item = history.get(0);
+        }
+        assertNotNull(item);
+        assertNull(item.getKey());
+        assertEquals(LOG_MESSAGE, new String(item.getData(), StandardCharsets.UTF_8));
+    }
+
+    @Test
+    public void testAppendWithKey() throws Exception { // appends one event and checks the key and payload of the recorded message
+        final Appender appender = ctx.getConfiguration().getAppender("PulsarAppenderWithKey");
+        final LogEvent logEvent = createLogEvent();
+        appender.append(logEvent);
+        Message<byte[]> item;
+        synchronized (history) {
+            assertEquals(1, history.size());
+            item = history.get(0);
+        }
+        assertNotNull(item);
+        String msgKey = item.getKey();
+        assertEquals("key", msgKey); // AssertJUnit order is (expected, actual)
+        assertEquals(LOG_MESSAGE, new String(item.getData(), StandardCharsets.UTF_8));
+    }
+
+    @Test
+    public void testAppendWithKeyLookup() throws Exception { // checks that the message key equals today's date as dd-MM-yyyy
+        final Appender appender = ctx.getConfiguration().getAppender("PulsarAppenderWithKeyLookup");
+        final LogEvent logEvent = createLogEvent();
+        Date date = new Date(); // NOTE(review): taken before append(); could mismatch if the day changes in between
+        SimpleDateFormat format = new SimpleDateFormat("dd-MM-yyyy");
+        appender.append(logEvent);
+        Message<byte[]> item;
+        synchronized (history) {
+            assertEquals(1, history.size());
+            item = history.get(0);
+        }
+        assertNotNull(item);
+        String keyValue = format.format(date);
+        assertEquals(keyValue, item.getKey()); // AssertJUnit order is (expected, actual)
+        assertEquals(LOG_MESSAGE, new String(item.getData(), StandardCharsets.UTF_8));
+    }
+
+    private LogEvent deserializeLogEvent(final byte[] data) throws IOException, ClassNotFoundException {
+        final ByteArrayInputStream bis = new ByteArrayInputStream(data);
+        try (ObjectInput ois = new ObjectInputStream(bis)) {
+            return (LogEvent) ois.readObject();
+        }
+    }
+
+}
\ No newline at end of file
diff --git a/pulsar-log4j2-appender/src/test/java/org/apache/pulsar/log4j2/appender/builder/ConfigurationAssemblerTest.java b/pulsar-log4j2-appender/src/test/java/org/apache/pulsar/log4j2/appender/builder/ConfigurationAssemblerTest.java
new file mode 100644
index 000000000..a23474105
--- /dev/null
+++ b/pulsar-log4j2-appender/src/test/java/org/apache/pulsar/log4j2/appender/builder/ConfigurationAssemblerTest.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.log4j2.appender.builder;
+
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.AssertJUnit.assertEquals;
+import static org.testng.AssertJUnit.assertTrue;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.Appender;
+import org.apache.logging.log4j.core.Filter;
+import org.apache.logging.log4j.core.LifeCycle;
+import org.apache.logging.log4j.core.LoggerContext;
+import org.apache.logging.log4j.core.config.Configuration;
+import org.apache.logging.log4j.core.config.ConfigurationFactory;
+import org.apache.logging.log4j.core.config.Configurator;
+import org.apache.logging.log4j.core.config.CustomLevelConfig;
+import org.apache.logging.log4j.core.config.LoggerConfig;
+import org.apache.logging.log4j.core.config.builder.api.ConfigurationBuilder;
+import org.apache.logging.log4j.core.config.builder.api.ConfigurationBuilderFactory;
+import org.apache.logging.log4j.core.config.builder.impl.BuiltConfiguration;
+import org.apache.logging.log4j.core.filter.ThresholdFilter;
+import org.apache.logging.log4j.core.layout.GelfLayout;
+import org.apache.logging.log4j.core.util.Constants;
+import org.apache.pulsar.log4j2.appender.PulsarAppender;
+import org.testng.annotations.Test;
+
+public class ConfigurationAssemblerTest {
+
+    @Test
+    public void testBuildConfiguration() throws Exception {
+        try {
+            System.setProperty(Constants.LOG4J_CONTEXT_SELECTOR,
+                    "org.apache.logging.log4j.core.async.AsyncLoggerContextSelector");
+            final ConfigurationBuilder<BuiltConfiguration> builder = ConfigurationBuilderFactory
+                    .newConfigurationBuilder();
+            CustomConfigurationFactory.addTestFixtures("config name", builder);
+            final Configuration configuration = builder.build();
+            try (LoggerContext ctx = Configurator.initialize(configuration)) {
+                validate(configuration);
+            }
+        } finally {
+            System.getProperties().remove(Constants.LOG4J_CONTEXT_SELECTOR);
+        }
+    }
+
+    @Test
+    public void testCustomConfigurationFactory() throws Exception {
+        try {
+            System.setProperty(ConfigurationFactory.CONFIGURATION_FACTORY_PROPERTY,
+                    "org.apache.pulsar.log4j2.appender.builder.CustomConfigurationFactory");
+            System.setProperty(Constants.LOG4J_CONTEXT_SELECTOR,
+                    "org.apache.logging.log4j.core.async.AsyncLoggerContextSelector");
+            final Configuration config = ((LoggerContext) LogManager.getContext(false)).getConfiguration();
+            validate(config);
+        } finally {
+            System.getProperties().remove(Constants.LOG4J_CONTEXT_SELECTOR);
+            System.getProperties().remove(ConfigurationFactory.CONFIGURATION_FACTORY_PROPERTY);
+        }
+    }
+
+    private void validate(final Configuration config) { // shared assertions used by both tests on the resulting configuration
+        assertNotNull(config);
+        assertNotNull(config.getName());
+        assertFalse(config.getName().isEmpty());
+        assertNotNull(config, "No configuration created");
+        assertEquals("Incorrect State: " + config.getState(), LifeCycle.State.STARTED, config.getState()); // (message, expected, actual)
+        final Map<String, Appender> appenders = config.getAppenders();
+        assertNotNull(appenders);
+        assertTrue("Incorrect number of Appenders: " + appenders.size(), appenders.size() == 2);
+        final PulsarAppender pulsarAppender = (PulsarAppender) appenders.get("Pulsar");
+        final GelfLayout gelfLayout = (GelfLayout) pulsarAppender.getLayout(); // the cast is the check: fails unless the layout is a GelfLayout
+        final Map<String, LoggerConfig> loggers = config.getLoggers();
+        assertNotNull(loggers);
+        assertTrue("Incorrect number of LoggerConfigs: " + loggers.size(), loggers.size() == 2);
+        final LoggerConfig rootLoggerConfig = loggers.get("");
+        assertEquals(Level.ERROR, rootLoggerConfig.getLevel());
+        assertFalse(rootLoggerConfig.isIncludeLocation());
+        final LoggerConfig loggerConfig = loggers.get("org.apache.logging.log4j");
+        assertEquals(Level.DEBUG, loggerConfig.getLevel());
+        assertTrue(loggerConfig.isIncludeLocation());
+        final Filter filter = config.getFilter();
+        assertNotNull(filter, "No Filter");
+        assertTrue("Not a Threshold Filter", filter instanceof ThresholdFilter);
+        final List<CustomLevelConfig> customLevels = config.getCustomLevels();
+        assertNotNull(customLevels, "No CustomLevels"); // check the list itself before it is dereferenced below
+        assertEquals(1, customLevels.size());
+        final CustomLevelConfig customLevel = customLevels.get(0);
+        assertEquals("Panic", customLevel.getLevelName());
+        assertEquals(17, customLevel.getIntLevel());
+        final Logger logger = LogManager.getLogger(getClass());
+        logger.info("Welcome to Log4j!");
+    }
+}
diff --git a/pulsar-log4j2-appender/src/test/java/org/apache/pulsar/log4j2/appender/builder/ConfigurationBuilderTest.java b/pulsar-log4j2-appender/src/test/java/org/apache/pulsar/log4j2/appender/builder/ConfigurationBuilderTest.java
new file mode 100644
index 000000000..7de8a3fc6
--- /dev/null
+++ b/pulsar-log4j2-appender/src/test/java/org/apache/pulsar/log4j2/appender/builder/ConfigurationBuilderTest.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.log4j2.appender.builder;
+
+import static org.testng.Assert.assertEquals;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.core.Filter;
+import org.apache.logging.log4j.core.appender.ConsoleAppender;
+import org.apache.logging.log4j.core.config.builder.api.AppenderComponentBuilder;
+import org.apache.logging.log4j.core.config.builder.api.ConfigurationBuilder;
+import org.apache.logging.log4j.core.config.builder.api.ConfigurationBuilderFactory;
+import org.apache.logging.log4j.core.config.builder.impl.BuiltConfiguration;
+import org.testng.annotations.Test;
+
+/** Verifies the XML that log4j2's {@link ConfigurationBuilder} renders for a configuration with a Pulsar appender. */
+public class ConfigurationBuilderTest {
+    private static final String INDENT = "  ";
+    private static final String EOL = System.lineSeparator();
+
+    /**
+     * Fills {@code builder} with the fixture that {@link #expectedXml} spells out.
+     * @param name    configuration name to set on the builder
+     * @param builder builder receiving the script file, filter, appenders, loggers, property and custom level
+     */
+    private void addTestFixtures(final String name, final ConfigurationBuilder<BuiltConfiguration> builder) {
+        builder.setConfigurationName(name);
+        builder.setStatusLevel(Level.ERROR);
+        builder.setShutdownTimeout(5000, TimeUnit.MILLISECONDS);
+        builder.add(builder.newScriptFile("target/test-classes/scripts/filter.groovy").addIsWatched(true));
+        builder.add(builder.newFilter("ThresholdFilter", Filter.Result.ACCEPT, Filter.Result.NEUTRAL).addAttribute("level", Level.DEBUG));
+
+        final AppenderComponentBuilder appenderBuilder = builder.newAppender("Stdout", "CONSOLE").addAttribute("target", ConsoleAppender.Target.SYSTEM_OUT);
+        appenderBuilder.add(builder.newLayout("PatternLayout").addAttribute("pattern", "%d [%t] %-5level: %msg%n%throwable"));
+        appenderBuilder.add(builder.newFilter("MarkerFilter", Filter.Result.DENY, Filter.Result.NEUTRAL).addAttribute("marker", "FLOW"));
+        builder.add(appenderBuilder);
+
+        // The appender under test: plugin type "Pulsar" with a GelfLayout carrying one extra key/value pair.
+        final AppenderComponentBuilder appenderBuilder2 = builder.newAppender("Pulsar", "Pulsar")
+            .addAttribute("serviceUrl", "pulsar://localhost:6650")
+            .addAttribute("topic", "my-topic");
+        appenderBuilder2.add(builder.newLayout("GelfLayout").addAttribute("host", "my-host").
+            addComponent(builder.newKeyValuePair("extraField", "extraValue")));
+        builder.add(appenderBuilder2);
+
+        builder.add(builder.newLogger("org.apache.logging.log4j", Level.DEBUG, true).
+                    add(builder.newAppenderRef("Stdout")).addAttribute("additivity", false));
+        builder.add(builder.newLogger("org.apache.logging.log4j.core", Level.DEBUG).add(builder.newAppenderRef("Stdout")));
+        builder.add(builder.newRootLogger(Level.ERROR).add(builder.newAppenderRef("Stdout")));
+
+        builder.addProperty("MyKey", "MyValue");
+        builder.add(builder.newCustomLevel("Panic", 17));
+        builder.setPackages("foo,bar");
+    }
+
+    /** Expected rendering of the fixture; its lines are terminated with the platform line separator. */
+    private static final String expectedXml =
+            "<?xml version=\"1.0\" ?>" + EOL +
+            "<Configuration name=\"config name\" status=\"ERROR\" packages=\"foo,bar\" shutdownTimeout=\"5000\">" + EOL +
+                INDENT + "<Properties>" + EOL +
+                INDENT + INDENT + "<Property name=\"MyKey\">MyValue</Property>" + EOL +
+                INDENT + "</Properties>" + EOL +
+                INDENT + "<Scripts>" + EOL +
+                INDENT + INDENT + "<ScriptFile name=\"target/test-classes/scripts/filter.groovy\" path=\"target/test-classes/scripts/filter.groovy\" isWatched=\"true\"/>" + EOL +
+                INDENT + "</Scripts>" + EOL +
+                INDENT + "<CustomLevels>" + EOL +
+                INDENT + INDENT + "<CustomLevel name=\"Panic\" intLevel=\"17\"/>" + EOL +
+                INDENT + "</CustomLevels>" + EOL +
+                INDENT + "<ThresholdFilter onMatch=\"ACCEPT\" onMisMatch=\"NEUTRAL\" level=\"DEBUG\"/>" + EOL +
+                INDENT + "<Appenders>" + EOL +
+                INDENT + INDENT + "<CONSOLE name=\"Stdout\" target=\"SYSTEM_OUT\">" + EOL +
+                INDENT + INDENT + INDENT + "<PatternLayout pattern=\"%d [%t] %-5level: %msg%n%throwable\"/>" + EOL +
+                INDENT + INDENT + INDENT + "<MarkerFilter onMatch=\"DENY\" onMisMatch=\"NEUTRAL\" marker=\"FLOW\"/>" + EOL +
+                INDENT + INDENT + "</CONSOLE>" + EOL +
+                INDENT + INDENT + "<Pulsar name=\"Pulsar\" serviceUrl=\"pulsar://localhost:6650\" topic=\"my-topic\">" + EOL +
+                INDENT + INDENT + INDENT + "<GelfLayout host=\"my-host\">" + EOL +
+                INDENT + INDENT + INDENT + INDENT + "<KeyValuePair key=\"extraField\" value=\"extraValue\"/>" + EOL +
+                INDENT + INDENT + INDENT + "</GelfLayout>" + EOL +
+                INDENT + INDENT + "</Pulsar>" + EOL +
+                INDENT + "</Appenders>" + EOL +
+                INDENT + "<Loggers>" + EOL +
+                INDENT + INDENT + "<Logger name=\"org.apache.logging.log4j\" level=\"DEBUG\" includeLocation=\"true\" additivity=\"false\">" + EOL +
+                INDENT + INDENT + INDENT + "<AppenderRef ref=\"Stdout\"/>" + EOL +
+                INDENT + INDENT + "</Logger>" + EOL +
+                INDENT + INDENT + "<Logger name=\"org.apache.logging.log4j.core\" level=\"DEBUG\">" + EOL +
+                INDENT + INDENT + INDENT + "<AppenderRef ref=\"Stdout\"/>" + EOL +
+                INDENT + INDENT + "</Logger>" + EOL +
+                INDENT + INDENT + "<Root level=\"ERROR\">" + EOL +
+                INDENT + INDENT + INDENT + "<AppenderRef ref=\"Stdout\"/>" + EOL +
+                INDENT + INDENT + "</Root>" + EOL +
+                INDENT + "</Loggers>" + EOL +
+            "</Configuration>" + EOL;
+
+    /** Builds the fixture under the name "config name" and compares the generated XML with {@link #expectedXml}. */
+    @Test
+    public void testXmlConstructing() throws Exception {
+        // TODO make test run properly on Windows; disabled guard: assumeTrue(System.lineSeparator().length() == 1)
+        final ConfigurationBuilder<BuiltConfiguration> builder = ConfigurationBuilderFactory.newConfigurationBuilder();
+        addTestFixtures("config name", builder);
+        final String xmlConfiguration = builder.toXmlConfiguration();
+        assertEquals(xmlConfiguration, expectedXml); // TestNG order is (actual, expected)
+    }
+}
diff --git a/pulsar-log4j2-appender/src/test/java/org/apache/pulsar/log4j2/appender/builder/CustomConfigurationFactory.java b/pulsar-log4j2-appender/src/test/java/org/apache/pulsar/log4j2/appender/builder/CustomConfigurationFactory.java
new file mode 100644
index 000000000..98b26d000
--- /dev/null
+++ b/pulsar-log4j2-appender/src/test/java/org/apache/pulsar/log4j2/appender/builder/CustomConfigurationFactory.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.log4j2.appender.builder;
+
+import java.net.URI;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.core.Filter;
+import org.apache.logging.log4j.core.LoggerContext;
+import org.apache.logging.log4j.core.appender.ConsoleAppender;
+import org.apache.logging.log4j.core.config.Configuration;
+import org.apache.logging.log4j.core.config.ConfigurationFactory;
+import org.apache.logging.log4j.core.config.ConfigurationSource;
+import org.apache.logging.log4j.core.config.builder.api.AppenderComponentBuilder;
+import org.apache.logging.log4j.core.config.builder.api.ConfigurationBuilder;
+import org.apache.logging.log4j.core.config.builder.impl.BuiltConfiguration;
+
+/**
+ * Test-only {@link ConfigurationFactory} that assembles its configuration in code through the builder API.
+ * Normally this would be a plugin; it is left unregistered so it is not used for everything and is defined via a system property.
+ */
+//@Plugin(name = "CustomConfigurationFactory", category = ConfigurationFactory.CATEGORY)
+//@Order(50)
+public class CustomConfigurationFactory extends ConfigurationFactory {
+
+    /** Applies the test fixture to {@code builder}, naming it {@code name}, and returns the built configuration. */
+    static Configuration addTestFixtures(final String name, final ConfigurationBuilder<BuiltConfiguration> builder) {
+        builder.setConfigurationName(name);
+        builder.setStatusLevel(Level.ERROR);
+        builder.add(builder.newScriptFile("target/test-classes/scripts/filter.groovy").addIsWatched(true));
+        builder.add(builder.newFilter("ThresholdFilter", Filter.Result.ACCEPT, Filter.Result.NEUTRAL).addAttribute("level", Level.DEBUG));
+
+        // Console appender: pattern layout plus a MarkerFilter that denies the FLOW marker.
+        final AppenderComponentBuilder stdout = builder.newAppender("Stdout", "CONSOLE")
+            .addAttribute("target", ConsoleAppender.Target.SYSTEM_OUT)
+            .add(builder.newLayout("PatternLayout").addAttribute("pattern", "%d [%t] %-5level: %msg%n%throwable"))
+            .add(builder.newFilter("MarkerFilter", Filter.Result.DENY, Filter.Result.NEUTRAL).addAttribute("marker", "FLOW"));
+        builder.add(stdout);
+
+        // Pulsar appender with a GelfLayout carrying one extra key/value pair.
+        final AppenderComponentBuilder pulsar = builder.newAppender("Pulsar", "Pulsar")
+            .addAttribute("serviceUrl", "pulsar://localhost:6650")
+            .addAttribute("topic", "my-topic")
+            .add(builder.newLayout("GelfLayout").addAttribute("host", "my-host")
+                .addComponent(builder.newKeyValuePair("extraField", "extraValue")));
+        builder.add(pulsar);
+
+        builder.add(builder.newLogger("org.apache.logging.log4j", Level.DEBUG, true)
+            .add(builder.newAppenderRef("Stdout")).addAttribute("additivity", false));
+        builder.add(builder.newRootLogger(Level.ERROR).add(builder.newAppenderRef("Stdout")));
+        builder.add(builder.newCustomLevel("Panic", 17));
+
+        return builder.build();
+    }
+
+    /** Delegates to the name-based overload with {@code source.toString()} as the name and no location. */
+    @Override
+    public Configuration getConfiguration(final LoggerContext loggerContext, final ConfigurationSource source) {
+        return getConfiguration(loggerContext, source.toString(), null);
+    }
+
+    /** Returns the fixture configuration named {@code name}; the context and location arguments are not used. */
+    @Override
+    public Configuration getConfiguration(final LoggerContext loggerContext, final String name, final URI configLocation) {
+        return addTestFixtures(name, newConfigurationBuilder());
+    }
+
+    /** Returns a single-element array holding {@code "*"}. */
+    @Override
+    protected String[] getSupportedTypes() {
+        return new String[] {"*"};
+    }
+}
diff --git a/pulsar-log4j2-appender/src/test/resources/PulsarAppenderTest.xml b/pulsar-log4j2-appender/src/test/resources/PulsarAppenderTest.xml
new file mode 100644
index 000000000..a5743fb37
--- /dev/null
+++ b/pulsar-log4j2-appender/src/test/resources/PulsarAppenderTest.xml
@@ -0,0 +1,49 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<Configuration name="PulsarAppenderTest" status="OFF"> <!-- Test configuration: five Pulsar appenders, all with the same serviceUrl and topic -->
+  <Appenders>
+    <Pulsar name="PulsarAppenderWithLayout" serviceUrl="pulsar://localhost:6650" topic="persistent://t/c/n/pulsar-topic" avoidRecursive="false">
+      <PatternLayout pattern="[%m]"/> <!-- %m wrapped in literal square brackets -->
+    </Pulsar>
+    <Pulsar name="PulsarAppenderWithSerializedLayout" serviceUrl="pulsar://localhost:6650"  topic="persistent://t/c/n/pulsar-topic" avoidRecursive="false">
+      <SerializedLayout/>
+    </Pulsar>
+    <Pulsar name="AsyncPulsarAppender" serviceUrl="pulsar://localhost:6650" topic="persistent://t/c/n/pulsar-topic" avoidRecursive="false">
+      <PatternLayout pattern="%m"/>
+      <Property name="syncSend">false</Property> <!-- NOTE(review): given as a nested Property rather than an attribute; confirm PulsarAppender reads it this way -->
+    </Pulsar>
+    <Pulsar name="PulsarAppenderWithKey" serviceUrl="pulsar://localhost:6650" topic="persistent://t/c/n/pulsar-topic" key="key" avoidRecursive="false">
+      <PatternLayout pattern="%m"/>
+    </Pulsar>
+    <Pulsar name="PulsarAppenderWithKeyLookup" serviceUrl="pulsar://localhost:6650" topic="persistent://t/c/n/pulsar-topic" key="$${date:dd-MM-yyyy}" avoidRecursive="false">
+      <PatternLayout pattern="%m"/> <!-- NOTE(review): "$$" in the key above presumably defers the date lookup to runtime; confirm against the log4j2 lookup docs -->
+    </Pulsar>
+  </Appenders>
+  <Loggers>
+    <Root level="info"> <!-- PulsarAppenderWithKeyLookup is declared above but has no AppenderRef here -->
+      <AppenderRef ref="PulsarAppenderWithLayout"/>
+      <AppenderRef ref="PulsarAppenderWithSerializedLayout"/>
+      <AppenderRef ref="AsyncPulsarAppender"/>
+      <AppenderRef ref="PulsarAppenderWithKey"/>
+    </Root>
+  </Loggers>
+</Configuration>
\ No newline at end of file


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services