You are viewing a plain-text version of this content. The link to the canonical version (originally attached to the word "here") was not preserved in this plain-text copy.
Posted to commits@sdap.apache.org by fg...@apache.org on 2018/09/18 17:40:55 UTC

[incubator-sdap-nexus] branch master updated: SDAP-93 Clean up and document HofMoellerSpark (#35)

This is an automated email from the ASF dual-hosted git repository.

fgreg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git


The following commit(s) were added to refs/heads/master by this push:
     new 0e4c8cd  SDAP-93 Clean up and document HofMoellerSpark (#35)
0e4c8cd is described below

commit 0e4c8cdd512a55bd97bd7bc311f54dcd9a567723
Author: fgreg <fg...@gmail.com>
AuthorDate: Tue Sep 18 10:40:51 2018 -0700

    SDAP-93 Clean up and document HofMoellerSpark (#35)
    
    * updated params and return values
    
    * SDAP-50 Fix incorrect Hovmoller Implementation
---
 .../webservice/algorithms_spark/HofMoellerSpark.py | 281 ++++++++++++++-------
 1 file changed, 187 insertions(+), 94 deletions(-)

diff --git a/analysis/webservice/algorithms_spark/HofMoellerSpark.py b/analysis/webservice/algorithms_spark/HofMoellerSpark.py
index 92d7a70..ea4a37d 100644
--- a/analysis/webservice/algorithms_spark/HofMoellerSpark.py
+++ b/analysis/webservice/algorithms_spark/HofMoellerSpark.py
@@ -13,22 +13,25 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import sys
 import itertools
 import logging
-import traceback
 from cStringIO import StringIO
 from datetime import datetime
 
 import matplotlib.pyplot as plt
 import mpld3
 import numpy as np
+import shapely.geometry
 from matplotlib import cm
 from matplotlib.ticker import FuncFormatter
 from nexustiles.nexustiles import NexusTileService
+from pytz import timezone
 
-from webservice.NexusHandler import SparkHandler, nexus_handler, DEFAULT_PARAMETERS_SPEC
-from webservice.webmodel import NexusProcessingException, NexusResults
+from webservice.NexusHandler import SparkHandler, nexus_handler
+from webservice.webmodel import NexusResults, NoDataException, NexusProcessingException
+
+EPOCH = timezone('UTC').localize(datetime(1970, 1, 1))
+ISO_8601 = '%Y-%m-%dT%H:%M:%SZ'
 
 SENTINEL = 'STOP'
 LATITUDE = 0
@@ -50,9 +53,8 @@ class HofMoellerCalculator(object):
             tile = tile_service.mask_tiles_to_bbox(min_lat, max_lat,
                                                    min_lon, max_lon, [tile])[0]
         except IndexError:
-            #return None
+            # return None
             return []
-
         t = np.ma.min(tile.times)
         stats = []
 
@@ -65,14 +67,13 @@ class HofMoellerCalculator(object):
             # Longitude-Time Map (Average over latitudes)
             data = sorted(points, key=lambda p: p.longitude)
             points_by_coord = itertools.groupby(data, key=lambda p: p.longitude)
-            
- 
+
         for coord, points_at_coord in points_by_coord:
             values_at_coord = np.array([[p.data_val,
-                                        np.cos(np.radians(p.latitude))]
+                                         np.cos(np.radians(p.latitude))]
                                         for p in points_at_coord])
-            vals = np.nan_to_num(values_at_coord[:,0])
-            weights = values_at_coord[:,1]
+            vals = np.nan_to_num(values_at_coord[:, 0])
+            weights = values_at_coord[:, 1]
             coord_cnt = len(values_at_coord)
             if latlon == 0:
                 # Latitude-Time Map (Average over longitudes)
@@ -96,10 +97,107 @@ class HofMoellerCalculator(object):
 
 
 class BaseHoffMoellerHandlerImpl(SparkHandler):
+    params = {
+        "ds": {
+            "name": "Dataset",
+            "type": "comma-delimited string",
+            "description": "The dataset(s) Used to generate the plot. Required"
+        },
+        "startTime": {
+            "name": "Start Time",
+            "type": "string",
+            "description": "Starting time in format YYYY-MM-DDTHH:mm:ssZ or seconds since EPOCH. Required"
+        },
+        "endTime": {
+            "name": "End Time",
+            "type": "string",
+            "description": "Ending time in format YYYY-MM-DDTHH:mm:ssZ or seconds since EPOCH. Required"
+        },
+        "b": {
+            "name": "Bounding box",
+            "type": "comma-delimited float",
+            "description": "Minimum (Western) Longitude, Minimum (Southern) Latitude, "
+                           "Maximum (Eastern) Longitude, Maximum (Northern) Latitude. Required"
+        },
+        "spark": {
+            "name": "Spark Configuration",
+            "type": "comma-delimited value",
+            "description": "Configuration used to launch in the Spark cluster. Value should be 3 elements separated by "
+                           "commas. 1) Spark Master 2) Number of Spark Executors 3) Number of Spark Partitions. Only "
+                           "Number of Spark Partitions is used by this function. Optional (Default: local,1,1)"
+        }
+    }
+
     def __init__(self):
         SparkHandler.__init__(self)
         self.log = logging.getLogger(__name__)
 
+    def parse_arguments(self, request):
+        # Parse input arguments
+        self.log.debug("Parsing arguments")
+
+        try:
+            ds = request.get_dataset()
+            if type(ds) == list or type(ds) == tuple:
+                ds = next(iter(ds))
+        except:
+            raise NexusProcessingException(
+                reason="'ds' argument is required. Must be a string",
+                code=400)
+
+        # Do not allow time series on Climatology
+        if next(iter([clim for clim in ds if 'CLIM' in clim]), False):
+            raise NexusProcessingException(
+                reason="Cannot compute Latitude/Longitude Time Average plot on a climatology", code=400)
+
+        try:
+            bounding_polygon = request.get_bounding_polygon()
+            request.get_min_lon = lambda: bounding_polygon.bounds[0]
+            request.get_min_lat = lambda: bounding_polygon.bounds[1]
+            request.get_max_lon = lambda: bounding_polygon.bounds[2]
+            request.get_max_lat = lambda: bounding_polygon.bounds[3]
+        except:
+            try:
+                west, south, east, north = request.get_min_lon(), request.get_min_lat(), \
+                                           request.get_max_lon(), request.get_max_lat()
+                bounding_polygon = shapely.geometry.Polygon(
+                    [(west, south), (east, south), (east, north), (west, north), (west, south)])
+            except:
+                raise NexusProcessingException(
+                    reason="'b' argument is required. Must be comma-delimited float formatted as "
+                           "Minimum (Western) Longitude, Minimum (Southern) Latitude, "
+                           "Maximum (Eastern) Longitude, Maximum (Northern) Latitude",
+                    code=400)
+
+        try:
+            start_time = request.get_start_datetime()
+        except:
+            raise NexusProcessingException(
+                reason="'startTime' argument is required. Can be int value seconds from epoch or "
+                       "string format YYYY-MM-DDTHH:mm:ssZ",
+                code=400)
+        try:
+            end_time = request.get_end_datetime()
+        except:
+            raise NexusProcessingException(
+                reason="'endTime' argument is required. Can be int value seconds from epoch or "
+                       "string format YYYY-MM-DDTHH:mm:ssZ",
+                code=400)
+
+        if start_time > end_time:
+            raise NexusProcessingException(
+                reason="The starting time must be before the ending time. Received startTime: %s, endTime: %s" % (
+                    request.get_start_datetime().strftime(ISO_8601), request.get_end_datetime().strftime(ISO_8601)),
+                code=400)
+
+        spark_master, spark_nexecs, spark_nparts = request.get_spark_cfg()
+
+        start_seconds_from_epoch = long((start_time - EPOCH).total_seconds())
+        end_seconds_from_epoch = long((end_time - EPOCH).total_seconds())
+
+        return ds, bounding_polygon, start_seconds_from_epoch, end_seconds_from_epoch, \
+               spark_master, spark_nexecs, spark_nparts
+
     def applyDeseasonToHofMoellerByField(self, results, pivot="lats", field="avg", append=True):
         shape = (len(results), len(results[0][pivot]))
         if shape[0] <= 12:
@@ -146,21 +244,22 @@ def hof_tuple_time(t):
 
 
 def hof_tuple_combine(t1, t2):
-    return (t1[0], # Time
-            t1[1], # Sequence (index)
-            t1[2], # Coordinate on axis (latitude or longitude)
-            t1[3] + t2[3], # Number of values
-            t1[4] + t2[4], # Sum of values (weighted for lon-time maps)
-            t1[5] + t2[5], # Sum of weights (= # of values for lat-time maps)
-            max(t1[6], t2[6]), # Maximum value
-            min(t1[7], t2[7]), # Minimum value
-            parallel_variance(t1[4]/t1[5], t1[3], t1[8], 
-                              t2[4]/t2[5], t2[3], t2[8])) # Variance
+    return (t1[0],  # Time
+            t1[1],  # Sequence (index)
+            t1[2],  # Coordinate on axis (latitude or longitude)
+            t1[3] + t2[3],  # Number of values
+            t1[4] + t2[4],  # Sum of values (weighted for lon-time maps)
+            t1[5] + t2[5],  # Sum of weights (= # of values for lat-time maps)
+            max(t1[6], t2[6]),  # Maximum value
+            min(t1[7], t2[7]),  # Minimum value
+            parallel_variance(t1[4] / t1[5], t1[3], t1[8],
+                              t2[4] / t2[5], t2[3], t2[8]))  # Variance
+
 
 def hof_tuple_to_dict(t, avg_var_name):
     return {avg_var_name: t[2],
             'cnt': t[3],
-            'avg': t[4] / t[5],
+            'mean': t[4] / t[5],
             'std': np.sqrt(t[8]),
             'max': t[6],
             'min': t[7]}
@@ -176,7 +275,7 @@ def spark_driver(sc, latlon, nexus_tiles_spark):
         # Longitude-Time Map (Average over latitudes)
         avg_var_name = 'longitude'
         avg_var_name_collection = 'lons'
-        
+
     # Create a set of key-value pairs where the key is (time, lat|lon) and
     # the value is a tuple of intermediate statistics for the specified
     # coordinate within a single NEXUS tile.
@@ -184,7 +283,7 @@ def spark_driver(sc, latlon, nexus_tiles_spark):
 
     # Combine tuples across tiles with input key = (time, lat|lon)
     # Output a key value pair with key = (time)
-    results = results.combineByKey(lambda val: (hof_tuple_time(val),val),
+    results = results.combineByKey(lambda val: (hof_tuple_time(val), val),
                                    lambda x, val: (hof_tuple_time(x),
                                                    hof_tuple_combine(x[1],
                                                                      val)),
@@ -194,29 +293,32 @@ def spark_driver(sc, latlon, nexus_tiles_spark):
 
     # Convert the tuples to dictionary entries and combine coordinates
     # with the same time stamp.  Here we have input key = (time)
-    results = results.values().\
-              combineByKey(lambda val, avg_var_name=avg_var_name,
-                           avg_var_name_collection=avg_var_name_collection: {
-                               'sequence': val[1],
-                               'time': val[0],
-                               avg_var_name_collection: [
-                                   hof_tuple_to_dict(val, avg_var_name)]},
-                           lambda x, val, avg_var_name=avg_var_name,
-                           avg_var_name_collection=avg_var_name_collection: {
-                               'sequence': x['sequence'],
-                               'time': x['time'],
-                               avg_var_name_collection: (
-                                   x[avg_var_name_collection] +
-                                   [hof_tuple_to_dict(val, avg_var_name)])},
-                           lambda x, y,
-                             avg_var_name_collection=avg_var_name_collection:
-                             {'sequence': x['sequence'],
-                              'time': x['time'],
-                              avg_var_name_collection: (
-                                  x[avg_var_name_collection] +
-                                  y[avg_var_name_collection])}).\
-              values().\
-              collect()
+    results = results.values(). \
+        combineByKey(lambda val, avg_var_name=avg_var_name,
+                            avg_var_name_collection=avg_var_name_collection: {
+        'sequence': val[1],
+        'time': val[0],
+        'iso_time': datetime.utcfromtimestamp(val[0]).strftime(ISO_8601),
+        avg_var_name_collection: [
+            hof_tuple_to_dict(val, avg_var_name)]},
+                     lambda x, val, avg_var_name=avg_var_name,
+                            avg_var_name_collection=avg_var_name_collection: {
+                         'sequence': x['sequence'],
+                         'time': x['time'],
+                         'iso_time': x['iso_time'],
+                         avg_var_name_collection: (
+                                 x[avg_var_name_collection] +
+                                 [hof_tuple_to_dict(val, avg_var_name)])},
+                     lambda x, y,
+                            avg_var_name_collection=avg_var_name_collection:
+                     {'sequence': x['sequence'],
+                      'time': x['time'],
+                      'iso_time': x['iso_time'],
+                      avg_var_name_collection: (
+                              x[avg_var_name_collection] +
+                              y[avg_var_name_collection])}). \
+        values(). \
+        collect()
 
     return results
 
@@ -226,26 +328,28 @@ class LatitudeTimeHoffMoellerSparkHandlerImpl(BaseHoffMoellerHandlerImpl):
     name = "Latitude/Time HofMoeller Spark"
     path = "/latitudeTimeHofMoellerSpark"
     description = "Computes a latitude/time HofMoeller plot given an arbitrary geographical area and time range"
-    params = DEFAULT_PARAMETERS_SPEC
+    params = BaseHoffMoellerHandlerImpl.params
     singleton = True
 
     def __init__(self):
-        self._latlon = 0 # 0 for latitude-time map, 1 for longitude-time map
+        self._latlon = 0  # 0 for latitude-time map, 1 for longitude-time map
         BaseHoffMoellerHandlerImpl.__init__(self)
 
-    def calc(self, computeOptions, **args):
-        nexus_tiles_spark = [(self._latlon, tile.tile_id, x,
-                              computeOptions.get_min_lat(),
-                              computeOptions.get_max_lat(),
-                              computeOptions.get_min_lon(),
-                              computeOptions.get_max_lon())
-                             for x, tile in enumerate(self._tile_service.find_tiles_in_box(computeOptions.get_min_lat(), computeOptions.get_max_lat(), computeOptions.get_min_lon(), computeOptions.get_max_lon(), computeOptions.get_dataset()[0], computeOptions.get_start_time(), computeOptions.get_end_time(), fetch_data=False))]
+    def calc(self, compute_options, **args):
+        ds, bbox, start_time, end_time, spark_master, spark_nexecs, spark_nparts = self.parse_arguments(compute_options)
+
+        min_lon, min_lat, max_lon, max_lat = bbox.bounds
+
+        nexus_tiles_spark = [(self._latlon, tile.tile_id, x, min_lat, max_lat, min_lon, max_lon) for x, tile in
+                             enumerate(self._tile_service.find_tiles_in_box(min_lat, max_lat, min_lon, max_lon,
+                                                                            ds, start_time, end_time,
+                                                                            fetch_data=False))]
+
         print ("Got {} tiles".format(len(nexus_tiles_spark)))
         if len(nexus_tiles_spark) == 0:
-            raise NexusProcessingException.NoDataException(reason="No data found for selected timeframe")
+            raise NoDataException(reason="No data found for selected timeframe")
 
-        results = spark_driver (self._sc, self._latlon, nexus_tiles_spark)
-                                                        
+        results = spark_driver(self._sc, self._latlon, nexus_tiles_spark)
         results = filter(None, results)
         results = sorted(results, key=lambda entry: entry['time'])
         for i in range(len(results)):
@@ -253,7 +357,9 @@ class LatitudeTimeHoffMoellerSparkHandlerImpl(BaseHoffMoellerHandlerImpl):
                                         key=lambda entry: entry['latitude'])
         results = self.applyDeseasonToHofMoeller(results)
 
-        result = HoffMoellerResults(results=results, computeOptions=computeOptions, type=HoffMoellerResults.LATITUDE)
+        result = HoffMoellerResults(results=results, compute_options=None, type=HoffMoellerResults.LATITUDE,
+                                    minLat=min_lat, maxLat=max_lat, minLon=min_lon,
+                                    maxLon=max_lon, ds=ds, startTime=start_time, endTime=end_time)
         return result
 
 
@@ -262,25 +368,28 @@ class LongitudeTimeHoffMoellerSparkHandlerImpl(BaseHoffMoellerHandlerImpl):
     name = "Longitude/Time HofMoeller Spark"
     path = "/longitudeTimeHofMoellerSpark"
     description = "Computes a longitude/time HofMoeller plot given an arbitrary geographical area and time range"
-    params = DEFAULT_PARAMETERS_SPEC
+    params = BaseHoffMoellerHandlerImpl.params
     singleton = True
 
     def __init__(self):
-        self._latlon = 1 # 0 for latitude-time map; 1 for longitude-time map
+        self._latlon = 1  # 0 for latitude-time map; 1 for longitude-time map
         BaseHoffMoellerHandlerImpl.__init__(self)
 
-    def calc(self, computeOptions, **args):
-        nexus_tiles_spark = [(self._latlon, tile.tile_id, x,
-                              computeOptions.get_min_lat(),
-                              computeOptions.get_max_lat(),
-                              computeOptions.get_min_lon(),
-                              computeOptions.get_max_lon())
-                             for x, tile in enumerate(self._tile_service.find_tiles_in_box(computeOptions.get_min_lat(), computeOptions.get_max_lat(), computeOptions.get_min_lon(), computeOptions.get_max_lon(), computeOptions.get_dataset()[0], computeOptions.get_start_time(), computeOptions.get_end_time(), fetch_data=False))]
+    def calc(self, compute_options, **args):
+        ds, bbox, start_time, end_time, spark_master, spark_nexecs, spark_nparts = self.parse_arguments(compute_options)
+
+        min_lon, min_lat, max_lon, max_lat = bbox.bounds
+
+        nexus_tiles_spark = [(self._latlon, tile.tile_id, x, min_lat, max_lat, min_lon, max_lon) for x, tile in
+                             enumerate(self._tile_service.find_tiles_in_box(min_lat, max_lat, min_lon, max_lon,
+                                                                            ds, start_time, end_time,
+                                                                            fetch_data=False))]
 
+        print ("Got {} tiles".format(len(nexus_tiles_spark)))
         if len(nexus_tiles_spark) == 0:
-            raise NexusProcessingException.NoDataException(reason="No data found for selected timeframe")
+            raise NoDataException(reason="No data found for selected timeframe")
 
-        results = spark_driver (self._sc, self._latlon, nexus_tiles_spark)
+        results = spark_driver(self._sc, self._latlon, nexus_tiles_spark)
 
         results = filter(None, results)
         results = sorted(results, key=lambda entry: entry["time"])
@@ -290,7 +399,9 @@ class LongitudeTimeHoffMoellerSparkHandlerImpl(BaseHoffMoellerHandlerImpl):
 
         results = self.applyDeseasonToHofMoeller(results, pivot="lons")
 
-        result = HoffMoellerResults(results=results, computeOptions=computeOptions, type=HoffMoellerResults.LONGITUDE)
+        result = HoffMoellerResults(results=results, compute_options=None, type=HoffMoellerResults.LONGITUDE,
+                                    minLat=min_lat, maxLat=max_lat, minLon=min_lon,
+                                    maxLon=max_lon, ds=ds, startTime=start_time, endTime=end_time)
         return result
 
 
@@ -298,9 +409,10 @@ class HoffMoellerResults(NexusResults):
     LATITUDE = 0
     LONGITUDE = 1
 
-    def __init__(self, results=None, meta=None, stats=None, computeOptions=None, **args):
-        NexusResults.__init__(self, results=results, meta=meta, stats=stats, computeOptions=computeOptions)
-        self.__type = args['type']
+    def __init__(self, results=None, meta=None, stats=None, **kwargs):
+
+        NexusResults.__init__(self, results=results, meta=meta, stats=stats, computeOptions=None, **kwargs)
+        self.__type = kwargs['type']
 
     def createHoffmueller(self, data, coordSeries, timeSeries, coordName, title, interpolate='nearest'):
         cmap = cm.coolwarm
@@ -399,22 +511,3 @@ class HoffMoellerResults(NexusResults):
             return self.createLongitudeHoffmueller(res, meta)
         else:
             raise Exception("Unsupported HoffMoeller Plot Type")
-
-
-def pool_worker(type, work_queue, done_queue):
-    try:
-
-        if type == LATITUDE:
-            calculator = LatitudeHofMoellerCalculator()
-        elif type == LONGITUDE:
-            calculator = LongitudeHofMoellerCalculator()
-
-        for work in iter(work_queue.get, SENTINEL):
-            scifunction = work[0]
-            args = work[1:]
-            result = calculator.__getattribute__(scifunction)(*args)
-            done_queue.put(result)
-
-    except Exception as e:
-        e_str = traceback.format_exc(e)
-        done_queue.put({'error': e_str})