You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@sdap.apache.org by GitBox <gi...@apache.org> on 2018/09/18 17:40:19 UTC

[GitHub] fgreg closed pull request #33: SDAP-95 Clean up and document TimeAvgMapSpark

fgreg closed pull request #33: SDAP-95 Clean up and document TimeAvgMapSpark
URL: https://github.com/apache/incubator-sdap-nexus/pull/33
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/analysis/webservice/algorithms_spark/TimeAvgMapSpark.py b/analysis/webservice/algorithms_spark/TimeAvgMapSpark.py
index 19de786..473f4ce 100644
--- a/analysis/webservice/algorithms_spark/TimeAvgMapSpark.py
+++ b/analysis/webservice/algorithms_spark/TimeAvgMapSpark.py
@@ -15,131 +15,152 @@
 
 
 import logging
+from datetime import datetime
 
 import numpy as np
+import shapely.geometry
 from nexustiles.nexustiles import NexusTileService
+from pytz import timezone
 
-# from time import time
-from webservice.NexusHandler import nexus_handler, SparkHandler, DEFAULT_PARAMETERS_SPEC
+from webservice.NexusHandler import nexus_handler, SparkHandler
 from webservice.webmodel import NexusResults, NexusProcessingException, NoDataException
 
+EPOCH = timezone('UTC').localize(datetime(1970, 1, 1))
+ISO_8601 = '%Y-%m-%dT%H:%M:%S%z'
+
 
 @nexus_handler
 class TimeAvgMapSparkHandlerImpl(SparkHandler):
     name = "Time Average Map Spark"
     path = "/timeAvgMapSpark"
     description = "Computes a Latitude/Longitude Time Average plot given an arbitrary geographical area and time range"
-    params = DEFAULT_PARAMETERS_SPEC
+    params = {
+        "ds": {
+            "name": "Dataset",
+            "type": "String",
+            "description": "The dataset used to generate the map. Required"
+        },
+        "startTime": {
+            "name": "Start Time",
+            "type": "string",
+            "description": "Starting time in format YYYY-MM-DDTHH:mm:ssZ or seconds since EPOCH. Required"
+        },
+        "endTime": {
+            "name": "End Time",
+            "type": "string",
+            "description": "Ending time in format YYYY-MM-DDTHH:mm:ssZ or seconds since EPOCH. Required"
+        },
+        "b": {
+            "name": "Bounding box",
+            "type": "comma-delimited float",
+            "description": "Minimum (Western) Longitude, Minimum (Southern) Latitude, "
+                           "Maximum (Eastern) Longitude, Maximum (Northern) Latitude. Required"
+        },
+        "spark": {
+            "name": "Spark Configuration",
+            "type": "comma-delimited value",
+            "description": "Configuration used to launch in the Spark cluster. Value should be 3 elements separated by "
+                           "commas. 1) Spark Master 2) Number of Spark Executors 3) Number of Spark Partitions. Only "
+                           "Number of Spark Partitions is used by this function. Optional (Default: local,1,1)"
+        }
+    }
     singleton = True
 
     def __init__(self):
         SparkHandler.__init__(self)
         self.log = logging.getLogger(__name__)
-        # self.log.setLevel(logging.DEBUG)
 
