You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2018/10/26 17:13:43 UTC
[pulsar] branch master updated: Cleanup Logging for python
functions (#2847)
This is an automated email from the ASF dual-hosted git repository.
jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new fa212bc Cleanup Logging for python functions (#2847)
fa212bc is described below
commit fa212bc8bb0cbd81729b498b6041cde794be9684
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Fri Oct 26 10:13:39 2018 -0700
Cleanup Logging for python functions (#2847)
* Cleanup Logging for python functions
* More statements to debug
* More debug statements
* More debug
---
.../instance/src/main/python/python_instance.py | 8 ++---
.../instance/src/main/python/server.py | 8 ++---
pulsar-functions/instance/src/main/python/util.py | 37 ++++++++++------------
3 files changed, 24 insertions(+), 29 deletions(-)
diff --git a/pulsar-functions/instance/src/main/python/python_instance.py b/pulsar-functions/instance/src/main/python/python_instance.py
index 03dafae..5f1645d 100644
--- a/pulsar-functions/instance/src/main/python/python_instance.py
+++ b/pulsar-functions/instance/src/main/python/python_instance.py
@@ -187,7 +187,7 @@ class PythonInstance(object):
else:
serde_kclass = util.import_class(os.path.dirname(self.user_code), serde)
self.input_serdes[topic] = serde_kclass()
- Log.info("Setting up consumer for topic %s with subname %s" % (topic, subscription_name))
+ Log.debug("Setting up consumer for topic %s with subname %s" % (topic, subscription_name))
self.consumers[topic] = self.pulsar_client.subscribe(
str(topic), subscription_name,
consumer_type=mode,
@@ -201,7 +201,7 @@ class PythonInstance(object):
else:
serde_kclass = util.import_class(os.path.dirname(self.user_code), consumer_conf.serdeClassName)
self.input_serdes[topic] = serde_kclass()
- Log.info("Setting up consumer for topic %s with subname %s" % (topic, subscription_name))
+ Log.debug("Setting up consumer for topic %s with subname %s" % (topic, subscription_name))
if consumer_conf.isRegexPattern:
self.consumers[topic] = self.pulsar_client.subscribe(
re.compile(str(topic)), subscription_name,
@@ -237,7 +237,7 @@ class PythonInstance(object):
Timer(self.expected_healthcheck_interval, self.process_spawner_health_check_timer).start()
def actual_execution(self):
- Log.info("Started Thread for executing the function")
+ Log.debug("Started Thread for executing the function")
while True:
msg = self.queue.get(True)
if isinstance(msg, InternalQuitMessage):
@@ -321,7 +321,7 @@ class PythonInstance(object):
def setup_producer(self):
if self.instance_config.function_details.sink.topic != None and \
len(self.instance_config.function_details.sink.topic) > 0:
- Log.info("Setting up producer for topic %s" % self.instance_config.function_details.sink.topic)
+ Log.debug("Setting up producer for topic %s" % self.instance_config.function_details.sink.topic)
self.producer = self.pulsar_client.create_producer(
str(self.instance_config.function_details.sink.topic),
block_if_queue_full=True,
diff --git a/pulsar-functions/instance/src/main/python/server.py b/pulsar-functions/instance/src/main/python/server.py
index 611a737..58d43d2 100644
--- a/pulsar-functions/instance/src/main/python/server.py
+++ b/pulsar-functions/instance/src/main/python/server.py
@@ -35,20 +35,20 @@ class InstanceCommunicationServicer(InstanceCommunication_pb2_grpc.InstanceContr
self.pyinstance = pyinstance
def GetFunctionStatus(self, request, context):
- Log.info("Came in GetFunctionStatus")
+ Log.debug("Came in GetFunctionStatus")
return self.pyinstance.get_function_status()
def GetAndResetMetrics(self, request, context):
- Log.info("Came in GetAndResetMetrics")
+ Log.debug("Came in GetAndResetMetrics")
return self.pyinstance.get_and_reset_metrics()
def ResetMetrics(self, request, context):
- Log.info("Came in ResetMetrics")
+ Log.debug("Came in ResetMetrics")
self.pyinstance.reset_metrics()
return request
def GetMetrics(self, request, context):
- Log.info("Came in GetMetrics")
+ Log.debug("Came in GetMetrics")
return self.pyinstance.get_metrics()
def HealthCheck(self, request, context):
diff --git a/pulsar-functions/instance/src/main/python/util.py b/pulsar-functions/instance/src/main/python/util.py
index 4736457..56c1ce1 100644
--- a/pulsar-functions/instance/src/main/python/util.py
+++ b/pulsar-functions/instance/src/main/python/util.py
@@ -36,38 +36,33 @@ PULSAR_FUNCTIONS_API_ROOT = 'functions'
def import_class(from_path, full_class_name):
from_path = str(from_path)
full_class_name = str(full_class_name)
- kclass = import_class_from_path(from_path, full_class_name)
- if kclass is None:
+ try:
+ return import_class_from_path(from_path, full_class_name)
+ except Exception as e:
our_dir = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe())))
api_dir = os.path.join(our_dir, PULSAR_API_ROOT, PULSAR_FUNCTIONS_API_ROOT)
- kclass = import_class_from_path(api_dir, full_class_name)
- return kclass
+ try:
+ return import_class_from_path(api_dir, full_class_name)
+ except Exception as e:
+ Log.info("Failed to import class %s from path %s" % (full_class_name, from_path))
+ Log.info(e, exc_info=True)
+ return None
def import_class_from_path(from_path, full_class_name):
- Log.info('Trying to import %s from path %s' % (full_class_name, from_path))
+ Log.debug('Trying to import %s from path %s' % (full_class_name, from_path))
split = full_class_name.split('.')
classname_path = '.'.join(split[:-1])
class_name = full_class_name.split('.')[-1]
if from_path not in sys.path:
- Log.info("Add a new dependency to the path: %s" % from_path)
+ Log.debug("Add a new dependency to the path: %s" % from_path)
sys.path.insert(0, from_path)
if not classname_path:
- try:
- mod = importlib.import_module(class_name)
- return mod
- except Exception as e:
- Log.info("Import failed class_name %s from path %s" % (class_name, from_path))
- Log.info(e, exc_info=True)
- return None
+ mod = importlib.import_module(class_name)
+ return mod
else:
- try:
- mod = importlib.import_module(classname_path)
- retval = getattr(mod, class_name)
- return retval
- except Exception as e:
- Log.info("Import failed class_name %s from path %s" % (class_name, from_path))
- Log.info(e, exc_info=True)
- return None
+ mod = importlib.import_module(classname_path)
+ retval = getattr(mod, class_name)
+ return retval
def getFullyQualifiedFunctionName(tenant, namespace, name):
return "%s/%s/%s" % (tenant, namespace, name)