You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by GitBox <gi...@apache.org> on 2021/08/22 02:59:07 UTC

[GitHub] [skywalking-python] kezhenxu94 commented on a change in pull request #155: Add Profile function

kezhenxu94 commented on a change in pull request #155:
URL: https://github.com/apache/skywalking-python/pull/155#discussion_r693429890



##########
File path: skywalking/profile/profile_service.py
##########
@@ -0,0 +1,212 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from queue import Queue
+from skywalking.loggings import logger
+from skywalking.profile.profile_constants import ProfileConstants
+from skywalking.profile.profile_task import ProfileTask
+from skywalking.profile.profile_context import ProfileTaskExecutionContext
+from skywalking.profile.profile_status import ProfileStatusReference
+
+from skywalking.trace.context import SpanContext
+
+from skywalking.utils.atomic_ref import AtomicRef
+from skywalking import agent
+
+from concurrent.futures import ThreadPoolExecutor
+from threading import Timer, RLock, Lock
+
+from skywalking.utils.time import current_milli_time
+
+
+class Scheduler:
+
+    @staticmethod
+    def schedule(milliseconds, func, *args, **kwargs):
+        seconds = milliseconds / 1000
+        if seconds < 0:
+            seconds = 0
+
+        t = Timer(seconds, func, *args, **kwargs)
+        t.daemon = True
+        t.start()
+
+
+class ProfileTaskExecutionService:
+    MINUTE_TO_MILLIS = 60000
+
+    def __init__(self):
+        # Queue is thread safe
+        self._profile_task_list = Queue()  # type: Queue
+        # queue_lock for making sure complex operation of this profile_task_list is thread safe
+        self.queue_lock = Lock()
+
+        self._last_command_create_time = -1  # type: int
+        # single thread executor
+        self.profile_executor = ThreadPoolExecutor(max_workers=1)
+        self.task_execution_context = AtomicRef(None)
+
+        self.profile_task_scheduler = Scheduler()
+
+        # rlock for process_profile_task and stop_current_profile_task
+        self._rlock = RLock()
+
+    def remove_from_profile_task_list(self, task: ProfileTask) -> bool:
+        """
+        Remove a task from profile_task_list in a thread safe state
+        """
+        with self.queue_lock:
+            item_left = []
+            result = False
+
+            while not self._profile_task_list.empty():
+                item = self._profile_task_list.get()
+                if item == task:
+                    result = True
+                    # not put in item_list for removing it
+                    continue
+
+                item_left.append(item)
+
+            for item in item_left:
+                self._profile_task_list.put(item)
+
+            return result
+
+    def get_last_command_create_time(self) -> int:
+        return self._last_command_create_time
+
+    def add_profile_task(self, task: ProfileTask):
+        # update last command create time, which will be used in command query
+        if task.create_time > self._last_command_create_time:
+            self._last_command_create_time = task.create_time
+
+        # check profile task object
+        result = self._check_profile_task(task)
+        if not result.success:
+            logger.warning("check command error, cannot process this profile task. reason: %s", result.error_reason)
+            return
+
+        # add task to list
+        self._profile_task_list.put(task)
+
+        delay_millis = task.start_time - current_milli_time()
+        # schedule to start task
+        self.profile_task_scheduler.schedule(delay_millis, self.process_profile_task, [task])
+
+    def add_profiling(self, context: SpanContext, segment_id: str, first_span_opname: str) -> ProfileStatusReference:
+        execution_context = self.task_execution_context.get()  # type: ProfileTaskExecutionContext
+        if execution_context is None:
+            return ProfileStatusReference.create_with_none()
+
+        return execution_context.attempt_profiling(context, segment_id, first_span_opname)
+
+    def profiling_recheck(self, trace_context: SpanContext, segment_id: str, first_span_opname: str):
+        """
+        Re-check current trace need profiling, in case that third-party plugins change the operation name.
+        """
+        execution_context = self.task_execution_context.get()  # type: ProfileTaskExecutionContext
+        if execution_context is None:
+            return
+        execution_context.profiling_recheck(trace_context, segment_id, first_span_opname)
+
+    # using reentrant lock for process_profile_task and stop_current_profile_task,
+    # to make sure thread safe.
+    def process_profile_task(self, task: ProfileTask):
+        with self._rlock:
+            # make sure prev profile task already stopped
+            self.stop_current_profile_task(self.task_execution_context.get())
+
+            # make stop task schedule and task context
+            current_context = ProfileTaskExecutionContext(task)
+            self.task_execution_context.set(current_context)
+
+            # start profiling this task
+            current_context.start_profiling()
+            logger.debug("profile task [%s] for endpoint [%s] started", task.task_id, task.first_span_op_name)
+
+            millis = task.duration * self.MINUTE_TO_MILLIS
+            self.profile_task_scheduler.schedule(millis, self.stop_current_profile_task, [current_context])
+
+    def stop_current_profile_task(self, need_stop: ProfileTaskExecutionContext):
+        with self._rlock:
+            # need_stop is None or task_execution_context is not need_stop context
+            if need_stop is None or not self.task_execution_context.compare_and_set(need_stop, None):
+                return
+
+            need_stop.stop_profiling()
+            logger.debug("profile task [%s] for endpoint [%s] stoped", need_stop.task.task_id,
+                         need_stop.task.first_span_op_name)
+
+            self.remove_from_profile_task_list(need_stop.task)
+
+            # notify profiling task has finished
+            agent.notify_profile_finish(need_stop.task)
+
+    class CheckResult:
+        def __init__(self, success: bool, error_reason: str):
+            self.success = success  # type: bool
+            self.error_reason = error_reason  # type: str

