You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@sdap.apache.org by "skorper (via GitHub)" <gi...@apache.org> on 2023/05/10 00:59:44 UTC

[GitHub] [incubator-sdap-nexus] skorper opened a new pull request, #249: SDAP-455: Large Job Tracking

skorper opened a new pull request, #249:
URL: https://github.com/apache/incubator-sdap-nexus/pull/249

   Large job tracking work so far. This includes the following:
   
   - Matchup endpoint exits after spark submission
   - New /job endpoint for getting job status
     - Job statuses are `running` `completed` `failed` and `cancelled`
   - New /job/cancel endpoint for cancelling job
     - Cancel using `cancelJobGroup` Spark function call. This worked fine locally but with only one Spark worker. Will need to try this out on a larger scale with more jobs running at once. 
   - Matchup endpoint redirects to /job endpoint 
   - Updated Swagger UI (openapi spec)
   
   This has been tested locally but will need to be tested with more Spark workers. Work still needed:
   
   - Job prioritization
   - % completed info to job status


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@sdap.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-sdap-nexus] RKuttruff commented on a diff in pull request #249: SDAP-455: Large Job Tracking

Posted by "RKuttruff (via GitHub)" <gi...@apache.org>.
RKuttruff commented on code in PR #249:
URL: https://github.com/apache/incubator-sdap-nexus/pull/249#discussion_r1279944890


##########
analysis/webservice/algorithms_spark/Matchup.py:
##########
@@ -283,37 +329,60 @@ def calc(self, request, **args):
         if depth_max is not None:
             args["depthMax"] = float(depth_max)
 
-        total_keys = len(list(spark_result.keys()))
-        total_values = sum(len(v) for v in spark_result.values())
-        details = {
-            "timeToComplete": int((end - start).total_seconds()),
-            "numSecondaryMatched": total_values,
-            "numPrimaryMatched": total_keys
-        }
 
-        matches = Matchup.convert_to_matches(spark_result)
+        with ResultsStorage(self.config) as resultsStorage:
+            execution_id = str(resultsStorage.insertInitialExecution(
+                params=args,
+                startTime=start,
+                status=ExecutionStatus.RUNNING.value
+            ))
 
-        def do_result_insert():
-            with ResultsStorage(self.config) as storage:
-                storage.insertResults(results=matches, params=args, stats=details,
-                                      startTime=start, completeTime=end, userEmail="",
-                                      execution_id=execution_id)
+        self.log.debug("Querying for tiles in search domain")
+        # Get tile ids in box
+        tile_ids = [tile.tile_id for tile in
+                    self._get_tile_service().find_tiles_in_polygon(bounding_polygon, primary_ds_name,
+                                                             start_seconds_from_epoch, end_seconds_from_epoch,
+                                                             fetch_data=False, fl='id',
+                                                             sort=['tile_min_time_dt asc', 'tile_min_lon asc',
+                                                                   'tile_min_lat asc'], rows=5000)]
 
-        threading.Thread(target=do_result_insert).start()
+        self.log.info('Found %s tile_ids', len(tile_ids))
 
-        # Get only the first "result_size_limit" results
-        # '0' means returns everything
-        if result_size_limit > 0:
-            return_matches = matches[0:result_size_limit]
-        else:
-            return_matches = matches
+        if not tile_ids:
+            # There are no matching tiles
+            end = datetime.utcnow()
+            with ResultsStorage(self.config) as storage:
+                storage.updateExecution(
+                    uuid.UUID(execution_id),
+                    completeTime=end,
+                    status=ExecutionStatus.FAILED.value,
+                    message='No tiles matched the provided domain'
+                )
 
-        result = DomsQueryResults(results=return_matches, args=args,
-                                  details=details, bounds=None,
-                                  count=len(matches), computeOptions=None,
-                                  executionId=execution_id)
+        # Start async processing with Spark. Do not wait for response
+        # before returning to user.
+        tornado_io_loop.run_in_executor(request.requestHandler.executor, functools.partial(
+            self.async_calc,
+            execution_id=execution_id,
+            tile_ids=tile_ids,
+            bounding_polygon=bounding_polygon,
+            primary_ds_name=primary_ds_name,
+            secondary_ds_names=secondary_ds_names,
+            parameter_s=parameter_s,
+            start_time=start_time,
+            end_time=end_time,
+            depth_min=depth_min,
+            depth_max=depth_max,
+            time_tolerance=time_tolerance,
+            radius_tolerance=radius_tolerance,
+            platforms=platforms,
+            match_once=match_once,
+            result_size_limit=result_size_limit,
+            start=start
+        ))

