You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2023/05/31 02:39:17 UTC

[beam] branch master updated: [Playground] Resolve CI/CD failures (#26927)

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new d86e5a13acc [Playground] Resolve CI/CD failures (#26927)
d86e5a13acc is described below

commit d86e5a13acc96b4b35c2966320df2e945f904a3e
Author: Timur Sultanov <ti...@akvelon.com>
AuthorDate: Wed May 31 06:39:11 2023 +0400

    [Playground] Resolve CI/CD failures (#26927)
    
    Fetch each example's execution results right after that example finishes, instead of waiting for all examples to finish executing.
---
 playground/infrastructure/helper.py      |  38 +-------
 playground/infrastructure/test_helper.py |  15 +--
 playground/infrastructure/test_verify.py |  18 +++-
 playground/infrastructure/verify.py      | 156 +++++++++++++++++++------------
 4 files changed, 117 insertions(+), 110 deletions(-)

diff --git a/playground/infrastructure/helper.py b/playground/infrastructure/helper.py
index 5d02b516413..f211650f7bb 100644
--- a/playground/infrastructure/helper.py
+++ b/playground/infrastructure/helper.py
@@ -25,7 +25,6 @@ from typing import List, Optional, Dict
 from api.v1 import api_pb2
 
 import pydantic
-from tqdm.asyncio import tqdm
 import yaml
 
 from api.v1.api_pb2 import (
@@ -124,41 +123,6 @@ def find_examples(root_dir: str, subdirs: List[str], sdk: SdkEnum) -> List[Examp
     return examples
 
 
-async def get_statuses(
-    client: GRPCClient, examples: List[Example], concurrency: int = 10
-):
-    """
-    Receive status and update example.status and example.pipeline_id for
-    each example
-
-    Args:
-        examples: beam examples for processing and updating statuses and
-        pipeline_id values.
-    """
-    tasks = []
-    try:
-        concurrency = int(os.environ["BEAM_CONCURRENCY"])
-        logging.info("override default concurrency: %d", concurrency)
-    except (KeyError, ValueError):
-        pass
-
-    semaphore = asyncio.Semaphore(concurrency)
-
-    async def _semaphored_task(example):
-        await semaphore.acquire()
-        try:
-            await _update_example_status(example, client)
-        finally:
-            semaphore.release()
-
-    for example in examples:
-        if example.tag.never_run:
-            logging.info("skipping non runnable example %s", example.filepath)
-        else:
-            tasks.append(_semaphored_task(example))
-    await tqdm.gather(*tasks)
-
-
 def get_tag(filepath: PurePath) -> Optional[Tag]:
     """
     Parse file by filepath and find beam tag
@@ -299,7 +263,7 @@ def _get_example(filepath: str, filename: str, tag: Tag, sdk: int) -> Example:
     )
 
 
-async def _update_example_status(example: Example, client: GRPCClient):
+async def update_example_status(example: Example, client: GRPCClient):
     """
     Receive status for examples and update example.status and pipeline_id
 
diff --git a/playground/infrastructure/test_helper.py b/playground/infrastructure/test_helper.py
index d9f4128a9bc..e9f6b2dbcc0 100644
--- a/playground/infrastructure/test_helper.py
+++ b/playground/infrastructure/test_helper.py
@@ -46,9 +46,8 @@ from helper import (
     _load_example,
     get_tag,
     Tag,
-    get_statuses,
     _check_no_nested,
-    _update_example_status,
+    update_example_status,
     _get_object_type,
     validate_examples_for_duplicates_by_name,
     validate_examples_for_conflicting_datasets,
@@ -118,16 +117,6 @@ def test_find_examples(
     )
 
 
-@pytest.mark.asyncio
-@mock.patch("helper._update_example_status")
-async def test_get_statuses(mock_update_example_status, create_test_example):
-    example = create_test_example()
-    client = mock.sentinel
-    await get_statuses(client, [example])
-
-    mock_update_example_status.assert_called_once_with(example, client)
-
-
 @mock.patch(
     "builtins.open",
     mock_open(
@@ -454,7 +443,7 @@ async def test__update_example_status(
     mock_grpc_client_run_code.return_value = "pipeline_id"
     mock_grpc_client_check_status.side_effect = [STATUS_VALIDATING, STATUS_FINISHED]
 
-    await _update_example_status(example, GRPCClient())
+    await update_example_status(example, GRPCClient())
 
     assert example.pipeline_id == "pipeline_id"
     assert example.status == STATUS_FINISHED
diff --git a/playground/infrastructure/test_verify.py b/playground/infrastructure/test_verify.py
index df887478a98..c1a8429e81b 100644
--- a/playground/infrastructure/test_verify.py
+++ b/playground/infrastructure/test_verify.py
@@ -15,6 +15,7 @@
 
 import mock
 import pytest
+from mock.mock import AsyncMock
 
 from api.v1.api_pb2 import (
     SDK_JAVA,
@@ -26,9 +27,10 @@ from api.v1.api_pb2 import (
     STATUS_COMPILE_ERROR,
     STATUS_RUN_ERROR,
 )
-from verify import Verifier, VerifyException
+
 from config import Origin
 from models import SdkEnum
+from verify import Verifier, VerifyException
 
 
 @pytest.mark.asyncio
@@ -71,3 +73,17 @@ async def test__verify_examples(create_test_example):
             client, examples_with_several_def_ex, Origin.PG_EXAMPLES
         )
     await verifier._verify_examples(client, examples_without_errors, Origin.PG_EXAMPLES)
+
+
+@pytest.mark.asyncio
+@mock.patch("verify.update_example_status")
+async def test_get_statuses(mock_update_example_status, create_test_example):
+    example = create_test_example()
+    client = mock.sentinel
+    verifier = Verifier(SdkEnum.JAVA, Origin.PG_EXAMPLES)
+
+    verifier._populate_fields = AsyncMock()
+
+    await verifier._get_statuses(client, [example])
+
+    mock_update_example_status.assert_called_once_with(example, client)
\ No newline at end of file
diff --git a/playground/infrastructure/verify.py b/playground/infrastructure/verify.py
index 59ac32d8643..78d3b71a2f2 100644
--- a/playground/infrastructure/verify.py
+++ b/playground/infrastructure/verify.py
@@ -15,9 +15,12 @@
 
 import asyncio
 import logging
+import os
 from pathlib import Path
 from typing import List
 
+from tqdm.asyncio import tqdm
+
 from api.v1.api_pb2 import Sdk, SDK_PYTHON, SDK_JAVA
 from api.v1.api_pb2 import (
     STATUS_COMPILE_ERROR,
@@ -29,8 +32,8 @@ from api.v1.api_pb2 import (
 )
 from config import Origin, Config
 from grpc_client import GRPCClient
+from helper import update_example_status
 from models import Example, SdkEnum
-from helper import get_statuses
 
 
 class VerifyException(Exception):
@@ -61,6 +64,98 @@ class Verifier:
         asyncio.run(self._run_and_verify(examples))
         logging.info("Finish of executing Playground examples")
 
+    async def _get_statuses(
+            self,
+            client: GRPCClient,
+            examples: List[Example],
+            concurrency: int = 10
+    ):
+        """
+        Receive status and update example.status and example.pipeline_id for
+        each example
+
+        Args:
+            examples: beam examples for processing and updating statuses and
+            pipeline_id values.
+        """
+        tasks = []
+        try:
+            concurrency = int(os.environ["BEAM_CONCURRENCY"])
+            logging.info("override default concurrency: %d", concurrency)
+        except (KeyError, ValueError):
+            pass
+
+        semaphore = asyncio.Semaphore(concurrency)
+
+        async def _semaphored_task(example):
+            await semaphore.acquire()
+            try:
+                await update_example_status(example, client)
+                await self._populate_fields(example, client)
+            finally:
+                semaphore.release()
+
+        for example in examples:
+            if example.tag.never_run:
+                logging.info("skipping non runnable example %s", example.filepath)
+            else:
+                tasks.append(_semaphored_task(example))
+        await tqdm.gather(*tasks)
+
+    async def _populate_fields(self, example: Example, client: GRPCClient):
+        """
+        Populate fields of the example reading them from the backend or from the repository.
+        Args:
+            example: beam example that should be verified
+        """
+        if example.tag.never_run:
+            logging.info("populating example fields from provided files %s", example.filepath)
+            self._populate_from_repo(example)
+        else:
+            await self._populate_from_runner(example, client)
+
+    def _populate_from_repo(self, example: Example):
+        """
+        Populate fields of the example reading them from the repository.
+        Args:
+            example: beam example that should be verified
+        """
+        path = Path(example.filepath)
+        example_folder = path.parent
+
+        log_file_path = example_folder / self.LOGS_FILENAME
+        # Check if the file exists and read its content
+        if log_file_path.exists():
+            example.logs = log_file_path.read_text()
+        graph_file_path = example_folder / self.GRAPH_FILENAME
+        # Check if the file exists and read its content
+        if graph_file_path.exists():
+            example.graph = graph_file_path.read_text()
+        output_file_path = example_folder / self.OUTPUT_FILENAME
+        # Check if the file exists and read its content
+        if output_file_path.exists():
+            example.output = output_file_path.read_text()
+        compile_output_file_path = example_folder / self.COMPILE_OUTPUT_FILENAME
+        # Check if the file exists and read its content
+        if compile_output_file_path.exists():
+            example.compile_output = compile_output_file_path.read_text()
+
+    async def _populate_from_runner(self, example: Example, client: GRPCClient):
+        try:
+            example.compile_output = await client.get_compile_output(
+                example.pipeline_id
+            )
+            example.output = await client.get_run_output(example.pipeline_id, example.filepath)
+            example.logs = await client.get_log(example.pipeline_id, example.filepath)
+            if example.sdk in [SDK_JAVA, SDK_PYTHON]:
+                example.graph = await client.get_graph(
+                    example.pipeline_id, example.filepath
+                )
+        except Exception as e:
+            logging.error(example.url_vcs)
+            logging.error(example.compile_output)
+            raise RuntimeError(f"error in {example.tag.name}") from e
+
     async def _run_and_verify(self, examples: List[Example]):
         """
         Run beam examples and keep their output.
@@ -72,67 +167,10 @@ class Verifier:
             examples: beam examples that should be run
         """
 
-
-        async def _populate_fields(example: Example):
-            """
-            Populate fields of the example reading them from the backend or from the repository.
-            Args:
-                example: beam example that should be verified
-            """
-            if example.tag.never_run:
-                logging.info("populating example fields from provided files %s", example.filepath)
-                _populate_from_repo(example)
-            else:
-                await _populate_from_runner(example)
-
-        def _populate_from_repo(example: Example):
-            """
-            Populate fields of the example reading them from the repository.
-            Args:
-                example: beam example that should be verified
-            """
-            path = Path(example.filepath)
-            example_folder = path.parent
-
-            log_file_path = example_folder / self.LOGS_FILENAME
-            # Check if the file exists and read its content
-            if log_file_path.exists():
-                example.logs = log_file_path.read_text()
-            graph_file_path = example_folder / self.GRAPH_FILENAME
-            # Check if the file exists and read its content
-            if graph_file_path.exists():
-                example.graph = graph_file_path.read_text()
-            output_file_path = example_folder / self.OUTPUT_FILENAME
-            # Check if the file exists and read its content
-            if output_file_path.exists():
-                example.output = output_file_path.read_text()
-            compile_output_file_path = example_folder / self.COMPILE_OUTPUT_FILENAME
-            # Check if the file exists and read its content
-            if compile_output_file_path.exists():
-                example.compile_output = compile_output_file_path.read_text()
-
-        async def _populate_from_runner(example: Example):
-            try:
-                example.compile_output = await client.get_compile_output(
-                    example.pipeline_id
-                )
-                example.output = await client.get_run_output(example.pipeline_id, example.filepath)
-                example.logs = await client.get_log(example.pipeline_id, example.filepath)
-                if example.sdk in [SDK_JAVA, SDK_PYTHON]:
-                    example.graph = await client.get_graph(
-                        example.pipeline_id, example.filepath
-                    )
-            except Exception as e:
-                logging.error(example.url_vcs)
-                logging.error(example.compile_output)
-                raise RuntimeError(f"error in {example.tag.name}") from e
-
         async with GRPCClient() as client:
-            await get_statuses(
+            await self._get_statuses(
                 client, examples
             )  # run examples code and wait until all are executed
-            tasks = [_populate_fields(example) for example in examples]
-            await asyncio.gather(*tasks)
             await self._verify_examples(client, examples, self._origin)
 
     async def _verify_examples(