Review comment:
       Will simply using a tuple `bool, str` make codes more simpler (and Pythonic)? Ignore me if you want to keep it as is

##########
File path: skywalking/profile/profile_context.py
##########
@@ -0,0 +1,223 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import time
+from threading import Thread, Event, current_thread
+import sys
+import traceback
+from typing import Optional
+
+from skywalking import agent
+from skywalking import config
+from skywalking.loggings import logger
+from skywalking import profile
+from skywalking.profile.profile_status import ProfileStatusReference, ProfileStatus
+from skywalking.profile.profile_task import ProfileTask
+from skywalking.profile.snapshot import TracingThreadSnapshot
+from skywalking.trace.context import SpanContext
+from skywalking.utils.array import AtomicArray
+from skywalking.utils.integer import AtomicInteger
+from skywalking.utils.time import current_milli_time
+
+
+class ProfileTaskExecutionContext:
+    def __init__(self, task: ProfileTask):
+        self.task = task  # type: ProfileTask
+        self._current_profiling_cnt = AtomicInteger(var=0)
+        self._total_started_profiling_cnt = AtomicInteger(var=0)
+        self.profiling_segment_slots = AtomicArray(length=config.profile_max_parallel)
+        self._profiling_thread = None  # type: Optional[Thread]
+        self._profiling_stop_event = None  # type: Optional[Event]
+
+    def start_profiling(self):
+        profile_thread = ProfileThread(self)
+        self._profiling_stop_event = Event()
+
+        self._profiling_thread = Thread(target=profile_thread.start, args=[self._profiling_stop_event], daemon=True)
+        self._profiling_thread.start()
+
+    def stop_profiling(self):
+        if self._profiling_thread is not None and self._profiling_stop_event is not None:
+            self._profiling_stop_event.set()
+
+    def attempt_profiling(self, trace_context: SpanContext, segment_id: str, first_span_opname: str) -> \
+            ProfileStatusReference:
+        """
+        check have available slot to profile and add it
+        """
+
+        # check has available slot
+        using_slot_cnt = self._current_profiling_cnt.get()
+        if using_slot_cnt >= config.profile_max_parallel:
+            return ProfileStatusReference.create_with_none()
+
+        # check first operation name matches
+        if not self.task.first_span_op_name == first_span_opname:
+            return ProfileStatusReference.create_with_none()
+
+        # if out limit started profiling count then stop add profiling
+        if self._total_started_profiling_cnt.get() > self.task.max_sampling_count:
+            return ProfileStatusReference.create_with_none()
+
+        # try to occupy slot
+        if not self._current_profiling_cnt.compare_and_set(using_slot_cnt,
+                                                           using_slot_cnt+1):
+            return ProfileStatusReference.create_with_none()
+
+        thread_profiler = ThreadProfiler(trace_context=trace_context,
+                                         segment_id=segment_id,
+                                         profiling_thread=current_thread(),
+                                         profile_context=self)
+
+        slot_length = self.profiling_segment_slots.length()
+        for idx in range(slot_length):
+            # occupy slot success
+            if self.profiling_segment_slots.compare_and_set(idx, None, thread_profiler):
+                return thread_profiler.profile_status
+
+        return ProfileStatusReference.create_with_none()
+
+    def profiling_recheck(self, trace_context: SpanContext, segment_id: str, first_span_opname: str):
+        if trace_context.profile_status.is_being_watched():
+            return
+
+        # if first_span_opname was changed by other plugin, there can start profile as well
+        trace_context.profile_status.update_status(self.attempt_profiling(trace_context,
+                                                                          segment_id,
+                                                                          first_span_opname).get())
+
+    def stop_tracing_profile(self, trace_context: SpanContext):
+        """
+        find tracing context and clear on slot
+        """
+        for idx, profiler in enumerate(self.profiling_segment_slots):
+            if profiler and profiler.matches(trace_context):
+                self.profiling_segment_slots.set(idx, None)
+                profiler.stop_profiling()
+                self._current_profiling_cnt.add_and_get(-1)
+                break
+
+    def is_start_profileable(self):
+        return self._total_started_profiling_cnt.add_and_get(1) <= self.task.max_sampling_count
+
+
+class ProfileThread:
+    def __init__(self, context: ProfileTaskExecutionContext):
+        self._task_execution_context = context
+        self._task_execution_service = profile.profile_task_execution_service
+        self._stop_event = None  # type: Optional[Event]
+
+    def start(self, stop_event: Event):
+        self._stop_event = stop_event
+
+        try:
+            self.profiling(self._task_execution_context)
+        except Exception as e:
+            logger.error("profiling task fail. task_id:[%s] error:[%s]", self._task_execution_context.task.task_id, e)
+        finally:
+            self._task_execution_service.stop_current_profile_task(self._task_execution_context)
+
+    def profiling(self, context: ProfileTaskExecutionContext):
+        max_sleep_period = context.task.thread_dump_period
+
+        while not self._stop_event.is_set():
+            current_loop_start_time = current_milli_time()
+            profilers = self._task_execution_context.profiling_segment_slots
+
+            for profiler in profilers:  # type: ThreadProfiler
+                if profiler is None:
+                    continue
+
+                if profiler.profile_status.get() is ProfileStatus.PENDING:
+                    profiler.start_profiling_if_need()
+                elif profiler.profile_status.get() is ProfileStatus.PROFILING:
+                    snapshot = profiler.build_snapshot()
+                    if snapshot is not None:
+                        agent.add_profiling_snapshot(snapshot)
+                    else:
+                        # tell execution context current tracing thread dump failed, stop it
+                        context.stop_tracing_profile(profiler.trace_context)
+
+            need_sleep = (current_loop_start_time + max_sleep_period) - current_milli_time()
+            if not need_sleep > 0:
+                need_sleep = max_sleep_period
+
+            # convert to float second
+            time.sleep(need_sleep/1000)

