Posted to commits@sdap.apache.org by fg...@apache.org on 2018/09/18 17:40:55 UTC
[incubator-sdap-nexus] branch master updated: SDAP-93 Clean up and document HofMoellerSpark (#35)
This is an automated email from the ASF dual-hosted git repository.
fgreg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git
The following commit(s) were added to refs/heads/master by this push:
new 0e4c8cd SDAP-93 Clean up and document HofMoellerSpark (#35)
0e4c8cd is described below
commit 0e4c8cdd512a55bd97bd7bc311f54dcd9a567723
Author: fgreg <fg...@gmail.com>
AuthorDate: Tue Sep 18 10:40:51 2018 -0700
SDAP-93 Clean up and document HofMoellerSpark (#35)
* updated params and return values
* SDAP-50 Fix incorrect Hovmoller Implementation
---
.../webservice/algorithms_spark/HofMoellerSpark.py | 281 ++++++++++++++-------
1 file changed, 187 insertions(+), 94 deletions(-)
diff --git a/analysis/webservice/algorithms_spark/HofMoellerSpark.py b/analysis/webservice/algorithms_spark/HofMoellerSpark.py
index 92d7a70..ea4a37d 100644
--- a/analysis/webservice/algorithms_spark/HofMoellerSpark.py
+++ b/analysis/webservice/algorithms_spark/HofMoellerSpark.py
@@ -13,22 +13,25 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import sys
import itertools
import logging
-import traceback
from cStringIO import StringIO
from datetime import datetime
import matplotlib.pyplot as plt
import mpld3
import numpy as np
+import shapely.geometry
from matplotlib import cm
from matplotlib.ticker import FuncFormatter
from nexustiles.nexustiles import NexusTileService
+from pytz import timezone
-from webservice.NexusHandler import SparkHandler, nexus_handler, DEFAULT_PARAMETERS_SPEC
-from webservice.webmodel import NexusProcessingException, NexusResults
+from webservice.NexusHandler import SparkHandler, nexus_handler
+from webservice.webmodel import NexusResults, NoDataException, NexusProcessingException
+
+EPOCH = timezone('UTC').localize(datetime(1970, 1, 1))
+ISO_8601 = '%Y-%m-%dT%H:%M:%SZ'
SENTINEL = 'STOP'
LATITUDE = 0
@@ -50,9 +53,8 @@ class HofMoellerCalculator(object):
tile = tile_service.mask_tiles_to_bbox(min_lat, max_lat,
min_lon, max_lon, [tile])[0]
except IndexError:
- #return None
+ # return None
return []
-
t = np.ma.min(tile.times)
stats = []
@@ -65,14 +67,13 @@ class HofMoellerCalculator(object):
# Longitude-Time Map (Average over latitudes)
data = sorted(points, key=lambda p: p.longitude)
points_by_coord = itertools.groupby(data, key=lambda p: p.longitude)
-
-
+
for coord, points_at_coord in points_by_coord:
values_at_coord = np.array([[p.data_val,
- np.cos(np.radians(p.latitude))]
+ np.cos(np.radians(p.latitude))]
for p in points_at_coord])
- vals = np.nan_to_num(values_at_coord[:,0])
- weights = values_at_coord[:,1]
+ vals = np.nan_to_num(values_at_coord[:, 0])
+ weights = values_at_coord[:, 1]
coord_cnt = len(values_at_coord)
if latlon == 0:
# Latitude-Time Map (Average over longitudes)
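In the loop above each point contributes its value together with cos(latitude) as a weight, so that averaging over latitudes (the longitude-time map) is area-weighted rather than a plain mean of grid points. A minimal numpy sketch of that weighting, with illustrative values not taken from any dataset:

    import numpy as np

    data_vals = np.array([20.0, 21.0, 22.0])   # e.g. SST values in one longitude column
    latitudes = np.array([0.0, 45.0, 60.0])    # degrees north

    weights = np.cos(np.radians(latitudes))    # grid cells shrink toward the poles
    weighted_mean = np.sum(np.nan_to_num(data_vals) * weights) / np.sum(weights)
    plain_mean = data_vals.mean()
    # weighted_mean (~20.8) is below plain_mean (21.0) because the
    # high-latitude points cover less area and so contribute less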
@@ -96,10 +97,107 @@ class HofMoellerCalculator(object):
class BaseHoffMoellerHandlerImpl(SparkHandler):
+ params = {
+ "ds": {
+ "name": "Dataset",
+ "type": "comma-delimited string",
+ "description": "The dataset(s) Used to generate the plot. Required"
+ },
+ "startTime": {
+ "name": "Start Time",
+ "type": "string",
+ "description": "Starting time in format YYYY-MM-DDTHH:mm:ssZ or seconds since EPOCH. Required"
+ },
+ "endTime": {
+ "name": "End Time",
+ "type": "string",
+ "description": "Ending time in format YYYY-MM-DDTHH:mm:ssZ or seconds since EPOCH. Required"
+ },
+ "b": {
+ "name": "Bounding box",
+ "type": "comma-delimited float",
+ "description": "Minimum (Western) Longitude, Minimum (Southern) Latitude, "
+ "Maximum (Eastern) Longitude, Maximum (Northern) Latitude. Required"
+ },
+ "spark": {
+ "name": "Spark Configuration",
+ "type": "comma-delimited value",
+ "description": "Configuration used to launch in the Spark cluster. Value should be 3 elements separated by "
+ "commas. 1) Spark Master 2) Number of Spark Executors 3) Number of Spark Partitions. Only "
+ "Number of Spark Partitions is used by this function. Optional (Default: local,1,1)"
+ }
+ }
+
def __init__(self):
SparkHandler.__init__(self)
self.log = logging.getLogger(__name__)
+ def parse_arguments(self, request):
+ # Parse input arguments
+ self.log.debug("Parsing arguments")
+
+ try:
+ ds = request.get_dataset()
+ if type(ds) == list or type(ds) == tuple:
+ ds = next(iter(ds))
+ except:
+ raise NexusProcessingException(
+ reason="'ds' argument is required. Must be a string",
+ code=400)
+
+ # Do not allow time series on Climatology
+ if next(iter([clim for clim in ds if 'CLIM' in clim]), False):
+ raise NexusProcessingException(
+ reason="Cannot compute Latitude/Longitude Time Average plot on a climatology", code=400)
+
+ try:
+ bounding_polygon = request.get_bounding_polygon()
+ request.get_min_lon = lambda: bounding_polygon.bounds[0]
+ request.get_min_lat = lambda: bounding_polygon.bounds[1]
+ request.get_max_lon = lambda: bounding_polygon.bounds[2]
+ request.get_max_lat = lambda: bounding_polygon.bounds[3]
+ except:
+ try:
+ west, south, east, north = request.get_min_lon(), request.get_min_lat(), \
+ request.get_max_lon(), request.get_max_lat()
+ bounding_polygon = shapely.geometry.Polygon(
+ [(west, south), (east, south), (east, north), (west, north), (west, south)])
+ except:
+ raise NexusProcessingException(
+ reason="'b' argument is required. Must be comma-delimited float formatted as "
+ "Minimum (Western) Longitude, Minimum (Southern) Latitude, "
+ "Maximum (Eastern) Longitude, Maximum (Northern) Latitude",
+ code=400)
+
+ try:
+ start_time = request.get_start_datetime()
+ except:
+ raise NexusProcessingException(
+ reason="'startTime' argument is required. Can be int value seconds from epoch or "
+ "string format YYYY-MM-DDTHH:mm:ssZ",
+ code=400)
+ try:
+ end_time = request.get_end_datetime()
+ except:
+ raise NexusProcessingException(
+ reason="'endTime' argument is required. Can be int value seconds from epoch or "
+ "string format YYYY-MM-DDTHH:mm:ssZ",
+ code=400)
+
+ if start_time > end_time:
+ raise NexusProcessingException(
+ reason="The starting time must be before the ending time. Received startTime: %s, endTime: %s" % (
+ request.get_start_datetime().strftime(ISO_8601), request.get_end_datetime().strftime(ISO_8601)),
+ code=400)
+
+ spark_master, spark_nexecs, spark_nparts = request.get_spark_cfg()
+
+ start_seconds_from_epoch = long((start_time - EPOCH).total_seconds())
+ end_seconds_from_epoch = long((end_time - EPOCH).total_seconds())
+
+ return ds, bounding_polygon, start_seconds_from_epoch, end_seconds_from_epoch, \
+ spark_master, spark_nexecs, spark_nparts
+
def applyDeseasonToHofMoellerByField(self, results, pivot="lats", field="avg", append=True):
shape = (len(results), len(results[0][pivot]))
if shape[0] <= 12:
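The new parse_arguments first tries request.get_bounding_polygon() and otherwise builds a shapely polygon from the west/south/east/north values, then converts the parsed datetimes to integer seconds since the Unix epoch using the EPOCH constant added at the top of the module. A standalone sketch of those two conversions with illustrative inputs:

    import shapely.geometry
    from datetime import datetime
    from pytz import timezone

    EPOCH = timezone('UTC').localize(datetime(1970, 1, 1))

    # Bounding box given as west, south, east, north (illustrative values)
    west, south, east, north = -150.0, -10.0, -100.0, 10.0
    bounding_polygon = shapely.geometry.Polygon(
        [(west, south), (east, south), (east, north), (west, north), (west, south)])
    # .bounds returns (min_lon, min_lat, max_lon, max_lat), which is how
    # calc() later unpacks the polygon back into a box
    assert bounding_polygon.bounds == (west, south, east, north)

    # Datetime -> seconds since epoch; the patch uses long() because the
    # module targets Python 2, int() is shown here for brevity
    start_time = timezone('UTC').localize(datetime(2017, 1, 1))
    start_seconds_from_epoch = int((start_time - EPOCH).total_seconds())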
@@ -146,21 +244,22 @@ def hof_tuple_time(t):
def hof_tuple_combine(t1, t2):
- return (t1[0], # Time
- t1[1], # Sequence (index)
- t1[2], # Coordinate on axis (latitude or longitude)
- t1[3] + t2[3], # Number of values
- t1[4] + t2[4], # Sum of values (weighted for lon-time maps)
- t1[5] + t2[5], # Sum of weights (= # of values for lat-time maps)
- max(t1[6], t2[6]), # Maximum value
- min(t1[7], t2[7]), # Minimum value
- parallel_variance(t1[4]/t1[5], t1[3], t1[8],
- t2[4]/t2[5], t2[3], t2[8])) # Variance
+ return (t1[0], # Time
+ t1[1], # Sequence (index)
+ t1[2], # Coordinate on axis (latitude or longitude)
+ t1[3] + t2[3], # Number of values
+ t1[4] + t2[4], # Sum of values (weighted for lon-time maps)
+ t1[5] + t2[5], # Sum of weights (= # of values for lat-time maps)
+ max(t1[6], t2[6]), # Maximum value
+ min(t1[7], t2[7]), # Minimum value
+ parallel_variance(t1[4] / t1[5], t1[3], t1[8],
+ t2[4] / t2[5], t2[3], t2[8])) # Variance
+
def hof_tuple_to_dict(t, avg_var_name):
return {avg_var_name: t[2],
'cnt': t[3],
- 'avg': t[4] / t[5],
+ 'mean': t[4] / t[5],
'std': np.sqrt(t[8]),
'max': t[6],
'min': t[7]}
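hof_tuple_combine merges two partial-statistics tuples for the same (time, coordinate): counts, sums and weights are added, max/min are carried through, and the variance is merged with parallel_variance(avg_a, count_a, var_a, avg_b, count_b, var_b), a helper defined elsewhere in HofMoellerSpark.py and not shown in this diff. One common formulation with that argument order (Chan et al.'s parallel variance), offered only as a sketch of what such a helper typically computes:

    def parallel_variance_sketch(avg_a, count_a, var_a, avg_b, count_b, var_b):
        # Combine the (population) variances of two partitions without
        # revisiting the raw values
        n = count_a + count_b
        delta = avg_b - avg_a
        m2 = (var_a * count_a + var_b * count_b
              + delta ** 2 * count_a * count_b / float(n))
        return m2 / n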
@@ -176,7 +275,7 @@ def spark_driver(sc, latlon, nexus_tiles_spark):
# Longitude-Time Map (Average over latitudes)
avg_var_name = 'longitude'
avg_var_name_collection = 'lons'
-
+
# Create a set of key-value pairs where the key is (time, lat|lon) and
# the value is a tuple of intermediate statistics for the specified
# coordinate within a single NEXUS tile.
@@ -184,7 +283,7 @@ def spark_driver(sc, latlon, nexus_tiles_spark):
# Combine tuples across tiles with input key = (time, lat|lon)
# Output a key value pair with key = (time)
- results = results.combineByKey(lambda val: (hof_tuple_time(val),val),
+ results = results.combineByKey(lambda val: (hof_tuple_time(val), val),
lambda x, val: (hof_tuple_time(x),
hof_tuple_combine(x[1],
val)),
@@ -194,29 +293,32 @@ def spark_driver(sc, latlon, nexus_tiles_spark):
# Convert the tuples to dictionary entries and combine coordinates
# with the same time stamp. Here we have input key = (time)
- results = results.values().\
- combineByKey(lambda val, avg_var_name=avg_var_name,
- avg_var_name_collection=avg_var_name_collection: {
- 'sequence': val[1],
- 'time': val[0],
- avg_var_name_collection: [
- hof_tuple_to_dict(val, avg_var_name)]},
- lambda x, val, avg_var_name=avg_var_name,
- avg_var_name_collection=avg_var_name_collection: {
- 'sequence': x['sequence'],
- 'time': x['time'],
- avg_var_name_collection: (
- x[avg_var_name_collection] +
- [hof_tuple_to_dict(val, avg_var_name)])},
- lambda x, y,
- avg_var_name_collection=avg_var_name_collection:
- {'sequence': x['sequence'],
- 'time': x['time'],
- avg_var_name_collection: (
- x[avg_var_name_collection] +
- y[avg_var_name_collection])}).\
- values().\
- collect()
+ results = results.values(). \
+ combineByKey(lambda val, avg_var_name=avg_var_name,
+ avg_var_name_collection=avg_var_name_collection: {
+ 'sequence': val[1],
+ 'time': val[0],
+ 'iso_time': datetime.utcfromtimestamp(val[0]).strftime(ISO_8601),
+ avg_var_name_collection: [
+ hof_tuple_to_dict(val, avg_var_name)]},
+ lambda x, val, avg_var_name=avg_var_name,
+ avg_var_name_collection=avg_var_name_collection: {
+ 'sequence': x['sequence'],
+ 'time': x['time'],
+ 'iso_time': x['iso_time'],
+ avg_var_name_collection: (
+ x[avg_var_name_collection] +
+ [hof_tuple_to_dict(val, avg_var_name)])},
+ lambda x, y,
+ avg_var_name_collection=avg_var_name_collection:
+ {'sequence': x['sequence'],
+ 'time': x['time'],
+ 'iso_time': x['iso_time'],
+ avg_var_name_collection: (
+ x[avg_var_name_collection] +
+ y[avg_var_name_collection])}). \
+ values(). \
+ collect()
return results
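This hunk and the previous one fold the per-tile statistics down with two combineByKey passes: first keyed by (time, lat|lon) to merge tuples for the same coordinate across tiles, then keyed by time to gather all coordinates of a time step into one dictionary, which now also carries an iso_time string. A minimal, self-contained sketch of the same combineByKey pattern (local Spark, illustrative data), reduced to a per-key running (sum, count) and mean:

    from pyspark import SparkContext

    sc = SparkContext('local[1]', 'combineByKey-sketch')
    pairs = sc.parallelize([(0, 2.0), (0, 4.0), (1, 6.0)])

    # createCombiner, mergeValue, mergeCombiners -- the same three-callback
    # shape spark_driver uses above
    means = pairs.combineByKey(lambda v: (v, 1),
                               lambda acc, v: (acc[0] + v, acc[1] + 1),
                               lambda a, b: (a[0] + b[0], a[1] + b[1])) \
                 .mapValues(lambda acc: acc[0] / acc[1]) \
                 .collect()
    # [(0, 3.0), (1, 6.0)]  (order may vary)
    sc.stop()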
@@ -226,26 +328,28 @@ class LatitudeTimeHoffMoellerSparkHandlerImpl(BaseHoffMoellerHandlerImpl):
name = "Latitude/Time HofMoeller Spark"
path = "/latitudeTimeHofMoellerSpark"
description = "Computes a latitude/time HofMoeller plot given an arbitrary geographical area and time range"
- params = DEFAULT_PARAMETERS_SPEC
+ params = BaseHoffMoellerHandlerImpl.params
singleton = True
def __init__(self):
- self._latlon = 0 # 0 for latitude-time map, 1 for longitude-time map
+ self._latlon = 0 # 0 for latitude-time map, 1 for longitude-time map
BaseHoffMoellerHandlerImpl.__init__(self)
- def calc(self, computeOptions, **args):
- nexus_tiles_spark = [(self._latlon, tile.tile_id, x,
- computeOptions.get_min_lat(),
- computeOptions.get_max_lat(),
- computeOptions.get_min_lon(),
- computeOptions.get_max_lon())
- for x, tile in enumerate(self._tile_service.find_tiles_in_box(computeOptions.get_min_lat(), computeOptions.get_max_lat(), computeOptions.get_min_lon(), computeOptions.get_max_lon(), computeOptions.get_dataset()[0], computeOptions.get_start_time(), computeOptions.get_end_time(), fetch_data=False))]
+ def calc(self, compute_options, **args):
+ ds, bbox, start_time, end_time, spark_master, spark_nexecs, spark_nparts = self.parse_arguments(compute_options)
+
+ min_lon, min_lat, max_lon, max_lat = bbox.bounds
+
+ nexus_tiles_spark = [(self._latlon, tile.tile_id, x, min_lat, max_lat, min_lon, max_lon) for x, tile in
+ enumerate(self._tile_service.find_tiles_in_box(min_lat, max_lat, min_lon, max_lon,
+ ds, start_time, end_time,
+ fetch_data=False))]
+
print ("Got {} tiles".format(len(nexus_tiles_spark)))
if len(nexus_tiles_spark) == 0:
- raise NexusProcessingException.NoDataException(reason="No data found for selected timeframe")
+ raise NoDataException(reason="No data found for selected timeframe")
- results = spark_driver (self._sc, self._latlon, nexus_tiles_spark)
-
+ results = spark_driver(self._sc, self._latlon, nexus_tiles_spark)
results = filter(None, results)
results = sorted(results, key=lambda entry: entry['time'])
for i in range(len(results)):
@@ -253,7 +357,9 @@ class LatitudeTimeHoffMoellerSparkHandlerImpl(BaseHoffMoellerHandlerImpl):
key=lambda entry: entry['latitude'])
results = self.applyDeseasonToHofMoeller(results)
- result = HoffMoellerResults(results=results, computeOptions=computeOptions, type=HoffMoellerResults.LATITUDE)
+ result = HoffMoellerResults(results=results, compute_options=None, type=HoffMoellerResults.LATITUDE,
+ minLat=min_lat, maxLat=max_lat, minLon=min_lon,
+ maxLon=max_lon, ds=ds, startTime=start_time, endTime=end_time)
return result
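With the shared params spec above replacing DEFAULT_PARAMETERS_SPEC, a latitude-time request now takes ds, startTime, endTime, b and optionally spark. An illustrative call follows; the host, port and dataset name are assumptions about a particular deployment, not part of this commit:

    import requests

    params = {
        'ds': 'EXAMPLE_SST_DATASET',            # hypothetical dataset name
        'startTime': '2017-01-01T00:00:00Z',
        'endTime': '2017-03-01T00:00:00Z',
        'b': '-150,-10,-100,10',                # min_lon,min_lat,max_lon,max_lat
        'spark': 'local,1,1',
    }
    response = requests.get('http://localhost:8083/latitudeTimeHofMoellerSpark',
                            params=params)
    print(response.json())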
@@ -262,25 +368,28 @@ class LongitudeTimeHoffMoellerSparkHandlerImpl(BaseHoffMoellerHandlerImpl):
name = "Longitude/Time HofMoeller Spark"
path = "/longitudeTimeHofMoellerSpark"
description = "Computes a longitude/time HofMoeller plot given an arbitrary geographical area and time range"
- params = DEFAULT_PARAMETERS_SPEC
+ params = BaseHoffMoellerHandlerImpl.params
singleton = True
def __init__(self):
- self._latlon = 1 # 0 for latitude-time map; 1 for longitude-time map
+ self._latlon = 1 # 0 for latitude-time map; 1 for longitude-time map
BaseHoffMoellerHandlerImpl.__init__(self)
- def calc(self, computeOptions, **args):
- nexus_tiles_spark = [(self._latlon, tile.tile_id, x,
- computeOptions.get_min_lat(),
- computeOptions.get_max_lat(),
- computeOptions.get_min_lon(),
- computeOptions.get_max_lon())
- for x, tile in enumerate(self._tile_service.find_tiles_in_box(computeOptions.get_min_lat(), computeOptions.get_max_lat(), computeOptions.get_min_lon(), computeOptions.get_max_lon(), computeOptions.get_dataset()[0], computeOptions.get_start_time(), computeOptions.get_end_time(), fetch_data=False))]
+ def calc(self, compute_options, **args):
+ ds, bbox, start_time, end_time, spark_master, spark_nexecs, spark_nparts = self.parse_arguments(compute_options)
+
+ min_lon, min_lat, max_lon, max_lat = bbox.bounds
+
+ nexus_tiles_spark = [(self._latlon, tile.tile_id, x, min_lat, max_lat, min_lon, max_lon) for x, tile in
+ enumerate(self._tile_service.find_tiles_in_box(min_lat, max_lat, min_lon, max_lon,
+ ds, start_time, end_time,
+ fetch_data=False))]
+ print ("Got {} tiles".format(len(nexus_tiles_spark)))
if len(nexus_tiles_spark) == 0:
- raise NexusProcessingException.NoDataException(reason="No data found for selected timeframe")
+ raise NoDataException(reason="No data found for selected timeframe")
- results = spark_driver (self._sc, self._latlon, nexus_tiles_spark)
+ results = spark_driver(self._sc, self._latlon, nexus_tiles_spark)
results = filter(None, results)
results = sorted(results, key=lambda entry: entry["time"])
@@ -290,7 +399,9 @@ class LongitudeTimeHoffMoellerSparkHandlerImpl(BaseHoffMoellerHandlerImpl):
results = self.applyDeseasonToHofMoeller(results, pivot="lons")
- result = HoffMoellerResults(results=results, computeOptions=computeOptions, type=HoffMoellerResults.LONGITUDE)
+ result = HoffMoellerResults(results=results, compute_options=None, type=HoffMoellerResults.LONGITUDE,
+ minLat=min_lat, maxLat=max_lat, minLon=min_lon,
+ maxLon=max_lon, ds=ds, startTime=start_time, endTime=end_time)
return result
@@ -298,9 +409,10 @@ class HoffMoellerResults(NexusResults):
LATITUDE = 0
LONGITUDE = 1
- def __init__(self, results=None, meta=None, stats=None, computeOptions=None, **args):
- NexusResults.__init__(self, results=results, meta=meta, stats=stats, computeOptions=computeOptions)
- self.__type = args['type']
+ def __init__(self, results=None, meta=None, stats=None, **kwargs):
+
+ NexusResults.__init__(self, results=results, meta=meta, stats=stats, computeOptions=None, **kwargs)
+ self.__type = kwargs['type']
def createHoffmueller(self, data, coordSeries, timeSeries, coordName, title, interpolate='nearest'):
cmap = cm.coolwarm
@@ -399,22 +511,3 @@ class HoffMoellerResults(NexusResults):
return self.createLongitudeHoffmueller(res, meta)
else:
raise Exception("Unsupported HoffMoeller Plot Type")
-
-
-def pool_worker(type, work_queue, done_queue):
- try:
-
- if type == LATITUDE:
- calculator = LatitudeHofMoellerCalculator()
- elif type == LONGITUDE:
- calculator = LongitudeHofMoellerCalculator()
-
- for work in iter(work_queue.get, SENTINEL):
- scifunction = work[0]
- args = work[1:]
- result = calculator.__getattribute__(scifunction)(*args)
- done_queue.put(result)
-
- except Exception as e:
- e_str = traceback.format_exc(e)
- done_queue.put({'error': e_str})