You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by to...@apache.org on 2021/08/03 12:56:49 UTC
[skywalking-python] branch master updated: fix grpc disconnect, SW_AGENT_MAX_BUFFER_SIZE (#138)
This is an automated email from the ASF dual-hosted git repository.
tompytel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking-python.git
The following commit(s) were added to refs/heads/master by this push:
new bcb5962 fix grpc disconnect, SW_AGENT_MAX_BUFFER_SIZE (#138)
bcb5962 is described below
commit bcb5962b6abf2bd166fb0691252b21303ac79640
Author: Tomasz Pytel <to...@gmail.com>
AuthorDate: Tue Aug 3 09:55:27 2021 -0300
fix grpc disconnect, SW_AGENT_MAX_BUFFER_SIZE (#138)
Co-authored-by: kezhenxu94 <ke...@apache.org>
---
docs/EnvVars.md | 1 +
skywalking/agent/__init__.py | 22 ++++++++++---------
skywalking/agent/protocol/__init__.py | 3 ---
skywalking/agent/protocol/grpc.py | 41 ++++++++++++-----------------------
skywalking/agent/protocol/http.py | 11 +++++-----
skywalking/agent/protocol/kafka.py | 10 +++++++--
skywalking/client/grpc.py | 2 +-
skywalking/config.py | 6 ++---
skywalking/trace/context.py | 2 +-
9 files changed, 44 insertions(+), 54 deletions(-)
diff --git a/docs/EnvVars.md b/docs/EnvVars.md
index 6874e6a..5c6e18d 100644
--- a/docs/EnvVars.md
+++ b/docs/EnvVars.md
@@ -11,6 +11,7 @@ Environment Variable | Description | Default
| `SW_AGENT_AUTHENTICATION` | The authentication token to verify that the agent is trusted by the backend OAP, as for how to configure the backend, refer to [the yaml](https://github.com/apache/skywalking/blob/4f0f39ffccdc9b41049903cc540b8904f7c9728e/oap-server/server-bootstrap/src/main/resources/application.yml#L155-L158). | unset |
| `SW_AGENT_LOGGING_LEVEL` | The logging level, could be one of `CRITICAL`, `FATAL`, `ERROR`, `WARN`(`WARNING`), `INFO`, `DEBUG` | `INFO` |
| `SW_AGENT_DISABLE_PLUGINS` | The name patterns in CSV pattern, plugins whose name matches one of the pattern won't be installed | `''` |
+| `SW_AGENT_MAX_BUFFER_SIZE` | The maximum queue backlog size for sending segment data to the backend; segments beyond this limit are silently dropped | `1000` |
| `SW_SQL_PARAMETERS_LENGTH` | The maximum length of the collected parameter, parameters longer than the specified length will be truncated, length 0 turns off parameter tracing | `0` |
| `SW_PYMONGO_TRACE_PARAMETERS` | Indicates whether to collect the filters of pymongo | `False` |
| `SW_PYMONGO_PARAMETERS_MAX_LENGTH` | The maximum length of the collected filters, filters longer than the specified length will be truncated | `512` |
diff --git a/skywalking/agent/__init__.py b/skywalking/agent/__init__.py
index 9ec960a..4587e5e 100644
--- a/skywalking/agent/__init__.py
+++ b/skywalking/agent/__init__.py
@@ -38,24 +38,30 @@ __heartbeat_thread = __report_thread = __query_profile_thread = __command_dispat
def __heartbeat():
while not __finished.is_set():
- if connected():
+ try:
__protocol.heartbeat()
+ except Exception as exc:
+ logger.error(str(exc))
- __finished.wait(30 if connected() else 3)
+ __finished.wait(30)
def __report():
while not __finished.is_set():
- if connected():
+ try:
__protocol.report(__queue) # is blocking actually, blocks for max config.QUEUE_TIMEOUT seconds
+ except Exception as exc:
+ logger.error(str(exc))
- __finished.wait(1)
+ __finished.wait(0)
def __query_profile_command():
while not __finished.is_set():
- if connected():
+ try:
__protocol.query_profile_commands()
+ except Exception as exc:
+ logger.error(str(exc))
__finished.wait(profile_task_query_interval)
@@ -68,7 +74,7 @@ def __command_dispatch():
def __init_threading():
global __heartbeat_thread, __report_thread, __query_profile_thread, __command_dispatch_thread, __queue, __finished
- __queue = Queue(maxsize=10000)
+ __queue = Queue(maxsize=config.max_buffer_size)
__finished = Event()
__heartbeat_thread = Thread(name='HeartbeatThread', target=__heartbeat, daemon=True)
__report_thread = Thread(name='ReportThread', target=__report, daemon=True)
@@ -162,10 +168,6 @@ def started():
return __started
-def connected():
- return __protocol.connected()
-
-
def archive(segment: 'Segment'):
try: # unlike checking __queue.full() then inserting, this is atomic
__queue.put(segment, block=False)
diff --git a/skywalking/agent/protocol/__init__.py b/skywalking/agent/protocol/__init__.py
index 38f8482..96816da 100644
--- a/skywalking/agent/protocol/__init__.py
+++ b/skywalking/agent/protocol/__init__.py
@@ -29,9 +29,6 @@ class Protocol(ABC):
def fork_after_in_child(self):
pass
- def connected(self):
- return False
-
def heartbeat(self):
raise NotImplementedError()
diff --git a/skywalking/agent/protocol/grpc.py b/skywalking/agent/protocol/grpc.py
index 5f76472..6ef9e78 100644
--- a/skywalking/agent/protocol/grpc.py
+++ b/skywalking/agent/protocol/grpc.py
@@ -17,7 +17,7 @@
import logging
import traceback
-from queue import Queue, Empty, Full
+from queue import Queue, Empty
from time import time
import grpc
@@ -35,15 +35,13 @@ from skywalking.trace.segment import Segment
class GrpcProtocol(Protocol):
def __init__(self):
+ self.properties_sent = False
self.state = None
if config.force_tls:
- self.channel = grpc.secure_channel(config.collector_address, grpc.ssl_channel_credentials(),
- options=(('grpc.max_connection_age_grace_ms',
- 1000 * config.GRPC_TIMEOUT),))
+ self.channel = grpc.secure_channel(config.collector_address, grpc.ssl_channel_credentials())
else:
- self.channel = grpc.insecure_channel(config.collector_address, options=(('grpc.max_connection_age_grace_ms',
- 1000 * config.GRPC_TIMEOUT),))
+ self.channel = grpc.insecure_channel(config.collector_address)
if config.authentication:
self.channel = grpc.intercept_channel(
@@ -58,11 +56,6 @@ class GrpcProtocol(Protocol):
def _cb(self, state):
logger.debug('grpc channel connectivity changed, [%s -> %s]', self.state, state)
self.state = state
- if self.connected():
- try:
- self.service_management.send_instance_props()
- except grpc.RpcError:
- self.on_error()
def query_profile_commands(self):
logger.debug("query profile commands")
@@ -70,13 +63,15 @@ class GrpcProtocol(Protocol):
def heartbeat(self):
try:
+ if not self.properties_sent:
+ self.service_management.send_instance_props()
+ self.properties_sent = True
+
self.service_management.send_heart_beat()
+
except grpc.RpcError:
self.on_error()
- def connected(self):
- return self.state == grpc.ChannelConnectivity.READY
-
def on_error(self):
traceback.print_exc() if logger.isEnabledFor(logging.DEBUG) else None
self.channel.unsubscribe(self._cb)
@@ -84,18 +79,19 @@ class GrpcProtocol(Protocol):
def report(self, queue: Queue, block: bool = True):
start = time()
- segment = None
def generator():
- nonlocal segment
-
while True:
try:
- timeout = max(0, config.QUEUE_TIMEOUT - int(time() - start)) # type: int
+ timeout = config.QUEUE_TIMEOUT - int(time() - start) # type: int
+ if timeout <= 0: # this is to make sure we exit eventually instead of being fed continuously
+ return
segment = queue.get(block=block, timeout=timeout) # type: Segment
except Empty:
return
+ queue.task_done()
+
logger.debug('reporting segment %s', segment)
s = SegmentObject(
@@ -137,16 +133,7 @@ class GrpcProtocol(Protocol):
yield s
- queue.task_done()
-
try:
self.traces_reporter.report(generator())
-
except grpc.RpcError:
self.on_error()
-
- if segment:
- try:
- queue.put(segment, block=False)
- except Full:
- pass
diff --git a/skywalking/agent/protocol/http.py b/skywalking/agent/protocol/http.py
index 809d1f8..f5b1147 100644
--- a/skywalking/agent/protocol/http.py
+++ b/skywalking/agent/protocol/http.py
@@ -35,9 +35,6 @@ class HttpProtocol(Protocol):
self.service_management.fork_after_in_child()
self.traces_reporter.fork_after_in_child()
- def connected(self):
- return True
-
def heartbeat(self):
if not self.properties_sent:
self.service_management.send_instance_props()
@@ -50,17 +47,19 @@ class HttpProtocol(Protocol):
def generator():
while True:
try:
- timeout = max(0, config.QUEUE_TIMEOUT - int(time() - start)) # type: int
+ timeout = config.QUEUE_TIMEOUT - int(time() - start) # type: int
+ if timeout <= 0: # this is to make sure we exit eventually instead of being fed continuously
+ return
segment = queue.get(block=block, timeout=timeout) # type: Segment
except Empty:
return
+ queue.task_done()
+
logger.debug('reporting segment %s', segment)
yield segment
- queue.task_done()
-
try:
self.traces_reporter.report(generator=generator())
except Exception:
diff --git a/skywalking/agent/protocol/kafka.py b/skywalking/agent/protocol/kafka.py
index 3eabfdb..6eaabcb 100644
--- a/skywalking/agent/protocol/kafka.py
+++ b/skywalking/agent/protocol/kafka.py
@@ -18,6 +18,7 @@
import logging
from skywalking.loggings import logger, getLogger
from queue import Queue, Empty
+from time import time
from skywalking import config
from skywalking.agent import Protocol
@@ -43,13 +44,20 @@ class KafkaProtocol(Protocol):
self.service_management.send_heart_beat()
def report(self, queue: Queue, block: bool = True):
+ start = time()
+
def generator():
while True:
try:
+ timeout = config.QUEUE_TIMEOUT - int(time() - start) # type: int
+ if timeout <= 0: # this is to make sure we exit eventually instead of being fed continuously
+ return
segment = queue.get(block=block) # type: Segment
except Empty:
return
+ queue.task_done()
+
logger.debug('reporting segment %s', segment)
s = SegmentObject(
@@ -91,6 +99,4 @@ class KafkaProtocol(Protocol):
yield s
- queue.task_done()
-
self.traces_reporter.report(generator())
diff --git a/skywalking/client/grpc.py b/skywalking/client/grpc.py
index 5545170..6cef4a6 100644
--- a/skywalking/client/grpc.py
+++ b/skywalking/client/grpc.py
@@ -60,7 +60,7 @@ class GrpcTraceSegmentReportService(TraceSegmentReportService):
self.report_stub = TraceSegmentReportServiceStub(channel)
def report(self, generator):
- self.report_stub.collect(generator, timeout=config.GRPC_TIMEOUT)
+ self.report_stub.collect(generator)
class GrpcProfileTaskChannelService(ProfileTaskChannelService):
diff --git a/skywalking/config.py b/skywalking/config.py
index 8741f49..6e35500 100644
--- a/skywalking/config.py
+++ b/skywalking/config.py
@@ -22,10 +22,7 @@ from typing import TYPE_CHECKING
if TYPE_CHECKING:
from typing import List
-# In order to prevent timeouts and possible segment loss make sure QUEUE_TIMEOUT is always at least few seconds lower
-# than GRPC_TIMEOUT.
-GRPC_TIMEOUT = 300 # type: int
-QUEUE_TIMEOUT = 240 # type: int
+QUEUE_TIMEOUT = 1 # type: int
RE_IGNORE_PATH = re.compile('^$') # type: re.Pattern
@@ -41,6 +38,7 @@ protocol = (os.getenv('SW_AGENT_PROTOCOL') or 'grpc').lower() # type: str
authentication = os.getenv('SW_AGENT_AUTHENTICATION') # type: str
logging_level = os.getenv('SW_AGENT_LOGGING_LEVEL') or 'INFO' # type: str
disable_plugins = (os.getenv('SW_AGENT_DISABLE_PLUGINS') or '').split(',') # type: List[str]
+max_buffer_size = int(os.getenv('SW_AGENT_MAX_BUFFER_SIZE', '1000')) # type: int
sql_parameters_length = int(os.getenv('SW_SQL_PARAMETERS_LENGTH') or '0') # type: int
pymongo_trace_parameters = True if os.getenv('SW_PYMONGO_TRACE_PARAMETERS') and \
os.getenv('SW_PYMONGO_TRACE_PARAMETERS') == 'True' else False # type: bool
diff --git a/skywalking/trace/context.py b/skywalking/trace/context.py
index 61b0a23..5d65a6e 100644
--- a/skywalking/trace/context.py
+++ b/skywalking/trace/context.py
@@ -269,4 +269,4 @@ def get_context() -> SpanContext:
if spans:
return spans[len(spans) - 1].context
- return SpanContext() if agent.connected() else NoopContext()
+ return SpanContext()