-    @staticmethod
-    def _map(tile_in_spark):
-        tile_bounds = tile_in_spark[0]
-        (min_lat, max_lat, min_lon, max_lon,
-         min_y, max_y, min_x, max_x) = tile_bounds
-        startTime = tile_in_spark[1]
-        endTime = tile_in_spark[2]
-        ds = tile_in_spark[3]
-        tile_service = NexusTileService()
-        # print 'Started tile {0}'.format(tile_bounds)
-        # sys.stdout.flush()
-        tile_inbounds_shape = (max_y - min_y + 1, max_x - min_x + 1)
-        # days_at_a_time = 90
-        days_at_a_time = 30
-        # days_at_a_time = 7
-        # days_at_a_time = 1
-        # print 'days_at_a_time = {0}'.format(days_at_a_time)
-        t_incr = 86400 * days_at_a_time
-        sum_tile = np.array(np.zeros(tile_inbounds_shape, dtype=np.float64))
-        cnt_tile = np.array(np.zeros(tile_inbounds_shape, dtype=np.uint32))
-        t_start = startTime
-        while t_start <= endTime:
-            t_end = min(t_start + t_incr, endTime)
-            # t1 = time()
-            # print 'nexus call start at time {0}'.format(t1)
-            # sys.stdout.flush()
-            # nexus_tiles = \
-            #    TimeAvgMapSparkHandlerImpl.query_by_parts(tile_service,
-            #                                              min_lat, max_lat, 
-            #                                              min_lon, max_lon, 
-            #                                              ds, 
-            #                                              t_start, 
-            #                                              t_end,
-            #                                              part_dim=2)
-            nexus_tiles = \
-                tile_service.get_tiles_bounded_by_box(min_lat, max_lat,
-                                                      min_lon, max_lon,
-                                                      ds=ds,
-                                                      start_time=t_start,
-                                                      end_time=t_end)
-            # t2 = time()
-            # print 'nexus call end at time %f' % t2
-            # print 'secs in nexus call: ', t2 - t1
-            # print 't %d to %d - Got %d tiles' % (t_start, t_end,
-            #                                     len(nexus_tiles))
-            # for nt in nexus_tiles:
-            #    print nt.granule
-            #    print nt.section_spec
-            #    print 'lat min/max:', np.ma.min(nt.latitudes), np.ma.max(nt.latitudes)
-            #    print 'lon min/max:', np.ma.min(nt.longitudes), np.ma.max(nt.longitudes)
-            # sys.stdout.flush()
+    def parse_arguments(self, request):
+        # Parse input arguments
+        self.log.debug("Parsing arguments")
 
-            for tile in nexus_tiles:
-                tile.data.data[:, :] = np.nan_to_num(tile.data.data)
-                sum_tile += tile.data.data[0, min_y:max_y + 1, min_x:max_x + 1]
-                cnt_tile += (~tile.data.mask[0,
-                              min_y:max_y + 1,
-                              min_x:max_x + 1]).astype(np.uint8)
-            t_start = t_end + 1
+        try:
+            ds = request.get_dataset()
+            if type(ds) == list or type(ds) == tuple:
+                ds = next(iter(ds))
+        except:
+            raise NexusProcessingException(
+                reason="'ds' argument is required. Must be a string",
+                code=400)
+
+        # Do not allow time series on Climatology
+        if next(iter([clim for clim in ds if 'CLIM' in clim]), False):
+            raise NexusProcessingException(
+                reason="Cannot compute Latitude/Longitude Time Average plot on a climatology", code=400)
+
+        try:
+            bounding_polygon = request.get_bounding_polygon()
+            request.get_min_lon = lambda: bounding_polygon.bounds[0]
+            request.get_min_lat = lambda: bounding_polygon.bounds[1]
+            request.get_max_lon = lambda: bounding_polygon.bounds[2]
+            request.get_max_lat = lambda: bounding_polygon.bounds[3]
+        except:
+            try:
+                west, south, east, north = request.get_min_lon(), request.get_min_lat(), \
+                                           request.get_max_lon(), request.get_max_lat()
+                bounding_polygon = shapely.geometry.Polygon(
+                    [(west, south), (east, south), (east, north), (west, north), (west, south)])
+            except:
+                raise NexusProcessingException(
+                    reason="'b' argument is required. Must be comma-delimited float formatted as "
+                           "Minimum (Western) Longitude, Minimum (Southern) Latitude, "
+                           "Maximum (Eastern) Longitude, Maximum (Northern) Latitude",
+                    code=400)
+
+        try:
+            start_time = request.get_start_datetime()
+        except:
+            raise NexusProcessingException(
+                reason="'startTime' argument is required. Can be int value seconds from epoch or "
+                       "string format YYYY-MM-DDTHH:mm:ssZ",
+                code=400)
+        try:
+            end_time = request.get_end_datetime()
+        except:
+            raise NexusProcessingException(
+                reason="'endTime' argument is required. Can be int value seconds from epoch or "
+                       "string format YYYY-MM-DDTHH:mm:ssZ",
+                code=400)
+
+        if start_time > end_time:
+            raise NexusProcessingException(
+                reason="The starting time must be before the ending time. Received startTime: %s, endTime: %s" % (
+                    request.get_start_datetime().strftime(ISO_8601), request.get_end_datetime().strftime(ISO_8601)),
+                code=400)
+
+        spark_master, spark_nexecs, spark_nparts = request.get_spark_cfg()
+
+        start_seconds_from_epoch = long((start_time - EPOCH).total_seconds())
+        end_seconds_from_epoch = long((end_time - EPOCH).total_seconds())
+
+        return ds, bounding_polygon, start_seconds_from_epoch, end_seconds_from_epoch, \
+               spark_master, spark_nexecs, spark_nparts
 
