Posted to commits@beam.apache.org by da...@apache.org on 2022/12/19 19:22:36 UTC

[beam] branch master updated: [Playground] infrastructure ci/cd unify (#24696)

This is an automated email from the ASF dual-hosted git repository.

damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 8d45bc530d2 [Playground] infrastructure ci/cd unify (#24696)
8d45bc530d2 is described below

commit 8d45bc530d2d97a097420b5b2a254db18f10f5a9
Author: Evgeny Antyshev <ea...@gmail.com>
AuthorDate: Mon Dec 19 22:22:25 2022 +0300

    [Playground] infrastructure ci/cd unify (#24696)
    
    * unnecessary typing
    
    * fix
    
    * requirements.txt
    
    * black
    
    * easy
    
    * BASIC
    
    * HttpUrl
    
    * examples
    
    * url_quote
    
    * source_dataset
    
    * fix
    
    * fix
    
    * fix
    
    * fix
    
    * cnt
    
    * ci_cd
    
    * verify
    
    * datastore_fixes
    
    * config
    
    * kafka_fix
    
    * -dup
---
 .../apache/beam/examples/KafkaWordCountAvro.java   |  14 +-
 .../apache/beam/examples/KafkaWordCountJson.java   |  14 +-
 .../python/Examples/Word Count/Word Count/task.py  |   2 +-
 playground/infrastructure/cd_helper.py             |  99 ----
 playground/infrastructure/ci_cd.py                 |  80 ++-
 playground/infrastructure/config.py                |  35 +-
 playground/infrastructure/conftest.py              |  99 ++++
 playground/infrastructure/datastore_client.py      |  91 ++-
 playground/infrastructure/grpc_client.py           |   3 +-
 playground/infrastructure/helper.py                | 479 +++++-----------
 playground/infrastructure/models.py                | 166 ++++++
 playground/infrastructure/repository.py            |  36 --
 playground/infrastructure/requirements.txt         |   2 +-
 playground/infrastructure/test_cd_helper.py        |  38 --
 playground/infrastructure/test_ci_cd.py            |  35 +-
 playground/infrastructure/test_ci_helper.py        | 176 ------
 playground/infrastructure/test_datastore_client.py |  50 +-
 playground/infrastructure/test_helper.py           | 630 +++++++++------------
 playground/infrastructure/test_repository.py       |  53 --
 playground/infrastructure/test_utils.py            |  30 +-
 playground/infrastructure/test_verify.py           |  73 +++
 .../infrastructure/{ci_helper.py => verify.py}     | 119 ++--
 22 files changed, 988 insertions(+), 1336 deletions(-)

diff --git a/examples/java/src/main/java/org/apache/beam/examples/KafkaWordCountAvro.java b/examples/java/src/main/java/org/apache/beam/examples/KafkaWordCountAvro.java
index c13a55e8557..81b9261a314 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/KafkaWordCountAvro.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/KafkaWordCountAvro.java
@@ -32,14 +32,14 @@ package org.apache.beam.examples;
 //     - strings
 //     - emulator
 //   emulators:
-//      kafka:
-//          topic:
-//              id: dataset
-//              dataset: CountWords
+//      - type: kafka
+//        topic:
+//          id: dataset
+//          source_dataset: CountWords
 //   datasets:
-//      CountWords:
-//          location: local
-//          format: avro
+//     CountWords:
+//       location: local
+//       format: avro
 
 import java.util.Collections;
 import java.util.HashMap;
diff --git a/examples/java/src/main/java/org/apache/beam/examples/KafkaWordCountJson.java b/examples/java/src/main/java/org/apache/beam/examples/KafkaWordCountJson.java
index 29bb9be32c4..ed5dbc88824 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/KafkaWordCountJson.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/KafkaWordCountJson.java
@@ -32,14 +32,14 @@ package org.apache.beam.examples;
 //     - strings
 //     - emulator
 //   emulators:
-//      kafka:
-//          topic:
-//              id: dataset
-//              dataset: CountWords
+//      - type: kafka
+//        topic:
+//          id: dataset
+//          source_dataset: CountWords
 //   datasets:
-//      CountWords:
-//          location: local
-//          format: json
+//     CountWords:
+//       location: local
+//       format: json
 
 import java.util.Collections;
 import java.util.HashMap;
diff --git a/learning/katas/python/Examples/Word Count/Word Count/task.py b/learning/katas/python/Examples/Word Count/Word Count/task.py
index 4c605c401ef..fb3bf9c2386 100644
--- a/learning/katas/python/Examples/Word Count/Word Count/task.py	
+++ b/learning/katas/python/Examples/Word Count/Word Count/task.py	
@@ -18,7 +18,7 @@
 #   name: WordCountKata
 #   description: Task from katas to create a pipeline that counts the number of words.
 #   multifile: false
-#   context_line: 29
+#   context_line: 32
 #   categories:
 #     - Combiners
 #   complexity: BASIC
diff --git a/playground/infrastructure/cd_helper.py b/playground/infrastructure/cd_helper.py
deleted file mode 100644
index bfec3d54e28..00000000000
--- a/playground/infrastructure/cd_helper.py
+++ /dev/null
@@ -1,99 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""
-  Helper for CD step.
-
-  It is used to save beam examples/katas/tests and their output on the GCS.
-"""
-import asyncio
-import logging
-from typing import List
-
-from api.v1.api_pb2 import Sdk, SDK_PYTHON, SDK_JAVA
-from config import Origin
-from datastore_client import DatastoreClient
-from grpc_client import GRPCClient
-from helper import Example, get_statuses
-from repository import set_dataset_path_for_examples
-
-
-class CDHelper:
-    """
-    Helper for CD step.
-
-    It is used to save beam examples/katas/tests and their output on the GCD.
-    """
-    _sdk: Sdk
-    _origin: Origin
-
-    def __init__(self, sdk: Sdk, origin: Origin):
-        self._sdk = sdk
-        self._origin = origin
-
-    def save_examples(self, examples: List[Example]):
-        """
-        Save beam examples and their output in the Google Cloud Datastore.
-
-        Outputs for multifile examples are left empty.
-        """
-        single_file_examples = list(filter(
-            lambda example: example.tag.multifile is False, examples))
-        set_dataset_path_for_examples(single_file_examples)
-        logging.info("Start of executing only single-file Playground examples ...")
-        asyncio.run(self._get_outputs(single_file_examples))
-        logging.info("Finish of executing single-file Playground examples")
-
-        logging.info("Start of sending Playground examples to the Cloud Datastore ...")
-        self._save_to_datastore(single_file_examples)
-        logging.info("Finish of sending Playground examples to the Cloud Datastore")
-
-    def _save_to_datastore(self, examples: List[Example]):
-        """
-        Save beam examples to the Google Cloud Datastore
-        :param examples: beam examples from the repository
-        """
-        datastore_client = DatastoreClient()
-        datastore_client.save_catalogs()
-        datastore_client.save_to_cloud_datastore(examples, self._sdk, self._origin)
-
-    async def _get_outputs(self, examples: List[Example]):
-        """
-        Run beam examples and keep their output.
-
-        Call the backend to start code processing for the examples.
-        Then receive code output.
-
-        Args:
-            examples: beam examples that should be run
-        """
-
-        async def _populate_fields(example: Example):
-            try:
-                example.compile_output = await client.get_compile_output(example.pipeline_id)
-                example.output = await client.get_run_output(example.pipeline_id)
-                example.logs = await client.get_log(example.pipeline_id)
-                if example.sdk in [SDK_JAVA, SDK_PYTHON]:
-                    example.graph = await client.get_graph(example.pipeline_id, example.filepath)
-            except Exception as e:
-                logging.error(example.url_vcs)
-                logging.error(example.compile_output)
-                raise RuntimeError(f"error in {example.name}") from e
-
-        async with GRPCClient() as client:
-            await get_statuses(client,
-                               examples)  # run examples code and wait until all are executed
-            tasks = [_populate_fields(example) for example in examples]
-            await asyncio.gather(*tasks)
diff --git a/playground/infrastructure/ci_cd.py b/playground/infrastructure/ci_cd.py
index 1dced6de964..35197ef95d3 100644
--- a/playground/infrastructure/ci_cd.py
+++ b/playground/infrastructure/ci_cd.py
@@ -22,93 +22,89 @@ import logging
 import os
 from typing import List
 
+from models import SdkEnum, Example, StringToSdkEnum
 from config import Config, Origin
-from api.v1.api_pb2 import Sdk
-from cd_helper import CDHelper
-from ci_helper import CIHelper
-from helper import find_examples, get_supported_categories, Example, validate_examples_for_duplicates_by_name
+from datastore_client import DatastoreClient
+from api.v1 import api_pb2
+from verify import Verifier
+from helper import (
+    find_examples,
+    load_supported_categories,
+    validate_examples_for_duplicates_by_name,
+)
 from logger import setup_logger
 
-parser = argparse.ArgumentParser(
-    description="CI/CD Steps for Playground objects")
+parser = argparse.ArgumentParser(description="CI/CD Steps for Playground objects")
 parser.add_argument(
     "--step",
     dest="step",
     required=True,
     help="CI step to verify all beam examples/tests/katas. CD step to save all "
-         "beam examples/tests/katas and their outputs on the GCD",
-    choices=[Config.CI_STEP_NAME, Config.CD_STEP_NAME])
+    "beam examples/tests/katas and their outputs on the GCD",
+    choices=[Config.CI_STEP_NAME, Config.CD_STEP_NAME],
+)
 parser.add_argument(
     "--sdk",
     dest="sdk",
     required=True,
     help="Supported SDKs",
-    choices=Config.SUPPORTED_SDK)
+    choices=Config.SUPPORTED_SDK,
+)
 parser.add_argument(
     "--origin",
     type=Origin,
     required=True,
     help="ORIGIN field of pg_examples/pg_snippets",
-    choices=[o.value for o in [Origin.PG_EXAMPLES, Origin.TB_EXAMPLES]])
+    choices=[o.value for o in [Origin.PG_EXAMPLES, Origin.TB_EXAMPLES]],
+)
 parser.add_argument(
     "--subdirs",
     default=[],
     nargs="+",
     required=True,
-    help="limit sub directories to walk through, relative to BEAM_ROOT_DIR")
+    help="limit sub directories to walk through, relative to BEAM_ROOT_DIR",
+)
 
 root_dir = os.getenv("BEAM_ROOT_DIR")
 categories_file = os.getenv("BEAM_EXAMPLE_CATEGORIES")
 
 
-def _ci_step(examples: List[Example], origin: Origin):
-    """
-    CI step to verify single-file beam examples/tests/katas
-    """
-
-    ci_helper = CIHelper()
-    asyncio.run(ci_helper.verify_examples(examples, origin))
-
-
-def _cd_step(examples: List[Example], sdk: Sdk, origin: Origin):
-    """
-    CD step to save all beam examples/tests/katas and their outputs on the GCD
-    """
-    cd_helper = CDHelper(sdk, origin)
-    cd_helper.save_examples(examples)
-
-
 def _check_envs():
     if root_dir is None:
-        raise KeyError(
-            "BEAM_ROOT_DIR environment variable should be specified in os")
+        raise KeyError("BEAM_ROOT_DIR environment variable should be specified in os")
     if categories_file is None:
         raise KeyError(
             "BEAM_EXAMPLE_CATEGORIES environment variable should be specified in os"
         )
 
 
-def _run_ci_cd(step: Config.CI_CD_LITERAL, sdk: Sdk, origin: Origin, subdirs: List[str]):
-    supported_categories = get_supported_categories(categories_file)
+def _run_ci_cd(step: str, raw_sdk: str, origin: Origin, subdirs: List[str]):
+    sdk: SdkEnum = StringToSdkEnum(raw_sdk)
+
+    load_supported_categories(categories_file)
     logging.info("Start of searching Playground examples ...")
-    examples = find_examples(root_dir, subdirs, supported_categories, sdk)
+    examples = find_examples(root_dir, subdirs, sdk)
     validate_examples_for_duplicates_by_name(examples)
     logging.info("Finish of searching Playground examples")
     logging.info("Number of found Playground examples: %s", len(examples))
 
-    if step == Config.CI_STEP_NAME:
-        logging.info(
-            "Start of verification only single_file Playground examples ...")
-        _ci_step(examples=examples, origin=origin)
-        logging.info("Finish of verification single_file Playground examples")
+    examples = list(filter(lambda example: example.tag.multifile is False, examples))
+    logging.info("Number of single-file Playground examples: %s", len(examples))
+
+    logging.info("Execute Playground examples ...")
+    runner = Verifier(sdk, origin)
+    runner.run_verify(examples)
+
     if step == Config.CD_STEP_NAME:
-        logging.info("Start of saving Playground examples ...")
-        _cd_step(examples=examples, sdk=sdk, origin=origin)
-        logging.info("Finish of saving Playground examples")
+        logging.info("Start of sending Playground examples to the Cloud Datastore ...")
+        datastore_client = DatastoreClient()
+        datastore_client.save_catalogs()
+        datastore_client.save_to_cloud_datastore(examples, sdk, origin)
+        logging.info("Finish of sending Playground examples to the Cloud Datastore")
 
 
 if __name__ == "__main__":
     parser = parser.parse_args()
     _check_envs()
     setup_logger()
-    _run_ci_cd(parser.step, Sdk.Value(parser.sdk), parser.origin, parser.subdirs)
+    _run_ci_cd(parser.step, parser.sdk, parser.origin, parser.subdirs)
diff --git a/playground/infrastructure/config.py b/playground/infrastructure/config.py
index d11cc063e28..4cc5dbd4376 100644
--- a/playground/infrastructure/config.py
+++ b/playground/infrastructure/config.py
@@ -22,8 +22,6 @@ from dataclasses import dataclass
 from enum import Enum
 from typing import Literal
 
-from dataclasses_json import dataclass_json
-
 from api.v1.api_pb2 import STATUS_VALIDATION_ERROR, STATUS_ERROR, \
     STATUS_PREPARATION_ERROR, STATUS_COMPILE_ERROR, \
     STATUS_RUN_TIMEOUT, STATUS_RUN_ERROR, SDK_JAVA, SDK_GO, SDK_PYTHON, \
@@ -57,7 +55,7 @@ class Config:
     ]
     BEAM_PLAYGROUND_TITLE = "beam-playground:\n"
     BEAM_PLAYGROUND = "beam-playground"
-    PAUSE_DELAY = 10
+    PAUSE_DELAY = 1
     CI_STEP_NAME = "CI"
     CD_STEP_NAME = "CD"
     CI_CD_LITERAL = Literal["CI", "CD"]
@@ -98,15 +96,6 @@ class PrecompiledExampleType:
     test_ends = ("test", "it")
 
 
-@dataclass(frozen=True)
-class OptionalTagFields:
-    pipeline_options: str = "pipeline_options"
-    default_example: str = "default_example"
-    emulators: str = "emulators"
-    datasets: str = "datasets"
-    url_notebook: str = "url_notebook"
-
-
 @dataclass(frozen=True)
 class DatastoreProps:
     NAMESPACE = "Playground"
@@ -130,25 +119,3 @@ class Origin(str, Enum):
     TB_EXAMPLES = 'TB_EXAMPLES'
     TB_USER = 'TB_USER'
 
-
-@dataclass_json
-@dataclass
-class Dataset:
-    format: str
-    location: str
-    name: str = ""
-    path: str = ""
-
-
-@dataclass_json
-@dataclass
-class Topic:
-    id: str
-    dataset: str
-
-
-@dataclass_json
-@dataclass
-class Emulator:
-    topic: Topic
-    name: str = ""
diff --git a/playground/infrastructure/conftest.py b/playground/infrastructure/conftest.py
new file mode 100644
index 00000000000..3a776a4aac5
--- /dev/null
+++ b/playground/infrastructure/conftest.py
@@ -0,0 +1,99 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import os.path
+import pytest
+from typing import Optional, List, Dict, Any
+
+from models import Example, SdkEnum, Tag
+
+from helper import (
+    load_supported_categories,
+)
+
+
+@pytest.fixture(autouse=True, scope="session")
+def supported_categories():
+    load_supported_categories("../categories.yaml")
+
+
+@pytest.fixture(autouse=True)
+def mock_dataset_file_name(mocker):
+    def _mock_isfile(filepath):
+        if filepath in [
+            "../backend/datasets/dataset_id_1.json",
+            "../backend/datasets/dataset_id_1.avro",
+        ]:
+            return True
+        raise FileNotFoundError(filepath)
+    mocker.patch('os.path.isfile', side_effect=_mock_isfile)
+
+
+@pytest.fixture
+def create_test_example(create_test_tag):
+    def _create_test_example(
+        with_kafka=False,
+        tag_meta: Optional[Dict[str, Any]] = None, **example_meta
+    ) -> Example:
+        if tag_meta is None:
+            tag_meta = {}
+        meta: Dict[str, Any] = dict(
+            sdk=SdkEnum.JAVA,
+            pipeline_id="MOCK_PIPELINE_ID",
+            filepath="MOCK_FILEPATH",
+            code="MOCK_CODE",
+            output="MOCK_OUTPUT",
+            url_vcs="https://github.com/proj/MOCK_LINK",
+            context_line=132,
+        )
+        meta.update(**example_meta)
+        return Example(
+            tag=create_test_tag(with_kafka=with_kafka, **tag_meta),
+            **meta,
+        )
+
+    return _create_test_example
+
+
+@pytest.fixture
+def create_test_tag():
+    def _create_test_tag(with_kafka=False, **tag_meta) -> Tag:
+        meta = {
+            "name": "MOCK_NAME",
+            "description": "MOCK_DESCRIPTION",
+            "complexity": "ADVANCED",
+            "multifile": False,
+            "categories": ["Testing", "Schemas"],
+            "pipeline_options": "--MOCK_OPTION MOCK_OPTION_VALUE",
+        }
+        if with_kafka:
+            meta.update(
+                emulators=[
+                    {"type": "kafka", "topic": {"id": "topic1", "source_dataset": "dataset_id_1"}}
+                ],
+                datasets={"dataset_id_1": {"format": "avro", "location": "local"}},
+            )
+        for k, v in tag_meta.items():
+            if v is None:
+                meta.pop(k, None)
+            else:
+                meta[k] = v
+        return Tag(
+            line_start=10,
+            line_finish=20,
+            context_line=30,
+            **meta,
+        )
+
+    return _create_test_tag
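
For context, a short sketch of how a test could lean on these new fixtures; the test name and assertions below are illustrative assumptions, not code from this commit, and they rely only on the fixture defaults defined above (SdkEnum.JAVA, the kafka topic wired to "dataset_id_1"):

    from models import SdkEnum

    # Hypothetical test exercising the conftest.py fixtures defined above.
    def test_kafka_example_fixture(create_test_example):
        example = create_test_example(with_kafka=True)
        assert example.sdk == SdkEnum.JAVA
        # create_test_tag(with_kafka=True) points the kafka topic at dataset_id_1
        assert example.tag.emulators[0].topic.source_dataset == "dataset_id_1"
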
diff --git a/playground/infrastructure/datastore_client.py b/playground/infrastructure/datastore_client.py
index f46b5beeea0..77743b1299d 100644
--- a/playground/infrastructure/datastore_client.py
+++ b/playground/infrastructure/datastore_client.py
@@ -29,18 +29,13 @@ from tqdm import tqdm
 
 import config
 from config import Config, Origin, PrecompiledExample, DatastoreProps
-from helper import Example
+from models import Example, SdkEnum, Dataset, Emulator
 
-from api.v1.api_pb2 import Sdk, PrecompiledObjectType
+from api.v1 import api_pb2
 
 
 class DatastoreException(Exception):
-    def __init__(self, error: str):
-        super().__init__()
-        self.msg = error
-
-    def __str__(self):
-        return self.msg
+    pass
 
 
 # Google Datastore documentation link: https://cloud.google.com/datastore/docs/concepts
@@ -64,7 +59,7 @@ class DatastoreClient:
             raise KeyError("SDK_CONFIG environment variable should be specified in os")
 
     def save_to_cloud_datastore(
-        self, examples_from_rep: List[Example], sdk: Sdk, origin: Origin
+        self, examples_from_rep: List[Example], sdk: SdkEnum, origin: Origin
     ):
         """
         Save examples, output and meta to datastore
@@ -86,8 +81,10 @@ class DatastoreClient:
         # loop through every example to save them to the Cloud Datastore
         for example in tqdm(examples_from_rep):
             with self._datastore_client.transaction():
-                sdk_key = self._get_key(DatastoreProps.SDK_KIND, Sdk.Name(example.sdk))
-                example_id = self._make_example_id(origin, sdk, example.name)
+                sdk_key = self._get_key(
+                    DatastoreProps.SDK_KIND, api_pb2.Sdk.Name(example.sdk)
+                )
+                example_id = self._make_example_id(origin, sdk, example.tag.name)
 
                 self._datastore_client.put(
                     self._to_example_entity(
@@ -104,18 +101,13 @@ class DatastoreClient:
                 )
                 # only single-file examples are supported by now
                 self._datastore_client.put(self._to_file_entity(example, example_id))
-                if example.datasets and example.emulators:
-                    dataset = example.datasets[0]
-                    emulator = example.emulators[0]
-                    file_name = f"{dataset.name}.{dataset.format}"
-                    dataset = self._to_dataset_entity(file_name, dataset.path)
-                    self._datastore_client.put(dataset)
-
-                    dataset_nested_entity = self._to_dataset_nested_entity(
-                        file_name, example_id, emulator
+                if example.tag.datasets:
+                    self._datastore_client.put_multi(
+                        [
+                            self._to_dataset_entity(dataset_id, dataset.file_name)
+                            for dataset_id, dataset in example.tag.datasets.items()
+                        ]
                     )
-                    snippet_datasets = [dataset_nested_entity]
-                    snippet.update({"datasets": snippet_datasets})
 
                 updated_example_ids.add(example_id)
 
@@ -195,13 +187,13 @@ class DatastoreClient:
         schema_names.sort(reverse=True)
         return self._get_key(DatastoreProps.SCHEMA_KIND, schema_names[0])
 
-    def _get_all_examples(self, sdk: Sdk, origin: Origin) -> List[str]:
+    def _get_all_examples(self, sdk: SdkEnum, origin: Origin) -> List[str]:
         examples_ids_before_updating = []
         all_examples_query = self._datastore_client.query(
             kind=DatastoreProps.EXAMPLE_KIND
         )
         all_examples_query.add_filter(
-            "sdk", "=", self._get_key(DatastoreProps.SDK_KIND, Sdk.Name(sdk))
+            "sdk", "=", self._get_key(DatastoreProps.SDK_KIND, api_pb2.Sdk.Name(sdk))
         )
         all_examples_query.add_filter("origin", "=", origin)
         all_examples_query.keys_only()
@@ -222,20 +214,20 @@ class DatastoreClient:
     def _get_dataset_key(self, dataset_id: str):
         return self._get_key(DatastoreProps.DATASET_KIND, dataset_id)
 
-    def _make_example_id(self, origin: Origin, sdk: Sdk, name: str):
+    def _make_example_id(self, origin: Origin, sdk: SdkEnum, name: str):
         # ToB examples (and other related entities: snippets, files, pc_objects)
         # have origin prefix in a key
         if origin == Origin.TB_EXAMPLES:
             return config.DatastoreProps.KEY_NAME_DELIMITER.join(
                 [
                     origin,
-                    Sdk.Name(sdk),
+                    api_pb2.Sdk.Name(sdk),
                     name,
                 ]
             )
         return config.DatastoreProps.KEY_NAME_DELIMITER.join(
             [
-                Sdk.Name(sdk),
+                api_pb2.Sdk.Name(sdk),
                 name,
             ]
         )
@@ -268,9 +260,11 @@ class DatastoreClient:
                 "origin": origin,
                 "numberOfFiles": 1,
                 "schVer": schema_key,
-                "complexity": f"COMPLEXITY_{example.complexity}",
+                "complexity": f"COMPLEXITY_{example.tag.complexity}",
             }
         )
+        if example.tag.datasets:
+            snippet_entity.update({"datasets": self._snippet_datasets(example)})
         return snippet_entity
 
     def _get_pipeline_options(self, example: Example):
@@ -290,17 +284,17 @@ class DatastoreClient:
         example_entity = datastore.Entity(self._get_example_key(example_id))
         example_entity.update(
             {
-                "name": example.name,
+                "name": example.tag.name,
                 "sdk": sdk_key,
                 "descr": example.tag.description,
                 "tags": example.tag.tags,
                 "cats": example.tag.categories,
-                "path": example.url_vcs, # keep for backward-compatibity, to be removed
-                "type": PrecompiledObjectType.Name(example.type),
+                "path": example.url_vcs,  # keep for backward-compatibity, to be removed
+                "type": api_pb2.PrecompiledObjectType.Name(example.type),
                 "origin": origin,
                 "schVer": schema_key,
                 "urlVCS": example.url_vcs,
-                "urlNotebook": example.url_notebook,
+                "urlNotebook": example.tag.url_notebook,
             }
         )
         return example_entity
@@ -349,7 +343,9 @@ class DatastoreClient:
         )
         file_entity.update(
             {
-                "name": self._get_file_name_with_extension(example.name, example.sdk),
+                "name": self._get_file_name_with_extension(
+                    example.tag.name, example.sdk
+                ),
                 "content": example.code,
                 "cntxLine": example.tag.context_line,
                 "isMain": True,
@@ -357,29 +353,32 @@ class DatastoreClient:
         )
         return file_entity
 
-    def _to_dataset_entity(self, dataset_id: str, path: str):
+    def _to_dataset_entity(self, dataset_id: str, file_name: str):
         dataset_entity = datastore.Entity(self._get_dataset_key(dataset_id))
-        dataset_entity.update({"path": path})
+        dataset_entity.update({"path": file_name})
         return dataset_entity
 
-    def _to_dataset_nested_entity(
-        self, dataset_id: str, example_id: str, emulator: config.Emulator
-    ):
-        emulator_config_as_dict = {"topic": emulator.topic.id}
-        emulator_config_as_json = json.dumps(emulator_config_as_dict)
-        nested_entity = datastore.Entity(
-            self._get_snippet_key(example_id), exclude_from_indexes=("config",)
-        )
+    def _to_dataset_nested_entity(self, dataset_id: str, emulator: Emulator):
+        nested_entity = datastore.Entity()
         nested_entity.update(
             {
                 "dataset": self._get_dataset_key(dataset_id),
-                "emulator": emulator.name,
-                "config": emulator_config_as_json,
+                "emulator": emulator.type,
+                "config": json.dumps({"topic": emulator.topic.id})
             }
         )
         return nested_entity
 
-    def _get_file_name_with_extension(self, name: str, sdk: Sdk) -> str:
+    def _snippet_datasets(self, example: Example) -> List[datastore.Entity]:
+        datasets = []
+        for emulator in example.tag.emulators:
+            dataset_nested_entity = self._to_dataset_nested_entity(
+                emulator.topic.source_dataset, emulator
+            )
+            datasets.append(dataset_nested_entity)
+        return datasets
+
+    def _get_file_name_with_extension(self, name: str, sdk: int) -> str:
         filename, file_extension = os.path.splitext(name)
         if len(file_extension) == 0:
             extension = Config.SDK_TO_EXTENSION[sdk]
diff --git a/playground/infrastructure/grpc_client.py b/playground/infrastructure/grpc_client.py
index 4f835d12071..b78b94beeb0 100644
--- a/playground/infrastructure/grpc_client.py
+++ b/playground/infrastructure/grpc_client.py
@@ -26,6 +26,7 @@ import sonora.aio
 
 from api.v1 import api_pb2_grpc, api_pb2
 from config import Config
+from models import SdkEnum
 
 
 class GRPCClient:
@@ -50,7 +51,7 @@ class GRPCClient:
     async def __aexit__(self, exc_type, exc_val, exc_tb):
         await self._channel.__aexit__(exc_type, exc_val, exc_tb)
 
-    async def run_code(self, code: str, sdk: api_pb2.Sdk, pipeline_options: str, datasets: List[api_pb2.Dataset]) -> str:
+    async def run_code(self, code: str, sdk: SdkEnum, pipeline_options: str, datasets: List[api_pb2.Dataset]) -> str:
         """
         Run example by his code and SDK
 
diff --git a/playground/infrastructure/helper.py b/playground/infrastructure/helper.py
index e41c3d960d0..d3941f3083e 100644
--- a/playground/infrastructure/helper.py
+++ b/playground/infrastructure/helper.py
@@ -19,76 +19,32 @@ Common helper module for CI/CD Steps
 import asyncio
 import logging
 import os
-from collections import namedtuple
-from dataclasses import dataclass, fields, field
-import urllib3
+import urllib.parse
 from pathlib import PurePath
 from typing import List, Optional, Dict
 from api.v1 import api_pb2
 
 from tqdm.asyncio import tqdm
 import yaml
-from yaml import YAMLError
-
-from api.v1.api_pb2 import SDK_UNSPECIFIED, STATUS_UNSPECIFIED, Sdk, \
-    STATUS_VALIDATING, STATUS_PREPARING, \
-    STATUS_COMPILING, STATUS_EXECUTING, PRECOMPILED_OBJECT_TYPE_UNIT_TEST, \
-    PRECOMPILED_OBJECT_TYPE_KATA, PRECOMPILED_OBJECT_TYPE_UNSPECIFIED, \
-    PRECOMPILED_OBJECT_TYPE_EXAMPLE, PrecompiledObjectType
-from config import Config, TagFields, PrecompiledExampleType, OptionalTagFields, Dataset, Emulator
+
+from api.v1.api_pb2 import (
+    SDK_UNSPECIFIED,
+    STATUS_UNSPECIFIED,
+    Sdk,
+    STATUS_VALIDATING,
+    STATUS_PREPARING,
+    STATUS_COMPILING,
+    STATUS_EXECUTING,
+    PRECOMPILED_OBJECT_TYPE_UNIT_TEST,
+    PRECOMPILED_OBJECT_TYPE_KATA,
+    PRECOMPILED_OBJECT_TYPE_UNSPECIFIED,
+    PRECOMPILED_OBJECT_TYPE_EXAMPLE,
+    PrecompiledObjectType,
+)
+from config import Config, TagFields, PrecompiledExampleType
 from grpc_client import GRPCClient
 
-# TODO replace with dataclass
-Tag = namedtuple(
-    "Tag",
-    [
-        TagFields.name,
-        TagFields.complexity,
-        TagFields.emulators,
-        TagFields.datasets,
-        TagFields.description,
-        TagFields.multifile,
-        TagFields.categories,
-        TagFields.pipeline_options,
-        TagFields.default_example,
-        TagFields.context_line,
-        TagFields.tags,
-        TagFields.url_notebook,
-    ],
-    defaults=(None, None, None, None, None, False, None, None, False, None, None, None))
-
-
-@dataclass
-class Example:
-    """
-    Class which contains all information about beam example
-    """
-    name: str
-    complexity: str
-    sdk: SDK_UNSPECIFIED
-    filepath: str
-    code: str
-    status: STATUS_UNSPECIFIED
-    tag: Tag
-    url_vcs: str
-    url_notebook: Optional[str] = None
-    logs: str = ""
-    type: PrecompiledObjectType = PRECOMPILED_OBJECT_TYPE_UNSPECIFIED
-    pipeline_id: str = ""
-    output: str = ""
-    compile_output: str = ""
-    graph: str = ""
-    datasets: List[Dataset] = field(default_factory=list)
-    emulators: List[Emulator] = field(default_factory=list)
-
-
-@dataclass
-class ExampleTag:
-    """
-    Class which contains all information about beam playground tag
-    """
-    tag_as_dict: Dict[str, str]
-    tag_as_string: str
+from models import Example, Tag, SdkEnum, Dataset
 
 
 def _check_no_nested(subdirs: List[str]):
@@ -104,8 +60,7 @@ def _check_no_nested(subdirs: List[str]):
             raise ValueError(f"{dir2} is a subdirectory of {dir1}")
 
 
-def find_examples(root_dir: str, subdirs: List[str], supported_categories: List[str],
-                  sdk: Sdk) -> List[Example]:
+def find_examples(root_dir: str, subdirs: List[str], sdk: SdkEnum) -> List[Example]:
     """
     Find and return beam examples.
 
@@ -128,13 +83,12 @@ def find_examples(root_dir: str, subdirs: List[str], supported_categories: List[
     Args:
         root_dir: project root dir
         subdirs: sub-directories where to search examples.
-        supported_categories: list of supported categories.
         sdk: sdk that using to find examples for the specific sdk.
 
     Returns:
         List of Examples.
     """
-    has_error = False
+    has_errors = False
     examples = []
     _check_no_nested(subdirs)
     for subdir in subdirs:
@@ -143,21 +97,26 @@ def find_examples(root_dir: str, subdirs: List[str], supported_categories: List[
         for root, _, files in os.walk(subdir):
             for filename in files:
                 filepath = os.path.join(root, filename)
-                error_during_check_file = _check_file(
-                    examples=examples,
-                    filename=filename,
-                    filepath=filepath,
-                    supported_categories=supported_categories,
-                    sdk=sdk)
-                has_error = has_error or error_during_check_file
-    if has_error:
+                try:
+                    example = _load_example(
+                        filename=filename, filepath=filepath, sdk=sdk
+                    )
+                    if example is not None:
+                        examples.append(example)
+                except Exception:
+                    logging.exception("error loading example at %s", filepath)
+                    has_errors = True
+    if has_errors:
         raise ValueError(
             "Some of the beam examples contain beam playground tag with "
-            "an incorrect format")
+            "an incorrect format"
+        )
     return examples
 
 
-async def get_statuses(client: GRPCClient, examples: List[Example], concurrency: int = 10):
+async def get_statuses(
+    client: GRPCClient, examples: List[Example], concurrency: int = 10
+):
     """
     Receive status and update example.status and example.pipeline_id for
     each example
@@ -187,7 +146,7 @@ async def get_statuses(client: GRPCClient, examples: List[Example], concurrency:
     await tqdm.gather(*tasks)
 
 
-def get_tag(filepath) -> Optional[ExampleTag]:
+def get_tag(filepath) -> Optional[Tag]:
     """
     Parse file by filepath and find beam tag
 
@@ -195,97 +154,102 @@ def get_tag(filepath) -> Optional[ExampleTag]:
         filepath: path of the file
 
     Returns:
-        If file contains tag, returns tag as a map.
+        If file contains tag, returns Tag object
         If file doesn't contain tag, returns None
     """
-    add_to_yaml = False
-    yaml_string = ""
-    tag_string = ""
-
     with open(filepath, encoding="utf-8") as parsed_file:
         lines = parsed_file.readlines()
 
-    for line in lines:
-        formatted_line = line.replace("//", "").replace("#",
-                                                        "").replace("\t", "    ")
-        if add_to_yaml is False:
-            if formatted_line.lstrip() == Config.BEAM_PLAYGROUND_TITLE:
-                add_to_yaml = True
-                yaml_string += formatted_line.lstrip()
-                tag_string += line
-        else:
-            yaml_with_new_string = yaml_string + formatted_line
-            try:
-                yaml.load(yaml_with_new_string, Loader=yaml.SafeLoader)
-                yaml_string += formatted_line
-                tag_string += line
-            except YAMLError:
-                break
-
-    if add_to_yaml:
-        tag_object = yaml.load(yaml_string, Loader=yaml.SafeLoader)
-        return ExampleTag(tag_object[Config.BEAM_PLAYGROUND], tag_string)
+    line_start: Optional[int] = None
+    line_finish: Optional[int] = None
+    tag_prefix: Optional[str] = ""
+    for idx, line in enumerate(lines):
+        if line_start is None and line.endswith(Config.BEAM_PLAYGROUND_TITLE):
+            line_start = idx
+            prefix_len = len(line) - len(Config.BEAM_PLAYGROUND_TITLE)
+            tag_prefix = line[:prefix_len]
+        elif line_start and not line.startswith(tag_prefix):
+            line_finish = idx
+            break
 
-    return None
+    if not line_start or not line_finish:
+        return None
 
+    embdedded_yaml_content = "".join(
+        line[len(tag_prefix) :] for line in lines[line_start:line_finish]
+    )
+    yml = yaml.load(embdedded_yaml_content, Loader=yaml.SafeLoader)
+    return Tag(
+        line_start=line_start, line_finish=line_finish, **yml[Config.BEAM_PLAYGROUND]
+    )
 
-def _check_file(examples, filename, filepath, supported_categories, sdk: Sdk):
+
+def _load_example(filename, filepath, sdk: SdkEnum) -> Optional[Example]:
     """
     Check file by filepath for matching to beam example. If file is beam example,
-    then add it to list of examples
 
     Args:
-        examples: list of examples.
         filename: name of the file.
         filepath: path to the file.
-        supported_categories: list of supported categories.
         sdk: sdk that using to find examples for the specific sdk.
 
     Returns:
-        True if file has beam playground tag with incorrect format.
-        False if file has correct beam playground tag.
-        False if file doesn't contains beam playground tag.
+        If the file is an example, return Example object
+        If it's not, return None
+        In case of error, raise Exception
     """
-    if filepath.endswith("infrastructure/helper.py"):
-        return False
-
-    has_error = False
+    logging.debug("inspecting file %s", filepath)
     extension = filepath.split(os.extsep)[-1]
     if extension == Config.SDK_TO_EXTENSION[sdk]:
+        logging.debug("sdk %s matched extension %s", api_pb2.Sdk.Name(sdk), extension)
         tag = get_tag(filepath)
         if tag is not None:
-            if _validate(tag.tag_as_dict, supported_categories) is False:
-                logging.error(
-                    "%s contains beam playground tag with incorrect format", filepath)
-                has_error = True
-            else:
-                examples.append(_get_example(filepath, filename, tag))
-    return has_error
+            logging.debug("playground-beam tag found")
+            return _get_example(filepath, filename, tag, sdk)
+    return None
 
 
-def get_supported_categories(categories_path: str) -> List[str]:
+# Make load_supported_categories called only once
+# to make testing easier
+_load_supported_categories = False
+
+
+def load_supported_categories(categories_path: str):
     """
-    Return list of supported categories from categories_path file
+    Load the list of supported categories from categories_path file
+    into Tag model config
 
     Args:
         categories_path: path to the file with categories.
-
-    Returns:
-        All supported categories as a list.
     """
+    global _load_supported_categories
+    if _load_supported_categories:
+        return
     with open(categories_path, encoding="utf-8") as supported_categories:
         yaml_object = yaml.load(supported_categories.read(), Loader=yaml.SafeLoader)
-        return yaml_object[TagFields.categories]
 
-def _get_url_vcs(filepath: str):
+    Tag.Config.supported_categories = yaml_object[TagFields.categories]
+    _load_supported_categories = True
+
+
+def _get_content(filepath: str, tag_start_line: int, tag_finish_line) -> str:
+    with open(filepath, encoding="utf-8") as parsed_file:
+        lines = parsed_file.readlines()
+        lines = lines[:tag_start_line] + lines[tag_finish_line:]
+    return "".join(lines)
+
+
+def _get_url_vcs(filepath: str) -> str:
     """
     Construct VCS URL from example's filepath
     """
     root_dir = os.getenv("BEAM_ROOT_DIR", "../..")
     rel_path = os.path.relpath(filepath, root_dir)
-    return "{}/{}".format(Config.URL_VCS_PREFIX, rel_path)
+    url_vcs = "{}/{}".format(Config.URL_VCS_PREFIX, urllib.parse.quote(rel_path))
+    return url_vcs
+
 
-def _get_example(filepath: str, filename: str, tag: ExampleTag) -> Example:
+def _get_example(filepath: str, filename: str, tag: Tag, sdk: int) -> Example:
     """
     Return an Example by filepath and filename.
 
@@ -297,153 +261,16 @@ def _get_example(filepath: str, filename: str, tag: ExampleTag) -> Example:
     Returns:
         Parsed Example object.
     """
-    name = tag.tag_as_dict[TagFields.name]
-    complexity = tag.tag_as_dict[TagFields.complexity]
-    url_notebook = tag.tag_as_dict.get(TagFields.url_notebook)
-    sdk = Config.EXTENSION_TO_SDK[filename.split(os.extsep)[-1]]
-    object_type = _get_object_type(filename, filepath)
-    with open(filepath, encoding="utf-8") as parsed_file:
-        content = parsed_file.read()
-    content = content.replace(tag.tag_as_string, "")
-    tag.tag_as_dict[TagFields.context_line] -= tag.tag_as_string.count("\n")
-
-    example = Example(
-        name=name,
-        complexity=complexity,
-        sdk=sdk,
+    return Example(
+        sdk=SdkEnum(sdk),
+        tag=tag,
         filepath=filepath,
-        code=content,
         status=STATUS_UNSPECIFIED,
-        tag=Tag(**tag.tag_as_dict),
-        type=object_type,
-        url_vcs=_get_url_vcs(filepath),
-        url_notebook=url_notebook,
-        )
-
-    if tag.tag_as_dict.get(TagFields.datasets):
-        datasets_as_dict = tag.tag_as_dict[TagFields.datasets]
-        datasets = []
-        for key in datasets_as_dict:
-            dataset = Dataset.from_dict(datasets_as_dict.get(key))
-            dataset.name = key
-            datasets.append(dataset)
-        example.datasets = datasets
-
-    if tag.tag_as_dict.get(TagFields.emulators):
-        emulators_as_dict = tag.tag_as_dict[TagFields.emulators]
-        emulators = []
-        for key in emulators_as_dict:
-            emulator = Emulator.from_dict(emulators_as_dict.get(key))
-            emulator.name = key
-            emulators.append(emulator)
-        example.emulators = emulators
-
-    validate_example_fields(example)
-    return example
-
-
-def _validate(tag: dict, supported_categories: List[str]) -> bool:
-    """
-    Validate all tag's fields
-
-    Validate that tag contains all required fields and all fields have required
-    format.
-
-    Args:
-        tag: beam tag to validate.
-        supported_categories: list of supported categories.
-
-    Returns:
-        In case tag is valid, True
-        In case tag is not valid, False
-    """
-    valid = True
-    required_tag_fields = {
-        f.default
-        for f in fields(TagFields)
-        if f.default not in {o_f.default
-                             for o_f in fields(OptionalTagFields)}
-    }
-    # check that all fields exist and they have no empty value
-    for field in required_tag_fields:
-        if field not in tag:
-            logging.error(
-                "tag doesn't contain %s field: %s \n"
-                "Please, check that this field exists in the beam playground tag."
-                "If you are sure that this field exists in the tag"
-                " check the format of indenting.",
-                field,
-                tag)
-            valid = False
-        if valid is True:
-            value = tag.get(field)
-            if (value == "" or value is None) and field != TagFields.pipeline_options:
-                logging.error(
-                    "tag's value is incorrect: %s\n%s field can not be empty.",
-                    tag,
-                    field)
-                valid = False
-
-    if valid is False:
-        return valid
-
-    # check that multifile's value is boolean
-    multifile = tag.get(TagFields.multifile)
-    if str(multifile).lower() not in ["true", "false"]:
-        logging.error(
-            "tag's field multifile is incorrect: %s \n"
-            "multifile variable should be boolean format, but tag contains: %s",
-            tag,
-            multifile)
-        valid = False
-
-    # check that categories' value is a list of supported categories
-    categories = tag.get(TagFields.categories)
-    if not isinstance(categories, list):
-        logging.error(
-            "tag's field categories is incorrect: %s \n"
-            "categories variable should be list format, but tag contains: %s",
-            tag,
-            type(categories))
-        valid = False
-    else:
-        for category in categories:
-            if category not in supported_categories:
-                logging.error(
-                    "tag contains unsupported category: %s \n"
-                    "If you are sure that %s category should be placed in "
-                    "Beam Playground, you can add it to the "
-                    "`playground/categories.yaml` file",
-                    category,
-                    category)
-                valid = False
-
-    # check that context line's value is integer
-    context_line = tag.get(TagFields.context_line)
-    if not isinstance(context_line, int):
-        logging.error(
-            "Tag's field context_line is incorrect: %s \n"
-            "context_line variable should be integer format, "
-            "but tag contains: %s",
-            tag,
-            context_line)
-        valid = False
-    return valid
-
-
-def _get_name(filename: str) -> str:
-    """
-    Return name of the example by his filepath.
-
-    Get name of the example by his filename.
-
-    Args:
-        filename: filename of the beam example file.
-
-    Returns:
-        example's name.
-    """
-    return filename.split(os.extsep)[0]
+        type=_get_object_type(filename, filepath),
+        code=_get_content(filepath, tag.line_start, tag.line_finish),
+        url_vcs=_get_url_vcs(filepath),  # type: ignore
+        context_line=tag.context_line - (tag.line_finish - tag.line_start),
+    )
 
 
 async def _update_example_status(example: Example, client: GRPCClient):
@@ -460,28 +287,31 @@ async def _update_example_status(example: Example, client: GRPCClient):
         example: beam example for processing and updating status and pipeline_id.
         client: client to send requests to the server.
     """
-    datasets = []
-    if example.datasets and example.emulators:
-        dataset_tag = example.datasets[0]
-        emulator_tag = example.emulators[0]
-        options = {
-            "topic": emulator_tag.topic.id
-        }
-        dataset = api_pb2.Dataset(
-            type=api_pb2.EmulatorType.Value(f"EMULATOR_TYPE_{emulator_tag.name.upper()}"),
-            options=options,
-            dataset_path=dataset_tag.path
+    datasets: List[api_pb2.Dataset] = []
+    for emulator in example.tag.emulators:
+        dataset: Dataset = example.tag.datasets[emulator.topic.source_dataset]
+
+        datasets.append(
+            api_pb2.Dataset(
+                type=api_pb2.EmulatorType.Value(
+                    f"EMULATOR_TYPE_{emulator.type.upper()}"
+                ),
+                options={"topic": emulator.topic.id},
+                dataset_path=dataset.file_name,
+            )
         )
-        datasets.append(dataset)
 
     pipeline_id = await client.run_code(
-        example.code, example.sdk, example.tag.pipeline_options, datasets)
+        example.code, example.sdk, example.tag.pipeline_options, datasets
+    )
     example.pipeline_id = pipeline_id
     status = await client.check_status(pipeline_id)
-    while status in [STATUS_VALIDATING,
-                     STATUS_PREPARING,
-                     STATUS_COMPILING,
-                     STATUS_EXECUTING]:
+    while status in [
+        STATUS_VALIDATING,
+        STATUS_PREPARING,
+        STATUS_COMPILING,
+        STATUS_EXECUTING,
+    ]:
         await asyncio.sleep(Config.PAUSE_DELAY)
         status = await client.check_status(pipeline_id)
     example.status = status
@@ -509,67 +339,20 @@ def _get_object_type(filename, filepath):
     return object_type
 
 
+class DuplicatesError(Exception):
+    pass
+
+
 def validate_examples_for_duplicates_by_name(examples: List[Example]):
     """
     Validate examples for duplicates by example name to avoid duplicates in the Cloud Datastore
     :param examples: examples from the repository for saving to the Cloud Datastore
     """
-    duplicates = {str: Example}
+    duplicates: Dict[str, Example] = {}
     for example in examples:
-        if example.name not in duplicates.keys():
-            duplicates[example.name] = example
+        if example.tag.name not in duplicates.keys():
+            duplicates[example.tag.name] = example
         else:
-            err_msg = f"Examples have duplicate names.\nDuplicates: \n - path #1: {duplicates[example.name].filepath} \n - path #2: {example.filepath}"
+            err_msg = f"Examples have duplicate names.\nDuplicates: \n - path #1: {duplicates[example.tag.name].filepath} \n - path #2: {example.filepath}"
             logging.error(err_msg)
-            raise ValidationException(err_msg)
-
-
-def validate_example_fields(example: Example):
-    """
-    Validate example fields to avoid side effects in the next step
-    :param example: example from the repository
-    """
-    if example.filepath == "":
-        _log_and_raise_validation_err(f"Example doesn't have a file path field. Example: {example}")
-    if example.name == "":
-        _log_and_raise_validation_err(f"Example doesn't have a name field. Path: {example.filepath}")
-    if example.sdk == SDK_UNSPECIFIED:
-        _log_and_raise_validation_err(f"Example doesn't have a sdk field. Path: {example.filepath}")
-    if example.code == "":
-        _log_and_raise_validation_err(f"Example doesn't have a code field. Path: {example.filepath}")
-    if example.url_vcs == "":
-        _log_and_raise_validation_err(f"Example doesn't have a url_vcs field. Path: {example.filepath}")
-    if example.complexity == "":
-        _log_and_raise_validation_err(f"Example doesn't have a complexity field. Path: {example.filepath}")
-    datasets = example.datasets
-    emulators = example.emulators
-
-    if datasets and not emulators:
-        _log_and_raise_validation_err(f"Example has a datasets field but an emulators field not found. Path: {example.filepath}")
-    if emulators and not datasets:
-        _log_and_raise_validation_err(f"Example has an emulators field but a datasets field not found. Path: {example.filepath}")
-
-    dataset_names = []
-    for dataset in datasets:
-        location = dataset.location
-        dataset_format = dataset.format
-        if not location or not dataset_format or location not in ["local"] or dataset_format not in ["json", "avro"]:
-            _log_and_raise_validation_err(f"Example has invalid dataset value. Path: {example.filepath}")
-        dataset_names.append(dataset.name)
-    for emulator in emulators:
-        if not (emulator.name == "kafka" and emulator.topic.dataset in dataset_names):
-            _log_and_raise_validation_err(f"Example has invalid emulator value. Path: {example.filepath}")
-
-
-def _log_and_raise_validation_err(msg: str):
-    logging.error(msg)
-    raise ValidationException(msg)
-
-
-class ValidationException(Exception):
-    def __init__(self, error: str):
-        super().__init__()
-        self.msg = error
-
-    def __str__(self):
-        return self.msg
+            raise DuplicatesError(err_msg)
diff --git a/playground/infrastructure/models.py b/playground/infrastructure/models.py
new file mode 100644
index 00000000000..5479883b83f
--- /dev/null
+++ b/playground/infrastructure/models.py
@@ -0,0 +1,166 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+import os.path
+
+from enum import Enum, IntEnum
+from typing import List, Optional, Dict
+from api.v1 import api_pb2
+
+from pydantic import (
+    BaseModel,
+    Extra,
+    Field,
+    validator,
+    root_validator,
+    HttpUrl
+)
+
+from config import RepoProps
+
+class ComplexityEnum(str, Enum):
+    BASIC = "BASIC"
+    MEDIUM = "MEDIUM"
+    ADVANCED = "ADVANCED"
+
+
+class DatasetFormat(str, Enum):
+    JSON = "json"
+    AVRO = "avro"
+
+
+class DatasetLocation(str, Enum):
+    LOCAL = "local"
+    GCS = "GCS"
+
+
+class Dataset(BaseModel):
+    format: DatasetFormat
+    location: DatasetLocation
+    file_name: str = ""
+
+
+class Topic(BaseModel):
+    id: str
+    source_dataset: str
+
+
+class EmulatorType(str, Enum):
+    KAFKA = "kafka"
+
+
+class Emulator(BaseModel):
+    type: EmulatorType
+    topic: Topic
+
+
+class Tag(BaseModel):
+    """
+    Tag represents the beam-playground embedded yaml content
+    """
+    line_start: int
+    line_finish: int
+    context_line: int
+    name: str = Field(..., min_length=1)
+    complexity: ComplexityEnum
+    description: str
+    categories: List[str]
+    pipeline_options: str = ""
+    datasets: Dict[str, Dataset] = {}
+    emulators: List[Emulator] = []
+    multifile: bool = False
+    default_example: bool = False
+    tags: List[str] = []
+    url_notebook: Optional[HttpUrl] = None
+
+    class Config:
+        supported_categories = []
+        extra = Extra.forbid
+
+    @root_validator(skip_on_failure=True)
+    def lines_order(cls, values):
+        assert (
+            0 < values["line_start"] < values["line_finish"] <= values["context_line"]
+        ), f"line ordering error: {values}"
+        return values
+
+    @root_validator(skip_on_failure=True)
+    def datasets_with_emulators(csl, values):
+        if values.get("datasets") and not values.get("emulators"):
+            raise ValueError("datasets w/o emulators")
+        return values
+
+    @validator("emulators", each_item=True)
+    def dataset_defined(cls, v, values, **kwargs):
+        if "datasets" not in values:
+            raise ValueError("datasets not defined")
+        for dataset_id in values["datasets"]:
+            if dataset_id == v.topic.source_dataset:
+                return v
+        raise ValueError(
+            f"Emulator topic {v.topic.id} has undefined dataset {v.topic.source_dataset}"
+        )
+
+    @validator('datasets')
+    def dataset_file_name(cls, datasets):
+        for dataset_id, dataset in datasets.items():
+            dataset.file_name = f"{dataset_id}.{dataset.format}"
+            if dataset.location == DatasetLocation.LOCAL:
+                dataset_path = os.path.join(RepoProps.REPO_DATASETS_PATH, dataset.file_name)
+                if not os.path.isfile(dataset_path):
+                    logging.error("File not found at the specified path: %s", dataset_path)
+                    raise FileNotFoundError
+        return datasets
+
+
+    @validator("categories", each_item=True)
+    def category_supported(cls, v, values, config, **kwargs):
+        if v not in config.supported_categories:
+            raise ValueError(f"Category {v} not in {config.supported_categories}")
+        return v
+
+
+
+class SdkEnum(IntEnum):
+    JAVA = api_pb2.SDK_JAVA
+    GO = api_pb2.SDK_GO
+    PYTHON = api_pb2.SDK_PYTHON
+    SCIO = api_pb2.SDK_SCIO
+
+
+def StringToSdkEnum(s: str) -> SdkEnum:
+    parsed: int = api_pb2.Sdk.Value(s)
+    return SdkEnum(parsed)
+
+
+class Example(BaseModel):
+    """
+    Class which contains all information about beam example
+    """
+
+    sdk: SdkEnum
+    tag: Tag
+    filepath: str = Field(..., min_length=1)
+    code: str = Field(..., min_length=1)
+    url_vcs: HttpUrl
+    type: int = api_pb2.PRECOMPILED_OBJECT_TYPE_UNSPECIFIED
+    context_line: int = Field(..., gt=0)
+
+    status: int = api_pb2.STATUS_UNSPECIFIED
+    logs: str = ""
+    pipeline_id: str = ""
+    output: str = ""
+    compile_output: str = ""
+    graph: str = ""
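
A minimal sketch of how the new pydantic models above are meant to be exercised (illustrative only; it assumes the loader fills Tag.Config.supported_categories before validation, since that is what the categories validator checks against):

    # Sketch only; assumes supported_categories is populated by the loader.
    from models import ComplexityEnum, Example, SdkEnum, Tag

    Tag.Config.supported_categories = ["Filtering", "Options", "Quickstart"]

    tag = Tag(
        line_start=4,
        line_finish=27,
        context_line=28,
        name="KafkaWordCount",
        description="Test example with Apache Kafka",
        complexity=ComplexityEnum.MEDIUM,
        categories=["Filtering"],
    )

    example = Example(
        sdk=SdkEnum.JAVA,
        tag=tag,
        filepath="examples/path/kafka.java",
        code="// example source",
        url_vcs="https://github.com/apache/beam/blob/master/examples/path/kafka.java",
        context_line=28,
    )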
diff --git a/playground/infrastructure/repository.py b/playground/infrastructure/repository.py
deleted file mode 100644
index 639f202de47..00000000000
--- a/playground/infrastructure/repository.py
+++ /dev/null
@@ -1,36 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""
-Module contains methods to work with repository
-"""
-import logging
-import os
-from typing import List
-
-from config import RepoProps
-
-from helper import Example
-
-
-def set_dataset_path_for_examples(examples: List[Example]):
-    for example in examples:
-        for dataset in example.datasets:
-            file_name = f"{dataset.name}.{dataset.format}"
-            dataset_path = os.path.join(RepoProps.REPO_DATASETS_PATH, file_name)
-            if not os.path.isfile(dataset_path):
-                logging.error("File not found at the specified path: %s", dataset_path)
-                raise FileNotFoundError
-            dataset.path = file_name
diff --git a/playground/infrastructure/requirements.txt b/playground/infrastructure/requirements.txt
index 96f5ac3a7b3..16afc877c7f 100644
--- a/playground/infrastructure/requirements.txt
+++ b/playground/infrastructure/requirements.txt
@@ -26,4 +26,4 @@ PyYAML==6.0
 tqdm~=4.62.3
 google-cloud-datastore==2.7.1
 sonora==0.2.2
-dataclasses-json==0.5.7
+pydantic==1.10.2
diff --git a/playground/infrastructure/test_cd_helper.py b/playground/infrastructure/test_cd_helper.py
deleted file mode 100644
index 74893c1c477..00000000000
--- a/playground/infrastructure/test_cd_helper.py
+++ /dev/null
@@ -1,38 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import unittest
-import mock
-from config import Origin
-
-import test_utils
-from api.v1.api_pb2 import SDK_JAVA
-from cd_helper import CDHelper
-
-"""
-Unit tests for the CD helper
-"""
-
-
-class TestCDHelper(unittest.TestCase):
-
-    @mock.patch("cd_helper.CDHelper._save_to_datastore")
-    @mock.patch("cd_helper.CDHelper._get_outputs")
-    def test_save_examples(self, mock_get_outputs, mock_save_to_datastore):
-        examples = test_utils._get_examples(1)
-        helper = CDHelper(SDK_JAVA, Origin.PG_EXAMPLES)
-        helper.save_examples(examples)
-        mock_get_outputs.assert_called_once()
-        mock_save_to_datastore.assert_called_once_with(examples)
diff --git a/playground/infrastructure/test_ci_cd.py b/playground/infrastructure/test_ci_cd.py
index f42699e1927..5f5b880ba02 100644
--- a/playground/infrastructure/test_ci_cd.py
+++ b/playground/infrastructure/test_ci_cd.py
@@ -17,20 +17,33 @@ import mock
 import pytest
 
 from api.v1.api_pb2 import SDK_JAVA
-from ci_cd import _ci_step, _cd_step, _check_envs
+from ci_cd import _check_envs, _run_ci_cd
 from config import Origin
 
 
-@mock.patch("ci_helper.CIHelper.verify_examples")
-def test_ci_step(mock_verify_examples):
-    _ci_step([], Origin.PG_EXAMPLES)
-    mock_verify_examples.assert_called_once_with([], Origin.PG_EXAMPLES)
-
-
-@mock.patch("cd_helper.CDHelper.save_examples")
-def test_cd_step(mock_save_examples):
-    _cd_step([], SDK_JAVA, Origin.PG_EXAMPLES)
-    mock_save_examples.assert_called_once_with([])
+@pytest.mark.parametrize("step", ["CI", "CD"])
+@mock.patch("ci_cd.DatastoreClient")
+@mock.patch("ci_cd.find_examples")
+@mock.patch("verify.Verifier._run_and_verify")
+def test_ci_step(
+    mock_run_and_verify, mock_find_examples, mock_datastore, create_test_example, step
+):
+    mock_find_examples.return_value = [
+        create_test_example(tag_meta=dict(name="Default", default_example=True)),
+        create_test_example(tag_meta=dict(name="Single", multifile=False)),
+        create_test_example(tag_meta=dict(name="Multi", multifile=True)),
+    ]
+    _run_ci_cd(
+        step,
+        "SDK_JAVA",
+        Origin.PG_EXAMPLES,
+        [
+            "../../examples",
+        ],
+    )
+    mock_run_and_verify.assert_called_once()
+    if step == "CD":
+        mock_datastore.assert_called_once()
 
 
 def test__check_envs():
diff --git a/playground/infrastructure/test_ci_helper.py b/playground/infrastructure/test_ci_helper.py
deleted file mode 100644
index 4b9815622f9..00000000000
--- a/playground/infrastructure/test_ci_helper.py
+++ /dev/null
@@ -1,176 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import copy
-import uuid
-
-import mock
-import pytest
-
-from api.v1.api_pb2 import SDK_JAVA, STATUS_FINISHED, STATUS_ERROR, \
-    STATUS_VALIDATION_ERROR, STATUS_PREPARATION_ERROR, STATUS_RUN_TIMEOUT, \
-    STATUS_COMPILE_ERROR, STATUS_RUN_ERROR
-from ci_helper import CIHelper, VerifyException
-from config import Origin
-from helper import Example, Tag
-
-
-@pytest.mark.asyncio
-@mock.patch("ci_helper.CIHelper._verify_examples")
-@mock.patch("ci_helper.get_statuses")
-async def test_verify_examples(mock_get_statuses, mock_verify_examples):
-    helper = CIHelper()
-    await helper.verify_examples([], Origin.PG_EXAMPLES)
-
-    mock_get_statuses.assert_called_once_with(mock.ANY, [])
-    mock_verify_examples.assert_called_once_with(mock.ANY, [], Origin.PG_EXAMPLES)
-
-
-@pytest.mark.asyncio
-async def test__verify_examples():
-    helper = CIHelper()
-    object_meta = {
-        "name": "name",
-        "description": "description",
-        "multifile": False,
-        "categories": ["category-1", "category-2"],
-        "pipeline_options": "--option option",
-        "default_example": False
-    }
-    object_meta_def_ex = copy.copy(object_meta)
-    object_meta_def_ex["default_example"] = True
-    pipeline_id = str(uuid.uuid4())
-    default_example = Example(
-        name="name",
-        complexity="MEDIUM",
-        pipeline_id=pipeline_id,
-        sdk=SDK_JAVA,
-        filepath="filepath",
-        code="code_of_example",
-        output="output_of_example",
-        status=STATUS_FINISHED,
-        tag=Tag(**object_meta_def_ex),
-        url_vcs="link")
-    finished_example = Example(
-        name="name",
-        complexity="MEDIUM",
-        pipeline_id=pipeline_id,
-        sdk=SDK_JAVA,
-        filepath="filepath",
-        code="code_of_example",
-        output="output_of_example",
-        status=STATUS_FINISHED,
-        tag=Tag(**object_meta),
-        url_vcs="link",
-        url_notebook="notebook_link")
-    examples_without_def_ex = [
-        finished_example,
-        finished_example,
-    ]
-    examples_with_several_def_ex = [
-        default_example,
-        default_example,
-    ]
-    examples_without_errors = [
-        default_example,
-        finished_example,
-    ]
-    examples_with_errors = [
-        Example(
-            name="name",
-            complexity="MEDIUM",
-            pipeline_id=pipeline_id,
-            sdk=SDK_JAVA,
-            filepath="filepath",
-            code="code_of_example",
-            output="output_of_example",
-            status=STATUS_VALIDATION_ERROR,
-            tag=Tag(**object_meta_def_ex),
-            url_vcs="link"),
-        Example(
-            name="name",
-            complexity="MEDIUM",
-            pipeline_id=pipeline_id,
-            sdk=SDK_JAVA,
-            filepath="filepath",
-            code="code_of_example",
-            output="output_of_example",
-            status=STATUS_ERROR,
-            tag=Tag(**object_meta),
-            url_vcs="link"),
-        Example(
-            name="name",
-            complexity="MEDIUM",
-            pipeline_id=pipeline_id,
-            sdk=SDK_JAVA,
-            filepath="filepath",
-            code="code_of_example",
-            output="output_of_example",
-            status=STATUS_COMPILE_ERROR,
-            tag=Tag(**object_meta),
-            url_vcs="link"),
-        Example(
-            name="name",
-            complexity="MEDIUM",
-            pipeline_id=pipeline_id,
-            sdk=SDK_JAVA,
-            filepath="filepath",
-            code="code_of_example",
-            output="output_of_example",
-            status=STATUS_PREPARATION_ERROR,
-            tag=Tag(**object_meta),
-            url_vcs="link"),
-        Example(
-            name="name",
-            complexity="MEDIUM",
-            pipeline_id=pipeline_id,
-            sdk=SDK_JAVA,
-            filepath="filepath",
-            code="code_of_example",
-            output="output_of_example",
-            status=STATUS_RUN_TIMEOUT,
-            tag=Tag(**object_meta),
-            url_vcs="link"),
-        Example(
-            name="name",
-            complexity="MEDIUM",
-            pipeline_id=pipeline_id,
-            sdk=SDK_JAVA,
-            filepath="filepath",
-            code="code_of_example",
-            output="output_of_example",
-            status=STATUS_VALIDATION_ERROR,
-            tag=Tag(**object_meta),
-            url_vcs="link"),
-        Example(
-            name="name",
-            complexity="MEDIUM",
-            pipeline_id=pipeline_id,
-            sdk=SDK_JAVA,
-            filepath="filepath",
-            code="code_of_example",
-            output="output_of_example",
-            status=STATUS_RUN_ERROR,
-            tag=Tag(**object_meta),
-            url_vcs="link"),
-    ]
-    client = mock.AsyncMock()
-    with pytest.raises(VerifyException):
-        await helper._verify_examples(client, examples_with_errors, Origin.PG_EXAMPLES)
-    with pytest.raises(VerifyException):
-        await helper._verify_examples(client, examples_without_def_ex, Origin.PG_EXAMPLES)
-    with pytest.raises(VerifyException):
-        await helper._verify_examples(client, examples_with_several_def_ex, Origin.PG_EXAMPLES)
-    await helper._verify_examples(client, examples_without_errors, Origin.PG_EXAMPLES)
diff --git a/playground/infrastructure/test_datastore_client.py b/playground/infrastructure/test_datastore_client.py
index 958c5d1d0df..a4945343542 100644
--- a/playground/infrastructure/test_datastore_client.py
+++ b/playground/infrastructure/test_datastore_client.py
@@ -18,10 +18,11 @@ from unittest.mock import MagicMock, ANY
 import mock
 import pytest
 from mock.mock import call
-from config import Origin
+from google.cloud import datastore
 
+from config import Origin
 from datastore_client import DatastoreClient, DatastoreException
-from api.v1.api_pb2 import SDK_JAVA
+from models import SdkEnum
 from test_utils import _get_examples
 
 """
@@ -44,7 +45,7 @@ def test_save_to_cloud_datastore_when_schema_version_not_found(
     ):
         examples = _get_examples(1)
         client = DatastoreClient()
-        client.save_to_cloud_datastore(examples, SDK_JAVA, Origin.PG_EXAMPLES)
+        client.save_to_cloud_datastore(examples, SdkEnum.JAVA, Origin.PG_EXAMPLES)
 
 
 def test_save_to_cloud_datastore_when_google_cloud_project_id_not_set():
@@ -58,6 +59,7 @@ def test_save_to_cloud_datastore_when_google_cloud_project_id_not_set():
         DatastoreClient()
 
 
+@pytest.mark.parametrize("with_kafka", [False, True])
 @pytest.mark.parametrize(
     "origin, key_prefix",
     [
@@ -74,8 +76,10 @@ def test_save_to_cloud_datastore_in_the_usual_case(
     mock_config_project,
     mock_get_schema,
     mock_get_examples,
+    create_test_example,
     origin,
     key_prefix,
+    with_kafka,
 ):
     """
     Test saving examples to the cloud datastore in the usual case
@@ -86,9 +90,9 @@ def test_save_to_cloud_datastore_in_the_usual_case(
     mock_get_examples.return_value = mock_examples
     mock_config_project.return_value = "MOCK_PROJECT_ID"
 
-    examples = _get_examples(1)
+    examples = [create_test_example(with_kafka=with_kafka)]
     client = DatastoreClient()
-    client.save_to_cloud_datastore(examples, SDK_JAVA, origin)
+    client.save_to_cloud_datastore(examples, SdkEnum.JAVA, origin)
     mock_client.assert_called_once()
     mock_get_schema.assert_called_once()
     mock_get_examples.assert_called_once()
@@ -96,15 +100,35 @@ def test_save_to_cloud_datastore_in_the_usual_case(
         call().transaction(),
         call().transaction().__enter__(),
         call().key("pg_sdks", "SDK_JAVA"),
-        call().key("pg_examples", key_prefix + "SDK_JAVA_MOCK_NAME_0"),
-        call().put(ANY),
-        call().key("pg_snippets", key_prefix + "SDK_JAVA_MOCK_NAME_0"),
-        call().put(ANY),
-        call().key("pg_pc_objects", key_prefix + "SDK_JAVA_MOCK_NAME_0_OUTPUT"),
-        call().put_multi([ANY]),
-        call().key("pg_files", key_prefix + "SDK_JAVA_MOCK_NAME_0_0"),
+        call().key("pg_examples", key_prefix + "SDK_JAVA_MOCK_NAME"),
         call().put(ANY),
-        call().transaction().__exit__(None, None, None),
+        call().key("pg_snippets", key_prefix + "SDK_JAVA_MOCK_NAME"),
     ]
+    if with_kafka:
+        calls.append(
+            call().key(
+                "pg_datasets", "dataset_id_1"
+            ),  # used in the nested datasets construction
+        )
+    calls.extend(
+        [
+            call().put(ANY),
+            call().key("pg_pc_objects", key_prefix + "SDK_JAVA_MOCK_NAME_OUTPUT"),
+            call().put_multi([ANY]),
+            call().key("pg_files", key_prefix + "SDK_JAVA_MOCK_NAME_0"),
+            call().put(ANY),
+        ]
+    )
+    if with_kafka:
+        calls.extend(
+            [
+                call().key("pg_datasets", "dataset_id_1"),
+                call().put_multi([ANY]),
+            ]
+        )
+    calls.append(
+        call().transaction().__exit__(None, None, None),
+    )
+
     mock_client.assert_has_calls(calls, any_order=False)
     mock_client.delete_multi.assert_not_called()
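
The call assertions above encode the Datastore key layout produced for a single Java example; roughly, with kind names taken from the test (payloads omitted, and the key prefix parametrized by origin):

    # Sketch only; kind names and id patterns mirror the assertions above.
    from google.cloud import datastore

    client = datastore.Client(project="MOCK_PROJECT_ID")
    prefix = ""  # the test parametrizes this value per Origin

    client.key("pg_sdks", "SDK_JAVA")
    client.key("pg_examples", prefix + "SDK_JAVA_MOCK_NAME")
    client.key("pg_snippets", prefix + "SDK_JAVA_MOCK_NAME")
    client.key("pg_pc_objects", prefix + "SDK_JAVA_MOCK_NAME_OUTPUT")
    client.key("pg_files", prefix + "SDK_JAVA_MOCK_NAME_0")
    client.key("pg_datasets", "dataset_id_1")  # only when the example declares a Kafka emulator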
diff --git a/playground/infrastructure/test_helper.py b/playground/infrastructure/test_helper.py
index f0ee4b1ae48..1b2277f6d8f 100644
--- a/playground/infrastructure/test_helper.py
+++ b/playground/infrastructure/test_helper.py
@@ -12,42 +12,45 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from typing import Dict, Union, List
+from typing import Dict, List, Any, Optional
 from unittest.mock import mock_open
 
 import mock
 import pytest
+import pydantic
 
 from api.v1.api_pb2 import (
     SDK_UNSPECIFIED,
     STATUS_UNSPECIFIED,
     STATUS_VALIDATING,
     STATUS_FINISHED,
-    SDK_JAVA,
     PRECOMPILED_OBJECT_TYPE_EXAMPLE,
     PRECOMPILED_OBJECT_TYPE_KATA,
     PRECOMPILED_OBJECT_TYPE_UNIT_TEST,
 )
-from config import Emulator, Topic, Dataset
 from grpc_client import GRPCClient
+from models import (
+    ComplexityEnum,
+    SdkEnum,
+    Emulator,
+    Topic,
+    EmulatorType,
+    Dataset,
+    DatasetFormat,
+    DatasetLocation,
+)
 from helper import (
     find_examples,
     Example,
-    _get_example,
-    _get_name,
+    _load_example,
     get_tag,
-    _validate,
     Tag,
     get_statuses,
     _check_no_nested,
     _update_example_status,
-    get_supported_categories,
-    _check_file,
     _get_object_type,
-    ExampleTag,
     validate_examples_for_duplicates_by_name,
-    ValidationException,
-    validate_example_fields,
+    DuplicatesError,
 )
 
 
@@ -66,27 +69,28 @@ def test_check_for_nested():
 
 @pytest.mark.parametrize("is_valid", [True, False])
 @mock.patch("helper._check_no_nested")
-@mock.patch("helper._check_file")
+@mock.patch("helper._load_example")
 @mock.patch("helper.os.walk")
-def test_find_examples(mock_os_walk, mock_check_file, mock_check_no_nested, is_valid):
+def test_find_examples(
+    mock_os_walk, mock_load_example, mock_check_no_nested, is_valid, create_test_example
+):
     mock_os_walk.return_value = [
         ("/root/sub1", (), ("file.java",)),
         ("/root/sub2", (), ("file2.java",)),
     ]
-    mock_check_file.return_value = not is_valid
-    sdk = SDK_UNSPECIFIED
     if is_valid:
-        result = find_examples(
-            root_dir="/root", subdirs=["sub1", "sub2"], supported_categories=[], sdk=sdk
+        mock_load_example.return_value = create_test_example()
+        assert (
+            find_examples(root_dir="/root", subdirs=["sub1", "sub2"], sdk=SdkEnum.JAVA)
+            == [create_test_example()] * 4
         )
-        assert not result
     else:
+        mock_load_example.side_effect = Exception("MOCK_ERROR")
         with pytest.raises(
-              ValueError,
-              match="Some of the beam examples contain beam playground tag with "
-                    "an incorrect format",
+            ValueError,
+            match="Some of the beam examples contain beam playground tag with an incorrect format",
         ):
-            find_examples("/root", ["sub1", "sub2"], [], sdk=sdk)
+            find_examples(root_dir="/root", subdirs=["sub1", "sub2"], sdk=SdkEnum.JAVA)
 
     mock_check_no_nested.assert_called_once_with(["sub1", "sub2"])
     mock_os_walk.assert_has_calls(
@@ -95,21 +99,17 @@ def test_find_examples(mock_os_walk, mock_check_file, mock_check_no_nested, is_v
             mock.call("/root/sub2"),
         ]
     )
-    mock_check_file.assert_has_calls(
+    mock_load_example.assert_has_calls(
         [
             mock.call(
-                examples=[],
                 filename="file.java",
                 filepath="/root/sub1/file.java",
-                supported_categories=[],
-                sdk=sdk,
+                sdk=SdkEnum.JAVA,
             ),
             mock.call(
-                examples=[],
                 filename="file2.java",
                 filepath="/root/sub2/file2.java",
-                supported_categories=[],
-                sdk=sdk,
+                sdk=SdkEnum.JAVA,
             ),
         ]
     )
@@ -117,19 +117,8 @@ def test_find_examples(mock_os_walk, mock_check_file, mock_check_no_nested, is_v
 
 @pytest.mark.asyncio
 @mock.patch("helper._update_example_status")
-async def test_get_statuses(mock_update_example_status):
-    example = Example(
-        name="file",
-        complexity="MEDIUM",
-        pipeline_id="pipeline_id",
-        sdk=SDK_UNSPECIFIED,
-        filepath="root/file.extension",
-        code="code",
-        output="output",
-        status=STATUS_UNSPECIFIED,
-        tag={"name": "Name"},
-        url_vcs="link"
-    )
+async def test_get_statuses(mock_update_example_status, create_test_example):
+    example = create_test_example()
     client = mock.sentinel
     await get_statuses(client, [example])
 
@@ -138,221 +127,158 @@ async def test_get_statuses(mock_update_example_status):
 
 @mock.patch(
     "builtins.open",
-    mock_open(read_data="...\n# beam-playground:\n#     name: Name\n\nimport ..."),
-)
-def test_get_tag_when_tag_is_exists():
-    result = get_tag("")
-
-    assert result.tag_as_dict.get("name") == "Name"
-    assert result.tag_as_string == "# beam-playground:\n#     name: Name\n\n"
-
-
-@mock.patch("builtins.open", mock_open(read_data="...\n..."))
-def test_get_tag_when_tag_does_not_exist():
-    result = get_tag("")
-
-    assert result is None
-
-
-@mock.patch("helper._get_example")
-@mock.patch("helper._validate")
-@mock.patch("helper.get_tag")
-def test__check_file_with_correct_tag(mock_get_tag, mock_validate, mock_get_example):
-    tag = ExampleTag({"name": "Name"}, "")
-    example = Example(
-        name="filename",
-        complexity="MEDIUM",
-        sdk=SDK_JAVA,
-        filepath="/root/filename.java",
-        code="data",
-        status=STATUS_UNSPECIFIED,
-        tag=Tag("Name", "Description", False, [], "--option option"),
-        url_vcs="link",
-    )
-    examples = []
-
-    mock_get_tag.return_value = tag
-    mock_validate.return_value = True
-    mock_get_example.return_value = example
-
-    result = _check_file(
-        examples, "filename.java", "/root/filename.java", [], sdk=SDK_JAVA
-    )
-
-    assert result is False
-    assert len(examples) == 1
-    assert examples[0] == example
-    mock_get_tag.assert_called_once_with("/root/filename.java")
-    mock_validate.assert_called_once_with(tag.tag_as_dict, [])
-    mock_get_example.assert_called_once_with(
-        "/root/filename.java", "filename.java", tag
-    )
-
-
-@mock.patch("helper._validate")
-@mock.patch("helper.get_tag")
-def test__check_file_with_incorrect_tag(mock_get_tag, mock_validate):
-    tag = ExampleTag({"name": "Name"}, "")
-    examples = []
-    sdk = SDK_JAVA
-    mock_get_tag.return_value = tag
-    mock_validate.return_value = False
-
-    result = _check_file(examples, "filename.java", "/root/filename.java", [], sdk)
-
-    assert result is True
-    assert len(examples) == 0
-    mock_get_tag.assert_called_once_with("/root/filename.java")
-    mock_validate.assert_called_once_with(tag.tag_as_dict, [])
-
-
-@mock.patch("builtins.open", mock_open(read_data="categories:\n    - category"))
-def test_get_supported_categories():
-    result = get_supported_categories("")
-
-    assert len(result) == 1
-    assert result[0] == "category"
+    mock_open(
+        read_data="""
+// license line 1
+// license line 2
+//
+// beam-playground:
+//   name: KafkaWordCount
+//   description: Test example with Apache Kafka
+//   multifile: false
+//   context_line: 28
+//   categories:
+//     - Filtering
+//     - Options
+//     - Quickstart
+//   complexity: MEDIUM
+//   tags:
+//     - filter
+//     - strings
+//     - emulator
+//   emulators:
+//    - type: kafka
+//      topic:
+//        id: topic_1
+//        source_dataset: dataset_id_1
+//   datasets:
+//      dataset_id_1:
+//          location: local
+//          format: json
 
+code line 1
+code line 2
 
-@mock.patch("builtins.open", mock_open(read_data="data"))
-def test__get_example():
-    tag = ExampleTag(
-        {
-            "name": "Name",
-            "description": "Description",
-            "multifile": "False",
-            "categories": [""],
-            "pipeline_options": "--option option",
-            "context_line": 1,
-            "complexity": "MEDIUM",
-            "url_notebook": "https://some_url/py-name.ipynb",
-            "emulators": {"kafka": {"topic": {"id": "dataset", "dataset": "dataset"}}},
-            "datasets": {"dataset": {"location": "local", "format": "json"}},
-        },
-        "",
+"""
+    ),
+)
+def test_load_example():
+    example = _load_example(
+        "kafka.java", "../../examples/path/kafka.java", SdkEnum.JAVA
     )
-
-    result = _get_example("../../examples/dir/filepath.java", "filepath.java", tag)
-
-    assert result == Example(
-        name="Name",
-        sdk=SDK_JAVA,
-        filepath="../../examples/dir/filepath.java",
-        code="data",
+    assert example == Example(
+        sdk=SdkEnum.JAVA,
         type=PRECOMPILED_OBJECT_TYPE_EXAMPLE,
-        status=STATUS_UNSPECIFIED,
+        filepath="../../examples/path/kafka.java",
+        code="""
+// license line 1
+// license line 2
+//
+
+code line 1
+code line 2
+
+""",
+        url_vcs="https://github.com/apache/beam/blob/master/examples/path/kafka.java", # type: ignore
+        context_line=5,
         tag=Tag(
-            "Name",
-            "MEDIUM",
-            {"kafka": {"topic": {"id": "dataset", "dataset": "dataset"}}},
-            {"dataset": {"location": "local", "format": "json"}},
-            "Description",
-            "False",
-            [""],
-            "--option option",
-            False,
-            1,
-            None,
-            "https://some_url/py-name.ipynb",
+            line_start=4,
+            line_finish=27,
+            name="KafkaWordCount",
+            description="Test example with Apache Kafka",
+            multifile=False,
+            context_line=28,
+            categories=["Filtering", "Options", "Quickstart"],
+            complexity=ComplexityEnum.MEDIUM,
+            tags=["filter", "strings", "emulator"],
+            emulators=[
+                Emulator(
+                    type=EmulatorType.KAFKA,
+                    topic=Topic(id="topic_1", source_dataset="dataset_id_1"),
+                )
+            ],
+            datasets={
+                "dataset_id_1": Dataset(
+                    location=DatasetLocation.LOCAL, format=DatasetFormat.JSON
+                )
+            },
         ),
-        url_vcs="https://github.com/apache/beam/blob/master/examples/dir/filepath.java",
-        url_notebook="https://some_url/py-name.ipynb",
-        complexity="MEDIUM",
-        emulators=[
-            Emulator(topic=Topic(id="dataset", dataset="dataset"), name="kafka")
-        ],
-        datasets=[Dataset(format="json", location="local", name="dataset")],
     )
 
 
-def test__validate_without_name_field():
-    tag = {}
-    assert _validate(tag, []) is False
-
-
-def test__validate_without_description_field():
-    tag = {"name": "Name"}
-    assert _validate(tag, []) is False
-
-
-def test__validate_without_multifile_field():
-    tag = {"name": "Name", "description": "Description"}
-    assert _validate(tag, []) is False
-
-
-def test__validate_with_incorrect_multifile_field():
-    tag = {"name": "Name", "description": "Description", "multifile": "Multifile"}
-    assert _validate(tag, []) is False
-
+def test__validate_without_name_field(create_test_tag):
+    with pytest.raises(
+        pydantic.ValidationError,
+        match="field required",
+    ):
+        create_test_tag(name=None)
 
-def test__validate_without_categories_field():
-    tag = {"name": "Name", "description": "Description", "multifile": "true"}
-    assert _validate(tag, []) is False
 
+def test__validate_without_description_field(create_test_tag):
+    with pytest.raises(
+        pydantic.ValidationError,
+        match="field required",
+    ):
+        create_test_tag(description=None)
 
-def test__validate_without_incorrect_categories_field():
-    tag = {
-        "name": "Name",
-        "description": "Description",
-        "multifile": "true",
-        "categories": "Categories",
-    }
-    assert _validate(tag, []) is False
 
+def test__validate_with_incorrect_multifile_field(create_test_tag):
+    with pytest.raises(
+        pydantic.ValidationError,
+        match="value could not be parsed to a boolean",
+    ):
+        create_test_tag(multifile="multifile")
 
-def test__validate_with_not_supported_category():
-    tag = {
-        "name": "Name",
-        "description": "Description",
-        "multifile": "true",
-        "categories": ["category1"],
-    }
-    assert _validate(tag, ["category"]) is False
 
+def test__validate_without_categories_field(create_test_tag):
+    with pytest.raises(
+        pydantic.ValidationError,
+        match="field required",
+    ):
+        create_test_tag(categories=None)
 
-def test__validate_with_all_fields():
-    tag = {
-        "name": "Name",
-        "description": "Description",
-        "multifile": "true",
-        "categories": ["category"],
-        "pipeline_options": "--option option",
-        "context_line": 1,
-        "complexity": "MEDIUM",
-        "tags": ["tag"],
-        "emulator": "KAFKA",
-        "dataset": "dataset.json",
-    }
-    assert _validate(tag, ["category"]) is True
 
+def test__validate_with_incorrect_categories_field(create_test_tag):
+    with pytest.raises(
+        pydantic.ValidationError,
+        match="value is not a valid list",
+    ):
+        create_test_tag(categories="MOCK_CATEGORY_1")
 
-def test__get_name():
-    result = _get_name("filepath.extension")
 
-    assert result == "filepath"
+def test__validate_with_not_supported_category(create_test_tag):
+    with pytest.raises(
+        pydantic.ValidationError,
+        match="Category MOCK_CATEGORY_1 not in",
+    ):
+        create_test_tag(categories=["MOCK_CATEGORY_1"])
 
 
 @pytest.mark.asyncio
 @mock.patch("grpc_client.GRPCClient.check_status")
 @mock.patch("grpc_client.GRPCClient.run_code")
-async def test__update_example_status(mock_grpc_client_run_code, mock_grpc_client_check_status):
+async def test__update_example_status(
+    mock_grpc_client_run_code, mock_grpc_client_check_status
+):
     example = Example(
-        name="file",
-        complexity="MEDIUM",
+        tag=Tag(
+            line_start=10,
+            line_finish=20,
+            context_line=100,
+            name="file",
+            description="MOCK_DESCRIPTION",
+            complexity=ComplexityEnum.MEDIUM,
+            pipeline_options="--key value",
+            categories=["Testing"],
+        ),
+        context_line=100,
         pipeline_id="pipeline_id",
-        sdk=SDK_UNSPECIFIED,
+        sdk=SdkEnum.JAVA,
         filepath="root/file.extension",
         code="code",
         output="output",
         status=STATUS_UNSPECIFIED,
-        tag=Tag(
-            name="MOCK_NAME",
-            description="MOCK_DESCR",
-            context_line=333,
-            pipeline_options="--key value",
-            complexity="MEDIUM",
-            ),
-        url_vcs="link",
+        url_vcs="https://github.com/link", # type: ignore
     )
 
     mock_grpc_client_run_code.return_value = "pipeline_id"
@@ -384,192 +310,141 @@ def test__get_object_type():
     assert result_test == PRECOMPILED_OBJECT_TYPE_UNIT_TEST
 
 
-def test_validate_examples_for_duplicates_by_name_in_the_usual_case():
+def test_validate_examples_for_duplicates_by_name_in_the_usual_case(
+    create_test_example,
+):
     examples_names = ["MOCK_NAME_1", "MOCK_NAME_2", "MOCK_NAME_3"]
-    examples = list(map(lambda name: _create_example(name), examples_names))
+    examples = list(
+        map(lambda name: create_test_example(tag_meta=dict(name=name)), examples_names)
+    )
     try:
         validate_examples_for_duplicates_by_name(examples)
-    except ValidationException:
+    except DuplicatesError:
         pytest.fail("Unexpected ValidationException")
 
 
-def test_validate_examples_for_duplicates_by_name_when_examples_have_duplicates():
+def test_validate_examples_for_duplicates_by_name_when_examples_have_duplicates(
+    create_test_example,
+):
     examples_names = ["MOCK_NAME_1", "MOCK_NAME_2", "MOCK_NAME_1", "MOCK_NAME_3"]
-    examples = list(map(lambda name: _create_example(name), examples_names))
+    examples = list(
+        map(lambda name: create_test_example(tag_meta=dict(name=name)), examples_names)
+    )
     with pytest.raises(
-          ValidationException,
-          match="Examples have duplicate names.\nDuplicates: \n - path #1: MOCK_FILEPATH \n - path #2: MOCK_FILEPATH",
+        DuplicatesError,
+        match="Examples have duplicate names.\nDuplicates: \n - path #1: MOCK_FILEPATH \n - path #2: MOCK_FILEPATH",
     ):
         validate_examples_for_duplicates_by_name(examples)
 
 
-def test_validate_example_fields_when_filepath_is_invalid():
-    example = _create_example("MOCK_NAME")
-    example.filepath = ""
+def test_validate_example_fields_when_filepath_is_invalid(create_test_example):
     with pytest.raises(
-          ValidationException, match="Example doesn't have a file path field. Example: "
+        pydantic.ValidationError,
+        match="ensure this value has at least 1 characters",
     ):
-        validate_example_fields(example)
+        create_test_example(filepath="")
 
 
-def test_validate_example_fields_when_name_is_invalid():
-    example = _create_example("")
+def test_validate_example_fields_when_sdk_is_invalid(create_test_example):
     with pytest.raises(
-          ValidationException,
-          match="Example doesn't have a name field. Path: MOCK_FILEPATH",
+        pydantic.ValidationError,
+        match="value is not a valid enumeration member",
     ):
-        validate_example_fields(example)
+        create_test_example(sdk=SDK_UNSPECIFIED)
 
 
-def test_validate_example_fields_when_sdk_is_invalid():
-    example = _create_example("MOCK_NAME")
-    example.sdk = SDK_UNSPECIFIED
+def test_validate_example_fields_when_code_is_invalid(create_test_example):
     with pytest.raises(
-          ValidationException,
-          match="Example doesn't have a sdk field. Path: MOCK_FILEPATH",
+        pydantic.ValidationError,
+        match="ensure this value has at least 1 characters",
     ):
-        validate_example_fields(example)
+        create_test_example(code="")
 
 
-def test_validate_example_fields_when_code_is_invalid():
-    example = _create_example("MOCK_NAME")
-    example.code = ""
+def test_validate_example_fields_when_url_vcs_is_invalid(create_test_example):
     with pytest.raises(
-          ValidationException,
-          match="Example doesn't have a code field. Path: MOCK_FILEPATH",
+        pydantic.ValidationError,
+        match="ensure this value has at least 1 characters",
     ):
-        validate_example_fields(example)
-
+        create_test_example(url_vcs="")
 
-def test_validate_example_fields_when_link_is_invalid():
-    example = _create_example("MOCK_NAME")
-    example.url_vcs = ""
-    with pytest.raises(ValidationException, match="Example doesn't have a url_vcs field. Path: MOCK_FILEPATH"):
-        validate_example_fields(example)
 
-
-def test_validate_example_fields_when_complexity_is_invalid():
-    example = _create_example("MOCK_NAME")
-    example.complexity = ""
-    with pytest.raises(
-          ValidationException,
-          match="Example doesn't have a complexity field. Path: MOCK_FILEPATH",
-    ):
-        validate_example_fields(example)
-
-
-def test_validate_example_fields_when_dataset_not_set_but_emulator_set():
-    example = _create_example("MOCK_NAME")
-    emulator = Emulator(topic=Topic(id="MOCK_ID", dataset="dataset"), name="kafka")
-    example.emulators.append(emulator)
+def test_validate_example_fields_when_name_is_invalid(create_test_tag):
     with pytest.raises(
-          ValidationException,
-          match="Example has an emulators field but a datasets field not found. Path: MOCK_FILEPATH",
+        pydantic.ValidationError,
+        match="ensure this value has at least 1 characters",
     ):
-        validate_example_fields(example)
+        create_test_tag(name="")
 
 
-def test_validate_example_fields_when_emulator_not_set_but_dataset_set():
-    example = _create_example("MOCK_NAME")
-    dataset = Dataset(format="json", location="local", name="dataset")
-    example.datasets.append(dataset)
+def test_validate_example_fields_when_complexity_is_invalid(create_test_tag):
     with pytest.raises(
-          ValidationException,
-          match="Example has a datasets field but an emulators field not found. Path: MOCK_FILEPATH",
+        pydantic.ValidationError,
+        match="value is not a valid enumeration member",
     ):
-        validate_example_fields(example)
+        create_test_tag(complexity="")
 
 
-def test_validate_example_fields_when_topic_dataset_is_invalid():
-    example = _create_example("MOCK_NAME")
-    emulator = Emulator(topic=Topic(id="MOCK_ID", dataset="MOCK_DATASET"), name="kafka")
-    dataset = Dataset(format="json", location="local", name="dataset")
-    example.datasets.append(dataset)
-    example.emulators.append(emulator)
+def test_validate_example_fields_when_emulator_not_set_but_dataset_set(create_test_tag):
     with pytest.raises(
-          ValidationException,
-          match="Example has invalid emulator value. Path: MOCK_FILEPATH",
+        pydantic.ValidationError,
+        match="datasets w/o emulators",
     ):
-        validate_example_fields(example)
+        create_test_tag(datasets={"dataset_id_1": {"format": "avro", "location": "local"}})
 
 
-def test_validate_example_fields_when_emulator_name_is_invalid():
-    example = _create_example("MOCK_NAME")
-    emulator = Emulator(topic=Topic(id="MOCK_ID", dataset="dataset"), name="MOCK_NAME")
-    dataset = Dataset(format="json", location="local", name="dataset")
-    example.datasets.append(dataset)
-    example.emulators.append(emulator)
+def test_validate_example_fields_when_emulator_type_is_invalid(create_test_tag):
     with pytest.raises(
-          ValidationException,
-          match="Example has invalid emulator value. Path: MOCK_FILEPATH",
+        pydantic.ValidationError,
+        match="value is not a valid enumeration member",
     ):
-        validate_example_fields(example)
+        create_test_tag(
+            emulators=[
+                {
+                    "type": "MOCK_TYPE",
+                    "topic": {"id": "topic1", "source_dataset": "dataset_id_1"},
+                }
+            ],
+            datasets={"dataset_id_1": {"format": "json", "location": "local"}},
+        )
 
 
-def test_validate_example_fields_when_dataset_format_is_invalid():
-    example = _create_example("MOCK_NAME")
-    emulator = Emulator(topic=Topic(id="MOCK_ID", dataset="dataset"), name="dataset")
-    dataset = Dataset(format="MOCK_FORMAT", location="local", name="dataset")
-    example.datasets.append(dataset)
-    example.emulators.append(emulator)
+def test_validate_example_fields_when_dataset_format_is_invalid(create_test_tag):
     with pytest.raises(
-          ValidationException,
-          match="Example has invalid dataset value. Path: MOCK_FILEPATH",
+        pydantic.ValidationError,
+        match="value is not a valid enumeration member",
     ):
-        validate_example_fields(example)
+        create_test_tag(
+            emulators=[
+                {"type": "kafka", "topic": {"id": "topic1", "source_dataset": "src"}}
+            ],
+            datasets={"src": {"format": "MOCK_FORMAT", "location": "local"}},
+        )
 
 
-def test_validate_example_fields_when_dataset_location_is_invalid():
-    example = _create_example("MOCK_NAME")
-    emulator = Emulator(topic=Topic(id="MOCK_ID", dataset="dataset"), name="dataset")
-    dataset = Dataset(format="avro", location="MOCK_LOCATION", name="dataset")
-    example.datasets.append(dataset)
-    example.emulators.append(emulator)
+def test_validate_example_fields_when_dataset_location_is_invalid(create_test_tag):
     with pytest.raises(
-          ValidationException,
-          match="Example has invalid dataset value. Path: MOCK_FILEPATH",
+        pydantic.ValidationError,
+        match="value is not a valid enumeration member",
     ):
-        validate_example_fields(example)
+        create_test_tag(
+            emulators=[
+                {"type": "kafka", "topic": {"id": "topic1", "source_dataset": "src"}}
+            ],
+            datasets={"src": {"format": "avro", "location": "MOCK_LOCATION"}},
+        )
 
 
-def test_validate_example_fields_when_dataset_name_is_invalid():
-    example = _create_example("MOCK_NAME")
-    emulator = Emulator(topic=Topic(id="MOCK_ID", dataset="dataset"), name="dataset")
-    dataset = Dataset(format="avro", location="MOCK_LOCATION", name="")
-    example.datasets.append(dataset)
-    example.emulators.append(emulator)
+def test_validate_example_fields_when_dataset_name_is_invalid(create_test_tag):
     with pytest.raises(
-          ValidationException,
-          match="Example has invalid dataset value. Path: MOCK_FILEPATH",
+        pydantic.ValidationError,
+        match="mulator topic topic1 has undefined dataset src",
     ):
-        validate_example_fields(example)
-
-
-def _create_example(name: str) -> Example:
-    object_meta = {
-        "name": "MOCK_NAME",
-        "description": "MOCK_DESCRIPTION",
-        "multifile": False,
-        "categories": ["MOCK_CATEGORY_1", "MOCK_CATEGORY_2"],
-        "pipeline_options": "--MOCK_OPTION MOCK_OPTION_VALUE",
-    }
-    return _create_example_with_meta(name, object_meta)
-
-
-def _create_example_with_meta(name: str, object_meta: Dict[str, Union[str, bool, List[str]]]) -> Example:
-    example = Example(
-        name=name,
-        pipeline_id="MOCK_PIPELINE_ID",
-        sdk=SDK_JAVA,
-        filepath="MOCK_FILEPATH",
-        code="MOCK_CODE",
-        output="MOCK_OUTPUT",
-        status=STATUS_UNSPECIFIED,
-        tag=Tag(**object_meta),
-        url_vcs="MOCK_LINK",
-        complexity="MOCK_COMPLEXITY",
-    )
-    return example
+        create_test_tag(
+            emulators=[
+                {"type": "kafka", "topic": {"id": "topic1", "source_dataset": "src"}}
+            ]
+        )
 
 
 @mock.patch(
@@ -592,10 +467,10 @@ def _create_example_with_meta(name: str, object_meta: Dict[str, Union[str, bool,
 //     - strings
 //     - emulator
 //   emulators:
-//      kafka:
-//          topic:
-//              id: topic_1
-//              dataset: dataset_id_1
+//    - type: kafka
+//      topic:
+//        id: topic_1
+//        source_dataset: dataset_id_1
 //   datasets:
 //      dataset_id_1:
 //          location: local
@@ -605,10 +480,11 @@ def _create_example_with_meta(name: str, object_meta: Dict[str, Union[str, bool,
     ),
 )
 def test_get_tag_with_datasets():
-    example = get_tag("filepath")
-    example.tag_as_string = ""  # to not compare with itself
-    assert example == ExampleTag(
-        tag_as_dict={
+    tag = get_tag("filepath")
+    assert tag == Tag(
+        **{
+            "line_start": 2,
+            "line_finish": 25,
             "name": "KafkaWordCount",
             "description": "Test example with Apache Kafka",
             "multifile": False,
@@ -616,8 +492,24 @@ def test_get_tag_with_datasets():
             "categories": ["Filtering", "Options", "Quickstart"],
             "complexity": "MEDIUM",
             "tags": ["filter", "strings", "emulator"],
-            "emulators": {"kafka": {"topic": {"id": "topic_1", "dataset": "dataset_id_1"}}},
+            "emulators": [
+                {
+                    "type": "kafka",
+                    "topic": {"id": "topic_1", "source_dataset": "dataset_id_1"},
+                }
+            ],
             "datasets": {"dataset_id_1": {"location": "local", "format": "json"}},
         },
-        tag_as_string="",
     )
+
+@mock.patch("os.path.isfile", return_value=True)
+def test_dataset_path_ok(mock_file_check, create_test_example):
+    example = create_test_example(with_kafka=True)
+    assert len(example.tag.datasets) > 0
+    assert example.tag.datasets.popitem()[1].file_name == "dataset_id_1.avro"
+
+
+@mock.patch("os.path.isfile", return_value=False)
+def test_dataset_path_notfound(mock_file_check, create_test_example):
+    with pytest.raises(FileNotFoundError):
+        create_test_example(with_kafka=True)
\ No newline at end of file
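
With the ExampleTag wrapper gone, get_tag now returns a validated Tag whose line_start/line_finish mark the embedded block; a minimal use (assuming files without a beam-playground block still yield no tag) looks like:

    from helper import get_tag

    tag = get_tag("examples/path/kafka.java")
    if tag is not None:  # assumption: untagged files produce no Tag
        print(tag.name, tag.line_start, tag.line_finish)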
diff --git a/playground/infrastructure/test_repository.py b/playground/infrastructure/test_repository.py
deleted file mode 100644
index dfbd00df467..00000000000
--- a/playground/infrastructure/test_repository.py
+++ /dev/null
@@ -1,53 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-import mock
-import pytest
-
-from config import Dataset, RepoProps
-from repository import set_dataset_path_for_examples
-from test_utils import _get_examples
-
-"""
-Unit tests for the Cloud Storage client
-"""
-
-
-@mock.patch("os.path.isfile", return_value=True)
-def test_set_dataset_path_for_examples(mock_file_check):
-    examples = _get_examples_with_datasets(3)
-    set_dataset_path_for_examples(examples)
-    for example in examples:
-        assert example.datasets[0].path == "MOCK_NAME.MOCK_FORMAT"
-
-
-@mock.patch("os.path.isfile", return_value=False)
-def test_set_dataset_path_for_examples_when_path_is_invalid(mock_file_check):
-    with pytest.raises(FileNotFoundError):
-        examples = _get_examples_with_datasets(1)
-        set_dataset_path_for_examples(examples)
-
-
-def _get_examples_with_datasets(number_of_examples: int):
-    examples = _get_examples(number_of_examples)
-    for example in examples:
-        datasets = []
-        dataset = Dataset(
-            format="MOCK_FORMAT",
-            location="MOCK_LOCATION",
-            name="MOCK_NAME"
-        )
-        datasets.append(dataset)
-        example.datasets = datasets
-    return examples
diff --git a/playground/infrastructure/test_utils.py b/playground/infrastructure/test_utils.py
index 68950a3659c..e65e723dccc 100644
--- a/playground/infrastructure/test_utils.py
+++ b/playground/infrastructure/test_utils.py
@@ -16,29 +16,33 @@
 from typing import List
 
 from api.v1.api_pb2 import SDK_JAVA, STATUS_UNSPECIFIED
-from helper import Example, Tag
+from models import Example, Tag, SdkEnum, ComplexityEnum
 
 
 def _get_examples(number_of_examples: int) -> List[Example]:
     examples = []
     for number in range(number_of_examples):
-        object_meta = {
-            "name": f"MOCK_NAME_{number}",
-            "description": f"MOCK_DESCRIPTION_{number}",
-            "multifile": False,
-            "categories": ["MOCK_CATEGORY_1", "MOCK_CATEGORY_2"],
-            "pipeline_options": "--MOCK_OPTION MOCK_OPTION_VALUE"
-        }
-        example = Example(
+        tag = Tag(
+            line_start=100,
+            line_finish=120,
+            context_line=123,
             name=f"MOCK_NAME_{number}",
-            complexity="MEDIUM",
+            complexity=ComplexityEnum.MEDIUM,
+            description=f"MOCK_DESCRIPTION_{number}",
+            multifile=False,
+            categories=["Side Input", "Multiple Outputs"],
+            pipeline_options="--MOCK_OPTION MOCK_OPTION_VALUE",
+        )
+        example = Example(
+            tag=tag,
+            context_line=123,
             pipeline_id=f"MOCK_PIPELINE_ID_{number}",
-            sdk=SDK_JAVA,
+            sdk=SdkEnum.JAVA,
             filepath=f"MOCK_FILEPATH_{number}",
             code=f"MOCK_CODE_{number}",
             output=f"MOCK_OUTPUT_{number}",
             status=STATUS_UNSPECIFIED,
-            tag=Tag(**object_meta),
-            url_vcs=f"MOCK_LINK_{number}")
+            url_vcs=f"https://mock.link/{number}", # type: ignore
+        )
         examples.append(example)
     return examples
diff --git a/playground/infrastructure/test_verify.py b/playground/infrastructure/test_verify.py
new file mode 100644
index 00000000000..df887478a98
--- /dev/null
+++ b/playground/infrastructure/test_verify.py
@@ -0,0 +1,73 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import mock
+import pytest
+
+from api.v1.api_pb2 import (
+    SDK_JAVA,
+    STATUS_FINISHED,
+    STATUS_ERROR,
+    STATUS_VALIDATION_ERROR,
+    STATUS_PREPARATION_ERROR,
+    STATUS_RUN_TIMEOUT,
+    STATUS_COMPILE_ERROR,
+    STATUS_RUN_ERROR,
+)
+from verify import Verifier, VerifyException
+from config import Origin
+from models import SdkEnum
+
+
+@pytest.mark.asyncio
+async def test__verify_examples(create_test_example):
+    verifier = Verifier(SdkEnum.JAVA, Origin.PG_EXAMPLES)
+    default_example = create_test_example(tag_meta=dict(default_example=True))
+    finished_example = create_test_example(status=STATUS_FINISHED)
+    examples_without_def_ex = [
+        finished_example,
+        finished_example,
+    ]
+    examples_with_several_def_ex = [
+        default_example,
+        default_example,
+    ]
+    examples_without_errors = [
+        default_example,
+        finished_example,
+    ]
+    examples_with_errors = [
+        create_test_example(status=STATUS_VALIDATION_ERROR),
+        create_test_example(status=STATUS_ERROR),
+        create_test_example(status=STATUS_COMPILE_ERROR),
+        create_test_example(status=STATUS_PREPARATION_ERROR),
+        create_test_example(status=STATUS_RUN_TIMEOUT),
+        create_test_example(status=STATUS_RUN_ERROR),
+    ]
+    client = mock.AsyncMock()
+    with pytest.raises(VerifyException):
+        await verifier._verify_examples(
+            client, examples_with_errors, Origin.PG_EXAMPLES
+        )
+    with pytest.raises(VerifyException):
+        await verifier._verify_examples(
+            client, examples_without_def_ex, Origin.PG_EXAMPLES
+        )
+    with pytest.raises(VerifyException):
+        await verifier._verify_examples(
+            client, examples_with_several_def_ex, Origin.PG_EXAMPLES
+        )
+    await verifier._verify_examples(client, examples_without_errors, Origin.PG_EXAMPLES)
diff --git a/playground/infrastructure/ci_helper.py b/playground/infrastructure/verify.py
similarity index 60%
rename from playground/infrastructure/ci_helper.py
rename to playground/infrastructure/verify.py
index a70a515ca54..9d3784bd600 100644
--- a/playground/infrastructure/ci_helper.py
+++ b/playground/infrastructure/verify.py
@@ -13,56 +13,86 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-"""
-Module contains the helper for CI step.
-
-It is used to find and verify correctness if beam examples/katas/tests.
-"""
-
+import asyncio
 import logging
 from typing import List
 
-from api.v1.api_pb2 import STATUS_COMPILE_ERROR, STATUS_ERROR, STATUS_RUN_ERROR, \
-    STATUS_RUN_TIMEOUT, \
-    STATUS_VALIDATION_ERROR, STATUS_PREPARATION_ERROR
-from config import Config, Origin
+from api.v1.api_pb2 import Sdk, SDK_PYTHON, SDK_JAVA
+from api.v1.api_pb2 import (
+    STATUS_COMPILE_ERROR,
+    STATUS_ERROR,
+    STATUS_RUN_ERROR,
+    STATUS_RUN_TIMEOUT,
+    STATUS_VALIDATION_ERROR,
+    STATUS_PREPARATION_ERROR,
+)
+from config import Origin, Config
 from grpc_client import GRPCClient
-from helper import Example, get_statuses
-from repository import set_dataset_path_for_examples
+from models import Example, SdkEnum
+from helper import get_statuses
 
 
 class VerifyException(Exception):
-    def __init__(self, error: str):
-        super().__init__()
-        self.msg = error
+    pass
 
-    def __str__(self):
-        return self.msg
 
+class Verifier:
+    """Run examples and verify the results, enrich examples with produced artifacts"""
 
-class CIHelper:
-    """
-    Helper for CI step.
+    _sdk: SdkEnum
+    _origin: Origin
 
-    It is used to find and verify correctness if beam examples/katas/tests.
-    """
+    def __init__(self, sdk: SdkEnum, origin: Origin):
+        self._sdk = sdk
+        self._origin = origin
 
-    async def verify_examples(self, examples: List[Example], origin: Origin):
+    def run_verify(self, examples: List[Example]):
         """
-        Verify correctness of beam examples.
+        Run beam examples and verify their results.
 
-        1. Find all beam examples starting from directory os.getenv("BEAM_ROOT_DIR")
-        2. Group code of examples by their SDK.
-        3. Run processing for single-file examples to verify examples' code.
         """
-        single_file_examples = list(filter(
-            lambda example: example.tag.multifile is False, examples))
-        set_dataset_path_for_examples(single_file_examples)
-        async with GRPCClient() as client:
-            await get_statuses(client, single_file_examples)
-            await self._verify_examples(client, single_file_examples, origin)
+        logging.info("Start of executing Playground examples ...")
+        asyncio.run(self._run_and_verify(examples))
+        logging.info("Finish of executing Playground examples")
+
+    async def _run_and_verify(self, examples: List[Example]):
+        """
+        Run beam examples and keep their output.
+
+        Call the backend to start code processing for the examples.
+        Then receive code output.
 
-    async def _verify_examples(self, client: any, examples: List[Example], origin: Origin):
+        Args:
+            examples: beam examples that should be run
+        """
+
+        async def _populate_fields(example: Example):
+            try:
+                example.compile_output = await client.get_compile_output(
+                    example.pipeline_id
+                )
+                example.output = await client.get_run_output(example.pipeline_id)
+                example.logs = await client.get_log(example.pipeline_id)
+                if example.sdk in [SDK_JAVA, SDK_PYTHON]:
+                    example.graph = await client.get_graph(
+                        example.pipeline_id, example.filepath
+                    )
+            except Exception as e:
+                logging.error(example.url_vcs)
+                logging.error(example.compile_output)
+                raise RuntimeError(f"error in {example.tag.name}") from e
+
+        async with GRPCClient() as client:
+            await get_statuses(
+                client, examples
+            )  # run examples code and wait until all are executed
+            tasks = [_populate_fields(example) for example in examples]
+            await asyncio.gather(*tasks)
+            await self._verify_examples(client, examples, self._origin)
+
+    async def _verify_examples(
+        self, client: GRPCClient, examples: List[Example], origin: Origin
+    ):
         """
         Verify statuses of beam examples and the number of found default examples.
 
@@ -93,34 +123,40 @@ class CIHelper:
                 logging.error("Example: %s has preparation error", example.filepath)
             elif example.status == STATUS_ERROR:
                 logging.error(
-                    "Example: %s has error during setup run builder", example.filepath)
+                    "Example: %s has error during setup run builder", example.filepath
+                )
             elif example.status == STATUS_RUN_TIMEOUT:
                 logging.error("Example: %s failed because of timeout", example.filepath)
             elif example.status == STATUS_COMPILE_ERROR:
                 err = await client.get_compile_output(example.pipeline_id)
                 logging.error(
-                    "Example: %s has compilation error: %s", example.filepath, err)
+                    "Example: %s has compilation error: %s", example.filepath, err
+                )
             elif example.status == STATUS_RUN_ERROR:
                 err = await client.get_run_error(example.pipeline_id)
                 logging.error(
-                    "Example: %s has execution error: %s", example.filepath, err)
+                    "Example: %s has execution error: %s", example.filepath, err
+                )
             verify_status_failed = True
 
         logging.info(
             "Number of verified Playground examples: %s / %s",
             count_of_verified,
-            len(examples))
+            len(examples),
+        )
         logging.info(
             "Number of Playground examples with some error: %s / %s",
             len(examples) - count_of_verified,
-            len(examples))
+            len(examples),
+        )
 
         if origin == Origin.PG_EXAMPLES:
             if len(default_examples) == 0:
                 logging.error("Default example not found")
                 raise VerifyException(
                     "CI step failed due to finding an incorrect number "
-                    "of default examples. Default example not found")
+                    "of default examples. Default example not found"
+                )
             if len(default_examples) > 1:
                 logging.error("Many default examples found")
                 logging.error("Examples where the default_example field is true:")
@@ -128,7 +164,8 @@ class CIHelper:
                     logging.error(example.filepath)
                 raise VerifyException(
                     "CI step failed due to finding an incorrect number "
-                    "of default examples. Many default examples found")
+                    "of default examples. Many default examples found"
+                )
 
         if verify_status_failed:
             raise VerifyException("CI step failed due to errors in the examples")