Review Comment:
   `prioritize_distance` should be passed here



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@sdap.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-sdap-nexus] RKuttruff commented on a diff in pull request #249: SDAP-455: Large Job Tracking

Posted by "RKuttruff (via GitHub)" <gi...@apache.org>.
RKuttruff commented on code in PR #249:
URL: https://github.com/apache/incubator-sdap-nexus/pull/249#discussion_r1196812926


##########
analysis/webservice/algorithms/doms/ResultsStorage.py:
##########
@@ -106,24 +106,39 @@ class ResultsStorage(AbstractResultsContainer):
     def __init__(self, config=None):
         AbstractResultsContainer.__init__(self, config)
 
-    def insertResults(self, results, params, stats, startTime, completeTime, userEmail, execution_id=None):
-        self._log.info('Beginning results write')
+    def insertInitialExecution(self, params, startTime, status, userEmail='', execution_id=None):
+        """
+        Initial insert into database for CDMS matchup request. This
+        populates the execution and params table.
+        """
         if isinstance(execution_id, str):
             execution_id = uuid.UUID(execution_id)
 
-        execution_id = self.insertExecution(execution_id, startTime, completeTime, userEmail)
+        execution_id = self.__insertExecution(execution_id, startTime, None, userEmail, status)
         self.__insertParams(execution_id, params)
-        self.__insertStats(execution_id, stats)
-        self.__insertResults(execution_id, results)
-        self._log.info('Results write finished')
         return execution_id
 
-    def insertExecution(self, execution_id, startTime, completeTime, userEmail):
+    def updateExecution(self, execution_id, completeTime, status, message, stats, results):
+        if stats:
+            self.__insertStats(execution_id, stats)
+        if results:
+            self.__insertResults(execution_id, results)
+        self.__updateExecution(execution_id, completeTime, status, message)
+
+    def __insertExecution(self, execution_id, startTime, completeTime, userEmail, status):
+        """
+        Insert new entry into execution table
+        """
         if execution_id is None:
             execution_id = uuid.uuid4()
 
-        cql = "INSERT INTO doms_executions (id, time_started, time_completed, user_email) VALUES (%s, %s, %s, %s)"
-        self._session.execute(cql, (execution_id, startTime, completeTime, userEmail))
+        cql = "INSERT INTO doms_executions (id, time_started, time_completed, user_email, status) VALUES (%s, %s, %s, %s, %s)"
+        self._session.execute(cql, (execution_id, startTime, completeTime, userEmail, status))
+        return execution_id

Review Comment:
   Would this require an update to the table's [initializer](https://github.com/apache/incubator-sdap-nexus/blob/d0c48eb62405672fa3c2435daa7a1142142a2f60/analysis/webservice/algorithms/doms/DomsInitialization.py#L115-L126)?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@sdap.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-sdap-nexus] skorper merged pull request #249: SDAP-455: Large Job Tracking

Posted by "skorper (via GitHub)" <gi...@apache.org>.
skorper merged PR #249:
URL: https://github.com/apache/incubator-sdap-nexus/pull/249


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@sdap.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-sdap-nexus] skorper commented on a diff in pull request #249: SDAP-455: Large Job Tracking

Posted by "skorper (via GitHub)" <gi...@apache.org>.
skorper commented on code in PR #249:
URL: https://github.com/apache/incubator-sdap-nexus/pull/249#discussion_r1279859712


##########
analysis/webservice/algorithms/doms/ResultsStorage.py:
##########
@@ -106,24 +106,39 @@ class ResultsStorage(AbstractResultsContainer):
     def __init__(self, config=None):
         AbstractResultsContainer.__init__(self, config)
 
-    def insertResults(self, results, params, stats, startTime, completeTime, userEmail, execution_id=None):
-        self._log.info('Beginning results write')
+    def insertInitialExecution(self, params, startTime, status, userEmail='', execution_id=None):
+        """
+        Initial insert into database for CDMS matchup request. This
+        populates the execution and params table.
+        """
         if isinstance(execution_id, str):
             execution_id = uuid.UUID(execution_id)
 
-        execution_id = self.insertExecution(execution_id, startTime, completeTime, userEmail)
+        execution_id = self.__insertExecution(execution_id, startTime, None, userEmail, status)
         self.__insertParams(execution_id, params)
