Posted to commits@sdap.apache.org by sk...@apache.org on 2023/06/30 03:09:43 UTC

[incubator-sdap-nexus] 02/03: job prioritization

This is an automated email from the ASF dual-hosted git repository.

skperez pushed a commit to branch SDAP-473a
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git

commit a2c1f109b5cbeac458c0c86fa6643264783e1a39
Author: skorper <st...@gmail.com>
AuthorDate: Thu Jun 29 19:43:36 2023 -0700

    job prioritization
---
 analysis/setup.py                                  | 24 ++++++++++++----------
 analysis/webservice/algorithms_spark/Matchup.py    | 10 +++++++++
 analysis/webservice/config/scheduler.xml           | 10 +++++++++
 .../app_builders/SparkContextBuilder.py            |  9 +++++++-
 4 files changed, 41 insertions(+), 12 deletions(-)

diff --git a/analysis/setup.py b/analysis/setup.py
index 99cd707..c09fe4a 100644
--- a/analysis/setup.py
+++ b/analysis/setup.py
@@ -17,18 +17,19 @@
 import setuptools
 from subprocess import check_call, CalledProcessError
 
-with open('../VERSION.txt', 'r') as f:
-    __version__ = f.read()
+# with open('../VERSION.txt', 'r') as f:
+#     __version__ = f.read()
+__version__ = '1.1.0a3'
 
 
-try:
-    check_call(['mamba', 'install', '-y', '-c', 'conda-forge', '--file', 'conda-requirements.txt'])
-except (CalledProcessError, IOError) as e:
-    print('Failed install with mamba; falling back to conda')
-    try:
-        check_call(['conda', 'install', '-y', '-c', 'conda-forge', '--file', 'conda-requirements.txt'])
-    except (CalledProcessError, IOError) as e:
-        raise EnvironmentError("Error installing conda packages", e)
+# try:
+#     check_call(['mamba', 'install', '-y', '-c', 'conda-forge', '--file', 'conda-requirements.txt'])
+# except (CalledProcessError, IOError) as e:
+#     print('Failed install with mamba; falling back to conda')
+#     try:
+#         check_call(['conda', 'install', '-y', '-c', 'conda-forge', '--file', 'conda-requirements.txt'])
+#     except (CalledProcessError, IOError) as e:
+#         raise EnvironmentError("Error installing conda packages", e)
 
 
 setuptools.setup(
@@ -60,7 +61,8 @@ setuptools.setup(
             'config/algorithms.ini',
             'apidocs/index.html',
             'apidocs/openapi.yml',
-            'apidocs/dataset-populate.js'
+            'apidocs/dataset-populate.js',
+            'config/scheduler.xml'
         ],
         'webservice.algorithms.doms': ['domsconfig.ini.default'],
     },
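
Shipping config/scheduler.xml in package_data is what lets the running webservice resolve the allocation file from the installed package (see the SparkContextBuilder change below). A minimal, illustrative check that the resource actually ships, assuming the 'webservice' package is installed:

    import pkg_resources

    # Both calls resolve against the installed 'webservice' package,
    # not the source tree.
    assert pkg_resources.resource_exists('webservice', 'config/scheduler.xml')
    print(pkg_resources.resource_filename('webservice', 'config/scheduler.xml'))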
diff --git a/analysis/webservice/algorithms_spark/Matchup.py b/analysis/webservice/algorithms_spark/Matchup.py
index 4fb40cf..30d8261 100644
--- a/analysis/webservice/algorithms_spark/Matchup.py
+++ b/analysis/webservice/algorithms_spark/Matchup.py
@@ -44,6 +44,8 @@ from webservice.webmodel.NexusExecutionResults import ExecutionStatus
 EPOCH = timezone('UTC').localize(datetime(1970, 1, 1))
 ISO_8601 = '%Y-%m-%dT%H:%M:%S%z'
 
+LARGE_JOB_THRESHOLD = 4000
+
 
 class Schema:
     def __init__(self):
@@ -234,6 +236,11 @@ class Matchup(NexusCalcSparkTornadoHandler):
                depth_min, depth_max, time_tolerance, radius_tolerance, \
                platforms, match_once, result_size_limit, prioritize_distance
 
+    def get_job_pool(self, tile_ids):
+        if len(tile_ids) > LARGE_JOB_THRESHOLD:
+            return 'large'
+        return 'small'
+
     def async_calc(self, execution_id, tile_ids, bounding_polygon, primary_ds_name,
                    secondary_ds_names, parameter_s, start_time, end_time, depth_min,
                    depth_max, time_tolerance, radius_tolerance, platforms, match_once,
@@ -241,8 +248,11 @@ class Matchup(NexusCalcSparkTornadoHandler):
         # Call spark_matchup
         self.log.debug("Calling Spark Driver")
 
+        job_priority = self.get_job_pool(tile_ids)
+
         try:
             self._sc.setJobGroup(execution_id, execution_id)
+            self._sc.setLocalProperty('spark.scheduler.pool', job_priority)
             spark_result = spark_matchup_driver(
                 tile_ids, wkt.dumps(bounding_polygon),
                 primary_ds_name,
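
The handler routes each matchup run into a FAIR scheduler pool based on how many tiles it touches: more than LARGE_JOB_THRESHOLD (4000) tile IDs lands in the 'large' pool, anything else in 'small'. Because spark.scheduler.pool is a thread-local property, setting it just before calling spark_matchup_driver only affects jobs submitted from that request's thread. A self-contained sketch of the same pattern (the SparkSession setup and tile_ids below are illustrative, not part of the commit):

    from pyspark.sql import SparkSession

    LARGE_JOB_THRESHOLD = 4000  # same cutoff as Matchup.py

    def get_job_pool(tile_ids):
        # Large matchup jobs go to the low-weight 'large' pool so they
        # cannot starve small jobs queued in the 'small' pool.
        return 'large' if len(tile_ids) > LARGE_JOB_THRESHOLD else 'small'

    spark = SparkSession.builder.appName('pool-demo').getOrCreate()
    sc = spark.sparkContext

    tile_ids = list(range(100))  # a 'small' job in this example
    sc.setLocalProperty('spark.scheduler.pool', get_job_pool(tile_ids))
    print(sc.parallelize(tile_ids).map(lambda t: t * 2).count())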
diff --git a/analysis/webservice/config/scheduler.xml b/analysis/webservice/config/scheduler.xml
new file mode 100644
index 0000000..3016017
--- /dev/null
+++ b/analysis/webservice/config/scheduler.xml
@@ -0,0 +1,10 @@
+<?xml version="1.0"?>
+<allocations>
+  <pool name="small">
+    <weight>1000</weight>
+    <minShare>1</minShare>
+  </pool>
+  <pool name="large">
+    <weight>1</weight>
+  </pool>
+</allocations>
\ No newline at end of file
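
In Spark's fair scheduler, weight sets a pool's proportional share of the cluster and minShare reserves a minimum number of cores. With these values, when both pools have pending tasks the 'small' pool receives roughly 1000/1001 of the available task slots, and minShare keeps at least one core for it even under load; the 'large' pool (weight 1, the default) runs in whatever capacity remains. A runnable sketch of the effect, assuming this scheduler.xml sits in the working directory, submitting one job per pool from two threads (the pool property is per-thread):

    import threading
    from pyspark.sql import SparkSession

    spark = (SparkSession.builder
             .appName('fair-demo')
             .config('spark.scheduler.mode', 'FAIR')
             .config('spark.scheduler.allocation.file', 'scheduler.xml')
             .getOrCreate())
    sc = spark.sparkContext

    def run(pool):
        # setLocalProperty is thread-local, so each thread picks its own pool.
        sc.setLocalProperty('spark.scheduler.pool', pool)
        print(pool, sc.parallelize(range(10 ** 6)).sum())

    threads = [threading.Thread(target=run, args=(p,)) for p in ('small', 'large')]
    for t in threads:
        t.start()
    for t in threads:
        t.join()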
diff --git a/analysis/webservice/nexus_tornado/app_builders/SparkContextBuilder.py b/analysis/webservice/nexus_tornado/app_builders/SparkContextBuilder.py
index ee3fd2f..5daf279 100644
--- a/analysis/webservice/nexus_tornado/app_builders/SparkContextBuilder.py
+++ b/analysis/webservice/nexus_tornado/app_builders/SparkContextBuilder.py
@@ -12,6 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import pkg_resources
 
 
 class SparkContextBuilder:
@@ -25,7 +26,13 @@ class SparkContextBuilder:
         if cls.spark_context is None:
             from pyspark.sql import SparkSession
 
-            spark = SparkSession.builder.appName("nexus-analysis").getOrCreate()
+            scheduler_path = pkg_resources.resource_filename('webservice', "config/scheduler.xml")
+
+            spark = SparkSession.builder.appName("nexus-analysis").config(
+                "spark.scheduler.allocation.file", scheduler_path
+            ).config(
+                "spark.scheduler.mode", "FAIR"
+            ).getOrCreate()
             cls.spark_context = spark.sparkContext
 
         return cls.spark_context
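
pkg_resources.resource_filename resolves scheduler.xml from the installed webservice package, which is why setup.py above now lists it in package_data. Both scheduler settings must be applied before getOrCreate() creates the context, since spark.scheduler.mode cannot be changed on a running SparkContext. A standalone version of the same construction, with a verification line added purely for illustration:

    import pkg_resources
    from pyspark.sql import SparkSession

    # Resolve the allocation file shipped inside the 'webservice' package.
    scheduler_path = pkg_resources.resource_filename('webservice', 'config/scheduler.xml')

    spark = (SparkSession.builder
             .appName('nexus-analysis')
             .config('spark.scheduler.allocation.file', scheduler_path)
             .config('spark.scheduler.mode', 'FAIR')
             .getOrCreate())

    # Confirm the mode took effect on the new context.
    print(spark.sparkContext.getConf().get('spark.scheduler.mode'))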