You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/03/07 15:29:25 UTC
[incubator-pulsar] branch master updated: Have the ability to send log messages to a topic in Python (#1353)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a8a595d Have the ability to send log messages to a topic in Python (#1353)
a8a595d is described below
commit a8a595d9124d274f030cabff230e3bddc76cc241
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Wed Mar 7 07:29:23 2018 -0800
Have the ability to send log messages to a topic in Python (#1353)
* Have the ability to send log messages to a topic in Python
* Address review comments
---
pulsar-functions/instance/src/main/python/log.py | 26 ++++++++++++++++++++--
.../instance/src/main/python/python_instance.py | 16 ++++++++++++-
.../src/main/python/python_instance_main.py | 4 +++-
3 files changed, 42 insertions(+), 4 deletions(-)
diff --git a/pulsar-functions/instance/src/main/python/log.py b/pulsar-functions/instance/src/main/python/log.py
index f36e684..85b2104 100644
--- a/pulsar-functions/instance/src/main/python/log.py
+++ b/pulsar-functions/instance/src/main/python/log.py
@@ -23,6 +23,7 @@
''' log.py '''
import logging
from logging.handlers import RotatingFileHandler
+import pulsar
# Create the logger
# pylint: disable=invalid-name
@@ -34,6 +35,19 @@ Log = logging.getLogger()
# see time formatter documentation for more
date_format = "%Y-%m-%d %H:%M:%S %z"
+class LogTopicHandler(logging.Handler):
+ def __init__(self, topic_name, pulsar_client):
+ Log.info("Setting up producer for log topic %s" % topic_name)
+ self.producer = pulsar_client.create_producer(
+ str(topic_name),
+ block_if_queue_full=True,
+ batching_enabled=True,
+ batching_max_publish_delay_ms=100,
+ compression_type=pulsar._pulsar.CompressionType.LZ4)
+
+ def emit(self, record):
+ self.producer.send_async(record)
+
def configure(level=logging.INFO):
""" Configure logger which dumps log on terminal
@@ -50,14 +64,22 @@ def configure(level=logging.INFO):
Log.handlers.remove(handler)
Log.setLevel(level)
+ stream_handler = logging.StreamHandler()
+ add_handler(stream_handler)
+def remove_all_handlers():
+ retval = None
+ for handler in Log.handlers:
+ Log.handlers.remove(handler)
+ retval = handler
+ return retval
+
+def add_handler(stream_handler):
log_format = "[%(asctime)s] [%(levelname)s]: %(message)s"
formatter = logging.Formatter(fmt=log_format, datefmt=date_format)
- stream_handler = logging.StreamHandler()
stream_handler.setFormatter(formatter)
Log.addHandler(stream_handler)
-
def init_rotating_logger(level, logfile, max_files, max_bytes):
"""Initializes a rotating logger
diff --git a/pulsar-functions/instance/src/main/python/python_instance.py b/pulsar-functions/instance/src/main/python/python_instance.py
index 08fcef5..077cb72 100644
--- a/pulsar-functions/instance/src/main/python/python_instance.py
+++ b/pulsar-functions/instance/src/main/python/python_instance.py
@@ -94,10 +94,13 @@ class Stats(object):
return self.latency / self.nsuccessfullyprocessed
class PythonInstance(object):
- def __init__(self, instance_id, function_id, function_version, function_config, max_buffered_tuples, user_code, pulsar_client):
+ def __init__(self, instance_id, function_id, function_version, function_config, max_buffered_tuples, user_code, log_topic, pulsar_client):
self.instance_config = InstanceConfig(instance_id, function_id, function_version, function_config, max_buffered_tuples)
self.user_code = user_code
self.queue = Queue.Queue(max_buffered_tuples)
+ self.log_topic_handler = None
+ if log_topic is not None:
+ self.log_topic_handler = log.LogTopicHandler(str(log_topic), pulsar_client)
self.pulsar_client = pulsar_client
self.input_serdes = {}
self.consumers = {}
@@ -174,7 +177,11 @@ class PythonInstance(object):
continue
self.contextimpl.set_current_message_context(msg.message.message_id(), msg.topic)
output_object = None
+ self.saved_log_handler = None
try:
+ if self.log_topic_handler is not None:
+ self.saved_log_handler = log.remove_all_handlers()
+ log.add_handler(self.log_topic_handler)
start_time = time.time()
self.current_stats.increment_processed(int(start_time) * 1000)
self.total_stats.increment_processed(int(start_time) * 1000)
@@ -188,9 +195,16 @@ class PythonInstance(object):
self.current_stats.increment_successfully_processed(latency)
self.process_result(output_object, msg)
except Exception as e:
+ if self.log_topic_handler is not None:
+ log.remove_all_handlers()
+ log.add_handler(self.saved_log_handler)
Log.exception("Exception while executing user method")
self.total_stats.record_user_exception(e)
self.current_stats.record_user_exception(e)
+ finally:
+ if self.log_topic_handler is not None:
+ log.remove_all_handlers()
+ log.add_handler(self.saved_log_handler)
def done_producing(self, consumer, orig_message, result, sent_message):
if result == pulsar.Result.Ok and self.auto_ack and self.atleast_once:
diff --git a/pulsar-functions/instance/src/main/python/python_instance_main.py b/pulsar-functions/instance/src/main/python/python_instance_main.py
index 3fd4358..4fe1641 100644
--- a/pulsar-functions/instance/src/main/python/python_instance_main.py
+++ b/pulsar-functions/instance/src/main/python/python_instance_main.py
@@ -73,6 +73,7 @@ def main():
parser.add_argument('--logging_directory', required=True, help='Logging Directory')
parser.add_argument('--logging_file', required=True, help='Log file name')
parser.add_argument('--auto_ack', required=True, help='Enable Autoacking?')
+ parser.add_argument('--log_topic', required=False, help='Topic to send Log Messages')
args = parser.parse_args()
log_file = os.path.join(args.logging_directory, args.logging_file + ".log.0")
@@ -117,7 +118,8 @@ def main():
pulsar_client = pulsar.Client(args.pulsar_serviceurl)
pyinstance = python_instance.PythonInstance(str(args.instance_id), str(args.function_id),
str(args.function_version), function_config,
- int(args.max_buffered_tuples), str(args.py), pulsar_client)
+ int(args.max_buffered_tuples), str(args.py),
+ args.log_topic, pulsar_client)
pyinstance.run()
server_instance = server.serve(args.port, pyinstance)
--
To stop receiving notification emails like this one, please contact
mmerli@apache.org.