You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by GitBox <gi...@apache.org> on 2021/08/21 06:30:42 UTC

[GitHub] [skywalking-python] Humbertzhang opened a new pull request #155: Add Profile function

Humbertzhang opened a new pull request #155:
URL: https://github.com/apache/skywalking-python/pull/155


   This PR is the remaining part of https://github.com/apache/skywalking/issues/5943, following https://github.com/apache/skywalking-python/pull/127 .
   
   It adds the profile function to the Python agent. I tested it in normal and concurrent scenarios, and it works fine. 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [skywalking-python] Humbertzhang commented on a change in pull request #155: Add Profile function

Posted by GitBox <gi...@apache.org>.
Humbertzhang commented on a change in pull request #155:
URL: https://github.com/apache/skywalking-python/pull/155#discussion_r693316243



##########
File path: skywalking/profile/profile_context.py
##########
@@ -0,0 +1,222 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import time
+from threading import Thread, Event, current_thread
+import sys
+import traceback
+
+from skywalking import agent
+from skywalking import config
+from skywalking.loggings import logger
+from skywalking import profile
+from skywalking.profile.profile_status import ProfileStatusReference, ProfileStatus
+from skywalking.profile.profile_task import ProfileTask
+from skywalking.profile.snapshot import TracingThreadSnapshot
+from skywalking.trace.context import SpanContext
+from skywalking.utils.array import AtomicArray
+from skywalking.utils.integer import AtomicInteger
+from skywalking.utils.time import current_milli_time
+
+
+class ProfileTaskExecutionContext:
+    def __init__(self, task: ProfileTask):
+        self.task = task  # type: ProfileTask
+        self._current_profiling_cnt = AtomicInteger(var=0)
+        self._total_started_profiling_cnt = AtomicInteger(var=0)
+        self.profiling_segment_slots = AtomicArray(length=config.profile_max_parallel)
+        self._profiling_thread = None  # type: Thread
+        self._profiling_stop_event = None  # type: Event
+
+    def start_profiling(self):
+        profile_thread = ProfileThread(self)
+        self._profiling_stop_event = Event()
+
+        self._profiling_thread = Thread(target=profile_thread.start, args=[self._profiling_stop_event], daemon=True)
+        self._profiling_thread.start()
+
+    def stop_profiling(self):
+        if self._profiling_thread is not None and self._profiling_stop_event is not None:
+            self._profiling_stop_event.set()
+
+    def attempt_profiling(self, trace_context: SpanContext, segment_id: str, first_span_opname: str) -> \
+            ProfileStatusReference:
+        """
+        check have available slot to profile and add it
+        """
+
+        # check has available slot
+        using_slot_cnt = self._current_profiling_cnt.get()
+        if using_slot_cnt >= config.profile_max_parallel:
+            return ProfileStatusReference.create_with_none()
+
+        # check first operation name matches
+        if not self.task.first_span_op_name == first_span_opname:
+            return ProfileStatusReference.create_with_none()
+
+        # if out limit started profiling count then stop add profiling
+        if self._total_started_profiling_cnt.get() > self.task.max_sampling_count:
+            return ProfileStatusReference.create_with_none()
+
+        # try to occupy slot
+        if not self._current_profiling_cnt.compare_and_set(using_slot_cnt,
+                                                           using_slot_cnt+1):
+            return ProfileStatusReference.create_with_none()
+
+        thread_profiler = ThreadProfiler(trace_context=trace_context,
+                                         segment_id=segment_id,
+                                         profiling_thread=current_thread(),
+                                         profile_context=self)
+
+        slot_length = self.profiling_segment_slots.length()
+        for idx in range(slot_length):
+            # occupy slot success
+            if self.profiling_segment_slots.compare_and_set(idx, None, thread_profiler):
+                return thread_profiler.profile_status
+
+        return ProfileStatusReference.create_with_none()
+
+    def profiling_recheck(self, trace_context: SpanContext, segment_id: str, first_span_opname: str):
+        if trace_context.profile_status.is_being_watched():
+            return
+
+        # if first_span_opname was changed by other plugin, there can start profile as well
+        trace_context.profile_status.update_status(self.attempt_profiling(trace_context,
+                                                                          segment_id,
+                                                                          first_span_opname).get())
+
+    def stop_tracing_profile(self, trace_context: SpanContext):
+        """
+        find tracing context and clear on slot
+        """
+        for idx, profiler in enumerate(self.profiling_segment_slots):

Review comment:
       ignore




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [skywalking-python] Humbertzhang commented on a change in pull request #155: Add Profile function

Posted by GitBox <gi...@apache.org>.
Humbertzhang commented on a change in pull request #155:
URL: https://github.com/apache/skywalking-python/pull/155#discussion_r693317839



##########
File path: skywalking/profile/profile_context.py
##########
@@ -0,0 +1,222 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import time
+from threading import Thread, Event, current_thread
+import sys
+import traceback
+
+from skywalking import agent
+from skywalking import config
+from skywalking.loggings import logger
+from skywalking import profile
+from skywalking.profile.profile_status import ProfileStatusReference, ProfileStatus
+from skywalking.profile.profile_task import ProfileTask
+from skywalking.profile.snapshot import TracingThreadSnapshot
+from skywalking.trace.context import SpanContext
+from skywalking.utils.array import AtomicArray
+from skywalking.utils.integer import AtomicInteger
+from skywalking.utils.time import current_milli_time
+
+
+class ProfileTaskExecutionContext:
+    """
+    Runtime state of one profile task.
+
+    Holds the task itself, a counter of segments currently being profiled, a counter of
+    profilings started so far, a fixed array of profiler slots sized by
+    ``config.profile_max_parallel``, and the sampling thread together with its stop event.
+    """
+
+    def __init__(self, task: ProfileTask):
+        self.task = task  # type: ProfileTask
+        # number of slots currently reserved by attempt_profiling
+        self._current_profiling_cnt = AtomicInteger(var=0)
+        # incremented by every is_start_profileable() call
+        self._total_started_profiling_cnt = AtomicInteger(var=0)
+        self.profiling_segment_slots = AtomicArray(length=config.profile_max_parallel)
+        self._profiling_thread = None  # type: Thread
+        self._profiling_stop_event = None  # type: Event
+
+    def start_profiling(self):
+        """
+        Create a ProfileThread for this context and run it on a new daemon thread,
+        handing it a fresh stop event.
+        """
+        profile_thread = ProfileThread(self)
+        self._profiling_stop_event = Event()
+
+        self._profiling_thread = Thread(target=profile_thread.start, args=[self._profiling_stop_event], daemon=True)
+        self._profiling_thread.start()
+
+    def stop_profiling(self):
+        """
+        Set the stop event of the sampling thread; does nothing if profiling was never started.
+        """
+        if self._profiling_thread is not None and self._profiling_stop_event is not None:
+            self._profiling_stop_event.set()
+
+    def attempt_profiling(self, trace_context: SpanContext, segment_id: str, first_span_opname: str) -> \
+            ProfileStatusReference:
+        """
+        check have available slot to profile and add it
+
+        Returns the profile status of the newly registered ThreadProfiler on success,
+        otherwise a reference created by ``ProfileStatusReference.create_with_none()``.
+        """
+
+        # check has available slot
+        using_slot_cnt = self._current_profiling_cnt.get()
+        if using_slot_cnt >= config.profile_max_parallel:
+            return ProfileStatusReference.create_with_none()
+
+        # check first operation name matches
+        if self.task.first_span_op_name != first_span_opname:
+            return ProfileStatusReference.create_with_none()
+
+        # if out limit started profiling count then stop add profiling
+        if self._total_started_profiling_cnt.get() > self.task.max_sampling_count:
+            return ProfileStatusReference.create_with_none()
+
+        # try to occupy slot; fails if another thread changed the counter since it was read
+        if not self._current_profiling_cnt.compare_and_set(using_slot_cnt,
+                                                           using_slot_cnt + 1):
+            return ProfileStatusReference.create_with_none()
+
+        thread_profiler = ThreadProfiler(trace_context=trace_context,
+                                         segment_id=segment_id,
+                                         profiling_thread=current_thread(),
+                                         profile_context=self)
+
+        for idx in range(self.profiling_segment_slots.length()):
+            # occupy slot success
+            if self.profiling_segment_slots.compare_and_set(idx, None, thread_profiler):
+                return thread_profiler.profile_status
+
+        # no empty slot was found: give back the reservation taken above, otherwise the
+        # counter stays one too high and the task loses a parallel slot for good
+        self._current_profiling_cnt.add_and_get(-1)
+        return ProfileStatusReference.create_with_none()
+
+    def profiling_recheck(self, trace_context: SpanContext, segment_id: str, first_span_opname: str):
+        """
+        Try again to start profiling for a trace context that is not being watched yet.
+        """
+        if trace_context.profile_status.is_being_watched():
+            return
+
+        # if first_span_opname was changed by other plugin, there can start profile as well
+        trace_context.profile_status.update_status(self.attempt_profiling(trace_context,
+                                                                          segment_id,
+                                                                          first_span_opname).get())
+
+    def stop_tracing_profile(self, trace_context: SpanContext):
+        """
+        find tracing context and clear on slot
+
+        The first profiler that matches ``trace_context`` is removed from its slot, stopped,
+        and the current profiling counter is decremented.
+        """
+        for idx, profiler in enumerate(self.profiling_segment_slots):
+            if profiler and profiler.matches(trace_context):
+                self.profiling_segment_slots.set(idx, None)
+                profiler.stop_profiling()
+                self._current_profiling_cnt.add_and_get(-1)
+                break
+
+    def is_start_profileable(self):
+        """
+        Increment the started-profiling counter and report whether the new value is still
+        within ``task.max_sampling_count``. Note that every call increments the counter.
+        """
+        return self._total_started_profiling_cnt.add_and_get(1) <= self.task.max_sampling_count
+
+
+class ProfileThread:
+    """
+    Sampling loop of a profile task: periodically walks the profiler slots of its
+    execution context until the stop event is set.
+    """
+
+    def __init__(self, context: ProfileTaskExecutionContext):
+        self._task_execution_context = context
+        self._task_execution_service = profile.profile_task_execution_service
+        self._stop_event = None  # type: Event
+
+    def start(self, stop_event: Event):
+        """
+        Thread entry point: run the sampling loop, log any exception it raises, and always
+        ask the execution service to stop this task's context afterwards.
+        """
+        self._stop_event = stop_event
+
+        try:
+            self.profiling(self._task_execution_context)
+        except Exception as e:
+            logger.error("profiling task fail. task_id:[%s] error:[%s]", self._task_execution_context.task.task_id, e)
+        finally:
+            self._task_execution_service.stop_current_profile_task(self._task_execution_context)
+
+    def profiling(self, context: ProfileTaskExecutionContext):
+        """
+        Loop until the stop event is set. On each pass, pending profilers get a chance to
+        start and profiling ones are dumped; a profiler whose dump yields no snapshot is
+        stopped through the execution context.
+        """
+        max_sleep_period = context.task.thread_dump_period
+
+        while not self._stop_event.is_set():
+            current_loop_start_time = current_milli_time()
+            profilers = self._task_execution_context.profiling_segment_slots
+
+            for profiler in profilers:  # type: ThreadProfiler
+                if profiler is None:
+                    continue
+
+                # read the status once so both branches test the same value
+                status = profiler.profile_status.get()
+                if status is ProfileStatus.PENDING:
+                    profiler.start_profiling_if_need()
+                elif status is ProfileStatus.PROFILING:
+                    snapshot = profiler.build_snapshot()
+                    if snapshot is not None:
+                        agent.add_profiling_snapshot(snapshot)
+                    else:
+                        # tell execution context current tracing thread dump failed, stop it
+                        context.stop_tracing_profile(profiler.trace_context)
+
+            # sleep for what is left of the dump period; if this pass already used it up,
+            # wait a whole period
+            need_sleep = (current_loop_start_time + max_sleep_period) - current_milli_time()
+            if need_sleep <= 0:
+                need_sleep = max_sleep_period
+
+            # convert to float second; waiting on the stop event (instead of time.sleep)
+            # lets a stop request end the loop immediately rather than after the period
+            self._stop_event.wait(need_sleep / 1000)
+
+
+class ThreadProfiler:
+    def __init__(self, trace_context: SpanContext, segment_id: str, profiling_thread: Thread,
+                 profile_context: ProfileTaskExecutionContext):
+        self.trace_context = trace_context
+        self._segment_id = segment_id
+        self._profiling_thread = profiling_thread
+        self._profile_context = profile_context
+        self._profile_start_time = -1
+        self.profiling_max_time_mills = config.profile_duration * 60 * 1000
+
+        self.dump_sequence = 0
+
+        if trace_context.profile_status is None:
+            self.profile_status = ProfileStatusReference.create_with_pending()
+        else:
+            self.profile_status = trace_context.profile_status  # type: ProfileStatusReference
+            self.profile_status.update_status(ProfileStatus.PENDING)
+
+    def start_profiling_if_need(self):
+        if current_milli_time() - self.trace_context.create_time > self._profile_context.task.min_duration_threshold:
+            self._profile_start_time = current_milli_time()
+            self.trace_context.profile_status.update_status(ProfileStatus.PROFILING)
+
+    def stop_profiling(self):
+        self.trace_context.profile_status.update_status(ProfileStatus.STOPPED)
+
+    def build_snapshot(self) -> TracingThreadSnapshot:
+        if not self._profiling_thread.is_alive():
+            return None
+
+        current_time = current_milli_time()
+
+        stack_list = []
+
+        # get thread stack of target thread
+        stack = sys._current_frames().get(self._profiling_thread.ident)
+        if not stack:
+            return None
+
+        extracted = traceback.extract_stack(stack)
+        for idx, item in enumerate(extracted):
+            if idx > config.profile_dump_max_stack_depth:
+                break
+
+            code_sig = f"{item.filename}.{item.name}: {item.lineno}"
+            stack_list.append(code_sig)
+
+        # if is first dump, check is can start profiling
+        if self.dump_sequence == 0 and not self._profile_context.is_start_profileable():
+            return None

Review comment:
       ignore




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [skywalking-python] kezhenxu94 commented on pull request #155: Add Profile function

Posted by GitBox <gi...@apache.org>.
kezhenxu94 commented on pull request #155:
URL: https://github.com/apache/skywalking-python/pull/155#issuecomment-903211203


   @Humbertzhang please consider adding tests for this feature later


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [skywalking-python] Humbertzhang commented on a change in pull request #155: Add Profile function

