You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@superset.apache.org by GitBox <gi...@apache.org> on 2021/11/09 15:49:10 UTC

[GitHub] [superset] betodealmeida commented on a change in pull request #17337: feat: Adds a key-value store endpoint for Superset

betodealmeida commented on a change in pull request #17337:
URL: https://github.com/apache/superset/pull/17337#discussion_r745735010



##########
File path: superset/config.py
##########
@@ -732,6 +732,10 @@ class CeleryConfig:  # pylint: disable=too-few-public-methods
             "task": "reports.prune_log",
             "schedule": crontab(minute=0, hour=0),
         },
+        "key_value.cleanup": {
+            "task": "key_value.cleanup",
+            "schedule": timedelta(seconds=10),

Review comment:
       How long does it take to run the jobs? 10 seconds looks a bit aggressive to me, maybe we could have a default of ~1 minute to be conservative on resources?

##########
File path: superset/key_value/api.py
##########
@@ -0,0 +1,324 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import logging
+
+from flask import g, request, Response
+from flask_appbuilder.api import expose, permission_name, protect, safe
+from flask_appbuilder.models.sqla.interface import SQLAInterface
+
+from superset.constants import MODEL_API_RW_METHOD_PERMISSION_MAP, RouteMethod
+from superset.extensions import event_logger
+from superset.key_value.commands.create import CreateKeyValueCommand
+from superset.key_value.commands.delete import DeleteKeyValueCommand
+from superset.key_value.commands.exceptions import (
+    KeyValueCreateFailedError,
+    KeyValueDeleteFailedError,
+    KeyValueGetFailedError,
+    KeyValueUpdateFailedError,
+)
+from superset.key_value.commands.get import GetKeyValueCommand
+from superset.key_value.commands.update import UpdateKeyValueCommand
+from superset.key_value.dao import KeyValueDAO
+from superset.key_value.schemas import KeyValueSchema
+from superset.models.key_value import KeyValue
+from superset.views.base_api import BaseSupersetModelRestApi, statsd_metrics
+
+logger = logging.getLogger(__name__)
+
+
+class KeyValueRestApi(BaseSupersetModelRestApi):
+    datamodel = SQLAInterface(KeyValue)
+    schema = KeyValueSchema()
+    class_permission_name = "KeyValue"
+    resource_name = "key_value_store"
+    method_permission_name = MODEL_API_RW_METHOD_PERMISSION_MAP
+    include_route_methods = {
+        RouteMethod.POST,
+        RouteMethod.PUT,
+        RouteMethod.GET,
+        RouteMethod.DELETE,
+    }
+    allow_browser_login = True
+    openapi_spec_tag = "Key Value Store"
+
+    @expose("/", methods=["POST"])
+    @protect()
+    @safe
+    @statsd_metrics
+    @event_logger.log_this_with_context(
+        action=lambda self, *args, **kwargs: f"{self.__class__.__name__}.post",
+        log_to_statsd=False,
+    )
+    def post(self) -> Response:
+        """Stores a new value.
+        ---
+        post:
+          description: >-
+            Stores a new value.
+          requestBody:
+            description: Key value schema
+            required: true
+            content:
+              application/json:
+                schema:
+                    type: object
+                    properties:
+                      value:
+                        type: string
+                        description: Any type of JSON supported value.
+                        required: true
+                      duration_ms:
+                        type: number
+                        description: The duration of the value on the key store. If no duration is specified the value won't expire.
+                        required: false
+                        default: null
+                      reset_duration_on_retrieval:
+                        type: boolean
+                        description: If the duration should be reset when the value is retrieved. This is useful if you wish to expire unused values but keep the ones that are actively retrieved.
+                        required: false
+                        default: true
+          responses:
+            201:
+              description: The value was stored successfully. It returns the key to retrieve the value.
+              content:
+                application/json:
+                  schema:
+                    type: object
+                    properties:
+                      key:
+                        type: string
+                        description: The key to retrieve the value.
+            400:
+              $ref: '#/components/responses/400'
+            401:
+              $ref: '#/components/responses/401'
+            422:
+              $ref: '#/components/responses/422'
+            500:
+              $ref: '#/components/responses/500'
+        """
+        if not request.is_json:
+            return self.response_400(message="Request is not JSON")

Review comment:
       Can we make the error responses comply with SIP-40? https://github.com/apache/superset/issues/9194
   
   We should return a JSON payload with more detail instead of a string whenever an error occurs.

##########
File path: tests/integration_tests/key_value/api_tests.py
##########
@@ -0,0 +1,79 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# isort:skip_file
+import json
+
+from superset import db
+from superset.models.key_value import KeyValue
+from tests.integration_tests.base_tests import SupersetTestCase
+
+duration_ms = 10000
+reset_duration_on_retrieval = True
+value = "test"
+
+
+class KeyValueTests(SupersetTestCase):
+    def post(self):
+        payload = {
+            "duration_ms": duration_ms,
+            "reset_duration_on_retrieval": reset_duration_on_retrieval,
+            "value": value,
+        }
+        resp = self.client.post("api/v1/key_value_store/", json=payload)
+        data = json.loads(resp.data.decode("utf-8"))
+        return data.get("key")
+
+    def setUp(self):
+        self.login(username="admin")
+        rows = db.session.query(KeyValue).all()
+        for row in rows:
+            db.session.delete(row)
+        db.session.commit()
+
+    def test_post(self):
+        key = self.post()
+        result = db.session.query(KeyValue).first()
+        self.assertEqual(key, str(result.key))

Review comment:
       Nit: `pytest` works better with naked asserts, e.g.:
   
   ```suggestion
           assert key == str(result.key)
   ```
   
   https://docs.pytest.org/en/latest/how-to/assert.html#assert

##########
File path: superset/key_value/commands/cleanup.py
##########
@@ -0,0 +1,47 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import logging
+from datetime import datetime, timedelta
+
+from superset.commands.base import BaseCommand
+from superset.models.key_value import KeyValue
+from superset.key_value.utils import is_expired
+from superset.utils.celery import session_scope
+
+logger = logging.getLogger(__name__)
+
+
+class CleanupCommand(BaseCommand):
+    """
+    Expiration cleanup command.
+    """
+
+    def __init__(self, worker_context: bool = True):
+        self._worker_context = worker_context
+
+    def run(self) -> None:
+        logger.info("Key value store cleanup starting")
+        with session_scope(nullpool=True) as session:
+           for keyValue in session.query(KeyValue).all():
+                if is_expired(keyValue):

Review comment:
       It would be much better for performance to implement the logic of `is_expired` in SQL instead of Python, especially because we already have a query on line 39.

##########
File path: tests/integration_tests/key_value/api_tests.py
##########
@@ -0,0 +1,79 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# isort:skip_file
+import json
+
+from superset import db
+from superset.models.key_value import KeyValue
+from tests.integration_tests.base_tests import SupersetTestCase
+
+duration_ms = 10000
+reset_duration_on_retrieval = True
+value = "test"
+
+
+class KeyValueTests(SupersetTestCase):
+    def post(self):
+        payload = {
+            "duration_ms": duration_ms,
+            "reset_duration_on_retrieval": reset_duration_on_retrieval,
+            "value": value,
+        }
+        resp = self.client.post("api/v1/key_value_store/", json=payload)
+        data = json.loads(resp.data.decode("utf-8"))
+        return data.get("key")
+
+    def setUp(self):
+        self.login(username="admin")
+        rows = db.session.query(KeyValue).all()
+        for row in rows:
+            db.session.delete(row)
+        db.session.commit()
+
+    def test_post(self):
+        key = self.post()
+        result = db.session.query(KeyValue).first()
+        self.assertEqual(key, str(result.key))
+        self.assertEqual(duration_ms, result.duration_ms)
+        self.assertEqual(
+            reset_duration_on_retrieval, result.reset_duration_on_retrieval
+        )
+        self.assertEqual(value, result.value)
+
+    def test_get_not_found(self):
+        key = self.post()
+        resp = self.client.get(key)
+        self.assertEqual(404, resp.status_code)
+
+    def test_get(self):
+        key = self.post()
+        resp = self.client.get(f"api/v1/key_value_store/{key}/")
+        self.assertEqual(resp.status_code, 200)
+        data = json.loads(resp.data.decode("utf-8"))
+        self.assertEqual(duration_ms, data.get("duration_ms"))
+        self.assertEqual(
+            reset_duration_on_retrieval, data.get("reset_duration_on_retrieval")
+        )
+        self.assertEqual(value, data.get("value"))
+
+    def test_get_retrieved_on(self):

Review comment:
       Can you use `freezegun` and add a unit test checking that `retrieved_on` is set to an expected value?
   
   https://github.com/apache/superset/blob/4dc859f89e5668ed3f94cd1ef0532a301a3ab85a/tests/integration_tests/reports/scheduler_tests.py#L45

##########
File path: superset/key_value/api.py
##########
@@ -0,0 +1,324 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import logging
+
+from flask import g, request, Response
+from flask_appbuilder.api import expose, permission_name, protect, safe
+from flask_appbuilder.models.sqla.interface import SQLAInterface
+
+from superset.constants import MODEL_API_RW_METHOD_PERMISSION_MAP, RouteMethod
+from superset.extensions import event_logger
+from superset.key_value.commands.create import CreateKeyValueCommand
+from superset.key_value.commands.delete import DeleteKeyValueCommand
+from superset.key_value.commands.exceptions import (
+    KeyValueCreateFailedError,
+    KeyValueDeleteFailedError,
+    KeyValueGetFailedError,
+    KeyValueUpdateFailedError,
+)
+from superset.key_value.commands.get import GetKeyValueCommand
+from superset.key_value.commands.update import UpdateKeyValueCommand
+from superset.key_value.dao import KeyValueDAO
+from superset.key_value.schemas import KeyValueSchema
+from superset.models.key_value import KeyValue
+from superset.views.base_api import BaseSupersetModelRestApi, statsd_metrics
+
+logger = logging.getLogger(__name__)
+
+
+class KeyValueRestApi(BaseSupersetModelRestApi):
+    datamodel = SQLAInterface(KeyValue)
+    schema = KeyValueSchema()
+    class_permission_name = "KeyValue"
+    resource_name = "key_value_store"
+    method_permission_name = MODEL_API_RW_METHOD_PERMISSION_MAP
+    include_route_methods = {
+        RouteMethod.POST,
+        RouteMethod.PUT,
+        RouteMethod.GET,
+        RouteMethod.DELETE,
+    }
+    allow_browser_login = True
+    openapi_spec_tag = "Key Value Store"
+
+    @expose("/", methods=["POST"])
+    @protect()
+    @safe
+    @statsd_metrics
+    @event_logger.log_this_with_context(
+        action=lambda self, *args, **kwargs: f"{self.__class__.__name__}.post",
+        log_to_statsd=False,
+    )
+    def post(self) -> Response:
+        """Stores a new value.
+        ---
+        post:
+          description: >-
+            Stores a new value.
+          requestBody:
+            description: Key value schema
+            required: true
+            content:
+              application/json:
+                schema:
+                    type: object
+                    properties:
+                      value:
+                        type: string
+                        description: Any type of JSON supported value.
+                        required: true
+                      duration_ms:
+                        type: number
+                        description: The duration of the value on the key store. If no duration is specified the value won't expire.
+                        required: false
+                        default: null
+                      reset_duration_on_retrieval:
+                        type: boolean
+                        description: If the duration should be reset when the value is retrieved. This is useful if you wish to expire unused values but keep the ones that are actively retrieved.
+                        required: false
+                        default: true
+          responses:
+            201:
+              description: The value was stored successfully. It returns the key to retrieve the value.
+              content:
+                application/json:
+                  schema:
+                    type: object
+                    properties:
+                      key:
+                        type: string
+                        description: The key to retrieve the value.
+            400:
+              $ref: '#/components/responses/400'
+            401:
+              $ref: '#/components/responses/401'
+            422:
+              $ref: '#/components/responses/422'
+            500:
+              $ref: '#/components/responses/500'
+        """
+        if not request.is_json:
+            return self.response_400(message="Request is not JSON")
+        try:
+            item = self.schema.load(request.json)
+            key = CreateKeyValueCommand(g.user, item).run()
+            return self.response(201, key=key)
+        except KeyValueCreateFailedError as ex:
+            logger.error(
+                "Error creating value %s: %s",
+                self.__class__.__name__,
+                str(ex),
+                exc_info=True,
+            )
+            return self.response_422(message=str(ex))

Review comment:
       Same here and in the other methods below.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@superset.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@superset.apache.org
For additional commands, e-mail: notifications-help@superset.apache.org