You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sdap.apache.org by sk...@apache.org on 2023/06/30 02:43:50 UTC
[incubator-sdap-nexus] 04/04: job prioritization
This is an automated email from the ASF dual-hosted git repository.
skperez pushed a commit to branch SDAP-473
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git
commit a9a9d1e63746c5ca54c4bda4f18a1879e8ed3e5e
Author: skorper <st...@gmail.com>
AuthorDate: Thu Jun 29 19:43:36 2023 -0700
job prioritization
---
analysis/setup.py | 24 ++++++++++++----------
analysis/webservice/algorithms_spark/Matchup.py | 10 +++++++++
analysis/webservice/config/scheduler.xml | 10 +++++++++
.../app_builders/SparkContextBuilder.py | 9 +++++++-
4 files changed, 41 insertions(+), 12 deletions(-)
diff --git a/analysis/setup.py b/analysis/setup.py
index 99cd707..c09fe4a 100644
--- a/analysis/setup.py
+++ b/analysis/setup.py
@@ -17,18 +17,19 @@
import setuptools
from subprocess import check_call, CalledProcessError
-with open('../VERSION.txt', 'r') as f:
- __version__ = f.read()
+# with open('../VERSION.txt', 'r') as f:
+# __version__ = f.read()
+__version__ = '1.1.0a3'  # NOTE(review): hard-codes the version instead of reading ../VERSION.txt (commented out above); looks like a local-build leftover — restore before merging
-try:
- check_call(['mamba', 'install', '-y', '-c', 'conda-forge', '--file', 'conda-requirements.txt'])
-except (CalledProcessError, IOError) as e:
- print('Failed install with mamba; falling back to conda')
- try:
- check_call(['conda', 'install', '-y', '-c', 'conda-forge', '--file', 'conda-requirements.txt'])
- except (CalledProcessError, IOError) as e:
- raise EnvironmentError("Error installing conda packages", e)
+# try:  # NOTE(review): setup.py no longer installs conda-requirements.txt — confirm those dependencies are provisioned elsewhere
+# check_call(['mamba', 'install', '-y', '-c', 'conda-forge', '--file', 'conda-requirements.txt'])
+# except (CalledProcessError, IOError) as e:
+# print('Failed install with mamba; falling back to conda')
+# try:
+# check_call(['conda', 'install', '-y', '-c', 'conda-forge', '--file', 'conda-requirements.txt'])
+# except (CalledProcessError, IOError) as e:
+# raise EnvironmentError("Error installing conda packages", e)
setuptools.setup(
@@ -60,7 +61,8 @@ setuptools.setup(
'config/algorithms.ini',
'apidocs/index.html',
'apidocs/openapi.yml',
- 'apidocs/dataset-populate.js'
+ 'apidocs/dataset-populate.js',
+ 'config/scheduler.xml'  # Spark fair-scheduler pools; located at runtime via pkg_resources in SparkContextBuilder
],
'webservice.algorithms.doms': ['domsconfig.ini.default'],
},
diff --git a/analysis/webservice/algorithms_spark/Matchup.py b/analysis/webservice/algorithms_spark/Matchup.py
index 4fb40cf..30d8261 100644
--- a/analysis/webservice/algorithms_spark/Matchup.py
+++ b/analysis/webservice/algorithms_spark/Matchup.py
@@ -44,6 +44,8 @@ from webservice.webmodel.NexusExecutionResults import ExecutionStatus
EPOCH = timezone('UTC').localize(datetime(1970, 1, 1))
ISO_8601 = '%Y-%m-%dT%H:%M:%S%z'
+LARGE_JOB_THRESHOLD = 4000  # tile count above which get_job_pool() routes a matchup job to the 'large' scheduler pool
+
class Schema:
def __init__(self):
@@ -234,6 +236,11 @@ class Matchup(NexusCalcSparkTornadoHandler):
depth_min, depth_max, time_tolerance, radius_tolerance, \
platforms, match_once, result_size_limit, prioritize_distance
+ def get_job_pool(self, tile_ids):  # return the Spark fair-scheduler pool name ('large' or 'small', defined in config/scheduler.xml) for a job over tile_ids
+ if len(tile_ids) > LARGE_JOB_THRESHOLD:  # assumes tile_ids supports len() (e.g. a list) — TODO confirm against callers
+ return 'large'
+ return 'small'
+
def async_calc(self, execution_id, tile_ids, bounding_polygon, primary_ds_name,
secondary_ds_names, parameter_s, start_time, end_time, depth_min,
depth_max, time_tolerance, radius_tolerance, platforms, match_once,
@@ -241,8 +248,11 @@ class Matchup(NexusCalcSparkTornadoHandler):
# Call spark_matchup
self.log.debug("Calling Spark Driver")
+ job_priority = self.get_job_pool(tile_ids)  # a pool name, not a numeric priority
+
try:
self._sc.setJobGroup(execution_id, execution_id)
+ self._sc.setLocalProperty('spark.scheduler.pool', job_priority)  # NOTE(review): per-thread setting that persists after this job; nothing visible here resets it — consider setLocalProperty('spark.scheduler.pool', None) once the job finishes
spark_result = spark_matchup_driver(
tile_ids, wkt.dumps(bounding_polygon),
primary_ds_name,
diff --git a/analysis/webservice/config/scheduler.xml b/analysis/webservice/config/scheduler.xml
new file mode 100644
index 0000000..3016017
--- /dev/null
+++ b/analysis/webservice/config/scheduler.xml
@@ -0,0 +1,10 @@
+<?xml version="1.0"?>
+<allocations> <!-- Spark fair-scheduler pools; pool names must match the values returned by Matchup.get_job_pool() -->
+ <pool name="small">
+ <weight>1000</weight> <!-- vs. weight 1 for 'large': this pool launches tasks first whenever it has active jobs -->
+ <minShare>1</minShare> <!-- minimum number of CPU cores the scheduler tries to give this pool -->
+ </pool>
+ <pool name="large"> <!-- no minShare/schedulingMode given: Spark defaults apply (minShare 0, FIFO within the pool) -->
+ <weight>1</weight>
+ </pool>
+</allocations>
\ No newline at end of file
diff --git a/analysis/webservice/nexus_tornado/app_builders/SparkContextBuilder.py b/analysis/webservice/nexus_tornado/app_builders/SparkContextBuilder.py
index ee3fd2f..5daf279 100644
--- a/analysis/webservice/nexus_tornado/app_builders/SparkContextBuilder.py
+++ b/analysis/webservice/nexus_tornado/app_builders/SparkContextBuilder.py
@@ -12,6 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+import pkg_resources  # NOTE(review): pkg_resources is deprecated in setuptools; importlib.resources is the stdlib replacement
class SparkContextBuilder:
@@ -25,7 +26,13 @@ class SparkContextBuilder:
if cls.spark_context is None:
from pyspark.sql import SparkSession
- spark = SparkSession.builder.appName("nexus-analysis").getOrCreate()
+ scheduler_path = pkg_resources.resource_filename('webservice', "config/scheduler.xml")  # filesystem path of the pool file shipped via setup.py package_data
+
+ spark = SparkSession.builder.appName("nexus-analysis").config(
+ "spark.scheduler.allocation.file", scheduler_path  # pool definitions ('small'/'large') used by Matchup
+ ).config(
+ "spark.scheduler.mode", "FAIR"  # fair scheduling between jobs so the pools above take effect
+ ).getOrCreate()
cls.spark_context = spark.sparkContext
return cls.spark_context