You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2018/10/26 17:13:43 UTC

[pulsar] branch master updated: Cleanup Logging for python functions (#2847)

This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new fa212bc  Cleanup Logging for python functions (#2847)
fa212bc is described below

commit fa212bc8bb0cbd81729b498b6041cde794be9684
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Fri Oct 26 10:13:39 2018 -0700

    Cleanup Logging for python functions (#2847)
    
    * Cleanup Logging for python functions
    
    * More statements to debug
    
    * More debug statements
    
    * More debug
---
 .../instance/src/main/python/python_instance.py    |  8 ++---
 .../instance/src/main/python/server.py             |  8 ++---
 pulsar-functions/instance/src/main/python/util.py  | 37 ++++++++++------------
 3 files changed, 24 insertions(+), 29 deletions(-)

diff --git a/pulsar-functions/instance/src/main/python/python_instance.py b/pulsar-functions/instance/src/main/python/python_instance.py
index 03dafae..5f1645d 100644
--- a/pulsar-functions/instance/src/main/python/python_instance.py
+++ b/pulsar-functions/instance/src/main/python/python_instance.py
@@ -187,7 +187,7 @@ class PythonInstance(object):
       else:
         serde_kclass = util.import_class(os.path.dirname(self.user_code), serde)
       self.input_serdes[topic] = serde_kclass()
-      Log.info("Setting up consumer for topic %s with subname %s" % (topic, subscription_name))
+      Log.debug("Setting up consumer for topic %s with subname %s" % (topic, subscription_name))
       self.consumers[topic] = self.pulsar_client.subscribe(
         str(topic), subscription_name,
         consumer_type=mode,
@@ -201,7 +201,7 @@ class PythonInstance(object):
       else:
         serde_kclass = util.import_class(os.path.dirname(self.user_code), consumer_conf.serdeClassName)
       self.input_serdes[topic] = serde_kclass()
-      Log.info("Setting up consumer for topic %s with subname %s" % (topic, subscription_name))
+      Log.debug("Setting up consumer for topic %s with subname %s" % (topic, subscription_name))
       if consumer_conf.isRegexPattern:
         self.consumers[topic] = self.pulsar_client.subscribe(
           re.compile(str(topic)), subscription_name,
@@ -237,7 +237,7 @@ class PythonInstance(object):
       Timer(self.expected_healthcheck_interval, self.process_spawner_health_check_timer).start()
 
   def actual_execution(self):
-    Log.info("Started Thread for executing the function")
+    Log.debug("Started Thread for executing the function")
     while True:
       msg = self.queue.get(True)
       if isinstance(msg, InternalQuitMessage):
@@ -321,7 +321,7 @@ class PythonInstance(object):
   def setup_producer(self):
     if self.instance_config.function_details.sink.topic != None and \
             len(self.instance_config.function_details.sink.topic) > 0:
-      Log.info("Setting up producer for topic %s" % self.instance_config.function_details.sink.topic)
+      Log.debug("Setting up producer for topic %s" % self.instance_config.function_details.sink.topic)
       self.producer = self.pulsar_client.create_producer(
         str(self.instance_config.function_details.sink.topic),
         block_if_queue_full=True,
diff --git a/pulsar-functions/instance/src/main/python/server.py b/pulsar-functions/instance/src/main/python/server.py
index 611a737..58d43d2 100644
--- a/pulsar-functions/instance/src/main/python/server.py
+++ b/pulsar-functions/instance/src/main/python/server.py
@@ -35,20 +35,20 @@ class InstanceCommunicationServicer(InstanceCommunication_pb2_grpc.InstanceContr
     self.pyinstance = pyinstance
 
   def GetFunctionStatus(self, request, context):
-    Log.info("Came in GetFunctionStatus")
+    Log.debug("Came in GetFunctionStatus")
     return self.pyinstance.get_function_status()
 
   def GetAndResetMetrics(self, request, context):
-    Log.info("Came in GetAndResetMetrics")
+    Log.debug("Came in GetAndResetMetrics")
     return self.pyinstance.get_and_reset_metrics()
 
   def ResetMetrics(self, request, context):
-    Log.info("Came in ResetMetrics")
+    Log.debug("Came in ResetMetrics")
     self.pyinstance.reset_metrics()
     return request
 
   def GetMetrics(self, request, context):
-    Log.info("Came in GetMetrics")
+    Log.debug("Came in GetMetrics")
     return self.pyinstance.get_metrics()
 
   def HealthCheck(self, request, context):
diff --git a/pulsar-functions/instance/src/main/python/util.py b/pulsar-functions/instance/src/main/python/util.py
index 4736457..56c1ce1 100644
--- a/pulsar-functions/instance/src/main/python/util.py
+++ b/pulsar-functions/instance/src/main/python/util.py
@@ -36,38 +36,33 @@ PULSAR_FUNCTIONS_API_ROOT = 'functions'
 def import_class(from_path, full_class_name):
   from_path = str(from_path)
   full_class_name = str(full_class_name)
-  kclass = import_class_from_path(from_path, full_class_name)
-  if kclass is None:
+  try:
+    return import_class_from_path(from_path, full_class_name)
+  except Exception as e:
     our_dir = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe())))
     api_dir = os.path.join(our_dir, PULSAR_API_ROOT, PULSAR_FUNCTIONS_API_ROOT)
-    kclass = import_class_from_path(api_dir, full_class_name)
-  return kclass
+    try:
+      return import_class_from_path(api_dir, full_class_name)
+    except Exception as e:
+      Log.info("Failed to import class %s from path %s" % (full_class_name, from_path))
+      Log.info(e, exc_info=True)
+      return None
 
 def import_class_from_path(from_path, full_class_name):
-  Log.info('Trying to import %s from path %s' % (full_class_name, from_path))
+  Log.debug('Trying to import %s from path %s' % (full_class_name, from_path))
   split = full_class_name.split('.')
   classname_path = '.'.join(split[:-1])
   class_name = full_class_name.split('.')[-1]
   if from_path not in sys.path:
-    Log.info("Add a new dependency to the path: %s" % from_path)
+    Log.debug("Add a new dependency to the path: %s" % from_path)
     sys.path.insert(0, from_path)
   if not classname_path:
-    try:
-      mod = importlib.import_module(class_name)
-      return mod
-    except Exception as e:
-      Log.info("Import failed class_name %s from path %s" % (class_name, from_path))
-      Log.info(e, exc_info=True)
-      return None
+    mod = importlib.import_module(class_name)
+    return mod
   else:
-    try:
-      mod = importlib.import_module(classname_path)
-      retval = getattr(mod, class_name)
-      return retval
-    except Exception as e:
-      Log.info("Import failed class_name %s from path %s" % (class_name, from_path))
-      Log.info(e, exc_info=True)
-      return None
+    mod = importlib.import_module(classname_path)
+    retval = getattr(mod, class_name)
+    return retval
 
 def getFullyQualifiedFunctionName(tenant, namespace, name):
   return "%s/%s/%s" % (tenant, namespace, name)