You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/10/24 16:14:55 UTC

[pulsar] branch master updated: Allow Python functions logging to be more configurable (#2832)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 203c8ee  Allow Python functions logging to be more configurable (#2832)
203c8ee is described below

commit 203c8ee2b576704803135ddb18c2699322917c36
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Wed Oct 24 11:14:48 2018 -0500

    Allow Python functions logging to be more configurable (#2832)
    
    * allow python functions logging to be configured using config files
    
    * adding license headers
    
    * fix log file
    
    * changing config file name
    
    * fix unittests
---
 conf/functions-logging/console_logging_config.ini  | 41 ++++++++++++++++
 conf/functions-logging/logging_config.ini          | 41 ++++++++++++++++
 pulsar-functions/instance/src/main/python/log.py   | 55 +++++++++++-----------
 .../src/main/python/python_instance_main.py        |  4 +-
 .../functions/runtime/KubernetesRuntime.java       | 13 ++++-
 .../pulsar/functions/runtime/ProcessRuntime.java   | 11 ++++-
 .../pulsar/functions/runtime/RuntimeUtils.java     |  7 ++-
 .../functions/runtime/KubernetesRuntimeTest.java   |  5 +-
 .../functions/runtime/ProcessRuntimeTest.java      |  8 ++--
 9 files changed, 144 insertions(+), 41 deletions(-)

diff --git a/conf/functions-logging/console_logging_config.ini b/conf/functions-logging/console_logging_config.ini
new file mode 100644
index 0000000..b97bd12
--- /dev/null
+++ b/conf/functions-logging/console_logging_config.ini
@@ -0,0 +1,41 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+[loggers]
+keys=root
+
+[handlers]
+keys=stream_handler
+
+[formatters]
+keys=formatter
+
+[logger_root]
+level=INFO
+handlers=stream_handler
+
+[handler_stream_handler]
+class=StreamHandler
+level=INFO
+formatter=formatter
+args=(sys.stdout,)
+
+[formatter_formatter]
+format=[%(asctime)s] [%(levelname)s] %(filename)s: %(message)s
+datefmt=%Y-%m-%d %H:%M:%S %z
\ No newline at end of file
diff --git a/conf/functions-logging/logging_config.ini b/conf/functions-logging/logging_config.ini
new file mode 100644
index 0000000..4bb11c3
--- /dev/null
+++ b/conf/functions-logging/logging_config.ini
@@ -0,0 +1,41 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+[loggers]
+keys=root
+
+[handlers]
+keys=rotating_file_handler
+
+[formatters]
+keys=formatter
+
+[logger_root]
+level=INFO
+handlers=rotating_file_handler
+# args are positional: (filename, mode, maxBytes, backupCount); the file path comes from LOG_FILE
+[handler_rotating_file_handler]
+class=log.CreatePathRotatingFileHandler
+level=INFO
+formatter=formatter
+args=(os.getenv("LOG_FILE",""), 'a', 10 * 1024 * 1024, 5)
+
+[formatter_formatter]
+format=[%(asctime)s] [%(levelname)s] %(filename)s: %(message)s
+datefmt=%Y-%m-%d %H:%M:%S %z
\ No newline at end of file
diff --git a/pulsar-functions/instance/src/main/python/log.py b/pulsar-functions/instance/src/main/python/log.py
index ee596ce..4eb02c8 100644
--- a/pulsar-functions/instance/src/main/python/log.py
+++ b/pulsar-functions/instance/src/main/python/log.py
@@ -22,6 +22,8 @@
 
 ''' log.py '''
 import logging
+import logging.config
+import logging.handlers
 import os
 import errno
 from logging.handlers import RotatingFileHandler
@@ -52,6 +54,24 @@ class LogTopicHandler(logging.Handler):
     msg = self.format(record)
     self.producer.send_async(str(msg).encode('utf-8'), None)
 
+def mkdir_p(path):
+  """Create directory `path` with any missing parents; no-op if it exists or is empty."""
+  if not path:  # dirname of a bare file name is '': there is nothing to create
+    return
+  try:
+    os.makedirs(path)
+  except OSError as exc:
+    # an already existing directory is fine; anything else is a real failure
+    if exc.errno != errno.EEXIST or not os.path.isdir(path):
+      raise
+
+class CreatePathRotatingFileHandler(logging.handlers.RotatingFileHandler):
+  """RotatingFileHandler that creates the log file's parent directory (via mkdir_p) if missing."""
+  def __init__(self, filename, mode='a', maxBytes=10 * 1024 * 1024, backupCount=5, encoding=None, delay=0):
+    # make sure the directory exists before handing the path to the base handler
+    mkdir_p(os.path.dirname(filename))
+    logging.handlers.RotatingFileHandler.__init__(self, filename, mode=mode, maxBytes=maxBytes, backupCount=backupCount, encoding=encoding, delay=delay)
+
 def configure(level=logging.INFO):
   """ Configure logger which dumps log on terminal
 
@@ -84,34 +104,13 @@ def add_handler(stream_handler):
   stream_handler.setFormatter(formatter)
   Log.addHandler(stream_handler)
 
-def init_rotating_logger(level, logfile, max_files, max_bytes):
-  """Initializes a rotating logger
-
-  It also makes sure that any StreamHandler is removed, so as to avoid stdout/stderr
-  constipation issues
-  """
-  # create log directory if necessary
-  try:
-    os.makedirs(os.path.dirname(logfile))
-  except OSError as e:
-    if e.errno != errno.EEXIST:
-      raise
-
-  logging.basicConfig()
-
-  root_logger = logging.getLogger()
-  log_format = "[%(asctime)s] [%(levelname)s] %(filename)s: %(message)s"
-
-  root_logger.setLevel(level)
-  handler = RotatingFileHandler(logfile, maxBytes=max_bytes, backupCount=max_files)
-  handler.setFormatter(logging.Formatter(fmt=log_format, datefmt=date_format))
-  root_logger.addHandler(handler)
-
-  for handler in root_logger.handlers:
-    root_logger.debug("Associated handlers - " + str(handler))
-    if isinstance(handler, logging.StreamHandler):
-      root_logger.debug("Removing StreamHandler: " + str(handler))
-      root_logger.handlers.remove(handler)
+def init_logger(level, logfile, logging_config_file):
+  """Configure logging from `logging_config_file` and rebind the module-level `Log`."""
+  global Log
+  # exported so the logging config file can look up this instance's log file path
+  os.environ['LOG_FILE'] = logfile
+  logging.config.fileConfig(logging_config_file)
+  Log = logging.LoggerAdapter(logging.getLogger(), {'level': level})
 
 def set_logging_level(cl_args):
   """simply set verbose level based on command-line args
diff --git a/pulsar-functions/instance/src/main/python/python_instance_main.py b/pulsar-functions/instance/src/main/python/python_instance_main.py
index 1bfc793..2acfd6e 100644
--- a/pulsar-functions/instance/src/main/python/python_instance_main.py
+++ b/pulsar-functions/instance/src/main/python/python_instance_main.py
@@ -70,6 +70,7 @@ def main():
   parser.add_argument('--max_buffered_tuples', required=True, help='Maximum number of Buffered tuples')
   parser.add_argument('--logging_directory', required=True, help='Logging Directory')
   parser.add_argument('--logging_file', required=True, help='Log file name')
+  parser.add_argument('--logging_config_file', required=True, help='Config file for logging')
   parser.add_argument('--expected_healthcheck_interval', required=True, help='Expected time in seconds between health checks', type=int)
   parser.add_argument('--install_usercode_dependencies', required=False, help='For packaged python like wheel files, do we need to install all dependencies', type=bool)
   parser.add_argument('--dependency_repository', required=False, help='For packaged python like wheel files, which repository to pull the dependencies from')
@@ -102,8 +103,7 @@ def main():
   log_file = os.path.join(args.logging_directory,
                           util.getFullyQualifiedFunctionName(function_details.tenant, function_details.namespace, function_details.name),
                           "%s-%s.log" % (args.logging_file, args.instance_id))
-  log.init_rotating_logger(level=logging.INFO, logfile=log_file,
-                           max_files=5, max_bytes=10 * 1024 * 1024)
+  log.init_logger(logging.INFO, log_file, args.logging_config_file)
 
   Log.info("Starting Python instance with %s" % str(args))
 
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
index ee7125f..80eb840 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
@@ -130,9 +130,18 @@ class KubernetesRuntime implements Runtime {
         this.userCodePkgUrl = userCodePkgUrl;
         this.originalCodeFileName = pulsarRootDir + "/" + originalCodeFileName;
         this.pulsarAdminUrl = pulsarAdminUrl;
+        String logConfigFile = null;
+        switch (instanceConfig.getFunctionDetails().getRuntime()) {
+            case JAVA:
+                logConfigFile = "kubernetes_instance_log4j2.yml";
+                break;
+            case PYTHON:
+                logConfigFile = pulsarRootDir + "/conf/functions-logging/console_logging_config.ini";
+                break;
+        }
         this.processArgs = RuntimeUtils.composeArgs(instanceConfig, instanceFile, logDirectory, this.originalCodeFileName, pulsarServiceUrl, stateStorageServiceUrl,
-                authConfig, "$" + ENV_SHARD_ID, GRPC_PORT, -1l, "kubernetes_instance_log4j2.yml", installUserCodeDependencies,
-                pythonDependencyRepository, pythonExtraDependencyRepository);
+                authConfig, "$" + ENV_SHARD_ID, GRPC_PORT, -1l, logConfigFile,
+                installUserCodeDependencies, pythonDependencyRepository, pythonExtraDependencyRepository);
         this.prometheusMetricsServerArgs = composePrometheusMetricsServerArgs(prometheusMetricsServerJarFile, expectedMetricsInterval);
         running = false;
         doChecks(instanceConfig.getFunctionDetails());
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
index 1cf0db9..2fa7f82 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
@@ -73,9 +73,18 @@ class ProcessRuntime implements Runtime {
         this.instanceConfig = instanceConfig;
         this.instancePort = instanceConfig.getPort();
         this.expectedHealthCheckInterval = expectedHealthCheckInterval;
+        String logConfigFile = null;
+        switch (instanceConfig.getFunctionDetails().getRuntime()) {
+            case JAVA:
+                logConfigFile = "java_instance_log4j2.yml";
+                break;
+            case PYTHON:
+                logConfigFile = System.getenv("PULSAR_HOME") + "/conf/functions-logging/logging_config.ini";
+                break;
+        }
         this.processArgs = RuntimeUtils.composeArgs(instanceConfig, instanceFile, logDirectory, codeFile, pulsarServiceUrl, stateStorageServiceUrl,
                 authConfig, instanceConfig.getInstanceName(), instanceConfig.getPort(), expectedHealthCheckInterval,
-                "java_instance_log4j2.yml", false, null, null);
+                logConfigFile, false, null, null);
     }
 
     /**
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
index 00a04b5..2647536 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
@@ -48,7 +48,7 @@ class RuntimeUtils {
                                            String shardId,
                                            Integer grpcPort,
                                            Long expectedHealthCheckInterval,
-                                           String javaLog4jFileName,
+                                           String logConfigFile,
                                            Boolean installUserCodeDepdendencies,
                                            String pythonDependencyRepository,
                                            String pythonExtraDependencyRepository) throws Exception {
@@ -61,7 +61,7 @@ class RuntimeUtils {
             // Keep the same env property pointing to the Java instance file so that it can be picked up
             // by the child process and manually added to classpath
             args.add(String.format("-D%s=%s", FunctionCacheEntry.JAVA_INSTANCE_JAR_PROPERTY, instanceFile));
-            args.add("-Dlog4j.configurationFile=" + javaLog4jFileName);
+            args.add("-Dlog4j.configurationFile=" + logConfigFile);
             args.add("-Dpulsar.function.log.dir=" + String.format(
                     "%s/%s",
                     logDirectory,
@@ -88,6 +88,9 @@ class RuntimeUtils {
             args.add(logDirectory);
             args.add("--logging_file");
             args.add(instanceConfig.getFunctionDetails().getName());
+            // set logging config file
+            args.add("--logging_config_file");
+            args.add(logConfigFile);
             // `installUserCodeDependencies` is only valid for python runtime
             if (installUserCodeDepdendencies != null && installUserCodeDepdendencies) {
                 args.add("--install_usercode_dependencies");
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
index bc2a823..e82c75f 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
@@ -145,11 +145,12 @@ public class KubernetesRuntimeTest {
 
         KubernetesRuntime container = factory.createContainer(config, userJarFile, userJarFile, 30l);
         List<String> args = container.getProcessArgs();
-        assertEquals(args.size(), 30);
+        assertEquals(args.size(), 32);
         String expectedArgs = "python " + pythonInstanceFile
                 + " --py " + pulsarRootDir + "/" + userJarFile
                 + " --logging_directory " + logDirectory
                 + " --logging_file " + config.getFunctionDetails().getName()
+                + " --logging_config_file " + args.get(9)
                 + " --install_usercode_dependencies True"
                 + " --dependency_repository myrepo"
                 + " --extra_dependency_repository anotherrepo"
@@ -158,7 +159,7 @@ public class KubernetesRuntimeTest {
                 + " --function_version " + config.getFunctionVersion()
                 + " --function_details '" + JsonFormat.printer().omittingInsignificantWhitespace().print(config.getFunctionDetails())
                 + "' --pulsar_serviceurl " + pulsarServiceUrl
-                + " --max_buffered_tuples 1024 --port " + args.get(27)
+                + " --max_buffered_tuples 1024 --port " + args.get(29)
                 + " --expected_healthcheck_interval -1";
         assertEquals(String.join(" ", args), expectedArgs);
     }
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
index b09b9af..fe492b8 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
@@ -21,7 +21,6 @@ package org.apache.pulsar.functions.runtime;
 
 import static org.testng.Assert.assertEquals;
 
-import com.google.gson.Gson;
 import com.google.protobuf.util.JsonFormat;
 
 import java.util.HashMap;
@@ -140,15 +139,16 @@ public class ProcessRuntimeTest {
 
         ProcessRuntime container = factory.createContainer(config, userJarFile, null, 30l);
         List<String> args = container.getProcessArgs();
-        assertEquals(args.size(), 24);
+        assertEquals(args.size(), 26);
         String expectedArgs = "python " + pythonInstanceFile
                 + " --py " + userJarFile + " --logging_directory "
-                + logDirectory + "/functions" + " --logging_file " + config.getFunctionDetails().getName() + " --instance_id "
+                + logDirectory + "/functions" + " --logging_file " + config.getFunctionDetails().getName()
+                + " --logging_config_file " + args.get(9) + " --instance_id "
                 + config.getInstanceId() + " --function_id " + config.getFunctionId()
                 + " --function_version " + config.getFunctionVersion()
                 + " --function_details '" + JsonFormat.printer().omittingInsignificantWhitespace().print(config.getFunctionDetails())
                 + "' --pulsar_serviceurl " + pulsarServiceUrl
-                + " --max_buffered_tuples 1024 --port " + args.get(21)
+                + " --max_buffered_tuples 1024 --port " + args.get(23)
                 + " --expected_healthcheck_interval 30";
         assertEquals(String.join(" ", args), expectedArgs);
     }