Review comment:
       ```suggestion
               time.sleep(need_sleep / 1000)
   ```

##########
File path: skywalking/profile/profile_service.py
##########
@@ -0,0 +1,212 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from queue import Queue
+from skywalking.loggings import logger
+from skywalking.profile.profile_constants import ProfileConstants
+from skywalking.profile.profile_task import ProfileTask
+from skywalking.profile.profile_context import ProfileTaskExecutionContext
+from skywalking.profile.profile_status import ProfileStatusReference
+
+from skywalking.trace.context import SpanContext
+
+from skywalking.utils.atomic_ref import AtomicRef
+from skywalking import agent
+
+from concurrent.futures import ThreadPoolExecutor
+from threading import Timer, RLock, Lock
+
+from skywalking.utils.time import current_milli_time
+
+
+class Scheduler:
+
+    @staticmethod
+    def schedule(milliseconds, func, *args, **kwargs):
+        seconds = milliseconds / 1000
+        if seconds < 0:
+            seconds = 0
+
+        t = Timer(seconds, func, *args, **kwargs)
+        t.daemon = True
+        t.start()
+
+
+class ProfileTaskExecutionService:
+    MINUTE_TO_MILLIS = 60000
+
+    def __init__(self):
+        # Queue is thread safe
+        self._profile_task_list = Queue()  # type: Queue
+        # queue_lock for making sure complex operation of this profile_task_list is thread safe
+        self.queue_lock = Lock()
+
+        self._last_command_create_time = -1  # type: int
+        # single thread executor
+        self.profile_executor = ThreadPoolExecutor(max_workers=1)
+        self.task_execution_context = AtomicRef(None)
+
+        self.profile_task_scheduler = Scheduler()
+
+        # rlock for process_profile_task and stop_current_profile_task
+        self._rlock = RLock()
+
+    def remove_from_profile_task_list(self, task: ProfileTask) -> bool:
+        """
+        Remove a task from profile_task_list in a thread safe state
+        """
+        with self.queue_lock:
+            item_left = []
+            result = False
+
+            while not self._profile_task_list.empty():
+                item = self._profile_task_list.get()
+                if item == task:
+                    result = True
+                    # not put in item_list for removing it
+                    continue
+
+                item_left.append(item)
+
+            for item in item_left:
+                self._profile_task_list.put(item)
+
+            return result
+
+    def get_last_command_create_time(self) -> int:
+        return self._last_command_create_time
+
+    def add_profile_task(self, task: ProfileTask):
+        # update last command create time, which will be used in command query
+        if task.create_time > self._last_command_create_time:
+            self._last_command_create_time = task.create_time
+
+        # check profile task object
+        result = self._check_profile_task(task)
+        if not result.success:
+            logger.warning("check command error, cannot process this profile task. reason: %s", result.error_reason)
+            return
+
+        # add task to list
+        self._profile_task_list.put(task)
+
+        delay_millis = task.start_time - current_milli_time()
+        # schedule to start task
+        self.profile_task_scheduler.schedule(delay_millis, self.process_profile_task, [task])
+
+    def add_profiling(self, context: SpanContext, segment_id: str, first_span_opname: str) -> ProfileStatusReference:
+        execution_context = self.task_execution_context.get()  # type: ProfileTaskExecutionContext
+        if execution_context is None:
+            return ProfileStatusReference.create_with_none()
+
+        return execution_context.attempt_profiling(context, segment_id, first_span_opname)
+
+    def profiling_recheck(self, trace_context: SpanContext, segment_id: str, first_span_opname: str):
+        """
+        Re-check current trace need profiling, in case that third-party plugins change the operation name.
+        """
+        execution_context = self.task_execution_context.get()  # type: ProfileTaskExecutionContext
+        if execution_context is None:
+            return
+        execution_context.profiling_recheck(trace_context, segment_id, first_span_opname)
+
+    # using reentrant lock for process_profile_task and stop_current_profile_task,
+    # to make sure thread safe.
+    def process_profile_task(self, task: ProfileTask):
+        with self._rlock:
+            # make sure prev profile task already stopped
+            self.stop_current_profile_task(self.task_execution_context.get())
+
+            # make stop task schedule and task context
+            current_context = ProfileTaskExecutionContext(task)
+            self.task_execution_context.set(current_context)
+
+            # start profiling this task
+            current_context.start_profiling()
+            logger.debug("profile task [%s] for endpoint [%s] started", task.task_id, task.first_span_op_name)
+
+            millis = task.duration * self.MINUTE_TO_MILLIS
+            self.profile_task_scheduler.schedule(millis, self.stop_current_profile_task, [current_context])