Posted by GitBox <gi...@apache.org>.
Humbertzhang commented on a change in pull request #155:
URL: https://github.com/apache/skywalking-python/pull/155#discussion_r693434221



##########
File path: skywalking/profile/profile_service.py
##########
@@ -0,0 +1,212 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from queue import Queue
+from skywalking.loggings import logger
+from skywalking.profile.profile_constants import ProfileConstants
+from skywalking.profile.profile_task import ProfileTask
+from skywalking.profile.profile_context import ProfileTaskExecutionContext
+from skywalking.profile.profile_status import ProfileStatusReference
+
+from skywalking.trace.context import SpanContext
+
+from skywalking.utils.atomic_ref import AtomicRef
+from skywalking import agent
+
+from concurrent.futures import ThreadPoolExecutor
+from threading import Timer, RLock, Lock
+
+from skywalking.utils.time import current_milli_time
+
+
+class Scheduler:
+    """
+    Small helper that runs a callable once, after a delay, on a daemon ``threading.Timer``.
+    """
+
+    @staticmethod
+    def schedule(milliseconds, func, *args, **kwargs):
+        """
+        Start a daemon timer that calls ``func`` after ``milliseconds`` milliseconds.
+
+        A negative delay is clamped to zero. Everything after ``func`` is handed to the
+        ``threading.Timer`` constructor unchanged, so the first extra positional argument
+        is the Timer's ``args`` (the argument list for ``func``), not an argument of
+        ``func`` itself.
+        """
+        delay_seconds = max(milliseconds / 1000, 0)
+
+        timer = Timer(delay_seconds, func, *args, **kwargs)
+        timer.daemon = True
+        timer.start()
+
+
+class ProfileTaskExecutionService:
+    MINUTE_TO_MILLIS = 60000
+
+    def __init__(self):
+        # Queue is thread safe
+        self._profile_task_list = Queue()  # type: Queue
+        # queue_lock for making sure complex operation of this profile_task_list is thread safe
+        self.queue_lock = Lock()
+
+        self._last_command_create_time = -1  # type: int
+        # single thread executor
+        self.profile_executor = ThreadPoolExecutor(max_workers=1)
+        self.task_execution_context = AtomicRef(None)
+
+        self.profile_task_scheduler = Scheduler()
+
+        # rlock for process_profile_task and stop_current_profile_task
+        self._rlock = RLock()
+
+    def remove_from_profile_task_list(self, task: ProfileTask) -> bool:
+        """
+        Remove a task from profile_task_list in a thread safe state
+        """
+        with self.queue_lock:
+            item_left = []
+            result = False
+
+            while not self._profile_task_list.empty():
+                item = self._profile_task_list.get()
+                if item == task:
+                    result = True
+                    # not put in item_list for removing it
+                    continue
+
+                item_left.append(item)
+
+            for item in item_left:
+                self._profile_task_list.put(item)
+
+            return result
+
+    def get_last_command_create_time(self) -> int:
+        return self._last_command_create_time
+
+    def add_profile_task(self, task: ProfileTask):
+        # update last command create time, which will be used in command query
+        if task.create_time > self._last_command_create_time:
+            self._last_command_create_time = task.create_time
+
+        # check profile task object
+        result = self._check_profile_task(task)
+        if not result.success:
+            logger.warning("check command error, cannot process this profile task. reason: %s", result.error_reason)
+            return
+
+        # add task to list
+        self._profile_task_list.put(task)
+
+        delay_millis = task.start_time - current_milli_time()
+        # schedule to start task
+        self.profile_task_scheduler.schedule(delay_millis, self.process_profile_task, [task])
+
+    def add_profiling(self, context: SpanContext, segment_id: str, first_span_opname: str) -> ProfileStatusReference:
+        execution_context = self.task_execution_context.get()  # type: ProfileTaskExecutionContext
+        if execution_context is None:
+            return ProfileStatusReference.create_with_none()
+
+        return execution_context.attempt_profiling(context, segment_id, first_span_opname)
+
+    def profiling_recheck(self, trace_context: SpanContext, segment_id: str, first_span_opname: str):
+        """
+        Re-check current trace need profiling, in case that third-party plugins change the operation name.
+        """
+        execution_context = self.task_execution_context.get()  # type: ProfileTaskExecutionContext
+        if execution_context is None:
+            return
+        execution_context.profiling_recheck(trace_context, segment_id, first_span_opname)
+
+    # using reentrant lock for process_profile_task and stop_current_profile_task,
+    # to make sure thread safe.
+    def process_profile_task(self, task: ProfileTask):
+        with self._rlock:
+            # make sure prev profile task already stopped
+            self.stop_current_profile_task(self.task_execution_context.get())
+
+            # make stop task schedule and task context
+            current_context = ProfileTaskExecutionContext(task)
+            self.task_execution_context.set(current_context)
+
+            # start profiling this task
+            current_context.start_profiling()
+            logger.debug("profile task [%s] for endpoint [%s] started", task.task_id, task.first_span_op_name)
+
+            millis = task.duration * self.MINUTE_TO_MILLIS
+            self.profile_task_scheduler.schedule(millis, self.stop_current_profile_task, [current_context])
+
+    def stop_current_profile_task(self, need_stop: ProfileTaskExecutionContext):
+        with self._rlock:
+            # need_stop is None or task_execution_context is not need_stop context
+            if need_stop is None or not self.task_execution_context.compare_and_set(need_stop, None):
+                return
+
+            need_stop.stop_profiling()
+            logger.debug("profile task [%s] for endpoint [%s] stoped", need_stop.task.task_id,
+                         need_stop.task.first_span_op_name)
+
+            self.remove_from_profile_task_list(need_stop.task)
+
+            # notify profiling task has finished
+            agent.notify_profile_finish(need_stop.task)
+
+    class CheckResult:
+        def __init__(self, success: bool, error_reason: str):
+            self.success = success  # type: bool
+            self.error_reason = error_reason  # type: str

Review comment:
       I will modify here




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [skywalking-python] sonatype-lift[bot] commented on a change in pull request #155: Add Profile function

Posted by GitBox <gi...@apache.org>.
sonatype-lift[bot] commented on a change in pull request #155:
URL: https://github.com/apache/skywalking-python/pull/155#discussion_r693316483



##########
File path: skywalking/profile/profile_context.py
##########
@@ -0,0 +1,223 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import time
+from threading import Thread, Event, current_thread
+import sys
+import traceback
+from typing import Optional
+
+from skywalking import agent
+from skywalking import config
+from skywalking.loggings import logger
+from skywalking import profile
+from skywalking.profile.profile_status import ProfileStatusReference, ProfileStatus
+from skywalking.profile.profile_task import ProfileTask
+from skywalking.profile.snapshot import TracingThreadSnapshot
+from skywalking.trace.context import SpanContext
+from skywalking.utils.array import AtomicArray
+from skywalking.utils.integer import AtomicInteger
+from skywalking.utils.time import current_milli_time
+
+
+class ProfileTaskExecutionContext:
+    """
+    Runtime state of one profile task.
+
+    Holds the task itself, a counter of segments currently being profiled, a counter of
+    profilings started so far, a fixed array of profiler slots sized by
+    ``config.profile_max_parallel``, and the sampling thread together with its stop event.
+    """
+
+    def __init__(self, task: ProfileTask):
+        self.task = task  # type: ProfileTask
+        # number of slots currently reserved by attempt_profiling
+        self._current_profiling_cnt = AtomicInteger(var=0)
+        # incremented by every is_start_profileable() call
+        self._total_started_profiling_cnt = AtomicInteger(var=0)
+        self.profiling_segment_slots = AtomicArray(length=config.profile_max_parallel)
+        self._profiling_thread = None  # type: Optional[Thread]
+        self._profiling_stop_event = None  # type: Optional[Event]
+
+    def start_profiling(self):
+        """
+        Create a ProfileThread for this context and run it on a new daemon thread,
+        handing it a fresh stop event.
+        """
+        profile_thread = ProfileThread(self)
+        self._profiling_stop_event = Event()
+
+        self._profiling_thread = Thread(target=profile_thread.start, args=[self._profiling_stop_event], daemon=True)
+        self._profiling_thread.start()
+
+    def stop_profiling(self):
+        """
+        Set the stop event of the sampling thread; does nothing if profiling was never started.
+        """
+        if self._profiling_thread is not None and self._profiling_stop_event is not None:
+            self._profiling_stop_event.set()
+
+    def attempt_profiling(self, trace_context: SpanContext, segment_id: str, first_span_opname: str) -> \
+            ProfileStatusReference:
+        """
+        check have available slot to profile and add it
+
+        Returns the profile status of the newly registered ThreadProfiler on success,
+        otherwise a reference created by ``ProfileStatusReference.create_with_none()``.
+        """
+
+        # check has available slot
+        using_slot_cnt = self._current_profiling_cnt.get()
+        if using_slot_cnt >= config.profile_max_parallel:
+            return ProfileStatusReference.create_with_none()
+
+        # check first operation name matches
+        if self.task.first_span_op_name != first_span_opname:
+            return ProfileStatusReference.create_with_none()
+
+        # if out limit started profiling count then stop add profiling
+        if self._total_started_profiling_cnt.get() > self.task.max_sampling_count:
+            return ProfileStatusReference.create_with_none()
+
+        # try to occupy slot; fails if another thread changed the counter since it was read
+        if not self._current_profiling_cnt.compare_and_set(using_slot_cnt,
+                                                           using_slot_cnt + 1):
+            return ProfileStatusReference.create_with_none()
+
+        thread_profiler = ThreadProfiler(trace_context=trace_context,
+                                         segment_id=segment_id,
+                                         profiling_thread=current_thread(),
+                                         profile_context=self)
+
+        for idx in range(self.profiling_segment_slots.length()):
+            # occupy slot success
+            if self.profiling_segment_slots.compare_and_set(idx, None, thread_profiler):
+                return thread_profiler.profile_status
+
+        # no empty slot was found: give back the reservation taken above, otherwise the
+        # counter stays one too high and the task loses a parallel slot for good
+        self._current_profiling_cnt.add_and_get(-1)
+        return ProfileStatusReference.create_with_none()
+
+    def profiling_recheck(self, trace_context: SpanContext, segment_id: str, first_span_opname: str):
+        """
+        Try again to start profiling for a trace context that is not being watched yet.
+        """
+        if trace_context.profile_status.is_being_watched():
+            return
+
+        # if first_span_opname was changed by other plugin, there can start profile as well
+        trace_context.profile_status.update_status(self.attempt_profiling(trace_context,
+                                                                          segment_id,
+                                                                          first_span_opname).get())
+
+    def stop_tracing_profile(self, trace_context: SpanContext):
+        """
+        find tracing context and clear on slot
+
+        The first profiler that matches ``trace_context`` is removed from its slot, stopped,
+        and the current profiling counter is decremented.
+        """
+        for idx, profiler in enumerate(self.profiling_segment_slots):
+            if profiler and profiler.matches(trace_context):
+                self.profiling_segment_slots.set(idx, None)
+                profiler.stop_profiling()
+                self._current_profiling_cnt.add_and_get(-1)
+                break
+
+    def is_start_profileable(self):
+        """
+        Increment the started-profiling counter and report whether the new value is still
+        within ``task.max_sampling_count``. Note that every call increments the counter.
+        """
+        return self._total_started_profiling_cnt.add_and_get(1) <= self.task.max_sampling_count
+
+
+class ProfileThread:
+    """
+    Sampling loop of a profile task: periodically walks the profiler slots of its
+    execution context until the stop event is set.
+    """
+
+    def __init__(self, context: ProfileTaskExecutionContext):
+        self._task_execution_context = context
+        self._task_execution_service = profile.profile_task_execution_service
+        self._stop_event = None  # type: Optional[Event]
+
+    def start(self, stop_event: Event):
+        """
+        Thread entry point: run the sampling loop, log any exception it raises, and always
+        ask the execution service to stop this task's context afterwards.
+        """
+        self._stop_event = stop_event
+
+        try:
+            self.profiling(self._task_execution_context)
+        except Exception as e:
+            logger.error("profiling task fail. task_id:[%s] error:[%s]", self._task_execution_context.task.task_id, e)
+        finally:
+            self._task_execution_service.stop_current_profile_task(self._task_execution_context)
+
+    def profiling(self, context: ProfileTaskExecutionContext):
+        """
+        Loop until the stop event is set. On each pass, pending profilers get a chance to
+        start and profiling ones are dumped; a profiler whose dump yields no snapshot is
+        stopped through the execution context.
+        """
+        max_sleep_period = context.task.thread_dump_period
+
+        while not self._stop_event.is_set():
+            current_loop_start_time = current_milli_time()
+            profilers = self._task_execution_context.profiling_segment_slots
+
+            for profiler in profilers:  # type: ThreadProfiler
+                if profiler is None:
+                    continue
+
+                # read the status once so both branches test the same value
+                status = profiler.profile_status.get()
+                if status is ProfileStatus.PENDING:
+                    profiler.start_profiling_if_need()
+                elif status is ProfileStatus.PROFILING:
+                    snapshot = profiler.build_snapshot()
+                    if snapshot is not None:
+                        agent.add_profiling_snapshot(snapshot)
+                    else:
+                        # tell execution context current tracing thread dump failed, stop it
+                        context.stop_tracing_profile(profiler.trace_context)
+
+            # sleep for what is left of the dump period; if this pass already used it up,
+            # wait a whole period
+            need_sleep = (current_loop_start_time + max_sleep_period) - current_milli_time()
+            if need_sleep <= 0:
+                need_sleep = max_sleep_period
+
+            # convert to float second; waiting on the stop event (instead of time.sleep)
+            # lets a stop request end the loop immediately rather than after the period
+            self._stop_event.wait(need_sleep / 1000)
+
+
+class ThreadProfiler:
+    def __init__(self, trace_context: SpanContext, segment_id: str, profiling_thread: Thread,
+                 profile_context: ProfileTaskExecutionContext):
+        self.trace_context = trace_context
+        self._segment_id = segment_id
+        self._profiling_thread = profiling_thread
+        self._profile_context = profile_context
+        self._profile_start_time = -1
+        self.profiling_max_time_mills = config.profile_duration * 60 * 1000
+
+        self.dump_sequence = 0
+
+        if trace_context.profile_status is None:
+            self.profile_status = ProfileStatusReference.create_with_pending()
+        else:
+            self.profile_status = trace_context.profile_status  # type: ProfileStatusReference
+            self.profile_status.update_status(ProfileStatus.PENDING)
+
+    def start_profiling_if_need(self):
+        if current_milli_time() - self.trace_context.create_time > self._profile_context.task.min_duration_threshold:
+            self._profile_start_time = current_milli_time()
+            self.trace_context.profile_status.update_status(ProfileStatus.PROFILING)
+
+    def stop_profiling(self):
+        self.trace_context.profile_status.update_status(ProfileStatus.STOPPED)
+
+    def build_snapshot(self) -> Optional[TracingThreadSnapshot]:
+        if not self._profiling_thread.is_alive():
+            return None
+
+        current_time = current_milli_time()
+
+        stack_list = []
+
+        # get thread stack of target thread
+        stack = sys._current_frames().get(int(self._profiling_thread.ident))

