You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sdap.apache.org by ea...@apache.org on 2020/07/27 22:04:28 UTC

[incubator-sdap-nexus] branch bug_fixes updated: pass factory method to nexuscalchandlers to create tile service in spark nodes

This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch bug_fixes
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git


The following commit(s) were added to refs/heads/bug_fixes by this push:
     new f1663a5  pass factory method to nexuscalchandlers to create tile service in spark nodes
f1663a5 is described below

commit f1663a5be5acc5f5df1d279d08915170059ee085
Author: Eamon Ford <ea...@jpl.nasa.gov>
AuthorDate: Mon Jul 27 15:04:17 2020 -0700

    pass factory method to nexuscalchandlers to create tile service in spark nodes
---
 analysis/webservice/algorithms/NexusCalcHandler.py | 16 +++++-----
 .../webservice/algorithms_spark/ClimMapSpark.py    |  8 ++---
 .../webservice/algorithms_spark/CorrMapSpark.py    | 29 +++++++++---------
 .../DailyDifferenceAverageSpark.py                 | 23 +++++++-------
 .../webservice/algorithms_spark/HofMoellerSpark.py | 35 +++++++++++++---------
 .../algorithms_spark/MaximaMinimaSpark.py          | 10 +++----
 .../algorithms_spark/NexusCalcSparkHandler.py      | 12 ++++----
 .../webservice/algorithms_spark/TimeAvgMapSpark.py |  9 +++---
 .../webservice/algorithms_spark/TimeSeriesSpark.py | 13 ++++----
 .../webservice/algorithms_spark/VarianceSpark.py   | 16 +++++-----
 analysis/webservice/webapp.py                      | 25 ++++++++++------
 11 files changed, 107 insertions(+), 89 deletions(-)

diff --git a/analysis/webservice/algorithms/NexusCalcHandler.py b/analysis/webservice/algorithms/NexusCalcHandler.py
index b5f220f..bea0842 100644
--- a/analysis/webservice/algorithms/NexusCalcHandler.py
+++ b/analysis/webservice/algorithms/NexusCalcHandler.py
@@ -22,13 +22,15 @@ class NexusCalcHandler(object):
         if "params" not in cls.__dict__:
             raise Exception("Property 'params' has not been defined")
 
-    def __init__(self, algorithm_config=None, skipCassandra=False, skipSolr=False):
-        self.algorithm_config = algorithm_config
-        self._skipCassandra = skipCassandra
-        self._skipSolr = skipSolr
-        self._tile_service = NexusTileService(skipDatastore=self._skipCassandra,
-                                              skipMetadatastore=self._skipSolr,
-                                              config=self.algorithm_config)
+    def __init__(self, tile_service_factory, skipCassandra=False, skipSolr=False):
+        # self.algorithm_config = algorithm_config
+        # self._skipCassandra = skipCassandra
+        # self._skipSolr = skipSolr
+        # self._tile_service = NexusTileService(skipDatastore=self._skipCassandra,
+        #                                       skipMetadatastore=self._skipSolr,
+        #                                       config=self.algorithm_config)
+        self._tile_service_factory = tile_service_factory
+        self._tile_service = tile_service_factory()
 
     def _get_tile_service(self):
         return self._tile_service
diff --git a/analysis/webservice/algorithms_spark/ClimMapSpark.py b/analysis/webservice/algorithms_spark/ClimMapSpark.py
index e870a2a..78f11f8 100644
--- a/analysis/webservice/algorithms_spark/ClimMapSpark.py
+++ b/analysis/webservice/algorithms_spark/ClimMapSpark.py
@@ -25,7 +25,7 @@ from nexustiles.nexustiles import NexusTileService
 from webservice.NexusHandler import nexus_handler, DEFAULT_PARAMETERS_SPEC
 from webservice.algorithms_spark.NexusCalcSparkHandler import NexusCalcSparkHandler
 from webservice.webmodel import NexusResults, NexusProcessingException, NoDataException
-
+from functools import partial
 
 @nexus_handler
 class ClimMapNexusSparkHandlerImpl(NexusCalcSparkHandler):
@@ -35,14 +35,14 @@ class ClimMapNexusSparkHandlerImpl(NexusCalcSparkHandler):
     params = DEFAULT_PARAMETERS_SPEC
 
     @staticmethod
