You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ke...@apache.org on 2020/12/11 04:40:05 UTC

[skywalking-python] branch master updated: [Bugfix] allow pending data to send before exit (#98)

This is an automated email from the ASF dual-hosted git repository.

kezhenxu94 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking-python.git


The following commit(s) were added to refs/heads/master by this push:
     new b87cbd1  [Bugfix] allow pending data to send before exit (#98)
b87cbd1 is described below

commit b87cbd13311a5604d4b9409645cf05f2d344c590
Author: Tomasz Pytel <to...@gmail.com>
AuthorDate: Fri Dec 11 01:39:55 2020 -0300

    [Bugfix] allow pending data to send before exit (#98)
---
 skywalking/agent/__init__.py          | 14 +++++++++++---
 skywalking/agent/protocol/__init__.py |  2 +-
 skywalking/agent/protocol/grpc.py     |  4 ++--
 skywalking/agent/protocol/http.py     |  4 ++--
 skywalking/agent/protocol/kafka.py    |  4 ++--
 5 files changed, 18 insertions(+), 10 deletions(-)

diff --git a/skywalking/agent/__init__.py b/skywalking/agent/__init__.py
index 3a6b001..e8544dc 100644
--- a/skywalking/agent/__init__.py
+++ b/skywalking/agent/__init__.py
@@ -15,12 +15,13 @@
 # limitations under the License.
 #
 
-from skywalking.loggings import logger
+import atexit
 from queue import Queue
 from threading import Thread, Event
 from typing import TYPE_CHECKING
 
-from skywalking import config, plugins
+from skywalking import config, plugins, loggings
+from skywalking.loggings import logger
 from skywalking.agent.protocol import Protocol
 
 if TYPE_CHECKING:
@@ -66,20 +67,27 @@ def __init():
     plugins.install()
 
 
+def __fini():
+    __protocol.report(__queue, False)
+    __queue.join()
+
+
 def start():
     global __started
     if __started:
         raise RuntimeError('the agent can only be started once')
-    from skywalking import loggings
     loggings.init()
     config.finalize()
     __started = True
     __init()
     __heartbeat_thread.start()
     __report_thread.start()
+    atexit.register(__fini)
 
 
 def stop():
+    atexit.unregister(__fini)
+    __fini()
     __finished.set()
 
 
diff --git a/skywalking/agent/protocol/__init__.py b/skywalking/agent/protocol/__init__.py
index 8237f32..0f6e62e 100644
--- a/skywalking/agent/protocol/__init__.py
+++ b/skywalking/agent/protocol/__init__.py
@@ -26,5 +26,5 @@ class Protocol(ABC):
     def heartbeat(self):
         raise NotImplementedError()
 
-    def report(self, queue: Queue):
+    def report(self, queue: Queue, block: bool = True):
         raise NotImplementedError()
diff --git a/skywalking/agent/protocol/grpc.py b/skywalking/agent/protocol/grpc.py
index 0a7da16..f5468a4 100644
--- a/skywalking/agent/protocol/grpc.py
+++ b/skywalking/agent/protocol/grpc.py
@@ -67,10 +67,10 @@ class GrpcProtocol(Protocol):
         self.channel.unsubscribe(self._cb)
         self.channel.subscribe(self._cb, try_to_connect=True)
 
-    def report(self, queue: Queue):
+    def report(self, queue: Queue, block: bool = True):
         def generator():
             while True:
-                segment = queue.get()  # type: Segment
+                segment = queue.get(block=block)  # type: Segment
 
                 logger.debug('reporting segment %s', segment)
 
diff --git a/skywalking/agent/protocol/http.py b/skywalking/agent/protocol/http.py
index e97cb22..331f71a 100644
--- a/skywalking/agent/protocol/http.py
+++ b/skywalking/agent/protocol/http.py
@@ -38,10 +38,10 @@ class HttpProtocol(Protocol):
     def connected(self):
         return True
 
-    def report(self, queue: Queue):
+    def report(self, queue: Queue, block: bool = True):
         def generator():
             while True:
-                segment = queue.get()  # type: Segment
+                segment = queue.get(block=block)  # type: Segment
 
                 logger.debug('reporting segment %s', segment)
 
diff --git a/skywalking/agent/protocol/kafka.py b/skywalking/agent/protocol/kafka.py
index 405a728..8e6e75a 100644
--- a/skywalking/agent/protocol/kafka.py
+++ b/skywalking/agent/protocol/kafka.py
@@ -42,10 +42,10 @@ class KafkaProtocol(Protocol):
     def heartbeat(self):
         self.service_management.send_heart_beat()
 
-    def report(self, queue: Queue):
+    def report(self, queue: Queue, block: bool = True):
         def generator():
             while True:
-                segment = queue.get()  # type: Segment
+                segment = queue.get(block=block)  # type: Segment
 
                 logger.debug('reporting segment %s', segment)