Review comment:
       *Incompatible parameter type:*  Expected `typing.Union[_SupportsIndex, _SupportsTrunc, bytes, str, typing.SupportsInt]` for 1st positional only parameter to call `int.__new__` but got `Optional[int]`.
   (at-me [in a reply](https://help.sonatype.com/lift) with `help` or `ignore`)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [skywalking-python] kezhenxu94 commented on a change in pull request #155: Add Profile function

Posted by GitBox <gi...@apache.org>.
kezhenxu94 commented on a change in pull request #155:
URL: https://github.com/apache/skywalking-python/pull/155#discussion_r693429890



##########
File path: skywalking/profile/profile_service.py
##########
@@ -0,0 +1,212 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from queue import Queue
+from skywalking.loggings import logger
+from skywalking.profile.profile_constants import ProfileConstants
+from skywalking.profile.profile_task import ProfileTask
+from skywalking.profile.profile_context import ProfileTaskExecutionContext
+from skywalking.profile.profile_status import ProfileStatusReference
+
+from skywalking.trace.context import SpanContext
+
+from skywalking.utils.atomic_ref import AtomicRef
+from skywalking import agent
+
+from concurrent.futures import ThreadPoolExecutor
+from threading import Timer, RLock, Lock
+
+from skywalking.utils.time import current_milli_time
+
+
+class Scheduler:
+
+    @staticmethod
+    def schedule(milliseconds, func, *args, **kwargs):
+        seconds = milliseconds / 1000
+        if seconds < 0:
+            seconds = 0
+
+        t = Timer(seconds, func, *args, **kwargs)
+        t.daemon = True
+        t.start()
+
+
+class ProfileTaskExecutionService:
+    MINUTE_TO_MILLIS = 60000
+
+    def __init__(self):
+        # Queue is thread safe
+        self._profile_task_list = Queue()  # type: Queue
+        # queue_lock for making sure complex operation of this profile_task_list is thread safe
+        self.queue_lock = Lock()
+
+        self._last_command_create_time = -1  # type: int
+        # single thread executor
+        self.profile_executor = ThreadPoolExecutor(max_workers=1)
+        self.task_execution_context = AtomicRef(None)
+
+        self.profile_task_scheduler = Scheduler()
+
+        # rlock for process_profile_task and stop_current_profile_task
+        self._rlock = RLock()
+
+    def remove_from_profile_task_list(self, task: ProfileTask) -> bool:
+        """
+        Remove a task from profile_task_list in a thread safe state
+        """
+        with self.queue_lock:
+            item_left = []
+            result = False
+
+            while not self._profile_task_list.empty():
+                item = self._profile_task_list.get()
+                if item == task:
+                    result = True
+                    # not put in item_list for removing it
+                    continue
+
+                item_left.append(item)
+
+            for item in item_left:
+                self._profile_task_list.put(item)
+
+            return result
+
+    def get_last_command_create_time(self) -> int:
+        return self._last_command_create_time
+
+    def add_profile_task(self, task: ProfileTask):
+        # update last command create time, which will be used in command query
+        if task.create_time > self._last_command_create_time:
+            self._last_command_create_time = task.create_time
+
+        # check profile task object
+        result = self._check_profile_task(task)
+        if not result.success:
+            logger.warning("check command error, cannot process this profile task. reason: %s", result.error_reason)
+            return
+
+        # add task to list
+        self._profile_task_list.put(task)
+
+        delay_millis = task.start_time - current_milli_time()
+        # schedule to start task
+        self.profile_task_scheduler.schedule(delay_millis, self.process_profile_task, [task])
+
+    def add_profiling(self, context: SpanContext, segment_id: str, first_span_opname: str) -> ProfileStatusReference:
+        execution_context = self.task_execution_context.get()  # type: ProfileTaskExecutionContext
+        if execution_context is None:
+            return ProfileStatusReference.create_with_none()
+
+        return execution_context.attempt_profiling(context, segment_id, first_span_opname)
+
+    def profiling_recheck(self, trace_context: SpanContext, segment_id: str, first_span_opname: str):
+        """
+        Re-check current trace need profiling, in case that third-party plugins change the operation name.
+        """
+        execution_context = self.task_execution_context.get()  # type: ProfileTaskExecutionContext
+        if execution_context is None:
+            return
+        execution_context.profiling_recheck(trace_context, segment_id, first_span_opname)
+
+    # using reentrant lock for process_profile_task and stop_current_profile_task,
+    # to make sure thread safe.
+    def process_profile_task(self, task: ProfileTask):
+        with self._rlock:
+            # make sure prev profile task already stopped
+            self.stop_current_profile_task(self.task_execution_context.get())
+
+            # make stop task schedule and task context
+            current_context = ProfileTaskExecutionContext(task)
+            self.task_execution_context.set(current_context)
+
+            # start profiling this task
+            current_context.start_profiling()
+            logger.debug("profile task [%s] for endpoint [%s] started", task.task_id, task.first_span_op_name)
+
+            millis = task.duration * self.MINUTE_TO_MILLIS
+            self.profile_task_scheduler.schedule(millis, self.stop_current_profile_task, [current_context])
+
+    def stop_current_profile_task(self, need_stop: ProfileTaskExecutionContext):
+        with self._rlock:
+            # need_stop is None or task_execution_context is not need_stop context
+            if need_stop is None or not self.task_execution_context.compare_and_set(need_stop, None):
+                return
+
+            need_stop.stop_profiling()
+            logger.debug("profile task [%s] for endpoint [%s] stoped", need_stop.task.task_id,
+                         need_stop.task.first_span_op_name)
+
+            self.remove_from_profile_task_list(need_stop.task)
+
+            # notify profiling task has finished
+            agent.notify_profile_finish(need_stop.task)
+
+    class CheckResult:
+        def __init__(self, success: bool, error_reason: str):
+            self.success = success  # type: bool
+            self.error_reason = error_reason  # type: str

Review comment:
       Would simply using a tuple `(bool, str)` make the code simpler (and more Pythonic)? Ignore me if you want to keep it as is.

##########
File path: skywalking/profile/profile_context.py
##########
@@ -0,0 +1,223 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import time
+from threading import Thread, Event, current_thread
+import sys
+import traceback
+from typing import Optional
+
+from skywalking import agent
+from skywalking import config
+from skywalking.loggings import logger
+from skywalking import profile
+from skywalking.profile.profile_status import ProfileStatusReference, ProfileStatus
+from skywalking.profile.profile_task import ProfileTask
+from skywalking.profile.snapshot import TracingThreadSnapshot
+from skywalking.trace.context import SpanContext
+from skywalking.utils.array import AtomicArray
+from skywalking.utils.integer import AtomicInteger
+from skywalking.utils.time import current_milli_time
+
+
+class ProfileTaskExecutionContext:
+    def __init__(self, task: ProfileTask):
+        self.task = task  # type: ProfileTask
+        self._current_profiling_cnt = AtomicInteger(var=0)
+        self._total_started_profiling_cnt = AtomicInteger(var=0)
+        self.profiling_segment_slots = AtomicArray(length=config.profile_max_parallel)
+        self._profiling_thread = None  # type: Optional[Thread]
+        self._profiling_stop_event = None  # type: Optional[Event]
+
+    def start_profiling(self):
+        profile_thread = ProfileThread(self)
+        self._profiling_stop_event = Event()
+
+        self._profiling_thread = Thread(target=profile_thread.start, args=[self._profiling_stop_event], daemon=True)
+        self._profiling_thread.start()
+
+    def stop_profiling(self):
+        if self._profiling_thread is not None and self._profiling_stop_event is not None:
+            self._profiling_stop_event.set()
+
+    def attempt_profiling(self, trace_context: SpanContext, segment_id: str, first_span_opname: str) -> \
+            ProfileStatusReference:
+        """
+        check have available slot to profile and add it
+        """
+
+        # check has available slot
+        using_slot_cnt = self._current_profiling_cnt.get()
+        if using_slot_cnt >= config.profile_max_parallel:
+            return ProfileStatusReference.create_with_none()
+
+        # check first operation name matches
+        if not self.task.first_span_op_name == first_span_opname:
+            return ProfileStatusReference.create_with_none()
+
+        # if out limit started profiling count then stop add profiling
+        if self._total_started_profiling_cnt.get() > self.task.max_sampling_count:
+            return ProfileStatusReference.create_with_none()
+
+        # try to occupy slot
+        if not self._current_profiling_cnt.compare_and_set(using_slot_cnt,
+                                                           using_slot_cnt+1):
+            return ProfileStatusReference.create_with_none()
+
+        thread_profiler = ThreadProfiler(trace_context=trace_context,
+                                         segment_id=segment_id,
+                                         profiling_thread=current_thread(),
+                                         profile_context=self)
+
+        slot_length = self.profiling_segment_slots.length()
+        for idx in range(slot_length):
+            # occupy slot success
+            if self.profiling_segment_slots.compare_and_set(idx, None, thread_profiler):
+                return thread_profiler.profile_status
+
+        return ProfileStatusReference.create_with_none()
+
+    def profiling_recheck(self, trace_context: SpanContext, segment_id: str, first_span_opname: str):
+        if trace_context.profile_status.is_being_watched():
+            return
+
+        # if first_span_opname was changed by other plugin, there can start profile as well
+        trace_context.profile_status.update_status(self.attempt_profiling(trace_context,
+                                                                          segment_id,
+                                                                          first_span_opname).get())
+
+    def stop_tracing_profile(self, trace_context: SpanContext):
+        """
+        find tracing context and clear on slot
+        """
+        for idx, profiler in enumerate(self.profiling_segment_slots):
+            if profiler and profiler.matches(trace_context):
+                self.profiling_segment_slots.set(idx, None)
+                profiler.stop_profiling()
+                self._current_profiling_cnt.add_and_get(-1)
+                break
+
+    def is_start_profileable(self):
+        return self._total_started_profiling_cnt.add_and_get(1) <= self.task.max_sampling_count
+
+
+class ProfileThread:
+    def __init__(self, context: ProfileTaskExecutionContext):
+        self._task_execution_context = context
+        self._task_execution_service = profile.profile_task_execution_service
+        self._stop_event = None  # type: Optional[Event]
+
+    def start(self, stop_event: Event):
+        self._stop_event = stop_event
+
+        try:
+            self.profiling(self._task_execution_context)
+        except Exception as e:
+            logger.error("profiling task fail. task_id:[%s] error:[%s]", self._task_execution_context.task.task_id, e)
+        finally:
+            self._task_execution_service.stop_current_profile_task(self._task_execution_context)
+
+    def profiling(self, context: ProfileTaskExecutionContext):
+        max_sleep_period = context.task.thread_dump_period
+
+        while not self._stop_event.is_set():
+            current_loop_start_time = current_milli_time()
+            profilers = self._task_execution_context.profiling_segment_slots
+
+            for profiler in profilers:  # type: ThreadProfiler
+                if profiler is None:
+                    continue
+
+                if profiler.profile_status.get() is ProfileStatus.PENDING:
+                    profiler.start_profiling_if_need()
+                elif profiler.profile_status.get() is ProfileStatus.PROFILING:
+                    snapshot = profiler.build_snapshot()
+                    if snapshot is not None:
+                        agent.add_profiling_snapshot(snapshot)
+                    else:
+                        # tell execution context current tracing thread dump failed, stop it
+                        context.stop_tracing_profile(profiler.trace_context)
+
+            need_sleep = (current_loop_start_time + max_sleep_period) - current_milli_time()
+            if not need_sleep > 0:
+                need_sleep = max_sleep_period
+
+            # convert to float second
+            time.sleep(need_sleep/1000)

Review comment:
       ```suggestion
               time.sleep(need_sleep / 1000)
   ```

##########
File path: skywalking/profile/profile_service.py
##########
@@ -0,0 +1,212 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from queue import Queue
+from skywalking.loggings import logger
+from skywalking.profile.profile_constants import ProfileConstants
+from skywalking.profile.profile_task import ProfileTask
+from skywalking.profile.profile_context import ProfileTaskExecutionContext
+from skywalking.profile.profile_status import ProfileStatusReference
+
+from skywalking.trace.context import SpanContext
+
+from skywalking.utils.atomic_ref import AtomicRef
+from skywalking import agent
+
+from concurrent.futures import ThreadPoolExecutor
+from threading import Timer, RLock, Lock
+
+from skywalking.utils.time import current_milli_time
+
+
+class Scheduler:
+
+    @staticmethod
+    def schedule(milliseconds, func, *args, **kwargs):
+        seconds = milliseconds / 1000
+        if seconds < 0:
+            seconds = 0
+
+        t = Timer(seconds, func, *args, **kwargs)
+        t.daemon = True
+        t.start()
+
+
+class ProfileTaskExecutionService:
+    MINUTE_TO_MILLIS = 60000
+
+    def __init__(self):
+        # Queue is thread safe
+        self._profile_task_list = Queue()  # type: Queue
+        # queue_lock for making sure complex operation of this profile_task_list is thread safe
+        self.queue_lock = Lock()
+
+        self._last_command_create_time = -1  # type: int
+        # single thread executor
+        self.profile_executor = ThreadPoolExecutor(max_workers=1)
+        self.task_execution_context = AtomicRef(None)
+
+        self.profile_task_scheduler = Scheduler()
+
+        # rlock for process_profile_task and stop_current_profile_task
+        self._rlock = RLock()
+
+    def remove_from_profile_task_list(self, task: ProfileTask) -> bool:
+        """
+        Remove a task from profile_task_list in a thread safe state
+        """
+        with self.queue_lock:
+            item_left = []
+            result = False
+
+            while not self._profile_task_list.empty():
+                item = self._profile_task_list.get()
+                if item == task:
+                    result = True
+                    # not put in item_list for removing it
+                    continue
+
+                item_left.append(item)
+
+            for item in item_left:
+                self._profile_task_list.put(item)
+
+            return result
+
+    def get_last_command_create_time(self) -> int:
+        return self._last_command_create_time
+
+    def add_profile_task(self, task: ProfileTask):
+        # update last command create time, which will be used in command query
+        if task.create_time > self._last_command_create_time:
+            self._last_command_create_time = task.create_time
+
+        # check profile task object
+        result = self._check_profile_task(task)
+        if not result.success:
+            logger.warning("check command error, cannot process this profile task. reason: %s", result.error_reason)
+            return
+
+        # add task to list
+        self._profile_task_list.put(task)
+
+        delay_millis = task.start_time - current_milli_time()
+        # schedule to start task
+        self.profile_task_scheduler.schedule(delay_millis, self.process_profile_task, [task])
+
+    def add_profiling(self, context: SpanContext, segment_id: str, first_span_opname: str) -> ProfileStatusReference:
+        execution_context = self.task_execution_context.get()  # type: ProfileTaskExecutionContext
+        if execution_context is None:
+            return ProfileStatusReference.create_with_none()
+
+        return execution_context.attempt_profiling(context, segment_id, first_span_opname)
+
+    def profiling_recheck(self, trace_context: SpanContext, segment_id: str, first_span_opname: str):
+        """
+        Re-check current trace need profiling, in case that third-party plugins change the operation name.
+        """
+        execution_context = self.task_execution_context.get()  # type: ProfileTaskExecutionContext
+        if execution_context is None:
+            return
+        execution_context.profiling_recheck(trace_context, segment_id, first_span_opname)
+
+    # using reentrant lock for process_profile_task and stop_current_profile_task,
+    # to make sure thread safe.
+    def process_profile_task(self, task: ProfileTask):
+        with self._rlock:
+            # make sure prev profile task already stopped
+            self.stop_current_profile_task(self.task_execution_context.get())
+
+            # make stop task schedule and task context
+            current_context = ProfileTaskExecutionContext(task)
+            self.task_execution_context.set(current_context)
+
+            # start profiling this task
+            current_context.start_profiling()
+            logger.debug("profile task [%s] for endpoint [%s] started", task.task_id, task.first_span_op_name)
+
+            millis = task.duration * self.MINUTE_TO_MILLIS
+            self.profile_task_scheduler.schedule(millis, self.stop_current_profile_task, [current_context])

Review comment:
       We should not always schedule the stop `task.duration` minutes from now, right? If the agent restarts, the task will only be stopped `task.duration` minutes after the restart, even though the task's duration may already have elapsed by then.
   
   Should we check the elapsed time first, for example whether `current_milli_time() - task.start_time > task.duration * MINUTE_TO_MILLIS`?

##########
File path: skywalking/profile/profile_service.py
##########
@@ -0,0 +1,212 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from queue import Queue
+from skywalking.loggings import logger
+from skywalking.profile.profile_constants import ProfileConstants
+from skywalking.profile.profile_task import ProfileTask
+from skywalking.profile.profile_context import ProfileTaskExecutionContext
+from skywalking.profile.profile_status import ProfileStatusReference
+
+from skywalking.trace.context import SpanContext
+
+from skywalking.utils.atomic_ref import AtomicRef
+from skywalking import agent
+
+from concurrent.futures import ThreadPoolExecutor
+from threading import Timer, RLock, Lock
+
+from skywalking.utils.time import current_milli_time
+
+
+class Scheduler:
+
+    @staticmethod
+    def schedule(milliseconds, func, *args, **kwargs):
+        seconds = milliseconds / 1000
+        if seconds < 0:
+            seconds = 0
+
+        t = Timer(seconds, func, *args, **kwargs)
+        t.daemon = True
+        t.start()
+
+
+class ProfileTaskExecutionService:
+    MINUTE_TO_MILLIS = 60000
+
+    def __init__(self):
+        # Queue is thread safe
+        self._profile_task_list = Queue()  # type: Queue
+        # queue_lock for making sure complex operation of this profile_task_list is thread safe
+        self.queue_lock = Lock()
+
+        self._last_command_create_time = -1  # type: int
+        # single thread executor
+        self.profile_executor = ThreadPoolExecutor(max_workers=1)
+        self.task_execution_context = AtomicRef(None)
+
+        self.profile_task_scheduler = Scheduler()
+
+        # rlock for process_profile_task and stop_current_profile_task
+        self._rlock = RLock()
+
+    def remove_from_profile_task_list(self, task: ProfileTask) -> bool:
+        """
+        Remove a task from profile_task_list in a thread safe state
+        """
+        with self.queue_lock:
+            item_left = []
+            result = False
+
+            while not self._profile_task_list.empty():
+                item = self._profile_task_list.get()
+                if item == task:
+                    result = True
+                    # not put in item_list for removing it
+                    continue
+
+                item_left.append(item)
+
+            for item in item_left:
+                self._profile_task_list.put(item)
+
+            return result
+
+    def get_last_command_create_time(self) -> int:
+        return self._last_command_create_time
+
+    def add_profile_task(self, task: ProfileTask):
+        # update last command create time, which will be used in command query
+        if task.create_time > self._last_command_create_time:
+            self._last_command_create_time = task.create_time
+
+        # check profile task object
+        result = self._check_profile_task(task)
+        if not result.success:
+            logger.warning("check command error, cannot process this profile task. reason: %s", result.error_reason)
+            return
+
+        # add task to list
+        self._profile_task_list.put(task)
+
+        delay_millis = task.start_time - current_milli_time()
+        # schedule to start task
+        self.profile_task_scheduler.schedule(delay_millis, self.process_profile_task, [task])
+
+    def add_profiling(self, context: SpanContext, segment_id: str, first_span_opname: str) -> ProfileStatusReference:
+        execution_context = self.task_execution_context.get()  # type: ProfileTaskExecutionContext
+        if execution_context is None:
+            return ProfileStatusReference.create_with_none()
+
+        return execution_context.attempt_profiling(context, segment_id, first_span_opname)
+
+    def profiling_recheck(self, trace_context: SpanContext, segment_id: str, first_span_opname: str):
+        """
+        Re-check current trace need profiling, in case that third-party plugins change the operation name.
+        """
+        execution_context = self.task_execution_context.get()  # type: ProfileTaskExecutionContext
+        if execution_context is None:
+            return
+        execution_context.profiling_recheck(trace_context, segment_id, first_span_opname)
+
+    # using reentrant lock for process_profile_task and stop_current_profile_task,
+    # to make sure thread safe.
+    def process_profile_task(self, task: ProfileTask):
+        with self._rlock:
+            # make sure prev profile task already stopped
+            self.stop_current_profile_task(self.task_execution_context.get())
+
+            # make stop task schedule and task context
+            current_context = ProfileTaskExecutionContext(task)
+            self.task_execution_context.set(current_context)
+
+            # start profiling this task
+            current_context.start_profiling()
+            logger.debug("profile task [%s] for endpoint [%s] started", task.task_id, task.first_span_op_name)
+
+            millis = task.duration * self.MINUTE_TO_MILLIS
+            self.profile_task_scheduler.schedule(millis, self.stop_current_profile_task, [current_context])
+
+    def stop_current_profile_task(self, need_stop: ProfileTaskExecutionContext):
+        with self._rlock:
+            # need_stop is None or task_execution_context is not need_stop context
+            if need_stop is None or not self.task_execution_context.compare_and_set(need_stop, None):
+                return
+
+            need_stop.stop_profiling()
+            logger.debug("profile task [%s] for endpoint [%s] stoped", need_stop.task.task_id,

Review comment:
       ```suggestion
               logger.debug("profile task [%s] for endpoint [%s] stopped", need_stop.task.task_id,
   ```

##########
File path: skywalking/profile/profile_context.py
##########
@@ -0,0 +1,223 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import time
+from threading import Thread, Event, current_thread
+import sys
+import traceback
+from typing import Optional
+
+from skywalking import agent
+from skywalking import config
+from skywalking.loggings import logger
+from skywalking import profile
+from skywalking.profile.profile_status import ProfileStatusReference, ProfileStatus
+from skywalking.profile.profile_task import ProfileTask
+from skywalking.profile.snapshot import TracingThreadSnapshot
+from skywalking.trace.context import SpanContext
+from skywalking.utils.array import AtomicArray
+from skywalking.utils.integer import AtomicInteger
+from skywalking.utils.time import current_milli_time
+
+
+class ProfileTaskExecutionContext:
+    def __init__(self, task: ProfileTask):
+        self.task = task  # type: ProfileTask
+        self._current_profiling_cnt = AtomicInteger(var=0)
+        self._total_started_profiling_cnt = AtomicInteger(var=0)
+        self.profiling_segment_slots = AtomicArray(length=config.profile_max_parallel)
+        self._profiling_thread = None  # type: Optional[Thread]
+        self._profiling_stop_event = None  # type: Optional[Event]
+
+    def start_profiling(self):
+        profile_thread = ProfileThread(self)
+        self._profiling_stop_event = Event()
+
+        self._profiling_thread = Thread(target=profile_thread.start, args=[self._profiling_stop_event], daemon=True)
+        self._profiling_thread.start()
+
+    def stop_profiling(self):
+        if self._profiling_thread is not None and self._profiling_stop_event is not None:
+            self._profiling_stop_event.set()
+
+    def attempt_profiling(self, trace_context: SpanContext, segment_id: str, first_span_opname: str) -> \
+            ProfileStatusReference:
+        """
+        check have available slot to profile and add it
+        """
+
+        # check has available slot
+        using_slot_cnt = self._current_profiling_cnt.get()
+        if using_slot_cnt >= config.profile_max_parallel:
+            return ProfileStatusReference.create_with_none()
+
+        # check first operation name matches
+        if not self.task.first_span_op_name == first_span_opname:
+            return ProfileStatusReference.create_with_none()
+
+        # if out limit started profiling count then stop add profiling
+        if self._total_started_profiling_cnt.get() > self.task.max_sampling_count:
+            return ProfileStatusReference.create_with_none()
+
+        # try to occupy slot
+        if not self._current_profiling_cnt.compare_and_set(using_slot_cnt,
+                                                           using_slot_cnt+1):

Review comment:
       ```suggestion
                                                              using_slot_cnt + 1):
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [skywalking-python] Humbertzhang commented on a change in pull request #155: Add Profile function

Posted by GitBox <gi...@apache.org>.
Humbertzhang commented on a change in pull request #155:
URL: https://github.com/apache/skywalking-python/pull/155#discussion_r693324527



##########
File path: skywalking/profile/profile_context.py
##########
@@ -0,0 +1,223 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import time
+from threading import Thread, Event, current_thread
+import sys
+import traceback
+from typing import Optional
+
+from skywalking import agent
+from skywalking import config
+from skywalking.loggings import logger
+from skywalking import profile
+from skywalking.profile.profile_status import ProfileStatusReference, ProfileStatus
+from skywalking.profile.profile_task import ProfileTask
+from skywalking.profile.snapshot import TracingThreadSnapshot
+from skywalking.trace.context import SpanContext
+from skywalking.utils.array import AtomicArray
+from skywalking.utils.integer import AtomicInteger
+from skywalking.utils.time import current_milli_time
+
+
+class ProfileTaskExecutionContext:
+    def __init__(self, task: ProfileTask):
+        self.task = task  # type: ProfileTask
+        self._current_profiling_cnt = AtomicInteger(var=0)
+        self._total_started_profiling_cnt = AtomicInteger(var=0)
+        self.profiling_segment_slots = AtomicArray(length=config.profile_max_parallel)
+        self._profiling_thread = None  # type: Optional[Thread]
+        self._profiling_stop_event = None  # type: Optional[Event]
+
+    def start_profiling(self):
+        profile_thread = ProfileThread(self)
+        self._profiling_stop_event = Event()
+
+        self._profiling_thread = Thread(target=profile_thread.start, args=[self._profiling_stop_event], daemon=True)
+        self._profiling_thread.start()
+
+    def stop_profiling(self):
+        if self._profiling_thread is not None and self._profiling_stop_event is not None:
+            self._profiling_stop_event.set()
+
+    def attempt_profiling(self, trace_context: SpanContext, segment_id: str, first_span_opname: str) -> \
+            ProfileStatusReference:
+        """
+        check have available slot to profile and add it
+        """
+
+        # check has available slot
+        using_slot_cnt = self._current_profiling_cnt.get()
+        if using_slot_cnt >= config.profile_max_parallel:
+            return ProfileStatusReference.create_with_none()
+
+        # check first operation name matches
+        if not self.task.first_span_op_name == first_span_opname:
+            return ProfileStatusReference.create_with_none()
+
+        # if out limit started profiling count then stop add profiling
+        if self._total_started_profiling_cnt.get() > self.task.max_sampling_count:
+            return ProfileStatusReference.create_with_none()
+
+        # try to occupy slot
+        if not self._current_profiling_cnt.compare_and_set(using_slot_cnt,
+                                                           using_slot_cnt+1):
+            return ProfileStatusReference.create_with_none()
+
+        thread_profiler = ThreadProfiler(trace_context=trace_context,
+                                         segment_id=segment_id,
+                                         profiling_thread=current_thread(),
+                                         profile_context=self)
+
+        slot_length = self.profiling_segment_slots.length()
+        for idx in range(slot_length):
+            # occupy slot success
+            if self.profiling_segment_slots.compare_and_set(idx, None, thread_profiler):
+                return thread_profiler.profile_status
+
+        return ProfileStatusReference.create_with_none()
+
+    def profiling_recheck(self, trace_context: SpanContext, segment_id: str, first_span_opname: str):
+        if trace_context.profile_status.is_being_watched():
+            return
+
+        # if first_span_opname was changed by other plugin, there can start profile as well
+        trace_context.profile_status.update_status(self.attempt_profiling(trace_context,
+                                                                          segment_id,
+                                                                          first_span_opname).get())
+
+    def stop_tracing_profile(self, trace_context: SpanContext):
+        """
+        find tracing context and clear on slot
+        """
+        for idx, profiler in enumerate(self.profiling_segment_slots):
+            if profiler and profiler.matches(trace_context):
+                self.profiling_segment_slots.set(idx, None)
+                profiler.stop_profiling()
+                self._current_profiling_cnt.add_and_get(-1)
+                break
+
+    def is_start_profileable(self):
+        return self._total_started_profiling_cnt.add_and_get(1) <= self.task.max_sampling_count
+
+
+class ProfileThread:
+    def __init__(self, context: ProfileTaskExecutionContext):
+        self._task_execution_context = context
+        self._task_execution_service = profile.profile_task_execution_service
+        self._stop_event = None  # type: Optional[Event]
+
+    def start(self, stop_event: Event):
+        self._stop_event = stop_event
+
+        try:
+            self.profiling(self._task_execution_context)
+        except Exception as e:
+            logger.error("profiling task fail. task_id:[%s] error:[%s]", self._task_execution_context.task.task_id, e)
+        finally:
+            self._task_execution_service.stop_current_profile_task(self._task_execution_context)
+
+    def profiling(self, context: ProfileTaskExecutionContext):
+        max_sleep_period = context.task.thread_dump_period
+
+        while not self._stop_event.is_set():
+            current_loop_start_time = current_milli_time()
+            profilers = self._task_execution_context.profiling_segment_slots
+
+            for profiler in profilers:  # type: ThreadProfiler
+                if profiler is None:
+                    continue
+
+                if profiler.profile_status.get() is ProfileStatus.PENDING:
+                    profiler.start_profiling_if_need()
+                elif profiler.profile_status.get() is ProfileStatus.PROFILING:
+                    snapshot = profiler.build_snapshot()
+                    if snapshot is not None:
+                        agent.add_profiling_snapshot(snapshot)
+                    else:
+                        # tell execution context current tracing thread dump failed, stop it
+                        context.stop_tracing_profile(profiler.trace_context)
+
+            need_sleep = (current_loop_start_time + max_sleep_period) - current_milli_time()
+            if not need_sleep > 0:
+                need_sleep = max_sleep_period
+
+            # convert to float second
+            time.sleep(need_sleep/1000)
+
+
+class ThreadProfiler:
+    def __init__(self, trace_context: SpanContext, segment_id: str, profiling_thread: Thread,
+                 profile_context: ProfileTaskExecutionContext):
+        self.trace_context = trace_context
+        self._segment_id = segment_id
+        self._profiling_thread = profiling_thread
+        self._profile_context = profile_context
+        self._profile_start_time = -1
+        self.profiling_max_time_mills = config.profile_duration * 60 * 1000
+
+        self.dump_sequence = 0
+
+        if trace_context.profile_status is None:
+            self.profile_status = ProfileStatusReference.create_with_pending()
+        else:
+            self.profile_status = trace_context.profile_status  # type: ProfileStatusReference
+            self.profile_status.update_status(ProfileStatus.PENDING)
+
+    def start_profiling_if_need(self):
+        if current_milli_time() - self.trace_context.create_time > self._profile_context.task.min_duration_threshold:
+            self._profile_start_time = current_milli_time()
+            self.trace_context.profile_status.update_status(ProfileStatus.PROFILING)
+
+    def stop_profiling(self):
+        self.trace_context.profile_status.update_status(ProfileStatus.STOPPED)
+
+    def build_snapshot(self) -> Optional[TracingThreadSnapshot]:
+        if not self._profiling_thread.is_alive():
+            return None
+
+        current_time = current_milli_time()
+
+        stack_list = []
+
+        # get thread stack of target thread
+        stack = sys._current_frames().get(int(self._profiling_thread.ident))

Review comment:
       @sonatype-lift ignore




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [skywalking-python] kezhenxu94 merged pull request #155: Add Profile function

Posted by GitBox <gi...@apache.org>.
kezhenxu94 merged pull request #155:
URL: https://github.com/apache/skywalking-python/pull/155


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [skywalking-python] Humbertzhang commented on a change in pull request #155: Add Profile function

Posted by GitBox <gi...@apache.org>.
Humbertzhang commented on a change in pull request #155:
URL: https://github.com/apache/skywalking-python/pull/155#discussion_r693316243



##########
File path: skywalking/profile/profile_context.py
##########
@@ -0,0 +1,222 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import time
+from threading import Thread, Event, current_thread
+import sys
+import traceback
+
+from skywalking import agent
+from skywalking import config
+from skywalking.loggings import logger
+from skywalking import profile
+from skywalking.profile.profile_status import ProfileStatusReference, ProfileStatus
+from skywalking.profile.profile_task import ProfileTask
+from skywalking.profile.snapshot import TracingThreadSnapshot
+from skywalking.trace.context import SpanContext
+from skywalking.utils.array import AtomicArray
+from skywalking.utils.integer import AtomicInteger
+from skywalking.utils.time import current_milli_time
+
+
+class ProfileTaskExecutionContext:
+    def __init__(self, task: ProfileTask):
+        self.task = task  # type: ProfileTask
+        self._current_profiling_cnt = AtomicInteger(var=0)
+        self._total_started_profiling_cnt = AtomicInteger(var=0)
+        self.profiling_segment_slots = AtomicArray(length=config.profile_max_parallel)
+        self._profiling_thread = None  # type: Thread
+        self._profiling_stop_event = None  # type: Event
+
+    def start_profiling(self):
+        profile_thread = ProfileThread(self)
+        self._profiling_stop_event = Event()
+
+        self._profiling_thread = Thread(target=profile_thread.start, args=[self._profiling_stop_event], daemon=True)
+        self._profiling_thread.start()
+
+    def stop_profiling(self):
+        if self._profiling_thread is not None and self._profiling_stop_event is not None:
+            self._profiling_stop_event.set()
+
+    def attempt_profiling(self, trace_context: SpanContext, segment_id: str, first_span_opname: str) -> \
+            ProfileStatusReference:
+        """
+        check have available slot to profile and add it
+        """
+
+        # check has available slot
+        using_slot_cnt = self._current_profiling_cnt.get()
+        if using_slot_cnt >= config.profile_max_parallel:
+            return ProfileStatusReference.create_with_none()
+
+        # check first operation name matches
+        if not self.task.first_span_op_name == first_span_opname:
+            return ProfileStatusReference.create_with_none()
+
+        # if out limit started profiling count then stop add profiling
+        if self._total_started_profiling_cnt.get() > self.task.max_sampling_count:
+            return ProfileStatusReference.create_with_none()
+
+        # try to occupy slot
+        if not self._current_profiling_cnt.compare_and_set(using_slot_cnt,
+                                                           using_slot_cnt+1):
+            return ProfileStatusReference.create_with_none()
+
+        thread_profiler = ThreadProfiler(trace_context=trace_context,
+                                         segment_id=segment_id,
+                                         profiling_thread=current_thread(),
+                                         profile_context=self)
+
+        slot_length = self.profiling_segment_slots.length()
+        for idx in range(slot_length):
+            # occupy slot success
+            if self.profiling_segment_slots.compare_and_set(idx, None, thread_profiler):
+                return thread_profiler.profile_status
+
+        return ProfileStatusReference.create_with_none()
+
+    def profiling_recheck(self, trace_context: SpanContext, segment_id: str, first_span_opname: str):
+        if trace_context.profile_status.is_being_watched():
+            return
+
+        # if first_span_opname was changed by other plugin, there can start profile as well
+        trace_context.profile_status.update_status(self.attempt_profiling(trace_context,
+                                                                          segment_id,
+                                                                          first_span_opname).get())
+
+    def stop_tracing_profile(self, trace_context: SpanContext):
+        """
+        find tracing context and clear on slot
+        """
+        for idx, profiler in enumerate(self.profiling_segment_slots):

Review comment:
       ignore

##########
File path: skywalking/profile/profile_context.py
##########
@@ -0,0 +1,222 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import time
+from threading import Thread, Event, current_thread
+import sys
+import traceback
+
+from skywalking import agent
+from skywalking import config
+from skywalking.loggings import logger
+from skywalking import profile
+from skywalking.profile.profile_status import ProfileStatusReference, ProfileStatus
+from skywalking.profile.profile_task import ProfileTask
+from skywalking.profile.snapshot import TracingThreadSnapshot
+from skywalking.trace.context import SpanContext
+from skywalking.utils.array import AtomicArray
+from skywalking.utils.integer import AtomicInteger
+from skywalking.utils.time import current_milli_time
+
+
+class ProfileTaskExecutionContext:
+    def __init__(self, task: ProfileTask):
+        self.task = task  # type: ProfileTask
+        self._current_profiling_cnt = AtomicInteger(var=0)
+        self._total_started_profiling_cnt = AtomicInteger(var=0)
+        self.profiling_segment_slots = AtomicArray(length=config.profile_max_parallel)
+        self._profiling_thread = None  # type: Thread
+        self._profiling_stop_event = None  # type: Event
+
+    def start_profiling(self):
+        profile_thread = ProfileThread(self)
+        self._profiling_stop_event = Event()
+
+        self._profiling_thread = Thread(target=profile_thread.start, args=[self._profiling_stop_event], daemon=True)
+        self._profiling_thread.start()
+
+    def stop_profiling(self):
+        if self._profiling_thread is not None and self._profiling_stop_event is not None:
+            self._profiling_stop_event.set()
+
+    def attempt_profiling(self, trace_context: SpanContext, segment_id: str, first_span_opname: str) -> \
+            ProfileStatusReference:
+        """
+        check have available slot to profile and add it
+        """
+
+        # check has available slot
+        using_slot_cnt = self._current_profiling_cnt.get()
+        if using_slot_cnt >= config.profile_max_parallel:
+            return ProfileStatusReference.create_with_none()
+
+        # check first operation name matches
+        if not self.task.first_span_op_name == first_span_opname:
+            return ProfileStatusReference.create_with_none()
+
+        # if out limit started profiling count then stop add profiling
+        if self._total_started_profiling_cnt.get() > self.task.max_sampling_count:
+            return ProfileStatusReference.create_with_none()
+
+        # try to occupy slot
+        if not self._current_profiling_cnt.compare_and_set(using_slot_cnt,
+                                                           using_slot_cnt+1):
+            return ProfileStatusReference.create_with_none()
+
+        thread_profiler = ThreadProfiler(trace_context=trace_context,
+                                         segment_id=segment_id,
+                                         profiling_thread=current_thread(),
+                                         profile_context=self)
+
+        slot_length = self.profiling_segment_slots.length()
+        for idx in range(slot_length):
+            # occupy slot success
+            if self.profiling_segment_slots.compare_and_set(idx, None, thread_profiler):
+                return thread_profiler.profile_status
+
+        return ProfileStatusReference.create_with_none()
+
+    def profiling_recheck(self, trace_context: SpanContext, segment_id: str, first_span_opname: str):
+        if trace_context.profile_status.is_being_watched():
+            return
+
+        # if first_span_opname was changed by other plugin, there can start profile as well
+        trace_context.profile_status.update_status(self.attempt_profiling(trace_context,
+                                                                          segment_id,
+                                                                          first_span_opname).get())
+
+    def stop_tracing_profile(self, trace_context: SpanContext):
+        """
+        find tracing context and clear on slot
+        """
+        for idx, profiler in enumerate(self.profiling_segment_slots):

Review comment:
       ignore




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [skywalking-python] Humbertzhang commented on a change in pull request #155: Add Profile function

Posted by GitBox <gi...@apache.org>.
Humbertzhang commented on a change in pull request #155:
URL: https://github.com/apache/skywalking-python/pull/155#discussion_r693317593



##########
File path: skywalking/profile/profile_context.py
##########
@@ -0,0 +1,223 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import time
+from threading import Thread, Event, current_thread
+import sys
+import traceback
+from typing import Optional
+
+from skywalking import agent
+from skywalking import config
+from skywalking.loggings import logger
+from skywalking import profile
+from skywalking.profile.profile_status import ProfileStatusReference, ProfileStatus
+from skywalking.profile.profile_task import ProfileTask
+from skywalking.profile.snapshot import TracingThreadSnapshot
+from skywalking.trace.context import SpanContext
+from skywalking.utils.array import AtomicArray
+from skywalking.utils.integer import AtomicInteger
+from skywalking.utils.time import current_milli_time
+
+
+class ProfileTaskExecutionContext:
+    def __init__(self, task: ProfileTask):
+        self.task = task  # type: ProfileTask
+        self._current_profiling_cnt = AtomicInteger(var=0)
+        self._total_started_profiling_cnt = AtomicInteger(var=0)
+        self.profiling_segment_slots = AtomicArray(length=config.profile_max_parallel)
+        self._profiling_thread = None  # type: Optional[Thread]
+        self._profiling_stop_event = None  # type: Optional[Event]
+
+    def start_profiling(self):
+        profile_thread = ProfileThread(self)
+        self._profiling_stop_event = Event()
+
+        self._profiling_thread = Thread(target=profile_thread.start, args=[self._profiling_stop_event], daemon=True)
+        self._profiling_thread.start()
+
+    def stop_profiling(self):
+        if self._profiling_thread is not None and self._profiling_stop_event is not None:
+            self._profiling_stop_event.set()
+
+    def attempt_profiling(self, trace_context: SpanContext, segment_id: str, first_span_opname: str) -> \
+            ProfileStatusReference:
+        """
+        check have available slot to profile and add it
+        """
+
+        # check has available slot
+        using_slot_cnt = self._current_profiling_cnt.get()
+        if using_slot_cnt >= config.profile_max_parallel:
+            return ProfileStatusReference.create_with_none()
+
+        # check first operation name matches
+        if not self.task.first_span_op_name == first_span_opname:
+            return ProfileStatusReference.create_with_none()
+
+        # if out limit started profiling count then stop add profiling
+        if self._total_started_profiling_cnt.get() > self.task.max_sampling_count:
+            return ProfileStatusReference.create_with_none()
+
+        # try to occupy slot
+        if not self._current_profiling_cnt.compare_and_set(using_slot_cnt,
+                                                           using_slot_cnt+1):
+            return ProfileStatusReference.create_with_none()
+
+        thread_profiler = ThreadProfiler(trace_context=trace_context,
+                                         segment_id=segment_id,
+                                         profiling_thread=current_thread(),
+                                         profile_context=self)
+
+        slot_length = self.profiling_segment_slots.length()
+        for idx in range(slot_length):
+            # occupy slot success
+            if self.profiling_segment_slots.compare_and_set(idx, None, thread_profiler):
+                return thread_profiler.profile_status
+
+        return ProfileStatusReference.create_with_none()
+
+    def profiling_recheck(self, trace_context: SpanContext, segment_id: str, first_span_opname: str):
+        if trace_context.profile_status.is_being_watched():
+            return
+
+        # if first_span_opname was changed by other plugin, there can start profile as well
+        trace_context.profile_status.update_status(self.attempt_profiling(trace_context,
+                                                                          segment_id,
+                                                                          first_span_opname).get())
+
+    def stop_tracing_profile(self, trace_context: SpanContext):
+        """
+        find tracing context and clear on slot
+        """
+        for idx, profiler in enumerate(self.profiling_segment_slots):
+            if profiler and profiler.matches(trace_context):
+                self.profiling_segment_slots.set(idx, None)
+                profiler.stop_profiling()
+                self._current_profiling_cnt.add_and_get(-1)
+                break
+
+    def is_start_profileable(self):
+        return self._total_started_profiling_cnt.add_and_get(1) <= self.task.max_sampling_count
+
+
+class ProfileThread:
+    def __init__(self, context: ProfileTaskExecutionContext):
+        self._task_execution_context = context
+        self._task_execution_service = profile.profile_task_execution_service
+        self._stop_event = None  # type: Optional[Event]
+
+    def start(self, stop_event: Event):
+        self._stop_event = stop_event
+
+        try:
+            self.profiling(self._task_execution_context)
+        except Exception as e:
+            logger.error("profiling task fail. task_id:[%s] error:[%s]", self._task_execution_context.task.task_id, e)
+        finally:
+            self._task_execution_service.stop_current_profile_task(self._task_execution_context)
+
+    def profiling(self, context: ProfileTaskExecutionContext):
+        max_sleep_period = context.task.thread_dump_period
+
+        while not self._stop_event.is_set():
+            current_loop_start_time = current_milli_time()
+            profilers = self._task_execution_context.profiling_segment_slots
+
+            for profiler in profilers:  # type: ThreadProfiler
+                if profiler is None:
+                    continue
+
+                if profiler.profile_status.get() is ProfileStatus.PENDING:
+                    profiler.start_profiling_if_need()
+                elif profiler.profile_status.get() is ProfileStatus.PROFILING:
+                    snapshot = profiler.build_snapshot()
+                    if snapshot is not None:
+                        agent.add_profiling_snapshot(snapshot)
+                    else:
+                        # tell execution context current tracing thread dump failed, stop it
+                        context.stop_tracing_profile(profiler.trace_context)
+
+            need_sleep = (current_loop_start_time + max_sleep_period) - current_milli_time()
+            if not need_sleep > 0:
+                need_sleep = max_sleep_period
+
+            # convert to float second
+            time.sleep(need_sleep/1000)
+
+
+class ThreadProfiler:
+    def __init__(self, trace_context: SpanContext, segment_id: str, profiling_thread: Thread,
+                 profile_context: ProfileTaskExecutionContext):
+        self.trace_context = trace_context
+        self._segment_id = segment_id
+        self._profiling_thread = profiling_thread
+        self._profile_context = profile_context
+        self._profile_start_time = -1
+        self.profiling_max_time_mills = config.profile_duration * 60 * 1000
+
+        self.dump_sequence = 0
+
+        if trace_context.profile_status is None:
+            self.profile_status = ProfileStatusReference.create_with_pending()
+        else:
+            self.profile_status = trace_context.profile_status  # type: ProfileStatusReference
+            self.profile_status.update_status(ProfileStatus.PENDING)
+
+    def start_profiling_if_need(self):
+        if current_milli_time() - self.trace_context.create_time > self._profile_context.task.min_duration_threshold:
+            self._profile_start_time = current_milli_time()
+            self.trace_context.profile_status.update_status(ProfileStatus.PROFILING)
+
+    def stop_profiling(self):
+        self.trace_context.profile_status.update_status(ProfileStatus.STOPPED)
+
+    def build_snapshot(self) -> Optional[TracingThreadSnapshot]:
+        if not self._profiling_thread.is_alive():
+            return None
+
+        current_time = current_milli_time()
+
+        stack_list = []
+
+        # get thread stack of target thread
+        stack = sys._current_frames().get(int(self._profiling_thread.ident))

Review comment:
       ignore




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [skywalking-python] Humbertzhang commented on a change in pull request #155: Add Profile function

Posted by GitBox <gi...@apache.org>.
Humbertzhang commented on a change in pull request #155:
URL: https://github.com/apache/skywalking-python/pull/155#discussion_r693317839



##########
File path: skywalking/profile/profile_context.py
##########
@@ -0,0 +1,222 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import time
+from threading import Thread, Event, current_thread
+import sys
+import traceback
+
+from skywalking import agent
+from skywalking import config
+from skywalking.loggings import logger
+from skywalking import profile
+from skywalking.profile.profile_status import ProfileStatusReference, ProfileStatus
+from skywalking.profile.profile_task import ProfileTask
+from skywalking.profile.snapshot import TracingThreadSnapshot
+from skywalking.trace.context import SpanContext
+from skywalking.utils.array import AtomicArray
+from skywalking.utils.integer import AtomicInteger
+from skywalking.utils.time import current_milli_time
+
+
+class ProfileTaskExecutionContext:
+    def __init__(self, task: ProfileTask):
+        self.task = task  # type: ProfileTask
+        self._current_profiling_cnt = AtomicInteger(var=0)
+        self._total_started_profiling_cnt = AtomicInteger(var=0)
+        self.profiling_segment_slots = AtomicArray(length=config.profile_max_parallel)
+        self._profiling_thread = None  # type: Thread
+        self._profiling_stop_event = None  # type: Event
+
+    def start_profiling(self):
+        profile_thread = ProfileThread(self)
+        self._profiling_stop_event = Event()
+
+        self._profiling_thread = Thread(target=profile_thread.start, args=[self._profiling_stop_event], daemon=True)
+        self._profiling_thread.start()
+
+    def stop_profiling(self):
+        if self._profiling_thread is not None and self._profiling_stop_event is not None:
+            self._profiling_stop_event.set()
+
+    def attempt_profiling(self, trace_context: SpanContext, segment_id: str, first_span_opname: str) -> \
+            ProfileStatusReference:
+        """
+        check have available slot to profile and add it
+        """
+
+        # check has available slot
+        using_slot_cnt = self._current_profiling_cnt.get()
+        if using_slot_cnt >= config.profile_max_parallel:
+            return ProfileStatusReference.create_with_none()
+
+        # check first operation name matches
+        if not self.task.first_span_op_name == first_span_opname:
+            return ProfileStatusReference.create_with_none()
+
+        # if out limit started profiling count then stop add profiling
+        if self._total_started_profiling_cnt.get() > self.task.max_sampling_count:
+            return ProfileStatusReference.create_with_none()
+
+        # try to occupy slot
+        if not self._current_profiling_cnt.compare_and_set(using_slot_cnt,
+                                                           using_slot_cnt+1):
+            return ProfileStatusReference.create_with_none()
+
+        thread_profiler = ThreadProfiler(trace_context=trace_context,
+                                         segment_id=segment_id,
+                                         profiling_thread=current_thread(),
+                                         profile_context=self)
+
+        slot_length = self.profiling_segment_slots.length()
+        for idx in range(slot_length):
+            # occupy slot success
+            if self.profiling_segment_slots.compare_and_set(idx, None, thread_profiler):
+                return thread_profiler.profile_status
+
+        return ProfileStatusReference.create_with_none()
+
+    def profiling_recheck(self, trace_context: SpanContext, segment_id: str, first_span_opname: str):
+        if trace_context.profile_status.is_being_watched():
+            return
+
+        # if first_span_opname was changed by other plugin, there can start profile as well
+        trace_context.profile_status.update_status(self.attempt_profiling(trace_context,
+                                                                          segment_id,
+                                                                          first_span_opname).get())
+
+    def stop_tracing_profile(self, trace_context: SpanContext):
+        """
+        find tracing context and clear on slot
+        """
+        for idx, profiler in enumerate(self.profiling_segment_slots):
+            if profiler and profiler.matches(trace_context):
+                self.profiling_segment_slots.set(idx, None)
+                profiler.stop_profiling()
+                self._current_profiling_cnt.add_and_get(-1)
+                break
+
+    def is_start_profileable(self):
+        return self._total_started_profiling_cnt.add_and_get(1) <= self.task.max_sampling_count
+
+
+class ProfileThread:
+    def __init__(self, context: ProfileTaskExecutionContext):
+        self._task_execution_context = context
+        self._task_execution_service = profile.profile_task_execution_service
+        self._stop_event = None  # type: Event
+
+    def start(self, stop_event: Event):
+        self._stop_event = stop_event
+
+        try:
+            self.profiling(self._task_execution_context)
+        except Exception as e:
+            logger.error("profiling task fail. task_id:[%s] error:[%s]", self._task_execution_context.task.task_id, e)
+        finally:
+            self._task_execution_service.stop_current_profile_task(self._task_execution_context)
+
+    def profiling(self, context: ProfileTaskExecutionContext):
+        max_sleep_period = context.task.thread_dump_period
+
+        while not self._stop_event.is_set():
+            current_loop_start_time = current_milli_time()
+            profilers = self._task_execution_context.profiling_segment_slots
+
+            for profiler in profilers:  # type: ThreadProfiler
+                if profiler is None:
+                    continue
+
+                if profiler.profile_status.get() is ProfileStatus.PENDING:
+                    profiler.start_profiling_if_need()
+                elif profiler.profile_status.get() is ProfileStatus.PROFILING:
+                    snapshot = profiler.build_snapshot()
+                    if snapshot is not None:
+                        agent.add_profiling_snapshot(snapshot)
+                    else:
+                        # tell execution context current tracing thread dump failed, stop it
+                        context.stop_tracing_profile(profiler.trace_context)
+
+            need_sleep = (current_loop_start_time + max_sleep_period) - current_milli_time()
+            if not need_sleep > 0:
+                need_sleep = max_sleep_period
+
+            # convert to float second
+            time.sleep(need_sleep/1000)
+
+
+class ThreadProfiler:
+    def __init__(self, trace_context: SpanContext, segment_id: str, profiling_thread: Thread,
+                 profile_context: ProfileTaskExecutionContext):
+        self.trace_context = trace_context
+        self._segment_id = segment_id
+        self._profiling_thread = profiling_thread
+        self._profile_context = profile_context
+        self._profile_start_time = -1
+        self.profiling_max_time_mills = config.profile_duration * 60 * 1000
+
+        self.dump_sequence = 0
+
+        if trace_context.profile_status is None:
+            self.profile_status = ProfileStatusReference.create_with_pending()
+        else:
+            self.profile_status = trace_context.profile_status  # type: ProfileStatusReference
+            self.profile_status.update_status(ProfileStatus.PENDING)
+
+    def start_profiling_if_need(self):
+        if current_milli_time() - self.trace_context.create_time > self._profile_context.task.min_duration_threshold:
+            self._profile_start_time = current_milli_time()
+            self.trace_context.profile_status.update_status(ProfileStatus.PROFILING)
+
+    def stop_profiling(self):
+        self.trace_context.profile_status.update_status(ProfileStatus.STOPPED)
+
+    def build_snapshot(self) -> TracingThreadSnapshot:
+        if not self._profiling_thread.is_alive():
+            return None
+
+        current_time = current_milli_time()
+
+        stack_list = []
+
+        # get thread stack of target thread
+        stack = sys._current_frames().get(self._profiling_thread.ident)
+        if not stack:
+            return None
+
+        extracted = traceback.extract_stack(stack)
+        for idx, item in enumerate(extracted):
+            if idx > config.profile_dump_max_stack_depth:
+                break
+
+            code_sig = f"{item.filename}.{item.name}: {item.lineno}"
+            stack_list.append(code_sig)
+
+        # if is first dump, check is can start profiling
+        if self.dump_sequence == 0 and not self._profile_context.is_start_profileable():
+            return None

Review comment:
       ignore




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [skywalking-python] sonatype-lift[bot] commented on a change in pull request #155: Add Profile function

Posted by GitBox <gi...@apache.org>.
sonatype-lift[bot] commented on a change in pull request #155:
URL: https://github.com/apache/skywalking-python/pull/155#discussion_r693313674



##########
File path: skywalking/profile/profile_context.py
##########
@@ -0,0 +1,222 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import time
+from threading import Thread, Event, current_thread
+import sys
+import traceback
+
+from skywalking import agent
+from skywalking import config
+from skywalking.loggings import logger
+from skywalking import profile
+from skywalking.profile.profile_status import ProfileStatusReference, ProfileStatus
+from skywalking.profile.profile_task import ProfileTask
+from skywalking.profile.snapshot import TracingThreadSnapshot
+from skywalking.trace.context import SpanContext
+from skywalking.utils.array import AtomicArray
+from skywalking.utils.integer import AtomicInteger
+from skywalking.utils.time import current_milli_time
+
+
+class ProfileTaskExecutionContext:
+    def __init__(self, task: ProfileTask):
+        self.task = task  # type: ProfileTask
+        self._current_profiling_cnt = AtomicInteger(var=0)
+        self._total_started_profiling_cnt = AtomicInteger(var=0)
+        self.profiling_segment_slots = AtomicArray(length=config.profile_max_parallel)
+        self._profiling_thread = None  # type: Thread
+        self._profiling_stop_event = None  # type: Event

Review comment:
       *Incompatible attribute type:*  Attribute `_profiling_stop_event` declared in class `ProfileTaskExecutionContext` has type `Event` but is used as type `None`.
   (at-me [in a reply](https://help.sonatype.com/lift) with `help` or `ignore`)

##########
File path: skywalking/profile/profile_context.py
##########
@@ -0,0 +1,222 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import time
+from threading import Thread, Event, current_thread
+import sys
+import traceback
+
+from skywalking import agent
+from skywalking import config
+from skywalking.loggings import logger
+from skywalking import profile
+from skywalking.profile.profile_status import ProfileStatusReference, ProfileStatus
+from skywalking.profile.profile_task import ProfileTask
+from skywalking.profile.snapshot import TracingThreadSnapshot
+from skywalking.trace.context import SpanContext
+from skywalking.utils.array import AtomicArray
+from skywalking.utils.integer import AtomicInteger
+from skywalking.utils.time import current_milli_time
+
+
+class ProfileTaskExecutionContext:
+    def __init__(self, task: ProfileTask):
+        self.task = task  # type: ProfileTask
+        self._current_profiling_cnt = AtomicInteger(var=0)
+        self._total_started_profiling_cnt = AtomicInteger(var=0)
+        self.profiling_segment_slots = AtomicArray(length=config.profile_max_parallel)
+        self._profiling_thread = None  # type: Thread

Review comment:
       *Incompatible attribute type:*  Attribute `_profiling_thread` declared in class `ProfileTaskExecutionContext` has type `Thread` but is used as type `None`.
   (at-me [in a reply](https://help.sonatype.com/lift) with `help` or `ignore`)

##########
File path: skywalking/profile/profile_context.py
##########
@@ -0,0 +1,222 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import time
+from threading import Thread, Event, current_thread
+import sys
+import traceback
+
+from skywalking import agent
+from skywalking import config
+from skywalking.loggings import logger
+from skywalking import profile
+from skywalking.profile.profile_status import ProfileStatusReference, ProfileStatus
+from skywalking.profile.profile_task import ProfileTask
+from skywalking.profile.snapshot import TracingThreadSnapshot
+from skywalking.trace.context import SpanContext
+from skywalking.utils.array import AtomicArray
+from skywalking.utils.integer import AtomicInteger
+from skywalking.utils.time import current_milli_time
+
+
+class ProfileTaskExecutionContext:
+    def __init__(self, task: ProfileTask):
+        self.task = task  # type: ProfileTask
+        self._current_profiling_cnt = AtomicInteger(var=0)
+        self._total_started_profiling_cnt = AtomicInteger(var=0)
+        self.profiling_segment_slots = AtomicArray(length=config.profile_max_parallel)
+        self._profiling_thread = None  # type: Thread
+        self._profiling_stop_event = None  # type: Event
+
+    def start_profiling(self):
+        profile_thread = ProfileThread(self)
+        self._profiling_stop_event = Event()
+
+        self._profiling_thread = Thread(target=profile_thread.start, args=[self._profiling_stop_event], daemon=True)
+        self._profiling_thread.start()
+
+    def stop_profiling(self):
+        if self._profiling_thread is not None and self._profiling_stop_event is not None:
+            self._profiling_stop_event.set()
+
+    def attempt_profiling(self, trace_context: SpanContext, segment_id: str, first_span_opname: str) -> \
+            ProfileStatusReference:
+        """
+        check have available slot to profile and add it
+        """
+
+        # check has available slot
+        using_slot_cnt = self._current_profiling_cnt.get()
+        if using_slot_cnt >= config.profile_max_parallel:
+            return ProfileStatusReference.create_with_none()
+
+        # check first operation name matches
+        if not self.task.first_span_op_name == first_span_opname:
+            return ProfileStatusReference.create_with_none()
+
+        # if out limit started profiling count then stop add profiling
+        if self._total_started_profiling_cnt.get() > self.task.max_sampling_count:
+            return ProfileStatusReference.create_with_none()
+
+        # try to occupy slot
+        if not self._current_profiling_cnt.compare_and_set(using_slot_cnt,
+                                                           using_slot_cnt+1):
+            return ProfileStatusReference.create_with_none()
+
+        thread_profiler = ThreadProfiler(trace_context=trace_context,
+                                         segment_id=segment_id,
+                                         profiling_thread=current_thread(),
+                                         profile_context=self)
+
+        slot_length = self.profiling_segment_slots.length()
+        for idx in range(slot_length):
+            # occupy slot success
+            if self.profiling_segment_slots.compare_and_set(idx, None, thread_profiler):
+                return thread_profiler.profile_status
+
+        return ProfileStatusReference.create_with_none()
+
+    def profiling_recheck(self, trace_context: SpanContext, segment_id: str, first_span_opname: str):
+        if trace_context.profile_status.is_being_watched():
+            return
+
+        # if first_span_opname was changed by other plugin, there can start profile as well
+        trace_context.profile_status.update_status(self.attempt_profiling(trace_context,
+                                                                          segment_id,
+                                                                          first_span_opname).get())
+
+    def stop_tracing_profile(self, trace_context: SpanContext):
+        """
+        find tracing context and clear on slot
+        """
+        for idx, profiler in enumerate(self.profiling_segment_slots):
+            if profiler and profiler.matches(trace_context):
+                self.profiling_segment_slots.set(idx, None)
+                profiler.stop_profiling()
+                self._current_profiling_cnt.add_and_get(-1)
+                break
+
+    def is_start_profileable(self):
+        return self._total_started_profiling_cnt.add_and_get(1) <= self.task.max_sampling_count
+
+
+class ProfileThread:
+    def __init__(self, context: ProfileTaskExecutionContext):
+        self._task_execution_context = context
+        self._task_execution_service = profile.profile_task_execution_service
+        self._stop_event = None  # type: Event

Review comment:
       *Incompatible attribute type:*  Attribute `_stop_event` declared in class `ProfileThread` has type `Event` but is used as type `None`.
   (at-me [in a reply](https://help.sonatype.com/lift) with `help` or `ignore`)

##########
File path: skywalking/profile/profile_context.py
##########
@@ -0,0 +1,222 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import time
+from threading import Thread, Event, current_thread
+import sys
+import traceback
+
+from skywalking import agent
+from skywalking import config
+from skywalking.loggings import logger
+from skywalking import profile
+from skywalking.profile.profile_status import ProfileStatusReference, ProfileStatus
+from skywalking.profile.profile_task import ProfileTask
+from skywalking.profile.snapshot import TracingThreadSnapshot
+from skywalking.trace.context import SpanContext
+from skywalking.utils.array import AtomicArray
+from skywalking.utils.integer import AtomicInteger
+from skywalking.utils.time import current_milli_time
+
+
+class ProfileTaskExecutionContext:
+    def __init__(self, task: ProfileTask):
+        self.task = task  # type: ProfileTask
+        self._current_profiling_cnt = AtomicInteger(var=0)
+        self._total_started_profiling_cnt = AtomicInteger(var=0)
+        self.profiling_segment_slots = AtomicArray(length=config.profile_max_parallel)
+        self._profiling_thread = None  # type: Thread
+        self._profiling_stop_event = None  # type: Event
+
+    def start_profiling(self):
+        profile_thread = ProfileThread(self)
+        self._profiling_stop_event = Event()
+
+        self._profiling_thread = Thread(target=profile_thread.start, args=[self._profiling_stop_event], daemon=True)
+        self._profiling_thread.start()
+
+    def stop_profiling(self):
+        if self._profiling_thread is not None and self._profiling_stop_event is not None:
+            self._profiling_stop_event.set()
+
+    def attempt_profiling(self, trace_context: SpanContext, segment_id: str, first_span_opname: str) -> \
+            ProfileStatusReference:
+        """
+        check have available slot to profile and add it
+        """
+
+        # check has available slot
+        using_slot_cnt = self._current_profiling_cnt.get()
+        if using_slot_cnt >= config.profile_max_parallel:
+            return ProfileStatusReference.create_with_none()
+
+        # check first operation name matches
+        if not self.task.first_span_op_name == first_span_opname:
+            return ProfileStatusReference.create_with_none()
+
+        # if out limit started profiling count then stop add profiling
+        if self._total_started_profiling_cnt.get() > self.task.max_sampling_count:
+            return ProfileStatusReference.create_with_none()
+
+        # try to occupy slot
+        if not self._current_profiling_cnt.compare_and_set(using_slot_cnt,
+                                                           using_slot_cnt+1):
+            return ProfileStatusReference.create_with_none()
+
+        thread_profiler = ThreadProfiler(trace_context=trace_context,
+                                         segment_id=segment_id,
+                                         profiling_thread=current_thread(),
+                                         profile_context=self)
+
+        slot_length = self.profiling_segment_slots.length()
+        for idx in range(slot_length):
+            # occupy slot success
+            if self.profiling_segment_slots.compare_and_set(idx, None, thread_profiler):
+                return thread_profiler.profile_status
+
+        return ProfileStatusReference.create_with_none()
+
+    def profiling_recheck(self, trace_context: SpanContext, segment_id: str, first_span_opname: str):
+        if trace_context.profile_status.is_being_watched():
+            return
+
+        # if first_span_opname was changed by other plugin, there can start profile as well
+        trace_context.profile_status.update_status(self.attempt_profiling(trace_context,
+                                                                          segment_id,
+                                                                          first_span_opname).get())
+
+    def stop_tracing_profile(self, trace_context: SpanContext):
+        """
+        find tracing context and clear on slot
+        """
+        for idx, profiler in enumerate(self.profiling_segment_slots):
+            if profiler and profiler.matches(trace_context):
+                self.profiling_segment_slots.set(idx, None)
+                profiler.stop_profiling()
+                self._current_profiling_cnt.add_and_get(-1)
+                break
+
+    def is_start_profileable(self):
+        return self._total_started_profiling_cnt.add_and_get(1) <= self.task.max_sampling_count
+
+
+class ProfileThread:
+    def __init__(self, context: ProfileTaskExecutionContext):
+        self._task_execution_context = context
+        self._task_execution_service = profile.profile_task_execution_service
+        self._stop_event = None  # type: Event
+
+    def start(self, stop_event: Event):
+        self._stop_event = stop_event
+
+        try:
+            self.profiling(self._task_execution_context)
+        except Exception as e:
+            logger.error("profiling task fail. task_id:[%s] error:[%s]", self._task_execution_context.task.task_id, e)
+        finally:
+            self._task_execution_service.stop_current_profile_task(self._task_execution_context)
+
+    def profiling(self, context: ProfileTaskExecutionContext):
+        max_sleep_period = context.task.thread_dump_period
+
+        while not self._stop_event.is_set():
+            current_loop_start_time = current_milli_time()
+            profilers = self._task_execution_context.profiling_segment_slots
+
+            for profiler in profilers:  # type: ThreadProfiler
+                if profiler is None:
+                    continue
+
+                if profiler.profile_status.get() is ProfileStatus.PENDING:
+                    profiler.start_profiling_if_need()
+                elif profiler.profile_status.get() is ProfileStatus.PROFILING:
+                    snapshot = profiler.build_snapshot()
+                    if snapshot is not None:
+                        agent.add_profiling_snapshot(snapshot)
+                    else:
+                        # tell execution context current tracing thread dump failed, stop it
+                        context.stop_tracing_profile(profiler.trace_context)
+
+            need_sleep = (current_loop_start_time + max_sleep_period) - current_milli_time()
+            if not need_sleep > 0:
+                need_sleep = max_sleep_period
+
+            # convert to float second
+            time.sleep(need_sleep/1000)
+
+
+class ThreadProfiler:
+    def __init__(self, trace_context: SpanContext, segment_id: str, profiling_thread: Thread,
+                 profile_context: ProfileTaskExecutionContext):
+        self.trace_context = trace_context
+        self._segment_id = segment_id
+        self._profiling_thread = profiling_thread
+        self._profile_context = profile_context
+        self._profile_start_time = -1
+        self.profiling_max_time_mills = config.profile_duration * 60 * 1000
+
+        self.dump_sequence = 0
+
+        if trace_context.profile_status is None:
+            self.profile_status = ProfileStatusReference.create_with_pending()
+        else:
+            self.profile_status = trace_context.profile_status  # type: ProfileStatusReference
+            self.profile_status.update_status(ProfileStatus.PENDING)
+
+    def start_profiling_if_need(self):
+        if current_milli_time() - self.trace_context.create_time > self._profile_context.task.min_duration_threshold:
+            self._profile_start_time = current_milli_time()
+            self.trace_context.profile_status.update_status(ProfileStatus.PROFILING)
+
+    def stop_profiling(self):
+        self.trace_context.profile_status.update_status(ProfileStatus.STOPPED)
+
+    def build_snapshot(self) -> TracingThreadSnapshot:
+        if not self._profiling_thread.is_alive():
+            return None
+
+        current_time = current_milli_time()
+
+        stack_list = []
+
+        # get thread stack of target thread
+        stack = sys._current_frames().get(self._profiling_thread.ident)

Review comment:
       *Incompatible parameter type:*  Expected `int` for 1st positional only parameter to call `typing.Mapping.get` but got `typing.Optional[int]`.
   (at-me [in a reply](https://help.sonatype.com/lift) with `help` or `ignore`)

##########
File path: skywalking/profile/profile_context.py
##########
@@ -0,0 +1,222 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import time
+from threading import Thread, Event, current_thread
+import sys
+import traceback
+
+from skywalking import agent
+from skywalking import config
+from skywalking.loggings import logger
+from skywalking import profile
+from skywalking.profile.profile_status import ProfileStatusReference, ProfileStatus
+from skywalking.profile.profile_task import ProfileTask
+from skywalking.profile.snapshot import TracingThreadSnapshot
+from skywalking.trace.context import SpanContext
+from skywalking.utils.array import AtomicArray
+from skywalking.utils.integer import AtomicInteger
+from skywalking.utils.time import current_milli_time
+
+
+class ProfileTaskExecutionContext:
+    def __init__(self, task: ProfileTask):
+        self.task = task  # type: ProfileTask
+        self._current_profiling_cnt = AtomicInteger(var=0)
+        self._total_started_profiling_cnt = AtomicInteger(var=0)
+        self.profiling_segment_slots = AtomicArray(length=config.profile_max_parallel)
+        self._profiling_thread = None  # type: Thread
+        self._profiling_stop_event = None  # type: Event
+
+    def start_profiling(self):
+        profile_thread = ProfileThread(self)
+        self._profiling_stop_event = Event()
+
+        self._profiling_thread = Thread(target=profile_thread.start, args=[self._profiling_stop_event], daemon=True)
+        self._profiling_thread.start()
+
+    def stop_profiling(self):
+        if self._profiling_thread is not None and self._profiling_stop_event is not None:
+            self._profiling_stop_event.set()
+
+    def attempt_profiling(self, trace_context: SpanContext, segment_id: str, first_span_opname: str) -> \
+            ProfileStatusReference:
+        """
+        check have available slot to profile and add it
+        """
+
+        # check has available slot
+        using_slot_cnt = self._current_profiling_cnt.get()
+        if using_slot_cnt >= config.profile_max_parallel:
+            return ProfileStatusReference.create_with_none()
+
+        # check first operation name matches
+        if not self.task.first_span_op_name == first_span_opname:
+            return ProfileStatusReference.create_with_none()
+
+        # if out limit started profiling count then stop add profiling
+        if self._total_started_profiling_cnt.get() > self.task.max_sampling_count:
+            return ProfileStatusReference.create_with_none()
+
+        # try to occupy slot
+        if not self._current_profiling_cnt.compare_and_set(using_slot_cnt,
+                                                           using_slot_cnt+1):
+            return ProfileStatusReference.create_with_none()
+
+        thread_profiler = ThreadProfiler(trace_context=trace_context,
+                                         segment_id=segment_id,
+                                         profiling_thread=current_thread(),
+                                         profile_context=self)
+
+        slot_length = self.profiling_segment_slots.length()
+        for idx in range(slot_length):
+            # occupy slot success
+            if self.profiling_segment_slots.compare_and_set(idx, None, thread_profiler):
+                return thread_profiler.profile_status
+
+        return ProfileStatusReference.create_with_none()
+
+    def profiling_recheck(self, trace_context: SpanContext, segment_id: str, first_span_opname: str):
+        if trace_context.profile_status.is_being_watched():
+            return
+
+        # if first_span_opname was changed by other plugin, there can start profile as well
+        trace_context.profile_status.update_status(self.attempt_profiling(trace_context,
+                                                                          segment_id,
+                                                                          first_span_opname).get())
+
+    def stop_tracing_profile(self, trace_context: SpanContext):
+        """
+        find tracing context and clear on slot
+        """
+        for idx, profiler in enumerate(self.profiling_segment_slots):

Review comment:
       *Incompatible parameter type:*  Expected `typing.Iterable[Variable[_T]]` for 1st positional only parameter to call `enumerate.__init__` but got `AtomicArray`.
   (at-me [in a reply](https://help.sonatype.com/lift) with `help` or `ignore`)

##########
File path: skywalking/profile/profile_context.py
##########
@@ -0,0 +1,222 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import time
+from threading import Thread, Event, current_thread
+import sys
+import traceback
+
+from skywalking import agent
+from skywalking import config
+from skywalking.loggings import logger
+from skywalking import profile
+from skywalking.profile.profile_status import ProfileStatusReference, ProfileStatus
+from skywalking.profile.profile_task import ProfileTask
+from skywalking.profile.snapshot import TracingThreadSnapshot
+from skywalking.trace.context import SpanContext
+from skywalking.utils.array import AtomicArray
+from skywalking.utils.integer import AtomicInteger
+from skywalking.utils.time import current_milli_time
+
+
+class ProfileTaskExecutionContext:
+    def __init__(self, task: ProfileTask):
+        self.task = task  # type: ProfileTask
+        self._current_profiling_cnt = AtomicInteger(var=0)
+        self._total_started_profiling_cnt = AtomicInteger(var=0)
+        self.profiling_segment_slots = AtomicArray(length=config.profile_max_parallel)
+        self._profiling_thread = None  # type: Thread
+        self._profiling_stop_event = None  # type: Event
+
+    def start_profiling(self):
+        profile_thread = ProfileThread(self)
+        self._profiling_stop_event = Event()
+
+        self._profiling_thread = Thread(target=profile_thread.start, args=[self._profiling_stop_event], daemon=True)
+        self._profiling_thread.start()
+
+    def stop_profiling(self):
+        if self._profiling_thread is not None and self._profiling_stop_event is not None:
+            self._profiling_stop_event.set()
+
+    def attempt_profiling(self, trace_context: SpanContext, segment_id: str, first_span_opname: str) -> \
+            ProfileStatusReference:
+        """
+        check have available slot to profile and add it
+        """
+
+        # check has available slot
+        using_slot_cnt = self._current_profiling_cnt.get()
+        if using_slot_cnt >= config.profile_max_parallel:
+            return ProfileStatusReference.create_with_none()
+
+        # check first operation name matches
+        if not self.task.first_span_op_name == first_span_opname:
+            return ProfileStatusReference.create_with_none()
+
+        # if out limit started profiling count then stop add profiling
+        if self._total_started_profiling_cnt.get() > self.task.max_sampling_count:
+            return ProfileStatusReference.create_with_none()
+
+        # try to occupy slot
+        if not self._current_profiling_cnt.compare_and_set(using_slot_cnt,
+                                                           using_slot_cnt+1):
+            return ProfileStatusReference.create_with_none()
+
+        thread_profiler = ThreadProfiler(trace_context=trace_context,
+                                         segment_id=segment_id,
+                                         profiling_thread=current_thread(),
+                                         profile_context=self)
+
+        slot_length = self.profiling_segment_slots.length()
+        for idx in range(slot_length):
+            # occupy slot success
+            if self.profiling_segment_slots.compare_and_set(idx, None, thread_profiler):
+                return thread_profiler.profile_status
+
+        return ProfileStatusReference.create_with_none()
+
+    def profiling_recheck(self, trace_context: SpanContext, segment_id: str, first_span_opname: str):
+        if trace_context.profile_status.is_being_watched():
+            return
+
+        # if first_span_opname was changed by other plugin, there can start profile as well
+        trace_context.profile_status.update_status(self.attempt_profiling(trace_context,
+                                                                          segment_id,
+                                                                          first_span_opname).get())
+
+    def stop_tracing_profile(self, trace_context: SpanContext):
+        """
+        find tracing context and clear on slot
+        """
+        for idx, profiler in enumerate(self.profiling_segment_slots):
+            if profiler and profiler.matches(trace_context):
+                self.profiling_segment_slots.set(idx, None)
+                profiler.stop_profiling()
+                self._current_profiling_cnt.add_and_get(-1)
+                break
+
+    def is_start_profileable(self):
+        return self._total_started_profiling_cnt.add_and_get(1) <= self.task.max_sampling_count
+
+
+class ProfileThread:
+    def __init__(self, context: ProfileTaskExecutionContext):
+        self._task_execution_context = context
+        self._task_execution_service = profile.profile_task_execution_service
+        self._stop_event = None  # type: Event
+
+    def start(self, stop_event: Event):
+        self._stop_event = stop_event
+
+        try:
+            self.profiling(self._task_execution_context)
+        except Exception as e:
+            logger.error("profiling task fail. task_id:[%s] error:[%s]", self._task_execution_context.task.task_id, e)
+        finally:
+            self._task_execution_service.stop_current_profile_task(self._task_execution_context)
+
+    def profiling(self, context: ProfileTaskExecutionContext):
+        max_sleep_period = context.task.thread_dump_period
+
+        while not self._stop_event.is_set():
+            current_loop_start_time = current_milli_time()
+            profilers = self._task_execution_context.profiling_segment_slots
+
+            for profiler in profilers:  # type: ThreadProfiler
+                if profiler is None:
+                    continue
+
+                if profiler.profile_status.get() is ProfileStatus.PENDING:
+                    profiler.start_profiling_if_need()
+                elif profiler.profile_status.get() is ProfileStatus.PROFILING:
+                    snapshot = profiler.build_snapshot()
+                    if snapshot is not None:
+                        agent.add_profiling_snapshot(snapshot)
+                    else:
+                        # tell execution context current tracing thread dump failed, stop it
+                        context.stop_tracing_profile(profiler.trace_context)
+
+            need_sleep = (current_loop_start_time + max_sleep_period) - current_milli_time()
+            if not need_sleep > 0:
+                need_sleep = max_sleep_period
+
+            # convert to float second
+            time.sleep(need_sleep/1000)
+
+
+class ThreadProfiler:
+    def __init__(self, trace_context: SpanContext, segment_id: str, profiling_thread: Thread,
+                 profile_context: ProfileTaskExecutionContext):
+        self.trace_context = trace_context
+        self._segment_id = segment_id
+        self._profiling_thread = profiling_thread
+        self._profile_context = profile_context
+        self._profile_start_time = -1
+        self.profiling_max_time_mills = config.profile_duration * 60 * 1000
+
+        self.dump_sequence = 0
+
+        if trace_context.profile_status is None:
+            self.profile_status = ProfileStatusReference.create_with_pending()
+        else:
+            self.profile_status = trace_context.profile_status  # type: ProfileStatusReference
+            self.profile_status.update_status(ProfileStatus.PENDING)
+
+    def start_profiling_if_need(self):
+        if current_milli_time() - self.trace_context.create_time > self._profile_context.task.min_duration_threshold:
+            self._profile_start_time = current_milli_time()
+            self.trace_context.profile_status.update_status(ProfileStatus.PROFILING)
+
+    def stop_profiling(self):
+        self.trace_context.profile_status.update_status(ProfileStatus.STOPPED)
+
+    def build_snapshot(self) -> TracingThreadSnapshot:
+        if not self._profiling_thread.is_alive():
+            return None
+
+        current_time = current_milli_time()
+
+        stack_list = []
+
+        # get thread stack of target thread
+        stack = sys._current_frames().get(self._profiling_thread.ident)
+        if not stack:
+            return None
+
+        extracted = traceback.extract_stack(stack)
+        for idx, item in enumerate(extracted):
+            if idx > config.profile_dump_max_stack_depth:
+                break
+
+            code_sig = f"{item.filename}.{item.name}: {item.lineno}"
+            stack_list.append(code_sig)
+
+        # if is first dump, check is can start profiling
+        if self.dump_sequence == 0 and not self._profile_context.is_start_profileable():
+            return None

Review comment:
       *Incompatible return type:*  Expected `TracingThreadSnapshot` but got `None`.
   (at-me [in a reply](https://help.sonatype.com/lift) with `help` or `ignore`)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [skywalking-python] Humbertzhang commented on a change in pull request #155: Add Profile function

Posted by GitBox <gi...@apache.org>.
Humbertzhang commented on a change in pull request #155:
URL: https://github.com/apache/skywalking-python/pull/155#discussion_r693434148



##########
File path: skywalking/profile/profile_service.py
##########
@@ -0,0 +1,212 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from queue import Queue
+from skywalking.loggings import logger
+from skywalking.profile.profile_constants import ProfileConstants
+from skywalking.profile.profile_task import ProfileTask
+from skywalking.profile.profile_context import ProfileTaskExecutionContext
+from skywalking.profile.profile_status import ProfileStatusReference
+
+from skywalking.trace.context import SpanContext
+
+from skywalking.utils.atomic_ref import AtomicRef
+from skywalking import agent
+
+from concurrent.futures import ThreadPoolExecutor
+from threading import Timer, RLock, Lock
+
+from skywalking.utils.time import current_milli_time
+
+
+class Scheduler:
+
+    @staticmethod
+    def schedule(milliseconds, func, *args, **kwargs):
+        seconds = milliseconds / 1000
+        if seconds < 0:
+            seconds = 0
+
+        t = Timer(seconds, func, *args, **kwargs)
+        t.daemon = True
+        t.start()
+
+
+class ProfileTaskExecutionService:
+    MINUTE_TO_MILLIS = 60000
+
+    def __init__(self):
+        # Queue is thread safe
+        self._profile_task_list = Queue()  # type: Queue
+        # queue_lock for making sure complex operation of this profile_task_list is thread safe
+        self.queue_lock = Lock()
+
+        self._last_command_create_time = -1  # type: int
+        # single thread executor
+        self.profile_executor = ThreadPoolExecutor(max_workers=1)
+        self.task_execution_context = AtomicRef(None)
+
+        self.profile_task_scheduler = Scheduler()
+
+        # rlock for process_profile_task and stop_current_profile_task
+        self._rlock = RLock()
+
+    def remove_from_profile_task_list(self, task: ProfileTask) -> bool:
+        """
+        Remove a task from profile_task_list in a thread safe state
+        """
+        with self.queue_lock:
+            item_left = []
+            result = False
+
+            while not self._profile_task_list.empty():
+                item = self._profile_task_list.get()
+                if item == task:
+                    result = True
+                    # not put in item_list for removing it
+                    continue
+
+                item_left.append(item)
+
+            for item in item_left:
+                self._profile_task_list.put(item)
+
+            return result
+
+    def get_last_command_create_time(self) -> int:
+        return self._last_command_create_time
+
+    def add_profile_task(self, task: ProfileTask):
+        # update last command create time, which will be used in command query
+        if task.create_time > self._last_command_create_time:
+            self._last_command_create_time = task.create_time
+
+        # check profile task object
+        result = self._check_profile_task(task)
+        if not result.success:
+            logger.warning("check command error, cannot process this profile task. reason: %s", result.error_reason)
+            return
+
+        # add task to list
+        self._profile_task_list.put(task)
+
+        delay_millis = task.start_time - current_milli_time()
+        # schedule to start task
+        self.profile_task_scheduler.schedule(delay_millis, self.process_profile_task, [task])
+
+    def add_profiling(self, context: SpanContext, segment_id: str, first_span_opname: str) -> ProfileStatusReference:
+        execution_context = self.task_execution_context.get()  # type: ProfileTaskExecutionContext
+        if execution_context is None:
+            return ProfileStatusReference.create_with_none()
+
+        return execution_context.attempt_profiling(context, segment_id, first_span_opname)
+
+    def profiling_recheck(self, trace_context: SpanContext, segment_id: str, first_span_opname: str):
+        """
+        Re-check current trace need profiling, in case that third-party plugins change the operation name.
+        """
+        execution_context = self.task_execution_context.get()  # type: ProfileTaskExecutionContext
+        if execution_context is None:
+            return
+        execution_context.profiling_recheck(trace_context, segment_id, first_span_opname)
+
+    # using reentrant lock for process_profile_task and stop_current_profile_task,
+    # to make sure thread safe.
+    def process_profile_task(self, task: ProfileTask):
+        with self._rlock:
+            # make sure prev profile task already stopped
+            self.stop_current_profile_task(self.task_execution_context.get())
+
+            # make stop task schedule and task context
+            current_context = ProfileTaskExecutionContext(task)
+            self.task_execution_context.set(current_context)
+
+            # start profiling this task
+            current_context.start_profiling()
+            logger.debug("profile task [%s] for endpoint [%s] started", task.task_id, task.first_span_op_name)
+
+            millis = task.duration * self.MINUTE_TO_MILLIS
+            self.profile_task_scheduler.schedule(millis, self.stop_current_profile_task, [current_context])

Review comment:
       I kept this the same as the Java Agent here; I think the reason for that is to make sure the profiling runs for enough time.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org