-        # print 'cnt_tile = ', cnt_tile
-        # cnt_tile.mask = ~(cnt_tile.data.astype(bool))
-        # sum_tile.mask = cnt_tile.mask
-        # avg_tile = sum_tile / cnt_tile
-        # stats_tile = [[{'avg': avg_tile.data[y,x], 'cnt': cnt_tile.data[y,x]} for x in range(tile_inbounds_shape[1])] for y in range(tile_inbounds_shape[0])]
-        # print 'Finished tile', tile_bounds
-        # print 'Tile avg = ', avg_tile
-        # sys.stdout.flush()
-        return ((min_lat, max_lat, min_lon, max_lon), (sum_tile, cnt_tile))
-
-    def calc(self, computeOptions, **args):
+    def calc(self, compute_options, **args):
         """
 
-        :param computeOptions: StatsComputeOptions
+        :param compute_options: StatsComputeOptions
         :param args: dict
         :return:
         """
 
-        spark_master, spark_nexecs, spark_nparts = computeOptions.get_spark_cfg()
-        self._setQueryParams(computeOptions.get_dataset()[0],
-                             (float(computeOptions.get_min_lat()),
-                              float(computeOptions.get_max_lat()),
-                              float(computeOptions.get_min_lon()),
-                              float(computeOptions.get_max_lon())),
-                             computeOptions.get_start_time(),
-                             computeOptions.get_end_time(),
+        ds, bbox, start_time, end_time, spark_master, spark_nexecs, spark_nparts = self.parse_arguments(compute_options)
+
+        compute_options.get_spark_cfg()
+
+        self._setQueryParams(ds,
+                             (float(bbox.bounds[1]),
+                              float(bbox.bounds[3]),
+                              float(bbox.bounds[0]),
+                              float(bbox.bounds[2])),
+                             start_time,
+                             end_time,
                              spark_master=spark_master,
                              spark_nexecs=spark_nexecs,
                              spark_nparts=spark_nparts)
 
-        if 'CLIM' in self._ds:
-            raise NexusProcessingException(
-                reason="Cannot compute Latitude/Longitude Time Average plot on a climatology", code=400)
-
         nexus_tiles = self._find_global_tile_set()
-        # print 'tiles:'
-        # for tile in nexus_tiles:
-        #     print tile.granule
-        #     print tile.section_spec
-        #     print 'lat:', tile.latitudes
-        #     print 'lon:', tile.longitudes
-
-        #                                                          nexus_tiles)
+
         if len(nexus_tiles) == 0:
             raise NoDataException(reason="No data found for selected timeframe")
 
@@ -152,14 +173,11 @@ def calc(self, computeOptions, **args):
         self.log.debug('center lon range = {0} to {1}'.format(self._minLonCent,
                                                               self._maxLonCent))
 
-        # for tile in nexus_tiles:
-        #    print 'lats: ', tile.latitudes.compressed()
-        #    print 'lons: ', tile.longitudes.compressed()
         # Create array of tuples to pass to Spark map function
         nexus_tiles_spark = [[self._find_tile_bounds(t),
                               self._startTime, self._endTime,
                               self._ds] for t in nexus_tiles]
-        # print 'nexus_tiles_spark = ', nexus_tiles_spark
+
         # Remove empty tiles (should have bounds set to None)
         bad_tile_inds = np.where([t[0] is None for t in nexus_tiles_spark])[0]
         for i in np.flipud(bad_tile_inds):
@@ -168,7 +186,7 @@ def calc(self, computeOptions, **args):
         # Expand Spark map tuple array by duplicating each entry N times,
         # where N is the number of ways we want the time dimension carved up.
         num_time_parts = 72
-        # num_time_parts = 1
+
         nexus_tiles_spark = np.repeat(nexus_tiles_spark, num_time_parts, axis=0)
         self.log.debug('repeated len(nexus_tiles_spark) = {0}'.format(len(nexus_tiles_spark)))
 
@@ -183,9 +201,6 @@ def calc(self, computeOptions, **args):
                       len(nexus_tiles_spark) / num_time_parts, axis=0).reshape((len(nexus_tiles_spark), 2))
         self.log.debug('spark_part_time_ranges={0}'.format(spark_part_time_ranges))
         nexus_tiles_spark[:, 1:3] = spark_part_time_ranges