-    def _map(tile_in_spark):
+    def _map(tile_service_factory, tile_in_spark):
         tile_bounds = tile_in_spark[0]
         (min_lat, max_lat, min_lon, max_lon,
          min_y, max_y, min_x, max_x) = tile_bounds
         startTime = tile_in_spark[1]
         endTime = tile_in_spark[2]
         ds = tile_in_spark[3]
-        tile_service = NexusTileService()
+        tile_service = tile_service_factory()
         # print 'Started tile', tile_bounds
         # sys.stdout.flush()
         tile_inbounds_shape = (max_y - min_y + 1, max_x - min_x + 1)
@@ -196,7 +196,7 @@ class ClimMapNexusSparkHandlerImpl(NexusCalcSparkHandler):
         spark_nparts = self._spark_nparts(nparts_requested)
         self.log.info('Using {} partitions'.format(spark_nparts))
         rdd = self._sc.parallelize(nexus_tiles_spark, spark_nparts)
-        sum_count_part = rdd.map(self._map)
+        sum_count_part = rdd.map(partial(self._map, self._tile_service_factory))
         sum_count = \
             sum_count_part.combineByKey(lambda val: val,
                                         lambda x, val: (x[0] + val[0],
diff --git a/analysis/webservice/algorithms_spark/CorrMapSpark.py b/analysis/webservice/algorithms_spark/CorrMapSpark.py
index 1af8cab..4d2c4fe 100644
--- a/analysis/webservice/algorithms_spark/CorrMapSpark.py
+++ b/analysis/webservice/algorithms_spark/CorrMapSpark.py
@@ -13,15 +13,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-
 import json
-import math
-import logging
 from datetime import datetime
+from functools import partial
+
 import numpy as np
-from nexustiles.nexustiles import NexusTileService
 
-# from time import time
 from webservice.NexusHandler import nexus_handler, DEFAULT_PARAMETERS_SPEC
 from webservice.algorithms_spark.NexusCalcSparkHandler import NexusCalcSparkHandler
 from webservice.webmodel import NexusProcessingException, NexusResults, NoDataException
@@ -35,7 +32,7 @@ class CorrMapNexusSparkHandlerImpl(NexusCalcSparkHandler):
     params = DEFAULT_PARAMETERS_SPEC
 
     @staticmethod
-    def _map(tile_in):
+    def _map(tile_service_factory, tile_in):
         # Unpack input
         tile_bounds, start_time, end_time, ds = tile_in
         (min_lat, max_lat, min_lon, max_lon,
@@ -60,7 +57,7 @@ class CorrMapNexusSparkHandlerImpl(NexusCalcSparkHandler):
         # print 'days_at_a_time = ', days_at_a_time
         t_incr = 86400 * days_at_a_time
 
-        tile_service = NexusTileService()
+        tile_service = tile_service_factory
 
         # Compute the intermediate summations needed for the Pearson 
         # Correlation Coefficient.  We use a one-pass online algorithm
@@ -194,12 +191,12 @@ class CorrMapNexusSparkHandlerImpl(NexusCalcSparkHandler):
         self.log.debug('nlats={0}, nlons={1}'.format(self._nlats, self._nlons))
 
         daysinrange = self._get_tile_service().find_days_in_range_asc(self._minLat,
-                                                                self._maxLat,
-                                                                self._minLon,
-                                                                self._maxLon,
-                                                                self._ds[0],
-                                                                self._startTime,
-                                                                self._endTime)
+                                                                      self._maxLat,
+                                                                      self._minLon,
+                                                                      self._maxLon,
+                                                                      self._ds[0],
+                                                                      self._startTime,
+                                                                      self._endTime)
         ndays = len(daysinrange)
         if ndays == 0:
             raise NoDataException(reason="No data found for selected timeframe")
@@ -224,7 +221,9 @@ class CorrMapNexusSparkHandlerImpl(NexusCalcSparkHandler):
         max_time_parts = 72
         num_time_parts = min(max_time_parts, ndays)
 
-        spark_part_time_ranges = np.tile(np.array([a[[0,-1]] for a in np.array_split(np.array(daysinrange), num_time_parts)]), (len(nexus_tiles_spark),1))
+        spark_part_time_ranges = np.tile(
+            np.array([a[[0, -1]] for a in np.array_split(np.array(daysinrange), num_time_parts)]),
+            (len(nexus_tiles_spark), 1))
         nexus_tiles_spark = np.repeat(nexus_tiles_spark, num_time_parts, axis=0)
         nexus_tiles_spark[:, 1:3] = spark_part_time_ranges
 
@@ -233,7 +232,7 @@ class CorrMapNexusSparkHandlerImpl(NexusCalcSparkHandler):
         self.log.info('Using {} partitions'.format(spark_nparts))
 
         rdd = self._sc.parallelize(nexus_tiles_spark, spark_nparts)
-        sum_tiles_part = rdd.map(self._map)
+        sum_tiles_part = rdd.map(partial(self._map, self._tile_service_factory))
         # print "sum_tiles_part = ",sum_tiles_part.collect()
         sum_tiles = \
             sum_tiles_part.combineByKey(lambda val: val,
diff --git a/analysis/webservice/algorithms_spark/DailyDifferenceAverageSpark.py b/analysis/webservice/algorithms_spark/DailyDifferenceAverageSpark.py
index 51be431..344927f 100644
--- a/analysis/webservice/algorithms_spark/DailyDifferenceAverageSpark.py
+++ b/analysis/webservice/algorithms_spark/DailyDifferenceAverageSpark.py
@@ -134,15 +134,18 @@ class DailyDifferenceAverageNexusImplSpark(NexusCalcSparkHandler):
         # Get tile ids in box
         tile_ids = [tile.tile_id for tile in
                     self._get_tile_service().find_tiles_in_polygon(bounding_polygon, dataset,
-                                                             start_seconds_from_epoch, end_seconds_from_epoch,
-                                                             fetch_data=False, fl='id',
-                                                             sort=['tile_min_time_dt asc', 'tile_min_lon asc',
-                                                                   'tile_min_lat asc'], rows=5000)]
+                                                                   start_seconds_from_epoch, end_seconds_from_epoch,
+                                                                   fetch_data=False, fl='id',
+                                                                   sort=['tile_min_time_dt asc', 'tile_min_lon asc',
+                                                                         'tile_min_lat asc'], rows=5000)]
 
         # Call spark_matchup
-        self.log.debug("Calling Spark Driver")
         try:
-            spark_result = spark_anomolies_driver(tile_ids, wkt.dumps(bounding_polygon), dataset, climatology,
+            spark_result = spark_anomalies_driver(self._tile_service_factory,
+                                                  tile_ids,
+                                                  wkt.dumps(bounding_polygon),
+                                                  dataset,
+                                                  climatology,
                                                   sc=self._sc)
         except Exception as e:
             self.log.exception(e)
@@ -264,7 +267,7 @@ def determine_parllelism(num_tiles):
     return num_partitions
 
 
-def spark_anomolies_driver(tile_ids, bounding_wkt, dataset, climatology, sc=None):
+def spark_anomalies_driver(tile_service_driver, tile_ids, bounding_wkt, dataset, climatology, sc=None):
     from functools import partial
 
     with DRIVER_LOCK:
@@ -297,7 +300,7 @@ def spark_anomolies_driver(tile_ids, bounding_wkt, dataset, climatology, sc=None
         return sum_cnt_var_tuple[0] / sum_cnt_var_tuple[1], np.sqrt(sum_cnt_var_tuple[2])
 
     result = rdd \
-        .mapPartitions(partial(calculate_diff, bounding_wkt=bounding_wkt_b, dataset=dataset_b,
+        .mapPartitions(partial(calculate_diff, tile_service_driver, bounding_wkt=bounding_wkt_b, dataset=dataset_b,
                                climatology=climatology_b)) \
         .reduceByKey(add_tuple_elements) \
         .mapValues(compute_avg_and_std) \
@@ -307,7 +310,7 @@ def spark_anomolies_driver(tile_ids, bounding_wkt, dataset, climatology, sc=None
     return result
 
 
-def calculate_diff(tile_ids, bounding_wkt, dataset, climatology):
+def calculate_diff(tile_service_factory, tile_ids, bounding_wkt, dataset, climatology):
     from itertools import chain
 
     # Construct a list of generators that yield (day, sum, count, variance)
@@ -316,7 +319,7 @@ def calculate_diff(tile_ids, bounding_wkt, dataset, climatology):
     tile_ids = list(tile_ids)
     if len(tile_ids) == 0:
         return []
-    tile_service = NexusTileService()
+    tile_service = tile_service_factory()
 
     for tile_id in tile_ids:
         # Get the dataset tile
diff --git a/analysis/webservice/algorithms_spark/HofMoellerSpark.py b/analysis/webservice/algorithms_spark/HofMoellerSpark.py
index c4bc019..6616ae2 100644
--- a/analysis/webservice/algorithms_spark/HofMoellerSpark.py
+++ b/analysis/webservice/algorithms_spark/HofMoellerSpark.py
@@ -14,7 +14,6 @@
 # limitations under the License.
 
 import itertools
-import logging
 from cStringIO import StringIO
 from datetime import datetime
 from functools import partial
@@ -25,8 +24,8 @@ import numpy as np
 import shapely.geometry
 from matplotlib import cm
 from matplotlib.ticker import FuncFormatter
-from nexustiles.nexustiles import NexusTileService
 from pytz import timezone
+
 from webservice.NexusHandler import nexus_handler
 from webservice.algorithms_spark.NexusCalcSparkHandler import NexusCalcSparkHandler
 from webservice.webmodel import NexusResults, NoDataException, NexusProcessingException
@@ -41,12 +40,12 @@ LONGITUDE = 1
 
 class HofMoellerCalculator(object):
     @staticmethod
-    def hofmoeller_stats(metrics_callback, tile_in_spark):
+    def hofmoeller_stats(tile_service_factory, metrics_callback, tile_in_spark):
 
         (latlon, tile_id, index,
          min_lat, max_lat, min_lon, max_lon) = tile_in_spark
 
-        tile_service = NexusTileService()
+        tile_service = tile_service_factory()
         try:
             # Load the dataset tile
             tile = tile_service.find_tile_by_id(tile_id, metrics_callback=metrics_callback)[0]
@@ -263,7 +262,7 @@ def hof_tuple_to_dict(t, avg_var_name):
             'min': t[7]}
 
 
-def spark_driver(sc, latlon, nexus_tiles_spark, metrics_callback):
+def spark_driver(sc, latlon, tile_service_factory, nexus_tiles_spark, metrics_callback):
     # Parallelize list of tile ids
     rdd = sc.parallelize(nexus_tiles_spark, determine_parllelism(len(nexus_tiles_spark)))
     if latlon == 0:
@@ -279,7 +278,7 @@ def spark_driver(sc, latlon, nexus_tiles_spark, metrics_callback):
     # the value is a tuple of intermediate statistics for the specified
     # coordinate within a single NEXUS tile.
     metrics_callback(partitions=rdd.getNumPartitions())
-    results = rdd.flatMap(partial(HofMoellerCalculator.hofmoeller_stats, metrics_callback))
+    results = rdd.flatMap(partial(HofMoellerCalculator.hofmoeller_stats, tile_service_factory, metrics_callback))
 
     # Combine tuples across tiles with input key = (time, lat|lon)
     # Output a key value pair with key = (time)
@@ -349,15 +348,19 @@ class LatitudeTimeHoffMoellerSparkHandlerImpl(BaseHoffMoellerSparkHandlerImpl):
 
         nexus_tiles_spark = [(self._latlon, tile.tile_id, x, min_lat, max_lat, min_lon, max_lon) for x, tile in
                              enumerate(self._get_tile_service().find_tiles_in_box(min_lat, max_lat, min_lon, max_lon,
-                                                                            ds, start_time, end_time,
-                                                                            metrics_callback=metrics_record.record_metrics,
-                                                                            fetch_data=False))]
+                                                                                  ds, start_time, end_time,
+                                                                                  metrics_callback=metrics_record.record_metrics,
+                                                                                  fetch_data=False))]
 
         print ("Got {} tiles".format(len(nexus_tiles_spark)))
         if len(nexus_tiles_spark) == 0:
             raise NoDataException(reason="No data found for selected timeframe")
 
-        results = spark_driver(self._sc, self._latlon, nexus_tiles_spark, metrics_record.record_metrics)
+        results = spark_driver(self._sc,
+                               self._latlon,
+                               self._tile_service_factory,
+                               nexus_tiles_spark,
+                               metrics_record.record_metrics)
         results = filter(None, results)
         results = sorted(results, key=lambda entry: entry['time'])
         for i in range(len(results)):
@@ -400,15 +403,19 @@ class LongitudeTimeHoffMoellerSparkHandlerImpl(BaseHoffMoellerSparkHandlerImpl):
 
         nexus_tiles_spark = [(self._latlon, tile.tile_id, x, min_lat, max_lat, min_lon, max_lon) for x, tile in
                              enumerate(self._get_tile_service().find_tiles_in_box(min_lat, max_lat, min_lon, max_lon,
-                                                                            ds, start_time, end_time,
-                                                                            metrics_callback=metrics_record.record_metrics,
-                                                                            fetch_data=False))]
+                                                                                  ds, start_time, end_time,
+                                                                                  metrics_callback=metrics_record.record_metrics,
+                                                                                  fetch_data=False))]
 
         print ("Got {} tiles".format(len(nexus_tiles_spark)))
         if len(nexus_tiles_spark) == 0:
             raise NoDataException(reason="No data found for selected timeframe")
 
