Posted to notifications@skywalking.apache.org by GitBox <gi...@apache.org> on 2021/06/21 16:16:07 UTC

[GitHub] [skywalking-python] tom-pytel opened a new pull request #125: WIP celery plugin and required core changes

tom-pytel opened a new pull request #125:
URL: https://github.com/apache/skywalking-python/pull/125


   There are several things here all mixed together because they are all required to make celery work. The main problem is that the default celery configuration runs the backend server via multiprocessing `fork()` for true concurrency, which this agent was not set up to handle. I added `fork()` handling, but unfortunately I could not get grpc working with `fork()` (which does not mean it can't be done, just that I couldn't do it within my limited timeline), so instead I fixed a minor bug in the http protocol, which now works correctly with `fork()`. A few more tweaks are needed before this can be merged.
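One common way to keep a background reporter usable across `fork()` is to rebuild its queue and thread in the child with `os.register_at_fork` (POSIX, Python 3.7+). The sketch below is not this PR's implementation: `Reporter`, `report()`, `flush()` and `demo_reporter_fork()` are hypothetical names, and the `sent` list stands in for talking to a collector. It only illustrates why a forked child needs fresh reporter state: `fork()` does not copy threads, and locks held by the parent's reporter thread at fork time can deadlock the child.

```python
import multiprocessing as mp
import os
import queue
import threading


class Reporter:
    """Hypothetical stand-in for an agent's background reporter."""

    def __init__(self):
        self._start()
        # POSIX-only, Python 3.7+: rebuild state in every forked child, which
        # gets no copy of this thread and may inherit locks it was holding.
        os.register_at_fork(after_in_child=self._start)

    def _start(self):
        self.queue = queue.Queue()
        self.sent = []  # stand-in for segments sent to a collector
        self.thread = threading.Thread(target=self._loop, daemon=True)
        self.thread.start()

    def _loop(self):
        while True:
            item = self.queue.get()
            self.sent.append(item)  # a real reporter would send it here
            self.queue.task_done()

    def report(self, item):
        self.queue.put(item)

    def flush(self):
        # Wait until the thread has handled everything queued so far.
        self.queue.join()


def _child(reporter, conn):
    reporter.report('child span')
    reporter.flush()
    conn.send((reporter.thread.is_alive(), reporter.sent))
    conn.close()


def demo_reporter_fork():
    reporter = Reporter()
    reporter.report('parent span')
    reporter.flush()

    ctx = mp.get_context('fork')
    recv, send = ctx.Pipe(duplex=False)
    child = ctx.Process(target=_child, args=(reporter, send))
    child.start()
    child_alive, child_sent = recv.recv()
    child.join()

    reporter.report('parent span 2')
    reporter.flush()
    return child_alive, child_sent, reporter.sent
```

On Linux the child should report only its own span through a live thread, while the parent's reporter keeps working. A real grpc channel needs more care than a queue and a thread; also note that Python 3.12+ emits a `DeprecationWarning` when `fork()` is called from a multi-threaded process.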
   
   * NOTE: The changes in `context.py` are not finished. Entry and exit spans cannot indiscriminately reuse each other, since they may not be related at all. An explicit inheritance mechanism is needed to indicate which plugins can inherit a span from which others. I will eventually implement this the same way I did for the Node agent, but for now this works. I will probably port most of the Node agent's features and cleanups here over time.
   
   - [ ] Add a test case for the new plugin
   - [ ] Add a component id in [the main repo](https://github.com/apache/skywalking/blob/master/oap-server/server-bootstrap/src/main/resources/component-libraries.yml#L415)
   - [ ] Add a logo in [the UI repo](https://github.com/apache/skywalking-rocketbot-ui/tree/master/src/views/components/topology/assets)
   - [ ] Rebuild the `requirements.txt` by running `tools/env/build_requirements_(linux|windows).sh`
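The explicit inheritance mechanism mentioned in the NOTE could, for example, be a table listing which components may reuse an active span created by which others. This is purely illustrative: `Span`, `INHERITS` and `can_reuse()` are hypothetical and are neither the Node agent's nor this PR's API. The `urllib3`/`requests` pairing is chosen because `requests` is built on `urllib3`, so an inner `urllib3` exit span would otherwise duplicate the outer one.

```python
from dataclasses import dataclass
from typing import Dict, FrozenSet, Optional


@dataclass
class Span:
    kind: str       # 'entry' or 'exit'
    component: str  # plugin/component name, e.g. 'requests'


# Hypothetical table: a new span for the key component may reuse an active
# span of the same kind created by any component in the value set.
INHERITS: Dict[str, FrozenSet[str]] = {
    'urllib3': frozenset({'requests'}),  # requests is built on urllib3
}


def can_reuse(active: Optional[Span], kind: str, component: str) -> bool:
    """Return True if a new span may reuse `active` instead of starting one."""
    if active is None or active.kind != kind:
        return False
    if active.component == component:
        return True
    return active.component in INHERITS.get(component, frozenset())
```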
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [skywalking-python] tom-pytel commented on a change in pull request #125: Python celery plugin

Posted by GitBox <gi...@apache.org>.
tom-pytel commented on a change in pull request #125:
URL: https://github.com/apache/skywalking-python/pull/125#discussion_r661529369



##########
File path: skywalking/agent/__init__.py
##########
@@ -65,14 +76,40 @@ def __init():
         __protocol = KafkaProtocol()
 
     plugins.install()
+    __init_threading()
 
 
 def __fini():
     __protocol.report(__queue, False)
     __queue.join()
+    __finished.set()
+
+
+def __fork_before():
+    if config.protocol != 'http':
+        logger.warning('fork() not currently supported with %s protocol' % config.protocol)

Review comment:
       BTW, this is not the end of the road. Our internal stress tests show problems with spans mixing or disappearing, so I need to go back into the core functionality and fix all that, and maybe overhaul how span context is tracked, as in the Node agent (especially since async wasn't originally a design consideration in this agent). So treat this PR as a single step towards getting all that fixed.




[GitHub] [skywalking-python] tom-pytel commented on a change in pull request #125: Python celery plugin

Posted by GitBox <gi...@apache.org>.
tom-pytel commented on a change in pull request #125:
URL: https://github.com/apache/skywalking-python/pull/125#discussion_r663997799



##########
File path: skywalking/config.py
##########
@@ -59,13 +59,14 @@
 kafka_bootstrap_servers = os.getenv('SW_KAFKA_REPORTER_BOOTSTRAP_SERVERS') or "localhost:9092"  # type: str
 kafka_topic_management = os.getenv('SW_KAFKA_REPORTER_TOPIC_MANAGEMENT') or "skywalking-managements"  # type: str
 kafka_topic_segment = os.getenv('SW_KAFKA_REPORTER_TOPIC_SEGMENT') or "skywalking-segments"  # type: str
+celery_parameters_length = int(os.getenv('SW_CELERY_PARAMETERS_LENGTH') or '512')
 
 
 def init(
         service: str = None,
         instance: str = None,
         collector: str = None,
-        protocol_type: str = 'grpc',
+        protocol_type: str = None,

Review comment:
       ignore
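The `celery_parameters_length` setting in this diff reads `SW_CELERY_PARAMETERS_LENGTH` and defaults to 512. Assuming it caps how much of a task's formatted arguments is recorded on the span (the plugin code itself is not quoted here), the truncation could look like the hypothetical helpers below; the `repr()`-based formatting and the `...` suffix are illustrative choices, not necessarily the plugin's.

```python
import os


def celery_parameters_length() -> int:
    # Same expression as the config line in the diff.
    return int(os.getenv('SW_CELERY_PARAMETERS_LENGTH') or '512')


def format_task_params(args, kwargs, limit):
    """Hypothetical helper: render task arguments, capped at `limit` chars."""
    parts = [repr(a) for a in args]
    parts += [f'{k}={v!r}' for k, v in kwargs.items()]
    text = ', '.join(parts)
    if len(text) > limit:
        text = text[:limit] + '...'
    return text
```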




[GitHub] [skywalking-python] tom-pytel commented on pull request #125: Python celery plugin

Posted by GitBox <gi...@apache.org>.
tom-pytel commented on pull request #125:
URL: https://github.com/apache/skywalking-python/pull/125#issuecomment-874156554


   Sanic 21.0.0+ no longer works with the plugin's hook method. This PR is starting to get messy...


[GitHub] [skywalking-python] tom-pytel merged pull request #125: Python celery plugin

Posted by GitBox <gi...@apache.org>.
tom-pytel merged pull request #125:
URL: https://github.com/apache/skywalking-python/pull/125


   


[GitHub] [skywalking-python] sonatype-lift[bot] commented on a change in pull request #125: Python celery plugin

Posted by GitBox <gi...@apache.org>.
sonatype-lift[bot] commented on a change in pull request #125:
URL: https://github.com/apache/skywalking-python/pull/125#discussion_r663975843



##########
File path: skywalking/config.py
##########
@@ -59,13 +59,14 @@
 kafka_bootstrap_servers = os.getenv('SW_KAFKA_REPORTER_BOOTSTRAP_SERVERS') or "localhost:9092"  # type: str
 kafka_topic_management = os.getenv('SW_KAFKA_REPORTER_TOPIC_MANAGEMENT') or "skywalking-managements"  # type: str
 kafka_topic_segment = os.getenv('SW_KAFKA_REPORTER_TOPIC_SEGMENT') or "skywalking-segments"  # type: str
+celery_parameters_length = int(os.getenv('SW_CELERY_PARAMETERS_LENGTH') or '512')
 
 
 def init(
         service: str = None,
         instance: str = None,
         collector: str = None,
-        protocol_type: str = 'grpc',
+        protocol_type: str = None,

Review comment:
       *Incompatible variable type:*  protocol_type is declared to have type `str` but is used as type `None`.
   (at-me [in a reply](https://help.sonatype.com/lift) with `help` or `ignore`)

##########
File path: skywalking/config.py
##########
@@ -59,13 +59,14 @@
 kafka_bootstrap_servers = os.getenv('SW_KAFKA_REPORTER_BOOTSTRAP_SERVERS') or "localhost:9092"  # type: str
 kafka_topic_management = os.getenv('SW_KAFKA_REPORTER_TOPIC_MANAGEMENT') or "skywalking-managements"  # type: str
 kafka_topic_segment = os.getenv('SW_KAFKA_REPORTER_TOPIC_SEGMENT') or "skywalking-segments"  # type: str
+celery_parameters_length = int(os.getenv('SW_CELERY_PARAMETERS_LENGTH') or '512')
 
 
 def init(
         service: str = None,
         instance: str = None,
         collector: str = None,
-        protocol_type: str = 'grpc',
+        protocol_type: str = None,

Review comment:
       I've recorded this as ignored for this pull request. If you change your mind, just comment `@sonatype-lift unignore`.




[GitHub] [skywalking-python] tom-pytel commented on a change in pull request #125: Python celery plugin

Posted by GitBox <gi...@apache.org>.
tom-pytel commented on a change in pull request #125:
URL: https://github.com/apache/skywalking-python/pull/125#discussion_r657990819



##########
File path: skywalking/agent/__init__.py
##########
@@ -65,14 +76,40 @@ def __init():
         __protocol = KafkaProtocol()
 
     plugins.install()
+    __init_threading()
 
 
 def __fini():
     __protocol.report(__queue, False)
     __queue.join()
+    __finished.set()
+
+
+def __fork_before():
+    if config.protocol != 'http':
+        logger.warning('fork() not currently supported with %s protocol' % config.protocol)

Review comment:
       Same result for me. It is possible it is something in my env. Can you also try with a span BEFORE the fork, with and without a 2-second delay, to see whether grpc is handling the fork OK on its own?




[GitHub] [skywalking-python] kezhenxu94 commented on a change in pull request #125: Python celery plugin

Posted by GitBox <gi...@apache.org>.
kezhenxu94 commented on a change in pull request #125:
URL: https://github.com/apache/skywalking-python/pull/125#discussion_r665185963



##########
File path: docs/Plugins.md
##########
@@ -18,6 +18,7 @@ Library | Versions | Plugin Name
 | [sanic](https://sanic.readthedocs.io/en/latest/) | >= 20.3.0 <= 20.9.1 | `sw_sanic` |
 | [aiohttp](https://sanic.readthedocs.io/en/latest/) | >= 3.7.3 | `sw_aiohttp` |
 | [pyramid](https://trypyramid.com) | >= 1.9 | `sw_pyramid` |
-| [psycopg2](https://www.psycopg.org/) | 2.8.6 | `sw_psycopg2` |
+| [psycopg2](https://www.psycopg.org/) | >= 2.8.6 | `sw_psycopg2` |
+| [celery](https://docs.celeryproject.org/) | >= 4.2.1 | `sw_celery` |

Review comment:
       Let's make it clear that this plugin only works with the `http` protocol.

##########
File path: skywalking/config.py
##########
@@ -59,13 +59,14 @@
 kafka_bootstrap_servers = os.getenv('SW_KAFKA_REPORTER_BOOTSTRAP_SERVERS') or "localhost:9092"  # type: str
 kafka_topic_management = os.getenv('SW_KAFKA_REPORTER_TOPIC_MANAGEMENT') or "skywalking-managements"  # type: str
 kafka_topic_segment = os.getenv('SW_KAFKA_REPORTER_TOPIC_SEGMENT') or "skywalking-segments"  # type: str
+celery_parameters_length = int(os.getenv('SW_CELERY_PARAMETERS_LENGTH') or '512')
 
 
 def init(
         service: str = None,
         instance: str = None,
         collector: str = None,
-        protocol_type: str = 'grpc',
+        protocol_type: str = None,

Review comment:
       I'd rather leave `grpc` as the default protocol and switch to `http` in the tests.
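On both the typing warning and the default-protocol question: a parameter that defaults to `None` is annotated `Optional[str]`, and the fallback to `grpc` can be resolved inside the function. A minimal sketch, assuming a `SW_AGENT_PROTOCOL` environment variable as the override; treat that variable name and `resolve_protocol()` as assumptions, not the agent's confirmed API.

```python
import os
from typing import Optional

DEFAULT_PROTOCOL = 'grpc'


def resolve_protocol(protocol_type: Optional[str] = None) -> str:
    """Explicit argument wins, then the (assumed) env var, then grpc."""
    if protocol_type is None:
        protocol_type = os.getenv('SW_AGENT_PROTOCOL') or DEFAULT_PROTOCOL
    return protocol_type.lower()
```

Tests can then pass `'http'` explicitly (or set the variable) while the default stays `grpc`.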




[GitHub] [skywalking-python] tom-pytel commented on a change in pull request #125: Python celery plugin

Posted by GitBox <gi...@apache.org>.
tom-pytel commented on a change in pull request #125:
URL: https://github.com/apache/skywalking-python/pull/125#discussion_r657093361



##########
File path: skywalking/agent/__init__.py
##########
@@ -65,14 +76,40 @@ def __init():
         __protocol = KafkaProtocol()
 
     plugins.install()
+    __init_threading()
 
 
 def __fini():
     __protocol.report(__queue, False)
     __queue.join()
+    __finished.set()
+
+
+def __fork_before():
+    if config.protocol != 'http':
+        logger.warning('fork() not currently supported with %s protocol' % config.protocol)

Review comment:
       With grpc? Really?




[GitHub] [skywalking-python] kezhenxu94 commented on a change in pull request #125: Python celery plugin

Posted by GitBox <gi...@apache.org>.
kezhenxu94 commented on a change in pull request #125:
URL: https://github.com/apache/skywalking-python/pull/125#discussion_r657092466



##########
File path: skywalking/agent/__init__.py
##########
@@ -65,14 +76,40 @@ def __init():
         __protocol = KafkaProtocol()
 
     plugins.install()
+    __init_threading()
 
 
 def __fini():
     __protocol.report(__queue, False)
     __queue.join()
+    __finished.set()
+
+
+def __fork_before():
+    if config.protocol != 'http':
+        logger.warning('fork() not currently supported with %s protocol' % config.protocol)

Review comment:
       I just ran the test script several times, and all 4 spans are correctly reported.




[GitHub] [skywalking-python] tom-pytel commented on a change in pull request #125: Python celery plugin

Posted by GitBox <gi...@apache.org>.
tom-pytel commented on a change in pull request #125:
URL: https://github.com/apache/skywalking-python/pull/125#discussion_r665271504



##########
File path: docs/Plugins.md
##########
@@ -18,6 +18,7 @@ Library | Versions | Plugin Name
 | [sanic](https://sanic.readthedocs.io/en/latest/) | >= 20.3.0 <= 20.9.1 | `sw_sanic` |
 | [aiohttp](https://sanic.readthedocs.io/en/latest/) | >= 3.7.3 | `sw_aiohttp` |
 | [pyramid](https://trypyramid.com) | >= 1.9 | `sw_pyramid` |
-| [psycopg2](https://www.psycopg.org/) | 2.8.6 | `sw_psycopg2` |
+| [psycopg2](https://www.psycopg.org/) | >= 2.8.6 | `sw_psycopg2` |
+| [celery](https://docs.celeryproject.org/) | >= 4.2.1 | `sw_celery` |

Review comment:
       It only requires the http protocol for the server run with `celery -A`; the client can use any protocol (as long as it is not fork()ing, which will break grpc for any client). Will add a note.




[GitHub] [skywalking-python] kezhenxu94 commented on a change in pull request #125: Python celery plugin

Posted by GitBox <gi...@apache.org>.
kezhenxu94 commented on a change in pull request #125:
URL: https://github.com/apache/skywalking-python/pull/125#discussion_r657115742



##########
File path: skywalking/agent/__init__.py
##########
@@ -65,14 +76,40 @@ def __init():
         __protocol = KafkaProtocol()
 
     plugins.install()
+    __init_threading()
 
 
 def __fini():
     __protocol.report(__queue, False)
     __queue.join()
+    __finished.set()
+
+
+def __fork_before():
+    if config.protocol != 'http':
+        logger.warning('fork() not currently supported with %s protocol' % config.protocol)

Review comment:
       ```shell
   pwd
   # /Users/kezhenxu94/workspace/skywalking-python
   
   git rev-parse HEAD
   # 6002ccc5e7463be8960304c30ecae55753f001dc
   
   make install > /dev/null 2>&1
   cat <<EOF > test.py
   import multiprocessing as mp
   import time
   
   from skywalking.trace.context import get_context
   from skywalking import agent, config
   
   config.init(collector='127.0.0.1:11800', service='your awesome service')
   agent.start()
   
   def foo():
       with get_context().new_local_span('child before error'):
           pass
   
       time.sleep(2)  # needed to flush sends, because Python doesn't run atexit handlers when forked children exit
   
       # import atexit
       # atexit._run_exitfuncs()
   
   if __name__ == '__main__':
       p = mp.Process(target=foo, args=())
   
       with get_context().new_local_span('parent before start'):
           pass
   
       p.start()
   
       time.sleep(1)
   
       with get_context().new_local_span('parent after start'):
           pass
   
       p.join()
   
       with get_context().new_local_span('parent after join'):
           pass
   
       time.sleep(5)
   EOF
   
   docker run -it --rm -p 11800:11800 -p 12800:12800 -d apache/skywalking-oap-server:8.6.0-es6
   # 64fb2bf0e8b8030c69058d79c114969ab46207fe9391913cc91080528f510005
   
   # Start to test
   # Generate traffic
   python3 test.py
   # skywalking [MainThread] [WARNING] failed to install plugin sw_aiohttp
   # skywalking [MainThread] [WARNING] failed to install plugin sw_django
   # skywalking [MainThread] [WARNING] failed to install plugin sw_elasticsearch
   # skywalking [MainThread] [WARNING] failed to install plugin sw_flask
   # skywalking [MainThread] [WARNING] failed to install plugin sw_kafka
   # skywalking [MainThread] [WARNING] failed to install plugin sw_psycopg2
   # skywalking [MainThread] [WARNING] failed to install plugin sw_pymongo
   # skywalking [MainThread] [WARNING] failed to install plugin sw_pymysql
   # skywalking [MainThread] [WARNING] failed to install plugin sw_pyramid
   # skywalking [MainThread] [WARNING] failed to install plugin sw_rabbitmq
   # skywalking [MainThread] [WARNING] failed to install plugin sw_redis
   # skywalking [MainThread] [WARNING] failed to install plugin sw_requests
   # skywalking [MainThread] [WARNING] failed to install plugin sw_sanic
   # skywalking [MainThread] [WARNING] failed to install plugin sw_tornado
   # skywalking [MainThread] [WARNING] failed to install plugin sw_urllib3
   # skywalking [MainThread] [WARNING] failed to install plugin sw_aiohttp
   # skywalking [MainThread] [WARNING] failed to install plugin sw_django
   # skywalking [MainThread] [WARNING] failed to install plugin sw_elasticsearch
   # skywalking [MainThread] [WARNING] failed to install plugin sw_flask
   # skywalking [MainThread] [WARNING] failed to install plugin sw_kafka
   # skywalking [MainThread] [WARNING] failed to install plugin sw_psycopg2
   # skywalking [MainThread] [WARNING] failed to install plugin sw_pymongo
   # skywalking [MainThread] [WARNING] failed to install plugin sw_pymysql
   # skywalking [MainThread] [WARNING] failed to install plugin sw_pyramid
   # skywalking [MainThread] [WARNING] failed to install plugin sw_rabbitmq
   # skywalking [MainThread] [WARNING] failed to install plugin sw_redis
   # skywalking [MainThread] [WARNING] failed to install plugin sw_requests
   # skywalking [MainThread] [WARNING] failed to install plugin sw_sanic
   # skywalking [MainThread] [WARNING] failed to install plugin sw_tornado
   # skywalking [MainThread] [WARNING] failed to install plugin sw_urllib3
   # Query and see the span number
   
   curl -s 'http://localhost:12800/graphql' \
   -X 'POST' \
   -H 'Content-Type: application/json;charset=utf-8' \
   --data-binary '{"query":"query queryTraces($condition: TraceQueryCondition) {\n  data: queryBasicTraces(condition: $condition) { traces { endpointNames } total }}","variables":{"condition":{"queryDuration":{"start":"2021-06-23 131309","end":"2021-06-23 232809","step":"SECOND"},"traceState":"ALL","paging":{"pageNum":1,"pageSize":15,"needTotal":true},"queryOrder":"BY_START_TIME","tags":[]}}}' | jq '.data.data.total'
   
   # 4 <================== 
   
   python3 test.py
   # (same 30 "failed to install plugin" warnings as in the first run)
   
   # Query and see the span number
   curl -s 'http://localhost:12800/graphql' \
   -X 'POST' \
   -H 'Content-Type: application/json;charset=utf-8' \
   --data-binary '{"query":"query queryTraces($condition: TraceQueryCondition) {\n  data: queryBasicTraces(condition: $condition) { traces { endpointNames } total }}","variables":{"condition":{"queryDuration":{"start":"2021-06-23 131309","end":"2021-06-23 232809","step":"SECOND"},"traceState":"ALL","paging":{"pageNum":1,"pageSize":15,"needTotal":true},"queryOrder":"BY_START_TIME","tags":[]}}}' | jq '.data.data.total'
   
   # 8 <===============
   ```
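The `time.sleep(2)` in the script above gives the child's reporter time to drain before the child exits, since, per the script's comment, atexit handlers do not run there. An alternative is an explicit flush point that runs however the child's target ends. A minimal sketch: `flush_before_exit()` and `_flush()` are hypothetical, and writing a line to a temp file stands in for draining the agent's report queue.

```python
import functools
import multiprocessing as mp
import os
import tempfile


def _flush(path):
    # Hypothetical stand-in for draining the agent's report queue.
    with open(path, 'a') as f:
        f.write('flushed\n')


def flush_before_exit(path):
    """Hypothetical decorator: run the child's target, then always flush."""
    def wrap(target):
        @functools.wraps(target)
        def run(*args, **kwargs):
            try:
                return target(*args, **kwargs)
            finally:
                _flush(path)  # runs on success and on exceptions
        return run
    return wrap


def demo_flush_in_child():
    fd, path = tempfile.mkstemp()
    os.close(fd)
    try:
        @flush_before_exit(path)
        def work():
            raise RuntimeError('task failed')  # the flush must still happen

        # The fork context lets the closure be used as a target unpickled.
        child = mp.get_context('fork').Process(target=work)
        child.start()
        child.join()
        with open(path) as f:
            return child.exitcode, f.read().splitlines()
    finally:
        os.remove(path)
```

The child's traceback is printed to stderr, but the flush still runs before it exits with code 1.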




[GitHub] [skywalking-python] tom-pytel commented on a change in pull request #125: Python celery plugin

Posted by GitBox <gi...@apache.org>.
tom-pytel commented on a change in pull request #125:
URL: https://github.com/apache/skywalking-python/pull/125#discussion_r657905251



##########
File path: skywalking/agent/__init__.py
##########
@@ -65,14 +76,40 @@ def __init():
         __protocol = KafkaProtocol()
 
     plugins.install()
+    __init_threading()
 
 
 def __fini():
     __protocol.report(__queue, False)
     __queue.join()
+    __finished.set()
+
+
+def __fork_before():
+    if config.protocol != 'http':
+        logger.warning('fork() not currently supported with %s protocol' % config.protocol)

Review comment:
       I do not get this result, and frankly I find it weird that you do, since this version does not do anything special for grpc on fork. Testing with both Python 3.8 and 3.9 on a clean build, I get three spans and a deadlock. Also, I do not see `sw_celery`, which this PR adds, listed among the plugins; are you running this PR?




[GitHub] [skywalking-python] tom-pytel commented on a change in pull request #125: WIP celery plugin and required core changes

Posted by GitBox <gi...@apache.org>.
tom-pytel commented on a change in pull request #125:
URL: https://github.com/apache/skywalking-python/pull/125#discussion_r655585973



##########
File path: skywalking/config.py
##########
@@ -58,13 +58,14 @@
 kafka_bootstrap_servers = os.getenv('SW_KAFKA_REPORTER_BOOTSTRAP_SERVERS') or "localhost:9092"  # type: str
 kafka_topic_management = os.getenv('SW_KAFKA_REPORTER_TOPIC_MANAGEMENT') or "skywalking-managements"  # type: str
 kafka_topic_segment = os.getenv('SW_KAFKA_REPORTER_TOPIC_SEGMENT') or "skywalking-segments"  # type: str
+celery_parameters = os.getenv('SW_CELERY_PARAMETERS', '').lower() == 'true'
 
 
 def init(
         service: str = None,
         instance: str = None,
         collector: str = None,
-        protocol_type: str = 'grpc',
+        protocol_type: str = None,

Review comment:
       ignore




[GitHub] [skywalking-python] kezhenxu94 commented on a change in pull request #125: Python celery plugin

Posted by GitBox <gi...@apache.org>.
kezhenxu94 commented on a change in pull request #125:
URL: https://github.com/apache/skywalking-python/pull/125#discussion_r656722698



##########
File path: skywalking/agent/__init__.py
##########
@@ -65,14 +76,40 @@ def __init():
         __protocol = KafkaProtocol()
 
     plugins.install()
+    __init_threading()
 
 
 def __fini():
     __protocol.report(__queue, False)
     __queue.join()
+    __finished.set()
+
+
+def __fork_before():
+    if config.protocol != 'http':
+        logger.warning('fork() not currently supported with %s protocol' % config.protocol)

Review comment:
       I didn't mean to close the parent's channel and recreate it in the child process by reusing the parent's agent. What I meant is to start another, independent agent in the child process and leave the parent's agent in place, because there may be other things that need to be traced in the parent process. Can you take a look at 
   
   https://github.com/apache/skywalking-python/blob/c73398544694ff7c64b083a5ee0d6d82578971cf/skywalking/trace/ipc/process.py#L30-L34
   
   ... and see whether that helps? It is generally what I propose to do in forked processes.
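The proposal here, an independent agent started inside each child while the parent's agent keeps running, can be sketched as a `Process` subclass that initializes per-process state in `run()`. This is a hypothetical illustration, not the contents of `skywalking/trace/ipc/process.py`: `start_agent()` and `_agent_pid` stand in for `config.init()` plus `agent.start()` and the state they create. It is POSIX-only because it uses the `fork` context.

```python
import multiprocessing as mp
import os

# Hypothetical per-process agent state; a real agent would hold its reporter
# thread and collector channel here.
_agent_pid = None


def start_agent():
    """Hypothetical stand-in for config.init(...) followed by agent.start()."""
    global _agent_pid
    _agent_pid = os.getpid()


class TracedProcess(mp.get_context('fork').Process):
    def run(self):
        # Runs in the child only: bring up an agent of its own and leave the
        # parent's agent (and whatever it is still tracing) alone.
        start_agent()
        super().run()


def _report_agent(conn):
    conn.send((os.getpid(), _agent_pid))
    conn.close()


def demo_traced_process():
    start_agent()  # the parent's own agent
    recv, send = mp.Pipe(duplex=False)
    child = TracedProcess(target=_report_agent, args=(send,))
    child.start()
    child_pid, child_agent_pid = recv.recv()
    child.join()
    return os.getpid(), _agent_pid, child_pid, child_agent_pid
```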




[GitHub] [skywalking-python] kezhenxu94 commented on a change in pull request #125: Python celery plugin

Posted by GitBox <gi...@apache.org>.
kezhenxu94 commented on a change in pull request #125:
URL: https://github.com/apache/skywalking-python/pull/125#discussion_r667484219



##########
File path: setup.py
##########
@@ -33,12 +33,13 @@
     author="Apache",
     author_email="dev@skywalking.apache.org",
     license="Apache 2.0",
-    packages=find_packages(exclude=("tests",)),
+    packages=find_packages(exclude=("tests", "tests.*")),
     include_package_data=True,
     install_requires=[
         "grpcio",
         "grpcio-tools",
         "packaging",
+        "requests",

Review comment:
       Hi @tom-pytel, I missed this in this PR, but this makes `requests` a mandatory dependency of skywalking-python. Please also take a look at https://github.com/apache/skywalking/issues/7282: `requests` depends on an LGPL-licensed dependency that we cannot ship in an ASF project. As we already have this in `extras_require/http`, can we just remove it here? When users want to use the `http` protocol, they can use something like `pip install skywalking-python[http]`.
   
   FYI @wu-sheng 
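A minimal sketch of the split suggested above, assuming the extra is named `http` as in the comment: `requests` stays out of `install_requires`, and code that needs it imports it lazily only when the `http` protocol is selected. The variable names and the `load_http_client()` helper are hypothetical illustrations, not the project's actual `setup.py` or agent code.

```python
import importlib

# Core dependencies every installation gets (the list from the diff, minus `requests`).
INSTALL_REQUIRES = ["grpcio", "grpcio-tools", "packaging"]

# Optional dependencies, installed only when the extra is requested.
EXTRAS_REQUIRE = {
    "http": ["requests"],
}


def load_http_client():
    """Import `requests` lazily, only when the http protocol is actually used."""
    try:
        return importlib.import_module("requests")
    except ImportError as exc:
        raise ImportError(
            "the 'http' protocol needs the optional 'requests' package; "
            "install the 'http' extra to use it"
        ) from exc


# In setup.py these would be passed as:
# setuptools.setup(..., install_requires=INSTALL_REQUIRES, extras_require=EXTRAS_REQUIRE)
```

Keeping the import inside the function means a gRPC-only installation never touches `requests` at all.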







[GitHub] [skywalking-python] tom-pytel commented on a change in pull request #125: Python celery plugin

Posted by GitBox <gi...@apache.org>.
tom-pytel commented on a change in pull request #125:
URL: https://github.com/apache/skywalking-python/pull/125#discussion_r657907669



##########
File path: skywalking/agent/__init__.py
##########
@@ -65,14 +76,40 @@ def __init():
         __protocol = KafkaProtocol()
 
     plugins.install()
+    __init_threading()
 
 
 def __fini():
     __protocol.report(__queue, False)
     __queue.join()
+    __finished.set()
+
+
+def __fork_before():
+    if config.protocol != 'http':
+        logger.warning('fork() not currently supported with %s protocol' % config.protocol)

Review comment:
       Just tried with the previous master, without the celery or fork changes, and I still only get three spans, but no deadlock.







[GitHub] [skywalking-python] kezhenxu94 commented on a change in pull request #125: Python celery plugin

Posted by GitBox <gi...@apache.org>.
kezhenxu94 commented on a change in pull request #125:
URL: https://github.com/apache/skywalking-python/pull/125#discussion_r665271578



##########
File path: skywalking/config.py
##########
@@ -59,13 +59,14 @@
 kafka_bootstrap_servers = os.getenv('SW_KAFKA_REPORTER_BOOTSTRAP_SERVERS') or "localhost:9092"  # type: str
 kafka_topic_management = os.getenv('SW_KAFKA_REPORTER_TOPIC_MANAGEMENT') or "skywalking-managements"  # type: str
 kafka_topic_segment = os.getenv('SW_KAFKA_REPORTER_TOPIC_SEGMENT') or "skywalking-segments"  # type: str
+celery_parameters_length = int(os.getenv('SW_CELERY_PARAMETERS_LENGTH') or '512')
 
 
 def init(
         service: str = None,
         instance: str = None,
         collector: str = None,
-        protocol_type: str = 'grpc',
+        protocol_type: str = None,

Review comment:
       I see 







[GitHub] [skywalking-python] tom-pytel commented on pull request #125: WIP celery plugin and required core changes

Posted by GitBox <gi...@apache.org>.
tom-pytel commented on pull request #125:
URL: https://github.com/apache/skywalking-python/pull/125#issuecomment-865350683


   Just noticed that Python 3.6 lacks the critical `os.register_at_fork()` (it was added in Python 3.7), so forking will probably just not be supported under that version.
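`os.register_at_fork()` was added in Python 3.7 and is only available on Unix, so checking for the function itself (rather than the interpreter version) covers both gaps. A minimal sketch; the hook names are hypothetical placeholders, not the agent's real functions.

```python
import logging
import os

logger = logging.getLogger("skywalking")


def _before_fork():
    pass  # placeholder: e.g. warn about protocols that are not fork-safe


def _after_fork_in_child():
    pass  # placeholder: e.g. restart the reporter threads in the child


def install_fork_hooks() -> bool:
    """Register fork hooks if the interpreter supports them; return whether it did."""
    if not hasattr(os, "register_at_fork"):  # Python 3.6, or a non-Unix platform
        logger.warning("os.register_at_fork() is unavailable, fork() will not be supported")
        return False
    os.register_at_fork(before=_before_fork, after_in_child=_after_fork_in_child)
    return True
```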





[GitHub] [skywalking-python] tom-pytel commented on a change in pull request #125: Python celery plugin

Posted by GitBox <gi...@apache.org>.
tom-pytel commented on a change in pull request #125:
URL: https://github.com/apache/skywalking-python/pull/125#discussion_r656713010



##########
File path: skywalking/plugins/sw_celery.py
##########
@@ -0,0 +1,114 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from skywalking import Layer, Component, config
+from skywalking.trace import tags
+from skywalking.trace.carrier import Carrier
+from skywalking.trace.context import get_context
+from skywalking.trace.tags import Tag
+
+
+def install():
+    from urllib.parse import urlparse
+    from celery import Celery
+
+    def send_task(self, name, args=None, kwargs=None, **options):
+        # NOTE: Lines commented out below left for documentation purposes if sometime in the future exchange / queue
+        # names are wanted. Currently these do not match between producer and consumer so would need some work.
+
+        broker_url = self.conf['broker_url']
+        # exchange = options['exchange']
+        # queue = options['routing_key']
+        # op = 'celery/{}/{}/{}'.format(exchange or '', queue or '', name)
+        op = 'celery/' + name
+
+        if broker_url:
+            url = urlparse(broker_url)
+            peer = '{}:{}'.format(url.hostname, url.port)
+        else:
+            peer = '???'

Review comment:
       It's shorthand for an unknown peer. The hostname should always be present, so this is an extreme just-in-case fallback. Want me to change the text to something like "unknown host"?
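A sketch of a more defensive peer derivation. With a broker URL that omits the port, for example `redis://localhost`, `urlparse()` reports `port` as `None`, so the `'{}:{}'` format in the diff would produce `localhost:None`. The `broker_peer()` helper and the "unknown host" label are hypothetical, taken from the suggestion above rather than from the plugin.

```python
from urllib.parse import urlparse

UNKNOWN_PEER = "unknown host"  # hypothetical replacement for the '???' placeholder


def broker_peer(broker_url):
    """Derive a 'host:port' peer string from a Celery broker URL."""
    if not broker_url:
        return UNKNOWN_PEER
    url = urlparse(broker_url)
    if not url.hostname:
        return UNKNOWN_PEER
    # Omit the port rather than emitting 'host:None' when the URL has none.
    if url.port is None:
        return url.hostname
    return "{}:{}".format(url.hostname, url.port)
```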

##########
File path: skywalking/agent/__init__.py
##########
@@ -65,14 +76,40 @@ def __init():
         __protocol = KafkaProtocol()
 
     plugins.install()
+    __init_threading()
 
 
 def __fini():
     __protocol.report(__queue, False)
     __queue.join()
+    __finished.set()
+
+
+def __fork_before():
+    if config.protocol != 'http':
+        logger.warning('fork() not currently supported with %s protocol' % config.protocol)

Review comment:
       I tried closing the channel and re-creating it in both the parent and the child after the fork. It is possible I did not do it right, since I am not a gRPC expert, but I got one of two results:
   1. It worked in exactly one of the processes, parent or child, but not both.
   2. It didn't work in either.







[GitHub] [skywalking-python] tom-pytel commented on a change in pull request #125: Python celery plugin

Posted by GitBox <gi...@apache.org>.
tom-pytel commented on a change in pull request #125:
URL: https://github.com/apache/skywalking-python/pull/125#discussion_r657031313



##########
File path: skywalking/agent/__init__.py
##########
@@ -65,14 +76,40 @@ def __init():
         __protocol = KafkaProtocol()
 
     plugins.install()
+    __init_threading()
 
 
 def __fini():
     __protocol.report(__queue, False)
     __queue.join()
+    __finished.set()
+
+
+def __fork_before():
+    if config.protocol != 'http':
+        logger.warning('fork() not currently supported with %s protocol' % config.protocol)

Review comment:
       > I didn't mean to close parent channel and create in child process by reusing the agent in parent. What I meant is to start another independent agent in child process and leave the parent one there because there may be other things that may need to be traced in parent process. Can you take a look at
   
   I tried several things, such as:
   * Not doing anything before the fork, then creating new `GrpcServiceManagementClient` and `GrpcTraceSegmentReportService` instances in the child.
   * The above, but closing the channel in the child before creating the new ones.
   * Closing the channel before the fork, then re-creating it in both parent and child.
   * Using `unsubscribe()` instead of `close()`.
   * Calling `unsubscribe()` then `close()`, either before the fork or after it in the child.
   * I also tried waiting for the queue to empty before allowing the fork to proceed, though that was only for form: it was unnecessary, since I wasn't sending anything before the fork.
   
   I also forgot to mention a third result I sometimes got: a deadlock hang. It is possible I missed some permutations or a different function to call, but in general, researching Python gRPC with multiprocessing on the net, I found basically two answers: either 1. "don't do it", or 2. "gRPC must be started only after forking everything, then maybe it will work". Here are some links:
   
   https://github.com/googleapis/synthtool/issues/902
   https://stackoverflow.com/questions/62798507/why-multiprocess-python-grpc-server-do-not-work
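The second answer ("start gRPC only after forking") is often approached by creating the channel lazily and tagging it with the process ID, so that every process builds its own on first use. A minimal, library-agnostic sketch; `PerProcess` is a hypothetical helper, not part of skywalking-python or grpcio, and it has not been validated against the fork problems described here.

```python
import os


class PerProcess:
    """Lazily build one object per process, e.g. a gRPC channel.

    The cached object is tagged with the PID that created it. After fork() the
    child sees a different PID and builds its own object instead of reusing the
    one inherited from the parent, which is simply abandoned, not closed.
    Assumes the first get() in each process happens on a single thread.
    """

    def __init__(self, factory, getpid=os.getpid):
        self._factory = factory
        self._getpid = getpid  # injectable so the behaviour can be tested without forking
        self._pid = None
        self._obj = None

    def get(self):
        pid = self._getpid()
        if pid != self._pid:
            self._obj = self._factory()
            self._pid = pid
        return self._obj
```

For example, `PerProcess(lambda: grpc.insecure_channel('127.0.0.1:11800'))` would give each forked worker its own channel the first time it reports, which matches the "start after forking" advice only as long as nothing touches the channel in the parent before the fork.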
   
   So as I said, it may be possible, but I have not hit on how to do it. If you want to give it a shot, I will add a simple test script to the end of this message. I also didn't test anything with Kafka, and I assume it will not work correctly across a fork until someone validates that.
   
   As for the current higher-level flow (keep in mind it can be changed in the future depending on which protocol is in use): for now, nothing special is done before the fork or after it in the parent. In those cases all threads, sockets and locks continue operating as if nothing had happened. In the child, new report and heartbeat threads are started, since threads don't survive into children. Specifically for the http protocol, the duplicated sockets are closed and new ones are opened on the next heartbeat or report.
   
   There is a potential problem with the `__queue` object: a thread may have been holding its internal lock at the moment of the fork, and since that thread no longer exists in the child, the queue will remain locked there. I'm not sure how to resolve this yet, but it should be a very rare event. The same lock problem with the `__finished` event should be even rarer; I wouldn't expect it to happen basically ever.
   
   Right now I have other stuff on my plate, but if you have any suggestions on what to try I may revisit this at some point in the future. Or if you want to try it yourself, here is a test script:
   ```py
   import multiprocessing as mp
   import time
   
   from skywalking.trace.context import get_context
   from skywalking import agent, config
   
   config.init(collector='127.0.0.1:11800', service='your awesome service')
   agent.start()
   
   def foo():
       with get_context().new_local_span('child before error'):
           pass
   
       time.sleep(2)  # needed to let pending sends flush, because Python doesn't run atexit handlers when forked children exit
   
       # import atexit
       # atexit._run_exitfuncs()
   
   if __name__ == '__main__':
       p = mp.Process(target=foo, args=())
   
       with get_context().new_local_span('parent before start'):
           pass
   
       p.start()
   
       time.sleep(1)
   
       with get_context().new_local_span('parent after start'):
           pass
   
       p.join()
   
       with get_context().new_local_span('parent after join'):
           pass
   ```







[GitHub] [skywalking-python] tom-pytel commented on a change in pull request #125: Python celery plugin

Posted by GitBox <gi...@apache.org>.
tom-pytel commented on a change in pull request #125:
URL: https://github.com/apache/skywalking-python/pull/125#discussion_r665270417



##########
File path: skywalking/config.py
##########
@@ -59,13 +59,14 @@
 kafka_bootstrap_servers = os.getenv('SW_KAFKA_REPORTER_BOOTSTRAP_SERVERS') or "localhost:9092"  # type: str
 kafka_topic_management = os.getenv('SW_KAFKA_REPORTER_TOPIC_MANAGEMENT') or "skywalking-managements"  # type: str
 kafka_topic_segment = os.getenv('SW_KAFKA_REPORTER_TOPIC_SEGMENT') or "skywalking-segments"  # type: str
+celery_parameters_length = int(os.getenv('SW_CELERY_PARAMETERS_LENGTH') or '512')
 
 
 def init(
         service: str = None,
         instance: str = None,
         collector: str = None,
-        protocol_type: str = 'grpc',
+        protocol_type: str = None,

Review comment:
       gRPC IS still the default protocol, because of config.py:36:
   ```py
   protocol = (os.getenv('SW_AGENT_PROTOCOL') or 'grpc').lower()  # type: str
   ```
   What the old `protocol_type: str = 'grpc'` default did was override whatever protocol had been set via the `SW_AGENT_PROTOCOL` env var with `grpc`, which was a bug.
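A minimal sketch of the difference, assuming `init()` only overrides the configured protocol when it receives a truthy `protocol_type` (the real `init()` body is not shown here). Typing the parameter as `Optional[str]` would also address the *Incompatible variable type* warning that sonatype-lift raised on this line. The function names are hypothetical.

```python
import os
from typing import Mapping, Optional


def _env_protocol(environ: Mapping[str, str]) -> str:
    # Same rule as config.py:36: the env var wins, otherwise fall back to 'grpc'.
    return (environ.get("SW_AGENT_PROTOCOL") or "grpc").lower()


def resolve_protocol_buggy(protocol_type: str = "grpc", environ: Mapping[str, str] = os.environ) -> str:
    # Old signature: the 'grpc' default is always truthy, so the env var is never consulted.
    return protocol_type or _env_protocol(environ)


def resolve_protocol(protocol_type: Optional[str] = None, environ: Mapping[str, str] = os.environ) -> str:
    # New signature: only an explicit argument overrides the env-derived default.
    return protocol_type or _env_protocol(environ)
```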







[GitHub] [skywalking-python] kezhenxu94 edited a comment on pull request #125: Python celery plugin

Posted by GitBox <gi...@apache.org>.
kezhenxu94 edited a comment on pull request #125:
URL: https://github.com/apache/skywalking-python/pull/125#issuecomment-872315770


   > @kezhenxu94 I notice you did "Merge branch 'master' into master", does this mean I should squash and merge?
   
   Not exactly. I just updated your branch to make sure it is up to date and passes CI. I haven't checked the details of this PR yet because I've been busy with other urgent stuff recently; I should be able to look into it soon, sorry about that 🙇🏻 





[GitHub] [skywalking-python] tom-pytel commented on pull request #125: Python celery plugin

Posted by GitBox <gi...@apache.org>.
tom-pytel commented on pull request #125:
URL: https://github.com/apache/skywalking-python/pull/125#issuecomment-872305150


   @kezhenxu94 I notice you did "Merge branch 'master' into master", does this mean I should squash and merge?






[GitHub] [skywalking-python] tom-pytel commented on a change in pull request #125: Python celery plugin

Posted by GitBox <gi...@apache.org>.
tom-pytel commented on a change in pull request #125:
URL: https://github.com/apache/skywalking-python/pull/125#discussion_r657992328



##########
File path: skywalking/agent/__init__.py
##########
@@ -65,14 +76,40 @@ def __init():
         __protocol = KafkaProtocol()
 
     plugins.install()
+    __init_threading()
 
 
 def __fini():
     __protocol.report(__queue, False)
     __queue.join()
+    __finished.set()
+
+
+def __fork_before():
+    if config.protocol != 'http':
+        logger.warning('fork() not currently supported with %s protocol' % config.protocol)

Review comment:
       But also, the absence of the `failed to install plugin sw_celery` warning from your output tells me you are not running this PR's code.







[GitHub] [skywalking-python] sonatype-lift[bot] commented on a change in pull request #125: Python celery plugin

Posted by GitBox <gi...@apache.org>.
sonatype-lift[bot] commented on a change in pull request #125:
URL: https://github.com/apache/skywalking-python/pull/125#discussion_r663997835



##########
File path: skywalking/config.py
##########
@@ -59,13 +59,14 @@
 kafka_bootstrap_servers = os.getenv('SW_KAFKA_REPORTER_BOOTSTRAP_SERVERS') or "localhost:9092"  # type: str
 kafka_topic_management = os.getenv('SW_KAFKA_REPORTER_TOPIC_MANAGEMENT') or "skywalking-managements"  # type: str
 kafka_topic_segment = os.getenv('SW_KAFKA_REPORTER_TOPIC_SEGMENT') or "skywalking-segments"  # type: str
+celery_parameters_length = int(os.getenv('SW_CELERY_PARAMETERS_LENGTH') or '512')
 
 
 def init(
         service: str = None,
         instance: str = None,
         collector: str = None,
-        protocol_type: str = 'grpc',
+        protocol_type: str = None,

Review comment:
       I've recorded this as ignored for this pull request. If you change your mind, just comment `@sonatype-lift unignore`.







[GitHub] [skywalking-python] sonatype-lift[bot] commented on a change in pull request #125: WIP celery plugin and required core changes

Posted by GitBox <gi...@apache.org>.
sonatype-lift[bot] commented on a change in pull request #125:
URL: https://github.com/apache/skywalking-python/pull/125#discussion_r655586018



##########
File path: skywalking/config.py
##########
@@ -58,13 +58,14 @@
 kafka_bootstrap_servers = os.getenv('SW_KAFKA_REPORTER_BOOTSTRAP_SERVERS') or "localhost:9092"  # type: str
 kafka_topic_management = os.getenv('SW_KAFKA_REPORTER_TOPIC_MANAGEMENT') or "skywalking-managements"  # type: str
 kafka_topic_segment = os.getenv('SW_KAFKA_REPORTER_TOPIC_SEGMENT') or "skywalking-segments"  # type: str
+celery_parameters = os.getenv('SW_CELERY_PARAMETERS', '').lower() == 'true'
 
 
 def init(
         service: str = None,
         instance: str = None,
         collector: str = None,
-        protocol_type: str = 'grpc',
+        protocol_type: str = None,

Review comment:
       I've recorded this as ignored for this pull request. If you change your mind, just comment `@sonatype-lift unignore`.







[GitHub] [skywalking-python] tom-pytel commented on pull request #125: WIP celery plugin and required core changes

Posted by GitBox <gi...@apache.org>.
tom-pytel commented on pull request #125:
URL: https://github.com/apache/skywalking-python/pull/125#issuecomment-865988381


   This should be final-ish; the PR is good to go. The only thing I didn't do is a test case, because those take me way too long due to extremely slow iteration. Our own internal tests for this plugin look good, so maybe leave the official SW test for this to an intern?





[GitHub] [skywalking-python] wu-sheng commented on a change in pull request #125: Python celery plugin

Posted by GitBox <gi...@apache.org>.
wu-sheng commented on a change in pull request #125:
URL: https://github.com/apache/skywalking-python/pull/125#discussion_r667484473



##########
File path: setup.py
##########
@@ -33,12 +33,13 @@
     author="Apache",
     author_email="dev@skywalking.apache.org",
     license="Apache 2.0",
-    packages=find_packages(exclude=("tests",)),
+    packages=find_packages(exclude=("tests", "tests.*")),
     include_package_data=True,
     install_requires=[
         "grpcio",
         "grpcio-tools",
         "packaging",
+        "requests",

Review comment:
       Why does a plugin require an agent-level dependency? 







[GitHub] [skywalking-python] kezhenxu94 commented on a change in pull request #125: Python celery plugin

Posted by GitBox <gi...@apache.org>.
kezhenxu94 commented on a change in pull request #125:
URL: https://github.com/apache/skywalking-python/pull/125#discussion_r657115742



##########
File path: skywalking/agent/__init__.py
##########
@@ -65,14 +76,40 @@ def __init():
         __protocol = KafkaProtocol()
 
     plugins.install()
+    __init_threading()
 
 
 def __fini():
     __protocol.report(__queue, False)
     __queue.join()
+    __finished.set()
+
+
+def __fork_before():
+    if config.protocol != 'http':
+        logger.warning('fork() not currently supported with %s protocol' % config.protocol)

Review comment:
   ```shell
   pwd
   # /Users/kezhenxu94/workspace/skywalking-python
   
   g rev-parse HEAD
   # 6002ccc5e7463be8960304c30ecae55753f001dc
   
   make install > /dev/null 2>&1
   cat <<EOF > test.py
   import multiprocessing as mp
   import time
   
   from skywalking.trace.context import get_context
   from skywalking import agent, config
   
   config.init(collector='127.0.0.1:11800', service='your awesome service')
   agent.start()
   
   def foo():
       with get_context().new_local_span('child before error'):
           pass
   
       time.sleep(2)  # this needed to flush send because python doesn't run atexit handlers on exit in forked children
   
       # import atexit
       # atexit._run_exitfuncs()
   
   if __name__ == '__main__':
       p = mp.Process(target = foo, args = ())
   
       with get_context().new_local_span('parent before start'):
           pass
   
       p.start()
   
       time.sleep(1)
   
       with get_context().new_local_span('parent after start'):
           pass
   
       p.join()
   
       with get_context().new_local_span('parent after join'):
           pass
   
       time.sleep(5)
   EOF
   
   docker run -it --rm -p 11800:11800 -p 12800:12800 -d apache/skywalking-oap-server:8.6.0-es6
   # 64fb2bf0e8b8030c69058d79c114969ab46207fe9391913cc91080528f510005
   
   # Start to test
   # Generate traffic
   python3 test.py
   # skywalking [MainThread] [WARNING] failed to install plugin sw_aiohttp
   # skywalking [MainThread] [WARNING] failed to install plugin sw_django
   # skywalking [MainThread] [WARNING] failed to install plugin sw_elasticsearch
   # skywalking [MainThread] [WARNING] failed to install plugin sw_flask
   # skywalking [MainThread] [WARNING] failed to install plugin sw_kafka
   # skywalking [MainThread] [WARNING] failed to install plugin sw_psycopg2
   # skywalking [MainThread] [WARNING] failed to install plugin sw_pymongo
   # skywalking [MainThread] [WARNING] failed to install plugin sw_pymysql
   # skywalking [MainThread] [WARNING] failed to install plugin sw_pyramid
   # skywalking [MainThread] [WARNING] failed to install plugin sw_rabbitmq
   # skywalking [MainThread] [WARNING] failed to install plugin sw_redis
   # skywalking [MainThread] [WARNING] failed to install plugin sw_requests
   # skywalking [MainThread] [WARNING] failed to install plugin sw_sanic
   # skywalking [MainThread] [WARNING] failed to install plugin sw_tornado
   # skywalking [MainThread] [WARNING] failed to install plugin sw_urllib3
   # skywalking [MainThread] [WARNING] failed to install plugin sw_aiohttp
   # skywalking [MainThread] [WARNING] failed to install plugin sw_django
   # skywalking [MainThread] [WARNING] failed to install plugin sw_elasticsearch
   # skywalking [MainThread] [WARNING] failed to install plugin sw_flask
   # skywalking [MainThread] [WARNING] failed to install plugin sw_kafka
   # skywalking [MainThread] [WARNING] failed to install plugin sw_psycopg2
   # skywalking [MainThread] [WARNING] failed to install plugin sw_pymongo
   # skywalking [MainThread] [WARNING] failed to install plugin sw_pymysql
   # skywalking [MainThread] [WARNING] failed to install plugin sw_pyramid
   # skywalking [MainThread] [WARNING] failed to install plugin sw_rabbitmq
   # skywalking [MainThread] [WARNING] failed to install plugin sw_redis
   # skywalking [MainThread] [WARNING] failed to install plugin sw_requests
   # skywalking [MainThread] [WARNING] failed to install plugin sw_sanic
   # skywalking [MainThread] [WARNING] failed to install plugin sw_tornado
   # skywalking [MainThread] [WARNING] failed to install plugin sw_urllib3
   # Query and see the span number
   
   curl -s 'http://localhost:12800/graphql' \
   -X 'POST' \
   -H 'Content-Type: application/json;charset=utf-8' \
   --data-binary '{"query":"query queryTraces($condition: TraceQueryCondition) {\n  data: queryBasicTraces(condition: $condition) { traces { endpointNames } total }}","variables":{"condition":{"queryDuration":{"start":"2021-06-23 131309","end":"2021-06-23 232809","step":"SECOND"},"traceState":"ALL","paging":{"pageNum":1,"pageSize":15,"needTotal":true},"queryOrder":"BY_START_TIME","tags":[]}}}' | jq '.data.data.total'
   
   # 4 <================== 
   
   python3 test.py
   # skywalking [MainThread] [WARNING] failed to install plugin sw_aiohttp
   # skywalking [MainThread] [WARNING] failed to install plugin sw_django
   # skywalking [MainThread] [WARNING] failed to install plugin sw_elasticsearch
   # skywalking [MainThread] [WARNING] failed to install plugin sw_flask
   # skywalking [MainThread] [WARNING] failed to install plugin sw_kafka
   # skywalking [MainThread] [WARNING] failed to install plugin sw_psycopg2
   # skywalking [MainThread] [WARNING] failed to install plugin sw_pymongo
   # skywalking [MainThread] [WARNING] failed to install plugin sw_pymysql
   # skywalking [MainThread] [WARNING] failed to install plugin sw_pyramid
   # skywalking [MainThread] [WARNING] failed to install plugin sw_rabbitmq
   # skywalking [MainThread] [WARNING] failed to install plugin sw_redis
   # skywalking [MainThread] [WARNING] failed to install plugin sw_requests
   # skywalking [MainThread] [WARNING] failed to install plugin sw_sanic
   # skywalking [MainThread] [WARNING] failed to install plugin sw_tornado
   # skywalking [MainThread] [WARNING] failed to install plugin sw_urllib3
   # skywalking [MainThread] [WARNING] failed to install plugin sw_aiohttp
   # skywalking [MainThread] [WARNING] failed to install plugin sw_django
   # skywalking [MainThread] [WARNING] failed to install plugin sw_elasticsearch
   # skywalking [MainThread] [WARNING] failed to install plugin sw_flask
   # skywalking [MainThread] [WARNING] failed to install plugin sw_kafka
   # skywalking [MainThread] [WARNING] failed to install plugin sw_psycopg2
   # skywalking [MainThread] [WARNING] failed to install plugin sw_pymongo
   # skywalking [MainThread] [WARNING] failed to install plugin sw_pymysql
   # skywalking [MainThread] [WARNING] failed to install plugin sw_pyramid
   # skywalking [MainThread] [WARNING] failed to install plugin sw_rabbitmq
   # skywalking [MainThread] [WARNING] failed to install plugin sw_redis
   # skywalking [MainThread] [WARNING] failed to install plugin sw_requests
   # skywalking [MainThread] [WARNING] failed to install plugin sw_sanic
   # skywalking [MainThread] [WARNING] failed to install plugin sw_tornado
   # skywalking [MainThread] [WARNING] failed to install plugin sw_urllib3
   
   # Query and see the span number
   curl -s 'http://localhost:12800/graphql' \
   -X 'POST' \
   -H 'Content-Type: application/json;charset=utf-8' \
   --data-binary '{"query":"query queryTraces($condition: TraceQueryCondition) {\n  data: queryBasicTraces(condition: $condition) { traces { endpointNames } total }}","variables":{"condition":{"queryDuration":{"start":"2021-06-23 131309","end":"2021-06-23 232809","step":"SECOND"},"traceState":"ALL","paging":{"pageNum":1,"pageSize":15,"needTotal":true},"queryOrder":"BY_START_TIME","tags":[]}}}' | jq '.data.data.total'
   
   # 8 <===============
   ```







[GitHub] [skywalking-python] kezhenxu94 commented on pull request #125: Python celery plugin

Posted by GitBox <gi...@apache.org>.
kezhenxu94 commented on pull request #125:
URL: https://github.com/apache/skywalking-python/pull/125#issuecomment-872315770


   > @kezhenxu94 I notice you did "Merge branch 'master' into master", does this mean I should squash and merge?
   
   Not exactly. I just updated your branch to make sure it is up to date. I haven't checked the details of this PR yet because I've been busy with other urgent stuff recently; I should be able to look into it soon, sorry about that 🙇🏻 





[GitHub] [skywalking-python] sonatype-lift[bot] commented on a change in pull request #125: WIP celery plugin and required core changes

Posted by GitBox <gi...@apache.org>.
sonatype-lift[bot] commented on a change in pull request #125:
URL: https://github.com/apache/skywalking-python/pull/125#discussion_r655530004



##########
File path: skywalking/config.py
##########
@@ -58,13 +58,14 @@
 kafka_bootstrap_servers = os.getenv('SW_KAFKA_REPORTER_BOOTSTRAP_SERVERS') or "localhost:9092"  # type: str
 kafka_topic_management = os.getenv('SW_KAFKA_REPORTER_TOPIC_MANAGEMENT') or "skywalking-managements"  # type: str
 kafka_topic_segment = os.getenv('SW_KAFKA_REPORTER_TOPIC_SEGMENT') or "skywalking-segments"  # type: str
+celery_parameters = os.getenv('SW_CELERY_PARAMETERS', '').lower() == 'true'
 
 
 def init(
         service: str = None,
         instance: str = None,
         collector: str = None,
-        protocol_type: str = 'grpc',
+        protocol_type: str = None,

Review comment:
       *Incompatible variable type:*  protocol_type is declared to have type `str` but is used as type `None`.
   (at-me [in a reply](https://help.sonatype.com/lift) with `help` or `ignore`)

##########
File path: skywalking/config.py
##########
@@ -58,13 +58,14 @@
 kafka_bootstrap_servers = os.getenv('SW_KAFKA_REPORTER_BOOTSTRAP_SERVERS') or "localhost:9092"  # type: str
 kafka_topic_management = os.getenv('SW_KAFKA_REPORTER_TOPIC_MANAGEMENT') or "skywalking-managements"  # type: str
 kafka_topic_segment = os.getenv('SW_KAFKA_REPORTER_TOPIC_SEGMENT') or "skywalking-segments"  # type: str
+celery_parameters = os.getenv('SW_CELERY_PARAMETERS', '').lower() == 'true'
 
 
 def init(
         service: str = None,
         instance: str = None,
         collector: str = None,
-        protocol_type: str = 'grpc',
+        protocol_type: str = None,

Review comment:
       I've recorded this as ignored for this pull request. If you change your mind, just comment `@sonatype-lift unignore`.




[GitHub] [skywalking-python] tom-pytel commented on a change in pull request #125: Python celery plugin

Posted by GitBox <gi...@apache.org>.
tom-pytel commented on a change in pull request #125:
URL: https://github.com/apache/skywalking-python/pull/125#discussion_r657094466



##########
File path: skywalking/agent/__init__.py
##########
@@ -65,14 +76,40 @@ def __init():
         __protocol = KafkaProtocol()
 
     plugins.install()
+    __init_threading()
 
 
 def __fini():
     __protocol.report(__queue, False)
     __queue.join()
+    __finished.set()
+
+
+def __fork_before():
+    if config.protocol != 'http':
+        logger.warning('fork() not currently supported with %s protocol' % config.protocol)

Review comment:
       Can you give me version numbers of everything relevant, python included?




[GitHub] [skywalking-python] tom-pytel commented on a change in pull request #125: Python celery plugin

Posted by GitBox <gi...@apache.org>.
tom-pytel commented on a change in pull request #125:
URL: https://github.com/apache/skywalking-python/pull/125#discussion_r667495773



##########
File path: setup.py
##########
@@ -33,12 +33,13 @@
     author="Apache",
     author_email="dev@skywalking.apache.org",
     license="Apache 2.0",
-    packages=find_packages(exclude=("tests",)),
+    packages=find_packages(exclude=("tests", "tests.*")),
     include_package_data=True,
     install_requires=[
         "grpcio",
         "grpcio-tools",
         "packaging",
+        "requests",

Review comment:
       Sure, if the license causes problems, we can remove it from the required dependencies. I could also look into using a different HTTP client, such as `urllib.request` or `urllib3.request`.
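For reference, a minimal sketch of posting a JSON body with the standard-library `urllib.request` instead of `requests`. The helper names are hypothetical and this is not how the agent's http protocol is currently written:

```python
import json
import urllib.request


def build_json_request(url: str, payload) -> urllib.request.Request:
    """Build a POST request carrying `payload` serialized as JSON."""
    body = json.dumps(payload).encode('utf-8')
    return urllib.request.Request(
        url,
        data=body,
        headers={'Content-Type': 'application/json'},
        method='POST',
    )


def post_json(url: str, payload, timeout: float = 10.0) -> int:
    """Send the request and return the HTTP status code."""
    req = build_json_request(url, payload)
    # Unlike requests, urlopen raises urllib.error.HTTPError for 4xx/5xx.
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return resp.status
```

One design difference to keep in mind: `requests` returns error responses as normal objects, while `urlopen` raises `urllib.error.HTTPError`, so a reporter built on the stdlib would need an explicit `try`/`except` around each send.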
   
   As for the http protocol, we are doing stress testing here and finding that grpc is not entirely reliable, while the http protocol is actually a lot more stable. I'm not sure why; maybe grpc is not configured correctly, or the timeouts are causing problems. The main takeaway is that the http protocol should be treated as more than just optional at this point, since it keeps working in scenarios where grpc breaks for us.




[GitHub] [skywalking-python] tom-pytel commented on a change in pull request #125: Python celery plugin

Posted by GitBox <gi...@apache.org>.
tom-pytel commented on a change in pull request #125:
URL: https://github.com/apache/skywalking-python/pull/125#discussion_r663997799



##########
File path: skywalking/config.py
##########
@@ -59,13 +59,14 @@
 kafka_bootstrap_servers = os.getenv('SW_KAFKA_REPORTER_BOOTSTRAP_SERVERS') or "localhost:9092"  # type: str
 kafka_topic_management = os.getenv('SW_KAFKA_REPORTER_TOPIC_MANAGEMENT') or "skywalking-managements"  # type: str
 kafka_topic_segment = os.getenv('SW_KAFKA_REPORTER_TOPIC_SEGMENT') or "skywalking-segments"  # type: str
+celery_parameters_length = int(os.getenv('SW_CELERY_PARAMETERS_LENGTH') or '512')
 
 
 def init(
         service: str = None,
         instance: str = None,
         collector: str = None,
-        protocol_type: str = 'grpc',
+        protocol_type: str = None,

Review comment:
       ignore




[GitHub] [skywalking-python] kezhenxu94 commented on a change in pull request #125: Python celery plugin

Posted by GitBox <gi...@apache.org>.
kezhenxu94 commented on a change in pull request #125:
URL: https://github.com/apache/skywalking-python/pull/125#discussion_r659414619



##########
File path: skywalking/agent/__init__.py
##########
@@ -65,14 +76,40 @@ def __init():
         __protocol = KafkaProtocol()
 
     plugins.install()
+    __init_threading()
 
 
 def __fini():
     __protocol.report(__queue, False)
     __queue.join()
+    __finished.set()
+
+
+def __fork_before():
+    if config.protocol != 'http':
+        logger.warning('fork() not currently supported with %s protocol' % config.protocol)

Review comment:
       Same result with or without a 2-second delay.




[GitHub] [skywalking-python] wu-sheng commented on a change in pull request #125: Python celery plugin

Posted by GitBox <gi...@apache.org>.
wu-sheng commented on a change in pull request #125:
URL: https://github.com/apache/skywalking-python/pull/125#discussion_r667496303



##########
File path: setup.py
##########
@@ -33,12 +33,13 @@
     author="Apache",
     author_email="dev@skywalking.apache.org",
     license="Apache 2.0",
-    packages=find_packages(exclude=("tests",)),
+    packages=find_packages(exclude=("tests", "tests.*")),
     include_package_data=True,
     install_requires=[
         "grpcio",
         "grpcio-tools",
         "packaging",
+        "requests",

Review comment:
       @tom-pytel Could you share how you tested the performance in a separate issue? In the perf tests we've run on the OAP backend over the last several weeks, JSON really hasn't performed well from a Java perspective.




[GitHub] [skywalking-python] kezhenxu94 commented on a change in pull request #125: Python celery plugin

Posted by GitBox <gi...@apache.org>.
kezhenxu94 commented on a change in pull request #125:
URL: https://github.com/apache/skywalking-python/pull/125#discussion_r657966991



##########
File path: skywalking/agent/__init__.py
##########
@@ -65,14 +76,40 @@ def __init():
         __protocol = KafkaProtocol()
 
     plugins.install()
+    __init_threading()
 
 
 def __fini():
     __protocol.report(__queue, False)
     __queue.join()
+    __finished.set()
+
+
+def __fork_before():
+    if config.protocol != 'http':
+        logger.warning('fork() not currently supported with %s protocol' % config.protocol)

Review comment:
       Can you add `time.sleep(5)` at the end of the program and retry, like I did 👆?




[GitHub] [skywalking-python] kezhenxu94 commented on a change in pull request #125: Python celery plugin

Posted by GitBox <gi...@apache.org>.
kezhenxu94 commented on a change in pull request #125:
URL: https://github.com/apache/skywalking-python/pull/125#discussion_r667485055



##########
File path: setup.py
##########
@@ -33,12 +33,13 @@
     author="Apache",
     author_email="dev@skywalking.apache.org",
     license="Apache 2.0",
-    packages=find_packages(exclude=("tests",)),
+    packages=find_packages(exclude=("tests", "tests.*")),
     include_package_data=True,
     install_requires=[
         "grpcio",
         "grpcio-tools",
         "packaging",
+        "requests",

Review comment:
       > Why a plugin requires an agent-level dependency?
   
   We support both the grpc and http protocols. For the http protocol we use `requests` to send http requests, but since grpc is the default protocol and http is optional (it can be installed with `pip install apache-skywalking[http]`), `requests` is not a required dependency. I think @tom-pytel missed that and added the dependency here because he wanted to test the http protocol.
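A minimal sketch of how an optional, protocol-specific dependency is usually kept out of `install_requires` with setuptools extras. The constant names and the availability helper are hypothetical; only the `grpcio`, `grpcio-tools`, `packaging` and `requests` entries come from the diff:

```python
import importlib.util

# Core dependencies needed for the default grpc protocol.
INSTALL_REQUIRES = ['grpcio', 'grpcio-tools', 'packaging']

# Optional dependencies, pulled in only when an extra such as `[http]` is requested.
EXTRAS_REQUIRE = {
    'http': ['requests'],
}

# In setup.py these would be passed as:
#   setup(..., install_requires=INSTALL_REQUIRES, extras_require=EXTRAS_REQUIRE)


def http_protocol_available() -> bool:
    """True if the optional `requests` package can be imported."""
    return importlib.util.find_spec('requests') is not None
```

Checking with `find_spec` at protocol-selection time lets the agent log a clear message when the http protocol is configured but the extra was not installed, instead of failing on import.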




[GitHub] [skywalking-python] kezhenxu94 commented on a change in pull request #125: Python celery plugin

Posted by GitBox <gi...@apache.org>.
kezhenxu94 commented on a change in pull request #125:
URL: https://github.com/apache/skywalking-python/pull/125#discussion_r656722967



##########
File path: skywalking/agent/__init__.py
##########
@@ -65,14 +76,40 @@ def __init():
         __protocol = KafkaProtocol()
 
     plugins.install()
+    __init_threading()
 
 
 def __fini():
     __protocol.report(__queue, False)
     __queue.join()
+    __finished.set()
+
+
+def __fork_before():
+    if config.protocol != 'http':
+        logger.warning('fork() not currently supported with %s protocol' % config.protocol)

Review comment:
       In your current implementation, when new processes are spawned, the agent in the parent process no longer takes effect, right?




[GitHub] [skywalking-python] kezhenxu94 commented on a change in pull request #125: Python celery plugin

Posted by GitBox <gi...@apache.org>.
kezhenxu94 commented on a change in pull request #125:
URL: https://github.com/apache/skywalking-python/pull/125#discussion_r656290342



##########
File path: skywalking/plugins/sw_celery.py
##########
@@ -0,0 +1,114 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from skywalking import Layer, Component, config
+from skywalking.trace import tags
+from skywalking.trace.carrier import Carrier
+from skywalking.trace.context import get_context
+from skywalking.trace.tags import Tag
+
+
+def install():
+    from urllib.parse import urlparse
+    from celery import Celery
+
+    def send_task(self, name, args=None, kwargs=None, **options):
+        # NOTE: Lines commented out below left for documentation purposes if sometime in the future exchange / queue
+        # names are wanted. Currently these do not match between producer and consumer so would need some work.
+
+        broker_url = self.conf['broker_url']
+        # exchange = options['exchange']
+        # queue = options['routing_key']
+        # op = 'celery/{}/{}/{}'.format(exchange or '', queue or '', name)
+        op = 'celery/' + name
+
+        if broker_url:
+            url = urlparse(broker_url)
+            peer = '{}:{}'.format(url.hostname, url.port)
+        else:
+            peer = '???'

Review comment:
       Is this the final state?
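One possible way to finalize the peer logic, as a sketch: the `broker_peer` helper is hypothetical, and it keeps the diff's `'???'` placeholder. It also handles a broker URL without an explicit port, where `urlparse(...).port` is `None` and the current `'{}:{}'.format(...)` would yield `'localhost:None'`:

```python
from typing import Optional
from urllib.parse import urlparse


def broker_peer(broker_url: Optional[str]) -> str:
    """Derive a 'host[:port]' peer string from a Celery broker URL."""
    if not broker_url:
        return '???'  # same placeholder the diff uses when no URL is configured
    url = urlparse(broker_url)
    host = url.hostname or ''
    # url.port is None when the URL omits it, e.g. 'amqp://guest@localhost//'.
    return '{}:{}'.format(host, url.port) if url.port is not None else host
```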

##########
File path: skywalking/agent/__init__.py
##########
@@ -65,14 +76,40 @@ def __init():
         __protocol = KafkaProtocol()
 
     plugins.install()
+    __init_threading()
 
 
 def __fini():
     __protocol.report(__queue, False)
     __queue.join()
+    __finished.set()
+
+
+def __fork_before():
+    if config.protocol != 'http':
+        logger.warning('fork() not currently supported with %s protocol' % config.protocol)

Review comment:
       Is it possible to make gRPC work with fork()? As I said in DM, could we recreate a brand new gRPC channel in the forked process?
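A minimal sketch of the "new channel in the child" idea using `os.register_at_fork` (POSIX, Python 3.7+). The `ForkSafeChannel` class is hypothetical and the channel factory is injected, so nothing here is specific to the agent's code. Whether this alone makes gRPC fork-safe is untested here:

```python
import os
from typing import Any, Callable


class ForkSafeChannel:
    """Hypothetical holder that rebuilds its channel in a forked child."""

    def __init__(self, factory: Callable[[], Any]):
        self._factory = factory
        self.channel = factory()
        # The after_in_child hook runs in the child right after fork().
        if hasattr(os, 'register_at_fork'):
            os.register_at_fork(after_in_child=self._recreate)

    def _recreate(self) -> None:
        # Never reuse the parent's channel in the child: its connection state
        # was set up by the parent process. Build a fresh one instead.
        self.channel = self._factory()
```

With gRPC the factory could be something like `lambda: grpc.insecure_channel('localhost:11800')` (address illustrative). Any stubs created on the old channel would also have to be rebuilt from `holder.channel` in the child.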




[GitHub] [skywalking-python] wu-sheng commented on a change in pull request #125: Python celery plugin

Posted by GitBox <gi...@apache.org>.
wu-sheng commented on a change in pull request #125:
URL: https://github.com/apache/skywalking-python/pull/125#discussion_r667487085



##########
File path: setup.py
##########
@@ -33,12 +33,13 @@
     author="Apache",
     author_email="dev@skywalking.apache.org",
     license="Apache 2.0",
-    packages=find_packages(exclude=("tests",)),
+    packages=find_packages(exclude=("tests", "tests.*")),
     include_package_data=True,
     install_requires=[
         "grpcio",
         "grpcio-tools",
         "packaging",
+        "requests",

Review comment:
       OK, got it. Glad we don't really depend on it.




[GitHub] [skywalking-python] tom-pytel commented on a change in pull request #125: Python celery plugin

Posted by GitBox <gi...@apache.org>.
tom-pytel commented on a change in pull request #125:
URL: https://github.com/apache/skywalking-python/pull/125#discussion_r661486113



##########
File path: skywalking/agent/__init__.py
##########
@@ -65,14 +76,40 @@ def __init():
         __protocol = KafkaProtocol()
 
     plugins.install()
+    __init_threading()
 
 
 def __fini():
     __protocol.report(__queue, False)
     __queue.join()
+    __finished.set()
+
+
+def __fork_before():
+    if config.protocol != 'http':
+        logger.warning('fork() not currently supported with %s protocol' % config.protocol)

Review comment:
       I have tried a few more times with upstream/master and still get bad results. One run produced all 4 spans, but the rest produced only 3, and a couple of runs deadlocked. Beyond that, upstream/master cannot run correctly in a multiprocessing scenario at all: fork() duplicates only the calling thread, so other threads (such as the report and heartbeat threads) do not exist in the child and must be explicitly recreated there (which this PR does).
   
   I don't have time allocated right now to look into the grpc issue, but I do know that the http protocol in this PR works with fork(). So how do you want to proceed? I could remove that warning message, or change it to something less absolute, like "fork() may not work correctly with grpc protocol". In general, this PR does not change how grpc worked before; it only fixes the http protocol and restarts the report and heartbeat threads in the fork() child, plus adds the celery plugin of course.
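A minimal sketch of restarting a background thread in the fork() child, along the lines of this PR's `__init_threading()` and `__finished` event. The `Reporter` class and its method names are hypothetical, and the loop body is only a stand-in for reporting and heartbeats:

```python
import os
import threading


class Reporter:
    """Hypothetical background reporter whose thread is restarted after fork()."""

    def __init__(self, interval: float = 0.05):
        self.interval = interval
        self.ticks = 0
        self._start_thread()
        if hasattr(os, 'register_at_fork'):
            # fork() copies only the calling thread, so start a new one in the child.
            os.register_at_fork(after_in_child=self._start_thread)

    def _start_thread(self) -> None:
        # Give each thread its own stop event so an old event cannot stop it.
        self._finished = threading.Event()
        self._thread = threading.Thread(
            target=self._run, args=(self._finished,), daemon=True)
        self._thread.start()

    def _run(self, finished: threading.Event) -> None:
        # Event.wait(timeout) returns False on timeout and True once set.
        while not finished.wait(self.interval):
            self.ticks += 1  # stand-in for "report queued segments / send heartbeat"

    def stop(self) -> None:
        self._finished.set()
        self._thread.join()
```

Passing the event to the thread as an argument, rather than reading `self._finished` inside the loop, keeps each thread tied to the event that was created with it.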




[GitHub] [skywalking-python] kezhenxu94 commented on a change in pull request #125: Python celery plugin

Posted by GitBox <gi...@apache.org>.
kezhenxu94 commented on a change in pull request #125:
URL: https://github.com/apache/skywalking-python/pull/125#discussion_r659413923



##########
File path: skywalking/agent/__init__.py
##########
@@ -65,14 +76,40 @@ def __init():
         __protocol = KafkaProtocol()
 
     plugins.install()
+    __init_threading()
 
 
 def __fini():
     __protocol.report(__queue, False)
     __queue.join()
+    __finished.set()
+
+
+def __fork_before():
+    if config.protocol != 'http':
+        logger.warning('fork() not currently supported with %s protocol' % config.protocol)

Review comment:
       > But also, the missing `failed to install plugin sw_celery` tells me you are not running this PR.
   
   I was running on the master branch, not this PR.




[GitHub] [skywalking-python] tom-pytel commented on a change in pull request #125: Python celery plugin

Posted by GitBox <gi...@apache.org>.
tom-pytel commented on a change in pull request #125:
URL: https://github.com/apache/skywalking-python/pull/125#discussion_r657104330



##########
File path: skywalking/agent/__init__.py
##########
@@ -65,14 +76,40 @@ def __init():
         __protocol = KafkaProtocol()
 
     plugins.install()
+    __init_threading()
 
 
 def __fini():
     __protocol.report(__queue, False)
     __queue.join()
+    __finished.set()
+
+
+def __fork_before():
+    if config.protocol != 'http':
+        logger.warning('fork() not currently supported with %s protocol' % config.protocol)

Review comment:
       If you got this far, you should also check with some spans created before the fork to see what happens.




[GitHub] [skywalking-python] tom-pytel commented on a change in pull request #125: WIP celery plugin and required core changes

Posted by GitBox <gi...@apache.org>.
tom-pytel commented on a change in pull request #125:
URL: https://github.com/apache/skywalking-python/pull/125#discussion_r655585973



##########
File path: skywalking/config.py
##########
@@ -58,13 +58,14 @@
 kafka_bootstrap_servers = os.getenv('SW_KAFKA_REPORTER_BOOTSTRAP_SERVERS') or "localhost:9092"  # type: str
 kafka_topic_management = os.getenv('SW_KAFKA_REPORTER_TOPIC_MANAGEMENT') or "skywalking-managements"  # type: str
 kafka_topic_segment = os.getenv('SW_KAFKA_REPORTER_TOPIC_SEGMENT') or "skywalking-segments"  # type: str
+celery_parameters = os.getenv('SW_CELERY_PARAMETERS', '').lower() == 'true'
 
 
 def init(
         service: str = None,
         instance: str = None,
         collector: str = None,
-        protocol_type: str = 'grpc',
+        protocol_type: str = None,

Review comment:
       ignore



