You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@superset.apache.org by GitBox <gi...@apache.org> on 2020/09/01 00:34:50 UTC

[GitHub] [incubator-superset] bkyryliuk commented on a change in pull request #10605: feat: refractored SQL-based alerting framework

bkyryliuk commented on a change in pull request #10605:
URL: https://github.com/apache/incubator-superset/pull/10605#discussion_r480493154



##########
File path: superset/models/alerts.py
##########
@@ -100,3 +98,105 @@ class AlertLog(Model):
     @property
     def duration(self) -> int:
         return (self.dttm_end - self.dttm_start).total_seconds()
+
+
+# TODO: Currently SQLObservation table will constantly grow with no limit,
+# add some retention restriction or more to a more scalable db e.g.
+# https://github.com/apache/incubator-superset/blob/master/superset/utils/log.py#L32
+class SQLObserver(Model):

Review comment:
       Please add AuditMixinNullable to this class as well.

##########
File path: superset/models/alerts.py
##########
@@ -100,3 +98,105 @@ class AlertLog(Model):
     @property
     def duration(self) -> int:
         return (self.dttm_end - self.dttm_start).total_seconds()
+
+
+# TODO: Currently SQLObservation table will constantly grow with no limit,
+# add some retention restriction or more to a more scalable db e.g.
+# https://github.com/apache/incubator-superset/blob/master/superset/utils/log.py#L32
+class SQLObserver(Model):
+    """Runs SQL-based queries for alerts"""
+
+    __tablename__ = "sql_observers"
+
+    id = Column(Integer, primary_key=True)
+    sql = Column(Text, nullable=False)
+
+    @declared_attr
+    def alert_id(self) -> int:
+        return Column(Integer, ForeignKey("alerts.id"), nullable=False)
+
+    @declared_attr
+    def alert(self) -> RelationshipProperty:
+        return relationship(
+            "Alert",
+            foreign_keys=[self.alert_id],
+            backref=backref("sql_observer", cascade="all, delete-orphan"),
+        )
+
+    @declared_attr
+    def database_id(self) -> int:
+        return Column(Integer, ForeignKey("dbs.id"), nullable=False)
+
+    @declared_attr
+    def database(self) -> RelationshipProperty:
+        return relationship(
+            "Database",
+            foreign_keys=[self.database_id],
+            backref=backref("sql_observers", cascade="all, delete-orphan"),
+        )
+
+    def get_last_observation(self) -> Optional[Any]:
+        observations = (
+            db.session.query(SQLObservation)
+            .filter_by(observer_id=self.id)
+            .order_by(SQLObservation.dttm.desc())
+            .limit(1)
+        )
+        if observations:
+            return observations[0]
+
+        return None
+
+
+class SQLObservation(Model):  # pylint: disable=too-few-public-methods
+    """Keeps track of values retrieved from SQLObservers"""
+
+    __tablename__ = "sql_observations"
+
+    id = Column(Integer, primary_key=True)
+    dttm = Column(DateTime, default=datetime.utcnow, index=True)
+    observer_id = Column(Integer, ForeignKey("sql_observers.id"), nullable=False)
+    observer = relationship(
+        "SQLObserver",
+        foreign_keys=[observer_id],
+        backref=backref("observations", cascade="all, delete-orphan"),
+    )
+    alert_id = Column(Integer, ForeignKey("alerts.id"))
+    alert = relationship(
+        "Alert",
+        foreign_keys=[alert_id],
+        backref=backref("observations", cascade="all, delete-orphan"),
+    )
+    value = Column(Float)
+    error_msg = Column(String(500))
+
+
+class Validator(Model):

Review comment:
       Please add AuditMixinNullable to this class as well.

##########
File path: tests/alerts_tests.py
##########
@@ -41,114 +49,267 @@
 def setup_database():
     with app.app_context():
         slice_id = db.session.query(Slice).all()[0].id
-        database_id = utils.get_example_database().id
+        example_database = utils.get_example_database()
+        example_database_id = example_database.id
+        example_database.get_sqla_engine().execute(
+            "CREATE TABLE test_table AS SELECT 1 as first, 2 as second"
+        )
+        example_database.get_sqla_engine().execute(
+            "INSERT INTO test_table (first, second) VALUES (3, 4)"
+        )
 
+        common_data = dict(
+            active=True,
+            crontab="* * * * *",
+            slice_id=slice_id,
+            recipients="recipient1@superset.com",
+            slack_channel="#test_channel",
+        )
         alerts = [
-            Alert(
-                id=1,
-                label="alert_1",
-                active=True,
-                crontab="*/1 * * * *",
+            Alert(**common_data, label="alert_1"),

Review comment:
       It seems like there are too many alerts here. Try to keep only the bare minimum needed for the test, e.g. by reusing alerts/observers.
   Maybe create a utility function that takes the alert's name, the validator config and the observer SQL, and creates the whole set of objects for you.

##########
File path: superset/tasks/schedules.py
##########
@@ -705,7 +707,23 @@ def run_alert_query(
     )
     db.session.commit()
 
-    return None
+
+def validate_observations(alert_id: int, label: str) -> bool:

Review comment:
       Please add a unit test for this function.

##########
File path: superset/tasks/alerts/validator.py
##########
@@ -0,0 +1,110 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import enum
+import json
+from operator import eq, ge, gt, le, lt, ne
+from typing import Callable
+
+import numpy as np
+
+from superset.exceptions import SupersetException
+from superset.models.alerts import SQLObserver
+
+VALID_ALERT_OPERATORS = {"<", "<=", ">", ">=", "==", "!="}
+
+
+class AlertValidatorType(enum.Enum):
+    not_null = "not null"
+    operator = "operator"
+
+    @classmethod
+    def valid_type(cls, validator_type: str) -> bool:
+        return any(val_type.value == validator_type for val_type in cls)
+
+
+def check_validator(validator_type: str, config: str) -> None:
+    if not AlertValidatorType.valid_type(validator_type):
+        raise SupersetException(
+            f"Error: {validator_type} is not a valid validator type."
+        )
+
+    config_dict = json.loads(config)
+
+    if validator_type == AlertValidatorType.operator.value:
+
+        if not (config_dict.get("op") and config_dict.get("threshold")):
+            raise SupersetException(
+                "Error: Operator Validator needs specified operator and threshold "
+                'values. Add "op" and "threshold" to config.'
+            )
+
+        if not config_dict["op"] in VALID_ALERT_OPERATORS:
+            raise SupersetException(
+                f'Error: {config_dict["op"]} is an invalid operator type. Change '
+                f'the "op" value in the config to one of '
+                f'["<", "<=", ">", ">=", "==", "!="]'
+            )
+
+        if not isinstance(config_dict["threshold"], (int, float)):
+            raise SupersetException(
+                f'Error: {config_dict["threshold"]} is an invalid threshold value.'
+                f' Change the "threshold" value in the config.'
+            )
+
+
+def not_null_validator(
+    observer: SQLObserver, validator_config: str  # pylint: disable=unused-argument
+) -> bool:
+    """Returns True if a SQLObserver's recent observation is not NULL"""
+
+    observation = observer.get_last_observation()
+    # TODO: Validate malformed observations/observations with errors separately
+    if (
+        not observation
+        or observation.error_msg
+        or observation.value in (0, None, np.nan)
+    ):
+        return False
+    return True
+
+
+def operator_validator(observer: SQLObserver, validator_config: str) -> bool:
+    """
+    Returns True if a SQLObserver's recent observation is greater than or equal to
+    the value given in the validator config
+    """
+    operator_functions = {">=": ge, ">": gt, "<=": le, "<": lt, "==": eq, "!=": ne}

Review comment:
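       VALID_ALERT_OPERATORS is the same as operator_functions.keys(), so the two can drift apart.
   Keep only a single module-level operator_functions constant and derive the set of valid operators from its keys.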

##########
File path: tests/alerts_tests.py
##########
@@ -41,114 +49,267 @@
 def setup_database():
     with app.app_context():
         slice_id = db.session.query(Slice).all()[0].id
-        database_id = utils.get_example_database().id
+        example_database = utils.get_example_database()
+        example_database_id = example_database.id
+        example_database.get_sqla_engine().execute(
+            "CREATE TABLE test_table AS SELECT 1 as first, 2 as second"
+        )
+        example_database.get_sqla_engine().execute(
+            "INSERT INTO test_table (first, second) VALUES (3, 4)"
+        )
 
+        common_data = dict(
+            active=True,
+            crontab="* * * * *",
+            slice_id=slice_id,
+            recipients="recipient1@superset.com",
+            slack_channel="#test_channel",
+        )
         alerts = [
-            Alert(
-                id=1,
-                label="alert_1",
-                active=True,
-                crontab="*/1 * * * *",
+            Alert(**common_data, label="alert_1"),
+            Alert(**common_data, label="alert_2"),
+            Alert(**common_data, label="alert_3"),
+            Alert(**common_data, label="alert_4"),
+            Alert(crontab="* * * * *", active=False, label="alert_5"),
+            Alert(**common_data, label="alert_6"),
+            Alert(**common_data, label="alert_7"),
+            Alert(**common_data, label="alert_8"),
+            Alert(**common_data, label="alert_9"),
+            Alert(**common_data, label="alert_10"),
+        ]
+
+        db.session.bulk_save_objects(alerts)
+
+        observers = [
+            SQLObserver(
                 sql="SELECT 0",
-                alert_type="email",
-                slice_id=slice_id,
-                database_id=database_id,
+                alert_id=db.session.query(Alert).filter_by(label="alert_1").one().id,
+                database_id=example_database_id,
+            ),
+            SQLObserver(
+                sql="SELECT first FROM test_table WHERE first = -1",
+                alert_id=db.session.query(Alert).filter_by(label="alert_2").one().id,
+                database_id=example_database_id,
+            ),
+            SQLObserver(
+                sql="$%^&",
+                alert_id=db.session.query(Alert).filter_by(label="alert_3").one().id,
+                database_id=example_database_id,
             ),
-            Alert(
-                id=2,
-                label="alert_2",
-                active=True,
-                crontab="*/1 * * * *",
+            SQLObserver(
                 sql="SELECT 55",
-                alert_type="email",
-                slice_id=slice_id,
-                recipients="recipient1@superset.com",
-                slack_channel="#test_channel",
-                database_id=database_id,
+                alert_id=db.session.query(Alert).filter_by(label="alert_4").one().id,
+                database_id=example_database_id,
             ),
-            Alert(
-                id=3,
-                label="alert_3",
-                active=False,
-                crontab="*/1 * * * *",
-                sql="UPDATE 55",
-                alert_type="email",
-                slice_id=slice_id,
-                database_id=database_id,
+            SQLObserver(
+                sql="SELECT 'test_string' as string_value",
+                alert_id=db.session.query(Alert).filter_by(label="alert_5").one().id,
+                database_id=example_database_id,
+            ),
+            SQLObserver(
+                sql="SELECT null as null_result",
+                alert_id=db.session.query(Alert).filter_by(label="alert_6").one().id,
+                database_id=example_database_id,
+            ),
+            SQLObserver(
+                sql="SELECT 30.0 as wage",
+                alert_id=db.session.query(Alert).filter_by(label="alert_7").one().id,
+                database_id=example_database_id,
+            ),
+            SQLObserver(
+                sql="SELECT first FROM test_table",
+                alert_id=db.session.query(Alert).filter_by(label="alert_8").one().id,
+                database_id=example_database_id,
+            ),
+            SQLObserver(
+                sql="SELECT first, second FROM test_table WHERE first = 1",
+                alert_id=db.session.query(Alert).filter_by(label="alert_9").one().id,
+                database_id=example_database_id,
             ),
-            Alert(id=4, active=False, label="alert_4", database_id=-1),
-            Alert(id=5, active=False, label="alert_5", database_id=database_id),
         ]
 
-        db.session.bulk_save_objects(alerts)
-        db.session.commit()
+        db.session.bulk_save_objects(observers)
         yield db.session
 
+        db.session.query(SQLObservation).delete()
+        db.session.query(SQLObserver).delete()
+        db.session.query(Validator).delete()
         db.session.query(AlertLog).delete()
         db.session.query(Alert).delete()
 
 
+def test_alert_observer(setup_database):
+    dbsession = setup_database
+
+    # Test SQLObserver with int SQL return
+    alert4 = dbsession.query(Alert).filter_by(label="alert_4").one()

Review comment:
       I would prefer a slightly different setup, as mentioned before:
   ```
   NOTNULLCONFIG="""
     "type": "notnull"
   """
   alert = create_alert(sql='....', config=NOTNULLCONFIG)
   ...
   ```
   
   and create_alert would create the alert, its observer and its validator.
   
   This keeps the SQL text and the config close to the test code and makes the tests more readable.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@superset.apache.org
For additional commands, e-mail: notifications-help@superset.apache.org