-        results = spark_driver(self._sc, self._latlon, nexus_tiles_spark, metrics_record.record_metrics)
+        results = spark_driver(self._sc,
+                               self._latlon,
+                               nexus_tiles_spark,
+                               self._tile_service_factory,
+                               metrics_record.record_metrics)
 
         results = filter(None, results)
         results = sorted(results, key=lambda entry: entry["time"])
diff --git a/analysis/webservice/algorithms_spark/MaximaMinimaSpark.py b/analysis/webservice/algorithms_spark/MaximaMinimaSpark.py
index 3bd9698..5b4bd83 100644
--- a/analysis/webservice/algorithms_spark/MaximaMinimaSpark.py
+++ b/analysis/webservice/algorithms_spark/MaximaMinimaSpark.py
@@ -14,13 +14,11 @@
 # limitations under the License.
 
 
-import math
-import logging
 from datetime import datetime
+from functools import partial
 
 import numpy as np
 import shapely.geometry
-from nexustiles.nexustiles import NexusTileService
 from pytz import timezone
 
 from webservice.NexusHandler import nexus_handler
@@ -207,7 +205,7 @@ class MaximaMinimaSparkHandlerImpl(NexusCalcSparkHandler):
         self.log.info('Using {} partitions'.format(spark_nparts))
 
         rdd = self._sc.parallelize(nexus_tiles_spark, spark_nparts)
