You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2022/12/19 19:22:36 UTC
[beam] branch master updated: [Playground] infrastructure ci/cd unify (#24696)
This is an automated email from the ASF dual-hosted git repository.
damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 8d45bc530d2 [Playground] infrastructure ci/cd unify (#24696)
8d45bc530d2 is described below
commit 8d45bc530d2d97a097420b5b2a254db18f10f5a9
Author: Evgeny Antyshev <ea...@gmail.com>
AuthorDate: Mon Dec 19 22:22:25 2022 +0300
[Playground] infrastructure ci/cd unify (#24696)
* unnecessary typing
* fix
* requirements.txt
* black
* easy
* BASIC
* HttpUrl
* examples
* url_quote
* source_dataset
* fix
* fix
* fix
* fix
* cnt
* ci_cd
* verify
* datastore_fixes
* config
* kafka_fix
* -dup
---
.../apache/beam/examples/KafkaWordCountAvro.java | 14 +-
.../apache/beam/examples/KafkaWordCountJson.java | 14 +-
.../python/Examples/Word Count/Word Count/task.py | 2 +-
playground/infrastructure/cd_helper.py | 99 ----
playground/infrastructure/ci_cd.py | 80 ++-
playground/infrastructure/config.py | 35 +-
playground/infrastructure/conftest.py | 99 ++++
playground/infrastructure/datastore_client.py | 91 ++-
playground/infrastructure/grpc_client.py | 3 +-
playground/infrastructure/helper.py | 479 +++++-----------
playground/infrastructure/models.py | 166 ++++++
playground/infrastructure/repository.py | 36 --
playground/infrastructure/requirements.txt | 2 +-
playground/infrastructure/test_cd_helper.py | 38 --
playground/infrastructure/test_ci_cd.py | 35 +-
playground/infrastructure/test_ci_helper.py | 176 ------
playground/infrastructure/test_datastore_client.py | 50 +-
playground/infrastructure/test_helper.py | 630 +++++++++------------
playground/infrastructure/test_repository.py | 53 --
playground/infrastructure/test_utils.py | 30 +-
playground/infrastructure/test_verify.py | 73 +++
.../infrastructure/{ci_helper.py => verify.py} | 119 ++--
22 files changed, 988 insertions(+), 1336 deletions(-)
diff --git a/examples/java/src/main/java/org/apache/beam/examples/KafkaWordCountAvro.java b/examples/java/src/main/java/org/apache/beam/examples/KafkaWordCountAvro.java
index c13a55e8557..81b9261a314 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/KafkaWordCountAvro.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/KafkaWordCountAvro.java
@@ -32,14 +32,14 @@ package org.apache.beam.examples;
// - strings
// - emulator
// emulators:
-// kafka:
-// topic:
-// id: dataset
-// dataset: CountWords
+// - type: kafka
+// topic:
+// id: dataset
+// source_dataset: CountWords
// datasets:
-// CountWords:
-// location: local
-// format: avro
+// CountWords:
+// location: local
+// format: avro
import java.util.Collections;
import java.util.HashMap;
diff --git a/examples/java/src/main/java/org/apache/beam/examples/KafkaWordCountJson.java b/examples/java/src/main/java/org/apache/beam/examples/KafkaWordCountJson.java
index 29bb9be32c4..ed5dbc88824 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/KafkaWordCountJson.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/KafkaWordCountJson.java
@@ -32,14 +32,14 @@ package org.apache.beam.examples;
// - strings
// - emulator
// emulators:
-// kafka:
-// topic:
-// id: dataset
-// dataset: CountWords
+// - type: kafka
+// topic:
+// id: dataset
+// source_dataset: CountWords
// datasets:
-// CountWords:
-// location: local
-// format: json
+// CountWords:
+// location: local
+// format: json
import java.util.Collections;
import java.util.HashMap;
diff --git a/learning/katas/python/Examples/Word Count/Word Count/task.py b/learning/katas/python/Examples/Word Count/Word Count/task.py
index 4c605c401ef..fb3bf9c2386 100644
--- a/learning/katas/python/Examples/Word Count/Word Count/task.py
+++ b/learning/katas/python/Examples/Word Count/Word Count/task.py
@@ -18,7 +18,7 @@
# name: WordCountKata
# description: Task from katas to create a pipeline that counts the number of words.
# multifile: false
-# context_line: 29
+# context_line: 32
# categories:
# - Combiners
# complexity: BASIC
diff --git a/playground/infrastructure/cd_helper.py b/playground/infrastructure/cd_helper.py
deleted file mode 100644
index bfec3d54e28..00000000000
--- a/playground/infrastructure/cd_helper.py
+++ /dev/null
@@ -1,99 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""
- Helper for CD step.
-
- It is used to save beam examples/katas/tests and their output on the GCS.
-"""
-import asyncio
-import logging
-from typing import List
-
-from api.v1.api_pb2 import Sdk, SDK_PYTHON, SDK_JAVA
-from config import Origin
-from datastore_client import DatastoreClient
-from grpc_client import GRPCClient
-from helper import Example, get_statuses
-from repository import set_dataset_path_for_examples
-
-
-class CDHelper:
- """
- Helper for CD step.
-
- It is used to save beam examples/katas/tests and their output on the GCD.
- """
- _sdk: Sdk
- _origin: Origin
-
- def __init__(self, sdk: Sdk, origin: Origin):
- self._sdk = sdk
- self._origin = origin
-
- def save_examples(self, examples: List[Example]):
- """
- Save beam examples and their output in the Google Cloud Datastore.
-
- Outputs for multifile examples are left empty.
- """
- single_file_examples = list(filter(
- lambda example: example.tag.multifile is False, examples))
- set_dataset_path_for_examples(single_file_examples)
- logging.info("Start of executing only single-file Playground examples ...")
- asyncio.run(self._get_outputs(single_file_examples))
- logging.info("Finish of executing single-file Playground examples")
-
- logging.info("Start of sending Playground examples to the Cloud Datastore ...")
- self._save_to_datastore(single_file_examples)
- logging.info("Finish of sending Playground examples to the Cloud Datastore")
-
- def _save_to_datastore(self, examples: List[Example]):
- """
- Save beam examples to the Google Cloud Datastore
- :param examples: beam examples from the repository
- """
- datastore_client = DatastoreClient()
- datastore_client.save_catalogs()
- datastore_client.save_to_cloud_datastore(examples, self._sdk, self._origin)
-
- async def _get_outputs(self, examples: List[Example]):
- """
- Run beam examples and keep their output.
-
- Call the backend to start code processing for the examples.
- Then receive code output.
-
- Args:
- examples: beam examples that should be run
- """
-
- async def _populate_fields(example: Example):
- try:
- example.compile_output = await client.get_compile_output(example.pipeline_id)
- example.output = await client.get_run_output(example.pipeline_id)
- example.logs = await client.get_log(example.pipeline_id)
- if example.sdk in [SDK_JAVA, SDK_PYTHON]:
- example.graph = await client.get_graph(example.pipeline_id, example.filepath)
- except Exception as e:
- logging.error(example.url_vcs)
- logging.error(example.compile_output)
- raise RuntimeError(f"error in {example.name}") from e
-
- async with GRPCClient() as client:
- await get_statuses(client,
- examples) # run examples code and wait until all are executed
- tasks = [_populate_fields(example) for example in examples]
- await asyncio.gather(*tasks)
diff --git a/playground/infrastructure/ci_cd.py b/playground/infrastructure/ci_cd.py
index 1dced6de964..35197ef95d3 100644
--- a/playground/infrastructure/ci_cd.py
+++ b/playground/infrastructure/ci_cd.py
@@ -22,93 +22,89 @@ import logging
import os
from typing import List
+from models import SdkEnum, Example, StringToSdkEnum
from config import Config, Origin
-from api.v1.api_pb2 import Sdk
-from cd_helper import CDHelper
-from ci_helper import CIHelper
-from helper import find_examples, get_supported_categories, Example, validate_examples_for_duplicates_by_name
+from datastore_client import DatastoreClient
+from api.v1 import api_pb2
+from verify import Verifier
+from helper import (
+ find_examples,
+ load_supported_categories,
+ validate_examples_for_duplicates_by_name,
+)
from logger import setup_logger
-parser = argparse.ArgumentParser(
- description="CI/CD Steps for Playground objects")
+parser = argparse.ArgumentParser(description="CI/CD Steps for Playground objects")
parser.add_argument(
"--step",
dest="step",
required=True,
help="CI step to verify all beam examples/tests/katas. CD step to save all "
- "beam examples/tests/katas and their outputs on the GCD",
- choices=[Config.CI_STEP_NAME, Config.CD_STEP_NAME])
+ "beam examples/tests/katas and their outputs on the GCD",
+ choices=[Config.CI_STEP_NAME, Config.CD_STEP_NAME],
+)
parser.add_argument(
"--sdk",
dest="sdk",
required=True,
help="Supported SDKs",
- choices=Config.SUPPORTED_SDK)
+ choices=Config.SUPPORTED_SDK,
+)
parser.add_argument(
"--origin",
type=Origin,
required=True,
help="ORIGIN field of pg_examples/pg_snippets",
- choices=[o.value for o in [Origin.PG_EXAMPLES, Origin.TB_EXAMPLES]])
+ choices=[o.value for o in [Origin.PG_EXAMPLES, Origin.TB_EXAMPLES]],
+)
parser.add_argument(
"--subdirs",
default=[],
nargs="+",
required=True,
- help="limit sub directories to walk through, relative to BEAM_ROOT_DIR")
+ help="limit sub directories to walk through, relative to BEAM_ROOT_DIR",
+)
root_dir = os.getenv("BEAM_ROOT_DIR")
categories_file = os.getenv("BEAM_EXAMPLE_CATEGORIES")
-def _ci_step(examples: List[Example], origin: Origin):
- """
- CI step to verify single-file beam examples/tests/katas
- """
-
- ci_helper = CIHelper()
- asyncio.run(ci_helper.verify_examples(examples, origin))
-
-
-def _cd_step(examples: List[Example], sdk: Sdk, origin: Origin):
- """
- CD step to save all beam examples/tests/katas and their outputs on the GCD
- """
- cd_helper = CDHelper(sdk, origin)
- cd_helper.save_examples(examples)
-
-
def _check_envs():
if root_dir is None:
- raise KeyError(
- "BEAM_ROOT_DIR environment variable should be specified in os")
+ raise KeyError("BEAM_ROOT_DIR environment variable should be specified in os")
if categories_file is None:
raise KeyError(
"BEAM_EXAMPLE_CATEGORIES environment variable should be specified in os"
)
-def _run_ci_cd(step: Config.CI_CD_LITERAL, sdk: Sdk, origin: Origin, subdirs: List[str]):
- supported_categories = get_supported_categories(categories_file)
+def _run_ci_cd(step: str, raw_sdk: str, origin: Origin, subdirs: List[str]):
+ sdk: SdkEnum = StringToSdkEnum(raw_sdk)
+
+ load_supported_categories(categories_file)
logging.info("Start of searching Playground examples ...")
- examples = find_examples(root_dir, subdirs, supported_categories, sdk)
+ examples = find_examples(root_dir, subdirs, sdk)
validate_examples_for_duplicates_by_name(examples)
logging.info("Finish of searching Playground examples")
logging.info("Number of found Playground examples: %s", len(examples))
- if step == Config.CI_STEP_NAME:
- logging.info(
- "Start of verification only single_file Playground examples ...")
- _ci_step(examples=examples, origin=origin)
- logging.info("Finish of verification single_file Playground examples")
+ examples = list(filter(lambda example: example.tag.multifile is False, examples))
+ logging.info("Number of sinlge-file Playground examples: %s", len(examples))
+
+ logging.info("Execute Playground examples ...")
+ runner = Verifier(sdk, origin)
+ runner.run_verify(examples)
+
if step == Config.CD_STEP_NAME:
- logging.info("Start of saving Playground examples ...")
- _cd_step(examples=examples, sdk=sdk, origin=origin)
- logging.info("Finish of saving Playground examples")
+ logging.info("Start of sending Playground examples to the Cloud Datastore ...")
+ datastore_client = DatastoreClient()
+ datastore_client.save_catalogs()
+ datastore_client.save_to_cloud_datastore(examples, sdk, origin)
+ logging.info("Finish of sending Playground examples to the Cloud Datastore")
if __name__ == "__main__":
parser = parser.parse_args()
_check_envs()
setup_logger()
- _run_ci_cd(parser.step, Sdk.Value(parser.sdk), parser.origin, parser.subdirs)
+ _run_ci_cd(parser.step, parser.sdk, parser.origin, parser.subdirs)
diff --git a/playground/infrastructure/config.py b/playground/infrastructure/config.py
index d11cc063e28..4cc5dbd4376 100644
--- a/playground/infrastructure/config.py
+++ b/playground/infrastructure/config.py
@@ -22,8 +22,6 @@ from dataclasses import dataclass
from enum import Enum
from typing import Literal
-from dataclasses_json import dataclass_json
-
from api.v1.api_pb2 import STATUS_VALIDATION_ERROR, STATUS_ERROR, \
STATUS_PREPARATION_ERROR, STATUS_COMPILE_ERROR, \
STATUS_RUN_TIMEOUT, STATUS_RUN_ERROR, SDK_JAVA, SDK_GO, SDK_PYTHON, \
@@ -57,7 +55,7 @@ class Config:
]
BEAM_PLAYGROUND_TITLE = "beam-playground:\n"
BEAM_PLAYGROUND = "beam-playground"
- PAUSE_DELAY = 10
+ PAUSE_DELAY = 1
CI_STEP_NAME = "CI"
CD_STEP_NAME = "CD"
CI_CD_LITERAL = Literal["CI", "CD"]
@@ -98,15 +96,6 @@ class PrecompiledExampleType:
test_ends = ("test", "it")
-@dataclass(frozen=True)
-class OptionalTagFields:
- pipeline_options: str = "pipeline_options"
- default_example: str = "default_example"
- emulators: str = "emulators"
- datasets: str = "datasets"
- url_notebook: str = "url_notebook"
-
-
@dataclass(frozen=True)
class DatastoreProps:
NAMESPACE = "Playground"
@@ -130,25 +119,3 @@ class Origin(str, Enum):
TB_EXAMPLES = 'TB_EXAMPLES'
TB_USER = 'TB_USER'
-
-@dataclass_json
-@dataclass
-class Dataset:
- format: str
- location: str
- name: str = ""
- path: str = ""
-
-
-@dataclass_json
-@dataclass
-class Topic:
- id: str
- dataset: str
-
-
-@dataclass_json
-@dataclass
-class Emulator:
- topic: Topic
- name: str = ""
diff --git a/playground/infrastructure/conftest.py b/playground/infrastructure/conftest.py
new file mode 100644
index 00000000000..3a776a4aac5
--- /dev/null
+++ b/playground/infrastructure/conftest.py
@@ -0,0 +1,99 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import os.path
+import pytest
+from typing import Optional, List, Dict, Any
+
+from models import Example, SdkEnum, Tag
+
+from helper import (
+ load_supported_categories,
+)
+
+
+@pytest.fixture(autouse=True, scope="session")
+def supported_categories():
+ load_supported_categories("../categories.yaml")
+
+
+@pytest.fixture(autouse=True)
+def mock_dataset_file_name(mocker):
+ def _mock_isfile(filepath):
+ if filepath in [
+ "../backend/datasets/dataset_id_1.json",
+ "../backend/datasets/dataset_id_1.avro",
+ ]:
+ return True
+ raise FileNotFoundError(filepath)
+ mocker.patch('os.path.isfile', side_effect=_mock_isfile)
+
+
+@pytest.fixture
+def create_test_example(create_test_tag):
+ def _create_test_example(
+ with_kafka=False,
+ tag_meta: Optional[Dict[str, Any]] = None, **example_meta
+ ) -> Example:
+ if tag_meta is None:
+ tag_meta = {}
+ meta: Dict[str, Any] = dict(
+ sdk=SdkEnum.JAVA,
+ pipeline_id="MOCK_PIPELINE_ID",
+ filepath="MOCK_FILEPATH",
+ code="MOCK_CODE",
+ output="MOCK_OUTPUT",
+ url_vcs="https://github.com/proj/MOCK_LINK",
+ context_line=132,
+ )
+ meta.update(**example_meta)
+ return Example(
+ tag=create_test_tag(with_kafka=with_kafka, **tag_meta),
+ **meta,
+ )
+
+ return _create_test_example
+
+
+@pytest.fixture
+def create_test_tag():
+ def _create_test_tag(with_kafka=False, **tag_meta) -> Tag:
+ meta = {
+ "name": "MOCK_NAME",
+ "description": "MOCK_DESCRIPTION",
+ "complexity": "ADVANCED",
+ "multifile": False,
+ "categories": ["Testing", "Schemas"],
+ "pipeline_options": "--MOCK_OPTION MOCK_OPTION_VALUE",
+ }
+ if with_kafka:
+ meta.update(
+ emulators=[
+ {"type": "kafka", "topic": {"id": "topic1", "source_dataset": "dataset_id_1"}}
+ ],
+ datasets={"dataset_id_1": {"format": "avro", "location": "local"}},
+ )
+ for k, v in tag_meta.items():
+ if v is None:
+ meta.pop(k, None)
+ else:
+ meta[k] = v
+ return Tag(
+ line_start=10,
+ line_finish=20,
+ context_line=30,
+ **meta,
+ )
+
+ return _create_test_tag
diff --git a/playground/infrastructure/datastore_client.py b/playground/infrastructure/datastore_client.py
index f46b5beeea0..77743b1299d 100644
--- a/playground/infrastructure/datastore_client.py
+++ b/playground/infrastructure/datastore_client.py
@@ -29,18 +29,13 @@ from tqdm import tqdm
import config
from config import Config, Origin, PrecompiledExample, DatastoreProps
-from helper import Example
+from models import Example, SdkEnum, Dataset, Emulator
-from api.v1.api_pb2 import Sdk, PrecompiledObjectType
+from api.v1 import api_pb2
class DatastoreException(Exception):
- def __init__(self, error: str):
- super().__init__()
- self.msg = error
-
- def __str__(self):
- return self.msg
+ pass
# Google Datastore documentation link: https://cloud.google.com/datastore/docs/concepts
@@ -64,7 +59,7 @@ class DatastoreClient:
raise KeyError("SDK_CONFIG environment variable should be specified in os")
def save_to_cloud_datastore(
- self, examples_from_rep: List[Example], sdk: Sdk, origin: Origin
+ self, examples_from_rep: List[Example], sdk: SdkEnum, origin: Origin
):
"""
Save examples, output and meta to datastore
@@ -86,8 +81,10 @@ class DatastoreClient:
# loop through every example to save them to the Cloud Datastore
for example in tqdm(examples_from_rep):
with self._datastore_client.transaction():
- sdk_key = self._get_key(DatastoreProps.SDK_KIND, Sdk.Name(example.sdk))
- example_id = self._make_example_id(origin, sdk, example.name)
+ sdk_key = self._get_key(
+ DatastoreProps.SDK_KIND, api_pb2.Sdk.Name(example.sdk)
+ )
+ example_id = self._make_example_id(origin, sdk, example.tag.name)
self._datastore_client.put(
self._to_example_entity(
@@ -104,18 +101,13 @@ class DatastoreClient:
)
# only single-file examples are supported by now
self._datastore_client.put(self._to_file_entity(example, example_id))
- if example.datasets and example.emulators:
- dataset = example.datasets[0]
- emulator = example.emulators[0]
- file_name = f"{dataset.name}.{dataset.format}"
- dataset = self._to_dataset_entity(file_name, dataset.path)
- self._datastore_client.put(dataset)
-
- dataset_nested_entity = self._to_dataset_nested_entity(
- file_name, example_id, emulator
+ if example.tag.datasets:
+ self._datastore_client.put_multi(
+ [
+ self._to_dataset_entity(dataset_id, dataset.file_name)
+ for dataset_id, dataset in example.tag.datasets.items()
+ ]
)
- snippet_datasets = [dataset_nested_entity]
- snippet.update({"datasets": snippet_datasets})
updated_example_ids.add(example_id)
@@ -195,13 +187,13 @@ class DatastoreClient:
schema_names.sort(reverse=True)
return self._get_key(DatastoreProps.SCHEMA_KIND, schema_names[0])
- def _get_all_examples(self, sdk: Sdk, origin: Origin) -> List[str]:
+ def _get_all_examples(self, sdk: SdkEnum, origin: Origin) -> List[str]:
examples_ids_before_updating = []
all_examples_query = self._datastore_client.query(
kind=DatastoreProps.EXAMPLE_KIND
)
all_examples_query.add_filter(
- "sdk", "=", self._get_key(DatastoreProps.SDK_KIND, Sdk.Name(sdk))
+ "sdk", "=", self._get_key(DatastoreProps.SDK_KIND, api_pb2.Sdk.Name(sdk))
)
all_examples_query.add_filter("origin", "=", origin)
all_examples_query.keys_only()
@@ -222,20 +214,20 @@ class DatastoreClient:
def _get_dataset_key(self, dataset_id: str):
return self._get_key(DatastoreProps.DATASET_KIND, dataset_id)
- def _make_example_id(self, origin: Origin, sdk: Sdk, name: str):
+ def _make_example_id(self, origin: Origin, sdk: SdkEnum, name: str):
# ToB examples (and other related entities: snippets, files, pc_objects)
# have origin prefix in a key
if origin == Origin.TB_EXAMPLES:
return config.DatastoreProps.KEY_NAME_DELIMITER.join(
[
origin,
- Sdk.Name(sdk),
+ api_pb2.Sdk.Name(sdk),
name,
]
)
return config.DatastoreProps.KEY_NAME_DELIMITER.join(
[
- Sdk.Name(sdk),
+ api_pb2.Sdk.Name(sdk),
name,
]
)
@@ -268,9 +260,11 @@ class DatastoreClient:
"origin": origin,
"numberOfFiles": 1,
"schVer": schema_key,
- "complexity": f"COMPLEXITY_{example.complexity}",
+ "complexity": f"COMPLEXITY_{example.tag.complexity}",
}
)
+ if example.tag.datasets:
+ snippet_entity.update({"datasets": self._snippet_datasets(example)})
return snippet_entity
def _get_pipeline_options(self, example: Example):
@@ -290,17 +284,17 @@ class DatastoreClient:
example_entity = datastore.Entity(self._get_example_key(example_id))
example_entity.update(
{
- "name": example.name,
+ "name": example.tag.name,
"sdk": sdk_key,
"descr": example.tag.description,
"tags": example.tag.tags,
"cats": example.tag.categories,
- "path": example.url_vcs, # keep for backward-compatibity, to be removed
- "type": PrecompiledObjectType.Name(example.type),
+ "path": example.url_vcs, # keep for backward-compatibity, to be removed
+ "type": api_pb2.PrecompiledObjectType.Name(example.type),
"origin": origin,
"schVer": schema_key,
"urlVCS": example.url_vcs,
- "urlNotebook": example.url_notebook,
+ "urlNotebook": example.tag.url_notebook,
}
)
return example_entity
@@ -349,7 +343,9 @@ class DatastoreClient:
)
file_entity.update(
{
- "name": self._get_file_name_with_extension(example.name, example.sdk),
+ "name": self._get_file_name_with_extension(
+ example.tag.name, example.sdk
+ ),
"content": example.code,
"cntxLine": example.tag.context_line,
"isMain": True,
@@ -357,29 +353,32 @@ class DatastoreClient:
)
return file_entity
- def _to_dataset_entity(self, dataset_id: str, path: str):
+ def _to_dataset_entity(self, dataset_id: str, file_name: str):
dataset_entity = datastore.Entity(self._get_dataset_key(dataset_id))
- dataset_entity.update({"path": path})
+ dataset_entity.update({"path": file_name})
return dataset_entity
- def _to_dataset_nested_entity(
- self, dataset_id: str, example_id: str, emulator: config.Emulator
- ):
- emulator_config_as_dict = {"topic": emulator.topic.id}
- emulator_config_as_json = json.dumps(emulator_config_as_dict)
- nested_entity = datastore.Entity(
- self._get_snippet_key(example_id), exclude_from_indexes=("config",)
- )
+ def _to_dataset_nested_entity(self, dataset_id: str, emulator: Emulator):
+ nested_entity = datastore.Entity()
nested_entity.update(
{
"dataset": self._get_dataset_key(dataset_id),
- "emulator": emulator.name,
- "config": emulator_config_as_json,
+ "emulator": emulator.type,
+ "config": json.dumps({"topic": emulator.topic.id})
}
)
return nested_entity
- def _get_file_name_with_extension(self, name: str, sdk: Sdk) -> str:
+ def _snippet_datasets(self, example: Example) -> List[datastore.Entity]:
+ datasets = []
+ for emulator in example.tag.emulators:
+ dataset_nested_entity = self._to_dataset_nested_entity(
+ emulator.topic.source_dataset, emulator
+ )
+ datasets.append(dataset_nested_entity)
+ return datasets
+
+ def _get_file_name_with_extension(self, name: str, sdk: int) -> str:
filename, file_extension = os.path.splitext(name)
if len(file_extension) == 0:
extension = Config.SDK_TO_EXTENSION[sdk]
diff --git a/playground/infrastructure/grpc_client.py b/playground/infrastructure/grpc_client.py
index 4f835d12071..b78b94beeb0 100644
--- a/playground/infrastructure/grpc_client.py
+++ b/playground/infrastructure/grpc_client.py
@@ -26,6 +26,7 @@ import sonora.aio
from api.v1 import api_pb2_grpc, api_pb2
from config import Config
+from models import SdkEnum
class GRPCClient:
@@ -50,7 +51,7 @@ class GRPCClient:
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self._channel.__aexit__(exc_type, exc_val, exc_tb)
- async def run_code(self, code: str, sdk: api_pb2.Sdk, pipeline_options: str, datasets: List[api_pb2.Dataset]) -> str:
+ async def run_code(self, code: str, sdk: SdkEnum, pipeline_options: str, datasets: List[api_pb2.Dataset]) -> str:
"""
Run example by his code and SDK
diff --git a/playground/infrastructure/helper.py b/playground/infrastructure/helper.py
index e41c3d960d0..d3941f3083e 100644
--- a/playground/infrastructure/helper.py
+++ b/playground/infrastructure/helper.py
@@ -19,76 +19,32 @@ Common helper module for CI/CD Steps
import asyncio
import logging
import os
-from collections import namedtuple
-from dataclasses import dataclass, fields, field
-import urllib3
+import urllib.parse
from pathlib import PurePath
from typing import List, Optional, Dict
from api.v1 import api_pb2
from tqdm.asyncio import tqdm
import yaml
-from yaml import YAMLError
-
-from api.v1.api_pb2 import SDK_UNSPECIFIED, STATUS_UNSPECIFIED, Sdk, \
- STATUS_VALIDATING, STATUS_PREPARING, \
- STATUS_COMPILING, STATUS_EXECUTING, PRECOMPILED_OBJECT_TYPE_UNIT_TEST, \
- PRECOMPILED_OBJECT_TYPE_KATA, PRECOMPILED_OBJECT_TYPE_UNSPECIFIED, \
- PRECOMPILED_OBJECT_TYPE_EXAMPLE, PrecompiledObjectType
-from config import Config, TagFields, PrecompiledExampleType, OptionalTagFields, Dataset, Emulator
+
+from api.v1.api_pb2 import (
+ SDK_UNSPECIFIED,
+ STATUS_UNSPECIFIED,
+ Sdk,
+ STATUS_VALIDATING,
+ STATUS_PREPARING,
+ STATUS_COMPILING,
+ STATUS_EXECUTING,
+ PRECOMPILED_OBJECT_TYPE_UNIT_TEST,
+ PRECOMPILED_OBJECT_TYPE_KATA,
+ PRECOMPILED_OBJECT_TYPE_UNSPECIFIED,
+ PRECOMPILED_OBJECT_TYPE_EXAMPLE,
+ PrecompiledObjectType,
+)
+from config import Config, TagFields, PrecompiledExampleType
from grpc_client import GRPCClient
-# TODO replace with dataclass
-Tag = namedtuple(
- "Tag",
- [
- TagFields.name,
- TagFields.complexity,
- TagFields.emulators,
- TagFields.datasets,
- TagFields.description,
- TagFields.multifile,
- TagFields.categories,
- TagFields.pipeline_options,
- TagFields.default_example,
- TagFields.context_line,
- TagFields.tags,
- TagFields.url_notebook,
- ],
- defaults=(None, None, None, None, None, False, None, None, False, None, None, None))
-
-
-@dataclass
-class Example:
- """
- Class which contains all information about beam example
- """
- name: str
- complexity: str
- sdk: SDK_UNSPECIFIED
- filepath: str
- code: str
- status: STATUS_UNSPECIFIED
- tag: Tag
- url_vcs: str
- url_notebook: Optional[str] = None
- logs: str = ""
- type: PrecompiledObjectType = PRECOMPILED_OBJECT_TYPE_UNSPECIFIED
- pipeline_id: str = ""
- output: str = ""
- compile_output: str = ""
- graph: str = ""
- datasets: List[Dataset] = field(default_factory=list)
- emulators: List[Emulator] = field(default_factory=list)
-
-
-@dataclass
-class ExampleTag:
- """
- Class which contains all information about beam playground tag
- """
- tag_as_dict: Dict[str, str]
- tag_as_string: str
+from models import Example, Tag, SdkEnum, Dataset
def _check_no_nested(subdirs: List[str]):
@@ -104,8 +60,7 @@ def _check_no_nested(subdirs: List[str]):
raise ValueError(f"{dir2} is a subdirectory of {dir1}")
-def find_examples(root_dir: str, subdirs: List[str], supported_categories: List[str],
- sdk: Sdk) -> List[Example]:
+def find_examples(root_dir: str, subdirs: List[str], sdk: SdkEnum) -> List[Example]:
"""
Find and return beam examples.
@@ -128,13 +83,12 @@ def find_examples(root_dir: str, subdirs: List[str], supported_categories: List[
Args:
root_dir: project root dir
subdirs: sub-directories where to search examples.
- supported_categories: list of supported categories.
sdk: sdk that using to find examples for the specific sdk.
Returns:
List of Examples.
"""
- has_error = False
+ has_errors = False
examples = []
_check_no_nested(subdirs)
for subdir in subdirs:
@@ -143,21 +97,26 @@ def find_examples(root_dir: str, subdirs: List[str], supported_categories: List[
for root, _, files in os.walk(subdir):
for filename in files:
filepath = os.path.join(root, filename)
- error_during_check_file = _check_file(
- examples=examples,
- filename=filename,
- filepath=filepath,
- supported_categories=supported_categories,
- sdk=sdk)
- has_error = has_error or error_during_check_file
- if has_error:
+ try:
+ example = _load_example(
+ filename=filename, filepath=filepath, sdk=sdk
+ )
+ if example is not None:
+ examples.append(example)
+ except Exception:
+ logging.exception("error loading example at %s", filepath)
+ has_errors = True
+ if has_errors:
raise ValueError(
"Some of the beam examples contain beam playground tag with "
- "an incorrect format")
+ "an incorrect format"
+ )
return examples
-async def get_statuses(client: GRPCClient, examples: List[Example], concurrency: int = 10):
+async def get_statuses(
+ client: GRPCClient, examples: List[Example], concurrency: int = 10
+):
"""
Receive status and update example.status and example.pipeline_id for
each example
@@ -187,7 +146,7 @@ async def get_statuses(client: GRPCClient, examples: List[Example], concurrency:
await tqdm.gather(*tasks)
-def get_tag(filepath) -> Optional[ExampleTag]:
+def get_tag(filepath) -> Optional[Tag]:
"""
Parse file by filepath and find beam tag
@@ -195,97 +154,102 @@ def get_tag(filepath) -> Optional[ExampleTag]:
filepath: path of the file
Returns:
- If file contains tag, returns tag as a map.
+ If file contains tag, returns Tag object
If file doesn't contain tag, returns None
"""
- add_to_yaml = False
- yaml_string = ""
- tag_string = ""
-
with open(filepath, encoding="utf-8") as parsed_file:
lines = parsed_file.readlines()
- for line in lines:
- formatted_line = line.replace("//", "").replace("#",
- "").replace("\t", " ")
- if add_to_yaml is False:
- if formatted_line.lstrip() == Config.BEAM_PLAYGROUND_TITLE:
- add_to_yaml = True
- yaml_string += formatted_line.lstrip()
- tag_string += line
- else:
- yaml_with_new_string = yaml_string + formatted_line
- try:
- yaml.load(yaml_with_new_string, Loader=yaml.SafeLoader)
- yaml_string += formatted_line
- tag_string += line
- except YAMLError:
- break
-
- if add_to_yaml:
- tag_object = yaml.load(yaml_string, Loader=yaml.SafeLoader)
- return ExampleTag(tag_object[Config.BEAM_PLAYGROUND], tag_string)
+ line_start: Optional[int] = None
+ line_finish: Optional[int] = None
+ tag_prefix: Optional[str] = ""
+ for idx, line in enumerate(lines):
+ if line_start is None and line.endswith(Config.BEAM_PLAYGROUND_TITLE):
+ line_start = idx
+ prefix_len = len(line) - len(Config.BEAM_PLAYGROUND_TITLE)
+ tag_prefix = line[:prefix_len]
+ elif line_start and not line.startswith(tag_prefix):
+ line_finish = idx
+ break
- return None
+ if not line_start or not line_finish:
+ return None
+ embdedded_yaml_content = "".join(
+ line[len(tag_prefix) :] for line in lines[line_start:line_finish]
+ )
+ yml = yaml.load(embdedded_yaml_content, Loader=yaml.SafeLoader)
+ return Tag(
+ line_start=line_start, line_finish=line_finish, **yml[Config.BEAM_PLAYGROUND]
+ )
-def _check_file(examples, filename, filepath, supported_categories, sdk: Sdk):
+
+def _load_example(filename, filepath, sdk: SdkEnum) -> Optional[Example]:
"""
Check file by filepath for matching to beam example. If file is beam example,
- then add it to list of examples
Args:
- examples: list of examples.
filename: name of the file.
filepath: path to the file.
- supported_categories: list of supported categories.
sdk: sdk that using to find examples for the specific sdk.
Returns:
- True if file has beam playground tag with incorrect format.
- False if file has correct beam playground tag.
- False if file doesn't contains beam playground tag.
+ If the file is an example, return Example object
+ If it's not, return None
+ In case of error, raise Exception
"""
- if filepath.endswith("infrastructure/helper.py"):
- return False
-
- has_error = False
+ logging.debug("inspecting file %s", filepath)
extension = filepath.split(os.extsep)[-1]
if extension == Config.SDK_TO_EXTENSION[sdk]:
+ logging.debug("sdk %s matched extension %s", api_pb2.Sdk.Name(sdk), extension)
tag = get_tag(filepath)
if tag is not None:
- if _validate(tag.tag_as_dict, supported_categories) is False:
- logging.error(
- "%s contains beam playground tag with incorrect format", filepath)
- has_error = True
- else:
- examples.append(_get_example(filepath, filename, tag))
- return has_error
+ logging.debug("playground-beam tag found")
+ return _get_example(filepath, filename, tag, sdk)
+ return None
-def get_supported_categories(categories_path: str) -> List[str]:
+# Make load_supported_categories called only once
+# to make testing easier
+_load_supported_categories = False
+
+
+def load_supported_categories(categories_path: str):
"""
- Return list of supported categories from categories_path file
+ Load the list of supported categories from categories_path file
+ into Tag model config
Args:
categories_path: path to the file with categories.
-
- Returns:
- All supported categories as a list.
"""
+ global _load_supported_categories
+ if _load_supported_categories:
+ return
with open(categories_path, encoding="utf-8") as supported_categories:
yaml_object = yaml.load(supported_categories.read(), Loader=yaml.SafeLoader)
- return yaml_object[TagFields.categories]
-def _get_url_vcs(filepath: str):
+ Tag.Config.supported_categories = yaml_object[TagFields.categories]
+ _load_supported_categories = True
+
+
+def _get_content(filepath: str, tag_start_line: int, tag_finish_line) -> str:
+ with open(filepath, encoding="utf-8") as parsed_file:
+ lines = parsed_file.readlines()
+ lines = lines[:tag_start_line] + lines[tag_finish_line:]
+ return "".join(lines)
+
+
+def _get_url_vcs(filepath: str) -> str:
"""
Construct VCS URL from example's filepath
"""
root_dir = os.getenv("BEAM_ROOT_DIR", "../..")
rel_path = os.path.relpath(filepath, root_dir)
- return "{}/{}".format(Config.URL_VCS_PREFIX, rel_path)
+ url_vcs = "{}/{}".format(Config.URL_VCS_PREFIX, urllib.parse.quote(rel_path))
+ return url_vcs
+
-def _get_example(filepath: str, filename: str, tag: ExampleTag) -> Example:
+def _get_example(filepath: str, filename: str, tag: Tag, sdk: int) -> Example:
"""
Return an Example by filepath and filename.
@@ -297,153 +261,16 @@ def _get_example(filepath: str, filename: str, tag: ExampleTag) -> Example:
Returns:
Parsed Example object.
"""
- name = tag.tag_as_dict[TagFields.name]
- complexity = tag.tag_as_dict[TagFields.complexity]
- url_notebook = tag.tag_as_dict.get(TagFields.url_notebook)
- sdk = Config.EXTENSION_TO_SDK[filename.split(os.extsep)[-1]]
- object_type = _get_object_type(filename, filepath)
- with open(filepath, encoding="utf-8") as parsed_file:
- content = parsed_file.read()
- content = content.replace(tag.tag_as_string, "")
- tag.tag_as_dict[TagFields.context_line] -= tag.tag_as_string.count("\n")
-
- example = Example(
- name=name,
- complexity=complexity,
- sdk=sdk,
+ return Example(
+ sdk=SdkEnum(sdk),
+ tag=tag,
filepath=filepath,
- code=content,
status=STATUS_UNSPECIFIED,
- tag=Tag(**tag.tag_as_dict),
- type=object_type,
- url_vcs=_get_url_vcs(filepath),
- url_notebook=url_notebook,
- )
-
- if tag.tag_as_dict.get(TagFields.datasets):
- datasets_as_dict = tag.tag_as_dict[TagFields.datasets]
- datasets = []
- for key in datasets_as_dict:
- dataset = Dataset.from_dict(datasets_as_dict.get(key))
- dataset.name = key
- datasets.append(dataset)
- example.datasets = datasets
-
- if tag.tag_as_dict.get(TagFields.emulators):
- emulators_as_dict = tag.tag_as_dict[TagFields.emulators]
- emulators = []
- for key in emulators_as_dict:
- emulator = Emulator.from_dict(emulators_as_dict.get(key))
- emulator.name = key
- emulators.append(emulator)
- example.emulators = emulators
-
- validate_example_fields(example)
- return example
-
-
-def _validate(tag: dict, supported_categories: List[str]) -> bool:
- """
- Validate all tag's fields
-
- Validate that tag contains all required fields and all fields have required
- format.
-
- Args:
- tag: beam tag to validate.
- supported_categories: list of supported categories.
-
- Returns:
- In case tag is valid, True
- In case tag is not valid, False
- """
- valid = True
- required_tag_fields = {
- f.default
- for f in fields(TagFields)
- if f.default not in {o_f.default
- for o_f in fields(OptionalTagFields)}
- }
- # check that all fields exist and they have no empty value
- for field in required_tag_fields:
- if field not in tag:
- logging.error(
- "tag doesn't contain %s field: %s \n"
- "Please, check that this field exists in the beam playground tag."
- "If you are sure that this field exists in the tag"
- " check the format of indenting.",
- field,
- tag)
- valid = False
- if valid is True:
- value = tag.get(field)
- if (value == "" or value is None) and field != TagFields.pipeline_options:
- logging.error(
- "tag's value is incorrect: %s\n%s field can not be empty.",
- tag,
- field)
- valid = False
-
- if valid is False:
- return valid
-
- # check that multifile's value is boolean
- multifile = tag.get(TagFields.multifile)
- if str(multifile).lower() not in ["true", "false"]:
- logging.error(
- "tag's field multifile is incorrect: %s \n"
- "multifile variable should be boolean format, but tag contains: %s",
- tag,
- multifile)
- valid = False
-
- # check that categories' value is a list of supported categories
- categories = tag.get(TagFields.categories)
- if not isinstance(categories, list):
- logging.error(
- "tag's field categories is incorrect: %s \n"
- "categories variable should be list format, but tag contains: %s",
- tag,
- type(categories))
- valid = False
- else:
- for category in categories:
- if category not in supported_categories:
- logging.error(
- "tag contains unsupported category: %s \n"
- "If you are sure that %s category should be placed in "
- "Beam Playground, you can add it to the "
- "`playground/categories.yaml` file",
- category,
- category)
- valid = False
-
- # check that context line's value is integer
- context_line = tag.get(TagFields.context_line)
- if not isinstance(context_line, int):
- logging.error(
- "Tag's field context_line is incorrect: %s \n"
- "context_line variable should be integer format, "
- "but tag contains: %s",
- tag,
- context_line)
- valid = False
- return valid
-
-
-def _get_name(filename: str) -> str:
- """
- Return name of the example by his filepath.
-
- Get name of the example by his filename.
-
- Args:
- filename: filename of the beam example file.
-
- Returns:
- example's name.
- """
- return filename.split(os.extsep)[0]
+ type=_get_object_type(filename, filepath),
+ code=_get_content(filepath, tag.line_start, tag.line_finish),
+ url_vcs=_get_url_vcs(filepath), # type: ignore
+ context_line=tag.context_line - (tag.line_finish - tag.line_start),
+ )
async def _update_example_status(example: Example, client: GRPCClient):
@@ -460,28 +287,31 @@ async def _update_example_status(example: Example, client: GRPCClient):
example: beam example for processing and updating status and pipeline_id.
client: client to send requests to the server.
"""
- datasets = []
- if example.datasets and example.emulators:
- dataset_tag = example.datasets[0]
- emulator_tag = example.emulators[0]
- options = {
- "topic": emulator_tag.topic.id
- }
- dataset = api_pb2.Dataset(
- type=api_pb2.EmulatorType.Value(f"EMULATOR_TYPE_{emulator_tag.name.upper()}"),
- options=options,
- dataset_path=dataset_tag.path
+ datasets: List[api_pb2.Dataset] = []
+ for emulator in example.tag.emulators:
+ dataset: Dataset = example.tag.datasets[emulator.topic.source_dataset]
+
+ datasets.append(
+ api_pb2.Dataset(
+ type=api_pb2.EmulatorType.Value(
+ f"EMULATOR_TYPE_{emulator.type.upper()}"
+ ),
+ options={"topic": emulator.topic.id},
+ dataset_path=dataset.file_name,
+ )
)
- datasets.append(dataset)
pipeline_id = await client.run_code(
- example.code, example.sdk, example.tag.pipeline_options, datasets)
+ example.code, example.sdk, example.tag.pipeline_options, datasets
+ )
example.pipeline_id = pipeline_id
status = await client.check_status(pipeline_id)
- while status in [STATUS_VALIDATING,
- STATUS_PREPARING,
- STATUS_COMPILING,
- STATUS_EXECUTING]:
+ while status in [
+ STATUS_VALIDATING,
+ STATUS_PREPARING,
+ STATUS_COMPILING,
+ STATUS_EXECUTING,
+ ]:
await asyncio.sleep(Config.PAUSE_DELAY)
status = await client.check_status(pipeline_id)
example.status = status
@@ -509,67 +339,20 @@ def _get_object_type(filename, filepath):
return object_type
+class DuplicatesError(Exception):
+ pass
+
+
def validate_examples_for_duplicates_by_name(examples: List[Example]):
"""
Validate examples for duplicates by example name to avoid duplicates in the Cloud Datastore
:param examples: examples from the repository for saving to the Cloud Datastore
"""
- duplicates = {str: Example}
+ duplicates: Dict[str, Example] = {}
for example in examples:
- if example.name not in duplicates.keys():
- duplicates[example.name] = example
+ if example.tag.name not in duplicates.keys():
+ duplicates[example.tag.name] = example
else:
- err_msg = f"Examples have duplicate names.\nDuplicates: \n - path #1: {duplicates[example.name].filepath} \n - path #2: {example.filepath}"
+ err_msg = f"Examples have duplicate names.\nDuplicates: \n - path #1: {duplicates[example.tag.name].filepath} \n - path #2: {example.filepath}"
logging.error(err_msg)
- raise ValidationException(err_msg)
-
-
-def validate_example_fields(example: Example):
- """
- Validate example fields to avoid side effects in the next step
- :param example: example from the repository
- """
- if example.filepath == "":
- _log_and_raise_validation_err(f"Example doesn't have a file path field. Example: {example}")
- if example.name == "":
- _log_and_raise_validation_err(f"Example doesn't have a name field. Path: {example.filepath}")
- if example.sdk == SDK_UNSPECIFIED:
- _log_and_raise_validation_err(f"Example doesn't have a sdk field. Path: {example.filepath}")
- if example.code == "":
- _log_and_raise_validation_err(f"Example doesn't have a code field. Path: {example.filepath}")
- if example.url_vcs == "":
- _log_and_raise_validation_err(f"Example doesn't have a url_vcs field. Path: {example.filepath}")
- if example.complexity == "":
- _log_and_raise_validation_err(f"Example doesn't have a complexity field. Path: {example.filepath}")
- datasets = example.datasets
- emulators = example.emulators
-
- if datasets and not emulators:
- _log_and_raise_validation_err(f"Example has a datasets field but an emulators field not found. Path: {example.filepath}")
- if emulators and not datasets:
- _log_and_raise_validation_err(f"Example has an emulators field but a datasets field not found. Path: {example.filepath}")
-
- dataset_names = []
- for dataset in datasets:
- location = dataset.location
- dataset_format = dataset.format
- if not location or not dataset_format or location not in ["local"] or dataset_format not in ["json", "avro"]:
- _log_and_raise_validation_err(f"Example has invalid dataset value. Path: {example.filepath}")
- dataset_names.append(dataset.name)
- for emulator in emulators:
- if not (emulator.name == "kafka" and emulator.topic.dataset in dataset_names):
- _log_and_raise_validation_err(f"Example has invalid emulator value. Path: {example.filepath}")
-
-
-def _log_and_raise_validation_err(msg: str):
- logging.error(msg)
- raise ValidationException(msg)
-
-
-class ValidationException(Exception):
- def __init__(self, error: str):
- super().__init__()
- self.msg = error
-
- def __str__(self):
- return self.msg
+ raise DuplicatesError(err_msg)
diff --git a/playground/infrastructure/models.py b/playground/infrastructure/models.py
new file mode 100644
index 00000000000..5479883b83f
--- /dev/null
+++ b/playground/infrastructure/models.py
@@ -0,0 +1,166 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+import os.path
+
+from enum import Enum, IntEnum
+from typing import List, Optional, Dict
+from api.v1 import api_pb2
+
+from pydantic import (
+ BaseModel,
+ Extra,
+ Field,
+ validator,
+ root_validator,
+ HttpUrl
+)
+
+from config import RepoProps
+
+class ComplexityEnum(str, Enum):
+ BASIC = "BASIC"
+ MEDIUM = "MEDIUM"
+ ADVANCED = "ADVANCED"
+
+
+class DatasetFormat(str, Enum):
+ JSON = "json"
+ AVRO = "avro"
+
+
+class DatasetLocation(str, Enum):
+ LOCAL = "local"
+ GCS = "GCS"
+
+
+class Dataset(BaseModel):
+ format: DatasetFormat
+ location: DatasetLocation
+ file_name: str = ""
+
+
+class Topic(BaseModel):
+ id: str
+ source_dataset: str
+
+
+class EmulatorType(str, Enum):
+ KAFKA = "kafka"
+
+
+class Emulator(BaseModel):
+ type: EmulatorType
+ topic: Topic
+
+
+class Tag(BaseModel):
+ """
+ Tag represents the beam-playground embedded yaml content
+ """
+ line_start: int
+ line_finish: int
+ context_line: int
+ name: str = Field(..., min_length=1)
+ complexity: ComplexityEnum
+ description: str
+ categories: List[str]
+ pipeline_options: str = ""
+ datasets: Dict[str, Dataset] = {}
+ emulators: List[Emulator] = []
+ multifile: bool = False
+ default_example: bool = False
+ tags: List[str] = []
+ url_notebook: Optional[HttpUrl] = None
+
+ class Config:
+ supported_categories = []
+ extra = Extra.forbid
+
+ @root_validator(skip_on_failure=True)
+ def lines_order(cls, values):
+ assert (
+ 0 < values["line_start"] < values["line_finish"] <= values["context_line"]
+ ), f"line ordering error: {values}"
+ return values
+
+ @root_validator(skip_on_failure=True)
+ def datasets_with_emulators(csl, values):
+ if values.get("datasets") and not values.get("emulators"):
+ raise ValueError("datasets w/o emulators")
+ return values
+
+ @validator("emulators", each_item=True)
+ def dataset_defined(cls, v, values, **kwargs):
+ if "datasets" not in values:
+ raise ValueError("datasets not defined")
+ for dataset_id in values["datasets"]:
+ if dataset_id == v.topic.source_dataset:
+ return v
+ raise ValueError(
+ f"Emulator topic {v.topic.id} has undefined dataset {v.topic.source_dataset}"
+ )
+
+ @validator('datasets')
+ def dataset_file_name(cls, datasets):
+ for dataset_id, dataset in datasets.items():
+ dataset.file_name = f"{dataset_id}.{dataset.format}"
+ if dataset.location == DatasetLocation.LOCAL:
+ dataset_path = os.path.join(RepoProps.REPO_DATASETS_PATH, dataset.file_name)
+ if not os.path.isfile(dataset_path):
+ logging.error("File not found at the specified path: %s", dataset_path)
+ raise FileNotFoundError
+ return datasets
+
+
+ @validator("categories", each_item=True)
+ def category_supported(cls, v, values, config, **kwargs):
+ if v not in config.supported_categories:
+ raise ValueError(f"Category {v} not in {config.supported_categories}")
+ return v
+
+
+
+class SdkEnum(IntEnum):
+ JAVA = api_pb2.SDK_JAVA
+ GO = api_pb2.SDK_GO
+ PYTHON = api_pb2.SDK_PYTHON
+ SCIO = api_pb2.SDK_SCIO
+
+
+def StringToSdkEnum(s: str) -> SdkEnum:
+ parsed: int = api_pb2.Sdk.Value(s)
+ return SdkEnum(parsed)
+
+
+class Example(BaseModel):
+ """
+ Class which contains all information about beam example
+ """
+
+ sdk: SdkEnum
+ tag: Tag
+ filepath: str = Field(..., min_length=1)
+ code: str = Field(..., min_length=1)
+ url_vcs: HttpUrl
+ type: int = api_pb2.PRECOMPILED_OBJECT_TYPE_UNSPECIFIED
+ context_line: int = Field(..., gt=0)
+
+ status: int = api_pb2.STATUS_UNSPECIFIED
+ logs: str = ""
+ pipeline_id: str = ""
+ output: str = ""
+ compile_output: str = ""
+ graph: str = ""
diff --git a/playground/infrastructure/repository.py b/playground/infrastructure/repository.py
deleted file mode 100644
index 639f202de47..00000000000
--- a/playground/infrastructure/repository.py
+++ /dev/null
@@ -1,36 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""
-Module contains methods to work with repository
-"""
-import logging
-import os
-from typing import List
-
-from config import RepoProps
-
-from helper import Example
-
-
-def set_dataset_path_for_examples(examples: List[Example]):
- for example in examples:
- for dataset in example.datasets:
- file_name = f"{dataset.name}.{dataset.format}"
- dataset_path = os.path.join(RepoProps.REPO_DATASETS_PATH, file_name)
- if not os.path.isfile(dataset_path):
- logging.error("File not found at the specified path: %s", dataset_path)
- raise FileNotFoundError
- dataset.path = file_name
diff --git a/playground/infrastructure/requirements.txt b/playground/infrastructure/requirements.txt
index 96f5ac3a7b3..16afc877c7f 100644
--- a/playground/infrastructure/requirements.txt
+++ b/playground/infrastructure/requirements.txt
@@ -26,4 +26,4 @@ PyYAML==6.0
tqdm~=4.62.3
google-cloud-datastore==2.7.1
sonora==0.2.2
-dataclasses-json==0.5.7
+pydantic==1.10.2
diff --git a/playground/infrastructure/test_cd_helper.py b/playground/infrastructure/test_cd_helper.py
deleted file mode 100644
index 74893c1c477..00000000000
--- a/playground/infrastructure/test_cd_helper.py
+++ /dev/null
@@ -1,38 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import unittest
-import mock
-from config import Origin
-
-import test_utils
-from api.v1.api_pb2 import SDK_JAVA
-from cd_helper import CDHelper
-
-"""
-Unit tests for the CD helper
-"""
-
-
-class TestCDHelper(unittest.TestCase):
-
- @mock.patch("cd_helper.CDHelper._save_to_datastore")
- @mock.patch("cd_helper.CDHelper._get_outputs")
- def test_save_examples(self, mock_get_outputs, mock_save_to_datastore):
- examples = test_utils._get_examples(1)
- helper = CDHelper(SDK_JAVA, Origin.PG_EXAMPLES)
- helper.save_examples(examples)
- mock_get_outputs.assert_called_once()
- mock_save_to_datastore.assert_called_once_with(examples)
diff --git a/playground/infrastructure/test_ci_cd.py b/playground/infrastructure/test_ci_cd.py
index f42699e1927..5f5b880ba02 100644
--- a/playground/infrastructure/test_ci_cd.py
+++ b/playground/infrastructure/test_ci_cd.py
@@ -17,20 +17,33 @@ import mock
import pytest
from api.v1.api_pb2 import SDK_JAVA
-from ci_cd import _ci_step, _cd_step, _check_envs
+from ci_cd import _check_envs, _run_ci_cd
from config import Origin
-@mock.patch("ci_helper.CIHelper.verify_examples")
-def test_ci_step(mock_verify_examples):
- _ci_step([], Origin.PG_EXAMPLES)
- mock_verify_examples.assert_called_once_with([], Origin.PG_EXAMPLES)
-
-
-@mock.patch("cd_helper.CDHelper.save_examples")
-def test_cd_step(mock_save_examples):
- _cd_step([], SDK_JAVA, Origin.PG_EXAMPLES)
- mock_save_examples.assert_called_once_with([])
+@pytest.mark.parametrize("step", ["CI", "CD"])
+@mock.patch("ci_cd.DatastoreClient")
+@mock.patch("ci_cd.find_examples")
+@mock.patch("verify.Verifier._run_and_verify")
+def test_ci_step(
+ mock_run_and_verify, mock_find_examples, mock_datastore, create_test_example, step
+):
+ mock_find_examples.return_value = [
+ create_test_example(tag_meta=dict(name="Default", default_example=True)),
+ create_test_example(tag_meta=dict(name="Single", multifile=False)),
+ create_test_example(tag_meta=dict(name="Multi", multifile=True)),
+ ]
+ _run_ci_cd(
+ step,
+ "SDK_JAVA",
+ Origin.PG_EXAMPLES,
+ [
+ "../../examples",
+ ],
+ )
+ mock_run_and_verify.assert_called_once()
+ if step == "CD":
+ mock_datastore.assert_called_once()
def test__check_envs():
diff --git a/playground/infrastructure/test_ci_helper.py b/playground/infrastructure/test_ci_helper.py
deleted file mode 100644
index 4b9815622f9..00000000000
--- a/playground/infrastructure/test_ci_helper.py
+++ /dev/null
@@ -1,176 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import copy
-import uuid
-
-import mock
-import pytest
-
-from api.v1.api_pb2 import SDK_JAVA, STATUS_FINISHED, STATUS_ERROR, \
- STATUS_VALIDATION_ERROR, STATUS_PREPARATION_ERROR, STATUS_RUN_TIMEOUT, \
- STATUS_COMPILE_ERROR, STATUS_RUN_ERROR
-from ci_helper import CIHelper, VerifyException
-from config import Origin
-from helper import Example, Tag
-
-
-@pytest.mark.asyncio
-@mock.patch("ci_helper.CIHelper._verify_examples")
-@mock.patch("ci_helper.get_statuses")
-async def test_verify_examples(mock_get_statuses, mock_verify_examples):
- helper = CIHelper()
- await helper.verify_examples([], Origin.PG_EXAMPLES)
-
- mock_get_statuses.assert_called_once_with(mock.ANY, [])
- mock_verify_examples.assert_called_once_with(mock.ANY, [], Origin.PG_EXAMPLES)
-
-
-@pytest.mark.asyncio
-async def test__verify_examples():
- helper = CIHelper()
- object_meta = {
- "name": "name",
- "description": "description",
- "multifile": False,
- "categories": ["category-1", "category-2"],
- "pipeline_options": "--option option",
- "default_example": False
- }
- object_meta_def_ex = copy.copy(object_meta)
- object_meta_def_ex["default_example"] = True
- pipeline_id = str(uuid.uuid4())
- default_example = Example(
- name="name",
- complexity="MEDIUM",
- pipeline_id=pipeline_id,
- sdk=SDK_JAVA,
- filepath="filepath",
- code="code_of_example",
- output="output_of_example",
- status=STATUS_FINISHED,
- tag=Tag(**object_meta_def_ex),
- url_vcs="link")
- finished_example = Example(
- name="name",
- complexity="MEDIUM",
- pipeline_id=pipeline_id,
- sdk=SDK_JAVA,
- filepath="filepath",
- code="code_of_example",
- output="output_of_example",
- status=STATUS_FINISHED,
- tag=Tag(**object_meta),
- url_vcs="link",
- url_notebook="notebook_link")
- examples_without_def_ex = [
- finished_example,
- finished_example,
- ]
- examples_with_several_def_ex = [
- default_example,
- default_example,
- ]
- examples_without_errors = [
- default_example,
- finished_example,
- ]
- examples_with_errors = [
- Example(
- name="name",
- complexity="MEDIUM",
- pipeline_id=pipeline_id,
- sdk=SDK_JAVA,
- filepath="filepath",
- code="code_of_example",
- output="output_of_example",
- status=STATUS_VALIDATION_ERROR,
- tag=Tag(**object_meta_def_ex),
- url_vcs="link"),
- Example(
- name="name",
- complexity="MEDIUM",
- pipeline_id=pipeline_id,
- sdk=SDK_JAVA,
- filepath="filepath",
- code="code_of_example",
- output="output_of_example",
- status=STATUS_ERROR,
- tag=Tag(**object_meta),
- url_vcs="link"),
- Example(
- name="name",
- complexity="MEDIUM",
- pipeline_id=pipeline_id,
- sdk=SDK_JAVA,
- filepath="filepath",
- code="code_of_example",
- output="output_of_example",
- status=STATUS_COMPILE_ERROR,
- tag=Tag(**object_meta),
- url_vcs="link"),
- Example(
- name="name",
- complexity="MEDIUM",
- pipeline_id=pipeline_id,
- sdk=SDK_JAVA,
- filepath="filepath",
- code="code_of_example",
- output="output_of_example",
- status=STATUS_PREPARATION_ERROR,
- tag=Tag(**object_meta),
- url_vcs="link"),
- Example(
- name="name",
- complexity="MEDIUM",
- pipeline_id=pipeline_id,
- sdk=SDK_JAVA,
- filepath="filepath",
- code="code_of_example",
- output="output_of_example",
- status=STATUS_RUN_TIMEOUT,
- tag=Tag(**object_meta),
- url_vcs="link"),
- Example(
- name="name",
- complexity="MEDIUM",
- pipeline_id=pipeline_id,
- sdk=SDK_JAVA,
- filepath="filepath",
- code="code_of_example",
- output="output_of_example",
- status=STATUS_VALIDATION_ERROR,
- tag=Tag(**object_meta),
- url_vcs="link"),
- Example(
- name="name",
- complexity="MEDIUM",
- pipeline_id=pipeline_id,
- sdk=SDK_JAVA,
- filepath="filepath",
- code="code_of_example",
- output="output_of_example",
- status=STATUS_RUN_ERROR,
- tag=Tag(**object_meta),
- url_vcs="link"),
- ]
- client = mock.AsyncMock()
- with pytest.raises(VerifyException):
- await helper._verify_examples(client, examples_with_errors, Origin.PG_EXAMPLES)
- with pytest.raises(VerifyException):
- await helper._verify_examples(client, examples_without_def_ex, Origin.PG_EXAMPLES)
- with pytest.raises(VerifyException):
- await helper._verify_examples(client, examples_with_several_def_ex, Origin.PG_EXAMPLES)
- await helper._verify_examples(client, examples_without_errors, Origin.PG_EXAMPLES)
diff --git a/playground/infrastructure/test_datastore_client.py b/playground/infrastructure/test_datastore_client.py
index 958c5d1d0df..a4945343542 100644
--- a/playground/infrastructure/test_datastore_client.py
+++ b/playground/infrastructure/test_datastore_client.py
@@ -18,10 +18,11 @@ from unittest.mock import MagicMock, ANY
import mock
import pytest
from mock.mock import call
-from config import Origin
+from google.cloud import datastore
+from config import Origin
from datastore_client import DatastoreClient, DatastoreException
-from api.v1.api_pb2 import SDK_JAVA
+from models import SdkEnum
from test_utils import _get_examples
"""
@@ -44,7 +45,7 @@ def test_save_to_cloud_datastore_when_schema_version_not_found(
):
examples = _get_examples(1)
client = DatastoreClient()
- client.save_to_cloud_datastore(examples, SDK_JAVA, Origin.PG_EXAMPLES)
+ client.save_to_cloud_datastore(examples, SdkEnum.JAVA, Origin.PG_EXAMPLES)
def test_save_to_cloud_datastore_when_google_cloud_project_id_not_set():
@@ -58,6 +59,7 @@ def test_save_to_cloud_datastore_when_google_cloud_project_id_not_set():
DatastoreClient()
+@pytest.mark.parametrize("with_kafka", [False, True])
@pytest.mark.parametrize(
"origin, key_prefix",
[
@@ -74,8 +76,10 @@ def test_save_to_cloud_datastore_in_the_usual_case(
mock_config_project,
mock_get_schema,
mock_get_examples,
+ create_test_example,
origin,
key_prefix,
+ with_kafka,
):
"""
Test saving examples to the cloud datastore in the usual case
@@ -86,9 +90,9 @@ def test_save_to_cloud_datastore_in_the_usual_case(
mock_get_examples.return_value = mock_examples
mock_config_project.return_value = "MOCK_PROJECT_ID"
- examples = _get_examples(1)
+ examples = [create_test_example(with_kafka=with_kafka)]
client = DatastoreClient()
- client.save_to_cloud_datastore(examples, SDK_JAVA, origin)
+ client.save_to_cloud_datastore(examples, SdkEnum.JAVA, origin)
mock_client.assert_called_once()
mock_get_schema.assert_called_once()
mock_get_examples.assert_called_once()
@@ -96,15 +100,35 @@ def test_save_to_cloud_datastore_in_the_usual_case(
call().transaction(),
call().transaction().__enter__(),
call().key("pg_sdks", "SDK_JAVA"),
- call().key("pg_examples", key_prefix + "SDK_JAVA_MOCK_NAME_0"),
- call().put(ANY),
- call().key("pg_snippets", key_prefix + "SDK_JAVA_MOCK_NAME_0"),
- call().put(ANY),
- call().key("pg_pc_objects", key_prefix + "SDK_JAVA_MOCK_NAME_0_OUTPUT"),
- call().put_multi([ANY]),
- call().key("pg_files", key_prefix + "SDK_JAVA_MOCK_NAME_0_0"),
+ call().key("pg_examples", key_prefix + "SDK_JAVA_MOCK_NAME"),
call().put(ANY),
- call().transaction().__exit__(None, None, None),
+ call().key("pg_snippets", key_prefix + "SDK_JAVA_MOCK_NAME"),
]
+ if with_kafka:
+ calls.append(
+ call().key(
+ "pg_datasets", "dataset_id_1"
+ ), # used in the nested datasets construction
+ )
+ calls.extend(
+ [
+ call().put(ANY),
+ call().key("pg_pc_objects", key_prefix + "SDK_JAVA_MOCK_NAME_OUTPUT"),
+ call().put_multi([ANY]),
+ call().key("pg_files", key_prefix + "SDK_JAVA_MOCK_NAME_0"),
+ call().put(ANY),
+ ]
+ )
+ if with_kafka:
+ calls.extend(
+ [
+ call().key("pg_datasets", "dataset_id_1"),
+ call().put_multi([ANY]),
+ ]
+ )
+ calls.append(
+ call().transaction().__exit__(None, None, None),
+ )
+
mock_client.assert_has_calls(calls, any_order=False)
mock_client.delete_multi.assert_not_called()
diff --git a/playground/infrastructure/test_helper.py b/playground/infrastructure/test_helper.py
index f0ee4b1ae48..1b2277f6d8f 100644
--- a/playground/infrastructure/test_helper.py
+++ b/playground/infrastructure/test_helper.py
@@ -12,42 +12,45 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-from typing import Dict, Union, List
+from typing import Dict, List, Any, Optional
from unittest.mock import mock_open
import mock
import pytest
+import pydantic
from api.v1.api_pb2 import (
SDK_UNSPECIFIED,
STATUS_UNSPECIFIED,
STATUS_VALIDATING,
STATUS_FINISHED,
- SDK_JAVA,
PRECOMPILED_OBJECT_TYPE_EXAMPLE,
PRECOMPILED_OBJECT_TYPE_KATA,
PRECOMPILED_OBJECT_TYPE_UNIT_TEST,
)
-from config import Emulator, Topic, Dataset
from grpc_client import GRPCClient
+from models import (
+ ComplexityEnum,
+ SdkEnum,
+ Emulator,
+ Topic,
+ EmulatorType,
+ Dataset,
+ DatasetFormat,
+ DatasetLocation,
+)
from helper import (
find_examples,
Example,
- _get_example,
- _get_name,
+ _load_example,
get_tag,
- _validate,
Tag,
get_statuses,
_check_no_nested,
_update_example_status,
- get_supported_categories,
- _check_file,
_get_object_type,
- ExampleTag,
validate_examples_for_duplicates_by_name,
- ValidationException,
- validate_example_fields,
+ DuplicatesError,
)
@@ -66,27 +69,28 @@ def test_check_for_nested():
@pytest.mark.parametrize("is_valid", [True, False])
@mock.patch("helper._check_no_nested")
-@mock.patch("helper._check_file")
+@mock.patch("helper._load_example")
@mock.patch("helper.os.walk")
-def test_find_examples(mock_os_walk, mock_check_file, mock_check_no_nested, is_valid):
+def test_find_examples(
+ mock_os_walk, mock_load_example, mock_check_no_nested, is_valid, create_test_example
+):
mock_os_walk.return_value = [
("/root/sub1", (), ("file.java",)),
("/root/sub2", (), ("file2.java",)),
]
- mock_check_file.return_value = not is_valid
- sdk = SDK_UNSPECIFIED
if is_valid:
- result = find_examples(
- root_dir="/root", subdirs=["sub1", "sub2"], supported_categories=[], sdk=sdk
+ mock_load_example.return_value = create_test_example()
+ assert (
+ find_examples(root_dir="/root", subdirs=["sub1", "sub2"], sdk=SdkEnum.JAVA)
+ == [create_test_example()] * 4
)
- assert not result
else:
+ mock_load_example.side_effect = Exception("MOCK_ERROR")
with pytest.raises(
- ValueError,
- match="Some of the beam examples contain beam playground tag with "
- "an incorrect format",
+ ValueError,
+ match="Some of the beam examples contain beam playground tag with an incorrect format",
):
- find_examples("/root", ["sub1", "sub2"], [], sdk=sdk)
+ find_examples(root_dir="/root", subdirs=["sub1", "sub2"], sdk=SdkEnum.JAVA)
mock_check_no_nested.assert_called_once_with(["sub1", "sub2"])
mock_os_walk.assert_has_calls(
@@ -95,21 +99,17 @@ def test_find_examples(mock_os_walk, mock_check_file, mock_check_no_nested, is_v
mock.call("/root/sub2"),
]
)
- mock_check_file.assert_has_calls(
+ mock_load_example.assert_has_calls(
[
mock.call(
- examples=[],
filename="file.java",
filepath="/root/sub1/file.java",
- supported_categories=[],
- sdk=sdk,
+ sdk=SdkEnum.JAVA,
),
mock.call(
- examples=[],
filename="file2.java",
filepath="/root/sub2/file2.java",
- supported_categories=[],
- sdk=sdk,
+ sdk=SdkEnum.JAVA,
),
]
)
@@ -117,19 +117,8 @@ def test_find_examples(mock_os_walk, mock_check_file, mock_check_no_nested, is_v
@pytest.mark.asyncio
@mock.patch("helper._update_example_status")
-async def test_get_statuses(mock_update_example_status):
- example = Example(
- name="file",
- complexity="MEDIUM",
- pipeline_id="pipeline_id",
- sdk=SDK_UNSPECIFIED,
- filepath="root/file.extension",
- code="code",
- output="output",
- status=STATUS_UNSPECIFIED,
- tag={"name": "Name"},
- url_vcs="link"
- )
+async def test_get_statuses(mock_update_example_status, create_test_example):
+ example = create_test_example()
client = mock.sentinel
await get_statuses(client, [example])
@@ -138,221 +127,158 @@ async def test_get_statuses(mock_update_example_status):
@mock.patch(
"builtins.open",
- mock_open(read_data="...\n# beam-playground:\n# name: Name\n\nimport ..."),
-)
-def test_get_tag_when_tag_is_exists():
- result = get_tag("")
-
- assert result.tag_as_dict.get("name") == "Name"
- assert result.tag_as_string == "# beam-playground:\n# name: Name\n\n"
-
-
-@mock.patch("builtins.open", mock_open(read_data="...\n..."))
-def test_get_tag_when_tag_does_not_exist():
- result = get_tag("")
-
- assert result is None
-
-
-@mock.patch("helper._get_example")
-@mock.patch("helper._validate")
-@mock.patch("helper.get_tag")
-def test__check_file_with_correct_tag(mock_get_tag, mock_validate, mock_get_example):
- tag = ExampleTag({"name": "Name"}, "")
- example = Example(
- name="filename",
- complexity="MEDIUM",
- sdk=SDK_JAVA,
- filepath="/root/filename.java",
- code="data",
- status=STATUS_UNSPECIFIED,
- tag=Tag("Name", "Description", False, [], "--option option"),
- url_vcs="link",
- )
- examples = []
-
- mock_get_tag.return_value = tag
- mock_validate.return_value = True
- mock_get_example.return_value = example
-
- result = _check_file(
- examples, "filename.java", "/root/filename.java", [], sdk=SDK_JAVA
- )
-
- assert result is False
- assert len(examples) == 1
- assert examples[0] == example
- mock_get_tag.assert_called_once_with("/root/filename.java")
- mock_validate.assert_called_once_with(tag.tag_as_dict, [])
- mock_get_example.assert_called_once_with(
- "/root/filename.java", "filename.java", tag
- )
-
-
-@mock.patch("helper._validate")
-@mock.patch("helper.get_tag")
-def test__check_file_with_incorrect_tag(mock_get_tag, mock_validate):
- tag = ExampleTag({"name": "Name"}, "")
- examples = []
- sdk = SDK_JAVA
- mock_get_tag.return_value = tag
- mock_validate.return_value = False
-
- result = _check_file(examples, "filename.java", "/root/filename.java", [], sdk)
-
- assert result is True
- assert len(examples) == 0
- mock_get_tag.assert_called_once_with("/root/filename.java")
- mock_validate.assert_called_once_with(tag.tag_as_dict, [])
-
-
-@mock.patch("builtins.open", mock_open(read_data="categories:\n - category"))
-def test_get_supported_categories():
- result = get_supported_categories("")
-
- assert len(result) == 1
- assert result[0] == "category"
+ mock_open(
+ read_data="""
+// license line 1
+// license line 2
+//
+// beam-playground:
+// name: KafkaWordCount
+// description: Test example with Apache Kafka
+// multifile: false
+// context_line: 28
+// categories:
+// - Filtering
+// - Options
+// - Quickstart
+// complexity: MEDIUM
+// tags:
+// - filter
+// - strings
+// - emulator
+// emulators:
+// - type: kafka
+// topic:
+// id: topic_1
+// source_dataset: dataset_id_1
+// datasets:
+// dataset_id_1:
+// location: local
+// format: json
+code line 1
+code line 2
-@mock.patch("builtins.open", mock_open(read_data="data"))
-def test__get_example():
- tag = ExampleTag(
- {
- "name": "Name",
- "description": "Description",
- "multifile": "False",
- "categories": [""],
- "pipeline_options": "--option option",
- "context_line": 1,
- "complexity": "MEDIUM",
- "url_notebook": "https://some_url/py-name.ipynb",
- "emulators": {"kafka": {"topic": {"id": "dataset", "dataset": "dataset"}}},
- "datasets": {"dataset": {"location": "local", "format": "json"}},
- },
- "",
+"""
+ ),
+)
+def test_load_example():
+ example = _load_example(
+ "kafka.java", "../../examples/path/kafka.java", SdkEnum.JAVA
)
-
- result = _get_example("../../examples/dir/filepath.java", "filepath.java", tag)
-
- assert result == Example(
- name="Name",
- sdk=SDK_JAVA,
- filepath="../../examples/dir/filepath.java",
- code="data",
+ assert example == Example(
+ sdk=SdkEnum.JAVA,
type=PRECOMPILED_OBJECT_TYPE_EXAMPLE,
- status=STATUS_UNSPECIFIED,
+ filepath="../../examples/path/kafka.java",
+ code="""
+// license line 1
+// license line 2
+//
+
+code line 1
+code line 2
+
+""",
+ url_vcs="https://github.com/apache/beam/blob/master/examples/path/kafka.java", # type: ignore
+ context_line=5,
tag=Tag(
- "Name",
- "MEDIUM",
- {"kafka": {"topic": {"id": "dataset", "dataset": "dataset"}}},
- {"dataset": {"location": "local", "format": "json"}},
- "Description",
- "False",
- [""],
- "--option option",
- False,
- 1,
- None,
- "https://some_url/py-name.ipynb",
+ line_start=4,
+ line_finish=27,
+ name="KafkaWordCount",
+ description="Test example with Apache Kafka",
+ multifile=False,
+ context_line=28,
+ categories=["Filtering", "Options", "Quickstart"],
+ complexity=ComplexityEnum.MEDIUM,
+ tags=["filter", "strings", "emulator"],
+ emulators=[
+ Emulator(
+ type=EmulatorType.KAFKA,
+ topic=Topic(id="topic_1", source_dataset="dataset_id_1"),
+ )
+ ],
+ datasets={
+ "dataset_id_1": Dataset(
+ location=DatasetLocation.LOCAL, format=DatasetFormat.JSON
+ )
+ },
),
- url_vcs="https://github.com/apache/beam/blob/master/examples/dir/filepath.java",
- url_notebook="https://some_url/py-name.ipynb",
- complexity="MEDIUM",
- emulators=[
- Emulator(topic=Topic(id="dataset", dataset="dataset"), name="kafka")
- ],
- datasets=[Dataset(format="json", location="local", name="dataset")],
)
-def test__validate_without_name_field():
- tag = {}
- assert _validate(tag, []) is False
-
-
-def test__validate_without_description_field():
- tag = {"name": "Name"}
- assert _validate(tag, []) is False
-
-
-def test__validate_without_multifile_field():
- tag = {"name": "Name", "description": "Description"}
- assert _validate(tag, []) is False
-
-
-def test__validate_with_incorrect_multifile_field():
- tag = {"name": "Name", "description": "Description", "multifile": "Multifile"}
- assert _validate(tag, []) is False
-
+def test__validate_without_name_field(create_test_tag):
+ with pytest.raises(
+ pydantic.ValidationError,
+ match="field required",
+ ):
+ create_test_tag(name=None)
-def test__validate_without_categories_field():
- tag = {"name": "Name", "description": "Description", "multifile": "true"}
- assert _validate(tag, []) is False
+def test__validate_without_description_field(create_test_tag):
+ with pytest.raises(
+ pydantic.ValidationError,
+ match="field required",
+ ):
+ create_test_tag(description=None)
-def test__validate_without_incorrect_categories_field():
- tag = {
- "name": "Name",
- "description": "Description",
- "multifile": "true",
- "categories": "Categories",
- }
- assert _validate(tag, []) is False
+def test__validate_with_incorrect_multifile_field(create_test_tag):
+ with pytest.raises(
+ pydantic.ValidationError,
+ match="value could not be parsed to a boolean",
+ ):
+ create_test_tag(multifile="multifile")
-def test__validate_with_not_supported_category():
- tag = {
- "name": "Name",
- "description": "Description",
- "multifile": "true",
- "categories": ["category1"],
- }
- assert _validate(tag, ["category"]) is False
+def test__validate_without_categories_field(create_test_tag):
+ with pytest.raises(
+ pydantic.ValidationError,
+ match="field required",
+ ):
+ create_test_tag(categories=None)
-def test__validate_with_all_fields():
- tag = {
- "name": "Name",
- "description": "Description",
- "multifile": "true",
- "categories": ["category"],
- "pipeline_options": "--option option",
- "context_line": 1,
- "complexity": "MEDIUM",
- "tags": ["tag"],
- "emulator": "KAFKA",
- "dataset": "dataset.json",
- }
- assert _validate(tag, ["category"]) is True
+def test__validate_with_incorrect_categories_field(create_test_tag):
+ with pytest.raises(
+ pydantic.ValidationError,
+ match="value is not a valid list",
+ ):
+ create_test_tag(categories="MOCK_CATEGORY_1")
-def test__get_name():
- result = _get_name("filepath.extension")
- assert result == "filepath"
+def test__validate_with_not_supported_category(create_test_tag):
+ with pytest.raises(
+ pydantic.ValidationError,
+ match="Category MOCK_CATEGORY_1 not in",
+ ):
+ create_test_tag(categories=["MOCK_CATEGORY_1"])
@pytest.mark.asyncio
@mock.patch("grpc_client.GRPCClient.check_status")
@mock.patch("grpc_client.GRPCClient.run_code")
-async def test__update_example_status(mock_grpc_client_run_code, mock_grpc_client_check_status):
+async def test__update_example_status(
+ mock_grpc_client_run_code, mock_grpc_client_check_status
+):
example = Example(
- name="file",
- complexity="MEDIUM",
+ tag=Tag(
+ line_start=10,
+ line_finish=20,
+ context_line=100,
+ name="file",
+ description="MOCK_DESCRIPTION",
+ complexity=ComplexityEnum.MEDIUM,
+ pipeline_options="--key value",
+ categories=["Testing"],
+ ),
+ context_line=100,
pipeline_id="pipeline_id",
- sdk=SDK_UNSPECIFIED,
+ sdk=SdkEnum.JAVA,
filepath="root/file.extension",
code="code",
output="output",
status=STATUS_UNSPECIFIED,
- tag=Tag(
- name="MOCK_NAME",
- description="MOCK_DESCR",
- context_line=333,
- pipeline_options="--key value",
- complexity="MEDIUM",
- ),
- url_vcs="link",
+ url_vcs="https://github.com/link", # type: ignore
)
mock_grpc_client_run_code.return_value = "pipeline_id"
@@ -384,192 +310,141 @@ def test__get_object_type():
assert result_test == PRECOMPILED_OBJECT_TYPE_UNIT_TEST
-def test_validate_examples_for_duplicates_by_name_in_the_usual_case():
+def test_validate_examples_for_duplicates_by_name_in_the_usual_case(
+ create_test_example,
+):
examples_names = ["MOCK_NAME_1", "MOCK_NAME_2", "MOCK_NAME_3"]
- examples = list(map(lambda name: _create_example(name), examples_names))
+ examples = list(
+ map(lambda name: create_test_example(tag_meta=dict(name=name)), examples_names)
+ )
try:
validate_examples_for_duplicates_by_name(examples)
- except ValidationException:
+ except DuplicatesError:
pytest.fail("Unexpected ValidationException")
-def test_validate_examples_for_duplicates_by_name_when_examples_have_duplicates():
+def test_validate_examples_for_duplicates_by_name_when_examples_have_duplicates(
+ create_test_example,
+):
examples_names = ["MOCK_NAME_1", "MOCK_NAME_2", "MOCK_NAME_1", "MOCK_NAME_3"]
- examples = list(map(lambda name: _create_example(name), examples_names))
+ examples = list(
+ map(lambda name: create_test_example(tag_meta=dict(name=name)), examples_names)
+ )
with pytest.raises(
- ValidationException,
- match="Examples have duplicate names.\nDuplicates: \n - path #1: MOCK_FILEPATH \n - path #2: MOCK_FILEPATH",
+ DuplicatesError,
+ match="Examples have duplicate names.\nDuplicates: \n - path #1: MOCK_FILEPATH \n - path #2: MOCK_FILEPATH",
):
validate_examples_for_duplicates_by_name(examples)
-def test_validate_example_fields_when_filepath_is_invalid():
- example = _create_example("MOCK_NAME")
- example.filepath = ""
+def test_validate_example_fields_when_filepath_is_invalid(create_test_example):
with pytest.raises(
- ValidationException, match="Example doesn't have a file path field. Example: "
+ pydantic.ValidationError,
+ match="ensure this value has at least 1 characters",
):
- validate_example_fields(example)
+ create_test_example(filepath="")
-def test_validate_example_fields_when_name_is_invalid():
- example = _create_example("")
+def test_validate_example_fields_when_sdk_is_invalid(create_test_example):
with pytest.raises(
- ValidationException,
- match="Example doesn't have a name field. Path: MOCK_FILEPATH",
+ pydantic.ValidationError,
+ match="value is not a valid enumeration member",
):
- validate_example_fields(example)
+ create_test_example(sdk=SDK_UNSPECIFIED)
-def test_validate_example_fields_when_sdk_is_invalid():
- example = _create_example("MOCK_NAME")
- example.sdk = SDK_UNSPECIFIED
+def test_validate_example_fields_when_code_is_invalid(create_test_example):
with pytest.raises(
- ValidationException,
- match="Example doesn't have a sdk field. Path: MOCK_FILEPATH",
+ pydantic.ValidationError,
+ match="ensure this value has at least 1 characters",
):
- validate_example_fields(example)
+ create_test_example(code="")
-def test_validate_example_fields_when_code_is_invalid():
- example = _create_example("MOCK_NAME")
- example.code = ""
+def test_validate_example_fields_when_url_vcs_is_invalid(create_test_example):
with pytest.raises(
- ValidationException,
- match="Example doesn't have a code field. Path: MOCK_FILEPATH",
+ pydantic.ValidationError,
+ match="ensure this value has at least 1 characters",
):
- validate_example_fields(example)
-
+ create_test_example(url_vcs="")
-def test_validate_example_fields_when_link_is_invalid():
- example = _create_example("MOCK_NAME")
- example.url_vcs = ""
- with pytest.raises(ValidationException, match="Example doesn't have a url_vcs field. Path: MOCK_FILEPATH"):
- validate_example_fields(example)
-
-def test_validate_example_fields_when_complexity_is_invalid():
- example = _create_example("MOCK_NAME")
- example.complexity = ""
- with pytest.raises(
- ValidationException,
- match="Example doesn't have a complexity field. Path: MOCK_FILEPATH",
- ):
- validate_example_fields(example)
-
-
-def test_validate_example_fields_when_dataset_not_set_but_emulator_set():
- example = _create_example("MOCK_NAME")
- emulator = Emulator(topic=Topic(id="MOCK_ID", dataset="dataset"), name="kafka")
- example.emulators.append(emulator)
+def test_validate_example_fields_when_name_is_invalid(create_test_tag):
with pytest.raises(
- ValidationException,
- match="Example has an emulators field but a datasets field not found. Path: MOCK_FILEPATH",
+ pydantic.ValidationError,
+ match="ensure this value has at least 1 characters",
):
- validate_example_fields(example)
+ create_test_tag(name="")
-def test_validate_example_fields_when_emulator_not_set_but_dataset_set():
- example = _create_example("MOCK_NAME")
- dataset = Dataset(format="json", location="local", name="dataset")
- example.datasets.append(dataset)
+def test_validate_example_fields_when_complexity_is_invalid(create_test_tag):
with pytest.raises(
- ValidationException,
- match="Example has a datasets field but an emulators field not found. Path: MOCK_FILEPATH",
+ pydantic.ValidationError,
+ match="value is not a valid enumeration member",
):
- validate_example_fields(example)
+ create_test_tag(complexity="")
-def test_validate_example_fields_when_topic_dataset_is_invalid():
- example = _create_example("MOCK_NAME")
- emulator = Emulator(topic=Topic(id="MOCK_ID", dataset="MOCK_DATASET"), name="kafka")
- dataset = Dataset(format="json", location="local", name="dataset")
- example.datasets.append(dataset)
- example.emulators.append(emulator)
+def test_validate_example_fields_when_emulator_not_set_but_dataset_set(create_test_tag):
with pytest.raises(
- ValidationException,
- match="Example has invalid emulator value. Path: MOCK_FILEPATH",
+ pydantic.ValidationError,
+ match="datasets w/o emulators",
):
- validate_example_fields(example)
+ create_test_tag(datasets={"dataset_id_1": {"format": "avro", "location": "local"}})
-def test_validate_example_fields_when_emulator_name_is_invalid():
- example = _create_example("MOCK_NAME")
- emulator = Emulator(topic=Topic(id="MOCK_ID", dataset="dataset"), name="MOCK_NAME")
- dataset = Dataset(format="json", location="local", name="dataset")
- example.datasets.append(dataset)
- example.emulators.append(emulator)
+def test_validate_example_fields_when_emulator_type_is_invalid(create_test_tag):
with pytest.raises(
- ValidationException,
- match="Example has invalid emulator value. Path: MOCK_FILEPATH",
+ pydantic.ValidationError,
+ match="value is not a valid enumeration member",
):
- validate_example_fields(example)
+ create_test_tag(
+ emulators=[
+ {
+ "type": "MOCK_TYPE",
+ "topic": {"id": "topic1", "source_dataset": "dataset_id_1"},
+ }
+ ],
+ datasets={"dataset_id_1": {"format": "json", "location": "local"}},
+ )
-def test_validate_example_fields_when_dataset_format_is_invalid():
- example = _create_example("MOCK_NAME")
- emulator = Emulator(topic=Topic(id="MOCK_ID", dataset="dataset"), name="dataset")
- dataset = Dataset(format="MOCK_FORMAT", location="local", name="dataset")
- example.datasets.append(dataset)
- example.emulators.append(emulator)
+def test_validate_example_fields_when_dataset_format_is_invalid(create_test_tag):
with pytest.raises(
- ValidationException,
- match="Example has invalid dataset value. Path: MOCK_FILEPATH",
+ pydantic.ValidationError,
+ match="value is not a valid enumeration member",
):
- validate_example_fields(example)
+ create_test_tag(
+ emulators=[
+ {"type": "kafka", "topic": {"id": "topic1", "source_dataset": "src"}}
+ ],
+ datasets={"src": {"format": "MOCK_FORMAT", "location": "local"}},
+ )
-def test_validate_example_fields_when_dataset_location_is_invalid():
- example = _create_example("MOCK_NAME")
- emulator = Emulator(topic=Topic(id="MOCK_ID", dataset="dataset"), name="dataset")
- dataset = Dataset(format="avro", location="MOCK_LOCATION", name="dataset")
- example.datasets.append(dataset)
- example.emulators.append(emulator)
+def test_validate_example_fields_when_dataset_location_is_invalid(create_test_tag):
with pytest.raises(
- ValidationException,
- match="Example has invalid dataset value. Path: MOCK_FILEPATH",
+ pydantic.ValidationError,
+ match="value is not a valid enumeration member",
):
- validate_example_fields(example)
+ create_test_tag(
+ emulators=[
+ {"type": "kafka", "topic": {"id": "topic1", "source_dataset": "src"}}
+ ],
+ datasets={"src": {"format": "avro", "location": "MOCK_LOCATION"}},
+ )
-def test_validate_example_fields_when_dataset_name_is_invalid():
- example = _create_example("MOCK_NAME")
- emulator = Emulator(topic=Topic(id="MOCK_ID", dataset="dataset"), name="dataset")
- dataset = Dataset(format="avro", location="MOCK_LOCATION", name="")
- example.datasets.append(dataset)
- example.emulators.append(emulator)
+def test_validate_example_fields_when_dataset_name_is_invalid(create_test_tag):
with pytest.raises(
- ValidationException,
- match="Example has invalid dataset value. Path: MOCK_FILEPATH",
+ pydantic.ValidationError,
+ match="mulator topic topic1 has undefined dataset src",
):
- validate_example_fields(example)
-
-
-def _create_example(name: str) -> Example:
- object_meta = {
- "name": "MOCK_NAME",
- "description": "MOCK_DESCRIPTION",
- "multifile": False,
- "categories": ["MOCK_CATEGORY_1", "MOCK_CATEGORY_2"],
- "pipeline_options": "--MOCK_OPTION MOCK_OPTION_VALUE",
- }
- return _create_example_with_meta(name, object_meta)
-
-
-def _create_example_with_meta(name: str, object_meta: Dict[str, Union[str, bool, List[str]]]) -> Example:
- example = Example(
- name=name,
- pipeline_id="MOCK_PIPELINE_ID",
- sdk=SDK_JAVA,
- filepath="MOCK_FILEPATH",
- code="MOCK_CODE",
- output="MOCK_OUTPUT",
- status=STATUS_UNSPECIFIED,
- tag=Tag(**object_meta),
- url_vcs="MOCK_LINK",
- complexity="MOCK_COMPLEXITY",
- )
- return example
+ create_test_tag(
+ emulators=[
+ {"type": "kafka", "topic": {"id": "topic1", "source_dataset": "src"}}
+ ]
+ )
@mock.patch(
@@ -592,10 +467,10 @@ def _create_example_with_meta(name: str, object_meta: Dict[str, Union[str, bool,
// - strings
// - emulator
// emulators:
-// kafka:
-// topic:
-// id: topic_1
-// dataset: dataset_id_1
+// - type: kafka
+// topic:
+// id: topic_1
+// source_dataset: dataset_id_1
// datasets:
// dataset_id_1:
// location: local
@@ -605,10 +480,11 @@ def _create_example_with_meta(name: str, object_meta: Dict[str, Union[str, bool,
),
)
def test_get_tag_with_datasets():
- example = get_tag("filepath")
- example.tag_as_string = "" # to not compare with itself
- assert example == ExampleTag(
- tag_as_dict={
+ tag = get_tag("filepath")
+ assert tag == Tag(
+ **{
+ "line_start": 2,
+ "line_finish": 25,
"name": "KafkaWordCount",
"description": "Test example with Apache Kafka",
"multifile": False,
@@ -616,8 +492,24 @@ def test_get_tag_with_datasets():
"categories": ["Filtering", "Options", "Quickstart"],
"complexity": "MEDIUM",
"tags": ["filter", "strings", "emulator"],
- "emulators": {"kafka": {"topic": {"id": "topic_1", "dataset": "dataset_id_1"}}},
+ "emulators": [
+ {
+ "type": "kafka",
+ "topic": {"id": "topic_1", "source_dataset": "dataset_id_1"},
+ }
+ ],
"datasets": {"dataset_id_1": {"location": "local", "format": "json"}},
},
- tag_as_string="",
)
+
+@mock.patch("os.path.isfile", return_value=True)
+def test_dataset_path_ok(mock_file_check, create_test_example):
+ example = create_test_example(with_kafka=True)
+ assert len(example.tag.datasets) > 0
+ assert example.tag.datasets.popitem()[1].file_name == "dataset_id_1.avro"
+
+
+@mock.patch("os.path.isfile", return_value=False)
+def test_dataset_path_notfound(mock_file_check, create_test_example):
+ with pytest.raises(FileNotFoundError):
+ create_test_example(with_kafka=True)
\ No newline at end of file
diff --git a/playground/infrastructure/test_repository.py b/playground/infrastructure/test_repository.py
deleted file mode 100644
index dfbd00df467..00000000000
--- a/playground/infrastructure/test_repository.py
+++ /dev/null
@@ -1,53 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-import mock
-import pytest
-
-from config import Dataset, RepoProps
-from repository import set_dataset_path_for_examples
-from test_utils import _get_examples
-
-"""
-Unit tests for the Cloud Storage client
-"""
-
-
-@mock.patch("os.path.isfile", return_value=True)
-def test_set_dataset_path_for_examples(mock_file_check):
- examples = _get_examples_with_datasets(3)
- set_dataset_path_for_examples(examples)
- for example in examples:
- assert example.datasets[0].path == "MOCK_NAME.MOCK_FORMAT"
-
-
-@mock.patch("os.path.isfile", return_value=False)
-def test_set_dataset_path_for_examples_when_path_is_invalid(mock_file_check):
- with pytest.raises(FileNotFoundError):
- examples = _get_examples_with_datasets(1)
- set_dataset_path_for_examples(examples)
-
-
-def _get_examples_with_datasets(number_of_examples: int):
- examples = _get_examples(number_of_examples)
- for example in examples:
- datasets = []
- dataset = Dataset(
- format="MOCK_FORMAT",
- location="MOCK_LOCATION",
- name="MOCK_NAME"
- )
- datasets.append(dataset)
- example.datasets = datasets
- return examples
diff --git a/playground/infrastructure/test_utils.py b/playground/infrastructure/test_utils.py
index 68950a3659c..e65e723dccc 100644
--- a/playground/infrastructure/test_utils.py
+++ b/playground/infrastructure/test_utils.py
@@ -16,29 +16,33 @@
from typing import List
from api.v1.api_pb2 import SDK_JAVA, STATUS_UNSPECIFIED
-from helper import Example, Tag
+from models import Example, Tag, SdkEnum, ComplexityEnum
def _get_examples(number_of_examples: int) -> List[Example]:
examples = []
for number in range(number_of_examples):
- object_meta = {
- "name": f"MOCK_NAME_{number}",
- "description": f"MOCK_DESCRIPTION_{number}",
- "multifile": False,
- "categories": ["MOCK_CATEGORY_1", "MOCK_CATEGORY_2"],
- "pipeline_options": "--MOCK_OPTION MOCK_OPTION_VALUE"
- }
- example = Example(
+ tag = Tag(
+ line_start=100,
+ line_finish=120,
+ context_line=123,
name=f"MOCK_NAME_{number}",
- complexity="MEDIUM",
+ complexity=ComplexityEnum.MEDIUM,
+ description=f"MOCK_DESCRIPTION_{number}",
+ multifile=False,
+ categories=["Side Input", "Multiple Outputs"],
+ pipeline_options="--MOCK_OPTION MOCK_OPTION_VALUE",
+ )
+ example = Example(
+ tag=tag,
+ context_line=123,
pipeline_id=f"MOCK_PIPELINE_ID_{number}",
- sdk=SDK_JAVA,
+ sdk=SdkEnum.JAVA,
filepath=f"MOCK_FILEPATH_{number}",
code=f"MOCK_CODE_{number}",
output=f"MOCK_OUTPUT_{number}",
status=STATUS_UNSPECIFIED,
- tag=Tag(**object_meta),
- url_vcs=f"MOCK_LINK_{number}")
+ url_vcs=f"https://mock.link/{number}", # type: ignore
+ )
examples.append(example)
return examples
diff --git a/playground/infrastructure/test_verify.py b/playground/infrastructure/test_verify.py
new file mode 100644
index 00000000000..df887478a98
--- /dev/null
+++ b/playground/infrastructure/test_verify.py
@@ -0,0 +1,73 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import mock
+import pytest
+
+from api.v1.api_pb2 import (
+ SDK_JAVA,
+ STATUS_FINISHED,
+ STATUS_ERROR,
+ STATUS_VALIDATION_ERROR,
+ STATUS_PREPARATION_ERROR,
+ STATUS_RUN_TIMEOUT,
+ STATUS_COMPILE_ERROR,
+ STATUS_RUN_ERROR,
+)
+from verify import Verifier, VerifyException
+from config import Origin
+from models import SdkEnum
+
+
+@pytest.mark.asyncio
+async def test__verify_examples(create_test_example):
+ verifier = Verifier(SdkEnum.JAVA, Origin.PG_EXAMPLES)
+ default_example = create_test_example(tag_meta=dict(default_example=True))
+ finished_example = create_test_example(tag_meta=dict(default_example=True))
+ finished_example = create_test_example(status=STATUS_FINISHED)
+ examples_without_def_ex = [
+ finished_example,
+ finished_example,
+ ]
+ examples_with_several_def_ex = [
+ default_example,
+ default_example,
+ ]
+ examples_without_errors = [
+ default_example,
+ finished_example,
+ ]
+ examples_with_errors = [
+ create_test_example(status=STATUS_VALIDATION_ERROR),
+ create_test_example(status=STATUS_ERROR),
+ create_test_example(status=STATUS_COMPILE_ERROR),
+ create_test_example(status=STATUS_PREPARATION_ERROR),
+ create_test_example(status=STATUS_RUN_TIMEOUT),
+ create_test_example(status=STATUS_RUN_ERROR),
+ ]
+ client = mock.AsyncMock()
+ with pytest.raises(VerifyException):
+ await verifier._verify_examples(
+ client, examples_with_errors, Origin.PG_EXAMPLES
+ )
+ with pytest.raises(VerifyException):
+ await verifier._verify_examples(
+ client, examples_without_def_ex, Origin.PG_EXAMPLES
+ )
+ with pytest.raises(VerifyException):
+ await verifier._verify_examples(
+ client, examples_with_several_def_ex, Origin.PG_EXAMPLES
+ )
+ await verifier._verify_examples(client, examples_without_errors, Origin.PG_EXAMPLES)
diff --git a/playground/infrastructure/ci_helper.py b/playground/infrastructure/verify.py
similarity index 60%
rename from playground/infrastructure/ci_helper.py
rename to playground/infrastructure/verify.py
index a70a515ca54..9d3784bd600 100644
--- a/playground/infrastructure/ci_helper.py
+++ b/playground/infrastructure/verify.py
@@ -13,56 +13,86 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-"""
-Module contains the helper for CI step.
-
-It is used to find and verify correctness if beam examples/katas/tests.
-"""
-
+import asyncio
import logging
from typing import List
-from api.v1.api_pb2 import STATUS_COMPILE_ERROR, STATUS_ERROR, STATUS_RUN_ERROR, \
- STATUS_RUN_TIMEOUT, \
- STATUS_VALIDATION_ERROR, STATUS_PREPARATION_ERROR
-from config import Config, Origin
+from api.v1.api_pb2 import Sdk, SDK_PYTHON, SDK_JAVA
+from api.v1.api_pb2 import (
+ STATUS_COMPILE_ERROR,
+ STATUS_ERROR,
+ STATUS_RUN_ERROR,
+ STATUS_RUN_TIMEOUT,
+ STATUS_VALIDATION_ERROR,
+ STATUS_PREPARATION_ERROR,
+)
+from config import Origin, Config
from grpc_client import GRPCClient
-from helper import Example, get_statuses
-from repository import set_dataset_path_for_examples
+from models import Example, SdkEnum
+from helper import get_statuses
class VerifyException(Exception):
-    def __init__(self, error: str):
-        super().__init__()
-        self.msg = error
+    """Raised when Playground examples fail CI verification."""
-    def __str__(self):
-        return self.msg
+class Verifier:
+ """Run examples and verify the results, enrich examples with produced artifacts"""
-class CIHelper:
- """
- Helper for CI step.
+ _sdk: SdkEnum
+ _origin: Origin
- It is used to find and verify correctness if beam examples/katas/tests.
- """
+    def __init__(self, sdk: SdkEnum, origin: Origin):
+        self._sdk = sdk
+        self._origin = origin  # passed to _verify_examples by _run_and_verify
- async def verify_examples(self, examples: List[Example], origin: Origin):
+    def run_verify(self, examples: List[Example]):
        """
-        Verify correctness of beam examples.
+        Run beam examples on the Playground backend and verify their results.
-        1. Find all beam examples starting from directory os.getenv("BEAM_ROOT_DIR")
-        2. Group code of examples by their SDK.
-        3. Run processing for single-file examples to verify examples' code.
        """
-        single_file_examples = list(filter(
-            lambda example: example.tag.multifile is False, examples))
-        set_dataset_path_for_examples(single_file_examples)
-        async with GRPCClient() as client:
-            await get_statuses(client, single_file_examples)
-            await self._verify_examples(client, single_file_examples, origin)
+        logging.info("Start of executing Playground examples ...")
+        asyncio.run(self._run_and_verify(examples))  # may raise VerifyException
+        logging.info("Finish of executing Playground examples")
+
+    async def _run_and_verify(self, examples: List[Example]):
+        """
+        Run beam examples, attach their artifacts and verify the results.
+
+        Call the backend to start code processing for the examples.
+        Then fetch compile/run output, logs and (Java/Python only) graphs.
-    async def _verify_examples(self, client: any, examples: List[Example], origin: Origin):
+        Args:
+            examples: beam examples to run; their output fields are set in place
+        """
+
+        async def _populate_fields(example: Example):  # uses `client` bound below
+            try:
+                example.compile_output = await client.get_compile_output(
+                    example.pipeline_id
+                )
+                example.output = await client.get_run_output(example.pipeline_id)
+                example.logs = await client.get_log(example.pipeline_id)
+                if example.sdk in [SDK_JAVA, SDK_PYTHON]:  # graphs only for these
+                    example.graph = await client.get_graph(
+                        example.pipeline_id, example.filepath
+                    )
+            except Exception as e:  # re-raise tagged with the example name
+                logging.error(example.url_vcs)
+                logging.error(example.compile_output)
+                raise RuntimeError(f"error in {example.tag.name}") from e
+
+        async with GRPCClient() as client:
+            await get_statuses(
+                client, examples
+            )  # run the examples' code and wait until all have finished
+            tasks = [_populate_fields(example) for example in examples]
+            await asyncio.gather(*tasks)  # fetch artifacts concurrently
+            await self._verify_examples(client, examples, self._origin)
+
+ async def _verify_examples(
+ self, client: GRPCClient, examples: List[Example], origin: Origin
+ ):
"""
Verify statuses of beam examples and the number of found default examples.
@@ -93,34 +123,40 @@ class CIHelper:
logging.error("Example: %s has preparation error", example.filepath)
elif example.status == STATUS_ERROR:
logging.error(
- "Example: %s has error during setup run builder", example.filepath)
+ "Example: %s has error during setup run builder", example.filepath
+ )
elif example.status == STATUS_RUN_TIMEOUT:
logging.error("Example: %s failed because of timeout", example.filepath)
elif example.status == STATUS_COMPILE_ERROR:
err = await client.get_compile_output(example.pipeline_id)
logging.error(
- "Example: %s has compilation error: %s", example.filepath, err)
+ "Example: %s has compilation error: %s", example.filepath, err
+ )
elif example.status == STATUS_RUN_ERROR:
err = await client.get_run_error(example.pipeline_id)
logging.error(
- "Example: %s has execution error: %s", example.filepath, err)
+ "Example: %s has execution error: %s", example.filepath, err
+ )
verify_status_failed = True
logging.info(
"Number of verified Playground examples: %s / %s",
count_of_verified,
- len(examples))
+ len(examples),
+ )
logging.info(
"Number of Playground examples with some error: %s / %s",
len(examples) - count_of_verified,
- len(examples))
+ len(examples),
+ )
if origin == Origin.PG_EXAMPLES:
if len(default_examples) == 0:
logging.error("Default example not found")
raise VerifyException(
"CI step failed due to finding an incorrect number "
- "of default examples. Default example not found")
+ "of default examples. Default example not found"
+ )
if len(default_examples) > 1:
logging.error("Many default examples found")
logging.error("Examples where the default_example field is true:")
@@ -128,7 +164,8 @@ class CIHelper:
logging.error(example.filepath)
raise VerifyException(
"CI step failed due to finding an incorrect number "
- "of default examples. Many default examples found")
+ "of default examples. Many default examples found"
+ )
if verify_status_failed:
raise VerifyException("CI step failed due to errors in the examples")