You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ke...@apache.org on 2020/12/13 16:22:56 UTC

[skywalking-python] branch master updated: [Plugin] add aiohttp plugin (#101)

This is an automated email from the ASF dual-hosted git repository.

kezhenxu94 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking-python.git


The following commit(s) were added to refs/heads/master by this push:
     new 65c6e4a  [Plugin] add aiohttp plugin (#101)
65c6e4a is described below

commit 65c6e4ab8d8f7d7e649823dc02674a00d19441da
Author: Tomasz Pytel <to...@gmail.com>
AuthorDate: Sun Dec 13 13:22:50 2020 -0300

    [Plugin] add aiohttp plugin (#101)
    
    * add test case
    
    Co-authored-by: kezhenxu94 <ke...@apache.org>
---
 requirements.txt                             |  1 +
 skywalking/__init__.py                       |  1 +
 skywalking/plugins/sw_aiohttp.py             | 94 ++++++++++++++++++++++++++++
 tests/plugin/sw_aiohttp/__init__.py          | 16 +++++
 tests/plugin/sw_aiohttp/docker-compose.yml   | 60 ++++++++++++++++++
 tests/plugin/sw_aiohttp/expected.data.yml    | 92 +++++++++++++++++++++++++++
 tests/plugin/sw_aiohttp/services/__init__.py | 16 +++++
 tests/plugin/sw_aiohttp/services/consumer.py | 44 +++++++++++++
 tests/plugin/sw_aiohttp/services/provider.py | 39 ++++++++++++
 tests/plugin/sw_aiohttp/test_aiohttp.py      | 36 +++++++++++
 10 files changed, 399 insertions(+)

diff --git a/requirements.txt b/requirements.txt
index f9687d4..eeea0d2 100755
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,5 +1,6 @@
 asgiref==3.2.10
 aiofiles==0.6.0
+aiohttp==3.7.3
 attrs==19.3.0
 blindspin==2.0.1
 certifi==2020.6.20
diff --git a/skywalking/__init__.py b/skywalking/__init__.py
index 06c737b..7be2e43 100644
--- a/skywalking/__init__.py
+++ b/skywalking/__init__.py
@@ -38,6 +38,7 @@ class Component(Enum):
     Elasticsearch = 47
     Urllib3 = 7006
     Sanic = 7007
+    AioHttp = 7008
 
 
 class Layer(Enum):
diff --git a/skywalking/plugins/sw_aiohttp.py b/skywalking/plugins/sw_aiohttp.py
new file mode 100644
index 0000000..ce3f0a8
--- /dev/null
+++ b/skywalking/plugins/sw_aiohttp.py
@@ -0,0 +1,94 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from skywalking import Layer, Component
+from skywalking.trace import tags
+from skywalking.trace.carrier import Carrier
+from skywalking.trace.context import get_context
+from skywalking.trace.tags import Tag
+
+
+def install():
+    from aiohttp import ClientSession
+    from aiohttp.web_protocol import RequestHandler
+    from multidict import CIMultiDict, MultiDict, MultiDictProxy
+    from yarl import URL
+
+    async def _sw_request(self: ClientSession, method: str, str_or_url, **kwargs):
+        url = URL(str_or_url).with_user(None).with_password(None)
+        peer = '%s:%d' % (url.host or '', url.port)
+        context = get_context()
+
+        with context.new_exit_span(op=url.path or "/", peer=peer) as span:
+            span.layer = Layer.Http
+            span.component = Component.AioHttp
+            span.tag(Tag(key=tags.HttpMethod, val=method.upper()))  # pyre-ignore
+            span.tag(Tag(key=tags.HttpUrl, val=url))  # pyre-ignore
+
+            carrier = span.inject()
+            headers = kwargs.get('headers')
+
+            if headers is None:
+                headers = kwargs['headers'] = CIMultiDict()
+            elif not isinstance(headers, (MultiDictProxy, MultiDict)):
+                headers = CIMultiDict(headers)
+
+            for item in carrier:
+                headers.add(item.key, item.val)
+
+            res = await _request(self, method, str_or_url, **kwargs)
+
+            span.tag(Tag(key=tags.HttpStatus, val=res.status, overridable=True))
+
+            if res.status >= 400:
+                span.error_occurred = True
+
+            return res
+
+    _request = ClientSession._request
+    ClientSession._request = _sw_request
+
+    async def _sw_handle_request(self, request, start_time: float):
+        context = get_context()
+        carrier = Carrier()
+
+        for item in carrier:
+            val = request.headers.get(item.key)
+
+            if val is not None:
+                item.val = val
+
+        with context.new_entry_span(op=request.path, carrier=carrier) as span:
+            span.layer = Layer.Http
+            span.component = Component.AioHttp
+            span.peer = '%s:%d' % request._transport_peername if isinstance(request._transport_peername, (list, tuple))\
+                else request._transport_peername
+
+            span.tag(Tag(key=tags.HttpMethod, val=request.method))  # pyre-ignore
+            span.tag(Tag(key=tags.HttpUrl, val=str(request.url)))  # pyre-ignore
+
+            resp, reset = await _handle_request(self, request, start_time)
+
+            span.tag(Tag(key=tags.HttpStatus, val=resp.status, overridable=True))
+
+            if resp.status >= 400:
+                span.error_occurred = True
+
+        return resp, reset
+
+    _handle_request = RequestHandler._handle_request
+    RequestHandler._handle_request = _sw_handle_request
diff --git a/tests/plugin/sw_aiohttp/__init__.py b/tests/plugin/sw_aiohttp/__init__.py
new file mode 100644
index 0000000..b1312a0
--- /dev/null
+++ b/tests/plugin/sw_aiohttp/__init__.py
@@ -0,0 +1,16 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
diff --git a/tests/plugin/sw_aiohttp/docker-compose.yml b/tests/plugin/sw_aiohttp/docker-compose.yml
new file mode 100644
index 0000000..db53dcd
--- /dev/null
+++ b/tests/plugin/sw_aiohttp/docker-compose.yml
@@ -0,0 +1,60 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+version: '2.1'
+
+services:
+  collector:
+    extends:
+      service: collector
+      file: ../docker/docker-compose.base.yml
+
+  provider:
+    extends:
+      service: agent
+      file: ../docker/docker-compose.base.yml
+    ports:
+      - 9091:9091
+    volumes:
+      - .:/app
+    command: ['bash', '-c', 'pip install -r /app/requirements.txt && python3 /app/services/provider.py']
+    depends_on:
+      collector:
+        condition: service_healthy
+    healthcheck:
+      test: ["CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/9091"]
+      interval: 5s
+      timeout: 60s
+      retries: 120
+
+  consumer:
+    extends:
+      service: agent
+      file: ../docker/docker-compose.base.yml
+    ports:
+      - 9090:9090
+    volumes:
+      - .:/app
+    command: ['bash', '-c', 'pip install -r /app/requirements.txt && python3 /app/services/consumer.py']
+    depends_on:
+      collector:
+        condition: service_healthy
+      provider:
+        condition: service_healthy
+
+networks:
+  beyond:
diff --git a/tests/plugin/sw_aiohttp/expected.data.yml b/tests/plugin/sw_aiohttp/expected.data.yml
new file mode 100644
index 0000000..c7c8711
--- /dev/null
+++ b/tests/plugin/sw_aiohttp/expected.data.yml
@@ -0,0 +1,92 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+segmentItems:
+  - serviceName: provider
+    segmentSize: 1
+    segments:
+      - segmentId: not null
+        spans:
+          - operationName: /skywalking
+            operationId: 0
+            parentSpanId: -1
+            spanId: 0
+            spanLayer: Http
+            tags:
+              - key: http.method
+                value: GET
+              - key: url
+                value: http://provider:9091/skywalking
+              - key: status.code
+                value: '200'
+            refs:
+              - parentEndpoint: /skywalking
+                networkAddress: provider:9091
+                refType: CrossProcess
+                parentSpanId: 1
+                parentTraceSegmentId: not null
+                parentServiceInstance: not null
+                parentService: consumer
+                traceId: not null
+            startTime: gt 0
+            endTime: gt 0
+            componentId: 7008
+            spanType: Entry
+            peer: not null
+            skipAnalysis: false
+  - serviceName: consumer
+    segmentSize: 1
+    segments:
+      - segmentId: not null
+        spans:
+          - operationName: /skywalking
+            operationId: 0
+            parentSpanId: 0
+            spanId: 1
+            spanLayer: Http
+            tags:
+              - key: http.method
+                value: GET
+              - key: url
+                value: http://provider:9091/skywalking
+              - key: status.code
+                value: '200'
+            startTime: gt 0
+            endTime: gt 0
+            componentId: 7008
+            spanType: Exit
+            peer: provider:9091
+            skipAnalysis: false
+          - operationName: /skywalking
+            operationId: 0
+            parentSpanId: -1
+            spanId: 0
+            spanLayer: Http
+            tags:
+              - key: http.method
+                value: GET
+              - key: url
+                value: http://0.0.0.0:9090/skywalking
+              - key: status.code
+                value: '200'
+            startTime: gt 0
+            endTime: gt 0
+            componentId: 7008
+            spanType: Entry
+            peer: not null
+            skipAnalysis: false
+
diff --git a/tests/plugin/sw_aiohttp/services/__init__.py b/tests/plugin/sw_aiohttp/services/__init__.py
new file mode 100644
index 0000000..b1312a0
--- /dev/null
+++ b/tests/plugin/sw_aiohttp/services/__init__.py
@@ -0,0 +1,16 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
diff --git a/tests/plugin/sw_aiohttp/services/consumer.py b/tests/plugin/sw_aiohttp/services/consumer.py
new file mode 100644
index 0000000..b41e8fd
--- /dev/null
+++ b/tests/plugin/sw_aiohttp/services/consumer.py
@@ -0,0 +1,44 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import time
+
+import aiohttp
+from aiohttp import web
+
+from skywalking import agent
+from skywalking import config
+
+
+async def handle(request):
+    name = request.match_info.get('name', "Anonymous")
+
+    async with aiohttp.ClientSession() as session:
+        async with session.get(f'http://user:pass@provider:9091/{name}') as response:
+            time.sleep(.5)
+            json = await response.json()
+            return web.Response(text=str(json))
+
+
+app = web.Application()
+app.add_routes([web.get('/', handle), web.get('/{name}', handle)])
+# The agent is configured and the server started only when this file is run as a script.
+if __name__ == '__main__':
+    config.service_name = 'consumer'
+    config.logging_level = 'DEBUG'
+    agent.start()
+    # The agent is started before the web server begins serving on port 9090.
+    web.run_app(app, port=9090)
diff --git a/tests/plugin/sw_aiohttp/services/provider.py b/tests/plugin/sw_aiohttp/services/provider.py
new file mode 100644
index 0000000..d025b56
--- /dev/null
+++ b/tests/plugin/sw_aiohttp/services/provider.py
@@ -0,0 +1,39 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from aiohttp import web
+
+from skywalking import agent
+from skywalking import config
+
+
+async def handle(request):
+    name = request.match_info.get('name', "Anonymous")
+    return web.json_response({
+        name: name,
+    })
+
+
+app = web.Application()
+app.add_routes([web.get('/', handle), web.get('/{name}', handle)])
+# The agent is configured and the server started only when this file is run as a script.
+if __name__ == '__main__':
+    config.service_name = 'provider'
+    config.logging_level = 'DEBUG'
+    agent.start()
+    # The agent is started before the web server begins serving on port 9091.
+    web.run_app(app, port=9091)
diff --git a/tests/plugin/sw_aiohttp/test_aiohttp.py b/tests/plugin/sw_aiohttp/test_aiohttp.py
new file mode 100644
index 0000000..b0c9c49
--- /dev/null
+++ b/tests/plugin/sw_aiohttp/test_aiohttp.py
@@ -0,0 +1,36 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from typing import Callable
+
+import pytest
+import requests
+
+from tests.plugin.base import TestPluginBase
+
+
+@pytest.fixture  # pyre-ignore
+def prepare():
+    # type: () -> Callable
+    return lambda *_: requests.get('http://0.0.0.0:9090/skywalking')
+
+
+class TestPlugin(TestPluginBase):
+    @pytest.mark.parametrize('version', [
+        'aiohttp==3.7.3',
+    ])
+    def test_plugin(self, docker_compose, version):
+        self.validate()