You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2023/02/17 00:31:15 UTC

[skywalking-python] branch master updated: Refactor agent core startup solo (#89) (#287)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking-python.git


The following commit(s) were added to refs/heads/master by this push:
     new d5a9506  Refactor agent core startup solo (#89) (#287)
d5a9506 is described below

commit d5a9506b86f2f2603b7cc5a75c04c60156ad526f
Author: Superskyyy (AWAY, busy graduating | Debug 人) <Su...@outlook.com>
AuthorDate: Thu Feb 16 19:31:09 2023 -0500

    Refactor agent core startup solo (#89) (#287)
    
    This PR is made of two tightly coupled parts:
    
    * Total rewrite of agent startup logic from module functions -> singleton class. (some other logic was changed in meter to fix wrong forking behavior)
    * Provide experimental support for os.fork(), exposed as an option.
    * A demo directory to provide easier access to oap/kafka/demoservices (for contributors).
    
    Minor changes:
    
    * Docs: fixed some missed ones over time.
    * Fixed a redis bug.
---
 .github/workflows/CI.yaml                          |   2 +-
 .lift/config.toml                                  |   2 +-
 CHANGELOG.md                                       |   8 +-
 README.md                                          |   2 +-
 demo/README.md                                     |   8 +
 tests/e2e/script/env => demo/__init__.py           |   4 -
 demo/docker-compose.yaml                           | 106 ++++
 demo/flask_consumer_fork.py                        |  47 ++
 .../consumer.py => demo/flask_provider_single.py   |  40 +-
 docs/README.md                                     |   2 +-
 docs/en/contribution/How-to-release.md             |   2 +-
 docs/en/setup/CLI.md                               |  11 +-
 docs/en/setup/Configuration.md                     |   2 +-
 docs/en/setup/Installation.md                      |   5 +-
 docs/en/setup/Intrusive.md                         |  21 +-
 docs/en/setup/Plugins.md                           |   2 +-
 docs/en/setup/advanced/LogReporter.md              |  21 +-
 docs/en/setup/advanced/MeterReporter.md            |  11 +-
 docs/en/setup/faq/How-to-use-with-uwsgi.md         |   7 +-
 docs/menu.yml                                      |   6 +-
 pyproject.toml                                     |   4 +-
 skywalking/agent/__init__.py                       | 562 ++++++++++++---------
 skywalking/bootstrap/loader/sitecustomize.py       |   2 +-
 skywalking/config.py                               |   8 +-
 skywalking/log/sw_logging.py                       |   3 +-
 skywalking/loggings.py                             |   2 +-
 skywalking/meter/__init__.py                       |   8 +-
 skywalking/meter/meter.py                          |  12 +-
 skywalking/meter/meter_service.py                  |   9 +-
 skywalking/meter/pvm/data_source.py                |   2 +-
 skywalking/plugins/sw_kafka.py                     |   2 +-
 skywalking/plugins/sw_loguru.py                    |   3 +-
 skywalking/plugins/sw_redis.py                     | 116 +----
 skywalking/plugins/sw_requests.py                  |   2 +-
 skywalking/plugins/sw_websockets.py                |   1 -
 skywalking/profile/profile_context.py              |   2 +-
 skywalking/profile/profile_service.py              |   4 +-
 skywalking/trace/context.py                        |   8 +-
 .../consumer.py => skywalking/utils/singleton.py   |  35 +-
 tests/e2e/script/env                               |   6 +-
 tests/plugin/base.py                               |   9 +-
 .../plugin/http/sw_websockets/services/consumer.py |   4 +-
 tests/plugin/web/sw_fastapi/services/consumer.py   |   4 +-
 tests/plugin/web/sw_sanic/services/provider.py     |   2 +-
 tests/unit/test_meter.py                           |   5 +-
 45 files changed, 651 insertions(+), 473 deletions(-)

diff --git a/.github/workflows/CI.yaml b/.github/workflows/CI.yaml
index 9f974b8..c4d4a8e 100644
--- a/.github/workflows/CI.yaml
+++ b/.github/workflows/CI.yaml
@@ -155,7 +155,7 @@ jobs:
       ( always() && ! cancelled() ) &&
       ((github.event_name == 'schedule' && github.repository == 'apache/skywalking-python') || needs.changes.outputs.agent == 'true')
     runs-on: ubuntu-latest
-    timeout-minutes: 30
+    timeout-minutes: 20
     strategy:
       matrix:
         python-version: [ "3.7", "3.8", "3.9", "3.10", "3.11" ]  
diff --git a/.lift/config.toml b/.lift/config.toml
index c663402..8c2da8f 100644
--- a/.lift/config.toml
+++ b/.lift/config.toml
@@ -16,4 +16,4 @@
 # specific language governing permissions and limitations
 # under the License.
 #
-ignoreRules = [ "Unused ignore", "Invalid decoration", "blacklist", "Missing argument", "hardcoded_bind_all_interfaces" ]
+ignoreRules = [ "Unused ignore", "Invalid decoration", "blacklist", "Missing argument", "hardcoded_bind_all_interfaces", "B104", "B201" ]
diff --git a/CHANGELOG.md b/CHANGELOG.md
index fa128b9..3484362 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -3,12 +3,11 @@
 ### 1.0.0
 
 - **Important Note and Breaking Changes:**
-  - Python 3.6 is no longer supported and may not function properly, Python 3.11 support is added and tested.
-  - A number of common configuration options (environment variables) are renamed to follow the convention of Java agent,
+  - **BREAKING**: Python 3.6 is no longer supported and may not function properly, Python 3.11 support is added and tested.
+  - **BREAKING**: A number of common configuration options and environment variables are renamed to follow the convention of Java agent,
   please check with the latest official documentation before upgrading. (#273, #282)
-  
   https://skywalking.apache.org/docs/skywalking-python/v1.0.0/en/setup/configuration/
-  - All agent core capabilities are now covered by test cases and enabled by default (Trace, Log, PVM runtime metrics, Profiler)
+  - **BREAKING**: All agent core capabilities are now covered by test cases and enabled by default (Trace, Log, PVM runtime metrics, Profiler)
 
 
 - Feature:
@@ -20,6 +19,7 @@
   - Add support for the tags of Virtual Cache for Redis (#263)
   - Add a new configuration `kafka_namespace` to prefix the kafka topic names (#277)
   - Add log reporter support for loguru (#276)
+  - Add **experimental** support for explicit os.fork(), restarts agent in new process (#286)
 
 - Plugins:
   - Add aioredis, aiormq, amqp, asyncpg, aio-pika, kombu RMQ plugins (#230 Missing test coverage) 
diff --git a/README.md b/README.md
index 2df7758..c1b0498 100755
--- a/README.md
+++ b/README.md
@@ -2,7 +2,7 @@
 
 <img src="http://skywalking.apache.org/assets/logo.svg" alt="Sky Walking logo" height="90px" align="right" />
 
-**SkyWalking-Python**: The Python Agent for Apache SkyWalking provides the native tracing/metrics/logging abilities for Python projects.
+**SkyWalking-Python**: The Python Agent for Apache SkyWalking provides the native tracing/metrics/logging/profiling abilities for Python projects.
 
 **[SkyWalking](https://github.com/apache/skywalking)**: Application performance monitor tool for distributed systems, especially designed for microservices, cloud native and container-based (Kubernetes) architectures.
 
diff --git a/demo/README.md b/demo/README.md
new file mode 100644
index 0000000..0e84fe5
--- /dev/null
+++ b/demo/README.md
@@ -0,0 +1,8 @@
+# Manual Test
+
+Edge cases on advanced features would benefit from a manual testing process.
+
+This directory holds some utils and scripts that are convenient for such use cases.
+
+## Docker-compose.yaml
+This docker-compose.yaml spins up a fresh Apache SkyWalking instance along with UI (localhost:8080) and SW_CTL CLI for you to verify.
diff --git a/tests/e2e/script/env b/demo/__init__.py
similarity index 85%
copy from tests/e2e/script/env
copy to demo/__init__.py
index adb6ad0..ae1e83e 100644
--- a/tests/e2e/script/env
+++ b/demo/__init__.py
@@ -12,7 +12,3 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
-# 2022-11-05 commit
-SW_CTL_COMMIT=521843f963917aa806740a9ad09c65aa59aca179
-SW_OAP_COMMIT=93d021ab0bbffa6cfa73adacdcbbf9e25f8016be
\ No newline at end of file
diff --git a/demo/docker-compose.yaml b/demo/docker-compose.yaml
new file mode 100644
index 0000000..ed1d485
--- /dev/null
+++ b/demo/docker-compose.yaml
@@ -0,0 +1,106 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+services:
+  oap:
+    container_name: oap
+    image: apache/skywalking-oap-server:9.3.0
+    # Python agent supports gRPC/ HTTP/ Kafka reporting
+    expose:
+      - 11800 # gRPC
+      - 12800 # HTTP
+    networks:
+      - manual
+    environment:
+      SW_KAFKA_FETCHER: default
+      SW_KAFKA_FETCHER_SERVERS: kafka:9092
+      SW_KAFKA_FETCHER_PARTITIONS: 2
+      SW_KAFKA_FETCHER_PARTITIONS_FACTOR: 1
+    healthcheck:
+      test: [ "CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/11800" ]
+      interval: 5s
+      timeout: 60s
+      retries: 120
+    ports:
+      - "12800:12800"
+      - "11800:11800"
+    depends_on:
+      - kafka
+
+
+  ui:
+    image: apache/skywalking-ui:9.3.0
+    container_name: ui
+    depends_on:
+      oap:
+        condition: service_healthy
+    networks:
+      - manual
+    ports:
+      - "8080:8080"
+    environment:
+      SW_OAP_ADDRESS: "http://oap:12800"
+
+  zookeeper:
+    container_name: zk
+    image: confluentinc/cp-zookeeper:latest
+    ports:
+      - "2181:2181"
+    environment:
+      ZOOKEEPER_CLIENT_PORT: 2181
+      ZOOKEEPER_TICK_TIME: 2000
+    networks:
+      - manual
+
+  kafka:
+    container_name: kafka
+    image: confluentinc/cp-kafka
+    expose:
+      - 9092
+      - 9094
+    ports:
+      - 9092:9092
+      - 9094:9094
+    depends_on:
+      - zookeeper
+    environment:
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+      KAFKA_LISTENERS: INTERNAL://0.0.0.0:9092,OUTSIDE://0.0.0.0:9094
+      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9092,OUTSIDE://localhost:9094
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,OUTSIDE:PLAINTEXT
+      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+    networks:
+      - manual
+
+#
+#  kafka-ui:
+#    image: provectuslabs/kafka-ui
+#    container_name: kafka-ui
+#    ports:
+#      - "8088:8080"
+#    restart: always
+#    environment:
+#      - KAFKA_CLUSTERS_0_NAME=local
+#      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:9092
+#    depends_on:
+#      - kafka
+#    networks:
+#      - manual
+
+networks:
+  manual:
diff --git a/demo/flask_consumer_fork.py b/demo/flask_consumer_fork.py
new file mode 100644
index 0000000..6981f9a
--- /dev/null
+++ b/demo/flask_consumer_fork.py
@@ -0,0 +1,47 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+
+from flask import Flask
+from skywalking import agent, config
+import requests
+
+# Profiling only available in gRPC, meter only in kafka + grpc
+config.init(agent_collector_backend_services='localhost:11800', agent_protocol='grpc',
+            agent_name='great-app-consumer-grpc',
+            kafka_bootstrap_servers='localhost:9094',  # If you use kafka, set this
+            agent_instance_name='instance-01',
+            agent_experimental_fork_support=True, agent_logging_level='DEBUG', agent_log_reporter_active=True,
+            agent_meter_reporter_active=True,
+            agent_profile_active=True)
+
+agent.start()
+
+parent_pid = os.getpid()
+pid = os.fork()
+
+app = Flask(__name__)
+
+
+@app.route('/', methods=['POST', 'GET'])
+def application():
+    res = requests.get('http://localhost:9999')
+    return res.json()
+
+
+if __name__ == '__main__':
+    PORT = 9097 if pid == 0 else 9098  # 0 is child process
+    app.run(host='0.0.0.0', port=PORT, debug=False)  # RELOADER IS ALSO FORKED
diff --git a/tests/plugin/web/sw_fastapi/services/consumer.py b/demo/flask_provider_single.py
similarity index 51%
copy from tests/plugin/web/sw_fastapi/services/consumer.py
copy to demo/flask_provider_single.py
index 627bf6c..54576f5 100644
--- a/tests/plugin/web/sw_fastapi/services/consumer.py
+++ b/demo/flask_provider_single.py
@@ -1,4 +1,3 @@
-#
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
 # this work for additional information regarding copyright ownership.
@@ -13,32 +12,29 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-#
-import requests
-import websockets
+from flask import Flask, jsonify
+from skywalking import agent, config
 
-import asyncio
+config.init(agent_collector_backend_services='localhost:11800', agent_protocol='grpc',
+            agent_name='great-app-provider-grpc',
+            kafka_bootstrap_servers='localhost:9094',  # If you use kafka, set this
+            agent_instance_name='instance-01',
+            agent_experimental_fork_support=True,
+            agent_logging_level='DEBUG',
+            agent_log_reporter_active=True,
+            agent_meter_reporter_active=True,
+            agent_profile_active=True)
 
-if __name__ == '__main__':
-    from fastapi import FastAPI
-    import uvicorn
 
-    app = FastAPI()
+agent.start()
 
-    @app.get('/users')
-    async def application():
-        res = requests.get('http://provider:9091/users', timeout=5)
-        websocket_pong = await websocket_ping()
-        return {'http': res.json(), 'websocket': websocket_pong}
+app = Flask(__name__)
 
-    async def websocket_ping():
-        async with websockets.connect('ws://provider:9091/ws', extra_headers=None) as websocket:
-            await websocket.send('Ping')
 
-            response = await websocket.recv()
-            await asyncio.sleep(0.5)
+@app.route('/', methods=['POST', 'GET'])
+def application():
+    return jsonify({'status': 'ok'})
 
-            await websocket.close()
-            return response
 
-    uvicorn.run(app, host='0.0.0.0', port=9090)
+if __name__ == '__main__':
+    app.run(host='0.0.0.0', port=9999, debug=True, use_reloader=False)
diff --git a/docs/README.md b/docs/README.md
index b28327f..a31481f 100644
--- a/docs/README.md
+++ b/docs/README.md
@@ -2,7 +2,7 @@
 
 **This is the official documentation of SkyWalking Python agent. Welcome to the SkyWalking community!**
 
-The Python Agent for Apache SkyWalking provides the native tracing/metrics/logging abilities for Python projects.
+The Python Agent for Apache SkyWalking provides the native tracing/metrics/logging/profiling abilities for Python projects.
 
 This documentation covers a number of ways to set up the Python agent for various use cases.
 
diff --git a/docs/en/contribution/How-to-release.md b/docs/en/contribution/How-to-release.md
index ea000a1..57181cd 100644
--- a/docs/en/contribution/How-to-release.md
+++ b/docs/en/contribution/How-to-release.md
@@ -201,7 +201,7 @@ Vote result should follow these:
 
     On behalf of the SkyWalking Team, I’m glad to announce that SkyWalking Python $VERSION is now released.
 
-    SkyWalking Python: The Python Agent for Apache SkyWalking provides the native tracing/metrics/logging abilities for Python projects.
+    SkyWalking Python: The Python Agent for Apache SkyWalking provides the native tracing/metrics/logging/profiling abilities for Python projects.
 
     SkyWalking: APM (application performance monitor) tool for distributed systems, especially designed for microservices, cloud native and container-based (Docker, Kubernetes, Mesos) architectures.
 
diff --git a/docs/en/setup/CLI.md b/docs/en/setup/CLI.md
index 9aa712d..51703a0 100644
--- a/docs/en/setup/CLI.md
+++ b/docs/en/setup/CLI.md
@@ -1,9 +1,12 @@
-# SkyWalking Python Agent Command-Line Interface(CLI)
+# SkyWalking Python Agent Command-Line Interface (sw-python CLI)
 
 In releases before 0.7.0, you would at least need to add the following lines to your applications to get the agent attached and running.  
 
+This is the recommended way of running your application with Python agent.
+
 ```python
-from skywalking import agent
+from skywalking import agent, config
+config.init(SomeConfig)
 agent.start()
 ```
 
@@ -17,6 +20,8 @@ just like the [SkyWalking Java Agent](https://github.com/apache/skywalking-java)
 Upon successful [installation of the SkyWalking Python agent via pip](Installation.md#from-pypi),
 a command-line script `sw-python` is installed in your environment (virtual env preferred).
 
+run `sw-python` to see if it is available.
+
 ### The `run` option
 
 Currently, the `sw-python` CLI provides a `run` option, which you can use to execute your applications
@@ -64,7 +69,7 @@ You would normally want to provide additional configurations other than the defa
 The currently supported method is to provide the environment variables listed 
 and explained in the [Environment Variables List](Configuration.md).
 
-#### Through a sw-config.yaml
+#### Through a sw-config.yaml (TBD)
 
 Currently, only environment variable configuration is supported; an optional `yaml` configuration is to be implemented.
 
diff --git a/docs/en/setup/Configuration.md b/docs/en/setup/Configuration.md
index fc94779..249ec02 100644
--- a/docs/en/setup/Configuration.md
+++ b/docs/en/setup/Configuration.md
@@ -18,7 +18,7 @@ export SW_AGENT_YourConfiguration=YourValue
 | Configuration | Environment Variable | Type | Default Value | Description |
 | :------------ | :------------ | :------------ | :------------ | :------------ |
 | agent_collector_backend_services | SW_AGENT_AGENT_COLLECTOR_BACKEND_SERVICES | <class 'str'> | oap_host:oap_port | The backend OAP server address, 11800 is default OAP gRPC port, 12800 is HTTP, Kafka ignores this option and uses kafka_bootstrap_servers option. **This option should be changed accordingly with selected protocol** |
-| protocol | SW_AGENT_PROTOCOL | <class 'str'> | grpc | The protocol to communicate with the backend OAP, `http`, `grpc` or `kafka`, **we highly suggest using `grpc` in production as it's well optimized than `http`**. The `kafka` protocol provides an alternative way to submit data to the backend. |
+| agent_protocol | SW_AGENT_AGENT_PROTOCOL | <class 'str'> | grpc | The protocol to communicate with the backend OAP, `http`, `grpc` or `kafka`, **we highly suggest using `grpc` in production as it's well optimized than `http`**. The `kafka` protocol provides an alternative way to submit data to the backend. |
 | agent_name | SW_AGENT_AGENT_NAME | <class 'str'> | Python Service Name | The name of your awesome Python service |
 | agent_instance_name | SW_AGENT_AGENT_INSTANCE_NAME | <class 'str'> | str(uuid.uuid1()).replace('-', '') | The name of this particular awesome Python service instance |
 | agent_namespace | SW_AGENT_AGENT_NAMESPACE | <class 'str'> |  | The agent namespace of the Python service (available as tag and the suffix of service name) |
diff --git a/docs/en/setup/Installation.md b/docs/en/setup/Installation.md
index 0ef9ee1..ff42bc6 100644
--- a/docs/en/setup/Installation.md
+++ b/docs/en/setup/Installation.md
@@ -6,7 +6,7 @@ You can install the SkyWalking Python agent via various ways described next.
 
 > **Already installed? Check out easy ways to start the agent in your application**
 
-> [Non-intrusive](CLI.md) | [Intrusive <minimal>](Intrusive.md) | [Containerization](Container.md) 
+> [Non-intrusive <Recommended>](CLI.md) | [Intrusive <minimal>](Intrusive.md) | [Containerization](Container.md) 
 
 > **All available configurations are listed [here](Configuration.md)**
 
@@ -36,6 +36,9 @@ from where you can use `pip` to install:
 # Install the latest version, using the default gRPC protocol to report data to OAP
 pip install "apache-skywalking"
 
+# Install support for every protocol (gRPC, HTTP, Kafka)
+pip install "apache-skywalking[all]"
+
 # Install the latest version, using the http protocol to report data to OAP
 pip install "apache-skywalking[http]"
 
diff --git a/docs/en/setup/Intrusive.md b/docs/en/setup/Intrusive.md
index 743bb06..089579e 100644
--- a/docs/en/setup/Intrusive.md
+++ b/docs/en/setup/Intrusive.md
@@ -7,52 +7,55 @@ which is by importing SkyWalking into your project and starting the agent.
 By default, SkyWalking Python agent uses gRPC protocol to report data to SkyWalking backend,
 in SkyWalking backend, the port of gRPC protocol is `11800`, and the port of HTTP protocol is `12800`,
 
-You could configure `collector_address` (or environment variable `SW_AGENT_COLLECTOR_BACKEND_SERVICES`)
-and set `protocol` (or environment variable `SW_AGENT_PROTOCOL` to one of
+See all default configuration values in the [Configuration Vocabulary](Configuration.md)
+
+You could configure `agent_collector_backend_services` (or environment variable `SW_AGENT_COLLECTOR_BACKEND_SERVICES`)
+and set `agent_protocol` (or environment variable `SW_AGENT_PROTOCOL` to one of
 `gprc`, `http` or `kafka` according to the protocol you would like to use.
 
 ### Report data via gRPC protocol (Default)
 
-For example, if you want to use gRPC protocol to report data, configure `collector_address`
+For example, if you want to use gRPC protocol to report data, configure `agent_collector_backend_services`
 (or environment variable `SW_AGENT_COLLECTOR_BACKEND_SERVICES`) to `<oap-ip-or-host>:11800`,
 such as `127.0.0.1:11800`:
 
 ```python
 from skywalking import agent, config
 
-config.init(collector_address='127.0.0.1:11800', agent_name='your awesome service')
+config.init(agent_collector_backend_services='127.0.0.1:11800', agent_name='your awesome service', agent_instance_name='your-instance-name or <generated uuid>')
 
 agent.start()
 ```
 
 ### Report data via HTTP protocol
 
-However, if you want to use HTTP protocol to report data, configure `collector_address`
+However, if you want to use HTTP protocol to report data, configure `agent_collector_backend_services`
 (or environment variable `SW_AGENT_COLLECTOR_BACKEND_SERVICES`) to `<oap-ip-or-host>:12800`,
-such as `127.0.0.1:12800`, further set `protocol` (or environment variable `SW_AGENT_PROTOCOL` to `http`):
+such as `127.0.0.1:12800`, further set `agent_protocol` (or environment variable `SW_AGENT_PROTOCOL` to `http`):
 
 > Remember you should install `skywalking-python` with extra requires `http`, `pip install "apache-skywalking[http]`.
 
 ```python
 from skywalking import agent, config
 
-config.init(collector_address='127.0.0.1:12800', agent_name='your awesome service', protocol='http')
+config.init(agent_collector_backend_services='127.0.0.1:12800', agent_name='your awesome service', agent_protocol='http', agent_instance_name='your-instance-name or <generated uuid>')
 
 agent.start()
 ```
 
 ### Report data via Kafka protocol
+**Please make sure OAP is consuming the same Kafka topic as your agent produces to, `kafka_namespace` must match OAP side configuration `plugin.kafka.namespace`**
 
 Finally, if you want to use Kafka protocol to report data, configure `kafka_bootstrap_servers`
 (or environment variable `SW_KAFKA_BOOTSTRAP_SERVERS`) to `kafka-brokers`,
-such as `127.0.0.1:9200`, further set `protocol` (or environment variable `SW_AGENT_PROTOCOL` to `kafka`):
+such as `127.0.0.1:9200`, further set `agent_protocol` (or environment variable `SW_AGENT_PROTOCOL` to `kafka`):
 
 > Remember you should install `skywalking-python` with extra requires `kafka`, `pip install "apache-skywalking[kafka]"`.
 
 ```python
 from skywalking import agent, config
 
-config.init(kafka_bootstrap_servers='127.0.0.1:9200', agent_name='your awesome service', protocol='kafka')
+config.init(kafka_bootstrap_servers='127.0.0.1:9200', agent_name='your awesome service', agent_protocol='kafka', agent_instance_name='your-instance-name or <generated uuid>')
 
 agent.start()
 ```
diff --git a/docs/en/setup/Plugins.md b/docs/en/setup/Plugins.md
index 1947608..dc61b2b 100644
--- a/docs/en/setup/Plugins.md
+++ b/docs/en/setup/Plugins.md
@@ -39,7 +39,7 @@ or a limitation of SkyWalking auto-instrumentation (welcome to contribute!)
 | [pymysql](https://pymysql.readthedocs.io/en/latest/) | Python >=3.7 - ['1.0'];  | `sw_pymysql` |
 | [pyramid](https://trypyramid.com) | Python >=3.7 - ['1.10', '2.0'];  | `sw_pyramid` |
 | [pika](https://pika.readthedocs.io) | Python >=3.7 - ['1.2'];  | `sw_rabbitmq` |
-| [redis](https://github.com/andymccurdy/redis-py/) | Python >=3.7 - ['3.5'];  | `sw_redis` |
+| [redis](https://github.com/andymccurdy/redis-py/) | Python >=3.7 - ['3.5.*', '4.5.1'];  | `sw_redis` |
 | [requests](https://requests.readthedocs.io/en/master/) | Python >=3.7 - ['2.26', '2.25'];  | `sw_requests` |
 | [sanic](https://sanic.readthedocs.io/en/latest) | Python >=3.10 - NOT SUPPORTED YET; Python >=3.7 - ['20.12'];  | `sw_sanic` |
 | [tornado](https://www.tornadoweb.org) | Python >=3.7 - ['6.0', '6.1'];  | `sw_tornado` |
diff --git a/docs/en/setup/advanced/LogReporter.md b/docs/en/setup/advanced/LogReporter.md
index bbe2c44..53cd873 100644
--- a/docs/en/setup/advanced/LogReporter.md
+++ b/docs/en/setup/advanced/LogReporter.md
@@ -2,18 +2,9 @@
 
 This functionality reports logs collected from the Python logging module (in theory, also logging libraries depending on the core logging module) and loguru module.
 
-To utilize this feature, you will need to add some new configurations to the agent initialization step.
+From Python agent 1.0.0, the log reporter is automatically enabled and can be disabled through `agent_log_reporter_active=False` or `SW_AGENT_LOG_REPORTER_ACTIVE=False`.
 
-## Enabling the feature
-```Python 
-from skywalking import agent, config
-
-config.init(collector_address='127.0.0.1:11800', agent_name='your awesome service',
-                log_reporter_active=True)  # defaults to grpc protocol
-agent.start()
-``` 
-
-Log reporter supports all three protocols including `grpc`, `http` and `kafka`, which shares the same config `protocol` with trace reporter.
+Log reporter supports all three protocols including `grpc`, `http` and `kafka`, which shares the same config `agent_protocol` with trace reporter.
 
 If chosen `http` protocol, the logs will be batch-reported to the collector REST endpoint `oap/v3/logs`.
 
@@ -21,12 +12,14 @@ If chosen `kafka` protocol, please make sure to config
 [kafka-fetcher](https://skywalking.apache.org/docs/main/v9.1.0/en/setup/backend/kafka-fetcher/) 
 on the OAP side, and make sure Python agent config `kafka_bootstrap_servers` points to your Kafka brokers.
 
-`log_reporter_active=True` - Enables the log reporter.
+**Please make sure OAP is consuming the same Kafka topic as your agent produces to, `kafka_namespace` must match OAP side configuration `plugin.kafka.namespace`**
+
+`agent_log_reporter_active=True` - Enables the log reporter.
 
-`log_reporter_max_buffer_size` - The maximum queue backlog size for sending log data to backend, logs beyond this are silently dropped.
+`agent_log_reporter_max_buffer_size` - The maximum queue backlog size for sending log data to backend, logs beyond this are silently dropped.
 
 Alternatively, you can pass configurations through environment variables. 
-Please refer to the [Environment Variables List](../Configuration.md) for the list of environment variables associated with the log reporter.
+Please refer to the [Configuration Vocabulary](../Configuration.md) for the list of environment variables associated with the log reporter.
 
 ## Specify a logging level
 Only the logs with a level equal to or higher than the specified will be collected and reported. 
diff --git a/docs/en/setup/advanced/MeterReporter.md b/docs/en/setup/advanced/MeterReporter.md
index f6a6d85..5697e65 100644
--- a/docs/en/setup/advanced/MeterReporter.md
+++ b/docs/en/setup/advanced/MeterReporter.md
@@ -1,12 +1,15 @@
 # Python Agent Meter Reporter
 
-To enable or disable this feature, you will need to set some environment variables.
-
 **Important Note**: Meter reporter is currently available to send in `gRPC` and `Kafka` protocol, 
 `HTTP` protocol is not implemented yet (requires additional handler on SkyWalking OAP side).
 
-## Enabling the feature (default)
+## Enabling the feature (default is enabled)
+**PVM Reporter is also by default enabled, meaning useful Python metrics such as thread count/GC info will be shown in OAP General Services - Instance - PVM Tab)**
+If you really don't need such a feature, disable them through `config.agent_pvm_meter_reporter_active` or `SW_AGENT_PVM_METER_REPORTER_ACTIVE`
+
 ```Python 
+config.agent_meter_reporter_active = True
+# Or
 os.environ['SW_AGENT_METER_REPORTER_ACTIVE'] = 'True'
 ``` 
 or
@@ -88,7 +91,7 @@ h = builder.build()
 builder = Histogram.Builder('h3', [i / 10 for i in range(10)])
 h = builder.build()
 
-# Histogram h will record the time the with-wrapped codes consumed
+# Histogram h will record the time the with-wprapped codes consumed
 with h.create_timer():
     # some codes may consume a certain time
 ```
diff --git a/docs/en/setup/faq/How-to-use-with-uwsgi.md b/docs/en/setup/faq/How-to-use-with-uwsgi.md
index 2bd12ac..d6d8361 100644
--- a/docs/en/setup/faq/How-to-use-with-uwsgi.md
+++ b/docs/en/setup/faq/How-to-use-with-uwsgi.md
@@ -1,4 +1,4 @@
-# How to use with uWSGI ?
+# How to use with uWSGI?
 
 [uWSGI](https://uwsgi-docs.readthedocs.io/en/latest/) is popular in the Python ecosystem. It is a lightweight, fast, and easy-to-use web server.
 
@@ -13,11 +13,12 @@ The following is an example of the use of uWSGI and flask, the initialization pa
 ```python
 # main.py
 from uwsgidecorators import postfork
-from skywalking import agent, config
 
 @postfork
 def init_tracing():
-    config.init(collector_address='127.0.0.1:11800', agent_name='your awesome service')
+    # Important: The import of skywalking should be inside the postfork function
+    from skywalking import agent, config
+    config.init(agent_collector_backend_services='127.0.0.1:11800', agent_name='your awesome service')
 
     agent.start()
 
diff --git a/docs/menu.yml b/docs/menu.yml
index 1017bd1..a5232d8 100644
--- a/docs/menu.yml
+++ b/docs/menu.yml
@@ -22,9 +22,9 @@ catalog:
     path: "/en/setup/installation"
   - name: "Integration"
     catalog:
-      - name: "Non-Intrusive Setup"
+      - name: "Non-Intrusive Setup (Recommended)"
         path: "/en/setup/CLI"
-      - name: "Legacy Setup"
+      - name: "Legacy Setup (Fallback)"
         path: "/en/setup/Intrusive"
       - name: "Containerized Setup"
         path: "/en/setup/Container"
@@ -38,7 +38,7 @@ catalog:
             path: "/en/setup/advanced/MeterReporter"
           - name: "Manual Trace Instrumentation"
             path: "/en/setup/advanced/API"
-  - name: "Plugins"
+  - name: "Supported Plugins"
     catalog:
       - name: "Supported Libraries"
         path: "/en/setup/Plugins"
diff --git a/pyproject.toml b/pyproject.toml
index c1a99ef..4f6d574 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -17,7 +17,7 @@
 [tool.poetry]
 name = "apache-skywalking"
 version = "0.8.0"
-description = "The Python Agent for Apache SkyWalking, which provides the native tracing/metrics/logging abilities for Python projects."
+description = "The Python Agent for Apache SkyWalking, which provides the native tracing/metrics/logging/profiling abilities for Python projects."
 license = "Apache-2.0"
 authors = ["Apache Software Foundation <de...@skywalking.apache.org>"]
 maintainers = ["Apache SkyWalking Community <de...@skywalking.apache.org>"]
@@ -131,7 +131,7 @@ loguru = "^0.6.0"
 httpx = "^0.23.3"
 
 [tool.poetry.group.lint.dependencies]
-pylint = '*'
+pylint = '2.13.9'
 flake8 = "^5.0.4"
 # isort = "^5.10.1"
 unify = "^0.5"
diff --git a/skywalking/agent/__init__.py b/skywalking/agent/__init__.py
index 77d590b..e115328 100644
--- a/skywalking/agent/__init__.py
+++ b/skywalking/agent/__init__.py
@@ -16,269 +16,353 @@
 #
 
 import atexit
+import functools
+import os
+import sys
 from queue import Queue, Full
 from threading import Thread, Event
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Optional
 
 from skywalking import config, plugins
 from skywalking import loggings
-from skywalking import profile
 from skywalking import meter
+from skywalking import profile
 from skywalking.agent.protocol import Protocol
 from skywalking.command import command_service
 from skywalking.loggings import logger
 from skywalking.profile.profile_task import ProfileTask
 from skywalking.profile.snapshot import TracingThreadSnapshot
-from skywalking.protocol.logging.Logging_pb2 import LogData
 from skywalking.protocol.language_agent.Meter_pb2 import MeterData
+from skywalking.protocol.logging.Logging_pb2 import LogData
+from skywalking.utils.singleton import Singleton
 
 if TYPE_CHECKING:
     from skywalking.trace.context import Segment
 
-__started = False
-__protocol = None  # type: Protocol
-__heartbeat_thread = __report_thread = __log_report_thread = __query_profile_thread = __command_dispatch_thread \
-    = __send_profile_thread = __queue = __log_queue = __snapshot_queue = __meter_queue = __finished = None
-
-
-def __heartbeat():
-    wait = base = 30
-
-    while not __finished.is_set():
-        try:
-            __protocol.heartbeat()
-            wait = base  # reset to base wait time on success
-        except Exception as exc:
-            logger.error(str(exc))
-            wait = min(60, wait * 2 or 1)  # double wait time with each consecutive error up to a maximum
-
-        __finished.wait(wait)
-
 
-def __report():
-    wait = base = 0
-
-    while not __finished.is_set():
+def report_with_backoff(reporter_name, init_wait):
+    """
+    An exponential backoff for retrying reporters.
+    """
+
+    def backoff_decorator(func):
+        @functools.wraps(func)
+        def backoff_wrapper(self, *args, **kwargs):
+            wait = base = init_wait
+            while not self._finished.is_set():
+                try:
+                    func(self, *args, **kwargs)
+                    wait = base  # reset to base wait time on success
+                except Exception:  # noqa
+                    wait = min(60, wait * 2 or 1)  # double wait time with each consecutive error up to a maximum
+                    logger.exception(f'Exception in {reporter_name} service in pid {os.getpid()}, '
+                                     f'retry in {wait} seconds')
+                self._finished.wait(wait)
+            logger.info('finished reporter thread')
+        return backoff_wrapper
+    return backoff_decorator
+
+
+class SkyWalkingAgent(Singleton):
+    """
+    The main singleton class and entrypoint of SkyWalking Python Agent.
+    Upon fork(), original instance rebuild everything (queues, threads, instrumentation) by
+    calling the fork handlers in the class instance.
+    """
+    __started: bool = False  # shared by all instances
+
+    def __init__(self):
+        """
+        Protocol is one of gRPC, HTTP and Kafka that
+        provides clients to reporters to communicate with OAP backend.
+        """
+        self.started_pid = None
+        self.__protocol: Optional[Protocol] = None
+        self._finished: Optional[Event] = None
+
+    def __bootstrap(self):
+        # when forking, already instrumented modules must not be instrumented again
+        # otherwise it will cause double instrumentation! (we should provide an un-instrument method)
+        if config.agent_protocol == 'grpc':
+            from skywalking.agent.protocol.grpc import GrpcProtocol
+            self.__protocol = GrpcProtocol()
+        elif config.agent_protocol == 'http':
+            from skywalking.agent.protocol.http import HttpProtocol
+            self.__protocol = HttpProtocol()
+        elif config.agent_protocol == 'kafka':
+            from skywalking.agent.protocol.kafka import KafkaProtocol
+            self.__protocol = KafkaProtocol()
+
+        # Initialize queues for segment, log, meter and profiling snapshots
+        self.__segment_queue: Optional[Queue] = None
+        self.__log_queue: Optional[Queue] = None
+        self.__meter_queue: Optional[Queue] = None
+        self.__snapshot_queue: Optional[Queue] = None
+
+        # Start reporter threads and register queues
+        self.__init_threading()
+
+    def __init_threading(self) -> None:
+        """
+        This method initializes all the queues and threads for the agent and reporters.
+        Upon os.fork(), callback will reinitialize threads and queues by calling this method
+
+        Heartbeat thread is started by default.
+        Segment reporter thread and segment queue is created by default.
+        All other queues and threads depends on user configuration.
+        """
+        self._finished = Event()
+
+        __heartbeat_thread = Thread(name='HeartbeatThread', target=self.__heartbeat, daemon=True)
+        __heartbeat_thread.start()
+
+        self.__segment_queue = Queue(maxsize=config.agent_trace_reporter_max_buffer_size)
+        __segment_report_thread = Thread(name='SegmentReportThread', target=self.__report_segment, daemon=True)
+        __segment_report_thread.start()
+
+        if config.agent_meter_reporter_active:
+            self.__meter_queue = Queue(maxsize=config.agent_meter_reporter_max_buffer_size)
+            __meter_report_thread = Thread(name='MeterReportThread', target=self.__report_meter, daemon=True)
+            __meter_report_thread.start()
+
+            if config.agent_pvm_meter_reporter_active:
+                from skywalking.meter.pvm.cpu_usage import CPUUsageDataSource
+                from skywalking.meter.pvm.gc_data import GCDataSource
+                from skywalking.meter.pvm.mem_usage import MEMUsageDataSource
+                from skywalking.meter.pvm.thread_data import ThreadDataSource
+
+                MEMUsageDataSource().register()
+                CPUUsageDataSource().register()
+                GCDataSource().register()
+                ThreadDataSource().register()
+
+        if config.agent_log_reporter_active:
+            self.__log_queue = Queue(maxsize=config.agent_log_reporter_max_buffer_size)
+            __log_report_thread = Thread(name='LogReportThread', target=self.__report_log, daemon=True)
+            __log_report_thread.start()
+
+        if config.agent_profile_active:
+            # Now only profiler receives commands from OAP
+            __command_dispatch_thread = Thread(name='CommandDispatchThread', target=self.__command_dispatch,
+                                               daemon=True)
+            __command_dispatch_thread.start()
+
+            self.__snapshot_queue = Queue(maxsize=config.agent_profile_snapshot_transport_buffer_size)
+
+            __query_profile_thread = Thread(name='QueryProfileCommandThread', target=self.__query_profile_command,
+                                            daemon=True)
+            __query_profile_thread.start()
+
+            __send_profile_thread = Thread(name='SendProfileSnapShotThread', target=self.__send_profile_snapshot,
+                                           daemon=True)
+            __send_profile_thread.start()
+
+    @staticmethod  # for now
+    def __fork_before() -> None:
+        """
+        This handles explicit fork() calls. The child process will not have a running thread, so we need to
+        revive all of them. The parent process will continue to run as normal.
+
+        This does not affect pre-forking server support, which are handled separately.
+        """
+        # possible deadlock would be introduced if some queue is in use when fork() is called and
+        # therefore child process will inherit a locked queue. To avoid this and have side benefit
+        # of a clean queue in child process (prevent duplicated reporting), we simply restart the agent and
+        # reinitialize all queues and threads.
+        logger.warning('SkyWalking Python agent fork support is currently experimental, '
+                       'please report issues if you encounter any.')
+
+    @staticmethod  # for now
+    def __fork_after_in_parent() -> None:
+        """
+        Something to do after fork() in parent process
+        """
+        ...
+
+    def __fork_after_in_child(self) -> None:
+        """
+        Simply restart the agent after we detect a fork() call
+        """
+        self.start()
+        logger.info('SkyWalking Python agent spawned in child after fork() call.')
+
+    def start(self) -> None:
+        """
+        Start would be called by user or os.register_at_fork() callback
+        Start will proceed if and only if the agent is not started in the
+        current process.
+
+        When os.fork(), the service instance should be changed to a new one by appending pid.
+        """
+        python_version: tuple = sys.version_info[:2]
+        if python_version[0] < 3 and python_version[1] < 7:
+            # agent may or may not work for Python 3.6 and below
+            # since 3.6 is EOL, we will not officially support it
+            logger.warning('SkyWalking Python agent does not support Python 3.6 and below, '
+                           'please upgrade to Python 3.7 or above.')
+        # Below is required for grpcio to work with fork()
+        # https://github.com/grpc/grpc/blob/master/doc/fork_support.md
+        # This is not available in Python 3.7 due to frequent hanging issue
+        # It doesn't mean other Python versions will not hang, but chances seem low
+        # https://github.com/grpc/grpc/issues/18075
+        if config.agent_protocol == 'grpc' and config.agent_experimental_fork_support:
+            python_version: tuple = sys.version_info[:2]
+            if python_version[0] == 3 and python_version[1] == 7:
+                raise RuntimeError('gRPC fork support is not safe on Python 3.7 and can cause subprocess hanging. '
+                                   'See: https://github.com/grpc/grpc/issues/18075.'
+                                   'Please either upgrade to Python 3.8+ (though hanging could still happen but rare), '
+                                   'or use HTTP/Kafka protocol, or disable experimental fork support.')
+
+            os.environ['GRPC_ENABLE_FORK_SUPPORT'] = 'true'
+            os.environ['GRPC_POLL_STRATEGY'] = 'poll'
+
+        if not self.__started:
+            # if not already started, start the agent
+            self.__started = True
+            # Install logging plugins
+            # TODO - Add support for printing traceID/ context in logs
+            if config.agent_log_reporter_active:
+                from skywalking import log
+                log.install()
+            # Here we install all other lib plugins on first time start (parent process)
+            plugins.install()
+        elif self.__started and os.getpid() == self.started_pid:
+            # if already started, and this is the same process, raise an error
+            raise RuntimeError('SkyWalking Python agent has already been started in this process')
+        else:
+            # otherwise we assume a fork() happened, give it a new service instance name
+            logger.info('New process detected, re-initializing SkyWalking Python agent')
+            # Note: this is for experimental change, default config should never reach here
+            # Fork support is controlled by config.agent_fork_support :default: False
+            # Important: This does not impact pre-forking server support (uwsgi, gunicorn, etc...)
+            # This is only for explicit long-running fork() calls.
+            config.agent_instance_name = f'{config.agent_instance_name}-child-{os.getpid()}'
+
+        self.started_pid = os.getpid()
+
+        flag = False
         try:
-            __protocol.report_segment(__queue)  # is blocking actually, blocks for max config.queue_timeout seconds
-            wait = base
-        except Exception as exc:
-            logger.error(str(exc))
-            wait = min(60, wait * 2 or 1)
-
-        __finished.wait(wait)
-
-
-def __report_log():
-    wait = base = 0
-
-    while not __finished.is_set():
+            from gevent import monkey
+            flag = monkey.is_module_patched('socket')
+        except ModuleNotFoundError:
+            logger.debug("it was found that no gevent was used, if you don't use, please ignore.")
+        if flag:
+            import grpc.experimental.gevent as grpc_gevent
+            grpc_gevent.init_gevent()
+
+        loggings.init()
+        config.finalize()
+        profile.init()
+        meter.init(force=True)  # force re-init after fork()
+
+        self.__bootstrap()  # calls init_threading
+
+        atexit.register(self.__fini)
+
+        if config.agent_experimental_fork_support:
+            if hasattr(os, 'register_at_fork'):
+                os.register_at_fork(before=self.__fork_before, after_in_parent=self.__fork_after_in_parent,
+                                    after_in_child=self.__fork_after_in_child)
+
+    def __fini(self):
+        """
+        This method is called when the agent is shutting down.
+        Clean up all the queues and threads.
+        """
+        self.__protocol.report_segment(self.__segment_queue, False)
+        self.__segment_queue.join()
+
+        if config.agent_log_reporter_active:
+            self.__protocol.report_log(self.__log_queue, False)
+            self.__log_queue.join()
+
+        if config.agent_profile_active:
+            self.__protocol.report_snapshot(self.__snapshot_queue, False)
+            self.__snapshot_queue.join()
+
+        if config.agent_meter_reporter_active:
+            self.__protocol.report_meter(self.__meter_queue, False)
+            self.__meter_queue.join()
+
+        self._finished.set()
+
+    def stop(self) -> None:
+        """
+        Stops the agent and reset the started flag.
+        """
+        atexit.unregister(self.__fini)
+        self.__fini()
+        self.__started = False
+
+    @report_with_backoff(reporter_name='heartbeat', init_wait=config.agent_collector_heartbeat_period)
+    def __heartbeat(self) -> None:
+        self.__protocol.heartbeat()
+
+    @report_with_backoff(reporter_name='segment', init_wait=0)
+    def __report_segment(self) -> None:
+        if not self.__segment_queue.empty():
+            self.__protocol.report_segment(self.__segment_queue)
+
+    @report_with_backoff(reporter_name='log', init_wait=0)
+    def __report_log(self) -> None:
+        if not self.__log_queue.empty():
+            self.__protocol.report_log(self.__log_queue)
+
+    @report_with_backoff(reporter_name='meter', init_wait=config.agent_meter_reporter_period)
+    def __report_meter(self) -> None:
+        if not self.__meter_queue.empty():
+            self.__protocol.report_meter(self.__meter_queue)
+
+    @report_with_backoff(reporter_name='profile_snapshot', init_wait=0.5)
+    def __send_profile_snapshot(self) -> None:
+        if not self.__snapshot_queue.empty():
+            self.__protocol.report_snapshot(self.__snapshot_queue)
+
+    @report_with_backoff(reporter_name='query_profile_command',
+                         init_wait=config.agent_collector_get_profile_task_interval)
+    def __query_profile_command(self) -> None:
+        self.__protocol.query_profile_commands()
+
+    @staticmethod
+    def __command_dispatch() -> None:
+        # command dispatch will stuck when there are no commands
+        command_service.dispatch()
+
+    def is_segment_queue_full(self):
+        return self.__segment_queue.full()
+
+    def archive_segment(self, segment: 'Segment'):
+        try:  # unlike checking __queue.full() then inserting, this is atomic
+            self.__segment_queue.put(segment, block=False)
+        except Full:
+            logger.warning('the queue is full, the segment will be abandoned')
+
+    def archive_log(self, log_data: 'LogData'):
         try:
-            __protocol.report_log(__log_queue)
-            wait = base
-        except Exception as exc:
-            logger.error(str(exc))
-            wait = min(60, wait * 2 or 1)
-
-        __finished.wait(wait)
+            self.__log_queue.put(log_data, block=False)
+        except Full:
+            logger.warning('the queue is full, the log will be abandoned')
 
-
-def __send_profile_snapshot():
-    wait = base = 0.5
-
-    while not __finished.is_set():
+    def archive_meter(self, meter_data: 'MeterData'):
         try:
-            __protocol.report_snapshot(__snapshot_queue)
-            wait = base
-        except Exception as exc:
-            logger.error(str(exc))
-            wait = min(60, wait * 2 or 1)
-
-        __finished.wait(wait)
-
+            self.__meter_queue.put(meter_data, block=False)
+        except Full:
+            logger.warning('the queue is full, the meter will be abandoned')
 
-def __query_profile_command():
-    wait = base = config.agent_collector_get_profile_task_interval
-
-    while not __finished.is_set():
+    def add_profiling_snapshot(self, snapshot: TracingThreadSnapshot):
         try:
-            __protocol.query_profile_commands()
-            wait = base
-        except Exception as exc:
-            logger.error(str(exc))
-            wait = min(60, wait * 2 or 1)
-
-        __finished.wait(wait)
-
-
-def __report_meter():
-    wait = base = 1
+            self.__snapshot_queue.put(snapshot)
+        except Full:
+            logger.warning('the snapshot queue is full, the snapshot will be abandoned')
 
-    while not __finished.is_set():
+    def notify_profile_finish(self, task: ProfileTask):
         try:
-            __protocol.report_meter(__meter_queue)  # is blocking actually, blocks for max config.queue_timeout seconds
-            wait = base
-        except Exception as exc:
-            logger.error(str(exc))
-            wait = min(60, wait * 2 or 1)
-
-        __finished.wait(wait)
-
-
-def __command_dispatch():
-    # command dispatch will stuck when there are no commands
-    command_service.dispatch()
-
-
-def __init_threading():
-    global __heartbeat_thread, __report_thread, __log_report_thread, __query_profile_thread, \
-        __command_dispatch_thread, __send_profile_thread, __queue, __log_queue, __snapshot_queue, __meter_queue, __finished
-
-    __queue = Queue(maxsize=config.agent_trace_reporter_max_buffer_size)
-    __finished = Event()
-    __heartbeat_thread = Thread(name='HeartbeatThread', target=__heartbeat, daemon=True)
-    __report_thread = Thread(name='ReportThread', target=__report, daemon=True)
-    __command_dispatch_thread = Thread(name='CommandDispatchThread', target=__command_dispatch, daemon=True)
-
-    __heartbeat_thread.start()
-    __report_thread.start()
-    __command_dispatch_thread.start()
-
-    if config.agent_meter_reporter_active:
-        __meter_queue = Queue(maxsize=config.agent_meter_reporter_max_buffer_size)
-        __meter_report_thread = Thread(name='MeterReportThread', target=__report_meter, daemon=True)
-        __meter_report_thread.start()
-
-        if config.agent_pvm_meter_reporter_active:
-            from skywalking.meter.pvm.cpu_usage import CPUUsageDataSource
-            from skywalking.meter.pvm.gc_data import GCDataSource
-            from skywalking.meter.pvm.mem_usage import MEMUsageDataSource
-            from skywalking.meter.pvm.thread_data import ThreadDataSource
-
-            MEMUsageDataSource().registry()
-            CPUUsageDataSource().registry()
-            GCDataSource().registry()
-            ThreadDataSource().registry()
-
-    if config.agent_log_reporter_active:
-        __log_queue = Queue(maxsize=config.agent_log_reporter_max_buffer_size)
-        __log_report_thread = Thread(name='LogReportThread', target=__report_log, daemon=True)
-        __log_report_thread.start()
-
-    if config.agent_profile_active:
-        __snapshot_queue = Queue(maxsize=config.agent_profile_snapshot_transport_buffer_size)
-
-        __query_profile_thread = Thread(name='QueryProfileCommandThread', target=__query_profile_command, daemon=True)
-        __query_profile_thread.start()
-
-        __send_profile_thread = Thread(name='SendProfileSnapShotThread', target=__send_profile_snapshot, daemon=True)
-        __send_profile_thread.start()
-
-
-def __init():
-    global __protocol
-    if config.protocol == 'grpc':
-        from skywalking.agent.protocol.grpc import GrpcProtocol
-        __protocol = GrpcProtocol()
-    elif config.protocol == 'http':
-        from skywalking.agent.protocol.http import HttpProtocol
-        __protocol = HttpProtocol()
-    elif config.protocol == 'kafka':
-        from skywalking.agent.protocol.kafka import KafkaProtocol
-        __protocol = KafkaProtocol()
-
-    plugins.install()
-    if config.agent_log_reporter_active:  # todo - Add support for printing traceID/ context in logs
-        from skywalking import log
-        log.install()
-
-    __init_threading()
-
-
-def __fini():
-    __protocol.report_segment(__queue, False)
-    __queue.join()
-
-    if config.agent_log_reporter_active:
-        __protocol.report_log(__log_queue, False)
-        __log_queue.join()
-
-    if config.agent_profile_active:
-        __protocol.report_snapshot(__snapshot_queue, False)
-        __snapshot_queue.join()
-
-    __finished.set()
-
-
-def start():
-    global __started
-    if __started:
-        return
-    __started = True
-
-    flag = False
-    try:
-        from gevent import monkey
-        flag = monkey.is_module_patched('socket')
-    except ModuleNotFoundError:
-        logger.debug("it was found that no gevent was used, if you don't use, please ignore.")
-    if flag:
-        import grpc.experimental.gevent as grpc_gevent
-        grpc_gevent.init_gevent()
-
-    loggings.init()
-    config.finalize()
-    profile.init()
-    meter.init()
-
-    __init()
-
-    atexit.register(__fini)
-
-
-def stop():
-    atexit.unregister(__fini)
-    __fini()
-
-
-def started():
-    return __started
-
-
-def isfull():
-    return __queue.full()
-
-
-def archive(segment: 'Segment'):
-    try:  # unlike checking __queue.full() then inserting, this is atomic
-        __queue.put(segment, block=False)
-    except Full:
-        logger.warning('the queue is full, the segment will be abandoned')
-
-
-def archive_log(log_data: 'LogData'):
-    try:
-        __log_queue.put(log_data, block=False)
-    except Full:
-        logger.warning('the queue is full, the log will be abandoned')
-
-
-def archive_meter(meterdata: 'MeterData'):
-    try:
-        __meter_queue.put(meterdata, block=False)
-    except Full:
-        logger.warning('the queue is full, the meter will be abandoned')
-
-
-def add_profiling_snapshot(snapshot: TracingThreadSnapshot):
-    try:
-        __snapshot_queue.put(snapshot)
-    except Full:
-        logger.warning('the snapshot queue is full, the snapshot will be abandoned')
+            self.__protocol.notify_profile_task_finish(task)
+        except Exception as e:
+            logger.error(f'notify profile task finish to backend fail. {str(e)}')
 
 
-def notify_profile_finish(task: ProfileTask):
-    try:
-        __protocol.notify_profile_task_finish(task)
-    except Exception as e:
-        logger.error(f'notify profile task finish to backend fail. {str(e)}')
+# Export for user (backwards compatibility)
+# so users still use `from skywalking import agent`
+agent = SkyWalkingAgent()
+start = agent.start
diff --git a/skywalking/bootstrap/loader/sitecustomize.py b/skywalking/bootstrap/loader/sitecustomize.py
index bf24ba1..79a72ce 100644
--- a/skywalking/bootstrap/loader/sitecustomize.py
+++ b/skywalking/bootstrap/loader/sitecustomize.py
@@ -41,7 +41,7 @@ def _get_sw_loader_logger():
     from logging import getLogger
     logger = getLogger('skywalking-loader')
     ch = logging.StreamHandler()
-    formatter = logging.Formatter('%(name)s [%(threadName)s] [%(levelname)s] %(message)s')
+    formatter = logging.Formatter('%(name)s [pid:%(process)d] [%(threadName)s] [%(levelname)s] %(message)s')
     ch.setFormatter(formatter)
     logger.addHandler(ch)
     logger.propagate = False
diff --git a/skywalking/config.py b/skywalking/config.py
index 67c5ac6..ef05df3 100644
--- a/skywalking/config.py
+++ b/skywalking/config.py
@@ -49,7 +49,7 @@ agent_collector_backend_services: str = os.getenv('SW_AGENT_COLLECTOR_BACKEND_SE
 # The protocol to communicate with the backend OAP, `http`, `grpc` or `kafka`, **we highly suggest using `grpc` in
 # production as it's well optimized than `http`**. The `kafka` protocol provides an alternative way to submit data to
 # the backend.
-protocol: str = os.getenv('SW_AGENT_PROTOCOL', 'grpc').lower()
+agent_protocol: str = os.getenv('SW_AGENT_PROTOCOL', 'grpc').lower()
 # The name of your awesome Python service
 agent_name: str = os.getenv('SW_AGENT_NAME', 'Python Service Name')
 # The name of this particular awesome Python service instance
@@ -213,7 +213,7 @@ def init(**kwargs) -> None:
     """
     Used to initialize the configuration of the SkyWalking Python Agent.
     Refer to the official online documentation
-    https://skywalking.apache.org/docs/skywalking-python/next/en/setup/configurations/
+    https://skywalking.apache.org/docs/skywalking-python/next/en/setup/configuration/
     for more information on the configuration options.
 
     Args:
@@ -233,12 +233,12 @@ def finalize_feature() -> None:
     """
     global agent_profile_active, agent_meter_reporter_active
 
-    if protocol == 'http' and (agent_profile_active or agent_meter_reporter_active):
+    if agent_protocol == 'http' and (agent_profile_active or agent_meter_reporter_active):
         agent_profile_active = False
         agent_meter_reporter_active = False
         warnings.warn('HTTP protocol does not support meter reporter and profiler. Please use gRPC protocol if you '
                       'would like to use both features.')
-    elif protocol == 'kafka' and agent_profile_active:
+    elif agent_protocol == 'kafka' and agent_profile_active:
         agent_profile_active = False
         warnings.warn('Kafka protocol does not support profiler. Please use gRPC protocol if you would like to use '
                       'this feature.')
diff --git a/skywalking/log/sw_logging.py b/skywalking/log/sw_logging.py
index 3c0e99d..1c88aea 100644
--- a/skywalking/log/sw_logging.py
+++ b/skywalking/log/sw_logging.py
@@ -17,7 +17,8 @@
 
 import logging
 
-from skywalking import config, agent
+from skywalking import config
+from skywalking.agent import agent
 from skywalking.protocol.common.Common_pb2 import KeyStringValuePair
 from skywalking.protocol.logging.Logging_pb2 import LogData, LogDataBody, TraceContext, LogTags, TextLog
 from skywalking.trace.context import get_context
diff --git a/skywalking/loggings.py b/skywalking/loggings.py
index 3955d7d..1b9ba83 100644
--- a/skywalking/loggings.py
+++ b/skywalking/loggings.py
@@ -25,7 +25,7 @@ logger_debug_enabled = False
 def getLogger(name=None):  # noqa
     logger = logging.getLogger(name)
     ch = logging.StreamHandler()
-    formatter = logging.Formatter('%(name)s [%(threadName)s] [%(levelname)s] %(message)s')
+    formatter = logging.Formatter('%(name)s [pid:%(process)d] [%(threadName)s] [%(levelname)s] %(message)s')
     ch.setFormatter(formatter)
     logger.addHandler(ch)
     logger.propagate = False
diff --git a/skywalking/meter/__init__.py b/skywalking/meter/__init__.py
index eacfe5f..413f739 100644
--- a/skywalking/meter/__init__.py
+++ b/skywalking/meter/__init__.py
@@ -18,11 +18,15 @@
 _meter_service = None
 
 
-def init():
+def init(force: bool = False):
+    """
+    If the meter service is not initialized, initialize it.
+    if force, we are in a fork(), we force re-initialization
+    """
     from skywalking.meter.meter_service import MeterService
 
     global _meter_service
-    if _meter_service:
+    if _meter_service and not force:
         return
 
     _meter_service = MeterService()
diff --git a/skywalking/meter/meter.py b/skywalking/meter/meter.py
index 8feec19..9f04b5e 100644
--- a/skywalking/meter/meter.py
+++ b/skywalking/meter/meter.py
@@ -14,9 +14,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-import typing
 from abc import ABC, abstractmethod
 from enum import Enum
+from typing import Optional
+
 from skywalking.protocol.language_agent.Meter_pb2 import Label
 import skywalking.meter as meter
 
@@ -82,9 +83,10 @@ class BaseMeter(ABC):
     meter_service = None
 
     def __init__(self, name: str, tags=None):
-        if BaseMeter.meter_service is None:
-            BaseMeter.meter_service = meter._meter_service
-
+        # Should always override to use the correct meter service.
+        # Otherwise, forked process will inherit the original
+        # meter_service in parent. We want a new one in child.
+        BaseMeter.meter_service = meter._meter_service
         self.meterId = MeterId(name, self.get_type(), tags)
 
     def get_name(self):
@@ -110,7 +112,7 @@ class BaseMeter(ABC):
         def __init__(self, name: str, tags=None):
             # Derived Builder should instantiate its corresponding meter here.
             # self.meter = BaseMeter(name, tags)
-            self.meter: typing.Optional[BaseMeter] = None
+            self.meter: Optional[BaseMeter] = None
             pass
 
         def tag(self, name: str, value):
diff --git a/skywalking/meter/meter_service.py b/skywalking/meter/meter_service.py
index 67ad85e..a56fbfe 100644
--- a/skywalking/meter/meter_service.py
+++ b/skywalking/meter/meter_service.py
@@ -14,19 +14,20 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-
 import time
 from concurrent.futures import ThreadPoolExecutor
 from threading import Thread
 from skywalking import config
-from skywalking import agent
+from skywalking.agent import agent
 from skywalking.meter.meter import BaseMeter
 from skywalking.utils.time import current_milli_time
+from skywalking.loggings import logger
 
 
 class MeterService(Thread):
     def __init__(self):
-        super().__init__(daemon=True)
+        super().__init__(name='meterService', daemon=True)
+        logger.debug('Started meter service')
         self.meter_map = {}
 
     def register(self, meter: BaseMeter):
@@ -44,7 +45,7 @@ class MeterService(Thread):
             meterdata.timestamp = current_milli_time()
             agent.archive_meter(meterdata)
 
-        with ThreadPoolExecutor(max_workers=1) as executor:
+        with ThreadPoolExecutor(thread_name_prefix='meter_service_pool_worker', max_workers=1) as executor:
             executor.map(archive, self.meter_map.values())
 
     def run(self):
diff --git a/skywalking/meter/pvm/data_source.py b/skywalking/meter/pvm/data_source.py
index 988886e..3f97c78 100644
--- a/skywalking/meter/pvm/data_source.py
+++ b/skywalking/meter/pvm/data_source.py
@@ -19,7 +19,7 @@ from skywalking.meter.gauge import Gauge
 
 
 class DataSource:
-    def registry(self):
+    def register(self):
         for name in dir(self):
             if name.endswith('generator'):
                 generator = getattr(self, name)()
diff --git a/skywalking/plugins/sw_kafka.py b/skywalking/plugins/sw_kafka.py
index 60f3d9c..da685ca 100644
--- a/skywalking/plugins/sw_kafka.py
+++ b/skywalking/plugins/sw_kafka.py
@@ -74,7 +74,7 @@ def _sw__poll_once_func(__poll_once):
 def _sw_send_func(_send):
     def _sw_send(this, topic, value=None, key=None, headers=None, partition=None, timestamp_ms=None):
         # ignore trace, log and meter reporter - skywalking self request
-        if config.protocol == 'kafka' and \
+        if config.agent_protocol == 'kafka' and \
                 (config.kafka_topic_segment == topic
                  or config.kafka_topic_log == topic
                  or config.kafka_topic_management == topic
diff --git a/skywalking/plugins/sw_loguru.py b/skywalking/plugins/sw_loguru.py
index 9dd8c2c..0adee84 100644
--- a/skywalking/plugins/sw_loguru.py
+++ b/skywalking/plugins/sw_loguru.py
@@ -22,7 +22,8 @@ from multiprocessing import current_process
 from os.path import basename, splitext
 from threading import current_thread
 
-from skywalking import config, agent
+from skywalking import config
+from skywalking.agent import agent
 from skywalking.protocol.common.Common_pb2 import KeyStringValuePair
 from skywalking.protocol.logging.Logging_pb2 import LogData, LogDataBody, TraceContext, LogTags, TextLog
 from skywalking.trace.context import get_context
diff --git a/skywalking/plugins/sw_redis.py b/skywalking/plugins/sw_redis.py
index 0dc95dd..ad5ae36 100644
--- a/skywalking/plugins/sw_redis.py
+++ b/skywalking/plugins/sw_redis.py
@@ -22,105 +22,24 @@ from skywalking.trace.tags import TagCacheType, TagCacheOp, TagCacheCmd, TagCach
 link_vector = ['https://github.com/andymccurdy/redis-py/']
 support_matrix = {
     'redis': {
-        '>=3.7': ['3.5']  # "4.0" next, incompatible to current instrumentation
+        '>=3.7': ['3.5.*', '4.5.1']
     }
 }
 note = """"""
 
-OPERATIONS_WRITE = set({'GETSET',
-                        'SET',
-                        'SETBIT',
-                        'SETEX ',
-                        'SETNX ',
-                        'SETRANGE',
-                        'STRLEN ',
-                        'MSET',
-                        'MSETNX ',
-                        'PSETEX',
-                        'INCR ',
-                        'INCRBY ',
-                        'INCRBYFLOAT',
-                        'DECR ',
-                        'DECRBY ',
-                        'APPEND ',
-                        'HMSET',
-                        'HSET',
-                        'HSETNX ',
-                        'HINCRBY',
-                        'HINCRBYFLOAT',
-                        'HDEL',
-                        'RPOPLPUSH',
-                        'RPUSH',
-                        'RPUSHX',
-                        'LPUSH',
-                        'LPUSHX',
-                        'LREM',
-                        'LTRIM',
-                        'LSET',
-                        'BRPOPLPUSH',
-                        'LINSERT',
-                        'SADD',
-                        'SDIFF',
-                        'SDIFFSTORE',
-                        'SINTERSTORE',
-                        'SISMEMBER',
-                        'SREM',
-                        'SUNION',
-                        'SUNIONSTORE',
-                        'SINTER',
-                        'ZADD',
-                        'ZINCRBY',
-                        'ZINTERSTORE',
-                        'ZRANGE',
-                        'ZRANGEBYLEX',
-                        'ZRANGEBYSCORE',
-                        'ZRANK',
-                        'ZREM',
-                        'ZREMRANGEBYLEX',
-                        'ZREMRANGEBYRANK',
-                        'ZREMRANGEBYSCORE',
-                        'ZREVRANGE',
-                        'ZREVRANGEBYSCORE',
-                        'ZREVRANK',
-                        'ZUNIONSTORE',
-                        'XADD',
-                        'XDEL',
-                        'DEL',
-                        'XTRIM'})
+OPERATIONS_WRITE = {'GETSET', 'SET', 'SETBIT', 'SETEX ', 'SETNX ', 'SETRANGE', 'STRLEN ', 'MSET', 'MSETNX ', 'PSETEX',
+                    'INCR ', 'INCRBY ', 'INCRBYFLOAT', 'DECR ', 'DECRBY ', 'APPEND ', 'HMSET', 'HSET', 'HSETNX ',
+                    'HINCRBY', 'HINCRBYFLOAT', 'HDEL', 'RPOPLPUSH', 'RPUSH', 'RPUSHX', 'LPUSH', 'LPUSHX', 'LREM',
+                    'LTRIM', 'LSET', 'BRPOPLPUSH', 'LINSERT', 'SADD', 'SDIFF', 'SDIFFSTORE', 'SINTERSTORE', 'SISMEMBER',
+                    'SREM', 'SUNION', 'SUNIONSTORE', 'SINTER', 'ZADD', 'ZINCRBY', 'ZINTERSTORE', 'ZRANGE',
+                    'ZRANGEBYLEX', 'ZRANGEBYSCORE', 'ZRANK', 'ZREM', 'ZREMRANGEBYLEX', 'ZREMRANGEBYRANK',
+                    'ZREMRANGEBYSCORE', 'ZREVRANGE', 'ZREVRANGEBYSCORE', 'ZREVRANK', 'ZUNIONSTORE', 'XADD', 'XDEL',
+                    'DEL', 'XTRIM'}
 
-OPERATIONS_READ = set({'GETRANGE',
-                       'GETBIT ',
-                       'MGET',
-                       'HVALS',
-                       'HKEYS',
-                       'HLEN',
-                       'HEXISTS',
-                       'HGET',
-                       'HGETALL',
-                       'HMGET',
-                       'BLPOP',
-                       'BRPOP',
-                       'LINDEX',
-                       'LLEN',
-                       'LPOP',
-                       'LRANGE',
-                       'RPOP',
-                       'SCARD',
-                       'SRANDMEMBER',
-                       'SPOP',
-                       'SSCAN',
-                       'SMOVE',
-                       'ZLEXCOUNT',
-                       'ZSCORE',
-                       'ZSCAN',
-                       'ZCARD',
-                       'ZCOUNT',
-                       'XGET',
-                       'GET',
-                       'XREAD',
-                       'XLEN',
-                       'XRANGE',
-                       'XREVRANGE'})
+OPERATIONS_READ = {'GETRANGE', 'GETBIT ', 'MGET', 'HVALS', 'HKEYS', 'HLEN', 'HEXISTS', 'HGET', 'HGETALL', 'HMGET',
+                   'BLPOP', 'BRPOP', 'LINDEX', 'LLEN', 'LPOP', 'LRANGE', 'RPOP', 'SCARD', 'SRANDMEMBER', 'SPOP',
+                   'SSCAN', 'SMOVE', 'ZLEXCOUNT', 'ZSCORE', 'ZSCAN', 'ZCARD', 'ZCOUNT', 'XGET', 'GET', 'XREAD', 'XLEN',
+                   'XRANGE', 'XREVRANGE'}
 
 
 def install():
@@ -130,7 +49,14 @@ def install():
 
     def _sw_send_command(this: Connection, *args, **kwargs):
         peer = f'{this.host}:{this.port}'
-        cmd, key = args[0], args[1]
+
+        if len(args) == 1:
+            cmd = args[0]
+            key = ''
+        elif len(args) > 1:
+            cmd, key = args[0], args[1]
+        else:  # just to be safe
+            cmd = key = ''
 
         if cmd in OPERATIONS_WRITE:
             op = 'write'
diff --git a/skywalking/plugins/sw_requests.py b/skywalking/plugins/sw_requests.py
index 743bb32..6d8c4e1 100644
--- a/skywalking/plugins/sw_requests.py
+++ b/skywalking/plugins/sw_requests.py
@@ -43,7 +43,7 @@ def install():
         url_param = sw_urlparse(url)
 
         # ignore trace skywalking self request
-        if config.protocol == 'http' and config.agent_collector_backend_services.rstrip('/').endswith(url_param.netloc):
+        if config.agent_protocol == 'http' and config.agent_collector_backend_services.rstrip('/').endswith(url_param.netloc):
             return _request(this, method, url, params, data, headers, cookies, files, auth, timeout,
                             allow_redirects,
                             proxies,
diff --git a/skywalking/plugins/sw_websockets.py b/skywalking/plugins/sw_websockets.py
index 084160b..128fde3 100644
--- a/skywalking/plugins/sw_websockets.py
+++ b/skywalking/plugins/sw_websockets.py
@@ -73,7 +73,6 @@ def install():
             finally:
                 span.tag(TagHttpStatusMsg(status_msg))
 
-
     WebSocketClientProtocol.handshake = _sw_protocol_handshake_client
 
     # To trace per message transactions
diff --git a/skywalking/profile/profile_context.py b/skywalking/profile/profile_context.py
index 283ad36..79d6e23 100644
--- a/skywalking/profile/profile_context.py
+++ b/skywalking/profile/profile_context.py
@@ -23,7 +23,7 @@ from packaging import version
 from threading import Thread, Event, current_thread
 from typing import Optional
 
-from skywalking import agent
+from skywalking.agent import agent
 from skywalking import config
 from skywalking import profile
 from skywalking.loggings import logger
diff --git a/skywalking/profile/profile_service.py b/skywalking/profile/profile_service.py
index bc44480..5caf0ee 100644
--- a/skywalking/profile/profile_service.py
+++ b/skywalking/profile/profile_service.py
@@ -20,7 +20,7 @@ from queue import Queue
 from threading import Timer, RLock, Lock
 from typing import Tuple
 
-from skywalking import agent
+from skywalking.agent import agent
 from skywalking.loggings import logger, logger_debug_enabled
 from skywalking.profile.profile_constants import ProfileConstants
 from skywalking.profile.profile_context import ProfileTaskExecutionContext
@@ -55,7 +55,7 @@ class ProfileTaskExecutionService:
 
         self._last_command_create_time = -1  # type: int
         # single thread executor
-        self.profile_executor = ThreadPoolExecutor(max_workers=1)
+        self.profile_executor = ThreadPoolExecutor(thread_name_prefix='profile-executor', max_workers=1)
         self.task_execution_context = AtomicRef(None)
 
         self.profile_task_scheduler = Scheduler()
diff --git a/skywalking/trace/context.py b/skywalking/trace/context.py
index 6ef6429..7f4528d 100644
--- a/skywalking/trace/context.py
+++ b/skywalking/trace/context.py
@@ -16,9 +16,9 @@
 #
 from typing import Optional
 
-from skywalking import Component, agent, config
+from skywalking import Component, config
 from skywalking import profile
-from skywalking.agent import isfull
+from skywalking.agent import agent
 from skywalking.profile.profile_status import ProfileStatusReference
 from skywalking.trace import ID
 from skywalking.trace.carrier import Carrier
@@ -104,7 +104,7 @@ class SpanContext:
 
     @staticmethod
     def ignore_check(op: str, kind: Kind, carrier: Optional[Carrier] = None):
-        if config.RE_IGNORE_PATH.match(op) or isfull() or (carrier is not None and carrier.is_suppressed):
+        if config.RE_IGNORE_PATH.match(op) or agent.is_segment_queue_full() or (carrier is not None and carrier.is_suppressed):
             return NoopSpan(context=NoopContext())
 
         return None
@@ -219,7 +219,7 @@ class SpanContext:
 
         self._nspans -= 1
         if self._nspans == 0:
-            agent.archive(self.segment)
+            agent.archive_segment(self.segment)
             return True
 
         return False
diff --git a/tests/plugin/http/sw_websockets/services/consumer.py b/skywalking/utils/singleton.py
similarity index 57%
copy from tests/plugin/http/sw_websockets/services/consumer.py
copy to skywalking/utils/singleton.py
index 21e9640..f38e3ab 100644
--- a/tests/plugin/http/sw_websockets/services/consumer.py
+++ b/skywalking/utils/singleton.py
@@ -14,25 +14,22 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-import websockets
 
-import asyncio
 
-if __name__ == '__main__':
-    from fastapi import FastAPI
-    import uvicorn
+class Singleton(object):
+    """
+    This is to ensure a single process can only have one instance of agent.
+    Written by Guido van Rossum to implement a singleton pattern.
+    https://www.python.org/download/releases/2.2/descrintro/#__new__
+    Classes that inherit from this class will be singletons.
+    """
+    def __new__(cls, *args, **kwds):
+        it = cls.__dict__.get('__it__')
+        if it is not None:
+            return it
+        cls.__it__ = it = object.__new__(cls)
+        it.init(*args, **kwds)
+        return it
 
-    app = FastAPI()
-
-    @app.get('/ws')
-    async def websocket_ping():
-        async with websockets.connect('ws://provider:9091/ws', extra_headers=None) as websocket:
-            await websocket.send('Ping')
-
-            response = await websocket.recv()
-            await asyncio.sleep(0.5)
-
-            await websocket.close()
-            return response
-
-    uvicorn.run(app, host='0.0.0.0', port=9090)
+    def init(self, *args, **kwds):
+        pass
diff --git a/tests/e2e/script/env b/tests/e2e/script/env
index adb6ad0..91aa97f 100644
--- a/tests/e2e/script/env
+++ b/tests/e2e/script/env
@@ -13,6 +13,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-# 2022-11-05 commit
-SW_CTL_COMMIT=521843f963917aa806740a9ad09c65aa59aca179
-SW_OAP_COMMIT=93d021ab0bbffa6cfa73adacdcbbf9e25f8016be
\ No newline at end of file
+# 2022-02-15 commit
+SW_CTL_COMMIT=0883266bfaa36612927b69e35781b64ea181758d
+SW_OAP_COMMIT=574b83f095861d4199fdb78aa52923765cf921a1
\ No newline at end of file
diff --git a/tests/plugin/base.py b/tests/plugin/base.py
index b61ece0..64634e3 100644
--- a/tests/plugin/base.py
+++ b/tests/plugin/base.py
@@ -14,7 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-
+import time
 import inspect
 import os
 import sys
@@ -38,13 +38,16 @@ class TestPluginBase:
         if expected_file_name is None:
             expected_file_name = os.path.join(dirname(inspect.getfile(self.__class__)), 'expected.data.yml')
 
-        # time.sleep(10)
-
         with open(expected_file_name) as expected_data_file:
             expected_data = os.linesep.join(expected_data_file.readlines())
 
             response = requests.post(url='http://localhost:12800/dataValidate', data=expected_data)
 
+            if response.status_code != 200:
+                # heuristically retry once
+                time.sleep(10)
+                response = requests.post(url='http://localhost:12800/dataValidate', data=expected_data)
+
             if response.status_code != 200:
                 res = requests.get('http://localhost:12800/receiveData')
 
diff --git a/tests/plugin/http/sw_websockets/services/consumer.py b/tests/plugin/http/sw_websockets/services/consumer.py
index 21e9640..204d7f7 100644
--- a/tests/plugin/http/sw_websockets/services/consumer.py
+++ b/tests/plugin/http/sw_websockets/services/consumer.py
@@ -14,7 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-import websockets
+from websockets.client import connect
 
 import asyncio
 
@@ -26,7 +26,7 @@ if __name__ == '__main__':
 
     @app.get('/ws')
     async def websocket_ping():
-        async with websockets.connect('ws://provider:9091/ws', extra_headers=None) as websocket:
+        async with connect('ws://provider:9091/ws', extra_headers=None) as websocket:
             await websocket.send('Ping')
 
             response = await websocket.recv()
diff --git a/tests/plugin/web/sw_fastapi/services/consumer.py b/tests/plugin/web/sw_fastapi/services/consumer.py
index 627bf6c..39d144a 100644
--- a/tests/plugin/web/sw_fastapi/services/consumer.py
+++ b/tests/plugin/web/sw_fastapi/services/consumer.py
@@ -15,7 +15,7 @@
 # limitations under the License.
 #
 import requests
-import websockets
+from websockets.client import connect
 
 import asyncio
 
@@ -32,7 +32,7 @@ if __name__ == '__main__':
         return {'http': res.json(), 'websocket': websocket_pong}
 
     async def websocket_ping():
-        async with websockets.connect('ws://provider:9091/ws', extra_headers=None) as websocket:
+        async with connect('ws://provider:9091/ws', extra_headers=None) as websocket:
             await websocket.send('Ping')
 
             response = await websocket.recv()
diff --git a/tests/plugin/web/sw_sanic/services/provider.py b/tests/plugin/web/sw_sanic/services/provider.py
index 2cc7ba9..1669a21 100644
--- a/tests/plugin/web/sw_sanic/services/provider.py
+++ b/tests/plugin/web/sw_sanic/services/provider.py
@@ -30,4 +30,4 @@ if __name__ == '__main__':
         )
 
     PORT = 9091
-    app.run(host='0.0.0.0', port=PORT, debug=True)
+    app.run(host='0.0.0.0', port=PORT)
diff --git a/tests/unit/test_meter.py b/tests/unit/test_meter.py
index 4447abc..cb9ae19 100644
--- a/tests/unit/test_meter.py
+++ b/tests/unit/test_meter.py
@@ -23,6 +23,7 @@ from skywalking.meter.counter import Counter, CounterMode
 from skywalking.meter.histogram import Histogram
 from skywalking.meter.gauge import Gauge
 from skywalking.meter.meter import BaseMeter
+from skywalking import meter
 
 
 class MockMeterService():
@@ -44,7 +45,7 @@ class MockMeterService():
 
 
 meter_service = MockMeterService()
-BaseMeter.meter_service = meter_service
+meter._meter_service = meter_service
 
 # picked empirically
 tolerance = 5e-2
@@ -85,7 +86,6 @@ class TestMeter(unittest.TestCase):
             self.assertLess(abs(i - (meterdata.singleValue.value - pre)), tolerance)
             pre = meterdata.singleValue.value
 
-
     def test_counter_increase_decarator(self):
         builder = Counter.Builder('c3', CounterMode.INCREMENT)
         c = builder.build()
@@ -168,7 +168,6 @@ class TestMeter(unittest.TestCase):
                 meterdata = meter_service.transform(h)
                 self.assertEqual(repeat, meterdata.histogram.values[idx].count)
 
-
     def test_gauge(self):
         ls = list(range(1, 10))
         random.shuffle(ls)