You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain-text copy.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/03/07 15:29:25 UTC

[incubator-pulsar] branch master updated: Have the ability to send log messages to a topic in Python (#1353)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new a8a595d  Have the ability to send log messages to a topic in Python (#1353)
a8a595d is described below

commit a8a595d9124d274f030cabff230e3bddc76cc241
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Wed Mar 7 07:29:23 2018 -0800

    Have the ability to send log messages to a topic in Python (#1353)
    
    * Have the ability to send log messages to a topic in Python
    
    * Address review comments
---
 pulsar-functions/instance/src/main/python/log.py   | 26 ++++++++++++++++++++--
 .../instance/src/main/python/python_instance.py    | 16 ++++++++++++-
 .../src/main/python/python_instance_main.py        |  4 +++-
 3 files changed, 42 insertions(+), 4 deletions(-)

diff --git a/pulsar-functions/instance/src/main/python/log.py b/pulsar-functions/instance/src/main/python/log.py
index f36e684..85b2104 100644
--- a/pulsar-functions/instance/src/main/python/log.py
+++ b/pulsar-functions/instance/src/main/python/log.py
@@ -23,6 +23,7 @@
 ''' log.py '''
 import logging
 from logging.handlers import RotatingFileHandler
+import pulsar
 
 # Create the logger
 # pylint: disable=invalid-name
@@ -34,6 +35,19 @@ Log = logging.getLogger()
 # see time formatter documentation for more
 date_format = "%Y-%m-%d %H:%M:%S %z"
 
+class LogTopicHandler(logging.Handler):
+  def __init__(self, topic_name, pulsar_client):
+    Log.info("Setting up producer for log topic %s" % topic_name)
+    self.producer = pulsar_client.create_producer(
+      str(topic_name),
+      block_if_queue_full=True,
+      batching_enabled=True,
+      batching_max_publish_delay_ms=100,
+      compression_type=pulsar._pulsar.CompressionType.LZ4)
+
+  def emit(self, record):
+    self.producer.send_async(record)
+
 def configure(level=logging.INFO):
   """ Configure logger which dumps log on terminal
 
@@ -50,14 +64,22 @@ def configure(level=logging.INFO):
       Log.handlers.remove(handler)
 
   Log.setLevel(level)
+  stream_handler = logging.StreamHandler()
+  add_handler(stream_handler)
 
+def remove_all_handlers():
+  retval = None
+  for handler in Log.handlers:
+    Log.handlers.remove(handler)
+    retval = handler
+  return retval
+
+def add_handler(stream_handler):
   log_format = "[%(asctime)s] [%(levelname)s]: %(message)s"
   formatter = logging.Formatter(fmt=log_format, datefmt=date_format)
-  stream_handler = logging.StreamHandler()
   stream_handler.setFormatter(formatter)
   Log.addHandler(stream_handler)
 
-
 def init_rotating_logger(level, logfile, max_files, max_bytes):
   """Initializes a rotating logger
 
diff --git a/pulsar-functions/instance/src/main/python/python_instance.py b/pulsar-functions/instance/src/main/python/python_instance.py
index 08fcef5..077cb72 100644
--- a/pulsar-functions/instance/src/main/python/python_instance.py
+++ b/pulsar-functions/instance/src/main/python/python_instance.py
@@ -94,10 +94,13 @@ class Stats(object):
       return self.latency / self.nsuccessfullyprocessed
 
 class PythonInstance(object):
-  def __init__(self, instance_id, function_id, function_version, function_config, max_buffered_tuples, user_code, pulsar_client):
+  def __init__(self, instance_id, function_id, function_version, function_config, max_buffered_tuples, user_code, log_topic, pulsar_client):
     self.instance_config = InstanceConfig(instance_id, function_id, function_version, function_config, max_buffered_tuples)
     self.user_code = user_code
     self.queue = Queue.Queue(max_buffered_tuples)
+    self.log_topic_handler = None
+    if log_topic is not None:
+      self.log_topic_handler = log.LogTopicHandler(str(log_topic), pulsar_client)
     self.pulsar_client = pulsar_client
     self.input_serdes = {}
     self.consumers = {}
@@ -174,7 +177,11 @@ class PythonInstance(object):
         continue
       self.contextimpl.set_current_message_context(msg.message.message_id(), msg.topic)
       output_object = None
+      self.saved_log_handler = None
       try:
+        if self.log_topic_handler is not None:
+          self.saved_log_handler = log.remove_all_handlers()
+          log.add_handler(self.log_topic_handler)
         start_time = time.time()
         self.current_stats.increment_processed(int(start_time) * 1000)
         self.total_stats.increment_processed(int(start_time) * 1000)
@@ -188,9 +195,16 @@ class PythonInstance(object):
         self.current_stats.increment_successfully_processed(latency)
         self.process_result(output_object, msg)
       except Exception as e:
+        if self.log_topic_handler is not None:
+          log.remove_all_handlers()
+          log.add_handler(self.saved_log_handler)
         Log.exception("Exception while executing user method")
         self.total_stats.record_user_exception(e)
         self.current_stats.record_user_exception(e)
+      finally:
+        if self.log_topic_handler is not None:
+          log.remove_all_handlers()
+          log.add_handler(self.saved_log_handler)
 
   def done_producing(self, consumer, orig_message, result, sent_message):
     if result == pulsar.Result.Ok and self.auto_ack and self.atleast_once:
diff --git a/pulsar-functions/instance/src/main/python/python_instance_main.py b/pulsar-functions/instance/src/main/python/python_instance_main.py
index 3fd4358..4fe1641 100644
--- a/pulsar-functions/instance/src/main/python/python_instance_main.py
+++ b/pulsar-functions/instance/src/main/python/python_instance_main.py
@@ -73,6 +73,7 @@ def main():
   parser.add_argument('--logging_directory', required=True, help='Logging Directory')
   parser.add_argument('--logging_file', required=True, help='Log file name')
   parser.add_argument('--auto_ack', required=True, help='Enable Autoacking?')
+  parser.add_argument('--log_topic', required=False, help='Topic to send Log Messages')
 
   args = parser.parse_args()
   log_file = os.path.join(args.logging_directory, args.logging_file + ".log.0")
@@ -117,7 +118,8 @@ def main():
   pulsar_client = pulsar.Client(args.pulsar_serviceurl)
   pyinstance = python_instance.PythonInstance(str(args.instance_id), str(args.function_id),
                                               str(args.function_version), function_config,
-                                              int(args.max_buffered_tuples), str(args.py), pulsar_client)
+                                              int(args.max_buffered_tuples), str(args.py),
+                                              args.log_topic, pulsar_client)
   pyinstance.run()
   server_instance = server.serve(args.port, pyinstance)
 

-- 
To stop receiving notification emails like this one, please contact
mmerli@apache.org.