You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/10/24 16:14:55 UTC
[pulsar] branch master updated: Allow Python functions logging to be more configurable (#2832)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 203c8ee Allow Python functions logging to be more configurable (#2832)
203c8ee is described below
commit 203c8ee2b576704803135ddb18c2699322917c36
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Wed Oct 24 11:14:48 2018 -0500
Allow Python functions logging to be more configurable (#2832)
* Allow Python functions logging to be configured using config files
* Add license headers
* Fix log file
* Change config file name
* Fix unit tests
---
conf/functions-logging/console_logging_config.ini | 41 ++++++++++++++++
conf/functions-logging/logging_config.ini | 41 ++++++++++++++++
pulsar-functions/instance/src/main/python/log.py | 55 +++++++++++-----------
.../src/main/python/python_instance_main.py | 4 +-
.../functions/runtime/KubernetesRuntime.java | 13 ++++-
.../pulsar/functions/runtime/ProcessRuntime.java | 11 ++++-
.../pulsar/functions/runtime/RuntimeUtils.java | 7 ++-
.../functions/runtime/KubernetesRuntimeTest.java | 5 +-
.../functions/runtime/ProcessRuntimeTest.java | 8 ++--
9 files changed, 144 insertions(+), 41 deletions(-)
diff --git a/conf/functions-logging/console_logging_config.ini b/conf/functions-logging/console_logging_config.ini
new file mode 100644
index 0000000..b97bd12
--- /dev/null
+++ b/conf/functions-logging/console_logging_config.ini
@@ -0,0 +1,41 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+[loggers]
+keys = root
+
+[handlers]
+keys = stream_handler
+
+[formatters]
+keys = formatter
+
+# Root logger: records at INFO and above go to the stream handler below.
+[logger_root]
+level = INFO
+handlers = stream_handler
+
+# Stream handler: writes formatted records to standard output.
+[handler_stream_handler]
+class = StreamHandler
+level = INFO
+formatter = formatter
+args = (sys.stdout,)
+
+# Layout: "[timestamp] [LEVEL] source-file: message"; %z appends the UTC offset.
+[formatter_formatter]
+format = [%(asctime)s] [%(levelname)s] %(filename)s: %(message)s
+datefmt = %Y-%m-%d %H:%M:%S %z
\ No newline at end of file
diff --git a/conf/functions-logging/logging_config.ini b/conf/functions-logging/logging_config.ini
new file mode 100644
index 0000000..4bb11c3
--- /dev/null
+++ b/conf/functions-logging/logging_config.ini
@@ -0,0 +1,41 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+[loggers]
+keys=root
+
+[handlers]
+keys=rotating_file_handler
+
+[formatters]
+keys=formatter
+
+[logger_root]
+level=INFO
+handlers=rotating_file_handler
+
+# Writes to the path held in the LOG_FILE environment variable; the handler
+# class creates the log file's parent directory if it does not exist yet.
+[handler_rotating_file_handler]
+class=log.CreatePathRotatingFileHandler
+level=INFO
+formatter=formatter
+# Positional args must follow the handler signature
+# (filename, mode, maxBytes, backupCount): append, rotate at 10 MiB, keep 5 backups.
+args=(os.getenv("LOG_FILE",""), 'a', 10 * 1024 * 1024, 5)
+
+[formatter_formatter]
+format=[%(asctime)s] [%(levelname)s] %(filename)s: %(message)s
+datefmt=%Y-%m-%d %H:%M:%S %z
\ No newline at end of file
diff --git a/pulsar-functions/instance/src/main/python/log.py b/pulsar-functions/instance/src/main/python/log.py
index ee596ce..4eb02c8 100644
--- a/pulsar-functions/instance/src/main/python/log.py
+++ b/pulsar-functions/instance/src/main/python/log.py
@@ -22,6 +22,8 @@
''' log.py '''
import logging
+import logging.config
+import logging.handlers
import os
import errno
from logging.handlers import RotatingFileHandler
@@ -52,6 +54,24 @@ class LogTopicHandler(logging.Handler):
msg = self.format(record)
self.producer.send_async(str(msg).encode('utf-8'), None)
+def mkdir_p(path):
+  """Create directory `path` and any missing parents, like `mkdir -p`.
+
+  An empty `path` (what os.path.dirname() returns for a bare file name)
+  refers to the current directory and is treated as a no-op. A directory
+  that already exists is not an error; any other OSError is re-raised.
+  """
+  if not path:
+    # os.makedirs('') raises, but there is nothing to create here.
+    return
+  try:
+    os.makedirs(path, exist_ok=True)  # Python >= 3.2
+  except TypeError:
+    # Python 2: makedirs() has no exist_ok keyword, so tolerate EEXIST
+    # by hand, but only when the existing path really is a directory.
+    try:
+      os.makedirs(path)
+    except OSError as exc:
+      if exc.errno != errno.EEXIST or not os.path.isdir(path):
+        raise
+
+class CreatePathRotatingFileHandler(logging.handlers.RotatingFileHandler):
+  """RotatingFileHandler that creates the log file's parent directory first.
+
+  Defaults rotate the file at 10 MiB and keep 5 backups. When `filename`
+  has no directory component there is nothing to create, and the file is
+  opened relative to the current working directory.
+  """
+
+  def __init__(self, filename, mode='a', maxBytes=10 * 1024 * 1024, backupCount=5,
+               encoding=None, delay=0):
+    log_dir = os.path.dirname(filename)
+    # os.path.dirname() of a bare file name is '', which makedirs rejects.
+    if log_dir:
+      mkdir_p(log_dir)
+    logging.handlers.RotatingFileHandler.__init__(
+        self, filename, mode=mode, maxBytes=maxBytes, backupCount=backupCount,
+        encoding=encoding, delay=delay)
+
def configure(level=logging.INFO):
""" Configure logger which dumps log on terminal
@@ -84,34 +104,13 @@ def add_handler(stream_handler):
stream_handler.setFormatter(formatter)
Log.addHandler(stream_handler)
-def init_rotating_logger(level, logfile, max_files, max_bytes):
- """Initializes a rotating logger
-
- It also makes sure that any StreamHandler is removed, so as to avoid stdout/stderr
- constipation issues
- """
- # create log directory if necessary
- try:
- os.makedirs(os.path.dirname(logfile))
- except OSError as e:
- if e.errno != errno.EEXIST:
- raise
-
- logging.basicConfig()
-
- root_logger = logging.getLogger()
- log_format = "[%(asctime)s] [%(levelname)s] %(filename)s: %(message)s"
-
- root_logger.setLevel(level)
- handler = RotatingFileHandler(logfile, maxBytes=max_bytes, backupCount=max_files)
- handler.setFormatter(logging.Formatter(fmt=log_format, datefmt=date_format))
- root_logger.addHandler(handler)
-
- for handler in root_logger.handlers:
- root_logger.debug("Associated handlers - " + str(handler))
- if isinstance(handler, logging.StreamHandler):
- root_logger.debug("Removing StreamHandler: " + str(handler))
- root_logger.handlers.remove(handler)
+def init_logger(level, logfile, logging_config_file):
+  """Configure logging for a function instance from an INI config file.
+
+  :param level: kept for API compatibility. It is attached to every record
+      as the `level` extra field via LoggerAdapter; it is NOT applied to the
+      root logger or its handlers, so the levels in the config file decide.
+  :param logfile: path of this instance's log file. It is exported as the
+      LOG_FILE environment variable so the config file can read it with
+      os.getenv("LOG_FILE", "").
+  :param logging_config_file: path to a logging.config.fileConfig INI file.
+  """
+  global Log
+  os.environ['LOG_FILE'] = logfile
+  # By default fileConfig() disables every pre-existing non-root logger the
+  # config does not name (e.g. loggers created by already-imported libraries);
+  # keep them enabled so their records still reach the root handlers.
+  logging.config.fileConfig(logging_config_file, disable_existing_loggers=False)
+  Log = logging.LoggerAdapter(logging.getLogger(), {'level': level})
def set_logging_level(cl_args):
"""simply set verbose level based on command-line args
diff --git a/pulsar-functions/instance/src/main/python/python_instance_main.py b/pulsar-functions/instance/src/main/python/python_instance_main.py
index 1bfc793..2acfd6e 100644
--- a/pulsar-functions/instance/src/main/python/python_instance_main.py
+++ b/pulsar-functions/instance/src/main/python/python_instance_main.py
@@ -70,6 +70,7 @@ def main():
parser.add_argument('--max_buffered_tuples', required=True, help='Maximum number of Buffered tuples')
parser.add_argument('--logging_directory', required=True, help='Logging Directory')
parser.add_argument('--logging_file', required=True, help='Log file name')
+ parser.add_argument('--logging_config_file', required=True, help='Config file for logging')
parser.add_argument('--expected_healthcheck_interval', required=True, help='Expected time in seconds between health checks', type=int)
parser.add_argument('--install_usercode_dependencies', required=False, help='For packaged python like wheel files, do we need to install all dependencies', type=bool)
parser.add_argument('--dependency_repository', required=False, help='For packaged python like wheel files, which repository to pull the dependencies from')
@@ -102,8 +103,7 @@ def main():
log_file = os.path.join(args.logging_directory,
util.getFullyQualifiedFunctionName(function_details.tenant, function_details.namespace, function_details.name),
"%s-%s.log" % (args.logging_file, args.instance_id))
- log.init_rotating_logger(level=logging.INFO, logfile=log_file,
- max_files=5, max_bytes=10 * 1024 * 1024)
+ log.init_logger(logging.INFO, log_file, args.logging_config_file)
Log.info("Starting Python instance with %s" % str(args))
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
index ee7125f..80eb840 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
@@ -130,9 +130,18 @@ class KubernetesRuntime implements Runtime {
this.userCodePkgUrl = userCodePkgUrl;
this.originalCodeFileName = pulsarRootDir + "/" + originalCodeFileName;
this.pulsarAdminUrl = pulsarAdminUrl;
+ String logConfigFile = null;
+ switch (instanceConfig.getFunctionDetails().getRuntime()) {
+ case JAVA:
+ logConfigFile = "kubernetes_instance_log4j2.yml";
+ break;
+ case PYTHON:
+ logConfigFile = pulsarRootDir + "/conf/functions-logging/console_logging_config.ini";
+ break;
+ }
this.processArgs = RuntimeUtils.composeArgs(instanceConfig, instanceFile, logDirectory, this.originalCodeFileName, pulsarServiceUrl, stateStorageServiceUrl,
- authConfig, "$" + ENV_SHARD_ID, GRPC_PORT, -1l, "kubernetes_instance_log4j2.yml", installUserCodeDependencies,
- pythonDependencyRepository, pythonExtraDependencyRepository);
+ authConfig, "$" + ENV_SHARD_ID, GRPC_PORT, -1l, logConfigFile,
+ installUserCodeDependencies, pythonDependencyRepository, pythonExtraDependencyRepository);
this.prometheusMetricsServerArgs = composePrometheusMetricsServerArgs(prometheusMetricsServerJarFile, expectedMetricsInterval);
running = false;
doChecks(instanceConfig.getFunctionDetails());
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
index 1cf0db9..2fa7f82 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
@@ -73,9 +73,18 @@ class ProcessRuntime implements Runtime {
this.instanceConfig = instanceConfig;
this.instancePort = instanceConfig.getPort();
this.expectedHealthCheckInterval = expectedHealthCheckInterval;
+ String logConfigFile = null;
+ switch (instanceConfig.getFunctionDetails().getRuntime()) {
+ case JAVA:
+ logConfigFile = "java_instance_log4j2.yml";
+ break;
+ case PYTHON:
+ logConfigFile = System.getenv("PULSAR_HOME") + "/conf/functions-logging/logging_config.ini";
+ break;
+ }
this.processArgs = RuntimeUtils.composeArgs(instanceConfig, instanceFile, logDirectory, codeFile, pulsarServiceUrl, stateStorageServiceUrl,
authConfig, instanceConfig.getInstanceName(), instanceConfig.getPort(), expectedHealthCheckInterval,
- "java_instance_log4j2.yml", false, null, null);
+ logConfigFile, false, null, null);
}
/**
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
index 00a04b5..2647536 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
@@ -48,7 +48,7 @@ class RuntimeUtils {
String shardId,
Integer grpcPort,
Long expectedHealthCheckInterval,
- String javaLog4jFileName,
+ String logConfigFile,
Boolean installUserCodeDepdendencies,
String pythonDependencyRepository,
String pythonExtraDependencyRepository) throws Exception {
@@ -61,7 +61,7 @@ class RuntimeUtils {
// Keep the same env property pointing to the Java instance file so that it can be picked up
// by the child process and manually added to classpath
args.add(String.format("-D%s=%s", FunctionCacheEntry.JAVA_INSTANCE_JAR_PROPERTY, instanceFile));
- args.add("-Dlog4j.configurationFile=" + javaLog4jFileName);
+ args.add("-Dlog4j.configurationFile=" + logConfigFile);
args.add("-Dpulsar.function.log.dir=" + String.format(
"%s/%s",
logDirectory,
@@ -88,6 +88,9 @@ class RuntimeUtils {
args.add(logDirectory);
args.add("--logging_file");
args.add(instanceConfig.getFunctionDetails().getName());
+ // set logging config file
+ args.add("--logging_config_file");
+ args.add(logConfigFile);
// `installUserCodeDependencies` is only valid for python runtime
if (installUserCodeDepdendencies != null && installUserCodeDepdendencies) {
args.add("--install_usercode_dependencies");
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
index bc2a823..e82c75f 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
@@ -145,11 +145,12 @@ public class KubernetesRuntimeTest {
KubernetesRuntime container = factory.createContainer(config, userJarFile, userJarFile, 30l);
List<String> args = container.getProcessArgs();
- assertEquals(args.size(), 30);
+ assertEquals(args.size(), 32);
String expectedArgs = "python " + pythonInstanceFile
+ " --py " + pulsarRootDir + "/" + userJarFile
+ " --logging_directory " + logDirectory
+ " --logging_file " + config.getFunctionDetails().getName()
+ + " --logging_config_file " + args.get(9)
+ " --install_usercode_dependencies True"
+ " --dependency_repository myrepo"
+ " --extra_dependency_repository anotherrepo"
@@ -158,7 +159,7 @@ public class KubernetesRuntimeTest {
+ " --function_version " + config.getFunctionVersion()
+ " --function_details '" + JsonFormat.printer().omittingInsignificantWhitespace().print(config.getFunctionDetails())
+ "' --pulsar_serviceurl " + pulsarServiceUrl
- + " --max_buffered_tuples 1024 --port " + args.get(27)
+ + " --max_buffered_tuples 1024 --port " + args.get(29)
+ " --expected_healthcheck_interval -1";
assertEquals(String.join(" ", args), expectedArgs);
}
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
index b09b9af..fe492b8 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
@@ -21,7 +21,6 @@ package org.apache.pulsar.functions.runtime;
import static org.testng.Assert.assertEquals;
-import com.google.gson.Gson;
import com.google.protobuf.util.JsonFormat;
import java.util.HashMap;
@@ -140,15 +139,16 @@ public class ProcessRuntimeTest {
ProcessRuntime container = factory.createContainer(config, userJarFile, null, 30l);
List<String> args = container.getProcessArgs();
- assertEquals(args.size(), 24);
+ assertEquals(args.size(), 26);
String expectedArgs = "python " + pythonInstanceFile
+ " --py " + userJarFile + " --logging_directory "
- + logDirectory + "/functions" + " --logging_file " + config.getFunctionDetails().getName() + " --instance_id "
+ + logDirectory + "/functions" + " --logging_file " + config.getFunctionDetails().getName()
+ + " --logging_config_file " + args.get(9) + " --instance_id "
+ config.getInstanceId() + " --function_id " + config.getFunctionId()
+ " --function_version " + config.getFunctionVersion()
+ " --function_details '" + JsonFormat.printer().omittingInsignificantWhitespace().print(config.getFunctionDetails())
+ "' --pulsar_serviceurl " + pulsarServiceUrl
- + " --max_buffered_tuples 1024 --port " + args.get(21)
+ + " --max_buffered_tuples 1024 --port " + args.get(23)
+ " --expected_healthcheck_interval 30";
assertEquals(String.join(" ", args), expectedArgs);
}