-        max_min_part = rdd.map(self._map)
+        max_min_part = rdd.map(partial(self._map, self._tile_service_factory))
         max_min_count = \
             max_min_part.combineByKey(lambda val: val,
                                         lambda x, val: (np.maximum(x[0], val[0]),   # Max
@@ -283,7 +281,7 @@ class MaximaMinimaSparkHandlerImpl(NexusCalcSparkHandler):
 
     # this operates on only one nexus tile bound over time. Can assume all nexus_tiles are the same shape
     @staticmethod
-    def _map(tile_in_spark):
+    def _map(tile_service_factory, tile_in_spark):
         # tile_in_spark is a spatial tile that corresponds to nexus tiles of the same area
         tile_bounds = tile_in_spark[0]
         (min_lat, max_lat, min_lon, max_lon,
@@ -291,7 +289,7 @@ class MaximaMinimaSparkHandlerImpl(NexusCalcSparkHandler):
         startTime = tile_in_spark[1]
         endTime = tile_in_spark[2]
         ds = tile_in_spark[3]
-        tile_service = NexusTileService()
+        tile_service = tile_service_factory()
 
         tile_inbounds_shape = (max_y - min_y + 1, max_x - min_x + 1)
 
diff --git a/analysis/webservice/algorithms_spark/NexusCalcSparkHandler.py b/analysis/webservice/algorithms_spark/NexusCalcSparkHandler.py
index 9e77887..fe3541a 100644
--- a/analysis/webservice/algorithms_spark/NexusCalcSparkHandler.py
+++ b/analysis/webservice/algorithms_spark/NexusCalcSparkHandler.py
@@ -8,6 +8,7 @@ from webservice.webmodel import NexusProcessingException
 
 logger = logging.getLogger(__name__)
 
+
 class NexusCalcSparkHandler(NexusCalcHandler):
     class SparkJobContext(object):
 
@@ -33,14 +34,15 @@ class NexusCalcSparkHandler(NexusCalcHandler):
                 self.log.debug("Returning %s" % self.job_name)
                 self.spark_job_stack.append(self.job_name)
 
-    def __init__(self, algorithm_config=None, sc=None, **kwargs):
+    def __init__(self, tile_service_factory, sc=None, **kwargs):
         import inspect
 
-        NexusCalcHandler.__init__(self, algorithm_config=algorithm_config, **kwargs)
+        NexusCalcHandler.__init__(self, tile_service_factory=tile_service_factory, **kwargs)
         self.spark_job_stack = []
         self._sc = sc
-        max_concurrent_jobs = algorithm_config.getint("spark", "maxconcurrentjobs") if algorithm_config.has_section(
-            "spark") and algorithm_config.has_option("spark", "maxconcurrentjobs") else 10
+        # max_concurrent_jobs = algorithm_config.getint("spark", "maxconcurrentjobs") if algorithm_config.has_section(
+        #     "spark") and algorithm_config.has_option("spark", "maxconcurrentjobs") else 10
+        max_concurrent_jobs = 10
         self.spark_job_stack = list(["Job %s" % x for x in xrange(1, max_concurrent_jobs + 1)])
         self.log = logging.getLogger(__name__)
 
@@ -350,4 +352,4 @@ class NexusCalcSparkHandler(NexusCalcHandler):
                                          accumulator=self._sc.accumulator(0)),
             NumberMetricsField(key='reduce', description='Actual time to reduce results'),
             NumberMetricsField(key="actual_time", description="Total (actual) time")
-        ])
\ No newline at end of file
+        ])
diff --git a/analysis/webservice/algorithms_spark/TimeAvgMapSpark.py b/analysis/webservice/algorithms_spark/TimeAvgMapSpark.py
index c668130..6231873 100644
--- a/analysis/webservice/algorithms_spark/TimeAvgMapSpark.py
+++ b/analysis/webservice/algorithms_spark/TimeAvgMapSpark.py
@@ -13,14 +13,13 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import logging
 from datetime import datetime
 from functools import partial
 
 import numpy as np
 import shapely.geometry
