You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ke...@apache.org on 2020/12/11 04:40:05 UTC
[skywalking-python] branch master updated: [Bugfix] allow pending data to send before exit (#98)
This is an automated email from the ASF dual-hosted git repository.
kezhenxu94 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking-python.git
The following commit(s) were added to refs/heads/master by this push:
new b87cbd1 [Bugfix] allow pending data to send before exit (#98)
b87cbd1 is described below
commit b87cbd13311a5604d4b9409645cf05f2d344c590
Author: Tomasz Pytel <to...@gmail.com>
AuthorDate: Fri Dec 11 01:39:55 2020 -0300
[Bugfix] allow pending data to send before exit (#98)
---
skywalking/agent/__init__.py | 14 +++++++++++---
skywalking/agent/protocol/__init__.py | 2 +-
skywalking/agent/protocol/grpc.py | 4 ++--
skywalking/agent/protocol/http.py | 4 ++--
skywalking/agent/protocol/kafka.py | 4 ++--
5 files changed, 18 insertions(+), 10 deletions(-)
diff --git a/skywalking/agent/__init__.py b/skywalking/agent/__init__.py
index 3a6b001..e8544dc 100644
--- a/skywalking/agent/__init__.py
+++ b/skywalking/agent/__init__.py
@@ -15,12 +15,13 @@
# limitations under the License.
#
-from skywalking.loggings import logger
+import atexit
from queue import Queue
from threading import Thread, Event
from typing import TYPE_CHECKING
-from skywalking import config, plugins
+from skywalking import config, plugins, loggings
+from skywalking.loggings import logger
from skywalking.agent.protocol import Protocol
if TYPE_CHECKING:
@@ -66,20 +67,27 @@ def __init():
plugins.install()
+def __fini():
+ __protocol.report(__queue, False)
+ __queue.join()
+
+
def start():
global __started
if __started:
raise RuntimeError('the agent can only be started once')
- from skywalking import loggings
loggings.init()
config.finalize()
__started = True
__init()
__heartbeat_thread.start()
__report_thread.start()
+ atexit.register(__fini)
def stop():
+ atexit.unregister(__fini)
+ __fini()
__finished.set()
diff --git a/skywalking/agent/protocol/__init__.py b/skywalking/agent/protocol/__init__.py
index 8237f32..0f6e62e 100644
--- a/skywalking/agent/protocol/__init__.py
+++ b/skywalking/agent/protocol/__init__.py
@@ -26,5 +26,5 @@ class Protocol(ABC):
def heartbeat(self):
raise NotImplementedError()
- def report(self, queue: Queue):
+ def report(self, queue: Queue, block: bool = True):
raise NotImplementedError()
diff --git a/skywalking/agent/protocol/grpc.py b/skywalking/agent/protocol/grpc.py
index 0a7da16..f5468a4 100644
--- a/skywalking/agent/protocol/grpc.py
+++ b/skywalking/agent/protocol/grpc.py
@@ -67,10 +67,10 @@ class GrpcProtocol(Protocol):
self.channel.unsubscribe(self._cb)
self.channel.subscribe(self._cb, try_to_connect=True)
- def report(self, queue: Queue):
+ def report(self, queue: Queue, block: bool = True):
def generator():
while True:
- segment = queue.get() # type: Segment
+ segment = queue.get(block=block) # type: Segment
logger.debug('reporting segment %s', segment)
diff --git a/skywalking/agent/protocol/http.py b/skywalking/agent/protocol/http.py
index e97cb22..331f71a 100644
--- a/skywalking/agent/protocol/http.py
+++ b/skywalking/agent/protocol/http.py
@@ -38,10 +38,10 @@ class HttpProtocol(Protocol):
def connected(self):
return True
- def report(self, queue: Queue):
+ def report(self, queue: Queue, block: bool = True):
def generator():
while True:
- segment = queue.get() # type: Segment
+ segment = queue.get(block=block) # type: Segment
logger.debug('reporting segment %s', segment)
diff --git a/skywalking/agent/protocol/kafka.py b/skywalking/agent/protocol/kafka.py
index 405a728..8e6e75a 100644
--- a/skywalking/agent/protocol/kafka.py
+++ b/skywalking/agent/protocol/kafka.py
@@ -42,10 +42,10 @@ class KafkaProtocol(Protocol):
def heartbeat(self):
self.service_management.send_heart_beat()
- def report(self, queue: Queue):
+ def report(self, queue: Queue, block: bool = True):
def generator():
while True:
- segment = queue.get() # type: Segment
+ segment = queue.get(block=block) # type: Segment
logger.debug('reporting segment %s', segment)