You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/10/24 16:14:50 UTC

[GitHub] merlimat closed pull request #2832: Allow Python functions logging to be more configurable

merlimat closed pull request #2832: Allow Python functions logging to be more configurable
URL: https://github.com/apache/pulsar/pull/2832
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub does not display
its diff after the merge, so the diff is reproduced below:

diff --git a/conf/functions-logging/console_logging_config.ini b/conf/functions-logging/console_logging_config.ini
new file mode 100644
index 0000000000..b97bd12290
--- /dev/null
+++ b/conf/functions-logging/console_logging_config.ini
@@ -0,0 +1,41 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+[loggers]
+keys=root
+
+[handlers]
+keys=stream_handler
+
+[formatters]
+keys=formatter
+
+[logger_root]
+level=INFO
+handlers=stream_handler
+
+[handler_stream_handler]
+class=StreamHandler
+level=INFO
+formatter=formatter
+args=(sys.stdout,)
+
+[formatter_formatter]
+format=[%(asctime)s] [%(levelname)s] %(filename)s: %(message)s
+datefmt=%Y-%m-%d %H:%M:%S %z
\ No newline at end of file
diff --git a/conf/functions-logging/logging_config.ini b/conf/functions-logging/logging_config.ini
new file mode 100644
index 0000000000..4bb11c3dcb
--- /dev/null
+++ b/conf/functions-logging/logging_config.ini
@@ -0,0 +1,41 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+[loggers]
+keys=root
+
+[handlers]
+keys=rotating_file_handler
+
+[formatters]
+keys=formatter
+
+[logger_root]
+level=INFO
+handlers=rotating_file_handler
+
+[handler_rotating_file_handler]
+class=log.CreatePathRotatingFileHandler
+level=INFO
+formatter=formatter
+args=(os.getenv("LOG_FILE",""), 5, 10 * 1024 * 1024)
+
+[formatter_formatter]
+format=[%(asctime)s] [%(levelname)s] %(filename)s: %(message)s
+datefmt=%Y-%m-%d %H:%M:%S %z
\ No newline at end of file
diff --git a/pulsar-functions/instance/src/main/python/log.py b/pulsar-functions/instance/src/main/python/log.py
index ee596ce271..4eb02c8791 100644
--- a/pulsar-functions/instance/src/main/python/log.py
+++ b/pulsar-functions/instance/src/main/python/log.py
@@ -22,6 +22,8 @@
 
 ''' log.py '''
 import logging
+import logging.config
+import logging.handlers
 import os
 import errno
 from logging.handlers import RotatingFileHandler
@@ -52,6 +54,24 @@ def emit(self, record):
     msg = self.format(record)
     self.producer.send_async(str(msg).encode('utf-8'), None)
 
+def mkdir_p(path):
+  try:
+    os.makedirs(path, exist_ok=True)  # Python>3.2
+  except TypeError:
+    try:
+      os.makedirs(path)
+    except OSError as exc: # Python >2.5
+      if exc.errno == errno.EEXIST and os.path.isdir(path):
+        pass
+      else: raise
+
+# logging handler that is RotatingFileHandler but creates path to log file for you
+# if it doesn't exist
+class CreatePathRotatingFileHandler(logging.handlers.RotatingFileHandler):
+  def __init__(self, filename, mode='a', maxBytes=10 * 1024 * 1024, backupCount=5, encoding=None, delay=0):
+    mkdir_p(os.path.dirname(filename))
+    logging.handlers.RotatingFileHandler.__init__(self, filename, mode=mode, maxBytes=maxBytes, backupCount=backupCount, encoding=encoding, delay=delay)
+
 def configure(level=logging.INFO):
   """ Configure logger which dumps log on terminal
 
@@ -84,34 +104,13 @@ def add_handler(stream_handler):
   stream_handler.setFormatter(formatter)
   Log.addHandler(stream_handler)
 
-def init_rotating_logger(level, logfile, max_files, max_bytes):
-  """Initializes a rotating logger
-
-  It also makes sure that any StreamHandler is removed, so as to avoid stdout/stderr
-  constipation issues
-  """
-  # create log directory if necessary
-  try:
-    os.makedirs(os.path.dirname(logfile))
-  except OSError as e:
-    if e.errno != errno.EEXIST:
-      raise
-
-  logging.basicConfig()
-
-  root_logger = logging.getLogger()
-  log_format = "[%(asctime)s] [%(levelname)s] %(filename)s: %(message)s"
-
-  root_logger.setLevel(level)
-  handler = RotatingFileHandler(logfile, maxBytes=max_bytes, backupCount=max_files)
-  handler.setFormatter(logging.Formatter(fmt=log_format, datefmt=date_format))
-  root_logger.addHandler(handler)
-
-  for handler in root_logger.handlers:
-    root_logger.debug("Associated handlers - " + str(handler))
-    if isinstance(handler, logging.StreamHandler):
-      root_logger.debug("Removing StreamHandler: " + str(handler))
-      root_logger.handlers.remove(handler)
+def init_logger(level, logfile, logging_config_file):
+  global Log
+  # get log file location for function instance
+  os.environ['LOG_FILE'] = logfile;
+  logging.config.fileConfig(logging_config_file)
+  Log = logging.getLogger()
+  Log = logging.LoggerAdapter(Log, {'level': level})
 
 def set_logging_level(cl_args):
   """simply set verbose level based on command-line args