-from nexustiles.nexustiles import NexusTileService
 from pytz import timezone
+
 from webservice.NexusHandler import nexus_handler
 from webservice.algorithms_spark.NexusCalcSparkHandler import NexusCalcSparkHandler
 from webservice.webmodel import NexusResults, NexusProcessingException, NoDataException
@@ -198,7 +197,7 @@ class TimeAvgMapNexusSparkHandlerImpl(NexusCalcSparkHandler):
 
         rdd = self._sc.parallelize(nexus_tiles_spark, spark_nparts)
         metrics_record.record_metrics(partitions=rdd.getNumPartitions())
-        sum_count_part = rdd.map(partial(self._map, metrics_record.record_metrics))
+        sum_count_part = rdd.map(partial(self._map, self._tile_service_factory, metrics_record.record_metrics))
         reduce_duration = 0
         reduce_start = datetime.now()
         sum_count = sum_count_part.combineByKey(lambda val: val,
@@ -264,14 +263,14 @@ class TimeAvgMapNexusSparkHandlerImpl(NexusCalcSparkHandler):
                             endTime=end_time)
 
     @staticmethod
-    def _map(metrics_callback, tile_in_spark):
+    def _map(tile_service_factory, metrics_callback, tile_in_spark):
         tile_bounds = tile_in_spark[0]
         (min_lat, max_lat, min_lon, max_lon,
          min_y, max_y, min_x, max_x) = tile_bounds
         startTime = tile_in_spark[1]
         endTime = tile_in_spark[2]
         ds = tile_in_spark[3]