Review comment:
       We should not always schedule after `task.duration` minutes, right? If the agent restarts, the task will be stopped after `task.duration` minutes after the agent restarted, even at that time the duration may be passed.
   
   We should check whether, for example, `current_time_millis() - task.start_time > task.duration`?

##########
File path: skywalking/profile/profile_service.py
##########
@@ -0,0 +1,212 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from queue import Queue
+from skywalking.loggings import logger
+from skywalking.profile.profile_constants import ProfileConstants
+from skywalking.profile.profile_task import ProfileTask
+from skywalking.profile.profile_context import ProfileTaskExecutionContext
+from skywalking.profile.profile_status import ProfileStatusReference
+
+from skywalking.trace.context import SpanContext
+
+from skywalking.utils.atomic_ref import AtomicRef
+from skywalking import agent
+
+from concurrent.futures import ThreadPoolExecutor
+from threading import Timer, RLock, Lock
+
+from skywalking.utils.time import current_milli_time
+
+
+class Scheduler:
+
+    @staticmethod
+    def schedule(milliseconds, func, *args, **kwargs):
+        seconds = milliseconds / 1000
+        if seconds < 0:
+            seconds = 0
+
+        t = Timer(seconds, func, *args, **kwargs)
+        t.daemon = True
+        t.start()
+
+
+class ProfileTaskExecutionService:
+    MINUTE_TO_MILLIS = 60000
+
+    def __init__(self):
+        # Queue is thread safe
+        self._profile_task_list = Queue()  # type: Queue
+        # queue_lock for making sure complex operation of this profile_task_list is thread safe
+        self.queue_lock = Lock()
+
+        self._last_command_create_time = -1  # type: int
+        # single thread executor
+        self.profile_executor = ThreadPoolExecutor(max_workers=1)
+        self.task_execution_context = AtomicRef(None)
+
+        self.profile_task_scheduler = Scheduler()
+
+        # rlock for process_profile_task and stop_current_profile_task
+        self._rlock = RLock()
+
+    def remove_from_profile_task_list(self, task: ProfileTask) -> bool:
+        """
+        Remove a task from profile_task_list in a thread safe state
+        """
+        with self.queue_lock:
+            item_left = []
+            result = False
+
+            while not self._profile_task_list.empty():
+                item = self._profile_task_list.get()
+                if item == task:
+                    result = True
+                    # not put in item_list for removing it
+                    continue
+
+                item_left.append(item)
+
+            for item in item_left:
+                self._profile_task_list.put(item)
+
+            return result
+
+    def get_last_command_create_time(self) -> int:
+        return self._last_command_create_time
+
+    def add_profile_task(self, task: ProfileTask):
+        # update last command create time, which will be used in command query
+        if task.create_time > self._last_command_create_time:
+            self._last_command_create_time = task.create_time
+
+        # check profile task object
+        result = self._check_profile_task(task)
+        if not result.success:
+            logger.warning("check command error, cannot process this profile task. reason: %s", result.error_reason)
+            return
+
+        # add task to list
+        self._profile_task_list.put(task)
+
+        delay_millis = task.start_time - current_milli_time()
+        # schedule to start task
+        self.profile_task_scheduler.schedule(delay_millis, self.process_profile_task, [task])
+
+    def add_profiling(self, context: SpanContext, segment_id: str, first_span_opname: str) -> ProfileStatusReference:
+        execution_context = self.task_execution_context.get()  # type: ProfileTaskExecutionContext
+        if execution_context is None:
+            return ProfileStatusReference.create_with_none()
+
+        return execution_context.attempt_profiling(context, segment_id, first_span_opname)
+
+    def profiling_recheck(self, trace_context: SpanContext, segment_id: str, first_span_opname: str):
+        """
+        Re-check current trace need profiling, in case that third-party plugins change the operation name.
+        """
+        execution_context = self.task_execution_context.get()  # type: ProfileTaskExecutionContext
+        if execution_context is None:
+            return
+        execution_context.profiling_recheck(trace_context, segment_id, first_span_opname)
+
+    # using reentrant lock for process_profile_task and stop_current_profile_task,
+    # to make sure thread safe.
+    def process_profile_task(self, task: ProfileTask):
+        with self._rlock:
+            # make sure prev profile task already stopped
+            self.stop_current_profile_task(self.task_execution_context.get())
+
+            # make stop task schedule and task context
+            current_context = ProfileTaskExecutionContext(task)
+            self.task_execution_context.set(current_context)
+
+            # start profiling this task
+            current_context.start_profiling()
+            logger.debug("profile task [%s] for endpoint [%s] started", task.task_id, task.first_span_op_name)
+
+            millis = task.duration * self.MINUTE_TO_MILLIS
+            self.profile_task_scheduler.schedule(millis, self.stop_current_profile_task, [current_context])
+
+    def stop_current_profile_task(self, need_stop: ProfileTaskExecutionContext):
+        with self._rlock:
+            # need_stop is None or task_execution_context is not need_stop context
+            if need_stop is None or not self.task_execution_context.compare_and_set(need_stop, None):
+                return
+
+            need_stop.stop_profiling()
+            logger.debug("profile task [%s] for endpoint [%s] stoped", need_stop.task.task_id,

Review comment:
       ```suggestion
               logger.debug("profile task [%s] for endpoint [%s] stopped", need_stop.task.task_id,
   ```

##########
File path: skywalking/profile/profile_context.py
##########
@@ -0,0 +1,223 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import time
+from threading import Thread, Event, current_thread
+import sys
+import traceback
+from typing import Optional
+
+from skywalking import agent
+from skywalking import config
+from skywalking.loggings import logger
+from skywalking import profile
+from skywalking.profile.profile_status import ProfileStatusReference, ProfileStatus
+from skywalking.profile.profile_task import ProfileTask
+from skywalking.profile.snapshot import TracingThreadSnapshot
+from skywalking.trace.context import SpanContext
+from skywalking.utils.array import AtomicArray
+from skywalking.utils.integer import AtomicInteger
+from skywalking.utils.time import current_milli_time
+
+
+class ProfileTaskExecutionContext:
+    def __init__(self, task: ProfileTask):
+        self.task = task  # type: ProfileTask
+        self._current_profiling_cnt = AtomicInteger(var=0)
+        self._total_started_profiling_cnt = AtomicInteger(var=0)
+        self.profiling_segment_slots = AtomicArray(length=config.profile_max_parallel)
+        self._profiling_thread = None  # type: Optional[Thread]
+        self._profiling_stop_event = None  # type: Optional[Event]
+
+    def start_profiling(self):
+        profile_thread = ProfileThread(self)
+        self._profiling_stop_event = Event()
+
+        self._profiling_thread = Thread(target=profile_thread.start, args=[self._profiling_stop_event], daemon=True)
+        self._profiling_thread.start()
+
+    def stop_profiling(self):
+        if self._profiling_thread is not None and self._profiling_stop_event is not None:
+            self._profiling_stop_event.set()
+
+    def attempt_profiling(self, trace_context: SpanContext, segment_id: str, first_span_opname: str) -> \
+            ProfileStatusReference:
+        """
+        check have available slot to profile and add it
+        """
+
+        # check has available slot
+        using_slot_cnt = self._current_profiling_cnt.get()
+        if using_slot_cnt >= config.profile_max_parallel:
+            return ProfileStatusReference.create_with_none()
+
+        # check first operation name matches
+        if not self.task.first_span_op_name == first_span_opname:
+            return ProfileStatusReference.create_with_none()
+
+        # if out limit started profiling count then stop add profiling
+        if self._total_started_profiling_cnt.get() > self.task.max_sampling_count:
+            return ProfileStatusReference.create_with_none()
+
+        # try to occupy slot
+        if not self._current_profiling_cnt.compare_and_set(using_slot_cnt,
+                                                           using_slot_cnt+1):

Review comment:
       ```suggestion
                                                              using_slot_cnt + 1):
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org