You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/02/11 12:22:20 UTC
[GitHub] [airflow] wojsamjan commented on a change in pull request #21267: Dataproc metastore assets
wojsamjan commented on a change in pull request #21267:
URL: https://github.com/apache/airflow/pull/21267#discussion_r804600045
##########
File path: airflow/providers/google/cloud/operators/dataproc_metastore.py
##########
@@ -29,13 +30,216 @@
from googleapiclient.errors import HttpError
from airflow import AirflowException
-from airflow.models import BaseOperator
+from airflow.models import BaseOperator, BaseOperatorLink
+from airflow.models.xcom import XCom
from airflow.providers.google.cloud.hooks.dataproc_metastore import DataprocMetastoreHook
+from airflow.providers.google.common.links.storage import StorageLink
if TYPE_CHECKING:
from airflow.utils.context import Context
+BASE_LINK = "https://console.cloud.google.com"
+METASTORE_BASE_LINK = BASE_LINK + "/dataproc/metastore/services/{region}/{service_id}"
+METASTORE_BACKUP_LINK = METASTORE_BASE_LINK + "/backups/{backup_id}?project={project_id}"
+METASTORE_BACKUPS_LINK = METASTORE_BASE_LINK + "/backuprestore?project={project_id}"
+METASTORE_EXPORT_LINK = METASTORE_BASE_LINK + "/importexport?project={project_id}"
+METASTORE_IMPORT_LINK = METASTORE_BASE_LINK + "/imports/{import_id}?project={project_id}"
+METASTORE_SERVICE_LINK = METASTORE_BASE_LINK + "/config?project={project_id}"
+
+
+class DataprocMetastoreBackupLink(BaseOperatorLink):
+ """Helper class for constructing Dataproc Metastore Backup link"""
+
+ name = "Dataproc Metastore Backup"
+ key = "backup_conf"
+
+ @staticmethod
+ def persist(context: "Context", task_instance: "DataprocMetastoreCreateBackupOperator"):
+ task_instance.xcom_push(
+ context=context,
+ key=DataprocMetastoreBackupLink.key,
+ value={
+ "region": task_instance.region,
+ "service_id": task_instance.service_id,
+ "backup_id": task_instance.backup_id,
+ "project_id": task_instance.project_id,
+ },
+ )
+
+ def get_link(self, operator: BaseOperator, dttm: datetime):
+ backup_conf = XCom.get_one(
+ dag_id=operator.dag.dag_id,
+ task_id=operator.task_id,
+ execution_date=dttm,
+ key=DataprocMetastoreBackupLink.key,
+ )
+ return (
+ METASTORE_BACKUP_LINK.format(
+ region=backup_conf["region"],
+ service_id=backup_conf["service_id"],
+ backup_id=backup_conf["backup_id"],
+ project_id=backup_conf["project_id"],
+ )
+ if backup_conf
+ else ""
+ )
Review comment:
We have just had some discussion about it. We think we can improve it to provide a more generic solution, as you suggest. I will implement those changes so that we can later check whether it is a good direction for further services as well.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org