You are viewing a plain-text version of this content; the hyperlink to the canonical version is not preserved in this rendering.
Posted to notifications@skywalking.apache.org by to...@apache.org on 2021/08/03 12:56:49 UTC

[skywalking-python] branch master updated: fix grpc disconnect, SW_AGENT_MAX_BUFFER_SIZE (#138)

This is an automated email from the ASF dual-hosted git repository.

tompytel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking-python.git


The following commit(s) were added to refs/heads/master by this push:
     new bcb5962  fix grpc disconnect, SW_AGENT_MAX_BUFFER_SIZE (#138)
bcb5962 is described below

commit bcb5962b6abf2bd166fb0691252b21303ac79640
Author: Tomasz Pytel <to...@gmail.com>
AuthorDate: Tue Aug 3 09:55:27 2021 -0300

    fix grpc disconnect, SW_AGENT_MAX_BUFFER_SIZE (#138)
    
    Co-authored-by: kezhenxu94 <ke...@apache.org>
---
 docs/EnvVars.md                       |  1 +
 skywalking/agent/__init__.py          | 22 ++++++++++---------
 skywalking/agent/protocol/__init__.py |  3 ---
 skywalking/agent/protocol/grpc.py     | 41 ++++++++++++-----------------------
 skywalking/agent/protocol/http.py     | 11 +++++-----
 skywalking/agent/protocol/kafka.py    | 10 +++++++--
 skywalking/client/grpc.py             |  2 +-
 skywalking/config.py                  |  6 ++---
 skywalking/trace/context.py           |  2 +-
 9 files changed, 44 insertions(+), 54 deletions(-)

diff --git a/docs/EnvVars.md b/docs/EnvVars.md
index 6874e6a..5c6e18d 100644
--- a/docs/EnvVars.md
+++ b/docs/EnvVars.md
@@ -11,6 +11,7 @@ Environment Variable | Description | Default
 | `SW_AGENT_AUTHENTICATION` | The authentication token to verify that the agent is trusted by the backend OAP, as for how to configure the backend, refer to [the yaml](https://github.com/apache/skywalking/blob/4f0f39ffccdc9b41049903cc540b8904f7c9728e/oap-server/server-bootstrap/src/main/resources/application.yml#L155-L158). | unset |
 | `SW_AGENT_LOGGING_LEVEL` | The logging level, could be one of `CRITICAL`, `FATAL`, `ERROR`, `WARN`(`WARNING`), `INFO`, `DEBUG` | `INFO` |
 | `SW_AGENT_DISABLE_PLUGINS` | The name patterns in CSV pattern, plugins whose name matches one of the pattern won't be installed | `''` |
+| `SW_AGENT_MAX_BUFFER_SIZE` | The maximum number of segments buffered in the queue while waiting to be sent to the backend; segments beyond this limit are silently dropped | `1000` |
 | `SW_SQL_PARAMETERS_LENGTH` | The maximum length of the collected parameter, parameters longer than the specified length will be truncated, length 0 turns off parameter tracing | `0` |
 | `SW_PYMONGO_TRACE_PARAMETERS` | Indicates whether to collect the filters of pymongo | `False` |
 | `SW_PYMONGO_PARAMETERS_MAX_LENGTH` | The maximum length of the collected filters, filters longer than the specified length will be truncated |  `512` |
diff --git a/skywalking/agent/__init__.py b/skywalking/agent/__init__.py
index 9ec960a..4587e5e 100644
--- a/skywalking/agent/__init__.py
+++ b/skywalking/agent/__init__.py
@@ -38,24 +38,30 @@ __heartbeat_thread = __report_thread = __query_profile_thread = __command_dispat
 
 def __heartbeat():
     while not __finished.is_set():
-        if connected():
+        try:
             __protocol.heartbeat()
+        except Exception as exc:
+            logger.error(str(exc))
 
-        __finished.wait(30 if connected() else 3)
+        __finished.wait(30)
 
 
 def __report():
     while not __finished.is_set():
-        if connected():
+        try:
             __protocol.report(__queue)  # is blocking actually, blocks for max config.QUEUE_TIMEOUT seconds
+        except Exception as exc:
+            logger.error(str(exc))
 
-        __finished.wait(1)
+        __finished.wait(0)
 
 
 def __query_profile_command():
     while not __finished.is_set():
-        if connected():
+        try:
             __protocol.query_profile_commands()
+        except Exception as exc:
+            logger.error(str(exc))
 
         __finished.wait(profile_task_query_interval)
 
@@ -68,7 +74,7 @@ def __command_dispatch():
 def __init_threading():
     global __heartbeat_thread, __report_thread,  __query_profile_thread, __command_dispatch_thread, __queue, __finished
 
-    __queue = Queue(maxsize=10000)
+    __queue = Queue(maxsize=config.max_buffer_size)
     __finished = Event()
     __heartbeat_thread = Thread(name='HeartbeatThread', target=__heartbeat, daemon=True)
     __report_thread = Thread(name='ReportThread', target=__report, daemon=True)
@@ -162,10 +168,6 @@ def started():
     return __started
 
 
-def connected():
-    return __protocol.connected()
-
-
 def archive(segment: 'Segment'):
     try:  # unlike checking __queue.full() then inserting, this is atomic
         __queue.put(segment, block=False)
diff --git a/skywalking/agent/protocol/__init__.py b/skywalking/agent/protocol/__init__.py
index 38f8482..96816da 100644
--- a/skywalking/agent/protocol/__init__.py
+++ b/skywalking/agent/protocol/__init__.py
@@ -29,9 +29,6 @@ class Protocol(ABC):
     def fork_after_in_child(self):
         pass
 
-    def connected(self):
-        return False
-
     def heartbeat(self):
         raise NotImplementedError()
 
diff --git a/skywalking/agent/protocol/grpc.py b/skywalking/agent/protocol/grpc.py
index 5f76472..6ef9e78 100644
--- a/skywalking/agent/protocol/grpc.py
+++ b/skywalking/agent/protocol/grpc.py
@@ -17,7 +17,7 @@
 
 import logging
 import traceback
-from queue import Queue, Empty, Full
+from queue import Queue, Empty
 from time import time
 
 import grpc
@@ -35,15 +35,13 @@ from skywalking.trace.segment import Segment
 
 class GrpcProtocol(Protocol):
     def __init__(self):
+        self.properties_sent = False
         self.state = None
 
         if config.force_tls:
-            self.channel = grpc.secure_channel(config.collector_address, grpc.ssl_channel_credentials(),
-                                               options=(('grpc.max_connection_age_grace_ms',
-                                                         1000 * config.GRPC_TIMEOUT),))
+            self.channel = grpc.secure_channel(config.collector_address, grpc.ssl_channel_credentials())
         else:
-            self.channel = grpc.insecure_channel(config.collector_address, options=(('grpc.max_connection_age_grace_ms',
-                                                 1000 * config.GRPC_TIMEOUT),))
+            self.channel = grpc.insecure_channel(config.collector_address)
 
         if config.authentication:
             self.channel = grpc.intercept_channel(
@@ -58,11 +56,6 @@ class GrpcProtocol(Protocol):
     def _cb(self, state):
         logger.debug('grpc channel connectivity changed, [%s -> %s]', self.state, state)
         self.state = state
-        if self.connected():
-            try:
-                self.service_management.send_instance_props()
-            except grpc.RpcError:
-                self.on_error()
 
     def query_profile_commands(self):
         logger.debug("query profile commands")
@@ -70,13 +63,15 @@ class GrpcProtocol(Protocol):
 
     def heartbeat(self):
         try:
+            if not self.properties_sent:
+                self.service_management.send_instance_props()
+                self.properties_sent = True
+
             self.service_management.send_heart_beat()
+
         except grpc.RpcError:
             self.on_error()
 
-    def connected(self):
-        return self.state == grpc.ChannelConnectivity.READY
-
     def on_error(self):
         traceback.print_exc() if logger.isEnabledFor(logging.DEBUG) else None
         self.channel.unsubscribe(self._cb)
@@ -84,18 +79,19 @@ class GrpcProtocol(Protocol):
 
     def report(self, queue: Queue, block: bool = True):
         start = time()
-        segment = None
 
         def generator():
-            nonlocal segment
-
             while True:
                 try:
-                    timeout = max(0, config.QUEUE_TIMEOUT - int(time() - start))  # type: int
+                    timeout = config.QUEUE_TIMEOUT - int(time() - start)  # type: int
+                    if timeout <= 0:  # this is to make sure we exit eventually instead of being fed continuously
+                        return
                     segment = queue.get(block=block, timeout=timeout)  # type: Segment
                 except Empty:
                     return
 
+                queue.task_done()
+
                 logger.debug('reporting segment %s', segment)
 
                 s = SegmentObject(
@@ -137,16 +133,7 @@ class GrpcProtocol(Protocol):
 
                 yield s
 
-                queue.task_done()
-
         try:
             self.traces_reporter.report(generator())
-
         except grpc.RpcError:
             self.on_error()
-
-            if segment:
-                try:
-                    queue.put(segment, block=False)
-                except Full:
-                    pass
diff --git a/skywalking/agent/protocol/http.py b/skywalking/agent/protocol/http.py
index 809d1f8..f5b1147 100644
--- a/skywalking/agent/protocol/http.py
+++ b/skywalking/agent/protocol/http.py
@@ -35,9 +35,6 @@ class HttpProtocol(Protocol):
         self.service_management.fork_after_in_child()
         self.traces_reporter.fork_after_in_child()
 
-    def connected(self):
-        return True
-
     def heartbeat(self):
         if not self.properties_sent:
             self.service_management.send_instance_props()
@@ -50,17 +47,19 @@ class HttpProtocol(Protocol):
         def generator():
             while True:
                 try:
-                    timeout = max(0, config.QUEUE_TIMEOUT - int(time() - start))  # type: int
+                    timeout = config.QUEUE_TIMEOUT - int(time() - start)  # type: int
+                    if timeout <= 0:  # this is to make sure we exit eventually instead of being fed continuously
+                        return
                     segment = queue.get(block=block, timeout=timeout)  # type: Segment
                 except Empty:
                     return
 
+                queue.task_done()
+
                 logger.debug('reporting segment %s', segment)
 
                 yield segment
 
-                queue.task_done()
-
         try:
             self.traces_reporter.report(generator=generator())
         except Exception:
diff --git a/skywalking/agent/protocol/kafka.py b/skywalking/agent/protocol/kafka.py
index 3eabfdb..6eaabcb 100644
--- a/skywalking/agent/protocol/kafka.py
+++ b/skywalking/agent/protocol/kafka.py
@@ -18,6 +18,7 @@
 import logging
 from skywalking.loggings import logger, getLogger
 from queue import Queue, Empty
+from time import time
 
 from skywalking import config
 from skywalking.agent import Protocol
@@ -43,13 +44,20 @@ class KafkaProtocol(Protocol):
         self.service_management.send_heart_beat()
 
     def report(self, queue: Queue, block: bool = True):
+        start = time()
+
         def generator():
             while True:
                 try:
+                    timeout = config.QUEUE_TIMEOUT - int(time() - start)  # type: int
+                    if timeout <= 0:  # this is to make sure we exit eventually instead of being fed continuously
+                        return
                     segment = queue.get(block=block)  # type: Segment
                 except Empty:
                     return
 
+                queue.task_done()
+
                 logger.debug('reporting segment %s', segment)
 
                 s = SegmentObject(
@@ -91,6 +99,4 @@ class KafkaProtocol(Protocol):
 
                 yield s
 
-                queue.task_done()
-
         self.traces_reporter.report(generator())
diff --git a/skywalking/client/grpc.py b/skywalking/client/grpc.py
index 5545170..6cef4a6 100644
--- a/skywalking/client/grpc.py
+++ b/skywalking/client/grpc.py
@@ -60,7 +60,7 @@ class GrpcTraceSegmentReportService(TraceSegmentReportService):
         self.report_stub = TraceSegmentReportServiceStub(channel)
 
     def report(self, generator):
-        self.report_stub.collect(generator, timeout=config.GRPC_TIMEOUT)
+        self.report_stub.collect(generator)
 
 
 class GrpcProfileTaskChannelService(ProfileTaskChannelService):
diff --git a/skywalking/config.py b/skywalking/config.py
index 8741f49..6e35500 100644
--- a/skywalking/config.py
+++ b/skywalking/config.py
@@ -22,10 +22,7 @@ from typing import TYPE_CHECKING
 if TYPE_CHECKING:
     from typing import List
 
-# In order to prevent timeouts and possible segment loss make sure QUEUE_TIMEOUT is always at least few seconds lower
-# than GRPC_TIMEOUT.
-GRPC_TIMEOUT = 300  # type: int
-QUEUE_TIMEOUT = 240  # type: int
+QUEUE_TIMEOUT = 1  # type: int
 
 RE_IGNORE_PATH = re.compile('^$')  # type: re.Pattern
 
@@ -41,6 +38,7 @@ protocol = (os.getenv('SW_AGENT_PROTOCOL') or 'grpc').lower()  # type: str
 authentication = os.getenv('SW_AGENT_AUTHENTICATION')  # type: str
 logging_level = os.getenv('SW_AGENT_LOGGING_LEVEL') or 'INFO'  # type: str
 disable_plugins = (os.getenv('SW_AGENT_DISABLE_PLUGINS') or '').split(',')  # type: List[str]
+max_buffer_size = int(os.getenv('SW_AGENT_MAX_BUFFER_SIZE', '1000'))  # type: int
 sql_parameters_length = int(os.getenv('SW_SQL_PARAMETERS_LENGTH') or '0')  # type: int
 pymongo_trace_parameters = True if os.getenv('SW_PYMONGO_TRACE_PARAMETERS') and \
                                    os.getenv('SW_PYMONGO_TRACE_PARAMETERS') == 'True' else False  # type: bool
diff --git a/skywalking/trace/context.py b/skywalking/trace/context.py
index 61b0a23..5d65a6e 100644
--- a/skywalking/trace/context.py
+++ b/skywalking/trace/context.py
@@ -269,4 +269,4 @@ def get_context() -> SpanContext:
     if spans:
         return spans[len(spans) - 1].context
 
-    return SpanContext() if agent.connected() else NoopContext()
+    return SpanContext()