You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/11/07 08:32:13 UTC

[GitHub] [beam] eantyshev commented on a diff in pull request #23800: [Playground][Backend] Dataset processing with python scripts to deploy examples

eantyshev commented on code in PR #23800:
URL: https://github.com/apache/beam/pull/23800#discussion_r1015113708


##########
playground/infrastructure/datastore_client.py:
##########
@@ -31,6 +31,7 @@
 from helper import Example
 
 from api.v1.api_pb2 import Sdk, PrecompiledObjectType
+from storage_client import StorageClient

Review Comment:
   I think that `datastore_client` shouldn't depend on the Cloud Storage client.



##########
playground/infrastructure/datastore_client.py:
##########
@@ -83,54 +87,67 @@ def save_to_cloud_datastore(self, examples_from_rep: List[Example], sdk: Sdk, or
         examples_ids_before_updating = self._get_all_examples(sdk, origin)
 
         # loop through every example to save them to the Cloud Datastore
-        with self._datastore_client.transaction():
-            for example in tqdm(examples_from_rep):
-                sdk_key = self._get_key(DatastoreProps.SDK_KIND, Sdk.Name(example.sdk))
-                example_id = self._make_example_id(origin, sdk, example.name)
-                updated_example_ids.append(example_id)
-                examples.append(
-                    self._to_example_entity(example, example_id, sdk_key, actual_schema_version_key, origin)
-                )
-                snippets.append(
-                    self._to_snippet_entity(example, example_id, sdk_key, now, actual_schema_version_key, origin)
-                )
-                pc_objects.extend(self._pc_object_entities(example, example_id))
-                files.append(self._to_file_entity(example, example_id))
-
-            self._datastore_client.put_multi(examples)
-            self._datastore_client.put_multi(snippets)
-            self._datastore_client.put_multi(pc_objects)
-            self._datastore_client.put_multi(files)
-
-            # delete examples from the Cloud Datastore that are not in the repository
-            examples_ids_for_removing = list(filter(lambda key: key not in updated_example_ids, examples_ids_before_updating))
-            if len(examples_ids_for_removing) != 0:
-                logging.info("Start of deleting extra playground examples ...")
-                examples_keys_for_removing = list(
-                    map(lambda ex_id: self._get_key(DatastoreProps.EXAMPLE_KIND, ex_id), examples_ids_for_removing)
-                )
-                snippets_keys_for_removing = list(
-                    map(lambda ex_id: self._get_key(DatastoreProps.SNIPPET_KIND, ex_id), examples_ids_for_removing)
-                )
-                file_keys_for_removing = list(
-                    map(self._get_files_key, examples_ids_for_removing)
+
+        for example in tqdm(examples_from_rep):
+            sdk_key = self._get_key(DatastoreProps.SDK_KIND, Sdk.Name(example.sdk))
+            example_id = self._make_example_id(origin, sdk, example.name)
+            updated_example_ids.append(example_id)
+            examples.append(
+                self._to_example_entity(example, example_id, sdk_key, actual_schema_version_key, origin)
+            )
+            snippet = self._to_snippet_entity(example, example_id, sdk_key, now, actual_schema_version_key, origin)
+            pc_objects.extend(self._pc_object_entities(example, example_id))
+            files.append(self._to_file_entity(example, example_id))
+            if example.datasets and example.emulators:

Review Comment:
   The `save_to_cloud_datastore` method saves to both Datastore and Cloud Storage :(



##########
playground/infrastructure/helper.py:
##########
@@ -476,29 +506,43 @@ def validate_example_fields(example: Example):
     :param example: example from the repository
     """
     if example.filepath == "":
-        err_msg = f"Example doesn't have a file path field. Example: {example}"
-        logging.error(err_msg)
-        raise ValidationException(err_msg)
+        _log_and_rise_validation_err(f"Example doesn't have a file path field. Example: {example}")
     if example.name == "":
-        err_msg = f"Example doesn't have a name field. Path: {example.filepath}"
-        logging.error(err_msg)
-        raise ValidationException(err_msg)
+        _log_and_rise_validation_err(f"Example doesn't have a name field. Path: {example.filepath}")
     if example.sdk == SDK_UNSPECIFIED:
-        err_msg = f"Example doesn't have a sdk field. Path: {example.filepath}"
-        logging.error(err_msg)
-        raise ValidationException(err_msg)
+        _log_and_rise_validation_err(f"Example doesn't have a sdk field. Path: {example.filepath}")
     if example.code == "":
-        err_msg = f"Example doesn't have a code field. Path: {example.filepath}"
-        logging.error(err_msg)
-        raise ValidationException(err_msg)
+        _log_and_rise_validation_err(f"Example doesn't have a code field. Path: {example.filepath}")
     if example.link == "":
-        err_msg = f"Example doesn't have a link field. Path: {example.filepath}"
-        logging.error(err_msg)
-        raise ValidationException(err_msg)
+        _log_and_rise_validation_err(f"Example doesn't have a link field. Path: {example.filepath}")
     if example.complexity == "":
-        err_msg = f"Example doesn't have a complexity field. Path: {example.filepath}"
-        logging.error(err_msg)
-        raise ValidationException(err_msg)
+        _log_and_rise_validation_err(f"Example doesn't have a complexity field. Path: {example.filepath}")
+    datasets = example.datasets
+    emulators = example.emulators
+
+    if datasets and not emulators:
+        _log_and_rise_validation_err(f"Example has a datasets field but an emulators field not found. Path: {example.filepath}")
+    if emulators and not datasets:
+        _log_and_rise_validation_err(f"Example has an emulators field but a datasets field not found. Path: {example.filepath}")
+
+    dataset_names = []
+    if datasets:
+        for dataset in datasets:
+            location = dataset.location
+            dataset_format = dataset.format
+            if not location or not dataset_format or location not in ["GCS"] or dataset_format not in ["json", "avro"]:
+                _log_and_rise_validation_err(f"Example has invalid dataset value. Path: {example.filepath}")
+            dataset_names.append(dataset.name)
+
+    if emulators:
+        for emulator in emulators:
+            if emulator.name not in ["kafka"] or not emulator.topic or emulator.topic.dataset not in dataset_names or not emulator.topic.id:
+                _log_and_rise_validation_err(f"Example has invalid emulator value. Path: {example.filepath}")

Review Comment:
   ```suggestion
       for dataset in datasets:
           location = dataset.location
           dataset_format = dataset.format
           if not location or not dataset_format or location not in ["GCS"] or dataset_format not in ["json", "avro"]:
               _log_and_rise_validation_err(f"Example has invalid dataset value. Path: {example.filepath}")
           dataset_names.append(dataset.name)
   
       for emulator in emulators:
           if emulator.name not in ["kafka"] or not emulator.topic or emulator.topic.dataset not in dataset_names or not emulator.topic.id:
               _log_and_rise_validation_err(f"Example has invalid emulator value. Path: {example.filepath}")
   ```



##########
playground/infrastructure/datastore_client.py:
##########
@@ -83,54 +87,67 @@ def save_to_cloud_datastore(self, examples_from_rep: List[Example], sdk: Sdk, or
         examples_ids_before_updating = self._get_all_examples(sdk, origin)
 
         # loop through every example to save them to the Cloud Datastore
-        with self._datastore_client.transaction():
-            for example in tqdm(examples_from_rep):
-                sdk_key = self._get_key(DatastoreProps.SDK_KIND, Sdk.Name(example.sdk))
-                example_id = self._make_example_id(origin, sdk, example.name)
-                updated_example_ids.append(example_id)
-                examples.append(
-                    self._to_example_entity(example, example_id, sdk_key, actual_schema_version_key, origin)
-                )
-                snippets.append(
-                    self._to_snippet_entity(example, example_id, sdk_key, now, actual_schema_version_key, origin)
-                )
-                pc_objects.extend(self._pc_object_entities(example, example_id))
-                files.append(self._to_file_entity(example, example_id))
-
-            self._datastore_client.put_multi(examples)
-            self._datastore_client.put_multi(snippets)
-            self._datastore_client.put_multi(pc_objects)
-            self._datastore_client.put_multi(files)
-
-            # delete examples from the Cloud Datastore that are not in the repository
-            examples_ids_for_removing = list(filter(lambda key: key not in updated_example_ids, examples_ids_before_updating))
-            if len(examples_ids_for_removing) != 0:
-                logging.info("Start of deleting extra playground examples ...")
-                examples_keys_for_removing = list(
-                    map(lambda ex_id: self._get_key(DatastoreProps.EXAMPLE_KIND, ex_id), examples_ids_for_removing)
-                )
-                snippets_keys_for_removing = list(
-                    map(lambda ex_id: self._get_key(DatastoreProps.SNIPPET_KIND, ex_id), examples_ids_for_removing)
-                )
-                file_keys_for_removing = list(
-                    map(self._get_files_key, examples_ids_for_removing)
+
+        for example in tqdm(examples_from_rep):
+            sdk_key = self._get_key(DatastoreProps.SDK_KIND, Sdk.Name(example.sdk))
+            example_id = self._make_example_id(origin, sdk, example.name)
+            updated_example_ids.append(example_id)
+            examples.append(
+                self._to_example_entity(example, example_id, sdk_key, actual_schema_version_key, origin)
+            )
+            snippet = self._to_snippet_entity(example, example_id, sdk_key, now, actual_schema_version_key, origin)
+            pc_objects.extend(self._pc_object_entities(example, example_id))
+            files.append(self._to_file_entity(example, example_id))
+            if example.datasets and example.emulators:
+                dataset = example.datasets[0]
+                emulator = example.emulators[0]
+                file_name = f"{dataset.name}.{dataset.format}"
+                link = self._upload_dataset_to_bucket(file_name)
+                dataset = self._to_dataset_entity(file_name, link)
+                datasets.append(dataset)
+                dataset_nested_entity = self._to_dataset_nested_entity(file_name, example_id, emulator)
+                snippet_datasets = [dataset_nested_entity]
+                snippet.update(
+                    {
+                        "datasets": snippet_datasets
+                    }
                 )
-                pc_objs_keys_for_removing = []
-                for example_id_item in examples_ids_for_removing:
-                    for example_type in [
-                        PrecompiledExample.GRAPH_EXTENSION.upper(),
-                        PrecompiledExample.OUTPUT_EXTENSION.upper(),
-                        PrecompiledExample.LOG_EXTENSION.upper()
-                    ]:
-                        pc_objs_keys_for_removing.append(
-                            self._get_key(DatastoreProps.PRECOMPILED_OBJECT_KIND,
-                                f"{example_id_item}{config.DatastoreProps.KEY_NAME_DELIMITER}{example_type}")
-                        )
-                self._datastore_client.delete_multi(examples_keys_for_removing)
-                self._datastore_client.delete_multi(snippets_keys_for_removing)
-                self._datastore_client.delete_multi(file_keys_for_removing)
-                self._datastore_client.delete_multi(pc_objs_keys_for_removing)
-                logging.info("Finish of deleting extra playground examples ...")
+            snippets.append(snippet)
+
+        if datasets:
+            self._datastore_client.put_multi(datasets)
+        self._datastore_client.put_multi(examples)
+        self._datastore_client.put_multi(snippets)
+        self._datastore_client.put_multi(pc_objects)
+        self._datastore_client.put_multi(files)
+
+        # delete examples from the Cloud Datastore that are not in the repository
+        examples_ids_for_removing = list(filter(lambda key: key not in updated_example_ids, examples_ids_before_updating))
+        if len(examples_ids_for_removing) != 0:
+            logging.info("Start of deleting extra playground examples ...")
+            examples_keys_for_removing = list(
+                map(lambda ex_id: self._get_key(DatastoreProps.EXAMPLE_KIND, ex_id), examples_ids_for_removing)
+            )
+            snippets_keys_for_removing = list(
+                map(lambda ex_id: self._get_key(DatastoreProps.SNIPPET_KIND, ex_id), examples_ids_for_removing)
+            )
+            file_keys_for_removing = list(
+                map(self._get_files_key, examples_ids_for_removing)
+            )
+            pc_objs_keys_for_removing = []
+            for example_id_item in examples_ids_for_removing:
+                for example_type in [
+                    PrecompiledExample.GRAPH_EXTENSION.upper(),
+                    PrecompiledExample.OUTPUT_EXTENSION.upper(),
+                    PrecompiledExample.LOG_EXTENSION.upper()
+                ]:
+                    pc_objs_keys_for_removing.append(
+                        self._get_key(DatastoreProps.PRECOMPILED_OBJECT_KIND, f"{example_id_item}{config.DatastoreProps.KEY_NAME_DELIMITER}{example_type}"))
+            self._datastore_client.delete_multi(examples_keys_for_removing)
+            self._datastore_client.delete_multi(snippets_keys_for_removing)
+            self._datastore_client.delete_multi(file_keys_for_removing)
+            self._datastore_client.delete_multi(pc_objs_keys_for_removing)
+            logging.info("Finish of deleting extra playground examples ...")

Review Comment:
   What about removing datasets?



##########
playground/infrastructure/helper.py:
##########
@@ -298,10 +304,34 @@ def _get_example(filepath: str, filename: str, tag: ExampleTag) -> Example:
         type=object_type,
         link=link)
 
+    datasets_as_dict: Dict[str, str] = _get_dict_by_field(tag, TagFields.datasets)
+    if datasets_as_dict:
+        datasets = []
+        for key in datasets_as_dict:
+            dataset = Dataset.from_dict(datasets_as_dict.get(key))
+            dataset.name = key
+            datasets.append(dataset)
+        example.datasets = datasets
+
+    emulators_as_dict: Dict[str, str] = _get_dict_by_field(tag, TagFields.emulators)
+    if emulators_as_dict:
+        emulators = []
+        for key in emulators_as_dict:
+            emulator = Emulator.from_dict(emulators_as_dict.get(key))
+            emulator.name = key
+            emulators.append(emulator)
+        example.emulators = emulators
+
     validate_example_fields(example)
     return example
 
 
+def _get_dict_by_field(tag: ExampleTag, field: str) -> Dict[str, str]:

Review Comment:
   Could you please describe what is done here?



##########
playground/infrastructure/test_helper.py:
##########
@@ -211,9 +214,23 @@ def test__get_example():
         code="data",
         status=STATUS_UNSPECIFIED,
         tag=Tag(
-            "Name", "MEDIUM", "Description", "False", [""], "--option option", False, 1),
+            "Name", "MEDIUM", "{'kafka': {'topic': {'id': 'dataset', 'dataset': 'dataset'}}}", "{'dataset': {'location': 'GCS', 'format': 'json'}}", "Description", "False", [""], "--option option",

Review Comment:
   I don't think it's a good idea to serialize JSON to a string tag and back. Could we just extend the Example schema?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org