-        tile_service = NexusTileService()
+        tile_service = tile_service_factory()
 
         tile_inbounds_shape = (max_y - min_y + 1, max_x - min_x + 1)
 
diff --git a/analysis/webservice/algorithms_spark/TimeSeriesSpark.py b/analysis/webservice/algorithms_spark/TimeSeriesSpark.py
index bf5963e..43f7f6d 100644
--- a/analysis/webservice/algorithms_spark/TimeSeriesSpark.py
+++ b/analysis/webservice/algorithms_spark/TimeSeriesSpark.py
@@ -195,7 +195,10 @@ class TimeSeriesSparkHandlerImpl(NexusCalcSparkHandler):
             spark_nparts = self._spark_nparts(nparts_requested)
             self.log.info('Using {} partitions'.format(spark_nparts))
             results, meta = spark_driver(daysinrange, bounding_polygon,
-                                         shortName, metrics_record.record_metrics, spark_nparts=spark_nparts,
+                                         shortName,
+                                         self._tile_service_factory,
+                                         metrics_record.record_metrics,
+                                         spark_nparts=spark_nparts,
                                          sc=self._sc)
 
             if apply_seasonal_cycle_filter:
@@ -487,7 +490,7 @@ class TimeSeriesResults(NexusResults):
         return sio.getvalue()
 
 
