You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ke...@apache.org on 2021/08/22 04:20:18 UTC
[skywalking-python] branch master updated: Add Profiling function (#155)
This is an automated email from the ASF dual-hosted git repository.
kezhenxu94 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking-python.git
The following commit(s) were added to refs/heads/master by this push:
new 97bf7fd Add Profiling function (#155)
97bf7fd is described below
commit 97bf7fd8b63aedfbb37b7035c84445c71b32cd07
Author: Humbert Zhang <50...@qq.com>
AuthorDate: Sun Aug 22 12:20:10 2021 +0800
Add Profiling function (#155)
---
docs/EnvVars.md | 6 +-
skywalking/agent/__init__.py | 51 ++++-
skywalking/agent/protocol/__init__.py | 6 +
skywalking/agent/protocol/grpc.py | 40 +++-
skywalking/client/__init__.py | 3 +
skywalking/client/grpc.py | 20 +-
skywalking/command/command_service.py | 20 +-
.../executors/profile_task_command_executor.py | 7 +-
skywalking/config.py | 12 +-
skywalking/profile/__init__.py | 12 +-
skywalking/profile/profile_context.py | 223 +++++++++++++++++++++
skywalking/profile/profile_service.py | 208 +++++++++++++++++++
.../__init__.py => profile/profile_status.py} | 39 ++--
skywalking/profile/profile_task.py | 2 +
.../profile/profile_task_execution_service.py | 101 ----------
skywalking/profile/snapshot.py | 44 ++++
skywalking/trace/context.py | 21 ++
skywalking/utils/array.py | 63 ++++++
.../{client/__init__.py => utils/atomic_ref.py} | 37 ++--
.../{profile/__init__.py => utils/integer.py} | 18 +-
skywalking/{profile/__init__.py => utils/time.py} | 6 +-
21 files changed, 770 insertions(+), 169 deletions(-)
diff --git a/docs/EnvVars.md b/docs/EnvVars.md
index 9416534..1fddacf 100644
--- a/docs/EnvVars.md
+++ b/docs/EnvVars.md
@@ -31,8 +31,12 @@ Environment Variable | Description | Default
| `SW_KAFKA_REPORTER_TOPIC_LOG` | Specifying Kafka topic name for Log data. | `skywalking-logs` |
| `SW_KAFKA_REPORTER_CONFIG_key` | The configs to init KafkaProducer. it support the basic arguments (whose type is either `str`, `bool`, or `int`) listed [here](https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html#kafka.KafkaProducer) | unset |
| `SW_CELERY_PARAMETERS_LENGTH`| The maximum length of `celery` functions parameters, longer than this will be truncated, 0 turns off | `512` |
-| `SW_AGENT_PROFILE_ACTIVE` | If `True`, Python agent will enable profile when user create a new profile task. Otherwise disable profile. | `False` |
+| `SW_AGENT_PROFILE_ACTIVE` | If `True`, the Python agent enables profiling when a user creates a new profile task. Otherwise, profiling is disabled. | `True` |
| `SW_PROFILE_TASK_QUERY_INTERVAL` | The number of seconds between two profile task query. | `20` |
+| `SW_AGENT_PROFILE_MAX_PARALLEL` | The maximum number of segments that can be profiled in parallel. | `5` |
+| `SW_AGENT_PROFILE_DURATION` | The maximum time (in minutes) that a segment may be profiled; if profiling of the current segment exceeds this limit, it is stopped. | `10` |
+| `SW_AGENT_PROFILE_DUMP_MAX_STACK_DEPTH` | The maximum stack depth recorded in each thread dump. | `500` |
+| `SW_AGENT_PROFILE_SNAPSHOT_TRANSPORT_BUFFER_SIZE` | The size of the buffer that holds snapshots waiting to be sent to the backend. | `50` |
| `SW_AGENT_LOG_REPORTER_ACTIVE` | If `True`, Python agent will report collected logs to the OAP or Satellite. Otherwise, it disables the feature. | `False` |
| `SW_AGENT_LOG_REPORTER_BUFFER_SIZE` | The maximum queue backlog size for sending log data to backend, logs beyond this are silently dropped. | `10000` |
| `SW_AGENT_LOG_REPORTER_LEVEL` | This config specifies the logger levels of concern, any logs with a level below the config will be ignored. | `WARNING` |
diff --git a/skywalking/agent/__init__.py b/skywalking/agent/__init__.py
index 97cd77c..f439f8d 100644
--- a/skywalking/agent/__init__.py
+++ b/skywalking/agent/__init__.py
@@ -27,8 +27,10 @@ from skywalking import config, plugins
from skywalking import loggings
from skywalking.agent.protocol import Protocol
from skywalking.command import command_service
-from skywalking.config import profile_active, profile_task_query_interval
from skywalking.loggings import logger
+from skywalking import profile
+from skywalking.profile.profile_task import ProfileTask
+from skywalking.profile.snapshot import TracingThreadSnapshot
if TYPE_CHECKING:
from skywalking.trace.context import Segment
@@ -36,7 +38,7 @@ if TYPE_CHECKING:
__started = False
__protocol = Protocol() # type: Protocol
__heartbeat_thread = __report_thread = __log_report_thread = __query_profile_thread = __command_dispatch_thread \
- = __queue = __log_queue = __finished = None
+ = __send_profile_thread = __queue = __log_queue = __snapshot_queue = __finished = None
def __heartbeat():
@@ -69,6 +71,16 @@ def __report_log():
__finished.wait(0)
+def __send_profile_snapshot():
+ while not __finished.is_set():
+ try:
+ __protocol.send_snapshot(__snapshot_queue)
+ except Exception as exc:
+ logger.error(str(exc))
+
+ __finished.wait(0.5)
+
+
def __query_profile_command():
while not __finished.is_set():
try:
@@ -76,7 +88,7 @@ def __query_profile_command():
except Exception as exc:
logger.error(str(exc))
- __finished.wait(profile_task_query_interval)
+ __finished.wait(config.get_profile_task_interval)
def __command_dispatch():
@@ -86,13 +98,12 @@ def __command_dispatch():
def __init_threading():
global __heartbeat_thread, __report_thread, __log_report_thread, __query_profile_thread, \
- __command_dispatch_thread, __queue, __log_queue, __finished
+ __command_dispatch_thread, __send_profile_thread, __queue, __log_queue, __snapshot_queue, __finished
__queue = Queue(maxsize=config.max_buffer_size)
__finished = Event()
__heartbeat_thread = Thread(name='HeartbeatThread', target=__heartbeat, daemon=True)
__report_thread = Thread(name='ReportThread', target=__report, daemon=True)
- __query_profile_thread = Thread(name='QueryProfileCommandThread', target=__query_profile_command, daemon=True)
__command_dispatch_thread = Thread(name="CommandDispatchThread", target=__command_dispatch, daemon=True)
__heartbeat_thread.start()
@@ -104,13 +115,18 @@ def __init_threading():
__log_report_thread = Thread(name='LogReportThread', target=__report_log, daemon=True)
__log_report_thread.start()
- if profile_active:
+ if config.profile_active:
+ __snapshot_queue = Queue(maxsize=config.profile_snapshot_transport_buffer_size)
+
+ __query_profile_thread = Thread(name='QueryProfileCommandThread', target=__query_profile_command, daemon=True)
__query_profile_thread.start()
+ __send_profile_thread = Thread(name='SendProfileSnapShotThread', target=__send_profile_snapshot, daemon=True)
+ __send_profile_thread.start()
+
def __init():
global __protocol
-
if config.protocol == 'grpc':
from skywalking.agent.protocol.grpc import GrpcProtocol
__protocol = GrpcProtocol()
@@ -132,9 +148,15 @@ def __init():
def __fini():
__protocol.report(__queue, False)
__queue.join()
+
if config.log_reporter_active:
__protocol.report_log(__log_queue, False)
__log_queue.join()
+
+ if config.profile_active:
+ __protocol.send_snapshot(__snapshot_queue, False)
+ __snapshot_queue.join()
+
__finished.set()
@@ -175,6 +197,7 @@ def start():
loggings.init()
config.finalize()
+ profile.init()
__init()
@@ -210,3 +233,17 @@ def archive_log(log_data: 'LogData'):
__log_queue.put(log_data, block=False)
except Full:
logger.warning('the queue is full, the log will be abandoned')
+
+
+def add_profiling_snapshot(snapshot: TracingThreadSnapshot):
+ try:
+ __snapshot_queue.put(snapshot)
+ except Full:
+ logger.warning('the snapshot queue is full, the snapshot will be abandoned')
+
+
+def notify_profile_finish(task: ProfileTask):
+ try:
+ __protocol.notify_profile_task_finish(task)
+ except Exception as e:
+ logger.error("notify profile task finish to backend fail. " + str(e))
diff --git a/skywalking/agent/protocol/__init__.py b/skywalking/agent/protocol/__init__.py
index efc2fa4..24d4c3d 100644
--- a/skywalking/agent/protocol/__init__.py
+++ b/skywalking/agent/protocol/__init__.py
@@ -40,3 +40,9 @@ class Protocol(ABC):
def query_profile_commands(self):
pass
+
+ def send_snapshot(self, queue: Queue, block: bool = True):
+ pass
+
+ def notify_profile_task_finish(self, task):
+ pass
diff --git a/skywalking/agent/protocol/grpc.py b/skywalking/agent/protocol/grpc.py
index 468ef5a..7ad0a8e 100644
--- a/skywalking/agent/protocol/grpc.py
+++ b/skywalking/agent/protocol/grpc.py
@@ -24,6 +24,7 @@ import grpc
from skywalking.protocol.common.Common_pb2 import KeyStringValuePair
from skywalking.protocol.language_agent.Tracing_pb2 import SegmentObject, SpanObject, Log, SegmentReference
from skywalking.protocol.logging.Logging_pb2 import LogData
+from skywalking.protocol.profile.Profile_pb2 import ThreadSnapshot, ThreadStack
from skywalking import config
from skywalking.agent import Protocol
@@ -32,6 +33,8 @@ from skywalking.client.grpc import GrpcServiceManagementClient, GrpcTraceSegment
GrpcProfileTaskChannelService, GrpcLogDataReportService
from skywalking.loggings import logger
from skywalking.trace.segment import Segment
+from skywalking.profile.snapshot import TracingThreadSnapshot
+from skywalking.profile.profile_task import ProfileTask
class GrpcProtocol(Protocol):
@@ -52,7 +55,7 @@ class GrpcProtocol(Protocol):
self.channel.subscribe(self._cb, try_to_connect=True)
self.service_management = GrpcServiceManagementClient(self.channel)
self.traces_reporter = GrpcTraceSegmentReportService(self.channel)
- self.profile_query = GrpcProfileTaskChannelService(self.channel)
+ self.profile_channel = GrpcProfileTaskChannelService(self.channel)
self.log_reporter = GrpcLogDataReportService(self.channel)
def _cb(self, state):
@@ -61,7 +64,10 @@ class GrpcProtocol(Protocol):
def query_profile_commands(self):
logger.debug("query profile commands")
- self.profile_query.do_query()
+ self.profile_channel.do_query()
+
+ def notify_profile_task_finish(self, task: ProfileTask):
+ self.profile_channel.finish(task)
def heartbeat(self):
try:
@@ -163,3 +169,33 @@ class GrpcProtocol(Protocol):
self.log_reporter.report(generator())
except grpc.RpcError:
self.on_error()
+
+ def send_snapshot(self, queue: Queue, block: bool = True):
+ start = time()
+
+ def generator():
+ while True:
+ try:
+ timeout = config.QUEUE_TIMEOUT - int(time() - start) # type: int
+ if timeout <= 0:
+ return
+ snapshot = queue.get(block=block, timeout=timeout) # type: TracingThreadSnapshot
+ except Empty:
+ return
+
+ queue.task_done()
+
+ transform_snapshot = ThreadSnapshot(
+ taskId=str(snapshot.task_id),
+ traceSegmentId=str(snapshot.trace_segment_id),
+ time=int(snapshot.time),
+ sequence=int(snapshot.sequence),
+ stack=ThreadStack(codeSignatures=snapshot.stack_list)
+ )
+
+ yield transform_snapshot
+
+ try:
+ self.profile_channel.send(generator())
+ except grpc.RpcError:
+ self.on_error()
diff --git a/skywalking/client/__init__.py b/skywalking/client/__init__.py
index 007e0bc..562e3a4 100644
--- a/skywalking/client/__init__.py
+++ b/skywalking/client/__init__.py
@@ -37,3 +37,6 @@ class LogDataReportService(object):
class ProfileTaskChannelService(object):
def do_query(self):
raise NotImplementedError()
+
+ def send(self, generator):
+ raise NotImplementedError()
diff --git a/skywalking/client/grpc.py b/skywalking/client/grpc.py
index 4c1fe31..5ffc24a 100644
--- a/skywalking/client/grpc.py
+++ b/skywalking/client/grpc.py
@@ -18,11 +18,11 @@
import grpc
from skywalking.protocol.common.Common_pb2 import KeyStringValuePair
from skywalking.protocol.language_agent.Tracing_pb2_grpc import TraceSegmentReportServiceStub
+from skywalking.protocol.profile.Profile_pb2_grpc import ProfileTaskStub
+from skywalking.protocol.profile.Profile_pb2 import ProfileTaskCommandQuery, ProfileTaskFinishReport
from skywalking.protocol.logging.Logging_pb2_grpc import LogReportServiceStub
from skywalking.protocol.management.Management_pb2 import InstancePingPkg, InstanceProperties
from skywalking.protocol.management.Management_pb2_grpc import ManagementServiceStub
-from skywalking.protocol.profile.Profile_pb2 import ProfileTaskCommandQuery
-from skywalking.protocol.profile.Profile_pb2_grpc import ProfileTaskStub
from skywalking import config
from skywalking.client import ServiceManagementClient, TraceSegmentReportService, ProfileTaskChannelService, \
@@ -30,6 +30,7 @@ from skywalking.client import ServiceManagementClient, TraceSegmentReportService
from skywalking.command import command_service
from skywalking.loggings import logger
from skywalking.profile import profile_task_execution_service
+from skywalking.profile.profile_task import ProfileTask
class GrpcServiceManagementClient(ServiceManagementClient):
@@ -73,7 +74,7 @@ class GrpcLogDataReportService(LogDataReportService):
class GrpcProfileTaskChannelService(ProfileTaskChannelService):
def __init__(self, channel: grpc.Channel):
- self.task_stub = ProfileTaskStub(channel)
+ self.profile_stub = ProfileTaskStub(channel)
def do_query(self):
@@ -83,5 +84,16 @@ class GrpcProfileTaskChannelService(ProfileTaskChannelService):
lastCommandTime=profile_task_execution_service.get_last_command_create_time()
)
- commands = self.task_stub.getProfileTaskCommands(query)
+ commands = self.profile_stub.getProfileTaskCommands(query)
command_service.receive_command(commands)
+
+ def send(self, generator):
+ self.profile_stub.collectSnapshot(generator)
+
+ def finish(self, task: ProfileTask):
+ finish_report = ProfileTaskFinishReport(
+ service=config.service_name,
+ serviceInstance=config.service_instance,
+ taskId=task.task_id
+ )
+ self.profile_stub.reportTaskFinish(finish_report)
diff --git a/skywalking/command/command_service.py b/skywalking/command/command_service.py
index e1d4bbf..8f9244e 100644
--- a/skywalking/command/command_service.py
+++ b/skywalking/command/command_service.py
@@ -30,38 +30,38 @@ from skywalking.loggings import logger
class CommandService:
def __init__(self):
- self.__commands = queue.Queue() # type: queue.Queue
+ self._commands = queue.Queue() # type: queue.Queue
# don't execute same command twice
- self.__command_serial_number_cache = CommandSerialNumberCache()
+ self._command_serial_number_cache = CommandSerialNumberCache()
def dispatch(self):
while True:
# block until a command is available
- command = self.__commands.get() # type: BaseCommand
+ command = self._commands.get() # type: BaseCommand
if not self.__is_command_executed(command):
command_executor_service.execute(command)
- self.__command_serial_number_cache.add(command.serial_number)
+ self._command_serial_number_cache.add(command.serial_number)
def __is_command_executed(self, command: BaseCommand):
- return self.__command_serial_number_cache.contains(command.serial_number)
+ return self._command_serial_number_cache.contains(command.serial_number)
def receive_command(self, commands: Commands):
for command in commands.commands:
try:
base_command = CommandDeserializer.deserialize(command)
- logger.debug("Received command [{%s} {%s}]", base_command.command, base_command.serial_number)
+ logger.debug("received command [{%s} {%s}]", base_command.command, base_command.serial_number)
if self.__is_command_executed(base_command):
- logger.warning("Command[{%s}] is executed, ignored.", base_command.command)
+ logger.warning("command[{%s}] is executed, ignored.", base_command.command)
continue
try:
- self.__commands.put(base_command)
+ self._commands.put(base_command)
except queue.Full:
- logger.warning("Command[{%s}, {%s}] cannot add to command list. because the command list is full.",
+ logger.warning("command[{%s}, {%s}] cannot add to command list. because the command list is full.",
base_command.command, base_command.serial_number)
except UnsupportedCommandException as e:
- logger.warning("Received unsupported command[{%s}].", e.command.command)
+ logger.warning("received unsupported command[{%s}].", e.command.command)
class CommandSerialNumberCache:
diff --git a/skywalking/command/executors/profile_task_command_executor.py b/skywalking/command/executors/profile_task_command_executor.py
index 53fa5aa..2134d99 100644
--- a/skywalking/command/executors/profile_task_command_executor.py
+++ b/skywalking/command/executors/profile_task_command_executor.py
@@ -17,16 +17,13 @@
from skywalking.command.executors.command_executor import CommandExecutor
from skywalking.command.profile_task_command import ProfileTaskCommand
-from skywalking.loggings import logger
-from skywalking.profile import profile_task_execution_service
+from skywalking import profile
from skywalking.profile.profile_task import ProfileTask
class ProfileTaskCommandExecutor(CommandExecutor):
def execute(self, command: ProfileTaskCommand):
- logger.debug("ProfileTaskCommandExecutor start execute ProfileTaskCommand [{%s}]", command.serial_number)
-
profile_task = ProfileTask(task_id=command.task_id,
first_span_op_name=command.endpoint_name,
duration=command.duration,
@@ -36,4 +33,4 @@ class ProfileTaskCommandExecutor(CommandExecutor):
start_time=command.start_time,
create_time=command.create_time)
- profile_task_execution_service.add_profile_task(profile_task)
+ profile.profile_task_execution_service.add_profile_task(profile_task)
diff --git a/skywalking/config.py b/skywalking/config.py
index 0123f3b..041a6a8 100644
--- a/skywalking/config.py
+++ b/skywalking/config.py
@@ -64,9 +64,15 @@ kafka_topic_management = os.getenv('SW_KAFKA_REPORTER_TOPIC_MANAGEMENT') or "sky
kafka_topic_segment = os.getenv('SW_KAFKA_REPORTER_TOPIC_SEGMENT') or "skywalking-segments" # type: str
kafka_topic_log = os.getenv('SW_KAFKA_REPORTER_TOPIC_LOG') or "skywalking-logs" # type: str
celery_parameters_length = int(os.getenv('SW_CELERY_PARAMETERS_LENGTH') or '512')
-profile_active = True if os.getenv('SW_AGENT_PROFILE_ACTIVE') and \
- os.getenv('SW_AGENT_PROFILE_ACTIVE') == 'True' else False # type: bool
-profile_task_query_interval = int(os.getenv('SW_PROFILE_TASK_QUERY_INTERVAL') or '20')
+
+# profile configs
+get_profile_task_interval = int(os.getenv('SW_PROFILE_TASK_QUERY_INTERVAL') or '20') # type: int
+profile_active = False if os.getenv('SW_AGENT_PROFILE_ACTIVE') and \
+ os.getenv('SW_AGENT_PROFILE_ACTIVE') == 'False' else True # type: bool
+profile_max_parallel = int(os.getenv("SW_AGENT_PROFILE_MAX_PARALLEL") or '5') # type: int
+profile_duration = int(os.getenv('SW_AGENT_PROFILE_DURATION') or '10') # type: int
+profile_dump_max_stack_depth = int(os.getenv('SW_AGENT_PROFILE_DUMP_MAX_STACK_DEPTH') or '500') # type: int
+profile_snapshot_transport_buffer_size = int(os.getenv('SW_AGENT_PROFILE_SNAPSHOT_TRANSPORT_BUFFER_SIZE') or '50')
# NOTE - Log reporting requires a separate channel, will merge in the future.
log_reporter_active = True if os.getenv('SW_AGENT_LOG_REPORTER_ACTIVE') and \
diff --git a/skywalking/profile/__init__.py b/skywalking/profile/__init__.py
index fdf00ab..b160441 100644
--- a/skywalking/profile/__init__.py
+++ b/skywalking/profile/__init__.py
@@ -15,6 +15,14 @@
# limitations under the License.
#
-from skywalking.profile.profile_task_execution_service import ProfileTaskExecutionService
+profile_task_execution_service = None
-profile_task_execution_service = ProfileTaskExecutionService()
+
+def init():
+ from skywalking.profile.profile_service import ProfileTaskExecutionService
+
+ global profile_task_execution_service
+ if profile_task_execution_service:
+ return
+
+ profile_task_execution_service = ProfileTaskExecutionService()
diff --git a/skywalking/profile/profile_context.py b/skywalking/profile/profile_context.py
new file mode 100644
index 0000000..1584c27
--- /dev/null
+++ b/skywalking/profile/profile_context.py
@@ -0,0 +1,223 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import time
+from threading import Thread, Event, current_thread
+import sys
+import traceback
+from typing import Optional
+
+from skywalking import agent
+from skywalking import config
+from skywalking.loggings import logger
+from skywalking import profile
+from skywalking.profile.profile_status import ProfileStatusReference, ProfileStatus
+from skywalking.profile.profile_task import ProfileTask
+from skywalking.profile.snapshot import TracingThreadSnapshot
+from skywalking.trace.context import SpanContext
+from skywalking.utils.array import AtomicArray
+from skywalking.utils.integer import AtomicInteger
+from skywalking.utils.time import current_milli_time
+
+
+class ProfileTaskExecutionContext:
+ def __init__(self, task: ProfileTask):
+ self.task = task # type: ProfileTask
+ self._current_profiling_cnt = AtomicInteger(var=0)
+ self._total_started_profiling_cnt = AtomicInteger(var=0)
+ self.profiling_segment_slots = AtomicArray(length=config.profile_max_parallel)
+ self._profiling_thread = None # type: Optional[Thread]
+ self._profiling_stop_event = None # type: Optional[Event]
+
+ def start_profiling(self):
+ profile_thread = ProfileThread(self)
+ self._profiling_stop_event = Event()
+
+ self._profiling_thread = Thread(target=profile_thread.start, args=[self._profiling_stop_event], daemon=True)
+ self._profiling_thread.start()
+
+ def stop_profiling(self):
+ if self._profiling_thread is not None and self._profiling_stop_event is not None:
+ self._profiling_stop_event.set()
+
+ def attempt_profiling(self, trace_context: SpanContext, segment_id: str, first_span_opname: str) -> \
+ ProfileStatusReference:
+ """
+ check have available slot to profile and add it
+ """
+
+ # check has available slot
+ using_slot_cnt = self._current_profiling_cnt.get()
+ if using_slot_cnt >= config.profile_max_parallel:
+ return ProfileStatusReference.create_with_none()
+
+ # check first operation name matches
+ if not self.task.first_span_op_name == first_span_opname:
+ return ProfileStatusReference.create_with_none()
+
+ # if out limit started profiling count then stop add profiling
+ if self._total_started_profiling_cnt.get() > self.task.max_sampling_count:
+ return ProfileStatusReference.create_with_none()
+
+ # try to occupy slot
+ if not self._current_profiling_cnt.compare_and_set(using_slot_cnt,
+ using_slot_cnt + 1):
+ return ProfileStatusReference.create_with_none()
+
+ thread_profiler = ThreadProfiler(trace_context=trace_context,
+ segment_id=segment_id,
+ profiling_thread=current_thread(),
+ profile_context=self)
+
+ slot_length = self.profiling_segment_slots.length()
+ for idx in range(slot_length):
+ # occupy slot success
+ if self.profiling_segment_slots.compare_and_set(idx, None, thread_profiler):
+ return thread_profiler.profile_status
+
+ return ProfileStatusReference.create_with_none()
+
+ def profiling_recheck(self, trace_context: SpanContext, segment_id: str, first_span_opname: str):
+ if trace_context.profile_status.is_being_watched():
+ return
+
+ # if first_span_opname was changed by other plugin, there can start profile as well
+ trace_context.profile_status.update_status(self.attempt_profiling(trace_context,
+ segment_id,
+ first_span_opname).get())
+
+ def stop_tracing_profile(self, trace_context: SpanContext):
+ """
+ find tracing context and clear on slot
+ """
+ for idx, profiler in enumerate(self.profiling_segment_slots):
+ if profiler and profiler.matches(trace_context):
+ self.profiling_segment_slots.set(idx, None)
+ profiler.stop_profiling()
+ self._current_profiling_cnt.add_and_get(-1)
+ break
+
+ def is_start_profileable(self):
+ return self._total_started_profiling_cnt.add_and_get(1) <= self.task.max_sampling_count
+
+
+class ProfileThread:
+ def __init__(self, context: ProfileTaskExecutionContext):
+ self._task_execution_context = context
+ self._task_execution_service = profile.profile_task_execution_service
+ self._stop_event = None # type: Optional[Event]
+
+ def start(self, stop_event: Event):
+ self._stop_event = stop_event
+
+ try:
+ self.profiling(self._task_execution_context)
+ except Exception as e:
+ logger.error("profiling task fail. task_id:[%s] error:[%s]", self._task_execution_context.task.task_id, e)
+ finally:
+ self._task_execution_service.stop_current_profile_task(self._task_execution_context)
+
+ def profiling(self, context: ProfileTaskExecutionContext):
+ max_sleep_period = context.task.thread_dump_period
+
+ while not self._stop_event.is_set():
+ current_loop_start_time = current_milli_time()
+ profilers = self._task_execution_context.profiling_segment_slots
+
+ for profiler in profilers: # type: ThreadProfiler
+ if profiler is None:
+ continue
+
+ if profiler.profile_status.get() is ProfileStatus.PENDING:
+ profiler.start_profiling_if_need()
+ elif profiler.profile_status.get() is ProfileStatus.PROFILING:
+ snapshot = profiler.build_snapshot()
+ if snapshot is not None:
+ agent.add_profiling_snapshot(snapshot)
+ else:
+ # tell execution context current tracing thread dump failed, stop it
+ context.stop_tracing_profile(profiler.trace_context)
+
+ need_sleep = (current_loop_start_time + max_sleep_period) - current_milli_time()
+ if not need_sleep > 0:
+ need_sleep = max_sleep_period
+
+ # convert to float second
+ time.sleep(need_sleep / 1000)
+
+
+class ThreadProfiler:
+ def __init__(self, trace_context: SpanContext, segment_id: str, profiling_thread: Thread,
+ profile_context: ProfileTaskExecutionContext):
+ self.trace_context = trace_context
+ self._segment_id = segment_id
+ self._profiling_thread = profiling_thread
+ self._profile_context = profile_context
+ self._profile_start_time = -1
+ self.profiling_max_time_mills = config.profile_duration * 60 * 1000
+
+ self.dump_sequence = 0
+
+ if trace_context.profile_status is None:
+ self.profile_status = ProfileStatusReference.create_with_pending()
+ else:
+ self.profile_status = trace_context.profile_status # type: ProfileStatusReference
+ self.profile_status.update_status(ProfileStatus.PENDING)
+
+ def start_profiling_if_need(self):
+ if current_milli_time() - self.trace_context.create_time > self._profile_context.task.min_duration_threshold:
+ self._profile_start_time = current_milli_time()
+ self.trace_context.profile_status.update_status(ProfileStatus.PROFILING)
+
+ def stop_profiling(self):
+ self.trace_context.profile_status.update_status(ProfileStatus.STOPPED)
+
+ def build_snapshot(self) -> Optional[TracingThreadSnapshot]:
+ if not self._profiling_thread.is_alive():
+ return None
+
+ current_time = current_milli_time()
+
+ stack_list = []
+
+ # get thread stack of target thread
+ stack = sys._current_frames().get(int(self._profiling_thread.ident))
+ if not stack:
+ return None
+
+ extracted = traceback.extract_stack(stack)
+ for idx, item in enumerate(extracted):
+ if idx > config.profile_dump_max_stack_depth:
+ break
+
+ code_sig = f"{item.filename}.{item.name}: {item.lineno}"
+ stack_list.append(code_sig)
+
+ # if is first dump, check is can start profiling
+ if self.dump_sequence == 0 and not self._profile_context.is_start_profileable():
+ return None
+
+ t = TracingThreadSnapshot(self._profile_context.task.task_id,
+ self._segment_id,
+ self.dump_sequence,
+ current_time,
+ stack_list)
+ self.dump_sequence += 1
+ return t
+
+ def matches(self, trace_context: SpanContext) -> bool:
+ return self.trace_context == trace_context
diff --git a/skywalking/profile/profile_service.py b/skywalking/profile/profile_service.py
new file mode 100644
index 0000000..f3b6c8f
--- /dev/null
+++ b/skywalking/profile/profile_service.py
@@ -0,0 +1,208 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from queue import Queue
+from typing import Tuple
+from skywalking.loggings import logger
+from skywalking.profile.profile_constants import ProfileConstants
+from skywalking.profile.profile_task import ProfileTask
+from skywalking.profile.profile_context import ProfileTaskExecutionContext
+from skywalking.profile.profile_status import ProfileStatusReference
+
+from skywalking.trace.context import SpanContext
+
+from skywalking.utils.atomic_ref import AtomicRef
+from skywalking import agent
+
+from concurrent.futures import ThreadPoolExecutor
+from threading import Timer, RLock, Lock
+
+from skywalking.utils.time import current_milli_time
+
+
+class Scheduler:
+
+ @staticmethod
+ def schedule(milliseconds, func, *args, **kwargs):
+ seconds = milliseconds / 1000
+ if seconds < 0:
+ seconds = 0
+
+ t = Timer(seconds, func, *args, **kwargs)
+ t.daemon = True
+ t.start()
+
+
+class ProfileTaskExecutionService:
+ MINUTE_TO_MILLIS = 60000
+
+ def __init__(self):
+ # Queue is thread safe
+ self._profile_task_list = Queue() # type: Queue
+ # queue_lock for making sure complex operation of this profile_task_list is thread safe
+ self.queue_lock = Lock()
+
+ self._last_command_create_time = -1 # type: int
+ # single thread executor
+ self.profile_executor = ThreadPoolExecutor(max_workers=1)
+ self.task_execution_context = AtomicRef(None)
+
+ self.profile_task_scheduler = Scheduler()
+
+ # rlock for process_profile_task and stop_current_profile_task
+ self._rlock = RLock()
+
+ def remove_from_profile_task_list(self, task: ProfileTask) -> bool:
+ """
+ Remove a task from profile_task_list in a thread safe state
+ """
+ with self.queue_lock:
+ item_left = []
+ result = False
+
+ while not self._profile_task_list.empty():
+ item = self._profile_task_list.get()
+ if item == task:
+ result = True
+ # not put in item_list for removing it
+ continue
+
+ item_left.append(item)
+
+ for item in item_left:
+ self._profile_task_list.put(item)
+
+ return result
+
+ def get_last_command_create_time(self) -> int:
+ return self._last_command_create_time
+
+ def add_profile_task(self, task: ProfileTask):
+ # update last command create time, which will be used in command query
+ if task.create_time > self._last_command_create_time:
+ self._last_command_create_time = task.create_time
+
+ # check profile task object
+ success, error_reason = self._check_profile_task(task)
+ if not success:
+ logger.warning("check command error, cannot process this profile task. reason: %s", error_reason)
+ return
+
+ # add task to list
+ self._profile_task_list.put(task)
+
+ delay_millis = task.start_time - current_milli_time()
+ # schedule to start task
+ self.profile_task_scheduler.schedule(delay_millis, self.process_profile_task, [task])
+
+ def add_profiling(self, context: SpanContext, segment_id: str, first_span_opname: str) -> ProfileStatusReference:
+ execution_context = self.task_execution_context.get() # type: ProfileTaskExecutionContext
+ if execution_context is None:
+ return ProfileStatusReference.create_with_none()
+
+ return execution_context.attempt_profiling(context, segment_id, first_span_opname)
+
+ def profiling_recheck(self, trace_context: SpanContext, segment_id: str, first_span_opname: str):
+ """
+ Re-check whether the current trace needs profiling, in case third-party plugins changed the operation name.
+ """
+ execution_context = self.task_execution_context.get() # type: ProfileTaskExecutionContext
+ if execution_context is None:
+ return
+ execution_context.profiling_recheck(trace_context, segment_id, first_span_opname)
+
+ # use a reentrant lock for process_profile_task and stop_current_profile_task,
+ # to make sure they are thread safe.
+ def process_profile_task(self, task: ProfileTask):
+ with self._rlock:
+ # make sure the previous profile task has already been stopped
+ self.stop_current_profile_task(self.task_execution_context.get())
+
+ # make stop task schedule and task context
+ current_context = ProfileTaskExecutionContext(task)
+ self.task_execution_context.set(current_context)
+
+ # start profiling this task
+ current_context.start_profiling()
+ logger.debug("profile task [%s] for endpoint [%s] started", task.task_id, task.first_span_op_name)
+
+ millis = task.duration * self.MINUTE_TO_MILLIS
+ self.profile_task_scheduler.schedule(millis, self.stop_current_profile_task, [current_context])
+
+ def stop_current_profile_task(self, need_stop: ProfileTaskExecutionContext):
+ with self._rlock:
+ # need_stop is None or task_execution_context is not need_stop context
+ if need_stop is None or not self.task_execution_context.compare_and_set(need_stop, None):
+ return
+
+ need_stop.stop_profiling()
+ logger.debug("profile task [%s] for endpoint [%s] stopped", need_stop.task.task_id,
+ need_stop.task.first_span_op_name)
+
+ self.remove_from_profile_task_list(need_stop.task)
+
+ # notify profiling task has finished
+ agent.notify_profile_finish(need_stop.task)
+
+ def _check_profile_task(self, task: ProfileTask) -> Tuple[bool, str]:
+ try:
+ # endpoint name
+ if len(task.first_span_op_name) == 0:
+ return (False, "endpoint name [{}] error, "
+ "should be str and not empty".format(task.first_span_op_name))
+ # duration
+ if task.duration < ProfileConstants.TASK_DURATION_MIN_MINUTE:
+ return (False, "monitor duration must greater"
+ " than {} minutes".format(ProfileConstants.TASK_DURATION_MIN_MINUTE))
+ if task.duration > ProfileConstants.TASK_DURATION_MAX_MINUTE:
+ return (False, "monitor duration must less"
+ " than {} minutes".format(ProfileConstants.TASK_DURATION_MAX_MINUTE))
+ # min duration threshold
+ if task.min_duration_threshold < 0:
+ return False, "min duration threshold must greater than or equals zero"
+
+ # dump period
+ if task.thread_dump_period < ProfileConstants.TASK_DUMP_PERIOD_MIN_MILLIS:
+ return (False, "dump period must be greater than or equals to {}"
+ " milliseconds".format(ProfileConstants.TASK_DUMP_PERIOD_MIN_MILLIS))
+
+ # max sampling count
+ if task.max_sampling_count <= 0:
+ return False, "max sampling count must greater than zero"
+ if task.max_sampling_count >= ProfileConstants.TASK_MAX_SAMPLING_COUNT:
+ return (False, "max sampling count must less"
+ " than {}".format(ProfileConstants.TASK_MAX_SAMPLING_COUNT))
+
+ # check task queue
+ task_finish_time = self._cal_profile_task_finish_time(task)
+
+ # hold self._profile_task_list.mutex while checking the items in the queue, to avoid concurrency errors
+ with self._profile_task_list.mutex:
+ for profile_task in self._profile_task_list.queue: # type: ProfileTask
+ # if the end time of the task to be added falls within the run time of a queued task, it is invalid
+ if task.start_time <= task_finish_time <= self._cal_profile_task_finish_time(profile_task):
+ return (False, "there already have processing task in time range, "
+ "could not add a new task again. processing task monitor "
+ "endpoint name: {}".format(profile_task.first_span_op_name))
+
+ return True, ""
+
+ except TypeError:
+ return False, "ProfileTask attributes has type error"
+
+ def _cal_profile_task_finish_time(self, task: ProfileTask) -> int:
+ return task.start_time + task.duration * self.MINUTE_TO_MILLIS
diff --git a/skywalking/client/__init__.py b/skywalking/profile/profile_status.py
similarity index 51%
copy from skywalking/client/__init__.py
copy to skywalking/profile/profile_status.py
index 007e0bc..f594cff 100644
--- a/skywalking/client/__init__.py
+++ b/skywalking/profile/profile_status.py
@@ -15,25 +15,36 @@
# limitations under the License.
#
+from enum import Enum
-class ServiceManagementClient(object):
- def send_instance_props(self):
- raise NotImplementedError()
- def send_heart_beat(self):
- raise NotImplementedError()
+class ProfileStatus(Enum):
+ NONE = 0
+ PENDING = 1
+ PROFILING = 2
+ STOPPED = 3
-class TraceSegmentReportService(object):
- def report(self, generator):
- raise NotImplementedError()
+class ProfileStatusReference:
+ def __init__(self, status: ProfileStatus):
+ self.status = status
+ @staticmethod
+ def create_with_none():
+ return ProfileStatusReference(ProfileStatus.NONE)
-class LogDataReportService(object):
- def report(self, generator):
- raise NotImplementedError()
+ @staticmethod
+ def create_with_pending():
+ return ProfileStatusReference(ProfileStatus.PENDING)
+ def get(self) -> ProfileStatus:
+ return self.status
-class ProfileTaskChannelService(object):
- def do_query(self):
- raise NotImplementedError()
+ def update_status(self, update: ProfileStatus):
+ self.status = update
+
+ def is_being_watched(self):
+ return self.status is not ProfileStatus.NONE
+
+ def is_profiling(self):
+ return self.status is ProfileStatus.PROFILING
diff --git a/skywalking/profile/profile_task.py b/skywalking/profile/profile_task.py
index a11df51..0776627 100644
--- a/skywalking/profile/profile_task.py
+++ b/skywalking/profile/profile_task.py
@@ -33,7 +33,9 @@ class ProfileTask:
self.task_id = str(task_id) # type: str
self.first_span_op_name = str(first_span_op_name) # type: str
self.duration = int(duration) # type: int
+ # how long to wait after the span context is created before profiling can start
self.min_duration_threshold = int(min_duration_threshold) # type: int
+ # profile interval
self.thread_dump_period = int(thread_dump_period) # type: int
self.max_sampling_count = int(max_sampling_count) # type: int
self.start_time = int(start_time) # type: int
diff --git a/skywalking/profile/profile_task_execution_service.py b/skywalking/profile/profile_task_execution_service.py
deleted file mode 100644
index 35ab65d..0000000
--- a/skywalking/profile/profile_task_execution_service.py
+++ /dev/null
@@ -1,101 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-from queue import Queue
-from skywalking.loggings import logger
-from skywalking.profile.profile_constants import ProfileConstants
-from skywalking.profile.profile_task import ProfileTask
-
-
-class ProfileTaskExecutionService:
- MINUTE_TO_MILLIS = 60000
-
- def __init__(self):
- # Queue is thread safe
- self.__profile_task_list = Queue() # type: Queue
- self.__last_command_create_time = -1 # type: int
-
- def get_last_command_create_time(self) -> int:
- return self.__last_command_create_time
-
- def add_profile_task(self, task: ProfileTask):
- # update last command create time, which will be used in command query
- if task.create_time > self.__last_command_create_time:
- self.__last_command_create_time = task.create_time
-
- # check profile task object
- result = self.__check_profile_task(task)
- if not result.success:
- logger.warning("check command error, cannot process this profile task. reason: %s", result.error_reason)
- return
-
- # add task to list
- self.__profile_task_list.put(task)
-
- class CheckResult:
- def __init__(self, success: bool, error_reason: str):
- self.success = success # type: bool
- self.error_reason = error_reason # type: str
-
- def __check_profile_task(self, task: ProfileTask) -> CheckResult:
- try:
- # endpoint name
- if len(task.first_span_op_name) == 0:
- return self.CheckResult(False, "endpoint name [{}] error, "
- "should be str and not empty".format(task.first_span_op_name))
- # duration
- if task.duration < ProfileConstants.TASK_DURATION_MIN_MINUTE:
- return self.CheckResult(False, "monitor duration must greater"
- " than {} minutes".format(ProfileConstants.TASK_DURATION_MIN_MINUTE))
- if task.duration > ProfileConstants.TASK_DURATION_MAX_MINUTE:
- return self.CheckResult(False, "monitor duration must less"
- " than {} minutes".format(ProfileConstants.TASK_DURATION_MAX_MINUTE))
- # min duration threshold
- if task.min_duration_threshold < 0:
- return self.CheckResult(False, "min duration threshold must greater than or equals zero")
-
- # dump period
- if task.thread_dump_period < ProfileConstants.TASK_DUMP_PERIOD_MIN_MILLIS:
- return self.CheckResult(False, "dump period must be greater than or equals to {}"
- " milliseconds".format(ProfileConstants.TASK_DUMP_PERIOD_MIN_MILLIS))
-
- # max sampling count
- if task.max_sampling_count <= 0:
- return self.CheckResult(False, "max sampling count must greater than zero")
- if task.max_sampling_count >= ProfileConstants.TASK_MAX_SAMPLING_COUNT:
- return self.CheckResult(False, "max sampling count must less"
- " than {}".format(ProfileConstants.TASK_MAX_SAMPLING_COUNT))
-
- # check task queue
- task_finish_time = self.__cal_profile_task_finish_time(task)
-
- # lock the self.__profile_task_list.queue when check the item in it, avoid concurrency errors
- with self.__profile_task_list.mutex:
- for profile_task in self.__profile_task_list.queue: # type: ProfileTask
- # if the end time of the task to be added is during the execution of any data, means is a error data
- if task.start_time <= task_finish_time <= self.__cal_profile_task_finish_time(profile_task):
- return self.CheckResult(False, "there already have processing task in time range, "
- "could not add a new task again. processing task monitor "
- "endpoint name: {}".format(profile_task.first_span_op_name))
-
- return self.CheckResult(True, "")
-
- except TypeError:
- return self.CheckResult(False, "ProfileTask attributes has type error")
-
- def __cal_profile_task_finish_time(self, task: ProfileTask) -> int:
- return task.start_time + task.duration * self.MINUTE_TO_MILLIS
diff --git a/skywalking/profile/snapshot.py b/skywalking/profile/snapshot.py
new file mode 100644
index 0000000..b3a8212
--- /dev/null
+++ b/skywalking/profile/snapshot.py
@@ -0,0 +1,44 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from typing import List
+
+from skywalking.protocol.profile.Profile_pb2 import ThreadSnapshot, ThreadStack
+
+
+class TracingThreadSnapshot:
+
+ def __init__(self, task_id: str, trace_segment_id: str, sequence: int, time: int, stack_list: List[str]):
+ self.task_id = task_id
+ self.trace_segment_id = trace_segment_id
+ self.sequence = sequence
+ self.time = time
+ self.stack_list = stack_list
+
+ def transform(self) -> ThreadSnapshot:
+ code_sigs = [code_sign for code_sign in self.stack_list]
+ stack = ThreadStack(codeSignatures=code_sigs)
+
+ snapshot = ThreadSnapshot(
+ taskId=str(self.task_id),
+ traceSegmentId=str(self.trace_segment_id),
+ time=int(self.time),
+ sequence=int(self.sequence),
+ stack=stack
+ )
+
+ return snapshot
diff --git a/skywalking/trace/context.py b/skywalking/trace/context.py
index cdd57c4..9ffd8e4 100644
--- a/skywalking/trace/context.py
+++ b/skywalking/trace/context.py
@@ -15,6 +15,7 @@
# limitations under the License.
#
+from skywalking.profile.profile_status import ProfileStatusReference
from skywalking import Component, agent, config
from skywalking.agent import isfull
from skywalking.trace import ID
@@ -23,6 +24,8 @@ from skywalking.trace.segment import Segment, SegmentRef
from skywalking.trace.snapshot import Snapshot
from skywalking.trace.span import Span, Kind, NoopSpan, EntrySpan, ExitSpan
from skywalking.utils.counter import Counter
+from skywalking.utils.time import current_milli_time
+from skywalking import profile
try: # attempt to use async-local instead of thread-local context and spans
@@ -75,6 +78,8 @@ class SpanContext(object):
self._sid = Counter()
self._correlation = {} # type: dict
self._nspans = 0
+ self.profile_status = None # type: ProfileStatusReference
+ self.create_time = current_milli_time()
def ignore_check(self, op: str, kind: Kind, carrier: 'Carrier' = None):
if config.RE_IGNORE_PATH.match(op) or isfull() or (carrier is not None and carrier.is_suppressed):
@@ -106,7 +111,17 @@ class SpanContext(object):
spans = _spans()
parent = spans[-1] if spans else None # type: Span
+ # try to start profiling if no profile status has been assigned yet
+ if self.profile_status is None:
+ self.profile_status = profile.profile_task_execution_service.add_profiling(self,
+ self.segment.segment_id,
+ op)
+
if parent is not None and parent.kind.is_entry and inherit == parent.component:
+ # Span's operation name could be overridden, so recheck here:
+ # if the new op name is being profiled, start profiling it here
+ self.profiling_recheck(parent, op)
+
span = parent
span.op = op
else:
@@ -150,6 +165,12 @@ class SpanContext(object):
return span
+ def profiling_recheck(self, span: Span, op_name: str):
+ # only check the first span, i.e. the one whose sid is 0.
+ if span.sid != 0:
+ return
+ profile.profile_task_execution_service.profiling_recheck(self, self.segment.segment_id, op_name)
+
def start(self, span: Span):
self._nspans += 1
spans = _spans_dup()
diff --git a/skywalking/utils/array.py b/skywalking/utils/array.py
new file mode 100644
index 0000000..e32e3f9
--- /dev/null
+++ b/skywalking/utils/array.py
@@ -0,0 +1,63 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import threading
+
+
+class AtomicArray:
+
+ def __init__(self, length: int):
+ self._length = length
+ self._array = [None] * self._length
+ self._lock = threading.Lock()
+
+ def __getitem__(self, idx):
+ # for iteration
+ with self._lock:
+ return self._array[idx]
+
+ def length(self) -> int:
+ return self._length
+
+ def set(self, idx: int, new_value):
+ if idx < 0 or idx >= self.length():
+ raise IndexError("atomic array assignment index out of range")
+
+ with self._lock:
+ self._array[idx] = new_value
+
+ def get(self, idx: int):
+ if idx < 0 or idx >= self.length():
+ raise IndexError("atomic array assignment index out of range")
+
+ with self._lock:
+ return self._array[idx]
+
+ def compare_and_set(self, idx: int, expect, update) -> bool:
+ """
+ Atomically sets the element at idx to the given updated value if its current value == the expected value
+ :return: return True if success, False if the actual value was not equal to the expected value.
+ """
+ if idx < 0 or idx >= self.length():
+ raise IndexError("atomic array assignment index out of range")
+
+ with self._lock:
+ if self._array[idx] == expect:
+ self._array[idx] = update
+ return True
+
+ return False
diff --git a/skywalking/client/__init__.py b/skywalking/utils/atomic_ref.py
similarity index 52%
copy from skywalking/client/__init__.py
copy to skywalking/utils/atomic_ref.py
index 007e0bc..e938b5f 100644
--- a/skywalking/client/__init__.py
+++ b/skywalking/utils/atomic_ref.py
@@ -15,25 +15,30 @@
# limitations under the License.
#
+import threading
-class ServiceManagementClient(object):
- def send_instance_props(self):
- raise NotImplementedError()
- def send_heart_beat(self):
- raise NotImplementedError()
+class AtomicRef:
+ def __init__(self, var):
+ self._lock = threading.Lock()
+ self._var = var
+ def get(self):
+ with self._lock:
+ return self._var
-class TraceSegmentReportService(object):
- def report(self, generator):
- raise NotImplementedError()
+ def set(self, new_var):
+ with self._lock:
+ self._var = new_var
+ def compare_and_set(self, expect, update) -> bool:
+ """
+ Atomically sets the value to the given updated value if the current value == the expected value
-class LogDataReportService(object):
- def report(self, generator):
- raise NotImplementedError()
-
-
-class ProfileTaskChannelService(object):
- def do_query(self):
- raise NotImplementedError()
+ :return: return True if success, False if the actual value was not equal to the expected value.
+ """
+ with self._lock:
+ if self._var == expect:
+ self._var = update
+ return True
+ return False
diff --git a/skywalking/profile/__init__.py b/skywalking/utils/integer.py
similarity index 64%
copy from skywalking/profile/__init__.py
copy to skywalking/utils/integer.py
index fdf00ab..5c15005 100644
--- a/skywalking/profile/__init__.py
+++ b/skywalking/utils/integer.py
@@ -15,6 +15,20 @@
# limitations under the License.
#
-from skywalking.profile.profile_task_execution_service import ProfileTaskExecutionService
+from skywalking.utils.atomic_ref import AtomicRef
-profile_task_execution_service = ProfileTaskExecutionService()
+
+class AtomicInteger(AtomicRef):
+ def __init__(self, var: int):
+ super().__init__(var)
+
+ def add_and_get(self, delta: int):
+ """
+ Atomically adds the given value to the current value.
+
+ :param delta: the value to add
+ :return: the updated value
+ """
+ with self._lock:
+ self._var += delta
+ return self._var
diff --git a/skywalking/profile/__init__.py b/skywalking/utils/time.py
similarity index 83%
copy from skywalking/profile/__init__.py
copy to skywalking/utils/time.py
index fdf00ab..faa70c8 100644
--- a/skywalking/profile/__init__.py
+++ b/skywalking/utils/time.py
@@ -15,6 +15,8 @@
# limitations under the License.
#
-from skywalking.profile.profile_task_execution_service import ProfileTaskExecutionService
+import time
-profile_task_execution_service = ProfileTaskExecutionService()
+
+def current_milli_time():
+ return round(time.time() * 1000)