-        self.__insertStats(execution_id, stats)
-        self.__insertResults(execution_id, results)
-        self._log.info('Results write finished')
         return execution_id
 
-    def insertExecution(self, execution_id, startTime, completeTime, userEmail):
+    def updateExecution(self, execution_id, completeTime, status, message, stats, results):
+        if stats:
+            self.__insertStats(execution_id, stats)
+        if results:
+            self.__insertResults(execution_id, results)
+        self.__updateExecution(execution_id, completeTime, status, message)
+
+    def __insertExecution(self, execution_id, startTime, completeTime, userEmail, status):
+        """
+        Insert new entry into execution table
+        """
         if execution_id is None:
             execution_id = uuid.uuid4()
 
-        cql = "INSERT INTO doms_executions (id, time_started, time_completed, user_email) VALUES (%s, %s, %s, %s)"
-        self._session.execute(cql, (execution_id, startTime, completeTime, userEmail))
+        cql = "INSERT INTO doms_executions (id, time_started, time_completed, user_email, status) VALUES (%s, %s, %s, %s, %s)"
+        self._session.execute(cql, (execution_id, startTime, completeTime, userEmail, status))
+        return execution_id

Review Comment:
   Done



##########
analysis/webservice/webmodel/NexusExecutionResults.py:
##########
@@ -0,0 +1,149 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+from enum import Enum
+
+
+ISO_8601 = '%Y-%m-%dT%H:%M:%S%z'
+
+
+class ExecutionStatus(Enum):
+    RUNNING = 'running'
+    SUCCESS = 'success'
+    FAILED = 'failed'
+    CANCELLED = 'cancelled'
+
+
+def construct_job_status(job_state, created, updated, execution_id, params, host, message=''):
+    return {
+        'status': job_state.value,
+        'message': message,
+        'createdAt': created,
+        'updatedAt': updated,
+        'links': [{
+            'href': f'{host}/job?id={execution_id}',
+            'title': 'The current page',
+            'type': 'application/json',
+            'rel': 'self'
+        }],
+        'params': params,
+        'jobID': execution_id
+    }
+
+
+def construct_done(status, created, completed, execution_id, params, host):
+    job_body = construct_job_status(
+        status,
+        created,
+        completed,
+        execution_id,
+        params,
+        host
+    )
+
+    # Construct urls
+    formats = [
+        'CSV',
+        'JSON',
+        'NETCDF'
+    ]
+    data_links = [{
+        'href': f'{host}/cdmsresults?id={execution_id}&output={output_format}',
+        'title': 'Download results',
+        'rel': 'data'
+    } for output_format in formats]

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@sdap.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-sdap-nexus] RKuttruff commented on a diff in pull request #249: SDAP-455: Large Job Tracking

Posted by "RKuttruff (via GitHub)" <gi...@apache.org>.
RKuttruff commented on code in PR #249:
URL: https://github.com/apache/incubator-sdap-nexus/pull/249#discussion_r1198061233


##########
analysis/webservice/webmodel/NexusExecutionResults.py:
##########
@@ -0,0 +1,149 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+from enum import Enum
+
+
+ISO_8601 = '%Y-%m-%dT%H:%M:%S%z'
+
+
+class ExecutionStatus(Enum):
+    RUNNING = 'running'
+    SUCCESS = 'success'
+    FAILED = 'failed'
+    CANCELLED = 'cancelled'
+
+
+def construct_job_status(job_state, created, updated, execution_id, params, host, message=''):
+    return {
+        'status': job_state.value,
+        'message': message,
+        'createdAt': created,
+        'updatedAt': updated,
+        'links': [{
+            'href': f'{host}/job?id={execution_id}',
+            'title': 'The current page',
+            'type': 'application/json',
+            'rel': 'self'
+        }],
+        'params': params,
+        'jobID': execution_id
+    }
+
+
+def construct_done(status, created, completed, execution_id, params, host):
+    job_body = construct_job_status(
+        status,
+        created,
+        completed,
+        execution_id,
+        params,
+        host
+    )
+
+    # Construct urls
+    formats = [
+        'CSV',
+        'JSON',
+        'NETCDF'
+    ]
+    data_links = [{
+        'href': f'{host}/cdmsresults?id={execution_id}&output={output_format}',
+        'title': 'Download results',
+        'rel': 'data'
+    } for output_format in formats]

Review Comment:
   It may be a good idea to add a type field to these link objects.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@sdap.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org