diff --git a/pulsar-functions/instance/src/main/python/python_instance_main.py b/pulsar-functions/instance/src/main/python/python_instance_main.py
index 1bfc7936e5..2acfd6e662 100644
--- a/pulsar-functions/instance/src/main/python/python_instance_main.py
+++ b/pulsar-functions/instance/src/main/python/python_instance_main.py
@@ -70,6 +70,7 @@ def main():
   parser.add_argument('--max_buffered_tuples', required=True, help='Maximum number of Buffered tuples')
   parser.add_argument('--logging_directory', required=True, help='Logging Directory')
   parser.add_argument('--logging_file', required=True, help='Log file name')
+  parser.add_argument('--logging_config_file', required=True, help='Config file for logging')
   parser.add_argument('--expected_healthcheck_interval', required=True, help='Expected time in seconds between health checks', type=int)
   parser.add_argument('--install_usercode_dependencies', required=False, help='For packaged python like wheel files, do we need to install all dependencies', type=bool)
   parser.add_argument('--dependency_repository', required=False, help='For packaged python like wheel files, which repository to pull the dependencies from')
@@ -102,8 +103,7 @@ def main():
   log_file = os.path.join(args.logging_directory,
                           util.getFullyQualifiedFunctionName(function_details.tenant, function_details.namespace, function_details.name),
                           "%s-%s.log" % (args.logging_file, args.instance_id))
-  log.init_rotating_logger(level=logging.INFO, logfile=log_file,
-                           max_files=5, max_bytes=10 * 1024 * 1024)
+  log.init_logger(logging.INFO, log_file, args.logging_config_file)
 
   Log.info("Starting Python instance with %s" % str(args))
 
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
index ee7125fbbe..80eb8400af 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
@@ -130,9 +130,18 @@
         this.userCodePkgUrl = userCodePkgUrl;
         this.originalCodeFileName = pulsarRootDir + "/" + originalCodeFileName;
         this.pulsarAdminUrl = pulsarAdminUrl;
+        String logConfigFile = null;
+        switch (instanceConfig.getFunctionDetails().getRuntime()) {
+            case JAVA:
+                logConfigFile = "kubernetes_instance_log4j2.yml";
+                break;
+            case PYTHON:
+                logConfigFile = pulsarRootDir + "/conf/functions-logging/console_logging_config.ini";
+                break;
+        }
         this.processArgs = RuntimeUtils.composeArgs(instanceConfig, instanceFile, logDirectory, this.originalCodeFileName, pulsarServiceUrl, stateStorageServiceUrl,
-                authConfig, "$" + ENV_SHARD_ID, GRPC_PORT, -1l, "kubernetes_instance_log4j2.yml", installUserCodeDependencies,
-                pythonDependencyRepository, pythonExtraDependencyRepository);
+                authConfig, "$" + ENV_SHARD_ID, GRPC_PORT, -1l, logConfigFile,
+                installUserCodeDependencies, pythonDependencyRepository, pythonExtraDependencyRepository);
         this.prometheusMetricsServerArgs = composePrometheusMetricsServerArgs(prometheusMetricsServerJarFile, expectedMetricsInterval);
         running = false;
         doChecks(instanceConfig.getFunctionDetails());
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
index 1cf0db9fa7..2fa7f82238 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
@@ -73,9 +73,18 @@
         this.instanceConfig = instanceConfig;
         this.instancePort = instanceConfig.getPort();
         this.expectedHealthCheckInterval = expectedHealthCheckInterval;
