You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ke...@apache.org on 2021/08/22 04:20:18 UTC

[skywalking-python] branch master updated: Add Profiling function (#155)

This is an automated email from the ASF dual-hosted git repository.

kezhenxu94 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking-python.git


The following commit(s) were added to refs/heads/master by this push:
     new 97bf7fd  Add Profiling function (#155)
97bf7fd is described below

commit 97bf7fd8b63aedfbb37b7035c84445c71b32cd07
Author: Humbert Zhang <50...@qq.com>
AuthorDate: Sun Aug 22 12:20:10 2021 +0800

    Add Profiling function (#155)
---
 docs/EnvVars.md                                    |   6 +-
 skywalking/agent/__init__.py                       |  51 ++++-
 skywalking/agent/protocol/__init__.py              |   6 +
 skywalking/agent/protocol/grpc.py                  |  40 +++-
 skywalking/client/__init__.py                      |   3 +
 skywalking/client/grpc.py                          |  20 +-
 skywalking/command/command_service.py              |  20 +-
 .../executors/profile_task_command_executor.py     |   7 +-
 skywalking/config.py                               |  12 +-
 skywalking/profile/__init__.py                     |  12 +-
 skywalking/profile/profile_context.py              | 223 +++++++++++++++++++++
 skywalking/profile/profile_service.py              | 208 +++++++++++++++++++
 .../__init__.py => profile/profile_status.py}      |  39 ++--
 skywalking/profile/profile_task.py                 |   2 +
 .../profile/profile_task_execution_service.py      | 101 ----------
 skywalking/profile/snapshot.py                     |  44 ++++
 skywalking/trace/context.py                        |  21 ++
 skywalking/utils/array.py                          |  63 ++++++
 .../{client/__init__.py => utils/atomic_ref.py}    |  37 ++--
 .../{profile/__init__.py => utils/integer.py}      |  18 +-
 skywalking/{profile/__init__.py => utils/time.py}  |   6 +-
 21 files changed, 770 insertions(+), 169 deletions(-)

diff --git a/docs/EnvVars.md b/docs/EnvVars.md
index 9416534..1fddacf 100644
--- a/docs/EnvVars.md
+++ b/docs/EnvVars.md
@@ -31,8 +31,12 @@ Environment Variable | Description | Default
 | `SW_KAFKA_REPORTER_TOPIC_LOG` | Specifying Kafka topic name for Log data. | `skywalking-logs` |
 | `SW_KAFKA_REPORTER_CONFIG_key` | The configs to init KafkaProducer. it support the basic arguments (whose type is either `str`, `bool`, or `int`) listed [here](https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html#kafka.KafkaProducer) | unset |
 | `SW_CELERY_PARAMETERS_LENGTH`| The maximum length of `celery` functions parameters, longer than this will be truncated, 0 turns off  | `512` |
-| `SW_AGENT_PROFILE_ACTIVE` | If `True`, Python agent will enable profile when user create a new profile task. Otherwise disable profile. | `False` |
+| `SW_AGENT_PROFILE_ACTIVE` | If `True`, Python agent will enable profile when user create a new profile task. Otherwise disable profile. | `True` |
 | `SW_PROFILE_TASK_QUERY_INTERVAL` | The number of seconds between two profile task query. | `20` |
+| `SW_AGENT_PROFILE_MAX_PARALLEL` | The number of parallel monitor segment count. | `5` |
+| `SW_AGENT_PROFILE_DURATION` | The maximum monitor segment time(minutes), if current segment monitor time out of limit, then stop it. | `10` |
+| `SW_AGENT_PROFILE_DUMP_MAX_STACK_DEPTH` | The number of max dump thread stack depth | `500` |
+| `SW_AGENT_PROFILE_SNAPSHOT_TRANSPORT_BUFFER_SIZE` | The number of snapshot transport to backend buffer size | `50` |
 | `SW_AGENT_LOG_REPORTER_ACTIVE` | If `True`, Python agent will report collected logs to the OAP or Satellite. Otherwise, it disables the feature. | `False` |
 | `SW_AGENT_LOG_REPORTER_BUFFER_SIZE` | The maximum queue backlog size for sending log data to backend, logs beyond this are silently dropped. | `10000` |
 | `SW_AGENT_LOG_REPORTER_LEVEL` | This config specifies the logger levels of concern, any logs with a level below the config will be ignored. | `WARNING` |
diff --git a/skywalking/agent/__init__.py b/skywalking/agent/__init__.py
index 97cd77c..f439f8d 100644
--- a/skywalking/agent/__init__.py
+++ b/skywalking/agent/__init__.py
@@ -27,8 +27,10 @@ from skywalking import config, plugins
 from skywalking import loggings
 from skywalking.agent.protocol import Protocol
 from skywalking.command import command_service
-from skywalking.config import profile_active, profile_task_query_interval
 from skywalking.loggings import logger
+from skywalking import profile
+from skywalking.profile.profile_task import ProfileTask
+from skywalking.profile.snapshot import TracingThreadSnapshot
 
 if TYPE_CHECKING:
     from skywalking.trace.context import Segment
@@ -36,7 +38,7 @@ if TYPE_CHECKING:
 __started = False
 __protocol = Protocol()  # type: Protocol
 __heartbeat_thread = __report_thread = __log_report_thread = __query_profile_thread = __command_dispatch_thread \
-    = __queue = __log_queue = __finished = None
+    = __send_profile_thread = __queue = __log_queue = __snapshot_queue = __finished = None
 
 
 def __heartbeat():
@@ -69,6 +71,16 @@ def __report_log():
         __finished.wait(0)
 
 
+def __send_profile_snapshot():
+    while not __finished.is_set():
+        try:
+            __protocol.send_snapshot(__snapshot_queue)
+        except Exception as exc:
+            logger.error(str(exc))
+
+        __finished.wait(0.5)
+
+
 def __query_profile_command():
     while not __finished.is_set():
         try:
@@ -76,7 +88,7 @@ def __query_profile_command():
         except Exception as exc:
             logger.error(str(exc))
 
-        __finished.wait(profile_task_query_interval)
+        __finished.wait(config.get_profile_task_interval)
 
 
 def __command_dispatch():
@@ -86,13 +98,12 @@ def __command_dispatch():
 
 def __init_threading():
     global __heartbeat_thread, __report_thread, __log_report_thread, __query_profile_thread, \
-        __command_dispatch_thread, __queue, __log_queue, __finished
+        __command_dispatch_thread, __send_profile_thread, __queue, __log_queue, __snapshot_queue, __finished
 
     __queue = Queue(maxsize=config.max_buffer_size)
     __finished = Event()
     __heartbeat_thread = Thread(name='HeartbeatThread', target=__heartbeat, daemon=True)
     __report_thread = Thread(name='ReportThread', target=__report, daemon=True)
-    __query_profile_thread = Thread(name='QueryProfileCommandThread', target=__query_profile_command, daemon=True)
     __command_dispatch_thread = Thread(name="CommandDispatchThread", target=__command_dispatch, daemon=True)
 
     __heartbeat_thread.start()
@@ -104,13 +115,18 @@ def __init_threading():
         __log_report_thread = Thread(name='LogReportThread', target=__report_log, daemon=True)
         __log_report_thread.start()
 
-    if profile_active:
+    if config.profile_active:
+        __snapshot_queue = Queue(maxsize=config.profile_snapshot_transport_buffer_size)
+
+        __query_profile_thread = Thread(name='QueryProfileCommandThread', target=__query_profile_command, daemon=True)
         __query_profile_thread.start()
 
+        __send_profile_thread = Thread(name='SendProfileSnapShotThread', target=__send_profile_snapshot, daemon=True)
+        __send_profile_thread.start()
+
 
 def __init():
     global __protocol
-
     if config.protocol == 'grpc':
         from skywalking.agent.protocol.grpc import GrpcProtocol
         __protocol = GrpcProtocol()
@@ -132,9 +148,15 @@ def __init():
 def __fini():
     __protocol.report(__queue, False)
     __queue.join()
+
     if config.log_reporter_active:
         __protocol.report_log(__log_queue, False)
         __log_queue.join()
+
+    if config.profile_active:
+        __protocol.send_snapshot(__snapshot_queue, False)
+        __snapshot_queue.join()
+
     __finished.set()
 
 
@@ -175,6 +197,7 @@ def start():
 
     loggings.init()
     config.finalize()
+    profile.init()
 
     __init()
 
@@ -210,3 +233,17 @@ def archive_log(log_data: 'LogData'):
         __log_queue.put(log_data, block=False)
     except Full:
         logger.warning('the queue is full, the log will be abandoned')
+
+
+def add_profiling_snapshot(snapshot: TracingThreadSnapshot):
+    try:
+        __snapshot_queue.put(snapshot)
+    except Full:
+        logger.warning('the snapshot queue is full, the snapshot will be abandoned')
+
+
+def notify_profile_finish(task: ProfileTask):
+    try:
+        __protocol.notify_profile_task_finish(task)
+    except Exception as e:
+        logger.error("notify profile task finish to backend fail. " + str(e))
diff --git a/skywalking/agent/protocol/__init__.py b/skywalking/agent/protocol/__init__.py
index efc2fa4..24d4c3d 100644
--- a/skywalking/agent/protocol/__init__.py
+++ b/skywalking/agent/protocol/__init__.py
@@ -40,3 +40,9 @@ class Protocol(ABC):
 
     def query_profile_commands(self):
         pass
+
+    def send_snapshot(self, queue: Queue, block: bool = True):
+        pass
+
+    def notify_profile_task_finish(self, task):
+        pass
diff --git a/skywalking/agent/protocol/grpc.py b/skywalking/agent/protocol/grpc.py
index 468ef5a..7ad0a8e 100644
--- a/skywalking/agent/protocol/grpc.py
+++ b/skywalking/agent/protocol/grpc.py
@@ -24,6 +24,7 @@ import grpc
 from skywalking.protocol.common.Common_pb2 import KeyStringValuePair
 from skywalking.protocol.language_agent.Tracing_pb2 import SegmentObject, SpanObject, Log, SegmentReference
 from skywalking.protocol.logging.Logging_pb2 import LogData
+from skywalking.protocol.profile.Profile_pb2 import ThreadSnapshot, ThreadStack
 
 from skywalking import config
 from skywalking.agent import Protocol
@@ -32,6 +33,8 @@ from skywalking.client.grpc import GrpcServiceManagementClient, GrpcTraceSegment
     GrpcProfileTaskChannelService, GrpcLogDataReportService
 from skywalking.loggings import logger
 from skywalking.trace.segment import Segment
+from skywalking.profile.snapshot import TracingThreadSnapshot
+from skywalking.profile.profile_task import ProfileTask
 
 
 class GrpcProtocol(Protocol):
@@ -52,7 +55,7 @@ class GrpcProtocol(Protocol):
         self.channel.subscribe(self._cb, try_to_connect=True)
         self.service_management = GrpcServiceManagementClient(self.channel)
         self.traces_reporter = GrpcTraceSegmentReportService(self.channel)
-        self.profile_query = GrpcProfileTaskChannelService(self.channel)
+        self.profile_channel = GrpcProfileTaskChannelService(self.channel)
         self.log_reporter = GrpcLogDataReportService(self.channel)
 
     def _cb(self, state):
@@ -61,7 +64,10 @@ class GrpcProtocol(Protocol):
 
     def query_profile_commands(self):
         logger.debug("query profile commands")
-        self.profile_query.do_query()
+        self.profile_channel.do_query()
+
+    def notify_profile_task_finish(self, task: ProfileTask):
+        self.profile_channel.finish(task)
 
     def heartbeat(self):
         try:
@@ -163,3 +169,33 @@ class GrpcProtocol(Protocol):
             self.log_reporter.report(generator())
         except grpc.RpcError:
             self.on_error()
+
+    def send_snapshot(self, queue: Queue, block: bool = True):
+        start = time()
+
+        def generator():
+            while True:
+                try:
+                    timeout = config.QUEUE_TIMEOUT - int(time() - start)  # type: int
+                    if timeout <= 0:
+                        return
+                    snapshot = queue.get(block=block, timeout=timeout)  # type: TracingThreadSnapshot
+                except Empty:
+                    return
+
+                queue.task_done()
+
+                transform_snapshot = ThreadSnapshot(
+                    taskId=str(snapshot.task_id),
+                    traceSegmentId=str(snapshot.trace_segment_id),
+                    time=int(snapshot.time),
+                    sequence=int(snapshot.sequence),
+                    stack=ThreadStack(codeSignatures=snapshot.stack_list)
+                )
+
+                yield transform_snapshot
+
+        try:
+            self.profile_channel.send(generator())
+        except grpc.RpcError:
+            self.on_error()
diff --git a/skywalking/client/__init__.py b/skywalking/client/__init__.py
index 007e0bc..562e3a4 100644
--- a/skywalking/client/__init__.py
+++ b/skywalking/client/__init__.py
@@ -37,3 +37,6 @@ class LogDataReportService(object):
 class ProfileTaskChannelService(object):
     def do_query(self):
         raise NotImplementedError()
+
+    def send(self, generator):
+        raise NotImplementedError()
diff --git a/skywalking/client/grpc.py b/skywalking/client/grpc.py
index 4c1fe31..5ffc24a 100644
--- a/skywalking/client/grpc.py
+++ b/skywalking/client/grpc.py
@@ -18,11 +18,11 @@
 import grpc
 from skywalking.protocol.common.Common_pb2 import KeyStringValuePair
 from skywalking.protocol.language_agent.Tracing_pb2_grpc import TraceSegmentReportServiceStub
+from skywalking.protocol.profile.Profile_pb2_grpc import ProfileTaskStub
+from skywalking.protocol.profile.Profile_pb2 import ProfileTaskCommandQuery, ProfileTaskFinishReport
 from skywalking.protocol.logging.Logging_pb2_grpc import LogReportServiceStub
 from skywalking.protocol.management.Management_pb2 import InstancePingPkg, InstanceProperties
 from skywalking.protocol.management.Management_pb2_grpc import ManagementServiceStub
-from skywalking.protocol.profile.Profile_pb2 import ProfileTaskCommandQuery
-from skywalking.protocol.profile.Profile_pb2_grpc import ProfileTaskStub
 
 from skywalking import config
 from skywalking.client import ServiceManagementClient, TraceSegmentReportService, ProfileTaskChannelService, \
@@ -30,6 +30,7 @@ from skywalking.client import ServiceManagementClient, TraceSegmentReportService
 from skywalking.command import command_service
 from skywalking.loggings import logger
 from skywalking.profile import profile_task_execution_service
+from skywalking.profile.profile_task import ProfileTask
 
 
 class GrpcServiceManagementClient(ServiceManagementClient):
@@ -73,7 +74,7 @@ class GrpcLogDataReportService(LogDataReportService):
 
 class GrpcProfileTaskChannelService(ProfileTaskChannelService):
     def __init__(self, channel: grpc.Channel):
-        self.task_stub = ProfileTaskStub(channel)
+        self.profile_stub = ProfileTaskStub(channel)
 
     def do_query(self):
 
@@ -83,5 +84,16 @@ class GrpcProfileTaskChannelService(ProfileTaskChannelService):
             lastCommandTime=profile_task_execution_service.get_last_command_create_time()
         )
 
-        commands = self.task_stub.getProfileTaskCommands(query)
+        commands = self.profile_stub.getProfileTaskCommands(query)
         command_service.receive_command(commands)
+
+    def send(self, generator):
+        self.profile_stub.collectSnapshot(generator)
+
+    def finish(self, task: ProfileTask):
+        finish_report = ProfileTaskFinishReport(
+            service=config.service_name,
+            serviceInstance=config.service_instance,
+            taskId=task.task_id
+        )
+        self.profile_stub.reportTaskFinish(finish_report)
diff --git a/skywalking/command/command_service.py b/skywalking/command/command_service.py
index e1d4bbf..8f9244e 100644
--- a/skywalking/command/command_service.py
+++ b/skywalking/command/command_service.py
@@ -30,38 +30,38 @@ from skywalking.loggings import logger
 class CommandService:
 
     def __init__(self):
-        self.__commands = queue.Queue()  # type: queue.Queue
+        self._commands = queue.Queue()  # type: queue.Queue
         # don't execute same command twice
-        self.__command_serial_number_cache = CommandSerialNumberCache()
+        self._command_serial_number_cache = CommandSerialNumberCache()
 
     def dispatch(self):
         while True:
             # block until a command is available
-            command = self.__commands.get()  # type: BaseCommand
+            command = self._commands.get()  # type: BaseCommand
             if not self.__is_command_executed(command):
                 command_executor_service.execute(command)
-                self.__command_serial_number_cache.add(command.serial_number)
+                self._command_serial_number_cache.add(command.serial_number)
 
     def __is_command_executed(self, command: BaseCommand):
-        return self.__command_serial_number_cache.contains(command.serial_number)
+        return self._command_serial_number_cache.contains(command.serial_number)
 
     def receive_command(self, commands: Commands):
         for command in commands.commands:
             try:
                 base_command = CommandDeserializer.deserialize(command)
-                logger.debug("Received command [{%s} {%s}]", base_command.command, base_command.serial_number)
+                logger.debug("received command [{%s} {%s}]", base_command.command, base_command.serial_number)
 
                 if self.__is_command_executed(base_command):
-                    logger.warning("Command[{%s}] is executed, ignored.", base_command.command)
+                    logger.warning("command[{%s}] is executed, ignored.", base_command.command)
                     continue
 
                 try:
-                    self.__commands.put(base_command)
+                    self._commands.put(base_command)
                 except queue.Full:
-                    logger.warning("Command[{%s}, {%s}] cannot add to command list. because the command list is full.",
+                    logger.warning("command[{%s}, {%s}] cannot add to command list. because the command list is full.",
                                    base_command.command, base_command.serial_number)
             except UnsupportedCommandException as e:
-                logger.warning("Received unsupported command[{%s}].", e.command.command)
+                logger.warning("received unsupported command[{%s}].", e.command.command)
 
 
 class CommandSerialNumberCache:
diff --git a/skywalking/command/executors/profile_task_command_executor.py b/skywalking/command/executors/profile_task_command_executor.py
index 53fa5aa..2134d99 100644
--- a/skywalking/command/executors/profile_task_command_executor.py
+++ b/skywalking/command/executors/profile_task_command_executor.py
@@ -17,16 +17,13 @@
 
 from skywalking.command.executors.command_executor import CommandExecutor
 from skywalking.command.profile_task_command import ProfileTaskCommand
-from skywalking.loggings import logger
-from skywalking.profile import profile_task_execution_service
+from skywalking import profile
 from skywalking.profile.profile_task import ProfileTask
 
 
 class ProfileTaskCommandExecutor(CommandExecutor):
 
     def execute(self, command: ProfileTaskCommand):
-        logger.debug("ProfileTaskCommandExecutor start execute ProfileTaskCommand [{%s}]", command.serial_number)
-
         profile_task = ProfileTask(task_id=command.task_id,
                                    first_span_op_name=command.endpoint_name,
                                    duration=command.duration,
@@ -36,4 +33,4 @@ class ProfileTaskCommandExecutor(CommandExecutor):
                                    start_time=command.start_time,
                                    create_time=command.create_time)
 
-        profile_task_execution_service.add_profile_task(profile_task)
+        profile.profile_task_execution_service.add_profile_task(profile_task)
diff --git a/skywalking/config.py b/skywalking/config.py
index 0123f3b..041a6a8 100644
--- a/skywalking/config.py
+++ b/skywalking/config.py
@@ -64,9 +64,15 @@ kafka_topic_management = os.getenv('SW_KAFKA_REPORTER_TOPIC_MANAGEMENT') or "sky
 kafka_topic_segment = os.getenv('SW_KAFKA_REPORTER_TOPIC_SEGMENT') or "skywalking-segments"  # type: str
 kafka_topic_log = os.getenv('SW_KAFKA_REPORTER_TOPIC_LOG') or "skywalking-logs"  # type: str
 celery_parameters_length = int(os.getenv('SW_CELERY_PARAMETERS_LENGTH') or '512')
-profile_active = True if os.getenv('SW_AGENT_PROFILE_ACTIVE') and \
-                         os.getenv('SW_AGENT_PROFILE_ACTIVE') == 'True' else False  # type: bool
-profile_task_query_interval = int(os.getenv('SW_PROFILE_TASK_QUERY_INTERVAL') or '20')
+
+# profile configs
+get_profile_task_interval = int(os.getenv('SW_PROFILE_TASK_QUERY_INTERVAL') or '20')  # type: int
+profile_active = False if os.getenv('SW_AGENT_PROFILE_ACTIVE') and \
+                         os.getenv('SW_AGENT_PROFILE_ACTIVE') == 'False' else True  # type: bool
+profile_max_parallel = int(os.getenv("SW_AGENT_PROFILE_MAX_PARALLEL") or '5')  # type: int
+profile_duration = int(os.getenv('SW_AGENT_PROFILE_DURATION') or '10')  # type: int
+profile_dump_max_stack_depth = int(os.getenv('SW_AGENT_PROFILE_DUMP_MAX_STACK_DEPTH') or '500')  # type: int
+profile_snapshot_transport_buffer_size = int(os.getenv('SW_AGENT_PROFILE_SNAPSHOT_TRANSPORT_BUFFER_SIZE') or '50')
 
 # NOTE - Log reporting requires a separate channel, will merge in the future.
 log_reporter_active = True if os.getenv('SW_AGENT_LOG_REPORTER_ACTIVE') and \
diff --git a/skywalking/profile/__init__.py b/skywalking/profile/__init__.py
index fdf00ab..b160441 100644
--- a/skywalking/profile/__init__.py
+++ b/skywalking/profile/__init__.py
@@ -15,6 +15,14 @@
 # limitations under the License.
 #
 
-from skywalking.profile.profile_task_execution_service import ProfileTaskExecutionService
+profile_task_execution_service = None
 
-profile_task_execution_service = ProfileTaskExecutionService()
+
+def init():
+    from skywalking.profile.profile_service import ProfileTaskExecutionService
+
+    global profile_task_execution_service
+    if profile_task_execution_service:
+        return
+
+    profile_task_execution_service = ProfileTaskExecutionService()
diff --git a/skywalking/profile/profile_context.py b/skywalking/profile/profile_context.py
new file mode 100644
index 0000000..1584c27
--- /dev/null
+++ b/skywalking/profile/profile_context.py
@@ -0,0 +1,223 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import time
+from threading import Thread, Event, current_thread
+import sys
+import traceback
+from typing import Optional
+
+from skywalking import agent
+from skywalking import config
+from skywalking.loggings import logger
+from skywalking import profile
+from skywalking.profile.profile_status import ProfileStatusReference, ProfileStatus
+from skywalking.profile.profile_task import ProfileTask
+from skywalking.profile.snapshot import TracingThreadSnapshot
+from skywalking.trace.context import SpanContext
+from skywalking.utils.array import AtomicArray
+from skywalking.utils.integer import AtomicInteger
+from skywalking.utils.time import current_milli_time
+
+
+class ProfileTaskExecutionContext:
+    def __init__(self, task: ProfileTask):
+        self.task = task  # type: ProfileTask
+        self._current_profiling_cnt = AtomicInteger(var=0)
+        self._total_started_profiling_cnt = AtomicInteger(var=0)
+        self.profiling_segment_slots = AtomicArray(length=config.profile_max_parallel)
+        self._profiling_thread = None  # type: Optional[Thread]
+        self._profiling_stop_event = None  # type: Optional[Event]
+
+    def start_profiling(self):
+        profile_thread = ProfileThread(self)
+        self._profiling_stop_event = Event()
+
+        self._profiling_thread = Thread(target=profile_thread.start, args=[self._profiling_stop_event], daemon=True)
+        self._profiling_thread.start()
+
+    def stop_profiling(self):
+        if self._profiling_thread is not None and self._profiling_stop_event is not None:
+            self._profiling_stop_event.set()
+
+    def attempt_profiling(self, trace_context: SpanContext, segment_id: str, first_span_opname: str) -> \
+            ProfileStatusReference:
+        """
+        check have available slot to profile and add it
+        """
+
+        # check has available slot
+        using_slot_cnt = self._current_profiling_cnt.get()
+        if using_slot_cnt >= config.profile_max_parallel:
+            return ProfileStatusReference.create_with_none()
+
+        # check first operation name matches
+        if not self.task.first_span_op_name == first_span_opname:
+            return ProfileStatusReference.create_with_none()
+
+        # if out limit started profiling count then stop add profiling
+        if self._total_started_profiling_cnt.get() > self.task.max_sampling_count:
+            return ProfileStatusReference.create_with_none()
+
+        # try to occupy slot
+        if not self._current_profiling_cnt.compare_and_set(using_slot_cnt,
+                                                           using_slot_cnt + 1):
+            return ProfileStatusReference.create_with_none()
+
+        thread_profiler = ThreadProfiler(trace_context=trace_context,
+                                         segment_id=segment_id,
+                                         profiling_thread=current_thread(),
+                                         profile_context=self)
+
+        slot_length = self.profiling_segment_slots.length()
+        for idx in range(slot_length):
+            # occupy slot success
+            if self.profiling_segment_slots.compare_and_set(idx, None, thread_profiler):
+                return thread_profiler.profile_status
+
+        return ProfileStatusReference.create_with_none()
+
+    def profiling_recheck(self, trace_context: SpanContext, segment_id: str, first_span_opname: str):
+        if trace_context.profile_status.is_being_watched():
+            return
+
+        # if first_span_opname was changed by other plugin, there can start profile as well
+        trace_context.profile_status.update_status(self.attempt_profiling(trace_context,
+                                                                          segment_id,
+                                                                          first_span_opname).get())
+
+    def stop_tracing_profile(self, trace_context: SpanContext):
+        """
+        find tracing context and clear on slot
+        """
+        for idx, profiler in enumerate(self.profiling_segment_slots):
+            if profiler and profiler.matches(trace_context):
+                self.profiling_segment_slots.set(idx, None)
+                profiler.stop_profiling()
+                self._current_profiling_cnt.add_and_get(-1)
+                break
+
+    def is_start_profileable(self):
+        return self._total_started_profiling_cnt.add_and_get(1) <= self.task.max_sampling_count
+
+
+class ProfileThread:
+    def __init__(self, context: ProfileTaskExecutionContext):
+        self._task_execution_context = context
+        self._task_execution_service = profile.profile_task_execution_service
+        self._stop_event = None  # type: Optional[Event]
+
+    def start(self, stop_event: Event):
+        self._stop_event = stop_event
+
+        try:
+            self.profiling(self._task_execution_context)
+        except Exception as e:
+            logger.error("profiling task fail. task_id:[%s] error:[%s]", self._task_execution_context.task.task_id, e)
+        finally:
+            self._task_execution_service.stop_current_profile_task(self._task_execution_context)
+
+    def profiling(self, context: ProfileTaskExecutionContext):
+        max_sleep_period = context.task.thread_dump_period
+
+        while not self._stop_event.is_set():
+            current_loop_start_time = current_milli_time()
+            profilers = self._task_execution_context.profiling_segment_slots
+
+            for profiler in profilers:  # type: ThreadProfiler
+                if profiler is None:
+                    continue
+
+                if profiler.profile_status.get() is ProfileStatus.PENDING:
+                    profiler.start_profiling_if_need()
+                elif profiler.profile_status.get() is ProfileStatus.PROFILING:
+                    snapshot = profiler.build_snapshot()
+                    if snapshot is not None:
+                        agent.add_profiling_snapshot(snapshot)
+                    else:
+                        # tell execution context current tracing thread dump failed, stop it
+                        context.stop_tracing_profile(profiler.trace_context)
+
+            need_sleep = (current_loop_start_time + max_sleep_period) - current_milli_time()
+            if not need_sleep > 0:
+                need_sleep = max_sleep_period
+
+            # convert to float second
+            time.sleep(need_sleep / 1000)
+
+
+class ThreadProfiler:
+    def __init__(self, trace_context: SpanContext, segment_id: str, profiling_thread: Thread,
+                 profile_context: ProfileTaskExecutionContext):
+        self.trace_context = trace_context
+        self._segment_id = segment_id
+        self._profiling_thread = profiling_thread
+        self._profile_context = profile_context
+        self._profile_start_time = -1
+        self.profiling_max_time_mills = config.profile_duration * 60 * 1000
+
+        self.dump_sequence = 0
+
+        if trace_context.profile_status is None:
+            self.profile_status = ProfileStatusReference.create_with_pending()
+        else:
+            self.profile_status = trace_context.profile_status  # type: ProfileStatusReference
+            self.profile_status.update_status(ProfileStatus.PENDING)
+
+    def start_profiling_if_need(self):
+        if current_milli_time() - self.trace_context.create_time > self._profile_context.task.min_duration_threshold:
+            self._profile_start_time = current_milli_time()
+            self.trace_context.profile_status.update_status(ProfileStatus.PROFILING)
+
+    def stop_profiling(self):
+        self.trace_context.profile_status.update_status(ProfileStatus.STOPPED)
+
+    def build_snapshot(self) -> Optional[TracingThreadSnapshot]:
+        if not self._profiling_thread.is_alive():
+            return None
+
+        current_time = current_milli_time()
+
+        stack_list = []
+
+        # get thread stack of target thread
+        stack = sys._current_frames().get(int(self._profiling_thread.ident))
+        if not stack:
+            return None
+
+        extracted = traceback.extract_stack(stack)
+        for idx, item in enumerate(extracted):
+            if idx > config.profile_dump_max_stack_depth:
+                break
+
+            code_sig = f"{item.filename}.{item.name}: {item.lineno}"
+            stack_list.append(code_sig)
+
+        # if is first dump, check is can start profiling
+        if self.dump_sequence == 0 and not self._profile_context.is_start_profileable():
+            return None
+
+        t = TracingThreadSnapshot(self._profile_context.task.task_id,
+                                  self._segment_id,
+                                  self.dump_sequence,
+                                  current_time,
+                                  stack_list)
+        self.dump_sequence += 1
+        return t
+
+    def matches(self, trace_context: SpanContext) -> bool:
+        return self.trace_context == trace_context
diff --git a/skywalking/profile/profile_service.py b/skywalking/profile/profile_service.py
new file mode 100644
index 0000000..f3b6c8f
--- /dev/null
+++ b/skywalking/profile/profile_service.py
@@ -0,0 +1,208 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from queue import Queue
+from typing import Tuple
+from skywalking.loggings import logger
+from skywalking.profile.profile_constants import ProfileConstants
+from skywalking.profile.profile_task import ProfileTask
+from skywalking.profile.profile_context import ProfileTaskExecutionContext
+from skywalking.profile.profile_status import ProfileStatusReference
+
+from skywalking.trace.context import SpanContext
+
+from skywalking.utils.atomic_ref import AtomicRef
+from skywalking import agent
+
+from concurrent.futures import ThreadPoolExecutor
+from threading import Timer, RLock, Lock
+
+from skywalking.utils.time import current_milli_time
+
+
+class Scheduler:
+
+    @staticmethod
+    def schedule(milliseconds, func, *args, **kwargs):
+        seconds = milliseconds / 1000
+        if seconds < 0:
+            seconds = 0
+
+        t = Timer(seconds, func, *args, **kwargs)
+        t.daemon = True
+        t.start()
+
+
+class ProfileTaskExecutionService:
+    MINUTE_TO_MILLIS = 60000
+
+    def __init__(self):
+        # Queue is thread safe
+        self._profile_task_list = Queue()  # type: Queue
+        # queue_lock for making sure complex operation of this profile_task_list is thread safe
+        self.queue_lock = Lock()
+
+        self._last_command_create_time = -1  # type: int
+        # single thread executor
+        self.profile_executor = ThreadPoolExecutor(max_workers=1)
+        self.task_execution_context = AtomicRef(None)
+
+        self.profile_task_scheduler = Scheduler()
+
+        # rlock for process_profile_task and stop_current_profile_task
+        self._rlock = RLock()
+
+    def remove_from_profile_task_list(self, task: ProfileTask) -> bool:
+        """
+        Remove a task from profile_task_list in a thread safe state
+        """
+        with self.queue_lock:
+            item_left = []
+            result = False
+
+            while not self._profile_task_list.empty():
+                item = self._profile_task_list.get()
+                if item == task:
+                    result = True
+                    # not put in item_list for removing it
+                    continue
+
+                item_left.append(item)
+
+            for item in item_left:
+                self._profile_task_list.put(item)
+
+            return result
+
+    def get_last_command_create_time(self) -> int:
+        return self._last_command_create_time
+
+    def add_profile_task(self, task: ProfileTask):
+        # update last command create time, which will be used in command query
+        if task.create_time > self._last_command_create_time:
+            self._last_command_create_time = task.create_time
+
+        # check profile task object
+        success, error_reason = self._check_profile_task(task)
+        if not success:
+            logger.warning("check command error, cannot process this profile task. reason: %s", error_reason)
+            return
+
+        # add task to list
+        self._profile_task_list.put(task)
+
+        delay_millis = task.start_time - current_milli_time()
+        # schedule to start task
+        self.profile_task_scheduler.schedule(delay_millis, self.process_profile_task, [task])
+
+    def add_profiling(self, context: SpanContext, segment_id: str, first_span_opname: str) -> ProfileStatusReference:
+        execution_context = self.task_execution_context.get()  # type: ProfileTaskExecutionContext
+        if execution_context is None:
+            return ProfileStatusReference.create_with_none()
+
+        return execution_context.attempt_profiling(context, segment_id, first_span_opname)
+
+    def profiling_recheck(self, trace_context: SpanContext, segment_id: str, first_span_opname: str):
+        """
+        Re-check current trace need profiling, in case that third-party plugins change the operation name.
+        """
+        execution_context = self.task_execution_context.get()  # type: ProfileTaskExecutionContext
+        if execution_context is None:
+            return
+        execution_context.profiling_recheck(trace_context, segment_id, first_span_opname)
+
+    # using reentrant lock for process_profile_task and stop_current_profile_task,
+    # to make sure thread safe.
+    def process_profile_task(self, task: ProfileTask):
+        with self._rlock:
+            # make sure prev profile task already stopped
+            self.stop_current_profile_task(self.task_execution_context.get())
+
+            # make stop task schedule and task context
+            current_context = ProfileTaskExecutionContext(task)
+            self.task_execution_context.set(current_context)
+
+            # start profiling this task
+            current_context.start_profiling()
+            logger.debug("profile task [%s] for endpoint [%s] started", task.task_id, task.first_span_op_name)
+
+            millis = task.duration * self.MINUTE_TO_MILLIS
+            self.profile_task_scheduler.schedule(millis, self.stop_current_profile_task, [current_context])
+
+    def stop_current_profile_task(self, need_stop: ProfileTaskExecutionContext):
+        with self._rlock:
+            # need_stop is None or task_execution_context is not need_stop context
+            if need_stop is None or not self.task_execution_context.compare_and_set(need_stop, None):
+                return
+
+            need_stop.stop_profiling()
+            logger.debug("profile task [%s] for endpoint [%s] stopped", need_stop.task.task_id,
+                         need_stop.task.first_span_op_name)
+
+            self.remove_from_profile_task_list(need_stop.task)
+
+            # notify profiling task has finished
+            agent.notify_profile_finish(need_stop.task)
+
+    def _check_profile_task(self, task: ProfileTask) -> Tuple[bool, str]:
+        try:
+            # endpoint name
+            if len(task.first_span_op_name) == 0:
+                return (False, "endpoint name [{}] error, "
+                               "should be str and not empty".format(task.first_span_op_name))
+            # duration
+            if task.duration < ProfileConstants.TASK_DURATION_MIN_MINUTE:
+                return (False, "monitor duration must greater"
+                               " than {} minutes".format(ProfileConstants.TASK_DURATION_MIN_MINUTE))
+            if task.duration > ProfileConstants.TASK_DURATION_MAX_MINUTE:
+                return (False, "monitor duration must less"
+                               " than {} minutes".format(ProfileConstants.TASK_DURATION_MAX_MINUTE))
+            # min duration threshold
+            if task.min_duration_threshold < 0:
+                return False, "min duration threshold must greater than or equals zero"
+
+            # dump period
+            if task.thread_dump_period < ProfileConstants.TASK_DUMP_PERIOD_MIN_MILLIS:
+                return (False, "dump period must be greater than or equals to {}"
+                               " milliseconds".format(ProfileConstants.TASK_DUMP_PERIOD_MIN_MILLIS))
+
+            # max sampling count
+            if task.max_sampling_count <= 0:
+                return False, "max sampling count must greater than zero"
+            if task.max_sampling_count >= ProfileConstants.TASK_MAX_SAMPLING_COUNT:
+                return (False, "max sampling count must less"
+                               " than {}".format(ProfileConstants.TASK_MAX_SAMPLING_COUNT))
+
+            # check task queue
+            task_finish_time = self._cal_profile_task_finish_time(task)
+
+            # lock the self._profile_task_list.queue when check the item in it, avoid concurrency errors
+            with self._profile_task_list.mutex:
+                for profile_task in self._profile_task_list.queue:  # type: ProfileTask
+                    # if the end time of the task to be added is during the execution of any data, means is a error data
+                    if task.start_time <= task_finish_time <= self._cal_profile_task_finish_time(profile_task):
+                        return (False, "there already have processing task in time range, "
+                                       "could not add a new task again. processing task monitor "
+                                       "endpoint name: {}".format(profile_task.first_span_op_name))
+
+            return True, ""
+
+        except TypeError:
+            return False, "ProfileTask attributes has type error"
+
+    def _cal_profile_task_finish_time(self, task: ProfileTask) -> int:
+        return task.start_time + task.duration * self.MINUTE_TO_MILLIS
diff --git a/skywalking/client/__init__.py b/skywalking/profile/profile_status.py
similarity index 51%
copy from skywalking/client/__init__.py
copy to skywalking/profile/profile_status.py
index 007e0bc..f594cff 100644
--- a/skywalking/client/__init__.py
+++ b/skywalking/profile/profile_status.py
@@ -15,25 +15,36 @@
 # limitations under the License.
 #
 
+from enum import Enum
 
-class ServiceManagementClient(object):
-    def send_instance_props(self):
-        raise NotImplementedError()
 
-    def send_heart_beat(self):
-        raise NotImplementedError()
+class ProfileStatus(Enum):
+    NONE = 0
+    PENDING = 1
+    PROFILING = 2
+    STOPPED = 3
 
 
-class TraceSegmentReportService(object):
-    def report(self, generator):
-        raise NotImplementedError()
+class ProfileStatusReference:
+    def __init__(self, status: ProfileStatus):
+        self.status = status
 
+    @staticmethod
+    def create_with_none():
+        return ProfileStatusReference(ProfileStatus.NONE)
 
-class LogDataReportService(object):
-    def report(self, generator):
-        raise NotImplementedError()
+    @staticmethod
+    def create_with_pending():
+        return ProfileStatusReference(ProfileStatus.PENDING)
 
+    def get(self) -> ProfileStatus:
+        return self.status
 
-class ProfileTaskChannelService(object):
-    def do_query(self):
-        raise NotImplementedError()
+    def update_status(self, update: ProfileStatus):
+        self.status = update
+
+    def is_being_watched(self):
+        return self.status is not ProfileStatus.NONE
+
+    def is_profiling(self):
+        return self.status is ProfileStatus.PROFILING
diff --git a/skywalking/profile/profile_task.py b/skywalking/profile/profile_task.py
index a11df51..0776627 100644
--- a/skywalking/profile/profile_task.py
+++ b/skywalking/profile/profile_task.py
@@ -33,7 +33,9 @@ class ProfileTask:
         self.task_id = str(task_id)  # type: str
         self.first_span_op_name = str(first_span_op_name)  # type: str
         self.duration = int(duration)  # type: int
+        # when can start profile after span context created
         self.min_duration_threshold = int(min_duration_threshold)  # type: int
+        # profile interval
         self.thread_dump_period = int(thread_dump_period)  # type: int
         self.max_sampling_count = int(max_sampling_count)  # type: int
         self.start_time = int(start_time)  # type: int
diff --git a/skywalking/profile/profile_task_execution_service.py b/skywalking/profile/profile_task_execution_service.py
deleted file mode 100644
index 35ab65d..0000000
--- a/skywalking/profile/profile_task_execution_service.py
+++ /dev/null
@@ -1,101 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-from queue import Queue
-from skywalking.loggings import logger
-from skywalking.profile.profile_constants import ProfileConstants
-from skywalking.profile.profile_task import ProfileTask
-
-
-class ProfileTaskExecutionService:
-    MINUTE_TO_MILLIS = 60000
-
-    def __init__(self):
-        # Queue is thread safe
-        self.__profile_task_list = Queue()  # type: Queue
-        self.__last_command_create_time = -1  # type: int
-
-    def get_last_command_create_time(self) -> int:
-        return self.__last_command_create_time
-
-    def add_profile_task(self, task: ProfileTask):
-        # update last command create time, which will be used in command query
-        if task.create_time > self.__last_command_create_time:
-            self.__last_command_create_time = task.create_time
-
-        # check profile task object
-        result = self.__check_profile_task(task)
-        if not result.success:
-            logger.warning("check command error, cannot process this profile task. reason: %s", result.error_reason)
-            return
-
-        # add task to list
-        self.__profile_task_list.put(task)
-
-    class CheckResult:
-        def __init__(self, success: bool, error_reason: str):
-            self.success = success  # type: bool
-            self.error_reason = error_reason  # type: str
-
-    def __check_profile_task(self, task: ProfileTask) -> CheckResult:
-        try:
-            # endpoint name
-            if len(task.first_span_op_name) == 0:
-                return self.CheckResult(False, "endpoint name [{}] error, "
-                                               "should be str and not empty".format(task.first_span_op_name))
-            # duration
-            if task.duration < ProfileConstants.TASK_DURATION_MIN_MINUTE:
-                return self.CheckResult(False, "monitor duration must greater"
-                                               " than {} minutes".format(ProfileConstants.TASK_DURATION_MIN_MINUTE))
-            if task.duration > ProfileConstants.TASK_DURATION_MAX_MINUTE:
-                return self.CheckResult(False, "monitor duration must less"
-                                               " than {} minutes".format(ProfileConstants.TASK_DURATION_MAX_MINUTE))
-            # min duration threshold
-            if task.min_duration_threshold < 0:
-                return self.CheckResult(False, "min duration threshold must greater than or equals zero")
-
-            # dump period
-            if task.thread_dump_period < ProfileConstants.TASK_DUMP_PERIOD_MIN_MILLIS:
-                return self.CheckResult(False, "dump period must be greater than or equals to {}"
-                                               " milliseconds".format(ProfileConstants.TASK_DUMP_PERIOD_MIN_MILLIS))
-
-            # max sampling count
-            if task.max_sampling_count <= 0:
-                return self.CheckResult(False, "max sampling count must greater than zero")
-            if task.max_sampling_count >= ProfileConstants.TASK_MAX_SAMPLING_COUNT:
-                return self.CheckResult(False, "max sampling count must less"
-                                               " than {}".format(ProfileConstants.TASK_MAX_SAMPLING_COUNT))
-
-            # check task queue
-            task_finish_time = self.__cal_profile_task_finish_time(task)
-
-            # lock the self.__profile_task_list.queue when check the item in it, avoid concurrency errors
-            with self.__profile_task_list.mutex:
-                for profile_task in self.__profile_task_list.queue:  # type: ProfileTask
-                    # if the end time of the task to be added is during the execution of any data, means is a error data
-                    if task.start_time <= task_finish_time <= self.__cal_profile_task_finish_time(profile_task):
-                        return self.CheckResult(False, "there already have processing task in time range, "
-                                                       "could not add a new task again. processing task monitor "
-                                                       "endpoint name: {}".format(profile_task.first_span_op_name))
-
-            return self.CheckResult(True, "")
-
-        except TypeError:
-            return self.CheckResult(False, "ProfileTask attributes has type error")
-
-    def __cal_profile_task_finish_time(self, task: ProfileTask) -> int:
-        return task.start_time + task.duration * self.MINUTE_TO_MILLIS
diff --git a/skywalking/profile/snapshot.py b/skywalking/profile/snapshot.py
new file mode 100644
index 0000000..b3a8212
--- /dev/null
+++ b/skywalking/profile/snapshot.py
@@ -0,0 +1,44 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from typing import List
+
+from skywalking.protocol.profile.Profile_pb2 import ThreadSnapshot, ThreadStack
+
+
+class TracingThreadSnapshot:
+
+    def __init__(self, task_id: str, trace_segment_id: str, sequence: int, time: int, stack_list: List[str]):
+        self.task_id = task_id
+        self.trace_segment_id = trace_segment_id
+        self.sequence = sequence
+        self.time = time
+        self.stack_list = stack_list
+
+    def transform(self) -> ThreadSnapshot:
+        code_sigs = [code_sign for code_sign in self.stack_list]
+        stack = ThreadStack(codeSignatures=code_sigs)
+
+        snapshot = ThreadSnapshot(
+            taskId=str(self.task_id),
+            traceSegmentId=str(self.trace_segment_id),
+            time=int(self.time),
+            sequence=int(self.sequence),
+            stack=stack
+        )
+
+        return snapshot
diff --git a/skywalking/trace/context.py b/skywalking/trace/context.py
index cdd57c4..9ffd8e4 100644
--- a/skywalking/trace/context.py
+++ b/skywalking/trace/context.py
@@ -15,6 +15,7 @@
 # limitations under the License.
 #
 
+from skywalking.profile.profile_status import ProfileStatusReference
 from skywalking import Component, agent, config
 from skywalking.agent import isfull
 from skywalking.trace import ID
@@ -23,6 +24,8 @@ from skywalking.trace.segment import Segment, SegmentRef
 from skywalking.trace.snapshot import Snapshot
 from skywalking.trace.span import Span, Kind, NoopSpan, EntrySpan, ExitSpan
 from skywalking.utils.counter import Counter
+from skywalking.utils.time import current_milli_time
+from skywalking import profile
 
 
 try:  # attempt to use async-local instead of thread-local context and spans
@@ -75,6 +78,8 @@ class SpanContext(object):
         self._sid = Counter()
         self._correlation = {}  # type: dict
         self._nspans = 0
+        self.profile_status = None  # type: ProfileStatusReference
+        self.create_time = current_milli_time()
 
     def ignore_check(self, op: str, kind: Kind, carrier: 'Carrier' = None):
         if config.RE_IGNORE_PATH.match(op) or isfull() or (carrier is not None and carrier.is_suppressed):
@@ -106,7 +111,17 @@ class SpanContext(object):
         spans = _spans()
         parent = spans[-1] if spans else None  # type: Span
 
+        # start profiling if profile_context is set
+        if self.profile_status is None:
+            self.profile_status = profile.profile_task_execution_service.add_profiling(self,
+                                                                                       self.segment.segment_id,
+                                                                                       op)
+
         if parent is not None and parent.kind.is_entry and inherit == parent.component:
+            # Span's operation name could be override, recheck here
+            # if the op name now is being profiling, start profile it here
+            self.profiling_recheck(parent, op)
+
             span = parent
             span.op = op
         else:
@@ -150,6 +165,12 @@ class SpanContext(object):
 
         return span
 
+    def profiling_recheck(self, span: Span, op_name: str):
+        # only check first span, e.g, first opname is correct.
+        if span.sid != 0:
+            return
+        profile.profile_task_execution_service.profiling_recheck(self, self.segment.segment_id, op_name)
+
     def start(self, span: Span):
         self._nspans += 1
         spans = _spans_dup()
diff --git a/skywalking/utils/array.py b/skywalking/utils/array.py
new file mode 100644
index 0000000..e32e3f9
--- /dev/null
+++ b/skywalking/utils/array.py
@@ -0,0 +1,63 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import threading
+
+
+class AtomicArray:
+
+    def __init__(self, length: int):
+        self._length = length
+        self._array = [None] * self._length
+        self._lock = threading.Lock()
+
+    def __getitem__(self, idx):
+        # for iteration
+        with self._lock:
+            return self._array[idx]
+
+    def length(self) -> int:
+        return self._length
+
+    def set(self, idx: int, new_value):
+        if idx < 0 or idx >= self.length():
+            raise IndexError("atomic array assignment index out of range")
+
+        with self._lock:
+            self._array[idx] = new_value
+
+    def get(self, idx: int):
+        if idx < 0 or idx >= self.length():
+            raise IndexError("atomic array assignment index out of range")
+
+        with self._lock:
+            return self._array[idx]
+
+    def compare_and_set(self, idx: int, expect, update) -> bool:
+        """
+        Atomically sets the value of array to the given updated value if the current value == the expected value
+        :return: return True if success, False if the actual value was not equal to the expected value.
+        """
+        if idx < 0 or idx >= self.length():
+            raise IndexError("atomic array assignment index out of range")
+
+        with self._lock:
+            if self._array[idx] == expect:
+                self._array[idx] = update
+                return True
+
+            return False
diff --git a/skywalking/client/__init__.py b/skywalking/utils/atomic_ref.py
similarity index 52%
copy from skywalking/client/__init__.py
copy to skywalking/utils/atomic_ref.py
index 007e0bc..e938b5f 100644
--- a/skywalking/client/__init__.py
+++ b/skywalking/utils/atomic_ref.py
@@ -15,25 +15,30 @@
 # limitations under the License.
 #
 
+import threading
 
-class ServiceManagementClient(object):
-    def send_instance_props(self):
-        raise NotImplementedError()
 
-    def send_heart_beat(self):
-        raise NotImplementedError()
+class AtomicRef:
+    def __init__(self, var):
+        self._lock = threading.Lock()
+        self._var = var
 
+    def get(self):
+        with self._lock:
+            return self._var
 
-class TraceSegmentReportService(object):
-    def report(self, generator):
-        raise NotImplementedError()
+    def set(self, new_var):
+        with self._lock:
+            self._var = new_var
 
+    def compare_and_set(self, expect, update) -> bool:
+        """
+        Atomically sets the value to the given updated value if the current value == the expected value
 
-class LogDataReportService(object):
-    def report(self, generator):
-        raise NotImplementedError()
-
-
-class ProfileTaskChannelService(object):
-    def do_query(self):
-        raise NotImplementedError()
+        :return: return True if success, False if the actual value was not equal to the expected value.
+        """
+        with self._lock:
+            if self._var == expect:
+                self._var = update
+                return True
+            return False
diff --git a/skywalking/profile/__init__.py b/skywalking/utils/integer.py
similarity index 64%
copy from skywalking/profile/__init__.py
copy to skywalking/utils/integer.py
index fdf00ab..5c15005 100644
--- a/skywalking/profile/__init__.py
+++ b/skywalking/utils/integer.py
@@ -15,6 +15,20 @@
 # limitations under the License.
 #
 
-from skywalking.profile.profile_task_execution_service import ProfileTaskExecutionService
+from skywalking.utils.atomic_ref import AtomicRef
 
-profile_task_execution_service = ProfileTaskExecutionService()
+
+class AtomicInteger(AtomicRef):
+    def __init__(self, var: int):
+        super().__init__(var)
+
+    def add_and_get(self, delta: int):
+        """
+        Atomically adds the given value to the current value.
+
+        :param delta: the value to add
+        :return: the updated value
+        """
+        with self._lock:
+            self._var += delta
+            return self._var
diff --git a/skywalking/profile/__init__.py b/skywalking/utils/time.py
similarity index 83%
copy from skywalking/profile/__init__.py
copy to skywalking/utils/time.py
index fdf00ab..faa70c8 100644
--- a/skywalking/profile/__init__.py
+++ b/skywalking/utils/time.py
@@ -15,6 +15,8 @@
 # limitations under the License.
 #
 
-from skywalking.profile.profile_task_execution_service import ProfileTaskExecutionService
+import time
 
-profile_task_execution_service = ProfileTaskExecutionService()
+
+def current_milli_time():
+    return round(time.time() * 1000)