You are viewing a plain-text version of this content; the hyperlink to the canonical version did not survive the conversion to plain text.
Posted to commits@beam.apache.org by pa...@apache.org on 2023/05/31 02:39:17 UTC
[beam] branch master updated: [Playground] Resolve CI/CD failures (#26927)
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new d86e5a13acc [Playground] Resolve CI/CD failures (#26927)
d86e5a13acc is described below
commit d86e5a13acc96b4b35c2966320df2e945f904a3e
Author: Timur Sultanov <ti...@akvelon.com>
AuthorDate: Wed May 31 06:39:11 2023 +0400
[Playground] Resolve CI/CD failures (#26927)
Fetch each example's execution results immediately after that example finishes running, instead of waiting for all examples to finish.
---
playground/infrastructure/helper.py | 38 +-------
playground/infrastructure/test_helper.py | 15 +--
playground/infrastructure/test_verify.py | 18 +++-
playground/infrastructure/verify.py | 156 +++++++++++++++++++------------
4 files changed, 117 insertions(+), 110 deletions(-)
diff --git a/playground/infrastructure/helper.py b/playground/infrastructure/helper.py
index 5d02b516413..f211650f7bb 100644
--- a/playground/infrastructure/helper.py
+++ b/playground/infrastructure/helper.py
@@ -25,7 +25,6 @@ from typing import List, Optional, Dict
from api.v1 import api_pb2
import pydantic
-from tqdm.asyncio import tqdm
import yaml
from api.v1.api_pb2 import (
@@ -124,41 +123,6 @@ def find_examples(root_dir: str, subdirs: List[str], sdk: SdkEnum) -> List[Examp
return examples
-async def get_statuses(
- client: GRPCClient, examples: List[Example], concurrency: int = 10
-):
- """
- Receive status and update example.status and example.pipeline_id for
- each example
-
- Args:
- examples: beam examples for processing and updating statuses and
- pipeline_id values.
- """
- tasks = []
- try:
- concurrency = int(os.environ["BEAM_CONCURRENCY"])
- logging.info("override default concurrency: %d", concurrency)
- except (KeyError, ValueError):
- pass
-
- semaphore = asyncio.Semaphore(concurrency)
-
- async def _semaphored_task(example):
- await semaphore.acquire()
- try:
- await _update_example_status(example, client)
- finally:
- semaphore.release()
-
- for example in examples:
- if example.tag.never_run:
- logging.info("skipping non runnable example %s", example.filepath)
- else:
- tasks.append(_semaphored_task(example))
- await tqdm.gather(*tasks)
-
-
def get_tag(filepath: PurePath) -> Optional[Tag]:
"""
Parse file by filepath and find beam tag
@@ -299,7 +263,7 @@ def _get_example(filepath: str, filename: str, tag: Tag, sdk: int) -> Example:
)
-async def _update_example_status(example: Example, client: GRPCClient):
+async def update_example_status(example: Example, client: GRPCClient):
"""
Receive status for examples and update example.status and pipeline_id
diff --git a/playground/infrastructure/test_helper.py b/playground/infrastructure/test_helper.py
index d9f4128a9bc..e9f6b2dbcc0 100644
--- a/playground/infrastructure/test_helper.py
+++ b/playground/infrastructure/test_helper.py
@@ -46,9 +46,8 @@ from helper import (
_load_example,
get_tag,
Tag,
- get_statuses,
_check_no_nested,
- _update_example_status,
+ update_example_status,
_get_object_type,
validate_examples_for_duplicates_by_name,
validate_examples_for_conflicting_datasets,
@@ -118,16 +117,6 @@ def test_find_examples(
)
-@pytest.mark.asyncio
-@mock.patch("helper._update_example_status")
-async def test_get_statuses(mock_update_example_status, create_test_example):
- example = create_test_example()
- client = mock.sentinel
- await get_statuses(client, [example])
-
- mock_update_example_status.assert_called_once_with(example, client)
-
-
@mock.patch(
"builtins.open",
mock_open(
@@ -454,7 +443,7 @@ async def test__update_example_status(
mock_grpc_client_run_code.return_value = "pipeline_id"
mock_grpc_client_check_status.side_effect = [STATUS_VALIDATING, STATUS_FINISHED]
- await _update_example_status(example, GRPCClient())
+ await update_example_status(example, GRPCClient())
assert example.pipeline_id == "pipeline_id"
assert example.status == STATUS_FINISHED
diff --git a/playground/infrastructure/test_verify.py b/playground/infrastructure/test_verify.py
index df887478a98..c1a8429e81b 100644
--- a/playground/infrastructure/test_verify.py
+++ b/playground/infrastructure/test_verify.py
@@ -15,6 +15,7 @@
import mock
import pytest
+from mock.mock import AsyncMock
from api.v1.api_pb2 import (
SDK_JAVA,
@@ -26,9 +27,10 @@ from api.v1.api_pb2 import (
STATUS_COMPILE_ERROR,
STATUS_RUN_ERROR,
)
-from verify import Verifier, VerifyException
+
from config import Origin
from models import SdkEnum
+from verify import Verifier, VerifyException
@pytest.mark.asyncio
@@ -71,3 +73,17 @@ async def test__verify_examples(create_test_example):
client, examples_with_several_def_ex, Origin.PG_EXAMPLES
)
await verifier._verify_examples(client, examples_without_errors, Origin.PG_EXAMPLES)
+
+
+@pytest.mark.asyncio
+@mock.patch("verify.update_example_status")
+async def test_get_statuses(mock_update_example_status, create_test_example):
+ example = create_test_example()
+ client = mock.sentinel
+ verifier = Verifier(SdkEnum.JAVA, Origin.PG_EXAMPLES)
+
+ verifier._populate_fields = AsyncMock()
+
+ await verifier._get_statuses(client, [example])
+
+ mock_update_example_status.assert_called_once_with(example, client)
\ No newline at end of file
diff --git a/playground/infrastructure/verify.py b/playground/infrastructure/verify.py
index 59ac32d8643..78d3b71a2f2 100644
--- a/playground/infrastructure/verify.py
+++ b/playground/infrastructure/verify.py
@@ -15,9 +15,12 @@
import asyncio
import logging
+import os
from pathlib import Path
from typing import List
+from tqdm.asyncio import tqdm
+
from api.v1.api_pb2 import Sdk, SDK_PYTHON, SDK_JAVA
from api.v1.api_pb2 import (
STATUS_COMPILE_ERROR,
@@ -29,8 +32,8 @@ from api.v1.api_pb2 import (
)
from config import Origin, Config
from grpc_client import GRPCClient
+from helper import update_example_status
from models import Example, SdkEnum
-from helper import get_statuses
class VerifyException(Exception):
@@ -61,6 +64,98 @@ class Verifier:
asyncio.run(self._run_and_verify(examples))
logging.info("Finish of executing Playground examples")
+ async def _get_statuses(
+ self,
+ client: GRPCClient,
+ examples: List[Example],
+ concurrency: int = 10
+ ):
+ """
+ Receive status and update example.status and example.pipeline_id for
+ each example
+
+ Args:
+ examples: beam examples for processing and updating statuses and
+ pipeline_id values.
+ """
+ tasks = []
+ try:
+ concurrency = int(os.environ["BEAM_CONCURRENCY"])
+ logging.info("override default concurrency: %d", concurrency)
+ except (KeyError, ValueError):
+ pass
+
+ semaphore = asyncio.Semaphore(concurrency)
+
+ async def _semaphored_task(example):
+ await semaphore.acquire()
+ try:
+ await update_example_status(example, client)
+ await self._populate_fields(example, client)
+ finally:
+ semaphore.release()
+
+ for example in examples:
+ if example.tag.never_run:
+ logging.info("skipping non runnable example %s", example.filepath)
+ else:
+ tasks.append(_semaphored_task(example))
+ await tqdm.gather(*tasks)
+
+ async def _populate_fields(self, example: Example, client: GRPCClient):
+ """
+ Populate fields of the example reading them from the backend or from the repository.
+ Args:
+ example: beam example that should be verified
+ """
+ if example.tag.never_run:
+ logging.info("populating example fields from provided files %s", example.filepath)
+ self._populate_from_repo(example)
+ else:
+ await self._populate_from_runner(example, client)
+
+ def _populate_from_repo(self, example: Example):
+ """
+ Populate fields of the example reading them from the repository.
+ Args:
+ example: beam example that should be verified
+ """
+ path = Path(example.filepath)
+ example_folder = path.parent
+
+ log_file_path = example_folder / self.LOGS_FILENAME
+ # Check if the file exists and read its content
+ if log_file_path.exists():
+ example.logs = log_file_path.read_text()
+ graph_file_path = example_folder / self.GRAPH_FILENAME
+ # Check if the file exists and read its content
+ if graph_file_path.exists():
+ example.graph = graph_file_path.read_text()
+ output_file_path = example_folder / self.OUTPUT_FILENAME
+ # Check if the file exists and read its content
+ if output_file_path.exists():
+ example.output = output_file_path.read_text()
+ compile_output_file_path = example_folder / self.COMPILE_OUTPUT_FILENAME
+ # Check if the file exists and read its content
+ if compile_output_file_path.exists():
+ example.compile_output = compile_output_file_path.read_text()
+
+ async def _populate_from_runner(self, example: Example, client: GRPCClient):
+ try:
+ example.compile_output = await client.get_compile_output(
+ example.pipeline_id
+ )
+ example.output = await client.get_run_output(example.pipeline_id, example.filepath)
+ example.logs = await client.get_log(example.pipeline_id, example.filepath)
+ if example.sdk in [SDK_JAVA, SDK_PYTHON]:
+ example.graph = await client.get_graph(
+ example.pipeline_id, example.filepath
+ )
+ except Exception as e:
+ logging.error(example.url_vcs)
+ logging.error(example.compile_output)
+ raise RuntimeError(f"error in {example.tag.name}") from e
+
async def _run_and_verify(self, examples: List[Example]):
"""
Run beam examples and keep their output.
@@ -72,67 +167,10 @@ class Verifier:
examples: beam examples that should be run
"""
-
- async def _populate_fields(example: Example):
- """
- Populate fields of the example reading them from the backend or from the repository.
- Args:
- example: beam example that should be verified
- """
- if example.tag.never_run:
- logging.info("populating example fields from provided files %s", example.filepath)
- _populate_from_repo(example)
- else:
- await _populate_from_runner(example)
-
- def _populate_from_repo(example: Example):
- """
- Populate fields of the example reading them from the repository.
- Args:
- example: beam example that should be verified
- """
- path = Path(example.filepath)
- example_folder = path.parent
-
- log_file_path = example_folder / self.LOGS_FILENAME
- # Check if the file exists and read its content
- if log_file_path.exists():
- example.logs = log_file_path.read_text()
- graph_file_path = example_folder / self.GRAPH_FILENAME
- # Check if the file exists and read its content
- if graph_file_path.exists():
- example.graph = graph_file_path.read_text()
- output_file_path = example_folder / self.OUTPUT_FILENAME
- # Check if the file exists and read its content
- if output_file_path.exists():
- example.output = output_file_path.read_text()
- compile_output_file_path = example_folder / self.COMPILE_OUTPUT_FILENAME
- # Check if the file exists and read its content
- if compile_output_file_path.exists():
- example.compile_output = compile_output_file_path.read_text()
-
- async def _populate_from_runner(example: Example):
- try:
- example.compile_output = await client.get_compile_output(
- example.pipeline_id
- )
- example.output = await client.get_run_output(example.pipeline_id, example.filepath)
- example.logs = await client.get_log(example.pipeline_id, example.filepath)
- if example.sdk in [SDK_JAVA, SDK_PYTHON]:
- example.graph = await client.get_graph(
- example.pipeline_id, example.filepath
- )
- except Exception as e:
- logging.error(example.url_vcs)
- logging.error(example.compile_output)
- raise RuntimeError(f"error in {example.tag.name}") from e
-
async with GRPCClient() as client:
- await get_statuses(
+ await self._get_statuses(
client, examples
) # run examples code and wait until all are executed
- tasks = [_populate_fields(example) for example in examples]
- await asyncio.gather(*tasks)
await self._verify_examples(client, examples, self._origin)
async def _verify_examples(