You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/03/27 18:58:43 UTC
[incubator-pulsar] branch master updated: Send Log statements to
log Topic for Java Functions (#1447)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 86b9546 Send Log statements to log Topic for Java Functions (#1447)
86b9546 is described below
commit 86b95461602b1f813e41f382eac41a6b2a31ee63
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Tue Mar 27 11:58:41 2018 -0700
Send Log statements to log Topic for Java Functions (#1447)
* Added LogTopic support to Java functions
* UseLogAppender instead of PulsarAppender
* Reverted changes to PulsarAppender
* No need to take dep on pulsar-log4j plugin
* Start the appender
* Fixed comments
* Reverted back logging function change
---
.../functions/instance/JavaInstanceRunnable.java | 38 +++++++
.../pulsar/functions/instance/LogAppender.java | 121 +++++++++++++++++++++
2 files changed, 159 insertions(+)
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 3d1934c..510608f 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -45,6 +45,9 @@ import org.apache.bookkeeper.clients.exceptions.StreamNotFoundException;
import org.apache.bookkeeper.clients.utils.NetUtils;
import org.apache.bookkeeper.stream.proto.NamespaceConfiguration;
import org.apache.logging.log4j.ThreadContext;
+import org.apache.logging.log4j.core.LoggerContext;
+import org.apache.logging.log4j.core.config.Configuration;
+import org.apache.logging.log4j.core.config.LoggerConfig;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.api.PulsarClientException.ProducerBusyException;
import org.apache.pulsar.client.impl.MessageIdImpl;
@@ -85,6 +88,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable, ConsumerEv
@Getter(AccessLevel.PACKAGE)
private final Map<String, Consumer> inputConsumers;
private LinkedList<String> inputTopicsToResubscribe = null;
+ private LogAppender logAppender;
// provide tables for storing states
private final String stateStorageServiceUrl;
@@ -188,6 +192,8 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable, ConsumerEv
startOutputProducer();
// start the input consumer
startInputConsumer();
+ // start any log topic handler
+ setupLogHandler();
return new JavaInstance(instanceConfig, object, clsLoader, client, inputConsumers);
}
@@ -262,10 +268,12 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable, ConsumerEv
}
long processAt = System.currentTimeMillis();
stats.incrementProcessed(processAt);
+ addLogTopicHandler();
result = javaInstance.handleMessage(
msg.getActualMessage().getMessageId(),
msg.getTopicName(),
input);
+ removeLogTopicHandler();
long doneProcessing = System.currentTimeMillis();
log.debug("Got result: {}", result.getResult());
@@ -755,4 +763,34 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable, ConsumerEv
}
}
}
+
+ private void setupLogHandler() {
+ if (instanceConfig.getFunctionConfig().getLogTopic() != null &&
+ !instanceConfig.getFunctionConfig().getLogTopic().isEmpty()) {
+ logAppender = new LogAppender(client, instanceConfig.getFunctionConfig().getLogTopic(),
+ FunctionConfigUtils.getFullyQualifiedName(instanceConfig.getFunctionConfig()));
+ logAppender.start();
+ }
+ }
+
+ private void addLogTopicHandler() {
+ if (logAppender == null) return;
+ LoggerContext context = LoggerContext.getContext(false);
+ Configuration config = context.getConfiguration();
+ config.addAppender(logAppender);
+ for (final LoggerConfig loggerConfig : config.getLoggers().values()) {
+ loggerConfig.addAppender(logAppender, null, null);
+ }
+ config.getRootLogger().addAppender(logAppender, null, null);
+ }
+
+ private void removeLogTopicHandler() {
+ if (logAppender == null) return;
+ LoggerContext context = LoggerContext.getContext(false);
+ Configuration config = context.getConfiguration();
+ for (final LoggerConfig loggerConfig : config.getLoggers().values()) {
+ loggerConfig.removeAppender(logAppender.getName());
+ }
+ config.getRootLogger().removeAppender(logAppender.getName());
+ }
}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/LogAppender.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/LogAppender.java
new file mode 100644
index 0000000..3dd71a2
--- /dev/null
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/LogAppender.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.instance;
+
+import org.apache.logging.log4j.core.*;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+
+import java.io.Serializable;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * LogAppender class that is used to send log statements from Pulsar Functions logger
+ * to a log topic.
+ */
+public class LogAppender implements Appender {
+ private PulsarClient pulsarClient;
+ private String logTopic;
+ private String fqn;
+ private State state;
+ private ErrorHandler errorHandler;
+ private Producer<byte[]> producer;
+
+ public LogAppender(PulsarClient pulsarClient, String logTopic, String fqn) {
+ this.pulsarClient = pulsarClient;
+ this.logTopic = logTopic;
+ this.fqn = fqn;
+ }
+
+ @Override
+ public void append(LogEvent logEvent) {
+ producer.sendAsync(logEvent.getMessage().getFormattedMessage().getBytes());
+ }
+
+ @Override
+ public String getName() {
+ return fqn;
+ }
+
+ @Override
+ public Layout<? extends Serializable> getLayout() {
+ return null;
+ }
+
+ @Override
+ public boolean ignoreExceptions() {
+ return false;
+ }
+
+ @Override
+ public ErrorHandler getHandler() {
+ return errorHandler;
+ }
+
+ @Override
+ public void setHandler(ErrorHandler errorHandler) {
+ this.errorHandler = errorHandler;
+ }
+
+ @Override
+ public State getState() {
+ return state;
+ }
+
+ @Override
+ public void initialize() {
+ this.state = State.INITIALIZED;
+ }
+
+ @Override
+ public void start() {
+ this.state = State.STARTING;
+ try {
+ ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer()
+ .topic(logTopic)
+ .producerName(fqn)
+ .blockIfQueueFull(false)
+ .enableBatching(true)
+ .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS);
+ producer = producerBuilder.create();
+ } catch (Exception e) {
+ throw new RuntimeException("Error starting LogTopic Producer", e);
+ }
+ this.state = State.STARTED;
+ }
+
+ @Override
+ public void stop() {
+ this.state = State.STOPPING;
+ producer.closeAsync();
+ producer = null;
+ this.state = State.STOPPED;
+ }
+
+ @Override
+ public boolean isStarted() {
+ return state == State.STARTED;
+ }
+
+ @Override
+ public boolean isStopped() {
+ return state == State.STOPPED;
+ }
+}
--
To stop receiving notification emails like this one, please contact
sijie@apache.org.