+        String logConfigFile = null;
+        switch (instanceConfig.getFunctionDetails().getRuntime()) {
+            case JAVA:
+                logConfigFile = "java_instance_log4j2.yml";
+                break;
+            case PYTHON:
+                logConfigFile = System.getenv("PULSAR_HOME") + "/conf/functions-logging/logging_config.ini";
+                break;
+        }
         this.processArgs = RuntimeUtils.composeArgs(instanceConfig, instanceFile, logDirectory, codeFile, pulsarServiceUrl, stateStorageServiceUrl,
                 authConfig, instanceConfig.getInstanceName(), instanceConfig.getPort(), expectedHealthCheckInterval,
-                "java_instance_log4j2.yml", false, null, null);
+                logConfigFile, false, null, null);
     }
 
     /**
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
index 00a04b5455..26475369dd 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
@@ -48,7 +48,7 @@
                                            String shardId,
                                            Integer grpcPort,
                                            Long expectedHealthCheckInterval,
-                                           String javaLog4jFileName,
+                                           String logConfigFile,
                                            Boolean installUserCodeDepdendencies,
                                            String pythonDependencyRepository,
                                            String pythonExtraDependencyRepository) throws Exception {
@@ -61,7 +61,7 @@
             // Keep the same env property pointing to the Java instance file so that it can be picked up
             // by the child process and manually added to classpath
             args.add(String.format("-D%s=%s", FunctionCacheEntry.JAVA_INSTANCE_JAR_PROPERTY, instanceFile));
-            args.add("-Dlog4j.configurationFile=" + javaLog4jFileName);
+            args.add("-Dlog4j.configurationFile=" + logConfigFile);
             args.add("-Dpulsar.function.log.dir=" + String.format(
                     "%s/%s",
                     logDirectory,
@@ -88,6 +88,9 @@
             args.add(logDirectory);
             args.add("--logging_file");
             args.add(instanceConfig.getFunctionDetails().getName());
+            // set logging config file
+            args.add("--logging_config_file");
+            args.add(logConfigFile);
             // `installUserCodeDependencies` is only valid for python runtime
             if (installUserCodeDepdendencies != null && installUserCodeDepdendencies) {
                 args.add("--install_usercode_dependencies");
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
index bc2a8236c9..e82c75febf 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
@@ -145,11 +145,12 @@ public void testPythonConstructor() throws Exception {
 
         KubernetesRuntime container = factory.createContainer(config, userJarFile, userJarFile, 30l);
         List<String> args = container.getProcessArgs();
-        assertEquals(args.size(), 30);
+        assertEquals(args.size(), 32);
         String expectedArgs = "python " + pythonInstanceFile
                 + " --py " + pulsarRootDir + "/" + userJarFile
                 + " --logging_directory " + logDirectory
                 + " --logging_file " + config.getFunctionDetails().getName()
+                + " --logging_config_file " + args.get(9)
                 + " --install_usercode_dependencies True"
                 + " --dependency_repository myrepo"
                 + " --extra_dependency_repository anotherrepo"
@@ -158,7 +159,7 @@ public void testPythonConstructor() throws Exception {
                 + " --function_version " + config.getFunctionVersion()
                 + " --function_details '" + JsonFormat.printer().omittingInsignificantWhitespace().print(config.getFunctionDetails())
                 + "' --pulsar_serviceurl " + pulsarServiceUrl
-                + " --max_buffered_tuples 1024 --port " + args.get(27)
+                + " --max_buffered_tuples 1024 --port " + args.get(29)
                 + " --expected_healthcheck_interval -1";
         assertEquals(String.join(" ", args), expectedArgs);
     }
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
index b09b9af492..fe492b859a 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
@@ -21,7 +21,6 @@
 
 import static org.testng.Assert.assertEquals;
 
-import com.google.gson.Gson;
 import com.google.protobuf.util.JsonFormat;
 
 import java.util.HashMap;
@@ -140,15 +139,16 @@ public void testPythonConstructor() throws Exception {
 
         ProcessRuntime container = factory.createContainer(config, userJarFile, null, 30l);
         List<String> args = container.getProcessArgs();
-        assertEquals(args.size(), 24);
+        assertEquals(args.size(), 26);
         String expectedArgs = "python " + pythonInstanceFile
                 + " --py " + userJarFile + " --logging_directory "
-                + logDirectory + "/functions" + " --logging_file " + config.getFunctionDetails().getName() + " --instance_id "
+                + logDirectory + "/functions" + " --logging_file " + config.getFunctionDetails().getName()
+                + " --logging_config_file " + args.get(9) + " --instance_id "
                 + config.getInstanceId() + " --function_id " + config.getFunctionId()
                 + " --function_version " + config.getFunctionVersion()
                 + " --function_details '" + JsonFormat.printer().omittingInsignificantWhitespace().print(config.getFunctionDetails())
                 + "' --pulsar_serviceurl " + pulsarServiceUrl
-                + " --max_buffered_tuples 1024 --port " + args.get(21)
+                + " --max_buffered_tuples 1024 --port " + args.get(23)
                 + " --expected_healthcheck_interval 30";
         assertEquals(String.join(" ", args), expectedArgs);
     }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services