You are viewing a plain-text version of this content. The canonical link for it ("here" in the original page) was not preserved in this plain-text copy.
Posted to commits@devlake.apache.org by he...@apache.org on 2023/03/31 01:04:46 UTC
[incubator-devlake] branch main updated: [fix-4772]: Python progress update crashes Go DevLake server (#4815)
This is an automated email from the ASF dual-hosted git repository.
hez pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/main by this push:
new d0cd663be [fix-4772]: Python progress update crashes Go DevLake server (#4815)
d0cd663be is described below
commit d0cd663bea84c033d35af9febac02991b1147a3f
Author: Keon Amini <ke...@merico.dev>
AuthorDate: Thu Mar 30 20:04:40 2023 -0500
[fix-4772]: Python progress update crashes Go DevLake server (#4815)
* fix: Fix for panic in Go if Python returns a remote progress via FdOut
* fix: Workaround for Pycharm debugger getting stuck
* test: Have Python fakeplugin return more data so we can test this fix
* fix: Progress counter logic fix
---
backend/core/utils/ipc.go | 8 +++++--
backend/go.mod | 2 +-
backend/python/pydevlake/pydevlake/__init__.py | 6 +++++
.../python/pydevlake/pydevlake/helpers/debugger.py | 6 ++---
backend/python/pydevlake/pydevlake/ipc.py | 3 ---
backend/python/pydevlake/pydevlake/subtasks.py | 20 ++++++++++++-----
backend/python/test/fakeplugin/fakeplugin/main.py | 26 +++++++++++++---------
backend/server/services/remote/bridge/bridge.go | 2 +-
8 files changed, 46 insertions(+), 27 deletions(-)
diff --git a/backend/core/utils/ipc.go b/backend/core/utils/ipc.go
index 241fbd77a..4a0f814bf 100644
--- a/backend/core/utils/ipc.go
+++ b/backend/core/utils/ipc.go
@@ -243,7 +243,9 @@ func scanOutputPipe(pipe io.ReadCloser, wg *sync.WaitGroup, onReceive func([]byt
src := scanner.Bytes()
data := make([]byte, len(src))
copy(data, src)
- onReceive(data)
+ if onReceive != nil {
+ onReceive(data)
+ }
outboundChannel <- responseCreator(data)
}
wg.Done()
@@ -259,7 +261,9 @@ func scanErrorPipe(pipe io.ReadCloser, onReceive func([]byte), outboundChannel c
src := scanner.Bytes()
data := make([]byte, len(src))
copy(data, src)
- onReceive(data)
+ if onReceive != nil {
+ onReceive(data)
+ }
outboundChannel <- &ProcessResponse{stderr: data}
_, _ = remoteErrorMsg.Write(src)
_, _ = remoteErrorMsg.WriteString("\n")
diff --git a/backend/go.mod b/backend/go.mod
index 38ddbd22b..95370a7a4 100644
--- a/backend/go.mod
+++ b/backend/go.mod
@@ -68,7 +68,7 @@ require (
github.com/go-openapi/swag v0.21.1 // indirect
github.com/go-playground/locales v0.14.0 // indirect
github.com/go-playground/universal-translator v0.18.0 // indirect
- github.com/go-sql-driver/mysql v1.6.0
+ github.com/go-sql-driver/mysql v1.6.0 // indirect
github.com/gogo/googleapis v1.4.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/gogo/status v1.1.0 // indirect
diff --git a/backend/python/pydevlake/pydevlake/__init__.py b/backend/python/pydevlake/pydevlake/__init__.py
index 5374e7bbe..237c345d5 100644
--- a/backend/python/pydevlake/pydevlake/__init__.py
+++ b/backend/python/pydevlake/pydevlake/__init__.py
@@ -22,3 +22,9 @@ from .message import RemoteScopeGroup
from .plugin import Plugin, ScopeTxRulePair
from .stream import DomainType, Stream, Substream
from .context import Context
+
+# the debugger hangs on startup during plugin registration (reason unknown), hence this workaround
+import sys
+if not sys.argv.__contains__('startup'):
+ from pydevlake.helpers import debugger
+ debugger.init()
diff --git a/backend/python/pydevlake/pydevlake/helpers/debugger.py b/backend/python/pydevlake/pydevlake/helpers/debugger.py
index 78d8666b9..528528f39 100644
--- a/backend/python/pydevlake/pydevlake/helpers/debugger.py
+++ b/backend/python/pydevlake/pydevlake/helpers/debugger.py
@@ -18,7 +18,7 @@ import os
from pydevlake import logger
-def __start__():
+def init():
debugger = os.getenv("USE_PYTHON_DEBUGGER", default="").lower()
if debugger == "":
return
@@ -32,12 +32,10 @@ def __start__():
import pydevd_pycharm as pydevd
try:
pydevd.settrace(host=host, port=port, suspend=False, stdoutToServer=True, stderrToServer=True)
+ logger.info("Pycharm remote debugger successfully connected")
except TimeoutError as e:
logger.error(f"Failed to connect to pycharm debugger on {host}:{port}. Make sure it is running")
except ImportError as e:
logger.error("Pycharm debugger library is not installed")
else:
logger.error(f"Unsupported Python debugger specified: {debugger}")
-
-
-__start__()
diff --git a/backend/python/pydevlake/pydevlake/ipc.py b/backend/python/pydevlake/pydevlake/ipc.py
index f28c3af67..b75895ce1 100644
--- a/backend/python/pydevlake/pydevlake/ipc.py
+++ b/backend/python/pydevlake/pydevlake/ipc.py
@@ -24,9 +24,6 @@ from pydevlake.message import Message
def plugin_method(func):
- # keep this to enable debugging, and don't move this elsewhere or it can cause crashes if it gets called during the bootstrap process.
- # noinspection PyUnresolvedReferences
- from pydevlake.helpers import debugger
def open_send_channel() -> TextIO:
fd = 3
diff --git a/backend/python/pydevlake/pydevlake/subtasks.py b/backend/python/pydevlake/pydevlake/subtasks.py
index d7f1fe59b..15b84e92f 100644
--- a/backend/python/pydevlake/pydevlake/subtasks.py
+++ b/backend/python/pydevlake/pydevlake/subtasks.py
@@ -55,20 +55,28 @@ class Subtask:
state = dict()
try:
- for i, (data, state) in enumerate(self.fetch(state, session, ctx)):
+ records = self.fetch(state, session, ctx)
+ progress = last_progress = 0
+ for data, state in records:
+ progress += 1
self.process(data, session, ctx)
-
- if i % sync_point_interval == 0 and i != 0:
+ if progress % sync_point_interval == 0:
# Save current state
subtask_run.state = json.dumps(state)
session.merge(subtask_run)
session.commit()
-
# Send progress
yield RemoteProgress(
increment=sync_point_interval,
- current=i
+ current=progress
)
+ last_progress = progress
+ # Send final progress
+ if progress != last_progress:
+ yield RemoteProgress(
+ increment=progress-last_progress,
+ current=progress
+ )
except Exception as e:
logger.error(f'{type(e).__name__}: {e}')
raise e
@@ -99,7 +107,7 @@ class Subtask:
pass
@abstractmethod
- def process(self, data: object, session: Session):
+ def process(self, data: object, session: Session, ctx: Context):
"""
Called for all data entries returned by `fetch`.
"""
diff --git a/backend/python/test/fakeplugin/fakeplugin/main.py b/backend/python/test/fakeplugin/fakeplugin/main.py
index f1903c91f..ac6ac5308 100644
--- a/backend/python/test/fakeplugin/fakeplugin/main.py
+++ b/backend/python/test/fakeplugin/fakeplugin/main.py
@@ -23,7 +23,6 @@ from sqlmodel import Field
from pydevlake import Plugin, Connection, TransformationRule, Stream, ToolModel, ToolScope, RemoteScopeGroup, DomainType
from pydevlake.domain_layer.devops import CicdScope, CICDPipeline, CICDStatus, CICDResult, CICDType
-
VALID_TOKEN = "this_is_a_valid_token"
@@ -40,18 +39,12 @@ class FakePipeline(ToolModel, table=True):
state: State
-class FakeStream(Stream):
+class FakePipelineStream(Stream):
tool_model = FakePipeline
domain_types = [DomainType.CICD]
- fake_pipelines = [
- FakePipeline(id=1, state=FakePipeline.State.SUCCESS, started_at=datetime(2023, 1, 10, 11, 0, 0), finished_at=datetime(2023, 1, 10, 11, 3, 0)),
- FakePipeline(id=2, state=FakePipeline.State.FAILURE, started_at=datetime(2023, 1, 10, 12, 0, 0), finished_at=datetime(2023, 1, 10, 12, 1, 30)),
- FakePipeline(id=1, state=FakePipeline.State.PENDING),
- ]
-
def collect(self, state, context):
- for p in self.fake_pipelines:
+ for p in self.generate_fake_pipelines():
yield json.loads(p.json()), {}
def convert(self, pipeline: FakePipeline, ctx):
@@ -90,6 +83,19 @@ class FakeStream(Stream):
return (pipeline.finished_at - pipeline.started_at).seconds
return None
+ @classmethod
+ def generate_fake_pipelines(cls) -> list[FakePipeline]:
+ states = [FakePipeline.State.SUCCESS, FakePipeline.State.FAILURE, FakePipeline.State.PENDING]
+ fake_pipelines = []
+ for i in range(250):
+ fake_pipelines.append(FakePipeline(
+ id=i,
+ state=states[i % len(states)],
+ started_at=datetime(2023, 1, 10, 11, 0, 0, microsecond=i),
+ finished_at=datetime(2023, 1, 10, 11, 3, 0, microsecond=i),
+ ))
+ return fake_pipelines
+
class FakeConnection(Connection):
token: str
@@ -149,7 +155,7 @@ class FakePlugin(Plugin):
@property
def streams(self):
return [
- FakeStream
+ FakePipelineStream
]
diff --git a/backend/server/services/remote/bridge/bridge.go b/backend/server/services/remote/bridge/bridge.go
index d526494d8..727ef7e2c 100644
--- a/backend/server/services/remote/bridge/bridge.go
+++ b/backend/server/services/remote/bridge/bridge.go
@@ -53,7 +53,7 @@ func (b *Bridge) RemoteSubtaskEntrypointHandler(subtaskMeta models.SubtaskMeta)
if err != nil {
return err
}
- if progress.Current != 0 {
+ if progress.Total != 0 {
ctx.SetProgress(progress.Current, progress.Total)
} else if progress.Increment != 0 {
ctx.IncProgress(progress.Increment)