You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@devlake.apache.org by he...@apache.org on 2023/03/31 01:04:46 UTC

[incubator-devlake] branch main updated: [fix-4772]: Python progress update crashes Go DevLake server (#4815)

This is an automated email from the ASF dual-hosted git repository.

hez pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new d0cd663be [fix-4772]: Python progress update crashes Go DevLake server (#4815)
d0cd663be is described below

commit d0cd663bea84c033d35af9febac02991b1147a3f
Author: Keon Amini <ke...@merico.dev>
AuthorDate: Thu Mar 30 20:04:40 2023 -0500

    [fix-4772]: Python progress update crashes Go DevLake server (#4815)
    
    * fix: Fix for panic in Go if Python returns a remote progress via FdOut
    
    * fix: Workaround for PyCharm debugger getting stuck
    
    * test: Have Python fakeplugin return more data so we can test this fix
    
    * fix: Progress counter logic fix
---
 backend/core/utils/ipc.go                          |  8 +++++--
 backend/go.mod                                     |  2 +-
 backend/python/pydevlake/pydevlake/__init__.py     |  6 +++++
 .../python/pydevlake/pydevlake/helpers/debugger.py |  6 ++---
 backend/python/pydevlake/pydevlake/ipc.py          |  3 ---
 backend/python/pydevlake/pydevlake/subtasks.py     | 20 ++++++++++++-----
 backend/python/test/fakeplugin/fakeplugin/main.py  | 26 +++++++++++++---------
 backend/server/services/remote/bridge/bridge.go    |  2 +-
 8 files changed, 46 insertions(+), 27 deletions(-)

diff --git a/backend/core/utils/ipc.go b/backend/core/utils/ipc.go
index 241fbd77a..4a0f814bf 100644
--- a/backend/core/utils/ipc.go
+++ b/backend/core/utils/ipc.go
@@ -243,7 +243,9 @@ func scanOutputPipe(pipe io.ReadCloser, wg *sync.WaitGroup, onReceive func([]byt
 			src := scanner.Bytes()
 			data := make([]byte, len(src))
 			copy(data, src)
-			onReceive(data)
+			if onReceive != nil {
+				onReceive(data)
+			}
 			outboundChannel <- responseCreator(data)
 		}
 		wg.Done()
@@ -259,7 +261,9 @@ func scanErrorPipe(pipe io.ReadCloser, onReceive func([]byte), outboundChannel c
 			src := scanner.Bytes()
 			data := make([]byte, len(src))
 			copy(data, src)
-			onReceive(data)
+			if onReceive != nil {
+				onReceive(data)
+			}
 			outboundChannel <- &ProcessResponse{stderr: data}
 			_, _ = remoteErrorMsg.Write(src)
 			_, _ = remoteErrorMsg.WriteString("\n")
diff --git a/backend/go.mod b/backend/go.mod
index 38ddbd22b..95370a7a4 100644
--- a/backend/go.mod
+++ b/backend/go.mod
@@ -68,7 +68,7 @@ require (
 	github.com/go-openapi/swag v0.21.1 // indirect
 	github.com/go-playground/locales v0.14.0 // indirect
 	github.com/go-playground/universal-translator v0.18.0 // indirect
-	github.com/go-sql-driver/mysql v1.6.0
+	github.com/go-sql-driver/mysql v1.6.0 // indirect
 	github.com/gogo/googleapis v1.4.1 // indirect
 	github.com/gogo/protobuf v1.3.2 // indirect
 	github.com/gogo/status v1.1.0 // indirect
diff --git a/backend/python/pydevlake/pydevlake/__init__.py b/backend/python/pydevlake/pydevlake/__init__.py
index 5374e7bbe..237c345d5 100644
--- a/backend/python/pydevlake/pydevlake/__init__.py
+++ b/backend/python/pydevlake/pydevlake/__init__.py
@@ -22,3 +22,9 @@ from .message import RemoteScopeGroup
 from .plugin import Plugin, ScopeTxRulePair
 from .stream import DomainType, Stream, Substream
 from .context import Context
+
+# the debugger hangs on startup during plugin registration (reason unknown), hence this workaround
+import sys
+if not sys.argv.__contains__('startup'):
+    from pydevlake.helpers import debugger
+    debugger.init()
diff --git a/backend/python/pydevlake/pydevlake/helpers/debugger.py b/backend/python/pydevlake/pydevlake/helpers/debugger.py
index 78d8666b9..528528f39 100644
--- a/backend/python/pydevlake/pydevlake/helpers/debugger.py
+++ b/backend/python/pydevlake/pydevlake/helpers/debugger.py
@@ -18,7 +18,7 @@ import os
 from pydevlake import logger
 
 
-def __start__():
+def init():
     debugger = os.getenv("USE_PYTHON_DEBUGGER", default="").lower()
     if debugger == "":
         return
@@ -32,12 +32,10 @@ def __start__():
             import pydevd_pycharm as pydevd
             try:
                 pydevd.settrace(host=host, port=port, suspend=False, stdoutToServer=True, stderrToServer=True)
+                logger.info("Pycharm remote debugger successfully connected")
             except TimeoutError as e:
                 logger.error(f"Failed to connect to pycharm debugger on {host}:{port}. Make sure it is running")
         except ImportError as e:
             logger.error("Pycharm debugger library is not installed")
     else:
         logger.error(f"Unsupported Python debugger specified: {debugger}")
-
-
-__start__()
diff --git a/backend/python/pydevlake/pydevlake/ipc.py b/backend/python/pydevlake/pydevlake/ipc.py
index f28c3af67..b75895ce1 100644
--- a/backend/python/pydevlake/pydevlake/ipc.py
+++ b/backend/python/pydevlake/pydevlake/ipc.py
@@ -24,9 +24,6 @@ from pydevlake.message import Message
 
 
 def plugin_method(func):
-    # keep this to enable debugging, and don't move this elsewhere or it can cause crashes if it gets called during the bootstrap process.
-    # noinspection PyUnresolvedReferences
-    from pydevlake.helpers import debugger
 
     def open_send_channel() -> TextIO:
         fd = 3
diff --git a/backend/python/pydevlake/pydevlake/subtasks.py b/backend/python/pydevlake/pydevlake/subtasks.py
index d7f1fe59b..15b84e92f 100644
--- a/backend/python/pydevlake/pydevlake/subtasks.py
+++ b/backend/python/pydevlake/pydevlake/subtasks.py
@@ -55,20 +55,28 @@ class Subtask:
                 state = dict()
 
             try:
-                for i, (data, state) in enumerate(self.fetch(state, session, ctx)):
+                records = self.fetch(state, session, ctx)
+                progress = last_progress = 0
+                for data, state in records:
+                    progress += 1
                     self.process(data, session, ctx)
-
-                    if i % sync_point_interval == 0 and i != 0:
+                    if progress % sync_point_interval == 0:
                         # Save current state
                         subtask_run.state = json.dumps(state)
                         session.merge(subtask_run)
                         session.commit()
-
                         # Send progress
                         yield RemoteProgress(
                             increment=sync_point_interval,
-                            current=i
+                            current=progress
                         )
+                        last_progress = progress
+                # Send final progress
+                if progress != last_progress:
+                    yield RemoteProgress(
+                        increment=progress-last_progress,
+                        current=progress
+                    )
             except Exception as e:
                 logger.error(f'{type(e).__name__}: {e}')
                 raise e
@@ -99,7 +107,7 @@ class Subtask:
         pass
 
     @abstractmethod
-    def process(self, data: object, session: Session):
+    def process(self, data: object, session: Session, ctx: Context):
         """
         Called for all data entries returned by `fetch`.
         """
diff --git a/backend/python/test/fakeplugin/fakeplugin/main.py b/backend/python/test/fakeplugin/fakeplugin/main.py
index f1903c91f..ac6ac5308 100644
--- a/backend/python/test/fakeplugin/fakeplugin/main.py
+++ b/backend/python/test/fakeplugin/fakeplugin/main.py
@@ -23,7 +23,6 @@ from sqlmodel import Field
 from pydevlake import Plugin, Connection, TransformationRule, Stream, ToolModel, ToolScope, RemoteScopeGroup, DomainType
 from pydevlake.domain_layer.devops import CicdScope, CICDPipeline, CICDStatus, CICDResult, CICDType
 
-
 VALID_TOKEN = "this_is_a_valid_token"
 
 
@@ -40,18 +39,12 @@ class FakePipeline(ToolModel, table=True):
     state: State
 
 
-class FakeStream(Stream):
+class FakePipelineStream(Stream):
     tool_model = FakePipeline
     domain_types = [DomainType.CICD]
 
-    fake_pipelines = [
-        FakePipeline(id=1, state=FakePipeline.State.SUCCESS, started_at=datetime(2023, 1, 10, 11, 0, 0), finished_at=datetime(2023, 1, 10, 11, 3, 0)),
-        FakePipeline(id=2, state=FakePipeline.State.FAILURE, started_at=datetime(2023, 1, 10, 12, 0, 0), finished_at=datetime(2023, 1, 10, 12, 1, 30)),
-        FakePipeline(id=1, state=FakePipeline.State.PENDING),
-    ]
-
     def collect(self, state, context):
-        for p in self.fake_pipelines:
+        for p in self.generate_fake_pipelines():
             yield json.loads(p.json()), {}
 
     def convert(self, pipeline: FakePipeline, ctx):
@@ -90,6 +83,19 @@ class FakeStream(Stream):
             return (pipeline.finished_at - pipeline.started_at).seconds
         return None
 
+    @classmethod
+    def generate_fake_pipelines(cls) -> list[FakePipeline]:
+        states = [FakePipeline.State.SUCCESS, FakePipeline.State.FAILURE, FakePipeline.State.PENDING]
+        fake_pipelines = []
+        for i in range(250):
+            fake_pipelines.append(FakePipeline(
+                id=i,
+                state=states[i % len(states)],
+                started_at=datetime(2023, 1, 10, 11, 0, 0, microsecond=i),
+                finished_at=datetime(2023, 1, 10, 11, 3, 0, microsecond=i),
+            ))
+        return fake_pipelines
+
 
 class FakeConnection(Connection):
     token: str
@@ -149,7 +155,7 @@ class FakePlugin(Plugin):
     @property
     def streams(self):
         return [
-            FakeStream
+            FakePipelineStream
         ]
 
 
diff --git a/backend/server/services/remote/bridge/bridge.go b/backend/server/services/remote/bridge/bridge.go
index d526494d8..727ef7e2c 100644
--- a/backend/server/services/remote/bridge/bridge.go
+++ b/backend/server/services/remote/bridge/bridge.go
@@ -53,7 +53,7 @@ func (b *Bridge) RemoteSubtaskEntrypointHandler(subtaskMeta models.SubtaskMeta)
 			if err != nil {
 				return err
 			}
-			if progress.Current != 0 {
+			if progress.Total != 0 {
 				ctx.SetProgress(progress.Current, progress.Total)
 			} else if progress.Increment != 0 {
 				ctx.IncProgress(progress.Increment)