You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sdap.apache.org by nc...@apache.org on 2020/10/13 22:01:18 UTC
[incubator-sdap-nexus] branch master updated: SDAP-258: Use
pre-computed climatologies for deseason algorithm (#109)
This is an automated email from the ASF dual-hosted git repository.
nchung pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git
The following commit(s) were added to refs/heads/master by this push:
new cf2149b SDAP-258: Use pre-computed climatologies for deseason algorithm (#109)
cf2149b is described below
commit cf2149b6d88fb521eb473f983b7780979dfa5c2a
Author: Eamon Ford <ea...@gmail.com>
AuthorDate: Tue Oct 13 15:01:10 2020 -0700
SDAP-258: Use pre-computed climatologies for deseason algorithm (#109)
---
.../webservice/algorithms_spark/TimeSeriesSpark.py | 72 ++++++++++++++--------
1 file changed, 47 insertions(+), 25 deletions(-)
diff --git a/analysis/webservice/algorithms_spark/TimeSeriesSpark.py b/analysis/webservice/algorithms_spark/TimeSeriesSpark.py
index 43f7f6d..422fdb8 100644
--- a/analysis/webservice/algorithms_spark/TimeSeriesSpark.py
+++ b/analysis/webservice/algorithms_spark/TimeSeriesSpark.py
@@ -38,6 +38,7 @@ from webservice.webmodel import NexusResults, NoDataException, NexusProcessingEx
EPOCH = timezone('UTC').localize(datetime(1970, 1, 1))
ISO_8601 = '%Y-%m-%dT%H:%M:%S%z'
+SECONDS_IN_ONE_YEAR = 31535999
logger = logging.getLogger(__name__)
@@ -117,7 +118,7 @@ class TimeSeriesSparkHandlerImpl(NexusCalcSparkHandler):
except:
try:
west, south, east, north = request.get_min_lon(), request.get_min_lat(), \
- request.get_max_lon(), request.get_max_lat()
+ request.get_max_lon(), request.get_max_lat()
bounding_polygon = shapely.geometry.Polygon(
[(west, south), (east, south), (east, north), (west, north), (west, south)])
except:
@@ -160,7 +161,7 @@ class TimeSeriesSparkHandlerImpl(NexusCalcSparkHandler):
def calc(self, request, **args):
"""
-
+
:param request: StatsComputeOptions
:param args: dict
:return:
@@ -176,13 +177,13 @@ class TimeSeriesSparkHandlerImpl(NexusCalcSparkHandler):
the_time = datetime.now()
daysinrange = self._get_tile_service().find_days_in_range_asc(bounding_polygon.bounds[1],
- bounding_polygon.bounds[3],
- bounding_polygon.bounds[0],
- bounding_polygon.bounds[2],
- shortName,
- start_seconds_from_epoch,
- end_seconds_from_epoch,
- metrics_callback=metrics_record.record_metrics)
+ bounding_polygon.bounds[3],
+ bounding_polygon.bounds[0],
+ bounding_polygon.bounds[2],
+ shortName,
+ start_seconds_from_epoch,
+ end_seconds_from_epoch,
+ metrics_callback=metrics_record.record_metrics)
self.log.info("Finding days in range took %s for dataset %s" % (str(datetime.now() - the_time), shortName))
ndays = len(daysinrange)
@@ -203,16 +204,37 @@ class TimeSeriesSparkHandlerImpl(NexusCalcSparkHandler):
if apply_seasonal_cycle_filter:
the_time = datetime.now()
+ # get time series for _clim dataset
+ shortName_clim = shortName + "_clim"
+ daysinrange_clim = self._get_tile_service().find_days_in_range_asc(bounding_polygon.bounds[1],
+ bounding_polygon.bounds[3],
+ bounding_polygon.bounds[0],
+ bounding_polygon.bounds[2],
+ shortName_clim,
+ 0,
+ SECONDS_IN_ONE_YEAR,
+ metrics_callback=metrics_record.record_metrics)
+ if len(daysinrange_clim) == 0:
+ raise NexusProcessingException(reason="There is no climatology data present for dataset " + shortName + ".")
+ results_clim, _ = spark_driver(daysinrange_clim,
+ bounding_polygon,
+ shortName_clim,
+ self._tile_service_factory,
+ metrics_record.record_metrics,
+ spark_nparts=spark_nparts,
+ sc=self._sc)
+ clim_indexed_by_month = {datetime.utcfromtimestamp(result['time']).month: result for result in results_clim}
+ if len(clim_indexed_by_month) < 12:
+ raise NexusProcessingException(reason="There are only " +
+ str(len(clim_indexed_by_month)) + " months of climatology data for dataset " +
+ shortName + ". A full year of climatology data is required for computing deseasoned timeseries.")
+
for result in results:
month = datetime.utcfromtimestamp(result['time']).month
- month_mean, month_max, month_min = self.calculate_monthly_average(month, bounding_polygon.wkt,
- shortName)
- seasonal_mean = result['mean'] - month_mean
- seasonal_min = result['min'] - month_min
- seasonal_max = result['max'] - month_max
- result['meanSeasonal'] = seasonal_mean
- result['minSeasonal'] = seasonal_min
- result['maxSeasonal'] = seasonal_max
+
+ result['meanSeasonal'] = result['mean'] - clim_indexed_by_month[month]['mean']
+ result['minSeasonal'] = result['min'] - clim_indexed_by_month[month]['min']
+ result['maxSeasonal'] = result['max'] - clim_indexed_by_month[month]['max']
self.log.info(
"Seasonal calculation took %s for dataset %s" % (str(datetime.now() - the_time), shortName))
@@ -288,12 +310,12 @@ class TimeSeriesSparkHandlerImpl(NexusCalcSparkHandler):
start = (pytz.UTC.localize(beginning_of_month) - EPOCH).total_seconds()
end = (pytz.UTC.localize(end_of_month) - EPOCH).total_seconds()
tile_stats = self._get_tile_service().find_tiles_in_polygon(bounding_polygon, ds, start, end,
- fl=('id,'
- 'tile_avg_val_d,tile_count_i,'
- 'tile_min_val_d,tile_max_val_d,'
- 'tile_min_lat,tile_max_lat,'
- 'tile_min_lon,tile_max_lon'),
- fetch_data=False)
+ fl=('id,'
+ 'tile_avg_val_d,tile_count_i,'
+ 'tile_min_val_d,tile_max_val_d,'
+ 'tile_min_lat,tile_max_lat,'
+ 'tile_min_lon,tile_max_lon'),
+ fetch_data=False)
if len(tile_stats) == 0:
continue
@@ -338,8 +360,8 @@ class TimeSeriesSparkHandlerImpl(NexusCalcSparkHandler):
weights = np.array(monthly_counts) / count_sum
return np.average(monthly_averages, None, weights).item(), \
- np.average(monthly_averages, None, weights).item(), \
- np.average(monthly_averages, None, weights).item()
+ np.average(monthly_averages, None, weights).item(), \
+ np.average(monthly_averages, None, weights).item()
@lru_cache()
def get_min_max_date(self, ds=None):