-def spark_driver(daysinrange, bounding_polygon, ds, metrics_callback, fill=-9999.,
+def spark_driver(daysinrange, bounding_polygon, ds, tile_service_factory, metrics_callback, fill=-9999.,
                  spark_nparts=1, sc=None):
     nexus_tiles_spark = [(bounding_polygon.wkt, ds,
                           list(daysinrange_part), fill)
@@ -497,14 +500,14 @@ def spark_driver(daysinrange, bounding_polygon, ds, metrics_callback, fill=-9999
     # Launch Spark computations
     rdd = sc.parallelize(nexus_tiles_spark, spark_nparts)
     metrics_callback(partitions=rdd.getNumPartitions())
-    results = rdd.flatMap(partial(calc_average_on_day, metrics_callback)).collect()
+    results = rdd.flatMap(partial(calc_average_on_day, tile_service_factory, metrics_callback)).collect()
     results = list(itertools.chain.from_iterable(results))
     results = sorted(results, key=lambda entry: entry["time"])
 
     return results, {}
 
 
-def calc_average_on_day(metrics_callback, tile_in_spark):
+def calc_average_on_day(tile_service_factory, metrics_callback, tile_in_spark):
     import shapely.wkt
     from datetime import datetime
     from pytz import timezone
@@ -513,7 +516,7 @@ def calc_average_on_day(metrics_callback, tile_in_spark):
     (bounding_wkt, dataset, timestamps, fill) = tile_in_spark
     if len(timestamps) == 0:
         return []
-    tile_service = NexusTileService()
+    tile_service = tile_service_factory()
     ds1_nexus_tiles = \
         tile_service.get_tiles_bounded_by_polygon(shapely.wkt.loads(bounding_wkt),
                                                   dataset,
diff --git a/analysis/webservice/algorithms_spark/VarianceSpark.py b/analysis/webservice/algorithms_spark/VarianceSpark.py
index 698385d..24ffbf0 100644
--- a/analysis/webservice/algorithms_spark/VarianceSpark.py
+++ b/analysis/webservice/algorithms_spark/VarianceSpark.py
@@ -14,13 +14,11 @@
 # limitations under the License.
 
 
-import math
-import logging
 from datetime import datetime
+from functools import partial
 
 import numpy as np
 import shapely.geometry
-from nexustiles.nexustiles import NexusTileService
 from pytz import timezone
 
 from webservice.NexusHandler import nexus_handler
@@ -207,7 +205,7 @@ class VarianceNexusSparkHandlerImpl(NexusCalcSparkHandler):
         self.log.info('Using {} partitions'.format(spark_nparts))
 
         rdd = self._sc.parallelize(nexus_tiles_spark, spark_nparts)
-        sum_count_part = rdd.map(self._map)
+        sum_count_part = rdd.map(partial(self._map, self._tile_service_factory))
         sum_count = \
             sum_count_part.combineByKey(lambda val: val,
                                         lambda x, val: (x[0] + val[0],
@@ -235,7 +233,7 @@ class VarianceNexusSparkHandlerImpl(NexusCalcSparkHandler):
         self.log.info('Using {} partitions'.format(spark_nparts))
         rdd = self._sc.parallelize(nexus_tiles_spark, spark_nparts)
 
-        anomaly_squared_part = rdd.map(self._calc_variance)
+        anomaly_squared_part = rdd.map(partial(self._calc_variance, self._tile_service_factory))
         anomaly_squared = \
             anomaly_squared_part.combineByKey(lambda val: val,
                                         lambda x, val: (x[0] + val[0],
@@ -303,7 +301,7 @@ class VarianceNexusSparkHandlerImpl(NexusCalcSparkHandler):
                             endTime=end_time)
 
     @staticmethod
-    def _map(tile_in_spark):
+    def _map(tile_service_factory, tile_in_spark):
         # tile_in_spark is a spatial tile that corresponds to nexus tiles of the same area
         tile_bounds = tile_in_spark[0]
         (min_lat, max_lat, min_lon, max_lon,
@@ -311,7 +309,7 @@ class VarianceNexusSparkHandlerImpl(NexusCalcSparkHandler):
         startTime = tile_in_spark[1]
         endTime = tile_in_spark[2]
         ds = tile_in_spark[3]
-        tile_service = NexusTileService()
+        tile_service = tile_service_factory()
 
         tile_inbounds_shape = (max_y - min_y + 1, max_x - min_x + 1)
 
@@ -345,7 +343,7 @@ class VarianceNexusSparkHandlerImpl(NexusCalcSparkHandler):
         return tile_bounds, (sum_tile, cnt_tile)
 
     @staticmethod
-    def _calc_variance(tile_in_spark):
+    def _calc_variance(tile_service_factory, tile_in_spark):
         # tile_in_spark is a spatial tile that corresponds to nexus tiles of the same area
         tile_bounds = tile_in_spark[0]
         (min_lat, max_lat, min_lon, max_lon,
@@ -354,7 +352,7 @@ class VarianceNexusSparkHandlerImpl(NexusCalcSparkHandler):
         endTime = tile_in_spark[2]
         ds = tile_in_spark[3]
         x_bar = tile_in_spark[4]
-        tile_service = NexusTileService()
+        tile_service = tile_service_factory()
 
         tile_inbounds_shape = (max_y - min_y + 1, max_x - min_x + 1)
 
diff --git a/analysis/webservice/webapp.py b/analysis/webservice/webapp.py
index bc232b4..3ee2f09 100644
--- a/analysis/webservice/webapp.py
+++ b/analysis/webservice/webapp.py
@@ -13,16 +13,18 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import os
 import ConfigParser
 import importlib
 import logging
 import sys
+from functools import partial
+
 import pkg_resources
 import tornado.web
-import webservice.algorithms_spark.NexusCalcSparkHandler
 from tornado.options import define, options, parse_command_line
 
+import webservice.algorithms_spark.NexusCalcSparkHandler
+from nexustiles.nexustiles import NexusTileService
 from webservice import NexusHandler
 from webservice.nexus_tornado.request.handlers import NexusRequestHandler
 
@@ -101,6 +103,7 @@ if __name__ == "__main__":
     log.info("Initializing request ThreadPool to %s" % max_request_threads)
     request_thread_pool = tornado.concurrent.futures.ThreadPoolExecutor(max_request_threads)
 
+    tile_service_factory = partial(NexusTileService, False, False, algorithm_config)
     spark_context = None
     for clazzWrapper in NexusHandler.AVAILABLE_HANDLERS:
         if issubclass(clazzWrapper, webservice.algorithms_spark.NexusCalcSparkHandler.NexusCalcSparkHandler):
@@ -110,14 +113,18 @@ if __name__ == "__main__":
                 spark = SparkSession.builder.appName("nexus-analysis").getOrCreate()
                 spark_context = spark.sparkContext
 
-            handlers.append(
-                (clazzWrapper.path, NexusRequestHandler,
-                 dict(clazz=clazzWrapper, algorithm_config=algorithm_config, sc=spark_context,
-                      thread_pool=request_thread_pool)))
+            handlers.append((clazzWrapper.path,
+                             NexusRequestHandler,
+                             dict(clazz=clazzWrapper,
+                                  tile_service_factory=tile_service_factory,
+                                  sc=spark_context,
+                                  thread_pool=request_thread_pool)))
         else:
-            handlers.append(
-                (clazzWrapper.path, NexusRequestHandler,
-                 dict(clazz=clazzWrapper, algorithm_config=algorithm_config, thread_pool=request_thread_pool)))
+            handlers.append((clazzWrapper.path,
+                             NexusRequestHandler,
+                             dict(clazz=clazzWrapper,
+                                  tile_service_factory=tile_service_factory,
+                                  thread_pool=request_thread_pool)))
 
 
     class VersionHandler(tornado.web.RequestHandler):