-        # print 'nexus_tiles_spark final = '
-        # for i in range(len(nexus_tiles_spark)):
-        #    print nexus_tiles_spark[i]
 
         # Launch Spark computations
         rdd = self._sc.parallelize(nexus_tiles_spark, self._spark_nparts)
@@ -246,13 +261,48 @@ def calc(self, computeOptions, **args):
         self._create_nc_file(a, 'tam.nc', 'val', fill=self._fill)
 
         # Create dict for JSON response
-        results = [[{'avg': a[y, x], 'cnt': int(n[y, x]),
+        results = [[{'mean': a[y, x], 'cnt': int(n[y, x]),
                      'lat': self._ind2lat(y), 'lon': self._ind2lon(x)}
                     for x in range(a.shape[1])] for y in range(a.shape[0])]
 
-        return TimeAvgMapSparkResults(results=results, meta={}, computeOptions=computeOptions)
+        return NexusResults(results=results, meta={}, stats=None,
+                            computeOptions=None, minLat=bbox.bounds[1],
+                            maxLat=bbox.bounds[3], minLon=bbox.bounds[0],
+                            maxLon=bbox.bounds[2], ds=ds, startTime=start_time,
+                            endTime=end_time)
+
+    @staticmethod
+    def _map(tile_in_spark):
+        tile_bounds = tile_in_spark[0]
+        (min_lat, max_lat, min_lon, max_lon,
+         min_y, max_y, min_x, max_x) = tile_bounds
+        startTime = tile_in_spark[1]
+        endTime = tile_in_spark[2]
+        ds = tile_in_spark[3]
+        tile_service = NexusTileService()
+
+        tile_inbounds_shape = (max_y - min_y + 1, max_x - min_x + 1)
+
+        days_at_a_time = 30
+
+        t_incr = 86400 * days_at_a_time
+        sum_tile = np.array(np.zeros(tile_inbounds_shape, dtype=np.float64))
+        cnt_tile = np.array(np.zeros(tile_inbounds_shape, dtype=np.uint32))
+        t_start = startTime
+        while t_start <= endTime:
+            t_end = min(t_start + t_incr, endTime)
+
+            nexus_tiles = \
+                tile_service.get_tiles_bounded_by_box(min_lat, max_lat,
+                                                      min_lon, max_lon,
+                                                      ds=ds,
+                                                      start_time=t_start,
+                                                      end_time=t_end)
 
+            for tile in nexus_tiles:
+                tile.data.data[:, :] = np.nan_to_num(tile.data.data)
+                sum_tile += tile.data.data[0, min_y:max_y + 1, min_x:max_x + 1]
+                cnt_tile += (~tile.data.mask[0, min_y:max_y + 1, min_x:max_x + 1]).astype(np.uint8)
+            t_start = t_end + 1
 
-class TimeAvgMapSparkResults(NexusResults):
-    def __init__(self, results=None, meta=None, computeOptions=None):
-        NexusResults.__init__(self, results=results, meta=meta, stats=None, computeOptions=computeOptions)
+        return (min_lat, max_lat, min_lon, max_lon), (sum_tile, cnt_tile)


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services